#!/usr/bin/env python3
"""
Integrated 0D + Step1b.1 Bayesian state-space prototype for PEMWE EIS data.
What this script does
---------------------
1) Load PEMWE EIS spectra (KIT txt export format).
2) Fit each spectrum with the Step1b.1 equivalent circuit:
L_par + R_ohm + ZARC_1 + ZARC_2
using the previously developed Bayesian single-spectrum fitter.
3) Convert the Step1b.1 posterior draws into *area-specific* feature observations:
ASR = R_ohm * A_cell
R_non = (R_HF + R_LF) * A_cell
R_LF = R_LF * A_cell
where HF/LF are defined by ordering the two ZARCs by characteristic time tau.
4) Build a compact 0D PEMWE voltage model whose latent health states are:
x_t = [log(ASR_t), log(R_non,t), log(R_LF,t)]
5) Combine the Step1b.1 feature observations and the measured polarization points
inside a Bayesian state-space model:
state transition : local-level random walk
observation (EIS) : noisy log-observations of latent states
observation (Vcell): nonlinear 0D voltage equation
6) Infer the MAP estimate and a Laplace-approximated posterior:
- MAP via scipy.optimize.minimize
- posterior covariance via finite-difference Hessian inversion
- posterior draws via multivariate-normal approximation around the MAP
Why Laplace here?
-----------------
The Step1b.1 layer already uses MCMC and provides posterior uncertainty per spectrum.
For the *integrated* layer we only have four operating points in the supplied KIT Figure 5 data.
A full HMC / particle MCMC implementation is possible, but for this small-data prototype
a MAP + Laplace approximation is a pragmatic and transparent way to:
- keep the code lightweight,
- obtain credible intervals,
- preserve a Bayesian interpretation,
- and make the state-space structure explicit.
Important modeling decision
---------------------------
We deliberately use:
R_non = R_HF + R_LF
instead of using R_HF alone as the kinetic proxy.
Reason:
- In the supplied data, the split between the two ZARCs is not equally stable at all currents.
- The total non-ohmic resistance is more robust than the raw HF/LF split.
- The LF branch is still retained separately as a transport-sensitive proxy.
This avoids over-interpreting the internal split of the two ZARCs while still preserving
transport information.
Outputs
-------
- stage1_features_from_step1b1.csv
- posterior_state_summary.csv
- global_parameter_summary.json
- feature_smoothing.png
- voltage_fit_and_decomposition.png
- effective_states.png
- integrated_summary.json
Usage
-----
python integrated_0d_step1b1_state_space.py \
--inputs Fig5_750mAcm-2.txt Fig5_1000mAcm-2.txt Fig5_1500mAcm-2.txt Fig5_2000mAcm-2.txt \
--outdir integrated_0d_step1b1_results
"""
from __future__ import annotations
import argparse
import json
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Sequence, Tuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.optimize import minimize
# Reuse the detailed single-spectrum Step1b.1 implementation.
# This keeps the integrated script focused on the state-space layer rather than
# re-implementing the single-spectrum EIS fitting code.
import bayesian_ecm_step1b1_commented as step1b1
# ---------------------------------------------------------------------------
# Physical constants used in the 0D proxy model
# ---------------------------------------------------------------------------
# Universal gas constant (CODATA value).
R_GAS: float = 8.31446261815324  # J mol^-1 K^-1
# Faraday constant (CODATA value).
FARADAY: float = 96485.33212  # C mol^-1
# We use a representative PEMWE operating temperature as a reference temperature
# for the compact 0D voltage mapping.
#
# Why is this acceptable here?
# - The supplied Figure 5 data are not a time-resolved thermal experiment.
# - The integrated prototype focuses on structural coupling of EIS states and voltage.
# - Any mismatch in absolute temperature is partially absorbed by v_base and the scale
#   parameters kappa_act / kappa_diff.
#
# For a richer PoC with explicit thermal dynamics, T should become a latent state
# driven by a lumped energy balance.
T_REF_K: float = 353.15  # reference temperature [K] (353.15 K = 80 degC)
# Effective coefficients used in the compact 0D mapping.
# These are not claimed to be exact mechanistic constants; they are chosen so the
# voltage decomposition keeps the familiar Butler-Volmer / transport form while
# leaving scale mismatch to the calibrated kappa parameters.
ALPHA_EFF: float = 0.5  # effective charge-transfer coefficient [-]
N_EFF: float = 2.0  # effective number of transferred electrons [-]
# Voltage prefactors for activation and transport terms (both in volts).
B_ACT: float = 2.0 * R_GAS * T_REF_K / (ALPHA_EFF * N_EFF * FARADAY)
B_DIFF: float = R_GAS * T_REF_K / (N_EFF * FARADAY)
# ---------------------------------------------------------------------------
# Small utilities
# ---------------------------------------------------------------------------
def qsummary(x: np.ndarray) -> Dict[str, float]:
    """Summarize a 1-D array of posterior draws with scalar statistics.

    Returns median, mean, sample standard deviation (ddof=1), and the
    5% / 95% quantiles as plain Python floats.
    """
    stats = {
        "median": np.median(x),
        "mean": np.mean(x),
        "sd": np.std(x, ddof=1),
        "q05": np.quantile(x, 0.05),
        "q95": np.quantile(x, 0.95),
    }
    return {name: float(value) for name, value in stats.items()}
def infer_current_density_from_filename(path: str | Path) -> float:
"""
Extract current density from filenames such as:
Fig5_750mAcm-2.txt -> 0.75 A cm^-2
Fig5_1500mAcm-2.txt -> 1.50 A cm^-2
"""
m = re.search(r"(\d+)mAcm-2", Path(path).name)
if m is None:
raise ValueError(f"Could not infer current density from filename: {path}")
return float(m.group(1)) / 1000.0
def canonicalize_l2_draws(df: pd.DataFrame) -> pd.DataFrame:
    """
    Re-label the two ZARCs as HF / LF by ordering them with tau.

    Why this is necessary:
    ----------------------
    The raw Step1b.1 fit returns ZARC_1 and ZARC_2, but these labels are arbitrary:
    the optimizer / sampler may exchange the two arcs.
    For physical interpretation we want:
        HF arc = smaller tau
        LF arc = larger tau
    (ties, tau1 == tau2, keep arc 1 as the HF arc).

    Implementation note
    -------------------
    This is vectorized with `np.where` instead of a per-row `iterrows` loop:
    the input typically holds thousands of MCMC draws, and the elementwise
    selection avoids Python-level row overhead. It also returns a frame with
    the expected columns even when `df` is empty.
    """
    # Boolean mask per draw: True when arc 1 is the high-frequency (small tau) arc.
    first_is_hf = (df["tau1"] <= df["tau2"]).to_numpy()

    def _pick(primary: str, secondary: str) -> np.ndarray:
        """Select `primary` where arc 1 is HF, else `secondary` (elementwise)."""
        return np.where(first_is_hf, df[primary].to_numpy(), df[secondary].to_numpy())

    return pd.DataFrame(
        {
            "l_par": df["l_par"].to_numpy(),
            "r_ohm": df["r_ohm"].to_numpy(),
            "r_hf": _pick("r1", "r2"),
            "tau_hf": _pick("tau1", "tau2"),
            "alpha_hf": _pick("alpha1", "alpha2"),
            "r_lf": _pick("r2", "r1"),
            "tau_lf": _pick("tau2", "tau1"),
            "alpha_lf": _pick("alpha2", "alpha1"),
            "sigma_re": df["sigma_re"].to_numpy(),
            "sigma_im": df["sigma_im"].to_numpy(),
        }
    )
# ---------------------------------------------------------------------------
# Stage1b.1 feature extraction
# ---------------------------------------------------------------------------
def fit_stage1_features(
    input_paths: Sequence[str | Path],
    n_samples_l1: int = 800,
    burn_l1: int = 400,
    n_samples_l2: int = 1400,
    burn_l2: int = 700,
) -> pd.DataFrame:
    """
    Run the single-spectrum Step1b.1 fit on all supplied spectra and convert them
    into area-specific feature observations for the integrated state-space model.

    Returned columns
    ----------------
    - j_cm2 : operating current density
    - V : measured cell voltage
    - area_cm2 : inferred active area from current / current density
    - asr_ohm_cm2_med, asr_ohm_cm2_sd
    - r_non_ohm_cm2_med, r_non_ohm_cm2_sd
    - r_lf_ohm_cm2_med, r_lf_ohm_cm2_sd

    Rows are sorted by current density with a fresh 0..T-1 index.
    """

    def _spectrum_record(idx: int, path: str | Path) -> Dict[str, float]:
        """Fit one spectrum and reduce its draws to area-specific summaries."""
        freq_hz, z_re, z_im, meta = step1b1.load_kit_txt(path)
        # Only the L+2-ZARC draws are carried forward: Step1b.1 showed the
        # with-L model is preferred on the supplied KIT spectra, and the LF
        # branch doubles as a transport proxy.
        _, res_l2 = step1b1.fit_single_spectrum(
            freq_hz,
            z_re,
            z_im,
            seed=100 + idx,
            n_samples_l1=n_samples_l1,
            burn_l1=burn_l1,
            n_samples_l2=n_samples_l2,
            burn_l2=burn_l2,
        )
        draws = canonicalize_l2_draws(res_l2.draws)
        # Current density is encoded in the filename; current comes from the metadata.
        j_cm2 = infer_current_density_from_filename(path)
        area_cm2 = float(meta["current_A"]) / j_cm2
        # Area-specific proxies. The total non-ohmic resistance (HF + LF) is
        # deliberately used as the kinetic proxy because the HF/LF split can be
        # fragile at some currents.
        asr = draws["r_ohm"].to_numpy() * area_cm2
        r_non = (draws["r_hf"].to_numpy() + draws["r_lf"].to_numpy()) * area_cm2
        r_lf = draws["r_lf"].to_numpy() * area_cm2
        return {
            "file": Path(path).name,
            "j_cm2": j_cm2,
            "current_A": float(meta["current_A"]),
            "V": float(meta["potential_V"]),
            "area_cm2": area_cm2,
            "l_par_med_H": float(np.median(draws["l_par"])),
            "asr_ohm_cm2_med": float(np.median(asr)),
            "asr_ohm_cm2_sd": float(np.std(asr, ddof=1)),
            "r_non_ohm_cm2_med": float(np.median(r_non)),
            "r_non_ohm_cm2_sd": float(np.std(r_non, ddof=1)),
            "r_lf_ohm_cm2_med": float(np.median(r_lf)),
            "r_lf_ohm_cm2_sd": float(np.std(r_lf, ddof=1)),
        }

    records = [_spectrum_record(idx, path) for idx, path in enumerate(input_paths)]
    return pd.DataFrame(records).sort_values("j_cm2").reset_index(drop=True)
# ---------------------------------------------------------------------------
# Integrated 0D + state-space model
# ---------------------------------------------------------------------------
@dataclass
class IntegratedResult:
    """Container for all artifacts produced by the integrated state-space fit."""

    # Stage1b.1 feature table used as input (one row per spectrum / operating point).
    stage1_features: pd.DataFrame
    # MAP parameter vector; layout matches `unpack_theta`.
    theta_map: np.ndarray
    # Laplace covariance (inverse Hessian of the negative log-posterior at the MAP).
    covariance: np.ndarray
    # Multivariate-normal draws around the MAP, shape (n_draws, len(theta_map)).
    posterior_draws: np.ndarray
    # qsummary() statistics for each global parameter (q_*, v_base, kappa_*, sigma_v).
    global_summary: Dict[str, Dict[str, float]]
    # Per-operating-point table with voltage predictions and latent-state quantiles.
    per_point_summary: pd.DataFrame
def build_integrated_model_inputs(stage1_df: pd.DataFrame) -> Dict[str, np.ndarray]:
    """
    Convert the stage1 feature table into the arrays consumed by the Bayesian
    state-space layer.

    Returns
    -------
    dict with:
      j     : operating current densities
      V_obs : measured cell voltages
      y_obs : (T, 3) log-space observations [log ASR, log R_non, log R_LF]
      y_se  : (T, 3) approximate log-space standard errors (floored)
    """
    current_density = stage1_df["j_cm2"].to_numpy()
    voltage = stage1_df["V"].to_numpy()

    medians = [
        stage1_df["asr_ohm_cm2_med"].to_numpy(),
        stage1_df["r_non_ohm_cm2_med"].to_numpy(),
        stage1_df["r_lf_ohm_cm2_med"].to_numpy(),
    ]
    sds = [
        stage1_df["asr_ohm_cm2_sd"].to_numpy(),
        stage1_df["r_non_ohm_cm2_sd"].to_numpy(),
        stage1_df["r_lf_ohm_cm2_sd"].to_numpy(),
    ]
    # Per-feature lower bounds on the log-space uncertainty: the floor avoids
    # over-trusting the Stage1 summaries when only a few EIS spectra exist.
    floors = [0.01, 0.05, 0.05]

    # Positive quantities are modeled in log-space.
    y_obs = np.stack([np.log(med) for med in medians], axis=1)
    # Delta-method approximation: for small relative uncertainty,
    # sd(log X) ~ sd(X) / mean(X).
    y_se = np.stack(
        [np.maximum(sd / med, floor) for sd, med, floor in zip(sds, medians, floors)],
        axis=1,
    )
    return {"j": current_density, "V_obs": voltage, "y_obs": y_obs, "y_se": y_se}
def unpack_theta(theta: np.ndarray, T: int) -> Tuple[np.ndarray, np.ndarray, float, float, float, float]:
    """
    Split the flat parameter vector into structured pieces.

    Layout of ``theta``
    -------------------
    [ latent states x_{1:T} (3*T entries),
      log_q_asr, log_q_non, log_q_lf,
      v_base,
      log_kappa_act,
      log_kappa_diff,
      log_sigma_v ]

    Each latent state x_t = [log(ASR_t), log(R_non,t), log(R_LF,t)].
    Log-parameters are exponentiated here so callers receive positive scales.
    """
    n_state_entries = 3 * T
    x = theta[:n_state_entries].reshape(T, 3)
    q = np.exp(theta[n_state_entries : n_state_entries + 3])
    v_base = theta[n_state_entries + 3]
    kappa_act, kappa_diff, sigma_v = np.exp(theta[n_state_entries + 4 : n_state_entries + 7])
    return x, q, v_base, kappa_act, kappa_diff, sigma_v
def voltage_model_from_states(
    j_cm2: np.ndarray,
    x: np.ndarray,
    v_base: float,
    kappa_act: float,
    kappa_diff: float,
) -> np.ndarray:
    """
    Evaluate the compact 0D cell-voltage model for latent EIS-derived states.

    Parameters
    ----------
    j_cm2:
        Operating current densities [A cm^-2], one per time point.
    x:
        (T, 3) latent states; columns are log(ASR), log(R_non), log(R_LF),
        each in ohm cm^2.
    v_base:
        Baseline voltage absorbing the reversible voltage and residual offsets.
    kappa_act, kappa_diff:
        Calibrated scales mapping resistances to an effective exchange current
        density (i0_eff = kappa_act / R_non) and an effective limiting current
        density (iL_eff = j + kappa_diff / R_LF).

    Model
    -----
    V = v_base
        + B_ACT * asinh( j / (2 * i0_eff) )        (activation)
        + j * ASR                                  (ohmic)
        + B_DIFF * ln( iL_eff / (iL_eff - j) )     (transport)

    This is a physically guided proxy, not a full mechanistic 0D thermal model:
    larger R_non implies a smaller effective exchange current density, and a
    larger R_LF pulls the effective limiting current toward the operating
    current.
    """
    tiny = 1e-12  # numerical floor keeping divisions and logs finite

    asr = np.exp(x[:, 0])
    r_non = np.exp(x[:, 1])
    r_lf = np.exp(x[:, 2])

    i0_eff = kappa_act / np.maximum(r_non, tiny)
    iL_eff = j_cm2 + kappa_diff / np.maximum(r_lf, tiny)

    activation = B_ACT * np.arcsinh(j_cm2 / (2.0 * np.maximum(i0_eff, tiny)))
    ohmic = j_cm2 * asr
    transport = B_DIFF * np.log(iL_eff / np.maximum(iL_eff - j_cm2, tiny))
    return v_base + activation + ohmic + transport
def log_prior(theta: np.ndarray, y_obs: np.ndarray) -> float:
    """
    Prior for the local-level state-space model.

    Priors used here
    ----------------
    - x_1 is centered on the first EIS observation (weakly informative)
    - x_t | x_{t-1} follows a Gaussian random walk with scale q
    - q has a weak lognormal prior
    - v_base has a weak Gaussian prior centered on a plausible PEMWE voltage baseline
    - kappa_act, kappa_diff, sigma_v have weak lognormal priors

    Parameters
    ----------
    theta:
        Full parameter vector (layout documented in `unpack_theta`).
    y_obs:
        (T, 3) log-space EIS observations; used only to center the initial
        state and to determine T.

    Returns
    -------
    Log prior density evaluated at `theta`.
    """
    T = y_obs.shape[0]
    x, q, v_base, kappa_act, kappa_diff, sigma_v = unpack_theta(theta, T)
    lp = 0.0
    # Initial state prior: Gaussian around the first observation with sd 0.2
    # in log-space (summed over the 3 state components).
    lp += np.sum(-0.5 * ((x[0] - y_obs[0]) / 0.2) ** 2 - np.log(0.2 * np.sqrt(2.0 * np.pi)))
    # Local-level transition prior: each component random-walks with its own
    # scale q (the division broadcasts over the 3 components).
    for t in range(1, T):
        lp += np.sum(-0.5 * ((x[t] - x[t - 1]) / q) ** 2 - np.log(q * np.sqrt(2.0 * np.pi)))
    # Weak lognormal priors for transition scales (mu=0, sigma=1 on log q);
    # the -log(q * ...) term carries the 1/q change-of-variables Jacobian.
    lp += np.sum(-0.5 * (np.log(q) / 1.0) ** 2 - np.log(q * 1.0 * np.sqrt(2.0 * np.pi)))
    # Weak prior for baseline voltage, centered on 1.15 V with sd 0.2 V.
    lp += -0.5 * ((v_base - 1.15) / 0.2) ** 2 - np.log(0.2 * np.sqrt(2.0 * np.pi))
    # Weak lognormal priors for positive scale parameters (value, log-mean, log-sd).
    for val, mu, s in [
        (kappa_act, -1.0, 1.5),
        (kappa_diff, -1.0, 1.5),
        (sigma_v, np.log(0.01), 1.0),
    ]:
        lp += -0.5 * ((np.log(val) - mu) / s) ** 2 - np.log(val * s * np.sqrt(2.0 * np.pi))
    return lp
def log_likelihood(theta: np.ndarray, j: np.ndarray, V_obs: np.ndarray, y_obs: np.ndarray, y_se: np.ndarray) -> float:
    """
    Likelihood of the integrated model, combining two observation channels:

    1) EIS-derived Stage1b.1 feature observations in log-space: the latent
       states should stay near the Stage1b.1 summaries, but may be smoothed
       slightly because the voltage channel adds information.
    2) Measured cell voltage through the 0D voltage equation.
    """
    T = len(j)
    x, _, v_base, kappa_act, kappa_diff, sigma_v = unpack_theta(theta, T)

    # Channel 1: Gaussian log-density of the log-space feature observations.
    eis_resid = (y_obs - x) / y_se
    ll_eis = np.sum(-0.5 * eis_resid ** 2 - np.log(y_se * np.sqrt(2.0 * np.pi)))

    # Channel 2: Gaussian log-density of the measured voltages around the
    # 0D model prediction.
    v_hat = voltage_model_from_states(j, x, v_base, kappa_act, kappa_diff)
    v_resid = (V_obs - v_hat) / sigma_v
    ll_volt = np.sum(-0.5 * v_resid ** 2 - np.log(sigma_v * np.sqrt(2.0 * np.pi)))

    return ll_eis + ll_volt
def log_posterior(theta: np.ndarray, j: np.ndarray, V_obs: np.ndarray, y_obs: np.ndarray, y_se: np.ndarray) -> float:
    """Full log-posterior (prior + likelihood), guarded against non-finite values."""
    total = log_prior(theta, y_obs) + log_likelihood(theta, j, V_obs, y_obs, y_se)
    # A huge finite penalty keeps the optimizer well-behaved where the
    # posterior is undefined (NaN/inf).
    return total if np.isfinite(total) else -1e300
def finite_difference_hessian(fun, x0: np.ndarray) -> np.ndarray:
    """
    Dense symmetric Hessian of `fun` at `x0` via central differences.

    Step sizes scale with |x0| and are floored at 1e-4. Cost is O(n^2)
    function evaluations, acceptable because the integrated model is
    intentionally low-dimensional.
    """
    dim = len(x0)
    step = np.maximum(1e-4, np.abs(x0) * 1e-3)
    hessian = np.zeros((dim, dim))
    f_center = fun(x0)
    for a in range(dim):
        e_a = np.zeros(dim)
        e_a[a] = step[a]
        # Second derivative along axis a.
        f_plus = fun(x0 + e_a)
        f_minus = fun(x0 - e_a)
        hessian[a, a] = (f_plus - 2.0 * f_center + f_minus) / (step[a] ** 2)
        # Mixed partials via the four-point central stencil.
        for b in range(a + 1, dim):
            e_b = np.zeros(dim)
            e_b[b] = step[b]
            fpp = fun(x0 + e_a + e_b)
            fpm = fun(x0 + e_a - e_b)
            fmp = fun(x0 - e_a + e_b)
            fmm = fun(x0 - e_a - e_b)
            hessian[a, b] = hessian[b, a] = (fpp - fpm - fmp + fmm) / (4.0 * step[a] * step[b])
    return hessian
def fit_integrated_model(stage1_df: pd.DataFrame, n_draws: int = 4000, seed: int = 123) -> IntegratedResult:
    """
    Fit the integrated Bayesian state-space model and return posterior draws
    from the Laplace approximation.

    Parameters
    ----------
    stage1_df:
        Stage1b.1 feature table as produced by `fit_stage1_features`
        (assumed to carry a 0..T-1 positional index; `t` below is used both
        as row index and as draw-array column).
    n_draws:
        Number of multivariate-normal draws taken around the MAP.
    seed:
        Seed for the posterior draw generator.

    Returns
    -------
    IntegratedResult bundling the MAP vector, Laplace covariance, posterior
    draws, global-parameter summaries, and a per-operating-point table.
    """
    model_inputs = build_integrated_model_inputs(stage1_df)
    j = model_inputs["j"]
    V_obs = model_inputs["V_obs"]
    y_obs = model_inputs["y_obs"]
    y_se = model_inputs["y_se"]
    T = len(j)
    # Initialization:
    # start latent states directly at the Stage1b.1 observations.
    theta0 = np.concatenate(
        [
            y_obs.ravel(),
            np.log(np.array([0.05, 0.08, 0.08])),  # transition scales
            np.array([1.18, np.log(0.02), np.log(0.2), np.log(0.01)]),  # v_base, kappa_act, kappa_diff, sigma_v
        ]
    )
    # MAP fit: minimize the negative log-posterior.
    objective = lambda th: -log_posterior(th, j, V_obs, y_obs, y_se)
    opt = minimize(
        objective,
        theta0,
        method="L-BFGS-B",
        options={"maxiter": 5000, "maxfun": 20000},
    )
    if not opt.success:
        # We do not silently fail: return the best point found, but state clearly
        # in the calling report if convergence was not ideal.
        print("Warning: optimizer did not report strict convergence:", opt.message)
    theta_map = opt.x
    # Laplace covariance from the Hessian at the MAP.
    # NOTE(review): the inverse of a finite-difference Hessian is not guaranteed
    # positive definite; multivariate_normal may warn or degenerate in that
    # case — consider adding a small diagonal jitter if this is observed.
    H = finite_difference_hessian(objective, theta_map)
    covariance = np.linalg.inv(H)
    covariance = 0.5 * (covariance + covariance.T)  # numerical symmetrization
    rng = np.random.default_rng(seed)
    posterior_draws = rng.multivariate_normal(theta_map, covariance, size=n_draws)
    # Summaries of global parameters and per-point latent states.
    # Column slicing mirrors the theta layout documented in `unpack_theta`.
    x_draws = posterior_draws[:, : 3 * T].reshape(n_draws, T, 3)
    q_draws = np.exp(posterior_draws[:, 3 * T : 3 * T + 3])
    v_base_draws = posterior_draws[:, 3 * T + 3]
    kappa_act_draws = np.exp(posterior_draws[:, 3 * T + 4])
    kappa_diff_draws = np.exp(posterior_draws[:, 3 * T + 5])
    sigma_v_draws = np.exp(posterior_draws[:, 3 * T + 6])
    # Back-transform latent log-states to resistances [ohm cm^2].
    asr_draws = np.exp(x_draws[:, :, 0])
    rnon_draws = np.exp(x_draws[:, :, 1])
    rlf_draws = np.exp(x_draws[:, :, 2])
    global_summary = {
        "q_asr": qsummary(q_draws[:, 0]),
        "q_rnon": qsummary(q_draws[:, 1]),
        "q_rlf": qsummary(q_draws[:, 2]),
        "v_base": qsummary(v_base_draws),
        "kappa_act": qsummary(kappa_act_draws),
        "kappa_diff": qsummary(kappa_diff_draws),
        "sigma_v": qsummary(sigma_v_draws),
    }
    # Effective kinetic / transport quantities implied by the draws
    # (same definitions as in `voltage_model_from_states`).
    i0_eff_draws = kappa_act_draws[:, None] / np.maximum(rnon_draws, 1e-12)
    iL_eff_draws = j[None, :] + kappa_diff_draws[:, None] / np.maximum(rlf_draws, 1e-12)
    # Posterior predictive voltage for every draw.
    v_pp = np.empty((n_draws, T))
    for n in range(n_draws):
        v_pp[n] = voltage_model_from_states(
            j,
            x_draws[n],
            v_base_draws[n],
            kappa_act_draws[n],
            kappa_diff_draws[n],
        )
    rows = []
    for t, row in stage1_df.iterrows():
        rows.append(
            {
                "file": row["file"],
                "j_A_cm2": row["j_cm2"],
                "V_measured_V": row["V"],
                "V_pred_median_V": float(np.median(v_pp[:, t])),
                "V_pred_q05_V": float(np.quantile(v_pp[:, t], 0.05)),
                "V_pred_q95_V": float(np.quantile(v_pp[:, t], 0.95)),
                "ASR_med_ohm_cm2": float(np.median(asr_draws[:, t])),
                "ASR_q05_ohm_cm2": float(np.quantile(asr_draws[:, t], 0.05)),
                "ASR_q95_ohm_cm2": float(np.quantile(asr_draws[:, t], 0.95)),
                "Rnon_med_ohm_cm2": float(np.median(rnon_draws[:, t])),
                "Rnon_q05_ohm_cm2": float(np.quantile(rnon_draws[:, t], 0.05)),
                "Rnon_q95_ohm_cm2": float(np.quantile(rnon_draws[:, t], 0.95)),
                "Rlf_med_ohm_cm2": float(np.median(rlf_draws[:, t])),
                "Rlf_q05_ohm_cm2": float(np.quantile(rlf_draws[:, t], 0.05)),
                "Rlf_q95_ohm_cm2": float(np.quantile(rlf_draws[:, t], 0.95)),
                "i0eff_med_A_cm2": float(np.median(i0_eff_draws[:, t])),
                "iLeff_med_A_cm2": float(np.median(iL_eff_draws[:, t])),
            }
        )
    per_point_summary = pd.DataFrame(rows)
    return IntegratedResult(
        stage1_features=stage1_df,
        theta_map=theta_map,
        covariance=covariance,
        posterior_draws=posterior_draws,
        global_summary=global_summary,
        per_point_summary=per_point_summary,
    )
# ---------------------------------------------------------------------------
# Plotting
# ---------------------------------------------------------------------------
def make_figures(result: IntegratedResult, outdir: Path) -> None:
    """
    Create the main diagnostic plots used in the report.

    Writes three PNG files into `outdir`:
    - feature_smoothing.png          : observed EIS features vs latent states
    - voltage_fit_and_decomposition.png : voltage fit and contribution build-up
    - effective_states.png           : effective kinetic / transport states
    """
    stage1_df = result.stage1_features
    j = stage1_df["j_cm2"].to_numpy()
    V_obs = stage1_df["V"].to_numpy()
    T = len(j)
    draws = result.posterior_draws
    # Column slicing mirrors the theta layout documented in `unpack_theta`.
    x_draws = draws[:, : 3 * T].reshape(len(draws), T, 3)
    asr_draws = np.exp(x_draws[:, :, 0])
    rnon_draws = np.exp(x_draws[:, :, 1])
    rlf_draws = np.exp(x_draws[:, :, 2])
    v_base_draws = draws[:, 3 * T + 3]
    kappa_act_draws = np.exp(draws[:, 3 * T + 4])
    kappa_diff_draws = np.exp(draws[:, 3 * T + 5])
    # Posterior predictive voltage draws.
    v_pp = np.empty((len(draws), T))
    for n in range(len(draws)):
        v_pp[n] = voltage_model_from_states(
            j,
            x_draws[n],
            v_base_draws[n],
            kappa_act_draws[n],
            kappa_diff_draws[n],
        )
    # ------------------------------------------------------------------
    # Figure 1: observed EIS features vs smoothed latent states
    # ------------------------------------------------------------------
    y1 = stage1_df["asr_ohm_cm2_med"].to_numpy()
    y2 = stage1_df["r_non_ohm_cm2_med"].to_numpy()
    y3 = stage1_df["r_lf_ohm_cm2_med"].to_numpy()
    fig, axes = plt.subplots(3, 1, figsize=(8, 10), sharex=True)
    for ax, obs, draw_arr, label in zip(
        axes,
        [y1, y2, y3],
        [asr_draws, rnon_draws, rlf_draws],
        ["ASR [Ω cm²]", "R_non-ohmic [Ω cm²]", "R_LF [Ω cm²]"],
    ):
        med = np.median(draw_arr, axis=0)
        lo = np.quantile(draw_arr, 0.05, axis=0)
        hi = np.quantile(draw_arr, 0.95, axis=0)
        ax.fill_between(j, lo, hi, alpha=0.25, label="90% credible band")
        ax.plot(j, med, "-o", label="Latent state median")
        ax.scatter(j, obs, marker="x", s=70, label="Stage1b.1 observation")
        ax.set_ylabel(label)
        ax.grid(True, alpha=0.3)
    axes[-1].set_xlabel("Current density [A cm$^{-2}$]")
    axes[0].legend(loc="best", fontsize=9)
    fig.suptitle("Integrated state-space model: observed EIS features vs latent states")
    fig.tight_layout()
    fig.savefig(outdir / "feature_smoothing.png", dpi=180, bbox_inches="tight")
    plt.close(fig)
    # ------------------------------------------------------------------
    # Figure 2: voltage fit and decomposition
    # ------------------------------------------------------------------
    # The decomposition is computed at the posterior-median parameters
    # (a plug-in curve), while the band comes from the full predictive draws.
    med_asr = np.median(asr_draws, axis=0)
    med_rnon = np.median(rnon_draws, axis=0)
    med_rlf = np.median(rlf_draws, axis=0)
    med_v_base = float(np.median(v_base_draws))
    med_kappa_act = float(np.median(kappa_act_draws))
    med_kappa_diff = float(np.median(kappa_diff_draws))
    # Same 1e-12 floors as `voltage_model_from_states` keep terms finite.
    i0_eff = med_kappa_act / np.maximum(med_rnon, 1e-12)
    iL_eff = j + med_kappa_diff / np.maximum(med_rlf, 1e-12)
    eta_act = B_ACT * np.arcsinh(j / (2.0 * np.maximum(i0_eff, 1e-12)))
    eta_ohm = j * med_asr
    eta_diff = B_DIFF * np.log(iL_eff / np.maximum(iL_eff - j, 1e-12))
    v_med = med_v_base + eta_act + eta_ohm + eta_diff
    v_lo = np.quantile(v_pp, 0.05, axis=0)
    v_hi = np.quantile(v_pp, 0.95, axis=0)
    fig, ax = plt.subplots(figsize=(8, 5.5))
    ax.fill_between(j, v_lo, v_hi, alpha=0.2, label="Posterior predictive 90% band")
    ax.plot(j, v_med, "-o", label="Posterior median voltage")
    ax.scatter(j, V_obs, marker="x", s=70, label="Measured voltage")
    # Cumulative build-up: baseline, + activation, + ohmic (transport closes the gap).
    ax.plot(j, np.full_like(j, med_v_base), "--", label="v_base")
    ax.plot(j, med_v_base + eta_act, "--", label="v_base + η_act")
    ax.plot(j, med_v_base + eta_act + eta_ohm, "--", label="... + η_ohm")
    ax.set_xlabel("Current density [A cm$^{-2}$]")
    ax.set_ylabel("Cell voltage [V]")
    ax.set_title("0D voltage fit and contribution build-up")
    ax.grid(True, alpha=0.3)
    ax.legend(fontsize=8, ncol=2)
    fig.tight_layout()
    fig.savefig(outdir / "voltage_fit_and_decomposition.png", dpi=180, bbox_inches="tight")
    plt.close(fig)
    # ------------------------------------------------------------------
    # Figure 3: effective kinetic and transport states
    # ------------------------------------------------------------------
    i0_eff_draws = kappa_act_draws[:, None] / np.maximum(rnon_draws, 1e-12)
    iL_eff_draws = j[None, :] + kappa_diff_draws[:, None] / np.maximum(rlf_draws, 1e-12)
    fig, axes = plt.subplots(2, 1, figsize=(8, 8), sharex=True)
    for ax, draw_arr, label in zip(
        axes,
        [i0_eff_draws, iL_eff_draws],
        [r"Effective $i_{0,\mathrm{eff}}$ [A cm$^{-2}$]", r"Effective $i_{L,\mathrm{eff}}$ [A cm$^{-2}$]"],
    ):
        med = np.median(draw_arr, axis=0)
        lo = np.quantile(draw_arr, 0.05, axis=0)
        hi = np.quantile(draw_arr, 0.95, axis=0)
        ax.fill_between(j, lo, hi, alpha=0.25)
        ax.plot(j, med, "-o")
        ax.grid(True, alpha=0.3)
        ax.set_ylabel(label)
    # Reference line: iL_eff approaching j signals transport limitation.
    axes[-1].plot(j, j, "k--", label="Operating current density j")
    axes[-1].legend()
    axes[-1].set_xlabel("Current density [A cm$^{-2}$]")
    fig.suptitle("Latent effective kinetic and transport parameters implied by the integrated model")
    fig.tight_layout()
    fig.savefig(outdir / "effective_states.png", dpi=180, bbox_inches="tight")
    plt.close(fig)
# ---------------------------------------------------------------------------
# Serialization
# ---------------------------------------------------------------------------
def save_outputs(result: IntegratedResult, outdir: Path) -> None:
    """Persist tables, JSON summaries, and diagnostic figures to `outdir`."""
    outdir.mkdir(parents=True, exist_ok=True)

    result.stage1_features.to_csv(outdir / "stage1_features_from_step1b1.csv", index=False)
    result.per_point_summary.to_csv(outdir / "posterior_state_summary.csv", index=False)

    (outdir / "global_parameter_summary.json").write_text(
        json.dumps(result.global_summary, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    combined = {
        "global_parameters": result.global_summary,
        "per_point_summary": result.per_point_summary.to_dict(orient="records"),
    }
    (outdir / "integrated_summary.json").write_text(
        json.dumps(combined, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )

    make_figures(result, outdir)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(description="Integrated 0D + Step1b.1 Bayesian state-space prototype for PEMWE EIS.")
p.add_argument(
"--inputs",
nargs="+",
required=True,
help="KIT txt spectra to analyze.",
)
p.add_argument("--outdir", required=True, help="Output directory.")
p.add_argument("--seed", type=int, default=123)
p.add_argument("--n-draws", type=int, default=4000, help="Posterior draws from Laplace approximation.")
return p.parse_args()
def main() -> None:
    """CLI entry point: extract Stage1 features, fit the integrated model, save outputs."""
    args = parse_args()
    output_dir = Path(args.outdir)
    features = fit_stage1_features(args.inputs)
    integrated = fit_integrated_model(features, n_draws=args.n_draws, seed=args.seed)
    save_outputs(integrated, output_dir)
    print(f"Saved integrated model outputs to: {output_dir}")
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()