Source code for saturnin._scripts.commands.daemons

# SPDX-FileCopyrightText: 2023-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
# PROGRAM/MODULE: saturnin
# FILE:           saturnin/_scripts/commands/daemons.py
# DESCRIPTION:    Saturnin daemon commands
# CREATED:        16.02.2023
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2023 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
#                 ______________________________________

"""Typer commands for managing Saturnin daemon processes.

This module provides CLI commands to list currently running Saturnin daemons,
show detailed information about a specific daemon process, and stop a running
daemon. Daemons are typically Saturnin recipes executed in `DAEMON` mode.
"""

from __future__ import annotations

import datetime
from typing import Annotated, Final

import psutil
import typer
from rich import box
from rich.table import Table
from rich.text import Text
from saturnin.base import directory_scheme
from saturnin.component.recipe import RecipeInfo, recipe_registry
from saturnin.lib import daemon
from saturnin.lib.console import console

#: Typer command group for daemon management commands
app = typer.Typer(rich_markup_mode="rich", help="Saturnin daemons.")

#: Rich-markup placeholder (dimmed "N/A") displayed in place of a process
#: attribute whose value could not be obtained.
NOT_AVAILABLE: Final[str] = '[dim]N/A[/dim]'

def get_first_line(text: str) -> str:
    """Returns the first non-empty line from the input string.

    Leading blank lines are skipped. The returned line carries neither leading
    nor trailing whitespace, so a carriage return left over from Windows line
    endings is removed as well.

    Args:
        text: The string from which to extract the first line.

    Returns:
        The first non-empty line of the string, stripped of leading/trailing
        whitespace. Returns an empty string if the input is empty or only
        whitespace.
    """
    if not text:
        return ""
    # strip() removes leading blank lines (and leading indentation); the
    # final rstrip() removes whitespace that sat between the first line's
    # content and its line break, which strip() on the whole text cannot reach.
    return text.strip().partition('\n')[0].rstrip()
def get_running_daemons() -> dict[int, RecipeInfo | None]:
    """Scans Saturnin's PID directory to identify running daemon processes.

    Every `<pid>.pid` file in the PID directory is examined: the PID is taken
    from the file name and the recipe name from the file content. `psutil` is
    used to verify that the process still exists. If a PID file points to a
    non-existent process, the stale PID file is removed. Files whose name is
    not a number are ignored (and left untouched).

    Returns:
        A dictionary where keys are process IDs (int) of running daemons, and
        values are whatever `recipe_registry.get()` returns for the recipe
        name stored in the PID file (callers treat a missing recipe as `None`).
    """
    result: dict[int, RecipeInfo | None] = {}
    for pid_file in directory_scheme.pids.glob('*.pid'):
        try:
            pid = int(pid_file.stem)
        except ValueError:
            # Not a `<pid>.pid` file; it is not ours to interpret or delete.
            continue
        if not psutil.pid_exists(pid):
            # Stale PID file. `missing_ok` tolerates the file being removed
            # concurrently (e.g. by another invocation of this command).
            pid_file.unlink(missing_ok=True)
            continue
        # Strip surrounding whitespace (such as a trailing newline) so the
        # registry lookup uses the same name that `show_daemon` displays.
        recipe_name = pid_file.read_text().strip()
        result[pid] = recipe_registry.get(recipe_name)
    return result
def pid_completer(ctx, args, incomplete) -> list[tuple[str, str]]: #noqa: ARG001
    """Click/Typer autocompletion callback offering PIDs of running Saturnin daemons.

    All three arguments are accepted only to satisfy the completion callback
    signature; none of them is used, so every running daemon is always offered.

    Args:
        ctx: The Click/Typer context.
        args: The current command arguments.
        incomplete: The partially typed PID.

    Returns:
        One `(pid, recipe_name)` tuple per running daemon, with the PID rendered
        as a string. The recipe name is an empty string when the recipe is unknown.
    """
    suggestions = []
    for daemon_pid, recipe_info in get_running_daemons().items():
        help_text = recipe_info.name if recipe_info else ""
        suggestions.append((str(daemon_pid), help_text))
    return suggestions
def format_size(size: int, decimals: int=2, *, binary_system: bool=True) -> str:
    """Renders a byte count as a human-readable string such as `1.50 KiB`.

    The value is repeatedly divided by the unit step (1024 or 1000) until it is
    smaller than the step or the largest known unit (YiB / YB) is reached.

    Args:
        size: The size in bytes.
        decimals: Number of digits printed after the decimal point.
        binary_system: If True (default), 1024-based units (KiB, MiB, ...) are
            used. If False, 1000-based units (kB, MB, ...) are used.

    Returns:
        The scaled value followed by a space and the unit name.
    """
    if binary_system:
        step = 1024
        unit_names = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
    else:
        step = 1000
        unit_names = ('B', 'kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB')
    # The last unit is the catch-all: values are reported in it no matter how
    # large they still are after all divisions.
    *scalable_units, top_unit = unit_names
    value = size
    for unit_name in scalable_units:
        if value < step:
            return f"{value:.{decimals}f} {unit_name}"
        value = value / step
    return f"{value:.{decimals}f} {top_unit}"
@app.command()
def list_daemons() -> None:
    """Lists all currently running Saturnin daemon processes.

    Displays a table with the PID, status, associated recipe name, and a brief
    description of the recipe for each running daemon. If a daemon's recipe
    has been uninstalled, it's marked accordingly.
    """
    daemons = get_running_daemons()
    if not daemons:
        console.print("No Saturnin daemons are currently running (or tracked by PID files).")
        return
    #
    table = Table(title='Running Saturnin Daemons', box=box.ROUNDED)
    column_specs = (
        ('PID', {'style': 'cyan'}),
        ('Status', {'style': 'bold yellow'}),
        ('Recipe', {'style': 'green', 'overflow': "fold"}),
        ('Description', {'style': 'white', 'overflow': "fold"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    # Rows are emitted in ascending PID order for a stable listing
    for pid in sorted(daemons):
        recipe = daemons[pid]
        try:
            state = psutil.Process(pid).status()
        except psutil.NoSuchProcess:
            # The process ended after get_running_daemons() saw it alive
            state = "Exited"
        except psutil.AccessDenied:
            state = "Access Denied"
        if recipe is not None:
            recipe_cell = recipe.name
            description_cell = get_first_line(recipe.description)
        else:
            recipe_cell = '[warning]UNKNOWN RECIPE[/warning]'
            description_cell = "Recipe used by this daemon was uninstalled or is missing."
        table.add_row(str(pid), state, recipe_cell, description_cell)
    console.print(table)
def _attr_cell(value: object, style: str = '') -> Text | str:
    """Builds a table cell for a single process attribute.

    Args:
        value: Attribute value obtained from `psutil`, or `None` when the value
            could not be obtained.
        style: Optional Rich style applied to the rendered value.

    Returns:
        The `NOT_AVAILABLE` markup string when `value` is `None`; otherwise a
        `Text` holding `str(value)`. Using `Text` keeps square brackets in
        process-supplied strings (paths, command lines) from being parsed as
        Rich markup.
    """
    if value is None:
        return NOT_AVAILABLE
    return Text(str(value), style=style)


@app.command()
def show_daemon(
    pid: Annotated[int, typer.Argument(metavar='PID', autocompletion=pid_completer,
                                       help="The Process ID of the daemon to inspect")]
) -> None:
    """Displays detailed information about a running Saturnin daemon process.

    Information includes process status, creation time, resource usage (CPU, memory, I/O),
    command line, and associated user.
    """
    try:
        if not psutil.pid_exists(pid):
            console.print_error(f"Process with PID [item]{pid}[/] not found.")
            return
        proc = psutil.Process(pid)
        # Check if it's a Saturnin daemon (optional, but good for context)
        recipe_name = "Unknown"
        pid_file_path = directory_scheme.pids / f"{pid}.pid"
        if pid_file_path.exists():
            recipe_name = pid_file_path.read_text().strip()
        # Inaccessible attributes are reported as None (not as a display string),
        # so structured values such as `cpu_times` can be tested reliably below.
        data = proc.as_dict(ad_value=None)
        # `as_dict()` does not report child processes; query them explicitly.
        try:
            child_procs = proc.children()
        except psutil.AccessDenied:
            child_procs = []
    except psutil.NoSuchProcess:
        console.print_error(f"Process with PID [item]{pid}[/] disappeared before details could be fetched.")
        return
    except psutil.AccessDenied:
        console.print_error(f"Access denied when trying to get details for PID [item]{pid}[/]. "
                            "Try running with higher privileges.")
        return
    except Exception as e:
        console.print_error(f"An error occurred while fetching details for PID [item]{pid}[/]: {e}")
        return

    create_time = data.get('create_time')
    if create_time is None:
        created_cell = run_time_cell = NOT_AVAILABLE
    else:
        created = datetime.datetime.fromtimestamp(create_time)
        run_time = datetime.datetime.now() - created
        created_cell = created.strftime("%Y-%m-%d %H:%M:%S")
        run_time_cell = str(run_time).split('.')[0]  # Remove microseconds for brevity

    panel_title = f" Details for Daemon PID [important]{pid}[/important]"
    if recipe_name != "Unknown":
        panel_title += f" (Recipe: [green]{recipe_name}[/green])"

    table = Table(title=panel_title, box=box.ROUNDED, show_header=False, title_justify='left')
    table.add_column('', style='green', overflow='fold')
    table.add_column('', overflow='fold')
    #
    table.add_row('Status:', _attr_cell(data.get('status'), 'important'))
    table.add_row('Created:', created_cell)
    table.add_row('Run Time:', run_time_cell)
    table.add_row('Name:', _attr_cell(data.get('name')))
    table.add_row('Executable:', _attr_cell(data.get('exe')))
    cmdline = data.get('cmdline')
    table.add_row('Cmd. Line:', Text(' '.join(cmdline)) if cmdline else NOT_AVAILABLE)
    table.add_row('CWD:', _attr_cell(data.get('cwd')))
    table.add_row('User:', _attr_cell(data.get('username')))
    table.add_row('# Threads:', _attr_cell(data.get('num_threads')))
    if 'num_handles' in data:  # Windows-specific
        table.add_row('# Handles:', _attr_cell(data['num_handles']))
    if 'num_fds' in data:  # POSIX-specific
        table.add_row('# File Descriptors:', _attr_cell(data['num_fds']))

    if child_procs:
        parts = []
        line_sep = Text('\n')
        for child in child_procs:
            try:
                parts.append(Text.assemble((str(child.pid), 'number'), ':', child.name(),
                                           ' - ', (child.status(), 'important')))
            except (psutil.NoSuchProcess, psutil.AccessDenied):
                parts.append(Text.assemble((str(child.pid), 'number'),
                                           ': N/A (No access or exited)'))
        table.add_row('Children:', line_sep.join(parts))

    # Older psutil releases report this attribute under the name 'connections'.
    connections = data.get('net_connections', data.get('connections'))
    if connections is not None:  # Check if connections info was obtainable
        table.add_row('# INET Con.:', Text(str(len(connections))), end_section=True)
    else:
        table.add_row('# INET Con.:', NOT_AVAILABLE, end_section=True)

    cpu_times = data.get('cpu_times')
    if cpu_times is not None:
        # `iowait` is not present on every platform.
        iowait = getattr(cpu_times, 'iowait', None)
        table.add_row('CPU User Time:', Text(f"{cpu_times.user:.2f}s"))
        # Close the section on the last CPU row instead of adding a blank filler row.
        table.add_row('CPU System Time:', Text(f"{cpu_times.system:.2f}s"),
                      end_section=iowait is None)
        if iowait is not None:
            table.add_row('CPU I/O Wait:', Text(f"{iowait:.2f}s"), end_section=True)
    else:
        table.add_row('CPU Times:', NOT_AVAILABLE, end_section=True)

    memory_info = data.get('memory_info')
    if memory_info is not None:
        table.add_row('Memory RSS:', Text(format_size(memory_info.rss)))
        table.add_row('Memory VMS:', Text(format_size(memory_info.vms)), end_section=True)
    else:
        table.add_row('Memory Info:', NOT_AVAILABLE, end_section=True)

    io_counters = data.get('io_counters')
    if io_counters is not None:
        table.add_row('I/O Read Count:', Text(f"{io_counters.read_count:,}"))
        table.add_row('I/O Write Count:', Text(f"{io_counters.write_count:,}"))
        table.add_row('I/O Bytes Read:', Text(format_size(io_counters.read_bytes)))
        table.add_row('I/O Bytes Written: ', Text(format_size(io_counters.write_bytes)),
                      end_section=True)
    else:
        table.add_row('I/O Counters:', NOT_AVAILABLE, end_section=True)

    console.print(table)
@app.command()
def stop_daemon(
    pid: Annotated[int, typer.Argument(metavar='PID', autocompletion=pid_completer,
                                       help="The Process ID of the daemon to stop")]
) -> None:
    """Stops a running Saturnin daemon process.

    This command attempts a graceful shutdown by sending the appropriate signal
    (SIGINT on Unix, CTRL_C_EVENT on Windows) to the daemon.
    """
    # Verify the target first, so a wrong PID is reported without attempting a stop.
    try:
        process_found = psutil.pid_exists(pid)
        if process_found:
            psutil.Process(pid)  # Raises if the process vanished or cannot be accessed
    except psutil.NoSuchProcess:
        console.print_error(f"Process with PID [item]{pid}[/] not found (or exited just now).")
        return
    except psutil.AccessDenied:
        console.print_error(f"Access denied when checking PID [item]{pid}[/]. Cannot proceed to stop.")
        return
    if not process_found:
        console.print_error(f"Process with PID [item]{pid}[/] not found.")
        return

    # The outcome (OK / ERROR) is appended to this line, hence no line break here.
    console.print(f"Attempting to stop daemon with PID [item]{pid}[/]... ", end="")
    try:
        # The actual stop is delegated to `saturnin.lib.daemon`; any failure it
        # signals by raising is reported below together with the traceback.
        daemon.stop_daemon(pid)
    except Exception:
        console.print('[warning]ERROR')
        console.print_exception(show_locals=False)
        return
    console.print('[ok]OK')