Skip to content

data

xsb_fluc.data.cluster

Cluster

Data container for a cluster observation/mosaic as defined by pyproffit.

Attributes:

Name Type Description
img ndarray

Image data

exp ndarray

Exposure map

bkg ndarray

Background map

nh ndarray

Hydrogen column density map

wcs WCS

WCS object

header Header

Header object

degree_per_pixel Quantity

Angular size of a pixel in degrees

kpc_per_pixel Quantity

Angular size of a pixel in kpc

shape tuple

Shape of the image

x_c float

X coordinate of the cluster center

y_c float

Y coordinate of the cluster center

y_ref ndarray

Y coordinate of the image

x_ref ndarray

X coordinate of the image

coords SkyCoord

Coordinates of the image

z float

Redshift of the cluster

r_500 Quantity

Radius of the cluster at 500 times the critical density.

t_500 Quantity

Angular radius of the cluster at 500 times the critical density.

center SkyCoord

Coordinates of the cluster center

name str

Name of the cluster

imglink str

Path to the image file

explink str

Path to the exposure map file

bkglink str

Path to the background map file

cosmo FlatLambdaCDM

Cosmology used to compute the angular and physical sizes

regions Regions

Regions to exclude from the analysis

unmasked_img ndarray

Unmasked image data

unmasked_exposure ndarray

Unmasked exposure map

Source code in src/xsb_fluc/data/cluster.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
class Cluster:
    """
    Data container for a cluster observation/mosaic as defined by pyproffit.

    Attributes:
        img (np.ndarray): Image data
        exp (np.ndarray): Exposure map
        bkg (np.ndarray): Background map
        nh (np.ndarray): Hydrogen column density map
        wcs (astropy.wcs.WCS): WCS object
        header (astropy.io.fits.Header): Header object
        degree_per_pixel (astropy.units.Quantity): Angular size of a pixel in degrees
        kpc_per_pixel (astropy.units.Quantity): Angular size of a pixel in kpc
        shape (tuple): Shape of the image
        x_c (float): X coordinate of the cluster center
        y_c (float): Y coordinate of the cluster center
        y_ref (np.ndarray): Y coordinate of the image
        x_ref (np.ndarray): X coordinate of the image
        coords (astropy.coordinates.SkyCoord): Coordinates of the image
        z (float): Redshift of the cluster
        r_500 (astropy.units.Quantity): Radius of the cluster at 500 times the critical density.
        t_500 (astropy.units.Quantity): Angular radius of the cluster at 500 times the critical density.
        center (astropy.coordinates.SkyCoord): Coordinates of the cluster center
        name (str): Name of the cluster
        imglink (str): Path to the image file
        explink (str): Path to the exposure map file
        bkglink (str): Path to the background map file
        cosmo (astropy.cosmology.FlatLambdaCDM): Cosmology used to compute the angular and physical sizes
        regions (regions.Regions): Regions to exclude from the analysis
        unmasked_img (np.ndarray): Unmasked image data
        unmasked_exposure (np.ndarray): Unmasked exposure map
    """

    cosmo: Cosmology

    def __init__(self, imglink: str,
                 explink: str=None,
                 bkglink: str=None,
                 reglink: str=None,
                 nhlink: str=None,
                 redshift: float=0.,
                 r_500: u.Quantity|None=None,
                 t_500: u.Quantity|None=None,
                 ra: float|u.Quantity=None,
                 dec: float|u.Quantity=None,
                 name: str=None):
        r"""
        Constructor for the Cluster class.

        Parameters:
            imglink (str): Path to the image file
            explink (str): Path to the exposure map file
            bkglink (str): Path to the background map file
            redshift (float): Redshift of the cluster
            r_500 (astropy.units.Quantity): Radius of the cluster at 500 times the critical density.  Either r_500 or t_500 must be provided.
            t_500 (astropy.units.Quantity): Angular radius of the cluster at 500 times the critical density.  Either r_500 or t_500 must be provided.
            ra (float|astropy.units.Quantity): Right ascension of the cluster center
            dec (float|astropy.units.Quantity): Declination of the cluster center
            name (str): Name of the cluster
        """

        self.img = fits.getdata(imglink)
        self.imglink = imglink
        self.explink = explink
        self.bkglink = bkglink
        self.cosmo = FlatLambdaCDM(70, 0.3)
        self.z = redshift
        self.name = name

        if r_500 is not None:

            self.r_500 = r_500
            self.t_500 = (r_500/self.cosmo.kpc_proper_per_arcmin(self.z)).to(u.arcmin)

        elif t_500 is not None:

            self.t_500 = t_500
            self.r_500 = (t_500*self.cosmo.kpc_proper_per_arcmin(self.z)).to(u.kpc)

        self.center = SkyCoord(ra=ra, dec=dec, unit='degree')

        head = fits.getheader(imglink)
        self.header = head
        self.wcs = WCS(head, relax=False)

        if 'CDELT2' in head:
            self.degree_per_pixel = head['CDELT2'] * u.deg / u.pix
        elif 'CD2_2' in head:
            self.degree_per_pixel = head['CD2_2'] * u.deg / u.pix

        self.kpc_per_pixel = self.cosmo.kpc_proper_per_arcmin(self.z) * self.degree_per_pixel
        self.kpc_per_pixel = self.kpc_per_pixel.to(u.kpc / u.pixel)
        self.shape = self.img.shape
        self.exp = fits.getdata(explink, memmap=False) if explink is not None else np.ones(self.shape)
        self.bkg = fits.getdata(bkglink, memmap=False) if bkglink is not None else np.zeros(self.shape)

        self.exp *= self.kpc_per_pixel.value ** 2
        self.unmasked_img = copy.deepcopy(self.img)
        self.img *= (self.exp>0)

        self.x_c, self.y_c = self.center.to_pixel(self.wcs)
        self.y_ref, self.x_ref = np.indices(self.shape)
        self.coords = SkyCoord.from_pixel(self.x_ref, self.y_ref, self.wcs)

        self.load_nh(nhlink)
        self.region(reglink)

    @classmethod
    def from_catalog_row(cls, row):
        """
        DEPRECATED: Build a cluster object from a row in the X-COP catalog
        """

        name = row['NAME']

        imglink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}.fits.gz')
        explink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}_expo.fits.gz')
        bkglink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}_bkg.fits.gz')

        instance = cls(imglink,
                       explink=explink,
                       bkglink=bkglink,
                       redshift=row['REDSHIFT'],
                       r_500=row['R500_HSE'] * u.kpc,
                       ra=row['RA'],
                       dec=row['DEC'],
                       name=name)

        instance.region(os.path.join(config.DATA_PATH, f'XCOP/{name}/src_ps.reg'))
        instance.load_nh(os.path.join(config.DATA_PATH, f'XCOP/{name}/{name}_nh.fits'))

        return instance

    def region(self, region_file):
        """
        Filter out regions provided in an input DS9 region file

        Parameters:
            region_file (str): Path to region file. Accepted region file formats are fk5 and image.
        """

        regions = Regions.read(region_file)
        mask = np.ones(self.shape)*(self.exp>0.)

        for region in regions:

            if isinstance(region, SkyRegion):
                # Turn a sky region into a pixel region
                region = region.to_pixel(self.wcs)

            mask[region.to_mask().to_image(self.shape) > 0] = 0

        #print('Excluded %d sources' % (len(regions)))
        self.unmasked_exposure = copy.deepcopy(self.exp)
        self.img = self.img*mask
        self.exp = self.exp*mask
        self.regions = regions

    def load_nh(self, nh_file):
        """
        Load the hydrogen column density map

        Parameters:
            nh_file (str): Path to the hydrogen column density map file
        """

        nh = fits.getdata(nh_file)
        assert nh.shape == self.shape
        self.nh = nh

    def voronoi(self, voronoi_file, rebin_factor=5, exclusion=1, t_500_percent=1.):
        """
        Load the voronoi binning map. It must be produced using the vorbin package.
        See https://pypi.org/project/vorbin/ for more information and especially
        the scripts/voronoi.ipynb notebook for an example of how to generate the map.

        Rebin the data using the previously loaded voronoi map.
        It assumes that the Voronoi binning algorithm has been run on the data with a
        first rough 4x4 rebinning, which I used to accelerate the computation of maps.

        !!! danger
            This function contains a lot of hard-coded values that should be changed.
            It requires the user to precisely remember how the data was processed before
            using the `vorbin`package and try to applies the same to the untouched cluster
            data.

        Parameters:
            voronoi_file (str): Path to the voronoi map file
            rebin_factor (int): Rebinning factor
            exclusion (float): factor used in `reduce_to_r500`
            t_500_percent (float): Radius of the cluster in units of t_500 when selecting pixels

        Returns:
            cluster: A new Cluster object with the rebinned data.

        !!! warning
            This function does not act in place, and instead return a new object.
        """

        new_data = self.reduce_to_r500(exclusion).rebin(rebin_factor)

        new_data.voronoi = np.loadtxt(voronoi_file)

        indexes = (new_data.exp > 0) & (new_data.coords.separation(new_data.center) < new_data.t_500*t_500_percent)
        y_ref, x_ref = new_data.y_ref[indexes], new_data.x_ref[indexes]
        img = np.array(new_data.img[indexes].astype(int))
        exp = np.array(new_data.exp[indexes].astype(np.float32))

        img *= (exp > 0.)
        bkg = np.array(new_data.bkg[indexes].astype(np.float32))
        nH = np.array(new_data.nh[indexes].astype(np.float32))
        bin_number = np.array(new_data.voronoi.astype(int))

        unique_bin = np.unique(bin_number)
        x_ref_reduced = np.empty_like(unique_bin, dtype=np.float32)
        y_ref_reduced = np.empty_like(unique_bin, dtype=np.float32)
        img_reduced = np.empty_like(unique_bin)
        exp_reduced = np.empty_like(unique_bin, dtype=np.float32)
        bkg_reduced = np.empty_like(unique_bin)
        nH_reduced = np.empty_like(unique_bin, dtype=np.float32)

        for i, number in enumerate(unique_bin):
            bin_index = (bin_number == number)

            x_ref_reduced[i] = np.mean(x_ref[bin_index])
            y_ref_reduced[i] = np.mean(y_ref[bin_index])
            nH_reduced[i] = np.mean(nH[bin_index])
            img_reduced[i] = np.sum(img[bin_index])
            exp_reduced[i] = np.sum(exp[bin_index])
            bkg_reduced[i] = np.sum(bkg[bin_index])

        new_data.img = img_reduced
        new_data.exp = exp_reduced
        new_data.bkg = bkg_reduced
        new_data.nh = nH_reduced
        new_data.x_ref = x_ref_reduced
        new_data.y_ref = y_ref_reduced
        new_data.shape = unique_bin.shape

        return new_data

    def flatten(self, r_500_percent: float=1.):
        """
        Flatten the data in a 1D array and remove pixels outside the specified radius.
        It also removes pixels with no exposure.

        Parameters:
            r_500_percent (float): Radius of the cluster in units of r_500

        Returns:
            cluster: A new Cluster object with the flattened data.

        !!! warning
            This function does not act in place, and instead return a new object.
        """


        new_data = copy.deepcopy(self)
        index = (new_data.exp > 0) & (new_data.center.separation(new_data.coords) < new_data.t_500 * r_500_percent)
        new_data.img = new_data.img[index]
        new_data.exp = new_data.exp[index]
        new_data.bkg = new_data.bkg[index]
        new_data.nh = new_data.nh[index]
        new_data.shape = new_data.img.shape

        new_data.y_ref, new_data.x_ref = new_data.y_ref[index], new_data.x_ref[index]

        new_data.index = index 

        return new_data

    def rebin(self, factor: int):
        """
        Rebin the data by a factor of `factor`. It uses the astropy block_reduce function.

        Parameters:
            factor (int): Rebinning factor

        Returns:
            cluster: A new Cluster object with the rebinned data.

        !!! warning
            This function does not act in place, and instead return a new object.
        """

        def sum_reduce(vec, factor):

            return np.nan_to_num(block_reduce(np.where(self.exp > 0, vec, np.nan), factor))

        def sum_reduce_unmasked(vec, factor):

            return np.nan_to_num(block_reduce(np.where(self.unmasked_exposure > 0, vec, np.nan), factor))

        def mean_reduce(vec, factor):

            return np.nan_to_num(block_reduce(np.where(self.exp > 0, vec, np.nan), factor, np.mean))

        new_data = copy.deepcopy(self)
        new_data.wcs = new_data.wcs[::factor, ::factor]
        new_data.img = sum_reduce(self.img, factor)
        new_data.exp = sum_reduce(self.exp, factor)
        new_data.bkg = sum_reduce(self.bkg, factor)
        new_data.nh = mean_reduce(self.nh, factor)
        new_data.unmasked_img = sum_reduce_unmasked(self.unmasked_img, factor)
        new_data.unmasked_exposure = sum_reduce_unmasked(self.unmasked_exposure, factor)
        new_data.shape = new_data.img.shape
        new_data.kpc_per_pixel *= factor
        new_data.degree_per_pixel *= factor

        new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
        new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
        new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

        return new_data

    def reduce_fov(self, mean_model, r500_cut):
        """
        Reduce the field of view to a given radius in units of r_500, using the best-fit
        mean model, which includes ellipticity for the cluster shape.

        Parameters:
            mean_model (MeanModel): Best-fit mean model
            r500_cut (float): Radius in units of r_500

        Returns:
            cluster: A new Cluster object with the reduced field of view.

        !!! warning
            This function does not act in place, and instead return a new object.
        """


        rows = np.any(mean_model.rad<r500_cut, axis=1)
        cols = np.any(mean_model.rad<r500_cut, axis=0)
        rmin, rmax = np.where(rows)[0][[0, -1]]
        cmin, cmax = np.where(cols)[0][[0, -1]]


        new_data = copy.deepcopy(self)
        new_data.wcs = new_data.wcs[rmin:rmax, cmin:cmax]
        new_data.img = self.img[rmin:rmax, cmin:cmax]
        new_data.exp = self.exp[rmin:rmax, cmin:cmax]
        new_data.bkg = self.bkg[rmin:rmax, cmin:cmax]
        new_data.nh = self.nh[rmin:rmax, cmin:cmax]
        new_data.unmasked_exposure = self.unmasked_exposure[rmin:rmax, cmin:cmax]
        new_data.unmasked_img = self.unmasked_img[rmin:rmax, cmin:cmax]
        new_data.shape = new_data.img.shape

        new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
        new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
        new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

        return new_data

    def reduce_to_r500(self, r500_cut=1.):
        """
        Reduce the field of view to a given radius in units of r_500. This does not
        take into account the ellipticity of the cluster, and simply assumes a circular
        symetry and uses the first proposed center to compute the radius.

        Parameters:
            r500_cut (float): Radius in units of r_500

        Returns:
            cluster: A new Cluster object with the reduced field of view.

        !!! warning
            This function does not act in place, and instead return a new object.
        """

        valid = (self.exp>0)&(self.coords.separation(self.center) < self.t_500*r500_cut)
        rows = np.any(valid, axis=1)
        cols = np.any(valid, axis=0)
        rmin, rmax = np.where(rows)[0][[0, -1]]
        cmin, cmax = np.where(cols)[0][[0, -1]]


        new_data = copy.deepcopy(self)
        new_data.wcs = new_data.wcs[rmin:rmax, cmin:cmax]
        new_data.img = self.img[rmin:rmax, cmin:cmax]
        new_data.exp = self.exp[rmin:rmax, cmin:cmax]
        new_data.bkg = self.bkg[rmin:rmax, cmin:cmax]
        new_data.nh = self.nh[rmin:rmax, cmin:cmax]
        new_data.unmasked_exposure = self.unmasked_exposure[rmin:rmax, cmin:cmax]
        new_data.unmasked_img = self.unmasked_img[rmin:rmax, cmin:cmax]
        new_data.shape = new_data.img.shape

        new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
        new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
        new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

        return new_data

    def plot_cluster(self):
        """
        Helper function to plot the observation components
        """

        import matplotlib.pyplot as plt
        import cmasher as cmr
        from matplotlib.colors import LogNorm

        fig, axs = plt.subplots(
            figsize=(12, 5),
            nrows=1,
            ncols=3,
            subplot_kw={'projection': self.wcs}
        )

        img_plot = axs[0].imshow(
            np.where(self.exp > 0, self.img, np.nan),
            cmap=cmr.cosmic,
            norm=LogNorm(vmin=0.1)
        )

        exp_plot = axs[1].imshow(
            np.where(self.exp > 0, self.exp, np.nan),
            cmap=cmr.ember
        )

        bkg_plot = axs[2].imshow(
            np.where(self.exp > 0, self.bkg, np.nan),
            cmap=cmr.cosmic
        )

        plt.colorbar(
            mappable=img_plot,
            ax=axs[0],
            location='bottom',
            label="Image (Photons)"
        )

        plt.colorbar(
            mappable=exp_plot,
            ax=axs[1],
            location='bottom',
            label=r"Effective exposure (seconds $\times$ kpc$^2$)"
        )

        plt.colorbar(
            mappable=bkg_plot,
            ax=axs[2],
            location='bottom',
            label="Background (Photons)"
        )

        plt.tight_layout()

__init__(imglink, explink=None, bkglink=None, reglink=None, nhlink=None, redshift=0.0, r_500=None, t_500=None, ra=None, dec=None, name=None)

Constructor for the Cluster class.

Parameters:

Name Type Description Default
imglink str

Path to the image file

required
explink str

Path to the exposure map file

None
bkglink str

Path to the background map file

None
redshift float

Redshift of the cluster

0.0
r_500 Quantity

Radius of the cluster at 500 times the critical density. Either r_500 or t_500 must be provided.

None
t_500 Quantity

Angular radius of the cluster at 500 times the critical density. Either r_500 or t_500 must be provided.

None
ra float | Quantity

Right ascension of the cluster center

None
dec float | Quantity

Declination of the cluster center

None
name str

Name of the cluster

None
Source code in src/xsb_fluc/data/cluster.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def __init__(self, imglink: str,
             explink: str=None,
             bkglink: str=None,
             reglink: str=None,
             nhlink: str=None,
             redshift: float=0.,
             r_500: u.Quantity|None=None,
             t_500: u.Quantity|None=None,
             ra: float|u.Quantity=None,
             dec: float|u.Quantity=None,
             name: str=None):
    r"""
    Constructor for the Cluster class.

    Parameters:
        imglink (str): Path to the image file
        explink (str): Path to the exposure map file
        bkglink (str): Path to the background map file
        redshift (float): Redshift of the cluster
        r_500 (astropy.units.Quantity): Radius of the cluster at 500 times the critical density.  Either r_500 or t_500 must be provided.
        t_500 (astropy.units.Quantity): Angular radius of the cluster at 500 times the critical density.  Either r_500 or t_500 must be provided.
        ra (float|astropy.units.Quantity): Right ascension of the cluster center
        dec (float|astropy.units.Quantity): Declination of the cluster center
        name (str): Name of the cluster
    """

    self.img = fits.getdata(imglink)
    self.imglink = imglink
    self.explink = explink
    self.bkglink = bkglink
    self.cosmo = FlatLambdaCDM(70, 0.3)
    self.z = redshift
    self.name = name

    if r_500 is not None:

        self.r_500 = r_500
        self.t_500 = (r_500/self.cosmo.kpc_proper_per_arcmin(self.z)).to(u.arcmin)

    elif t_500 is not None:

        self.t_500 = t_500
        self.r_500 = (t_500*self.cosmo.kpc_proper_per_arcmin(self.z)).to(u.kpc)

    self.center = SkyCoord(ra=ra, dec=dec, unit='degree')

    head = fits.getheader(imglink)
    self.header = head
    self.wcs = WCS(head, relax=False)

    if 'CDELT2' in head:
        self.degree_per_pixel = head['CDELT2'] * u.deg / u.pix
    elif 'CD2_2' in head:
        self.degree_per_pixel = head['CD2_2'] * u.deg / u.pix

    self.kpc_per_pixel = self.cosmo.kpc_proper_per_arcmin(self.z) * self.degree_per_pixel
    self.kpc_per_pixel = self.kpc_per_pixel.to(u.kpc / u.pixel)
    self.shape = self.img.shape
    self.exp = fits.getdata(explink, memmap=False) if explink is not None else np.ones(self.shape)
    self.bkg = fits.getdata(bkglink, memmap=False) if bkglink is not None else np.zeros(self.shape)

    self.exp *= self.kpc_per_pixel.value ** 2
    self.unmasked_img = copy.deepcopy(self.img)
    self.img *= (self.exp>0)

    self.x_c, self.y_c = self.center.to_pixel(self.wcs)
    self.y_ref, self.x_ref = np.indices(self.shape)
    self.coords = SkyCoord.from_pixel(self.x_ref, self.y_ref, self.wcs)

    self.load_nh(nhlink)
    self.region(reglink)

flatten(r_500_percent=1.0)

Flatten the data in a 1D array and remove pixels outside the specified radius. It also removes pixels with no exposure.

Parameters:

Name Type Description Default
r_500_percent float

Radius of the cluster in units of r_500

1.0

Returns:

Name Type Description
cluster

A new Cluster object with the flattened data.

Warning

This function does not act in place, and instead return a new object.

Source code in src/xsb_fluc/data/cluster.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def flatten(self, r_500_percent: float=1.):
    """
    Flatten the data in a 1D array and remove pixels outside the specified radius.
    It also removes pixels with no exposure.

    Parameters:
        r_500_percent (float): Radius of the cluster in units of r_500

    Returns:
        cluster: A new Cluster object with the flattened data.

    !!! warning
        This function does not act in place, and instead return a new object.
    """


    new_data = copy.deepcopy(self)
    index = (new_data.exp > 0) & (new_data.center.separation(new_data.coords) < new_data.t_500 * r_500_percent)
    new_data.img = new_data.img[index]
    new_data.exp = new_data.exp[index]
    new_data.bkg = new_data.bkg[index]
    new_data.nh = new_data.nh[index]
    new_data.shape = new_data.img.shape

    new_data.y_ref, new_data.x_ref = new_data.y_ref[index], new_data.x_ref[index]

    new_data.index = index 

    return new_data

from_catalog_row(row) classmethod

DEPRECATED: Build a cluster object from a row in the X-COP catalog

Source code in src/xsb_fluc/data/cluster.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@classmethod
def from_catalog_row(cls, row):
    """
    DEPRECATED: Build a cluster object from a row in the X-COP catalog
    """

    name = row['NAME']

    imglink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}.fits.gz')
    explink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}_expo.fits.gz')
    bkglink = os.path.join(config.DATA_PATH, f'XCOP/{name}/mosaic_{name.lower()}_bkg.fits.gz')

    instance = cls(imglink,
                   explink=explink,
                   bkglink=bkglink,
                   redshift=row['REDSHIFT'],
                   r_500=row['R500_HSE'] * u.kpc,
                   ra=row['RA'],
                   dec=row['DEC'],
                   name=name)

    instance.region(os.path.join(config.DATA_PATH, f'XCOP/{name}/src_ps.reg'))
    instance.load_nh(os.path.join(config.DATA_PATH, f'XCOP/{name}/{name}_nh.fits'))

    return instance

load_nh(nh_file)

Load the hydrogen column density map

Parameters:

Name Type Description Default
nh_file str

Path to the hydrogen column density map file

required
Source code in src/xsb_fluc/data/cluster.py
175
176
177
178
179
180
181
182
183
184
185
def load_nh(self, nh_file):
    """
    Load the hydrogen column density map

    Parameters:
        nh_file (str): Path to the hydrogen column density map file
    """

    nh = fits.getdata(nh_file)
    assert nh.shape == self.shape
    self.nh = nh

plot_cluster()

Helper function to plot the observation components

Source code in src/xsb_fluc/data/cluster.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
def plot_cluster(self):
    """
    Helper function to plot the observation components
    """

    import matplotlib.pyplot as plt
    import cmasher as cmr
    from matplotlib.colors import LogNorm

    fig, axs = plt.subplots(
        figsize=(12, 5),
        nrows=1,
        ncols=3,
        subplot_kw={'projection': self.wcs}
    )

    img_plot = axs[0].imshow(
        np.where(self.exp > 0, self.img, np.nan),
        cmap=cmr.cosmic,
        norm=LogNorm(vmin=0.1)
    )

    exp_plot = axs[1].imshow(
        np.where(self.exp > 0, self.exp, np.nan),
        cmap=cmr.ember
    )

    bkg_plot = axs[2].imshow(
        np.where(self.exp > 0, self.bkg, np.nan),
        cmap=cmr.cosmic
    )

    plt.colorbar(
        mappable=img_plot,
        ax=axs[0],
        location='bottom',
        label="Image (Photons)"
    )

    plt.colorbar(
        mappable=exp_plot,
        ax=axs[1],
        location='bottom',
        label=r"Effective exposure (seconds $\times$ kpc$^2$)"
    )

    plt.colorbar(
        mappable=bkg_plot,
        ax=axs[2],
        location='bottom',
        label="Background (Photons)"
    )

    plt.tight_layout()

rebin(factor)

Rebin the data by a factor of factor. It uses the astropy block_reduce function.

Parameters:

Name Type Description Default
factor int

Rebinning factor

required

Returns:

Name Type Description
cluster

A new Cluster object with the rebinned data.

Warning

This function does not act in place, and instead return a new object.

Source code in src/xsb_fluc/data/cluster.py
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def rebin(self, factor: int):
    """
    Rebin the data by a factor of `factor`. It uses the astropy block_reduce function.

    Parameters:
        factor (int): Rebinning factor

    Returns:
        cluster: A new Cluster object with the rebinned data.

    !!! warning
        This function does not act in place, and instead return a new object.
    """

    def sum_reduce(vec, factor):

        return np.nan_to_num(block_reduce(np.where(self.exp > 0, vec, np.nan), factor))

    def sum_reduce_unmasked(vec, factor):

        return np.nan_to_num(block_reduce(np.where(self.unmasked_exposure > 0, vec, np.nan), factor))

    def mean_reduce(vec, factor):

        return np.nan_to_num(block_reduce(np.where(self.exp > 0, vec, np.nan), factor, np.mean))

    new_data = copy.deepcopy(self)
    new_data.wcs = new_data.wcs[::factor, ::factor]
    new_data.img = sum_reduce(self.img, factor)
    new_data.exp = sum_reduce(self.exp, factor)
    new_data.bkg = sum_reduce(self.bkg, factor)
    new_data.nh = mean_reduce(self.nh, factor)
    new_data.unmasked_img = sum_reduce_unmasked(self.unmasked_img, factor)
    new_data.unmasked_exposure = sum_reduce_unmasked(self.unmasked_exposure, factor)
    new_data.shape = new_data.img.shape
    new_data.kpc_per_pixel *= factor
    new_data.degree_per_pixel *= factor

    new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
    new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
    new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

    return new_data

reduce_fov(mean_model, r500_cut)

Reduce the field of view to a given radius in units of r_500, using the best-fit mean model, which includes ellipticity for the cluster shape.

Parameters:

Name Type Description Default
mean_model MeanModel

Best-fit mean model

required
r500_cut float

Radius in units of r_500

required

Returns:

Name Type Description
cluster

A new Cluster object with the reduced field of view.

Warning

This function does not act in place, and instead return a new object.

Source code in src/xsb_fluc/data/cluster.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
def reduce_fov(self, mean_model, r500_cut):
    """
    Reduce the field of view to a given radius in units of r_500, using the best-fit
    mean model, which includes ellipticity for the cluster shape.

    Parameters:
        mean_model (MeanModel): Best-fit mean model
        r500_cut (float): Radius in units of r_500

    Returns:
        cluster: A new Cluster object with the reduced field of view.

    !!! warning
        This function does not act in place, and instead return a new object.
    """


    rows = np.any(mean_model.rad<r500_cut, axis=1)
    cols = np.any(mean_model.rad<r500_cut, axis=0)
    rmin, rmax = np.where(rows)[0][[0, -1]]
    cmin, cmax = np.where(cols)[0][[0, -1]]


    new_data = copy.deepcopy(self)
    new_data.wcs = new_data.wcs[rmin:rmax, cmin:cmax]
    new_data.img = self.img[rmin:rmax, cmin:cmax]
    new_data.exp = self.exp[rmin:rmax, cmin:cmax]
    new_data.bkg = self.bkg[rmin:rmax, cmin:cmax]
    new_data.nh = self.nh[rmin:rmax, cmin:cmax]
    new_data.unmasked_exposure = self.unmasked_exposure[rmin:rmax, cmin:cmax]
    new_data.unmasked_img = self.unmasked_img[rmin:rmax, cmin:cmax]
    new_data.shape = new_data.img.shape

    new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
    new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
    new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

    return new_data

reduce_to_r500(r500_cut=1.0)

Reduce the field of view to a given radius in units of r_500. This does not take into account the ellipticity of the cluster, and simply assumes a circular symetry and uses the first proposed center to compute the radius.

Parameters:

Name Type Description Default
r500_cut float

Radius in units of r_500

1.0

Returns:

Name Type Description
cluster

A new Cluster object with the reduced field of view.

Warning

This function does not act in place, and instead return a new object.

Source code in src/xsb_fluc/data/cluster.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def reduce_to_r500(self, r500_cut=1.):
    """
    Reduce the field of view to a given radius in units of r_500. This does not
    take into account the ellipticity of the cluster, and simply assumes a circular
    symetry and uses the first proposed center to compute the radius.

    Parameters:
        r500_cut (float): Radius in units of r_500

    Returns:
        cluster: A new Cluster object with the reduced field of view.

    !!! warning
        This function does not act in place, and instead return a new object.
    """

    valid = (self.exp>0)&(self.coords.separation(self.center) < self.t_500*r500_cut)
    rows = np.any(valid, axis=1)
    cols = np.any(valid, axis=0)
    rmin, rmax = np.where(rows)[0][[0, -1]]
    cmin, cmax = np.where(cols)[0][[0, -1]]


    new_data = copy.deepcopy(self)
    new_data.wcs = new_data.wcs[rmin:rmax, cmin:cmax]
    new_data.img = self.img[rmin:rmax, cmin:cmax]
    new_data.exp = self.exp[rmin:rmax, cmin:cmax]
    new_data.bkg = self.bkg[rmin:rmax, cmin:cmax]
    new_data.nh = self.nh[rmin:rmax, cmin:cmax]
    new_data.unmasked_exposure = self.unmasked_exposure[rmin:rmax, cmin:cmax]
    new_data.unmasked_img = self.unmasked_img[rmin:rmax, cmin:cmax]
    new_data.shape = new_data.img.shape

    new_data.x_c, new_data.y_c = new_data.center.to_pixel(new_data.wcs)
    new_data.y_ref, new_data.x_ref = np.indices(new_data.shape)
    new_data.coords = SkyCoord.from_pixel(new_data.x_ref, new_data.y_ref, new_data.wcs)

    return new_data

region(region_file)

Filter out regions provided in an input DS9 region file

Parameters:

Name Type Description Default
region_file str

Path to region file. Accepted region file formats are fk5 and image.

required
Source code in src/xsb_fluc/data/cluster.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def region(self, region_file):
    """
    Filter out regions provided in an input DS9 region file

    Parameters:
        region_file (str): Path to region file. Accepted region file formats are fk5 and image.
    """

    regions = Regions.read(region_file)
    mask = np.ones(self.shape)*(self.exp>0.)

    for region in regions:

        if isinstance(region, SkyRegion):
            # Turn a sky region into a pixel region
            region = region.to_pixel(self.wcs)

        mask[region.to_mask().to_image(self.shape) > 0] = 0

    #print('Excluded %d sources' % (len(regions)))
    self.unmasked_exposure = copy.deepcopy(self.exp)
    self.img = self.img*mask
    self.exp = self.exp*mask
    self.regions = regions

voronoi(voronoi_file, rebin_factor=5, exclusion=1, t_500_percent=1.0)

Load the voronoi binning map. It must be produced using the vorbin package. See https://pypi.org/project/vorbin/ for more information and especially the scripts/voronoi.ipynb notebook for an example of how to generate the map.

Rebin the data using the previously loaded voronoi map. It assumes that the Voronoi binning algorithm has been run on the data with a first rough 4x4 rebinning, which I used to accelerate the computation of maps.

Danger

This function contains a lot of hard-coded values that should be changed. It requires the user to precisely remember how the data was processed before using the vorbinpackage and try to applies the same to the untouched cluster data.

Parameters:

Name Type Description Default
voronoi_file str

Path to the voronoi map file

required
rebin_factor int

Rebinning factor

5
exclusion float

factor used in reduce_to_r500

1
t_500_percent float

Radius of the cluster in units of t_500 when selecting pixels

1.0

Returns:

Name Type Description
cluster

A new Cluster object with the rebinned data.

Warning

This function does not act in place, and instead return a new object.

Source code in src/xsb_fluc/data/cluster.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
def voronoi(self, voronoi_file, rebin_factor=5, exclusion=1, t_500_percent=1.):
    """
    Load the voronoi binning map. It must be produced using the vorbin package.
    See https://pypi.org/project/vorbin/ for more information and especially
    the scripts/voronoi.ipynb notebook for an example of how to generate the map.

    Rebin the data using the previously loaded voronoi map.
    It assumes that the Voronoi binning algorithm has been run on the data with a
    first rough 4x4 rebinning, which I used to accelerate the computation of maps.

    !!! danger
        This function contains a lot of hard-coded values that should be changed.
        It requires the user to precisely remember how the data was processed before
        using the `vorbin`package and try to applies the same to the untouched cluster
        data.

    Parameters:
        voronoi_file (str): Path to the voronoi map file
        rebin_factor (int): Rebinning factor
        exclusion (float): factor used in `reduce_to_r500`
        t_500_percent (float): Radius of the cluster in units of t_500 when selecting pixels

    Returns:
        cluster: A new Cluster object with the rebinned data.

    !!! warning
        This function does not act in place, and instead return a new object.
    """

    new_data = self.reduce_to_r500(exclusion).rebin(rebin_factor)

    new_data.voronoi = np.loadtxt(voronoi_file)

    indexes = (new_data.exp > 0) & (new_data.coords.separation(new_data.center) < new_data.t_500*t_500_percent)
    y_ref, x_ref = new_data.y_ref[indexes], new_data.x_ref[indexes]
    img = np.array(new_data.img[indexes].astype(int))
    exp = np.array(new_data.exp[indexes].astype(np.float32))

    img *= (exp > 0.)
    bkg = np.array(new_data.bkg[indexes].astype(np.float32))
    nH = np.array(new_data.nh[indexes].astype(np.float32))
    bin_number = np.array(new_data.voronoi.astype(int))

    unique_bin = np.unique(bin_number)
    x_ref_reduced = np.empty_like(unique_bin, dtype=np.float32)
    y_ref_reduced = np.empty_like(unique_bin, dtype=np.float32)
    img_reduced = np.empty_like(unique_bin)
    exp_reduced = np.empty_like(unique_bin, dtype=np.float32)
    bkg_reduced = np.empty_like(unique_bin)
    nH_reduced = np.empty_like(unique_bin, dtype=np.float32)

    for i, number in enumerate(unique_bin):
        bin_index = (bin_number == number)

        x_ref_reduced[i] = np.mean(x_ref[bin_index])
        y_ref_reduced[i] = np.mean(y_ref[bin_index])
        nH_reduced[i] = np.mean(nH[bin_index])
        img_reduced[i] = np.sum(img[bin_index])
        exp_reduced[i] = np.sum(exp[bin_index])
        bkg_reduced[i] = np.sum(bkg[bin_index])

    new_data.img = img_reduced
    new_data.exp = exp_reduced
    new_data.bkg = bkg_reduced
    new_data.nh = nH_reduced
    new_data.x_ref = x_ref_reduced
    new_data.y_ref = y_ref_reduced
    new_data.shape = unique_bin.shape

    return new_data

xsb_fluc.data.mean_model

MeanModel

Awfully coded class to handle the results of the mean model fitting.

Attributes:

Name Type Description
data

the cluster data.

mean_model

the mean model function.

radius

the radius function.

angle

the angle function.

inference_data

the inference data.

true_image

the true image.

posterior_median

the posterior median.

posterior_params

the posterior parameters.

ellipse_params

the ellipse parameters.

number_of_samples

the number of samples.

best_fit

the best fit image.

Source code in src/xsb_fluc/data/mean_model.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class MeanModel:
    """
    Awfully coded class to handle the results of the mean model fitting.

    Attributes:
        data: the cluster data.
        mean_model: the mean model function.
        radius: the radius function.
        angle: the angle function.
        inference_data: the inference data.
        true_image: the true image.
        posterior_median: the posterior median.
        posterior_params: the posterior parameters.
        ellipse_params: the ellipse parameters.
        number_of_samples: the number of samples.
        best_fit: the best fit image.
    """

    def __init__(self, data: Cluster, inference_data: az.InferenceData, model='all'):
        """
        Constructor for the `MeanModel` class.

        Parameters:
            data: the cluster data.
            inference_data: the inference data.
            model: (artefact of code from the X-COP paper).
        """

        self.data = data
        self.mean_model = jax.jit(hk.without_apply_rng(hk.transform(lambda : MockXrayCounts(data)())).apply)
        self.mean_model_unmasked = jax.jit(hk.without_apply_rng(hk.transform(lambda : MockXrayCountsUnmasked(data)())).apply)
        self.radius = hk.without_apply_rng(hk.transform(lambda x, y :EllipseRadius.from_data(data)(x, y))).apply
        self.angle = hk.without_apply_rng(hk.transform(lambda x, y :Angle.from_data(data)(x, y))).apply
        self.inference_data = inference_data
        self.true_image = jnp.array(data.img, dtype=jnp.float32)
        stacked = inference_data.posterior.stack(draws=("chain", "draw"))
        var_name = ['log_ne2', 'log_r_c', 'log_r_s', 'beta', 'epsilon', 'log_bkg','angle', 'eccentricity','x_c', 'y_c']
        self.posterior_median = self.inference_data.posterior.median()
        self.posterior_params = {name:jnp.asarray(stacked[name].values) for name in var_name}
        self.ellipse_params = {'ellipse_radius': 
                            {'angle': jnp.asarray(self.posterior_median['angle']),
                            'eccentricity': jnp.array(self.posterior_median['eccentricity']),
                            'x_c': jnp.asarray(self.posterior_median['x_c']),
                            'y_c': jnp.asarray(self.posterior_median['y_c'])}}

        self.number_of_samples = 10000
        self.best_fit = self.mean_model(inference_to_params(self.posterior_median))
        self.model_c = model

    @classmethod
    def from_data(cls, data: Cluster):
        """
        Load the mean model associated to a given cluster if it exists.
        """

        name = data.name
        posterior = az.from_netcdf(os.path.join(config.RESULTS_PATH, f'mean_model/{name}_all.posterior'))
        return cls(data, posterior)

    @property
    def best_fit_unmasked(self):
        return self.mean_model_unmasked(inference_to_params_unmasked(self.posterior_median))

    @property
    def rad(self):
        """
        Return best-fit radius for each pixel.
        """
        median = self.posterior_median
        ellipse_params = {'ellipse_radius': 
                            {'angle': jnp.asarray(median['angle']),
                            'eccentricity': jnp.array(median['eccentricity']),
                            'x_c': jnp.asarray(median['x_c']),
                            'y_c': jnp.asarray(median['y_c'])}}

        return self.radius(ellipse_params, self.data.x_ref, self.data.y_ref)


    @property
    def ang(self):
        """
        Return best-fit angle for each pixel.
        """
        median = self.posterior_median
        angle_params = {'angle':
                            {'x_c': jnp.asarray(median['x_c']),
                             'y_c': jnp.asarray(median['y_c']),
                             'angle': jnp.asarray(median['angle'])}}


        return self.angle(angle_params, self.data.x_ref, self.data.y_ref)

    @property
    def angle_sample(self):
        """
        Return a sample of angles for each pixel.
        """
        angle_params = {'angle':
                            {'x_c': self.posterior_params['x_c'],
                             'y_c': self.posterior_params['y_c'],
                             'angle': self.posterior_params['angle']}}

        def func(pars):
            return self.angle(pars, self.data.x_ref, self.data.y_ref)

        return jax.vmap(func)(angle_params)

    @property
    def rad_sample(self):
        """
        Return a sample of radii for each pixel.
        """
        ellipse_params = {'ellipse_radius':
                              {'angle': self.posterior_params['angle'],
                               'eccentricity': self.posterior_params['eccentricity'],
                               'x_c': self.posterior_params['x_c'],
                               'y_c': self.posterior_params['y_c']}}

        def func(pars):
            return self.radius(pars, self.data.x_ref, self.data.y_ref)

        return jax.vmap(func)(ellipse_params)


    @property
    def rad_circ_sample(self):

        ellipse_params = {'ellipse_radius':
                              {'angle': 0*self.posterior_params['angle'],
                               'eccentricity': 0.*self.posterior_params['eccentricity'],
                               'x_c': self.posterior_params['x_c'],
                               'y_c': self.posterior_params['y_c']}}

        def func(pars):
            return self.radius(pars, self.data.x_ref, self.data.y_ref)

        return jax.vmap(func)(ellipse_params)


    def compute_rad(self, x, y):
        median = self.posterior_median
        ellipse_params = {'ellipse_radius': 
                            {'angle': jnp.asarray(median['angle']),
                            'eccentricity': jnp.array(median['eccentricity']),
                            'x_c': jnp.asarray(median['x_c']),
                            'y_c': jnp.asarray(median['y_c'])}}

        return self.radius(ellipse_params, x, y)

    @property
    def fluctuation_absolute(self):
        """
        Return the absolute fluctuation map.
        """
        exp = jnp.asarray(self.data.exp)
        img = jnp.asarray(self.true_image)
        fit = jnp.asarray(self.best_fit)

        return jnp.where(exp>0., (img - fit)/(2*exp), 0.)

    @property
    def fluctuation_relative(self):
        """
        Return the relative fluctuation map.
        """
        return jnp.where(self.data.exp > 0., jnp.nan_to_num(jnp.abs(self.true_image)/jnp.abs(self.best_fit)), 0.)

    def power_spectrum_absolute(self, scales=np.geomspace(0.05, 1., 20),mask=None):
        """
        Compute the absolute power spectrum with a given mask.

        Parameters:
            scales: array of scales to compute the power spectrum.
            mask: mask to apply to the data.

        !!! warning
            This function might not work ?
        """

        power_spectrum = hk.without_apply_rng(hk.transform(lambda img: PowerSpectrum(self.data, mask=mask)(img, scales)))

        return power_spectrum.apply(None, self.fluctuation_absolute)

    def power_spectrum_relative(self, mask=None):
        """
        Compute the relative power spectrum with a given mask.

        Parameters:
            scales: array of scales to compute the power spectrum.
            mask: mask to apply to the data.

        !!! warning
            This function might not work ?
        """
        scales = np.geomspace(0.05, 1., 20)
        power_spectrum = hk.without_apply_rng(hk.transform(lambda img: PowerSpectrum(self.data, mask=mask)(img, scales)))

        return power_spectrum.apply(None, self.fluctuation_relative)

    def sample(self, size, freeze=False):

        if not freeze:

            samples = rng.choice(self.number_of_samples, size=size, replace=False)
            return {key: value[samples] for key, value in self.posterior_params.items()}

        else:

            return {key: jnp.median(value)*jnp.ones((size,)) for key, value in self.posterior_params.items()}

ang property

Return best-fit angle for each pixel.

angle_sample property

Return a sample of angles for each pixel.

fluctuation_absolute property

Return the absolute fluctuation map.

fluctuation_relative property

Return the relative fluctuation map.

rad property

Return best-fit radius for each pixel.

rad_sample property

Return a sample of radii for each pixel.

__init__(data, inference_data, model='all')

Constructor for the MeanModel class.

Parameters:

Name Type Description Default
data Cluster

the cluster data.

required
inference_data InferenceData

the inference data.

required
model

(artefact of code from the X-COP paper).

'all'
Source code in src/xsb_fluc/data/mean_model.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def __init__(self, data: Cluster, inference_data: az.InferenceData, model='all'):
    """
    Constructor for the `MeanModel` class.

    Parameters:
        data: the cluster data.
        inference_data: the inference data.
        model: (artefact of code from the X-COP paper).
    """

    self.data = data
    self.mean_model = jax.jit(hk.without_apply_rng(hk.transform(lambda : MockXrayCounts(data)())).apply)
    self.mean_model_unmasked = jax.jit(hk.without_apply_rng(hk.transform(lambda : MockXrayCountsUnmasked(data)())).apply)
    self.radius = hk.without_apply_rng(hk.transform(lambda x, y :EllipseRadius.from_data(data)(x, y))).apply
    self.angle = hk.without_apply_rng(hk.transform(lambda x, y :Angle.from_data(data)(x, y))).apply
    self.inference_data = inference_data
    self.true_image = jnp.array(data.img, dtype=jnp.float32)
    stacked = inference_data.posterior.stack(draws=("chain", "draw"))
    var_name = ['log_ne2', 'log_r_c', 'log_r_s', 'beta', 'epsilon', 'log_bkg','angle', 'eccentricity','x_c', 'y_c']
    self.posterior_median = self.inference_data.posterior.median()
    self.posterior_params = {name:jnp.asarray(stacked[name].values) for name in var_name}
    self.ellipse_params = {'ellipse_radius': 
                        {'angle': jnp.asarray(self.posterior_median['angle']),
                        'eccentricity': jnp.array(self.posterior_median['eccentricity']),
                        'x_c': jnp.asarray(self.posterior_median['x_c']),
                        'y_c': jnp.asarray(self.posterior_median['y_c'])}}

    self.number_of_samples = 10000
    self.best_fit = self.mean_model(inference_to_params(self.posterior_median))
    self.model_c = model

from_data(data) classmethod

Load the mean model associated to a given cluster if it exists.

Source code in src/xsb_fluc/data/mean_model.py
 99
100
101
102
103
104
105
106
107
@classmethod
def from_data(cls, data: Cluster):
    """
    Load the mean model associated to a given cluster if it exists.
    """

    name = data.name
    posterior = az.from_netcdf(os.path.join(config.RESULTS_PATH, f'mean_model/{name}_all.posterior'))
    return cls(data, posterior)

power_spectrum_absolute(scales=np.geomspace(0.05, 1.0, 20), mask=None)

Compute the absolute power spectrum with a given mask.

Parameters:

Name Type Description Default
scales

array of scales to compute the power spectrum.

geomspace(0.05, 1.0, 20)
mask

mask to apply to the data.

None

Warning

This function might not work ?

Source code in src/xsb_fluc/data/mean_model.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def power_spectrum_absolute(self, scales=np.geomspace(0.05, 1., 20),mask=None):
    """
    Compute the absolute power spectrum with a given mask.

    Parameters:
        scales: array of scales to compute the power spectrum.
        mask: mask to apply to the data.

    !!! warning
        This function might not work ?
    """

    power_spectrum = hk.without_apply_rng(hk.transform(lambda img: PowerSpectrum(self.data, mask=mask)(img, scales)))

    return power_spectrum.apply(None, self.fluctuation_absolute)

power_spectrum_relative(mask=None)

Compute the relative power spectrum with a given mask.

Parameters:

Name Type Description Default
scales

array of scales to compute the power spectrum.

required
mask

mask to apply to the data.

None

Warning

This function might not work ?

Source code in src/xsb_fluc/data/mean_model.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
def power_spectrum_relative(self, mask=None):
    """
    Compute the relative power spectrum with a given mask.

    Parameters:
        scales: array of scales to compute the power spectrum.
        mask: mask to apply to the data.

    !!! warning
        This function might not work ?
    """
    scales = np.geomspace(0.05, 1., 20)
    power_spectrum = hk.without_apply_rng(hk.transform(lambda img: PowerSpectrum(self.data, mask=mask)(img, scales)))

    return power_spectrum.apply(None, self.fluctuation_relative)