# SPDX-FileCopyrightText: 2019-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/protocol/fbsp.py
# DESCRIPTION: Firebird Butler Service Protocol
# See https://firebird-butler.readthedocs.io/en/latest/rfc/4/FBSP.html
# CREATED: 21.2.2019
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2019 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________.
"""Saturnin reference implementation of Firebird Butler Service Protocol.
See https://firebird-butler.readthedocs.io/en/latest/rfc/4/FBSP.html
"""
from __future__ import annotations
import uuid
from collections.abc import Callable, Iterable
from contextlib import suppress
from enum import IntEnum, IntFlag
from struct import pack, unpack
from traceback import format_exception
from typing import TYPE_CHECKING, Any, ClassVar, Final
import zmq
from saturnin.base import (
ANY,
INVALID,
AgentDescriptor,
ButlerInterface,
Channel,
InvalidMessageError,
Message,
PeerDescriptor,
Protocol,
RoutingID,
ServiceDescriptor,
ServiceError,
Session,
State,
StopError,
Token,
TZMQMessage,
)
from firebird.base.protobuf import ProtoMessage, create_message
from firebird.base.signal import eventsocket
if TYPE_CHECKING:
from firebird.butler.fbsd_pb2 import ErrorDescription
from firebird.butler.fbsp_pb2 import (
FBSPCancelRequests,
FBSPHelloDataframe,
FBSPStateInformation,
FBSPWelcomeDataframe,
)
# Message header
#: FBSP protocol control frame :mod:`struct` format
HEADER_FMT_FULL: Final[str] = '!4sBBH8s'
#: FBSP protocol control frame :mod:`struct` format without FOURCC
HEADER_FMT: Final[str] = '!4xBBH8s'
#: FBSP protocol identification (FOURCC)
FOURCC: Final[bytes] = b'FBSP'
#: FBSP protocol version mask
VERSION_MASK: Final[int] = 7
#: FBSP protocol error mask
ERROR_TYPE_MASK: Final[int] = 31
# Protobuf messages
#: Protobuf message for FBSP HELLO message
PROTO_HELLO: Final[str] = 'firebird.butler.FBSPHelloDataframe'
#: Protobuf message for FBSP WELCOME message
PROTO_WELCOME: Final[str] = 'firebird.butler.FBSPWelcomeDataframe'
#: Protobuf message for FBSP CANCEL REQUEST message
PROTO_CANCEL_REQ: Final[str] = 'firebird.butler.FBSPCancelRequests'
#: Protobuf message for FBSP STATE INFO message
PROTO_STATE_INFO: Final[str] = 'firebird.butler.FBSPStateInformation'
#: Protobuf message for FBSP ERROR message
PROTO_ERROR: Final[str] = 'firebird.butler.ErrorDescription'
# Enums
[docs]
class MsgType(IntEnum):
"""FBSP Message Type"""
UNKNOWN = 0 # Not a valid option, defined only to handle undefined values
HELLO = 1 # initial message from client
WELCOME = 2 # initial message from service
NOOP = 3 # no operation, used for keep-alive & ping purposes
REQUEST = 4 # client request
REPLY = 5 # service response to client request
DATA = 6 # separate data sent by either client or service
CANCEL = 7 # cancel request
STATE = 8 # operating state information
CLOSE = 9 # sent by peer that is going to close the connection
ERROR = 31 # error reported by service
[docs]
class MsgFlag(IntFlag):
"""FBSP message flag"""
NONE = 0
ACK_REQ = 1
ACK_REPLY = 2
MORE = 4
[docs]
class ErrorCode(IntEnum):
"""FBSP Error Code"""
# Errors indicating that particular request cannot be satisfied
INVALID_MESSAGE = 1
PROTOCOL_VIOLATION = 2
BAD_REQUEST = 3
NOT_IMPLEMENTED = 4
ERROR = 5
INTERNAL_ERROR = 6
REQUEST_TIMEOUT = 7
TOO_MANY_REQUESTS = 8
FAILED_DEPENDENCY = 9
FORBIDDEN = 10
UNAUTHORIZED = 11
NOT_FOUND = 12
GONE = 13
CONFLICT = 14
PAYLOAD_TOO_LARGE = 15
INSUFFICIENT_STORAGE = 16
REQUEST_CANCELLED = 17
# Fatal errors indicating that connection would/should be terminated
SERVICE_UNAVAILABLE = 2000
FBSP_VERSION_NOT_SUPPORTED = 2001
[docs]
def bb2h(value_hi: int, value_lo: int) -> int:
"""Compose two bytes (high byte, low byte) into a 16-bit unsigned word (short) value
using network byte order (big-endian).
"""
return unpack('!H', pack('!BB', value_hi, value_lo))[0]
[docs]
def msg_bytes(msg: bytes | bytearray | zmq.Frame) -> bytes | bytearray:
"""Return message frame as bytes.
"""
return msg.bytes if isinstance(msg, zmq.Frame) else msg
[docs]
class FBSPMessage(Message):
"""Firebird Butler Service Protocol (FBSP) Message.
"""
def __init__(self):
#: Type of message
self.msg_type: MsgType = MsgType.UNKNOWN
#: Message flags
self.flags: MsgFlag = MsgFlag.NONE
#: Data associated with message
self.type_data: int = 0
#: Message token
self.token: Token = bytearray(8)
def __str__(self):
return f"{self.__class__.__qualname__}[{self.msg_type.name}]"
__repr__ = __str__
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
Note: Default implementation does nothing.
Arguments:
data: A list of remaining ZMQ frames after the header, to be deserialized into
message-specific attributes.
"""
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
Note: Default implementation returns empty list.
Returns:
A list of ZMQ-compatible frames (bytes or zmq.Frame) representing the message-specific payload.
"""
return []
[docs]
def _set_hdr(self, header: bytes) -> None:
"""Initialize new message from header.
"""
control_byte, flags, self.type_data, self.token = unpack(HEADER_FMT, header)
self.msg_type = MsgType(control_byte >> 3)
self.flags = MsgFlag(flags)
[docs]
def from_zmsg(self, zmsg: TZMQMessage) -> None:
"""Populate message data from sequence of ZMQ data frames.
Arguments:
zmsg: Sequence of ZMQ frames. The first frame (header) is assumed to
have been pre-processed by the message factory to set initial
attributes like `msg_type`. This method processes subsequent frames.
Raises:
InvalidMessageError: If message is not a valid protocol message.
"""
try:
zmsg.pop(0) # header is already set by message_factory
if MsgFlag.ACK_REPLY not in self.flags:
self._unpack_data(zmsg)
except Exception as exc:
raise InvalidMessageError("Invalid message") from exc
[docs]
def as_zmsg(self) -> TZMQMessage:
"""Returns message as sequence of ZMQ data frames.
"""
zmsg = [self.get_header()]
if MsgFlag.ACK_REPLY not in self.flags:
zmsg.extend(self._pack_data())
return zmsg
[docs]
def clear(self) -> None:
"""Clears message data.
Important:
The `msg_type` remains the same (i.e. you can't change the message type once
message instance was created), but all other data are cleared.
"""
self.type_data = 0
self.flags = MsgFlag(0)
self.token: Token = bytearray(8)
[docs]
def copy(self) -> Message:
"""Returns copy of the message.
"""
msg: FBSPMessage = self.__class__()
msg.from_zmsg(self.as_zmsg())
return msg
[docs]
def get_keys(self) -> Iterable:
"""Returns iterable of dictionary keys to be used with `Protocol.handlers`.
Keys must be provided in order of precedence (from more specific to general).
"""
return [self.msg_type, ANY]
[docs]
def has_more(self) -> bool:
"""Returns True if message has `MORE` flag set.
"""
return MsgFlag.MORE in self.flags
[docs]
def has_ack_req(self) -> bool:
"""Returns True if message has ACK_REQ flag set.
"""
return MsgFlag.ACK_REQ in self.flags
[docs]
def has_ack_reply(self) -> bool:
"""Returns True if message has ASK_REPLY flag set.
"""
return MsgFlag.ACK_REPLY in self.flags
[docs]
def set_flag(self, flag: MsgFlag) -> None:
"""Set flag specified by `flag` mask.
"""
self.flags |= flag
[docs]
def clear_flag(self, flag: MsgFlag) -> None:
"""Clear flag specified by `flag` mask.
"""
self.flags &= ~flag
[docs]
class HandshakeMessage(FBSPMessage):
"""Base FBSP client/service handshake message (`HELLO` or `WELCOME`). The message
includes basic information about the Peer.
"""
def __init__(self):
super().__init__()
#: Protobuf message instance holding peer information.
self.data: ProtoMessage = None
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
"""
self.data.ParseFromString(data.pop(0))
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
"""
return [self.data.SerializeToString()]
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.data.Clear()
[docs]
class HelloMessage(HandshakeMessage):
"""The `HELLO` message is a Client request to open a Connection to the Service.
The message includes basic information about the Client and Connection parameters
required by the Client.
"""
def __init__(self):
super().__init__()
#:`FBSPHelloDataframe` protobuf message with peer information
self.data: FBSPHelloDataframe = create_message(PROTO_HELLO)
[docs]
class WelcomeMessage(HandshakeMessage):
"""The `WELCOME` message is the response of the Service to the `HELLO` message sent by
the Client, which confirms the successful creation of the required Connection and announces
basic parameters of the Service and the Connection.
"""
def __init__(self):
super().__init__()
#: `FBSPWelcomeDataframe` protobuf message with peer information
self.data: FBSPWelcomeDataframe = create_message(PROTO_WELCOME)
[docs]
class APIMessage(FBSPMessage):
"""Base FBSP client/service API message (`REQUEST`, `REPLY`, `STATE`).
The message includes information about the API call (interface ID and API Code).
"""
def __init__(self):
super().__init__()
#: List of bytes, typically representing data frames of the API message payload.
self.data: list = []
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
"""
self.data.extend(i.bytes if isinstance(i, zmq.Frame) else i for i in data)
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
"""
return self.data
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.data.clear()
@property
def interface_id(self) -> int:
"Interface ID (high byte of Request Code)"
return unpack('!BB', pack('!H', self.type_data))[0]
@interface_id.setter
def interface_id(self, value: int) -> None:
self.type_data = bb2h(value, self.api_code)
@property
def api_code(self) -> int:
"API Code (lower byte of Request Code)"
return unpack('!BB', pack('!H', self.type_data))[1]
@api_code.setter
def api_code(self, value: int) -> None:
self.type_data = bb2h(self.interface_id, value)
@property
def request_code(self) -> int:
"Request Code (Interface ID + API Code)"
return self.type_data
[docs]
class StateMessage(APIMessage):
"""The `STATE` message is a Client request to the Service.
"""
def __init__(self):
super().__init__()
#: `FBSPStateInformation` protobuf message with state information
self.state_info: FBSPStateInformation = create_message(PROTO_STATE_INFO)
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
Arguments:
data: A list containing the serialized `FBSPStateInformation` protobuf message.
"""
self.state_info.ParseFromString(data.pop(0))
super()._unpack_data(data)
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
Returns:
A list containing the serialized `FBSPStateInformation` protobuf message.
"""
return [self.state_info.SerializeToString()]
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.state_info.Clear()
@property
def state(self) -> State:
"Service state"
return State(self.state_info.state)
@state.setter
def state(self, value: State) -> None:
self.state_info.state = value.value
[docs]
class DataMessage(FBSPMessage):
"""The `DATA` message is intended for delivery of arbitrary data between connected peers."""
def __init__(self):
super().__init__()
#: Data payload
self.data: list[bytes] = []
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
Arguments:
data: A list containing `bytes` or `zmq.Frame` data payload.
"""
self.data.extend(i.bytes if isinstance(i, zmq.Frame) else i for i in data)
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
Returns:
A list of `bytes` containing the message `data`.
"""
return self.data
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.data.clear()
[docs]
class CancelMessage(FBSPMessage):
"""The `CANCEL` message represents a request for a Service to stop processing the previous
request from the Client.
"""
def __init__(self):
super().__init__()
#: `.FBSPCancelRequests` protobuf message
self.request: FBSPCancelRequests = create_message(PROTO_CANCEL_REQ)
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
Arguments:
data: A list containing the serialized `FBSPCancelRequests` protobuf message.
"""
self.request.ParseFromString(data.pop(0))
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
Returns:
A list containing the serialized `FBSPCancelRequests` protobuf message.
"""
return [self.request.SerializeToString()]
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.request.Clear()
[docs]
class ErrorMessage(FBSPMessage):
"""The `ERROR` message notifies the Client about error condition detected by Service.
"""
def __init__(self):
super().__init__()
#: List of `.ErrorDescription` protobuf messages with error information
self.errors: list[ErrorDescription] = []
[docs]
def _unpack_data(self, data: list) -> None:
"""Called when all fields of the message are set. Usefull for data deserialization.
Arguments:
data: A list containing the serialized `ErrorDescription` protobuf messages.
"""
while data:
err = create_message(PROTO_ERROR)
err.ParseFromString(msg_bytes(data.pop(0)))
self.errors.append(err)
[docs]
def _pack_data(self) -> list:
"""Called when serialization is requested.
Returns:
A list containing the serialized `ErrorDescription` protobuf messages.
"""
return [err.SerializeToString() for err in self.errors]
[docs]
def clear(self) -> None:
"""Clears message attributes.
"""
super().clear()
self.errors.clear()
[docs]
def add_error(self) -> ErrorDescription:
"""Return newly created `ErrorDescription` associated with message."""
err = create_message(PROTO_ERROR)
self.errors.append(err)
return err
[docs]
def note_exception(self, exc: Exception) -> None:
"""Store information from exception into `.ErrorMessage`.
Arguments:
exc: The Exceptioninstance to be recorded in the error message. Handles chained
exceptions viacause `cause` attribute.
"""
to_note = exc
while to_note:
errdesc = self.add_error()
if hasattr(to_note, 'code'):
errdesc.code = to_note.code
errdesc.description = str(to_note)
if not isinstance(to_note, StopError):
errdesc.annotation['traceback'] = \
''.join(format_exception(to_note.__class__, to_note,
to_note.__traceback__, chain=False))
to_note = to_note.__cause__
@property
def error_code(self) -> ErrorCode:
"Error code"
return ErrorCode(self.type_data >> 5)
@error_code.setter
def error_code(self, value: ErrorCode) -> None:
self.type_data = (value.value << 5) | (self.type_data & ERROR_TYPE_MASK)
@property
def relates_to(self) -> MsgType:
"Message type this error relates to"
return MsgType(self.type_data & ERROR_TYPE_MASK)
@relates_to.setter
def relates_to(self, value: MsgType) -> None:
self.type_data &= ~ERROR_TYPE_MASK
self.type_data |= value.value
[docs]
class _FBSP(Protocol):
"""4/FBSP - Firebird Butler Service Protocol
"""
#: string with protocol OID (dot notation).
OID: Final[str] = '1.3.6.1.4.1.53446.1.3.1'
# iso.org.dod.internet.private.enterprise.firebird.butler.protocol.fbsp
#: UUID instance that identifies the protocol.
UID: Final[uuid.UUID] = uuid.uuid5(uuid.NAMESPACE_OID, OID)
#: Mapping from message type to specific Message class
MESSAGE_MAP: ClassVar[dict[MsgType, FBSPMessage]] = {
MsgType.HELLO: HelloMessage,
MsgType.WELCOME: WelcomeMessage,
MsgType.NOOP: FBSPMessage,
MsgType.REQUEST: APIMessage,
MsgType.REPLY: APIMessage,
MsgType.DATA: DataMessage,
MsgType.CANCEL: CancelMessage,
MsgType.STATE: StateMessage,
MsgType.CLOSE: FBSPMessage,
MsgType.ERROR: ErrorMessage,
}
[docs]
def __init__(self, *, session_type: type[Session]):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
self.message_factory = self.__message_factory
def __message_factory(self, zmsg: TZMQMessage | None=None) -> Message:
"""Internal message factory.
Arguments:
zmsg: The raw ZMQ message list, used here to extract the header for determining
message type and initializing the correct `.FBSPMessage` subclass.
"""
msg = self.MESSAGE_MAP[MsgType(int.from_bytes(zmsg[0][4:5], 'big') >> 3)]()
msg._set_hdr(zmsg[0])
return msg
[docs]
def validate(self, zmsg: TZMQMessage) -> None:
"""Verifies that sequence of ZMQ data frames is a valid FBSP protocol message.
If this validation passes without exception, then `.convert_msg()` of the same
message must be successful as well.
Arguments:
zmsg: ZeroMQ multipart message to be validated against FBSP rules (FourCC,
version, header structure, type-specific frames).
Raises:
InvalidMessageError: If ZMQ message is not a valid FBSP message.
"""
if not zmsg:
raise InvalidMessageError("Empty message")
fbsp_header = zmsg[0]
if len(fbsp_header) != 16: # noqa: PLR2004
raise InvalidMessageError("Message header must be 16 bytes long")
try:
fourcc, control_byte, flags, type_data, _ = unpack(HEADER_FMT_FULL, fbsp_header)
except Exception as exp:
raise InvalidMessageError("Can't parse the control frame") from exp
if fourcc != FOURCC:
raise InvalidMessageError("Invalid FourCC")
if (control_byte & VERSION_MASK) != self.REVISION:
raise InvalidMessageError("Invalid protocol version")
if (flags | 7) > 7: # noqa: PLR2004
raise InvalidMessageError("Invalid flags")
msg_type = control_byte >> 3
try:
if msg_type == 0:
raise ValueError()
message_type = MsgType(msg_type)
except ValueError as exc:
raise InvalidMessageError(f"Illegal message type {msg_type}") from exc
#
if message_type in (MsgType.REQUEST, MsgType.REPLY, MsgType.STATE):
# Check request_code validity
pass
if message_type is MsgType.ERROR:
try:
ErrorCode(type_data >> 5)
except ValueError as exc:
raise InvalidMessageError(f"Unknown ERROR code: {type_data >> 5}") from exc
if MsgType(type_data & ERROR_TYPE_MASK) not in (MsgType.UNKNOWN, MsgType.HELLO,
MsgType.REQUEST, MsgType.DATA,
MsgType.CANCEL):
raise InvalidMessageError("Invalid request code in ERROR message")
if len(zmsg) > 1:
frame = create_message(PROTO_ERROR)
for i, segment in enumerate(zmsg[1:]):
try:
frame.ParseFromString(msg_bytes(segment))
frame.Clear()
except Exception as exc:
raise InvalidMessageError(f"Invalid ERROR message data frame: {i}") from exc
elif message_type is MsgType.HELLO:
try:
create_message(PROTO_HELLO).ParseFromString(msg_bytes(zmsg[1]))
except Exception as exc:
raise InvalidMessageError("Invalid HELLO message data frame") from exc
elif message_type is MsgType.WELCOME:
try:
create_message(PROTO_WELCOME).ParseFromString(msg_bytes(zmsg[1]))
except Exception as exc:
raise InvalidMessageError("Invalid WELCOME message data frame") from exc
elif message_type is MsgType.NOOP:
if len(zmsg) > 1:
raise InvalidMessageError("Data frames not allowed for NOOP message")
elif message_type is MsgType.CANCEL:
if len(zmsg) > 2: # noqa: PLR2004
raise InvalidMessageError("CANCEL message must have exactly one data frame")
try:
create_message(PROTO_CANCEL_REQ).ParseFromString(msg_bytes(zmsg[2]))
except Exception as exc:
raise InvalidMessageError("Invalid CANCEL message data frame") from exc
elif message_type is MsgType.STATE:
if len(zmsg) > 2: # noqa: PLR2004
raise InvalidMessageError("STATE message must have exactly one data frame")
try:
create_message(PROTO_STATE_INFO).ParseFromString(msg_bytes(zmsg[2]))
except Exception as exc:
raise InvalidMessageError("Invalid STATE message data frame") from exc
[docs]
def create_message_for(self, message_type: MsgType, token: Token=bytes(8),
type_data: int=0, flags: MsgFlag=MsgFlag.NONE) -> FBSPMessage:
"""Create new `.FBSPMessage` child class instance for particular FBSP message type.
Arguments:
message_type: Type of message to be created
token: Message token
type_data: Message control data
flags: Flags
Returns:
New `.FBSPMessage` subclass instance, initialized with the provided header details.
"""
return self.message_factory([pack(HEADER_FMT_FULL, FOURCC, (message_type << 3) | 1,
flags, type_data, token)])
[docs]
def create_ack_reply(self, msg: FBSPMessage) -> FBSPMessage:
"""Returns new message that is an ACK-REPLY response message.
Arguments:
msg: Message to be acknowledged.
"""
return self.message_factory([pack(HEADER_FMT_FULL, FOURCC,
(msg.msg_type << 3) | 1,
(msg.flags & ~MsgFlag.ACK_REQ) | MsgFlag.ACK_REPLY,
msg.type_data, msg.token)])
[docs]
def create_data_for(self, msg: APIMessage) -> DataMessage:
"""Create new DataMessage for reply to specific reuest message.
Arguments:
message: Request message instance that data relates to
"""
return self.create_message_for(MsgType.DATA, msg.token)
[docs]
class FBSPSession(Session):
"""FBSP session that holds information about attached peer.
"""
def __init__(self):
super().__init__()
#: `.HelloMessage` (for service sessions) or `.WelcomeMessage` (for client sessions)
#: received from peer during handshake.
self.greeting: HelloMessage = None
#: Client peer ID for service, Service agent ID for client
self.partner_uid: uuid.UUID = None
[docs]
class FBSPService(_FBSP):
"""4/FBSP - Firebird Butler Service Protocol - Service side.
"""
[docs]
def __init__(self, *, session_type: type[FBSPSession]=FBSPSession,
service: ServiceDescriptor, peer: PeerDescriptor):
"""
Arguments:
session_type: Class for session objects.
service: Agent descriptor for service.
peer: Peer descriptor for service.
"""
super().__init__(session_type=session_type)
self.handlers.update({MsgType.HELLO: self.handle_hello_msg,
MsgType.REQUEST: self.handle_request_msg,
MsgType.CANCEL: self.handle_cancel_msg,
MsgType.NOOP: self.handle_noop_msg,
MsgType.DATA: self.handle_data_msg,
MsgType.CLOSE: self.handle_close_msg,
MsgType.REPLY: self.handle_ack_reply,
MsgType.STATE: self.handle_ack_reply,
MsgType.WELCOME: self.handle_unexpected_msg,
})
self.api_handlers: dict[Any, Callable] = {}
self._apis: list[ButlerInterface] = []
self._apis.extend(service.api)
self.welcome_df: FBSPWelcomeDataframe = create_message(PROTO_WELCOME)
self.welcome_df.instance.uid = peer.uid.bytes
self.welcome_df.instance.pid = peer.pid
self.welcome_df.instance.host = peer.host
self.welcome_df.service.uid = service.agent.uid.bytes
self.welcome_df.service.name = service.agent.name
self.welcome_df.service.version = service.agent.version
self.welcome_df.service.classification = service.agent.classification
self.welcome_df.service.vendor.uid = service.agent.vendor_uid.bytes
self.welcome_df.service.platform.uid = service.agent.platform_uid.bytes
self.welcome_df.service.platform.version = service.agent.platform_version
for i, _api in enumerate(self._apis):
intf = self.welcome_df.api.add()
intf.number = i
intf.uid = _api.get_uid().bytes
[docs]
def accept_new_session(self, channel: Channel, routing_id: RoutingID, msg: FBSPMessage) -> bool:
"""Validates incoming message that initiated new session/transmission.
Calls `on_accept_client` event handler that may reject the client by raising `.StopError`
exception with appropriate error code to be sent to the client in ERROR message.
Arguments:
channel: Channel that received the message.
routing_id: Routing ID of the sender.
msg: Received message.
"""
err_code = None
err = None
try:
if msg.msg_type is not MsgType.HELLO:
raise StopError("Expecting HELLO message", code=ErrorCode.PROTOCOL_VIOLATION)
self.on_accept_client(channel, msg)
except StopError as exc:
err_code = getattr(exc, 'code', ErrorCode.ERROR)
except Exception as exc:
err_code = getattr(exc, 'code', ErrorCode.INTERNAL_ERROR)
err = exc
if err_code is not None:
session = Session()
session.routing_id = routing_id
self.send_error(channel, session, msg, err_code, err)
return False
return True
[docs]
def register_api_handler(self, api_code: ButlerInterface, handler: Callable) -> None:
"""Register handler for REQUEST message for particular service API code.
Arguments:
api_code: API code.
handler: A callable that handles the API code matching the signature
(channel: Channel, session: FBSPSession, msg: APIMessage) -> None
"""
# Validate handler via eventsocket
_hndcheck.service_handler = handler
_hndcheck.service_handler = None
self.api_handlers[bb2h(self._apis.index(api_code.__class__), api_code.value)] = handler
[docs]
def create_welcome_reply(self, msg: HelloMessage) -> WelcomeMessage:
"""Create new `.WelcomeMessage` that is a reply to client's HELLO.
Arguments:
msg: `.HelloMessage` from the client
"""
return self.create_message_for(MsgType.WELCOME, msg.token)
[docs]
def create_error_for(self, msg: FBSPMessage, error_code: ErrorCode) -> ErrorMessage:
"""Create new ErrorMessage that relates to specific message.
Arguments:
msg: `FBSPMessage` instance that error relates to
error_code: Error code
"""
err = self.create_message_for(MsgType.ERROR, msg.token)
err.relates_to = msg.msg_type
err.error_code = error_code
return err
[docs]
def create_reply_for(self, msg: APIMessage) -> APIMessage:
"""Create new ReplyMessage for specific RequestMessage.
Arguments:
msg: Request message instance that reply relates to
"""
return self.create_message_for(MsgType.REPLY, msg.token, msg.type_data)
[docs]
def create_state_for(self, msg: APIMessage, state: State) -> StateMessage:
"""Create new `.StateMessage` that relates to specific `RequestMessage`.
Arguments:
msg: Request message instance that state relates to
state: State code
"""
msg = self.create_message_for(MsgType.STATE, msg.token, msg.type_data)
msg.state = state
return msg
[docs]
def handle_exception(self, channel: Channel, session: Session, msg: Message, exc: Exception) -> None:
"""Called by `.handle_msg()` on exception in message handler.
Sends `ERROR` message and then calls `.on_exception` handler.
Important:
The error code is extracted ONLY from `.StopError` exception. Other exception
types will result in `6 - Internal Service Error` error code.
Arguments:
channel: Channel for communication with client
session: Client session
msg: Message aasociated with exception
exc: Exception raised
"""
error_code = getattr(exc, 'code', ErrorCode.ERROR) if isinstance(exc, StopError) \
else ErrorCode.INTERNAL_ERROR
self.send_error(channel, session, msg, error_code, exc)
super().handle_exception(channel, session, msg, exc)
[docs]
def handle_unexpected_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process unexpected valid message received from client.
Sends `ERROR` message to the client with error code `2 - Protocol violation`.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
raise StopError("Unexpected message", code=ErrorCode.PROTOCOL_VIOLATION)
[docs]
def handle_hello_msg(self, channel: Channel, session: FBSPSession, msg: HelloMessage) -> None:
"""Process `HELLO` message received from client.
Sends `WELCOME` message to the client.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
The HELLO message was preprocessed and approved by `.accept_new_session()`.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
session.greeting = msg
session.partner_uid = uuid.UUID(bytes=msg.data.instance.uid)
welcome = self.create_welcome_reply(msg)
welcome.data.CopyFrom(self.welcome_df)
channel.send(welcome, session)
[docs]
def handle_request_msg(self, channel: Channel, session: FBSPSession, msg: APIMessage) -> None:
"""Process `REQUEST` message received from client.
Calls the appropriate API handler set via `.register_api_handler()`.
Important:
The registered API handler is responsible for sending the `ACK-REPLY`
message if the incoming request `msg` has the `ACK_REQ` flag set.
According to FBSP specification, the request should be acknowledged
once the Service has decided to accept it and before starting fulfillment.
The handler may also send an `ERROR` message instead of an `ACK-REPLY`.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
handler = self.api_handlers.get(msg.type_data)
if handler is None:
raise StopError("", code=ErrorCode.NOT_IMPLEMENTED)
handler(channel, session, msg, self)
[docs]
def handle_cancel_msg(self, channel: Channel, session: FBSPSession, msg: CancelMessage) -> None:
"""Process `CANCEL` message received from peer.
Calls `on_cancel` event handler. According to FBSP, the service must send the ERROR
message with appropriate error code (if request was successfuly cancelled, the code
must be `17 - Request Cancelled`). To follow the common pattern, the event handler
must raise an `.StopError` exception with `code attribute` set to the error code
to be returned in ERROR message.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Raises:
StopError: With error code `4 - Not Implemented` if event handler is not defined.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
self.on_cancel(channel, session, msg)
if not self.on_cancel.is_set():
raise StopError("Request cancellation not supported", code=ErrorCode.NOT_IMPLEMENTED)
[docs]
def handle_noop_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process `NOOP` message received from client.
If message is an ACK-REPLY, it calls `on_ack_received` event handler.
Otherwise it sends ACK-REPLY if requested and then calls `on_noop` event handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
return
if msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
self.on_noop(channel, session)
[docs]
def handle_data_msg(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""Process `DATA` message received from peer.
If message is an ACK-REPLY, it calls `on_ack_received` event handler. Otherwise it
calls `on_data` event handler (that must also handle the ACK-REQ if set).
If `on_data` event handler is not defined, the handler sends ERROR message
with code `2 - Protocol violation` by raising the `.StopError`.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
return
self.on_data(channel, session, msg)
if not self.on_data.is_set():
raise StopError("Unexpected DATA message", code=ErrorCode.PROTOCOL_VIOLATION)
[docs]
def handle_close_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process `CLOSE` message received from client.
Calls `on_session_closed` event handler and then discards the session.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Important:
All exceptions raised by event handler are silently ignored.
"""
try:
self.on_session_closed(channel, session, msg)
except Exception:
# We don't want to handle this via `handle_exception` and we're closing
# the session anyway
pass
finally:
channel.discard_session(session)
[docs]
def handle_ack_reply(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process ACK-REPLY messages received from client.
If message has ACK-REPLY flag set, it calls `on_ack_received` event handler.
Otherwise it raises `.StopError` with PROTOCOL_VIOLATION error code.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
else:
raise StopError(f"Client can send {msg.msg_type.name} messages only as ACK-REPLY",
code=ErrorCode.PROTOCOL_VIOLATION)
[docs]
def send_error(self, channel: Channel, session: FBSPSession, relates_to: FBSPMessage,
error_code: ErrorCode, exc: Exception | None=None) -> None:
"""Sends `ERROR` message to the client associated with session.
Arguments:
channel: Channel associate with data pipe.
session: Session associated with transmission.
relates_to: FBSP message to which this ERROR is related.
error_code: Error code.
exc: Exception that caused the error.
"""
msg: ErrorMessage = self.create_message_for(MsgType.ERROR, relates_to.token)
msg.error_code = error_code
msg.relates_to = relates_to.msg_type
if exc is not None:
msg.note_exception(exc)
channel.send(msg, session)
[docs]
def send_close(self, channel: Channel, session: FBSPSession) -> None:
"""Sends `CLOSE` message to the client associated with session and calls
`on_session_closed` event handler. The message passed to the event handler is
the initial `HELLO` message from client.
Arguments:
channel: Channel associate with data pipe.
session: Session associated with transmission.
Important:
All exceptions raised by event handler are silently ignored.
"""
channel.send(self.create_message_for(MsgType.CLOSE, session.greeting.token), session)
with suppress(Exception):
# We don't want to handle exception via `handle_exception` and we're closing
# the session anyway
self.on_session_closed(channel, session, session.greeting)
[docs]
def close(self, channel: Channel):
"""Close all connections to attached clients.
Arguments:
channel: Channel used for transmission
"""
while channel.sessions:
_, session = channel.sessions.popitem()
with suppress(Exception):
# channel could be already closed from other side, as we are closing it too
# we can ignore any send errors
self.send_close(channel, session)
@eventsocket
def on_accept_client(self, channel: Channel, msg: HelloMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when HELLO message is received from
client.
Arguments:
channel: Channel that received the message.
msg: Received message.
The event handler may reject the client by raising the `StopError` exception
with `code` attribute containing the `ErrorCode` to be returned in ERROR message.
If event handler does not raise an exception, the client is accepted, new session
is created and WELCOME message is sent to the client.
"""
@eventsocket
def on_cancel(self, channel: Channel, session: FBSPSession, msg: CancelMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when CANCEL message is received from
client.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
The event handler must raise an `.Error` exception with `code` attribute set to
the error code to be returned in ERROR message. If request was successfuly cancelled,
the code must be `ErrorCode.REQUEST_CANCELLED`.
"""
@eventsocket
def on_noop(self, channel: Channel, session: FBSPSession) -> None:
"""`~firebird.base.signal.eventsocket` called when NOOP message is received, and
after ACK-REPLY (if requested) is send.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
@eventsocket
def on_data(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when DATA message is received.
Important:
If handler is defined, it must send the ACK-REPLY if received message has
ACK-REQ set. It's up to service interface whether ACK-REPLY would be sent
before, or after data contained within message are processed.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
@eventsocket
def on_session_closed(self, channel: Channel, session: FBSPSession, msg: FBSPMessage,
exc: Exception | None=None) -> None:
"""`~firebird.base.signal.eventsocket` called when CLOSE message is received or
sent, to release any resources associated with current transmission.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Received/sent CLOSE message.
exc: Exception that caused the error.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
@eventsocket
def on_ack_received(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when ACK-REPLY NOOP, DATA, REPLY or
STATE message is received.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received ACK-REPLY message.
Note:
All exceptions raised by handler are handled by `handle_exception`.
"""
[docs]
class FBSPClient(_FBSP):
"""4/FBSP - Firebird Butler Service Protocol - Client side.
This is the RAW version of client side FBSP protocol for clients that directly process
FBSP messages received from service. Such clients call `Channel.receive()` and then
process the returned message or sentinel.
This protocol implementation handles only essential message processing:
- Message parsing.
- ACK-REQ for received STATE and NOOP messages.
- Returns INVALID sentinel for received HELLO and CANCEL messages.
- Closes the session when CLOSE message is received.
"""
[docs]
def __init__(self, *, session_type: type[FBSPSession]=FBSPSession):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
self.handlers.update({MsgType.WELCOME: self.handle_welcome_msg,
MsgType.ERROR: self.handle_fbsp_msg,
MsgType.REPLY: self.handle_fbsp_msg,
MsgType.STATE: self.handle_fbsp_msg,
MsgType.NOOP: self.handle_noop_msg,
MsgType.DATA: self.handle_fbsp_msg,
MsgType.CLOSE: self.handle_close_msg,
MsgType.REQUEST: self.handle_fbsp_msg,
MsgType.HELLO: self.handle_unexpected_msg,
MsgType.CANCEL: self.handle_unexpected_msg,
})
self._apis: dict[ButlerInterface, int] = {}
[docs]
def handle_welcome_msg(self, channel: Channel, session: FBSPSession, msg: WelcomeMessage) -> WelcomeMessage:
"""Process `WELCOME` message received from service.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
session.greeting = msg
session.partner_uid = uuid.UUID(bytes=msg.data.service.uid)
for intf in msg.data.api:
self._apis[uuid.UUID(bytes=intf.uid)] = intf.number
return msg
[docs]
def handle_fbsp_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> FBSPMessage:
"""FBSP message handler that simply returns received message.
If message is a `STATE` message with ACK-REQ, sends ACK-REPLY. ACK-REQ for other
messages must be handled by caller.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions raised in handler are handled by `handle_exception`.
"""
if (msg.msg_type is MsgType.STATE) and msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
return msg
[docs]
def handle_unexpected_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> FBSPMessage:
"""FBSP message handler that returns INVALID sentinel.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
return INVALID
[docs]
def handle_noop_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process `NOOP` message received from service.
Sends ACK-REPLY if requested.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
if msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
[docs]
def handle_close_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> FBSPMessage:
"""Process `CLOSE` message received from service.
Discards the session and returns the CLOSE message received.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
channel.discard_session(session)
return msg
[docs]
def has_api(self, api: type[ButlerInterface]) -> bool:
"""Returns True if attached service supports specified interface.
Arguments:
api: Service API (interface) enumeration.
"""
return api.get_uid() in self._apis
[docs]
def create_request_for(self, session: FBSPSession, api_code: ButlerInterface, token: Token) -> APIMessage:
"""Returns new `REQUEST` message for specific API call.
Arguments:
session: Session instance
api_code: API Code
token: Message token
"""
return self.create_message_for(MsgType.REQUEST, token,
bb2h(self._apis[api_code.__class__.get_uid()],
api_code.value))
[docs]
def exception_for(self, msg: ErrorMessage) -> ServiceError:
"""Returns a .ServiceError exception populated with details from the provided ERROR
message.
Arguments:
msg: ERROR message received
"""
desc = [f"{msg.error_code.name}, relates to {msg.relates_to.name}"]
for err in msg.errors:
desc.append(f"#{err.code} : {err.description}")
return ServiceError('\n'.join(desc))
[docs]
def send_hello(self, channel: Channel, session: FBSPSession, agent: AgentDescriptor,
peer: PeerDescriptor, token: Token=bytes(8)) -> None:
"""Sends `HELLO` message to the service.
Arguments:
channel: Channel used for connection to the service.
session: Session associated with service.
agent: Agent descriptor for client.
peer: Peer descriptor for client.
token: FBSP message token to be used in HELLO message.
"""
msg: HelloMessage = self.create_message_for(MsgType.HELLO, token)
msg.data.instance.uid = peer.uid.bytes
msg.data.instance.pid = peer.pid
msg.data.instance.host = peer.host
msg.data.client.uid = agent.uid.bytes
msg.data.client.name = agent.name
msg.data.client.version = agent.version
msg.data.client.classification = agent.classification
msg.data.client.vendor.uid = agent.vendor_uid.bytes
msg.data.client.platform.uid = agent.platform_uid.bytes
msg.data.client.platform.version = agent.platform_version
channel.send(msg, session)
[docs]
def send_close(self, channel: Channel, session: FBSPSession) -> None:
"""Sends `CLOSE` message to the service associated with session.
Arguments:
channel: Channel associate with data pipe.
session: Session associated with transmission.
"""
channel.send(self.create_message_for(MsgType.CLOSE, session.greeting.token), session)
[docs]
class FBSPEventClient(FBSPClient):
"""4/FBSP - Firebird Butler Service Protocol - Client side.
This is the EVENT version of client side FBSP protocol for clients that process
FBSP messages received from service indirectly. Such clients use central I/O loop
that process incomming messages in uniform way, and actual processing is done via event
handlers.
"""
[docs]
def __init__(self, *, session_type: type[FBSPSession]=FBSPSession):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
self.api_handlers: dict[Any, Callable] = {}
self.handlers.update({MsgType.WELCOME: self.handle_welcome_msg,
MsgType.ERROR: self.handle_error_msg,
MsgType.REPLY: self.handle_reply_msg,
MsgType.STATE: self.handle_state_msg,
MsgType.NOOP: self.handle_noop_msg,
MsgType.DATA: self.handle_data_msg,
MsgType.CLOSE: self.handle_close_msg,
MsgType.REQUEST: self.handle_ack_reply,
MsgType.HELLO: self.handle_unexpected_msg,
MsgType.CANCEL: self.handle_unexpected_msg,
})
[docs]
def register_api_handler(self, api_code: ButlerInterface, handler: Callable) -> None:
"""Register handler for response messages for particular service API code.
Arguments:
api_code: API code.
handler: Callable that handles the API code matching the signature
(channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None.
"""
# Validate handler via eventsocket
_hndcheck.client_handler = handler
_hndcheck.client_handler = None
self.api_handlers[bb2h(self._apis[api_code.__class__.get_uid()], api_code.value)] = handler
[docs]
def handle_unexpected_msg(self, channel: Channel, session: FBSPSession, msg: HelloMessage) -> None:
"""Process unexpected valid message received from service.
Raises `.StopError` with error code `2 - Protocol violation`.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
raise StopError("Unexpected message", code=ErrorCode.PROTOCOL_VIOLATION)
[docs]
def handle_welcome_msg(self, channel: Channel, session: FBSPSession, msg: WelcomeMessage) -> None:
"""Process `WELCOME` message received from service.
Calls `on_service_connected` event handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
super().handle_welcome_msg(channel, session, msg)
self.on_service_connected(channel, session, msg, self)
[docs]
def handle_error_msg(self, channel: Channel, session: FBSPSession, msg: ErrorMessage) -> None:
"""Process `ERROR` message received from service.
Calls `.on_error` event handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
self.on_error(channel, session, msg)
[docs]
def handle_reply_msg(self, channel: Channel, session: FBSPSession, msg: APIMessage) -> None:
"""Process `REPLY` message received from service.
Calls the appropriate API handler set via `.register_api_handler()`.
Important:
The registered API handler must send the `ACK-REPLY` message if requested.
According to FBSP specification, the reply must be acknowledged without any delay,
unless a previous agreement between the Client and the Service exists to handle
it differently (for example when Client is prepared to accept subsequent `DATA` or
other messages from Service).
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
handler = self.api_handlers.get(msg.type_data)
handler(channel, session, msg, self)
[docs]
def handle_state_msg(self, channel: Channel, session: FBSPSession, msg: StateMessage) -> None:
"""Process `STATE` message received from service.
Sends ACK-REPLY if requested and calls `.on_state` event handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
if msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
self.on_state(channel, session, msg)
[docs]
def handle_noop_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process `NOOP` message received from service.
If message is an ACK-REPLY, it calls `.on_ack_received` event handler.
Otherwise it sends ACK-REPLY if requested and then calls `.on_noop` event handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
return
if msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
self.on_noop(channel, session)
[docs]
def handle_data_msg(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""Process `DATA` message received from serviec.
If message is an ACK-REPLY, it calls `.on_ack_received` event handler.
Otherwise it calls `.on_data` event handler that must handle the ACK-REQ if set.
If `.on_data` event handler is not defined, the ACK-REPLY message is send by this
handler.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
return
self.on_data(channel, session, msg)
if not self.on_data.is_set() and msg.has_ack_req():
channel.send(self.create_ack_reply(msg), session)
[docs]
def handle_close_msg(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process `CLOSE` message received from service.
Calls `.on_session_closed` event handler and then discards the session.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
"""
try:
self.on_session_closed(channel, session, msg)
except Exception:
# We don't want to handle this via `handle_exception` and we're closing
# the session anyway
pass
finally:
channel.discard_session(session)
[docs]
def handle_ack_reply(self, channel: Channel, session: FBSPSession, msg: FBSPMessage) -> None:
"""Process ACK-REPLY messages received from service.
If message has ACK-REPLY flag set, it calls `.on_ack_received` event handler.
Otherwise it's ignored because it violates the protocol.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
if msg.has_ack_reply():
self.on_ack_received(channel, session, msg)
@eventsocket
def on_service_connected(self, channel: Channel, session: FBSPSession,
msg: WelcomeMessage, protocol: FBSPEventClient) -> None:
"""`~firebird.base.signal.eventsocket` called when `WELCOME` message is received
from service.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
protocol: This FBSPEventClient instance, passed to allow the handler to make
further protocol calls if needed.
The event handler should register all service API handlers for `REPLY` messages
(see `FBSPEventClient.register_api_handler()`).
"""
@eventsocket
def on_noop(self, channel: Channel, session: FBSPSession) -> None:
"""`~firebird.base.signal.eventsocket` called when `NOOP` message is received, and
after ACK-REPLY (if requested) is send.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
"""
@eventsocket
def on_data(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when `DATA` message is received.
Important:
If handler is defined, it must send the ACK-REPLY if received message has
ACK-REQ set. It's up to service interface whether ACK-REPLY would be sent
before, or after data contained within message are processed.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
@eventsocket
def on_error(self, channel: Channel, session: FBSPSession, msg: ErrorMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when `ERROR` message is received.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
@eventsocket
def on_state(self, channel: Channel, session: FBSPSession, msg: StateMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when `STATE` message is received.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
@eventsocket
def on_session_closed(self, channel: Channel, session: FBSPSession, msg: FBSPMessage,
exc: Exception | None=None) -> None:
"""`~firebird.base.signal.eventsocket` called when `CLOSE` message is received or
sent, to release any resources associated with current transmission.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Received/sent CLOSE message.
exc: Exception that caused the error.
"""
@eventsocket
def on_ack_received(self, channel: Channel, session: FBSPSession, msg: DataMessage) -> None:
"""`~firebird.base.signal.eventsocket` called when ACK-REPLY message is received.
Arguments:
channel: Channel that received the message.
session: Session instance.
msg: Received ACK-REPLY message.
Note:
All exceptions are handled by `~saturnin.base.transport.Protocol.handle_exception`.
"""
[docs]
class _APIHandlerChecker:
"""Helper class for validation of API handlers.
"""
@eventsocket
def service_handler(self, channel: Channel, session: FBSPSession, msg: FBSPMessage,
protocol: FBSPService) -> None:
"Signature definition for FBSP Service API handlers."
@eventsocket
def client_handler(self, channel: Channel, session: FBSPSession, msg: FBSPMessage,
protocol: FBSPEventClient) -> None:
"Signature definition for FBSP Event Client API handlers."
_hndcheck: _APIHandlerChecker = _APIHandlerChecker()