Source code for cbf_sdp.receivers.spead2_receivers

# -*- coding: utf-8 -*-
"""
UDP Protocol Multi-stream SPEAD2 receiver
"""
import asyncio
import logging

import spead2.recv.asyncio

from cbf_sdp import consumers, icd


# True when the installed spead2 configures receive streams through
# StreamConfig/RingStreamConfig objects (see create_stream). That API arrived
# with major version 3 and is kept by later major versions, so the check must
# be ">= 3": an equality test would send spead2 4.x and newer down the legacy
# keyword-argument path.
IS_SPEAD3 = int(spead2.__version__.split('.')[0]) >= 3

# Module-level logger named after this module
logger = logging.getLogger(__name__)


def create_stream(io_thread_pool, ring_heaps, max_heaps, loop):
    """
    Build an asyncio spead2 receive stream for the installed spead2 flavour.

    :param io_thread_pool: thread pool handed to the stream constructor
    :param ring_heaps: capacity, in heaps, requested for the stream's ring
    :param max_heaps: value forwarded as the stream's ``max_heaps`` setting
    :param loop: event loop; only forwarded when ``IS_SPEAD3`` is false
    :return: a new ``spead2.recv.asyncio.Stream``
    """
    if not IS_SPEAD3:
        # Legacy constructor: everything is passed as plain keyword arguments
        return spead2.recv.asyncio.Stream(
            io_thread_pool,
            ring_heaps=ring_heaps,
            max_heaps=max_heaps,
            loop=loop,
        )

    # Newer constructor: settings travel inside dedicated config objects
    stream_config = spead2.recv.StreamConfig(max_heaps=max_heaps)
    ring_config = spead2.recv.RingStreamConfig(heaps=ring_heaps)
    return spead2.recv.asyncio.Stream(
        io_thread_pool, config=stream_config, ring_config=ring_config
    )

class Spead2ReceiverPayload(icd.Payload):
    """A Payload that updates itself from data coming from spead2 heaps"""

    def __init__(self):
        # A bare ``super()`` only creates a proxy object; the base-class
        # initialiser has to be invoked explicitly for it to run.
        super().__init__()
        # Item group that accumulates the items carried by each received heap
        self._item_group = spead2.ItemGroup()

    def update(self, heap):
        """
        Refresh this payload's fields from the contents of a heap.

        The heap is merged into the internal item group, and every payload
        attribute is then re-read from the item group's current values.

        :param heap: the heap passed to ``spead2.ItemGroup.update``
        """
        ig = self._item_group
        ig.update(heap)

        # We could perform here all kinds of checks to verify that the heap
        # conforms to the ICD standard, but we choose not to for the time being
        self.baseline_count = ig['visibility_baseline_count'].value
        self.channel_count = ig['visibility_channel_count'].value
        self.channel_id = ig['visibility_channel_id'].value
        self.hardware_id = ig['visibility_hardware_id'].value
        self.phase_bin_id = ig['visibility_phase_bin_id'].value
        self.phase_bin_count = ig['visibility_phase_bin_count'].value
        self.polarisation_id = ig['visibility_polarisation_id'].value
        self.scan_id = ig['scan_id'].value
        self.timestamp_count = ig['visibility_timestamp_count'].value
        self.timestamp_fraction = ig['visibility_timestamp_fraction'].value

        # The correlator output item is indexed by field name for its parts
        corr_out_data = ig['correlator_output_data'].value
        self.time_centroid_indices = corr_out_data['TCI']
        self.correlated_data_fraction = corr_out_data['FD']
        self.visibilities = corr_out_data['VIS']
# TODO: Replace all the following code with the desired functionality for the package
class receiver:
    """
    SPEAD2 receiver

    This class uses the spead2 library to receive multiple streams, each
    using a single UDP reader. As heaps are received they are given to a
    single consumer.
    """

    def __init__(self, config, tm, loop):
        """
        Create the consumer and one receive stream per group of channels.

        :param config: configuration mapping; must contain the ``reception``
            and ``transmission`` sections
        :param tm: object providing ``num_channels``; it is also handed to
            ``consumers.create``
        :param loop: event loop forwarded to ``create_stream``
        :raises AssertionError: if a required configuration section is missing
        """
        assert 'reception' in config, "Configuration does not contain reception details"
        assert 'transmission' in config, "Configuration does not contain transmission details"
        self.loop = loop
        # Defined up front so the counter can be read before run() is awaited
        self.num_heaps = 0
        self.consumer = consumers.create(config, tm)

        # A missing or zero setting means "all channels on a single stream"
        channels_per_stream = int(config['transmission'].get('channels_per_stream', 0))
        if channels_per_stream == 0:
            channels_per_stream = tm.num_channels
        num_streams = tm.num_channels // channels_per_stream
        logger.info('Creating stream with %d UDP readers to receive data for %d channels',
                    num_streams, channels_per_stream)
        self.streams = self._setup_streams(num_streams, config, loop)

    def _setup_streams(self, num_streams, config, loop):
        """
        Create ``num_streams`` streams listening on consecutive UDP ports.

        Settings are read from ``config['reception']``: ``receiver_port_start``
        (default 41000), ``bind_hostname`` (default ``''``), ``ring_heaps``
        (default 16) and ``receiver_threads`` (default 1).

        :return: list of ``(stream, payload)`` tuples, one per stream
        """
        reception = config['reception']
        recv_port = int(reception.get('receiver_port_start', 41000))
        bind_hostname = reception.get('bind_hostname', '')
        ring_heaps = int(reception.get('ring_heaps', 16))
        receiver_threads = int(reception.get('receiver_threads', 1))

        # One thread pool is shared by every stream
        io_thread_pool = spead2.ThreadPool(threads=receiver_threads)
        streams = []
        for i in range(num_streams):
            stream = create_stream(io_thread_pool, ring_heaps, 32, loop)
            port = recv_port + i
            stream.add_udp_reader(port, bind_hostname=bind_hostname)
            logger.info('Started udp_reader on port %d', port)
            # Each stream gets its own payload object to update
            streams.append((stream, Spead2ReceiverPayload()))
        return streams

    async def run(self):
        """Receive all heaps, passing them to the consumer"""
        self.num_heaps = 0
        tasks = [self._process_stream_heaps(s, payload)
                 for s, payload in self.streams]
        await asyncio.gather(*tasks)
        logger.info('Received %d heaps', self.num_heaps)

    async def _process_stream_heaps(self, stream, payload):
        """
        Consume every heap arriving on ``stream``.

        Start-of-stream heaps are skipped. Every other heap is counted,
        merged into ``payload``, and the payload is handed to the consumer.
        """
        async for heap in stream:
            if heap.is_start_of_stream():
                continue
            self.num_heaps += 1
            payload.update(heap)
            await self.consumer.consume(payload)