Source code for cbf_sdp.receivers.spead2_receivers

# -*- coding: utf-8 -*-
"""
UDP Protocol Multi-stream SPEAD2 receiver
"""
import asyncio
import logging

import spead2.recv.asyncio

from cbf_sdp import consumers


logger = logging.getLogger(__name__)


# TODO: Replace all the following code with the desired functionality for the package
[docs]class receiver: """ SPEAD2 receiver This class uses the spead2 library to receive a multiple number of streams, each using a single UDP reader. As heaps are received they are given to a single consumer. """ def __init__(self, config, tm, loop): assert 'reception' in config, "Configuration does not contain reception details" assert 'transmission' in config,"Configuration does not contain transmission details" self.loop = loop self.consumer = consumers.create(config, tm) channels_per_stream = int(config['transmission'].get('channels_per_stream', 0)) if channels_per_stream == 0: channels_per_stream = tm.num_channels num_streams = tm.num_channels // channels_per_stream logger.info('Creating stream with %d UDP readers to receive data for %d channels', num_streams, channels_per_stream) self.streams = self._setup_streams(num_streams, config, loop) def _setup_streams(self, num_streams, config, loop): recv_port = int(config['reception'].get('receiver_port_start', 41000)) bind_hostname = config['reception'].get('bind_hostname', '') ring_heaps = int(config['reception'].get('ring_heaps', 16)) receiver_threads = int(config['reception'].get('receiver_threads', 1)) io_thread_pool = spead2.ThreadPool(threads=receiver_threads) streams = [] for i in range(num_streams): stream = spead2.recv.asyncio.Stream(io_thread_pool, ring_heaps=ring_heaps, max_heaps=32, loop=loop) port = recv_port + i stream.add_udp_reader(port, bind_hostname=bind_hostname) logger.info('Started udp_reader on port %d', port) streams.append(stream) return streams
[docs] async def run(self): """Receive all heaps, passing them to the consumer""" self.num_heaps = 0 tasks = [self._process_stream_heaps(s) for s in self.streams] await asyncio.gather(*tasks) logger.info(f'Received {self.num_heaps} heaps')
async def _process_stream_heaps(self, stream): async for heap in stream: if heap.is_start_of_stream(): continue self.num_heaps += 1 await self.consumer.consume(heap)