#!/usr/bin/env python3
"""
Step1b.1 Bayesian ECM for PEM water electrolysis EIS.
This script fits frequency-resolved EIS data with two candidate equivalent circuits:
Model A: L_par + R_ohm + ZARC_1
Model B: L_par + R_ohm + ZARC_1 + ZARC_2
Why these models?
- L_par captures the high-frequency inductive hook that is typically caused by the
measurement setup (cables / fixture / leads), not by the cell electrochemistry itself.
- R_ohm captures the high-frequency ohmic contribution (membrane + contacts + current collectors).
- ZARC_1 / ZARC_2 are depressed semicircles that act as *operational proxies* for
electrochemical sub-processes. In PEMWE data these usually represent mixtures of
kinetics / ionic transport / mass transport rather than a unique one-to-one mechanism.
The script is intentionally written as a compact but readable research prototype:
- plain NumPy / SciPy / pandas / matplotlib only
- no heavy probabilistic programming dependency
- Bayesian uncertainty via adaptive random-walk Metropolis-Hastings
- WAIC-based model comparison
Input formats
-------------
1) KIT txt export:
columns are stored as:
Number Frequency/Hz Impedance/Ohm Phase/deg
2) CSV:
must contain:
frequency_Hz, Zre_ohm, Zim_ohm
Outputs
-------
- summary.json : posterior summaries, WAIC, fit metrics
- posterior_draws_*.csv : posterior samples for each model
- fit_nyquist_compare.png : Nyquist plot (data + both models)
- fit_bode_compare.png : Bode-like magnitude/phase plot (data + both models)
Notes
-----
This is a *single-spectrum* model. Time evolution / state-space coupling should be
built on top of these posterior summaries in the next step.
"""
from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Tuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import least_squares
from scipy.special import gammaln
# ---------------------------------------------------------------------------
# Small math helpers
# ---------------------------------------------------------------------------
def softplus(x: np.ndarray | float) -> np.ndarray | float:
    """Numerically stable softplus log(1 + exp(x)).

    Maps unconstrained sampler coordinates onto strictly positive values
    without overflowing for large |x|.
    """
    linear_part = np.maximum(x, 0)
    correction = np.log1p(np.exp(-np.abs(x)))
    return correction + linear_part
def inv_softplus(y: float) -> float:
    """Inverse softplus, i.e. log(exp(y) - 1), used to seed positive parameters."""
    floor_value = 1e-12
    if y < floor_value:
        # Guard against log(0) for non-positive initial guesses.
        y = floor_value
    return np.log(np.expm1(y))
def sigmoid(x: np.ndarray | float) -> np.ndarray | float:
    """Numerically stable logistic map: unconstrained -> (0, 1).

    Used for the ZARC alpha (depression) parameters.
    Implemented via tanh so large-magnitude inputs saturate cleanly: the
    naive 1 / (1 + exp(-x)) emits an overflow RuntimeWarning for large
    negative x because exp(-x) -> inf, while tanh is bounded by design.
    Identity used: 1 / (1 + exp(-x)) == 0.5 * (1 + tanh(x / 2)).
    """
    return 0.5 * (1.0 + np.tanh(0.5 * x))
def logit(p: float) -> float:
    """Inverse sigmoid used for initializing alpha parameters."""
    # Clip away from {0, 1} so the log-odds stay finite.
    clipped = float(np.clip(p, 1e-8, 1.0 - 1e-8))
    odds = clipped / (1.0 - clipped)
    return np.log(odds)
# ---------------------------------------------------------------------------
# Equivalent-circuit building blocks
# ---------------------------------------------------------------------------
def zarc(omega_rad_s: np.ndarray, resistance: float, tau_s: float, alpha: float) -> np.ndarray:
    """
    Depressed-semicircle (ZARC) impedance element.

    Z = R / (1 + (j * omega * tau)^alpha)

    alpha = 1 recovers the ideal RC semicircle; alpha < 1 depresses the
    arc, which real EIS spectra usually require.
    """
    dimensionless = (1j * omega_rad_s * tau_s) ** alpha
    return resistance / (1.0 + dimensionless)
def model_l_1zarc(freq_hz: np.ndarray, params: Dict[str, float]) -> np.ndarray:
    """Series circuit L_par + R_ohm + ZARC_1 evaluated at the given frequencies."""
    omega = 2.0 * np.pi * freq_hz
    inductive = 1j * omega * params["l_par"]
    # ZARC_1 written out inline: R1 / (1 + (j*omega*tau1)^alpha1).
    arc1 = params["r1"] / (1.0 + (1j * omega * params["tau1"]) ** params["alpha1"])
    return inductive + params["r_ohm"] + arc1
def model_l_2zarc(freq_hz: np.ndarray, params: Dict[str, float]) -> np.ndarray:
    """Series circuit L_par + R_ohm + ZARC_1 + ZARC_2 evaluated at the given frequencies."""
    omega = 2.0 * np.pi * freq_hz
    inductive = 1j * omega * params["l_par"]
    # Both ZARC arcs written out inline: R / (1 + (j*omega*tau)^alpha).
    arc1 = params["r1"] / (1.0 + (1j * omega * params["tau1"]) ** params["alpha1"])
    arc2 = params["r2"] / (1.0 + (1j * omega * params["tau2"]) ** params["alpha2"])
    return inductive + params["r_ohm"] + arc1 + arc2
# Column order used when posterior draws are stored to CSV.
# These names match the dict keys produced by unconstrained_to_model1 /
# unconstrained_to_model2 below, so DataFrame construction stays aligned.
MODEL1_COLUMNS = ["l_par", "r_ohm", "r1", "tau1", "alpha1", "sigma_re", "sigma_im"]
MODEL2_COLUMNS = [
    "l_par", "r_ohm", "r1", "tau1", "alpha1", "r2", "tau2", "alpha2", "sigma_re", "sigma_im"
]
# ---------------------------------------------------------------------------
# Parameter transforms:
# unconstrained theta -> physical parameters
# We keep the sampler in unconstrained space so proposals stay simple.
# ---------------------------------------------------------------------------
def unconstrained_to_model1(theta: np.ndarray) -> Dict[str, float]:
    """Map an unconstrained sampler vector to physical L + 1-ZARC parameters.

    Positivity is enforced via softplus plus a tiny floor, the time constant
    via exp, and the ZARC exponent via a sigmoid into (0, 1).
    """
    raw_l, raw_r_ohm, raw_r1, raw_log_tau1, raw_alpha1, raw_sig_re, raw_sig_im = theta
    params: Dict[str, float] = {}
    params["l_par"] = float(softplus(raw_l) + 1e-15)
    params["r_ohm"] = float(softplus(raw_r_ohm) + 1e-12)
    params["r1"] = float(softplus(raw_r1) + 1e-12)
    params["tau1"] = float(np.exp(raw_log_tau1))
    params["alpha1"] = float(sigmoid(raw_alpha1))
    params["sigma_re"] = float(softplus(raw_sig_re) + 1e-9)
    params["sigma_im"] = float(softplus(raw_sig_im) + 1e-9)
    return params
def unconstrained_to_model2(theta: np.ndarray) -> Dict[str, float]:
    """Map an unconstrained sampler vector to physical L + 2-ZARC parameters.

    Same transforms as the one-ZARC variant: softplus (+ floor) for positive
    quantities, exp for time constants, sigmoid for the ZARC exponents.
    """
    params: Dict[str, float] = {}
    params["l_par"] = float(softplus(theta[0]) + 1e-15)
    params["r_ohm"] = float(softplus(theta[1]) + 1e-12)
    params["r1"] = float(softplus(theta[2]) + 1e-12)
    params["tau1"] = float(np.exp(theta[3]))
    params["alpha1"] = float(sigmoid(theta[4]))
    params["r2"] = float(softplus(theta[5]) + 1e-12)
    params["tau2"] = float(np.exp(theta[6]))
    params["alpha2"] = float(sigmoid(theta[7]))
    params["sigma_re"] = float(softplus(theta[8]) + 1e-9)
    params["sigma_im"] = float(softplus(theta[9]) + 1e-9)
    return params
# ---------------------------------------------------------------------------
# Initialization
# ---------------------------------------------------------------------------
def initial_guess_model1(freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> np.ndarray:
    """
    Crude but robust initialization for L + 1-ZARC.

    Heuristics:
    - L_par from the imaginary part at the highest measured frequency
    - R_ohm from the minimum real part of the spectrum
    - R1 from the full real-axis (Nyquist) span
    - tau1 from the median measured frequency
    Noise scales start at 3% of the per-channel standard deviation.
    """
    hf = int(np.argmax(freq_hz))
    inductance0 = max(1e-12, abs(z_im[hf]) / (2.0 * np.pi * freq_hz[hf]))
    ohmic0 = max(1e-6, float(np.min(z_re)))
    arc_span = max(1e-6, float(np.max(z_re) - np.min(z_re)))
    tau0 = 1.0 / (2.0 * np.pi * float(np.median(freq_hz)))
    noise_re0 = max(1e-5, float(np.std(z_re)) * 0.03)
    noise_im0 = max(1e-5, float(np.std(z_im)) * 0.03)
    unconstrained = [
        inv_softplus(inductance0),
        inv_softplus(ohmic0),
        inv_softplus(arc_span),
        np.log(tau0),
        logit(0.80),
        inv_softplus(noise_re0),
        inv_softplus(noise_im0),
    ]
    return np.array(unconstrained)
def initial_guess_model2(freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> np.ndarray:
    """
    Initialization for L + 2-ZARC.

    The total Nyquist span is split 60/40 between the two arcs, and their
    characteristic frequencies are taken from the upper / lower frequency
    quartiles so the sampler starts from two separated processes.
    """
    hf = int(np.argmax(freq_hz))
    inductance0 = max(1e-12, abs(z_im[hf]) / (2.0 * np.pi * freq_hz[hf]))
    ohmic0 = max(1e-6, float(np.min(z_re)))
    arc_span = max(1e-6, float(np.max(z_re) - np.min(z_re)))
    tau_fast = 1.0 / (2.0 * np.pi * float(np.quantile(freq_hz, 0.75)))
    tau_slow = 1.0 / (2.0 * np.pi * float(np.quantile(freq_hz, 0.25)))
    noise_re0 = max(1e-5, float(np.std(z_re)) * 0.03)
    noise_im0 = max(1e-5, float(np.std(z_im)) * 0.03)
    unconstrained = [
        inv_softplus(inductance0),
        inv_softplus(ohmic0),
        inv_softplus(0.6 * arc_span),
        np.log(tau_fast),
        logit(0.88),
        inv_softplus(0.4 * arc_span),
        np.log(tau_slow),
        logit(0.75),
        inv_softplus(noise_re0),
        inv_softplus(noise_im0),
    ]
    return np.array(unconstrained)
# ---------------------------------------------------------------------------
# Deterministic residuals for least-squares warm start
# ---------------------------------------------------------------------------
def residuals_model1(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> np.ndarray:
    """
    Residual vector for the least-squares warm start of L + 1-ZARC.

    Both channels are divided by one global scale so real and imaginary
    parts contribute comparably to the objective.
    """
    z_hat = model_l_1zarc(freq_hz, unconstrained_to_model1(theta))
    scale = float(np.std(np.concatenate([z_re, z_im])))
    if scale < 1e-6:
        scale = 1e-6
    re_part = (z_hat.real - z_re) / scale
    im_part = (z_hat.imag - z_im) / scale
    return np.concatenate([re_part, im_part])
def residuals_model2(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> np.ndarray:
    """Residual vector for the least-squares warm start of L + 2-ZARC."""
    z_hat = model_l_2zarc(freq_hz, unconstrained_to_model2(theta))
    scale = float(np.std(np.concatenate([z_re, z_im])))
    if scale < 1e-6:
        scale = 1e-6
    re_part = (z_hat.real - z_re) / scale
    im_part = (z_hat.imag - z_im) / scale
    return np.concatenate([re_part, im_part])
# ---------------------------------------------------------------------------
# Bayesian pieces
# ---------------------------------------------------------------------------
def student_t_logpdf(x: np.ndarray, mu: np.ndarray, sigma: np.ndarray, nu: float) -> np.ndarray:
    """
    Elementwise log-density of the location-scale Student-t distribution.

    Why Student-t instead of Gaussian?
    - EIS spectra often contain mild outliers at very high frequencies
    - setup artefacts can make a Gaussian likelihood too brittle
    """
    standardized = (x - mu) / sigma
    half_nu_plus_one = 0.5 * (nu + 1.0)
    # Normalizing constant of the standard t density.
    log_norm = gammaln(half_nu_plus_one) - gammaln(0.5 * nu) - 0.5 * np.log(nu * np.pi)
    kernel = -half_nu_plus_one * np.log1p((standardized ** 2) / nu)
    return log_norm - np.log(sigma) + kernel
def log_prior_model1(theta: np.ndarray) -> float:
    """
    Weakly informative Gaussian prior in unconstrained space (model 1).

    L_par (theta[0]) gets a tighter sd=2 prior because:
    - high-frequency inductive artefacts are usually setup-dependent
    - L_par otherwise confounds with the HF edge of ZARC_1 and destabilizes
      interpretation
    Circuit parameters (theta[1:5]) use sd=3, noise scales (theta[5:]) sd=2.
    """
    lp = -0.5 * (theta[0] / 2.0) ** 2 - np.log(2.0)
    lp = lp - 0.5 * np.sum((theta[1:5] / 3.0) ** 2) - 4.0 * np.log(3.0)
    lp = lp - 0.5 * np.sum((theta[5:] / 2.0) ** 2) - 2.0 * np.log(2.0)
    return float(lp)
def log_prior_model2(theta: np.ndarray) -> float:
    """Weakly informative Gaussian prior for L + 2-ZARC (sd=2 on L_par,
    sd=3 on the seven circuit parameters, sd=2 on the two noise scales)."""
    lp = -0.5 * (theta[0] / 2.0) ** 2 - np.log(2.0)
    lp = lp - 0.5 * np.sum((theta[1:8] / 3.0) ** 2) - 7.0 * np.log(3.0)
    lp = lp - 0.5 * np.sum((theta[8:] / 2.0) ** 2) - 2.0 * np.log(2.0)
    return float(lp)
def pointwise_loglik_model1(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray, nu: float = 4.0) -> np.ndarray:
    """
    Pointwise log-likelihood for WAIC (model 1).

    One contribution per observed frequency, summing the real and
    imaginary channel terms at that frequency.
    """
    params = unconstrained_to_model1(theta)
    z_hat = model_l_1zarc(freq_hz, params)
    sigma_re = np.full_like(z_re, params["sigma_re"])
    sigma_im = np.full_like(z_im, params["sigma_im"])
    return (
        student_t_logpdf(z_re, z_hat.real, sigma_re, nu)
        + student_t_logpdf(z_im, z_hat.imag, sigma_im, nu)
    )
def pointwise_loglik_model2(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray, nu: float = 4.0) -> np.ndarray:
    """Pointwise log-likelihood for WAIC (model 2, L + 2-ZARC)."""
    params = unconstrained_to_model2(theta)
    z_hat = model_l_2zarc(freq_hz, params)
    sigma_re = np.full_like(z_re, params["sigma_re"])
    sigma_im = np.full_like(z_im, params["sigma_im"])
    return (
        student_t_logpdf(z_re, z_hat.real, sigma_re, nu)
        + student_t_logpdf(z_im, z_hat.imag, sigma_im, nu)
    )
def log_posterior_model1(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> float:
    """Full log-posterior (prior + summed pointwise likelihood) for model 1."""
    total_loglik = np.sum(pointwise_loglik_model1(theta, freq_hz, z_re, z_im))
    return float(log_prior_model1(theta) + total_loglik)
def log_posterior_model2(theta: np.ndarray, freq_hz: np.ndarray, z_re: np.ndarray, z_im: np.ndarray) -> float:
    """Full log-posterior (prior + summed pointwise likelihood) for model 2."""
    total_loglik = np.sum(pointwise_loglik_model2(theta, freq_hz, z_re, z_im))
    return float(log_prior_model2(theta) + total_loglik)
def adaptive_random_walk_mh(
    logposterior: Callable[[np.ndarray], float],
    initial_theta: np.ndarray,
    n_samples: int,
    burn_in: int,
    seed: int = 42,
) -> Tuple[np.ndarray, float]:
    """
    Adaptive random-walk Metropolis-Hastings sampler.
    This is not meant to compete with HMC/NUTS. The goal is:
    - zero heavy dependency
    - reproducible uncertainty estimates for a compact prototype
    - easy hand-off to future reimplementation in PyMC / Stan if desired

    Parameters
    ----------
    logposterior : maps an unconstrained theta vector to its log-posterior.
    initial_theta : starting point (e.g. from a least-squares warm start).
    n_samples : number of post-burn-in draws kept (no thinning).
    burn_in : iterations used for proposal adaptation and then discarded.
    seed : seed for the NumPy Generator, for reproducibility.

    Returns
    -------
    (draws, acceptance_rate)
        draws has shape (n_samples, dim). NOTE: the acceptance rate is
        computed over ALL iterations, including burn-in.
    """
    rng = np.random.default_rng(seed)
    dim = len(initial_theta)
    # Start with a small isotropic proposal covariance.
    proposal_cov = np.eye(dim) * 0.03
    theta = initial_theta.copy()
    logp = logposterior(theta)
    total_iters = burn_in + n_samples
    draws = np.zeros((n_samples, dim))
    accepted = 0
    out_idx = 0
    # Running estimates of the chain mean/covariance, updated only during
    # burn-in and used to shape the proposal (Haario-style adaptation).
    running_mean = theta.copy()
    running_cov = np.eye(dim) * 1e-3
    for i in range(total_iters):
        proposal = rng.multivariate_normal(theta, proposal_cov)
        logp_prop = logposterior(proposal)
        # Standard MH accept/reject in log space (symmetric proposal).
        if np.log(rng.uniform()) < (logp_prop - logp):
            theta = proposal
            logp = logp_prop
            accepted += 1
        # During burn-in adapt the proposal covariance.
        if i < burn_in:
            # Decaying step size -> adaptation vanishes as burn-in progresses.
            eta = 1.0 / (i + 2.0)
            delta = theta - running_mean
            running_mean = running_mean + eta * delta
            running_cov = (1.0 - eta) * running_cov + eta * np.outer(delta, delta)
            # Re-scale the proposal every 25 iterations (after a short warm-up).
            if (i + 1) % 25 == 0 and i > 50:
                acceptance_rate = accepted / (i + 1)
                # Aim for a modest RW-MH acceptance rate near 0.234.
                multiplier = np.exp(np.clip(acceptance_rate - 0.234, -0.2, 0.2))
                # 2.38^2 / dim is the classic optimal RW-MH scaling; the small
                # diagonal jitter keeps the covariance positive definite.
                proposal_cov = (2.38 ** 2 / dim) * (running_cov + 1e-6 * np.eye(dim)) * multiplier
        else:
            # Past burn-in: record the current state (accepted or repeated).
            draws[out_idx] = theta
            out_idx += 1
    return draws, accepted / total_iters
def waic_from_pointwise_loglik(pointwise_ll_draws: np.ndarray) -> Tuple[float, float, float]:
    """
    Compute WAIC from posterior draws of the pointwise log-likelihood.

    Input shape is (n_draws, n_observations).

    Returns
    -------
    waic, lppd, p_waic
    """
    # log-mean-exp over draws per observation, with max subtraction for
    # numerical stability.
    col_max = np.max(pointwise_ll_draws, axis=0)
    mean_exp = np.mean(np.exp(pointwise_ll_draws - col_max), axis=0)
    lppd = float(np.sum(col_max + np.log(mean_exp)))
    # Effective-parameter penalty: sum of per-observation variances.
    p_waic = float(np.sum(np.var(pointwise_ll_draws, axis=0, ddof=1)))
    waic = -2.0 * (lppd - p_waic)
    return float(waic), lppd, p_waic
# ---------------------------------------------------------------------------
# Utility / reporting
# ---------------------------------------------------------------------------
def summarize_posterior(df: pd.DataFrame) -> Dict[str, Dict[str, float]]:
    """Median / mean / sd (ddof=1) / 5th / 95th percentile for every column."""
    def _column_stats(values: np.ndarray) -> Dict[str, float]:
        # One summary dict per posterior column.
        return {
            "median": float(np.median(values)),
            "mean": float(np.mean(values)),
            "sd": float(np.std(values, ddof=1)),
            "q05": float(np.quantile(values, 0.05)),
            "q95": float(np.quantile(values, 0.95)),
        }
    return {col: _column_stats(df[col].to_numpy()) for col in df.columns}
def r_squared(y: np.ndarray, y_hat: np.ndarray) -> float:
    """Plain coefficient of determination; NaN for (near-)constant targets."""
    residual_ss = float(np.sum(np.square(y - y_hat)))
    total_ss = float(np.sum(np.square(y - np.mean(y))))
    if total_ss <= 1e-15:
        # R² is undefined when the target has no variance.
        return np.nan
    return float(1.0 - residual_ss / total_ss)
@dataclass
class FitResult:
    """Container used internally and for report export."""
    # Human-readable model label, e.g. "L+1-ZARC" or "L+2-ZARC".
    name: str
    # MH acceptance rate over all iterations (including burn-in).
    acceptance_rate: float
    # WAIC = -2 * (lppd - p_waic); lower is better.
    waic: float
    # Log pointwise predictive density term of WAIC.
    lppd: float
    # Effective-number-of-parameters penalty term of WAIC.
    p_waic: float
    # Per-parameter posterior summary (median/mean/sd/q05/q95).
    summary: Dict[str, Dict[str, float]]
    # Complex impedance evaluated at the posterior-median parameters.
    fitted_curve: np.ndarray
    # Fit-quality metrics: R2 on the Re and -Im channels, complex RMSE.
    metrics: Dict[str, float]
    # Posterior draws mapped to physical parameter space, one column per parameter.
    draws: pd.DataFrame
    # (n_draws, n_frequencies) matrix of pointwise log-likelihood values.
    pointwise_loglik: np.ndarray
# ---------------------------------------------------------------------------
# Main fitting routine
# ---------------------------------------------------------------------------
def _fit_one_model(
    model_name: str,
    freq_hz: np.ndarray,
    z_re: np.ndarray,
    z_im: np.ndarray,
    residual_fn: Callable,
    init_fn: Callable,
    log_posterior_fn: Callable,
    to_params_fn: Callable,
    loglik_fn: Callable,
    model_fn: Callable,
    columns: List[str],
    max_nfev: int,
    n_samples: int,
    burn_in: int,
    seed: int,
) -> FitResult:
    """Warm-start, sample, and summarize one candidate circuit model.

    Shared pipeline for both candidate circuits:
    1) least-squares warm start in unconstrained space
    2) adaptive RW-MH posterior sampling
    3) pointwise log-likelihood draws -> WAIC
    4) posterior-median fitted curve + simple fit metrics
    """
    # 1) deterministic warm start so the chain begins near the mode.
    theta0 = least_squares(
        residual_fn,
        x0=init_fn(freq_hz, z_re, z_im),
        args=(freq_hz, z_re, z_im),
        max_nfev=max_nfev,
    ).x
    # 2) Bayesian posterior sampling.
    draws, acceptance = adaptive_random_walk_mh(
        lambda th: log_posterior_fn(th, freq_hz, z_re, z_im),
        theta0,
        n_samples=n_samples,
        burn_in=burn_in,
        seed=seed,
    )
    draws_df = pd.DataFrame([to_params_fn(theta) for theta in draws], columns=columns)
    # 3) WAIC from the per-draw, per-frequency log-likelihood matrix.
    ll_matrix = np.vstack([loglik_fn(theta, freq_hz, z_re, z_im) for theta in draws])
    waic, lppd, p_waic = waic_from_pointwise_loglik(ll_matrix)
    # 4) fitted curve at the (marginal) posterior medians.
    median_params = {name: float(np.median(draws_df[name])) for name in columns}
    z_hat = model_fn(freq_hz, median_params)
    return FitResult(
        name=model_name,
        acceptance_rate=acceptance,
        waic=waic,
        lppd=lppd,
        p_waic=p_waic,
        summary=summarize_posterior(draws_df),
        fitted_curve=z_hat,
        metrics={
            "R2_Re": r_squared(z_re, np.real(z_hat)),
            "R2_minusIm": r_squared(-z_im, -np.imag(z_hat)),
            "complex_RMSE": float(np.sqrt(np.mean(np.abs((z_re + 1j * z_im) - z_hat) ** 2))),
        },
        draws=draws_df,
        pointwise_loglik=ll_matrix,
    )
def fit_single_spectrum(
    freq_hz: np.ndarray,
    z_re: np.ndarray,
    z_im: np.ndarray,
    seed: int = 42,
    n_samples_l1: int = 3500,
    burn_l1: int = 2000,
    n_samples_l2: int = 4500,
    burn_l2: int = 2500,
) -> Tuple[FitResult, FitResult]:
    """
    Fit both candidate models (L + 1-ZARC and L + 2-ZARC) to one spectrum.

    The two fits are fully independent: model 2 uses its own warm start and
    a distinct RNG seed (seed + 1). Returns (result_model1, result_model2).
    """
    result_l1 = _fit_one_model(
        "L+1-ZARC",
        freq_hz,
        z_re,
        z_im,
        residual_fn=residuals_model1,
        init_fn=initial_guess_model1,
        log_posterior_fn=log_posterior_model1,
        to_params_fn=unconstrained_to_model1,
        loglik_fn=pointwise_loglik_model1,
        model_fn=model_l_1zarc,
        columns=MODEL1_COLUMNS,
        max_nfev=4000,
        n_samples=n_samples_l1,
        burn_in=burn_l1,
        seed=seed,
    )
    result_l2 = _fit_one_model(
        "L+2-ZARC",
        freq_hz,
        z_re,
        z_im,
        residual_fn=residuals_model2,
        init_fn=initial_guess_model2,
        log_posterior_fn=log_posterior_model2,
        to_params_fn=unconstrained_to_model2,
        loglik_fn=pointwise_loglik_model2,
        model_fn=model_l_2zarc,
        columns=MODEL2_COLUMNS,
        max_nfev=6000,
        n_samples=n_samples_l2,
        burn_in=burn_l2,
        seed=seed + 1,
    )
    return result_l1, result_l2
# ---------------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------------
def plot_model_comparison(
    freq_hz: np.ndarray,
    z_re: np.ndarray,
    z_im: np.ndarray,
    results: List[FitResult],
    output_prefix: Path,
) -> None:
    """Save Nyquist and Bode comparison plots next to *output_prefix*."""
    nyquist_path = output_prefix.with_name(output_prefix.name + "_nyquist_compare.png")
    bode_path = output_prefix.with_name(output_prefix.name + "_bode_compare.png")
    # ---- Nyquist: data plus each model's posterior-median curve.
    fig, ax = plt.subplots(figsize=(6.2, 4.8))
    ax.plot(z_re, -z_im, "o", ms=3, label="data")
    for res in results:
        fitted = res.fitted_curve
        ax.plot(fitted.real, -fitted.imag, lw=2, label=res.name)
    ax.set_xlabel(r"$Z^\prime$ (Ohm)")
    ax.set_ylabel(r"$-Z^{\prime\prime}$ (Ohm)")
    ax.legend()
    fig.tight_layout()
    fig.savefig(nyquist_path, dpi=180)
    plt.close(fig)
    # ---- Bode-like magnitude / phase on a log-frequency axis.
    complex_data = z_re + 1j * z_im
    fig, (ax_mag, ax_phase) = plt.subplots(2, 1, figsize=(7.0, 5.6), sharex=True)
    ax_mag.semilogx(freq_hz, np.abs(complex_data), "o", ms=3, label="data")
    ax_phase.semilogx(freq_hz, np.degrees(np.angle(complex_data)), "o", ms=3, label="data")
    for res in results:
        fitted = res.fitted_curve
        ax_mag.semilogx(freq_hz, np.abs(fitted), lw=2, label=res.name)
        ax_phase.semilogx(freq_hz, np.degrees(np.angle(fitted)), lw=2, label=res.name)
    ax_mag.set_ylabel("|Z| (Ohm)")
    ax_phase.set_ylabel("Phase (deg)")
    ax_phase.set_xlabel("Frequency (Hz)")
    ax_mag.legend(fontsize=8)
    fig.tight_layout()
    fig.savefig(bode_path, dpi=180)
    plt.close(fig)
# ---------------------------------------------------------------------------
# Input readers
# ---------------------------------------------------------------------------
def load_kit_txt(path: str | Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict[str, float]]:
    """
    Parse the KIT txt export used in the uploaded dataset.

    The file carries a free-text header (potential / current metadata) followed
    by a whitespace-separated table: Number, Frequency/Hz, Impedance/Ohm,
    Phase/deg. Magnitude/phase rows are converted to Re/Im impedance.

    Returns
    -------
    freq_hz, z_re, z_im, metadata
    """
    raw = Path(path).read_text(encoding="utf-8", errors="ignore").replace("\r", "")
    metadata: Dict[str, float] = {}
    pot_match = re.search(r"Potential\.\.\:\s*([0-9.]+)V", raw)
    if pot_match is not None:
        metadata["potential_V"] = float(pot_match.group(1))
    cur_match = re.search(r"Current\.\.\.\.\:\s*([0-9.]+)A,\s*Ampl:\s*([0-9.]+)mA", raw)
    if cur_match is not None:
        metadata["current_A"] = float(cur_match.group(1))
        metadata["ampl_mA"] = float(cur_match.group(2))
    all_lines = raw.splitlines()
    header_markers = ("Frequency/Hz", "Impedance/Ohm", "Phase/deg")
    table_start = None
    for line_no, line in enumerate(all_lines):
        if all(marker in line for marker in header_markers):
            table_start = line_no + 1
            break
    if table_start is None:
        raise ValueError("Could not find data table header in KIT txt file.")
    parsed: List[Tuple[int, float, float, float]] = []
    for line in all_lines[table_start:]:
        fields = line.split()
        if len(fields) < 4:
            # Blank lines and trailing footer text are silently skipped.
            continue
        try:
            parsed.append((int(fields[0]), float(fields[1]), float(fields[2]), float(fields[3])))
        except ValueError:
            continue
    df = pd.DataFrame(parsed, columns=["Number", "Frequency_Hz", "Impedance_Ohm", "Phase_deg"])
    # Sort from high to low frequency for plotting consistency.
    df = df.sort_values("Frequency_Hz", ascending=False).reset_index(drop=True)
    phase_rad = np.deg2rad(df["Phase_deg"].to_numpy())
    magnitude = df["Impedance_Ohm"].to_numpy()
    return (
        df["Frequency_Hz"].to_numpy(),
        magnitude * np.cos(phase_rad),
        magnitude * np.sin(phase_rad),
        metadata,
    )
def load_csv(path: str | Path) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Dict[str, float]]:
    """Load a simple frequency-resolved CSV (no metadata available)."""
    df = pd.read_csv(path)
    required = {"frequency_Hz", "Zre_ohm", "Zim_ohm"}
    if required - set(df.columns):
        raise ValueError(f"CSV must contain columns {sorted(required)}; got {list(df.columns)}")
    freq = df["frequency_Hz"].to_numpy()
    z_re = df["Zre_ohm"].to_numpy()
    z_im = df["Zim_ohm"].to_numpy()
    # CSV input carries no metadata, hence the empty dict.
    return freq, z_re, z_im, {}
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: load one spectrum, fit both models, write all reports."""
    parser = argparse.ArgumentParser(description="Step1b.1 Bayesian ECM fit for PEMWE EIS.")
    parser.add_argument("--input", required=True, help="KIT txt file or CSV with frequency_Hz, Zre_ohm, Zim_ohm")
    parser.add_argument("--outdir", required=True, help="Output directory")
    parser.add_argument("--seed", type=int, default=42, help="Random seed")
    args = parser.parse_args()
    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)
    input_path = Path(args.input)
    # Choose the reader by file extension: .txt -> KIT export, anything else -> CSV.
    if input_path.suffix.lower() == ".txt":
        freq_hz, z_re, z_im, metadata = load_kit_txt(input_path)
    else:
        freq_hz, z_re, z_im, metadata = load_csv(input_path)
    result_l1, result_l2 = fit_single_spectrum(freq_hz, z_re, z_im, seed=args.seed)
    plot_model_comparison(freq_hz, z_re, z_im, [result_l1, result_l2], outdir / "fit")
    # Lower WAIC wins; ties go to the simpler one-ZARC model.
    preferred = "L+1-ZARC" if result_l1.waic < result_l2.waic else "L+2-ZARC"
    summary = {
        "file": input_path.name,
        "meta": metadata,
        "n_points": int(len(freq_hz)),
        "waic": {
            "L+1-ZARC": result_l1.waic,
            "L+2-ARC" if False else "L+2-ZARC": result_l2.waic,
            "preferred": preferred,
        },
        "metrics": {
            "L+1-ZARC": result_l1.metrics,
            "L+2-ZARC": result_l2.metrics,
        },
        "params_L1ZARC": result_l1.summary,
        "params_L2ZARC": result_l2.summary,
        "acceptance": {
            "L+1-ZARC": result_l1.acceptance_rate,
            "L+2-ZARC": result_l2.acceptance_rate,
        },
    }
    with open(outdir / "summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    # Save raw posterior draws for downstream reporting and state-space coupling.
    result_l1.draws.to_csv(outdir / "posterior_draws_L1ZARC.csv", index=False)
    result_l2.draws.to_csv(outdir / "posterior_draws_L2ZARC.csv", index=False)
if __name__ == "__main__":
    main()