# -*- coding: utf-8 -*-
"""
UDP Protocol Multi-stream SPEAD2 receiver
"""
import asyncio
import logging
import spead2.recv.asyncio
from cbf_sdp import consumers, icd
# True when the installed spead2 exposes the StreamConfig / RingStreamConfig
# based constructor API (introduced with major version 3). The comparison is
# ``>=`` rather than ``==`` so that later major releases, which keep that API,
# do not fall back to the legacy keyword arguments.
IS_SPEAD3 = int(spead2.__version__.split('.')[0]) >= 3

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
def create_stream(io_thread_pool, ring_heaps, max_heaps, loop):
    """
    Build a ``spead2.recv.asyncio.Stream`` using the constructor arguments
    that match the installed spead2 flavour.

    :param io_thread_pool: thread pool handed to the stream as its first
        positional argument
    :param ring_heaps: forwarded as ``RingStreamConfig(heaps=...)`` when
        ``IS_SPEAD3`` is set, otherwise as the ``ring_heaps`` keyword
    :param max_heaps: forwarded as ``StreamConfig(max_heaps=...)`` when
        ``IS_SPEAD3`` is set, otherwise as the ``max_heaps`` keyword
    :param loop: event loop; only passed on when ``IS_SPEAD3`` is not set
    :return: the newly created stream
    """
    if not IS_SPEAD3:
        # Legacy API: everything is given directly as keyword arguments
        return spead2.recv.asyncio.Stream(
            io_thread_pool,
            ring_heaps=ring_heaps,
            max_heaps=max_heaps,
            loop=loop,
        )

    # Config-object API: settings are wrapped in dedicated config classes
    stream_config = spead2.recv.StreamConfig(max_heaps=max_heaps)
    ring_stream_config = spead2.recv.RingStreamConfig(heaps=ring_heaps)
    return spead2.recv.asyncio.Stream(
        io_thread_pool,
        config=stream_config,
        ring_config=ring_stream_config,
    )
class Spead2ReceiverPayload(icd.Payload):
    """A Payload that updates itself from data coming from spead2 heaps"""

    def __init__(self):
        # A bare ``super()`` only creates a proxy object and runs nothing;
        # the base-class initialiser must be called explicitly.
        super().__init__()
        # Item group that accumulates the item state across heaps
        self._item_group = spead2.ItemGroup()

    def update(self, heap):
        """
        Refresh this payload's fields from the given heap.

        The heap is first applied to the internal item group, and then every
        payload field is re-read from the item group.

        :param heap: the spead2 heap to apply to the internal item group
        """
        ig = self._item_group
        ig.update(heap)

        # We could perform here all kinds of checks to verify that the heap
        # conforms to the ICD standard, but we choose not to for the time being
        self.baseline_count = ig['visibility_baseline_count'].value
        self.channel_count = ig['visibility_channel_count'].value
        self.channel_id = ig['visibility_channel_id'].value
        self.hardware_id = ig['visibility_hardware_id'].value
        self.phase_bin_id = ig['visibility_phase_bin_id'].value
        self.phase_bin_count = ig['visibility_phase_bin_count'].value
        self.polarisation_id = ig['visibility_polarisation_id'].value
        self.scan_id = ig['scan_id'].value
        self.timestamp_count = ig['visibility_timestamp_count'].value
        self.timestamp_fraction = ig['visibility_timestamp_fraction'].value

        # The correlator output is a single item indexed by field name
        corr_out_data = ig['correlator_output_data'].value
        self.time_centroid_indices = corr_out_data['TCI']
        self.correlated_data_fraction = corr_out_data['FD']
        self.visibilities = corr_out_data['VIS']
# TODO: Replace all the following code with the desired functionality for the package
class receiver:
    """
    SPEAD2 receiver

    This class uses the spead2 library to receive multiple streams, each
    using a single UDP reader. As heaps are received they are given to a
    single consumer.
    """

    def __init__(self, config, tm, loop):
        """
        :param config: configuration mapping; must contain the ``reception``
            and ``transmission`` sections
        :param tm: object providing ``num_channels``; also handed to
            ``consumers.create``
        :param loop: event loop forwarded to the stream factory
        """
        assert 'reception' in config, "Configuration does not contain reception details"
        assert 'transmission' in config,"Configuration does not contain transmission details"
        self.loop = loop
        # Initialised here (and reset by ``run``) so the counter exists even
        # before ``run`` has been called.
        self.num_heaps = 0
        self.consumer = consumers.create(config, tm)

        # A missing or zero ``channels_per_stream`` means "all channels in
        # one stream"
        channels_per_stream = int(config['transmission'].get('channels_per_stream', 0))
        if channels_per_stream == 0:
            channels_per_stream = tm.num_channels
        # NOTE: integer division; a channels_per_stream larger than
        # tm.num_channels results in zero streams being created.
        num_streams = tm.num_channels // channels_per_stream
        logger.info('Creating stream with %d UDP readers to receive data for %d channels',
                    num_streams, channels_per_stream)
        self.streams = self._setup_streams(num_streams, config, loop)

    def _setup_streams(self, num_streams, config, loop):
        """
        Create ``num_streams`` streams, each with one UDP reader bound to
        consecutive ports starting at ``receiver_port_start``.

        :return: list of ``(stream, payload)`` pairs, one per stream
        """
        reception = config['reception']
        recv_port = int(reception.get('receiver_port_start', 41000))
        bind_hostname = reception.get('bind_hostname', '')
        ring_heaps = int(reception.get('ring_heaps', 16))
        receiver_threads = int(reception.get('receiver_threads', 1))

        # A single thread pool is shared by all streams
        io_thread_pool = spead2.ThreadPool(threads=receiver_threads)
        streams = []
        for i in range(num_streams):
            # max_heaps is fixed at 32 for every stream
            stream = create_stream(io_thread_pool, ring_heaps, 32, loop)
            port = recv_port + i
            stream.add_udp_reader(port, bind_hostname=bind_hostname)
            logger.info('Started udp_reader on port %d', port)
            # Each stream gets its own payload object, reused for every heap
            streams.append((stream, Spead2ReceiverPayload()))
        return streams

    async def run(self):
        """Receive all heaps, passing them to the consumer"""
        self.num_heaps = 0
        tasks = [self._process_stream_heaps(s, payload) for s, payload in self.streams]
        await asyncio.gather(*tasks)
        logger.info('Received %d heaps', self.num_heaps)

    async def _process_stream_heaps(self, stream, payload):
        """
        Consume every heap of ``stream``: start-of-stream heaps are skipped,
        all others update ``payload``, which is then given to the consumer.
        """
        async for heap in stream:
            if heap.is_start_of_stream():
                continue
            self.num_heaps += 1
            payload.update(heap)
            await self.consumer.consume(payload)