saturnin.lib.data.onepipe¶
Saturnin base classes for data provider and consumer microservices using the FBDP protocol.
This module provides abstract base classes (BaseDataPipeMicro, DataProviderMicro,
DataConsumerMicro) and their associated configurations (BaseDataPipeConfig,
DataProviderConfig, DataConsumerConfig) to simplify the creation of
microservices that act as either producers or consumers of data over a
Saturnin data pipe. These classes handle much of the common boilerplate
for FBDP communication, allowing developers to focus on the specific
data handling logic.
Globals¶
Classes¶
- class saturnin.lib.data.onepipe.BaseDataPipeConfig(name: str)[source]¶
Bases:
ComponentConfigBase data provider/consumer microservice configuration.
- Parameters:
name (str)
- validate() None[source]¶
Extended validation.
pipe_formatis required for CONNECTpipe_mode.
- Return type:
None
- pipe_address: ZMQAddressOption¶
Data Pipe endpoint address
- pipe_format: MIMEOption¶
Pipe data format specification
- pipe_mode: EnumOption¶
Data Pipe Mode
- stop_on_close: BoolOption¶
Stop service when pipe is closed
- class saturnin.lib.data.onepipe.DataProviderConfig(name: str)[source]¶
Bases:
BaseDataPipeConfigBase data provider microservice configuration.
- Parameters:
name (str)
- class saturnin.lib.data.onepipe.DataConsumerConfig(name: str)[source]¶
Bases:
BaseDataPipeConfigBase data consumer microservice configuration.
- Parameters:
name (str)
- class saturnin.lib.data.onepipe.BaseDataPipeMicro(*args, **kwargs)[source]¶
Bases:
MicroServiceBase data provider/consumer microservice.
Abstract base class for both providers and consumers. It handles the common FBDP logic.
Descendant classes should override:
handle_accept_clientto validate client request and acquire resources associated with the pipe.handle_produce_datato produce data for outgoing DATA message (PRODUCER only).handle_accept_datato process received data (CONSUMER only).handle_pipe_closedto release resources associated with the pipe.
- Parameters:
zmq_context – ZeroMQ Context.
descriptor – Service descriptor.
peer_uid – Peer ID,
Nonemeans that newly generated UUID type 1 should be used.
- aquire_resources() None[source]¶
Acquire resources required by the component.
Specifically:
If
pipe_modeisCONNECT, it connects to the data pipe endpoint and initiates the FBDPOPENhandshake. The client socket type (INPUT/OUTPUT) is determined as the inverse ofserver_socket.- Return type:
None
- handle_accept_client(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed when client connects to the data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes
pipe,socket,data_formatandparamscontain information sent by client, and the event handler validates the request.If request should be rejected, it raises the
StopErrorexception withcodeattribute containing theErrorCodeto be returned in CLOSE message.Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call
superas first action.
- handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]¶
Event handler executed to process data received in DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
data (bytes) – Data received from client.
- Return type:
None
The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned in CLOSE message.Note
The ACK-REQUEST in received DATA message is handled automatically by protocol.
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]¶
Event handler called by
handle_msg()on exception in message handler.- Parameters:
- Return type:
None
Sets service outcome to ERROR and notes exception as details.
- handle_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception | None = None) None[source]¶
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception | None) – Exception that caused the error.
- Return type:
None
Important
The base implementation does only two actions:
If exception is provided, sets service execution outcome to ERROR and notes exception in details.
Stops the service if
stop_on_closeis True.
The descendant class that overrides this method must call
super.
- handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Event handler executed to store data into outgoing DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – DATA message that will be sent to client.
- Return type:
None
The event handler must store the data in
msg.data_frameattribute. It may also set ACK-REQUEST flag andtype_dataattribute.The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned in CLOSE message.Note
To indicate end of data, raise StopError with ErrorCode.OK code.
Exceptions are handled by protocol, but only StopError is checked for protocol ErrorCode. As we want to report INVALID_DATA properly, we have to convert UnicodeError into StopError.
- handle_schedule_ready(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned in CLOSE message.Important
The base implementation schedules
resend_ready()according toready_schedule_intervalconfiguration option.
- initialize(config: BaseDataPipeConfig) None[source]¶
Verify configuration and assemble component structural parts.
Sets up the service based on the provided configuration.
Creates the appropriate FBDP protocol handler (
FBDPServerforBIND,FBDPClientforCONNECT).Registers default event handlers for various FBDP events (e.g.,
on_accept_client,on_pipe_closed,on_produce_data,on_accept_data).Creates a
DealerChannelfor communication over the pipe.
- Parameters:
config (BaseDataPipeConfig)
- Return type:
None
- release_resources() None[source]¶
Release resources aquired by component:
Specifically:
Sends an FBDP
CLOSEmessage (indicating an error) to all active pipe sessions.- Return type:
None
- pipe_address: ZMQAddress¶
[Configuration] Data Pipe endpoint address
- pipe_mode: SocketMode¶
[Configuration] Data Pipe Mode
- protocol: FBDPServer | FBDPClient¶
FDBP protocol handler (server or client) This object manages the FBDP state machine and message handling.
- server_socket: PipeSocket¶
Pipe socket this service handles if operated as server (bind). Must be set in descendant class. For PROVIDER it’s
PipeSocket.OUTPUT, for CONSUMER it’sPipeSocket.INPUT
- class saturnin.lib.data.onepipe.DataProviderMicro(*args, **kwargs)[source]¶
Bases:
BaseDataPipeMicroBase data provider microservice (PRODUCER).
This class specializes
BaseDataPipeMicrofor services that produce data and send it over an FBDP pipe.Descendant classes should override:
handle_accept_clientto validate client requests and acquire resources associated with the pipe.handle_produce_datato generate and provide the data for outgoingDATAmessages.handle_pipe_closedto release resources associated with the pipe.
- initialize(config: DataProviderConfig) None[source]¶
Verify configuration and assemble component structural parts.
- Parameters:
config (DataProviderConfig)
- Return type:
None
- class saturnin.lib.data.onepipe.DataConsumerMicro(*args, **kwargs)[source]¶
Bases:
BaseDataPipeMicroBase data provider microservice.
Descendant classes should override:
handle_accept_clientto validate client request and aquire resources associated with pipe.handle_accept_datato process received data.handle_pipe_closedto release resource assiciated with pipe.
- initialize(config: DataConsumerConfig) None[source]¶
Verify configuration and assemble component structural parts.
- Parameters:
config (DataConsumerConfig)
- Return type:
None