Queue

Please import all classes from namespace libchirp.queue, to make sure you get the right implementation.

from libchirp.queue import Chirp, Config, Loop, Message
libchirp.queue.Chirp(loop, config) Implements the queue.Queue-based interface.
libchirp.queue.Message([cmsg]) Chirp message.

Example

queue_sender.py

from libchirp.queue import Chirp, Config, Loop, Message

loop = Loop(); config = Config(); message = Message()
config.DISABLE_ENCRYPTION = True
config.PORT = 2992
message.data = b'hello'
message.address = "127.0.0.1"
message.port = 2998
try:
    chirp = Chirp(loop, config)
    chirp.send(message).result()
    msg = chirp.get()
    msg.release().result()
    print(msg.data)
finally:
    chirp.stop()
    loop.stop()

For a echo-server see Example.

The usual pattern for the queue interface is:

config.ACKNOWLEDGE = False
while True:
   msg = chirp.get()
   # Do heavy work
   chirp.send(msg)  # Send the result
   msg.release()  # Notify the remote that the work is done

Chirp

class libchirp.queue.Chirp(loop, config)[source]

Bases: libchirp.ChirpBase, queue.Queue

Implements the queue.Queue-based interface.

Concurrency is achieved by sending multiple messages and waiting for results later. Use libchirp.queue.Message.identity as key to a dict, to match-up requests and answers.

See Exceptions.

Parameters:
disable_queue

Get if the queue is disabled.

If the queue is disabled no messages will be delivered to the queue and the messages will always be released.

empty()

Return True if the queue is empty, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() == 0 as a direct substitute, but be aware that either approach risks a race condition where a queue can grow before the result of empty() or qsize() can be used.

To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join() method.

full()

Return True if the queue is full, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() >= n as a direct substitute, but be aware that either approach risks a race condition where a queue can shrink before the result of full() or qsize() can be used.

get(*args, **kwargs)[source]

Remove, release and return an message from the queue.

If optional args block is true and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Empty exception if no item was available within that time. Otherwise (block is false), return an item if one is immediately available, else raise the Empty exception (timeout is ignored in that case).

get_nowait()[source]

Equivalent to get(False).

identity()
join()

Blocks until all items in the Queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, join() unblocks.

loop

Get the libchirp.Loop used by the chirp instance.

Return type:Loop
put(item, block=True, timeout=None)

Put an item into the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until a free slot is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Full exception if no free slot was available within that time. Otherwise (‘block’ is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (‘timeout’ is ignored in that case).

put_nowait(item)

Put an item into the queue without blocking.

Only enqueue the item if a free slot is immediately available. Otherwise raise the Full exception.

qsize()

Return the approximate size of the queue (not reliable!).

request(msg, auto_release=True)

Send a message and wait for an answer.

This method returns a libchirp.ChirpFuture.

The result() of the will contain the answer to the message. If you don’t intend to consume the result use send() instead.

By default the message slot used by the response will released. If auto_release is False, you have to release the response-message.

Exceptions, threading and concurrency aspects are the same as send(). Issue: If an answer to a request arrives after the timeout it will be delivered at normal message.

To wait for the request being sent use libchirp.ChirpFuture.send_result().

req = chirp.request(msg)
req.send_result()
answer = req.result()
Parameters:
  • msg (MessageThread) – The message to send.
  • auto_release (bool) – Release the response (default True)
Return type:

concurrent.futures.Future

send(msg)

Send a message. This method returns a Future.

The result will contain the message that has been sent.

In synchronous-mode the future finishes once the remote has released the message. In asynchronous-mode the future finishes once the message has been passed to the operating-system.

Calling result() can raise the exceptions: ConnectionError, TimeoutError, RuntimeError, ValueError, MemoryError. The exception contains the last error message if any generated by chirp. Also Exception for unknown errors. See Exceptions.

See also Concurrency.

Sending different messages from different threads is thread-safe. Sending the same message twice from different threads will lead to undefined behavior. Sending, waiting for the result, switching the thread synchronized (via queue for example), sending is fine, tough.

Parameters:msg (MessageThread) – The message to send.
Return type:concurrent.futures.Future
stop()

Stop the chirp-instance.

task_done()

Indicate that a formerly enqueued task is complete.

Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

Message

class libchirp.queue.Message(cmsg=None)[source]

Bases: libchirp.MessageThread

Chirp message. To answer to message just replace the data and send it.

Note

The underlaying C type is annotated in parens. The properties of the message use asserts to check if the value has the correct type, length, range. You can disable these with python -O.

address

Get address.

If the message was received: The address of the remote the message was received from.

If the message will be sent: The address to send the message to.

This allows to reply to messages just by replacing data().

Returns:String representation generated by py:class:ipaddress.ip_address.
Return type:string
data

Get the data of the message.

Return type:bytes
has_slot

Return if the message has a slot.

If libchirp.Config.AUTO_RELEASE is False, you have to call release_slot()

Return type:bool
header

Get the header used by upper-layer protocols.

Users should not use it, except if you know what you are doing.

Return type:bytes
identity

Get identify the message and answers to it. (uint8_t[16]).

The identity can be used to find answers to a message, since replying to the message won’t change the identity.

If you need to uniquely identify the message, use the identity/serial pair, since the serial will change when replying to messages. (read-only)

Return type:bytes
port

Get port. (uint16_t).

If the message was received: The port of the remote the message was received from.

If the message will be sent: The port to send the message to.

This allows to reply to messages just by replacing data().

Return type:int
release()

Release the internal message-slot. This method returns a Future.

Will also acknowledge the message if the remote requested a acknowledge-message.

The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.

Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, switching the thread synchronized (via queue for example), releasing is fine, tough.

Return type:Future
release_slot()

Release the internal message-slot. This method returns a Future.

Will also acknowledge the message if the remote requested a acknowledge-message.

The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.

Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, switching the thread synchronized (via queue for example), releasing is fine, tough.

Return type:Future
remote_identity

Detect the remote instance. (uint8_t[16]).

By default a node’s identity will change on each start of chirp. If multiple peers share state, a change in the remote_identity should trigger a reset of the state. Simply use the remote_identity as key in a dictionary of shared state. (read-only)

Return type:bytes
serial

Get the serial number of the message. (uint32_t).

Increases monotonic. Be aware of overflows, if want to use it for ordering use the delta: serialA - serialB. (read-only)

Return type:int