# SAF network MVP tool
このフォルダは、関東以北の空港需要と製紙プラント候補を結ぶ**議論用の最小MILPツール**です。
## 何を解くか
最小化対象は年間総コストです。
- 原料調達費
- 前処理費
- 原料輸送費
- 変換の変動費
- 技術の固定費・年換算CAPEX
- SAF製品輸送費
- 空港需要未達ペナルティ
- 既存蒸気系メリットは負コストで控除
意思決定変数は以下です。
- どの供給ノードからどの製紙プラントへ何トン送るか
- 各製紙プラントでどの技術を採用するか
- 各プラントからどの空港へSAFを何トン送るか
- 需要未達を何トン許容するか
## 実行方法
```bash
python solve_saf_network.py
```
上のデフォルトでは `params.csv` の未達ペナルティを使います。
この値だと「建設しない」解になりやすいので、議論用には次の実行を推奨します。
```bash
python solve_saf_network.py --shortfall-penalty 500000
```
## 出力
`results/` に以下を出します。
- `selected_tech.csv`
- `feed_flows.csv`
- `product_shipments.csv`
- `airport_shortfall.csv`
- `plant_summary.csv`
- `airport_summary.csv`
- `cost_summary.csv`
- `results.xlsx`
## モデル上の割り切り
- 品質制約は `compatibility.csv` の採否・ペナルティに押し込んでいます
- 規模効果は連続関数ではなく、離散技術候補で近似しています
- 季節変動はまだ入れていません
- 輸送距離や単価の多くは議論用 proxy です
## 拡張候補
- 品質平均制約(灰分・水分)
- 月次期間への拡張
- CO2制約や多目的化
- 中継・前処理拠点のノード化
- 実測データへの置換
from __future__ import annotations
import argparse
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
from scipy.optimize import Bounds, LinearConstraint, milp
@dataclass(frozen=True)
class ModelData:
    """Immutable bundle of all raw model inputs loaded from the data directory.

    Each DataFrame attribute mirrors one input CSV (see load_data); `params`
    is the flattened param_name -> param_value mapping from params.csv.
    """

    airports: pd.DataFrame  # airport master table (airports.csv)
    plants: pd.DataFrame  # candidate paper-plant sites (plants.csv)
    feedstocks: pd.DataFrame  # feedstock master incl. starter_scope flag (feedstocks.csv)
    supplies: pd.DataFrame  # supply nodes with annual tonnage/availability (supply_nodes.csv)
    techs: pd.DataFrame  # conversion technology candidates (technologies.csv)
    compat: pd.DataFrame  # tech x feedstock compatibility and penalties (compatibility.csv)
    links: pd.DataFrame  # feed transport links supply -> plant (transport_links.csv)
    airport_links: pd.DataFrame  # SAF product links plant -> airport (airport_links.csv)
    demand: pd.DataFrame  # per-case airport SAF demand (demand_cases.csv)
    params: Dict[str, float]  # scalar parameters keyed by param_name (params.csv)
def load_data(data_dir: Path) -> ModelData:
    """Load every input CSV under *data_dir* into a ModelData bundle.

    params.csv is reshaped into a plain {param_name: float(param_value)}
    dict; all other files are kept as raw DataFrames.
    """

    def _read(name: str) -> pd.DataFrame:
        # Single point for the directory join so filenames stay in one place.
        return pd.read_csv(data_dir / name)

    param_values = _read("params.csv").set_index("param_name")["param_value"]
    return ModelData(
        airports=_read("airports.csv"),
        plants=_read("plants.csv"),
        feedstocks=_read("feedstocks.csv"),
        supplies=_read("supply_nodes.csv"),
        techs=_read("technologies.csv"),
        compat=_read("compatibility.csv"),
        links=_read("transport_links.csv"),
        airport_links=_read("airport_links.csv"),
        demand=_read("demand_cases.csv"),
        params=param_values.astype(float).to_dict(),
    )
def build_model_inputs(data: ModelData, use_starter_scope: bool = True) -> dict:
    """Derive solver-ready lookup tables and the valid variable index sets.

    Args:
        data: Loaded input tables and parameters.
        use_starter_scope: When True, restrict feedstocks and technologies
            to rows with starter_scope == 1.

    Returns:
        Dict of filtered tables, id-keyed row lookups, and the lists of
        admissible (supply, plant, tech) feed-flow triples and
        (plant, airport) shipment pairs.
    """
    supplies = data.supplies.copy()
    # Usable tonnage = raw annual supply scaled by availability and max contract share.
    supplies["effective_supply_ton"] = (
        supplies["annual_supply_wet_ton"]
        * supplies["availability_factor"]
        * supplies["max_contract_ratio"]
    )
    if use_starter_scope:
        # Keep only the starter subset of feedstocks/technologies and the
        # supplies/compatibility rows that reference them.
        feedstocks_in = set(data.feedstocks.loc[data.feedstocks["starter_scope"] == 1, "feedstock_id"])
        techs_in = data.techs.loc[data.techs["starter_scope"] == 1].copy()
        supplies_in = supplies.loc[supplies["feedstock_id"].isin(feedstocks_in)].copy()
        compat_in = data.compat.loc[
            data.compat["tech_id"].isin(techs_in["tech_id"])
            & data.compat["feedstock_id"].isin(feedstocks_in)
        ].copy()
    else:
        techs_in = data.techs.copy()
        supplies_in = supplies.copy()
        compat_in = data.compat.copy()
    # Row lookups keyed by id (namedtuple rows) for O(1) access during model build.
    compat_map = {(r.tech_id, r.feedstock_id): r for r in compat_in.itertuples(index=False)}
    supply_by_id = {r.supply_id: r for r in supplies_in.itertuples(index=False)}
    plant_by_id = {r.plant_id: r for r in data.plants.itertuples(index=False)}
    tech_by_id = {r.tech_id: r for r in techs_in.itertuples(index=False)}
    feed_of_supply = {r.supply_id: r.feedstock_id for r in supplies_in.itertuples(index=False)}
    link_map = {(r.supply_id, r.plant_id): r for r in data.links.itertuples(index=False)}
    airport_link_map = {(r.plant_id, r.airport_id): r for r in data.airport_links.itertuples(index=False)}
    # NOTE(review): this map spans ALL rows of demand_cases.csv, so later case
    # rows overwrite earlier ones per airport; solve_network() replaces it with
    # a per-case map before solving.
    demand_map = {r.airport_id: float(r.poc_saf_demand_ton_per_year) for r in data.demand.itertuples(index=False)}
    # Admissible feed-flow triples: a transport link must exist and the
    # tech/feedstock pair must be explicitly allowed in compatibility.csv.
    valid_x: List[Tuple[str, str, str]] = []
    for s in supplies_in["supply_id"]:
        feedstock_id = feed_of_supply[s]
        for p in data.plants["plant_id"]:
            if (s, p) not in link_map:
                continue
            for t in techs_in["tech_id"]:
                item = compat_map.get((t, feedstock_id))
                if item is not None and int(item.is_allowed) == 1:
                    valid_x.append((s, p, t))
    # Admissible shipment pairs: only where a product link is defined.
    valid_q = [
        (p, a)
        for p in data.plants["plant_id"]
        for a in data.airports["airport_id"]
        if (p, a) in airport_link_map
    ]
    return {
        "supplies": supplies_in,
        "techs": techs_in,
        "compat_map": compat_map,
        "supply_by_id": supply_by_id,
        "plant_by_id": plant_by_id,
        "tech_by_id": tech_by_id,
        "link_map": link_map,
        "airport_link_map": airport_link_map,
        "demand_map": demand_map,
        "valid_x": valid_x,
        "valid_q": valid_q,
    }
def capital_recovery_factor(rate: float, years: float) -> float:
    """Return the capital recovery factor used to annualize CAPEX.

    CRF = r(1+r)^n / ((1+r)^n - 1). At r == 0 that expression is the 0/0
    indeterminate form whose limit is 1/n, so a zero discount rate is
    handled explicitly instead of raising ZeroDivisionError.

    Args:
        rate: Annual discount rate (e.g. 0.05 for 5%).
        years: Annualization horizon in years (must be positive).
    """
    if rate == 0:
        return 1.0 / years
    growth = (1 + rate) ** years  # hoisted: (1+r)^n appears twice in the formula
    return rate * growth / (growth - 1)
def solve_network(
    data: ModelData,
    *,
    shortfall_penalty: float | None = None,
    use_starter_scope: bool = True,
    case_id: str = "BASE",
) -> dict:
    """Build and solve the annual-cost-minimizing MILP for one demand case.

    Args:
        data: Loaded input tables and parameters.
        shortfall_penalty: JPY per ton of unmet airport demand; when None,
            falls back to params.csv's airport_shortfall_penalty_jpy_per_ton.
        use_starter_scope: Restrict feedstocks/technologies to starter_scope=1.
        case_id: Demand case to solve; must exist in demand_cases.csv.

    Returns:
        Dict with the SciPy MILP result, the ordered variable list and its
        index map, the prepared inputs, the per-case demand map, the
        effective shortfall penalty, and the capital recovery factor.

    Raises:
        ValueError: If case_id does not appear in demand_cases.csv.
        RuntimeError: If the solver does not report success.
    """
    inputs = build_model_inputs(data, use_starter_scope=use_starter_scope)
    demand = data.demand.loc[data.demand["case_id"] == case_id].copy()
    if demand.empty:
        raise ValueError(f"Unknown case_id: {case_id}")
    # Replace the all-cases demand map from build_model_inputs with this case's map.
    demand_map = {r.airport_id: float(r.poc_saf_demand_ton_per_year) for r in demand.itertuples(index=False)}
    inputs["demand_map"] = demand_map
    valid_x = inputs["valid_x"]
    valid_q = inputs["valid_q"]
    plants = data.plants
    techs = inputs["techs"]
    # Variable indexing.
    # x: feed tons (supply, plant, tech); z: binary tech selection (plant, tech);
    # q: SAF tons shipped (plant, airport); short: unmet demand tons per airport.
    variables: List[Tuple[str, ...]] = []
    for s, p, t in valid_x:
        variables.append(("x", s, p, t))
    for p in plants["plant_id"]:
        for t in techs["tech_id"]:
            variables.append(("z", p, t))
    for p, a in valid_q:
        variables.append(("q", p, a))
    for a in demand_map:
        variables.append(("short", a))
    var_idx = {v: i for i, v in enumerate(variables)}
    n = len(variables)
    c = np.zeros(n)
    rate = data.params["discount_rate"]
    years = data.params["capex_annualization_years"]
    crf = capital_recovery_factor(rate, years)
    shortfall_penalty = (
        float(shortfall_penalty)
        if shortfall_penalty is not None
        else float(data.params["airport_shortfall_penalty_jpy_per_ton"])
    )
    # Objective coefficients for feed flows: procurement + feed transport
    # + pretreatment + quality penalty + variable opex - existing steam credit,
    # all per ton of feed.
    for s, p, t in valid_x:
        sr = inputs["supply_by_id"][s]
        tr = inputs["tech_by_id"][t]
        lr = inputs["link_map"][(s, p)]
        cp = inputs["compat_map"][(t, sr.feedstock_id)]
        unit_cost = (
            float(sr.procurement_cost_jpy_per_ton)
            + float(lr.transport_cost_jpy_per_ton)
            + float(cp.pretreatment_cost_jpy_per_ton)
            + float(cp.quality_penalty_jpy_per_ton)
            + float(tr.variable_opex_jpy_per_ton_feed)
            - float(inputs["plant_by_id"][p].existing_steam_credit_jpy_per_ton_feed)
        )
        c[var_idx[("x", s, p, t)]] = unit_cost
    # Selecting a technology incurs annual fixed opex plus annualized CAPEX.
    for p in plants["plant_id"]:
        for t in techs["tech_id"]:
            tr = inputs["tech_by_id"][t]
            c[var_idx[("z", p, t)]] = float(tr.fixed_opex_jpy_per_year) + float(tr.capex_jpy) * crf
    # Product transport cost per ton of SAF shipped.
    for p, a in valid_q:
        link = inputs["airport_link_map"][(p, a)]
        c[var_idx[("q", p, a)]] = float(link.transport_cost_jpy_per_ton_saf)
    # Penalty per ton of unmet demand.
    for a in demand_map:
        c[var_idx[("short", a)]] = shortfall_penalty
    # Bounds: all variables non-negative; z binary; q capped by its link capacity.
    lower = np.zeros(n)
    upper = np.full(n, np.inf)
    integrality = np.zeros(n, dtype=int)
    for p in plants["plant_id"]:
        for t in techs["tech_id"]:
            i = var_idx[("z", p, t)]
            upper[i] = 1
            integrality[i] = 1
    for p, a in valid_q:
        upper[var_idx[("q", p, a)]] = float(inputs["airport_link_map"][(p, a)].max_saf_ton_per_year)
    rows: List[np.ndarray] = []
    lbs: List[float] = []
    ubs: List[float] = []

    def add_constraint(coeffs: Dict[Tuple[str, ...], float], lb: float = -np.inf, ub: float = np.inf) -> None:
        # Append one dense constraint row: lb <= sum(coeff * var) <= ub.
        row = np.zeros(n)
        for key, value in coeffs.items():
            row[var_idx[key]] += value
        rows.append(row)
        lbs.append(lb)
        ubs.append(ub)

    # Supply availability.
    for s in inputs["supplies"]["supply_id"]:
        add_constraint(
            {("x", ss, p, t): 1.0 for (ss, p, t) in valid_x if ss == s},
            ub=float(inputs["supply_by_id"][s].effective_supply_ton),
        )
    # Feed transport link capacities.
    for (s, p), link in inputs["link_map"].items():
        coeffs = {("x", ss, pp, t): 1.0 for (ss, pp, t) in valid_x if ss == s and pp == p}
        if coeffs:
            add_constraint(coeffs, ub=float(link.max_transport_ton_per_year))
    # Plant intake cap.
    for p in plants["plant_id"]:
        add_constraint(
            {("x", s, pp, t): 1.0 for (s, pp, t) in valid_x if pp == p},
            ub=float(inputs["plant_by_id"][p].max_feed_intake_ton_per_year),
        )
    # One technology per plant.
    for p in plants["plant_id"]:
        add_constraint({("z", p, t): 1.0 for t in techs["tech_id"]}, ub=1.0)
    # Per-plant capex budget.
    for p in plants["plant_id"]:
        add_constraint(
            {("z", p, t): float(inputs["tech_by_id"][t].capex_jpy) for t in techs["tech_id"]},
            ub=float(inputs["plant_by_id"][p].max_capex_jpy),
        )
    # Technology activation and scale constraints.
    for p in plants["plant_id"]:
        plant_cap = float(inputs["plant_by_id"][p].max_feed_intake_ton_per_year)
        for t in techs["tech_id"]:
            rel = [(s, pp, tt) for (s, pp, tt) in valid_x if pp == p and tt == t]
            if not rel:
                continue
            # Big-M link (M = plant intake cap): feed on (p, t) only if z[p, t] = 1.
            add_constraint(
                {("x", s, pp, tt): 1.0 for (s, pp, tt) in rel} | {("z", p, t): -plant_cap},
                ub=0.0,
            )
            # Minimum feed scale once the technology is selected.
            add_constraint(
                {("x", s, pp, tt): -1.0 for (s, pp, tt) in rel} | {("z", p, t): float(inputs["tech_by_id"][t].min_feed_ton_per_year)},
                ub=0.0,
            )
            # Technology-specific maximum feed throughput.
            add_constraint(
                {("x", s, pp, tt): 1.0 for (s, pp, tt) in rel} | {("z", p, t): -float(inputs["tech_by_id"][t].max_feed_ton_per_year)},
                ub=0.0,
            )
    # Production balance per plant: shipped SAF <= sum(feed * SAF yield).
    for p in plants["plant_id"]:
        coeffs: Dict[Tuple[str, ...], float] = {}
        for s, pp, t in valid_x:
            if pp == p:
                coeffs[("x", s, pp, t)] = coeffs.get(("x", s, pp, t), 0.0) - float(inputs["tech_by_id"][t].saf_yield_ton_per_ton_feed)
        for pp, a in valid_q:
            if pp == p:
                coeffs[("q", pp, a)] = coeffs.get(("q", pp, a), 0.0) + 1.0
        if coeffs:
            add_constraint(coeffs, ub=0.0)
    # Airport product link capacities.
    for (p, a), link in inputs["airport_link_map"].items():
        if (p, a) in valid_q:
            add_constraint({("q", p, a): 1.0}, ub=float(link.max_saf_ton_per_year))
    # Demand satisfaction with shortfall slack: delivered + short >= demand.
    for a, demand_ton in demand_map.items():
        coeffs = {("q", p, aa): 1.0 for (p, aa) in valid_q if aa == a}
        coeffs[("short", a)] = 1.0
        add_constraint(coeffs, lb=float(demand_ton))
    A = np.vstack(rows)
    result = milp(
        c=c,
        integrality=integrality,
        bounds=Bounds(lower, upper),
        constraints=LinearConstraint(A, lbs, ubs),
    )
    if not result.success:
        raise RuntimeError(result.message)
    return {
        "result": result,
        "variables": variables,
        "var_idx": var_idx,
        "inputs": inputs,
        "demand_map": demand_map,
        "shortfall_penalty": shortfall_penalty,
        "crf": crf,
    }
def extract_outputs(solution: dict, data: ModelData) -> dict:
    """Convert a solve_network() solution into tidy result DataFrames.

    Args:
        solution: Dict returned by solve_network() (SciPy result, ordered
            variable list, prepared inputs, demand map, penalty, CRF).
        data: Original ModelData; currently unused but kept so the call
            signature stays stable for existing callers.

    Returns:
        Dict of DataFrames keyed by output name: feed_flows, selected_tech,
        product_shipments, airport_shortfall, plant_summary, airport_summary,
        cost_summary.
    """
    x = solution["result"].x
    variables = solution["variables"]
    inputs = solution["inputs"]
    tech_by_id = inputs["tech_by_id"]
    supply_by_id = inputs["supply_by_id"]
    # Feed flows: keep only x variables above numerical noise.
    feed_rows = []
    for v, value in zip(variables, x):
        if v[0] != "x" or value <= 1e-6:
            continue
        _, s, p, t = v
        sr = supply_by_id[s]
        tr = tech_by_id[t]
        link = inputs["link_map"][(s, p)]
        compat = inputs["compat_map"][(t, sr.feedstock_id)]
        feed_rows.append(
            {
                "supply_id": s,
                "plant_id": p,
                "tech_id": t,
                "feedstock_id": sr.feedstock_id,
                "feedstock_ton": float(value),
                "procurement_cost_jpy_per_ton": float(sr.procurement_cost_jpy_per_ton),
                "pretreatment_cost_jpy_per_ton": float(compat.pretreatment_cost_jpy_per_ton),
                "quality_penalty_jpy_per_ton": float(compat.quality_penalty_jpy_per_ton),
                "feed_transport_cost_jpy_per_ton": float(link.transport_cost_jpy_per_ton),
                "variable_opex_jpy_per_ton_feed": float(tr.variable_opex_jpy_per_ton_feed),
                "saf_yield_ton_per_ton_feed": float(tr.saf_yield_ton_per_ton_feed),
                "implied_saf_ton": float(value) * float(tr.saf_yield_ton_per_ton_feed),
            }
        )
    feed_df = pd.DataFrame(feed_rows)
    # Selected technologies: binary z variables, rounded at 0.5.
    tech_rows = []
    for v, value in zip(variables, x):
        if v[0] != "z" or value <= 0.5:
            continue
        _, p, t = v
        tr = tech_by_id[t]
        tech_rows.append(
            {
                "plant_id": p,
                "tech_id": t,
                "tech_name": tr.tech_name,
                "capex_jpy": float(tr.capex_jpy),
                "fixed_opex_jpy_per_year": float(tr.fixed_opex_jpy_per_year),
                "min_feed_ton_per_year": float(tr.min_feed_ton_per_year),
                "max_feed_ton_per_year": float(tr.max_feed_ton_per_year),
            }
        )
    tech_df = pd.DataFrame(tech_rows)
    # SAF shipments plant -> airport above numerical noise.
    product_rows = []
    for v, value in zip(variables, x):
        if v[0] != "q" or value <= 1e-6:
            continue
        _, p, a = v
        link = inputs["airport_link_map"][(p, a)]
        product_rows.append(
            {
                "plant_id": p,
                "airport_id": a,
                "saf_ton": float(value),
                "product_transport_cost_jpy_per_ton": float(link.transport_cost_jpy_per_ton_saf),
                "distance_km": float(link.distance_km),
                "mode": link.mode,
            }
        )
    product_df = pd.DataFrame(product_rows)
    # Shortfall per airport; zero-shortfall airports are kept on purpose.
    short_rows = []
    for v, value in zip(variables, x):
        if v[0] != "short":
            continue
        _, a = v
        short_rows.append({"airport_id": a, "shortfall_ton": float(value), "demand_ton": float(solution["demand_map"][a])})
    # Explicit columns keep the sums/merges below valid even with no demand rows.
    short_df = pd.DataFrame(short_rows, columns=["airport_id", "shortfall_ton", "demand_ton"])
    # Summary tables.
    if not feed_df.empty:
        plant_feed = (
            feed_df.groupby(["plant_id", "tech_id"], as_index=False)
            .agg(feed_ton=("feedstock_ton", "sum"), produced_saf_ton=("implied_saf_ton", "sum"))
        )
    else:
        plant_feed = pd.DataFrame(columns=["plant_id", "tech_id", "feed_ton", "produced_saf_ton"])
    if not product_df.empty:
        plant_ship = product_df.groupby("plant_id", as_index=False).agg(shipped_saf_ton=("saf_ton", "sum"))
    else:
        plant_ship = pd.DataFrame(columns=["plant_id", "shipped_saf_ton"])
    plant_summary = plant_feed.merge(plant_ship, on="plant_id", how="outer").fillna(0)
    if not tech_df.empty:
        plant_summary = plant_summary.merge(tech_df[["plant_id", "tech_name", "capex_jpy", "fixed_opex_jpy_per_year"]], on="plant_id", how="left")
    airport_summary = short_df.copy()
    if not product_df.empty:
        airport_in = product_df.groupby("airport_id", as_index=False).agg(delivered_saf_ton=("saf_ton", "sum"))
        airport_summary = airport_summary.merge(airport_in, on="airport_id", how="left")
    # Bug fix: DataFrame.get() returns the plain default (int 0) when the column
    # is missing, so the former `.get(...).fillna(0)` raised AttributeError
    # whenever nothing was shipped. Materialize the column first, then fill the
    # NaNs left by the merge for airports that received no SAF.
    if "delivered_saf_ton" not in airport_summary.columns:
        airport_summary["delivered_saf_ton"] = 0.0
    airport_summary["delivered_saf_ton"] = airport_summary["delivered_saf_ton"].fillna(0)
    airport_summary["coverage_ratio"] = np.where(
        airport_summary["demand_ton"] > 0,
        airport_summary["delivered_saf_ton"] / airport_summary["demand_ton"],
        0,
    )
    # Cost summary: recompute each objective component from the solution values.
    total_obj = float(solution["result"].fun)
    feed_cost = 0.0
    pretreat_cost = 0.0
    quality_cost = 0.0
    feed_transport_cost = 0.0
    variable_opex_cost = 0.0
    steam_credit = 0.0
    for row in feed_rows:
        tons = row["feedstock_ton"]
        feed_cost += tons * row["procurement_cost_jpy_per_ton"]
        pretreat_cost += tons * row["pretreatment_cost_jpy_per_ton"]
        quality_cost += tons * row["quality_penalty_jpy_per_ton"]
        feed_transport_cost += tons * row["feed_transport_cost_jpy_per_ton"]
        variable_opex_cost += tons * row["variable_opex_jpy_per_ton_feed"]
        steam_credit += tons * float(inputs["plant_by_id"][row["plant_id"]].existing_steam_credit_jpy_per_ton_feed)
    product_transport_cost = float((product_df["saf_ton"] * product_df["product_transport_cost_jpy_per_ton"]).sum()) if not product_df.empty else 0.0
    annualized_capex = float(tech_df["capex_jpy"].sum() * solution["crf"]) if not tech_df.empty else 0.0
    fixed_opex = float(tech_df["fixed_opex_jpy_per_year"].sum()) if not tech_df.empty else 0.0
    shortfall_cost = float(short_df["shortfall_ton"].sum() * solution["shortfall_penalty"])
    cost_summary = pd.DataFrame(
        [
            {"cost_item": "feedstock_procurement", "jpy_per_year": feed_cost},
            {"cost_item": "pretreatment", "jpy_per_year": pretreat_cost},
            {"cost_item": "quality_penalty", "jpy_per_year": quality_cost},
            {"cost_item": "feed_transport", "jpy_per_year": feed_transport_cost},
            {"cost_item": "variable_opex", "jpy_per_year": variable_opex_cost},
            {"cost_item": "steam_credit_negative", "jpy_per_year": -steam_credit},
            {"cost_item": "annualized_capex", "jpy_per_year": annualized_capex},
            {"cost_item": "fixed_opex", "jpy_per_year": fixed_opex},
            {"cost_item": "product_transport", "jpy_per_year": product_transport_cost},
            {"cost_item": "shortfall_penalty", "jpy_per_year": shortfall_cost},
            {"cost_item": "objective_total", "jpy_per_year": total_obj},
        ]
    )
    return {
        "feed_flows": feed_df,
        "selected_tech": tech_df,
        "product_shipments": product_df,
        "airport_shortfall": short_df,
        "plant_summary": plant_summary,
        "airport_summary": airport_summary,
        "cost_summary": cost_summary,
    }
def write_outputs(outputs: dict, output_dir: Path) -> None:
    """Write each result table to its own CSV, then bundle all tables into results.xlsx."""
    output_dir.mkdir(parents=True, exist_ok=True)
    # One CSV per table, named after its output key.
    for table_name, table in outputs.items():
        csv_path = output_dir / f"{table_name}.csv"
        table.to_csv(csv_path, index=False)
    # Single workbook with one sheet per table; Excel caps sheet names at 31 chars.
    workbook_path = output_dir / "results.xlsx"
    with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
        for table_name, table in outputs.items():
            table.to_excel(writer, sheet_name=table_name[:31], index=False)
def main() -> None:
    """CLI entry point: parse args, solve the network MILP, write and print results."""
    script_dir = Path(__file__).resolve().parent
    parser = argparse.ArgumentParser(description="Solve the SAF network MVP model with SciPy MILP.")
    parser.add_argument("--data-dir", type=Path, default=script_dir / "data")
    parser.add_argument("--output-dir", type=Path, default=script_dir / "results")
    parser.add_argument("--case-id", default="BASE")
    parser.add_argument("--shortfall-penalty", type=float, default=None)
    parser.add_argument("--full-scope", action="store_true", help="Use all feedstocks/technologies, not only starter_scope=1")
    args = parser.parse_args()

    model_data = load_data(args.data_dir)
    solution = solve_network(
        model_data,
        shortfall_penalty=args.shortfall_penalty,
        use_starter_scope=not args.full_scope,
        case_id=args.case_id,
    )
    outputs = extract_outputs(solution, model_data)
    write_outputs(outputs, args.output_dir)

    # Console summary of the run.
    print("Optimization finished.")
    print(f"Objective [JPY/year]: {solution['result'].fun:,.0f}")
    print(f"Shortfall penalty [JPY/ton]: {solution['shortfall_penalty']:,.0f}")
    print("Selected technologies:")
    selected = outputs["selected_tech"]
    if selected.empty:
        print(" (none)")
    else:
        for row in selected.itertuples(index=False):
            print(f" - {row.plant_id}: {row.tech_name}")
    print(f"Results written to: {args.output_dir}")


if __name__ == "__main__":
    main()