# -*- coding: utf-8 -*-
"""
Primary send functions for ska-sdp-cbf-emulator
"""
import asyncio
import configparser
import logging
from pathlib import Path
import time
from typing import Union
from realtime.receive.core import icd, msutils
from cbf_sdp import transmitters
from realtime.receive.core.baseline_utils import baselines
logger = logging.getLogger(__name__)
def add_missing_config_groups(config):
if 'transmission' not in config:
config['transmission'] = {}
if 'reader' not in config:
config['reader'] = {}
if 'payload' not in config:
config['payload'] = {}
[docs]async def packetise(
config: configparser.ConfigParser,
ms: Union[msutils.MeasurementSet, Path, str]):
"""
Reads data off a Measurement Set and transmits it using the transmitter
specified in the configuration.
Uses the vis_reader get data from the measurement set then gives it to the
transmitter for packaging and transmission. This code is transmission
protocol agnostic.
"""
add_missing_config_groups(config)
if not isinstance(ms, msutils.MeasurementSet):
ms = msutils.MeasurementSet.open(str(ms))
num_stations = ms.num_stations
num_baselines = msutils.calc_baselines(ms)
if num_baselines == baselines(num_stations, False):
logger.warning('Baseline count indicates AUTO are not present')
elif num_baselines == baselines(num_stations, True):
logger.info('AUTOs present')
transmission = config['transmission']
start_chan = config['reader'].getint('start_chan', 0) # start channel to read
num_chan = config['reader'].getint('num_chan', 0) # number of constigeous channels to read, 0 -> all
num_repeats = config['reader'].getint('num_repeats', 1) # number of times to repeat data reading
num_timestamps = config['reader'].getint('num_timestamps', 0) # number of timestamps to read, 0 -> all
scan_id = transmission.getint('scan_id', 1) # the scan id to use for all payloads
chan_per_stream = transmission.getint('channels_per_stream', 0) # channels per stream, 0 -> 1 stream
time_interval = transmission.getfloat('time_interval', 0)
num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan)
transmission['total_channels'] = str(num_chan)
logger.info(f'scan id : {scan_id}')
logger.info(f'no. stations : {num_stations}')
logger.info(f'no. baselines : {num_baselines}')
logger.info(f'no. channels : {num_chan}')
logger.info(f'first channel : {start_chan}')
logger.info(f'channels per stream : {chan_per_stream} (0 == all)')
logger.info(f'time interval : {time_interval} (0 == as per MS, <0 == fly through)')
logger.info(f'no. repeats : {num_repeats}')
if num_repeats <= 0:
raise ValueError(f'num_repeats must be > 0: {num_repeats}')
if num_timestamps == 1:
if num_repeats > 1:
raise ValueError(f'repeating a single timestamp is not recommended, increase timestamps to at least 2')
# Time interval calculations (fixed, MS-driven, none)
if time_interval > 0:
def intervals():
while True:
yield time_interval
elif time_interval == 0:
def intervals():
interval = 0
prev_vis_time = (yield)
yield 0
while True:
vis_time = (yield)
if (vis_time < prev_vis_time):
yield interval
else:
interval = vis_time - prev_vis_time
yield interval
prev_vis_time = vis_time
else:
def intervals():
while True:
yield 0
# prime coroutine-like generator
intervals = intervals()
next(intervals)
# Iterate over timesteps in the data
transmitter = await transmitters.create(transmission, num_baselines, num_chan)
start_time = time.time()
async with transmitter:
repeat_count = 0
for repeat_count in range(0, num_repeats):
# FIXME: the vis_reader is not reading the weights / flags and passing it on
vis_reader = msutils.vis_reader(ms,
start_chan=start_chan,
num_chan=num_chan,
num_timestamps=num_timestamps,
timestamp_offset=repeat_count)
prev_send_start = time.time()
async for vis_amps, ts, ts_fraction in vis_reader:
# Get interval value to emulate, adjust to remove runtime overhead
waiting_time = intervals.send(icd.icd_to_unix(ts, ts_fraction))
next(intervals)
if waiting_time > 0:
waiting_time -= (time.time() - prev_send_start)
if waiting_time > 0:
await asyncio.sleep(waiting_time)
prev_send_start = time.time()
await transmitter.send(scan_id, ts, ts_fraction, vis_amps)
# Print time taken.
duration = time.time() - start_time
data_size = transmitter.bytes_sent / 1024 / 1024
logger.info(
"Scan %s sent %.3f [MB] in %.3f [s] (%.3f [MB/s], %.3f [heaps/s])",
scan_id,
data_size, duration, (data_size / duration),
transmitter.heaps_sent / duration,
)
return transmitter.heaps_sent