# Source code for cbf_sdp.packetiser

# -*- coding: utf-8 -*-
"""Default Emulator

   Reads data from a MS and creates SPEAD2 packets.

   This package is designed to be easily extensible to different payloads and protocols. This module is
   currently installed as emu_send when the package is installed.

"""

import argparse
import asyncio
import configparser
import logging
import time

from oskar import measurement_set
import ska.logging

from cbf_sdp import transmitters, msutils
from cbf_sdp.utils import baselines


logger = logging.getLogger(__name__)
DEFAULT_CONFIG_FILE = 'packetiser.conf'


async def packetise(config, ms, loop=None):
    """
    Reads data off a Measurement Set and transmits it using the transmitter
    specified in the configuration.

    Uses the vis_reader to get data from the measurement set, then gives it
    to the transmitter for packaging and transmission. This code is
    transmission protocol agnostic.

    As a side effect, ``config['transmission']['total_channels']`` is set to
    the (clamped) number of channels that will be sent.

    :param config: The configuration; its ``reader`` and ``transmission``
        sections are read
    :param ms: The measurement set (passed to ``MeasurementSet.open``)
    :param loop: The event loop handed to the transmitter; defaults to
        ``asyncio.get_event_loop()``
    :raises ValueError: if ``reader.num_repeats`` is not greater than zero, or
        if a numeric option cannot be converted to an integer
    """
    loop = loop or asyncio.get_event_loop()
    ms = measurement_set.MeasurementSet.open(ms, readonly=True)

    num_stations = ms.num_stations
    num_baselines = msutils.get_baselines_from_ms(ms)
    if num_baselines == baselines(num_stations, False):
        logger.warning('Baseline count indicates AUTO are not present')
    elif num_baselines == baselines(num_stations, True):
        logger.info('AUTOs present')

    reader_config = config['reader']
    chan_per_stream = int(config['transmission'].get('channels_per_stream', 0))
    # Config values read from a file or the command line are strings, so
    # convert them explicitly; otherwise the type would depend on whether the
    # option was set (str) or defaulted (int).
    start_chan = int(reader_config.get('start_chan', 0))
    num_chan = int(reader_config.get('num_chan', 0))
    num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan)
    config['transmission']['total_channels'] = str(num_chan)

    logger.info(f'no. stations : {num_stations}')
    logger.info(f'no. baselines : {num_baselines}')
    logger.info(f'no. channels : {num_chan}')
    logger.info(f'first channel : {start_chan}')
    logger.info(f'channels per stream : {chan_per_stream} (0 == all)')

    # Repeats: how many times each timestep is sent
    num_repeats = int(reader_config.get('num_repeats', 1))
    if num_repeats <= 0:
        raise ValueError(f'num_repeats must be > 0: {num_repeats}')

    # Iterate over timesteps in the data
    transmitter = transmitters.create(config['transmission'], num_baselines,
                                      num_chan, loop)
    start_time = time.time()
    async with transmitter:
        num_timestamps = int(reader_config.get('num_timestamps', 0))
        vis_reader = msutils.vis_reader(ms, start_chan=start_chan,
                                        num_chan=num_chan,
                                        num_timestamps=num_timestamps)
        async for vis_amps, ts, ts_fraction in vis_reader:
            for _ in range(num_repeats):
                await transmitter.send(ts, ts_fraction, vis_amps)

    # Print time taken.
    duration = time.time() - start_time
    data_size = transmitter.bytes_sent / 1024 / 1024
    # Guard against a zero-length interval (coarse clock, nothing sent) so the
    # summary log cannot fail with ZeroDivisionError after a successful run.
    rate = data_size / duration if duration > 0 else 0.0
    logger.info(
        "Sent %.3f MB in %.3f sec (%.3f MB/sec)",
        data_size, duration, rate,
    )


def _config_parser(f):
    """
    Loads the configuration file *f* and returns the resulting ConfigParser.

    The ``transmission``, ``reader`` and ``payload`` sections are created
    (empty) when the file does not define them, so callers can index them
    unconditionally.

    :param f: The name of the configuration file to read
    :return: The populated ``configparser.ConfigParser``
    """
    config_parser = configparser.ConfigParser()
    # ConfigParser.read() skips files it cannot open, so a missing file simply
    # yields a configuration holding only the empty sections added below.
    config_parser.read(f)
    for section in ('transmission', 'reader', 'payload'):
        if section not in config_parser:
            config_parser[section] = {}
    return config_parser


def _augment_config(config, options):
    """
    Applies ``category.name=value`` overrides to *config* in place.

    Only the first ``=`` and the first ``.`` act as separators, so values may
    themselves contain ``=`` and option names may contain ``.``.

    :param config: The configuration to update
    :param options: An iterable of ``category.name=value`` strings
    :raises ValueError: if an option is not of the form ``category.name=value``
    """
    for opt in options:
        qualified_name, equals, value = opt.partition('=')
        category, dot, name = qualified_name.partition('.')
        if not equals or not dot:
            raise ValueError(
                f'Invalid option {opt!r}: expected category.name=value')
        if category not in config:
            config[category] = {}
        config[category][name] = value


def main():
    """
    Command line entry point: parses the arguments, configures logging and
    runs :func:`packetise` to completion on the asyncio event loop.
    """
    parser = argparse.ArgumentParser(
        description="Creates SPEAD2 heaps out of a MS file"
    )
    parser.add_argument(
        "-c", '--config',
        help="The configuration file to load, defaults to %s" % DEFAULT_CONFIG_FILE,
        default=DEFAULT_CONFIG_FILE,
        # argparse also applies ``type`` to a string default, so args.config
        # is always a ConfigParser instance.
        type=_config_parser,
    )
    parser.add_argument(
        "-o", "--option",
        help="Additional configuration options in the form of category.name=value",
        action='append'
    )
    parser.add_argument(
        "-v", "--verbose",
        help="If set, more verbose output will be produced",
        action="store_true"
    )
    parser.add_argument('measurement_set',
                        help="The measurement set to read data from")
    args = parser.parse_args()

    logging_level = logging.DEBUG if args.verbose else logging.INFO
    ska.logging.configure_logging(level=logging_level)

    config = args.config
    if args.option:
        _augment_config(config, args.option)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(
        packetise(config, args.measurement_set, loop))


if __name__ == '__main__':
    main()