#coding:utf-8
#
# PROGRAM/MODULE: saturnin
# FILE: saturnin/lib/data/filter.py
# DESCRIPTION: Base class for Saturnin data filter microservices
# CREATED: 14.12.2020
#
# The contents of this file are subject to the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
# Copyright (c) 2020 Firebird Project (www.firebirdsql.org)
# All Rights Reserved.
#
# Contributor(s): Pavel Císař (original code)
# ______________________________________
# pylint: disable=R0903, R0902, R0915, C0301
"""Saturnin base class for data filter microservices
"""
from __future__ import annotations
from typing import Any, List, Union, cast, Final
from functools import partial
from collections import deque
import uuid
import zmq
from firebird.base.config import (StrOption, EnumOption, IntOption, BoolOption,
ZMQAddressOption, MIMEOption)
from saturnin.base import (Error, StopError, Direction, SocketMode, PipeSocket, Outcome,
ZMQAddress, MIME, ComponentConfig, Message, SimpleMessage, Session, Protocol, Channel,
DealerChannel, PushChannel, PullChannel, ANY, ServiceDescriptor)
from saturnin.component.micro import MicroService
from saturnin.protocol.fbdp import (ErrorCode, FBDPServer, FBDPClient, FBDPSession,
FBDPMessage)
#: Name of the INPUT data pipe channel; also used as the key of its endpoint list
INPUT_PIPE_CHN: Final[str] = 'input-pipe'
#: Name of the OUTPUT data pipe channel; also used as the key of its endpoint list
OUTPUT_PIPE_CHN: Final[str] = 'output-pipe'
#: Name of the internal wake PUSH channel
WAKE_PUSH_CHN: Final[str] = 'wake-push'
#: Name of the internal wake PULL channel; also used as the key of its endpoint list
WAKE_PULL_CHN: Final[str] = 'wake-pull'
[docs]
class DataFilterConfig(ComponentConfig):
    """Base data filter microservice configuration.

    Declares one group of options for the input data pipe, a parallel group for the
    output data pipe, and a switch controlling whether an input pipe error is passed
    on to the output pipe.

    Arguments:
        name: Conf. section name.
    """
    def __init__(self, name: str):
        super().__init__(name)
        #: When input pipe is closed with error, close output with error as well
        self.propagate_input_error: BoolOption = BoolOption(
            'propagate_input_error',
            "When input pipe is closed with error, close output with error as well",
            default=True)
        # ----- Input pipe options
        #: Input Data Pipe Identification
        self.input_pipe: StrOption = StrOption(
            'input_pipe', "Input Data Pipe Identification", required=True)
        #: Input Data Pipe endpoint address
        self.input_pipe_address: ZMQAddressOption = ZMQAddressOption(
            'input_pipe_address', "Input Data Pipe endpoint address", required=True)
        #: Input Data Pipe Mode
        self.input_pipe_mode: EnumOption = EnumOption(
            'input_pipe_mode', SocketMode, "Input Data Pipe Mode", required=True)
        #: Input Pipe data format specification
        self.input_pipe_format: MIMEOption = MIMEOption(
            'input_pipe_format', "Input Pipe data format specification")
        #: Input Pipe Data batch size
        self.input_batch_size: IntOption = IntOption(
            'input_batch_size', "Input Pipe Data batch size", required=True, default=50)
        #: Input Pipe READY message schedule interval in milliseconds
        self.input_ready_schedule_interval: IntOption = IntOption(
            'input_ready_schedule_interval',
            "Input Pipe READY message schedule interval in milliseconds",
            required=True, default=1000)
        # ----- Output pipe options
        #: Output Data Pipe Identification
        self.output_pipe: StrOption = StrOption(
            'output_pipe', "Output Data Pipe Identification", required=True)
        #: Output Data Pipe endpoint address
        self.output_pipe_address: ZMQAddressOption = ZMQAddressOption(
            'output_pipe_address', "Output Data Pipe endpoint address", required=True)
        #: Output Data Pipe Mode
        self.output_pipe_mode: EnumOption = EnumOption(
            'output_pipe_mode', SocketMode, "Output Data Pipe Mode", required=True)
        #: Output Pipe data format specification
        self.output_pipe_format: MIMEOption = MIMEOption(
            'output_pipe_format', "Output Pipe data format specification")
        #: Output Pipe Data batch size
        self.output_batch_size: IntOption = IntOption(
            'output_batch_size', "Output Pipe Data batch size", required=True, default=50)
        #: Output Pipe READY message schedule interval in milliseconds
        self.output_ready_schedule_interval: IntOption = IntOption(
            'output_ready_schedule_interval',
            "Output Pipe READY message schedule interval in milliseconds", required=True,
            default=1000)

    def validate(self) -> None:
        """Extended validation.

        - `input_pipe_format` is required when `input_pipe_mode` is CONNECT.
        - `output_pipe_format` is required when `output_pipe_mode` is CONNECT.

        Raises:
            Error: When a pipe in CONNECT mode has no data format specified.
        """
        super().validate()
        # (mode option, format option, message) - input pipe is checked first
        requirements = (
            (self.input_pipe_mode, self.input_pipe_format,
             "'input_pipe_format' required for CONNECT pipe mode."),
            (self.output_pipe_mode, self.output_pipe_format,
             "'output_pipe_format' required for CONNECT pipe mode."),
        )
        for mode_opt, format_opt, problem in requirements:
            if mode_opt.value is SocketMode.CONNECT and format_opt.value is None:
                raise Error(problem)
[docs]
class DataFilterMicro(MicroService):
"""Base data provider microservice.
Descendant classes should override:
- `.handle_input_accept_client` to validate client request and acquire resources
associated with input pipe.
- `.handle_output_accept_client` to validate client request and acquire resources
associated with output pipe.
- `.handle_output_produce_data` to produce data for outgoing DATA message.
- `.handle_input_accept_data` to process received data.
- `.handle_input_pipe_closed` to release resources associated with input pipe.
- `.handle_output_pipe_closed` to release resources associated with output pipe.
"""
[docs]
def __init__(self, zmq_context: zmq.Context, descriptor: ServiceDescriptor, *,
             peer_uid: uuid.UUID=None):
    """Create the service instance with all attributes in their initial state.

    Attributes marked [Configuration] and all protocol/channel attributes start as
    `None`; they receive their real values in `initialize()`.

    Arguments:
        zmq_context: ZeroMQ Context.
        descriptor: Service descriptor.
        peer_uid: Peer ID, `None` means that newly generated UUID type 1 should be used.
    """
    super().__init__(zmq_context, descriptor, peer_uid=peer_uid)
    # ----- Execution state
    self.outcome = Outcome.UNKNOWN
    self.details = None
    #: Data to be sent to output.
    self.output: deque = deque()
    #: Closing flag
    self.closing: bool = False
    # ----- Members below are set in initialize()
    #: [Configuration] When input pipe is closed with error, close output with error as well
    self.propagate_input_error: bool = None
    # ----- Input pipe
    #: [Configuration] Input Data Pipe Identification
    self.input_pipe: str = None
    #: [Configuration] Input Data Pipe Mode
    self.input_pipe_mode: SocketMode = None
    #: [Configuration] Input Data Pipe endpoint address
    self.input_pipe_address: ZMQAddress = None
    #: [Configuration] Input Pipe data format specification
    self.input_pipe_format: MIME = None
    #: [Configuration] Input Pipe data batch size
    self.input_batch_size: int = None
    #: [Configuration] Input Pipe READY message schedule interval in milliseconds
    self.input_ready_schedule_interval: int = None
    #: FBDP protocol handler (server or client) for input pipe
    self.input_protocol: Union[FBDPServer, FBDPClient] = None
    #: Input pipe channel
    self.pipe_in_chn: Channel = None
    # ----- Output pipe
    #: [Configuration] Output Data Pipe Identification
    self.output_pipe: str = None
    #: [Configuration] Output Data Pipe Mode
    self.output_pipe_mode: SocketMode = None
    #: [Configuration] Output Data Pipe endpoint address
    self.output_pipe_address: ZMQAddress = None
    #: [Configuration] Output Pipe data format specification
    self.output_pipe_format: MIME = None
    #: [Configuration] Output Pipe data batch size
    self.output_batch_size: int = None
    #: [Configuration] Output Pipe READY message schedule interval in milliseconds
    self.output_ready_schedule_interval: int = None
    #: FBDP protocol handler (server or client) for output pipe
    self.output_protocol: Union[FBDPServer, FBDPClient] = None
    #: Output pipe channel
    self.pipe_out_chn: Channel = None
    # ----- Internal wake channels
    #: Internal AWAKE address
    self.wake_address: ZMQAddress = None
    #: Internal AWAKE output channel
    self.wake_out_chn: PushChannel = None
    #: Internal AWAKE input channel
    self.wake_in_chn: PullChannel = None
[docs]
def initialize(self, config: DataFilterConfig) -> None:
    """Verify configuration and assemble component structural parts.

    Copies option values from `config` into instance attributes, builds an FBDP
    protocol handler and a DEALER channel for each data pipe, and creates the
    internal wake PUSH/PULL channel pair.

    Arguments:
        config: Service configuration
    """
    super().initialize(config)
    self.closing = False
    self.propagate_input_error = config.propagate_input_error.value
    # ----- INPUT pipe: configuration values
    self.input_pipe = config.input_pipe.value
    self.input_pipe_mode = config.input_pipe_mode.value
    self.input_pipe_address = config.input_pipe_address.value
    self.input_pipe_format = config.input_pipe_format.value
    self.input_batch_size = config.input_batch_size.value
    self.input_ready_schedule_interval = config.input_ready_schedule_interval.value
    # ----- INPUT pipe: FBDP protocol handler
    if self.input_pipe_mode == SocketMode.BIND:
        # BIND mode -> we are the FBDP server
        in_server = FBDPServer()
        in_server.on_exception = self.handle_exception
        in_server.on_accept_client = self.handle_input_accept_client
        in_server.on_schedule_ready = self.handle_input_schedule_ready
        self.input_protocol = in_server
        # Register the endpoint to bind
        self.endpoints[INPUT_PIPE_CHN] = [self.input_pipe_address]
    else:
        # Any other mode -> we are the FBDP client
        self.input_protocol = FBDPClient()
    # Handlers shared by server and client
    in_protocol = self.input_protocol
    in_protocol.batch_size = self.input_batch_size
    in_protocol.on_pipe_closed = self.handle_input_pipe_closed
    in_protocol.on_accept_data = self.handle_input_accept_data
    in_protocol.on_get_data = self.handle_input_get_data
    # ----- INPUT pipe: channel
    in_chn = self.mngr.create_channel(DealerChannel, INPUT_PIPE_CHN, in_protocol,
                                      wait_for=Direction.IN)
    self.pipe_in_chn = in_chn
    in_chn.protocol.log_context = self.logging_id
    # ----- OUTPUT pipe: configuration values
    self.output_pipe = config.output_pipe.value
    self.output_pipe_mode = config.output_pipe_mode.value
    self.output_pipe_address = config.output_pipe_address.value
    self.output_pipe_format = config.output_pipe_format.value
    self.output_batch_size = config.output_batch_size.value
    self.output_ready_schedule_interval = config.output_ready_schedule_interval.value
    # ----- OUTPUT pipe: FBDP protocol handler
    if self.output_pipe_mode == SocketMode.BIND:
        # BIND mode -> we are the FBDP server
        out_server = FBDPServer()
        out_server.on_exception = self.handle_exception
        out_server.on_accept_client = self.handle_output_accept_client
        out_server.on_schedule_ready = self.handle_output_schedule_ready
        self.output_protocol = out_server
        # Register the endpoint to bind
        self.endpoints[OUTPUT_PIPE_CHN] = [self.output_pipe_address]
    else:
        # Any other mode -> we are the FBDP client
        self.output_protocol = FBDPClient()
    # Handlers shared by server and client
    out_protocol = self.output_protocol
    out_protocol.batch_size = self.output_batch_size
    out_protocol.on_pipe_closed = self.handle_output_pipe_closed
    out_protocol.on_produce_data = self.handle_output_produce_data
    out_protocol.on_get_data = self.handle_output_get_data
    # ----- OUTPUT pipe: channel
    out_chn = self.mngr.create_channel(DealerChannel, OUTPUT_PIPE_CHN, out_protocol,
                                       wait_for=Direction.IN)
    self.pipe_out_chn = out_chn
    out_chn.protocol.log_context = self.logging_id
    # ----- Wake channels: one protocol instance, every message goes to handle_wake_msg
    self.wake_address = ZMQAddress(f'inproc://{self.peer.uid.hex}-wake')
    wake_protocol = Protocol()
    wake_protocol.handlers[ANY] = self.handle_wake_msg
    # PUSH side
    push_chn = self.mngr.create_channel(PushChannel, WAKE_PUSH_CHN, wake_protocol)
    self.wake_out_chn = push_chn
    push_chn.protocol.log_context = self.logging_id
    # PULL side
    pull_chn = self.mngr.create_channel(PullChannel, WAKE_PULL_CHN, wake_protocol,
                                        wait_for=Direction.IN)
    self.wake_in_chn = pull_chn
    pull_chn.protocol.log_context = self.logging_id
    # Register the wake endpoint to bind
    self.endpoints[WAKE_PULL_CHN] = [self.wake_address]
[docs]
def aquire_resources(self) -> None:
    """Acquire resources required by component:

    1. Connect the wake PUSH channel to the internal wake address.
    2. For each data pipe configured in CONNECT mode, connect to its address and
       send the FBDP OPEN message.
    """
    self.wake_out_chn.connect(self.wake_address)
    # INPUT pipe: we are the CONSUMER client, so we attach to the server's OUTPUT
    if self.input_pipe_mode == SocketMode.CONNECT:
        in_chn = self.pipe_in_chn
        in_session = in_chn.connect(self.input_pipe_address)
        in_client = cast(FBDPClient, in_chn.protocol)
        # OPEN the data pipe connection, this also fills session attributes
        in_client.send_open(in_chn, in_session, self.input_pipe, PipeSocket.OUTPUT,
                            self.input_pipe_format)
    # OUTPUT pipe: we are the PRODUCER client, so we attach to the server's INPUT
    if self.output_pipe_mode == SocketMode.CONNECT:
        out_chn = self.pipe_out_chn
        out_session = out_chn.connect(self.output_pipe_address)
        out_client = cast(FBDPClient, out_chn.protocol)
        # OPEN the data pipe connection, this also fills session attributes
        out_client.send_open(out_chn, out_session, self.output_pipe, PipeSocket.INPUT,
                             self.output_pipe_format)
[docs]
def release_resources(self) -> None:
    """Release resources acquired by component:

    1. Disconnect the wake PUSH channel
    2. Close all active data input sessions
    3. Close all active data output sessions
    """
    # Disconnect wake PUSH (iterate over a snapshot of the sessions)
    wake_chn = self.wake_out_chn
    for wake_session in tuple(wake_chn.sessions.values()):
        wake_chn.discard_session(wake_session)
    # CLOSE all sessions still active on the input pipe, then on the output pipe.
    # Pipes are normally closed before shutdown is commenced, so a session that is
    # still open here is reported as an error. Mind that service shutdown could be
    # also caused by error!
    for pipe_chn in (self.pipe_in_chn, self.pipe_out_chn):
        # send_close() will discard session, so we iterate over a snapshot instead
        # of sessions.values() directly
        for pipe_session in tuple(pipe_chn.sessions.values()):
            cast(FBDPServer, pipe_chn.protocol).send_close(pipe_chn, pipe_session,
                                                           ErrorCode.ERROR)
[docs]
def store_output(self, data: Any) -> None:
    """Append one item to the output queue and send a wake notification.

    Arguments:
        data: Data to be stored to output queue.
    """
    self.output.append(data)
    # Send a wake message through the internal PUSH channel
    wake = SimpleMessage()
    wake.data.append(b'wake')
    push_chn = self.wake_out_chn
    push_chn.send(wake, push_chn.session)
[docs]
def store_batch_output(self, batch: List) -> None:
    """Append a batch of items to the output queue and send one wake notification.

    No notification is sent when the output queue is empty after the batch was
    stored.

    Arguments:
        batch: Data to be stored to output queue.
    """
    self.output.extend(batch)
    if not self.output:
        # Nothing queued, so there is nothing to announce
        return
    wake = SimpleMessage()
    wake.data.append(b'wake')
    push_chn = self.wake_out_chn
    push_chn.send(wake, push_chn.session)
[docs]
def handle_wake_msg(self, channel: Channel, session: Session, msg: Message) -> None: # pylint: disable=W0613
    """Handler for "data available" pings sent via wake channels.

    Does nothing when the output queue is empty or the output pipe has no session.
    Otherwise it either enables waiting for output on the pipe session (transmission
    in progress), or - when we are the server and no READY is awaited - starts a new
    batch.

    Arguments:
        channel: Channel associated with wake delivery.
        session: Session associated with client.
        msg: Wake message.
    """
    if not self.output:
        # Unlikely case when we've got wake but all data were already sent
        return
    out_chn = self.pipe_out_chn
    if not out_chn.sessions:
        # We need active pipe connection
        return
    pipe_session: FBDPSession = out_chn.session
    if pipe_session.transmit is not None:
        # Transmission in progress, make sure that we will send data
        out_chn.set_wait_out(True, pipe_session)
        return
    if self.output_pipe_mode is SocketMode.BIND and not pipe_session.await_ready:
        # We are server without active transmission and READY was not sent yet, so we
        # can send READY immediately
        server = cast(FBDPServer, out_chn.protocol)
        server._init_new_batch(out_chn, pipe_session)
[docs]
def handle_exception(self, channel: Channel, session: Session, msg: Message, # pylint: disable=W0613
                     exc: Exception) -> None:
    """Event handler called by `.handle_msg()` on exception in message handler.

    A `.StopError` whose `code` is `ErrorCode.OK` is ignored. Any other exception
    sets service outcome to ERROR and is noted as details.

    Arguments:
        channel: Channel associated with data pipe.
        session: Session associated with connection.
        msg: Message.
        exc: Exception.
    """
    is_ok_stop = isinstance(exc, StopError) and getattr(exc, 'code', None) is ErrorCode.OK
    if is_ok_stop:
        return
    self.outcome = Outcome.ERROR
    self.details = exc
[docs]
def handle_output_get_data(self, channel: Channel, session: FBDPSession) -> bool: # pylint: disable=W0613
    """Event handler executed to query the data source for data availability.

    Arguments:
        channel: Channel associated with output data pipe.
        session: Session associated with connection.

    Returns:
        True if the output deque contains any data, otherwise False.

    Raises:
        StopError: With `ErrorCode.OK` code when there are no output data and the
            input pipe channel has no session (input pipe is closed).
    """
    if self.output:
        return True
    if not self.pipe_in_chn.sessions:
        # Nothing queued and nothing more can arrive
        raise StopError("EOF", code=ErrorCode.OK)
    return False
# FBDP server only
[docs]
def handle_output_accept_client(self, channel: Channel, session: FBDPSession) -> None: # pylint: disable=W0613
    """Event handler executed when client connects to OUTPUT data pipe via OPEN message.

    Arguments:
        channel: Channel associated with data pipe.
        session: Session associated with client.

    The session attributes `~.FBDPSession.pipe`, `~.FBDPSession.socket`,
    `~.FBDPSession.data_format` and `~.FBDPSession.params` contain information sent by
    client, and the event handler validates the request.

    If request should be rejected, it raises the `.StopError` exception with `~.Error.code`
    attribute containing the `~saturnin.protocol.fbdp.ErrorCode` to be returned in
    CLOSE message.

    Important:
        Base implementation validates pipe identification and pipe socket, and converts
        data format from string to MIME (in session).

        The descendant class that overrides this method must call `super` as first
        action.
    """
    unknown_pipe = session.pipe != self.output_pipe
    if unknown_pipe:
        raise StopError(f"Unknown data pipe '{session.pipe}'",
                        code=ErrorCode.PIPE_ENDPOINT_UNAVAILABLE)
    # Clients can attach only to our OUTPUT
    wrong_socket = session.socket is not PipeSocket.OUTPUT
    if wrong_socket:
        raise StopError(f"'{session.socket}' socket not available",
                        code=ErrorCode.PIPE_ENDPOINT_UNAVAILABLE)
    # We work with MIME formats, so we'll convert the format specification to MIME
    requested_format = session.data_format
    session.data_format = MIME(requested_format)
[docs]
def handle_output_schedule_ready(self, channel: Channel, session: FBDPSession) -> None:
    """Event handler executed in order to send the READY message to the client later.

    Arguments:
        channel: Channel associated with data pipe.
        session: Session associated with client.

    The event handler may cancel the transmission by raising the `.StopError` exception
    with `~.Error.code` attribute containing the `~saturnin.protocol.fbdp.ErrorCode` to
    be returned in CLOSE message.

    Important:
        The base implementation schedules `~.FBDPServer.resend_ready()` according to
        `.output_ready_schedule_interval` configuration option.
    """
    server = cast(FBDPServer, channel.protocol)
    # Bind channel and session now, the call itself happens when the schedule fires
    resend = partial(server.resend_ready, channel, session)
    self.schedule(resend, self.output_ready_schedule_interval)
# FBDP common
[docs]
def handle_output_produce_data(self, channel: Channel, session: FBDPSession, msg: FBDPMessage) -> None:
    """Event handler executed to store data into outgoing DATA message.

    Arguments:
        channel: Channel associated with data pipe.
        session: Session associated with client.
        msg: DATA message that will be sent to client.

    Important:
        The base implementation simply raises `.StopError` with
        `~saturnin.protocol.fbdp.ErrorCode.OK` code, so the descendant class must
        override this method without `super` call.

        The overriding handler must `popleft()` data from `.output` queue and store
        them in `msg.data_frame` attribute. It may also set ACK-REQUEST flag and
        `type_data` attribute.

        The overriding handler may cancel the transmission by raising the `StopError`
        exception with `code` attribute containing the
        `~saturnin.protocol.fbdp.ErrorCode` to be returned in CLOSE message.

    Note:
        To indicate end of data, raise `.StopError` with
        `~saturnin.protocol.fbdp.ErrorCode.OK` code.
    """
    # Base class has nothing to send: report end of data right away
    raise StopError('OK', code=ErrorCode.OK)
[docs]
def handle_output_pipe_closed(self, channel: Channel, session: FBDPSession, msg: FBDPMessage, # pylint: disable=W0613
exc: Exception=None) -> None:
"""Event handler executed when CLOSE message is received or sent, to release any
resources associated with current transmission.
Arguments:
channel: Channel associated with data pipe.
session: Session associated with peer.
msg: Received/sent CLOSE message.
exc: Exception that caused the error.
Important:
The base implementation does next actions:
- If exception is provided, sets service execution outcome to ERROR
and notes exception in details.
- Closes the input pipe if it's still open and `.closing` flag is False.
- Sets the signal to stop the service.
The descendant class that overrides this method must call `super`.
"""
# FDBP converts exceptions raised in our event handler to CLOSE messages, so
# here is the central place to handle errors in data pipe processing.
code: ErrorCode = msg.type_data
if exc is not None:
# Note problem in service execution outcome
if code is not ErrorCode.OK:
self.outcome = Outcome.ERROR
self.details = exc
# Close the input pipe if it's still open
if not self.closing:
self.closing = True
for _session in self.pipe_in_chn.sessions:
cast(FBDPServer, self.pipe_in_chn.protocol).send_close(self.pipe_in_chn,
_session, code, exc)
# Request service to stop
self.stop.set()
self.closing = False