Source code for saturnin.lib.data.onepipe

#
# PROGRAM/MODULE: saturnin
# FILE:           saturnin/lib/data/onepipe.py
# DESCRIPTION:    Base class for Saturnin data provider and consumer microservices
# CREATED:        14.12.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________

"""Saturnin base classes for data provider and consumer microservices using the FBDP protocol.

This module provides abstract base classes (`BaseDataPipeMicro`, `DataProviderMicro`,
`DataConsumerMicro`) and their associated configurations (`BaseDataPipeConfig`,
`DataProviderConfig`, `DataConsumerConfig`) to simplify the creation of
microservices that act as either producers or consumers of data over a
Saturnin data pipe. These classes handle much of the common boilerplate
for FBDP communication, allowing developers to focus on the specific
data handling logic.
"""

from __future__ import annotations

import uuid
from functools import partial
from typing import Final, cast

import zmq
from saturnin.base import (
    MIME,
    Channel,
    ComponentConfig,
    DealerChannel,
    Direction,
    Error,
    Message,
    Outcome,
    PipeSocket,
    ServiceDescriptor,
    Session,
    SocketMode,
    StopError,
    ZMQAddress,
)
from saturnin.component.micro import MicroService
from saturnin.protocol.fbdp import ErrorCode, FBDPClient, FBDPMessage, FBDPServer, FBDPSession

from firebird.base.config import BoolOption, EnumOption, IntOption, MIMEOption, StrOption, ZMQAddressOption

#: Channel & endpoint name
PIPE_CHN: Final[str] = 'pipe'

class BaseDataPipeConfig(ComponentConfig):
    """Configuration shared by data provider and data consumer microservices.

    Defines the options that describe one data pipe endpoint: its
    identification, address, socket mode, data format, batch size and the
    READY re-send interval, plus the flag that decides whether the service
    stops once the pipe is closed.

    Arguments:
        name: Configuration name, passed unchanged to the parent class.
    """

    def __init__(self, name: str):
        super().__init__(name)
        #: Stop service when pipe is closed
        self.stop_on_close: BoolOption = BoolOption(
            'stop_on_close',
            "Stop service when pipe is closed",
            default=True,
        )
        #: Data Pipe Identification
        self.pipe: StrOption = StrOption(
            'pipe',
            "Data Pipe Identification",
            required=True,
        )
        #: Data Pipe endpoint address
        self.pipe_address: ZMQAddressOption = ZMQAddressOption(
            'pipe_address',
            "Data Pipe endpoint address",
            required=True,
        )
        #: Data Pipe Mode
        self.pipe_mode: EnumOption = EnumOption(
            'pipe_mode',
            SocketMode,
            "Data Pipe Mode",
            required=True,
        )
        #: Pipe data format specification
        self.pipe_format: MIMEOption = MIMEOption(
            'pipe_format',
            "Pipe data format specification",
        )
        #: Data batch size
        self.batch_size: IntOption = IntOption(
            'batch_size',
            "Data batch size",
            required=True,
            default=50,
        )
        #: READY message schedule interval in milliseconds
        self.ready_schedule_interval: IntOption = IntOption(
            'ready_schedule_interval',
            "READY message schedule interval in milliseconds",
            required=True,
            default=1000,
        )

    def validate(self) -> None:
        """Run the inherited validation, then the pipe-specific check.

        The only additional rule: when `pipe_mode` is `SocketMode.CONNECT`,
        `pipe_format` must have a value.

        Raises:
            Error: If `pipe_mode` is CONNECT and `pipe_format` is not set.
        """
        super().validate()
        # Only the CONNECT mode needs a data format in the configuration.
        if self.pipe_mode.value is not SocketMode.CONNECT:
            return
        if self.pipe_format.value is None:
            raise Error("'pipe_format' required for CONNECT pipe mode.")
class DataProviderConfig(BaseDataPipeConfig):
    """Configuration for data provider microservices.

    Adds no options of its own; every option is inherited from
    `BaseDataPipeConfig`.
    """
class DataConsumerConfig(BaseDataPipeConfig):
    """Configuration for data consumer microservices.

    Adds no options of its own; every option is inherited from
    `BaseDataPipeConfig`.
    """
class BaseDataPipeMicro(MicroService):
    """Base data provider/consumer microservice.

    Common base class for both providers and consumers. It handles the common
    FBDP logic: it creates the protocol handler, wires the event handlers
    defined below into it and creates the pipe channel.

    Descendant classes should override:

    - `handle_accept_client` to validate client request and acquire resources
      associated with the pipe.
    - `handle_produce_data` to produce data for outgoing DATA message (PRODUCER only).
    - `handle_accept_data` to process received data (CONSUMER only).
    - `handle_pipe_closed` to release resources associated with the pipe.

    Descendants must also set `server_socket` (it is `None` after `__init__`).

    Arguments:
        zmq_context: ZeroMQ Context.
        descriptor: Service descriptor.
        peer_uid: Peer ID, `None` means that newly generated UUID type 1 should be used.
    """
    def __init__(self, zmq_context: zmq.Context, descriptor: ServiceDescriptor, *,
                 peer_uid: uuid.UUID | None=None) -> None:
        super().__init__(zmq_context, descriptor, peer_uid=peer_uid)
        #: Pipe socket this service handles if operated as server (bind). Must be set
        #: in descendant class.
        #: For PROVIDER it's `.PipeSocket.OUTPUT`, for CONSUMER it's `.PipeSocket.INPUT`
        self.server_socket: PipeSocket | None = None
        #: FBDP protocol handler (server or client), created in `initialize`.
        #: This object manages the FBDP state machine and message handling.
        self.protocol: FBDPServer | FBDPClient | None = None
        # Next members are set in initialize()
        #: [Configuration] Whether service should stop when pipe is closed
        self.stop_on_close: bool | None = None
        #: [Configuration] Data Pipe Identification
        self.pipe: str | None = None
        #: [Configuration] Data Pipe Mode
        self.pipe_mode: SocketMode | None = None
        #: [Configuration] Data Pipe endpoint address
        self.pipe_address: ZMQAddress | None = None
        #: [Configuration] Pipe data format specification
        self.pipe_format: MIME | None = None
        #: [Configuration] Data batch size
        self.batch_size: int | None = None
        #: [Configuration] READY message schedule interval in milliseconds
        self.ready_schedule_interval: int | None = None

    def initialize(self, config: BaseDataPipeConfig) -> None:
        """Verify configuration and assemble component structural parts.

        - Copies the option values from `config` into instance attributes.
        - Creates the FBDP protocol handler: `FBDPServer` when `pipe_mode` is
          `BIND`, `FBDPClient` otherwise.
        - In BIND mode only, registers `handle_exception`, `handle_accept_client`
          and `handle_schedule_ready` on the protocol and records `pipe_address`
          as the endpoint to bind for the pipe channel.
        - In both modes, passes `batch_size` to the protocol and registers
          `handle_pipe_closed`, `handle_produce_data` and `handle_accept_data`.
        - Creates a `DealerChannel` named by `PIPE_CHN` for communication over
          the pipe.

        Arguments:
            config: Service configuration.
        """
        super().initialize(config)
        # Configuration
        self.stop_on_close = config.stop_on_close.value
        self.pipe: str = config.pipe.value
        self.pipe_mode: SocketMode = config.pipe_mode.value
        self.pipe_address: ZMQAddress = config.pipe_address.value
        self.pipe_format: MIME | None = config.pipe_format.value
        self.batch_size: int = config.batch_size.value
        self.ready_schedule_interval: int = config.ready_schedule_interval.value
        # Set up FBDP protocol
        if self.pipe_mode == SocketMode.BIND:
            # server
            self.protocol = FBDPServer()
            # These three handlers are registered for the server side only.
            self.protocol.on_exception = self.handle_exception
            self.protocol.on_accept_client = self.handle_accept_client
            self.protocol.on_schedule_ready = self.handle_schedule_ready
            # We have an endpoint to bind
            self.endpoints[PIPE_CHN] = [self.pipe_address]
        else:
            # client
            self.protocol = FBDPClient()
        # common parts
        self.protocol.batch_size = self.batch_size
        self.protocol.on_pipe_closed = self.handle_pipe_closed
        self.protocol.on_produce_data = self.handle_produce_data
        self.protocol.on_accept_data = self.handle_accept_data
        # Create pipe channel
        self.mngr.create_channel(DealerChannel, PIPE_CHN, self.protocol, wait_for=Direction.IN)

    def aquire_resources(self) -> None:
        """Acquire resources required by the component.

        Does something only when `.pipe_mode` is `~SocketMode.CONNECT`: it connects
        the pipe channel to `.pipe_address` and sends the FBDP `OPEN` message with
        `.pipe`, the client socket and `.pipe_format`.

        The client socket is the inverse of `.server_socket`: `INPUT` when
        `.server_socket` is `OUTPUT`, and `OUTPUT` in every other case (including
        when `.server_socket` was left unset).
        """
        # Connect to the data pipe
        if self.pipe_mode == SocketMode.CONNECT:
            chn: Channel = self.mngr.channels[PIPE_CHN]
            session = chn.connect(self.pipe_address)
            # OPEN the data pipe connection, this also fills session attributes
            # PRODUCER client must attach to INPUT, CONSUMER client must attach to OUTPUT
            client_socket = PipeSocket.INPUT if self.server_socket is PipeSocket.OUTPUT \
                else PipeSocket.OUTPUT
            cast(FBDPClient, chn.protocol).send_open(chn, session, self.pipe, client_socket,
                                                     self.pipe_format)

    def release_resources(self) -> None:
        """Release resources acquired by the component.

        Sends an FBDP `CLOSE` message with `ErrorCode.ERROR` to every session that
        is still registered on the pipe channel.
        """
        # CLOSE all active data pipe sessions
        chn: Channel = self.mngr.channels[PIPE_CHN]
        # send_close() will discard session, so we can't iterate over sessions.values() directly
        for session in list(chn.sessions.values()):
            # We have to report error here, because normal is to close pipes before
            # shutdown is commenced. Mind that service shutdown could be also caused by error!
            cast(FBDPServer, chn.protocol).send_close(chn, session, ErrorCode.ERROR)

    def handle_exception(self, channel: Channel, session: Session, msg: Message,
                         exc: Exception) -> None:
        """Event handler for exceptions raised in a message handler.

        Registered in `initialize` as the protocol's `on_exception` handler
        (BIND/server mode only). Sets service outcome to ERROR and notes the
        exception as details.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with connection.
            msg: Message.
            exc: Exception.
        """
        self.outcome = Outcome.ERROR
        self.details = exc

    # FBDP server only

    def handle_accept_client(self, channel: Channel, session: FBDPSession) -> None:
        """Event handler executed when client connects to the data pipe via OPEN message.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with client.

        The session attributes `~.FBDPSession.pipe`, `~.FBDPSession.socket`,
        `~.FBDPSession.data_format` and `~.FBDPSession.params` contain information
        sent by client, and the event handler validates the request.

        If request should be rejected, it raises the `.StopError` exception with
        `~Error.code` attribute containing the `~saturnin.protocol.fbdp.ErrorCode`
        to be returned in CLOSE message.

        Raises:
            StopError: With code `PIPE_ENDPOINT_UNAVAILABLE` when the requested pipe
                is not `.pipe`, or the requested socket is not `.server_socket`.

        Important:
            Base implementation validates pipe identification and pipe socket, and
            converts data format from string to MIME (in session).

            The descendant class that overrides this method must call `super` as
            first action.
        """
        if session.pipe != self.pipe:
            raise StopError(f"Unknown data pipe '{session.pipe}'",
                            code = ErrorCode.PIPE_ENDPOINT_UNAVAILABLE)
        # We're server, so clients can only attach to our server_socket
        if session.socket is not self.server_socket:
            raise StopError(f"'{session.socket}' socket not available",
                            code = ErrorCode.PIPE_ENDPOINT_UNAVAILABLE)
        # We work with MIME formats, so we'll convert the format specification to MIME
        session.data_format = MIME(session.data_format)

    def handle_schedule_ready(self, channel: Channel, session: FBDPSession) -> None:
        """Event handler executed in order to send the READY message to the client later.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with client.

        The event handler may cancel the transmission by raising the `.StopError`
        exception with `~Error.code` attribute containing the
        `~saturnin.protocol.fbdp.ErrorCode` to be returned in CLOSE message.

        Important:
            The base implementation schedules `~.FBDPServer.resend_ready()` for this
            channel and session, using the `.ready_schedule_interval` configuration
            option as the delay.
        """
        self.schedule(partial(cast(FBDPServer, channel.protocol).resend_ready, channel, session),
                      self.ready_schedule_interval)

    # FBDP common

    def handle_produce_data(self, channel: Channel, session: FBDPSession,
                            msg: FBDPMessage) -> None:
        """Event handler executed to store data into outgoing DATA message.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with client.
            msg: DATA message that will be sent to client.

        The event handler must store the data in `msg.data_frame` attribute. It may
        also set ACK-REQUEST flag and `type_data` attribute.

        The event handler may cancel the transmission by raising the `.StopError`
        exception with `code` attribute containing the
        `~saturnin.protocol.fbdp.ErrorCode` to be returned in CLOSE message.

        Note:
            To indicate end of data, raise StopError with ErrorCode.OK code.

            Exceptions are handled by protocol, but only StopError is checked for
            protocol ErrorCode. A descendant that wants a specific code reported
            (for example INVALID_DATA for a `UnicodeError`) has to convert the
            original exception into StopError itself.

        Important:
            The base implementation simply raises `.StopError` with `ErrorCode.OK`
            code, so the descendant class must override this method without `super`
            call.
        """
        raise StopError('OK', code=ErrorCode.OK)

    def handle_accept_data(self, channel: Channel, session: FBDPSession, data: bytes) -> None:
        """Event handler executed to process data received in DATA message.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with client.
            data: Data received from client.

        The event handler may cancel the transmission by raising the `.StopError`
        exception with `~Error.code` attribute containing the
        `~saturnin.protocol.fbdp.ErrorCode` to be returned in CLOSE message.

        Note:
            The ACK-REQUEST in received DATA message is handled automatically by
            protocol.

        Important:
            The base implementation simply raises `.StopError` with `ErrorCode.OK`
            code, so the descendant class must override this method without `super`
            call.
        """
        raise StopError('OK', code=ErrorCode.OK)

    def handle_pipe_closed(self, channel: Channel, session: FBDPSession, msg: FBDPMessage,
                           exc: Exception | None=None) -> None:
        """Event handler executed when CLOSE message is received or sent, to release
        any resources associated with current transmission.

        Arguments:
            channel: Channel associated with data pipe.
            session: Session associated with peer.
            msg: Received/sent CLOSE message.
            exc: Exception that caused the error.

        Important:
            The base implementation does only two actions:

            - If exception is provided, sets service execution outcome to ERROR and
              notes exception in details.
            - Stops the service if `.stop_on_close` is True.

            The descendant class that overrides this method must call `super`.
        """
        # FBDP converts exceptions raised in our event handler to CLOSE messages, so
        # here is the central place to handle errors in data pipe processing.
        # Note problem in service execution outcome
        if exc is not None:
            self.outcome = Outcome.ERROR
            self.details = exc
        # Stop the whole service when configured to do so
        if self.stop_on_close:
            self.stop.set()
class DataProviderMicro(BaseDataPipeMicro):
    """Base data provider microservice (PRODUCER).

    This class specializes `BaseDataPipeMicro` for services that produce data
    and send it over an FBDP pipe.

    Descendant classes should override:

    - `~.BaseDataPipeMicro.handle_accept_client` to validate client requests and
      acquire resources associated with the pipe.
    - `~.BaseDataPipeMicro.handle_produce_data` to generate and provide the data
      for outgoing `DATA` messages.
    - `~.BaseDataPipeMicro.handle_pipe_closed` to release resources associated
      with the pipe.
    """

    def initialize(self, config: DataProviderConfig) -> None:
        """Verify configuration and assemble component structural parts.

        After the inherited initialization it sets `server_socket` to
        `PipeSocket.OUTPUT` and stores high water mark values in the pipe
        channel's `sock_opts`: a small receive limit (5) and a send limit of
        half the batch size plus 5.

        Arguments:
            config: Service configuration.
        """
        super().initialize(config)
        # When operated as server, a provider serves the OUTPUT side of the pipe.
        self.server_socket = PipeSocket.OUTPUT
        # High water mark optimization: a provider mostly sends, so the send
        # limit scales with the batch size while the receive limit stays small.
        chn: Channel = self.mngr.channels[PIPE_CHN]
        chn.sock_opts['rcvhwm'] = 5
        # Floor division keeps the arithmetic in integers (no float round trip).
        chn.sock_opts['sndhwm'] = self.batch_size // 2 + 5
class DataConsumerMicro(BaseDataPipeMicro):
    """Base data consumer microservice (CONSUMER).

    This class specializes `BaseDataPipeMicro` for services that receive data
    from an FBDP pipe and process it.

    Descendant classes should override:

    - `~.BaseDataPipeMicro.handle_accept_client` to validate client requests and
      acquire resources associated with the pipe.
    - `~.BaseDataPipeMicro.handle_accept_data` to process received data.
    - `~.BaseDataPipeMicro.handle_pipe_closed` to release resources associated
      with the pipe.
    """

    def initialize(self, config: DataConsumerConfig) -> None:
        """Verify configuration and assemble component structural parts.

        After the inherited initialization it sets `server_socket` to
        `PipeSocket.INPUT` and stores high water mark values in the pipe
        channel's `sock_opts`: a receive limit of half the batch size plus 5
        and a small send limit (5).

        Arguments:
            config: Service configuration.
        """
        super().initialize(config)
        # When operated as server, a consumer serves the INPUT side of the pipe.
        self.server_socket = PipeSocket.INPUT
        # High water mark optimization: a consumer mostly receives, so the
        # receive limit scales with the batch size while the send limit stays small.
        chn: Channel = self.mngr.channels[PIPE_CHN]
        # Floor division keeps the arithmetic in integers (no float round trip).
        chn.sock_opts['rcvhwm'] = self.batch_size // 2 + 5
        chn.sock_opts['sndhwm'] = 5