Payload Consumers

Once a payload has been received and decoded, the receiver passes it to a consumer. Consumers are a simple mechanism for decoupling data reception from any further data processing.

The cbf-sdp-emulator package currently comes with two built-in consumers, but arbitrary consumers can be used as well. The consumer to use is chosen through the receiver configuration (see the reception.consumer option).

mswriter

The mswriter consumer, as its name suggests, writes incoming payloads into a Measurement Set. If payloads are missing, the resulting Measurement Set will still contain the corresponding rows, but with invalid data.

plasma_writer

The plasma_writer consumer puts the incoming payloads into a shared plasma store using the sdp-dal-prototype. The sdp-dal-prototype implements an RPC-like API; from its standpoint our plasma_writer is a Caller, and each payload written into plasma represents a remote method invocation. To demonstrate the full cycle of writing data into plasma and reading it back, we also provide a corresponding Processor under cbf_sdp.plasma_processor.SimpleProcessor, which plays the role of the callee. Upon invocation, this processor takes incoming payloads and writes them into a Measurement Set, similarly to how mswriter does.

Adding Custom Consumers

Third-party consumers are also supported, and users can provide them within their own code bases. Consumers are implemented as classes with the following methods:

  • An __init__(self, config, tm) method for initialization. The config parameter contains the full receiver configuration dictionary, as loaded from the command line and the configuration file. The tm parameter is an instance of cbf_sdp.utils.FakeTM containing most metadata about the observation.
  • An async def consume(self, payload) method for payload consumption. The payload parameter is an instance of cbf_sdp.icd.Payload. Note that this is a coroutine, so potentially long-running tasks should be handed off to an executor to avoid blocking the event loop.
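
The two methods above can be sketched as follows. This is a minimal illustration only, not code from the package: the class name CountingConsumer, the helper _slow_store and the driver main are hypothetical, and plain strings with an empty dictionary stand in for real cbf_sdp.icd.Payload, configuration and cbf_sdp.utils.FakeTM objects so that the example runs without the package installed. It shows one way to push blocking work onto the default executor so that consume does not stall the event loop.

```python
import asyncio
import time


class CountingConsumer:
    """Hypothetical third-party consumer that records every payload it sees."""

    def __init__(self, config, tm):
        # config: the receiver configuration; tm: observation metadata.
        # Both are only stored here; a real consumer would read its
        # own options from them.
        self.config = config
        self.tm = tm
        self.processed = []

    def _slow_store(self, payload):
        # Stand-in for blocking work such as writing to disk.
        time.sleep(0.01)
        self.processed.append(payload)

    async def consume(self, payload):
        # Run the blocking step in the default thread pool executor so
        # the event loop stays free to receive further payloads.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._slow_store, payload)


async def main():
    # Stand-ins (assumptions): an empty dict for the configuration,
    # None for the TM object, and strings for payloads.
    consumer = CountingConsumer(config={}, tm=None)
    for payload in ("payload-0", "payload-1"):
        await consumer.consume(payload)
    return consumer.processed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each call to consume is awaited before the next one starts in this driver, payloads are stored in arrival order. A consumer that lets several executor tasks run concurrently would need to handle ordering itself.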