Concurrent data structures

DeferredValue

class satella.coding.concurrent.DeferredValue

A class that allows you to pass arguments that will be available later during runtime.

Usage:

>>> import threading, time
>>> from satella.coding.concurrent import DeferredValue
>>> def thread1(value):
>>>     print(value.value())
>>> val = DeferredValue()
>>> threading.Thread(target=thread1, args=(val, )).start()
>>> time.sleep(10)
>>> val.set_value(3)

result(timeout=None) T

An alias for value()

set_value(va: T) None

Set a value and wake up all the threads waiting on it.

Parameters:

va – value to set

Raises:

ValueError – value is already set

value(timeout: float | None = None) T

Wait until value is available, and return it.

Parameters:

timeout – number of seconds to wait. If None is given, this will take as long as necessary.

Returns:

a value

Raises:

WouldWaitMore – timeout was given and it has expired
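The blocking behaviour of value() and set_value() can be approximated with a threading.Event. This is a minimal standard-library sketch of the idea, not satella's implementation: the class name SketchDeferredValue is hypothetical, and TimeoutError stands in for WouldWaitMore.

```python
import threading


class SketchDeferredValue:
    """Hypothetical stand-in for DeferredValue, built on threading.Event."""

    def __init__(self):
        self._event = threading.Event()
        self._lock = threading.Lock()
        self._value = None

    def set_value(self, va):
        with self._lock:
            # A value may be set only once, mirroring the documented ValueError.
            if self._event.is_set():
                raise ValueError('value is already set')
            self._value = va
            self._event.set()      # wakes up every thread blocked in value()

    def value(self, timeout=None):
        # Event.wait() returns False if the timeout expires first.
        if not self._event.wait(timeout):
            raise TimeoutError('timeout expired')  # assumption: stands in for WouldWaitMore
        return self._value


received = []
val = SketchDeferredValue()
consumer = threading.Thread(target=lambda: received.append(val.value()))
consumer.start()          # the consumer blocks until a value is set
val.set_value(3)
consumer.join()
```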

CallableGroup

class satella.coding.concurrent.CallableGroup(gather: bool = True, swallow_exceptions: bool = False)

This behaves like a function, but lets you add other functions to be called when it is invoked, e.g.

c1 = CallableGroup()

c1.add(foo)
c1.add(bar)

c1(2, 3)

Now both foo and bar will be called with arguments (2, 3). Their exceptions will be propagated.

add(callable_: CancellableCallback | Callable[[], T], one_shot: bool = False)

Add a callable.

Can be a CancellableCallback; in that case the method remove_cancelled() might be useful.

Parameters:
  • callable_ – the callable to add

  • one_shot – if True, callable will be unregistered after single call

property has_cancelled_callbacks: bool

Check whether this has any CancellableCallback instances and whether any of them was cancelled

remove_cancelled() None

Remove its entries that are CancellableCallback instances and that were cancelled
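The fan-out behaviour and the one_shot flag can be sketched in a few lines. This is an illustration under assumptions, not the library's code: SketchCallableGroup is a hypothetical name, results are always gathered into a list, and cancellation handling is left out.

```python
class SketchCallableGroup:
    """Hypothetical stand-in: calls every registered callable with the same arguments."""

    def __init__(self):
        self._callables = []        # list of (callable, one_shot) pairs

    def add(self, callable_, one_shot=False):
        self._callables.append((callable_, one_shot))

    def __call__(self, *args, **kwargs):
        results = []
        remaining = []
        for fn, one_shot in self._callables:
            results.append(fn(*args, **kwargs))   # exceptions propagate to the caller
            if not one_shot:
                remaining.append((fn, one_shot))  # one-shot entries are dropped after a call
        self._callables = remaining
        return results


calls = []
group = SketchCallableGroup()
group.add(lambda a, b: calls.append(('sum', a + b)))
group.add(lambda a, b: calls.append(('once', a * b)), one_shot=True)
group(2, 3)   # both callables run
group(2, 3)   # only the persistent one runs
```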

CallNoOftenThan

class satella.coding.concurrent.CallNoOftenThan(interval: float, callable_: Callable)

A class that will ensure that calls to given callable are made no more often than some interval.

If this object is called more often than the specified interval allows, the callable just won’t be called and None will be returned.

Parameters:
  • interval – interval in seconds

  • callable_ – callable to call
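A rate limit of this kind can be sketched by remembering the time of the last accepted call. The class below is a hypothetical, single-threaded illustration (SketchRateLimited is not a satella name) and uses time.monotonic() as its clock, which is an assumption about how the interval is measured.

```python
import time


class SketchRateLimited:
    """Hypothetical stand-in: forwards a call only if `interval` seconds have passed."""

    def __init__(self, interval, callable_):
        self.interval = interval
        self.callable_ = callable_
        self._last_called = None     # monotonic timestamp of the last accepted call

    def __call__(self, *args, **kwargs):
        now = time.monotonic()
        if self._last_called is not None and now - self._last_called < self.interval:
            return None              # called too soon: skip the wrapped callable
        self._last_called = now
        return self.callable_(*args, **kwargs)


limited = SketchRateLimited(60.0, lambda x: x * 2)
first = limited(21)     # accepted
second = limited(21)    # rejected, less than 60 seconds later
```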

parallel_construct

satella.coding.concurrent.parallel_construct(iterable: Iterable[V], function: Callable[[V], U | None], thread_pool: ThreadPoolExecutor, span_title: str | None = None) List[U]

Construct a list from executing given function in a thread pool executor.

If opentracing is installed, and tracing is enabled, current span will be passed to child threads.

Parameters:
  • iterable – iterable to apply

  • function – function to apply. If that function returns None, no element will be added

  • thread_pool – thread pool to execute

  • span_title – span title to create. For each execution a child span will be returned

Returns:

list that is the result of parallel application of function on each element
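The core of this function (map over a thread pool, drop None results) can be sketched with concurrent.futures alone. sketch_parallel_construct is a hypothetical name; tracing is omitted, and keeping results in input order is an assumption of this sketch rather than a documented guarantee.

```python
from concurrent.futures import ThreadPoolExecutor


def sketch_parallel_construct(iterable, function, thread_pool):
    # Submit everything first so the calls can overlap, then collect.
    futures = [thread_pool.submit(function, item) for item in iterable]
    results = [future.result() for future in futures]
    # A None result means "add no element".
    return [result for result in results if result is not None]


def half_if_even(n):
    return n // 2 if n % 2 == 0 else None


with ThreadPoolExecutor(max_workers=4) as pool:
    halves = sketch_parallel_construct(range(6), half_if_even, pool)
```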

CancellableCallback

class satella.coding.concurrent.CancellableCallback(callback_fun: Callable)

A callback that you can cancel.

Useful for event-driven software that looks through lists of callbacks and determines whether to delete them or further invalidate in some other way.

If this callback was cancelled, calling it will not call the function itself. In that case None will be returned instead of the result of callback_fun()

This short circuits __bool__ to return not .cancelled.

Hashable and __eq__-able by identity.

Parameters:

callback_fun – function to call

Variables:

cancelled – whether this callback was cancelled (bool)

cancel() None

Cancel this callback.
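The semantics described above (call-through unless cancelled, truthiness tied to .cancelled) fit in a small class. SketchCancellableCallback is a hypothetical stand-in written for illustration; identity-based hashing and equality come from Python's defaults.

```python
class SketchCancellableCallback:
    """Hypothetical stand-in for a callback that can be cancelled."""

    def __init__(self, callback_fun):
        self.callback_fun = callback_fun
        self.cancelled = False

    def __bool__(self):
        return not self.cancelled

    def __call__(self, *args, **kwargs):
        if self.cancelled:
            return None              # cancelled: the wrapped function is not invoked
        return self.callback_fun(*args, **kwargs)

    def cancel(self):
        self.cancelled = True


callbacks = [SketchCancellableCallback(len), SketchCancellableCallback(sum)]
callbacks[1].cancel()
live = [cb for cb in callbacks if cb]            # filter by truthiness
results = [cb([1, 2, 3]) for cb in callbacks]
```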

LockedDataset

class satella.coding.concurrent.LockedDataset

A locked dataset. Subclass like

>>> class MyDataset(LockedDataset):
>>>     def __init__(self):
>>>         super(MyDataset, self).__init__()
>>>         with self:
>>>             self.mydata: str = "lol wut"
>>>     @LockedDataset.locked()
>>>     def protected(self):
>>>         self.mydata = "updated atomically"
>>> mds = MyDataset()
>>> with mds as md:
>>>     md.mydata = "modified atomically"
>>> try:
>>>     with mds(blocking=True, timeout=0.5) as md:
>>>         md.mydata = "modified atomically"
>>> except ResourceLocked:
>>>     print('Could not update the resource')

A class deriving from LockedDataset will raise ResourceNotLocked upon element access while the lock is not being held.

Note that __enter__ will raise WouldWaitMore if timeout was given.

static locked(blocking=True, timeout=-1) Callable[[Callable], Callable]

Decorator to use for annotating methods that would lock

Parameters:
  • blocking – whether to block at all

  • timeout – optional timeout. Default, or -1 means “return ASAP”

PeekableQueue

class satella.coding.concurrent.PeekableQueue

A thread-safe FIFO queue that supports peek()ing for elements.

get(timeout: float | None = None) T

Get an element.

Parameters:

timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

Empty – queue was empty

peek(timeout: float | None = None) T

Get an element without removing it from the top of the queue.

Parameters:

timeout – maximum amount of seconds to wait. Default value of None means wait as long as necessary

Returns:

the item

Raises:

WouldWaitMore – timeout has expired

put(item: T) None

Add an element to the queue

Parameters:

item – element to add

put_many(items: Sequence[T]) None

Put multiple items

Parameters:

items – sequence of items to put

qsize() int

Return the approximate size of the queue. Note, qsize() > 0 doesn’t guarantee that a subsequent get() will not block.

Returns:

approximate size of the queue
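A peekable FIFO queue can be sketched with a deque guarded by a threading.Condition. This is an illustration only: SketchPeekableQueue is a hypothetical name, and TimeoutError stands in for the exceptions documented above.

```python
import collections
import threading


class SketchPeekableQueue:
    """Hypothetical stand-in: a FIFO queue whose head can be read without removal."""

    def __init__(self):
        self._items = collections.deque()
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            self._items.append(item)
            self._cond.notify_all()   # wake both peekers and getters

    def _wait_for_item(self, timeout):
        # Must be called with the condition held.
        if not self._cond.wait_for(lambda: len(self._items) > 0, timeout):
            raise TimeoutError('no item arrived in time')

    def peek(self, timeout=None):
        with self._cond:
            self._wait_for_item(timeout)
            return self._items[0]     # head stays in the queue

    def get(self, timeout=None):
        with self._cond:
            self._wait_for_item(timeout)
            return self._items.popleft()


pq = SketchPeekableQueue()
pq.put('a')
pq.put('b')
peeked = pq.peek()
first = pq.get()
second = pq.get()
```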

ThreadCollection

class satella.coding.concurrent.ThreadCollection(threads: Sequence[Thread] | None = None)

A collection of threads.

Create like:

>>> class MyThread(Thread):
>>>     def __init__(self, a):
>>>         ...
>>> tc = ThreadCollection.from_class(MyThread, [2, 4, 5], daemon=True)
>>> tc.start()
>>> tc.terminate()
>>> tc.join()

This also implements iteration (it will return all the threads in the collection) and length check. This also supports + and += operations for all thread collections, threads and iterables of threads.

add(thread: Thread) ThreadCollection

Add a thread to the collection

Parameters:

thread – thread to add

Returns:

this thread collection instance

append(thread: Thread) ThreadCollection

Alias for add()

Parameters:

thread – thread to add

Returns:

this thread collection instance

property daemon: bool

Is any of the threads a daemon?

When used as a setter, sets the daemon attribute.

classmethod from_class(cls_to_use, iteratable, **kwargs) ThreadCollection

Build a thread collection

Parameters:
  • cls_to_use – class to instantiate with

  • iteratable – an iterable of arguments; each element is passed as the sole argument to the class constructor

classmethod get_currently_running(include_main_thread: bool = True) ThreadCollection

Get all currently running threads as thread collection

Parameters:

include_main_thread – whether to include the main thread

Returns:

a thread collection representing all currently running threads

is_alive() bool

Is at least one thread alive?

join(timeout: float | None = None) ThreadCollection

Join all threads

Parameters:

timeout – maximum time in seconds to wait for the threads to terminate. Note that the timeout will be applied to each thread in sequence, so this can block for up to thread_count*timeout seconds. Default value of None means wait as long as it’s necessary.

Returns:

this thread collection instance

Raises:

WouldWaitMore – one of the threads failed to terminate

start() ThreadCollection

Start all threads

Returns:

this thread collection instance

terminate(*args, **kwargs) ThreadCollection

Call terminate() on all threads that have this method

Returns:

this thread collection instance

TerminableThread

Note that force=True is not available on PyPy. If an attempt to use it on PyPy is made, NotImplementedError will be raised.

Please note that in order to terminate, target Python thread must at least execute some Python code. It means that if it’s hanging on I/O, for example, it won’t be affected.

class satella.coding.concurrent.TerminableThread(*args, terminate_on: Type[Exception] | Tuple[Type[Exception]] | None = None, **kwargs)

Class that will execute something in a loop unless terminated. Use like:

>>> class MeGrimlock(TerminableThread):
>>>     def loop(self):
>>>         ... do your operations ..
>>> a = MeGrimlock().start()
>>> a.terminate().join()

Property to check whether to terminate is stored in self.terminating.

If you decide to override run(), you have to check periodically for self._terminating to become true. If it’s true, then a termination request was received, and the thread should terminate itself. If you decide to use the loop/cleanup interface, you don’t need to do so, because it will be automatically checked for you before each loop() call.

You may also use it as a context manager. Entering the context will start the thread, and exiting it will .terminate().join() it, in the following way:

>>> a = MeGrimlock()
>>> with a:
>>>     ...
>>> assert not a.is_alive()

If prepare() throws one of the terminate_on exceptions, loop() won’t even be called. However, terminate() will be automatically called then.

Same applies for IntervalTerminableThread and CPUTimeAwareIntervalTerminableThread.
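The loop/terminate pattern described above can be sketched with a plain threading.Thread and an Event used as the termination flag. SketchTerminableThread is a hypothetical name, and the sketch covers only cooperative termination (no force, prepare() or cleanup()).

```python
import threading


class SketchTerminableThread(threading.Thread):
    """Hypothetical stand-in showing the loop/terminate pattern."""

    def __init__(self):
        super().__init__()
        self._terminating = threading.Event()
        self.looped_once = threading.Event()
        self.iterations = 0

    def loop(self):
        # One short unit of work; termination is checked between calls.
        self.iterations += 1
        self.looped_once.set()
        self._terminating.wait(0.01)

    def run(self):
        while not self._terminating.is_set():
            self.loop()

    def terminate(self):
        self._terminating.set()
        return self                   # allows .terminate().join()


worker = SketchTerminableThread()
worker.start()
worker.looped_once.wait(timeout=5)    # let at least one iteration happen
worker.terminate().join(timeout=5)
```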

cleanup()

Called after thread non-forced termination, in the thread’s context.

The default implementation does nothing.

loop() None

Run one iteration of the loop. Meant to be overridden. You do not need to override it if you decide to override run(), though.

This should block for as long as a single check will take, as termination checks take place between calls.

Note that if it throws one of the exceptions given in terminate_on this thread will terminate cleanly, whereas if it throws something else, the thread will be terminated with a traceback.

prepare() None

This is called before the loop that repeatedly calls .loop() is entered.

This is invoked already in a separate thread.

run() None

Calls self.loop() indefinitely, until terminating condition is met

safe_sleep(interval: float, wake_up_each: float = 2) None

Sleep for interval seconds, waking up every wake_up_each seconds to check whether the thread is terminating, and finish early if it is.

This will do the right thing when passed a negative interval.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • interval – Time to sleep in total

  • wake_up_each – interval, in seconds, between wake-ups to check for termination

Raises:

SystemExit – thread is terminating

safe_wait_condition(condition: Condition, timeout: str | float, wake_up_each: str | float = 2, dont_raise: bool = False) None

Wait for a condition, checking periodically if the thread is being terminated.

To be invoked only by the thread that’s represented by the object!

Parameters:
  • condition – condition to wait on

  • timeout – maximum time to wait in seconds. Can be also a time string

  • wake_up_each – amount of seconds to wake up each to check for termination. Can be also a time string.

  • dont_raise – if set to True, WouldWaitMore will not be raised

Raises:
  • WouldWaitMore – timeout has passed and Condition has not happened

  • SystemExit – thread is terminating

start() TerminableThread

Start the execution of this thread

Returns:

this thread

terminate(force: bool = False) TerminableThread

Signal this thread to terminate.

Forcing, if requested, will be done by injecting a SystemExit exception into target thread, so the thread must acquire GIL. For example, the following would not be interruptible:

>>> time.sleep(1000000)

Note that calling force=True on PyPy won’t work, and NotImplementedError will be raised instead.

Parameters:

force – Whether to force a quit

Returns:

self

Raises:
  • RuntimeError – when something goes wrong with the underlying Python machinery

  • NotImplementedError – force=True was used on PyPy

property terminating: bool

Returns:

Is this thread either alive and trying to terminate or dead and after termination?

In order to terminate you can throw SystemExit.

IntervalTerminableThread

class satella.coding.concurrent.IntervalTerminableThread(seconds: str | float, *args, **kwargs)

A TerminableThread that calls .loop() once per x seconds, taking into account the length of loop() runtime.

If executing .loop() takes more than x seconds, on_overrun() will be called, and the next .loop() will be called immediately after on_overrun() returns.

Parameters:

seconds – time that a single looping through should take in seconds. Can be also a time string. This will include the time spent on calling .loop(), the rest of this time will be spent safe_sleep()ing.

abstract loop() None

Override me!

on_overrun(time_taken: float) None

Called when executing .loop() takes more than x seconds.

Called each cycle.

You are meant to override this, as by default this does nothing.

Parameters:

time_taken – how long did calling .loop() take

run()

Calls self.loop() indefinitely, until terminating condition is met

BogusTerminableThread

class satella.coding.concurrent.BogusTerminableThread

A mock object that implements threading interface but does nothing

Variables:
  • started – bool, if it’s running

  • terminated – bool, if terminated

  • daemon – bool, if daemon

is_alive() bool

Returns:

if this thread is alive

join(timeout=None) None

Wait for the pseudo-thread. Sets running to False if thread was terminated.

Parameters:

timeout – maximum number of seconds to wait for termination

Raises:
  • WouldWaitMore – thread did not terminate within that many seconds

  • RuntimeError – tried to join() before start()!

start() None

Set running to True

Raises:

RuntimeError – thread already terminated or already running

terminate() None

Set terminated to True.

Note that to set running to False you need to invoke join() afterwards.

call_in_separate_thread

satella.coding.concurrent.call_in_separate_thread(*t_args, no_thread_attribute: bool = False, delay: float = 0, **t_kwargs)

Decorator to mark given routine as callable in a separate thread.

The decorated routine will return a Future that is waitable to get the result (or the exception) of the function.

The returned Future will have an extra attribute, “thread” that is thread that was spawned for it. The returned thread will in turn have an attribute “future” that links to this future.

Warning

calling this will cause reference loops, so don’t use it if you’ve disabled Python GC, or in that case enable the no_thread_attribute argument

The arguments given here will be passed to thread’s constructor, so use like:

>>> @call_in_separate_thread(daemon=True)
>>> def handle_messages():
>>>     while True:
>>>         ...

Parameters:
  • no_thread_attribute – if set to True, future won’t have a link returned to its thread. The thread will have attribute of “future” anyway.

  • delay – seconds to wait before launching function

SingleStartThread

class satella.coding.concurrent.SingleStartThread(*args, **kwargs)

A thread that keeps track of whether its .start() method was called, and does nothing if it’s called a second or subsequent time.

start() SingleStartThread

No-op when called a second or subsequent time. The first time it starts the thread.

Returns:

self

SequentialIssuer

class satella.coding.concurrent.SequentialIssuer(start: int = 0)

A class that issues monotonically increasing values.

Parameters:

start – start issuing IDs from this value

Variables:

start – next value to be issued

issue() int

Just issue a next identifier

Returns:

a next identifier

no_less_than(no_less_than: int) int

Issue an int, which is no less than a given value

Parameters:

no_less_than – the returned ID will not be less than this value

Returns:

an identifier, no less than no_less_than
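A counter guarded by a lock is enough to sketch both methods. SketchSequentialIssuer is a hypothetical name, and the way no_less_than() advances the counter (jumping to the requested value when it is ahead of the next one) is an assumption made for this illustration.

```python
import threading


class SketchSequentialIssuer:
    """Hypothetical stand-in: issues strictly increasing ints under a lock."""

    def __init__(self, start=0):
        self.start = start            # next value to be issued
        self._lock = threading.Lock()

    def issue(self):
        with self._lock:
            value = self.start
            self.start += 1
            return value

    def no_less_than(self, no_less_than):
        with self._lock:
            # Assumption: skip ahead if the requested floor exceeds the next value.
            value = max(self.start, no_less_than)
            self.start = value + 1
            return value


issuer = SketchSequentialIssuer(start=5)
issued = [issuer.issue(), issuer.no_less_than(10), issuer.issue(), issuer.no_less_than(3)]
```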

CPManager

class satella.coding.resources.CPManager(max_number: int, max_cycle_no: int)

A thread-safe no-hassle connection-pool manager.

Extend this class to build your own connection pool managers.

This supports automatic connection recycling: a connection will be recycled after every max_cycle_no takings and deposits.

Note that you have to overload teardown_connection() and create_connection().

You obtain a connection by using acquire_connection(). If it fails you should mark it as such using fail_connection(). In all cases you have to return it using release_connection().

Parameters:
  • max_number – maximum number of connections

  • max_cycle_no – maximum number of get/put connection cycles.

Variables:

max_number – maximum amount of connections. Can be changed during runtime

Warning

May not work under PyPy for reasons having to do with id’s semantics. A RuntimeWarning will be issued when not running under CPython.

acquire_connection() T

Either acquire a new connection, or just establish it in the background

Returns:

a new connection

Raises:

RuntimeError – CPManager is terminating!

close() None

Check if the resource needs cleanup, and clean up this resource.

Use like this:

>>> class MyClose(Closeable):
>>>     def close(self):
>>>         if super().close():
>>>             .. clean up ..

Returns:

whether the cleanup should proceed

Raises:

RuntimeError – the constructor was not invoked

abstract create_connection() T

Create a new connection.

Is safe to block.

Returns:

a new connection instance

fail_connection(connection: T) None

Signal that a given connection has failed

Parameters:

connection – connection to fail

invalidate() None

Close all connections. Connections have to be released first. Object is ready for use after this

release_connection(connection: T) None

Release a connection

Parameters:

connection – connection to release

abstract teardown_connection(connection: T) None

Close the connection.

Is safe to block.

Parameters:

connection – connection to tear down

IDAllocator

class satella.coding.concurrent.IDAllocator(start_at: int = 0, top_limit: int | None = None)

Reusable ID allocator

You can use it to requisition ints from a pool, and then free those ints, permitting them to be reused.

Thread-safe.

Parameters:
  • start_at – the lowest integer that the allocator will return

  • top_limit – an exclusive upper bound: this value and anything above it will not be allocated. If used, calls to allocate_int() will raise Empty once no int below the limit is free

allocate_int() int

Return a previously unallocated int, and mark it as allocated

Returns:

an allocated int

Raises:

Empty – could not allocate an int due to top limit

mark_as_allocated(x: int)

Mark given x as allocated

Parameters:

x – x to mark as allocated

Raises:
  • AlreadyAllocated – x was already allocated

  • ValueError – x is less than start_at

mark_as_free(x: int)

Mark x as free

Parameters:

x – int to free

Raises:

ValueError – x was not allocated or less than start_at
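Allocation with reuse can be sketched with a set of live IDs and a min-heap of released ones. SketchIDAllocator is a hypothetical name; top_limit and mark_as_allocated() are left out, and handing back the lowest freed int first is an assumption of this sketch.

```python
import heapq
import threading


class SketchIDAllocator:
    """Hypothetical stand-in: hands out ints and reuses the ones that were freed."""

    def __init__(self, start_at=0):
        self.start_at = start_at
        self._next = start_at         # lowest int that was never issued
        self._freed = []              # min-heap of released ints
        self._allocated = set()
        self._lock = threading.Lock()

    def allocate_int(self):
        with self._lock:
            if self._freed:
                x = heapq.heappop(self._freed)   # reuse a released int first
            else:
                x = self._next
                self._next += 1
            self._allocated.add(x)
            return x

    def mark_as_free(self, x):
        with self._lock:
            if x not in self._allocated:
                raise ValueError('x was not allocated')
            self._allocated.remove(x)
            heapq.heappush(self._freed, x)


ids = SketchIDAllocator(start_at=10)
a, b, c = ids.allocate_int(), ids.allocate_int(), ids.allocate_int()
ids.mark_as_free(b)
reused = ids.allocate_int()
```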

Monitor

A monitor is a Java-like synchronization idea. Inheriting from Monitor outfits the class with a Lock (or a reentrant lock, if RMonitor is used), that can be used to coordinate access to some shared resource.

Take care to invoke Monitor’s constructor when inheriting, or this won’t work.

You can decorate your methods with Monitor.synchronized to have them execute with the lock acquired. If you have such a method, you can also temporarily release the lock using the context manager Monitor.release (it will be reacquired when the context manager is exited).

You can also use manual synchronization with context manager Monitor.acquire.

from satella.coding import Monitor


class MyProtectedClass(Monitor):
    def __init__(self, *args):
        super().__init__()  # runs Monitor.__init__; super(Monitor, self) would skip it

    @Monitor.synchronized
    def synchronized(self):
        pass # everything here is executed with class lock acquired

    @Monitor.synchronized
    def temporary_release(self):
        pass    # lock is acquired here
        with Monitor.release(self):
            pass    # lock is NOT ACQUIRED here
        pass # and here it's reacquired again

    def manual_sync(self):
        pass    # not synchronized
        with Monitor.acquire(self):
            pass # synchronized

You can also use Monitor.release and Monitor.acquire with other objects than self, but exercise caution and think over the consequences.
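The idea behind a synchronized decorator can be sketched with a per-instance threading.Lock. This is a hypothetical illustration (SketchMonitor and Counter are made-up names), not the library's implementation, and it omits the release and acquire context managers.

```python
import functools
import threading


class SketchMonitor:
    """Hypothetical stand-in: gives every instance its own lock."""

    def __init__(self):
        self._monitor_lock = threading.Lock()

    @staticmethod
    def synchronized(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            with self._monitor_lock:          # held for the whole call
                return fun(self, *args, **kwargs)
        return wrapper


class Counter(SketchMonitor):
    def __init__(self):
        super().__init__()                    # creates the lock
        self.value = 0

    @SketchMonitor.synchronized
    def increment(self):
        current = self.value                  # read-modify-write, safe under the lock
        self.value = current + 1


counter = Counter()


def bump_many():
    for _ in range(1000):
        counter.increment()


threads = [threading.Thread(target=bump_many) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```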

class satella.coding.concurrent.Monitor

Base utility class for creating monitors (the synchronization thingies!)

These are NOT re-entrant!

Use it like this:

>>> class MyProtectedObject(Monitor):
>>>     def __init__(self, *args, **kwargs):
>>>         Monitor.__init__(self)
>>>         ... do your job ..
>>>     @Monitor.synchronized
>>>     def function_that_needs_mutual_exclusion(self):
>>>         .. do your threadsafe jobs ..
>>>     def function_that_partially_needs_protection(self):
>>>         .. do your jobs ..
>>>         with Monitor.acquire(self):
>>>             .. do your threadsafe jobs ..
>>>         .. do your jobs ..
>>>         with self:
>>>             .. do your threadsafe jobs ..

class acquire(foo: Monitor)

Returns a context manager object that can lock another object, as long as that object is a monitor.

Consider foo, which is a monitor. If you needed to lock it from outside, you would do:

>>> with Monitor.acquire(foo):
>>>     .. do operations on foo that need mutual exclusion ..

class release(foo: Monitor)

Returns a context manager object that can release another object as long as that object is a monitor.

Consider foo, which is a monitor. You have a protected function, but you feel that you can release it for a while as it would improve parallelism. You can use it as such:

>>> @Monitor.synchronized
>>> def protected_function(self):
>>>     .. do some stuff that needs mutual exclusion ..
>>>     with Monitor.release(self):
>>>         .. do some I/O that does not need mutual exclusion ..
>>>     .. back to protected stuff ..

classmethod synchronize_on(monitor: Monitor) Callable[[Callable], Callable]

A decorator for locking on non-self Monitor objects

Use it like:

>>> class MasterClass(Monitor):
>>>     def get_object(self):
>>>         class SlaveClass:
>>>             @Monitor.synchronize_on(self)
>>>             def get_object(self2):
>>>                 ...
>>>         return SlaveClass

static synchronize_on_attribute(attr_name: str)

When a Monitor is an attribute of a class, and you have an instance method that you would like to secure by acquiring that monitor, use this.

The first argument taken by that instance method must be self.

Parameters:

attr_name – name of the attribute that is the monitor

static synchronized(fun: Callable) Callable

This is a decorator. A method decorated with it will acquire the lock of the given instance for the duration of the call, making it thread-safe. Depending on the usage pattern of your class and its data semantics, your performance may vary

class satella.coding.concurrent.RMonitor

Monitor, but using a reentrant lock instead of a normal one

Additionally, the following types are predefined for your convenience:

class satella.coding.concurrent.MonitorList(*args)

A list that is also a monitor.

Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.MonitorDict(*args, **kwargs)

A dict that is also a monitor.

Note that access to its properties is not automatically synchronized; you have to invoke the monitor to implement an opportunistic locking of your own choice

class satella.coding.concurrent.MonitorSet(*args)

A set that allows atomic insert-if-not-already-there operation

LockedStructure

A proxy to an object along with a lock, that can be triggered using the context manager:

a = {1:2, 3:4}
ls = LockedStructure(a)

assert len(ls) == 2
with ls:
    a[4] = 5

class satella.coding.concurrent.LockedStructure(obj_to_wrap: T, lock: allocate_lock | None = None)

A wizard to make every Python structure thread-safe.

It wraps obj_to_wrap, passing on all calls, settings and so on to the wrapped object; from the lock, only the context manager protocol is exposed.

Example:

>>> locked_dict = LockedStructure({})
>>> with locked_dict:
>>>     locked_dict[5] = 2

Also, please note that operations such as addition do not return a locked structure, i.e. they return the result of applying the operation to the wrapped object and the other operand.

Note that in-place operations return the locked structure.

AtomicNumber

class satella.coding.concurrent.AtomicNumber(v: int | float = 0)

An atomic number. Note that the class is deliberately not hashable, since its value might change in time. In this respect it is more like a container for numbers.

Treat it like a normal number, except all operations are executed atomically.

You can also wait for it to change its value, via wait().

You change its value in the following way:

>>> a = AtomicNumber()
>>> a += 2

Note that if the number is used in an expression, such as

>>> b = a + 2

Then a normal number will be returned

wait(timeout: float | None = None, throw_exception: bool = True)

Block until the atomic number changes its value.

Parameters:
  • timeout – maximum time to wait. None means wait indefinitely

  • throw_exception – whether to throw WouldWaitMore on timeout

Raises:

WouldWaitMore – the value hasn’t changed within the timeout

wait_until_equal(v: int | float, timeout: float | None = None) None

Wait until the value of this number equals v.

Parameters:
  • v – value to compare this number against

  • timeout – maximum time to wait

Raises:

WouldWaitMore – timeout expired without the value becoming equal to target
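In-place updates that wake up waiters can be sketched with a threading.Condition. SketchAtomicNumber is a hypothetical name covering only +=, + and wait_until_equal(); TimeoutError stands in for WouldWaitMore.

```python
import threading


class SketchAtomicNumber:
    """Hypothetical stand-in: a number whose updates notify waiting threads."""

    def __init__(self, v=0):
        self._value = v
        self._cond = threading.Condition()

    def __iadd__(self, other):
        with self._cond:
            self._value += other
            self._cond.notify_all()   # wake threads waiting for a change
        return self                   # in-place: the container itself is returned

    def __add__(self, other):
        with self._cond:
            return self._value + other   # a plain number, not a container

    def wait_until_equal(self, v, timeout=None):
        with self._cond:
            if not self._cond.wait_for(lambda: self._value == v, timeout):
                raise TimeoutError('value did not become equal in time')


number = SketchAtomicNumber()
seen = []


def waiter_body():
    number.wait_until_equal(2, timeout=5)
    seen.append('equal')


waiter = threading.Thread(target=waiter_body)
waiter.start()
number += 2
waiter.join()
plain = number + 2
```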

FutureCollection

class satella.coding.concurrent.FutureCollection(futures: Sequence[Future] = ())

A set of futures sharing a common result, or a common exception.

This overloads the operator + for making a union of futures. It can be used with either instances of FutureCollection or normal futures.

Also supports the indexing operator to get n-th future.

add(future: Future) FutureCollection

Add a future

Parameters:

future – a Future to add

Returns:

self

add_done_callback(callback, only_one: bool = False) None

Add a callback to a Future to be called on its completion.

By default, this will add the callback to all futures.

Parameters:
  • callback – callback that takes the completed Future as argument

  • only_one – callback will be added only to a single Future. False by default

Raises:

IndexError – only_one was given and no Futures in collection!

cancel() bool

Cancel all futures

Returns:

True if all futures were cancelled

exception(timeout: float | None = None) Exception | None

Return first exception raised by any of the futures

This will block until the results are available. This call proceeding does not mean that results for all are available, since this will return the first exception encountered!

Parameters:

timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary

Returns:

the first exception, or None if there were no exceptions

Raises:

WouldWaitMore – timeout while waiting for result

result(timeout: float | None = None) list

Return the result of all futures, as a list.

This will block until the results are available.

Parameters:

timeout – a timeout in seconds for a single result. Default value None means wait as long as necessary

Returns:

list containing results of all futures

Raises:

WouldWaitMore – timeout while waiting for result

set_exception(exc) None

Set an exception for all futures

Parameters:

exc – exception instance to set

set_result(result) None

Set a result for all futures

Parameters:

result – result to set

set_running_or_notify_cancel() bool

Call set_running_or_notify_cancel on the futures

This will return True if at least one future was not cancelled

Condition

A simplified version of threading.Condition. Doesn’t require you to acquire it in order to be notified. Just a bunch of syntactic sugar.

class satella.coding.concurrent.Condition(lock=None)

A wrapper to facilitate easier usage of Python’s threading.Condition.

There’s no need to acquire the underlying lock, as wait/notify/notify_all do it for you.

This happens to sorta not work on PyPy. Use at your own peril. You have been warned.

notify(n: int = 1) None

Notify n threads waiting on this Condition

Parameters:

n – number of threads to notify

notifyAll() None

Deprecated alias for notify_all

Deprecated since version 2.14.22.

notify_all() None

Notify all threads waiting on this Condition

wait(timeout: str | float | None = None, dont_raise: bool = False) None

Wait for condition to become true.

Parameters:
  • timeout – timeout to wait. None is default and means infinity. Can be also a time string.

  • dont_raise – if True, then WouldWaitMore won’t be raised

Raises:

WouldWaitMore – the timeout has expired and dont_raise was not set

Timer

class satella.coding.concurrent.Timer(interval: str | float, function, args=None, kwargs=None, spawn_separate=False)

A copy of threading.Timer but all objects are backed and waited upon in a single thread. They can be executed either in background monitor’s thread or a separate thread can be spawned for them.

There might be up to a second of delay before the timer is picked up.

If spawn_separate is False, exceptions will be logged.

Parameters:
  • interval – amount of seconds that should elapse between calling start() and function executing. Can be also a time string.

  • function – function to execute

  • args – arguments for function

  • kwargs – kwargs for function

  • spawn_separate – whether to call the function in a separate thread

cancel() None

Do not execute this timer

start() None

Order this timer task to be executed in interval seconds

Functions and decorators

parallel_execute

For executing those functions that return a Future in parallel.

parallel_execute returns an iterator that yields, for every call, its result (or the exception it raised).

satella.coding.concurrent.parallel_execute(callable_: Callable[[T], Future], args: Iterable[T], kwargs: Iterable[dict] = <generator object infinite_iterator>)

Execute a number of calls to callable in parallel.

Callable must be a function that accepts arguments and returns a plain Python future.

The return value is an iterator that yields the result of every future, or an exception instance if the corresponding call raised.

Parameters:
  • callable_ – a callable that returns futures

  • args – an iterable of arguments to provide to the callable

  • kwargs – an iterable of keyword arguments to provide to the callable

Returns:

an iterator yielding every value (or exception instance if thrown) of the future
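The start-everything-then-collect pattern can be sketched as a generator over plain futures. sketch_parallel_execute and reciprocal are hypothetical names; the kwargs iterable is omitted, and yielding in submission order is an assumption of this sketch.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def sketch_parallel_execute(callable_, args):
    # Start every call before waiting on any of them.
    futures = [callable_(arg) for arg in args]
    for future in futures:
        exc = future.exception()      # blocks until this future is done
        # Yield the exception instance instead of raising it.
        yield exc if exc is not None else future.result()


pool = ThreadPoolExecutor(max_workers=2)


def reciprocal(x) -> Future:
    return pool.submit(lambda: 1 / x)


outcomes = list(sketch_parallel_execute(reciprocal, [1, 0, 4]))
pool.shutdown()
```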

run_as_future

satella.coding.concurrent.run_as_future(fun)

A decorator that accepts a function that should be executed in a separate thread. The decorated function returns a Future instead of its result, which lets you watch the function for completion.

The created thread will be non-daemonic

Example usage:

>>> @run_as_future
>>> def parse_a_file(x: str):
>>>     ...
>>> fut = parse_a_file('test.txt')
>>> result = fut.result()
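A decorator of this shape can be sketched with threading.Thread and concurrent.futures.Future. sketch_run_as_future and count_words are hypothetical names; this illustrates the pattern and is not the library's code.

```python
import functools
import threading
from concurrent.futures import Future


def sketch_run_as_future(fun):
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        future = Future()

        def target():
            # Returns False if the future was cancelled before the thread ran.
            if not future.set_running_or_notify_cancel():
                return
            try:
                future.set_result(fun(*args, **kwargs))
            except Exception as exc:
                future.set_exception(exc)

        # Non-daemonic, so the interpreter waits for it at exit.
        threading.Thread(target=target, daemon=False).start()
        return future
    return wrapper


@sketch_run_as_future
def count_words(text: str) -> int:
    return len(text.split())


word_count = count_words('a non daemonic thread').result(timeout=5)
```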

sync_threadpool

satella.coding.concurrent.sync_threadpool(tpe: ExecutorWrapper | ThreadPoolExecutor, max_wait: float | None = None) None

Make sure that every thread of given thread pool executor is done processing jobs scheduled until this moment.

Make sure that other tasks do not submit anything to this thread pool executor.

Parameters:
  • tpe – thread pool executor to sync. Can be also an ExecutorWrapper.

  • max_wait – maximum time to wait. Default, None, means wait forever

Raises:

WouldWaitMore – timeout exceeded. Raised only when max_wait is not None.