# SPDX-FileCopyrightText: 2019-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/base/types.py
# DESCRIPTION: Saturnin type definitions and constants
# CREATED: 22.4.2019
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2019 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________.
# pylint: disable=R0902
"""
Saturnin common type definitions and constants
==============================================
This module contains:
1. Type aliases and new types for type annotations.
2. Commonly used constants like platform, vendor and mime type identifiers.
3. Exceptions.
4. Sentinels.
5. Enums.
6. Dataclasses.
"""
from __future__ import annotations
from typing import List, Dict, Optional, Callable, Any, NewType, ByteString, Final
from enum import IntEnum, IntFlag, Enum, auto
import uuid
from dataclasses import dataclass, field, replace as _dcls_replace
import zmq
from firebird.base.types import Error, Distinct, MIME, Sentinel
from firebird.base.config import Config
from firebird.base.protobuf import PROTO_STRUCT, load_registered, create_message, \
struct2dict, dict2struct
# Type annotation types
TSupplement = Optional[Dict[str, Any]]
"""name/value dictionary"""
Token = NewType('Token', ByteString)
"""Message token"""
RoutingID = NewType('RoutingID', ByteString)
"""Routing ID"""
# Constants
#: Platform OID (`firebird.butler.platform.saturnin`)
PLATFORM_OID: Final[str] = '1.3.6.1.4.1.53446.1.1.0'
#: Platform UID (:func:`~uuid.uuid5` - NAMESPACE_OID)
PLATFORM_UID: Final[uuid.UUID] = uuid.uuid5(uuid.NAMESPACE_OID, PLATFORM_OID)
#: Platform version (semver)
PLATFORM_VERSION: Final[str] = '0.8.0'
#: Platform vendor OID (`firebird.butler.vendor.firebird`)
VENDOR_OID: Final[str] = '1.3.6.1.4.1.53446.1.2.0'
#: Platform vendor UID (:func:`~uuid.uuid5` - NAMESPACE_OID)
VENDOR_UID: Final[uuid.UUID] = uuid.uuid5(uuid.NAMESPACE_OID, VENDOR_OID)
#: MIME type for protobuf messages
MIME_TYPE_PROTO: Final[str] = MIME('application/x.fb.proto')
#: MIME type for plain text
MIME_TYPE_TEXT: Final[str] = MIME('text/plain')
#: MIME type for binary data
MIME_TYPE_BINARY: Final[str] = MIME('application/octet-stream')
#: Configuration section name for local service addresses
SECTION_LOCAL_ADDRESS: Final[str] = 'local_address'
#: Configuration section name for node service addresses
SECTION_NODE_ADDRESS: Final[str] = 'node_address'
#: Configuration section name for network service addresses
SECTION_NET_ADDRESS: Final[str] = 'net_address'
#: Configuration section name for service UIDs
SECTION_SERVICE_UID: Final[str] = 'service_uid'
#: Configuration section name for service peer UIDs
SECTION_PEER_UID: Final[str] = 'peer_uid'
#: Default configuration section name for service bundle
SECTION_BUNDLE: Final[str] = 'bundle'
#: Default configuration section name for single service
SECTION_SERVICE: Final[str] = 'service'
#: protobuf ID for peer information message
PROTO_PEER: Final[str] = 'firebird.butler.PeerIdentification'
# Exceptions
[docs]
class InvalidMessageError(Error):
"A formal error was detected in a message"
[docs]
class ChannelError(Error):
"Transmission channel error"
[docs]
class ServiceError(Error):
"Error raised by service"
[docs]
class ClientError(Error):
"Error raised by Client"
[docs]
class StopError(Error):
"Exception that should stop furter processing."
[docs]
class RestartError(Error):
"Exception signaling that restart is needed for further processing."
#Sentinels
#: Sentinel for return values that indicates failed message processing
INVALID: Final[Sentinel] = Sentinel('INVALID')
#: Sentinel for return values that indicates timeout expiration
TIMEOUT: Final[Sentinel] = Sentinel('TIMEOUT')
#: Sentinel for return values that indicates restart request
RESTART: Final[Sentinel] = Sentinel('RESTART')
# Enums
[docs]
class Origin(IntEnum):
"""Origin of received message in protocol context."""
UNKNOWN = auto()
SERVICE = auto()
CLIENT = auto()
ANY = auto()
# Aliases
PROVIDER = SERVICE
CONSUMER = CLIENT
[docs]
def peer_role(self) -> Origin:
"""Returns peer's role, i.e. complementary to current (CLIENT/SERVICE,
PROVIDER/CONSULER) value.
Note: Returns current value if it's `ANY` or `UNKNOWN`.
"""
if self in (Origin.ANY, Origin.UNKNOWN):
return self
return Origin.CLIENT if self == Origin.SERVICE else Origin.SERVICE
[docs]
class SocketMode(IntEnum):
"""ZeroMQ socket mode."""
UNKNOWN = auto()
BIND = auto()
CONNECT = auto()
[docs]
class Direction(IntFlag):
"""ZeroMQ socket direction of transmission."""
NONE = 0
IN = zmq.POLLIN
OUT = zmq.POLLOUT
BOTH = OUT | IN
[docs]
class SocketType(IntEnum):
"""ZeroMQ socket type."""
UNKNOWN_TYPE = -1 # Not a valid option, defined only to handle undefined values
DEALER = zmq.DEALER
ROUTER = zmq.ROUTER
PUB = zmq.PUB
SUB = zmq.SUB
XPUB = zmq.XPUB
XSUB = zmq.XSUB
PUSH = zmq.PUSH
PULL = zmq.PULL
STREAM = zmq.STREAM
PAIR = zmq.PAIR
[docs]
class State(IntEnum):
"""General state information."""
UNKNOWN_STATE = 0
READY = 1
RUNNING = 2
WAITING = 3
SUSPENDED = 4
FINISHED = 5
ABORTED = 6
# Aliases
CREATED = READY
BLOCKED = WAITING
STOPPED = SUSPENDED
TERMINATED = ABORTED
[docs]
class PipeSocket(IntEnum):
"""Data Pipe Socket identification."""
UNKNOWN_PIPE_SOCKET = 0 # Not a valid option, defined only to handle undefined values
INPUT = 1
OUTPUT = 2
[docs]
class FileOpenMode(IntEnum):
"""File open mode."""
UNKNOWN_FILE_OPEN_MODE = 0 # Not a valid option, defined only to handle undefined values
READ = 1
CREATE = 2
WRITE = 3
APPEND = 4
RENAME = 5
[docs]
class Outcome(Enum):
"""Service execution outcome.
"""
UNKNOWN = 'UNKNOWN'
OK = 'OK'
ERROR = 'ERROR'
[docs]
class ButlerInterface(IntEnum):
"""Base class for service API code enumerations (FBSP interfaces).
"""
[docs]
@classmethod
def get_uid(cls) -> uuid.UUID:
"Returns interface UUID."
raise NotImplementedError()
# Dataclasses
[docs]
@dataclass(eq=True, order=False, frozen=True)
class AgentDescriptor(Distinct):
"""Service or Client descriptor dataclass.
Note:
Because this is a `dataclass`, the class variables are those attributes that have
default value. Other attributes are created in constructor.
Arguments:
uid: Agent ID
name: Agent name
version: Agent version string
vendor_uid: Vendor ID
classification: Agent classification string
platform_uid: Butler platform ID
platform_version: Butler platform version string
supplement: Optional list of supplemental information
"""
uid: uuid.UUID
name: str
version: str
vendor_uid: uuid.UUID
classification: str
platform_uid: uuid.UUID = PLATFORM_UID
platform_version: str = PLATFORM_VERSION
supplement: TSupplement = None
[docs]
def get_key(self) -> Any:
"""Returns `uid` (instance key). Used for instance hash computation."""
return self.uid
[docs]
def copy(self) -> AgentDescriptor:
"""Returns copy of this AgentDescriptor instance.
"""
return _dcls_replace(self)
[docs]
def replace(self, **changes) -> AgentDescriptor:
"""Creates a new `AgentDescriptor`, replacing fields with values from `changes`.
"""
return _dcls_replace(self, **changes)
[docs]
@dataclass(eq=True, order=False, frozen=True)
class PeerDescriptor(Distinct):
"""Peer descriptor.
Arguments:
uid: Peer ID
pid: Peer process ID
host: Host name
supplement: Optional list of supplemental information
"""
uid: uuid.UUID
pid: int
host: str
supplement: TSupplement = None
[docs]
def get_key(self) -> Any:
"""Returns `uid` (instance key). Used for instance hash computation."""
return self.uid
[docs]
def as_proto(self) -> Any:
"""Returns `firebird.butler.PeerIdentification` protobuf message initialized
from instance data.
"""
msg = create_message(PROTO_PEER)
msg.uid = self.uid.bytes
msg.pid = self.pid
msg.host = self.host
if self.supplement is not None:
sup = msg.supplement.add()
sup.Pack(dict2struct(self.supplement))
return msg
[docs]
def copy(self) -> PeerDescriptor:
"""Returns copy of this PeerDescriptor instance.
"""
return _dcls_replace(self)
[docs]
def replace(self, **changes) -> PeerDescriptor:
"""Creates a new `PeerDescriptor`, replacing fields with values from `changes`.
"""
return _dcls_replace(self, **changes)
[docs]
@classmethod
def from_proto(cls, proto: Any) -> PeerDescriptor:
"""Creates new PeerDescriptor from `firebird.butler.PeerIdentification` protobuf
message.
"""
if proto.DESCRIPTOR.full_name != PROTO_PEER:
raise ValueError("PeerIdentification protobuf message required")
data = None
if proto.supplement:
for i in proto.supplement:
if i.TypeName() == PROTO_STRUCT:
msg = create_message(PROTO_STRUCT)
i.Unpack(msg)
data = struct2dict(msg)
break
return cls(uuid.UUID(bytes=proto.uid), proto.pid, proto.host, data)
[docs]
@dataclass(eq=True, order=False, frozen=True)
class ServiceDescriptor(Distinct):
"""Service descriptor.
Arguments:
agent: Service agent descriptor
api: Service FBSP API description or `None` for microservice
description: Text describing the service
facilities: List of Saturnin facilities that this service uses
factory: Locator string for service factory
config: Service configuration factory
"""
agent: AgentDescriptor
api: List[ButlerInterface]
description: str
facilities: List[str]
factory: str
config: Callable[[], Config]
[docs]
def get_key(self) -> Any:
"""Returns `agent.uid` (instance key). Used for instance hash computation."""
return self.agent.uid
[docs]
@dataclass(eq=True, order=False, frozen=True)
class ApplicationDescriptor(Distinct):
"""Application descriptor.
Arguments:
uid: Application ID
name: Application name
version: Application version string
vendor_uid: Vendor ID
classification: Application classification string
description: Text describing the application
factory: Locator string for application `typer` command
config: Locator string for application configuration factory
"""
uid: uuid.UUID
name: str
version: str
vendor_uid: uuid.UUID
classification: str
description: str
factory: str
config: str
[docs]
def get_key(self) -> Any:
"""Returns `uid` (instance key). Used for instance hash computation."""
return self.uid
[docs]
@dataclass(order=True)
class PrioritizedItem:
"""Prioritized item for use with `heapq` to implement priority queue.
Arguments:
priority: Item priority
item: Prioritized item
"""
priority: int
item: Any=field(compare=False)
load_registered('firebird.butler.protobuf')
load_registered('firebird.base.protobuf')
del load_registered