# -*- coding: utf-8 -*-
"""Default Emulator
Reads data from a MS and creates SPEAD2 packets.
This package is designed to be easily extensible to different payloads and protocols. This module is
currently installed as emu_send when the package is installed.
"""
import argparse
import asyncio
import configparser
import logging
import time
from oskar import measurement_set
import ska.logging
from cbf_sdp import transmitters, msutils
from cbf_sdp.utils import baselines
logger = logging.getLogger(__name__)
DEFAULT_CONFIG_FILE = 'packetiser.conf'
[docs]async def packetise(config, ms, loop=None):
"""
Reads data off a Measurement Set and transmits it using the transmitter
specified in the configuration.
Uses the vis_reader get data from the measurement set then gives it to the
transmitter for packaging and transmission. This code is transmission
protocol agnostic.
:param config: The configuration
:param ms: The measurement set
"""
loop = loop or asyncio.get_event_loop()
ms = measurement_set.MeasurementSet.open(ms, readonly=True)
num_stations = ms.num_stations
num_baselines = msutils.get_baselines_from_ms(ms)
if num_baselines == baselines(num_stations, False):
logger.warning('Baseline count indicates AUTO are not present')
elif num_baselines == baselines(num_stations, True):
logger.info('AUTOs present')
chan_per_stream = int(config['transmission'].get('channels_per_stream', 0))
start_chan = config['reader'].get('start_chan', 0)
num_chan = config['reader'].get('num_chan', 0)
num_chan = msutils.clamp_num_chan(ms, start_chan, num_chan)
config['transmission']['total_channels'] = str(num_chan)
logger.info(f'no. stations : {num_stations}')
logger.info(f'no. baselines : {num_baselines}')
logger.info(f'no. channels : {num_chan}')
logger.info(f'first channel : {start_chan}')
logger.info(f'channels per stream : {chan_per_stream} (0 == all)')
# Repeats
num_repeats = int(config['reader'].get('num_repeats', 1))
if num_repeats <= 0:
raise ValueError(f'num_repeats must be > 0: {num_repeats}')
# Iterate over timesteps in the data
transmitter = transmitters.create(config['transmission'], num_baselines, num_chan, loop)
start_time = time.time()
async with transmitter:
num_timestamps = int(config['reader'].get('num_timestamps', 0))
vis_reader = msutils.vis_reader(ms, start_chan=start_chan, num_chan=num_chan,
num_timestamps=num_timestamps)
async for vis_amps, ts, ts_fraction in vis_reader:
repeat_count = 0
while repeat_count < num_repeats:
repeat_count += 1
await transmitter.send(ts, ts_fraction, vis_amps)
# Print time taken.
duration = time.time() - start_time
data_size = transmitter.bytes_sent / 1024 / 1024
logger.info(
"Sent %.3f MB in %.3f sec (%.3f MB/sec)",
data_size, duration, (data_size / duration),
)
def _config_parser(f):
config_parser = configparser.ConfigParser()
config_parser.read(f)
if 'transmission' not in config_parser:
config_parser['transmission'] = {}
if 'reader' not in config_parser:
config_parser['reader'] = {}
if 'payload' not in config_parser:
config_parser['payload'] = {}
return config_parser
def _augment_config(config, options):
for opt in options:
name, value = opt.split('=')
category, name = name.split('.')
if category not in config:
config[category] = {}
config[category][name] = value
def main():
parser = argparse.ArgumentParser(
description="Creates SPEAD2 heaps out of a MS file"
)
parser.add_argument(
"-c",
'--config',
help="The configuration file to load, defaults to %s" % DEFAULT_CONFIG_FILE,
default=DEFAULT_CONFIG_FILE,
type=_config_parser,
)
parser.add_argument(
"-o",
"--option",
help="Additional configuration options in the form of category.name=value",
action='append'
)
parser.add_argument(
"-v",
"--verbose",
help="If set, more verbose output will be produced",
action="store_true"
)
parser.add_argument('measurement_set', help="The measurement set to read data from")
args = parser.parse_args()
logging_level = logging.DEBUG if args.verbose else logging.INFO
ska.logging.configure_logging(level=logging_level)
config = args.config
if (args.option):
_augment_config(config, args.option)
loop = asyncio.get_event_loop()
loop.run_until_complete(
packetise(config, args.measurement_set, loop))
if __name__ == '__main__':
main()