Source code for saturnin.component.micro

# SPDX-FileCopyrightText: 2019-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE:           saturnin/component/micro.py
# DESCRIPTION:    Base module for implementation of Firebird Butler Microservices
# CREATED:        22.4.2019
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2019 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________.

"""Saturnin base module for implementation of Firebird Butler Microservices.

This module provides the `MicroService` base class, which handles common
microservice lifecycle management, communication with a controller via ICCP,
and a basic event loop for handling ZMQ messages and scheduled tasks.
"""

from __future__ import annotations

import os
import platform
import threading
import uuid
from collections.abc import Callable
from contextlib import suppress
from heapq import heappop, heappush
from time import monotonic_ns
from typing import Final, cast
from weakref import proxy

import zmq
from saturnin.base import (
    Channel,
    ChannelManager,
    Component,
    ComponentConfig,
    ConfigProto,
    Direction,
    Outcome,
    PairChannel,
    PeerDescriptor,
    PrioritizedItem,
    ServiceDescriptor,
    ServiceError,
    State,
    ZMQAddress,
)
from saturnin.protocol.iccp import ICCPComponent

from firebird.base.trace import TracedMixin
from firebird.base.types import conjunctive

#: Service control channel name
SVC_CTRL: Final[str] = 'iccp'

[docs] class MicroService(Component, TracedMixin, metaclass=conjunctive): """Saturnin Component for Firebird Butler Microservices. Arguments: zmq_context: ZeroMQ Context. descriptor: Service descriptor. peer_uid: Peer ID, `None` means that newly generated UUID type 1 should be used. """ def __init__(self, zmq_context: zmq.Context, descriptor: ServiceDescriptor, *, peer_uid: uuid.UUID | None=None): self._heap: list = [] #: Service execution outcome self.outcome: Outcome = Outcome.UNKNOWN #: Service execution outcome details self.details: Exception | list[str] = None #: Service internal state self.state: State = State.UNKNOWN_STATE #: Event to stop the component self.stop: threading.Event = threading.Event() #: ChannelManager instance. self.mngr: ChannelManager = ChannelManager(zmq_context) #: Dictionary with endpoints to which the component binds. #: Key is channel name, value is list of ZMQAddress instances. #: Initially empty. self.endpoints: dict[str, list[ZMQAddress]] = {} #: Service desriptor. self.descriptor: ServiceDescriptor = descriptor #: Peer descriptor for this component. self.peer: PeerDescriptor = PeerDescriptor(uuid.uuid1() if peer_uid is None else peer_uid, os.getpid(), platform.node())
[docs] def handle_stop_component(self, exc: Exception | None=None) -> None: """ICCP event handler. Called when commponent should stop its operation. It stops the component by setting the `~Component.stop` event. Arguments: exc: Exception that describes the reason why component should stop. If not provided, the component should stop on controller's request. """ if exc is not None: self.outcome = Outcome.ERROR self.details = exc self.stop.set()
[docs] def handle_config_request(self, config: ConfigProto) -> None: """ICCP event handler. Called when controller requested reconfiguration. Must raise an exception if configuration fails for any reason. By default, the component does not support run-time configuration, so it raises `NotImplementedError`. Arguments: config: New configuration provided by controller. """ raise NotImplementedError("Service does not support run-time configuration")
[docs] def schedule(self, action: Callable, after: int) -> None: """Schedule action to be executed after specified time. Action is executed in `run()` main I/O loop not sooner than after specified number of milliseconds from time when `schedule()` is called. However, the actual delay could be longer than specified (depends on time spent in message handlers and other factors). Arguments: action: Callable (without arguments) to be executed. Use `functools.partial` if callable requires arguments. after: Delay in milliseconds. """ heappush(self._heap, PrioritizedItem(monotonic_ns() + (after * 1000000), action))
[docs] def get_timeout(self) -> int: """Returns the timeout in milliseconds until the next scheduled action. If no actions are scheduled, it returns 1000ms (1 second) as a default polling interval. """ if not self._heap: return 1000 back = [] i = len(self._heap) now = monotonic_ns() while self._heap and (item := heappop(self._heap)).priority < now: back.append(item) if len(back) != i: heappush(self._heap, item) for value in back: heappush(self._heap, value) return max(int((item.priority - now) / 1000000), 0)
[docs] def run_scheduled(self) -> None: """Executes any scheduled actions whose execution time (priority) is at or before the current monotonic time. """ while self._heap: item = heappop(self._heap) if item.priority < monotonic_ns(): item.item() else: heappush(self._heap, item) break
[docs] def initialize(self, config: ComponentConfig) -> None: """Verify configuration and assemble component structural parts. Arguments: config: Service configuration. """ config.validate() # Fail early! if config.logging_id.value is not None: self._agent_name_ = config.logging_id.value
[docs] def bind_endpoints(self) -> None: """Binds all ZMQ endpoints defined in `.endpoints` using the respective channels from `.mngr`. """ for name, addr_list in self.endpoints.items(): chn: Channel = self.mngr.channels.get(name) for i, addr in enumerate(addr_list): #self.endpoints[name][i] = chn.bind(addr) addr_list[i] = chn.bind(addr)
[docs] def aquire_resources(self) -> None: """Acquire resources required by component (e.g., open files, connect to other services). This method is called during the warm-up phase after basic initialization. Implementations should raise an exception if resource acquisition fails. """
[docs] def release_resources(self) -> None: """Release resources acquired by the component (e.g., close files, disconnect from other services). This method is called during the graceful shutdown phase. """
[docs] def start_activities(self) -> None: """Start normal component activities. Must raise an exception when start fails. """
[docs] def stop_activities(self) -> None: """Stop component activities. """
[docs] def warm_up(self, ctrl_addr: ZMQAddress | None) -> None: """Performs the warm-up sequence for the microservice. This includes: - Setting up the ICCP control channel if `ctrl_addr` is provided. - Warming up the `.ChannelManager` to create ZMQ sockets. - Connecting to the controller via ICCP. - Binding all service-defined endpoints via `.bind_endpoints()`. - Acquiring necessary resources via `aquire_resources()`. - Starting normal component activities via `.start_activities()`. - Sending a `READY` message to the controller upon success, or an `ERROR` message if any warm-up step fails. Arguments: ctrl_addr: The ZMQ address of the controller's control channel. If `None`, the component runs without ICCP communication (e.g., standalone). """ if ctrl_addr is not None: # Service control channel iccp = ICCPComponent(with_traceback=__debug__) iccp.on_stop_component = self.handle_stop_component iccp.on_config_request = self.handle_config_request chn: PairChannel = self.mngr.create_channel(PairChannel, SVC_CTRL, iccp, wait_for=Direction.IN, sock_opts={'rcvhwm': 5, 'sndhwm': 5,}) self.mngr.warm_up() if ctrl_addr is not None: chn.connect(ctrl_addr) if not chn.can_send(): raise ServiceError("Broken component control channel") try: self.bind_endpoints() self.aquire_resources() self.start_activities() except Exception as exc: if ctrl_addr is not None: chn.send(cast(ICCPComponent, chn.protocol).error_msg(exc), chn.session) self.mngr.shutdown() raise else: if ctrl_addr is not None: chn.send(cast(ICCPComponent, chn.protocol).ready_msg(self.peer, self.endpoints), chn.session) self.state = State.READY
[docs] def run(self) -> None: """The main execution loop for the microservice. This loop continuously waits for I/O events on managed channels and processes them in a prioritized order: 1. Messages on the ICCP control channel (if connected). 2. Output-ready events on other channels. 3. Input-ready events on other channels. After processing I/O, it executes any due scheduled actions via `.run_scheduled()`. The loop terminates when `.stop` event is set. Upon termination, it performs a graceful shutdown sequence: `.stop_activities()`, `.release_resources()`, sends a `FINISHED` (or `ERROR`) message to the controller, and shuts down the `.ChannelManager`. """ self.state = State.RUNNING ctrl_chn: PairChannel = self.mngr.channels.get(SVC_CTRL) try: while not self.stop.is_set(): events = self.mngr.wait(self.get_timeout()) if events: # Messages from service control channel have top priority if ctrl_chn in events: ctrl_chn.receive() if self.stop.is_set(): continue # stop quickly # Channels waiting for output have precedence if self.mngr.has_pollout(): for chn, event in events.items(): if Direction.OUT in event: chn.on_output_ready(chn) # Now process incomming messages for chn, event in events.items(): if Direction.IN in event: chn.receive() # Now it's time for scheduled actions self.run_scheduled() # Gracefully stop the service self.state = State.STOPPED self.stop_activities() self.release_resources() if self.outcome is Outcome.UNKNOWN: self.outcome = Outcome.OK ctrl_chn.send(cast(ICCPComponent, ctrl_chn.protocol).finished_msg(self.outcome, self.details), ctrl_chn.session) self.mngr.shutdown() self.state = State.FINISHED except Exception as exc: self.state = State.ABORTED with suppress(Exception): # try send report to controller ctrl_chn.send(cast(ICCPComponent, ctrl_chn.protocol).error_msg(exc), ctrl_chn.session) with suppress(Exception): self.mngr.shutdown(forced=True)