# SPDX-FileCopyrightText: 2019-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/protocol/fbdp.py
# DESCRIPTION: Firebird Butler Data Pipe Protocol
# See https://firebird-butler.readthedocs.io/en/latest/rfc/9/FBDP.html
# CREATED: 30.7.2019
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2019 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________.
# pylint: disable=R0902, R0903, R0912, R0913, C0301, W0702, W0703
"""Saturnin reference implementation of Firebird Butler Data Pipe Protocol
See https://firebird-butler.readthedocs.io/en/latest/rfc/9/FBDP.html
"""
from __future__ import annotations
from typing import Type, Dict, Any, Union, Iterable, Final
import uuid
import warnings
from struct import pack, unpack
from enum import IntEnum, IntFlag
from firebird.base.signal import eventsocket
from firebird.base.protobuf import ProtoMessage, create_message, dict2struct, struct2dict
from saturnin.base import (InvalidMessageError, StopError, RoutingID, TZMQMessage,
PipeSocket, Channel, Protocol, Message, Session, ANY)
#: Protobuf message for FBDP OPEN message
PROTO_OPEN: Final[str] = 'firebird.butler.FBDPOpenDataframe'
#: Protobuf message for FBDP ERROR message
PROTO_ERROR: Final[str] = 'firebird.butler.ErrorDescription'
#: FBDP protocol control frame :mod:`struct` format
HEADER_FMT_FULL: Final[str] = '!4sBBH'
#: FBDP protocol control frame :mod:`struct` format without FOURCC
HEADER_FMT: Final[str] = '!4xBBH'
#: FBDP protocol identification (FOURCC)
FOURCC: Final[bytes] = b'FBDP'
#: FBDP protocol version mask
VERSION_MASK: Final[int] = 7
#: Default data batch size
DATA_BATCH_SIZE: Final[int] = 50
[docs]
class MsgType(IntEnum):
"""FBDP Message Type"""
UNKNOWN = 0 # not a valid message type
OPEN = 1 # initial message from client
READY = 2 # transfer negotiation message
NOOP = 3 # no operation, used for keep-alive & ping purposes
DATA = 4 # user data
CLOSE = 5 # sent by peer that is going to close the connection
[docs]
class MsgFlag(IntFlag):
"""FBDP message flag"""
NONE = 0
ACK_REQ = 1
ACK_REPLY = 2
[docs]
class ErrorCode(IntEnum):
"""FBDP Error Code"""
# No error
OK = 0
# General errors
INVALID_MESSAGE = 1
PROTOCOL_VIOLATION = 2
ERROR = 3
INTERNAL_ERROR = 4
INVALID_DATA = 5
TIMEOUT = 6
# Errors that prevent the connection from opening
PIPE_ENDPOINT_UNAVAILABLE = 100
FBDP_VERSION_NOT_SUPPORTED = 101
NOT_IMPLEMENTED = 102
DATA_FORMAT_NOT_SUPPORTED = 103
[docs]
class FBDPMessage(Message):
"""Firebird Butler Datapipe Protocol (FBDP) Message.
"""
def __init__(self):
#: Type of message
self.msg_type: MsgType = MsgType.UNKNOWN
#: Message flags
self.flags: MsgFlag = MsgFlag(0)
#: Data associated with message
self.type_data: int = 0
#: Data frame associated with message type (or None)
self.data_frame: Union[ProtoMessage, Any] = None
def __str__(self):
return f"{self.__class__.__qualname__}[{self.msg_type.name}]"
__repr__ = __str__
[docs]
def from_zmsg(self, zmsg: TZMQMessage) -> None:
"""Populate message data from sequence of ZMQ data frames.
Arguments:
zmsg: Sequence of frames that should be deserialized.
Raises:
InvalidMessageError: If message is not a valid protocol message.
"""
try:
control_byte, flags, self.type_data = unpack(HEADER_FMT, zmsg.pop(0))
self.msg_type = MsgType(control_byte >> 3)
self.flags = MsgFlag(flags)
if self.msg_type is MsgType.OPEN:
self.data_frame = create_message(PROTO_OPEN)
self.data_frame.ParseFromString(zmsg.pop(0))
elif self.msg_type is MsgType.DATA:
self.data_frame = zmsg.pop(0) if zmsg else None
elif self.msg_type is MsgType.CLOSE:
self.type_data = ErrorCode(self.type_data)
self.data_frame = []
while zmsg:
err = create_message(PROTO_ERROR)
err.ParseFromString(zmsg.pop(0))
self.data_frame.append(err)
except Exception as exc:
raise InvalidMessageError("Invalid message") from exc
[docs]
def as_zmsg(self) -> TZMQMessage:
"""Returns message as sequence of ZMQ data frames.
"""
zmsg = []
zmsg.append(self.get_header())
if self.msg_type is MsgType.OPEN:
zmsg.append(self.data_frame.SerializeToString())
elif (self.msg_type is MsgType.DATA and self.data_frame is not None):
zmsg.append(self.data_frame)
elif self.msg_type is MsgType.CLOSE:
while self.data_frame:
zmsg.append(self.data_frame.pop(0).SerializeToString())
return zmsg
[docs]
def clear(self) -> None:
"""Clears message data.
"""
self.msg_type = MsgType.UNKNOWN
self.type_data = 0
self.flags = MsgFlag(0)
self.data_frame = None
[docs]
def copy(self) -> Message:
"""Returns copy of the message.
"""
msg: FBDPMessage = self.__class__()
msg.msg_type = self.msg_type
msg.flags = self.flags
msg.type_data = self.type_data
if self.msg_type is MsgType.OPEN:
msg.data_frame = create_message(PROTO_OPEN)
msg.data_frame.CopyFrom(self.data_frame)
elif self.msg_type is MsgType.CLOSE:
msg.data_frame = []
for frame in self.data_frame:
err = create_message(PROTO_ERROR)
err.CopyFrom(frame)
msg.data_frame.append(err)
else:
msg.data_frame = self.data_frame
return msg
[docs]
def get_keys(self) -> Iterable:
"""Returns iterable of dictionary keys to be used with `.Protocol.handlers`.
Keys must be provided in order of precedence (from more specific to general).
"""
return [self.msg_type, ANY]
[docs]
def has_ack_req(self) -> bool:
"""Returns True if message has ACK_REQ flag set.
"""
return MsgFlag.ACK_REQ in self.flags
[docs]
def has_ack_reply(self) -> bool:
"""Returns True if message has ASK_REPLY flag set.
"""
return MsgFlag.ACK_REPLY in self.flags
[docs]
def set_flag(self, flag: MsgFlag) -> None:
"""Set flag specified by `flag` mask.
Arguments:
flag: Flag to be set.
"""
self.flags |= flag
[docs]
def clear_flag(self, flag: MsgFlag) -> None:
"""Clear flag specified by `flag` mask.
"""
self.flags &= ~flag
[docs]
def note_exception(self, exc: Exception):
"""Store information from exception into CLOSE Message.
Arguments:
exc: Exception to be stored
"""
assert self.msg_type is MsgType.CLOSE
errdesc = create_message(PROTO_ERROR)
if hasattr(exc, 'code'):
errdesc.code = getattr(exc, 'code')
errdesc.description = str(exc)
self.data_frame.append(errdesc)
[docs]
class FBDPSession(Session):
"""FBDP session. Contains information about Data Pipe.
"""
def __init__(self):
super().__init__()
#: Data Pipe Identification.
self.pipe: str = None
#: Data Pipe socket Identification.
self.socket: PipeSocket = None
#: Specification of format for user data transmitted in DATA messages.
self.data_format: str = None
#: Data Pipe parameters.
self.params: Dict = {}
#: Number of DATA messages that remain to be transmitted since last READY message.
self.transmit: int = None
#: Indicator that server sent READY and waits from READY response from client
self.await_ready: bool = False
[docs]
class _FBDP(Protocol):
"""9/FBDP - Firebird Butler Data Pipe Protocol
"""
#: string with protocol OID (dot notation).
OID: Final[str] = '1.3.6.1.4.1.53446.1.3.2'
# iso.org.dod.internet.private.enterprise.firebird.butler.protocol.fbdp
#: UUID instance that identifies the protocol.
UID: Final[uuid.UUID] = uuid.uuid5(uuid.NAMESPACE_OID, OID)
[docs]
def __init__(self, *, session_type: Type[FBDPSession] = FBDPSession):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
#: Initial batch size
self.batch_size = DATA_BATCH_SIZE
#: CONSUMER option. Whether ACK_REPLY message for DATA/ACK_REQ should be sent
#: before (False) or after (True) call to `.on_accept_data()` callback.
self.confirm_processing: bool = False
#: PRODUCER option. If sent DATA message has ACK_REQ flag set, send next DATA
#: message after ACK_REPLY is received (True), or continue sending DATA without
#: delay (False).
self.send_after_confirmed: bool = True
# Session socket that means that messages flow to us
self._flow_in_socket: PipeSocket = None
self.on_produce_data = self.handle_produce_data
self.on_accept_data = self.handle_accept_data
self._msg: FBDPMessage = FBDPMessage()
self.message_factory = self.__message_factory
self.handlers.update({MsgType.NOOP: self.handle_noop_msg,
MsgType.DATA: self.handle_data_msg,
MsgType.CLOSE: self.handle_close_msg,
})
def __message_factory(self, zmsg: TZMQMessage=None) -> Message: # pylint: disable=W0613
"Internal message factory"
self._msg.clear()
return self._msg
[docs]
def _init_new_batch(self, channel: Channel, session: FBDPSession) -> None:
"""Initializes the transmission of a new batch of DATA messages.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
"""
raise NotImplementedError()
[docs]
def _send_data(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Sends next `DATA` message to the client attached to PIPE_OUTPUT.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Message to send.
"""
error_code = None
exc = None
try:
self.on_produce_data(channel, session, msg)
channel.send(msg, session)
session.transmit -= 1
if session.transmit > 0:
if msg.has_ack_req() and self.send_after_confirmed:
channel.set_wait_out(False, session)
elif self.on_get_data.is_set():
if not self.on_get_data(channel, session):
channel.set_wait_out(False, session)
else:
channel.set_wait_out(False, session)
self._init_new_batch(channel, session)
except StopError as err:
error_code = getattr(err, 'code', ErrorCode.ERROR)
if error_code is not ErrorCode.OK:
exc = err
except Exception as err:
error_code = ErrorCode.INTERNAL_ERROR
exc = err
if error_code is not None:
self.send_close(channel, session, error_code, exc)
[docs]
def _on_output_ready(self, channel: Channel) -> None:
"""Event handler called when channel is ready to accept at least one outgoing message
without blocking (or dropping it).
Arguments:
channel: Channel associated with data pipe.
"""
for session in list(channel.sessions.values()):
if session.send_pending:
msg = self.create_message_for(MsgType.DATA)
# This is called directly and not via `handle_msg()` and message handler,
# so it's necessary to handle exceptions like `handle_msg()` does.
try:
self._send_data(channel, session, msg)
except Exception as exc:
try:
self.handle_exception(channel, session, msg, exc)
except:
warnings.warn('Exception raised in exception handler', RuntimeWarning)
[docs]
def validate(self, zmsg: TZMQMessage) -> None:
"""Verifies that sequence of ZMQ data frames is a valid protocol message.
If this validation passes without exception, then `.parse()` of the same message
must be successful as well.
Arguments:
zmsg: ZeroMQ multipart message.
Raises:
InvalidMessageError: If ZMQ message is not a valid protocol message.
"""
if not zmsg:
raise InvalidMessageError("Empty message")
fbdp_header = zmsg[0]
if len(fbdp_header) != 8:
raise InvalidMessageError("Message header must be 8 bytes long")
try:
fourcc, control_byte, flags, _ = unpack(HEADER_FMT_FULL, fbdp_header)
except Exception as exp:
raise InvalidMessageError("Invalid control frame") from exp
if fourcc != FOURCC:
raise InvalidMessageError("Invalid FourCC")
if (control_byte & VERSION_MASK) != self.REVISION:
raise InvalidMessageError("Invalid protocol version")
if (flags | 3) > 3:
raise InvalidMessageError("Invalid flags")
try:
message_type = MsgType(control_byte >> 3)
except ValueError as exc:
raise InvalidMessageError(f"Illegal message type {control_byte >> 3}") from exc
if message_type is MsgType.OPEN:
if len(zmsg) != 2:
raise InvalidMessageError("OPEN message must have a dataframe")
try:
fpb = create_message(PROTO_OPEN)
fpb.ParseFromString(zmsg[1])
if not fpb.data_pipe:
raise ValueError("Missing 'data_pipe' specification")
pipe_socket = PipeSocket(fpb.pipe_socket)
if pipe_socket is PipeSocket.UNKNOWN_PIPE_SOCKET:
raise ValueError("Invalid 'pipe_socket'")
if not fpb.data_format:
raise ValueError("Missing 'data_format' specification")
except Exception as exc:
raise InvalidMessageError("Invalid data frame for OPEN message") from exc
elif (message_type is MsgType.CLOSE and len(zmsg) > 1):
fpb = create_message(PROTO_ERROR)
for frame in zmsg[1:]:
fpb.ParseFromString(frame)
if not fpb.description:
raise InvalidMessageError("Missing error description")
elif (message_type is MsgType.DATA and len(zmsg) > 2):
raise InvalidMessageError("DATA message may have only one data frame")
elif (message_type in (MsgType.READY, MsgType.NOOP) and len(zmsg) > 1):
raise InvalidMessageError("Data frames not allowed for READY and NOOP messages")
[docs]
def handle_exception(self, channel: Channel, session: Session, msg: Message, exc: Exception) -> None:
"""Called by `.handle_msg()` on exception in message handler.
Sends CLOSE message and calls `.on_exception` handler.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Message associated with exception.
exc: Exception raised
"""
error_code = getattr(exc, 'code', ErrorCode.ERROR) if isinstance(exc, StopError) \
else ErrorCode.INTERNAL_ERROR
self.send_close(channel, session, error_code, exc)
super().handle_exception(channel, session, msg, exc)
[docs]
def handle_produce_data(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Default event handler executed when DATA message should be sent to pipe.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: DATA message that will be sent to peer.
Important:
The base implementation simply raises StopError with ErrorCode.OK code,
which closes the pipe normally.
"""
raise StopError('OK', code=ErrorCode.OK)
[docs]
def handle_accept_data(self, channel: Channel, session: FBDPSession, data: bytes) -> None:
"""Default event hander executed when `DATA` message is received from pipe.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
data: Data received from peer.
The event handler may cancel the transmission by raising the `.StopError` exception
with `code attribute` containing the `.ErrorCode` to be returned in `CLOSE` message.
Note:
The ACK-REQUEST in received DATA message is handled automatically by protocol.
Important:
The base implementation simply raises StopError with ErrorCode.OK code,
which closes the pipe normally.
"""
raise StopError('OK', code=ErrorCode.OK)
[docs]
def handle_noop_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process `NOOP` message received from peer.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
if msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
self.on_noop(channel, session)
[docs]
def handle_data_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process `DATA` message received from client.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `handle_exception`.
"""
if session.socket is self._flow_in_socket:
# DATA flow to us (INPUT for server context, OUTPUT for client context)
if session.transmit is None:
# Transmission not started, DATA out of band
raise StopError("Out of band DATA message",
code=ErrorCode.PROTOCOL_VIOLATION)
# ACK before processing?
if msg.has_ack_req() and not self.confirm_processing:
# We must create reply message directly to keep received message
reply = FBDPMessage()
reply.msg_type = msg.msg_type
reply.type_data = msg.type_data
reply.set_flag(MsgFlag.ACK_REPLY)
if channel.send(msg, session) != 0:
raise StopError("ACK-REPLY send failed", code=ErrorCode.ERROR)
# Process incoming data
self.on_accept_data(channel, session, msg.data_frame)
# ACK after processing?
if msg.has_ack_req() and self.confirm_processing:
if channel.send(self.create_ack_reply(msg), session) != 0:
raise StopError("ACK-REPLY send failed", code=ErrorCode.ERROR)
session.transmit -= 1
if session.transmit == 0:
self._init_new_batch(channel, session)
else:
# DATA flow from us (OUTPUT for server context, INPUT for client context)
if msg.has_ack_reply():
if (session.transmit > 0) and self.send_after_confirmed:
# Re-Initiate transfer to output (via I/O loop) if data are available
if not self.on_get_data.is_set() or self.on_get_data(channel, session):
channel.set_wait_out(True, session)
self.on_data_confirmed(channel, session, msg.type_data)
else:
# Only client attached to PIPE_INPUT can send DATA messages
socket: PipeSocket = PipeSocket.OUTPUT \
if self._flow_in_socket is PipeSocket.INPUT else PipeSocket.INPUT
raise StopError(f"DATA message sent to {socket.name} socket",
code=ErrorCode.PROTOCOL_VIOLATION)
[docs]
def handle_close_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process `CLOSE` message received from client.
Calls `on_pipe_closed` and then discards the session.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
try:
self.on_pipe_closed(channel, session, msg)
except:
# We don't want to handle this via `handle_exception` and we're closing
# the pipe anyway
pass
finally:
channel.discard_session(session)
[docs]
def create_message_for(self, msg_type: MsgType, type_data: int=0, flags: MsgFlag=None) -> FBDPMessage:
"""Returns message of particular FBDP message type.
Arguments:
msg_type: Type of message to be created.
type_data: Message control data.
flags: Message flags.
"""
msg: FBDPMessage = self.message_factory()
msg.msg_type = msg_type
msg.type_data = type_data
if flags is not None:
msg.flags = flags
if msg.msg_type is MsgType.OPEN:
msg.data_frame = create_message(PROTO_OPEN)
elif msg.msg_type is MsgType.CLOSE:
msg.data_frame = []
return msg
[docs]
def create_ack_reply(self, msg: FBDPMessage) -> FBDPMessage:
"""Returns new message that is an ACK-REPLY response message.
Arguments:
msg: Message to be answered.
"""
reply = self.create_message_for(msg.msg_type, msg.type_data, msg.flags)
reply.clear_flag(MsgFlag.ACK_REQ)
reply.set_flag(MsgFlag.ACK_REPLY)
return reply
[docs]
def send_ready(self, channel: Channel, session: FBDPSession, batch_size: int) -> None:
"""Sends `READY` message.
Arguments:
channel: Channel associate with data pipe.
session: Session associated with transmission.
batch_size: Requested data transmission batch size.
Raises:
StopError: When sending message fails.
"""
msg = self.create_message_for(MsgType.READY, batch_size)
if channel.send(msg, session) != 0:
raise StopError("Broken pipe, can't send READY message", code=ErrorCode.ERROR)
[docs]
def send_close(self, channel: Channel, session: FBDPSession, error_code: ErrorCode,
exc: Exception=None) -> None:
"""Sends `CLOSE` message, calls `on_pipe_closed` and then discards the session.
Arguments:
channel: Channel associate with data pipe.
session: Session associated with transmission.
error_code: Error code.
exc: Exception that caused the error.
"""
msg = self.create_message_for(MsgType.CLOSE, error_code)
if exc:
msg.note_exception(exc)
try:
channel.send(msg, session)
self.on_pipe_closed(channel, session, msg, exc)
finally:
channel.discard_session(session)
@eventsocket
def on_pipe_closed(self, channel: Channel, session: FBDPSession, msg: FBDPMessage,
exc: Exception=None) -> None:
"""`~firebird.base.signal.eventsocket` called when `CLOSE` message is received or sent,
to release any resources associated with current transmission.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Received/sent CLOSE message.
exc: Exception that caused the error.
"""
@eventsocket
def on_noop(self, channel: Channel, session: FBDPSession) -> None:
"""`~firebird.base.signal.eventsocket` called when `NOOP` message is received, and
after ACK-REPLY (if requested) is send.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
"""
@eventsocket
def on_accept_data(self, channel: Channel, session: FBDPSession, data: bytes) -> None:
"""`~firebird.base.signal.eventsocket` executed for CONSUMER to process data
received in `DATA` message.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
data: Data received from client.
The event handler may cancel the transmission by raising the `StopError` exception
with `code` attribute containing the `ErrorCode` to be returned in CLOSE message.
Note:
The ACK-REQUEST in received DATA message is handled automatically by protocol.
"""
@eventsocket
def on_produce_data(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""`~firebird.base.signal.eventsocket` executed for PRODUCER to store data into
outgoing `DATA` message.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
msg: DATA message that will be sent to client.
The event handler must store the data in `msg.data_frame` attribute. It may also
set ACK-REQUEST flag and `type_data` attribute.
The event handler may cancel the transmission by raising the `StopError` exception
with `code` attribute containing the `ErrorCode` to be returned in CLOSE message.
Note:
To indicate end of data, raise StopError with ErrorCode.OK code.
"""
@eventsocket
def on_data_confirmed(self, channel: Channel, session: FBDPSession, type_data: int) -> None:
"""`~firebird.base.signal.eventsocket` executed for PRODUCER when ACK_REPLY on
sent `DATA` is received.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
type_data: Content of `type_data` field from received DATA message confirmation.
The event handler may cancel the transmission by raising the `StopError` exception
with `code` attribute containing the `ErrorCode` to be returned in CLOSE message.
"""
@eventsocket
def on_get_data(self, channel: Channel, session: FBDPSession) -> bool:
"""`~firebird.base.signal.eventsocket` executed for PRODUCER to query the data
source for data availability, and for CONSUMER to query whether data could be accepted.
Important:
If this event does not have handler assigned, the data source is considered as
"stable" source that can always produce/consume data (for example data files
are stable sources).
For PRODUCERS, handler MUST return True if there are data available for sending.
If handler returns False, the transmission will be suspended until its resumed
via `Channel.set_wait_out(True)`.
The event handler may cancel the transmission by raising the `StopError` exception
with `code` attribute containing the `ErrorCode` to be returned in CLOSE message.
"""
@property
def logging_id(self) -> str:
"Returns _logging_id_ or <class_name>"
return getattr(self, '_logging_id_', self.__class__.__name__)
[docs]
class FBDPServer(_FBDP):
"""9/FBDP - Firebird Butler Data Pipe Protocol - Server side.
"""
[docs]
def __init__(self, *, session_type: Type[FBDPSession] = FBDPSession):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
# Session socket that means that messages flow to us (client connected to our INPUT)
self._flow_in_socket: PipeSocket = PipeSocket.INPUT
#
self.on_accept_client = self.handle_accept_client
self.on_get_ready = self.handle_get_ready
self.on_schedule_ready = self.handle_schedule_ready
#
self.handlers.update({MsgType.OPEN: self.handle_open_msg,
MsgType.READY: self.handle_ready_msg,
})
[docs]
def _init_new_batch(self, channel: Channel, session: FBDPSession) -> None:
"""Initializes the transmission of a new batch of `DATA` messages.
As we're server, we also have to send `READY` to the client.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
"""
session.transmit = None
if (batch_size := self.on_get_ready(channel, session)) == 0:
self.on_schedule_ready(channel, session)
if batch_size is not None:
ready = max(0, self.batch_size if batch_size == -1 else batch_size)
self.send_ready(channel, session, ready)
session.await_ready = True
[docs]
def handle_accept_client(self, channel: Channel, session: FBDPSession) -> None:
"""Default event handler that raises `.StopError` exception with ErrorCode.INTERNAL_ERROR.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
"""
raise StopError("Accept handler not defined", code=ErrorCode.INTERNAL_ERROR)
[docs]
def handle_get_ready(self, channel: Channel, session: FBDPSession) -> int:
"""Default event handler that returns -1, unless `on_get_data` event handler is
assigned and it returns False - then it returns 0.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
"""
if self.on_get_data.is_set() and not self.on_get_data(channel, session):
return 0
return -1
[docs]
def handle_schedule_ready(self, channel: Channel, session: FBDPSession) -> None:
"""Default event handler that raises `.StopError` exception with ErrorCode.INTERNAL_ERROR.
Note:
This handler must be reasigned or overriden only when `.on_get_ready` event
handler may return zero.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
"""
raise StopError("READY scheduler not defined", code=ErrorCode.INTERNAL_ERROR)
[docs]
def resend_ready(self, channel: Channel, session: FBDPSession) -> None:
"""Send another `READY` message to the client.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
"""
# It's possible that session could be canceled before resend is called,
# or transmission was already started, or READY was already sent and awaits
# response from client. In such case we'll ignore this resend request.
if (session.routing_id in channel.sessions
and session.transmit is None
and not session.await_ready):
# This is called directly and not via `handle_msg()` and message handler, so
# it's necessary to handle exceptions like `handle_msg()` does.
try:
self._init_new_batch(channel, session)
except Exception as exc:
try:
self.handle_exception(channel, session, FBDPMessage(), exc)
except:
warnings.warn('Exception raised in exception handler', RuntimeWarning)
#else:
#if session.routing_id not in channel.sessions:
#warnings.warn('resend_ready: senssion cancelled', RuntimeWarning)
#elif session.transmit is not None:
#warnings.warn('resend_ready: transmission already started', RuntimeWarning)
#elif session.await_ready:
#warnings.warn('resend_ready: READY was already sent', RuntimeWarning)
#else:
#warnings.warn('resend_ready: programming error', RuntimeWarning)
[docs]
def handle_open_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process OPEN message received from client.
Arguments:
channel: Channel that received the message.
session: Session associated with client.
msg: Received message.
Note:
All exceptions are handled by `~_FBDP.handle_exception`.
"""
if session.pipe is not None:
# Client already attached to data pipe, OPEN out of band
raise StopError("Out of band OPEN message", code=ErrorCode.PROTOCOL_VIOLATION)
socket = PipeSocket(msg.data_frame.pipe_socket)
session.pipe = msg.data_frame.data_pipe
session.socket = socket
session.data_format = msg.data_frame.data_format
session.params.update(struct2dict(msg.data_frame.parameters))
self.on_accept_client(channel, session)
self._init_new_batch(channel, session)
channel.on_output_ready = self._on_output_ready
[docs]
def handle_ready_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process READY message received from client.
Arguments:
channel: Channel that received the message.
session: Session associated with client.
msg: Received message.
Note:
All exceptions are handled by `~_FBDP.handle_exception`.
"""
if not session.await_ready:
# Transmission in progress, READY is out of band
raise StopError("Out of band READY message",
code=ErrorCode.PROTOCOL_VIOLATION)
session.await_ready = False
if msg.type_data == 0:
# Client either confirmed our zero, or is not ready yet.
self.on_schedule_ready(channel, session)
else:
# All green to transmit DATA
session.transmit = msg.type_data
if session.socket is PipeSocket.OUTPUT:
# Initiate transfer to output (via I/O loop)
channel.set_wait_out(True, session)
@eventsocket
def on_accept_client(self, channel: Channel, session: FBDPSession) -> None:
"""`~firebird.base.signal.eventsocket` executed when client connects to the data
pipe via OPEN message.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
The session attributes `~FBDPSession.pipe`, `~FBDPSession.socket`, `.data_format`
and `.params` contain information sent by client, and the event handler must
validate the request.
If request should be rejected, it must raise the `.StopError` exception with `code
attribute` containing the `ErrorCode` to be returned in `CLOSE` message.
"""
@eventsocket
def on_get_ready(self, channel: Channel, session: FBDPSession) -> int:
"""`~firebird.base.signal.eventsocket` executed to obtain the transmission batch
size for the client.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
Returns:
Number of messages that could be transmitted (batch size):
* 0 = Not ready to transmit yet
* n = Ready to transmit 1..<n> messages.
* -1 = Ready to transmit 1..<protocol batch size> messages.
The event handler may cancel the transmission by raising the `.StopError` exception
with `code attribute` containing the `ErrorCode` to be returned in `CLOSE` message.
"""
@eventsocket
def on_schedule_ready(self, channel: Channel, session: FBDPSession) -> None:
"""The `~firebird.base.signal.eventsocket` is executed in order to send the `READY`
message to the client later.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
The event handler may cancel the transmission by raising the `.StopError` exception
with `code attribute` containing the `ErrorCode` to be returned in `CLOSE` message.
"""
[docs]
class FBDPClient(_FBDP):
"""9/FBDP - Firebird Butler Data Pipe Protocol - Client side.
"""
[docs]
def __init__(self, *, session_type: Type[FBDPSession] = FBDPSession):
"""
Arguments:
session_type: Class for session objects.
batch_size: Default batch size.
"""
super().__init__(session_type=session_type)
# Session socket that means that messages flow to us (we are connected to server OUTPUT)
self._flow_in_socket: PipeSocket = PipeSocket.OUTPUT
self.on_server_ready = self.handle_server_ready
#
self.handlers.update({MsgType.OPEN: self.handle_open_msg,
MsgType.READY: self.handle_ready_msg,
})
[docs]
def handle_server_ready(self, channel: Channel, session: FBDPSession, batch_size: int) -> int: # pylint: disable=W0613
"""Default event handler that returns -1, unless `.on_get_data` event handler is
assigned and it returns False - then it returns 0.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with client.
batch_size: Batch size limit set by server.
"""
if self.on_get_data.is_set() and not self.on_get_data(channel, session):
return 0
return -1
[docs]
def _init_new_batch(self, channel: Channel, session: FBDPSession) -> None:
"""Initializes the transmission of a new batch of DATA messages.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with server.
"""
session.transmit = None
[docs]
def accept_new_session(self, channel: Channel, routing_id: RoutingID, msg: FBDPMessage) -> bool: # pylint: disable=W0613
"""Validates incoming message that initiated new session/transmission.
Arguments:
channel: Channel that received the message.
routing_id: Routing ID of the sender.
msg: Received message.
Returns:
Always False (transmission must be initiated by Client).
"""
return False
[docs]
def connect_with_session(self, channel: Channel) -> bool: # pylint: disable=W0613
"""Called by `.Channel.connect` to determine whether new session should be
associated with connected peer.
As FBDP require that connecting peers must send OPEN message to initiate
transmission, it always returns True.
Arguments:
channel: Channel associated with data pipe.
"""
return True
[docs]
def handle_open_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""`OPEN` message received from server is violation of the protocol, so it raises
`.StopError` with this error code.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~_FBDP.handle_exception`.
"""
raise StopError("OPEN message received from server", ErrorCode.PROTOCOL_VIOLATION)
[docs]
def handle_ready_msg(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
"""Process `READY` message received from server.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~_FBDP.handle_exception`.
"""
if session.transmit is not None:
# Transmission in progress, READY is out of band
raise StopError("Out of band READY message",
code=ErrorCode.PROTOCOL_VIOLATION)
if msg.type_data > 0:
# Server is ready
batch_size = self.on_server_ready(channel, session, msg.type_data)
result = max(0, min(msg.type_data, self.batch_size if batch_size == -1 else batch_size))
self.send_ready(channel, session, result)
if result > 0:
# We are ready to transmit as well
session.transmit = result
if session.socket is PipeSocket.INPUT:
# Initiate transfer to server (via I/O loop)
channel.set_wait_out(True, session)
else:
# Server is not ready, but we must send READY(0) back to confirm we've got it!
self.send_ready(channel, session, 0)
[docs]
def send_open(self, channel: Channel, session: FBDPSession, data_pipe: str,
pipe_socket: PipeSocket, data_format: str, parameters: Dict=None) -> None:
"""Sends `OPEN` message.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with transmission.
data_pipe: Data pipe identification.
pipe_socket: Connected pipe socket.
data_format: Required data format.
parameters: Data pipe parameters.
Raises:
StopError: When sending message fails.
"""
msg = self.create_message_for(MsgType.OPEN)
msg.data_frame.data_pipe = data_pipe
msg.data_frame.pipe_socket = pipe_socket.value
msg.data_frame.data_format = data_format
if parameters:
msg.data_frame.parameters.CopyFrom(dict2struct(parameters))
if channel.send(msg, session) != 0:
raise StopError("Broken pipe, can't send OPEN message", code=ErrorCode.ERROR)
channel.on_output_ready = self._on_output_ready
session.pipe = data_pipe
session.socket = pipe_socket
session.data_format = data_format
if parameters:
session.params.update(parameters)
self.on_init_session(channel, session)
@eventsocket
def on_server_ready(self, channel: Channel, session: FBDPSession, batch_size: int) -> int:
"""`~firebird.base.signal.eventsocket` executed to negotiate the transmission
batch size with server.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with server.
batch_size: Max. batch size accepted by server.
Returns:
Number of messages that could be transmitted (batch size):
* 0 = Not ready to transmit yet.
* n = Ready to transmit 1..<n> messages.
* -1 = Ready to transmit 1..<protocol batch size> messages.
Important:
The returned value will be used ONLY when it's smaller than `batch_size`.
The event handler may cancel the transmission by raising the `.StopError` exception
with `code attribute` containing the `ErrorCode` to be returned in `CLOSE` message.
"""
@eventsocket
def on_init_session(self, channel: Channel, session: FBDPSession) -> None:
"""`~firebird.base.signal.eventsocket` executed from `send_open()` to set
additional information to newly created session instance.
"""