Time

parse_time_string

Parse a time string into amount of seconds

satella.time.parse_time_string(s: int | float | str) float

Parse a time string into seconds, so eg. ‘30m’ will be equal to 1800, and so will be ‘30 min’.

This will correctly parse: - seconds - minutes - hours - days - weeks

Warning

This does not handle fractions of a second!

Parameters:

s – time string or time value in seconds

Returns:

value in seconds

measure

Sometimes you just need to measure how long does a routine call take.

class satella.time.measure(future_to_measure: ~concurrent.futures._base.Future | None = None, stop_on_stop: bool = True, adjust: float = 0.0, time_getter_callable: ~typing.Callable[[], float] = <built-in function monotonic>, create_stopped: bool = False, timeout: float | None = None)

A class used to measure time elapsed. Use for example like this:

>>> with measure() as measurement:
>>>     time.sleep(1)
>>>     print('This has taken so far', measurement(), 'seconds')
>>>     time.sleep(1)
>>> print('A total of ', measurement(), 'seconds have elapsed')

You can also use the .start() method instead of context manager. Time measurement will stop after exiting or calling .stop() depending on stop_on_stop flag.

Time elapsing starts after the counter is created. If you wish to start it manually, please specify create_stopped=True

When instantiated and called with no arguments, this will return the time elapsed:

>>> a = measure()
>>> time.sleep(0.5)
>>> assert a() >= 0.5

You can also decorate your functions to have them keep track time of their execution, like that:

>>> @measure()
>>> def measuring(measurement_object: measure, *args, **kwargs):
>>>     ...

This will also correctly work on methods, correctly inserting the measurement object after self/cls:

>>> class Test:
>>>     @measure()
>>>     def measuring(self, measurement_object: measure, arg1):
>>>         ...

You can also measure how long does executing a future take, eg.

>>> future = get_my_future()
>>> measurement = measure(future)
>>> future.result()
>>> print('Executing the future took', measurement(), 'seconds')

In case a future is passed, the measurement will stop automatically as soon as the future returns with a result (or exception).

Note that in order to reuse a single counter you must .reset() it first. Just calling .start() after .stop() will result in the timer being resumed instead!

Note that if you’re measuring generators, this will count the time passed from the measure() was called till the generator finishes.

Note that if you’re using the decorator form, this object will be first copied and then passed to the function/future. This is to prevent undefined behaviour during multithreading. Also, the timer that you pass to this function will be started/not started, depending on what you set earlier. .reset() will be called on a copy of this object.

This can also be used to write custom timeouts, eg.

>>> with measure(timeout=5) as m:
>>>     while not m.timeouted:
>>>         ... do something ...
>>>         if condition:
>>>             break
>>>     if m.timeouted:
>>>         raise WouldWaitMore('timeout hit')
Parameters:
  • stop_on_stop – stop elapsing time upon calling .stop()/exiting the context manager. If this is set to False then .start() and .stop() won’t work and calling them will raise a TypeError.

  • adjust – interval to add to current time upon initialization

  • time_getter_callable – callable/0 -> float to get the time with

  • create_stopped – if this is set to True, you will manually need to call .start()

  • timeout – a time limit, after exceeding which the property timeouted will be true

adjust(interval: float) None

Add given value to internal started_at counter

assert_not_timeouted() None

If the time elapsed exceeded timeout, throw WouldWaitMore.

Always returns if the timeout was not givne

get_time_elapsed() float
Returns:

currently elapsed time

has_exceeded(value: float) bool

Return whether the timer has exceeded provided value.

Deprecated since version 2.14.22.

raise_if_exceeded(value: float, exc_class: ~typing.Type[Exception] = <class 'satella.exceptions.WouldWaitMore'>)

Raise provided exception, with no arguments, if timer has clocked more than provided value.

If no exc_class is provided, WouldWaitMore will be raised by default.

Deprecated since version 2.14.22.

reset() None

Reset the counter, enabling it to start counting after a .stop() call. This will put the counter in a STOPPED mode if it’s running already.

reset_and_start() None

Syntactic sugar for calling reset() and then start()

reset_and_stop() None

Syntactic sugar for calling stop() and then reset()

New in version 2.23.2.

start() None

Start measuring time or resume measuring it

stop() None

Stop counting time

Raises:

TypeError – stop_on_stop is enabled or the counter has already been stopped

property stopped: bool

Whether this counter is already stopped

New in version 2.23.2.

property time_remaining: float
Returns:

the difference between provided timeout and elapsed time, or None if timeout was not given. This will never be negative.

property timeouted: bool
Returns:

Has the time elapsed exceeded timeout? Always False if timeout was not given

update() None

Alias for .start()

Note that you might specify other things to represent as time, such as “get the amount of free memory” value.

time_as_int

Syntactic sugar for int(time.time()).

satella.time.time_as_int() int

Syntactic sugar for

>>> from time import time
>>> int(time())

time_ms

Syntactic sugar for int(time.time()*1000)

satella.time.time_ms() int

Syntactic sugar for

>>> from time import time
>>> int(time()*1000)

This will try to use time.time_ns() if available

time_us

Syntactic sugar for int(time.time()*1000000)

satella.time.time_us() int

Syntactic sugar for

>>> from time import time
>>> int(time()*1000000)

This will try to use time.time_ns() if available

sleep

satella.time.sleep(y: str | float, abort_on_interrupt: bool = False) bool

Sleep for given interval.

This won’t be interrupted by KeyboardInterrupted, and always will sleep for given time interval. This will return at once if x is negative

Parameters:
  • y – the interval to wait in seconds, can be also a time string

  • abort_on_interrupt – whether to abort at once when KeyboardInterrupt is seen

Returns:

whether the function has completed its sleep naturally. False is seen on aborts thanks to KeyboardInterrupt only if abort_on_interrupt is True. False will be also seen on negative y.

ExponentialBackoff

class satella.time.ExponentialBackoff(start: float = 1, limit: float = 30, sleep_fun: ~typing.Callable[[float], None] = <built-in function sleep>, grace_amount: int = 0)

A class that will sleep increasingly longer on errors. Meant to be used in such a way:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> while not connect():
>>>     eb.failed()
>>>     eb.sleep()
>>> eb.success()

Also a structure that will mark an object (eg. the Internet access) as inaccessible for some duration. Usage is that case is like this:

>>> eb = ExponentialBackoff(start=2, limit=30)
>>> eb.failed()
>>> self.assertFalse(eb.available)
>>> time.sleep(2)
>>> self.assertTrue(eb.available)

Note that this structure is thread safe only when a single object is doing the success or failed calls, and other utilize wait_until_available().

Parameters:
  • start – value at which to start

  • limit – maximum sleep timeout

  • sleep_fun – function used to sleep. Will accept a single argument - number of seconds to wait

  • grace_amount – amount of fails() that this will survive before everything fails

property available: bool

Was the status of the last call success?

failed() None

Called when something fails.

launch(exceptions_on_failed: ~typing.Type[Exception] | ~typing.Tuple[~typing.Type[Exception]] = <class 'Exception'>, immediate: bool = False)

A decorator to simplify writing doing-something loops. Basically, this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> @eb.launch(TypeError)
>>> def do_action(*args, **kwargs):
>>>     x_do_action(*args, **kwargs)
>>> do_action(5, test=True)

is equivalent to this:

>>> eb = ExponentialBackoff(start=2.5, limit=30)
>>> while True:
>>>     try:
>>>         x_do_action(5, test=True)
>>>     except TypeError:
>>>         eb.failed()
>>>         eb.sleep()

The first example with immediate=True could skip the last call to do_action, as it will be executed automatically with zero parameters if immediate=True is set.

Parameters:
  • exceptions_on_failed – a list of a single exception of exceptions whose raising will signal that fun has failed

  • immediate – immediately execute the function, but return the wrapped function as a result of this decorator. The function will be called with zero arguments.

Returns:

a function, that called, will pass the exactly same parameters

property ready_for_next_check: bool
Returns:

Has failure() been called only in the last waiting period?

sleep() None

Called when sleep is expected.

success() None

Called when something successes.

time_until_next_check() float | None

Return the time until next health check, or None if the service is healthy

wait_until_available(timeout: float | None = None) None

Waits until the service is available

Parameters:

timeout – maximum amount of seconds to wait. If waited more than that, WouldWaitMore will be raised

Raises:

WouldWaitMore – waited for timeout and service still was not healthy