#!/usr/bin/env python """ CBC with oscillatory FD waveform + TD memory (via gwmemory), returning FD polarisations for injection/summing while exposing the directly interpolated TD memory for plotting (no inverse FFT). """ #import logging # Set the logging level for bilby to ERROR or CRITICAL #logging.getLogger('bilby').setLevel(logging.ERROR) #import warnings #warnings.filterwarnings("ignore") import time import numpy as np import matplotlib.pyplot as plt import bilby import gwmemory from bilby.core.utils import nfft, infft from bilby.gw.result import CBCResult # ------------------------------ config ------------------------------ outdir = "/data/www.astro/chrism/newcos/outdir" label = f"mem_{time.strftime('%Y%m%d_%H%M%S')}" sampling_frequency = 2048 minimum_frequency = 10 duration = 4.0 # seconds MEM_ONLY = False # if True, the source returns memory-only FD polarisations bilby.core.utils.setup_logger(outdir=outdir, label=label) bilby.core.utils.random.seed(88170235) #bilby.core.utils.setup_logger(outdir=outdir, label=label, log_level="ERROR") waveform_arguments = dict( waveform_approximant="IMRPhenomXPHM", reference_frequency=50.0, minimum_frequency=minimum_frequency, ) from scipy.interpolate import PchipInterpolator as PCHIP def _interp_hold_edges(x_src, y_src, x_new): # Shape-preserving cubic; no ringing, smoother than linear p = PCHIP(x_src, y_src, extrapolate=False) y_new = np.empty_like(x_new, dtype=float) m_in = (x_new >= x_src[0]) & (x_new <= x_src[-1]) y_new[m_in] = p(x_new[m_in]) # hold edges (don’t return to zero) y_new[~m_in & (x_new < x_src[0])] = float(y_src[0]) y_new[~m_in & (x_new > x_src[-1])] = float(y_src[-1]) return y_new def _nfft_onesided(x_td, fs): """ Call bilby.core.utils.nfft and robustly return (spectrum, freq) as 1-D arrays. Works whether nfft returns just the spectrum or (spectrum, freq). """ y = nfft(x_td, sampling_frequency=fs) if isinstance(y, tuple): # Some bilby versions return (spectrum, freq) Y, f = y Y = np.asarray(Y, dtype=np.complex128).ravel() f = np.asarray(f, dtype=float).ravel() else: # Some return only the spectrum, reconstruct the frequency axis Y = np.asarray(y, dtype=np.complex128).ravel() f = np.linspace(0.0, fs / 2.0, Y.size, dtype=float) return Y, f # --- helper: robust nfft output shape --- def _nfft_onesided(x_td, fs): y = nfft(x_td, sampling_frequency=fs) if isinstance(y, tuple): Y, f = y return np.asarray(Y, np.complex128).ravel(), np.asarray(f, float).ravel() else: Y = np.asarray(y, np.complex128).ravel() f = np.linspace(0.0, fs/2.0, Y.size, dtype=float) return Y, f # --------------------- source with TD memory helper --------------------- def make_bbh_with_gwmemory_fd( *, gwmemory_model="IMRPhenomTHM", l_max=4, base_fd_source=None, sampling_frequency=None, duration=None, apply_explicit_memory_redshift=True, # apply (1+z)^-2 to memory ): """ Factory returning a callable source model: h_total(f) = h_base(f; detector-frame) + FFT[ h_mem_td(t; source-frame) ] It also exposes: source.get_td_memory(parameters, fs, T, seg_start) -> (t_abs, hplus_td, hcross_td) so you can plot the interpolated time-domain memory directly. """ if base_fd_source is None: base_fd_source = bilby.gw.source.lal_binary_black_hole if sampling_frequency is None or duration is None: raise ValueError("Provide sampling_frequency and duration.") def _ensure_lal_params(p_lal, maybe_src=None): """Fill missing LAL keys from conversion and set safe defaults.""" required = [ "mass_1", "mass_2", "luminosity_distance", "a_1", "a_2", "tilt_1", "tilt_2", "theta_jn", "phase", ] # If anything crucial is None, try a conversion if any(p_lal.get(k) is None for k in required): raw = dict(p_lal) if maybe_src is not None: raw.update({k: v for k, v in maybe_src.items() if v is not None}) conv = bilby.gw.conversion.convert_to_lal_binary_black_hole_parameters(raw) if isinstance(conv, tuple): conv = conv[0] p_lal.update(conv) # Safe defaults for angular params expected by IMRPhenomXPHM p_lal.setdefault("phi_12", 0.0) p_lal.setdefault("phi_jl", 0.0) p_lal.setdefault("psi", 0.0) # Final check missing = [k for k in required if p_lal.get(k) is None] if missing: raise RuntimeError(f"LAL base is missing: {missing}\nHave: {sorted(p_lal.keys())}") return p_lal def _td_memory_on_segment(p_lal, *, fs, T, seg_start): """ Directly interpolate the memory (from gwmemory) on the segment time axis. Returns absolute times and TD memory polarisations (already (1+z)^-2 scaled if enabled). """ z = float(p_lal.get("redshift", 0.0) or 0.0) # source-frame masses preferred; else convert from detector-frame m1_src = p_lal.get("mass_1_source", p_lal["mass_1"] / (1.0 + z)) m2_src = p_lal.get("mass_2_source", p_lal["mass_2"] / (1.0 + z)) chi1 = p_lal.get("chi_1", p_lal.get("a_1", 0.0)) chi2 = p_lal.get("chi_2", p_lal.get("a_2", 0.0)) q = (m1_src / m2_src) if m1_src >= m2_src else (m2_src / m1_src) S1 = np.array([0.0, 0.0, float(chi1)], dtype=float) S2 = np.array([0.0, 0.0, float(chi2)], dtype=float) h_mem_td, t_mem = gwmemory.gwmemory.time_domain_memory( model=gwmemory_model, q=q, total_mass=(m1_src + m2_src), spin_1=S1, spin_2=S2, distance=float(p_lal["luminosity_distance"]), inc=float(p_lal["theta_jn"]), phase=float(p_lal["phase"]), l_max=int(l_max), ) # Segment absolute time axis and relative times to coalescence fs = float(fs) T = float(T) N = int(round(T * fs)) if N % 2: N += 1 dt = 1.0 / fs t_abs = seg_start + np.arange(N) * dt tau = t_abs - float(p_lal.get("geocent_time", 0.0)) # Interpolate plus/cross directly in TD (no huge GPS subtraction!) t_mem = np.asarray(t_mem, dtype=float) hp0 = np.asarray(h_mem_td["plus"], dtype=float) hc0 = np.asarray(h_mem_td["cross"], dtype=float) left_plus, right_plus = float(hp0[0]), float(hp0[-1]) left_cross, right_cross = float(hc0[0]), float(hc0[-1]) #hplus_td = np.interp(tau, t_mem, hp0, left=left_plus, right=right_plus) #hcross_td = np.interp(tau, t_mem, hc0, left=left_cross, right=right_cross) hplus_td = _interp_hold_edges(t_mem, hp0, tau) hcross_td = _interp_hold_edges(t_mem, hc0, tau) # Optional explicit memory redshift scaling if apply_explicit_memory_redshift and z > 0.0: s = (1.0 + z) ** -2 hplus_td *= s hcross_td *= s return t_abs, hplus_td, hcross_td def fd_source_with_memory( frequency_array, # LAL detector-frame (WG will fill via parameter_conversion) mass_1=None, mass_2=None, luminosity_distance=None, a_1=None, a_2=None, tilt_1=None, tilt_2=None, theta_jn=None, phase=None, # extras redshift=0.0, geocent_time=0.0, mass_1_source=None, mass_2_source=None, **kwargs, ): # Build LAL-like parameter dict and ensure completeness p_lal = dict( mass_1=mass_1, mass_2=mass_2, luminosity_distance=luminosity_distance, a_1=a_1, a_2=a_2, tilt_1=tilt_1, tilt_2=tilt_2, theta_jn=theta_jn, phase=phase, redshift=redshift, geocent_time=geocent_time, **kwargs, ) p_lal = _ensure_lal_params( p_lal, maybe_src=dict( mass_1_source=mass_1_source, mass_2_source=mass_2_source, chi_1=kwargs.get("chi_1", None), chi_2=kwargs.get("chi_2", None), ), ) # ---------- TD memory on this exact segment (for plotting) ---------- seg_start = float(p_lal["geocent_time"]) - float(duration) / 2.0 t_abs, hplus_td_mem, hcross_td_mem = _td_memory_on_segment( p_lal, fs=sampling_frequency, T=duration, seg_start=seg_start ) # Stash for plotting code to consume (no inverse FFT) fd_source_with_memory.last_memory_td = { "t": t_abs, "plus": hplus_td_mem, "cross": hcross_td_mem, } # In your fd_source_with_memory, right after you build the TD memory arrays hplus_td_mem/hcross_td_mem: # --- apply a light Tukey window to the TD memory to avoid leakage --- #def _tukey(N, alpha=0.1): # n = np.arange(N, dtype=float) # if alpha <= 0: return np.ones(N) # if alpha >= 1: return 0.5*(1-np.cos(2*np.pi*n/(N-1))) # edge = int(alpha*(N-1)/2.0) # w = np.ones(N) # if edge > 0: # w[:edge] = 0.5*(1 + np.cos(np.pi*(2*n[:edge]/(alpha*(N-1)) - 1))) # w[-edge:] = 0.5*(1 + np.cos(np.pi*(2*(n[-edge:]-(N-1))/(alpha*(N-1)) + 1))) # return w #w = _tukey(hplus_td_mem.size, alpha=0.2) #hplus_td_mem = hplus_td_mem * w #hcross_td_mem = hcross_td_mem * w dt = 1.0 / float(sampling_frequency) n_ramp = max(1, int(round(0.01 / dt))) # 10 ms w = np.ones_like(hplus_td_mem) w[:n_ramp] = 0.5 * (1 - np.cos(np.pi * np.arange(n_ramp) / n_ramp)) hplus_td_mem = hplus_td_mem * w hcross_td_mem = hcross_td_mem * w # --- TD→FD via nfft, then zero bins below fmin to avoid low-f blow-up --- fs = float(sampling_frequency) Hp_tmp, f_mem = _nfft_onesided(hplus_td_mem, fs) Hc_tmp, _ = _nfft_onesided(hcross_td_mem, fs) # same grid, ignore second f fmin = float(getattr(kwargs, "minimum_frequency", kwargs.get("minimum_frequency", minimum_frequency))) lo_mask = f_mem < fmin Hp_tmp = np.where(lo_mask, 0.0, Hp_tmp) Hc_tmp = np.where(lo_mask, 0.0, Hc_tmp) # interpolate complex→complex to the exact Bilby grid (flatten first!) f_target = np.asarray(frequency_array, dtype=float).ravel() Hp_tmp = np.asarray(Hp_tmp, dtype=np.complex128).ravel() Hc_tmp = np.asarray(Hc_tmp, dtype=np.complex128).ravel() Hp = np.interp(f_target, f_mem, np.real(Hp_tmp), left=0.0, right=0.0) \ + 1j*np.interp(f_target, f_mem, np.imag(Hp_tmp), left=0.0, right=0.0) Hc = np.interp(f_target, f_mem, np.real(Hc_tmp), left=0.0, right=0.0) \ + 1j*np.interp(f_target, f_mem, np.imag(Hc_tmp), left=0.0, right=0.0) hplus_fd = Hp.astype(np.complex128, copy=False) hcross_fd = Hc.astype(np.complex128, copy=False) # --- Oscillatory FD polarisations (reinserted) --- osc = base_fd_source(frequency_array, **p_lal) osc_plus = np.asarray(osc["plus"], dtype=np.complex128).ravel() osc_cross = np.asarray(osc["cross"], dtype=np.complex128).ravel() # Match the Bilby frequency grid length exactly n = len(frequency_array) osc_plus = osc_plus[:n] osc_cross = osc_cross[:n] # Safety check: ensure shapes match before summing assert hplus_fd.shape == osc_plus.shape == (n,), \ f"Shape mismatch: mem {hplus_fd.shape}, osc {osc_plus.shape}, n {n}" # (Optional) sanity checks once: #print("shapes:", f_target.shape, hplus_fd.shape, hcross_fd.shape) if MEM_ONLY: return {"plus": hplus_fd, "cross": hcross_fd} return {"plus": osc_plus + hplus_fd, "cross": osc_cross + hcross_fd} # Public helper so callers can fetch TD memory directly (no inverse FFT) def get_td_memory(parameters, *, fs, T, seg_start): p = dict(parameters) conv = bilby.gw.conversion.convert_to_lal_binary_black_hole_parameters(p) if isinstance(conv, tuple): conv = conv[0] p.update(conv) p.setdefault("phi_12", 0.0) p.setdefault("phi_jl", 0.0) p.setdefault("psi", 0.0) return _td_memory_on_segment(p, fs=fs, T=T, seg_start=seg_start) fd_source_with_memory.get_td_memory = get_td_memory fd_source_with_memory.last_memory_td = None return fd_source_with_memory def plot_injected_time_series( ifo, waveform_generator, injection_parameters, color="C0", alpha=1.0, alpha_data=0.2, label=None, axes=None, linestyle_signal="-", linestyle_data="-", savepath=None, dpi=300, whiten_epsilon=1e-24, fmin_for_whitening=None, show_td_memory=True, mem_color=None, mem_alpha=None, mem_linestyle="--", memory_panel_mode="memory_only", # "memory_only"|"total"|"both" (for UNwhitened panel) whitened_shows_memory_only=True, # <--- NEW: whitened panel = memory only ): """ Unwhitened panel: choose memory_only/total/both (TD memory is direct from gwmemory). Whitened panel: if whitened_shows_memory_only=True, shows detector-projected memory ONLY. """ # ---------- grids & data ---------- freqs = np.asarray(ifo.frequency_array, dtype=float) fs = float(ifo.strain_data.sampling_frequency) data_fd = np.asarray(ifo.strain_data.frequency_domain_strain, dtype=np.complex128) # total projected signal FD (osc + mem) from your source pols_total = waveform_generator.frequency_domain_strain(injection_parameters) # dict signal_fd_total = ifo.get_detector_response(pols_total, injection_parameters).astype(np.complex128) # ---------- PSD → whitening weights on IFO grid ---------- psd_on_ifo = np.asarray(ifo.power_spectral_density.psd_array, dtype=float) asd_on_ifo = np.sqrt(psd_on_ifo) inv_asd = np.zeros_like(asd_on_ifo) good = np.isfinite(asd_on_ifo) & (asd_on_ifo > 0) inv_asd[good] = 1.0 / asd_on_ifo[good] if fmin_for_whitening is None: fmin_for_whitening = getattr(waveform_generator, "waveform_arguments", {}).get("minimum_frequency", None) if fmin_for_whitening is None: fmin_for_whitening = getattr(ifo, "minimum_frequency", 0.0) fmin = float(fmin_for_whitening) inv_asd = np.where(freqs >= fmin, inv_asd, 0.0) inv_asd[0] = 0.0 if inv_asd.size > 1: inv_asd[-1] = 0.0 # ---------- UNWHITENED TD traces ---------- # total (for optional plotting) signal_td_total = np.real(infft(signal_fd_total, sampling_frequency=fs)) data_td = np.real(infft(data_fd, sampling_frequency=fs)) # direct TD memory from your source helper (no FFT artefacts) td_mem_ok = show_td_memory and hasattr(waveform_generator.frequency_domain_source_model, "get_td_memory") if td_mem_ok: fd_src = waveform_generator.frequency_domain_source_model t_mem_abs, hplus_mem_td, hcross_mem_td = fd_src.get_td_memory( injection_parameters, fs=ifo.strain_data.sampling_frequency, T=ifo.strain_data.duration, seg_start=ifo.strain_data.start_time, ) # ---------- WHITENED: build NOISE std from noise-only stream ---------- data_fd_white = data_fd * inv_asd signal_fd_white = signal_fd_total * inv_asd noise_fd_white = (data_fd - signal_fd_total) * inv_asd data_td_white = np.real(infft(data_fd_white, sampling_frequency=fs)) signal_td_white = np.real(infft(signal_fd_white, sampling_frequency=fs)) noise_td_white = np.real(infft(noise_fd_white, sampling_frequency=fs)) # robust std of whitened noise med = np.median(noise_td_white) mad = np.median(np.abs(noise_td_white - med)) s_noise = 1.4826 * mad if mad > 0 else (np.std(noise_td_white) + 1e-12) # --- detector-projected TD memory (fixes the ~2 s offset) --- if whitened_shows_memory_only and td_mem_ok: # Antenna factors and detector delay relative to geocentre ra = injection_parameters["ra"] dec = injection_parameters["dec"] psi = injection_parameters["psi"] tc = injection_parameters["geocent_time"] # Fplus, Fcross at coalescence time (standard choice) Fp = ifo.antenna_response(ra, dec, tc, psi, mode="plus") Fx = ifo.antenna_response(ra, dec, tc, psi, mode="cross") # Detector arrival delay (seconds) dt_det = ifo.time_delay_from_geocenter(ra, dec, tc) # Build detector TD memory by **shifting in time** by dt_det # h_det(t) = Fp * h_plus(t - dt_det) + Fx * h_cross(t - dt_det) # Interpolate on the same absolute time grid returned by get_td_memory t_det_arg = t_mem_abs - dt_det #hp_shift = np.interp( # t_det_arg, t_mem_abs, hplus_mem_td, # left=float(hplus_mem_td[0]), right=float(hplus_mem_td[-1]) #) #hc_shift = np.interp( # t_det_arg, t_mem_abs, hcross_mem_td, # left=float(hcross_mem_td[0]), right=float(hcross_mem_td[-1]) #) hp_shift = _interp_hold_edges(t_mem_abs, hplus_mem_td, t_mem_abs - dt_det) hc_shift = _interp_hold_edges(t_mem_abs, hcross_mem_td, t_mem_abs - dt_det) h_mem_ifo_td = Fp * hp_shift + Fx * hc_shift h_mem_ifo_td = h_mem_ifo_td - np.mean(h_mem_ifo_td) # --- whiten the detector TD memory --- # FFT -> 1/ASD on exact IFO grid -> IFFT H_mem_ifo_fd, f_mem = _nfft_onesided(h_mem_ifo_td, fs) # High-pass below fmin to avoid low-f blow-up (optional but recommended) H_mem_ifo_fd = np.where(f_mem < fmin, 0.0, H_mem_ifo_fd) # Map to IFO freq grid (complex interpolation) H_mem_ifo_fd = ( np.interp(freqs, f_mem, np.real(H_mem_ifo_fd), left=0.0, right=0.0) + 1j * np.interp(freqs, f_mem, np.imag(H_mem_ifo_fd), left=0.0, right=0.0) ).astype(np.complex128, copy=False) # Whiten and normalise by noise std (s_noise computed earlier) mem_fd_white = H_mem_ifo_fd * inv_asd mem_td_white = np.real(infft(mem_fd_white, sampling_frequency=fs)) / s_noise else: # fallback: total whitened (already normalised) mem_td_white = signal_td_white / s_noise # ---------- time axis ---------- if hasattr(ifo.strain_data, "time_array") and ifo.strain_data.time_array is not None: t = np.asarray(ifo.strain_data.time_array, dtype=float) else: N = int(ifo.strain_data.duration * fs) t0 = float(ifo.strain_data.start_time) dt = 1.0 / fs t = t0 + np.arange(N) * dt # ---------- axes ---------- if axes is None: fig, (ax0, ax1) = plt.subplots(2, 1, sharex=True, figsize=(10, 6), constrained_layout=True) else: ax0, ax1 = axes # ---------- UNWHITENED panel: memory_only / total / both ---------- plotted_any = False if memory_panel_mode in ("memory_only", "both") and td_mem_ok: mc = mem_color if mem_color is not None else color ma = mem_alpha if mem_alpha is not None else min(0.7, alpha) ax0.plot(t_mem_abs, hplus_mem_td, linestyle=mem_linestyle, lw=1, color=mc, alpha=ma, label=(f"Memory TD (+) ({label})" if label else "Memory TD (+)")) plotted_any = True if memory_panel_mode in ("total", "both"): ax0.plot(t, signal_td_total, linestyle=linestyle_signal, lw=1, color=color, alpha=alpha, label=("Total (osc+mem)" if plotted_any else (f"Signal ({label})" if label else "Signal"))) plotted_any = True if not plotted_any: ax0.plot(t, signal_td_total, linestyle=linestyle_signal, lw=1, color=color, alpha=alpha, label=(f"Signal ({label})" if label else "Signal")) ax0.set_ylabel("Strain") ax0.set_title(f"{ifo.name} — Time domain (unwhitened)") ax0.grid(True, alpha=0.3) ax0.legend(loc="upper right", fontsize=9) # ---------- WHITENED panel: memory-only (normalised by noise σ) ---------- ax1.plot(t, mem_td_white, linestyle=linestyle_signal, lw=1, color=color, alpha=alpha, label="Memory only (whitened, norm.)" if whitened_shows_memory_only else "Signal (whitened, norm.)") ax1.set_xlabel("Time [s]") ax1.set_ylabel("Whitened strain (noise σ≈1)") ax1.set_title(f"{ifo.name} — Time domain (whitened)") ax1.grid(True, alpha=0.3) ax1.legend(loc="upper right", fontsize=9) if savepath is not None: ax0.get_figure().savefig(savepath, dpi=dpi, bbox_inches="tight") return ax0, ax1 # ------------------------------ driver code ------------------------------ from bilby.gw.result import CBCResult # Fixed angular params for sampling injection_parameters = dict( mass_1_source=39, mass_2_source=29, luminosity_distance=2000, theta_jn=0.4, psi=2.659, phase=1.3, geocent_time=1126259642.413, ra=1.375, dec=-1.2108, a_1=0.0, a_2=0.0, tilt_1=0.0, tilt_2=0.0, phi_12=0.0, phi_jl=0.0, ) # Priors (source-frame masses + redshift + distance) def convert_x_y_to_z(parameters): parameters["mass_diff"] = parameters["mass_1_source"] - parameters["mass_2_source"] return parameters priors = bilby.core.prior.PriorDict(conversion_function=convert_x_y_to_z) priors["mass_1_source"] = bilby.core.prior.Uniform(10, 100, latex_label="$m^{\\mathrm{s}}_{1}$") priors["mass_2_source"] = bilby.core.prior.Uniform(10, 100, latex_label="$m^{\\mathrm{s}}_{2}$") priors["mass_diff"] = bilby.core.prior.Constraint(minimum=0, maximum=90) priors["redshift"] = bilby.gw.prior.UniformSourceFrame(0, 2, name="redshift", latex_label="$z$") priors["luminosity_distance"] = bilby.core.prior.Uniform(100, 5000, latex_label="$d_{\\mathrm{L}}$") for key in [ #"mass_1_source", #"mass_2_source", #"luminosity_distance", "theta_jn", "a_1", "a_2", "tilt_1", "tilt_2", "phi_12", "phi_jl", "psi", "ra", "dec", "geocent_time", "phase", ]: priors[key] = injection_parameters[key] # Build the hybrid source fd_source = make_bbh_with_gwmemory_fd( gwmemory_model="IMRPhenomTHM", l_max=4, sampling_frequency=sampling_frequency, duration=duration, apply_explicit_memory_redshift=True, ) # Sampling loop: draw up to 3 examples and plot ax0, ax1 = None, None n_kept = 0 while n_kept < 1: parameters = {k: v[0] for k, v in priors.sample(1).items()} print("params:", {k: parameters[k] for k in ["mass_1_source","mass_2_source","redshift","luminosity_distance"]}) print("waveform_arguments:", waveform_arguments) waveform_generator = bilby.gw.waveform_generator.WaveformGenerator( duration=duration, sampling_frequency=sampling_frequency, frequency_domain_source_model=fd_source, parameter_conversion=bilby.gw.conversion.convert_to_lal_binary_black_hole_parameters, waveform_arguments=waveform_arguments, ) ifos = bilby.gw.detector.InterferometerList(["H1", "L1", "V1"]) for ifo in ifos: ifo.minimum_frequency = minimum_frequency # <-- move up ifo.maximum_frequency = sampling_frequency / 2.0 # good to be explicit # optional but sensible for memory-only checks: ifo.sampling_frequency = sampling_frequency ifo.duration = duration # IFOs and data ifos.set_strain_data_from_power_spectral_densities( sampling_frequency=sampling_frequency, duration=duration, start_time=parameters["geocent_time"] - duration / 2.0, ) for ifo in ifos: f_ifo = np.asarray(ifo.frequency_array, dtype=float) # Pull PSD from IFO (whatever bilby attached) psd = getattr(ifo.power_spectral_density, "psd_array", None) f_psd = getattr(ifo.power_spectral_density, "frequency_array", None) if psd is None: psd = getattr(ifo, "power_spectral_density_array", None) # legacy f_psd = None if psd is None: raise RuntimeError(f"{ifo.name}: PSD not found") psd = np.asarray(psd, dtype=float).ravel() if f_psd is None or len(f_psd) != len(psd): # assume linear 0..Nyquist f_psd = np.linspace(0.0, sampling_frequency/2.0, len(psd), dtype=float) # Re-interp with FINITE edges (no inf!) psd_interp = np.interp(f_ifo, f_psd, psd, left=psd[0], right=psd[-1]) # Mask below fmin and at DC/Nyquist to avoid huge weights fmin = float(getattr(ifo, "minimum_frequency", minimum_frequency)) mask_valid = (f_ifo >= fmin) & np.isfinite(psd_interp) & (psd_interp > 0.0) # Floor small values; typical aLIGO PSD ~1e-46–1e-45 around 100 Hz psd_floor = 3e-46 psd_clean = np.where(mask_valid, np.maximum(psd_interp, psd_floor), np.inf) # Reattach a clean PSD on the EXACT IFO grid ifo.power_spectral_density = bilby.gw.detector.PowerSpectralDensity( frequency_array=f_ifo, psd_array=psd_clean ) for ifo in ifos: psd = ifo.power_spectral_density.psd_array print(ifo.name, "PSD finite min/max on-grid:", np.nanmin(psd[np.isfinite(psd)]), np.nanmax(psd[np.isfinite(psd)])) print("finite count:", np.isfinite(psd).sum(), " / ", psd.size) ifos.inject_signal(waveform_generator=waveform_generator, parameters=parameters, raise_error=False) # Quick SNR check pols = waveform_generator.frequency_domain_strain(parameters) rhos = [] for ifo in ifos: sig_ifo = ifo.get_detector_response(pols, parameters) rhos.append(np.sqrt(np.real(ifo.optimal_snr_squared(signal=sig_ifo)))) df = ifo.frequency_array[1] - ifo.frequency_array[0] print(ifo.name, "min/max PSD:", psd.min(), psd.max()) print("Any zeros? ", np.any(psd == 0.0)) print("min/max |signal_fd|:", np.abs(sig_ifo).min(), np.abs(sig_ifo).max()) print("df:", df, "min freq:", ifo.minimum_frequency) rho_net = float(np.hypot.reduce(rhos)) print("IFOs SNRs:", rhos, "net:", rho_net) if rho_net >= 8.0: n_kept += 1 inj_params = dict(parameters) # the actual injection used likelihood = bilby.gw.likelihood.GravitationalWaveTransient( interferometers=ifos, waveform_generator=waveform_generator ) result = bilby.core.sampler.run_sampler( likelihood, priors, sampler="nessai", outdir=outdir, label=label, resume=False, sample="unif", injection_parameters=inj_params, ) # build truths robustly (fall back to detector-frame if needed) def get(d, *keys): for k in keys: if k in d: return d[k] raise KeyError(f"none of {keys} found") m1s = get(inj_params, "mass_1_source", "mass_1") # depending on conversion, one will exist m2s = get(inj_params, "mass_2_source", "mass_2") z = get(inj_params, "redshift") dL = get(inj_params, "luminosity_distance") truths = [m1s, m2s, z, dL] result.plot_corner( parameters=["mass_1_source", "mass_2_source", "redshift", "luminosity_distance"], truths=truths, priors=True, ) cbc_result = CBCResult.from_json('/data/www.astro/chrism/newcos/outdir/{}_result.json'.format(label)) for ifo in ifos: cbc_result.plot_interferometer_waveform_posterior( interferometer=ifo, n_samples=500, save=True )