Welcome to subpub¶
subpub provides a minimalistic, thread-safe, publish-subscribe API for single-process Python applications.
The source code is available on GitHub.
Example¶
The example below demonstrates basic usage.
# Create an instance of the message broker
>>> from subpub import SubPub
>>> sp = SubPub()
# Subscribe to a topic (= any string or regular expression).
# The returned queue `q` is used to retrieve published data:
>>> q = sp.subscribe(r'/food/(\w+)/order-(\d+)')
# Publish any data to topic:
>>> sp.publish('/food/pizza/order-66', "beef pepperoni")
True
# Get the published data from the queue:
>>> match, data = q.get()
>>> data
'beef pepperoni'
# The queue always receives the regexp `match` object as well.
# It can be used to see how the topic matched and get groups:
>>> match.groups()
('pizza', '66')
# Get the published topic:
>>> match.string
'/food/pizza/order-66'
See test cases in test_subpub.py
for more examples.
Key features¶
SubPub’s methods
subscribe
,unsubscribe
,unsubscribe_all
andpublish
are thread-safe.Subscribers use regular experssions to filter on topic.
Subscribers receive published data through queues. (There is no built-in mechanism to register callbacks.)
When an queue is garbage collected,
unsubscribe
is executed automatically (because SubPub only keeps a weak reference to the subscribers’ queues).Publishers can post any Python object as message.
Publishers can use
retain=True
to store a message (as in MQTT).
Reference¶
- class subpub.SubPub(queue_factory=<class '_queue.SimpleQueue'>, *, timeout=None)¶
A threadsafe message broker with publish/subscribe API.
This class implements four methods:
subscribe
- listen to topic and retrieve data throuh a queue.unsubscribe
- stop listening on topic.unsubscribe_all
- stop listening on all topics.publish
- post data to subscribers’ queues.
Example:
>>> sp = SubPub() >>> q = sp.subscribe('helloworld') >>> sp.publish('helloworld', 123) True >>> match, data = q.get() >>> print(data) 123 >>> print(match.string) helloworld
- __init__(queue_factory=<class '_queue.SimpleQueue'>, *, timeout=None)¶
Initialization of SubPub instance.
Example:
>>> sp = SubPub() >>> print(sp) SubPub(queue_factory=SimpleQueue, timeout=None)
- Parameters
queue (Callable) – Default queue factory. If used, this parameter must be a callable which returns an instance of a queue-like object (implements get/put with timeout keyword argument). Used whenever a client subscribes unless the client provides its own queue. Defaults to
queue.SimpleQueue
.timeout (float) – Default timeout used for subscribe/publish when not specified. Used when putting data in client’s queues.
- publish(topic: str, data=None, *, retain=False, timeout=None)¶
Publish data to topic.
This method loops through the clients subscribed regex-topics and searches for a match on
topic
. If there is a match, there.Match
anddata
objects will be wrapped in aMsg
, which then will be put in the client’s subscription queue.Examples:
>>> sp = SubPub() >>> sp.publish('helloworld', data='Hi, there!') False >>> sp.publish('helloworld', 'Hi, new client', retain=True) False
The boolean returned, in this case
False
, indicates if it existed at least one client that received the data.- Parameters
topic (str) – Topic string the data will be published to.
data (any Python object) – Data to be published.
retain (bool) – If true, the published data will be remembered. Each client that subscribes to a regex-topic matching this topic, will immediately receive the retained data when they subscribe. To stop this behavior, make a publish to the same topic with data
None
and retainTrue
.timeout (float) – Timeout when putting published data in subscribers’ queues. The behavior is client specific and depends what type of queue the client uses. If timeout is a positive number, and the default queue
queue.SimpleQueue
is used, the publish blocks at most timeout seconds and raises thequeue.Full
exception if no free slot was available within that time. If timeout isNone
, block if necessary until a free slot is available. Defaults toself.timeout
.
- Returns
Returns
True
if a subscribed client queue existed and the data was successfully put in that queue. If no receiver was found, returnFalse
.- Return type
bool
- subscribe(topic: str, *, queue=None, timeout=None, **args)¶
Subscribe to topic and receive published data in queue.
If topic is a string, it will be compiled to a regular expression using
topic = re.compile(topic)
.A custom receiver
queue
can be provided. If not, a new one will be created byself.queue_factory
with the optional**args
arguments.When data is published to a topic which matches the this topic, the queue will receive an instance of
Msg
which contains the regex-match object and the data.Subscribe to plain string:
>>> sp = SubPub() >>> q1 = sp.subscribe('helloworld')
Subscribe to regex:
>>> q2 = sp.subscribe(r'sensor/\d+/temperature')
Subscribe to regex with named groups:
>>> q3 = sp.subscribe(r'worker/(?P<id>\d+)/(?P<status>done|error)')
The
MqttTopic
class can be used to build topics using MQTT syntax for wildcards:>>> t = MqttTopic('sensor/+/temperature/#') >>> t.as_regexp() re.compile('sensor/([^/]*)/temperature/(.*)$') >>> q4 = sp.subscribe(t)
- Parameters
topic (str or
re.Pattern
) – Regular expression that match topics of interest. If a string, the topic will be compiled to a regular expression withtopic = re.compile(topic)
.queue (Queue like object.) – An instance of a queue-like object (implements get/put with timeout keyword argument). Will be used as receiver queue for published data. If used not, a new one will be created by
self.queue_factory
with the optional**args
arguments.timeout (float) – Timeout when putting retained data in subscriber’s queue. The behavior is client specific and depends what type of queue the client uses. If timeout is a positive number, and the default queue
queue.SimpleQueue
is used, the publish blocks at most timeout seconds and raises thequeue.Full
exception if no free slot was available within that time. If timeout isNone
, block if necessary until a free slot is available. Defaults toself.timeout
.
- Returns
Queue instance which will receive published data whenever the published topic matches the subscribers regex-topic. The data is wrapped together with the
re.Match
object in an instance ofMsg
.- Return type
Queue-like object.
- unsubscribe(topic: str) → bool¶
Unsubscribe to topic.
- Parameters
topic (str or
re.Pattern
) – Same as forsubscribe
.- Returns
Returns False if the caller wasn’t actually subscribed, otherwise True.
- Return type
int
- unsubscribe_all() → int¶
Unsubscribe to all clients
- Returns
Returns the number of topics that got unsubscribed.
- Return type
int
- class subpub.Msg(match, data)¶
Msg is the item sent/received in subscriber’s queues.
It carries the regular experssion
re.Match
object and the publisheddata
(data can be any Python object).- property data¶
Alias for field number 1
- property match¶
Alias for field number 0
- class subpub.ExceptionAwareQueue¶
Raise exception instances when received in queue.
This is useful if you want to publish an exception instance to a client and have it raised automatically when the client receives it by calling
.get()
.- get(block=True, timeout=None)¶
If item retrived is an Exception instance - raise it.
- get_nowait()¶
Same as
get()
but withblock=False
.
- class subpub.MqttTopic(seq)¶
String which represents a topic in MQTT format.
Instead of normal Python regex, the MQTT wildcards, ‘+’ and ‘#’, can be used instead.
An instance of MqttTopic can be used as topic argument to the SubPub methods. It will be converted to a regular expression automatically:
>>> MqttTopic('room/3/sensor/+/temperature/#').as_regexp() re.compile('room/3/sensor/([^/]*)/temperature/(.*)$')
- as_regexp(flags=0)¶
Replace MQTT wildcards and return regular expression.
- class subpub.AsyncSubPub(queue_factory=<class 'asyncio.queues.Queue'>, *, timeout=None)¶
Asynchronous implementation of SubPub.
It has the same API as SubPub but is based on the asyncio paradigm.
- __init__(queue_factory=<class 'asyncio.queues.Queue'>, *, timeout=None)¶
Initialization of AsyncSubPub instance.
- async publish(topic: str, data=None, *, retain=False, timeout=None)¶
Publish data to topic.
- async subscribe(topic: str, *, queue=None, timeout=None, **args)¶
Subscribe to topic and receive published data in queue.
- async unsubscribe(topic: str) → bool¶
Unsubscribe to topic.
- async unsubscribe_all() → int¶
Unsubscribe to all clients