async_wrapper.task_group module

class async_wrapper.task_group.TaskGroupWrapper(task_group: TaskGroup)

Bases: TaskGroup

Wraps an anyio.abc.TaskGroup.

Example

>>> import anyio
>>> from async_wrapper import TaskGroupWrapper
>>>
>>> async def test(x: int) -> int:
...     await anyio.sleep(0.1)
...     return x
...
>>> async def main() -> None:
...     async with anyio.create_task_group() as task_group:
...         async with TaskGroupWrapper(task_group) as tg:
...             func = tg.wrap(test)
...             soon_1 = func(1)
...             soon_2 = func(2)
...
...     assert soon_1.is_ready
...     assert soon_2.is_ready
...     assert soon_1.value == 1
...     assert soon_2.value == 2
...
>>> if __name__ == "__main__":
...     anyio.run(main)

property cancel_scope: CancelScope
start_soon(func: Callable[[...], Awaitable[Any]], *args: Any, name: Any = None) -> None

Start a new task in this task group.

Parameters:
  • func – a coroutine function

  • args – positional arguments to call the function with

  • name – name of the task, for the purposes of introspection and debugging

New in version 3.0.

wrap(func: Callable[ParamT, Awaitable[ValueT]], semaphore: Semaphore | None = None, limiter: CapacityLimiter | None = None, lock: Lock | None = None) -> SoonWrapper[ParamT, ValueT]

Wrap a function so that calling it schedules the call in this task group.

The wrapped function returns immediately with a SoonValue; the result becomes available once the task has finished.

Parameters:
  • func – The target function to be wrapped.

  • semaphore – An anyio.abc.Semaphore. Defaults to None.

  • limiter – An anyio.abc.CapacityLimiter. Defaults to None.

  • lock – An anyio.abc.Lock. Defaults to None.

Returns:

The wrapped function.

class async_wrapper.task_group.SoonValue

Bases: Generic[ValueT]

A class representing a value that will be available soon.

property value: ValueT

Gets the soon-to-be available value.

Raises:

PendingError – Raised if the value is not yet available.

Returns:

The soon-to-be available value.

property is_ready: bool

Checks if the value is ready.

Returns:

True if the value is not pending, False otherwise.

async_wrapper.task_group.create_task_group_wrapper() -> TaskGroupWrapper

Create a new task group wrapper.

Returns:

A new TaskGroupWrapper.