# -*- coding: utf-8 -*-
"""some utils to get information """
import asyncio
import bisect
import logging
import numpy as np
from oskar import measurement_set
logger = logging.getLogger(__name__)
def baselines(stations, autocorr = True):
if (autocorr):
return (stations * (stations + 1)) // 2
else:
return (stations * (stations - 1)) // 2
class DataObject(object):
def __init__(self, time, uu, vv, ww, myinterval=1.0,myexposure=1.0):
"""Just holds the row elements we care about"""
self.time = time
self.uu = uu
self.vv = vv
self.ww = ww
self.interval = myinterval
self.exposure = myexposure
[docs]class FakeTM(object):
"""
TelescopeManager-like class that reads its model information from a
Measurement Set.
"""
def __init__(self, ms):
logger.info(f'Attempting to build model from {ms}')
# Convert OSKAR's RuntimeError into an IOError
try:
myMS = measurement_set.MeasurementSet.open(ms, readonly=True)
except RuntimeError:
raise IOError()
self.objects = [] # structure that holds the contents of the row we care about
self.times = [] # structure that also holds the time. This will be easy to search that the structure of objects
row_num = 0
try:
mytime = myMS.read_column('TIME', row_num, 1)
except:
logger.error(f'Failed to get TIME from {ms}')
nexttime = mytime
while (nexttime == mytime):
row_num = row_num + 1
try:
nexttime = myMS.read_column('TIME', row_num, 1)
except:
logger.error(f'Failed to get TIME from {ms}')
self._num_baselines = row_num
self._is_autocorrelated = self._num_baselines != baselines(myMS.num_stations)
self._freq_inc_hz = myMS.get_freq_inc_hz()
self._freq_start_hz = myMS.freq_start_hz
self._num_channels = myMS.num_channels
self._num_pols = myMS.num_pols
self._num_stations = myMS.num_stations
self._phase_centre_ra_rad = myMS.phase_centre_ra_rad
self._phase_centre_dec_rad = myMS.phase_centre_dec_rad
row_num = 0 # reset
uu = np.zeros(self.num_baselines, np.float32)
vv = np.zeros_like(uu)
ww = np.zeros_like(uu)
while row_num < myMS.num_rows :
mytime = myMS.read_column('TIME',row_num,1)
myinterval = myMS.read_column('INTERVAL',row_num,1)
myexposure = myMS.read_column('EXPOSURE',row_num,1)
(uu,vv,ww) = myMS.read_coords(row_num,self.num_baselines)
row = DataObject(mytime[0], uu, vv, ww, myinterval, myexposure)
self.objects.append(row)
self.times.append(mytime[0])
row_num = row_num + self.num_baselines
[docs] def get_num_channels(self):
"""The number of channels of the current observation"""
return self._num_channels
[docs] def get_freq_start_hz(self):
"""The frequency of the first channel, in Hz"""
return self._freq_start_hz
[docs] def get_freq_inc_hz(self):
"""The frequency increment between channels, in Hz"""
return self._freq_inc_hz
[docs] def get_num_stations(self):
"""The number of stations used by the current observation"""
return self._num_stations
[docs] def get_num_baselines(self):
"""The number of baselines used by the current observation"""
return self._num_baselines
[docs] def get_num_pols(self):
"""The number of polarisations used by the current observation"""
return self._num_pols
[docs] def get_phase_centre_radec_rad(self):
"""Return the RA/DEC phase centre in radians"""
return self._phase_centre_ra_rad, self._phase_centre_dec_rad
num_channels = property(get_num_channels)
freq_start_hz = property(get_freq_start_hz)
freq_inc_hz = property(get_freq_inc_hz)
num_stations = property(get_num_stations)
num_baselines = property(get_num_baselines)
is_autocorrelated = property(get_is_autocorrelated)
num_pols = property(get_num_pols)
phase_centre_radec_rad = property(get_phase_centre_radec_rad)
[docs] def get_nearest_data(self, time) -> DataObject :
"""
Returns the (meta)data associated with correlator dumps happening at a
given point in time. If no exact match is found the nearest is returned.
"""
assert len(self.times) == len(self.objects)
time_idx = bisect.bisect_left(self.times, time)
if time_idx == len(self.times):
raise ValueError
return time_idx, self.objects[time_idx]
[docs] def get_matching_data(self, current_mjd_utc) -> DataObject :
"""
Like get_nearest_data, but if no exact match is found an `ValueError`
exception is raised.
"""
time_idx, data = self.get_nearest_data(current_mjd_utc)
if self.times[time_idx] != current_mjd_utc:
raise ValueError
return time_idx, data