# -*- coding: utf-8 -*-
"""spead2_transmitters
Classes that manage the transmission of SPEAD2 heaps over UDP, with the heap
contents defined by the payload class.
"""
import asyncio
import logging
import math
import numpy as np
import spead2.send.asyncio
from cbf_sdp import icd
# Structured dtype of one correlator output sample: 'TCI' is an int8, 'FD' a
# uint8 and 'VIS' four little-endian complex64 values
CORR_OUT_TYPE = [
    ('TCI', 'i1'),
    ('FD', 'u1'),
    ('VIS', '<c8', 4),
]

# True when the major version of the installed spead2 package is 3
IS_SPEAD3 = int(spead2.__version__.partition('.')[0]) == 3

logger = logging.getLogger(__name__)
def create_stream(thread_pool, target_host, port, config, loop):
    """
    Build an asyncio UDP send stream, using the keyword arguments that match
    the installed spead2 major version (see `IS_SPEAD3`).

    :param thread_pool: the spead2 thread pool handed to the stream
    :param target_host: the host the stream sends to
    :param port: the UDP port the stream sends to
    :param config: the stream configuration handed to the stream
    :param loop: the event loop; only forwarded when `IS_SPEAD3` is false
    :return: a new `spead2.send.asyncio.UdpStream`
    """
    if IS_SPEAD3:
        # A one-element tuple holding the single (host, port) endpoint
        destination = {'endpoints': ((target_host, port),)}
    else:
        destination = {'hostname': target_host, 'port': port, 'loop': loop}
    return spead2.send.asyncio.UdpStream(
        thread_pool=thread_pool, config=config, **destination)
class Spead2SenderPayload(icd.Payload):
    """SPEAD2 payload following the CSP-SDP interface document"""

    def __init__(self, num_baselines=None, num_channels=None):
        """
        Create the underlying item group and register all the items in it

        :param num_baselines: number of baselines in the HEAP
        :param num_channels: number of channels in the HEAP
        """
        super().__init__()
        self._item_group = spead2.send.ItemGroup(
            flavour=spead2.Flavour(4, 64, 48, 0))
        self._add_items(num_baselines, num_channels)
        self.baseline_count = num_baselines
        self.channel_count = num_channels

    def _add_items(self, num_baselines, num_channels):
        """
        Registers every item of the payload in the item group, as defined by
        the ICD

        :param num_baselines: number of baselines in the HEAP - used for sizing
        :param num_channels: number of channels in the HEAP - used for sizing
        """
        # (item id, item name, dtype) of each scalar item, in registration
        # order
        scalar_items = (
            (0x6000, 'visibility_timestamp_count', '<u4'),
            (0x6001, 'visibility_timestamp_fraction', '<u4'),
            (0x6002, 'visibility_channel_id', '<u4'),
            (0x6003, 'visibility_channel_count', '<u4'),
            (0x6004, 'visibility_polarisation_id', '<u4'),
            (0x6005, 'visibility_baseline_count', '<u4'),
            (0x6006, 'visibility_phase_bin_id', '<u2'),
            (0x6007, 'visibility_phase_bin_count', '<u2'),
            (0x6008, 'scan_id', '<u8'),
            (0x6009, 'visibility_hardware_id', '<u4'),
        )
        group = self._item_group
        for item_id, item_name, item_dtype in scalar_items:
            group.add_item(
                id=item_id,
                name=item_name,
                description='',
                shape=tuple(),
                format=None,
                dtype=item_dtype,
            )

        # The bulk data item: one CORR_OUT_TYPE record per channel/baseline,
        # initialised to zeros
        data_shape = (num_channels, num_baselines)
        group.add_item(
            id=0x600A,
            name='correlator_output_data',
            description='',
            shape=data_shape,
            dtype=CORR_OUT_TYPE,
        )
        group['correlator_output_data'].value = np.zeros(
            shape=data_shape, dtype=CORR_OUT_TYPE)

    def get_heap(self):
        """
        Copies the current payload attributes into the item group and returns
        a heap with all descriptors and all data

        :return: the heap produced by the item group
        """
        group = self._item_group
        corr_out_data = group['correlator_output_data'].value

        # (item name, payload attribute) pairs of the scalar items
        scalar_sources = (
            ('visibility_baseline_count', 'baseline_count'),
            ('visibility_channel_count', 'channel_count'),
            ('visibility_channel_id', 'channel_id'),
            ('visibility_hardware_id', 'hardware_id'),
            ('visibility_phase_bin_id', 'phase_bin_id'),
            ('visibility_phase_bin_count', 'phase_bin_count'),
            ('visibility_polarisation_id', 'polarisation_id'),
            ('scan_id', 'scan_id'),
            ('visibility_timestamp_count', 'timestamp_count'),
            ('visibility_timestamp_fraction', 'timestamp_fraction'),
        )
        for item_name, attribute in scalar_sources:
            group[item_name].value = getattr(self, attribute)

        # Fields of the data item are only overwritten when the corresponding
        # payload attribute is not empty
        field_sources = (
            ('TCI', 'time_centroid_indices'),
            ('FD', 'correlated_data_fraction'),
            ('VIS', 'visibilities'),
        )
        for field, attribute in field_sources:
            values = getattr(self, attribute)
            if len(values):
                corr_out_data[field] = values

        return group.get_heap(descriptors='all', data='all')

    def get_start_heap(self):
        """Returns the start-of-stream heap of the item group"""
        return self._item_group.get_start()

    def get_end_heap(self):
        """Returns the end-of-stream heap of the item group"""
        return self._item_group.get_end()
class transmitter(object):
    """
    SPEAD2 transmitter

    This class uses the spead2 library to transmit visibilities over multiple
    spead2 streams. Each visibility set given to this class' `send` method is
    broken down by channel range (depending on the configuration parameters),
    and each channel range is sent through a different stream.
    """

    def __init__(self, config, num_baselines, num_chan, loop):
        """
        Reads the configuration and creates the streams and their payloads

        :param config: object with a `get(key, default)` method. Keys read
         (with their defaults): `max_packet_size` (1472), `rate`
         (1024 * 1024 * 1024), `channels_per_stream` (0, meaning all channels
         go through a single stream), `sender_threads` (1), `target_host`
         ('127.0.0.1') and `target_port_start` (41000)
        :param num_baselines: number of baselines in each visibility set
        :param num_chan: number of channels in each visibility set
        :param loop: event loop handed to `create_stream`
        """
        self.config = config
        max_packet_size = int(config.get('max_packet_size', 1472))
        logger.info(
            'Creating StreamConfig with max_packet_size=%d',
            max_packet_size)
        self.stream_config = spead2.send.StreamConfig(
            max_packet_size=max_packet_size,
            rate=int(config.get('rate', 1024 * 1024 * 1024)),
            burst_size=10,
            max_heaps=1,
        )
        self.channels_per_stream = int(
            config.get('channels_per_stream', 0))
        self.sender_threads = int(
            config.get('sender_threads', 1))
        self.num_streams = 0  # set by _create_streams() below
        self.bytes_sent = 0
        self.streams = []
        self._loop = loop
        self._start_heap_sent = False
        self._create_streams(num_baselines, num_chan)

    def _create_streams(self, num_baselines, num_channels):
        """
        Creates one payload and one stream per channel range

        :param num_baselines: number of baselines - used to size the payloads
        :param num_channels: total number of channels to split across streams
        """
        if self.channels_per_stream == 0:
            # 0 means "do not split": a single stream carries every channel,
            # so its payload must be sized for all of them
            self.channels_per_stream = num_channels
            self.num_streams = 1
        else:
            self.num_streams = math.ceil(num_channels / self.channels_per_stream)
        logger.info(
            'Creating %d spead2 streams to send data for %d channels',
            self.num_streams,
            num_channels)

        # Number of channels carried by each stream; the last stream carries
        # only the remainder when the channels don't divide evenly
        self._channel_counts = [
            min(self.channels_per_stream,
                num_channels - i * self.channels_per_stream)
            for i in range(self.num_streams)
        ]

        # Each stream uses a separate ItemGroup because Heaps created out of
        # ItemGroups can point to memory held by the ItemGroup; and since we
        # want different heaps sent through each of the streams we then need
        # independent ItemGroups
        self.payloads = [Spead2SenderPayload(num_baselines, chan_count)
                         for chan_count in self._channel_counts]

        # Create the streams; they still share a single I/O threadpool
        thread_pool = spead2.ThreadPool(threads=self.sender_threads)
        config = self.config
        target_host = config.get('target_host', '127.0.0.1')
        target_port = int(config.get('target_port_start', 41000))
        for i in range(self.num_streams):
            # Streams use consecutive ports starting at target_port_start
            port = target_port + i
            logger.info("Sending to %s:%d", target_host, port)
            stream = create_stream(
                thread_pool, target_host, port,
                self.stream_config, self._loop
            )
            self.streams.append(stream)

    async def _send_heaps(self, heaps):
        """
        Sends one heap through each stream concurrently and adds the values
        returned by the send operations to `bytes_sent`

        :param heaps: the heaps to send, one per stream and in stream order
        """
        assert len(heaps) == len(self.streams)
        send_operations = [stream.async_send_heap(heap)
                           for heap, stream in zip(heaps, self.streams)]
        results = await asyncio.gather(*send_operations)
        self.bytes_sent += sum(results)

    async def send(self, ts, ts_fraction, vis):
        """
        Send a visibility set through all SPEAD2 streams

        :param ts: the integer part of the visibilities' timestamp
        :param ts_fraction: the fractional part of the visibilities' timestamp
        :param vis: the visibilities
        """
        # The start-of-stream heaps go out once, before the first data heaps
        if not self._start_heap_sent:
            await self._send_heaps([payload.get_start_heap() for payload in self.payloads])
            self._start_heap_sent = True

        logger.debug('Sending heaps to %d spead2 streams', len(self.streams))
        heaps = []
        assert len(self.payloads) == len(self.streams)
        for i, (payload, chan_count) in enumerate(
                zip(self.payloads, self._channel_counts)):
            first_chan = self.channels_per_stream * i
            last_chan = first_chan + chan_count
            payload.timestamp_count = ts
            payload.timestamp_fraction = ts_fraction
            payload.visibilities = vis[first_chan:last_chan]
            payload.channel_id = first_chan
            # Report the channels actually carried by this stream, which is
            # less than channels_per_stream for a partial last range
            payload.channel_count = chan_count
            heaps.append(payload.get_heap())
        await self._send_heaps(heaps)

    async def close(self):
        """Sends the end-of-stream message"""
        await self._send_heaps([payload.get_end_heap() for payload in self.payloads])

    async def __aenter__(self):
        return self

    async def __aexit__(self, ext_type, exc, tb):
        # Always send the end-of-stream message when leaving the context
        await self.close()