Pool
Please import all classes from the namespace libchirp.pool to make sure you get the right implementation.
from libchirp.pool import Chirp, Config, Loop, Message
libchirp.pool.Chirp(loop, config, **kwargs) | Implements a concurrent.futures.ThreadPoolExecutor.
libchirp.pool.Message([cmsg]) | Chirp message.
Example
pool_echo.py
from concurrent.futures import Future
from libchirp.pool import Chirp, Config, Loop

res = Future()

class MyChirp(Chirp):
    def handler(self, msg):
        print(msg.data)
        res.set_result(self.send(msg).result())

loop = Loop()
config = Config()
config.DISABLE_ENCRYPTION = True
# Workers are usually asynchronous
config.SYNCHRONOUS = False
try:
    chirp = MyChirp(loop, config)
    try:
        res.result()
    finally:
        chirp.stop()
finally:
    loop.stop()
For a sender see Example.
Chirp
- class libchirp.pool.Chirp(loop, config, **kwargs)
Bases: libchirp.ChirpBase, concurrent.futures.thread.ThreadPoolExecutor
Implements a concurrent.futures.ThreadPoolExecutor.
Used when you have to interface with blocking code. Please only use it if really needed. The handler is called on different threads, so share nothing between handlers, or only thread-safe things.
Subclass libchirp.pool.Chirp and implement handler().
If libchirp.Config.AUTO_RELEASE = True, the message will be released automatically once the handler returns.
Concurrency is achieved by sending multiple messages and waiting for results later. Use libchirp.MessageBase.identity as key to a dict to match up requests and answers.
**kwargs are passed to ThreadPoolExecutor.
See Exceptions.
Parameters: - loop (libchirp.Loop) – libuv event-loop
- config (libchirp.Config) – chirp config
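The identity-keyed dict described above can be sketched without chirp itself. In this minimal sketch, FakeMessage is a hypothetical stand-in that only mimics the identity and data attributes; it is not part of libchirp, and a real program would use libchirp.pool.Message together with the futures returned by send().

```python
import os
from dataclasses import dataclass, field


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a chirp message (not part of libchirp)."""
    data: bytes = b""
    # A real message carries a 16-byte identity; here we generate one.
    identity: bytes = field(default_factory=lambda: os.urandom(16))


# Requests that were sent and still wait for an answer, keyed by identity.
pending = {}

requests = [FakeMessage(b"ping 1"), FakeMessage(b"ping 2")]
for req in requests:
    pending[req.identity] = req  # remember the request before "sending" it

# Simulate answers arriving out of order: an answer keeps the identity
# of the request it replies to.
answers = [
    FakeMessage(b"pong 2", requests[1].identity),
    FakeMessage(b"pong 1", requests[0].identity),
]

matched = []
for answer in answers:
    request = pending.pop(answer.identity)  # look up the original request
    matched.append((request.data, answer.data))

print(matched)
```

Because the handler runs on pool threads, a real implementation would probably guard such a dict with a lock or use another thread-safe structure.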
- handler(msg)
Called when a message arrives.
Please implement this method.
If you don’t implement this, chirp will release the message-slots regardless of the AUTO_RELEASE setting.
Parameters: msg (libchirp.pool.Message) – The message
- identity()
- loop
Get the libchirp.Loop used by the chirp instance.
Return type: Loop
- map(fn, *iterables, timeout=None, chunksize=1)
Returns an iterator equivalent to map(fn, iter).
Args:
- fn: A callable that will take as many arguments as there are passed iterables.
- timeout: The maximum number of seconds to wait. If None, then there is no limit on the wait time.
- chunksize: The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may be evaluated out-of-order.
Raises:
- TimeoutError: If the entire result iterator could not be generated before the given timeout.
- Exception: If fn(*args) raises for any values.
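Since map() is inherited from the executor, its behavior can be illustrated with a plain concurrent.futures.ThreadPoolExecutor, without constructing a Chirp instance. The sketch below assumes a hypothetical slow_square function; it shows that calls may finish out of order while the results still come back in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    # Larger inputs sleep less, so the calls complete out of order.
    time.sleep(0.05 * (3 - x))
    return x * x


with ThreadPoolExecutor(max_workers=3) as pool:
    # map() still yields results in the order of the input iterable.
    results = list(pool.map(slow_square, [1, 2, 3], timeout=5))

print(results)
```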
- request(msg, auto_release=True)
Send a message and wait for an answer.
This method returns a libchirp.ChirpFuture.
The result() of the future will contain the answer to the message. If you don’t intend to consume the result, use send() instead.
By default the message slot used by the response will be released. If auto_release is False, you have to release the response-message.
Exceptions, threading and concurrency aspects are the same as send().
Issue: If an answer to a request arrives after the timeout, it will be delivered as a normal message.
To wait for the request being sent use libchirp.ChirpFuture.send_result().
    req = chirp.request(msg)
    req.send_result()
    answer = req.result()
Parameters: - msg (MessageThread) – The message to send.
- auto_release (bool) – Release the response (default True)
Return type: libchirp.ChirpFuture
- send(msg)
Send a message. This method returns a Future.
The result will contain the message that has been sent.
In synchronous-mode the future finishes once the remote has released the message. In asynchronous-mode the future finishes once the message has been passed to the operating-system.
Calling result() can raise the exceptions: ConnectionError, TimeoutError, RuntimeError, ValueError, MemoryError. The exception contains the last error message, if any, generated by chirp. Also Exception for unknown errors. See Exceptions.
See also Concurrency.
Sending different messages from different threads is thread-safe. Sending the same message twice from different threads will lead to undefined behavior. Sending, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and sending again is fine, though.
Parameters: msg (MessageThread) – The message to send.
Return type: concurrent.futures.Future
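One possible way to handle the exceptions listed above is sketched below. It uses hand-resolved concurrent.futures.Future objects as stand-ins for the futures returned by send(); the describe helper and the split into "retry" and "failed" cases are assumptions made for illustration, not libchirp recommendations.

```python
from concurrent.futures import Future


def describe(future):
    """Return a short status string for a finished send-style future."""
    try:
        future.result(timeout=1)
    except (ConnectionError, TimeoutError) as exc:
        # Assumed to be transient: the caller may want to send again.
        return "retry: {}".format(exc)
    except (RuntimeError, ValueError, MemoryError) as exc:
        return "failed: {}".format(exc)
    return "sent"


# Stand-ins for the futures returned by send(); they are resolved by hand.
ok = Future()
ok.set_result(b"message")

refused = Future()
refused.set_exception(ConnectionError("connection refused"))

statuses = [describe(ok), describe(refused)]
print(statuses)
```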
- shutdown(wait=True)
Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other methods can be called after this one.
Args:
- wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.
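The shutdown semantics come from the standard executor, so they can be shown with a plain concurrent.futures.ThreadPoolExecutor. This is a minimal sketch that does not involve a chirp instance.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
future = pool.submit(sum, [1, 2, 3])

pool.shutdown(wait=True)  # blocks until running futures have finished
pool.shutdown(wait=True)  # calling it again is harmless

total = future.result()

try:
    pool.submit(sum, [4, 5])
    rejected = False
except RuntimeError:
    # New work is refused once the executor has been shut down.
    rejected = True

print(total, rejected)
```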
Message
- class libchirp.pool.Message(cmsg=None)
Bases: libchirp.MessageThread
Chirp message. To answer a message, just replace the data and send it.
Note
The underlying C type is annotated in parentheses. The properties of the message use asserts to check that the value has the correct type, length and range. You can disable these checks with python -O.
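The effect of assert-based checks can be sketched with a small class. The Port class below is hypothetical and not libchirp code; it only mimics how a property setter might validate a uint16_t-style value with asserts.

```python
class Port:
    """Hypothetical holder for a uint16_t-style value (not libchirp code)."""

    def __init__(self):
        self._port = 0

    @property
    def port(self):
        return self._port

    @port.setter
    def port(self, value):
        # These checks vanish when Python runs with -O.
        assert isinstance(value, int), "port must be an int"
        assert 0 <= value < 2 ** 16, "port out of range for uint16_t"
        self._port = value


p = Port()
p.port = 2998
try:
    p.port = 70000
    checked = False
except AssertionError:
    checked = True

# With asserts enabled (__debug__ is True) the bad value is rejected;
# under python -O it would be stored unchecked.
print(__debug__, checked, p.port)
```

Code that relies on such checks for input validation should therefore not be run with -O.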
- address
Get address.
If the message was received: The address of the remote the message was received from.
If the message will be sent: The address to send the message to.
This allows replying to messages just by replacing data().
Returns: String representation generated by ipaddress.ip_address.
Return type: string
- has_slot
Return whether the message has a slot.
If libchirp.Config.AUTO_RELEASE is False, you have to call release_slot().
Return type: bool
- header
Get the header used by upper-layer protocols.
Do not use it unless you know what you are doing.
Return type: bytes
- identity
Identifies the message and the answers to it. (uint8_t[16]).
The identity can be used to find answers to a message, since replying to the message won’t change the identity.
If you need to uniquely identify the message, use the identity/serial pair, since the serial will change when replying to messages. (read-only)
Return type: bytes
- port
Get port. (uint16_t).
If the message was received: The port of the remote the message was received from.
If the message will be sent: The port to send the message to.
This allows replying to messages just by replacing data().
Return type: int
- release()
Release the internal message-slot. This method returns a Future.
Will also acknowledge the message if the remote requested an acknowledge-message.
The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.
Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and releasing again is fine, though.
Return type: Future
- release_slot()
Release the internal message-slot. This method returns a Future.
Will also acknowledge the message if the remote requested an acknowledge-message.
The result of the future will be set to (identity, serial) once the message is released. If the message had no slot, the result will be set to None.
Releasing a message from a different thread is thread-safe. Releasing the same message from different threads twice will lead to undefined behavior. Releasing, waiting for the result, handing the message to another thread in a synchronized way (via a queue, for example), and releasing again is fine, though.
Return type: Future
- remote_identity
Detect the remote instance. (uint8_t[16]).
By default a node’s identity will change on each start of chirp. If multiple peers share state, a change in the remote_identity should trigger a reset of the state. Simply use the remote_identity as key in a dictionary of shared state. (read-only)
Return type: bytes
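The dictionary of shared state mentioned above might look like the following sketch. The names peer_state and state_for are hypothetical, and the random 16-byte values merely stand in for the remote_identity of a peer before and after a restart.

```python
import os

# Hypothetical per-peer state, keyed by the peer's remote_identity.
peer_state = {}


def state_for(remote_identity):
    """Return the state for a peer, creating a fresh one for a new identity."""
    return peer_state.setdefault(remote_identity, {"seen": 0})


first_run = os.urandom(16)   # identity of a peer before its restart
second_run = os.urandom(16)  # the same peer after a restart

state_for(first_run)["seen"] += 1
state_for(first_run)["seen"] += 1
state_for(second_run)["seen"] += 1  # new identity, so the count starts over

counts = (peer_state[first_run]["seen"], peer_state[second_run]["seen"])
print(counts)
```

A long-running program would presumably also discard the entry of the old identity, and would need to protect the dict if handlers on several threads touch it.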
-