# SPDX-FileCopyrightText: 2023-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/component/single.py
# DESCRIPTION: Single service controller and executor
# CREATED: 10.2.2023
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2023 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________
"""Simplified controller and executor for running a single Saturnin service.
This module provides `.SingleController` and `.SingleExecutor` classes to
streamline the process of configuring, starting, and managing a single
Saturnin service instance. It acts as a higher-level abstraction over
the more detailed `.DirectController` and `.ThreadController` implementations,
allowing for easier integration or standalone execution of services.
"""
from __future__ import annotations
import uuid
import warnings
from configparser import DEFAULTSECT, ConfigParser
from pathlib import Path
from typing import Self
import zmq
from saturnin.base import (
SECTION_LOCAL_ADDRESS,
SECTION_NET_ADDRESS,
SECTION_NODE_ADDRESS,
SECTION_PEER_UID,
SECTION_SERVICE,
SECTION_SERVICE_UID,
ChannelManager,
Error,
)
from firebird.base.config import EnvExtendedInterpolation
from firebird.base.logging import FStrMessage as _m
from firebird.base.logging import get_logger
from firebird.base.trace import TracedMixin
from .controller import DirectController, Outcome, ServiceExecConfig, ThreadController
from .registry import service_registry
[docs]
class SingleController(TracedMixin):
"""A controller that manages a single service, allowing it to be executed
either directly in the current thread (using `.DirectController`) or in a
separate thread (using `.ThreadController`).
This class simplifies the setup by managing the underlying controller type
based on the `direct` flag.
Arguments:
parser: Optional `.ConfigParser` instance to be used for service
configuration. If `None`, a new one is created.
manager: Optional `.ChannelManager` to use. If `None`, a new one is
created and managed internally.
direct: If `True`, the service will be run using an internal
`.DirectController`. If `False` (default), a .`ThreadController`
is used.
"""
def __init__(self, *, parser: ConfigParser | None=None, manager: ChannelManager | None=None,
direct: bool=False):
#: Use DirectController instead ThreadController
self.direct: bool = direct
#: Channel manager
self.mngr: ChannelManager = manager
self._ext_mngr: bool = manager is not None
if manager is None:
self.mngr = ChannelManager(zmq.Context.instance())
#: ConfigParser with service configuration
self.config: ConfigParser = \
ConfigParser(interpolation=EnvExtendedInterpolation()) if parser is None else parser
#: Service controller
self.controller: ThreadController | DirectController = None
#: Registry with ServiceDescriptors for services that could be run
#
self.config[SECTION_LOCAL_ADDRESS] = {}
self.config[SECTION_NODE_ADDRESS] = {}
self.config[SECTION_NET_ADDRESS] = {}
self.config[SECTION_SERVICE_UID] = {}
self.config[SECTION_PEER_UID] = {}
# Defaults
self.config[DEFAULTSECT]['here'] = str(Path.cwd())
# Assign Agent IDs for available services
self.config[SECTION_SERVICE_UID].update((sd.name, sd.uid.hex) for sd
in service_registry)
#
[docs]
def start(self, *, timeout: int=10000) -> None:
"""Starts service.
Arguments:
timeout: Timeout for starting the service. `None` (infinity), or a floating
point number specifying a timeout for the operation in seconds (or
fractions thereof) [Default: 10s].
Raises:
ServiceError: On error in communication with service.
TimeoutError: When service does not start in time.
"""
try:
self.controller.configure(self.config, self.controller.name)
self.controller.start(timeout=timeout)
except:
self.stop()
raise
[docs]
def stop(self, *, timeout: int=10000) -> None:
"""Stop runing service.
Arguments:
timeout: Timeout for stopping the service. `None` (infinity), or a floating
point number specifying a timeout for the operation in seconds (or
fractions thereof) [Default: 10s].
Raises:
ServiceError: On error in communication with service.
TimeoutError: When service does not stop in time.
"""
if not self.direct:
try:
self.controller.stop(timeout=timeout)
except Exception as exc:
get_logger(self).error(_m("Error while stopping the service: {args[0]}", args=exc.args))
if self.controller.is_running():
warnings.warn(f"Stopping service {self.controller.name} failed, "
f"service thread terminated", RuntimeWarning)
self.controller.terminate()
[docs]
def join(self, timeout=None) -> None:
"""Wait until service stops.
Arguments:
timeout: Floating point number specifying a timeout for the operation in
seconds (or fractions thereof).
"""
self.controller.join(timeout)
[docs]
class SingleExecutor:
"""A context manager for executing a single Saturnin service.
This class simplifies the lifecycle management of a `SingleController`,
ensuring proper initialization and cleanup (like ZMQ context termination)
when used in a `with` statement.
Arguments:
direct: If `True`, the underlying `.SingleController` will be configured
to run the service directly in the current thread. If `False`
(default), the service runs in a separate thread.
"""
def __init__(self, *, direct: bool = False):
#: Use DirectController instead ThreadController
self.direct: bool = direct
#: Channel manager
self.mngr: ChannelManager = None
#: Controller
self.controller: SingleController = None
def __enter__(self) -> Self:
return self
def __exit__(self, exc_type, exc_value, traceback) -> None:
if self.mngr is not None:
self.mngr.shutdown(forced=True)
zmq.Context.instance().term()
[docs]
def run(self) -> tuple[Outcome, list[str]] | None:
"""Runs the configured service.
If the executor is configured for non-direct (threaded) execution, this
method starts the service, waits for it to join (handling KeyboardInterrupt
for graceful shutdown), and then returns the execution outcome.
If configured for direct execution, this method starts the service, which
will block until it finishes or is interrupted. In direct mode, this
method returns `None` as the outcome is managed within the blocking call.
Returns:
tuple[Outcome, list[str]] | None:
- If not in `direct` mode: A tuple containing the service's
execution `Outcome` and a list of strings with details (e.g., error messages).
- If in `direct` mode: `None`.
"""
self.controller.start()
result = None
if not self.direct:
try:
self.controller.join()
raise KeyboardInterrupt() # This, or direct call to executor.stop()
except KeyboardInterrupt: # SIGINT
self.controller.stop()
finally:
result = (self.controller.controller.outcome, self.controller.controller.details)
return result