# Source code for cbf_sdp.transmitters.spead2_transmitters

# -*- coding: utf-8 -*-
"""spead2_transmitters

        Classes that manage the transmission of SPEAD2 heaps via UDP; the
        content of each heap is defined by the payload class.

"""
import asyncio
import logging
import math

import numpy as np
import spead2.send.asyncio

from cbf_sdp import icd


# Structured dtype of one correlator output sample: 'TCI' is a signed 8-bit
# integer, 'FD' an unsigned 8-bit integer and 'VIS' four little-endian
# complex64 values. The payload class fills these fields from its
# time_centroid_indices, correlated_data_fraction and visibilities attributes.
CORR_OUT_TYPE = [('TCI', 'i1'), ('FD', 'u1'), ('VIS', '<c8', 4)]
# True when the installed spead2 uses the "endpoints"-style stream
# constructor (see create_stream). This is a ">= 3" check rather than "== 3"
# so that later major versions do not fall back to the legacy
# hostname/port/loop call.
# NOTE(review): assumes spead2 >= 4 keeps the version-3 constructor API.
IS_SPEAD3 = int(spead2.__version__.split('.')[0]) >= 3

logger = logging.getLogger(__name__)


def create_stream(thread_pool, target_host, port, config, loop):
    """
    Build an asyncio UDP send stream targeting ``target_host:port``.

    The keyword arguments handed to ``spead2.send.asyncio.UdpStream`` depend
    on the module-level ``IS_SPEAD3`` flag: when it is set the destination is
    given as a one-element tuple of ``(host, port)`` endpoints and ``loop``
    is not forwarded; otherwise ``hostname``, ``port`` and ``loop`` are
    passed individually.

    :param thread_pool: forwarded as the ``thread_pool`` argument
    :param target_host: host the stream sends to
    :param port: UDP port the stream sends to
    :param config: forwarded as the ``config`` argument
    :param loop: event loop; only forwarded when ``IS_SPEAD3`` is false
    :return: the newly created ``UdpStream``
    """
    if IS_SPEAD3:
        # A sequence of endpoints is expected, hence the nested tuple
        destination = {'endpoints': ((target_host, port),)}
    else:
        destination = {
            'hostname': target_host,
            'port': port,
            'loop': loop,
        }
    return spead2.send.asyncio.UdpStream(
        thread_pool=thread_pool, config=config, **destination)

class Spead2SenderPayload(icd.Payload):
    """SPEAD2 payload following the CSP-SDP interface document"""

    def __init__(self, num_baselines=None, num_channels=None):
        super().__init__()
        self._item_group = spead2.send.ItemGroup(
            flavour=spead2.Flavour(4, 64, 48, 0))
        self._add_items(num_baselines, num_channels)
        self.baseline_count = num_baselines
        self.channel_count = num_channels

    def _add_items(self, num_baselines, num_channels):
        """
        Registers every item of the payload in the underlying item group.

        Ten scalar items (ids 0x6000 to 0x6009) are added first, followed by
        the ``correlator_output_data`` array item, which is initialised with
        zeros.

        :param num_baselines: number of baselines in the HEAP - used for
         sizing
        :param num_channels: number of channels in the HEAP - used for sizing
        """
        scalar_items = (
            (0x6000, 'visibility_timestamp_count', '<u4'),
            (0x6001, 'visibility_timestamp_fraction', '<u4'),
            (0x6002, 'visibility_channel_id', '<u4'),
            (0x6003, 'visibility_channel_count', '<u4'),
            (0x6004, 'visibility_polarisation_id', '<u4'),
            (0x6005, 'visibility_baseline_count', '<u4'),
            (0x6006, 'visibility_phase_bin_id', '<u2'),
            (0x6007, 'visibility_phase_bin_count', '<u2'),
            (0x6008, 'scan_id', '<u8'),
            (0x6009, 'visibility_hardware_id', '<u4'),
        )
        item_group = self._item_group
        for item_id, item_name, item_dtype in scalar_items:
            item_group.add_item(
                id=item_id,
                name=item_name,
                description='',
                shape=tuple(),
                format=None,
                dtype=item_dtype,
            )

        data_shape = (num_channels, num_baselines)
        item_group.add_item(
            id=0x600A,
            name='correlator_output_data',
            description='',
            shape=data_shape,
            dtype=CORR_OUT_TYPE,
        )
        item_group['correlator_output_data'].value = np.zeros(
            shape=data_shape, dtype=CORR_OUT_TYPE)

    def get_heap(self):
        """
        Copies the current payload attributes into the item group and
        returns a heap carrying all descriptors and all data.

        The three fields of ``correlator_output_data`` are only overwritten
        when the corresponding attribute is non-empty.
        """
        item_group = self._item_group
        output_data = item_group['correlator_output_data'].value

        # (item name, attribute holding its value)
        scalar_sources = (
            ('visibility_baseline_count', 'baseline_count'),
            ('visibility_channel_count', 'channel_count'),
            ('visibility_channel_id', 'channel_id'),
            ('visibility_hardware_id', 'hardware_id'),
            ('visibility_phase_bin_id', 'phase_bin_id'),
            ('visibility_phase_bin_count', 'phase_bin_count'),
            ('visibility_polarisation_id', 'polarisation_id'),
            ('scan_id', 'scan_id'),
            ('visibility_timestamp_count', 'timestamp_count'),
            ('visibility_timestamp_fraction', 'timestamp_fraction'),
        )
        for item_name, attr_name in scalar_sources:
            item_group[item_name].value = getattr(self, attr_name)

        # (field of the structured array, attribute holding its values)
        field_sources = (
            ('TCI', 'time_centroid_indices'),
            ('FD', 'correlated_data_fraction'),
            ('VIS', 'visibilities'),
        )
        for field_name, attr_name in field_sources:
            field_values = getattr(self, attr_name)
            # len() rather than truthiness: the values may be arrays
            if len(field_values):
                output_data[field_name] = field_values

        return item_group.get_heap(descriptors='all', data='all')

    def get_start_heap(self):
        """Returns the start-of-stream heap of the item group"""
        return self._item_group.get_start()

    def get_end_heap(self):
        """Returns the end-of-stream heap of the item group"""
        return self._item_group.get_end()
class transmitter(object):
    """
    SPEAD2 transmitter

    This class uses the spead2 library to transmit visibilities over multiple
    spead2 streams. Each visibility set given to this class' `send` method is
    broken down by channel range (depending on the configuration parameters),
    and each channel range is sent through a different stream.

    The following keys are read from ``config`` (defaults in parentheses):
    ``max_packet_size`` (1472), ``rate`` (1024 * 1024 * 1024),
    ``channels_per_stream`` (0, meaning all channels go through a single
    stream), ``sender_threads`` (1), ``target_host`` ('127.0.0.1') and
    ``target_port_start`` (41000). Stream ``i`` sends to port
    ``target_port_start + i``.

    Instances can be used as asynchronous context managers; leaving the
    context calls `close`.
    """

    def __init__(self, config, num_baselines, num_chan, loop):
        """
        :param config: mapping-like object with a ``get(key, default)``
         method holding the options listed in the class documentation
        :param num_baselines: number of baselines in each visibility set
        :param num_chan: total number of channels in each visibility set
        :param loop: event loop handed over to `create_stream`
        """
        self.config = config
        max_packet_size = int(config.get('max_packet_size', 1472))
        logger.info(
            'Creating StreamConfig with max_packet_size=%d', max_packet_size)
        self.stream_config = spead2.send.StreamConfig(
            max_packet_size=max_packet_size,
            rate=int(config.get('rate', 1024 * 1024 * 1024)),
            burst_size=10,
            max_heaps=1,
        )
        self.channels_per_stream = int(config.get('channels_per_stream', 0))
        self.sender_threads = int(config.get('sender_threads', 1))
        self.num_streams = 0  # set by _create_streams() below
        self.bytes_sent = 0
        self.streams = []
        self._loop = loop
        self._start_heap_sent = False
        self._create_streams(num_baselines, num_chan)

    def _create_streams(self, num_baselines, num_channels):
        """
        Creates one payload and one spead2 stream per channel range.

        :param num_baselines: number of baselines, used to size the payloads
        :param num_channels: total number of channels to distribute
        """
        # A non-positive channels_per_stream means "do not split": the single
        # stream must then carry every channel instead of none.
        if self.channels_per_stream <= 0:
            self.channels_per_stream = num_channels
        if self.channels_per_stream > 0:
            self.num_streams = math.ceil(
                num_channels / self.channels_per_stream)
        else:
            # No channels at all; keep a single (empty) stream
            self.num_streams = 1
        logger.info(
            'Creating %d spead2 streams to send data for %d channels',
            self.num_streams, num_channels)

        # (first channel, channel count) per stream. The last stream only
        # gets the remaining channels when num_channels is not a multiple of
        # channels_per_stream, so that its payload matches the slice of
        # visibilities it is given in send().
        self._channel_ranges = []
        for i in range(self.num_streams):
            first_chan = i * self.channels_per_stream
            chan_count = min(self.channels_per_stream,
                             num_channels - first_chan)
            self._channel_ranges.append((first_chan, chan_count))

        # Each stream uses a separate ItemGroup because Heaps created out of
        # ItemGroups can point to memory held by the ItemGroup; and since we
        # want different heaps sent through each of the streams we then need
        # independent ItemGroups
        self.payloads = [Spead2SenderPayload(num_baselines, chan_count)
                         for _, chan_count in self._channel_ranges]

        # Create the streams; they still share a single I/O threadpool
        thread_pool = spead2.ThreadPool(threads=self.sender_threads)
        config = self.config
        target_host = config.get('target_host', '127.0.0.1')
        target_port = int(config.get('target_port_start', 41000))
        for i in range(self.num_streams):
            port = target_port + i
            logger.info("Sending to %s:%d", target_host, port)
            stream = create_stream(
                thread_pool, target_host, port, self.stream_config,
                self._loop
            )
            self.streams.append(stream)

    async def _send_heaps(self, heaps):
        """
        Sends ``heaps[i]`` through ``self.streams[i]`` concurrently and adds
        the values returned by the send operations to ``bytes_sent``.

        :param heaps: one heap per stream, in stream order
        """
        assert len(heaps) == len(self.streams)
        send_operations = [stream.async_send_heap(heap)
                           for heap, stream in zip(heaps, self.streams)]
        results = await asyncio.gather(*send_operations)
        self.bytes_sent += sum(results)

    async def send(self, ts, ts_fraction, vis):
        """
        Send a visibility set through all SPEAD2 streams

        The start-of-stream heaps are sent first if this is the first call.

        :param ts: the integer part of the visibilities' timestamp
        :param ts_fraction: the fractional part of the visibilities' timestamp
        :param vis: the visibilities; sliced along the first axis into one
         channel range per stream
        """
        if not self._start_heap_sent:
            await self._send_heaps([payload.get_start_heap()
                                    for payload in self.payloads])
            self._start_heap_sent = True

        logger.debug('Sending heaps to %d spead2 streams', len(self.streams))
        assert len(self.payloads) == len(self.streams)
        heaps = []
        for payload, (first_chan, chan_count) in zip(self.payloads,
                                                     self._channel_ranges):
            payload.timestamp_count = ts
            payload.timestamp_fraction = ts_fraction
            payload.visibilities = vis[first_chan:first_chan + chan_count]
            payload.channel_id = first_chan
            payload.channel_count = chan_count
            heaps.append(payload.get_heap())
        await self._send_heaps(heaps)

    async def close(self):
        """Sends the end-of-stream message"""
        await self._send_heaps([payload.get_end_heap()
                                for payload in self.payloads])

    async def __aenter__(self):
        return self

    async def __aexit__(self, ext_type, exc, tb):
        await self.close()