saturnin.protocol.fbdp

Saturnin reference implementation of the Firebird Butler Data Pipe Protocol (FBDP).

See https://firebird-butler.readthedocs.io/en/latest/rfc/9/FBDP.html

Constants

saturnin.protocol.fbdp.PROTO_OPEN: Final[str] = 'firebird.butler.FBDPOpenDataframe'

Protobuf message for FBDP OPEN message

saturnin.protocol.fbdp.PROTO_ERROR: Final[str] = 'firebird.butler.ErrorDescription'

Protobuf message for FBDP ERROR message

saturnin.protocol.fbdp.HEADER_FMT_FULL: Final[str] = '!4sBBH'

FBDP protocol control frame struct format

saturnin.protocol.fbdp.HEADER_FMT: Final[str] = '!4xBBH'

FBDP protocol control frame struct format without FOURCC

saturnin.protocol.fbdp.FOURCC: Final[bytes] = b'FBDP'

FBDP protocol identification (FOURCC)

saturnin.protocol.fbdp.VERSION_MASK: Final[int] = 7

FBDP protocol version mask

saturnin.protocol.fbdp.DATA_BATCH_SIZE: Final[int] = 50

Default data batch size
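
The header constants above can be exercised with the standard struct module. The following is a minimal sketch, not the library's code: it assumes the control byte carries the message type in its upper five bits and the protocol version in the lower three (which is what VERSION_MASK = 7 suggests). The helper names pack_header and unpack_header and the default version of 1 are hypothetical.

```python
import struct

# Values copied from the constants documented above.
HEADER_FMT_FULL = '!4sBBH'   # FOURCC, control byte, flags, type_data
HEADER_FMT = '!4xBBH'        # same layout, FOURCC bytes skipped on unpack
FOURCC = b'FBDP'
VERSION_MASK = 7


def pack_header(msg_type: int, flags: int, type_data: int, version: int = 1) -> bytes:
    """Build an 8-byte control frame (assumed bit layout, see text above)."""
    control = (msg_type << 3) | (version & VERSION_MASK)
    return struct.pack(HEADER_FMT_FULL, FOURCC, control, flags, type_data)


def unpack_header(header: bytes):
    """Return (msg_type, version, flags, type_data) from a control frame."""
    if header[:4] != FOURCC:
        raise ValueError("not an FBDP control frame")
    control, flags, type_data = struct.unpack(HEADER_FMT, header)
    return control >> 3, control & VERSION_MASK, flags, type_data


# DATA message (type 4) with ACK_REQ (flag 1) and type_data 42.
header = pack_header(msg_type=4, flags=1, type_data=42)
print(len(header), unpack_header(header))
```

Both format strings describe the same 8 bytes; the '4x' variant simply ignores the FOURCC once it has been checked separately.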

Enums

class saturnin.protocol.fbdp.MsgType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

FBDP Message Type

CLOSE = 5
DATA = 4
NOOP = 3
OPEN = 1
READY = 2
UNKNOWN = 0
class saturnin.protocol.fbdp.MsgFlag(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntFlag

FBDP message flag

ACK_REPLY = 2
ACK_REQ = 1
NONE = 0
class saturnin.protocol.fbdp.ErrorCode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

FBDP Error Code

DATA_FORMAT_NOT_SUPPORTED = 103
ERROR = 3
FBDP_VERSION_NOT_SUPPORTED = 101
INTERNAL_ERROR = 4
INVALID_DATA = 5
INVALID_MESSAGE = 1
NOT_IMPLEMENTED = 102
OK = 0
PIPE_ENDPOINT_UNAVAILABLE = 100
PROTOCOL_VIOLATION = 2
TIMEOUT = 6

Classes

class saturnin.protocol.fbdp.FBDPMessage[source]

Bases: Message

Firebird Butler Datapipe Protocol (FBDP) Message.

as_zmsg() TZMQMessage[source]

Returns message as sequence of ZMQ data frames.

Return type:

TZMQMessage

clear() None[source]

Clears message data.

Return type:

None

clear_flag(flag: MsgFlag) None[source]

Clear flag specified by flag mask.

Parameters:

flag (MsgFlag) –

Return type:

None

copy() Message[source]

Returns copy of the message.

Return type:

Message

from_zmsg(zmsg: TZMQMessage) None[source]

Populate message data from sequence of ZMQ data frames.

Parameters:

zmsg (TZMQMessage) – Sequence of frames that should be deserialized.

Raises:

InvalidMessageError – If message is not a valid protocol message.

Return type:

None

get_header() bytes[source]

Return message header (FBDP control frame).

Return type:

bytes

get_keys() Iterable[source]

Returns iterable of dictionary keys to be used with Protocol.handlers. Keys must be provided in order of precedence (from more specific to general).

Return type:

Iterable

has_ack_reply() bool[source]

Returns True if message has ACK_REPLY flag set.

Return type:

bool

has_ack_req() bool[source]

Returns True if message has ACK_REQ flag set.

Return type:

bool

note_exception(exc: Exception)[source]

Store information from exception into CLOSE Message.

Parameters:

exc (Exception) – Exception to be stored

set_flag(flag: MsgFlag) None[source]

Set flag specified by flag mask.

Parameters:

flag (MsgFlag) – Flag to be set.

Return type:

None
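
The flag helpers above map naturally onto bitwise operations on an IntFlag. This is an illustrative sketch only: MsgFlag is re-declared locally with the documented values, and FlagHolder is a hypothetical stand-in for the flag-related part of FBDPMessage, not the actual implementation.

```python
from enum import IntFlag


class MsgFlag(IntFlag):
    """Local copy of the documented flag values."""
    NONE = 0
    ACK_REQ = 1
    ACK_REPLY = 2


class FlagHolder:
    """Hypothetical stand-in for the flag handling of FBDPMessage."""

    def __init__(self) -> None:
        self.flags = MsgFlag.NONE

    def set_flag(self, flag: MsgFlag) -> None:
        # Turn on the bits given by the mask.
        self.flags |= flag

    def clear_flag(self, flag: MsgFlag) -> None:
        # Turn off the bits given by the mask, leaving the others intact.
        self.flags &= ~flag

    def has_ack_req(self) -> bool:
        return MsgFlag.ACK_REQ in self.flags

    def has_ack_reply(self) -> bool:
        return MsgFlag.ACK_REPLY in self.flags


msg = FlagHolder()
msg.set_flag(MsgFlag.ACK_REQ)
msg.set_flag(MsgFlag.ACK_REPLY)
msg.clear_flag(MsgFlag.ACK_REQ)
```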

data_frame: Message | Any

Data frame associated with message type (or None)

flags: MsgFlag

Message flags

msg_type: MsgType

Type of message

type_data: int

Data associated with message

class saturnin.protocol.fbdp.FBDPSession[source]

Bases: Session

FBDP session. Contains information about Data Pipe.

await_ready: bool

Indicator that the server sent READY and is waiting for the READY response from the client.

data_format: str

Specification of format for user data transmitted in DATA messages.

params: Dict

Data Pipe parameters.

pipe: str

Data Pipe Identification.

socket: PipeSocket

Data Pipe socket Identification.

transmit: int

Number of DATA messages that remain to be transmitted since last READY message.

class saturnin.protocol.fbdp._FBDP(*args, **kwargs)[source]

Bases: Protocol

9/FBDP - Firebird Butler Data Pipe Protocol

_FBDP__message_factory(zmsg: TZMQMessage = None) Message

Internal message factory

Parameters:

zmsg (TZMQMessage) –

Return type:

Message

__init__(*, session_type: Type[FBDPSession] = FBDPSession)[source]
Parameters:

session_type (Type[FBDPSession]) – Class for session objects.

_init_new_batch(channel: Channel, session: FBDPSession) None[source]

Initializes the transmission of a new batch of DATA messages.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

Return type:

None

_on_output_ready(channel: Channel) None[source]

Event handler called when channel is ready to accept at least one outgoing message without blocking (or dropping it).

Parameters:

channel (Channel) – Channel associated with data pipe.

Return type:

None

_send_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Sends next DATA message to the client attached to PIPE_OUTPUT.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – Message to send.

Return type:

None

create_ack_reply(msg: FBDPMessage) FBDPMessage[source]

Returns new message that is an ACK-REPLY response message.

Parameters:

msg (FBDPMessage) – Message to be answered.

Return type:

FBDPMessage

create_message_for(msg_type: MsgType, type_data: int = 0, flags: MsgFlag = None) FBDPMessage[source]

Returns message of particular FBDP message type.

Parameters:
  • msg_type (MsgType) – Type of message to be created.

  • type_data (int) – Message control data.

  • flags (MsgFlag) – Message flags.

Return type:

FBDPMessage

handle_accept_data(channel: Channel, session: FBDPSession, data: bytes) None[source]

Default event handler executed when DATA message is received from pipe.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • data (bytes) – Data received from peer.

Return type:

None

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

The ACK-REQUEST in received DATA message is handled automatically by protocol.

Important

The base implementation simply raises StopError with ErrorCode.OK code, which closes the pipe normally.
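
A consumer-side handler following this contract might look like the sketch below. StopError and ErrorCode are re-declared locally as stand-ins so the example runs on its own; the real StopError comes from Saturnin and is only assumed here to accept a code keyword. The accept_data function and the received list are hypothetical.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Subset of the documented error codes."""
    OK = 0
    INVALID_DATA = 5


class StopError(Exception):
    """Stand-in for Saturnin's StopError; 'code' goes into the CLOSE message."""

    def __init__(self, message: str, code: ErrorCode = ErrorCode.OK) -> None:
        super().__init__(message)
        self.code = code


received = []


def accept_data(channel, session, data: bytes) -> None:
    """Consumer handler: keep UTF-8 text, cancel the transmission otherwise."""
    try:
        received.append(data.decode('utf-8'))
    except UnicodeDecodeError as exc:
        raise StopError("payload is not UTF-8", code=ErrorCode.INVALID_DATA) from exc


accept_data(None, None, b'first row')
```

With the real classes, such a function would presumably be assigned to the on_accept_data event of the protocol instance, replacing the default that closes the pipe.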

handle_close_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process CLOSE message received from client.

Calls on_pipe_closed and then discards the session.

Return type:

None

handle_data_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process DATA message received from client.

Return type:

None

Note

All exceptions are handled by handle_exception.

handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]

Called by handle_msg() on exception in message handler.

Sends CLOSE message and calls on_exception handler.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (Session) – Session associated with peer.

  • msg (Message) – Message associated with exception.

  • exc (Exception) – Exception raised

Return type:

None

handle_noop_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process NOOP message received from peer.

Return type:

None

handle_produce_data(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Default event handler executed when DATA message should be sent to pipe.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with peer.

  • msg (FBDPMessage) – DATA message that will be sent to peer.

Return type:

None

Important

The base implementation simply raises StopError with ErrorCode.OK code, which closes the pipe normally.

send_close(channel: Channel, session: FBDPSession, error_code: ErrorCode, exc: Exception = None) None[source]

Sends CLOSE message, calls on_pipe_closed and then discards the session.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with transmission.

  • error_code (ErrorCode) – Error code.

  • exc (Exception) – Exception that caused the error.

Return type:

None

send_ready(channel: Channel, session: FBDPSession, batch_size: int) None[source]

Sends READY message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with transmission.

  • batch_size (int) – Requested data transmission batch size.

Raises:

StopError – When sending message fails.

Return type:

None

validate(zmsg: TZMQMessage) None[source]

Verifies that sequence of ZMQ data frames is a valid protocol message.

If this validation passes without exception, then parse() of the same message must be successful as well.

Parameters:

zmsg (TZMQMessage) – ZeroMQ multipart message.

Raises:

InvalidMessageError – If ZMQ message is not a valid protocol message.

Return type:

None

OID: Final[str] = '1.3.6.1.4.1.53446.1.3.2'

String with protocol OID (dot notation).

UID: Final[uuid.UUID] = UUID('34209338-6370-5e24-a28a-802814e6327c')

UUID instance that identifies the protocol.

batch_size

Initial batch size

confirm_processing: bool

CONSUMER option. Whether ACK_REPLY message for DATA/ACK_REQ should be sent before (False) or after (True) call to on_accept_data() callback.
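
The effect of this option is purely one of ordering. The sketch below illustrates it with plain callables; process_data_message and trace are hypothetical helpers written for this example and do not reproduce the protocol's internal code.

```python
from typing import Callable, List


def process_data_message(data: bytes, ack_requested: bool, confirm_processing: bool,
                         send_ack_reply: Callable[[], None],
                         on_accept_data: Callable[[bytes], None]) -> None:
    """Order the ACK_REPLY relative to the data callback."""
    if ack_requested and not confirm_processing:
        send_ack_reply()          # confirm receipt before processing
    on_accept_data(data)
    if ack_requested and confirm_processing:
        send_ack_reply()          # confirm only after processing succeeded


def trace(confirm_processing: bool) -> List[str]:
    """Record the order of events for one DATA/ACK_REQ message."""
    events: List[str] = []
    process_data_message(b'row', True, confirm_processing,
                         send_ack_reply=lambda: events.append('ack'),
                         on_accept_data=lambda data: events.append('process'))
    return events


print(trace(False), trace(True))
```

In this sketch, an exception raised by the callback suppresses the reply only when confirm_processing is True, which is the practical difference between confirming receipt and confirming processing.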

property logging_id: str

Returns _logging_id_ or <class_name>

on_accept_data

eventsocket executed for CONSUMER to process data received in DATA message.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with client.

  • data – Data received from client.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

The ACK-REQUEST in received DATA message is handled automatically by protocol.

on_data_confirmed

eventsocket executed for PRODUCER when ACK_REPLY on sent DATA is received.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with peer.

  • type_data – Content of type_data field from received DATA message confirmation.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

on_get_data

eventsocket executed for PRODUCER to query the data source for data availability, and for CONSUMER to query whether data could be accepted.

Important

If this event does not have a handler assigned, the data source is considered a “stable” source that can always produce/consume data (for example, data files are stable sources).

For PRODUCERS, the handler MUST return True if there is data available for sending. If the handler returns False, the transmission is suspended until it is resumed via Channel.set_wait_out(True).

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

on_noop

eventsocket called when NOOP message is received, and after ACK-REPLY (if requested) is sent.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with peer.

on_pipe_closed

eventsocket called when CLOSE message is received or sent, to release any resources associated with current transmission.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with peer.

  • msg – Received/sent CLOSE message.

  • exc – Exception that caused the error.

on_produce_data

eventsocket executed for PRODUCER to store data into outgoing DATA message.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with client.

  • msg – DATA message that will be sent to client.

The event handler must store the data in msg.data_frame attribute. It may also set ACK-REQUEST flag and type_data attribute.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

Note

To indicate end of data, raise StopError with ErrorCode.OK code.
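
A producer handler that follows these rules could be sketched as below. FakeMessage, StopError and ErrorCode are local stand-ins for the Saturnin classes (assumptions made so the example is self-contained), and make_producer is a hypothetical helper.

```python
from enum import IntEnum


class ErrorCode(IntEnum):
    """Subset of the documented error codes."""
    OK = 0


class StopError(Exception):
    """Stand-in for Saturnin's StopError; 'code' goes into the CLOSE message."""

    def __init__(self, message: str, code: ErrorCode = ErrorCode.OK) -> None:
        super().__init__(message)
        self.code = code


class FakeMessage:
    """Stand-in exposing the attributes of FBDPMessage that the handler touches."""

    def __init__(self) -> None:
        self.data_frame = None
        self.type_data = 0


def make_producer(items):
    """Return a handler that feeds 'items' into outgoing DATA messages."""
    iterator = iter(items)

    def produce_data(channel, session, msg) -> None:
        try:
            msg.data_frame = next(iterator)
        except StopIteration:
            # End of data: close the pipe normally.
            raise StopError("no more data", code=ErrorCode.OK) from None

    return produce_data


handler = make_producer([b'a', b'b'])
sent = []
try:
    while True:
        message = FakeMessage()
        handler(None, None, message)
        sent.append(message.data_frame)
except StopError as stop:
    final_code = stop.code
```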

send_after_confirmed: bool

PRODUCER option. If sent DATA message has ACK_REQ flag set, send next DATA message after ACK_REPLY is received (True), or continue sending DATA without delay (False).

class saturnin.protocol.fbdp.FBDPServer(*args, **kwargs)[source]

Bases: _FBDP

9/FBDP - Firebird Butler Data Pipe Protocol - Server side.

__init__(*, session_type: Type[FBDPSession] = FBDPSession)[source]
Parameters:

session_type (Type[FBDPSession]) – Class for session objects.

_init_new_batch(channel: Channel, session: FBDPSession) None[source]

Initializes the transmission of a new batch of DATA messages.

As we’re server, we also have to send READY to the client.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

handle_accept_client(channel: Channel, session: FBDPSession) None[source]

Default event handler that raises StopError exception with ErrorCode.INTERNAL_ERROR.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

handle_get_ready(channel: Channel, session: FBDPSession) int[source]

Default event handler that returns -1, unless an on_get_data event handler is assigned and returns False, in which case it returns 0.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

int

handle_open_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process OPEN message received from client.

Parameters:
  • channel (Channel) – Channel that received the message.

  • session (FBDPSession) – Session associated with client.

  • msg (FBDPMessage) – Received message.

Return type:

None

Note

All exceptions are handled by handle_exception.

handle_ready_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process READY message received from client.

Parameters:
  • channel (Channel) – Channel that received the message.

  • session (FBDPSession) – Session associated with client.

  • msg (FBDPMessage) – Received message.

Return type:

None

Note

All exceptions are handled by handle_exception.

handle_schedule_ready(channel: Channel, session: FBDPSession) None[source]

Default event handler that raises StopError exception with ErrorCode.INTERNAL_ERROR.

Note

This handler must be reassigned or overridden only when the on_get_ready event handler may return zero.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

resend_ready(channel: Channel, session: FBDPSession) None[source]

Send another READY message to the client.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with client.

Return type:

None

on_accept_client

eventsocket executed when client connects to the data pipe via OPEN message.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with client.

The session attributes pipe, socket, data_format and params contain information sent by client, and the event handler must validate the request.

If request should be rejected, it must raise the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
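
As an illustration of such validation, the sketch below checks the pipe name and data format carried by the session. The session object is simulated with SimpleNamespace, StopError and ErrorCode are local stand-ins for the Saturnin classes, and the pipe and format names are invented for the example.

```python
from enum import IntEnum
from types import SimpleNamespace


class ErrorCode(IntEnum):
    """Subset of the documented error codes."""
    PIPE_ENDPOINT_UNAVAILABLE = 100
    DATA_FORMAT_NOT_SUPPORTED = 103


class StopError(Exception):
    """Stand-in for Saturnin's StopError; 'code' goes into the CLOSE message."""

    def __init__(self, message: str, code: ErrorCode) -> None:
        super().__init__(message)
        self.code = code


# Hypothetical configuration of the service owning the pipe.
KNOWN_PIPES = {'demo-pipe'}
SUPPORTED_FORMATS = {'text/plain;charset=utf-8'}


def accept_client(channel, session) -> None:
    """Reject OPEN requests for unknown pipes or unsupported data formats."""
    if session.pipe not in KNOWN_PIPES:
        raise StopError(f"unknown pipe {session.pipe!r}",
                        code=ErrorCode.PIPE_ENDPOINT_UNAVAILABLE)
    if session.data_format not in SUPPORTED_FORMATS:
        raise StopError(f"unsupported format {session.data_format!r}",
                        code=ErrorCode.DATA_FORMAT_NOT_SUPPORTED)


good = SimpleNamespace(pipe='demo-pipe', data_format='text/plain;charset=utf-8')
accept_client(None, good)  # returns silently: request accepted
```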

on_get_ready

eventsocket executed to obtain the transmission batch size for the client.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with client.

Returns:

  • 0 = Not ready to transmit yet

  • n = Ready to transmit 1..<n> messages.

  • -1 = Ready to transmit 1..<protocol batch size> messages.

Return type:

Number of messages that could be transmitted (batch size)

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
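
The three kinds of return value can be read as follows. This is a sketch of the documented convention only; resolve_batch_size is a hypothetical helper, and whether the protocol additionally clamps a positive n to its own batch size is not stated here, so the sketch does not do so.

```python
DATA_BATCH_SIZE = 50  # documented default batch size


def resolve_batch_size(handler_result: int,
                       protocol_batch_size: int = DATA_BATCH_SIZE) -> int:
    """Translate an on_get_ready result into a number of messages.

    0 means "not ready yet"; the caller is expected to schedule READY later.
    """
    if handler_result == -1:
        return protocol_batch_size
    if handler_result < -1:
        raise ValueError("batch size must be -1, 0 or a positive number")
    return handler_result


print(resolve_batch_size(-1), resolve_batch_size(0), resolve_batch_size(10))
```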

on_schedule_ready

The eventsocket is executed in order to send the READY message to the client later.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with client.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.

class saturnin.protocol.fbdp.FBDPClient(*args, **kwargs)[source]

Bases: _FBDP

9/FBDP - Firebird Butler Data Pipe Protocol - Client side.

__init__(*, session_type: Type[FBDPSession] = FBDPSession)[source]
Parameters:
  • session_type (Type[FBDPSession]) – Class for session objects.

  • batch_size – Default batch size.

_init_new_batch(channel: Channel, session: FBDPSession) None[source]

Initializes the transmission of a new batch of DATA messages.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with server.

Return type:

None

accept_new_session(channel: Channel, routing_id: RoutingID, msg: FBDPMessage) bool[source]

Validates incoming message that initiated new session/transmission.

Parameters:
  • channel (Channel) – Channel that received the message.

  • routing_id (RoutingID) – Routing ID of the sender.

  • msg (FBDPMessage) – Received message.

Returns:

Always False (transmission must be initiated by Client).

Return type:

bool

connect_with_session(channel: Channel) bool[source]

Called by Channel.connect to determine whether new session should be associated with connected peer.

As FBDP requires that connecting peers send an OPEN message to initiate transmission, it always returns True.

Parameters:

channel (Channel) – Channel associated with data pipe.

Return type:

bool

handle_open_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

An OPEN message received from the server is a violation of the protocol, so this handler raises StopError with the ErrorCode.PROTOCOL_VIOLATION code.

Return type:

None

Note

All exceptions are handled by handle_exception.

handle_ready_msg(channel: Channel, session: FBDPSession, msg: FBDPMessage) None[source]

Process READY message received from server.

Return type:

None

Note

All exceptions are handled by handle_exception.

handle_server_ready(channel: Channel, session: FBDPSession, batch_size: int) int[source]

Default event handler that returns -1, unless an on_get_data event handler is assigned and returns False, in which case it returns 0.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with server.

  • batch_size (int) – Batch size limit set by server.

Return type:

int

send_open(channel: Channel, session: FBDPSession, data_pipe: str, pipe_socket: PipeSocket, data_format: str, parameters: Dict = None) None[source]

Sends OPEN message.

Parameters:
  • channel (Channel) – Channel associated with data pipe.

  • session (FBDPSession) – Session associated with transmission.

  • data_pipe (str) – Data pipe identification.

  • pipe_socket (PipeSocket) – Connected pipe socket.

  • data_format (str) – Required data format.

  • parameters (Dict) – Data pipe parameters.

Raises:

StopError – When sending message fails.

Return type:

None

on_init_session

eventsocket executed from send_open() to set additional information to newly created session instance.

on_server_ready

eventsocket executed to negotiate the transmission batch size with server.

Parameters:
  • channel – Channel associated with data pipe.

  • session – Session associated with server.

  • batch_size – Max. batch size accepted by server.

Returns:

  • 0 = Not ready to transmit yet.

  • n = Ready to transmit 1..<n> messages.

  • -1 = Ready to transmit 1..<protocol batch size> messages.

Return type:

Number of messages that could be transmitted (batch size)

Important

The returned value will be used ONLY when it’s smaller than batch_size.

The event handler may cancel the transmission by raising the StopError exception with code attribute containing the ErrorCode to be returned in CLOSE message.
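
The negotiation rule described above (the handler's value is used only when it is smaller than the server's batch_size) amounts to taking a minimum. The sketch below shows that reading; negotiate_batch_size is a hypothetical helper, and treating -1 as the documented default of 50 is an assumption about what "protocol batch size" means.

```python
DATA_BATCH_SIZE = 50  # documented default batch size


def negotiate_batch_size(server_batch_size: int, handler_result: int,
                         protocol_batch_size: int = DATA_BATCH_SIZE) -> int:
    """Combine the server's limit with the on_server_ready result.

    A result of 0 means the client is not ready to transmit yet.
    """
    if handler_result == -1:
        handler_result = protocol_batch_size
    # The client can lower the batch size but never raise it.
    return min(server_batch_size, handler_result)


print(negotiate_batch_size(20, 5), negotiate_batch_size(20, 100))
```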