ezmsg.zmq.pubsub

Classes

class ZMQPollerSettings(read_addr: str, zmq_topic: str, poll_time: float = 0.1, multipart: bool = False)

Bases: Settings

Parameters:
  • read_addr (str)

  • zmq_topic (str)

  • poll_time (float)

  • multipart (bool)

read_addr: str
zmq_topic: str
poll_time: float = 0.1
multipart: bool = False
__init__(read_addr, zmq_topic, poll_time=0.1, multipart=False)
Parameters:
  • read_addr (str)

  • zmq_topic (str)

  • poll_time (float)

  • multipart (bool)

Return type:

None

class ZMQPollerState

Bases: State

context: Context
socket: Socket
monitor: Socket
poller: Poller

class ZMQPollerUnit(*args, settings=None, **kwargs)

Bases: Unit

Represents a node in the graph which polls data from ZMQ. Data polled from ZMQ are subsequently pushed to the rest of the graph as a ZMQMessage.

Parameters:
  • read_addr – The address from which ZMQ data should be polled.

  • zmq_topic – The ZMQ topic being polled.

  • timeout – The maximum amount of time (in seconds) that should be spent polling a ZMQ socket each time. Defaults to FOREVER_POLL_TIME if not specified.

  • exit_condition – An optional ZMQ event code specifying the event which, if encountered by the monitor, should signal the termination of this particular node’s activity.

  • settings (Settings | None)

OUTPUT = OutputStream:unlocated[<class 'ezmsg.zmq.util.ZMQMessage'>](self.num_buffers=32, self.force_tcp=False)
SETTINGS

alias of ZMQPollerSettings

STATE

alias of ZMQPollerState

initialize()

Runs when the Unit is instantiated. It is called from within the process in which the unit will live. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.

Return type:

None

shutdown()

Runs when the Unit terminates. It is called from within the process in which the unit lives. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.

Return type:

None

async socket_monitor()
Return type:

None

async zmq_publisher()
Return type:

AsyncGenerator
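
The polling behavior described for ZMQPollerUnit can be sketched with plain pyzmq calls. This is a minimal illustration under stated assumptions, not the unit's actual implementation: the helper names open_subscriber and poll_once are hypothetical, the sketch assumes a zmq.SUB socket that connects (rather than binds) to read_addr, and it assumes poll_time is expressed in seconds.

```python
from typing import List, Optional


def open_subscriber(read_addr: str, zmq_topic: str):
    """Create a SUB socket plus a poller watching it (requires pyzmq)."""
    import zmq  # imported lazily so poll_once below stays dependency-free

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    # Assumption: the subscriber connects to an address a publisher has bound.
    socket.connect(read_addr)
    # SUB sockets filter by prefix match against the first frame.
    socket.setsockopt(zmq.SUBSCRIBE, zmq_topic.encode("utf-8"))
    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)
    return context, socket, poller


def poll_once(
    socket, poller, poll_time: float = 0.1, multipart: bool = False
) -> Optional[List[bytes]]:
    """Wait up to poll_time seconds; return received frames, or None on timeout."""
    # zmq.Poller.poll takes its timeout in milliseconds.
    ready = dict(poller.poll(int(poll_time * 1000)))
    if socket not in ready:
        return None
    # A single-frame message is wrapped in a list so both modes return frames.
    return socket.recv_multipart() if multipart else [socket.recv()]
```

With pyzmq installed, calling open_subscriber with a placeholder address such as "tcp://127.0.0.1:5555" and then calling poll_once(socket, poller) in a loop would yield frames as they arrive. Returning None on timeout lets the caller check for shutdown between polls instead of blocking indefinitely.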

class ZMQSenderSettings(write_addr: str, zmq_topic: str, multipart: bool = False, wait_for_sub: bool = True)

Bases: Settings

Parameters:
  • write_addr (str)

  • zmq_topic (str)

  • multipart (bool)

  • wait_for_sub (bool)

write_addr: str
zmq_topic: str
multipart: bool = False
wait_for_sub: bool = True
__init__(write_addr, zmq_topic, multipart=False, wait_for_sub=True)
Parameters:
  • write_addr (str)

  • zmq_topic (str)

  • multipart (bool)

  • wait_for_sub (bool)

Return type:

None

class ZMQSenderState

Bases: State

context: Context
socket: Socket
monitor: Socket

class ZMQSenderUnit(*args, settings=None, **kwargs)

Bases: Unit

Represents a node in an ezmsg graph that receives ZMQMessage messages on its INPUT stream, then publishes each message by writing to a zmq.PUB socket.

Parameters:
  • write_addr – The address to which ZMQ data should be written.

  • zmq_topic – The ZMQ topic being sent.

  • multipart – If True, use socket.send_multipart, else use socket.send.

  • wait_for_sub – If True, the sender will wait for a subscriber before publishing. This behaves strangely and should be set to False.

  • settings (Settings | None)

INPUT = InputStream:unlocated[<class 'ezmsg.zmq.util.ZMQMessage'>]()
SETTINGS

alias of ZMQSenderSettings

STATE

alias of ZMQSenderState

initialize()

Runs when the Unit is instantiated. It is called from within the process in which the unit will live. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.

Return type:

None

shutdown()

Runs when the Unit terminates. It is called from within the process in which the unit lives. This lifecycle hook can be overridden, and it can be made an async function by adding the async keyword when overriding.

Return type:

None

async zmq_subscriber(message)
Parameters:

message (ZMQMessage)

Return type:

None
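
The multipart switch described for ZMQSenderUnit can be illustrated as follows. This is a hedged sketch rather than the unit's actual code: build_frames and publish are hypothetical helper names, and the framing shown (topic as its own frame when multipart is True, topic prepended to the payload otherwise) is an assumption chosen so that a subscriber's prefix filter matches in both modes.

```python
from typing import List


def build_frames(zmq_topic: str, payload: bytes, multipart: bool = False) -> List[bytes]:
    """Return the frames that would be written for one message."""
    topic = zmq_topic.encode("utf-8")
    if multipart:
        # Topic travels as a separate first frame.
        return [topic, payload]
    # Assumption: in single-frame mode the topic is prepended to the payload,
    # because SUB sockets filter on a prefix of the first frame.
    return [topic + payload]


def publish(socket, zmq_topic: str, payload: bytes, multipart: bool = False) -> None:
    """Write one message to a PUB-like socket using send or send_multipart."""
    frames = build_frames(zmq_topic, payload, multipart)
    if multipart:
        socket.send_multipart(frames)
    else:
        socket.send(frames[0])
```

Here socket can be any object exposing send and send_multipart, such as a pyzmq zmq.PUB socket that has been bound to write_addr. Keeping the framing in a pure function makes it easy to check without opening a real socket.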
