saturnin.lib.data.filter¶
Saturnin base class for data filter microservices
Globals¶
- saturnin.lib.data.filter.INPUT_PIPE_CHN: Final[str] = 'input-pipe'¶
Pipe INPUT channel & endpoint name
Classes¶
- class saturnin.lib.data.filter.DataFilterConfig(name: str)[source]¶
Bases:
ComponentConfig
Base data filter microservice configuration.
- Parameters:
name (str) – Configuration section name.
- validate() None [source]¶
Extended validation: pipe_format is required when pipe_mode is CONNECT.
- Return type:
None
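The validation rule above can be illustrated with a minimal, library-independent sketch. The `SocketMode` enum and `PipeSettings` container below are stand-ins invented for this example (they are not the Saturnin option classes), and raising `ValueError` is an assumption about how a failed check might be reported.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SocketMode(Enum):
    """Stand-in for the pipe mode enumeration (member names are assumptions)."""
    BIND = "bind"
    CONNECT = "connect"


@dataclass
class PipeSettings:
    """Hypothetical container for the mode and format of one pipe."""
    mode: SocketMode
    data_format: Optional[str] = None


def validate_pipe(label: str, pipe: PipeSettings) -> None:
    """Raise ValueError when a CONNECT pipe has no data format set."""
    if pipe.mode is SocketMode.CONNECT and not pipe.data_format:
        raise ValueError(f"'{label}_pipe_format' is required for CONNECT pipe mode")


# A BIND pipe may omit the format; a CONNECT pipe must provide one.
validate_pipe("input", PipeSettings(SocketMode.BIND))
validate_pipe("output", PipeSettings(SocketMode.CONNECT, "text/plain"))
```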
- input_batch_size: IntOption¶
Input Pipe Data batch size
- input_pipe: StrOption¶
Input Data Pipe Identification
- input_pipe_address: ZMQAddressOption¶
Input Data Pipe endpoint address
- input_pipe_format: MIMEOption¶
Input Pipe data format specification
- input_pipe_mode: EnumOption¶
Input Data Pipe Mode
- input_ready_schedule_interval: IntOption¶
Input Pipe READY message schedule interval in milliseconds
- output_batch_size: IntOption¶
Output Pipe Data batch size
- output_pipe: StrOption¶
Output Data Pipe Identification
- output_pipe_address: ZMQAddressOption¶
Output Data Pipe endpoint address
- output_pipe_format: MIMEOption¶
Output Pipe data format specification
- output_pipe_mode: EnumOption¶
Output Data Pipe Mode
- output_ready_schedule_interval: IntOption¶
Output Pipe READY message schedule interval in milliseconds
- propagate_input_error: BoolOption¶
When input pipe is closed with error, close output with error as well
- class saturnin.lib.data.filter.DataFilterMicro(*args, **kwargs)[source]¶
Bases:
MicroService
Base data filter microservice.
Descendant classes should override:
handle_input_accept_client to validate client request and acquire resources associated with input pipe.
handle_output_accept_client to validate client request and acquire resources associated with output pipe.
handle_output_produce_data to produce data for outgoing DATA message.
handle_input_accept_data to process received data.
handle_input_pipe_closed to release resources associated with input pipe.
handle_output_pipe_closed to release resources associated with output pipe.
- __init__(zmq_context: Context, descriptor: ServiceDescriptor, *, peer_uid: UUID = None)[source]¶
- Parameters:
zmq_context (Context) – ZeroMQ Context.
descriptor (ServiceDescriptor) – Service descriptor.
peer_uid (UUID) – Peer ID. None means that a newly generated type 1 UUID should be used.
- aquire_resources() None [source]¶
Acquire resources required by component:
Connects the wake PUSH channel
Connects to input and output data pipes (if necessary)
- Return type:
None
- finish_input_processing(channel: Channel, session: FBDPSession, code: ErrorCode) None [source]¶
Called when input pipe is closed while output pipe will remain open.
When code is ErrorCode.OK, the input was closed normally. Otherwise it indicates the type of problem that caused the input to be closed.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
code (ErrorCode) – Input pipe closing ErrorCode.
- Return type:
None
Note
The default implementation does nothing.
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None [source]¶
Event handler called by handle_msg() on exception in message handler.
- Parameters:
channel (Channel)
session (Session)
msg (Message)
exc (Exception)
- Return type:
None
Sets service outcome to ERROR and notes exception as details.
- handle_input_accept_client(channel: Channel, session: FBDPSession) None [source]¶
Event handler executed when client connects to INPUT data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.
If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call super as first action.
- handle_input_accept_data(channel: Channel, session: FBDPSession, data: bytes) None [source]¶
Event handler executed to process data received in DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
data (bytes) – Data received from client.
- Return type:
None
Important
Any output data produced by event handler must be stored into output queue via store_output() method.
The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Note
The ACK-REQUEST in received DATA message is handled automatically by protocol.
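As a rough illustration of the contract described above, the following sketch processes incoming data, stores results through store_output(), and cancels the transmission by raising StopError. It does not import Saturnin: `StopError` and `LineLengthFilter` are stand-ins written for this example, the error code is a plain string placeholder rather than a real ErrorCode member, and the channel and session arguments are unused.

```python
from collections import deque


class StopError(Exception):
    """Stand-in for the library's StopError; carries a `code` attribute."""

    def __init__(self, message: str, code: str = "OK") -> None:
        super().__init__(message)
        self.code = code


class LineLengthFilter:
    """Toy filter (not a DataFilterMicro subclass) keeping short UTF-8 lines."""

    def __init__(self, max_len: int) -> None:
        self.max_len = max_len
        self.output: deque = deque()

    def store_output(self, data) -> None:
        # The real method is documented to also send a wake notification.
        self.output.append(data)

    def handle_input_accept_data(self, channel, session, data: bytes) -> None:
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError as exc:
            # Cancel the transmission; "INVALID_DATA" is a placeholder code.
            raise StopError("Input is not valid UTF-8", code="INVALID_DATA") from exc
        if len(text) <= self.max_len:
            # Data that passes the filter goes to the output queue.
            self.store_output(text)
```

Note that the override does not call the base implementation, in line with the requirement stated above.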
- handle_input_get_data(channel: Channel, session: FBDPSession) bool [source]¶
Event handler executed to query the service whether it is ready to accept input data.
- Parameters:
channel (Channel) – Channel associated with input data pipe.
session (FBDPSession) – Session associated with server.
- Returns:
True if output pipe is open, otherwise False.
- Return type:
bool
- handle_input_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) None [source]¶
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception) – Exception that caused the error.
- Return type:
None
Important
The base implementation performs the following actions:
If exception is provided, sets service execution outcome to ERROR and notes exception in details.
Closes the output pipe if it’s still open and closing flag is False, and if the input is not closed normally and propagate_input_error is True.
Sets the signal to stop the service.
The descendant class that overrides this method must call super.
- handle_input_schedule_ready(channel: Channel, session: FBDPSession) None [source]¶
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
The base implementation schedules resend_ready() according to input_ready_schedule_interval configuration option.
- handle_output_accept_client(channel: Channel, session: FBDPSession) None [source]¶
Event handler executed when client connects to OUTPUT data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.
If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call super as first action.
- handle_output_get_data(channel: Channel, session: FBDPSession) bool [source]¶
Event handler executed to query the data source for data availability.
- Parameters:
channel (Channel) – Channel associated with output data pipe.
session (FBDPSession) – Session associated with connection.
- Returns:
True if output deque contains any data.
- Return type:
bool
Cancels the transmission by raising the StopError if there is no output data and the input pipe is closed.
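The decision described above can be sketched as a small standalone function. This is an illustration only: `StopError` is a stand-in class, `output_has_data` and its `input_closed` argument are hypothetical names, and using an "OK" code for the end-of-data case is an assumption.

```python
from collections import deque


class StopError(Exception):
    """Stand-in for the library's StopError; carries a `code` attribute."""

    def __init__(self, message: str, code: str = "OK") -> None:
        super().__init__(message)
        self.code = code


def output_has_data(output: deque, input_closed: bool) -> bool:
    """Report data availability, or stop when no more data can arrive."""
    if output:
        return True
    if input_closed:
        # Nothing queued and nothing more will come from the input pipe.
        raise StopError("No more data", code="OK")
    # Queue is empty for now, but the input pipe may still deliver data.
    return False
```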
- handle_output_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) None [source]¶
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception) – Exception that caused the error.
- Return type:
None
Important
The base implementation performs the following actions:
If exception is provided, sets service execution outcome to ERROR and notes exception in details.
Closes the input pipe if it’s still open and closing flag is False.
Sets the signal to stop the service.
The descendant class that overrides this method must call super.
- handle_output_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None [source]¶
Event handler executed to store data into outgoing DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – DATA message that will be sent to client.
- Return type:
None
Important
The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.
The event handler must popleft() data from output queue and store them in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
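A minimal sketch of the producing side, assuming stand-in types: `FakeDataMessage` models only the data_frame attribute named above, `StopError` is a local stand-in, and `produce_data` is a hypothetical free function rather than the actual method. Raising StopError on an empty queue is a defensive assumption, not documented behavior.

```python
from collections import deque
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeDataMessage:
    """Stand-in for the DATA message; only `data_frame` is modelled."""
    data_frame: Any = None


class StopError(Exception):
    """Stand-in for the library's StopError; carries a `code` attribute."""

    def __init__(self, message: str, code: str = "OK") -> None:
        super().__init__(message)
        self.code = code


def produce_data(output: deque, msg: FakeDataMessage) -> None:
    """Move the oldest queued item into the outgoing message."""
    if not output:
        # Assumed guard: nothing to send, so end the transmission.
        raise StopError("No data to send", code="OK")
    msg.data_frame = output.popleft()
```

Taking items with popleft() preserves the order in which they were stored, so the output pipe sees data in the same sequence the input handler produced it.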
- handle_output_schedule_ready(channel: Channel, session: FBDPSession) None [source]¶
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
Important
The base implementation schedules resend_ready() according to output_ready_schedule_interval configuration option.
- handle_wake_msg(channel: Channel, session: Session, msg: Message) None [source]¶
Handler for “data available” pings sent via wake channels.
- initialize(config: DataFilterConfig) None [source]¶
Verify configuration and assemble component structural parts.
- Parameters:
config (DataFilterConfig) – Service configuration
- Return type:
None
- release_resources() None [source]¶
Release resources acquired by component:
Disconnect the wake PUSH channel
Close all active data input sessions
Close all active data output sessions
- Return type:
None
- store_batch_output(batch: List) None [source]¶
Store batch of data to output queue and send wake notification.
- Parameters:
batch (List) – Data to be stored to output queue.
- Return type:
None
- store_output(data: Any) None [source]¶
Store data to output queue and send wake notification.
- Parameters:
data (Any) – Data to be stored to output queue.
- Return type:
None
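The relationship between the two store methods and the wake notification might look roughly like the sketch below. `OutputQueue` is a hypothetical class, the wake notification is modelled as a plain callback instead of a ZeroMQ PUSH channel, and sending a single notification per batch is an assumption drawn from the wording of the descriptions above.

```python
from collections import deque
from typing import Any, Callable, List


class OutputQueue:
    """Toy model of an output queue with a wake-up callback."""

    def __init__(self, wake: Callable[[], None]) -> None:
        self.output: deque = deque()
        self._wake = wake  # stands in for a message on the wake channel

    def store_output(self, data: Any) -> None:
        """Append one item and signal that data is available."""
        self.output.append(data)
        self._wake()

    def store_batch_output(self, batch: List) -> None:
        """Append all items in order, then signal once (assumed)."""
        self.output.extend(batch)
        self._wake()
```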
- input_pipe_address: ZMQAddress¶
[Configuration] Data Pipe endpoint address
- input_pipe_format: MIME¶
[Configuration] Pipe data format specification
- input_pipe_mode: SocketMode¶
[Configuration] Data Pipe Mode
- input_protocol: FBDPServer | FBDPClient¶
FBDP protocol handler (server or client) for input pipe
- input_ready_schedule_interval: int¶
[Configuration] pipe READY message schedule interval in milliseconds
- output: deque¶
Data to be sent to output.
- output_pipe_address: ZMQAddress¶
[Configuration] Data Pipe endpoint address
- output_pipe_format: MIME¶
[Configuration] Pipe data format specification
- output_pipe_mode: SocketMode¶
[Configuration] Data Pipe Mode
- output_protocol: FBDPServer | FBDPClient¶
FBDP protocol handler (server or client) for output pipe
- output_ready_schedule_interval: int¶
[Configuration] pipe READY message schedule interval in milliseconds
- propagate_input_error: bool¶
[Configuration] When input pipe is closed with error, close output with error as well
- wake_address: ZMQAddress¶
Internal AWAKE address
- wake_in_chn: PullChannel¶
Internal AWAKE input channel
- wake_out_chn: PushChannel¶
Internal AWAKE output channel