saturnin.lib.data.filter

Saturnin base class for data filter microservices

Globals

saturnin.lib.data.filter.INPUT_PIPE_CHN: Final[str] = 'input-pipe'

Pipe INPUT channel & endpoint name

saturnin.lib.data.filter.OUTPUT_PIPE_CHN: Final[str] = 'output-pipe'

Pipe OUTPUT channel & endpoint name

saturnin.lib.data.filter.WAKE_PUSH_CHN: Final[str] = 'wake-push'

Wake PUSH channel & endpoint name

saturnin.lib.data.filter.WAKE_PULL_CHN: Final[str] = 'wake-pull'

Wake PULL channel & endpoint name

Classes

class saturnin.lib.data.filter.DataFilterConfig(name: str)[source]

Bases: ComponentConfig

Base data provider microservice configuration.

Parameters:

name (str) – Conf. section name.

validate() None[source]

Extended validation.

  • pipe_format is required for CONNECT pipe_mode.

Return type:

None

input_batch_size: IntOption

Input Pipe Data batch size

input_pipe: StrOption

Input Data Pipe Identification

input_pipe_address: ZMQAddressOption

Input Data Pipe endpoint address

input_pipe_format: MIMEOption

Input Pipe data format specification

input_pipe_mode: EnumOption

Input Data Pipe Mode

input_ready_schedule_interval: IntOption

Input Pipe READY message schedule interval in milliseconds

output_batch_size: IntOption

Output Pipe Data batch size

output_pipe: StrOption

Output Data Pipe Identification

output_pipe_address: ZMQAddressOption

Output Data Pipe endpoint address

output_pipe_format: MIMEOption

Output Pipe data format specification

output_pipe_mode: EnumOption

Output Data Pipe Mode

output_ready_schedule_interval: IntOption

Output Pipe READY message schedule interval in milliseconds

propagate_input_error: BoolOption

When input pipe is closed with error, close output with error as well

class saturnin.lib.data.filter.DataFilterMicro(*args, **kwargs)[source]

Bases: MicroService

Base data provider microservice.

Descendant classes should override:

__init__(zmq_context: Context, descriptor: ServiceDescriptor, *, peer_uid: UUID = None)[source]
Parameters:
  • zmq_context (Context) – ZeroMQ Context.

  • descriptor (ServiceDescriptor) – Service descriptor.

  • peer_uid (UUID) – Peer ID, None means that newly generated UUID type 1 should be used.

aquire_resources() None[source]

Aquire resources required by component:

  1. Connect wake PUSH channel

  2. Connects to input and output data pipes (if necessary).

Return type:

None

finish_input_processing(channel: Channel, session: FBDPSession, code: ErrorCode) None[source]

Called when input pipe is closed while output pipe will remain open.

When code is ErrorCode.OK, the input was closed normally. Otherwise it indicates the type of problem that caused the input to be closed.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • code (ErrorCode) – Input pipe closing ErrorCode.

Return type:

None

Note

The default implementation does nothing.

handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]

Event handler called by handle_msg() on exception in message handler.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (Session) – Session associated with connection.

  • msg (Message) – Message.

  • exc (Exception) – Exception.

Return type:

None

Sets service outcome to ERROR and notes exception as details.

handle_input_accept_client(channel: Channel, session: FBDPSession) None[source]

Event handler executed when client connects to INPUT data pipe via OPEN message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.

If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).

The descendant class that overrides this method must call super as first action.

handle_input_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]

Event handler executed to process data received in DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • data (bytes) – Data received from client.

Return type:

None

Important

Any output data produced by event handler must be stored into output queue via store_output() method.

The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

The ACK-REQUEST in received DATA message is handled automatically by protocol.

handle_input_get_data(channel: Channel, session: FBDPSession) bool[source]

Event handler executed to query the service whether is ready to accept input data.

Parameters:
  • channel (Channel) – Channel associated with input data pipe.

  • session (FBDPSession) – Session associated with server.

Return type:

bool

Returns True if output pipe is open, otherwise false.

handle_input_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) None[source]

Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – Received/sent CLOSE message.

  • exc (Exception) – Exception that caused the error.

Return type:

None

Important

The base implementation does next actions:

  • If exception is provided, sets service execution outcome to ERROR and notes exception in details.

  • Closes the output pipe it’s still open and closing flag is False, and if the input is not closed normally and propagate_input_error is True.

  • Sets the signal to stop the service.

The descendant class that overrides this method must call super.

handle_input_schedule_ready(channel: Channel, session: FBDPSession) None[source]

Event handler executed in order to send the READY message to the client later.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

The base implementation schedules resend_ready() according to input_ready_schedule_interval configuration option.

handle_output_accept_client(channel: Channel, session: FBDPSession) None[source]

Event handler executed when client connects to OUTPUT data pipe via OPEN message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.

If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).

The descendant class that overrides this method must call super as first action.

handle_output_get_data(channel: Channel, session: FBDPSession) bool[source]

Event handler executed to query the data source for data availability.

Parameters:
  • channel (Channel) – Channel associated with output data pipe.

  • session (FBDPSession) – Session associated with connection.

Return type:

bool

Returns True if output deque contains any data.

Cancels the transmission by raising the StopError if there are no output data and input pipe is closed.

handle_output_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) None[source]

Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – Received/sent CLOSE message.

  • exc (Exception) – Exception that caused the error.

Return type:

None

Important

The base implementation does next actions:

  • If exception is provided, sets service execution outcome to ERROR and notes exception in details.

  • Closes the input pipe if it’s still open and closing flag is False.

  • Sets the signal to stop the service.

The descendant class that overrides this method must call super.

handle_output_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Event handler executed to store data into outgoing DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • msg (FBDPMessage) – DATA message that will be sent to client.

Return type:

None

Important

The base implementation simply raises StopError with OK code, so the descendant class must override this method without super call.

The event handler must popleft() data from output queue and store them in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

To indicate end of data, raise StopError with OK code.

handle_output_schedule_ready(channel: Channel, session: FBDPSession) None[source]

Event handler executed in order to send the READY message to the client later.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

The base implementation schedules resend_ready() according to output_ready_schedule_interval configuration option.

handle_wake_msg(channel: Channel, session: Session, msg: Message) None[source]

Handler for “data available” pings sent via wake channels.

Parameters:
  • channel (Channel) – Channel associated with wake delivery.

  • session (Session) – Session associated with client.

  • msg (Message) – Wake message.

Return type:

None

initialize(config: DataFilterConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (DataFilterConfig) – Service configuration

Return type:

None

release_resources() None[source]

Release resources aquired by component:

  1. Disconnect the wake PUSH channel

  2. Close all active data input sessions

  3. Close all active data output sessions

Return type:

None

store_batch_output(batch: List) None[source]

Store batch of data to output queue and send wake notification.

Parameters:

batch (List) – Data to be stored to output queue.

Return type:

None

store_output(data: Any) None[source]

Store data to output queue and send wake notification.

Parameters:

data (Any) – Data to be stored to output queue.

Return type:

None

closing: bool

Closing flag

input_batch_size: int

[Configuration] Data batch size

input_pipe: str

[Configuration] Data Pipe Identification

input_pipe_address: ZMQAddress

[Configuration] Data Pipe endpoint address

input_pipe_format: MIME

[Configuration] Pipe data format specification

input_pipe_mode: SocketMode

[Configuration] Data Pipe Mode

input_protocol: FBDPServer | FBDPClient

FDBP protocol handler (server or client) for input pipe

input_ready_schedule_interval: int

[Configuration] pipe READY message schedule interval in milliseconds

output: deque

Data to be sent to output.

output_batch_size: int

[Configuration] Data batch size

output_pipe: str

[Configuration] Data Pipe Identification

output_pipe_address: ZMQAddress

[Configuration] Data Pipe endpoint address

output_pipe_format: MIME

[Configuration] Pipe data format specification

output_pipe_mode: SocketMode

[Configuration] Data Pipe Mode

output_protocol: FBDPServer | FBDPClient

FDBP protocol handler (server or client) for output pipe

output_ready_schedule_interval: int

[Configuration] pipe READY message schedule interval in milliseconds

pipe_in_chn: Channel

Input pipe channel

pipe_out_chn: Channel

Output pipe channel

propagate_input_error: bool

[Configuration] When input pipe is closed with error, close output with error as well

wake_address: ZMQAddress

Internal AWAKE address

wake_in_chn: PullChannel

Internal AWAKE input channel

wake_out_chn: PushChannel

Internal AWAKE output channel