saturnin.base.transport¶
Saturnin ZeroMQ messaging - base classes and other definitions.
The messaging framework consists of:
Channels, which manage ZeroMQ sockets for transmission of messages.
Messages, which encapsulate ZeroMQ messages passed through Channels.
Protocol, which is responsible for handling received ZeroMQ messages in accordance with the transport protocol definition.
Session, which contains data related to client/server connections.
ChannelManager, which manages communication Channels and is responsible for the I/O loop.
Types for type hints¶
- saturnin.base.transport.TMessageFactory¶
Message factory callable
alias of Callable[[Optional[TZMQMessage]], Message]
Constants¶
- saturnin.base.transport.INTERNAL_ROUTE: Final[RoutingID] = b'INTERNAL'¶
Internal routing ID
Classes¶
- class saturnin.base.transport.ChannelManager(*args, **kwargs)[source]¶
Bases:
LoggingIdMixin, TracedMixin
Manager of ZeroMQ communication channels.
- create_channel(cls: Type[Channel], name: str, protocol: Protocol, *, routing_id: RoutingID = Sentinel('DEFAULT'), session_type: Type[Session] = Sentinel('DEFAULT'), wait_for: Direction = Direction.NONE, snd_timeout: int = 100, rcv_timeout: int = 100, linger: int = 5000, sock_opts: TSocketOptions = None) Channel [source]¶
Creates new channel.
- Parameters:
name (str) – Channel name.
routing_id (RoutingID) – Channel socket identity (routing ID for peers).
protocol (Protocol) – Protocol for serializing/deserializing messages.
session_type (Type[Session]) – Session type. DEFAULT session type is obtained from MessageHandler.
wait_for (Direction) – Direction(s) of transmission events for this channel processed by wait().
snd_timeout (int) – Timeout for send operation in milliseconds, None means infinite.
rcv_timeout (int) – Timeout for receive operation in milliseconds, None means infinite.
linger (int) – ZMQ socket linger period.
sock_opts (TSocketOptions) – Dictionary with socket additional options.
- Return type:
Channel
- has_pollout() bool [source]¶
Returns True if wait() will check for POLLOUT event on any channel.
- Return type:
bool
- shutdown(*, forced: bool = False) None [source]¶
Close all managed channels.
Calls unbind/disconnect on active channels, and clears all sessions.
- Parameters:
forced (bool) – When True, channels are closed with zero LINGER and all ZMQ errors are ignored.
- Return type:
None
- update_poller(channel: Channel, value: Direction) None [source]¶
Update poller registration for channel.
- warm_up() None [source]¶
Create and set up ZMQ sockets for all registered channels that do not have a socket yet.
- Return type:
None
- class saturnin.base.transport.Message[source]¶
Bases:
ABC
Abstract base class for protocol message.
- abstract as_zmsg() TZMQMessage [source]¶
Returns message as sequence of ZMQ data frames.
- Return type:
TZMQMessage
- abstract from_zmsg(zmsg: TZMQMessage) None [source]¶
Populate message data from sequence of ZMQ data frames.
- Parameters:
zmsg (TZMQMessage) – Sequence of frames that should be deserialized.
- Raises:
InvalidMessageError – If message is not a valid protocol message.
- Return type:
None
- abstract get_keys() Iterable [source]¶
Returns iterable of dictionary keys to be used with Protocol.handlers. Keys must be provided in order of precedence (from more specific to general).
- Return type:
Iterable
- class saturnin.base.transport.SimpleMessage[source]¶
Bases:
Message
Simple protocol message that holds items from ZMQ multipart message in its data attribute.
- as_zmsg() TZMQMessage [source]¶
Returns message as sequence of ZMQ data frames.
Important
This class simply returns the list kept in data attribute. This may cause problems if the returned list is subsequently updated. In such a case, create a copy of the returned list, or create a subclass that overrides this method.
- Return type:
TZMQMessage
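The aliasing problem described in the note above can be illustrated with a minimal stand-in. `SketchMessage` is a hypothetical class written only for this example, not the library's `SimpleMessage`; it merely mimics the documented behavior of returning the `data` list itself.

```python
from typing import List


class SketchMessage:
    """Hypothetical stand-in that mimics the documented as_zmsg() behavior."""

    def __init__(self) -> None:
        self.data: List[bytes] = []

    def as_zmsg(self) -> List[bytes]:
        # Returns the very same list object that is kept in `data`.
        return self.data


msg = SketchMessage()
msg.data.extend([b'HELLO', b'payload'])

shared = msg.as_zmsg()       # alias of msg.data
safe = list(msg.as_zmsg())   # independent shallow copy

shared.append(b'extra')      # also changes msg.data
safe.append(b'other')        # leaves msg.data untouched
```

Copying with `list(...)` is enough here because the frames themselves are immutable `bytes` objects.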
- copy() SimpleMessage [source]¶
Returns copy of the message.
- Return type:
SimpleMessage
- from_zmsg(zmsg: TZMQMessage) None [source]¶
Populate message data from sequence of ZMQ data frames.
- Parameters:
zmsg (TZMQMessage) – Sequence of frames that should be deserialized.
- Raises:
InvalidMessageError – If message is not a valid protocol message.
- Return type:
None
- get_keys() Iterable [source]¶
Returns iterable of dictionary keys to be used with Protocol.handlers.
The default implementation returns list with first data frame or None followed by ANY sentinel.
- Return type:
Iterable
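As a rough illustration of how such keys could drive handler selection, the sketch below builds the default key list and picks the first key found in a handler dictionary. The `ANY` object, `default_keys` and `select_handler` are hypothetical stand-ins, and the first-match lookup is an assumption based on the "order of precedence" wording, not the library's actual code.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

ANY = object()  # hypothetical stand-in for the library's ANY sentinel


def default_keys(data: List[bytes]) -> Iterable[Any]:
    """Mimics the documented default: first frame (or None), then ANY."""
    first: Optional[bytes] = data[0] if data else None
    return [first, ANY]


def select_handler(handlers: Dict[Any, Callable],
                   keys: Iterable[Any]) -> Optional[Callable]:
    """Returns the handler registered for the first matching key (assumed rule)."""
    for key in keys:
        if key in handlers:
            return handlers[key]
    return None


handlers: Dict[Any, Callable] = {
    b'PING': lambda: 'ping handler',
    ANY: lambda: 'fallback handler',
}

ping = select_handler(handlers, default_keys([b'PING', b'x']))
other = select_handler(handlers, default_keys([b'DATA']))
empty = select_handler(handlers, default_keys([]))
```

A specific key such as `b'PING'` wins over the `ANY` entry, which acts as a catch-all for everything else, including an empty message.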
- class saturnin.base.transport.Session[source]¶
Bases:
object
Base Peer Session class.
- routing_id: RoutingID¶
Channel routing ID for connected peer
- class saturnin.base.transport.Protocol(*args, **kwargs)[source]¶
Bases:
TracedMixin
Base class for protocol.
The main purpose of protocol class is to validate ZMQ messages, create protocol messages and session objects used by Channel to manage transmissions, and to handle messages received from channel. This base class defines common interface for message conversion and validation. Descendant classes typically add methods for creation of protocol messages and message handling.
- __init__(*, session_type: Type[Session] = <class 'saturnin.base.transport.Session'>)[source]¶
- accept_new_session(channel: Channel, routing_id: RoutingID, msg: Message) bool [source]¶
Validates incoming message that initiated new session/transmission.
Important
The default implementation unconditionally accepts new sessions (always returns True).
- connect_with_session(channel: Channel) bool [source]¶
Called by Channel.connect() to determine whether new session should be associated with connected peer.
Note
Because it’s not possible to call Channel.send without session, all protocols that require connecting peers to send a message to initiate transmission must return True.
The default implementation uses Channel.direction to determine the return value (True if it contains Direction.OUT, else False).
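The default rule stated above amounts to a flag membership test. The sketch below uses a hypothetical `Direction` flag defined locally (the member values are assumptions) and a free function in place of the method, so it only shows the shape of the check.

```python
from enum import Flag


class Direction(Flag):
    """Hypothetical stand-in for the library's Direction flag."""
    NONE = 0
    IN = 1
    OUT = 2


def connect_with_session(direction: Direction) -> bool:
    """Mimics the documented default: True if direction contains OUT."""
    return Direction.OUT in direction


send_only = connect_with_session(Direction.OUT)
receive_only = connect_with_session(Direction.IN)
two_way = connect_with_session(Direction.IN | Direction.OUT)
```

A protocol whose connecting peer must speak first on a channel without `Direction.OUT` in its direction would override the method to return True, as the note requires.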
- convert_msg(zmsg: TZMQMessage) Message [source]¶
Converts ZMQ message into protocol message.
- Parameters:
zmsg (TZMQMessage) – ZeroMQ multipart message.
- Returns:
New protocol message instance with parsed ZMQ message. The base Protocol implementation returns SimpleMessage instance created by message factory.
- Raises:
InvalidMessageError – If message is not a valid protocol message.
- Return type:
Message
- handle_exception(channel: Channel, session: Session, msg: Message, exc: Exception) None [source]¶
Called by handle_msg() on exception in message handler.
Important
Executes on_exception event. Descendant classes that override this method must call super() or execute this event directly.
If this method is not overridden by descendant, and handler for this event is not defined, all exceptions raised in message handlers are silently ignored.
- handle_invalid_msg(channel: Channel, session: Session, exc: InvalidMessageError) None [source]¶
Called by Channel.receive() when message conversion raises InvalidMessageError.
Important
Executes on_invalid_msg event. Descendant classes that override this method must call super() or execute this event directly.
If this method is not overridden by descendant, and handler for this event is not defined, all InvalidMessageError exceptions are silently ignored.
- Parameters:
channel (Channel) – Channel that received the message.
session (Session) – Session for this transmission.
exc (InvalidMessageError) – Exception raised while processing the message.
- Return type:
None
- handle_msg(channel: Channel, session: Session, msg: Message) Any [source]¶
Process message received from peer.
Uses handlers dictionary and Message.get_keys() to select and execute the message handler. Exceptions raised by message handler are processed by on_exception event handler (if assigned). Exceptions raised by event handler are ignored, only RuntimeWarning is emitted.
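The routing of handler exceptions described above can be sketched as follows. This is a simplified stand-in, not the library's implementation: the channel and session arguments are omitted, `handle_msg` is a free function, and returning None after a failed handler is an assumption made for the example.

```python
from typing import Any, Callable, List, Optional, Tuple

seen: List[Tuple[Any, Exception]] = []


def handle_msg(handler: Callable[[Any], Any], msg: Any,
               on_exception: Optional[Callable[[Any, Exception], None]] = None) -> Any:
    """Runs the handler; routes its exception to on_exception if one is assigned."""
    try:
        return handler(msg)
    except Exception as exc:
        if on_exception is not None:
            on_exception(msg, exc)
        # Without an assigned event handler the exception is silently dropped.
        return None  # assumption: return value chosen for this sketch only


def failing_handler(msg: Any) -> Any:
    raise ValueError(f"cannot process {msg!r}")


def record(msg: Any, exc: Exception) -> None:
    seen.append((msg, exc))


ignored = handle_msg(failing_handler, b'A')                       # nothing recorded
reported = handle_msg(failing_handler, b'B', on_exception=record)
echoed = handle_msg(lambda m: m, b'C')                            # normal path
```

The first call shows why leaving the event unassigned is risky: the failure leaves no trace at all.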
- initialize_session(session: Session) None [source]¶
Initialize new session instance. The default implementation does nothing.
- Parameters:
session (Session) – Session to be initialized.
- Return type:
None
- is_valid(zmsg: TZMQMessage) bool [source]¶
Returns True if ZMQ message is a valid protocol message, otherwise returns False.
Exceptions other than InvalidMessageError are not caught.
- Parameters:
zmsg (TZMQMessage) – ZeroMQ multipart message.
- Return type:
bool
- validate(zmsg: TZMQMessage) None [source]¶
Verifies that sequence of ZMQ data frames is a valid protocol message.
If this validation passes without exception, then convert_msg() of the same message must be successful as well.
Important
The implementation in base Protocol performs no validation, so it accepts every message.
- Parameters:
zmsg (TZMQMessage) – ZeroMQ multipart message.
- Raises:
InvalidMessageError – If ZMQ message is not a valid protocol message.
- Return type:
None
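The relationship between validate() and is_valid() can be sketched as below. `InvalidMessageError` is redefined locally as a stand-in, both functions are written as free functions, and the validation rule itself (a non-empty first frame) is an invented example, not a rule of any real protocol.

```python
from typing import List


class InvalidMessageError(Exception):
    """Hypothetical stand-in for the library's InvalidMessageError."""


def validate(zmsg: List[bytes]) -> None:
    """Example rule (an assumption): the message must start with a non-empty frame."""
    if not zmsg or not zmsg[0]:
        raise InvalidMessageError("Message must start with a non-empty frame")


def is_valid(zmsg: List[bytes]) -> bool:
    """True if validate() passes; only InvalidMessageError is caught."""
    try:
        validate(zmsg)
    except InvalidMessageError:
        return False
    return True


ok = is_valid([b'PING'])
bad = is_valid([])
```

Any other exception raised inside validate() would propagate out of is_valid() unchanged, matching the statement that such exceptions are not caught.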
- OID: str = '1.3.6.1.4.1.53446.1.5'¶
string with protocol OID (dot notation). MUST be set in child class.
- UID: uuid.UUID = UUID('bd233dd4-bc38-5843-bb3b-f292023b7ece')¶
UUID instance that identifies the protocol. MUST be set in child class.
- handlers: Dict[Any, TMessageHandler]¶
Message handlers
- message_factory¶
eventsocket for message factory that must return protocol message instance. The default factory produces new SimpleMessage instance on each call.
- Parameters:
zmsg – ZeroMQ multipart message.
Important
The returned message SHOULD NOT be initialized from zmsg. This argument is passed to factory for cases when ZeroMQ message content must be analysed to create instance of appropriate message class. See FBSP message factory for example.
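A factory that follows this contract might look like the sketch below. `TextMessage`, `BinaryMessage` and the `b'BIN'` marker frame are hypothetical names invented for the example; the point is only that the factory peeks at `zmsg` to choose a class and still returns an empty instance.

```python
from typing import List, Optional


class TextMessage:
    """Hypothetical message class used only in this sketch."""

    def __init__(self) -> None:
        self.data: List[bytes] = []


class BinaryMessage(TextMessage):
    """Hypothetical message class selected by a b'BIN' first frame."""


def message_factory(zmsg: Optional[List[bytes]] = None) -> TextMessage:
    """Chooses the message class from zmsg but does NOT copy zmsg into it."""
    if zmsg and zmsg[0] == b'BIN':
        return BinaryMessage()
    return TextMessage()


binary = message_factory([b'BIN', b'\x00\x01'])
plain = message_factory()
```

Populating the instance is left to a later from_zmsg() style call, so the factory stays cheap and free of parsing errors.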
- on_exception¶
eventsocket called by handle_msg() on exception in message handler.
- Parameters:
channel – Channel that received the message
session – Session associated with transmission
msg – Received message
exc – Exception raised
Important
If handler for this event is not defined, all exceptions raised in message handlers are silently ignored.
The exception thrown in this event handler is also not handled, and propagates to the upper layers (usually an I/O loop).
- on_invalid_msg¶
eventsocket called by Channel.receive() when message conversion raises InvalidMessageError.
- Parameters:
channel – Channel that received invalid message
session – Session associated with transmission
exc – Exception raised
Important
If handler for this event is not defined, all InvalidMessageError exceptions are silently ignored.
- class saturnin.base.transport.Channel(*args, **kwargs)[source]¶
Bases:
TracedMixin
Base Class for ZMQ communication channel (socket).
- __init__(mngr: ChannelManager, name: str, protocol: Protocol, routing_id: RoutingID, session_type: Type[Session], wait_for: Direction, snd_timeout: int, rcv_timeout: int, linger: int, sock_opts: TSocketOptions)[source]¶
- Parameters:
mngr (ChannelManager) – Channel manager.
name (str) – Channel name.
routing_id (RoutingID) – Routing ID for ZMQ socket.
protocol (Protocol) – Protocol for serializing/deserializing messages.
session_type (Type[Session]) – Session type. DEFAULT session type is obtained from Protocol.
wait_for (Direction) – Direction(s) of transmission events for this channel processed by ChannelManager.wait().
snd_timeout (int) – Timeout for send operation on the socket in milliseconds.
rcv_timeout (int) – Timeout for receive operation on the socket in milliseconds.
linger (int) – ZMQ socket linger period.
sock_opts (TSocketOptions) – Dictionary with socket options that should be set after socket creation.
- _adjust()[source]¶
Called by __init__() to configure the channel parameters.
- _configure() None [source]¶
Called by set_socket() to configure the 0MQ socket.
- Return type:
None
- bind(endpoint: ZMQAddress) ZMQAddress [source]¶
Bind the 0MQ socket to an address.
- Parameters:
endpoint (ZMQAddress) – Address to bind
- Returns:
The endpoint address.
- Raises:
ChannelError – On attempt to a) bind another endpoint for PAIR socket, or b) bind to an already bound endpoint.
- Return type:
ZMQAddress
Important
The returned address MAY differ from original address when wildcard specification is used.
- can_send(timeout: int = 0) bool [source]¶
Returns True if underlying ZMQ socket is ready to accept at least one outgoing message without blocking (or dropping it).
Important
It may return True for some sockets although subsequent send() may fail or block. A typical situation is a ROUTER socket that is attached to multiple peers.
- close_socket() None [source]¶
Close the ZMQ socket.
Note
This will not change the linger value for socket, so underlying socket may not close if there are undelivered messages. The socket is actually closed only after all messages are delivered or discarded by reaching the socket’s LINGER timeout.
- Return type:
None
- connect(endpoint: ZMQAddress, *, routing_id: RoutingID = None) Session | None [source]¶
Connect to a remote channel.
- Parameters:
endpoint (ZMQAddress) – Endpoint address for connected peer.
routing_id (RoutingID) – Channel routing ID (required for routed channels).
- Returns:
Session associated with connected peer, or None if no session was created.
- Raises:
ChannelError – On attempt to a) connect another endpoint for PAIR socket, or b) connect to already connected endpoint.
- Return type:
Session | None
- create_session(routing_id: RoutingID) Session [source]¶
Returns newly created session.
- Parameters:
routing_id (RoutingID) – Routing ID for new session.
- Raises:
ChannelError – When session for specified routing_id already exists.
- Return type:
Session
- discard_session(session: Session) None [source]¶
Discard session object.
If Session.endpoint value is set, it also disconnects channel from this endpoint.
- Parameters:
session (Session) – The Session to be discarded.
- Return type:
None
- disconnect(endpoint: ZMQAddress = None) None [source]¶
Disconnect from a remote socket (undoes a call to connect()).
Important
Does not discard sessions that are bound to any disconnected endpoint. Use discard_session() to disconnect & discard associated session.
- Parameters:
endpoint (ZMQAddress) – Endpoint address or None to disconnect from all connected endpoints. Note: The address must be the same as the address returned by connect().
- Raises:
ChannelError – If channel is not connected to specified endpoint.
- Return type:
None
- drop_socket() None [source]¶
Unconditionally drops the ZMQ socket and all pending messages (forces LINGER=0).
Note
All ZMQ errors raised by this operation are silently ignored.
- Return type:
None
- message_available(timeout: int = 0) bool [source]¶
Returns True if underlying ZMQ socket is ready to receive at least one message without blocking (or error).
- receive(timeout: int = None) Any [source]¶
Receive and process protocol message with assigned protocol.
If protocol raises InvalidMessageError on message conversion, it calls Protocol.on_invalid_msg event handler (if defined) before message is dropped. Exceptions raised by event handler are ignored, only RuntimeWarning is emitted.
If there is no session found for route, it first calls Protocol.accept_new_session(), and message is handled only when new session is accepted.
- Parameters:
timeout (int) – The timeout (in milliseconds) to wait for message.
- Returns:
Whatever protocol message handler returns, sentinel TIMEOUT when timeout expires, or sentinel INVALID when: a) received message was not valid protocol message, or b) handler raises an exception, or c) there is no session associated with peer and new session was not accepted by protocol.
- Return type:
Any
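Because receive() can return either a handler result or one of two sentinels, callers typically need to tell the three outcomes apart. The sketch below shows one way to do that; `TIMEOUT` and `INVALID` are local stand-in objects, and comparing them by identity is an assumption that holds for these stand-ins.

```python
from typing import Any

TIMEOUT = object()  # hypothetical stand-in for the TIMEOUT sentinel
INVALID = object()  # hypothetical stand-in for the INVALID sentinel


def describe(result: Any) -> str:
    """Classifies a value shaped like the documented receive() result."""
    if result is TIMEOUT:
        return 'no message arrived in time'
    if result is INVALID:
        return 'message dropped (invalid, handler failed, or session refused)'
    return f'handler returned {result!r}'


timed_out = describe(TIMEOUT)
dropped = describe(INVALID)
handled = describe(42)
```

Note that a handler returning None is a normal result here, which is why the sentinels, not None, mark the special cases.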
- send_zmsg(zmsg: TZMQMessage) None [source]¶
Sends ZMQ multipart message.
Important
Does not handle any ZMQError exception.
- Parameters:
zmsg (TZMQMessage) – ZMQ multipart message to be sent.
- Return type:
None
- set_socket(socket: Socket) None [source]¶
Used by ChannelManager to set socket to be used by Channel.
- Parameters:
socket (Socket) – 0MQ socket to be used by channel
- Return type:
None
- set_wait_in(value: bool) None [source]¶
Enable/disable receiving messages. It sets/clears Direction.IN in wait_for.
- set_wait_out(value: bool, session: Session = None) None [source]¶
Enable/disable sending messages. It sets/clears Direction.OUT in wait_for.
- Parameters:
- Raises:
ChannelError – For routed channel with active sessions when session is not provided.
- Return type:
None
Important
If channel has active sessions, the Session.send_pending flag is also altered.
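Setting and clearing a direction in wait_for, as set_wait_in() and set_wait_out() are described to do, is ordinary flag arithmetic. The sketch below uses a locally defined, hypothetical `Direction` flag and a helper named `set_flag`; it does not touch sessions or the poller, which the real methods may also update.

```python
from enum import Flag


class Direction(Flag):
    """Hypothetical stand-in for the library's Direction flag."""
    NONE = 0
    IN = 1
    OUT = 2


def set_flag(wait_for: Direction, flag: Direction, enabled: bool) -> Direction:
    """Returns wait_for with the given flag set (enabled) or cleared (disabled)."""
    return (wait_for | flag) if enabled else (wait_for & ~flag)


wait_for = Direction.NONE
wait_for = set_flag(wait_for, Direction.IN, True)     # like set_wait_in(True)
wait_for = set_flag(wait_for, Direction.OUT, True)    # like set_wait_out(True)
both = wait_for
wait_for = set_flag(wait_for, Direction.OUT, False)   # like set_wait_out(False)
```

Clearing `OUT` leaves `IN` untouched, so a channel can stop polling for output readiness while it keeps receiving.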
- unbind(endpoint: ZMQAddress = None) None [source]¶
Unbind from an address (undoes a call to bind()).
- Parameters:
endpoint (ZMQAddress) – Endpoint address, or None to unbind from all bound endpoints. Note: The address must be the same as the address returned by bind().
- Raises:
ChannelError – If channel is not bound to specified endpoint.
- Return type:
None
- endpoints: List[ZMQAddress]¶
List of bound/connected endpoints
- property log_context: Any¶
Logging context. Returns log_context from ChannelManager.
- property manager: ChannelManager¶
The channel manager to which this channel belongs.
- property mode: SocketMode¶
ZMQ Socket mode.
- on_output_ready¶
eventsocket called when channel is ready to accept at least one outgoing message without blocking (or dropping it).
- Parameters:
channel – Channel ready for sending a message.
- on_receive_failed¶
eventsocket called by Channel.receive() when receive operation fails with zmq.ZMQError exception other than EAGAIN.
If event returns True, the receive() returns INVALID, otherwise the exception is propagated to the caller.
- Parameters:
channel – Channel where receive operation failed.
err_code – Error code.
- on_receive_later¶
eventsocket called by Channel.receive() when receive operation fails with zmq.Again exception.
If event returns True, the receive() returns INVALID, otherwise the exception is propagated to the caller.
- Parameters:
channel – Channel where receive operation failed.
- on_send_failed¶
eventsocket called by Channel.send() when send operation fails with zmq.ZMQError exception other than EAGAIN.
If event returns True, the error is ignored, otherwise the error code is reported to the caller.
- Parameters:
channel – Channel where send operation failed.
session – Session associated with failed transmission.
msg – Message that wasn’t sent.
err_code – Error code.
- on_send_later¶
eventsocket
called byChannel.send()
when send operation fails withzmq.Again
exception.If event returns
True
, the error is ignored, otherwise the error code is reported to the caller.- Parameters:
channel – Channel where send operation failed.
session – Session associated with failed transmission.
msg – Message that wasn’t sent.
- on_shutdown¶
eventsocket called by ChannelManager.shutdown() before the channel is shut down.
Important
All exceptions escaping this method are silently ignored.
- Parameters:
channel – Channel to be shut down.
forced – When True, the channel will be closed with zero LINGER and all ZMQ errors will be ignored.
- property routing_id: RoutingID¶
Routing_id value for ZMQ socket.
- property session: Session¶
Session associated with channel.
Important: Valid only when channel has exactly one associated session.
- sock_opts: TSocketOptions¶
Dictionary with socket options that should be set after socket creation
- socket: zmq.Socket¶
ZMQ socket for transmission of messages
- property socket_type: SocketType¶
ZMQ socket type this channel uses.
- property wait_for: Direction¶
Direction(s) of transmission events for this channel processed by ChannelManager.wait() or Channel.wait().
- Raises:
ChannelError – When assigned value contains direction not supported by channel for transmission.
Channels for individual 0MQ socket types¶
- class saturnin.base.transport.DealerChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over DEALER socket.
- class saturnin.base.transport.PushChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over PUSH socket.
- class saturnin.base.transport.PullChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over PULL socket.
- class saturnin.base.transport.PubChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over PUB socket.
- class saturnin.base.transport.SubChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over SUB socket.
- class saturnin.base.transport.XPubChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over XPUB socket.
- class saturnin.base.transport.XSubChannel(*args, **kwargs)[source]¶
Bases:
Channel
Communication channel over XSUB socket.