# Source code for cbf_sdp.utils

# -*- coding: utf-8 -*-
"""Utilities for obtaining observation information from a Measurement Set."""

import asyncio
import bisect
import logging

import numpy as np
from oskar import measurement_set


logger = logging.getLogger(__name__)

def baselines(stations, autocorr=True):
    """
    Return the number of baselines formed by ``stations`` stations.

    When ``autocorr`` is true every station is also paired with itself, which
    gives ``stations * (stations + 1) // 2``; otherwise only pairs of distinct
    stations are counted, which gives ``stations * (stations - 1) // 2``.
    """
    # Both counts share the same formula; only the sign of the unit offset
    # depends on whether self-pairs are included.
    offset = 1 if autocorr else -1
    return (stations * (stations + offset)) // 2





class DataObject(object):
    """Plain container that just holds the row elements we care about."""

    def __init__(self, time, uu, vv, ww, myinterval=1.0, myexposure=1.0):
        # All values are stored exactly as supplied by the caller; nothing is
        # copied, converted or validated here.
        self.time = time
        self.uu, self.vv, self.ww = uu, vv, ww
        # The constructor arguments carry a "my" prefix, the attributes don't.
        self.interval, self.exposure = myinterval, myexposure


class FakeTM(object):
    """
    TelescopeManager-like class that reads its model information from a
    Measurement Set.

    On construction the Measurement Set is scanned once. ``self.times`` holds
    the TIME value of each correlator dump and ``self.objects`` holds the
    matching ``DataObject`` at the same index, so a dump can be located by
    searching ``self.times``.
    """

    def __init__(self, ms):
        """
        Build the model from the Measurement Set found at ``ms``.

        :param ms: path of the Measurement Set to read.
        :raises IOError: if the Measurement Set cannot be opened, contains no
            rows, or its TIME column cannot be read.
        """
        logger.info(f'Attempting to build model from {ms}')

        # Convert OSKAR's RuntimeError into an IOError
        try:
            myMS = measurement_set.MeasurementSet.open(ms, readonly=True)
        except RuntimeError as err:
            raise IOError(f'Failed to open Measurement Set {ms}') from err

        # structure that holds the contents of the rows we care about
        self.objects = []
        # structure that holds only the times; it is easier to search than
        # the structure of objects
        self.times = []

        self._num_baselines = self._count_rows_per_dump(myMS, ms)
        # Autocorrelations are present unless the number of rows per dump is
        # exactly the cross-correlation-only count.  (Comparing against
        # baselines() with its default autocorr=True, as was done before,
        # produced the inverted answer.)
        self._is_autocorrelated = (
            self._num_baselines != baselines(myMS.num_stations, autocorr=False)
        )
        self._freq_inc_hz = myMS.get_freq_inc_hz()
        self._freq_start_hz = myMS.freq_start_hz
        self._num_channels = myMS.num_channels
        self._num_pols = myMS.num_pols
        self._num_stations = myMS.num_stations
        self._phase_centre_ra_rad = myMS.phase_centre_ra_rad
        self._phase_centre_dec_rad = myMS.phase_centre_dec_rad

        # Every dump occupies num_baselines consecutive rows, so only the
        # first row of each dump is read for the per-dump columns.
        # _count_rows_per_dump guarantees a step of at least 1.
        for row_num in range(0, myMS.num_rows, self._num_baselines):
            mytime = myMS.read_column('TIME', row_num, 1)
            myinterval = myMS.read_column('INTERVAL', row_num, 1)
            myexposure = myMS.read_column('EXPOSURE', row_num, 1)
            (uu, vv, ww) = myMS.read_coords(row_num, self._num_baselines)
            self.objects.append(
                DataObject(mytime[0], uu, vv, ww, myinterval, myexposure))
            self.times.append(mytime[0])

    @staticmethod
    def _read_time(ms_handle, row_num, ms):
        """Return the TIME value of row ``row_num``; raise IOError on failure."""
        try:
            return ms_handle.read_column('TIME', row_num, 1)[0]
        except Exception as err:
            # Which exception type the reader raises is not visible from
            # here, so any failure is logged and reported as an IOError
            # instead of being silently ignored.
            logger.error(f'Failed to get TIME from {ms}')
            raise IOError(
                f'Failed to get TIME from {ms} (row {row_num})') from err

    @classmethod
    def _count_rows_per_dump(cls, ms_handle, ms):
        """Count the leading rows sharing the first row's TIME value."""
        num_rows = ms_handle.num_rows
        if num_rows < 1:
            raise IOError(f'Measurement Set {ms} contains no rows')
        first_time = cls._read_time(ms_handle, 0, ms)
        row_num = 1
        # Bounded by num_rows so that a Measurement Set holding a single
        # time stamp terminates instead of reading past the last row.
        while row_num < num_rows:
            if cls._read_time(ms_handle, row_num, ms) != first_time:
                break
            row_num += 1
        return row_num

    def get_num_channels(self):
        """The number of channels of the current observation"""
        return self._num_channels

    def get_freq_start_hz(self):
        """The frequency of the first channel, in Hz"""
        return self._freq_start_hz

    def get_freq_inc_hz(self):
        """The frequency increment between channels, in Hz"""
        return self._freq_inc_hz

    def get_num_stations(self):
        """The number of stations used by the current observation"""
        return self._num_stations

    def get_num_baselines(self):
        """The number of baselines used by the current observation"""
        return self._num_baselines

    def get_is_autocorrelated(self):
        """Whether the current observation uses autocorrelation or not"""
        return self._is_autocorrelated

    def get_num_pols(self):
        """The number of polarisations used by the current observation"""
        return self._num_pols

    def get_phase_centre_radec_rad(self):
        """Return the RA/DEC phase centre in radians"""
        return self._phase_centre_ra_rad, self._phase_centre_dec_rad

    # Read-only attribute-style access to the getters above.
    num_channels = property(get_num_channels)
    freq_start_hz = property(get_freq_start_hz)
    freq_inc_hz = property(get_freq_inc_hz)
    num_stations = property(get_num_stations)
    num_baselines = property(get_num_baselines)
    is_autocorrelated = property(get_is_autocorrelated)
    num_pols = property(get_num_pols)
    phase_centre_radec_rad = property(get_phase_centre_radec_rad)

    def get_nearest_data(self, time) -> tuple:
        """
        Returns the (meta)data associated with the correlator dump happening
        at a given point in time. If no exact match is found, the first dump
        *after* ``time`` is returned (not necessarily the closest one).

        The lookup is a binary search, so it relies on ``self.times`` being
        in ascending order.

        :param time: the time to look up, comparable with ``self.times``.
        :return: an ``(index, DataObject)`` tuple, where ``index`` is the
            position of the dump in ``self.times`` / ``self.objects``.
        :raises ValueError: if ``time`` is later than every known dump.
        """
        assert len(self.times) == len(self.objects)
        time_idx = bisect.bisect_left(self.times, time)
        if time_idx == len(self.times):
            raise ValueError(f'No data at or after time {time}')
        return time_idx, self.objects[time_idx]

    def get_matching_data(self, current_mjd_utc) -> tuple:
        """
        Like get_nearest_data, but if no exact match is found a `ValueError`
        exception is raised.

        :param current_mjd_utc: the time to look up; it must be equal to one
            of the entries of ``self.times``.
        :return: an ``(index, DataObject)`` tuple.
        :raises ValueError: if no dump has exactly this time.
        """
        time_idx, data = self.get_nearest_data(current_mjd_utc)
        if self.times[time_idx] != current_mjd_utc:
            raise ValueError(f'No data matching time {current_mjd_utc}')
        return time_idx, data