Pool

Please import all classes from the libchirp.pool namespace to make sure you get the right implementation.

from libchirp.pool import Chirp, Config, Loop, Message
libchirp.pool.Chirp(loop, config, **kwargs): Implements a concurrent.futures.ThreadPoolExecutor.
libchirp.pool.Message([cmsg]): Chirp message.

Example

pool_echo.py

from concurrent.futures import Future
from libchirp.pool import Chirp, Config, Loop

res = Future()

class MyChirp(Chirp):
    def handler(self, msg):
        print(msg.data)
        res.set_result(self.send(msg).result())

loop = Loop()
config = Config()
config.DISABLE_ENCRYPTION = True
# Workers are usually asynchronous
config.SYNCHRONOUS = False
try:
    chirp = MyChirp(loop, config)
    try:
        res.result()
    finally:
        chirp.stop()
finally:
    loop.stop()

For a sender see Example.

Chirp

class libchirp.pool.Chirp(loop, config, **kwargs)[source]

Bases: libchirp.ChirpBase, concurrent.futures.thread.ThreadPoolExecutor

Implements a concurrent.futures.ThreadPoolExecutor.

Used when you have to interface with blocking code. Please only use it if really needed. The handler is called on different threads; do not share anything, or share only thread-safe things.
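The threading caveat can be illustrated with the standard library alone. This is a minimal sketch, not libchirp code: `Stats` and `handler` are hypothetical names, and a plain ThreadPoolExecutor stands in for the pool that would call a real handler.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Stats:
    """Hypothetical shared state, made thread-safe with a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.handled = 0

    def record(self):
        # Without the lock, concurrent increments could be lost.
        with self._lock:
            self.handled += 1


stats = Stats()


def handler(msg):
    """Stand-in for a handler that runs on pool threads."""
    stats.record()


with ThreadPoolExecutor(max_workers=4) as pool:
    # Simulate 1000 incoming messages handled on four threads.
    list(pool.map(handler, range(1000)))
```

Anything a handler touches beyond its own message should be protected in a similar way or not shared at all.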

Subclass libchirp.pool.Chirp and implement handler().

If libchirp.Config.AUTO_RELEASE = True the message will be released automatically once the handler returns.

Concurrency is achieved by sending multiple messages and waiting for results later. Use libchirp.MessageBase.identity as the key to a dict to match up requests and answers.
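The bookkeeping for matching answers to requests can be sketched without a network. In this sketch `FakeMessage`, `register` and `on_answer` are hypothetical names; `FakeMessage` models only the `identity` and `data` attributes and is not the libchirp class.

```python
import os
from concurrent.futures import Future


class FakeMessage:
    """Hypothetical stand-in for a chirp message (not the libchirp class)."""

    def __init__(self, data, identity=None):
        self.data = data
        # A real message carries a 16-byte identity; one is generated here.
        self.identity = identity if identity is not None else os.urandom(16)


# Maps message identity -> Future that will receive the answer.
pending = {}


def register(msg):
    """Remember an outgoing message and return a future for its answer."""
    fut = Future()
    pending[msg.identity] = fut
    return fut


def on_answer(answer):
    """Resolve the future of the request this answer belongs to."""
    fut = pending.pop(answer.identity, None)
    if fut is not None:
        fut.set_result(answer.data)


# Register several requests first, collect the answers later.
requests = [FakeMessage(b"ping %d" % i) for i in range(3)]
futures = [register(m) for m in requests]

# Answers may arrive in any order; a reply keeps the request's identity.
for req in reversed(requests):
    on_answer(FakeMessage(req.data.upper(), identity=req.identity))

answers = [f.result(timeout=1) for f in futures]
```

Because handlers run on pool threads, a real implementation would likely guard `pending` with a lock whenever it does more than a single dict operation.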

**kwargs are passed to ThreadPoolExecutor.

See Exceptions.

Parameters:
handler(msg)[source]

Called when a message arrives.

Please implement this method.

If you don’t implement this, chirp will release the message-slots regardless of the AUTO_RELEASE setting.

Parameters:msg (libchirp.pool.Message) – The message
identity()
loop

Get the libchirp.Loop used by the chirp instance.

Return type:Loop
map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Args:
fn: A callable that will take as many arguments as there are passed iterables.
timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time.
chunksize: The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(fn, *iterables) but the calls may be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated before the given timeout.
Exception: If fn(*args) raises for any values.
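Since map() is inherited from ThreadPoolExecutor, its behavior can be tried with a plain executor. This is a minimal sketch that assumes Chirp does not override the method; `square` and `add` are illustrative functions.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


def add(a, b):
    return a + b


with ThreadPoolExecutor(max_workers=2) as pool:
    # Calls may run out of order, but results are yielded in input order.
    squares = list(pool.map(square, [1, 2, 3], timeout=5))
    # One argument is taken from each iterable per call.
    sums = list(pool.map(add, [1, 2], [10, 20], timeout=5))
```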

request(msg, auto_release=True)

Send a message and wait for an answer.

This method returns a libchirp.ChirpFuture.

The result() of the future will contain the answer to the message. If you don’t intend to consume the result, use send() instead.

By default the message slot used by the response will be released. If auto_release is False, you have to release the response-message.

Exceptions, threading and concurrency aspects are the same as send(). Issue: If an answer to a request arrives after the timeout, it will be delivered as a normal message.

To wait for the request being sent use libchirp.ChirpFuture.send_result().

req = chirp.request(msg)
req.send_result()
answer = req.result()
Parameters:
  • msg (MessageThread) – The message to send.
  • auto_release (bool) – Release the response (default True)
Return type:concurrent.futures.Future

send(msg)

Send a message. This method returns a Future.

The result will contain the message that has been sent.

In synchronous-mode the future finishes once the remote has released the message. In asynchronous-mode the future finishes once the message has been passed to the operating-system.

Calling result() can raise the exceptions: ConnectionError, TimeoutError, RuntimeError, ValueError, MemoryError. The exception contains the last error message generated by chirp, if any. Unknown errors are raised as Exception. See Exceptions.

See also Concurrency.

Sending different messages from different threads is thread-safe. Sending the same message twice from different threads will lead to undefined behavior. Sending, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and then sending again is fine, though.

Parameters:msg (MessageThread) – The message to send.
Return type:concurrent.futures.Future
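The safe hand-over pattern described for send() (send, wait for the result, pass the message through a queue, send again) can be sketched as follows. `fake_send` is a hypothetical stand-in for chirp.send() that only records what was sent, and the message is modeled as a plain dict.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

io_pool = ThreadPoolExecutor(max_workers=1)
sent_log = []


def fake_send(msg):
    """Hypothetical stand-in for chirp.send(): returns a Future of msg."""

    def _do_send():
        sent_log.append(msg["data"])
        return msg

    return io_pool.submit(_do_send)


handoff = queue.Queue()


def first_sender(msg):
    fake_send(msg).result(timeout=5)  # wait until the first send is done
    handoff.put(msg)                  # only then hand the message over


def second_sender():
    msg = handoff.get(timeout=5)      # the queue synchronizes the threads
    msg["data"] = b"second"
    fake_send(msg).result(timeout=5)


message = {"data": b"first"}
t1 = threading.Thread(target=first_sender, args=(message,))
t2 = threading.Thread(target=second_sender)
t1.start()
t2.start()
t1.join()
t2.join()
io_pool.shutdown()
```

The message is never in flight on two threads at once, which is the property the paragraph above asks for.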
shutdown(wait=True)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Args:
wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
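The shutdown() semantics come from ThreadPoolExecutor and can be checked with a plain executor. This is a minimal sketch that assumes Chirp inherits the method unchanged.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
fut = pool.submit(sum, [1, 2, 3])

pool.shutdown(wait=True)  # blocks until the pending future has finished
pool.shutdown(wait=True)  # calling it again is harmless

try:
    pool.submit(sum, [4, 5])
    rejected = False
except RuntimeError:
    # New work is refused once the executor has been shut down.
    rejected = True
```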
stop()[source]

Stop the chirp-instance.

submit(fn, *args, **kwargs)

Submits a callable to be executed with the given arguments.

Schedules the callable to be executed as fn(*args, **kwargs) and returns a Future instance representing the execution of the callable.

Returns:
A Future representing the given call.
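A short sketch of submit() using a plain ThreadPoolExecutor, on the assumption that Chirp inherits the method unchanged. `blocking_lookup` is a hypothetical placeholder for blocking code such as a database call.

```python
from concurrent.futures import ThreadPoolExecutor


def blocking_lookup(key, default=None):
    """Placeholder for blocking code, e.g. a database call."""
    table = {"a": 1}
    return table.get(key, default)


with ThreadPoolExecutor(max_workers=2) as pool:
    # Positional and keyword arguments are forwarded to the callable.
    found = pool.submit(blocking_lookup, "a")
    missing = pool.submit(blocking_lookup, "b", default=0)
    values = (found.result(timeout=5), missing.result(timeout=5))
```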

Message

class libchirp.pool.Message(cmsg=None)[source]

Bases: libchirp.MessageThread

Chirp message. To answer a message, just replace the data and send it.

Note

The underlying C type is annotated in parentheses. The properties of the message use asserts to check whether the value has the correct type, length and range. You can disable these checks with python -O.
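The effect of assert-based validation can be sketched with a hypothetical class. `PortHolder` is not part of libchirp and its checks are only illustrative of the kind of validation described in the note.

```python
class PortHolder:
    """Hypothetical class; mimics assert-based property validation."""

    def __init__(self):
        self._port = 0

    @property
    def port(self):
        return self._port

    @port.setter
    def port(self, value):
        # These checks are compiled away when running "python -O".
        assert isinstance(value, int), "port must be an int"
        assert 0 <= value <= 0xFFFF, "port must fit into uint16_t"
        self._port = value


holder = PortHolder()
holder.port = 2998  # arbitrary valid value, accepted in both modes

try:
    holder.port = 70000  # out of range for uint16_t
    rejected = False
except AssertionError:
    rejected = True
```

In a normal run the invalid value is rejected. Under python -O, `__debug__` is False and the assignment goes through unchecked, so validation must not be relied on in optimized mode.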

address

Get address.

If the message was received: The address of the remote the message was received from.

If the message will be sent: The address to send the message to.

This allows replying to messages just by replacing data.

Returns:String representation generated by ipaddress.ip_address.
Return type:string
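For reference, this is how the standard-library ipaddress module renders addresses as strings. It is a small sketch of that module only and does not show how libchirp calls it internally.

```python
import ipaddress

# IPv4 addresses keep their dotted form.
v4 = str(ipaddress.ip_address("127.0.0.1"))
# IPv6 addresses are normalized to their compressed form.
v6 = str(ipaddress.ip_address("0:0:0:0:0:0:0:1"))
```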
data

Get the data of the message.

Return type:bytes
has_slot

Return whether the message has a slot.

If libchirp.Config.AUTO_RELEASE is False, you have to call release_slot().

Return type:bool
header

Get the header used by upper-layer protocols.

Do not use it unless you know what you are doing.

Return type:bytes
identity

Identifies the message and answers to it. (uint8_t[16]).

The identity can be used to find answers to a message, since replying to the message won’t change the identity.

If you need to uniquely identify the message, use the identity/serial pair, since the serial will change when replying to messages. (read-only)

Return type:bytes
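The difference between identity and the identity/serial pair can be sketched with a hypothetical stand-in. `Msg` models only these two fields, and the serial values are made up for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in modeling only the two fields discussed here.
Msg = namedtuple("Msg", ["identity", "serial"])

request = Msg(identity=b"\x01" * 16, serial=7)
# A reply keeps the identity but carries a different serial.
reply = Msg(identity=request.identity, serial=8)

# Same identity: the reply belongs to the request.
same_conversation = request.identity == reply.identity
# Different (identity, serial) pair: they are distinct messages.
same_message = (request.identity, request.serial) == (reply.identity, reply.serial)
```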
port

Get port. (uint16_t).

If the message was received: The port of the remote the message was received from.

If the message will be sent: The port to send the message to.

This allows replying to messages just by replacing data.

Return type:int
release()

Release the internal message-slot. This method returns a Future.

Will also acknowledge the message if the remote requested an acknowledge-message.

The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.

Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and then releasing again is fine, though.

Return type:Future
release_slot()

Release the internal message-slot. This method returns a Future.

Will also acknowledge the message if the remote requested an acknowledge-message.

The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.

Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and then releasing again is fine, though.

Return type:Future
remote_identity

Detect the remote instance. (uint8_t[16]).

By default a node’s identity will change on each start of chirp. If multiple peers share state, a change in the remote_identity should trigger a reset of the state. Simply use the remote_identity as key in a dictionary of shared state. (read-only)

Return type:bytes
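Keying shared state by remote_identity can be sketched as follows. `state_for` and the layout of the state dict are hypothetical, and the identities are made-up 16-byte values.

```python
# Shared state per remote instance, keyed by its 16-byte remote_identity.
state_by_remote = {}


def state_for(remote_identity):
    """Return the state for a remote, creating it empty on first contact."""
    return state_by_remote.setdefault(remote_identity, {"seen": 0})


first_run = b"\xaa" * 16   # identity of a peer before its restart
second_run = b"\xbb" * 16  # the same peer after a restart has a new identity

state_for(first_run)["seen"] += 1
state_for(first_run)["seen"] += 1
state_for(second_run)["seen"] += 1  # starts from fresh, empty state
```

A restarted peer automatically gets fresh state because its new identity is a new key. A long-running process would also want to evict entries for identities that no longer exist.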
serial

Get the serial number of the message. (uint32_t).

Increases monotonically. Be aware of overflows; if you want to use it for ordering, use the delta: serialA - serialB. (read-only)

Return type:int
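Python integers do not wrap around, so the delta has to be reduced to 32 bits explicitly. This is one possible reading of "use the delta", sketched under the assumption that serials wrap as uint32_t; `serial_delta` and `is_newer` are hypothetical helpers.

```python
def serial_delta(serial_a, serial_b):
    """Signed distance serial_a - serial_b for wrapping uint32 serials."""
    diff = (serial_a - serial_b) & 0xFFFFFFFF  # wrap like uint32 arithmetic
    # Interpret the upper half of the range as negative (two's complement).
    return diff - 0x100000000 if diff >= 0x80000000 else diff


def is_newer(serial_a, serial_b):
    """True if serial_a was issued after serial_b, even across an overflow."""
    return serial_delta(serial_a, serial_b) > 0
```

The comparison is only meaningful when the two serials are less than 2**31 apart.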