Source code for saturnin._scripts.repl

# SPDX-FileCopyrightText: 2022-present The Firebird Projects <www.firebirdsql.org>
#
# SPDX-License-Identifier: MIT
#
#   PROGRAM/MODULE: saturnin
#   FILE:           saturnin/_scripts/repl.py
#   DESCRIPTION:    REPL for Typer application
#   CREATED:        05.08.2022
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Initial code is based on https://github.com/click-contrib/click-repl
#
# Contributor(s): Pavel Císař (initial code)
#                 ______________________________________
# pylint: disable=W0212

"""REPL for Typer application
"""

from __future__ import annotations
from typing import Dict, Any, List, TextIO, Callable, Optional
from pathlib import Path
from operator import attrgetter
import shlex
import sys
from io import StringIO
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.history import FileHistory
from prompt_toolkit.shortcuts import prompt
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.auto_suggest import AutoSuggestFromHistory
import click
from click.exceptions import Exit as ClickExit, Abort as ClickAbort
from rich.console import Console
from saturnin.base import RestartError, RESTART, directory_scheme
from saturnin.lib.console import console as cm, FORCE_TERMINAL

EchoCallback = Callable[[str], None]

#: Prompt-toolkit key bindings
kb = KeyBindings()

@kb.add('c-space')
def _(event):
    " Initialize autocompletion, or select the next completion. "
    buff = event.app.current_buffer
    if buff.complete_state:
        buff.complete_next()
    else:
        buff.start_completion(select_first=False)

[docs] class CustomClickCompleter(Completer): """Custom prompt-toolkit completer. It provides command completion for Typer/Click commands and parameters, including option/parameter values. Arguments: cli: Root Typer command group """ def __init__(self, cli): self.cli = cli
[docs] def get_completions(self, document, complete_event=None): """Yields completion choices. """ # Code analogous to click._bashcomplete.do_complete try: txt = document.text_before_cursor in_help: bool = txt.startswith('?') if in_help: txt = txt[1:] args = shlex.split(txt) except ValueError: # Invalid command, perhaps caused by missing closing quotation. return cursor_within_command = (document.text_before_cursor.rstrip() == document.text_before_cursor) if args and cursor_within_command: # We've entered some text and no space, give completions for the # current word. incomplete = args.pop() else: # We've not entered anything, either at all or for the current # command, so give all relevant completions for this context. incomplete = "" ctx = click.shell_completion._resolve_context(self.cli, {}, "", args) if ctx is None: return last_arg = args[-1] if args else '' choices = [] stop: bool = in_help if isinstance(ctx.command, click.MultiCommand): # Completion is list of commands at given context level if not args: choices.append(Completion('quit', -len(incomplete), display_meta="Quit Saturnin console")) for name in ctx.command.list_commands(ctx): command = ctx.command.get_command(ctx, name) if not command.hidden: choices.append(Completion(str(name),-len(incomplete), display_meta=command.get_short_help_str())) stop = stop or choices if not stop: # First check whether we're entering value for option. for param in ctx.command.params: if (isinstance(param, click.Option) and not param.is_flag and (last_arg in param.opts or last_arg in param.secondary_opts)): # Completion are possible values for last option, if applicable if isinstance(param.type, click.Choice): for choice in param.type.choices: choices.append(Completion(str(choice), -len(incomplete))) else: choices.extend(Completion(str(item.value), -len(incomplete), display_meta=item.help) for item in param.shell_complete(args, incomplete)) stop = True # Do not continue even if we don't have choices! stop = stop or choices if not stop: # We're looking for possible argument values or option # First we build list of already processed options and arguments... not_processed_params = [] for param in ctx.command.params: if isinstance(param, click.Option): if ctx.params[param.name] == param.default: not_processed_params.append(param) if not incomplete.startswith('-'): for param in ctx.command.params: if isinstance(param, click.Argument): if (param.nargs == 1) and (ctx.params[param.name] == param.default): not_processed_params.append(param) break elif param.nargs == -1: not_processed_params.append(param) break # for param in not_processed_params: if isinstance(param, click.Option): # Completion is list of options for options in (param.opts, param.secondary_opts): for opt in options: choices.append(Completion(str(opt), -len(incomplete), display_meta=param.help)) elif isinstance(param, click.Argument): # Completion are values for argument, if applicable if isinstance(param.type, click.Choice): for choice in param.type.choices: choices.append(Completion(str(choice), -len(incomplete), display_meta=param.help)) else: choices.extend(Completion(str(item.value), -len(incomplete), display_meta=item.help if item.help else param.help) for item in param.shell_complete(args, incomplete)) stop = stop or choices choices.sort(key=attrgetter('text')) for item in choices: if item.text.startswith(incomplete): yield item
[docs] class IOManager: # pylint: disable=R0902 """REPL I/O manager. Handles command prompt, stdin/stdout redirection etc. Arguments: context: Current Click context echo: Callback called with command line before it's executed. console: Costom Rich console for output. If not provided, Saturnin standard console is used. """ def __init__(self, context, *, echo: Optional[EchoCallback]=None, console: Console=None): self.console: Console = cm.std_console if console is None else console self.html_output: bool = False self.output_file: TextIO = None self.output_filename: Path = None self.echo: Optional[EchoCallback] = echo self.run_commands: List[str] = [] self.isatty: bool = sys.stdin.isatty() self.saved_stdin = sys.stdin self.saved_stdout = sys.stdout self.pipe_in = StringIO() self.pipe_out = StringIO() self.prompt_kwargs: Dict[str, Any] = {} group_ctx = context.parent or context defaults = { 'history': FileHistory(str(directory_scheme.history_file)), 'completer': CustomClickCompleter(group_ctx.command), 'message': '> ', 'key_bindings': kb, 'auto_suggest': AutoSuggestFromHistory() } for key, default_value in defaults.items(): if key not in self.prompt_kwargs: self.prompt_kwargs[key] = default_value # self.cmd_queue = [] def __enter__(self) -> IOManager: return self def __exit__(self, exc_type, exc_value, traceback) -> None: sys.stdin = self.saved_stdin sys.stdout = self.saved_stdout self.restore_console() def _is_internal_cmd(self, cmd: str) -> bool: cmd = cmd.rstrip().split(' ')[0] if cmd.lower() in ['help', 'quit']: return True if cmd.startswith('?'): return True return False def _get_next_cmd(self) -> str: command = self.cmd_queue.pop(0) self.pipe_in = self.pipe_out self.pipe_in.seek(0) sys.stdin = self.pipe_in if self.cmd_queue: self.pipe_out = StringIO() sys.stdout = self.pipe_out else: sys.stdout = self.saved_stdout return command
[docs] def _get_command(self) -> str: "Returns next command fetched from queue, stdin or console prompt." sys.stdin = self.saved_stdin sys.stdout = self.saved_stdout self.pipe_out = StringIO() if self.run_commands: command = self.run_commands.pop(0) elif not self.isatty: command = sys.stdin.readline() else: command = prompt(**self.prompt_kwargs) return command
[docs] def get_command(self) -> str: "Returns next command." if self.cmd_queue: return self._get_next_cmd() command = self._get_command() if self.echo and command.strip(): self.echo(command) sys.stdin = self.saved_stdin sys.stdout = self.saved_stdout return command
[docs] def reset_queue(self) -> None: "Clear command queue" i = len(self.cmd_queue) self.cmd_queue.clear() sys.stdin = self.saved_stdin sys.stdout = self.saved_stdout if i > 0: self.console.print(f'Remaining {i} command(s) not executed')
[docs] def redirect_console(self, filename: Path) -> None: """Redirects console output to file. Arguments: filename: File for console output. """ if self.output_file is not None: self.output_file.close() self.output_file = filename.open(mode='w', encoding='utf8') self.output_filename = filename self.html_output = filename.suffix.startswith('.htm') self.console = Console(file=self.output_file, width=5000, force_terminal=FORCE_TERMINAL, emoji=False, record=self.html_output)
[docs] def restore_console(self) -> None: "Closes the output file and restores output to console." if self.output_file is not None: self.output_file.close() self.output_file = None if self.html_output: self.console.save_html(self.output_filename) self.console = cm.std_console
[docs] def repl(context, ioman: IOManager) -> bool: # pylint: disable=R0912 """ Start an interactive shell. All subcommands are available in it. Arguments: context: Current Click context. ioman: IOManager instance. Returns: True if REPL should be restarted, otherwise returns False. If stdin is not a TTY, no prompt will be printed, but commands are read from stdin. """ group_ctx = context.parent or context group_ctx.info_name = '' group = group_ctx.command group.params.clear() while True: try: command = ioman.get_command() except KeyboardInterrupt: continue except EOFError: break if not command: if ioman.isatty: continue break # Internal commands if ioman._is_internal_cmd(command): cmd = command.rstrip() if cmd.lower() == 'help': command = '--help' elif cmd.startswith('?'): command = cmd[1:] + ' --help' elif cmd.lower() == 'quit': break # Special commands for cmd in ('pip ', 'install package ', 'uninstall package '): if command.startswith(cmd): command = command[:len(cmd)] + ' -- ' + command[len(cmd):] break try: args = shlex.split(command) ctx = click.shell_completion._resolve_context(group, {}, "", args) except ValueError as exc: ioman.console.print(f"{type(exc).__name__}: {exc}") continue try: with group.make_context(None, args, parent=group_ctx) as ctx: result = group.invoke(ctx) #ctx.exit() if result is RESTART: raise RestartError except click.ClickException as exc: exc.show() ioman.reset_queue() except ClickExit as exc: pass #sys.exit(exc.exit_code) except ClickAbort as exc: cm.print_error("Aborted!") sys.exit(1) except SystemExit: pass except RestartError: return True except Exception as exc: # pylint: disable=W0703 cm.print_error(f"{exc.__class__.__name__}:{exc!s}") return False