Source code for cbf_sdp.packetiser

# -*- coding: utf-8 -*-
"""
Primary send functions for ska-sdp-cbf-emulator
"""

import asyncio
import configparser
import logging
from pathlib import Path
import time
from typing import Union


from realtime.receive.core import icd, msutils
from cbf_sdp import transmitters
from realtime.receive.core.baseline_utils import baselines

logger = logging.getLogger(__name__)


def add_missing_config_groups(config):
    if 'transmission' not in config:
        config['transmission'] = {}
    if 'reader' not in config:
        config['reader'] = {}
    if 'payload' not in config:
        config['payload'] = {}


[docs]async def packetise( config: configparser.ConfigParser, ms: Union[msutils.MeasurementSet, Path, str]): """ Reads data off a Measurement Set and transmits it using the transmitter specified in the configuration. Uses the vis_reader get data from the measurement set then gives it to the transmitter for packaging and transmission. This code is transmission protocol agnostic. """ add_missing_config_groups(config) if not isinstance(ms, msutils.MeasurementSet): ms = msutils.MeasurementSet.open(str(ms)) num_stations = ms.num_stations num_baselines = msutils.calc_baselines(ms) if num_baselines == baselines(num_stations, False): logger.warning('Baseline count indicates AUTO are not present') elif num_baselines == baselines(num_stations, True): logger.info('AUTOs present') transmission = config['transmission'] start_chan = config['reader'].getint('start_chan', 0) # start channel to read num_chan = config['reader'].getint('num_chan', 0) # number of constigeous channels to read, 0 -> all num_repeats = config['reader'].getint('num_repeats', 1) # number of times to repeat data reading num_timestamps = config['reader'].getint('num_timestamps', 0) # number of timestamps to read, 0 -> all scan_id = transmission.getint('scan_id', 1) # the scan id to use for all payloads chan_per_stream = transmission.getint('channels_per_stream', 0) # channels per stream, 0 -> 1 stream time_interval = transmission.getfloat('time_interval', 0) num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan) transmission['total_channels'] = str(num_chan) logger.info(f'scan id : {scan_id}') logger.info(f'no. stations : {num_stations}') logger.info(f'no. baselines : {num_baselines}') logger.info(f'no. channels : {num_chan}') logger.info(f'first channel : {start_chan}') logger.info(f'channels per stream : {chan_per_stream} (0 == all)') logger.info(f'time interval : {time_interval} (0 == as per MS, <0 == fly through)') logger.info(f'no. repeats : {num_repeats}') if num_repeats <= 0: raise ValueError(f'num_repeats must be > 0: {num_repeats}') if num_timestamps == 1: if num_repeats > 1: raise ValueError(f'repeating a single timestamp is not recommended, increase timestamps to at least 2') # Time interval calculations (fixed, MS-driven, none) if time_interval > 0: def intervals(): while True: yield time_interval elif time_interval == 0: def intervals(): interval = 0 prev_vis_time = (yield) yield 0 while True: vis_time = (yield) if (vis_time < prev_vis_time): yield interval else: interval = vis_time - prev_vis_time yield interval prev_vis_time = vis_time else: def intervals(): while True: yield 0 # prime coroutine-like generator intervals = intervals() next(intervals) # Iterate over timesteps in the data transmitter = await transmitters.create(transmission, num_baselines, num_chan) start_time = time.time() async with transmitter: repeat_count = 0 for repeat_count in range(0, num_repeats): # FIXME: the vis_reader is not reading the weights / flags and passing it on vis_reader = msutils.vis_reader(ms, start_chan=start_chan, num_chan=num_chan, num_timestamps=num_timestamps, timestamp_offset=repeat_count) prev_send_start = time.time() async for vis_amps, ts, ts_fraction in vis_reader: # Get interval value to emulate, adjust to remove runtime overhead waiting_time = intervals.send(icd.icd_to_unix(ts, ts_fraction)) next(intervals) if waiting_time > 0: waiting_time -= (time.time() - prev_send_start) if waiting_time > 0: await asyncio.sleep(waiting_time) prev_send_start = time.time() await transmitter.send(scan_id, ts, ts_fraction, vis_amps) # Print time taken. duration = time.time() - start_time data_size = transmitter.bytes_sent / 1024 / 1024 logger.info( "Scan %s sent %.3f [MB] in %.3f [s] (%.3f [MB/s], %.3f [heaps/s])", scan_id, data_size, duration, (data_size / duration), transmitter.heaps_sent / duration, ) return transmitter.heaps_sent