saturnin.lib.data.filter¶
Saturnin base class for data filter microservices.
This module provides DataFilterMicro and its configuration DataFilterConfig,
designed for microservices that read data from an input FBDP data pipe,
process or transform it, and then write the results to an output FBDP
data pipe. It manages the complexities of handling two concurrent pipe
connections and synchronizing data flow between them.
Globals¶
- saturnin.lib.data.filter.INPUT_PIPE_CHN: Final[str] = 'input-pipe'¶
Pipe INPUT channel & endpoint name
Classes¶
- class saturnin.lib.data.filter.DataFilterConfig(name: str)[source]¶
Bases: ComponentConfig
Configuration for data filter microservices.
This class defines settings specific to microservices that operate as data filters, managing an input pipe and an output pipe.
- Parameters:
name (str) – Configuration section name for this component.
- validate() None[source]¶
Extended validation.
Ensures that:
- input_pipe_format is specified if input_pipe_mode is CONNECT.
- output_pipe_format is specified if output_pipe_mode is CONNECT.
- Return type:
None
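The rule enforced by validate() can be sketched in isolation. The snippet below is a minimal illustration and does not use the Saturnin classes: SocketMode, PipeSettings and check_pipe_formats are hypothetical stand-ins, and the real method presumably signals a failed check by raising an error rather than returning a list.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional


class SocketMode(Enum):
    """Stand-in for the pipe mode enumeration (values are illustrative)."""
    BIND = 1
    CONNECT = 2


@dataclass
class PipeSettings:
    """Hypothetical holder for the four options that the check inspects."""
    input_pipe_mode: SocketMode
    input_pipe_format: Optional[str]
    output_pipe_mode: SocketMode
    output_pipe_format: Optional[str]


def check_pipe_formats(cfg: PipeSettings) -> List[str]:
    """Return a list of problems; an empty list means the settings pass."""
    problems = []
    for side in ("input", "output"):
        mode = getattr(cfg, f"{side}_pipe_mode")
        fmt = getattr(cfg, f"{side}_pipe_format")
        # A connecting pipe must state its data format up front.
        if mode is SocketMode.CONNECT and not fmt:
            problems.append(f"'{side}_pipe_format' is required when {side}_pipe_mode is CONNECT")
    return problems
```

A pipe in BIND mode passes the check without a format, while each CONNECT pipe without a format adds one entry to the returned list.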
- input_pipe_address: ZMQAddressOption¶
Input Data Pipe endpoint address
- input_pipe_format: MIMEOption¶
Input Pipe data format specification
- input_pipe_mode: EnumOption¶
Input Data Pipe Mode
- input_ready_schedule_interval: IntOption¶
Input Pipe READY message schedule interval in milliseconds
- output_pipe_address: ZMQAddressOption¶
Output Data Pipe endpoint address
- output_pipe_format: MIMEOption¶
Output Pipe data format specification
- output_pipe_mode: EnumOption¶
Output Data Pipe Mode
- output_ready_schedule_interval: IntOption¶
Output Pipe READY message schedule interval in milliseconds
- propagate_input_error: BoolOption¶
When input pipe is closed with error, close output with error as well
- class saturnin.lib.data.filter.DataFilterMicro(*args, **kwargs)[source]¶
Bases: MicroService
Base class for data filter microservices.
This microservice reads data from an input pipe, processes it, and writes the results to an output pipe. It manages two FBDP connections and uses an internal “wake” mechanism to signal data availability for the output pipe.
Descendant classes should override:
- handle_input_accept_client to validate client requests and acquire resources associated with the input pipe.
- handle_output_accept_client to validate client requests and acquire resources associated with the output pipe.
- handle_output_produce_data to provide the processed data for outgoing DATA messages on the output pipe.
- handle_input_accept_data to process data received from the input pipe.
- handle_input_pipe_closed to release resources associated with the input pipe.
- handle_output_pipe_closed to release resources associated with the output pipe.
- Parameters:
zmq_context – ZeroMQ Context.
descriptor – Service descriptor.
peer_uid – Peer ID. None means that a newly generated type 1 UUID should be used.
- aquire_resources() None[source]¶
Acquire resources required by the component.
This involves:
- Connecting the internal “wake” PUSH channel to its PULL counterpart.
- If input_pipe_mode is CONNECT, establishes a connection to the input data pipe and initiates the FBDP OPEN handshake.
- If output_pipe_mode is CONNECT, establishes a connection to the output data pipe and initiates the FBDP OPEN handshake.
- Return type:
None
- finish_input_processing(channel: Channel, session: FBDPSession, code: ErrorCode) None[source]¶
Called when input pipe is closed while output pipe will remain open.
When code is ErrorCode.OK, the input was closed normally. Otherwise it indicates the type of problem that caused the input to be closed.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
code (ErrorCode) – Input pipe closing ErrorCode.
- Return type:
None
Note
The default implementation does nothing.
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]¶
Event handler called by handle_msg() on exception in message handler.
- Parameters:
- Return type:
None
Sets service outcome to ERROR and notes exception as details.
- handle_input_accept_client(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed when client connects to INPUT data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.
If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call super as first action.
- handle_input_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]¶
Event handler executed to process data received in DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
data (bytes) – Data received from client.
- Return type:
None
Important
Any output data produced by event handler must be stored into output queue via store_output() method.
The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Note
The ACK-REQUEST in received DATA message is handled automatically by protocol.
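The contract described above (transform the data, queue the result with store_output(), cancel by raising StopError) can be sketched without real pipes. Everything in the snippet is a stand-in written for illustration: ErrorCode, StopError and UpperCaseFilter only mimic the documented behavior, and the INVALID_DATA member name is an assumption.

```python
from collections import deque
from enum import Enum


class ErrorCode(Enum):
    """Stand-in for the FBDP error codes (member names are assumptions)."""
    OK = 0
    INVALID_DATA = 1


class StopError(Exception):
    """Stand-in for the exception that cancels a transmission."""
    def __init__(self, message: str, code: ErrorCode) -> None:
        super().__init__(message)
        self.code = code


class UpperCaseFilter:
    """Hypothetical filter; a real one would derive from DataFilterMicro."""
    def __init__(self) -> None:
        self.output = deque()

    def store_output(self, data) -> None:
        # The documented method also sends a wake notification; omitted here.
        self.output.append(data)

    def handle_input_accept_data(self, channel, session, data: bytes) -> None:
        # No super() call: the documented base implementation only raises StopError.
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError as exc:
            # Cancel the transmission; the code would be returned in the CLOSE message.
            raise StopError("Input is not valid UTF-8", code=ErrorCode.INVALID_DATA) from exc
        self.store_output(text.upper().encode("utf-8"))
```

Valid input ends up in the output queue in transformed form, while undecodable input leaves the queue untouched and raises StopError with the chosen code.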
- handle_input_get_data(channel: Channel, session: FBDPSession) bool[source]¶
Event handler executed to query the service whether it is ready to accept input data.
- Parameters:
channel (Channel) – Channel associated with input data pipe.
session (FBDPSession) – Session associated with server.
- Return type:
bool
Returns True if the output pipe is open, otherwise False.
- handle_input_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception | None = None) None[source]¶
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception | None) – Exception that caused the error.
- Return type:
None
Important
The base implementation performs the following actions:
- If exception is provided, sets service execution outcome to ERROR and notes exception in details.
- Closes the output pipe if it’s still open and closing flag is False, and if the input is not closed normally and propagate_input_error is True.
- Sets the signal to stop the service.
The descendant class that overrides this method must call super.
- handle_input_schedule_ready(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
The base implementation schedules resend_ready() according to input_ready_schedule_interval configuration option.
- handle_output_accept_client(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed when client connects to OUTPUT data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.
If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call super as first action.
- handle_output_get_data(channel: Channel, session: FBDPSession) bool[source]¶
Event handler executed to query the data source for data availability.
- Parameters:
channel (Channel) – Channel associated with output data pipe.
session (FBDPSession) – Session associated with connection.
- Return type:
bool
Returns True if output deque contains any data.
Cancels the transmission by raising the StopError if there are no output data and input pipe is closed.
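The three possible outcomes of this query can be illustrated with a small stand-alone function. It is a sketch of the documented behavior only: output_data_available, the ErrorCode and StopError stand-ins, and the choice of the OK code for the cancellation are assumptions, not the library's implementation.

```python
from collections import deque
from enum import Enum


class ErrorCode(Enum):
    """Stand-in with a single illustrative member."""
    OK = 0


class StopError(Exception):
    """Stand-in for the exception that cancels a transmission."""
    def __init__(self, message: str, code: ErrorCode) -> None:
        super().__init__(message)
        self.code = code


def output_data_available(output: deque, input_open: bool) -> bool:
    """Report whether a DATA message can be produced right now."""
    if output:
        # Buffered data is sent even if the input pipe has already closed.
        return True
    if not input_open:
        # Nothing buffered and nothing more can arrive: end the transmission.
        # Using the OK code here is an assumption.
        raise StopError("No more data", code=ErrorCode.OK)
    # Empty for now, but the input pipe may still deliver data.
    return False
```

The function returns True while data is buffered, False while the queue is empty but the input is still open, and raises StopError once the queue is drained and the input is closed.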
- handle_output_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception | None = None) None[source]¶
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception | None) – Exception that caused the error.
- Return type:
None
Important
The base implementation performs the following actions:
- If exception is provided, sets service execution outcome to ERROR and notes exception in details.
- Closes the input pipe if it’s still open and closing flag is False.
- Sets the signal to stop the service.
The descendant class that overrides this method must call super.
- handle_output_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Event handler executed to store data into outgoing DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – DATA message that will be sent to client.
- Return type:
None
Important
The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.
The event handler must popleft() data from output queue and store them in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
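A minimal sketch of the output side follows. QueueDrainingFilter is a hypothetical stand-in for a DataFilterMicro descendant, and a SimpleNamespace object stands in for the FBDPMessage. The sketch assumes the handler is only invoked after handle_output_get_data() has reported available data, so the queue is not empty.

```python
from collections import deque
from types import SimpleNamespace


class QueueDrainingFilter:
    """Hypothetical filter; a real one would derive from DataFilterMicro."""
    def __init__(self) -> None:
        self.output = deque()

    def handle_output_produce_data(self, channel, session, msg) -> None:
        # No super() call: the documented base implementation only raises StopError.
        # Take the oldest buffered item (FIFO order) and place it into the DATA message.
        msg.data_frame = self.output.popleft()


# Usage with a stand-in message object.
demo = QueueDrainingFilter()
demo.output.extend([b"first", b"second"])
demo_msg = SimpleNamespace(data_frame=None)
demo.handle_output_produce_data(None, None, demo_msg)
```

After the call, demo_msg.data_frame holds b"first" and only b"second" remains queued, so items leave the filter in the order they were stored.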
- handle_output_schedule_ready(channel: Channel, session: FBDPSession) None[source]¶
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
The base implementation schedules resend_ready() according to output_ready_schedule_interval configuration option.
- handle_wake_msg(channel: Channel, session: Session, msg: Message) None[source]¶
Handler for “data available” pings sent via wake channels.
- initialize(config: DataFilterConfig) None[source]¶
Verify configuration and assemble component structural parts.
- Parameters:
config (DataFilterConfig) – Service configuration
- Return type:
None
- release_resources() None[source]¶
Release resources acquired by the component.
This involves:
- Discarding the session for the internal “wake” PUSH channel.
- Sending an FBDP CLOSE message (indicating an error) to all active sessions on the input data pipe.
- Sending an FBDP CLOSE message (indicating an error) to all active sessions on the output data pipe.
- Return type:
None
- store_batch_output(batch: list) None[source]¶
Store batch of data to output queue and send wake notification.
- Parameters:
batch (list) – Data to be stored to output queue.
- Return type:
None
- store_output(data: Any) None[source]¶
Store data to output queue and send wake notification.
- Parameters:
data (Any) – Data to be stored to output queue.
- Return type:
None
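The difference between the two storage methods can be sketched with a plain deque and a callable that stands in for the wake channel. OutputBuffer and its wake argument are hypothetical; sending a single notification per batch is an assumption based on the singular wording of the description.

```python
from collections import deque
from typing import Any, Callable


class OutputBuffer:
    """Hypothetical model of the output queue and its wake notification."""
    def __init__(self, wake: Callable[[], None]) -> None:
        self.output: deque = deque()
        self._wake = wake  # stand-in for a ping sent over the wake channel

    def store_output(self, data: Any) -> None:
        # Queue one item, then signal that data is available.
        self.output.append(data)
        self._wake()

    def store_batch_output(self, batch: list) -> None:
        # Queue all items in order, then signal once (assumption).
        self.output.extend(batch)
        self._wake()
```

Storing a batch avoids one notification per item when a single input message produces several output items.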
- input_pipe_address: ZMQAddress¶
[Configuration] Data Pipe endpoint address
- input_pipe_mode: SocketMode¶
[Configuration] Data Pipe Mode
- input_protocol: FBDPServer | FBDPClient¶
FBDP protocol handler (server or client) for input pipe
- input_ready_schedule_interval: int¶
[Configuration] pipe READY message schedule interval in milliseconds
- output: deque¶
Internal deque to buffer data processed from the input pipe, pending transmission on the output pipe.
- output_pipe_address: ZMQAddress¶
[Configuration] Data Pipe endpoint address
- output_pipe_mode: SocketMode¶
[Configuration] Data Pipe Mode
- output_protocol: FBDPServer | FBDPClient¶
FBDP protocol handler (server or client) for output pipe
- output_ready_schedule_interval: int¶
[Configuration] pipe READY message schedule interval in milliseconds
- propagate_input_error: bool¶
[Configuration] When input pipe is closed with error, close output with error as well
- wake_address: ZMQAddress¶
Internal AWAKE address
- wake_in_chn: PullChannel¶
Internal AWAKE input channel
- wake_out_chn: PushChannel¶
Internal AWAKE output channel