Concurrent data structures¶
CallableGroup¶
-
class
satella.coding.concurrent.
CallableGroup
(gather: bool = True, swallow_exceptions: bool = False)¶ This behaves like a function, but lets you add other functions to call when it is invoked, e.g.
c1 = CallableGroup()
c1.add(foo)
c1.add(bar)
c1(2, 3)
Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.
-
add
(callable_: Union[satella.coding.concurrent.callablegroup.CancellableCallback, Callable[[], T]], one_shot: bool = False)¶ Add a callable.
Can be a CancellableCallback, in which case the method remove_cancelled() might be useful.
Parameters: - callable_ – callable to add
- one_shot – if True, the callable will be unregistered after a single call
-
has_cancelled_callbacks
¶ Check whether this has any CancellableCallback instances and whether any of them was cancelled
-
remove_cancelled
() → None¶ Remove its entries that are CancellableCallbacks and that were cancelled
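The calling behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not satella's implementation: TinyCallableGroup is a hypothetical name, and returning the results as a list is an assumption made for the sake of the example.

```python
from typing import Any, Callable, List, Tuple


class TinyCallableGroup:
    """Hypothetical stand-in used for illustration only."""

    def __init__(self) -> None:
        # Each entry is (callable, one_shot).
        self._callables: List[Tuple[Callable[..., Any], bool]] = []

    def add(self, callable_: Callable[..., Any], one_shot: bool = False) -> None:
        self._callables.append((callable_, one_shot))

    def __call__(self, *args: Any, **kwargs: Any) -> List[Any]:
        results = []
        survivors = []
        for fn, one_shot in self._callables:
            # Exceptions are not caught here, so they propagate to the caller.
            results.append(fn(*args, **kwargs))
            if not one_shot:
                survivors.append((fn, one_shot))
        self._callables = survivors  # one-shot callables are dropped after the call
        return results


group = TinyCallableGroup()
group.add(lambda a, b: a + b)
group.add(lambda a, b: a * b, one_shot=True)
first = group(2, 3)   # both callables run
second = group(2, 3)  # the one-shot callable has been unregistered
```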
-
CallNoOftenThan¶
-
class
satella.coding.concurrent.
CallNoOftenThan
(interval: float, callable_: Callable)¶ A class that will ensure that calls to the given callable are made no more often than some interval.
If it is called more often than the specified interval, the callable just won’t be called and None will be returned.
Parameters: - interval – interval in seconds
- callable_ – callable to call
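The throttling idea can be sketched as follows. This is an assumption-laden illustration rather than satella's code: TinyRateLimited and its injectable clock parameter are hypothetical, and the sketch is not thread-safe.

```python
import time
from typing import Any, Callable, Optional


class TinyRateLimited:
    """Hypothetical sketch: call `callable_` at most once per `interval` seconds."""

    def __init__(self, interval: float, callable_: Callable[..., Any],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.callable = callable_
        self.clock = clock  # injectable so the sketch can be tested without sleeping
        self.last_called: Optional[float] = None

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        now = self.clock()
        if self.last_called is not None and now - self.last_called < self.interval:
            return None  # too soon: skip the call entirely
        self.last_called = now
        return self.callable(*args, **kwargs)


fake_now = [0.0]  # a fake clock that is advanced by hand
limited = TinyRateLimited(1.0, lambda x: x * 2, clock=lambda: fake_now[0])
r1 = limited(21)  # first call goes through
r2 = limited(21)  # same instant: suppressed, None is returned
fake_now[0] = 1.5
r3 = limited(21)  # the interval has elapsed, so the call goes through again
```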
parallel_construct¶
-
satella.coding.concurrent.
parallel_construct
(iterable: Iterable[V], function: Callable[[V], Optional[U]], thread_pool: concurrent.futures.thread.ThreadPoolExecutor, span_title: Optional[str] = None) → List[U]¶ Construct a list from executing given function in a thread pool executor.
If opentracing is installed, and tracing is enabled, current span will be passed to child threads.
Parameters: - iterable – iterable to apply
- function – function to apply. If that function returns None, no element will be added
- thread_pool – thread pool to execute
- span_title – span title to create. For each execution a child span will be returned
Returns: list that is the result of parallel application of function on each element
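A rough stdlib equivalent of the behaviour described above, leaving out the tracing part. tiny_parallel_construct is a hypothetical name, and keeping the results in input order is an assumption of this sketch; the text above does not state the ordering.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, TypeVar

V = TypeVar("V")
U = TypeVar("U")


def tiny_parallel_construct(iterable: Iterable[V],
                            function: Callable[[V], Optional[U]],
                            thread_pool: ThreadPoolExecutor) -> List[U]:
    # Submit everything first so the work overlaps, then collect in submission order.
    futures = [thread_pool.submit(function, item) for item in iterable]
    results: List[U] = []
    for future in futures:
        value = future.result()
        if value is not None:  # a None result adds no element
            results.append(value)
    return results


def square_if_even(x: int) -> Optional[int]:
    return x * x if x % 2 == 0 else None


with ThreadPoolExecutor(max_workers=4) as pool:
    squares = tiny_parallel_construct(range(6), square_if_even, pool)
```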
CancellableCallback¶
-
class
satella.coding.concurrent.
CancellableCallback
(callback_fun: Callable)¶ A callback that you can cancel.
Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.
If this callback was cancelled, calling it won’t call the function. In this case None will be returned instead of the result of callback_fun()
This short circuits __bool__ to return not .cancelled.
Hashable and __eq__-able by identity.
Parameters: callback_fun – function to call
Variables: cancelled – whether this callback was cancelled (bool)
-
cancel
() → None¶ Cancel this callback.
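The semantics described for this class (None when cancelled, truthiness tied to the cancelled flag, identity-based equality) can be sketched like this. TinyCancellableCallback is a hypothetical stand-in, not the library class.

```python
from typing import Any, Callable


class TinyCancellableCallback:
    """Hypothetical sketch of a callback that can be cancelled."""

    def __init__(self, callback_fun: Callable[..., Any]) -> None:
        self.callback_fun = callback_fun
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True

    def __bool__(self) -> bool:
        return not self.cancelled

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self.cancelled:
            return None  # the wrapped function is not invoked
        return self.callback_fun(*args, **kwargs)

    # __eq__ and __hash__ are left at their defaults, which compare by identity.


callbacks = [TinyCancellableCallback(lambda: "a"), TinyCancellableCallback(lambda: "b")]
callbacks[0].cancel()
skipped = callbacks[0]()                    # cancelled, so nothing runs
callbacks = [cb for cb in callbacks if cb]  # truthiness filters out cancelled entries
remaining = [cb() for cb in callbacks]
```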
-
LockedDataset¶
-
class
satella.coding.concurrent.
LockedDataset
¶ A locked dataset. Subclass like
>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>     @LockedDataset.locked()
>>>     def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')
A class deriving from LockedDataset will raise ResourceNotLocked if its elements are accessed while the lock is not held.
Note that __enter__ will raise WouldWaitMore if timeout was given.
-
static
locked
(blocking=True, timeout=-1) → Callable[[Callable], Callable]¶ Decorator to use for annotating methods that would lock
Parameters: - blocking – whether to block at all
- timeout – optional timeout. Default, or -1 means “return ASAP”
PeekableQueue¶
-
class
satella.coding.concurrent.
PeekableQueue
¶ A thread-safe FIFO queue that supports peek()ing for elements.
-
get
(timeout: Optional[float] = None) → T¶ Get an element.
Parameters: timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary
Returns: the item
Raises: Empty – queue was empty
-
peek
(timeout: Optional[float] = None) → T¶ Get an element without removing it from the top of the queue.
Parameters: timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary
Returns: the item
Raises: WouldWaitMore – timeout has expired
-
put
(item: T) → None¶ Add an element to the queue
Parameters: item – element to add
-
put_many
(items: Sequence[T]) → None¶ Put multiple items
Parameters: items – sequence of items to put
-
qsize
() → int¶ Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block.
Returns: approximate size of the queue
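A peekable FIFO queue can be sketched with a deque guarded by a threading.Condition. TinyPeekableQueue is a hypothetical name, and the sketch raises the built-in TimeoutError on timeout instead of the library's own exceptions.

```python
import collections
import threading
from typing import Any, Deque, Optional


class TinyPeekableQueue:
    """Hypothetical sketch of a thread-safe FIFO queue with peek()."""

    def __init__(self) -> None:
        self._items: Deque[Any] = collections.deque()
        self._not_empty = threading.Condition()

    def put(self, item: Any) -> None:
        with self._not_empty:
            self._items.append(item)
            # Wake every waiter: a peeker does not consume the item,
            # so a blocked get() must be woken as well.
            self._not_empty.notify_all()

    def _wait_for_item(self, timeout: Optional[float]) -> None:
        # The caller holds the condition's lock; wait_for re-checks after each wake-up.
        if not self._not_empty.wait_for(lambda: len(self._items) > 0, timeout):
            raise TimeoutError("no item arrived in time")

    def peek(self, timeout: Optional[float] = None) -> Any:
        with self._not_empty:
            self._wait_for_item(timeout)
            return self._items[0]  # the item stays in the queue

    def get(self, timeout: Optional[float] = None) -> Any:
        with self._not_empty:
            self._wait_for_item(timeout)
            return self._items.popleft()

    def qsize(self) -> int:
        with self._not_empty:
            return len(self._items)


q = TinyPeekableQueue()
q.put("first")
q.put("second")
peeked = q.peek()
size_after_peek = q.qsize()
got = q.get()
size_after_get = q.qsize()
```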
-
ThreadCollection¶
-
class
satella.coding.concurrent.
ThreadCollection
(threads: Optional[Sequence[threading.Thread]] = None)¶ A collection of threads.
Create like:
>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()
This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.
-
add
(thread: threading.Thread) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Add a thread to the collection
Parameters: thread – thread to add
Returns: this thread collection instance
-
append
(thread: threading.Thread) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Alias for
add()
Parameters: thread – thread to add
Returns: this thread collection instance
-
daemon
¶ Is any of the threads a daemon?
When used as a setter, sets the daemon attribute.
-
classmethod
from_class
(cls_to_use, iteratable, **kwargs) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Build a thread collection
Parameters: - cls_to_use – class to instantiate with
- iteratable – an iterable yielding the sole argument for each instance of this class
-
classmethod
get_currently_running
(include_main_thread: bool = True) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Get all currently running threads as thread collection
Parameters: include_main_thread – whether to include the main thread
Returns: a thread collection representing all currently running threads
-
is_alive
() → bool¶ Is at least one thread alive?
-
join
(timeout: Optional[float] = None) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Join all threads
Parameters: timeout – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as necessary.
Returns: this thread collection instance
Raises: WouldWaitMore – one of the threads failed to terminate
-
start
() → satella.coding.concurrent.thread_collection.ThreadCollection¶ Start all threads
Returns: this thread collection instance
-
terminate
(*args, **kwargs) → satella.coding.concurrent.thread_collection.ThreadCollection¶ Call terminate() on all threads that have this method
Returns: this thread collection instance
-
TerminableThread¶
Note that force=True is not available on PyPy. If an attempt to use it on PyPy is made, RuntimeError will be thrown.
Please note that in order to terminate, target Python thread must at least execute some Python code. It means that if it’s hanging on I/O, for example, it won’t be affected.
-
class
satella.coding.concurrent.
TerminableThread
(*args, terminate_on: Union[Type[Exception], Tuple[Type[Exception]], None] = None, **kwargs)¶ Class that will execute something in a loop unless terminated. Use like:
>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()
Property to check whether to terminate is stored in self.terminating.
If you decide to override run(), you have to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.
You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:
>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> self.assertFalse(a.is_alive())
If prepare() throws one of the terminate_on exceptions, loop() won’t even be called. However, terminate() will be automatically called then.
The same applies to IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.
-
cleanup
()¶ Called after thread non-forced termination, in the thread’s context.
The default implementation does nothing.
-
loop
() → None¶ Run one iteration of the loop. Meant to be overridden. You do not need to override it if you decide to override run(), though.
This should block for as long as a single check will take, as termination checks take place between calls.
Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.
-
prepare
() → None¶ This is called before the .loop() loop is entered.
This is invoked already in a separate thread.
-
run
() → None¶ Calls self.loop() indefinitely, until terminating condition is met
-
safe_sleep
(interval: float, wake_up_each: float = 2) → None¶ Sleep for interval, waking up every wake_up_each seconds to check whether the thread is terminating, and finishing early if it is.
This will do the right thing when passed a negative interval.
To be invoked only by the thread that’s represented by the object!
Parameters: - interval – Time to sleep in total
- wake_up_each – Amount of seconds to wake up each
Raises: SystemExit – thread is terminating
-
safe_wait_condition
(condition: satella.coding.concurrent.thread.Condition, timeout: Union[str, float], wake_up_each: Union[str, float] = 2, dont_raise: bool = False) → None¶ Wait for a condition, checking periodically if the thread is being terminated.
To be invoked only by the thread that’s represented by the object!
Parameters: - condition – condition to wait on
- timeout – maximum time to wait in seconds. Can be also a time string
- wake_up_each – amount of seconds to wake up each to check for termination. Can be also a time string.
- dont_raise – if set to True,
WouldWaitMore
will not be raised
Raises: - WouldWaitMore – timeout has passed and Condition has not happened
- SystemExit – thread is terminating
-
start
() → satella.coding.concurrent.thread.TerminableThread¶ Start the execution of this thread
Returns: this thread
-
terminate
(force: bool = False) → satella.coding.concurrent.thread.TerminableThread¶ Signal this thread to terminate.
Forcing, if requested, will be done by injecting a SystemExit exception into the target thread, so the thread must acquire the GIL. For example, the following would not be interruptible:
>>> time.sleep(1000000)
Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.
Parameters: force – Whether to force a quit
Returns: self
Raises: - RuntimeError – when something goes wrong with the underlying Python machinery
- NotImplementedError – force=True was used on PyPy
-
terminating
¶ Returns: Is this thread either alive and trying to terminate or dead and after termination?
-
In order to terminate you can throw SystemExit.
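The cooperative (non-forced) part of this design, where the termination flag is checked between loop() calls and cleanup() runs in the thread's own context, can be sketched with threading.Event. TinyTerminableThread is a hypothetical name; forced termination and terminate_on handling are deliberately left out.

```python
import threading


class TinyTerminableThread(threading.Thread):
    """Hypothetical sketch of a loop thread that stops when asked to."""

    def __init__(self) -> None:
        super().__init__()
        self._stop_requested = threading.Event()
        self.first_loop_done = threading.Event()
        self.iterations = 0
        self.cleaned_up = False

    def loop(self) -> None:
        self.iterations += 1
        self.first_loop_done.set()
        # Sleep in a way that ends early once termination is requested.
        self._stop_requested.wait(0.01)

    def cleanup(self) -> None:
        self.cleaned_up = True

    def run(self) -> None:
        # The flag is checked before each loop() call, never in the middle of one.
        while not self._stop_requested.is_set():
            self.loop()
        self.cleanup()  # runs in this thread's context

    def terminate(self) -> "TinyTerminableThread":
        self._stop_requested.set()
        return self


worker = TinyTerminableThread()
worker.start()
ran_once = worker.first_loop_done.wait(timeout=5)
worker.terminate().join()
```

Because the flag is only consulted between iterations, a loop() that blocks for a long time delays termination by the same amount.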
IntervalTerminableThread¶
-
class
satella.coding.concurrent.
IntervalTerminableThread
(seconds: Union[str, float], *args, **kwargs)¶ A TerminableThread that calls .loop() once per x seconds.
If executing .loop() takes more than x seconds, on_overrun() will be called.
Parameters: seconds – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.
-
loop
() → None¶ Override me!
-
on_overrun
(time_taken: float) → None¶ Called when executing .loop() takes more than x seconds.
Called each cycle.
You are meant to override this, as by default this does nothing.
Parameters: time_taken – how long did calling .loop() take
-
run
()¶ Calls self.loop() indefinitely, until terminating condition is met
-
BogusTerminableThread¶
-
class
satella.coding.concurrent.
BogusTerminableThread
¶ A mock object that implements threading interface but does nothing
Variables: - started – bool, if it’s running
- terminated – bool, if terminated
- daemon – bool, if daemon
-
is_alive
() → bool¶ Returns: if this thread is alive
-
join
(timeout=None) → None¶ Wait for the pseudo-thread. Sets running to False if thread was terminated.
Parameters: timeout – maximum number of seconds to wait for termination
Raises: - WouldWaitMore – thread did not terminate within that many seconds
- RuntimeError – tried to join() before start()!
-
start
() → None¶ Set running to True
Raises: RuntimeError – thread already terminated or already running
call_in_separate_thread¶
-
satella.coding.concurrent.
call_in_separate_thread
(*t_args, no_thread_attribute: bool = False, delay: float = 0, **t_kwargs)¶ Decorator to mark given routine as callable in a separate thread.
The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.
The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.
Warning
calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument
The arguments given here will be passed to thread’s constructor, so use like:
>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...
Parameters: - no_thread_attribute – if set to True, the future won’t have a link to its thread. The thread will have the “future” attribute anyway.
- delay – seconds to wait before launching function
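How a decorator of this kind can hand back a Future is sketched below with the standard library. tiny_call_in_thread is a hypothetical name; the delay and no_thread_attribute options are omitted, and this is not satella's implementation.

```python
import functools
import threading
from concurrent.futures import Future


def tiny_call_in_thread(**thread_kwargs):
    """Hypothetical sketch: run the decorated function in a new thread, return a Future."""

    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(*args, **kwargs) -> Future:
            future: Future = Future()

            def target() -> None:
                if not future.set_running_or_notify_cancel():
                    return  # the future was cancelled before the thread started
                try:
                    future.set_result(fun(*args, **kwargs))
                except Exception as exc:
                    future.set_exception(exc)

            thread = threading.Thread(target=target, **thread_kwargs)
            # These two links form the reference loop mentioned in the warning.
            future.thread = thread
            thread.future = future
            thread.start()
            return future

        return wrapper

    return decorator


@tiny_call_in_thread(daemon=True)
def add(a, b):
    return a + b


@tiny_call_in_thread(daemon=True)
def fail():
    raise ValueError("boom")


total = add(2, 3).result(timeout=5)
error = fail().exception(timeout=5)
```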
SingleStartThread¶
-
class
satella.coding.concurrent.
SingleStartThread
(*args, **kwargs)¶ A thread that keeps track of whether its .start() method was called, and does nothing if it is called a second or subsequent time.
-
start
() → satella.coding.concurrent.thread.SingleStartThread¶ No-op when called a second or subsequent time. The first time it starts the thread.
Returns: self
-
SequentialIssuer¶
-
class
satella.coding.concurrent.
SequentialIssuer
(start: int = 0)¶ A class that issues a monotonically increasing value.
Parameters: start – start issuing IDs from this value
Variables: start – next value to be issued
-
issue
() → int¶ Just issue a next identifier
Returns: a next identifier
-
no_less_than
(no_less_than: int) → int¶ Issue an int, which is no less than a given value
Parameters: no_less_than – lower bound for the returned identifier
Returns: an identifier, no less than no_less_than
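A lock-protected counter is enough to sketch both methods. TinySequentialIssuer is a hypothetical name, and returning the next sequential value when the requested bound is already behind the counter is an assumption of this sketch.

```python
import threading


class TinySequentialIssuer:
    """Hypothetical sketch of a thread-safe, monotonically increasing issuer."""

    def __init__(self, start: int = 0) -> None:
        self.start = start  # next value to be issued
        self._lock = threading.Lock()

    def issue(self) -> int:
        with self._lock:
            value = self.start
            self.start += 1
            return value

    def no_less_than(self, no_less_than: int) -> int:
        with self._lock:
            # Jump forward if needed, but never go backwards.
            value = max(self.start, no_less_than)
            self.start = value + 1
            return value


issuer = TinySequentialIssuer()
ids = [
    issuer.issue(),
    issuer.issue(),
    issuer.no_less_than(10),  # jumps ahead to 10
    issuer.issue(),
    issuer.no_less_than(3),   # bound already passed, next value is issued
]
```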
-
CPManager¶
-
class
satella.coding.resources.
CPManager
(max_number: int, max_cycle_no: int)¶ A thread-safe no-hassle connection-pool manager.
Extend this class to build your own connection pool managers.
This supports automatic connection recycling, connection will be cycled each max_cycle_no takings and deposits.
Note that you have to overload teardown_connection() and create_connection().
You obtain a connection by using acquire_connection(). If it fails you should mark it as such using fail_connection(). In all cases you have to return it using release_connection().
Parameters: - max_number – maximum number of connections
- max_cycle_no – maximum number of get/put connection cycles.
Variables: max_number – maximum amount of connections. Can be changed during runtime
Warning
May not work under PyPy for reasons having to do with id’s semantics. A RuntimeWarning will be issued when not running under CPython.
-
acquire_connection
() → T¶ Either acquire a new connection, or just establish it in the background
Returns: a new connection
Raises: RuntimeError – CPManager is terminating!
-
close
() → None¶ Check if the resource needs cleanup, and clean up this resource.
Use like this:
>>> class MyClose(Closeable):
>>>     def close(self):
>>>         if super().close():
>>>             .. clean up ..
Returns: whether the cleanup should proceed
Raises: RuntimeError – the constructor was not invoked
-
create_connection
() → T¶ Create a new connection.
Is safe to block.
Returns: a new connection instance
-
fail_connection
(connection: T) → None¶ Signal that a given connection has failed
Parameters: connection – connection to fail
-
invalidate
() → None¶ Close all connections. Connections have to be released first. Object is ready for use after this
-
release_connection
(connection: T) → None¶ Release a connection
Parameters: connection – connection to release
-
teardown_connection
(connection: T) → None¶ Close the connection.
Is safe to block.
Parameters: connection – connection to tear down
IDAllocator¶
-
class
satella.coding.concurrent.
IDAllocator
(start_at: int = 0, top_limit: Optional[int] = None)¶ Reusable ID allocator
You can use it to requisition ints from a pool, and then free those ints, permitting them to be reused.
Thread-safe.
Parameters: - start_at – the lowest integer that the allocator will return
- top_limit – the upper limit; this value itself will not be allocated. If used, calls to allocate_int() will raise Empty once the limit is reached
-
allocate_int
() → int¶ Return a previously unallocated int, and mark it as allocated
Returns: an allocated int
Raises: Empty – could not allocate an int due to top limit
-
mark_as_allocated
(x: int)¶ Mark given x as allocated
Parameters: x – x to mark as allocated
Raises: - AlreadyAllocated – x was already allocated
- ValueError – x is less than start_at
-
mark_as_free
(x: int)¶ Mark x as free
Parameters: x – int to free
Raises: ValueError – x was not allocated or less than start_at
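One way to implement a reusable ID pool is a "next never-issued" counter plus a min-heap of released IDs. The sketch below assumes that design; TinyIDAllocator is a hypothetical name, freed IDs are reused lowest first by choice, and RuntimeError stands in for the library's Empty exception.

```python
import heapq
import threading
from typing import List, Optional, Set


class TinyIDAllocator:
    """Hypothetical sketch of a thread-safe allocator of reusable integer IDs."""

    def __init__(self, start_at: int = 0, top_limit: Optional[int] = None) -> None:
        self._lock = threading.Lock()
        self._next = start_at        # lowest ID that was never issued
        self._top_limit = top_limit  # exclusive upper bound, if any
        self._free: List[int] = []   # min-heap of released IDs
        self._allocated: Set[int] = set()

    def allocate_int(self) -> int:
        with self._lock:
            if self._free:
                x = heapq.heappop(self._free)  # reuse the smallest released ID
            else:
                if self._top_limit is not None and self._next >= self._top_limit:
                    # Stand-in for the Empty exception described above.
                    raise RuntimeError("no free IDs below top_limit")
                x = self._next
                self._next += 1
            self._allocated.add(x)
            return x

    def mark_as_free(self, x: int) -> None:
        with self._lock:
            if x not in self._allocated:
                raise ValueError("%d was not allocated" % x)
            self._allocated.remove(x)
            heapq.heappush(self._free, x)


allocator = TinyIDAllocator(start_at=5, top_limit=8)
a, b, c = allocator.allocate_int(), allocator.allocate_int(), allocator.allocate_int()
allocator.mark_as_free(b)
reused = allocator.allocate_int()  # the freed ID comes back
```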
Monitor¶
A monitor is a Java-like synchronization idea. Inheriting from Monitor outfits the class with a Lock (or a reentrant lock, if RMonitor is used), that can be used to coordinate access to some shared resource.
Take care to invoke Monitor’s constructor when inheriting, or this won’t work.
You can decorate your methods with Monitor.synchronized to have them execute with the lock acquired. If you have such a method, you can also temporarily release the lock using the context manager Monitor.release (it will be reacquired when the context manager is exited).
You can also use manual synchronization with context manager Monitor.acquire.
from satella.coding import Monitor

class MyProtectedClass(Monitor):
    def __init__(self, *args):
        super().__init__()  # Monitor's constructor must run

    @Monitor.synchronized
    def synchronized(self):
        pass  # everything here is executed with class lock acquired

    @Monitor.synchronized
    def temporary_release(self):
        pass  # lock is acquired here
        with Monitor.release(self):
            pass  # lock is NOT ACQUIRED here
        pass  # and here it's reacquired again

    def manual_sync(self):
        pass  # not synchronized
        with Monitor.acquire(self):
            pass  # synchronized
You can also use Monitor.release and Monitor.acquire with other objects than self, but exercise caution and think over the consequences.
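To make the mechanics concrete, here is a minimal stdlib sketch of the pattern: a per-instance lock, a synchronized decorator and a release context manager. TinyMonitor and the _monitor_lock attribute are hypothetical names; this is not how satella necessarily implements it.

```python
import functools
import threading
from contextlib import contextmanager


class TinyMonitor:
    """Hypothetical sketch of a non-reentrant monitor base class."""

    def __init__(self) -> None:
        self._monitor_lock = threading.Lock()

    @staticmethod
    def synchronized(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            with self._monitor_lock:  # held for the whole method call
                return fun(self, *args, **kwargs)
        return wrapper

    @staticmethod
    @contextmanager
    def release(monitor):
        monitor._monitor_lock.release()
        try:
            yield
        finally:
            monitor._monitor_lock.acquire()  # reacquired on exit


class Counter(TinyMonitor):
    def __init__(self) -> None:
        super().__init__()  # without this there would be no lock
        self.value = 0
        self.lock_states = []

    @TinyMonitor.synchronized
    def increment(self) -> None:
        self.value += 1

    @TinyMonitor.synchronized
    def observe(self) -> None:
        self.lock_states.append(self._monitor_lock.locked())
        with TinyMonitor.release(self):
            self.lock_states.append(self._monitor_lock.locked())
        self.lock_states.append(self._monitor_lock.locked())


counter = Counter()
threads = [
    threading.Thread(target=lambda: [counter.increment() for _ in range(1000)])
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
counter.observe()  # called after all workers finished, so no one else holds the lock
```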
-
class
satella.coding.concurrent.
Monitor
¶ Base utility class for creating monitors (the synchronization thingies!)
These are NOT re-entrant!
Use it like that:
>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..
-
class
acquire
(foo: satella.coding.concurrent.monitor.Monitor)¶ Returns a context manager object that can lock another object, as long as that object is a monitor.
Consider foo, which is a monitor. If you needed to lock it from outside, you would do:
>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..
-
class
release
(foo: satella.coding.concurrent.monitor.Monitor)¶ Returns a context manager object that can release another object as long as that object is a monitor.
Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:
>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..
-
classmethod
synchronize_on
(monitor: satella.coding.concurrent.monitor.Monitor) → Callable[[Callable], Callable]¶ A decorator for locking on non-self Monitor objects
Use it like:
>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass
-
static
synchronize_on_attribute
(attr_name: str)¶ When a Monitor is an attribute of a class, and you have an instance method that you would like to secure by acquiring that monitor, use this.
The first argument taken by that method instance must be self.
Parameters: attr_name – name of the attribute that is the monitor
-
static
synchronized
(fun: Callable) → Callable¶ This is a decorator. A method decorated with it will hold the lock of the given instance while it runs, making it thread-safe. Depending on the usage pattern of your class and its data semantics, your performance may vary
-
class
satella.coding.concurrent.
RMonitor
¶ Monitor, but using a reentrant lock instead of a normal one
Additionally, following types are predefined for your convenience:
-
class
satella.coding.concurrent.
MonitorList
(*args)¶ A list that is also a monitor.
Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice
-
class
satella.coding.concurrent.
MonitorDict
(*args, **kwargs)¶ A dict that is also a monitor.
Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice
-
class
satella.coding.concurrent.
MonitorSet
(*args)¶ A set that allows atomic insert-if-not-already-there operation
LockedStructure¶
A proxy to an object along with a lock, that can be triggered using the context manager:
a = {1:2, 3:4}
ls = LockedStructure(a)
assert len(ls) == 2
with ls:
    a[4] = 5
-
class
satella.coding.concurrent.
LockedStructure
(obj_to_wrap: T, lock: Optional[_thread.allocate_lock] = None)¶ A wizard to make every Python structure thread-safe.
It wraps obj_to_wrap, passing on all calls, assignments and so on to the wrapped object. Of the lock, only the context manager protocol is exposed.
Example:
>>> locked_dict = LockedStructure({})
>>> with locked_dict:
>>>     locked_dict[5] = 2
Also, please note that operations such as addition do not return a locked structure, i.e. they return the result of applying the operation to the wrapped object and the other operand.
Note that in-place operations return the locked structure.
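The proxy-plus-lock idea can be sketched in a few lines. TinyLockedStructure is a hypothetical name and forwards only attribute access, len() and item access, which is far less than a full proxy would need.

```python
import threading


class TinyLockedStructure:
    """Hypothetical sketch: forwards to the wrapped object, exposes the lock via `with`."""

    def __init__(self, obj_to_wrap, lock=None) -> None:
        self._obj = obj_to_wrap
        self._lock = lock if lock is not None else threading.Lock()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self._lock.release()
        return False  # do not swallow exceptions

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        return getattr(self._obj, name)

    # Special methods are looked up on the type, so they need explicit forwarding.
    def __len__(self) -> int:
        return len(self._obj)

    def __getitem__(self, key):
        return self._obj[key]

    def __setitem__(self, key, value) -> None:
        self._obj[key] = value


a = {1: 2, 3: 4}
ls = TinyLockedStructure(a)
with ls:
    ls[4] = 5            # forwarded to the wrapped dict while the lock is held
keys = sorted(ls.keys())  # keys() is resolved through __getattr__
```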
AtomicNumber¶
-
class
satella.coding.concurrent.
AtomicNumber
(v: Union[int, float] = 0)¶ An atomic number. Note that the class is deliberately not hashable, since its value might change over time. In that sense it is more like a container for numbers.
Treat it like a normal number, except all operations are executed atomically.
You can also wait for it to change its value, via wait().
You change its value in the following way:
>>> a = AtomicNumber()
>>> a += 2
Note that if the number is used in an expression, such as
>>> b = a + 2
then a normal number will be returned
-
wait
(timeout: Optional[float] = None, throw_exception: bool = True)¶ Block until the atomic number changes its value.
Parameters: - timeout – maximum time to wait. None means wait indefinitely
- throw_exception – whether to throw WouldWaitMore on timeout
Raises: WouldWaitMore – the value hasn’t changed within the timeout
-
wait_until_equal
(v: Union[int, float], timeout: Optional[float] = None) → None¶ Wait until the value of this number equals v.
Parameters: - v – value to compare this number against
- timeout – maximum time to wait
Raises: WouldWaitMore – timeout expired without the value becoming equal to target
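The combination of atomic in-place updates and waiting for a value can be sketched with a threading.Condition. TinyAtomicNumber is a hypothetical name that supports only + and +=, and it raises the built-in TimeoutError where the library documents WouldWaitMore.

```python
import threading


class TinyAtomicNumber:
    """Hypothetical sketch of a number container with atomic updates and waiting."""

    def __init__(self, v=0) -> None:
        self._value = v
        self._changed = threading.Condition()

    def __iadd__(self, other):
        with self._changed:
            self._value += other
            self._changed.notify_all()  # wake anyone waiting for a change
        return self                     # in-place operations keep the container

    def __add__(self, other):
        with self._changed:
            return self._value + other  # expressions yield a plain number

    def wait_until_equal(self, v, timeout=None) -> None:
        with self._changed:
            if not self._changed.wait_for(lambda: self._value == v, timeout):
                raise TimeoutError("value did not become %r in time" % (v,))


def bump(number: TinyAtomicNumber) -> None:
    for _ in range(3):
        number += 1  # calls __iadd__, which returns the same object


counter = TinyAtomicNumber()
worker = threading.Thread(target=bump, args=(counter,))
worker.start()
counter.wait_until_equal(3, timeout=5)
worker.join()
plain = counter + 2
```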
-
FutureCollection¶
-
class
satella.coding.concurrent.
FutureCollection
(futures: Sequence[concurrent.futures._base.Future] = ())¶ A set of futures sharing a common result, or a common exception.
This overloads the operator + for making a union of futures. It can be used with either instances of FutureCollection or normal futures.
Also supports the indexing operator to get the n-th future.
-
add
(future: concurrent.futures._base.Future) → satella.coding.concurrent.futures.collection.FutureCollection¶ Add a future
Parameters: future – a Future to add Returns: self
-
add_done_callback
(callback, only_one: bool = False) → None¶ Add a callback to a Future to be called on its completion.
By default, this will add the callback to all futures.
Parameters: - callback – callback that takes the completed Future as argument
- only_one – callback will be added only to a single Future. False by default
Raises: IndexError – only_one was given and no Futures in collection!
-
cancel
() → bool¶ Cancel all futures
Returns: True if all futures were cancelled
-
exception
(timeout: Optional[float] = None) → Optional[Exception]¶ Return first exception raised by any of the futures
This will block until the results are available. This call proceeding does not mean that results for all are available, since this will return the first exception encountered!
Parameters: timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary
Returns: the first exception, or None if there were no exceptions
Raises: WouldWaitMore – timeout while waiting for result
-
result
(timeout: Optional[float] = None) → list¶ Return the result of all futures, as a list.
This will block until the results are available.
Parameters: timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary
Returns: list containing results of all futures
Raises: WouldWaitMore – timeout while waiting for result
-
set_exception
(exc) → None¶ Set an exception for all futures
Parameters: exc – exception instance to set
-
set_result
(result) → None¶ Set a result for all futures
Parameters: result – result to set
-
set_running_or_notify_cancel
() → bool¶ Call set_running_or_notify_cancel on the futures
This will return True if at least one future was not cancelled
-
Condition¶
A simplified version of threading.Condition. Doesn’t require you to acquire it in order to be notified. Just a bunch of syntactic sugar.
-
class
satella.coding.concurrent.
Condition
(lock=None)¶ A wrapper to facilitate easier usage of Python’s threading.Condition.
There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.
This happens to sorta not work on PyPy. Use at your own peril. You have been warned.
-
notify
(n: int = 1) → None¶ Notify n threads waiting on this Condition
Parameters: n – amount of threads to notify
-
notifyAll
() → None¶ Deprecated alias for notify_all
Deprecated since version 2.14.22.
-
notify_all
() → None¶ Notify all threads waiting on this Condition
-
wait
(timeout: Union[str, float, None] = None, dont_raise: bool = False) → None¶ Wait for condition to become true.
Parameters: - timeout – timeout to wait. None is default and means infinity. Can be also a time string.
- dont_raise – if True, then WouldWaitMore won’t be raised
Raises: - ResourceLocked – unable to acquire the underlying lock within specified timeout.
- WouldWaitMore – wait’s timeout has expired
-
Timer¶
-
class
satella.coding.concurrent.
Timer
(interval: Union[str, float], function, args=None, kwargs=None, spawn_separate=False)¶ A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.
There might be up to a second of delay before the timer is picked up.
If spawn_separate is False, exceptions will be logged
Parameters: - interval – amount of seconds that should elapse between calling start() and the function executing. Can be also a time string.
- function – function to execute
- args – arguments for function
- kwargs – kwargs for function
- spawn_separate – whether to call the function in a separate thread
-
cancel
() → None¶ Do not execute this timer
-
start
() → None¶ Order this timer task to be executed in interval seconds
Functions and decorators¶
parallel_execute¶
For executing those functions that return a Future in parallel.
parallel_execute will return you an iterator, returning the result (or raising an exception) for every result you get.
-
satella.coding.concurrent.
parallel_execute
(callable_: Callable[[T], concurrent.futures._base.Future], args: Iterable[T], kwargs: Iterable[dict] = <generator object infinite_iterator>)¶ Execute a number of calls to callable in parallel.
Callable must be a function that accepts arguments and returns a plain Python future.
The return value will be an iterator that yields the result of every future, or an exception instance for any call that raised.
Parameters: - callable_ – a callable that returns futures
- args – an iterable of arguments to provide to the callable
- kwargs – an iterable of keyword arguments to provide to the callable
Returns: an iterator yielding every value (or exception instance if thrown) of the future
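A stdlib sketch of the idea: start every call first, then walk the futures and yield either the result or the exception instance. tiny_parallel_execute is a hypothetical name, the kwargs iterable is omitted, and yielding in submission order is an assumption of this sketch.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator


def tiny_parallel_execute(callable_: Callable[[Any], Future],
                          args: Iterable[Any]) -> Iterator[Any]:
    # Start all calls up front so they run concurrently.
    futures = [callable_(arg) for arg in args]
    for future in futures:
        exc = future.exception()  # blocks until this future has finished
        yield exc if exc is not None else future.result()


pool = ThreadPoolExecutor(max_workers=2)


def reciprocal_async(x: float) -> Future:
    return pool.submit(lambda: 1 / x)


outcomes = list(tiny_parallel_execute(reciprocal_async, [1, 0, 4]))
pool.shutdown()
```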
run_as_future¶
-
satella.coding.concurrent.
run_as_future
(fun)¶ A decorator that accepts a function that should be executed in a separate thread. Instead of its result, a Future is returned, which lets you watch the function for completion.
The created thread will be non-daemonic
Example usage:
>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()
sync_threadpool¶
-
satella.coding.concurrent.
sync_threadpool
(tpe: Union[satella.coding.concurrent.futures.wrapped_executor.ExecutorWrapper, concurrent.futures.thread.ThreadPoolExecutor], max_wait: Optional[float] = None) → None¶ Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.
Make sure that other tasks do not submit anything to this thread pool executor.
Parameters: - tpe – thread pool executor to sync. Can be also an ExecutorWrapper.
- max_wait – maximum time to wait. Default, None, means wait forever
Raises: WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.