saturnin.lib.data.onepipe
Saturnin base classes for data provider and consumer microservices
This is extended description.
Globals
Classes
- class saturnin.lib.data.onepipe.BaseDataPipeConfig(name: str)
Bases:
ComponentConfig
Base data provider/consumer microservice configuration.
- Parameters:
name (str) –
- validate() → None
Extended validation. pipe_format is required for CONNECT pipe_mode.
- Return type:
None
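The validation rule can be illustrated with a minimal, self-contained sketch. PipeConfigSketch and its plain attributes are hypothetical stand-ins for the real option objects, and the SocketMode members are assumed; only the names pipe_mode, pipe_format and CONNECT come from the description above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SocketMode(Enum):
    """Stand-in for the pipe mode enumeration (members are assumptions)."""
    BIND = 1
    CONNECT = 2


@dataclass
class PipeConfigSketch:
    """Hypothetical, simplified config; the real class uses Option objects."""
    pipe_mode: SocketMode
    pipe_format: Optional[str] = None

    def validate(self) -> None:
        # pipe_format is required only when the service connects to the pipe.
        if self.pipe_mode is SocketMode.CONNECT and self.pipe_format is None:
            raise ValueError("'pipe_format' is required for CONNECT pipe mode")


# BIND mode passes without a format; CONNECT mode needs one.
PipeConfigSketch(SocketMode.BIND).validate()
PipeConfigSketch(SocketMode.CONNECT, "text/plain;charset=utf-8").validate()
```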
- batch_size: IntOption
Data batch size
- pipe: StrOption
Data Pipe Identification
- pipe_address: ZMQAddressOption
Data Pipe endpoint address
- pipe_format: MIMEOption
Pipe data format specification
- pipe_mode: EnumOption
Data Pipe Mode
- ready_schedule_interval: IntOption
READY message schedule interval in milliseconds
- stop_on_close: BoolOption
Stop service when pipe is closed
- class saturnin.lib.data.onepipe.DataProviderConfig(name: str)
Bases:
BaseDataPipeConfig
Base data provider microservice configuration.
- Parameters:
name (str) –
- class saturnin.lib.data.onepipe.DataConsumerConfig(name: str)
Bases:
BaseDataPipeConfig
Base data consumer microservice configuration.
- Parameters:
name (str) –
- class saturnin.lib.data.onepipe.BaseDataPipeMicro(*args, **kwargs)
Bases:
MicroService
Base data provider/consumer microservice.
Descendant classes should override:
handle_accept_client to validate the client request and acquire resources associated with the pipe.
handle_produce_data to produce data for the outgoing DATA message. PRODUCER only.
handle_accept_data to process received data. CONSUMER only.
handle_pipe_closed to release resources associated with the pipe.
- __init__(zmq_context: Context, descriptor: ServiceDescriptor, *, peer_uid: UUID = None)
- Parameters:
zmq_context (Context) – ZeroMQ Context.
descriptor (ServiceDescriptor) – Service descriptor.
peer_uid (UUID) – Peer ID. None means that a newly generated type 1 UUID should be used.
- aquire_resources() → None
Acquire resources required by component:
If pipe_mode is CONNECT, it will connect to the end of the pipe that is the inverse of server_socket.
- Return type:
None
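What "the inverse of server_socket" means can be sketched as follows. The PipeSocket enumeration here is a stand-in with assumed values, and opposite_end is a hypothetical helper that is not part of the library.

```python
from enum import IntEnum


class PipeSocket(IntEnum):
    """Stand-in for the pipe socket enumeration (values are assumptions)."""
    INPUT = 1
    OUTPUT = 2


def opposite_end(server_socket: PipeSocket) -> PipeSocket:
    """Return the pipe end a CONNECT-mode service attaches to (hypothetical helper)."""
    if server_socket is PipeSocket.OUTPUT:
        return PipeSocket.INPUT
    return PipeSocket.OUTPUT


# A provider serves PipeSocket.OUTPUT when it binds, so in CONNECT mode
# it attaches to the INPUT end of a pipe served by its peer.
provider_connects_to = opposite_end(PipeSocket.OUTPUT)
```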
- handle_accept_client(channel: Channel, session: FBDPSession) → None
Event handler executed when client connects to the data pipe via OPEN message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The session attributes pipe, socket, data_format and params contain information sent by the client, and the event handler validates the request.
If the request should be rejected, it raises the StopError exception with the code attribute containing the ErrorCode to be returned in the CLOSE message.
Important
Base implementation validates pipe identification and pipe socket, and converts data format from string to MIME (in session).
The descendant class that overrides this method must call super as its first action.
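A minimal sketch of such an override, assuming a service that accepts only plain text. To stay runnable on its own, it uses stand-ins: ErrorCode (with an assumed DATA_FORMAT_NOT_SUPPORTED member), StopError, SessionSketch and BaseSketch are simplified placeholders for the real classes, and data_format stays a plain string instead of a MIME value.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Stand-in for FBDP error codes (member and value are assumptions)."""
    DATA_FORMAT_NOT_SUPPORTED = 1


class StopError(Exception):
    """Stand-in for StopError; carries the ErrorCode in the code attribute."""
    def __init__(self, message: str, *, code: ErrorCode):
        super().__init__(message)
        self.code = code


class SessionSketch:
    """Hypothetical session holding only the attribute used here."""
    def __init__(self, data_format: str):
        self.data_format = data_format
        self.validated = False


class BaseSketch:
    """Stand-in for the base microservice class."""
    def handle_accept_client(self, channel, session) -> None:
        # The real base class validates pipe identification and socket here.
        session.validated = True


class TextOnlyService(BaseSketch):
    """Hypothetical descendant that rejects anything but text/plain."""
    def handle_accept_client(self, channel, session) -> None:
        # Call the base implementation first, as the documentation requires.
        super().handle_accept_client(channel, session)
        if not session.data_format.startswith("text/plain"):
            raise StopError("Only 'text/plain' is supported",
                            code=ErrorCode.DATA_FORMAT_NOT_SUPPORTED)
```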
- handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) → None
Event handler executed to process data received in DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
data (bytes) – Data received from client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with the code attribute containing the ErrorCode to be returned in the CLOSE message.
Note
The ACK-REQUEST in received DATA message is handled automatically by protocol.
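As an illustration, a consumer that collects UTF-8 text might look like the sketch below. ErrorCode and StopError are simplified stand-ins with assumed members, and TextCollector is a hypothetical class that does not derive from the real base class.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Stand-in for FBDP error codes (member and value are assumptions)."""
    INVALID_DATA = 1


class StopError(Exception):
    """Stand-in for StopError; carries the ErrorCode in the code attribute."""
    def __init__(self, message: str, *, code: ErrorCode):
        super().__init__(message)
        self.code = code


class TextCollector:
    """Hypothetical consumer that stores each received payload as text."""
    def __init__(self) -> None:
        self.chunks: list = []

    def handle_accept_data(self, channel, session, data: bytes) -> None:
        try:
            self.chunks.append(data.decode("utf-8"))
        except UnicodeError as exc:
            # Cancel the transmission with a protocol-level error code.
            raise StopError("Cannot decode received data",
                            code=ErrorCode.INVALID_DATA) from exc
```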
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) → None
Event handler called by handle_msg() on exception in message handler.
- Parameters:
channel (Channel) –
session (Session) –
msg (Message) –
exc (Exception) –
- Return type:
None
Sets service outcome to ERROR and notes exception as details.
- handle_pipe_closed(channel: Channel, session: FBDPSession, msg: FBDPMessage, exc: Exception = None) → None
Event handler executed when CLOSE message is received or sent, to release any resources associated with current transmission.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Received/sent CLOSE message.
exc (Exception) – Exception that caused the error.
- Return type:
None
Important
The base implementation performs only two actions:
If an exception is provided, it sets the service execution outcome to ERROR and notes the exception in details.
It stops the service if stop_on_close is True.
The descendant class that overrides this method must call super.
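The pattern of releasing a resource and then delegating to the base implementation can be sketched as follows. BaseSketch only mimics the two documented actions with plain attributes (outcome, details and stopped are assumptions), and FileService is a hypothetical descendant.

```python
import io


class BaseSketch:
    """Stand-in that mimics the two documented base actions."""
    def __init__(self, stop_on_close: bool = True):
        self.stop_on_close = stop_on_close
        self.outcome = "UNKNOWN"
        self.details = None
        self.stopped = False

    def handle_pipe_closed(self, channel, session, msg, exc=None) -> None:
        if exc is not None:
            # Record the failure as the service outcome.
            self.outcome = "ERROR"
            self.details = exc
        if self.stop_on_close:
            self.stopped = True


class FileService(BaseSketch):
    """Hypothetical descendant that owns one file-like resource per pipe."""
    def __init__(self, resource, **kwargs):
        super().__init__(**kwargs)
        self.resource = resource

    def handle_pipe_closed(self, channel, session, msg, exc=None) -> None:
        # Release the resource tied to the transmission, then delegate.
        if self.resource is not None:
            self.resource.close()
            self.resource = None
        super().handle_pipe_closed(channel, session, msg, exc)


buffer = io.BytesIO()
service = FileService(buffer)
service.handle_pipe_closed(None, None, None, RuntimeError("peer failed"))
```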
- handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) → None
Event handler executed to store data into outgoing DATA message.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – DATA message that will be sent to client.
- Return type:
None
The event handler must store the data in the msg.data_frame attribute. It may also set the ACK-REQUEST flag and the type_data attribute.
The event handler may cancel the transmission by raising the StopError exception with the code attribute containing the ErrorCode to be returned in the CLOSE message.
Note
To indicate end of data, raise StopError with ErrorCode.OK code.
Note
Exceptions are handled by protocol, but only StopError is checked for protocol ErrorCode. As we want to report INVALID_DATA properly, we have to convert UnicodeError into StopError.
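Both notes can be combined in one sketch: a provider that sends one text line per DATA message, signals end of data with the OK code, and converts UnicodeError into StopError. ErrorCode, StopError and MessageSketch are simplified stand-ins with assumed members, and LineProducer is a hypothetical class that does not derive from the real base class.

```python
from enum import IntEnum
from typing import Iterator


class ErrorCode(IntEnum):
    """Stand-in for FBDP error codes (values are assumptions)."""
    OK = 0
    INVALID_DATA = 1


class StopError(Exception):
    """Stand-in for StopError; carries the ErrorCode in the code attribute."""
    def __init__(self, message: str, *, code: ErrorCode):
        super().__init__(message)
        self.code = code


class MessageSketch:
    """Hypothetical DATA message with only the data_frame attribute."""
    def __init__(self) -> None:
        self.data_frame = b""


class LineProducer:
    """Hypothetical provider that sends one encoded line per DATA message."""
    def __init__(self, lines: Iterator[str], encoding: str = "ascii"):
        self._lines = lines
        self._encoding = encoding

    def handle_produce_data(self, channel, session, msg: MessageSketch) -> None:
        try:
            line = next(self._lines)
        except StopIteration:
            # End of data is signalled with the OK code.
            raise StopError("End of data", code=ErrorCode.OK) from None
        try:
            msg.data_frame = line.encode(self._encoding)
        except UnicodeError as exc:
            # Convert so that INVALID_DATA can be reported in CLOSE.
            raise StopError("Cannot encode data",
                            code=ErrorCode.INVALID_DATA) from exc
```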
- handle_schedule_ready(channel: Channel, session: FBDPSession) → None
Event handler executed in order to send the READY message to the client later.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
The event handler may cancel the transmission by raising the StopError exception with the code attribute containing the ErrorCode to be returned in the CLOSE message.
Important
The base implementation schedules resend_ready() according to the ready_schedule_interval configuration option.
- initialize(config: BaseDataPipeConfig) → None
Verify configuration and assemble component structural parts.
- Parameters:
config (BaseDataPipeConfig) –
- Return type:
None
- release_resources() → None
Release resources acquired by component:
CLOSE all active pipe sessions.
- Return type:
None
- pipe_address: ZMQAddress
[Configuration] Data Pipe endpoint address
- pipe_format: MIME
[Configuration] Pipe data format specification
- pipe_mode: SocketMode
[Configuration] Data Pipe Mode
- protocol: FBDPServer | FBDPClient
FBDP protocol handler (server or client)
- server_socket: PipeSocket
Pipe socket this service handles if operated as server (bind). Must be set in descendant class. For PROVIDER it’s PipeSocket.OUTPUT, for CONSUMER it’s PipeSocket.INPUT.
- class saturnin.lib.data.onepipe.DataProviderMicro(*args, **kwargs)
Bases:
BaseDataPipeMicro
Base data provider microservice.
Descendant classes should override:
handle_accept_client to validate the client request and acquire resources associated with the pipe.
handle_produce_data to produce data for the outgoing DATA message.
handle_pipe_closed to release resources associated with the pipe.
- initialize(config: DataProviderConfig) → None
Verify configuration and assemble component structural parts.
- Parameters:
config (DataProviderConfig) –
- Return type:
None
- class saturnin.lib.data.onepipe.DataConsumerMicro(*args, **kwargs)
Bases:
BaseDataPipeMicro
Base data consumer microservice.
Descendant classes should override:
handle_accept_client to validate the client request and acquire resources associated with the pipe.
handle_accept_data to process received data.
handle_pipe_closed to release resources associated with the pipe.
- initialize(config: DataConsumerConfig) → None
Verify configuration and assemble component structural parts.
- Parameters:
config (DataConsumerConfig) –
- Return type:
None