# SPDX-FileCopyrightText: 2020-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/protocol/iccp.py
# DESCRIPTION: Internal Component Control Protocol
# CREATED: 16.11.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________
"""Saturnin Internal Component Control Protocol (ICCP).
This protocol defines the messages and rules for communication between Saturnin
components (like services or microservices) and their controllers. It handles
lifecycle events (ready, stop, finished), configuration requests, and error reporting.
"""
from __future__ import annotations
import traceback
import uuid
from collections.abc import Iterable
from enum import Enum, IntEnum
from struct import pack, unpack
from typing import Final
from saturnin.base import (
PROTO_PEER,
Channel,
InvalidMessageError,
Message,
Outcome,
PeerDescriptor,
Protocol,
RoutingID,
Session,
StopError,
TZMQMessage,
)
from firebird.base import protobuf
from firebird.base.config import PROTO_CONFIG, Config, ConfigProto, ZMQAddress
from firebird.base.signal import eventsocket
from firebird.base.types import ANY
[docs]
class MsgType(IntEnum):
"""Control message type.
"""
READY = 1
REQUEST = 2
OK = 3
ERROR = 4
STOP = 5
FINISHED = 6
[docs]
class Request(Enum):
"""Controller Request Codes for ICCP.
"""
CONFIGURE = b'CONF'
[docs]
class ICCPMessage(Message):
"""Service Control Message.
"""
def __init__(self):
#: Type of message
self.msg_type: MsgType = None
def __str__(self):
return f"{self.__class__.__qualname__}[{'NONE' if self.msg_type is None else self.msg_type.name}]"
__repr__ = __str__
[docs]
def from_zmsg(self, zmsg: TZMQMessage) -> None:
"""Populate message data from sequence of ZMQ data frames.
Arguments:
zmsg: Sequence of ZMQ frames to be deserialized. The first frame is
always the packed `MsgType`. Subsequent frames depend on the
message type (e.g., peer info for `READY`, error string for `ERROR`).
Raises:
InvalidMessageError: If message is not a valid protocol message.
"""
try:
self.msg_type = MsgType(unpack('!H', zmsg[0])[0])
if self.msg_type is MsgType.READY:
proto = protobuf.create_message(PROTO_PEER, zmsg[1])
self.peer = PeerDescriptor.from_proto(proto)
proto = protobuf.create_message(protobuf.PROTO_STRUCT, zmsg[2])
self.endpoints = {}
for k, v in protobuf.struct2dict(proto).items():
for i in range(len(v)):
v[i] = ZMQAddress(v[i])
self.endpoints[k] = v
elif self.msg_type is MsgType.ERROR:
self.error = zmsg[1].decode('utf8')
elif self.msg_type is MsgType.FINISHED:
self.outcome = Outcome(zmsg[1].decode())
self.details = [v.decode('utf8', errors='replace') for v in zmsg[2:]]
elif self.msg_type is MsgType.REQUEST:
self.request = Request(zmsg[1])
if self.request is Request.CONFIGURE:
self.config = protobuf.create_message(PROTO_CONFIG, zmsg[2])
except Exception as exc:
raise InvalidMessageError("Invalid message") from exc
[docs]
def as_zmsg(self) -> TZMQMessage:
"""Returns message as sequence of ZMQ data frames.
Returns:
A list of ZMQ frames representing the message. The first frame is
the packed `MsgType`. Subsequent frames are added based on the
message type and its attributes.
"""
try:
zmsg = [pack('!H', self.msg_type)]
if self.msg_type is MsgType.READY:
zmsg.append(self.peer.as_proto().SerializeToString())
zmsg.append(protobuf.dict2struct(self.endpoints).SerializeToString())
elif self.msg_type is MsgType.ERROR:
zmsg.append(self.error.encode('utf8', errors='replace'))
elif self.msg_type is MsgType.FINISHED:
zmsg.append(self.outcome.value.encode('utf-8', errors='replace'))
zmsg.extend(v.encode('utf-8', errors='replace') for v in self.details)
elif self.msg_type is MsgType.REQUEST:
if self.request is Request.CONFIGURE:
zmsg.append(self.config.SerializeToString())
except Exception:
traceback.print_exc()
return zmsg
[docs]
def clear(self) -> None:
"""Clears message data.
"""
self.msg_type = None
for attr in ('peer', 'endpoints', 'error', 'request', 'config'):
if hasattr(self, attr):
delattr(self, attr)
[docs]
def copy(self) -> Message:
"""Returns copy of the message.
Returns:
A new `ICCPMessage` instance with a deep copy of relevant
attributes (like `peer`, `endpoints`, `config`) based on the
original message's `msg_type`.
"""
msg = self.__class__()
msg.msg_type = self.msg_type
if self.msg_type is MsgType.READY:
msg.peer = self.peer.copy()
msg.endpoints = self.endpoints.copy()
elif self.msg_type is MsgType.ERROR:
msg.error = self.error
elif self.msg_type is MsgType.REQUEST:
msg.request = self.request
if self.request is Request.CONFIGURE:
msg.config = protobuf.create_message(PROTO_CONFIG)
msg.config.CopyFrom(self.config)
return msg
[docs]
def get_keys(self) -> Iterable:
"""Returns iterable of dictionary keys to be used with `.Protocol.handlers`.
Keys must be provided in order of precedence (from more specific to general).
"""
return [self.msg_type, ANY]
[docs]
class _ICCP(Protocol):
"""Internal Component Control Protocol (ICCP).
Used by Saturnin internally for component/controller transmissions.
"""
#: string with protocol OID (dot notation).
OID: Final[str] = '1.3.6.1.4.1.53446.1.1.0.1.1'
# iso.org.dod.internet.private.enterprise.firebird.butler.platform.saturnin.protocol.iscp
#: UUID instance that identifies the protocol.
UID: Final[uuid.UUID] = uuid.uuid5(uuid.NAMESPACE_OID, OID)
[docs]
def __init__(self, *, session_type: type[Session] = Session):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
self._msg: ICCPMessage = ICCPMessage()
self.message_factory = self.__message_factory
def __message_factory(self, zmsg: TZMQMessage | None=None) -> Message:
"Internal message factory"
self._msg.clear()
return self._msg
[docs]
def validate(self, zmsg: TZMQMessage) -> None:
"""Verifies that sequence of ZMQ data frames is a valid protocol message.
If this validation passes without exception, then `.parse()` of the same message
must be successful as well.
Arguments:
zmsg: ZeroMQ multipart message. Expects the first frame to be the
packed `MsgType`, followed by type-specific frames.
Raises:
InvalidMessageError: If ZMQ message is not a valid protocol message.
"""
if not zmsg:
raise InvalidMessageError("Empty message")
try:
msg_type = MsgType(unpack('!H', zmsg[0]))
except Exception as exc:
raise InvalidMessageError("Invalid message type") from exc
if msg_type is MsgType.READY:
try:
PeerDescriptor.from_proto(protobuf.create_message(PROTO_PEER, zmsg[1]))
except Exception as exc:
raise InvalidMessageError("Invalid data: peer descriptor") from exc
try:
protobuf.create_message(protobuf.PROTO_STRUCT, zmsg[2])
except Exception as exc:
raise InvalidMessageError("Invalid data: endpoints") from exc
elif msg_type is MsgType.ERROR:
try:
zmsg[1].decode('utf8')
except Exception as exc:
raise InvalidMessageError("Invalid data: error message") from exc
elif msg_type is MsgType.REQUEST:
try:
req = Request(zmsg[1])
except Exception as exc:
raise InvalidMessageError("Invalid request code") from exc
if req is Request.CONFIGURE:
try:
protobuf.create_message(PROTO_CONFIG, zmsg[2])
except Exception as exc:
raise InvalidMessageError("Invalid data: config") from exc
[docs]
class ICCPComponent(_ICCP):
"""Internal Component Control Protocol (ICCP) - Component (client) side.
Used by Saturnin internally for component/controller transmissions.
"""
[docs]
def __init__(self, *, session_type: type[Session] = Session, with_traceback: bool=False):
"""
Arguments:
session_type: Class for session objects.
with_traceback: When True, stores traceback along with exception in ERROR/FINISHED.
"""
super().__init__(session_type=session_type)
self.with_traceback: bool = with_traceback
self.handlers.update({MsgType.READY: self.wrong_message,
MsgType.REQUEST: self.handle_request,
MsgType.OK: self.wrong_message,
MsgType.ERROR: self.wrong_message,
MsgType.STOP: self.handle_stop,
MsgType.FINISHED: self.wrong_message,
})
[docs]
def accept_new_session(self, channel: Channel, routing_id: RoutingID,
msg: ICCPMessage) -> bool:
"""Validates incoming message that initiated new session/transmission.
Arguments:
channel: Channel that received the message.
routing_id: Routing ID of the sender.
msg: Received message.
Returns:
Always False (transmission must be initiated by Component).
"""
return False
[docs]
def connect_with_session(self, channel: Channel) -> bool:
"""Called by :meth:`Channel.connect` to determine whether new session should be
associated with connected peer.
As ICCP require that connecting peers must send a message to initiate transmission,
it always returns True.
Arguments:
channel: Channel associated with controller.
"""
return True
[docs]
def wrong_message(self, channel: Channel, session: Session, msg: ICCPMessage) -> None:
"""Handle wrong message received from controller.
Arguments:
channel: Channel associated with controller.
session: Session associated with controller.
msg: Message sent by controller.
Raises `.StopError`, which in turn calls `.on_exception` from `.handle_msg`.
"""
raise StopError("Wrong message received from controller")
[docs]
def handle_invalid_msg(self, channel: Channel, session: Session,
exc: InvalidMessageError) -> None:
"""Event handler for `.on_invalid_msg`. Calls `on_stop_component` with exception.
Arguments:
channel: Channel associated with controller.
session: Session associated with controller.
exc: Exception raised.
"""
self.on_stop_component(exc)
super().handle_invalid_msg(channel, session, exc)
[docs]
def handle_exception(self, channel: Channel, session: Session, msg: ICCPMessage,
exc: Exception) -> None:
"""Event handler for `.on_exception`. Calls `on_stop_component` with exception.
Arguments:
channel: Channel associated with controller.
session: Session associated with controller.
msg: Message sent by controller.
exc: Exception to handle.
"""
self.on_stop_component(exc)
super().handle_exception(channel, session, msg, exc)
[docs]
def handle_stop(self, channel: Channel, session: Session, msg: ICCPMessage) -> None:
"""Process `STOP` message received from controller. Calls `on_stop_component`.
Arguments:
channel: Channel associated with controller.
session: Session associated with controller.
msg: Received message.
"""
self.on_stop_component()
[docs]
def handle_request(self, channel: Channel, session: Session, msg: ICCPMessage) -> None:
"""Process `REQUEST` message received from controller.
Currently handles `Request.CONFIGURE` by invoking the `on_config_request`
event with the provided configuration. Sends an `OK` message back to the
controller upon successful processing, or an `ERROR` message if an
exception occurs during the `on_config_request` handling.
Arguments:
channel: The channel connected to the controller.
session: The session instance for this communication.
msg: The received `ICCPMessage` containing the request.
Raises:
StopError: If sending the response (OK/ERROR) to the controller fails.
"""
result = self.ok_msg()
try:
# Right now we have only one type of Request - CONFIGURE
self.on_config_request(msg.config)
except Exception as exc:
result = self.error_msg(exc)
if not (err_code := channel.send(result, session)):
raise StopError("Send to controller failed", err_code=err_code)
[docs]
def ready_msg(self, peer: PeerDescriptor,
endpoints: dict[str, list[ZMQAddress]]) -> ICCPMessage:
"""Returns `READY` control message.
Arguments:
peer: Component descriptor.
endpoints: Component endpoints.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.READY
msg.peer = peer.copy()
msg.endpoints = endpoints.copy()
return msg
[docs]
def ok_msg(self) -> ICCPMessage:
"""Returns `OK` control message.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.OK
return msg
[docs]
def error_msg(self, exc: Exception) -> ICCPMessage:
"""Returns `ERROR` control message.
Arguments:
exc: Exception to send.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.ERROR
if self.with_traceback:
msg.error = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
else:
msg.error = str(exc)
return msg
[docs]
def finished_msg(self, outcome: Outcome,
details: None | Exception | list[str]) -> ICCPMessage:
"""Returns `FINISHED` control message.
Arguments:
outcome: Outcome of componentn run.
details: Additional information about the outcome.
If an `Exception`, its traceback (if `with_traceback` is True)
or string, string representation is used. If a `list[str]`, it's used
directly. If `None`, no details are included.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.FINISHED
msg.outcome = outcome
if isinstance(details, Exception):
if self.with_traceback:
msg.details = traceback.format_exception(type(details), details,
details.__traceback__)
else:
msg.details = repr(details)
elif details is None:
msg.details = []
else:
msg.details = details.copy()
return msg
@eventsocket
def on_stop_component(self, exc: Exception | None=None) -> None:
"""`~firebird.base.signal.eventsocket` called when commponent should stop its operation.
Arguments:
exc: Exception that describes the reason why component should stop. If not
provided, the component should stop on controller's request.
"""
@eventsocket
def on_config_request(self, config: ConfigProto) -> None:
"""`~firebird.base.signal.eventsocket` called when the controller requests reconfiguration.
The handler should apply the new configuration. Any exception raised by
this event handler will be caught and sent back to the controller as an
`ERROR` message.
Arguments:
config: The new configuration (`~firebird.base.config.ConfigProto`)
provided by the controller.
"""
[docs]
class ICCPController(_ICCP):
"""Internal Component Control Protocol (ICCP) - Controller (server) side.
Used by Saturnin internally for component/controller transmissions.
"""
[docs]
def __init__(self, *, session_type: type[Session] = Session):
"""
Arguments:
session_type: Class for session objects.
"""
super().__init__(session_type=session_type)
self.handlers.update({MsgType.READY: self.handle_ready,
MsgType.REQUEST: self.wrong_message,
MsgType.OK: self.handle_oef,
MsgType.ERROR: self.handle_oef,
MsgType.STOP: self.wrong_message,
MsgType.FINISHED: self.handle_oef,
})
[docs]
def wrong_message(self, channel: Channel, session: Session, msg: ICCPMessage) -> None:
"""Handle wrong message received from component.
Arguments:
channel: Channel associated with component.
session: Session associated with component.
msg: Message sent by component.
Raises `.StopError`, which in turn calls `.on_exception` from `.handle_msg`.
"""
raise StopError("Wrong message received from component")
[docs]
def handle_invalid_msg(self, channel: Channel, session: Session,
exc: InvalidMessageError) -> None:
"""Event handler for `.on_invalid_msg`. Calls `on_stop_controller` with exception.
Arguments:
channel: Channel associated with component.
session: Session associated with component.
exc: Exception raised.
"""
self.on_stop_controller(exc)
super().handle_invalid_msg(channel, session, exc)
[docs]
def handle_exception(self, channel: Channel, session: Session, msg: ICCPMessage,
exc: Exception) -> None:
"""Event handler for `.on_exception`. Calls `on_stop_controller` with exception.
Arguments:
channel: Channel associated with component.
session: Session associated with component.
msg: Message sent by component.
exc: Exception to handle.
"""
self.on_stop_controller(exc)
super().handle_exception(channel, session, msg, exc)
[docs]
def handle_ready(self, channel: Channel, session: Session, msg: ICCPMessage) -> ICCPMessage:
"""Processes a `READY` message received from a component.
If this is the first `READY` message for the session (indicated by
`session.ready` not being set), it marks the session as ready and
returns the received message. This typically signals that the component
is initialized and has provided its peer information and endpoints.
Arguments:
channel: Channel associated with the component.
session: Session associated with the component.
msg: The `READY` message sent by the component.
Returns:
The received `ICCPMessage` if it's the first `READY` for this session.
Raises:
StopError: If a `READY` message is received for a session already marked as ready,
as this indicates an unexpected state.
"""
if hasattr(session, 'ready'):
raise StopError("Unexpected READY message from component")
session.ready = True
return msg
[docs]
def handle_oef(self, channel: Channel, session: Session, msg: ICCPMessage) -> ICCPMessage:
"""Process `OK/ERROR/FINISHED` messages received from component. It simply returns
the message.
Arguments:
channel: Channel associated with component.
session: Session associated with component.
msg: Message sent by component.
"""
if msg.msg_type in (MsgType.OK, MsgType.FINISHED) and not hasattr(session, 'ready'):
raise StopError(f"Unexpected {msg.msg_type.name} message from component")
return msg
[docs]
def stop_msg(self) -> ICCPMessage:
"""Returns `STOP` control message.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.STOP
return msg
[docs]
def request_config_msg(self, config: Config | None=None) -> ICCPMessage:
"""Returns `REQUEST.CONFIG` control message.
Arguments:
config: Optional `firebird.base.config.Config` instance containing the
configuration for the component. If `None`, an empty configuration
request is created.
"""
msg: ICCPMessage = self.message_factory()
msg.msg_type = MsgType.REQUEST
msg.request = Request.CONFIGURE
msg.config = protobuf.create_message(PROTO_CONFIG)
if config:
config.save_proto(msg.config)
return msg
@eventsocket
def on_stop_controller(self, exc: Exception) -> None:
"""`~firebird.base.signal.eventsocket` called when controller should stop its
operation due to error condition.
Arguments:
exc: Exception that describes the reason why component should stop.
"""