Skip to content

Manage spectra

spectra

MakeSpectra

Class to create spectra from event lists output by SIXTE. This creates one spectrum per bin, given a binning, with the corresponding ARF that accounts for the vignetting.

Source code in src/xifu_cluster_sim/spectra.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
class MakeSpectra:
    """
    Class to create spectra from event lists output by SIXTE. This creates one spectrum per bin, given a binning,
    with the corresponding ARF that accounts for the vignetting.
    """
    def __init__(self,
                binning,
                xifu_config = None):
        """
        Instantiate the class

        Parameters:
            binning (binning): Binning object created with the binning functions
            xifu_config (XIFU_Config): X-IFU Config object. When omitted, a fresh
                XIFU_Config() is created. (A None sentinel is used instead of a
                XIFU_Config() default, which would be evaluated once at class
                definition time and silently shared between all instances.)
        """
        self.binning = binning
        # Build the default lazily so every instance gets its own config
        self.xifu_config = XIFU_Config() if xifu_config is None else xifu_config

    def load_vignetting(self,
                       vign_file_path = '/xifu/usr/share/sixte/instruments/athena-xifu_2024_11/baseline/instdata/athena_vig_13rows_20240326.fits',
                       arf_file_path = '/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/ARF.arf'):
        """
        Load the vignetting file and compute the vignetting as a continuous function of energy using
        a quadratic function. This allows to take the vignetting into account without awkwardly dealing with
        the fact that it is binned in energy.
        The vignetting file contains the average vignetting for a given offset angle and within a given energy bin.
        I approximate it by a smooth continuous quadratic function in energy in each bin, such that the average in each bin
        corresponds to the file.

        Parameters:
            vign_file_path (str): Path to the vignetting file
            arf_file_path (str): Path to the ARF file
        """

        # Load vignetting file and assign arrays
        hdu_vign = fits.open(vign_file_path)
        vign = hdu_vign[1].data
        self.vign_E_centers = (vign['ENERG_LO'] + vign['ENERG_HI'])[0]/2
        self.vign_E_widths = (vign['ENERG_HI'] - vign['ENERG_LO'])[0]
        # Bin edges: every lower edge plus the last upper edge
        self.vign_E_bins = np.hstack((vign['ENERG_LO'][0], vign['ENERG_HI'][0][-1]))
        self.vignet_integre = np.array(vign['VIGNET'][0,0])
        self.theta = np.array(vign['THETA'][0])*60. # Arcminutes

        # Load arf data
        self.arf_file_path = arf_file_path
        hdu_arf = fits.open(arf_file_path)
        arf = hdu_arf[1].data
        self.arf_specresp = np.array(arf['SPECRESP'])
        self.E_arf = np.array(arf['ENERG_LO'] + arf['ENERG_HI'])/2

        # One smooth vignetting curve, sampled on the ARF energy grid,
        # per offset angle in the vignetting file
        self.vign_on_arf_bins = np.zeros((self.theta.shape[0],
                                          self.E_arf.shape[0]))

        # For each offset angle in the vignetting file
        for i in range(self.theta.shape[0]):
            # Coefficients of the piecewise quadratic reproducing the binned averages
            coeffs = quadratic_from_bin_averages(self.vign_E_bins,
                                                 self.vignet_integre[i],
                                                 boundary = 'natural')

            # Evaluate the quadratic function on the ARF energy grid
            self.vign_on_arf_bins[i] = eval_quadratic_piecewise(self.vign_E_bins,
                                                           coeffs,
                                                           self.E_arf)


    def load_event(self,
                   event_file_path,
                   grades_to_keep = (1,)):

        """
        Load event file to transform into spectra

        Parameters:
            event_file_path (str): Path to the event file
            grades_to_keep (sequence): Valid grades to keep for event making.
                Defaults to an immutable tuple (a mutable list default would be
                shared between calls).
        """

        hdu_evt = fits.open(event_file_path)
        self.event_file_path = event_file_path
        self.evt_data = hdu_evt[1].data
        # Keep only the events whose grading is in the accepted set
        self.evt_data = self.evt_data[np.isin(self.evt_data['GRADING'], grades_to_keep)]


        ra_offset = self.evt_data['DETY']*self.xifu_config.pixsize_degree.value
        dec_offset = self.evt_data['DETX']*self.xifu_config.pixsize_degree.value
        # Angular distance from center in arcminutes (spherical law of cosines)
        self.evt_theta = np.degrees(
                                            np.arccos(
                                                np.cos(np.radians(dec_offset)) * np.cos(np.radians(ra_offset))
                                                    )
                                            ) * 60

    def make_arfs_and_spectra(self,
                              spectra_path,
                              numproc = 1):

        """
        Create one spectrum and one arf per bin in the binning.
        Parallelized over N processes.

        Parameters:
            spectra_path (str): Path to where the events and spectra should be stored
            numproc (int): Number of parallel processes 
        """

        results = Parallel(n_jobs=numproc,
                           prefer="threads",
                          batch_size = 'auto')(
                                delayed(
                                    self.make_single_arf_and_spectrum
                                        )(bin_number,
                                        spectra_path
                                        ) for bin_number in tqdm_notebook(np.arange(self.binning.nb_bins),
                                                                          total = self.binning.nb_bins,
                                                                          desc = 'Making arfs and spectra'
                                                                         )
                                )

    def make_single_arf_and_spectrum(self,
                                     bin_number,
                                     spectra_path):
        """
        Create a spectrum and arf for a given bin number.

        Parameters:
            bin_number (int): Bin number in the binning object
            spectra_path (str): Where to save the ARF, event file and spectrum that have been created

        Raises:
            ValueError: If the bin contains no events, so the count-weighted
                vignetting average would be undefined (0/0).
        """

        pixels_in_region = self.binning.binning_dict[bin_number][0]
        interpolated_vignet = np.zeros_like(self.E_arf)
        counts_tot = 0
        for pix_num in pixels_in_region:

            # Events recorded in this physical pixel
            in_pixel = self.evt_data['PIXID'] == pix_num

            # Mean offset angle of the events in this pixel
            theta_pix = np.mean(self.evt_theta[in_pixel])

            # Weight with counts
            counts_in_band = np.sum(in_pixel)
            counts_tot += counts_in_band

            # Broadcast interpolation on axis of energies
            func_to_vmap = lambda vignet_pt : jnp.interp(theta_pix, self.theta, vignet_pt)
            vmapped_interp = jax.vmap(func_to_vmap)
            interpolated_vignet += vmapped_interp(self.vign_on_arf_bins.T) * counts_in_band

        # An empty bin would previously propagate silent NaNs through the ARF
        if counts_tot == 0:
            raise ValueError('No events found for bin {}; cannot build spectrum'.format(bin_number))
        interpolated_vignet /= counts_tot

        # Single definition of the ARF path: the file is now written to the
        # same path that is recorded in the ANCRFILE header below (previously
        # the ARF was saved without the '/' separator, so the header pointed
        # to a non-existent file)
        arf_filename = spectra_path + '/spec_{}.arf'.format(bin_number)

        # Load and save modified ARF
        hdu_arf = fits.open(self.arf_file_path)
        hdu_arf[1].data['SPECRESP'] = self.arf_specresp * np.array(interpolated_vignet)
        hdu_arf.writeto(arf_filename, overwrite=True)

        # Load and save modified event
        hdu_evt = fits.open(self.event_file_path)
        hdu_evt['EVENTS'].header['ANCRFILE'] = arf_filename
        # NOTE(review): hard-coded RMF path — consider making it configurable
        hdu_evt['EVENTS'].header['RESPFILE'] = '/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/RMF.rmf'

        hdu_evt[1].data = self.evt_data[np.isin(self.evt_data['PIXID'],
                                                     pixels_in_region)]

        evt_filename = spectra_path + '/spec_{}.evt'.format(bin_number)
        hdu_evt.writeto(evt_filename, overwrite=True)

        # Make spectrum using makespec
        pha_filename = evt_filename.replace('.evt','.pha')
        arguments = ["makespec",
                     "EvtFile=%s" % evt_filename,
                     "Spectrum=%s" % pha_filename,
                     "clobber=T"]
        print("Calling:", arguments)
        subprocess.check_call(arguments)

        return None

__init__(binning, xifu_config=XIFU_Config())

Instantiate the class

Parameters:

Name Type Description Default
binning binning

Binning object created with the binning functions

required
xifu_config XIFU_Config

X-IFU Config object

XIFU_Config()
Source code in src/xifu_cluster_sim/spectra.py
162
163
164
165
166
167
168
169
170
171
172
173
def __init__(self,
            binning,
            xifu_config = None):
    """
    Instantiate the class

    Parameters:
        binning (binning): Binning object created with the binning functions
        xifu_config (XIFU_Config): X-IFU Config object. When omitted, a fresh
            XIFU_Config() is created. (A None sentinel replaces a default of
            XIFU_Config(), which was evaluated once at definition time and
            silently shared between all instances.)
    """
    self.binning = binning
    # Build the default lazily so every instance gets its own config
    self.xifu_config = XIFU_Config() if xifu_config is None else xifu_config

load_event(event_file_path, grades_to_keep=[1])

Load event file to transform into spectra

Parameters:

Name Type Description Default
event_file_path str

Path to the event file

required
grades_to_keep list

List of valid grades to keep for event making

[1]
Source code in src/xifu_cluster_sim/spectra.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def load_event(self,
               event_file_path,
               grades_to_keep = (1,)):

    """
    Load event file to transform into spectra

    Parameters:
        event_file_path (str): Path to the event file
        grades_to_keep (sequence): Valid grades to keep for event making.
            Defaults to an immutable tuple (a mutable list default would be
            shared between calls).
    """

    hdu_evt = fits.open(event_file_path)
    self.event_file_path = event_file_path
    self.evt_data = hdu_evt[1].data
    # Keep only the events whose grading is in the accepted set
    self.evt_data = self.evt_data[np.isin(self.evt_data['GRADING'], grades_to_keep)]


    ra_offset = self.evt_data['DETY']*self.xifu_config.pixsize_degree.value
    dec_offset = self.evt_data['DETX']*self.xifu_config.pixsize_degree.value
    # Angular distance from center in arcminutes (spherical law of cosines)
    self.evt_theta = np.degrees(
                                        np.arccos(
                                            np.cos(np.radians(dec_offset)) * np.cos(np.radians(ra_offset))
                                                )
                                        ) * 60

load_vignetting(vign_file_path='/xifu/usr/share/sixte/instruments/athena-xifu_2024_11/baseline/instdata/athena_vig_13rows_20240326.fits', arf_file_path='/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/ARF.arf')

Load the vignetting file and compute the vignetting as a continuous function of energy using a quadratic function. This allows to take the vignetting into account without awkwardly dealing with the fact that it is binned in energy. The vignetting file contains the average vignetting for a given offset angle and within a given energy bin. I approximate it by a smooth continuous quadratic function in energy in each bin, such that the average in each bin corresponds to the file.

Parameters:

Name Type Description Default
vign_file_path str

Path to the vignetting file

'/xifu/usr/share/sixte/instruments/athena-xifu_2024_11/baseline/instdata/athena_vig_13rows_20240326.fits'
arf_file_path str

Path to the ARF file

'/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/ARF.arf'
Source code in src/xifu_cluster_sim/spectra.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def load_vignetting(self,
                   vign_file_path = '/xifu/usr/share/sixte/instruments/athena-xifu_2024_11/baseline/instdata/athena_vig_13rows_20240326.fits',
                   arf_file_path = '/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/ARF.arf'):
    """
    Build a smooth (piecewise-quadratic) vignetting curve in energy for every
    offset angle of the vignetting file, sampled on the ARF energy grid.
    The vignetting file stores the average vignetting per energy bin and offset
    angle; the quadratic pieces are chosen so that their average in each bin
    reproduces the file exactly, which avoids dealing with the energy binning
    downstream.

    Parameters:
        vign_file_path (str): Path to the vignetting file
        arf_file_path (str): Path to the ARF file
    """

    # Vignetting table: energy-bin edges, binned averages, offset angles
    vign_hdul = fits.open(vign_file_path)
    vign_table = vign_hdul[1].data
    lo_edges = vign_table['ENERG_LO'][0]
    hi_edges = vign_table['ENERG_HI'][0]
    self.vign_E_centers = (lo_edges + hi_edges) / 2
    self.vign_E_widths = hi_edges - lo_edges
    self.vign_E_bins = np.hstack((lo_edges, hi_edges[-1]))
    self.vignet_integre = np.array(vign_table['VIGNET'][0, 0])
    self.theta = np.array(vign_table['THETA'][0]) * 60.  # Arcminutes

    # ARF table: effective area and bin-center energies
    self.arf_file_path = arf_file_path
    arf_table = fits.open(arf_file_path)[1].data
    self.arf_specresp = np.array(arf_table['SPECRESP'])
    self.E_arf = np.array(arf_table['ENERG_LO'] + arf_table['ENERG_HI']) / 2

    n_angles = self.theta.shape[0]
    self.vign_on_arf_bins = np.zeros((n_angles, self.E_arf.shape[0]))

    # One smooth curve per offset angle, evaluated on the ARF grid
    for angle_idx in range(n_angles):
        coeffs = quadratic_from_bin_averages(self.vign_E_bins,
                                             self.vignet_integre[angle_idx],
                                             boundary='natural')
        self.vign_on_arf_bins[angle_idx] = eval_quadratic_piecewise(self.vign_E_bins,
                                                                    coeffs,
                                                                    self.E_arf)

make_arfs_and_spectra(spectra_path, numproc=1)

Create one spectrum and one arf per bin in the binning. Parallelized over N processes.

Parameters:

Name Type Description Default
spectra_path str

Path to where the events and spectra should be stored

required
numproc int

Number of parallel processes

1
Source code in src/xifu_cluster_sim/spectra.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def make_arfs_and_spectra(self, 
                          spectra_path,
                          numproc = 1):

    """
    Build the ARF and the spectrum of every bin of the binning,
    distributing the bins over parallel workers.

    Parameters:
        spectra_path (str): Path to where the events and spectra should be stored
        numproc (int): Number of parallel processes 
    """

    # Progress bar over all bin numbers
    progress = tqdm_notebook(np.arange(self.binning.nb_bins),
                             total=self.binning.nb_bins,
                             desc='Making arfs and spectra')

    # One task per bin, dispatched on a thread pool
    tasks = (delayed(self.make_single_arf_and_spectrum)(bin_number, spectra_path)
             for bin_number in progress)
    Parallel(n_jobs=numproc, prefer="threads", batch_size='auto')(tasks)

make_single_arf_and_spectrum(bin_number, spectra_path)

Create a spectrum and arf for a given bin number.

Parameters:

Name Type Description Default
bin_number int

Bin number in the binning object

required
spectra_path str

Where to save the ARF, event file and spectrum that have been created

required
Source code in src/xifu_cluster_sim/spectra.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def make_single_arf_and_spectrum(self, 
                                 bin_number,
                                 spectra_path):
    """
    Create a spectrum and arf for a given bin number.

    Parameters:
        bin_number (int): Bin number in the binning object
        spectra_path (str): Where to save the ARF, event file and spectrum that have been created

    Raises:
        ValueError: If the bin contains no events, so the count-weighted
            vignetting average would be undefined (0/0).
    """

    pixels_in_region = self.binning.binning_dict[bin_number][0]
    interpolated_vignet = np.zeros_like(self.E_arf)
    counts_tot = 0
    for pix_num in pixels_in_region:

        # Events recorded in this physical pixel
        in_pixel = self.evt_data['PIXID'] == pix_num

        # Mean offset angle of the events in this pixel
        theta_pix = np.mean(self.evt_theta[in_pixel])

        # Weight with counts
        counts_in_band = np.sum(in_pixel)
        counts_tot += counts_in_band

        # Broadcast interpolation on axis of energies
        func_to_vmap = lambda vignet_pt : jnp.interp(theta_pix, self.theta, vignet_pt)
        vmapped_interp = jax.vmap(func_to_vmap)
        interpolated_vignet += vmapped_interp(self.vign_on_arf_bins.T) * counts_in_band

    # An empty bin would previously propagate silent NaNs through the ARF
    if counts_tot == 0:
        raise ValueError('No events found for bin {}; cannot build spectrum'.format(bin_number))
    interpolated_vignet /= counts_tot

    # Single definition of the ARF path: the file is now written to the same
    # path that is recorded in the ANCRFILE header below (previously the ARF
    # was saved without the '/' separator, so the header pointed to a
    # non-existent file)
    arf_filename = spectra_path + '/spec_{}.arf'.format(bin_number)

    # Load and save modified ARF
    hdu_arf = fits.open(self.arf_file_path)
    hdu_arf[1].data['SPECRESP'] = self.arf_specresp * np.array(interpolated_vignet)
    hdu_arf.writeto(arf_filename, overwrite=True)

    # Load and save modified event
    hdu_evt = fits.open(self.event_file_path)
    hdu_evt['EVENTS'].header['ANCRFILE'] = arf_filename
    # NOTE(review): hard-coded RMF path — consider making it configurable
    hdu_evt['EVENTS'].header['RESPFILE'] = '/xifu/home/mola/XIFU_Sims_Turbulence_NewConfig/RMF.rmf'

    hdu_evt[1].data = self.evt_data[np.isin(self.evt_data['PIXID'],
                                                 pixels_in_region)]

    evt_filename = spectra_path + '/spec_{}.evt'.format(bin_number)
    hdu_evt.writeto(evt_filename, overwrite=True)

    # Make spectrum using makespec
    pha_filename = evt_filename.replace('.evt','.pha')
    arguments = ["makespec",
                 "EvtFile=%s" % evt_filename,
                 "Spectrum=%s" % pha_filename,
                 "clobber=T"]
    print("Calling:", arguments)
    subprocess.check_call(arguments)

    return None

eval_quadratic_piecewise(E_edges, coeffs, E_eval)

Evaluates a quadratic function, given its coefficients as computed by quadratic_from_bin_averages. In each bin, the function is \(f(x) = a x^2 + bx + c\)

Parameters:

Name Type Description Default
E_edges array

Edges of the bins used for the computation of the coefficients

required
coeffs array

Coefficients of the quadratic function

required
E_eval array

Points at which to compute the quadratic function

required
Source code in src/xifu_cluster_sim/spectra.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def eval_quadratic_piecewise(E_edges, coeffs, E_eval):
    r"""
    Evaluates a quadratic function, given its coefficients as computed by quadratic_from_bin_averages.
    In each bin, the function is
    $$f(x) = a x^2 + bx + c$$
    where x is the offset from the bin's left edge. Points outside
    [E_edges[0], E_edges[-1]] evaluate to NaN.

    Parameters:
        E_edges (np.array): Edges of the bins used for the computation of the coefficients
        coeffs (np.array): Coefficients of the quadratic function, one (a, b, c) row per bin
        E_eval (np.array): Points at which to compute the quadratic function
    """

    # Make sure it's arrays
    E_edges = np.asarray(E_edges, dtype=float)
    E_eval = np.asarray(E_eval, dtype=float)
    coeffs = np.asarray(coeffs, dtype=float).reshape(-1, 3)

    # Vectorized bin lookup (replaces an O(n_bins * n_eval) masking loop).
    # side='right' makes a point sitting exactly on an interior edge belong to
    # the bin on its right, matching the original loop where later bins
    # overwrote shared edges; clipping keeps E_edges[-1] in the last bin.
    idx = np.searchsorted(E_edges, E_eval, side='right') - 1
    idx = np.clip(idx, 0, len(coeffs) - 1)

    # Evaluate each point's quadratic relative to its bin's left edge
    x = E_eval - E_edges[idx]
    a, b, c = coeffs[idx, 0], coeffs[idx, 1], coeffs[idx, 2]
    vals = a * x**2 + b * x + c

    # The function is undefined outside the covered energy range
    vals[(E_eval < E_edges[0]) | (E_eval > E_edges[-1])] = np.nan
    return vals

quadratic_from_bin_averages(E_edges, V_row, boundary='natural', reg_eps=1e-12)

Build piecewise quadratic function per bin with exact bin averages and C1 continuity. This function is used to build a smooth function of vignetting that is not binned. It defines the right system of equations as a matrix multiplication and then solves it to obtain the coefficients of the quadratic function.

Parameters:

Name Type Description Default
E_edges array

Edges of the bins at which the averages are computed

required
V_row array

Average in each bin

required
boundary str

'zero_slope', 'natural', or 'extrapolate'. 'zero_slope': enforce derivative = 0 at both ends. 'natural' : enforce second derivative = 0 at both ends (a_0 = a_last = 0). 'extrapolate': set right-end derivative from trend of last bins (left derivative = 0)

'natural'
reg_eps float

Regularization term

1e-12
Source code in src/xifu_cluster_sim/spectra.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def quadratic_from_bin_averages(E_edges, V_row, boundary='natural', reg_eps=1e-12):
    """
    Fit one quadratic per bin so that every bin average is matched exactly and
    the pieces join with C1 continuity, yielding a smooth un-binned version of
    the vignetting. A square linear system over the 3*n_bins coefficients is
    assembled row by row and solved, with regularized and least-squares
    fallbacks for singular systems.

    Parameters:
        E_edges (np.array): Edges of the bins at which the averages are computed
        V_row (np.array): Average in each bin
        boundary (str): 'zero_slope', 'natural', or 'extrapolate'. 'zero_slope': enforce derivative = 0 at both ends. 'natural'   : enforce second derivative = 0 at both ends (a_0 = a_last = 0). 'extrapolate': set right-end derivative from trend of last bins (left derivative = 0)
        reg_eps (float): Regularization term
    """
    E_edges = np.asarray(E_edges, dtype=float)
    V_row = np.asarray(V_row, dtype=float)
    n_bins = len(V_row)
    size = 3 * n_bins
    widths = np.diff(E_edges[:n_bins + 1])

    A = np.zeros((size, size), dtype=float)
    rhs = np.zeros(size, dtype=float)
    row = 0

    # (1) Exact bin averages: the mean of a*x^2 + b*x + c over [0, w]
    # is a*w^2/3 + b*w/2 + c.
    for i, w in enumerate(widths):
        A[row, 3*i:3*i+3] = (w * w / 3.0, w / 2.0, 1.0)
        rhs[row] = V_row[i]
        row += 1

    # (2) C0 and C1 continuity across each interior edge.
    for i in range(n_bins - 1):
        w = widths[i]
        # Value: a_i*w^2 + b_i*w + c_i = c_{i+1}
        A[row, 3*i:3*i+3] = (w * w, w, 1.0)
        A[row, 3*(i+1) + 2] = -1.0
        row += 1
        # Slope: 2*a_i*w + b_i = b_{i+1}
        A[row, 3*i:3*i+3] = (2.0 * w, 1.0, 0.0)
        A[row, 3*(i+1) + 1] = -1.0
        row += 1

    # (3) Two boundary rows to close the system.
    if boundary == 'zero_slope':
        # Zero derivative at both ends
        A[row, 1] = 1.0
        row += 1
        w_last = E_edges[-1] - E_edges[-2]
        A[row, -3:] = (2.0 * w_last, 1.0, 0.0)
        row += 1

    elif boundary == 'natural':
        # Zero curvature at both ends: a_0 = a_last = 0
        A[row, 0] = 1.0
        row += 1
        A[row, -3] = 1.0
        row += 1

    elif boundary == 'extrapolate':
        # Zero derivative on the left
        A[row, 1] = 1.0
        row += 1
        # Right-end derivative estimated from the trend of the last two bins
        centers = 0.5 * (E_edges[:-1] + E_edges[1:])
        if len(centers) >= 2:
            end_slope = (V_row[-1] - V_row[-2]) / (centers[-1] - centers[-2])
        else:
            end_slope = 0.0
        w_last = E_edges[-1] - E_edges[-2]
        A[row, -3:] = (2.0 * w_last, 1.0, 0.0)
        rhs[row] = end_slope
        row += 1

    else:
        raise ValueError("boundary must be 'zero_slope', 'natural', or 'extrapolate'")

    assert row == size, f"constructed {row} rows but need {size}"

    # Solve robustly: direct solve, then regularized normal equations,
    # then least squares as a last resort.
    try:
        sol = np.linalg.solve(A, rhs)
    except np.linalg.LinAlgError:
        normal = A.T @ A + reg_eps * np.eye(size)
        try:
            sol = np.linalg.solve(normal, A.T @ rhs)
        except np.linalg.LinAlgError:
            sol, *_ = np.linalg.lstsq(A, rhs, rcond=None)

    return sol.reshape(n_bins, 3)