# -*- coding: utf-8 -*-
"""
UDP Protocol Multi-stream SPEAD2 receiver
"""
import asyncio
import logging
import spead2.recv.asyncio
from cbf_sdp import consumers
logger = logging.getLogger(__name__)
# TODO: Replace all the following code with the desired functionality for the package
[docs]class receiver:
"""
SPEAD2 receiver
This class uses the spead2 library to receive a multiple number of streams,
each using a single UDP reader. As heaps are received they are given to a
single consumer.
"""
def __init__(self, config, tm, loop):
assert 'reception' in config, "Configuration does not contain reception details"
assert 'transmission' in config,"Configuration does not contain transmission details"
self.loop = loop
self.consumer = consumers.create(config, tm)
channels_per_stream = int(config['transmission'].get('channels_per_stream', 0))
if channels_per_stream == 0:
channels_per_stream = tm.num_channels
num_streams = tm.num_channels // channels_per_stream
logger.info('Creating stream with %d UDP readers to receive data for %d channels',
num_streams, channels_per_stream)
self.streams = self._setup_streams(num_streams, config, loop)
def _setup_streams(self, num_streams, config, loop):
recv_port = int(config['reception'].get('receiver_port_start', 41000))
bind_hostname = config['reception'].get('bind_hostname', '')
ring_heaps = int(config['reception'].get('ring_heaps', 16))
receiver_threads = int(config['reception'].get('receiver_threads', 1))
io_thread_pool = spead2.ThreadPool(threads=receiver_threads)
streams = []
for i in range(num_streams):
stream = spead2.recv.asyncio.Stream(io_thread_pool,
ring_heaps=ring_heaps,
max_heaps=32, loop=loop)
port = recv_port + i
stream.add_udp_reader(port, bind_hostname=bind_hostname)
logger.info('Started udp_reader on port %d', port)
streams.append(stream)
return streams
[docs] async def run(self):
"""Receive all heaps, passing them to the consumer"""
self.num_heaps = 0
tasks = [self._process_stream_heaps(s) for s in self.streams]
await asyncio.gather(*tasks)
logger.info(f'Received {self.num_heaps} heaps')
async def _process_stream_heaps(self, stream):
async for heap in stream:
if heap.is_start_of_stream():
continue
self.num_heaps += 1
await self.consumer.consume(heap)