saturnin.lib.data.onepipe

Saturnin base classes for data provider and consumer microservices using the FBDP protocol.

This module provides abstract base classes (BaseDataPipeMicro, DataProviderMicro, DataConsumerMicro) and their associated configurations (BaseDataPipeConfig, DataProviderConfig, DataConsumerConfig) to simplify the creation of microservices that act as either producers or consumers of data over a Saturnin data pipe. These classes handle much of the common boilerplate for FBDP communication, allowing developers to focus on the specific data handling logic.

Globals

saturnin.lib.data.onepipe.PIPE_CHN: Final[str] = 'pipe'

Channel & endpoint name

Classes

class saturnin.lib.data.onepipe.BaseDataPipeConfig(name: str)[source]

Bases: ComponentConfig

Base data provider/consumer microservice configuration.

Parameters:

name (str)

validate() None[source]

Extended validation.

Return type:

None

batch_size: IntOption

Data batch size

pipe: StrOption

Data Pipe Identification

pipe_address: ZMQAddressOption

Data Pipe endpoint address

pipe_format: MIMEOption

Pipe data format specification

pipe_mode: EnumOption

Data Pipe Mode

ready_schedule_interval: IntOption

READY message schedule interval in milliseconds

stop_on_close: BoolOption

Stop service when pipe is closed

class saturnin.lib.data.onepipe.DataProviderConfig(name: str)[source]

Bases: BaseDataPipeConfig

Base data provider microservice configuration.

Parameters:

name (str)

class saturnin.lib.data.onepipe.DataConsumerConfig(name: str)[source]

Bases: BaseDataPipeConfig

Base data consumer microservice configuration.

Parameters:

name (str)

class saturnin.lib.data.onepipe.BaseDataPipeMicro(*args, **kwargs)[source]

Bases: MicroService

Base data provider/consumer microservice.

Abstract base class for both providers and consumers. It handles the common FBDP logic.

Descendant classes should override:

Parameters:
  • zmq_context – ZeroMQ Context.

  • descriptor – Service descriptor.

  • peer_uid – Peer ID, None means that newly generated UUID type 1 should be used.

aquire_resources() None[source]

Acquire resources required by the component.

Specifically:

If pipe_mode is CONNECT, it connects to the data pipe endpoint and initiates the FBDP OPEN handshake. The client socket type (INPUT/OUTPUT) is determined as the inverse of server_socket.

Return type:

None

handle_accept_client(channel: Channel, session: FBDPSession) None[source]

Event handler executed when client connects to the data pipe via OPEN message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.

If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).

The descendant class that overrides this method must call super as first action.

handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]

Event handler executed to process data received in DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • data (bytes) – Data received from client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

The ACK-REQUEST in received DATA message is handled automatically by protocol.

Important

The base implementation simply raises StopError with ErrorCode.OK code, so the descendant class must override this method without super call.

handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]

Event handler called by handle_msg() on exception in message handler.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (Session) – Session associated with connection.

  • msg (Message) – Message.

  • exc (Exception) – Exception.

Return type:

None

Sets service outcome to ERROR and notes exception as details.

handle_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception | None = None) None[source]

Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – Received/sent CLOSE message.

  • exc (Exception | None) – Exception that caused the error.

Return type:

None

Important

The base implementation does only two actions:

  • If exception is provided, sets service execution outcome to ERROR and notes exception in details.

  • Stops the service if stop_on_close is True.

The descendant class that overrides this method must call super.

handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Event handler executed to store data into outgoing DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • msg (FBDPMessage) – DATA message that will be sent to client.

Return type:

None

The event handler must store the data in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

To indicate end of data, raise StopError with ErrorCode.OK code.

Exceptions are handled by protocol, but only StopError is checked for protocol ErrorCode. As we want to report INVALID_DATA properly, we have to convert UnicodeError into StopError.

Important

The base implementation simply raises StopError with ErrorCode.OK code, so the descendant class must override this method without super call.

handle_schedule_ready(channel: Channel, session: FBDPSession) None[source]

Event handler executed in order to send the READY message to the client later.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

The base implementation schedules resend_ready() according to ready_schedule_interval configuration option.

initialize(config: BaseDataPipeConfig) None[source]

Verify configuration and assemble component structural parts.

  • Sets up the service based on the provided configuration.

  • Creates the appropriate FBDP protocol handler (FBDPServer for BIND, FBDPClient for CONNECT).

  • Registers default event handlers for various FBDP events (e.g., on_accept_client, on_pipe_closed, on_produce_data, on_accept_data).

  • Creates a DealerChannel for communication over the pipe.

Parameters:

config (BaseDataPipeConfig)

Return type:

None

release_resources() None[source]

Release resources aquired by component:

Specifically:

Sends an FBDP CLOSE message (indicating an error) to all active pipe sessions.

Return type:

None

batch_size: int

[Configuration] Data batch size

pipe: str

[Configuration] Data Pipe Identification

pipe_address: ZMQAddress

[Configuration] Data Pipe endpoint address

pipe_format: MIME

[Configuration] Pipe data format specification

pipe_mode: SocketMode

[Configuration] Data Pipe Mode

protocol: FBDPServer | FBDPClient

FDBP protocol handler (server or client) This object manages the FBDP state machine and message handling.

ready_schedule_interval: int

[Configuration] READY message schedule interval in milliseconds

server_socket: PipeSocket

Pipe socket this service handles if operated as server (bind). Must be set in descendant class. For PROVIDER it’s PipeSocket.OUTPUT, for CONSUMER it’s PipeSocket.INPUT

stop_on_close: bool

[Configuration] Whether service should stop when pipe is closed

class saturnin.lib.data.onepipe.DataProviderMicro(*args, **kwargs)[source]

Bases: BaseDataPipeMicro

Base data provider microservice (PRODUCER).

This class specializes BaseDataPipeMicro for services that produce data and send it over an FBDP pipe.

Descendant classes should override:

initialize(config: DataProviderConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (DataProviderConfig)

Return type:

None

class saturnin.lib.data.onepipe.DataConsumerMicro(*args, **kwargs)[source]

Bases: BaseDataPipeMicro

Base data provider microservice.

Descendant classes should override:

initialize(config: DataConsumerConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (DataConsumerConfig)

Return type:

None