Concurrent data structures¶
DeferredValue¶
- class satella.coding.concurrent.DeferredValue¶
A class that allows you to pass arguments that will be available later during runtime.
Usage:
>>> def thread1(value): >>> print(value.value())
>>> val = DeferredValue() >>> threading.Thread(target=thread1, args=(val, )).start() >>> time.sleep(10) >>> val.set_value(3)
- set_value(va: T) None ¶
Set a value and wake up all the threads waiting on it.
- Parameters:
va – value to set
- Raises:
ValueError – value is already set
- value(timeout: float | None = None) T ¶
Wait until value is available, and return it.
- Parameters:
timeout – number of seconds to wait. If None is given, this will take as long as necessary.
- Returns:
a value
- Raises:
WouldWaitMore – timeout was given and it has expired
CallableGroup¶
- class satella.coding.concurrent.CallableGroup(gather: bool = True, swallow_exceptions: bool = False)¶
This behaves like a function, but allows to add other functions to call when invoked, eg.
c1 = Callable()
c1.add(foo) c1.add(bar)
c1(2, 3)
Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.
- add(callable_: CancellableCallback | Callable[[], T], one_shot: bool = False)¶
Add a callable.
Can be a
CancellableCallback
, in that case methodremove_cancelled()
might be useful.- Parameters:
callable – callable
one_shot – if True, callable will be unregistered after single call
- property has_cancelled_callbacks: bool¶
Check whether this has any
CancellableCallback
instances and whether any of them was cancelled
- remove_cancelled() None ¶
Remove it’s entries that are CancelledCallbacks and that were cancelled
CallNoOftenThan¶
- class satella.coding.concurrent.CallNoOftenThan(interval: float, callable_: Callable)¶
A class that will ensure that calls to given callable are made no more often than some interval.
Even if it’s call is called more often than specified value, the callable just won’t be called and None will be returned.
- Parameters:
interval – interval in seconds
callable – callable to call
parallel_construct¶
- satella.coding.concurrent.parallel_construct(iterable: Iterable[V], function: Callable[[V], U | None], thread_pool: ThreadPoolExecutor, span_title: str | None = None) List[U] ¶
Construct a list from executing given function in a thread pool executor.
If opentracing is installed, and tracing is enabled, current span will be passed to child threads.
- Parameters:
iterable – iterable to apply
function – function to apply. If that function returns None, no element will be added
thread_pool – thread pool to execute
span_title – span title to create. For each execution a child span will be returned
- Returns:
list that is the result of parallel application of function on each element
CancellableCallback¶
- class satella.coding.concurrent.CancellableCallback(callback_fun: Callable)¶
A callback that you can cancel.
Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.
If called, the function itself won’t be called as well if this was cancelled. In this case a None will be returned instead of the result of callback_fun()
This short circuits __bool__ to return not .cancelled.
Hashable and __eq__-able by identity.
- Parameters:
callback_fun – function to call
- Variables:
cancelled – whether this callback was cancelled (bool)
- cancel() None ¶
Cancel this callback.
LockedDataset¶
- class satella.coding.concurrent.LockedDataset¶
A locked dataset. Subclass like
>>> class MyDataset(LockedDataset): >>> def __init__(self): >>> super(MyDataset, self).__init__() >>> with self: >>> self.mydata: str = "lol wut" >>> @LockedDataset.locked >>> def protected(self): >>> self.mydata = "updated atomically"
>>> mds = MyDataset() >>> with mds as md: >>> md.mydata = "modified atomically"
>>> try: >>> with mds(blocking=True, timeout=0.5) as md: >>> md.mydata = "modified atomically" >>> except ResourceLocked: >>> print('Could not update the resource')
If no lock is held, this class that derives from such will raise ResourceNotLocked upon element access while a lock is not being held.
Note that __enter__ will raise WouldWaitMore if timeout was given.
- static locked(blocking=True, timeout=-1) Callable[[Callable], Callable] ¶
Decorator to use for annotating methods that would lock :param blocking: whether to block at all :param timeout: optional timeout. Default, or -1 means “return ASAP”
PeekableQueue¶
- class satella.coding.concurrent.PeekableQueue¶
A thread-safe FIFO queue that supports peek()ing for elements.
- get(timeout: float | None = None) T ¶
Get an element.
- Parameters:
timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
Empty – queue was empty
- peek(timeout: float | None = None) T ¶
Get an element without removing it from the top of the queue.
- Parameters:
timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary
- Returns:
the item
- Raises:
WouldWaitMore – timeout has expired
- put(item: T) None ¶
Add an element to the queue
- Parameters:
item – element to add
- put_many(items: Sequence[T]) None ¶
Put multiple items
- Parameters:
items – sequence of items to put
- qsize() int ¶
Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block. :return: approximate size of the queue
ThreadCollection¶
- class satella.coding.concurrent.ThreadCollection(threads: Sequence[Thread] | None = None)¶
A collection of threads.
Create like:
>>> class MyThread(Thread): >>> def __init__(self, a): >>> ... >>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True) >>> tc.start() >>> tc.terminate() >>> tc.join()
This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.
- add(thread: Thread) ThreadCollection ¶
Add a thread to the collection
- Parameters:
thread – thread to add
- Returns:
this thread collection instance
- append(thread: Thread) ThreadCollection ¶
Alias for
add()
- Parameters:
thread – thread to add
- Returns:
this thread collection instance
- property daemon: bool¶
Is any of the threads a daemon?
Also, when used a setter sets daemon attribute.
- classmethod from_class(cls_to_use, iteratable, **kwargs) ThreadCollection ¶
Build a thread collection
- Parameters:
cls_to_use – class to instantiate with
iteratable – an iterable with the sole argument to this class
- classmethod get_currently_running(include_main_thread: bool = True) ThreadCollection ¶
Get all currently running threads as thread collection
- Parameters:
include_main_thread – whether to include the main thread
- Returns:
a thread collection representing all currently running threads
- is_alive() bool ¶
Is at least one thread alive?
- join(timeout: float | None = None) ThreadCollection ¶
Join all threads
- Parameters:
timeout – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.
- Returns:
this thread collection instance
- Raises:
WouldWaitMore – one of the threads failed to terminate
- start() ThreadCollection ¶
Start all threads
- Returns:
this thread collection instance
- terminate(*args, **kwargs) ThreadCollection ¶
Call terminate() on all threads that have this method
- Returns:
this thread collection instance
TerminableThread¶
Note that force=True is not available on PyPy. If an attempt to use it on PyPy is made,
RuntimeError
will be thrown.
Please note that in order to terminate, target Python thread must at least execute some Python code. It means that if it’s hanging on I/O, for example, it won’t be affected.
- class satella.coding.concurrent.TerminableThread(*args, terminate_on: Type[Exception] | Tuple[Type[Exception]] | None = None, **kwargs)¶
Class that will execute something in a loop unless terminated. Use like:
>>> class MeGrimlock(TerminableThread): >>> def loop(self): >>> ... do your operations .. >>> a = MeGrimlock().start() >>> a.terminate().join()
Property to check whether to terminate is stored in self.terminating.
If you decide to override run(), you got to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.
You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:
>>> a = MeGrimlock() >>> with a: >>> ... >>> self.assertFalse(a.is_alive())
If prepare() throws one of the terminate_on exceptions,
loop()
even won’t be called. However,terminate()
will be automatically called then.Same applies for
IntervalTerminableThread
andCPUTimeAwareIntervalTerminableThread
.- cleanup()¶
Called after thread non-forced termination, in the thread’s context.
The default implementation does nothing.
- loop() None ¶
Run one iteration of the loop. Meant to be overrided. You do not need to override it if you decide to override run() through.
This should block for as long as a single check will take, as termination checks take place between calls.
Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.
- prepare() None ¶
This is called before the .loop() looping loop is entered.
This is invoked already in a separate thread.
- run() None ¶
Calls self.loop() indefinitely, until terminating condition is met
- safe_sleep(interval: float, wake_up_each: float = 2) None ¶
Sleep for interval, waking up each wake_up_each seconds to check if terminating, finish earlier if is terminating.
This will do the right thing when passed a negative interval.
To be invoked only by the thread that’s represented by the object!
- Parameters:
interval – Time to sleep in total
wake_up_each – Amount of seconds to wake up each
- Raises:
SystemExit – thread is terminating
- safe_wait_condition(condition: Condition, timeout: str | float, wake_up_each: str | float = 2, dont_raise: bool = False) None ¶
Wait for a condition, checking periodically if the thread is being terminated.
To be invoked only by the thread that’s represented by the object!
- Parameters:
condition – condition to wait on
timeout – maximum time to wait in seconds. Can be also a time string
wake_up_each – amount of seconds to wake up each to check for termination. Can be also a time string.
dont_raise – if set to True,
WouldWaitMore
will not be raised
- Raises:
WouldWaitMore – timeout has passed and Condition has not happened
SystemExit – thread is terminating
- start() TerminableThread ¶
Start the execution of this thread :return: this thread
- terminate(force: bool = False) TerminableThread ¶
Signal this thread to terminate.
Forcing, if requested, will be done by injecting a SystemExit exception into target thread, so the thread must acquire GIL. For example, following would not be interruptable:
>>> time.sleep(1000000)
Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.
- Parameters:
force – Whether to force a quit
- Returns:
self
- Raises:
RuntimeError – when something goes wrong with the underlying Python machinery
NotImplementedError – force=True was used on PyPy
- property terminating: bool¶
- Returns:
Is this thread either alive and trying to terminate or dead and after termination?
In order to terminate you can throw SystemExit.
IntervalTerminableThread¶
- class satella.coding.concurrent.IntervalTerminableThread(seconds: str | float, *args, **kwargs)¶
A TerminableThread that calls .loop() once per x seconds, taking into account the length of
loop()
runtime.If executing .loop() takes more than x seconds, on_overrun() will be called. If executing .process() takes more than x seconds, it will be called immediately after it returns (and
on_overrun()
executes)- Parameters:
seconds – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.
- abstract loop() None ¶
Override me!
- on_overrun(time_taken: float) None ¶
Called when executing .loop() takes more than x seconds.
Called each cycle.
You are meant to override this, as by default this does nothing.
- Parameters:
time_taken – how long did calling .loop() take
- run()¶
Calls self.loop() indefinitely, until terminating condition is met
BogusTerminableThread¶
- class satella.coding.concurrent.BogusTerminableThread¶
A mock object that implements threading interface but does nothing
- Variables:
started – bool, if it’s running
terminated – bool, if terminated
daemon – bool, if daemon
- is_alive() bool ¶
- Returns:
if this thread is alive
- join(timeout=None) None ¶
Wait for the pseudo-thread. Sets running to False if thread was terminated.
- Parameters:
timeout – maximum number of seconds to wait for termination
- Raises:
WouldWaitMore – thread did not terminate within that many seconds
RuntimeError – tried to join() before start()!
- start() None ¶
Set running to True :raises RuntimeError: thread already terminated or already running
call_in_separate_thread¶
- satella.coding.concurrent.call_in_separate_thread(*t_args, no_thread_attribute: bool = False, delay: float = 0, **t_kwargs)¶
Decorator to mark given routine as callable in a separate thread.
The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.
The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.
Warning
calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument
The arguments given here will be passed to thread’s constructor, so use like:
- Parameters:
no_thread_attribute – if set to True, future won’t have a link returned to it’s thread. The thread will have attribute of “future” anyways.
delay – seconds to wait before launching function
>>> @call_in_separate_thread(daemon=True) >>> def handle_messages(): >>> while True: >>> ...
SingleStartThread¶
- class satella.coding.concurrent.SingleStartThread(*args, **kwargs)¶
A thread that keeps track of whether it’s .start() method was called, and does nothing if it’s called second or so time.
- start() SingleStartThread ¶
No-op when called second or so time. The first time it starts the thread.
- Returns:
self
SequentialIssuer¶
- class satella.coding.concurrent.SequentialIssuer(start: int = 0)¶
A classs that issues an monotonically increasing value.
- Parameters:
start – start issuing IDs from this value
- Variables:
start – next value to be issued
- issue() int ¶
Just issue a next identifier
- Returns:
a next identifier
- no_less_than(no_less_than: int) int ¶
Issue an int, which is no less than a given value
- Parameters:
no_less_than – value that the returned id will not be less than this
- Returns:
an identifier, no less than no_less_than
CPManager¶
- class satella.coding.resources.CPManager(max_number: int, max_cycle_no: int)¶
A thread-safe no-hassle connection-pool manager.
Extend this class to build your own connection pool managers.
This supports automatic connection recycling, connection will be cycled each max_cycle_no takings and deposits.
Note that you have to overload
teardown_connection()
andcreate_connection()
.You obtain a connection by using
acquire_connection()
. If it fails you should mark it as such usingfail_connection()
. In all cases you have to return it usingrelease_connection()
.- Parameters:
max_number – maximum number of connections
max_cycle_no – maximum number of get/put connection cycles.
- Variables:
max_number – maximum amount of connections. Can be changed during runtime
Warning
May not work under PyPy for reasons having to do with id’s semantics. A RuntimeWarning will be issued when not running under CPython.
- acquire_connection() T ¶
Either acquire a new connection, or just establish it in the background
- Returns:
a new connection:
- Raises:
RuntimeError – CPManager is terminating!
- close() None ¶
Check if the resource needs cleanup, and clean up this resource.
Use like this:
>>> class MyClose(Closeable): >>> def close(self): >>> if super().close(): >>> .. clean up ..
- Returns:
whether the cleanup should proceed
- Raises:
RuntimeError – the constructor was not invoked
- abstract create_connection() T ¶
Create a new connection.
Is safe to block.
- Returns:
a new connection instance
- fail_connection(connection: T) None ¶
Signal that a given connection has been failed
- Parameters:
connection – connection to fail
- invalidate() None ¶
Close all connections. Connections have to be released first. Object is ready for use after this
- release_connection(connection: T) None ¶
Release a connection
- Parameters:
connection – connection to release
- abstract teardown_connection(connection: T) None ¶
Close the connection.
Is safe to block.
- Parameters:
connection – connection to tear down
IDAllocator¶
- class satella.coding.concurrent.IDAllocator(start_at: int = 0, top_limit: int | None = None)¶
Reusable ID allocator
You can use it to requisition ints from a pool, and then free their ints, permitting them to be reused.
Thread-safe.
- Parameters:
start_at – the lowest integer that the allocator will return
top_limit – the maximum value that will not be allocated. If used, subsequent calls to
allocate_int()
will raiseEmpty
- allocate_int() int ¶
Return a previously unallocated int, and mark it as allocated
- Returns:
an allocated int
- Raises:
Empty – could not allocate an int due to top limit
- mark_as_allocated(x: int)¶
Mark given x as allocated
- Parameters:
x – x to mark as allocated
- Raises:
AlreadyAllocated – x was already allocated
ValueError – x is less than start_at
- mark_as_free(x: int)¶
Mark x as free
- Parameters:
x – int to free
- Raises:
ValueError – x was not allocated or less than start_at
Monitor¶
A monitor is a Java-like synchronization idea. Inheriting from Monitor outfits the class with a Lock (or a reentrant lock, if RMonitor is used), that can be used to coordinate access to some shared resource.
Take care to invoke Monitor’s constructor when inheriting, or this won’t work.
You can decorate your methods with Monitor.synchronized to have them execute with the lock acquired. If you have such a method, you can also temporarily release the lock using context manager Monitor.release (it will be reacquired) when context manager is exited.
You can also use manual synchronization with context manager Monitor.acquire.
from satella.coding import Monitor
class MyProtectedClass(Monitor):
def __init__(self, *args):
super(Monitor, self).__init__()
@Monitor.synchronized
def synchronized(self):
pass # everything here is executed with class lock acquired
@Monitor.synchronized
def temporary_release(self):
pass # lock is acquired here
with Monitor.release(self):
pass # lock is NOT ACQUIRED here
pass # and here it's reacquired again
def manual_sync(self):
pass # not synchronized
with Monitor.acquire(self):
pass # synchronized
You can also use Monitor.release and Monitor.acquire with other objects than self, but exercise caution and think over the consequences.
- class satella.coding.concurrent.Monitor¶
Base utility class for creating monitors (the synchronization thingies!)
These are NOT re-entrant!
Use it like that:
>>> class MyProtectedObject(Monitor): >>> def __init__(self, *args, **kwargs): >>> Monitor.__init__(self) >>> ... do your job ..
>>> @Monitor.synchronized >>> def function_that_needs_mutual_exclusion(self): >>> .. do your threadsafe jobs ..
>>> def function_that_partially_needs_protection(self): >>> .. do your jobs .. >>> with Monitor.acquire(self): >>> .. do your threadsafe jobs .. >>> .. do your jobs .. >>> with self: >>> .. do your threadsafe jobs ..
- class acquire(foo: Monitor)¶
Returns a context manager object that can lock another object, as long as that object is a monitor.
Consider foo, which is a monitor. If you needed to lock it from outside, you would do:
>>> with Monitor.acquire(foo): >>> .. do operations on foo that need mutual exclusion ..
- class release(foo: Monitor)¶
Returns a context manager object that can release another object as long as that object is a monitor.
Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:
>>> @Monitor.synchronized >>> def protected_function(self): >>> .. do some stuff that needs mutual exclusion .. >>> with Monitor.release(self): >>> .. do some I/O that does not need mutual exclusion .. >>> .. back to protected stuff ..
- classmethod synchronize_on(monitor: Monitor) Callable[[Callable], Callable] ¶
A decorator for locking on non-self Monitor objects
Use it like:
>>> class MasterClass(Monitor): >>> def get_object(self): >>> class SlaveClass: >>> @Monitor.synchronize_on(self) >>> def get_object(self2): >>> ... >>> return SlaveClass
- static synchronize_on_attribute(attr_name: str)¶
When a Monitor is an attribute of a class, and you have a method instance that you would like secure by acquiring that monitor, use this.
The first argument taken by that method instance must be self.
- Parameters:
attr_name – name of the attribute that is the monitor
- static synchronized(fun: Callable) Callable ¶
This is a decorator. Class method decorated with that will lock the global lock of given instance, making it threadsafe. Depending on usage pattern of your class and it’s data semantics, your performance may vary
- class satella.coding.concurrent.RMonitor¶
Monitor, but using an reentrant lock instead of a normal one
Additionally, following types are predefined for your convenience:
- class satella.coding.concurrent.MonitorList(*args)¶
A list that is also a monitor.
Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice
- class satella.coding.concurrent.MonitorDict(*args, **kwargs)¶
A dict that is also a monitor.
Note that access to it’s properties is not automatically synchronized, you got to invoke the monitor to implement an opportunistic locking of your own choice
- class satella.coding.concurrent.MonitorSet(*args)¶
A set that allows atomic insert-if-not-already-there operation
LockedStructure¶
A proxy to an object along with a lock, that can be triggered using the context manager:
a = {1:2, 3:4}
ls = LockedStructure(a)
assert len(ls), 2
with ls:
a[4] = 5
- class satella.coding.concurrent.LockedStructure(obj_to_wrap: T, lock: allocate_lock | None = None)¶
A wizard to make every Python structure thread-safe.
It wraps obj_to_wrap, passing on all calls, settings and so on to the object wrapper, from lock exposing only the context manager protocol.
Example:
>>> locked_dict = LockedStructure(dict) >>> with locked_dict: >>> locked_dict[5] = 2
Also, please note that operations such as addition will strip this object of being a locked structure, ie. they will return object that participated in locked structure plus some other.
Note that in-place operations return the locked structure.
AtomicNumber¶
- class satella.coding.concurrent.AtomicNumber(v: int | float = 0)¶
An atomic number. Note that the class is not hashable and for a reason, since it’s value might change in time. So in this case this is more of like a container for numbers.
Treat it like a normal number, except all operations are executed atomically.
You can also wait for it to change it’s value, via wait().
You change it’s value in the following way:
>>> a = AtomicNumber() >>> a += 2
Note that if the number if used in an expression, such as
>>> b = a + 2
Then a normal number will be returned
- wait(timeout: float | None = None, throw_exception: bool = True)¶
Block until the atomic number changes it’s value.
- Parameters:
timeout – maximum time to wait. None means wait indefinitely
throw_exception – whether to throw WouldWaitMore on timeout
- Raises:
WouldWaitMore – the value hasn’t changed within the timeout
- wait_until_equal(v: int | float, timeout: float | None = None) None ¶
Wait until the value of this number equals v.
- Parameters:
v – value to compare this number against
timeout – maximum time to wait
- Raises:
WouldWaitMore – timeout expired without the value becoming equal to target
FutureCollection¶
- class satella.coding.concurrent.FutureCollection(futures: Sequence[Future] = ())¶
A set of futures sharing a common result, or a common exception.
This overloads the operator + for making an union of futures. It can be used with either instances of
FutureCollection
or normal futures.Also supports the indexing operator to get n-th future.
- add(future: Future) FutureCollection ¶
Add a future
- Parameters:
future – a Future to add
- Returns:
self
- add_done_callback(callback, only_one: bool = False) None ¶
Add a callback to a Future to be called on it’s completion.
By default, this will add the callback to all futures.
- Parameters:
callback – callback that takes the completed Future as argument
only_one – callback will be added only to a single Future. False by default
- Raises:
IndexError – only_one was given and no Futures in collection!
- cancel() bool ¶
Cancel all futures
- Returns:
True if all sections were cancelled
- exception(timeout: float | None = None) Exception | None ¶
Return first exception raised by any of the futures
This will block until the results are available. This call proceeding does not mean that results for all are available, since this will return the first exception encountered!
- Parameters:
timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary
- Returns:
the first exception, or None if there were no exceptions
- Raises:
WouldWaitMore – timeout while waiting for result
- result(timeout: float | None = None) list ¶
Return the result of all futures, as a list.
This will block until the results are available.
- Parameters:
timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary
- Returns:
list containing results of all futures
- Raises:
WouldWaitMore – timeout while waiting for result
- set_exception(exc) None ¶
Set an exception for all futures
- Parameters:
exc – exception instance to set
- set_result(result) None ¶
Set a result for all futures
- Parameters:
result – result to set
- set_running_or_notify_cancel() bool ¶
Call
set_running_or_notify_cancel
on the futuresThis will return True if at least one future was not cancelled
Condition¶
A simplified version of threading.Condition. Doesn’t require you to acquire it in order to be notified. Just a bunch of syntactic sugar.
- class satella.coding.concurrent.Condition(lock=None)¶
A wrapper to faciliate easier usage of Pythons’ threading.Condition.
There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.
This happens to sorta not work on PyPy. Use at your own peril. You have been warned.
- notify(n: int = 1) None ¶
Notify n threads waiting on this Condition
- Parameters:
n – amount of threads to notify
- notifyAll() None ¶
Deprecated alias for notify_all
Deprecated since version 2.14.22.
- notify_all() None ¶
Notify all threads waiting on this Condition
- wait(timeout: str | float | None = None, dont_raise: bool = False) None ¶
Wait for condition to become true.
- Parameters:
timeout – timeout to wait. None is default and means infinity. Can be also a time string.
dont_raise – if True, then WouldWaitMore won’t be raised
- Raises:
ResourceLocked – unable to acquire the underlying lock within specified timeout.
WouldWaitMore – wait’s timeout has expired
Timer¶
- class satella.coding.concurrent.Timer(interval: str | float, function, args=None, kwargs=None, spawn_separate=False)¶
A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.
There might be up to a second of delay before the timer is picked up.
n If spawn_separate is False, exceptions will be logged.
- param interval:
amount of seconds that should elapse between calling start() and function executing. Can be also a time string.
- param function:
function to execute
- param args:
argument for function
- param kwargs:
kwargs for function
- param spawn_separate:
whether to call the function in a separate thread
- cancel() None ¶
Do not execute this timer
- start() None ¶
Order this timer task to be executed in interval seconds
Functions and decorators¶
parallel_execute¶
For executing those functions that return a Future in parallel.
parallel_execute will return you an iterator, returning the result (or raising an exception) for every result you get.
- satella.coding.concurrent.parallel_execute(callable_: ~typing.Callable[[~satella.coding.typing.T], ~concurrent.futures._base.Future], args: ~typing.Iterable[~satella.coding.typing.T], kwargs: ~typing.Iterable[dict] = <generator object infinite_iterator>)¶
Execute a number of calls to callable in parallel.
Callable must be a function that accepts arguments and returns a plain Python future.
Return will be an iterator that will yield every value of the iterator, or return an instance of exception, if any of the calls excepted.
- Parameters:
callable – a callable that returns futures
args – an iterable of arguments to provide to the callable
kwargs – an iterable of keyword arguments to provide to the callable
- Returns:
an iterator yielding every value (or exception instance if thew) of the future
run_as_future¶
- satella.coding.concurrent.run_as_future(fun)¶
A decorator that accepts a function that should be executed in a separate thread, and a Future returned instead of it’s result, that will enable to watch the function for completion.
The created thread will be non-demonic
Example usage:
>>> @run_as_future >>> def parse_a_file(x: str): >>> ... >>> fut = parse_a_file('test.txt') >>> result = fut.result()
sync_threadpool¶
- satella.coding.concurrent.sync_threadpool(tpe: ExecutorWrapper | ThreadPoolExecutor, max_wait: float | None = None) None ¶
Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.
Make sure that other tasks do not submit anything to this thread pool executor.
- Parameters:
tpe – thread pool executor to sync. Can be also a ExecutorWrapper.
max_wait – maximum time to wait. Default, None, means wait forever
- Raises:
WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.