# O–M–E–F Classifier
# Author: H.N. van Roon
# License: CC0 1.0 Universal (Public Domain)
# ============================================================================
# CLASSIFIER 5.3 — End-to-end YSO / MS / EV / XP classifier
# ============================================================================
!pip install -q astroquery
import os
from datetime import datetime
import csv
import math
import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from astroquery.gaia import Gaia
from astroquery.simbad import Simbad
from astroquery.vizier import Vizier
from astroquery.alma import Alma
import pyvo
import pandas as pd
# ============================================================================
# CONFIG / CONSTANTS
# ============================================================================
# Cone-search radius per catalogue, in arcseconds.
RADII = dict(
    gaia=2.0,
    twomass=3.0,
    wise=3.0,
    simbad=10.0,
    spitzer=3.0,
    alma=30.0,
    panstarrs=3.0,
)

# Object-type labels that decide_domain treats as a YSO indicator.
YSO_OTYPES = {
    "Y*O", "Y*?", "Y*O?", "Y*O:",
    "IR", "IR*", "IR?", "IR:",
    "mm", "mm?", "mm:",
    "Rad", "Rad?", "Rad:",
    "cm", "cm?", "cm:",
}
# ============================================================================
# BLOCK 1 — GAIA HARVESTING
# ============================================================================
def _gaia_float(value):
    """Return ``value`` as a finite float, or None when it is missing.

    "Missing" covers None, a masked table cell and NaN/inf. A masked cell is
    not ``None``, so an ``is not None`` test alone lets it through and
    ``float()`` then turns it into NaN.
    """
    if value is None or np.ma.is_masked(value):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None


def _gaia_row_to_result(row):
    """Convert one ``gaiadr3.gaia_source`` result row into a result dict."""
    bp_mag = _gaia_float(row["phot_bp_mean_mag"])
    rp_mag = _gaia_float(row["phot_rp_mean_mag"])
    color = None
    if bp_mag is not None and rp_mag is not None:
        color = bp_mag - rp_mag  # BP - RP
    return {
        "success": True,
        "source_id": int(row["source_id"]),
        "ra": float(row["ra"]),
        "dec": float(row["dec"]),
        "parallax_mas": _gaia_float(row["parallax"]),
        "parallax_error_mas": _gaia_float(row["parallax_error"]),
        "g_mag": _gaia_float(row["phot_g_mean_mag"]),
        "bp_mag": bp_mag,
        "rp_mag": rp_mag,
        "color": color,
        "ruwe": _gaia_float(row["ruwe"]),
    }


def _query_gaia_first_row(query):
    """Run an ADQL query and convert its first row; never raises."""
    try:
        rows = Gaia.launch_job(query).get_results()
        if len(rows) == 0:
            return {"success": False}
        return _gaia_row_to_result(rows[0])
    except Exception as exc:
        # Best-effort harvesting: a failed query must not abort the
        # classification, but the reason is kept for diagnosis.
        return {"success": False, "error": str(exc)}


def get_gaia_by_coordinates(ra_str, dec_str, radius_arcsec=RADII["gaia"]):
    """Return the Gaia DR3 source nearest to a sky position.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, ...}`` with ``source_id``, ``ra``, ``dec``,
        ``parallax_mas``, ``parallax_error_mas``, ``g_mag``, ``bp_mag``,
        ``rp_mag``, ``color`` (BP - RP) and ``ruwe``. Missing, masked or
        non-finite values are reported as None.
    """
    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    radius_deg = radius_arcsec / 3600.0
    # Rows are ordered by distance to the target, so row 0 is the nearest.
    query = f"""
    SELECT TOP 10
    source_id, ra, dec, parallax, parallax_error,
    phot_g_mean_mag, phot_bp_mean_mag, phot_rp_mean_mag,
    ruwe
    FROM gaiadr3.gaia_source
    WHERE 1=CONTAINS(
    POINT('ICRS', ra, dec),
    CIRCLE('ICRS', {coord.ra.deg}, {coord.dec.deg}, {radius_deg})
    )
    ORDER BY
    DISTANCE(POINT('ICRS', ra, dec),
    POINT('ICRS', {coord.ra.deg}, {coord.dec.deg}))
    """
    return _query_gaia_first_row(query)


def get_gaia_by_id(gaia_id):
    """Return the Gaia DR3 source with the given ``source_id``.

    ``gaia_id`` must be convertible to ``int``; anything else is rejected
    before it can be interpolated into the ADQL text. The result dict has
    the same shape as that of :func:`get_gaia_by_coordinates`.
    """
    try:
        source_id = int(gaia_id)
    except (TypeError, ValueError) as exc:
        return {"success": False, "error": f"invalid Gaia source_id: {exc}"}
    query = f"""
    SELECT
    source_id, ra, dec, parallax, parallax_error,
    phot_g_mean_mag, phot_bp_mean_mag, phot_rp_mean_mag,
    ruwe
    FROM gaiadr3.gaia_source
    WHERE source_id = {source_id}
    """
    return _query_gaia_first_row(query)
# ============================================================================
# BLOCK 2 — CATALOG HARVESTING (2MASS, WISE, SIMBAD, SPITZER, ALMA, PAN-STARRS)
# ============================================================================
def get_twomass_primary(ra_str, dec_str, radius_arcsec=RADII["twomass"]):
    """Return J/H/Ks photometry of the nearest ``II/246/out`` (2MASS) source.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, "photometry": {...}, "colors": {...},
        "sep_arcsec": float}``. Missing, masked or non-finite magnitudes are
        None, and so is every colour that depends on them.
    """

    def _mag(row, column):
        # A masked cell is not None; float() would silently turn it into NaN.
        value = row[column]
        if value is None or np.ma.is_masked(value):
            return None
        number = float(value)
        return number if math.isfinite(number) else None

    def _diff(first, second):
        # Explicit None test: a magnitude of exactly 0.0 is a valid value.
        if first is None or second is None:
            return None
        return first - second

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    vizier = Vizier(row_limit=-1)
    radius = radius_arcsec * u.arcsec
    try:
        result = vizier.query_region(coord, radius=radius, catalog="II/246/out")
        if not result or len(result[0]) == 0:
            return {"success": False}

        # Keep the row with the smallest angular separation from the target.
        best_row, best_sep = None, None
        for row in result[0]:
            ra, dec = row["RAJ2000"], row["DEJ2000"]
            if ra is None or dec is None or np.ma.is_masked(ra) or np.ma.is_masked(dec):
                continue
            sep = float(coord.separation(SkyCoord(ra, dec, unit="deg")).arcsec)
            if best_sep is None or sep < best_sep:
                best_sep, best_row = sep, row
        if best_row is None:
            return {"success": False}

        photometry = {
            "J": _mag(best_row, "Jmag"),
            "H": _mag(best_row, "Hmag"),
            "Ks": _mag(best_row, "Kmag"),
        }
        colors = {
            "JH": _diff(photometry["J"], photometry["H"]),
            "HKs": _diff(photometry["H"], photometry["Ks"]),
            "JKs": _diff(photometry["J"], photometry["Ks"]),
        }
        return {
            "success": True,
            "photometry": photometry,
            "colors": colors,
            "sep_arcsec": best_sep,
        }
    except Exception as exc:
        # Best-effort harvesting: report the failure instead of raising.
        return {"success": False, "error": str(exc)}
def get_wise_primary(ra_str, dec_str, radius_arcsec=RADII["wise"]):
    """Return W1-W4 photometry of the nearest ``II/328/allwise`` source.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, "photometry": {...}, "colors": {...},
        "sep_arcsec": float}``. Missing, masked or non-finite magnitudes are
        None, and so is every colour that depends on them.
    """

    def _mag(row, column):
        # A masked cell is not None; float() would silently turn it into NaN.
        value = row[column]
        if value is None or np.ma.is_masked(value):
            return None
        number = float(value)
        return number if math.isfinite(number) else None

    def _diff(first, second):
        # Explicit None test: a magnitude of exactly 0.0 is a valid value.
        if first is None or second is None:
            return None
        return first - second

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    vizier = Vizier(row_limit=-1)
    radius = radius_arcsec * u.arcsec
    try:
        result = vizier.query_region(coord, radius=radius, catalog="II/328/allwise")
        if not result or len(result[0]) == 0:
            return {"success": False}

        # Keep the row with the smallest angular separation from the target.
        best_row, best_sep = None, None
        for row in result[0]:
            ra, dec = row["RAJ2000"], row["DEJ2000"]
            if ra is None or dec is None or np.ma.is_masked(ra) or np.ma.is_masked(dec):
                continue
            sep = float(coord.separation(SkyCoord(ra, dec, unit="deg")).arcsec)
            if best_sep is None or sep < best_sep:
                best_sep, best_row = sep, row
        if best_row is None:
            return {"success": False}

        photometry = {
            band: _mag(best_row, f"{band}mag") for band in ("W1", "W2", "W3", "W4")
        }
        colors = {
            "W1W2": _diff(photometry["W1"], photometry["W2"]),
            "W2W3": _diff(photometry["W2"], photometry["W3"]),
            "W3W4": _diff(photometry["W3"], photometry["W4"]),
        }
        return {
            "success": True,
            "photometry": photometry,
            "colors": colors,
            "sep_arcsec": best_sep,
        }
    except Exception as exc:
        # Best-effort harvesting: report the failure instead of raising.
        return {"success": False, "error": str(exc)}
def get_simbad_primary(ra_str, dec_str, radius_arcsec=RADII["simbad"]):
    """Return spectral type and object type of the nearest SIMBAD entry.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, "sp_type": str | None, "otype": str | None,
        "sep_arcsec": float}``.
    """
    # SIMBAD may expose the coordinates under different column names;
    # the pairs are tried in this order.
    coordinate_columns = (
        ("RA_d", "DEC_d"),
        ("ra", "dec"),
        ("RA", "DEC"),
        ("ra(d)", "dec(d)"),
    )

    def _position(row, ra_col, dec_col):
        # Build a SkyCoord for one row, or return None if it has no usable position.
        ra_val, dec_val = row[ra_col], row[dec_col]
        if np.ma.is_masked(ra_val) or np.ma.is_masked(dec_val):
            return None
        try:
            ra_deg, dec_deg = float(ra_val), float(dec_val)
        except (TypeError, ValueError):
            # NOTE(review): presumably older astroquery releases return RA/DEC
            # as sexagesimal strings (hours / degrees) — confirm.
            try:
                return SkyCoord(str(ra_val), str(dec_val), unit=(u.hourangle, u.deg))
            except (TypeError, ValueError):
                return None
        if not (math.isfinite(ra_deg) and math.isfinite(dec_deg)):
            return None
        return SkyCoord(ra_deg, dec_deg, unit="deg")

    def _clean(value):
        # Stripped text, or None for empty/masked cells.
        if value is None or np.ma.is_masked(value):
            return None
        if isinstance(value, bytes):
            value = value.decode("utf-8", errors="replace")
        return str(value).strip() or None

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    try:
        # Field setup is inside the try block: a field name rejected by the
        # installed astroquery must not crash the whole classification.
        Simbad.reset_votable_fields()
        Simbad.add_votable_fields("sp_type", "otype", "ra", "dec", "ra(d)", "dec(d)")
        result = Simbad.query_region(coord, radius_arcsec * u.arcsec)
        if result is None or len(result) == 0:
            return {"success": False}

        # The column layout is the same for every row, so resolve it once.
        colnames = result.colnames
        columns = next(
            (pair for pair in coordinate_columns
             if pair[0] in colnames and pair[1] in colnames),
            None,
        )
        if columns is None:
            print("SIMBAD WARNING: Geen RA/DEC kolommen gevonden:", colnames)
            return {"success": False}

        # Keep the row with the smallest angular separation from the target.
        best_row, best_sep = None, None
        for row in result:
            row_coord = _position(row, *columns)
            if row_coord is None:
                continue
            sep = float(coord.separation(row_coord).arcsec)
            if best_sep is None or sep < best_sep:
                best_sep, best_row = sep, row
        if best_row is None:
            return {"success": False}

        return {
            "success": True,
            "sp_type": _clean(best_row["sp_type"]),
            "otype": _clean(best_row["otype"]),
            "sep_arcsec": best_sep,
        }
    except Exception as e:
        print("SIMBAD ERROR:", e)
        return {"success": False, "error": str(e)}
def get_spitzer_primary(ra_str, dec_str, radius_arcsec=RADII["spitzer"]):
    """Return IRAC photometry of the nearest ``II/293/glimpse`` source.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, "photometry": {...}, "colors": {...},
        "sep_arcsec": float}``. Missing, masked or non-finite magnitudes are
        None, and so is every colour that depends on them.
    """

    def _mag(row, column):
        # A masked cell is not None; float() would silently turn it into NaN.
        value = row[column]
        if value is None or np.ma.is_masked(value):
            return None
        number = float(value)
        return number if math.isfinite(number) else None

    def _diff(first, second):
        if first is None or second is None:
            return None
        return first - second

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    vizier = Vizier(row_limit=-1)
    radius = radius_arcsec * u.arcsec
    try:
        result = vizier.query_region(coord, radius=radius, catalog="II/293/glimpse")
        if not result or len(result[0]) == 0:
            return {"success": False}

        # Keep the row with the smallest angular separation from the target.
        best_row, best_sep = None, None
        for row in result[0]:
            ra, dec = row["RAJ2000"], row["DEJ2000"]
            if ra is None or dec is None or np.ma.is_masked(ra) or np.ma.is_masked(dec):
                continue
            sep = float(coord.separation(SkyCoord(ra, dec, unit="deg")).arcsec)
            if best_sep is None or sep < best_sep:
                best_sep, best_row = sep, row
        if best_row is None:
            return {"success": False}

        # NOTE(review): confirm that II/293/glimpse exposes its IRAC
        # magnitudes as mag1..mag4; a missing column makes every call fail
        # (the KeyError is then reported under "error").
        phot = {
            "IRAC1": _mag(best_row, "mag1"),
            "IRAC2": _mag(best_row, "mag2"),
            "IRAC3": _mag(best_row, "mag3"),
            "IRAC4": _mag(best_row, "mag4"),
        }
        colors = {
            "I1I2": _diff(phot["IRAC1"], phot["IRAC2"]),
            "I2I4": _diff(phot["IRAC2"], phot["IRAC4"]),
            "I1I4": _diff(phot["IRAC1"], phot["IRAC4"]),
        }
        return {
            "success": True,
            "photometry": phot,
            "colors": colors,
            "sep_arcsec": best_sep,
        }
    except Exception as exc:
        # Best-effort harvesting: report the failure instead of raising.
        return {"success": False, "error": str(exc)}
def get_alma_data_combined(coord, radius_deg):
    """Collect ALMA observations around ``coord`` from two query paths.

    Queries ``Alma.query_region`` and the ALMA TAP service (``ivoa.obscore``)
    and concatenates whatever each one returns. Either query may fail without
    aborting the other; failures are printed.

    Parameters
    ----------
    coord : SkyCoord
        Search centre; ``coord.ra.deg`` / ``coord.dec.deg`` are used.
    radius_deg : float
        Search radius in degrees.

    Returns
    -------
    pandas.DataFrame or None
        Combined rows, newest ``obs_release_date`` first when that column
        exists, with a fresh 0..n-1 index; None when neither query returned
        rows.
    """
    frames = []

    try:
        alma_data = Alma.query_region(coord, radius_deg * u.deg, science=True, public=True)
        if alma_data is not None and len(alma_data) > 0:
            frames.append(alma_data.to_pandas())
    except Exception as exc:
        print("ALMA WARNING: astroquery query failed:", exc)

    try:
        tap_service = pyvo.dal.TAPService("https://almascience.eso.org/tap")
        query = f"""
        SELECT obs_id, proposal_id, target_name, dataproduct_type, calib_level, band_list,
        instrument_name, obs_release_date, s_ra, s_dec, em_min, em_max, access_url
        FROM ivoa.obscore
        WHERE INTERSECTS(CIRCLE('ICRS', {coord.ra.deg}, {coord.dec.deg}, {radius_deg}), s_region) = 1
        """
        tap_data = tap_service.search(query).to_table().to_pandas()
        if not tap_data.empty:
            frames.append(tap_data)
    except Exception as exc:
        print("ALMA WARNING: TAP query failed:", exc)

    if not frames:
        return None

    # NOTE(review): the two sources may describe the same observations with
    # different column sets, in which case drop_duplicates() cannot merge
    # them and the row count is inflated — confirm.
    combined = pd.concat(frames, ignore_index=True).drop_duplicates()
    if "obs_release_date" in combined.columns:
        combined = combined.sort_values("obs_release_date", ascending=False)
    # Dropping and sorting leave the old labels in place; renumber so that
    # label-based and position-based lookups address the same row.
    return combined.reset_index(drop=True)
def get_alma_primary(ra_str, dec_str, radius_arcsec=RADII["alma"]):
    """Summarise the ALMA observation whose pointing is nearest to a position.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when something raised), or
        ``{"success": True, "data_dict": {...}}`` where ``data_dict`` maps
        string keys to strings: ``alma_n_observations``, the available
        ``ALMA_<column>`` values of the nearest row, ``ALMA_sep_arcsec`` and
        ``ALMA_radius_used``.
    """
    wanted = ("obs_id", "proposal_id", "band_list", "dataproduct_type", "calib_level")

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    radius_deg = radius_arcsec / 3600.0
    try:
        alma_data = get_alma_data_combined(coord, radius_deg)
        if alma_data is None or alma_data.empty:
            return {"success": False}

        min_dist = float("inf")
        best_label = None
        for label, row in alma_data.iterrows():
            try:
                ra_obs = float(row["s_ra"])
                dec_obs = float(row["s_dec"])
            except Exception:
                continue
            if not (math.isfinite(ra_obs) and math.isfinite(dec_obs)):
                continue
            dist = coord.separation(SkyCoord(ra_obs, dec_obs, unit="deg")).arcsec
            if dist < min_dist:
                min_dist = dist
                best_label = label

        # iterrows() yields index labels, not positions, so the row must be
        # fetched with .loc; on a sorted frame .iloc picks a different row.
        # Without any usable pointing the first row is used.
        if best_label is not None:
            best_row = alma_data.loc[best_label]
        else:
            best_row = alma_data.iloc[0]

        alma_dict = {"alma_n_observations": str(len(alma_data))}
        for col in best_row.index:
            if col not in wanted:
                continue
            val = best_row[col]
            if pd.api.types.is_scalar(val) and pd.isna(val):
                continue
            alma_dict[f"ALMA_{col}"] = str(val)
        alma_dict["ALMA_sep_arcsec"] = (
            f"{min_dist:.2f}" if min_dist != float("inf") else "unknown"
        )
        alma_dict["ALMA_radius_used"] = str(radius_arcsec)
        return {"success": True, "data_dict": alma_dict}
    except Exception as exc:
        # Best-effort harvesting: report the failure instead of raising.
        return {"success": False, "error": str(exc)}
def get_panstarrs_primary(ra_str, dec_str, radius_arcsec=RADII["panstarrs"]):
    """Return grizy photometry of the nearest ``II/349/ps1`` source.

    Parameters
    ----------
    ra_str, dec_str : str
        Position, parsed with RA in hour angle and Dec in degrees.
    radius_arcsec : float
        Cone-search radius in arcseconds.

    Returns
    -------
    dict
        ``{"success": False}`` (plus ``"error"`` when the query raised), or
        ``{"success": True, "photometry": {...}, "wavelengths": {...},
        "sep_arcsec": float}``. A band that is absent, masked or non-finite
        is None.
    """

    def _mag(row, column):
        # A masked cell is not None; float() would silently turn it into NaN.
        value = row[column]
        if value is None or np.ma.is_masked(value):
            return None
        number = float(value)
        return number if math.isfinite(number) else None

    coord = SkyCoord(ra_str, dec_str, unit=(u.hourangle, u.deg))
    vizier = Vizier(
        row_limit=-1,
        columns=["RAJ2000", "DEJ2000", "gmag", "rmag", "imag", "zmag", "ymag"],
    )
    radius = radius_arcsec * u.arcsec
    try:
        result = vizier.query_region(coord, radius=radius, catalog="II/349/ps1")
        if not result or len(result[0]) == 0:
            return {"success": False}
        table = result[0]

        # Keep the row with the smallest angular separation from the target.
        best_row, best_sep = None, None
        for row in table:
            ra, dec = row["RAJ2000"], row["DEJ2000"]
            if ra is None or dec is None or np.ma.is_masked(ra) or np.ma.is_masked(dec):
                continue
            sep = float(coord.separation(SkyCoord(ra, dec, unit="deg")).arcsec)
            if best_sep is None or sep < best_sep:
                best_sep, best_row = sep, row
        if best_row is None:
            return {"success": False}

        # Membership test on the column names instead of Row.get():
        # NOTE(review): Row.get() appears to exist only in newer astropy
        # releases; without it every call would end in the except branch.
        available = set(table.colnames)
        phot = {}
        for band in ("g", "r", "i", "z", "y"):
            column = f"{band}mag"
            phot[band] = _mag(best_row, column) if column in available else None

        wavelengths = {"g": 0.48, "r": 0.62, "i": 0.75, "z": 0.89, "y": 0.97}
        return {
            "success": True,
            "photometry": phot,
            "wavelengths": wavelengths,
            "sep_arcsec": best_sep,
        }
    except Exception as exc:
        # Best-effort harvesting: report the failure instead of raising.
        return {"success": False, "error": str(exc)}
# ============================================================================
# BLOCK 3 — SED / MASS / DOMAIN LOGIC
# ============================================================================
def estimate_mass_from_gaia(gaia_primary):
    """Return a very rough mass estimate from the Gaia BP-RP colour.

    The estimate is a step function of ``color`` only. ``parallax_mas`` and
    ``g_mag`` are required to be present and usable, but do not enter the
    returned value.

    Parameters
    ----------
    gaia_primary : dict or None
        Gaia result dict with ``success``, ``parallax_mas``, ``g_mag`` and
        ``color``.

    Returns
    -------
    float or None
        2.0, 1.5, 1.0, 0.8 or 0.5; None when the Gaia lookup failed, a
        required value is missing or not finite, or the parallax is not
        positive.
    """
    if not gaia_primary or not gaia_primary.get("success"):
        return None

    parallax = gaia_primary.get("parallax_mas")
    g_mag = gaia_primary.get("g_mag")
    color = gaia_primary.get("color")

    # NaN compares False with everything, so it would slip past "<= 0" and
    # every colour threshold and end up in the last bin; reject it here.
    for value in (parallax, g_mag, color):
        if value is None or not math.isfinite(value):
            return None
    if parallax <= 0:
        return None

    # (exclusive upper BP-RP bound, mass) in ascending colour order.
    steps = ((0.0, 2.0), (0.5, 1.5), (1.0, 1.0), (1.5, 0.8))
    for upper_bound, mass in steps:
        if color < upper_bound:
            return mass
    return 0.5
def analyze_sed(wise_primary):
    """Fit a log-log slope through the four WISE bands.

    Each magnitude is converted to a relative flux ``10 ** (-0.4 * mag)``
    (no zero points are applied) and a straight line is fitted to
    ``log10(flux)`` against ``log10(wavelength in micron)``.

    Parameters
    ----------
    wise_primary : dict or None
        WISE result dict with ``success`` and ``photometry`` (W1..W4).

    Returns
    -------
    dict
        ``{"success": False}`` when the lookup failed or any band is
        missing or not finite; otherwise ``{"success": True, "slope": float,
        "peak_band": str}`` where ``peak_band`` is the band with the
        smallest magnitude.
    """
    if not wise_primary or not wise_primary.get("success"):
        return {"success": False}

    bands = ("W1", "W2", "W3", "W4")
    phot = wise_primary["photometry"]
    values = [phot.get(band) for band in bands]
    # A NaN magnitude would give a NaN slope and an arbitrary peak band.
    if any(value is None or not math.isfinite(value) for value in values):
        return {"success": False}

    mags = np.array(values, dtype=float)
    lam = np.array([3.4, 4.6, 12.0, 22.0])  # micron
    # log10(10 ** (-0.4 * mag)) is just -0.4 * mag.
    log_flux = -0.4 * mags
    slope = float(np.polyfit(np.log10(lam), log_flux, 1)[0])
    peak_band = bands[int(np.argmin(mags))]
    return {"success": True, "slope": slope, "peak_band": peak_band}
def decide_domain(gaia_primary, simbad_primary, wise_primary, mass, sed_info):
    """Assign the classification domain and its O/M/E/F labels.

    Parameters
    ----------
    gaia_primary, simbad_primary, wise_primary : dict or None
        Harvester results; each is only used when ``success`` is truthy.
    mass : float or None
        Mass estimate used by the MS and EV rules.
    sed_info : dict or None
        Accepted for interface compatibility; not used by the current rules.

    Returns
    -------
    tuple
        ``(domain, O, M, E, F, ir_excess, accretion)`` with ``domain`` one
        of ``"XP"`` (default), ``"Y"``, ``"MS"`` or ``"EV"``.
    """
    # Default: XP
    domain = "XP"
    O = "O(XP)"
    M = "M(extragalactic)"
    E = "E(XP)"
    F = "F(extragalactic)"

    # IR excess from the WISE W2-W3 colour.
    ir_excess = False
    if wise_primary and wise_primary.get("success"):
        w23 = wise_primary["colors"]["W2W3"]
        if w23 is not None and w23 > 1.0:
            ir_excess = True

    # Very simple accretion flag (placeholder) plus the SIMBAD object type.
    # "or ''" matters: the keys exist with value None when SIMBAD has no
    # entry, so the .get() default alone does not protect .upper().
    accretion = False
    otype_raw = ""
    if simbad_primary and simbad_primary.get("success"):
        sp = simbad_primary.get("sp_type") or ""
        if "EM" in sp.upper():
            accretion = True
        otype_raw = (simbad_primary.get("otype") or "").strip()

    # YSO_OTYPES holds mixed-case labels ("mm", "Rad", "cm"); testing only
    # the upper-cased otype could never match those, so test both forms.
    is_yso_otype = otype_raw in YSO_OTYPES or otype_raw.upper() in YSO_OTYPES

    gaia_ok = bool(gaia_primary and gaia_primary.get("success"))

    # YSO domain
    if is_yso_otype or ir_excess or accretion:
        domain = "Y"
        O = "O(YSO)"
        M = "M(proto/YSO)"
        E = "E(envelope/disk)"
        F = "F(star‑forming)"
    # MS domain
    elif gaia_ok and mass is not None and mass >= 0.5 and not ir_excess:
        domain = "MS"
        O = "O(MS)"
        M = "M(main sequence)"
        E = "E(stellar photosphere)"
        F = "F(field)"
    # EV domain
    # NOTE(review): this branch requires ir_excess, but ir_excess already
    # selects the YSO branch above, so it can never run as written —
    # confirm the intended rule order.
    elif gaia_ok and mass is not None and mass > 1.5 and ir_excess:
        domain = "EV"
        O = "O(evolved)"
        M = "M(giant/supergiant)"
        E = "E(dust shell)"
        F = "F(evolved)"

    return domain, O, M, E, F, ir_excess, accretion
# ============================================================================
# BLOCK 4 — SANITY, FORMATTER, CLASSIFY_OBJECT, BATCH, CSV
# ============================================================================
def validate_classification_logic(domain, mass, ir_excess, accretion,
                                  gaia_primary, simbad_primary,
                                  wise_primary, spitzer_primary, alma_primary):
    """Check a proposed domain against simple consistency rules.

    Parameters
    ----------
    domain : str
        Proposed domain (``"MS"``, ``"EV"``, ``"Y"`` or ``"XP"``).
    mass : float or None
        Mass estimate.
    ir_excess, accretion : bool
        Flags produced by the domain decision.
    gaia_primary, simbad_primary, wise_primary : dict or None
        Harvester results; each is only used when ``success`` is truthy.
    spitzer_primary, alma_primary : dict or None
        Accepted for interface compatibility; not used by the current rules.

    Returns
    -------
    tuple
        ``(True, None)`` when no rule objects (always the case without a
        Gaia match), otherwise ``(False, reason)`` for the first rule that
        fails.
    """
    if not gaia_primary or not gaia_primary.get("success"):
        return True, None

    ruwe = gaia_primary.get("ruwe")
    parallax = gaia_primary.get("parallax_mas")
    color = gaia_primary.get("color")
    wise_ok = bool(wise_primary and wise_primary.get("success"))

    # MS sanity
    if domain == "MS":
        if ruwe and ruwe > 2.5:
            return False, "MS-object met RUWE > 2.5 is fysisch onlogisch."
        if parallax is not None and parallax <= 0:
            return False, "MS-object met negatieve parallax."
        if ir_excess:
            return False, "MS-object met IR-excess is inconsistent (mogelijk YSO)."

    # EV sanity
    if domain == "EV":
        if mass and mass < 0.5:
            return False, "EV-object met massa < 0.5 M_sun."
        if color is not None and color < 0.0:
            return False, "EV-object met BP-RP < 0."

    # YSO sanity
    if domain == "Y":
        if wise_ok:
            w23 = wise_primary["colors"]["W2W3"]
            if w23 is not None and w23 < 1.0 and not accretion:
                return False, "YSO zonder dust-signatuur (W2-W3 < 1.0)."
        if ruwe is not None and ruwe < 1.0 and not ir_excess:
            return False, "YSO met lage RUWE en zonder IR-excess (mogelijk MS)."
        if simbad_primary and simbad_primary.get("success"):
            # "or ''": the key can be present with value None, in which
            # case the .get() default does not apply and .upper() would fail.
            ot = (simbad_primary.get("otype") or "").upper()
            if ot in ("STAR", "IR", "IR*") and not (ir_excess or accretion):
                return False, "YSO maar SIMBAD otype=STAR zonder YSO-signalen."

    # XP sanity
    if domain == "XP":
        if ir_excess or accretion:
            return False, "XP-object met YSO-signatuur (IR-excess/accretion)."
        if wise_ok:
            w23 = wise_primary["colors"]["W2W3"]
            if w23 is not None and w23 > 2.0:
                return False, "XP-object met rood IR-profiel (mogelijk YSO)."
            phot = wise_primary["photometry"]
            measured = [
                (band, phot[band])
                for band in ("W1", "W2", "W3", "W4")
                if phot[band] is not None
            ]
            # The smallest magnitude is the brightest band.
            if measured and min(measured, key=lambda item: item[1])[0] in ("W3", "W4"):
                return False, "XP-object met W3/W4-piek (embedded YSO-kandidaat)."

    return True, None
def format_output(domain, O, M, E, F, mass, simbad_primary,
                  ir_excess, accretion, sed_peak, sed_info):
    """Render the classification as a single human-readable line.

    The line starts with ``D:<domain> (<O>, <M>, <E>; <F>)``. When any
    optional detail is available (mass, SIMBAD spectral/object type,
    IR-excess and accretion flags, SED peak band, SED slope), the details
    follow in square brackets, comma-separated, in that order.
    """
    details = []

    if mass is not None:
        details.append(f"mass={mass:.3f}")

    if simbad_primary and simbad_primary.get("success"):
        # Empty or missing SIMBAD values are left out.
        for label, key in (("sp", "sp_type"), ("otype", "otype")):
            value = simbad_primary.get(key)
            if value:
                details.append(f"{label}={value}")

    if ir_excess:
        details.append("IR-excess")
    if accretion:
        details.append("accretion")
    if sed_peak:
        details.append(f"SED={sed_peak}")

    has_slope = bool(
        sed_info and sed_info.get("success") and sed_info.get("slope") is not None
    )
    if has_slope:
        details.append(f"SED_slope={sed_info['slope']:.3f}")

    line = f"D:{domain} ({O}, {M}, {E}; {F})"
    if details:
        line += " [" + ", ".join(details) + "]"
    return line
def classify_object(ra_str, dec_str, gaia_id=None):
    """Harvest all catalogues for one position and classify the object.

    Gaia is looked up by ``gaia_id`` when one is given, otherwise by
    position; 2MASS, SIMBAD, WISE, Spitzer, ALMA and Pan-STARRS are always
    queried by position. A mass estimate and WISE SED are derived, a domain
    is chosen, and a sanity check may reset the domain to XP.

    Returns a dict with ``domain``, ``result_str``, ``log`` (list of lines),
    ``csv_log`` (the same lines joined with " | ") and the raw harvester
    results under ``gaia``, ``simbad``, ``wise``, ``twomass``, ``spitzer``,
    ``alma`` and ``panstarrs``.
    """
    log = [f"Coordinates (J2000): {ra_str} {dec_str}"]

    # --- Gaia ---------------------------------------------------------------
    gaia_primary = (
        get_gaia_by_id(gaia_id)
        if gaia_id
        else get_gaia_by_coordinates(ra_str, dec_str)
    )
    log.append(f"Gaia success: {gaia_primary.get('success', False)}")

    # --- Other catalogues ---------------------------------------------------
    twomass_primary = get_twomass_primary(ra_str, dec_str)
    simbad_primary = get_simbad_primary(ra_str, dec_str)
    wise_primary = get_wise_primary(ra_str, dec_str)
    spitzer_primary = get_spitzer_primary(ra_str, dec_str)
    alma_primary = get_alma_primary(ra_str, dec_str)
    panstarrs_primary = get_panstarrs_primary(ra_str, dec_str)

    # --- Extended log: data harvest summary ---------------------------------
    log.append("=== DATA HARVEST SUMMARY ===")

    if not gaia_primary.get("success"):
        log.append("GAIA: NO MATCH")
    else:
        log.append(
            f"GAIA: OK | G={gaia_primary.get('g_mag')} BP={gaia_primary.get('bp_mag')} "
            f"RP={gaia_primary.get('rp_mag')} RUWE={gaia_primary.get('ruwe')} "
            f"parallax={gaia_primary.get('parallax_mas')}"
        )

    if not twomass_primary.get("success"):
        log.append("2MASS: NO MATCH")
    else:
        nir = twomass_primary["photometry"]
        log.append(f"2MASS: OK | J={nir['J']} H={nir['H']} Ks={nir['Ks']}")

    if not wise_primary.get("success"):
        log.append("WISE: NO MATCH")
    else:
        mir = wise_primary["photometry"]
        log.append(
            f"WISE: OK | W1={mir['W1']} W2={mir['W2']} W3={mir['W3']} W4={mir['W4']}"
        )

    if not simbad_primary.get("success"):
        log.append("SIMBAD: NO MATCH")
    else:
        log.append(
            f"SIMBAD: OK | otype={simbad_primary.get('otype')} sp={simbad_primary.get('sp_type')}"
        )

    # These three catalogues are only reported as found / not found.
    for ok_line, miss_line, harvested in (
        ("SPITZER: OK", "SPITZER: NO MATCH", spitzer_primary),
        ("ALMA: OK", "ALMA: NO MATCH", alma_primary),
        ("PAN-STARRS: OK", "PAN-STARRS: NO MATCH", panstarrs_primary),
    ):
        log.append(ok_line if harvested.get("success") else miss_line)

    log.append("=== END DATA HARVEST SUMMARY ===")

    # --- Mass and SED -------------------------------------------------------
    mass = estimate_mass_from_gaia(gaia_primary)
    sed_info = analyze_sed(wise_primary)
    sed_peak = None
    if sed_info.get("success"):
        sed_peak = sed_info["peak_band"]

    # --- Domain -------------------------------------------------------------
    domain, O, M, E, F, ir_excess, accretion = decide_domain(
        gaia_primary, simbad_primary, wise_primary, mass, sed_info
    )

    # --- Sanity check: an inconsistent result falls back to XP --------------
    ok, reason = validate_classification_logic(
        domain, mass, ir_excess, accretion,
        gaia_primary, simbad_primary,
        wise_primary, spitzer_primary, alma_primary
    )
    if not ok:
        domain, O, M, E, F = (
            "XP", "O(XP)", "M(extragalactic)", "E(XP)", "F(extragalactic)"
        )
        log.append(f"Sanity override: {reason}")

    # --- Output -------------------------------------------------------------
    result_str = format_output(
        domain, O, M, E, F, mass,
        simbad_primary, ir_excess, accretion,
        sed_peak, sed_info,
    )

    return {
        "domain": domain,
        "result_str": result_str,
        "log": log,
        "csv_log": " | ".join(log),  # CSV-ready single-line log
        "gaia": gaia_primary,
        "simbad": simbad_primary,
        "wise": wise_primary,
        "twomass": twomass_primary,
        "spitzer": spitzer_primary,
        "alma": alma_primary,
        "panstarrs": panstarrs_primary,
    }
# ============================================================================
# BATCH + CSV
# ============================================================================
def run_batch(objects, csv_path=None):
    """Classify every object in ``objects`` and optionally export a CSV.

    Parameters
    ----------
    objects : iterable of dict
        One dict per target with ``"ra"`` and ``"dec"`` strings and,
        optionally, ``"gaia_id"``; when present it is passed on so that Gaia
        is looked up by source id instead of by position.
    csv_path : str or None
        When given, the result rows are written there as CSV.

    Returns
    -------
    list of dict
        One row per object with ``ra``, ``dec``, ``domain``, ``result`` and
        ``log`` (the full log as a single string).
    """
    rows = []
    print("======================================================================")
    print("CLASSIFIER 5.3 — End-to-end YSO Testset")
    print("======================================================================\n")
    print("\n=== Running batch classification ===\n")

    for obj in objects:
        ra_str = obj["ra"]
        dec_str = obj["dec"]
        print(f"\n--- Processing {ra_str} {dec_str} ---\n")
        # Entries without "gaia_id" behave exactly as before (gaia_id=None).
        res = classify_object(ra_str, dec_str, gaia_id=obj.get("gaia_id"))
        print(f">>> CLASSIFICATION: {res['result_str']}\n")
        rows.append({
            "ra": ra_str,
            "dec": dec_str,
            "domain": res["domain"],
            "result": res["result_str"],
            "log": res["csv_log"],  # full log in the CSV
        })

    print("\n=== Batch complete ===\n")

    if csv_path:
        pd.DataFrame(rows).to_csv(csv_path, index=False)
        print(f"Results exported to: {csv_path}")

    return rows
# ============================================================================
# TEST SET — compact input format (including RAFGL 5123)
# ============================================================================
# Targets as J2000 strings; the harvesters parse "ra" as hour angle and
# "dec" as degrees.
testset = [
    {"ra": "04 31 34.07736", "dec": "+18 08 04.9020"},  # RAFGL 5123
    # Further targets, currently disabled:
    # {"ra": "05 43 51.41", "dec": "-01 02 53.1"},
    # {"ra": "18 29 49.80", "dec": "+01 15 20.6"},
]

# Run the batch (network queries plus CSV export) only when executed as a
# script or notebook cell, not when the module is imported.
if __name__ == "__main__":
    run_batch(testset, csv_path="classifier_output.csv")