saturnin.base.transport

Saturnin ZeroMQ messaging - base classes and other definitions.

The messaging framework consists from:

  1. Channels, that manage ZeroMQ sockets for transmission of messages.

  2. Messages, that encapsulate ZeroMQ messages passed through Channels.

  3. Protocol, that is responsible for handling received ZeroMQ messages in accordance to transport protocol definition.

  4. Session, that contains data related to client/server connections.

  5. ChannelManager, that manages communication Channels and is responsible for i/o loop.

Types for type hints

saturnin.base.transport.TZMQMessage

ZMQ multipart message

alias of List[Union[bytes, Frame]]

saturnin.base.transport.TMessageFactory

Message factory callable

alias of Callable[[Optional[TZMQMessage]], Message]

saturnin.base.transport.TSocketOptions

ZMQ socket options

alias of Dict[str, Any]

saturnin.base.transport.TMessageHandler

Message handler

alias of Callable[[Channel, Session, Message], None]

Constants

saturnin.base.transport.INTERNAL_ROUTE: Final[RoutingID] = b'INTERNAL'

Internal routing ID

Classes

class saturnin.base.transport.ChannelManager(*args, **kwargs)[source]

Bases: LoggingIdMixin, TracedMixin

Manager of ZeroMQ communication channels.

__init__(context: Context)[source]
Parameters:

context (Context) – ZMQ Context instance.

create_channel(cls: Type[Channel], name: str, protocol: Protocol, *, routing_id: RoutingID = Sentinel('DEFAULT'), session_type: Type[Session] = Sentinel('DEFAULT'), wait_for: Direction = Direction.NONE, snd_timeout: int = 100, rcv_timeout: int = 100, linger: int = 5000, sock_opts: TSocketOptions = None) Channel[source]

Creates new channel.

Parameters:
  • cls (Type[Channel]) – Channel class.

  • name (str) – Channel name.

  • routing_id (RoutingID) – Channel socket identity (routing ID for peers).

  • protocol (Protocol) – Protocol for serializing/deserializing messages.

  • session_type (Type[Session]) – Session type. DEFAULT session type is obtained from MessageHandler.

  • wait_for (Direction) – Direction(s) of transmission events for this channel processed by wait().

  • snd_timeout (int) – Timeout for send operation in milliseconds, None means infinite.

  • rcv_timeout (int) – Timeout for receive operation in milliseconds, None means infinite.

  • linger (int) – ZMQ socket linger period.

  • sock_opts (TSocketOptions) – Dictionary with socket additional options.

Return type:

Channel

has_pollout() bool[source]

Returns True if wait() will check for POLLOUT event on any channel.

Return type:

bool

shutdown(*, forced: bool = False) None[source]

Close all managed channels.

Calls unbind/disconnect on active channels, and clears all sessions.

Parameters:

forced (bool) – When True, channels are closed with zero LINGER and all ZMQ errors are ignored.

Return type:

None

update_poller(channel: Channel, value: Direction) None[source]

Update poller registration for channel.

Parameters:
Return type:

None

wait(timeout: int = None) Dict[Channel, Direction][source]

Wait for I/O events on channnels.

Parameters:

timeout (int) – The timeout in milliseconds. None value means infinite.

Returns:

Dictionary with channel keys and event values.

Return type:

Dict[Channel, Direction]

warm_up() None[source]

Create and set up ZMQ sockets for all registered channels that does not have socket.

Return type:

None

channels: Dict[str, Channel]

Dictionary with managed channels. Key is Channel.name, value is the Channel.

ctx: Context

ZMQ Context instance.

log_context: Any

Logging context

class saturnin.base.transport.Message[source]

Bases: ABC

Abstract base class for protocol message.

abstract as_zmsg() TZMQMessage[source]

Returns message as sequence of ZMQ data frames.

Return type:

TZMQMessage

abstract clear() None[source]

Clears message data.

Return type:

None

abstract copy() Message[source]

Returns copy of the message.

Return type:

Message

abstract from_zmsg(zmsg: TZMQMessage) None[source]

Populate message data from sequence of ZMQ data frames.

Parameters:

zmsg (TZMQMessage) – Sequence of frames that should be deserialized.

Raises:

InvalidMessageError – If message is not a valid protocol message.

Return type:

None

abstract get_keys() Iterable[source]

Returns iterable of dictionary keys to be used with Protocol.handlers. Keys must be provided in order of precedence (from more specific to general).

Return type:

Iterable

class saturnin.base.transport.SimpleMessage[source]

Bases: Message

Simple protocol message that holds items from ZMQ multipart message in its data attribute.

as_zmsg() TZMQMessage[source]

Returns message as sequence of ZMQ data frames.

Important

This class simply returns the list kept in data attribute. This may cause problems if returned list is subsequently updated. In such a case, create a copy of returned list, or create a subclass that overrides this method.

Return type:

TZMQMessage

clear() None[source]

Clears message data.

Return type:

None

copy() SimpleMessage[source]

Returns copy of the message.

Return type:

SimpleMessage

from_zmsg(zmsg: TZMQMessage) None[source]

Populate message data from sequence of ZMQ data frames.

Parameters:

zmsg (TZMQMessage) – Sequence of frames that should be deserialized.

Raises:

InvalidMessageError – If message is not a valid protocol message.

Return type:

None

Important

This class just makes a copy of items from ZMQ message list into data. All Frame items are ‘unpacked’ into bytes, other items are simply copied.

get_keys() Iterable[source]

Returns iterable of dictionary keys to be used with Protocol.handlers.

The default implementation returns list with first data frame or None followed by ANY sentinel.

Return type:

Iterable

data: List[bytes]

Sequence of data frames

class saturnin.base.transport.Session[source]

Bases: object

Base Peer Session class.

endpoint: ZMQAddress | None

Connected endpoint address, if any

property logging_id: str

Returns _logging_id_ or <class_name>[<routing_id>::<endpoint>]

routing_id: RoutingID

Channel routing ID for connected peer

send_pending: bool

Flag indicating that session is waiting for send

class saturnin.base.transport.Protocol(*args, **kwargs)[source]

Bases: TracedMixin

Base class for protocol.

The main purpose of protocol class is to validate ZMQ messages, create protocol messages and session objects used by Channel to manage transmissions, and to handle messages received from channel. This base class defines common interface for message convertsion and validation. Descendant classes typically add methods for creation of protocol messages and message handling.

__init__(*, session_type: ~typing.Type[~saturnin.base.transport.Session] = <class 'saturnin.base.transport.Session'>)[source]
Parameters:

session_type (Type[Session]) – Class for session objects.

accept_new_session(channel: Channel, routing_id: RoutingID, msg: Message) bool[source]

Validates incoming message that initiated new session/transmission.

Important

Default implementation unconditionally accept new sessions (always returns True).

Parameters:
  • channel (Channel) – Channel that received the message.

  • routing_id (RoutingID) – Routing ID of the sender.

  • msg (Message) – Received message.

Return type:

bool

connect_with_session(channel: Channel) bool[source]

Called by Channel.connect() to determine whether new session should be associated with connected peer.

Note

Because it’s not possible to call Channel.send without session, all protocols that require connecting peers to send a message to initiate transmission must return True.

The default implementation uses Channel.direction to determine the return value (True if it contains Direction.OUT, else False).

Parameters:

channel (Channel) –

Return type:

bool

convert_msg(zmsg: TZMQMessage) Message[source]

Converts ZMQ message into protocol message.

Parameters:

zmsg (TZMQMessage) – ZeroMQ multipart message.

Returns:

New protocol message instance with parsed ZMQ message. The base Protocol implementation returns SimpleMessage instance created by message factory.

Raises:

InvalidMessageError – If message is not a valid protocol message.

Return type:

Message

handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None[source]

Called by handle_msg() on exception in message handler.

Important

Executes on_exception event. Descendant classes that override this method must call super() or execute this event directly.

If this method is not overriden by descendant, and handler for this event is not defined, all exceptions raised in message handlers are silently ignored.

Parameters:
  • channel (Channel) – Channel that received the message.

  • session (Session) – Session for this trasmission

  • msg (Message) – Message associated with exception

  • exc (Exception) – Exception raised while processing the message

Return type:

None

handle_invalid_msg(channel: Channel, session: Session, exc: InvalidMessageError) None[source]

Called by Channel.receive() when message conversion raises InvalidMessageError.

Important

Executes on_invalid_msg event. Descendant classes that override this method must call super() or execute this event directly.

If this method is not overriden by descendant, and handler for this event is not defined, all InvalidMessageError exceptions are silently ignored.

Parameters:
  • channel (Channel) – Channel that received the message.

  • session (Session) – Session for this trasmission

  • exc (InvalidMessageError) – Exception raised while processing the message

Return type:

None

handle_msg(channel: Channel, session: Session, msg: Message) Any[source]

Process message received from peer.

Uses handlers dictionary and Message.get_keys() to select and execute the message handler. Exceptions raised by message handler are processed by on_exception event handler (if assigned). Exceptions raised by event handler are ignored, only RuntimeWarning is emitted.

Parameters:
  • channel (Channel) – Channel that received the message.

  • session (Session) – Session for this trasmission

  • msg (Message) – Received message.

Returns:

Whatever handler returns, or None when handler raises an exception.

Return type:

Any

initialize_session(session: Session) None[source]

Initialize new session instance. The default implementation does nothing.

Parameters:

session (Session) – Session to be initialized.

Return type:

None

is_valid(zmsg: TZMQMessage) bool[source]

Return True if ZMQ message is a valid protocol message, otherwise returns False.

Exceptions other than InvalidMessageError are not caught.

Parameters:

zmsg (TZMQMessage) – ZeroMQ multipart message.

Return type:

bool

validate(zmsg: TZMQMessage) None[source]

Verifies that sequence of ZMQ data frames is a valid protocol message.

If this validation passes without exception, then convert_msg() of the same message must be successful as well.

Important

Implementation in base Protocol performs no validation and always returns True.

Parameters:

zmsg (TZMQMessage) – ZeroMQ multipart message.

Raises:

InvalidMessageError – If ZMQ message is not a valid protocol message.

Return type:

None

OID: str = '1.3.6.1.4.1.53446.1.5'

string with protocol OID (dot notation). MUST be set in child class.

REVISION: int = 1

Protocol revision (default 1)

UID: uuid.UUID = UUID('bd233dd4-bc38-5843-bb3b-f292023b7ece')

UUID instance that identifies the protocol. MUST be set in child class.

handlers: Dict[Any, TMessageHandler]

Message handlers

property logging_id: str

Returns _logging_id_ or <class_name>[<uid>/<revision>]

message_factory

eventsocket for message factory that must return protocol message instance. The default factory produces new SimpleMessage instance on each call.

Parameters:

zmsg – ZeroMQ multipart message.

Important

The returned message SHOULD NOT be initialized from zmsg. This argument is passed to fatory for cases when ZeroMQ message content must be analysed to create instance of appropriate message class. See FBSP message factory for example.

on_exception

eventsocket called by handle_msg() on exception in message handler.

Parameters:
  • channel – Channel that received the message

  • session – Session associated with transmission

  • msg – Received message

  • exc – Exception raised

Important

If handler for this event is not defined, all exceptions raised in message handlers are silently ignored.

The exception thrown in this event handler is also not handled, and propagates to the upper layers (usually an I/O loop).

on_invalid_msg

eventsocket called by Channel.receive() when message conversion raises InvalidMessageError.

Parameters:
  • channel – Channel that received invalid message

  • session – Session associated with transmission

  • exc – Exception raised

Important

If handler for this event is not defined, all InvalidMessageError exceptions are silently ignored.

property session_type: Type[Session]

Class for session objects.

class saturnin.base.transport.Channel(*args, **kwargs)[source]

Bases: TracedMixin

Base Class for ZMQ communication channel (socket).

__init__(mngr: ChannelManager, name: str, protocol: Protocol, routing_id: RoutingID, session_type: Type[Session], wait_for: Direction, snd_timeout: int, rcv_timeout: int, linger: int, sock_opts: TSocketOptions)[source]
Parameters:
  • mngr (ChannelManager) – Channel manager.

  • name (str) – Channel name.

  • routing_id (RoutingID) – Routing ID for ZMQ socket.

  • protocol (Protocol) – Protocol for serializing/deserializing messages.

  • session_type (Type[Session]) – Session type. DEFAULT session type is obtained from Protocol.

  • wait_for (Direction) – Direction(s) of transmission events for this channel processed by ChannelManager.wait().

  • snd_timeout (int) – Timeout for send operation on the socket in milliseconds.

  • rcv_timeout (int) – Timeout for receive operation on the socket in milliseconds.

  • linger (int) – ZMQ socket linger period.

  • sock_opts (TSocketOptions) – Dictionary with socket options that should be set after socket creation.

_adjust()[source]

Called by __init__() to configure the channel parameters.

_configure() None[source]

Called by set_socket() to configure the 0MQ socket.

Return type:

None

bind(endpoint: ZMQAddress) ZMQAddress[source]

Bind the 0MQ socket to an address.

Parameters:

endpoint (ZMQAddress) – Address to bind

Returns:

The endpoint address.

Raises:

ChannelError – On attempt to a) bind another endpoint for PAIR socket, or b) bind to already binded endpoint.

Return type:

ZMQAddress

Important

The returned address MAY differ from original address when wildcard specification is used.

can_send(timeout: int = 0) bool[source]

Returns True if underlying ZMQ socket is ready to accept at least one outgoing message without blocking (or dropping it).

Important

It may return True for some sockets although subsequent send() may fail or block. Typicall situation is ROUTER socket that is attached to multiple peers.

Parameters:

timeout (int) – Timeout in milliseconds passed to socket poll() call.

Return type:

bool

close_socket() None[source]

Close the ZMQ socket.

Note

This will not change the linger value for socket, so underlying socket may not close if there are undelivered messages. The socket is actually closed only after all messages are delivered or discarded by reaching the socket’s LINGER timeout.

Return type:

None

connect(endpoint: ZMQAddress, *, routing_id: RoutingID = None) Session | None[source]

Connect to a remote channel.

Parameters:
  • endpoint (ZMQAddress) – Endpoint address for connected peer.

  • routing_id (RoutingID) – Channel routing ID (required for routed channels).

Returns:

Session associated with connected peer, or None if no session was created.

Raises:

ChannelError – On attempt to a) connect another endpoint for PAIR socket, or b) connect to already connected endpoint.

Return type:

Session | None

create_session(routing_id: RoutingID) Session[source]

Returns newly created session.

Parameters:

routing_id (RoutingID) – Routing ID for new session.

Raises:

ChannelError – When session for specified routing_id already exists.

Return type:

Session

discard_session(session: Session) None[source]

Discard session object.

If Session.endpoint value is set, it also disconnects channel from this endpoint.

Parameters:

session (Session) – The Session to be discarded.

Return type:

None

disconnect(endpoint: ZMQAddress = None) None[source]

Disconnect from a remote socket (undoes a call to connect()).

Important

Does not discards sessions that are bound to any disconnected endpoint. Use discard_session() to disconnect & discard associated session.

Parameters:

endpoint (ZMQAddress) – Endpoint address or None to disconnect from all connected endpoints. Note: The address must be the same as the addresss returned by connect().

Raises:

ChannelError – If channel is not connected to specified endpoint.

Return type:

None

drop_socket() None[source]

Unconditionally drops the ZMQ socket and all pending messages (forces LINGER=0).

Note

All ZMQ errors raised by this operation are silently ignored.

Return type:

None

is_active() bool[source]

Returns True if channel is active (binded or connected).

Return type:

bool

message_available(timeout: int = 0) bool[source]

Returns True if underlying ZMQ socket is ready to receive at least one message without blocking (or error).

Parameters:

timeout (int) – Timeout in milliseconds passed to socket poll() call.

Return type:

bool

receive(timeout: int = None) Any[source]

Receive and process protocol message with assigned protocol.

If protocol raises InvalidMessageError on message conversion, it calls Protocol.on_invalid_msg event handler (if defined) before message is dropped. Exceptions raised by event handler are ignored, only RuntimeWarning is emitted.

If there is no session found for route, it first calls Protocol.accept_new_session(), and message is handled only when new session is accepted.

Parameters:

timeout (int) – The timeout (in milliseconds) to wait for message.

Returns:

Whatever protocol message handler returns, sentinel TIMEOUT when timeout expires, or sentinel INVALID when: a) received message was not valid protocol message, or b) handler raises an exception, or c) there is no session associated with peer and new session was not accepted by protocol.

Return type:

Any

receive_zmsg() TZMQMessage[source]

Receive ZMQ multipart message.

Return type:

TZMQMessage

send(msg: Message, session: Session) int[source]

Sends protocol message.

Parameters:
  • msg (Message) – Message to be sent.

  • session (Session) – Session to which the message belongs.

Returns:

Zero for success, or ZMQ error code.

Return type:

int

send_zmsg(zmsg: TZMQMessage) None[source]

Sends ZMQ multipart message.

Important

Does not handle any ZMQError exception.

Parameters:

zmsg (TZMQMessage) –

Return type:

None

set_socket(socket: Socket) None[source]

Used by ChannelManager to set socket to be used by Channel.

Parameters:

socket (Socket) – 0MQ socket to be used by channel

Return type:

None

set_wait_in(value: bool) None[source]

Enable/disable receiving messages. It sets/clear Direction.IN in wait_for

Parameters:

value (bool) – True to enable incoming messages, False to disable.

Return type:

None

set_wait_out(value: bool, session: Session = None) None[source]

Enable/disable sending messages. It sets/clear Direction.OUT in wait_for.

Parameters:
  • value (bool) – New value for wait_for_out flag.

  • session (Session) – Related session.

Raises:

ChannelError – For routed channel with active sessions when session is not provided.

Return type:

None

Important

If channel has active sessions, the Session.send_pending flag is also altered.

unbind(endpoint: ZMQAddress = None) None[source]

Unbind from an address (undoes a call to bind()).

Parameters:

endpoint (ZMQAddress) – Endpoint address, or None to unbind from all binded endpoints. Note: The address must be the same as the addresss returned by bind().

Raises:

ChannelError – If channel is not binded to specified endpoint.

Return type:

None

wait(timeout: int = None) Direction[source]

Wait for socket events specified by wait_for.

Parameters:

timeout (int) – The timeout (in milliseconds) to wait for an event. If unspecified, will wait forever for an event.

Return type:

Direction

property direction: Direction

Possible directions of transmission over this channel.

endpoints: List[ZMQAddress]

List of binded/connected endpoints

property log_context: Any

Logging context. Returns log_context from ChannelManager.

property logging_id: str

Returns _logging_id_ or <class_name>[<name>]

property manager: ChannelManager

The channel manager to which this channel belongs.

property mode: SocketMode

ZMQ Socket mode.

property name: str

Channel name.

on_output_ready

eventsocket called when channel is ready to accept at least one outgoing message without blocking (or dropping it).

Parameters:

channel – Channel ready for sending a message.

on_receive_failed

eventsocket called by Channel.receive() when receive operation fails with zmq.ZMQError exception other than EAGAIN.

If event returns True, the receive() returns INVALID, otherwise the exception is propagated to the caller.

Parameters:
  • channel – Channel where receive operation failed.

  • err_code – Error code.

on_receive_later

eventsocket called by Channel.receive() when receive operation fails with zmq.Again exception.

If event returns True, the receive() returns INVALID, otherwise the exception is propagated to the caller.

Parameters:

channel – Channel where receive operation failed.

on_send_failed

eventsocket called by Channel.send() when send operation fails with zmq.ZMQError exception other than EAGAIN.

If event returns True, the error is ignored, otherwise the error code is reported to the caller.

Parameters:
  • channel – Channel where send operation failed.

  • session – Session associated with failed transmission.

  • msg – Message that wasn’t sent.

  • err_code – Error code.

on_send_later

eventsocket called by Channel.send() when send operation fails with zmq.Again exception.

If event returns True, the error is ignored, otherwise the error code is reported to the caller.

Parameters:
  • channel – Channel where send operation failed.

  • session – Session associated with failed transmission.

  • msg – Message that wasn’t sent.

on_shutdown

eventsocket called by ChannelManager.shutdown() before the channel is shut down.

Important

All exceptions escaping this method are silently ignored.

Parameters:
  • channel – Channel to be shut down.

  • forced – When True, the channel will be closed with zero LINGER and all ZMQ errors will be ignored.

property protocol: Protocol

Protocol used by channel

property rcv_timeout: int

Timeout for receive operations.

routed: bool

True if channel uses internal routing

property routing_id: RoutingID

Routing_id value for ZMQ socket.

property session: Session

Session associated with channel.

Important: Valid only when channel has exactly one associated session.

sessions: Dict[RoutingID, Session]

Dictionary of active sessions, key=routing_id

property snd_timeout: int

Timeout for send operations.

sock_opts: TSocketOptions

Dictionary with socket options that should be set after socket creation

socket: zmq.Socket

ZMQ socket for transmission of messages

property socket_type: SocketType

ZMQ socket type this channel uses.

property wait_for: Direction

Direction(s) of transmission events for this channel processed by ChannelManager.wait() or Channel.wait().

Raises:

ChannelError – When assigned value contains direction not supported by channel for transmission.

Channels for individual 0MQ socket types

class saturnin.base.transport.DealerChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over DEALER socket.

class saturnin.base.transport.PushChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over PUSH socket.

class saturnin.base.transport.PullChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over PULL socket.

class saturnin.base.transport.PubChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over PUB socket.

class saturnin.base.transport.SubChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over SUB socket.

subscribe(topic: bytes)[source]

Subscribe to topic.

Parameters:

topic (bytes) – ZMQ topic.

unsubscribe(topic: bytes)[source]

Unsubscribe from topic.

Parameters:

topic (bytes) – ZMQ topic.

class saturnin.base.transport.XPubChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over XPUB socket.

_configure()[source]

Create XPUB socket for this channel.

class saturnin.base.transport.XSubChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over XSUB socket.

subscribe(topic: bytes)[source]

Subscribe to topic.

Parameters:

topic (bytes) – ZMQ topic.

unsubscribe(topic: bytes)[source]

Unsubscribe to topic.

Parameters:

topic (bytes) – ZMQ topic.

class saturnin.base.transport.PairChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over PAIR socket.

class saturnin.base.transport.RouterChannel(*args, **kwargs)[source]

Bases: Channel

Communication channel over ROUTER socket.