saturnin.protocol.fbdp¶
Saturnin reference implementation of Firebird Butler Data Pipe Protocol
See https://firebird-butler.readthedocs.io/en/latest/rfc/9/FBDP.html
Constants¶
- saturnin.protocol.fbdp.PROTO_OPEN: Final[str] = 'firebird.butler.FBDPOpenDataframe'¶
Protobuf message for FBDP OPEN message
- saturnin.protocol.fbdp.PROTO_ERROR: Final[str] = 'firebird.butler.ErrorDescription'¶
Protobuf message for FBDP ERROR message
- saturnin.protocol.fbdp.HEADER_FMT_FULL: Final[str] = '!4sBBH'¶
FBDP protocol control frame
structformat
Enums¶
- class saturnin.protocol.fbdp.MsgType(value)[source]¶
Bases:
IntEnumFBDP Message Type
- CLOSE = 5¶
- DATA = 4¶
- NOOP = 3¶
- OPEN = 1¶
- READY = 2¶
- UNKNOWN = 0¶
- class saturnin.protocol.fbdp.MsgFlag(value)[source]¶
Bases:
IntFlagFBDP message flag
- _generate_next_value_(start, count, last_values)¶
Generate the next value when not given.
name: the name of the member start: the initial start value or None count: the number of existing members last_values: the last value assigned or None
- ACK_REPLY = 2¶
- ACK_REQ = 1¶
- NONE = 0¶
- class saturnin.protocol.fbdp.ErrorCode(value)[source]¶
Bases:
IntEnumFBDP Error Code
- DATA_FORMAT_NOT_SUPPORTED = 103¶
- ERROR = 3¶
- FBDP_VERSION_NOT_SUPPORTED = 101¶
- INTERNAL_ERROR = 4¶
- INVALID_DATA = 5¶
- INVALID_MESSAGE = 1¶
- NOT_IMPLEMENTED = 102¶
- OK = 0¶
- PIPE_ENDPOINT_UNAVAILABLE = 100¶
- PROTOCOL_VIOLATION = 2¶
- TIMEOUT = 6¶
Classes¶
- class saturnin.protocol.fbdp.FBDPMessage[source]¶
Bases:
MessageFirebird Butler Datapipe Protocol (FBDP) Message.
- as_zmsg() TZMQMessage[source]¶
Returns message as sequence of ZMQ data frames.
- Returns:
A list of ZMQ frames. The first frame is the packed FBDP header generated by
get_header. It is followed by any message-specific data frames (e.g., serialized protobuf forOPEN, raw data forDATA, serializedErrorDescriptionprotobufs forCLOSE).- Return type:
- clear_flag(flag: MsgFlag) None[source]¶
Clear flag specified by
flagmask.- Parameters:
flag (MsgFlag)
- Return type:
None
- copy() Message[source]¶
Returns copy of the message.
- Returns:
A new
FBDPMessageinstance with a deep copy of relevant attributes (likedata_frameif it’s a protobuf or list) based on the original message’smsg_type. Header fields (msg_type,flags,type_data) are also copied.- Return type:
- from_zmsg(zmsg: TZMQMessage) None[source]¶
Populate message data from sequence of ZMQ data frames.
- Parameters:
zmsg (TZMQMessage) – Sequence of ZMQ frames. The first frame is the FBDP header, which is parsed to set
msg_type,flags, and initialtype_data. Subsequent frames, if any, constitute the message-specific payload (e.g., protobuf data forOPEN, raw data forDATA,ErrorDescriptionprotobufs forCLOSE).- Raises:
InvalidMessageError – If message is not a valid protocol message.
- Return type:
None
- get_keys() Iterable[source]¶
Returns iterable of dictionary keys to be used with
Protocol.handlers. Keys must be provided in order of precedence (from more specific to general).- Return type:
- note_exception(exc: Exception)[source]¶
Store information from exception into CLOSE Message.
- Parameters:
exc (Exception) – Exception to be stored
- class saturnin.protocol.fbdp.FBDPSession[source]¶
Bases:
SessionFBDP session. Contains information about Data Pipe.
- socket: PipeSocket¶
Data Pipe socket Identification.
- class saturnin.protocol.fbdp._FBDP(*args, **kwargs)[source]¶
Bases:
Protocol9/FBDP - Firebird Butler Data Pipe Protocol
- _FBDP__message_factory(zmsg: TypeAliasForwardRef('~saturnin.base.transport.TZMQMessage') | None = None) Message¶
Internal message factory.
- Parameters:
zmsg (TypeAliasForwardRef('~saturnin.base.transport.TZMQMessage') | None) – The raw ZMQ message. Not used by this specific factory implementation as
FBDPMessageis always created empty and then populated, but included forTMessageFactorysignature compatibility.- Return type:
- __init__(*, session_type: type[FBDPSession] = <class 'saturnin.protocol.fbdp.FBDPSession'>)[source]¶
- Parameters:
session_type (type[FBDPSession]) – Class for session objects.
- _init_new_batch(channel: Channel, session: FBDPSession) None[source]¶
Initializes the transmission of a new batch of DATA messages.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
- Return type:
None
- _on_output_ready(channel: Channel) None[source]¶
Event handler called when channel is ready to accept at least one outgoing message without blocking (or dropping it).
- Parameters:
channel (Channel) – Channel associated with data pipe.
- Return type:
None
- _send_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Sends next
DATAmessage to the client attached to PIPE_OUTPUT.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – Message to send.
- Return type:
None
- create_ack_reply(msg: FBDPMessage) FBDPMessage[source]¶
Returns new message that is an ACK-REPLY response message.
- Parameters:
msg (FBDPMessage) – Message to be answered.
- Return type:
- create_message_for(msg_type: MsgType, type_data: int = 0, flags: MsgFlag | None = None) FBDPMessage[source]¶
Returns message of particular FBDP message type.
- Parameters:
- Return type:
- handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]¶
Default event hander executed when
DATAmessage is received from pipe.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
data (bytes) – Data received from peer.
- Return type:
None
The event handler may cancel the transmission by raising the
StopErrorexception withcode attributecontaining theErrorCodeto be returned inCLOSEmessage.Note
The ACK-REQUEST in received DATA message is handled automatically by protocol.
Important
The base implementation simply raises StopError with ErrorCode.OK code, which closes the pipe normally.
- handle_close_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process
CLOSEmessage received from client.Calls
on_pipe_closedand then discards the session.- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session instance.
msg (FBDPMessage) – Received message.
- Return type:
None
- handle_data_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process
DATAmessage received from peer.If the session’s socket direction indicates data flows towards this protocol instance:
It validates that the transmission has started (i.e.,
session.transmitis notNone).If
ACK_REQis set in the message andself.confirm_processingisFalse, anACK_REPLYis sent immediately.The
on_accept_dataevent is triggered with the received data.If
ACK_REQis set andself.confirm_processingisTrue, anACK_REPLYis sent after theon_accept_dataevent handler completes.session.transmitis decremented. If it reaches zero,_init_new_batchis called to potentially start a new batch negotiation.
If the message is an
ACK_REPLYfor data previously sent by this instance:If
self.send_after_confirmedisTrue, more data is pending (session.transmit > 0), and data is available (checked viaon_get_dataif set), output is re-enabled on the channel to send the nextDATAmessage.The
on_data_confirmedevent is triggered.
Otherwise (e.g., a
DATAmessage sent to an output-only pipe socket, or an unexpectedACK_REPLY), aPROTOCOL_VIOLATIONerror is raised.- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session instance for this transmission.
msg (FBDPMessage) – Received
DATAmessage orACK_REPLYfor aDATAmessage.
- Raises:
StopError – For protocol violations (e.g., out-of-band messages) or if sending an ACK-REPLY fails. Other exceptions from event handlers are caught by
handle_exception.- Return type:
None
Note
All exceptions raised in handler are handled by
handle_exception.
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]¶
Called by
handle_msg()on exception in message handler.Sends CLOSE message and calls
on_exceptionhandler.
- handle_noop_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process
NOOPmessage received from peer.- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session instance.
msg (FBDPMessage) – Received message.
- Return type:
None
Note
All exceptions raised in handler are handled by
handle_exception.
- handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Default event handler executed when DATA message should be sent to pipe.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with peer.
msg (FBDPMessage) – DATA message that will be sent to peer.
- Return type:
None
Important
The base implementation simply raises StopError with ErrorCode.OK code, which closes the pipe normally.
- send_close(channel: Channel, session: FBDPSession, error_code: ErrorCode, exc: Exception | None = None) None[source]¶
Sends
CLOSEmessage, callson_pipe_closedand then discards the session.- Parameters:
channel (Channel) – Channel associate with data pipe.
session (FBDPSession) – Session associated with transmission.
error_code (ErrorCode) – Error code.
exc (Exception | None) – Exception that caused the error.
- Return type:
None
- send_ready(channel: Channel, session: FBDPSession, batch_size: int) None[source]¶
Sends
READYmessage.- Parameters:
channel (Channel) – Channel associate with data pipe.
session (FBDPSession) – Session associated with transmission.
batch_size (int) – Requested data transmission batch size.
- Raises:
StopError – When sending message fails.
- Return type:
None
- validate(zmsg: TZMQMessage) None[source]¶
Verifies that sequence of ZMQ data frames is a valid protocol message.
If this validation passes without exception, then
parse()of the same message must be successful as well.- Parameters:
zmsg (TZMQMessage) – ZeroMQ multipart message to be validated.
- Return type:
None
Checks include:
Presence of at least one frame.
Correct header length (8 bytes).
Valid FourCC (
FBDP).Supported protocol version (_FBDP.REVISION).
Valid message flags (ACK_REQ, ACK_REPLY).
Recognizable message type (
MsgType).Type-specific frame counts and content validity (e.g., protobuf parsing for
OPENandCLOSEerror frames, data frame limits forDATA, no data frames forREADY/NOOP).
- Raises:
InvalidMessageError – If ZMQ message is not a valid protocol message.
- Parameters:
zmsg (TZMQMessage)
- Return type:
None
- UID: Final[uuid.UUID] = UUID('34209338-6370-5e24-a28a-802814e6327c')¶
UUID instance that identifies the protocol.
- batch_size¶
Initial batch size
- confirm_processing: bool¶
CONSUMER option. Whether ACK_REPLY message for DATA/ACK_REQ should be sent before (False) or after (True) call to
on_accept_data()callback.
- on_accept_data¶
eventsocketexecuted for CONSUMER to process data received inDATAmessage.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with client.
data – Data received from client.
The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned in CLOSE message.Note
The ACK-REQUEST in received
DATAmessage is handled automatically by protocol.
- on_data_confirmed¶
eventsocketexecuted for PRODUCER when ACK_REPLY on sentDATAis received.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with peer.
type_data – Content of
type_datafield from receivedDATAmessage confirmation.
The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned inCLOSEmessage.
- on_get_data¶
eventsocketexecuted for PRODUCER to query the data source for data availability, and for CONSUMER to query whether data could be accepted.Important
If this event does not have handler assigned, the data source is considered as “stable” source that can always produce/consume data (for example data files are stable sources).
For PRODUCERS, handler MUST return True if there are data available for sending. If handler returns False, the transmission will be suspended until its resumed via
Channel.set_wait_out(True).The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned inCLOSEmessage.
- on_noop¶
eventsocketcalled whenNOOPmessage is received, and after ACK-REPLY (if requested) is send.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with peer.
Note
All exceptions raised in event handler are handled by
_FBDP.handle_exception.
- on_pipe_closed¶
eventsocketcalled whenCLOSEmessage is received or sent, to release any resources associated with current transmission.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with peer.
msg – Received/sent CLOSE message.
exc – Exception that caused the error.
Note
All exceptions raised in event handler are ignored.
- on_produce_data¶
eventsocketexecuted for PRODUCER to store data into outgoingDATAmessage.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with client.
msg – DATA message that will be sent to client.
The event handler must store the data in
msg.data_frameattribute. It may also set ACK-REQUEST flag andtype_dataattribute.The event handler may cancel the transmission by raising the
StopErrorexception withcodeattribute containing theErrorCodeto be returned inCLOSEmessage.Note
To indicate end of data, raise
StopErrorwithErrorCode.OKcode.
- class saturnin.protocol.fbdp.FBDPServer(*args, **kwargs)[source]¶
Bases:
_FBDP9/FBDP - Firebird Butler Data Pipe Protocol - Server side.
- __init__(*, session_type: type[FBDPSession] = <class 'saturnin.protocol.fbdp.FBDPSession'>)[source]¶
- Parameters:
session_type (type[FBDPSession]) – Class for session objects.
- _init_new_batch(channel: Channel, session: FBDPSession) None[source]¶
Initializes the transmission of a new batch of
DATAmessages.As we’re server, we also have to send
READYto the client.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
- handle_accept_client(channel: Channel, session: FBDPSession) None[source]¶
Default event handler that raises
StopErrorexception with ErrorCode.INTERNAL_ERROR.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
- handle_get_ready(channel: Channel, session: FBDPSession) int[source]¶
Default event handler that returns -1, unless
on_get_dataevent handler is assigned and it returns False - then it returns 0.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
- handle_open_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process OPEN message received from client.
- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – Received message.
- Return type:
None
Note
All exceptions raised in handler are handled by
_FBDP.handle_exception.
- handle_ready_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process READY message received from client.
- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session associated with client.
msg (FBDPMessage) – Received message.
- Return type:
None
Note
All exceptions raised in handler are handled by
_FBDP.handle_exception.
- handle_schedule_ready(channel: Channel, session: FBDPSession) None[source]¶
Default event handler that raises
StopErrorexception with ErrorCode.INTERNAL_ERROR.Note
This handler must be reasigned or overriden only when
on_get_readyevent handler may return zero.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
- resend_ready(channel: Channel, session: FBDPSession) None[source]¶
Send another
READYmessage to the client.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
- Return type:
None
- on_accept_client¶
eventsocketexecuted when client connects to the data pipe via OPEN message.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with client.
The session attributes
pipe,socket,data_formatandparamscontain information sent by client, and the event handler must validate the request.If request should be rejected, it must raise the
StopErrorexception withcode attributecontaining theErrorCodeto be returned inCLOSEmessage.
- on_get_ready¶
eventsocketexecuted to obtain the transmission batch size for the client.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with client.
- Returns:
0 = Not ready to transmit yet
n = Ready to transmit 1..<n> messages.
-1 = Ready to transmit 1..<protocol batch size> messages.
- Return type:
Number of messages that could be transmitted (batch size)
The event handler may cancel the transmission by raising the
StopErrorexception withcode attributecontaining theErrorCodeto be returned inCLOSEmessage.
- on_schedule_ready¶
The
eventsocketis executed in order to send theREADYmessage to the client later.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with client.
The event handler may cancel the transmission by raising the
StopErrorexception withcode attributecontaining theErrorCodeto be returned inCLOSEmessage.
- class saturnin.protocol.fbdp.FBDPClient(*args, **kwargs)[source]¶
Bases:
_FBDP9/FBDP - Firebird Butler Data Pipe Protocol - Client side.
- __init__(*, session_type: type[FBDPSession] = <class 'saturnin.protocol.fbdp.FBDPSession'>)[source]¶
- Parameters:
session_type (type[FBDPSession]) – Class for session objects.
batch_size – Default batch size.
- _init_new_batch(channel: Channel, session: FBDPSession) None[source]¶
Initializes the transmission of a new batch of DATA messages.
- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with server.
- Return type:
None
- accept_new_session(channel: Channel, routing_id: RoutingID, msg: FBDPMessage) bool[source]¶
Validates incoming message that initiated new session/transmission.
- Parameters:
channel (Channel) – Channel that received the message.
routing_id (RoutingID) – Routing ID of the sender.
msg (FBDPMessage) – Received message.
- Returns:
Always False (transmission must be initiated by Client).
- Return type:
- connect_with_session(channel: Channel) bool[source]¶
Called by
Channel.connectto determine whether new session should be associated with connected peer.As FBDP require that connecting peers must send OPEN message to initiate transmission, it always returns True.
- handle_open_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
OPENmessage received from server is violation of the protocol, so it raisesStopErrorwith this error code.- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session instance.
msg (FBDPMessage) – Received message.
- Return type:
None
Note
All exceptions raised in handler are handled by
_FBDP.handle_exception.
- handle_ready_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]¶
Process
READYmessage received from server.- Parameters:
channel (Channel) – Channel that received the message.
session (FBDPSession) – Session instance.
msg (FBDPMessage) – Received message.
- Return type:
None
Note
All exceptions raised in handler are handled by
_FBDP.handle_exception.
- handle_server_ready(channel: Channel, session: FBDPSession, batch_size: int) int[source]¶
Default event handler that returns -1, unless
on_get_dataevent handler is assigned and it returns False - then it returns 0.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with client.
batch_size (int) – Batch size limit set by server.
- Return type:
- send_open(channel: Channel, session: FBDPSession, data_pipe: str, pipe_socket: PipeSocket, data_format: str, parameters: dict | None = None) None[source]¶
Sends
OPENmessage.- Parameters:
channel (Channel) – Channel associated with data pipe.
session (FBDPSession) – Session associated with transmission.
data_pipe (str) – Data pipe identification.
pipe_socket (PipeSocket) – Connected pipe socket.
data_format (str) – Required data format.
parameters (dict | None) – Data pipe parameters.
- Raises:
StopError – When sending message fails.
- Return type:
None
- on_init_session¶
eventsocketexecuted fromsend_open()to set additional information to newly created session instance.
- on_server_ready¶
eventsocketexecuted to negotiate the transmission batch size with server.- Parameters:
channel – Channel associated with data pipe.
session – Session associated with server.
batch_size – Max. batch size accepted by server.
- Returns:
0 = Not ready to transmit yet.
n = Ready to transmit 1..<n> messages.
-1 = Ready to transmit 1..<protocol batch size> messages.
- Return type:
Number of messages that could be transmitted (batch size)
Important
The returned value will be used ONLY when it’s smaller than
batch_size.The event handler may cancel the transmission by raising the
StopErrorexception withcode attributecontaining theErrorCodeto be returned inCLOSEmessage.