saturnin.lib.data.onepipe

Saturnin base classes for data provider and consumer microservices

This is extended description.

Globals

saturnin.lib.data.onepipe.PIPE_CHN: Final[str] = 'pipe'

Channel & endpoint name

Classes

class saturnin.lib.data.onepipe.BaseDataPipeConfig(name: str)[source]

Bases: ComponentConfig

Base data provider/consumer microservice configuration.

Parameters:

name (str) –

validate() None[source]

Extended validation.

Return type:

None

batch_size: IntOption

Data batch size

pipe: StrOption

Data Pipe Identification

pipe_address: ZMQAddressOption

Data Pipe endpoint address

pipe_format: MIMEOption

Pipe data format specification

pipe_mode: EnumOption

Data Pipe Mode

ready_schedule_interval: IntOption

READY message schedule interval in milliseconds

stop_on_close: BoolOption

Stop service when pipe is closed

class saturnin.lib.data.onepipe.DataProviderConfig(name: str)[source]

Bases: BaseDataPipeConfig

Base data provider microservice configuration.

Parameters:

name (str) –

class saturnin.lib.data.onepipe.DataConsumerConfig(name: str)[source]

Bases: BaseDataPipeConfig

Base data consumer microservice configuration.

Parameters:

name (str) –

class saturnin.lib.data.onepipe.BaseDataPipeMicro(*args, **kwargs)[source]

Bases: MicroService

Base data provider/consumer microservice.

Descendant classes should override:

__init__(zmq_context: Context, descriptor: ServiceDescriptor, *, peer_uid: UUID = None)[source]
Parameters:
  • zmq_context (Context) – ZeroMQ Context.

  • descriptor (ServiceDescriptor) – Service descriptor.

  • peer_uid (UUID) – Peer ID, None means that newly generated UUID type 1 should be used.

aquire_resources() None[source]

Aquire resources required by component:

  1. If pipe_mode is CONNECT, it will connect to the end of the pipe that is the inverse of server_socket.

Return type:

None

handle_accept_client(channel: Channel, session: FBDPSession) None[source]

Event handler executed when client connects to the data pipe via OPEN message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler validates the request.

If request should be rejected, it raises the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).

The descendant class that overrides this method must call super as first action.

handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]

Event handler executed to process data received in DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • data (bytes) – Data received from client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

The ACK-REQUEST in received DATA message is handled automatically by protocol.

Important

The base implementation simply raises StopError with ErrorCode.OK code, so the descendant class must override this method without super call.

handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]

Event handler called by handle_msg() on exception in message handler.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (Session) – Session associated with connection.

  • msg (Message) – Message.

  • exc (Exception) – Exception.

Return type:

None

Sets service outcome to ERROR and notes exception as details.

handle_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) None[source]

Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – Received/sent CLOSE message.

  • exc (Exception) – Exception that caused the error.

Return type:

None

Important

The base implementation does only two actions:

  • If exception is provided, sets service execution outcome to ERROR and notes exception in details.

  • Stops the service if stop_on_close is True.

The descendant class that overrides this method must call super.

handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Event handler executed to store data into outgoing DATA message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

  • msg (FBDPMessage) – DATA message that will be sent to client.

Return type:

None

The event handler must store the data in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

To indicate end of data, raise StopError with ErrorCode.OK code.

Note

Exceptions are handled by protocol, but only StopError is checked for protocol ErrorCode. As we want to report INVALID_DATA properly, we have to convert UnicodeError into StopError.

Important

The base implementation simply raises StopError with ErrorCode.OK code, so the descendant class must override this method without super call.

handle_schedule_ready(channel: Channel, session: FBDPSession) None[source]

Event handler executed in order to send the READY message to the client later.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Important

The base implementation schedules resend_ready() according to ready_schedule_interval configuration option.

initialize(config: BaseDataPipeConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (BaseDataPipeConfig) –

Return type:

None

release_resources() None[source]

Release resources aquired by component:

  1. CLOSE all active pipe sessions.

Return type:

None

batch_size: int

[Configuration] Data batch size

pipe: str

[Configuration] Data Pipe Identification

pipe_address: ZMQAddress

[Configuration] Data Pipe endpoint address

pipe_format: MIME

[Configuration] Pipe data format specification

pipe_mode: SocketMode

[Configuration] Data Pipe Mode

protocol: FBDPServer | FBDPClient

FDBP protocol handler (server or client)

ready_schedule_interval: int

[Configuration] READY message schedule interval in milliseconds

server_socket: PipeSocket

Pipe socket this service handles if operated as server (bind). Must be set in descendant class. For PROVIDER it’s PipeSocket.OUTPUT, for CONSUMER it’s PipeSocket.INPUT

stop_on_close: bool

[Configuration] Whether service should stop when pipe is closed

class saturnin.lib.data.onepipe.DataProviderMicro(*args, **kwargs)[source]

Bases: BaseDataPipeMicro

Base data provider microservice.

Descendant classes should override:

initialize(config: DataProviderConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (DataProviderConfig) –

Return type:

None

class saturnin.lib.data.onepipe.DataConsumerMicro(*args, **kwargs)[source]

Bases: BaseDataPipeMicro

Base data provider microservice.

Descendant classes should override:

initialize(config: DataConsumerConfig) None[source]

Verify configuration and assemble component structural parts.

Parameters:

config (DataConsumerConfig) –

Return type:

None