async_wrapper.task_group module

class async_wrapper.task_group.TaskGroupWrapper(task_group: TaskGroup)

Bases: TaskGroup

wrap anyio.abc.TaskGroup

Example

>>> import anyio
>>>
>>> from async_wrapper import TaskGroupWrapper
>>>
>>>
>>> async def test(x: int) -> int:
>>>     await anyio.sleep(0.1)
>>>     return x
>>>
>>>
>>> async def main() -> None:
>>>     async with anyio.create_task_group() as task_group:
>>>         async with TaskGroupWrapper(task_group) as tg:
>>>             func = tg.wrap(test)
>>>             soon_1 = func(1)
>>>             soon_2 = func(2)
>>>
>>>     assert soon_1.is_ready
>>>     assert soon_2.is_ready
>>>     assert soon_1.value == 1
>>>     assert soon_2.value == 2
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
property cancel_scope: CancelScope
start_soon(func: Callable[[...], Awaitable[Any]], *args: Any, name: Any = None) None

Start a new task in this task group.

Parameters:
  • func – a coroutine function

  • args – positional arguments to call the function with

  • name – name of the task, for the purposes of introspection and debugging

New in version 3.0.

wrap(func: Callable[ParamT, Awaitable[ValueT_co]], semaphore: Semaphore | None = None, limiter: CapacityLimiter | None = None, lock: Lock | None = None) SoonWrapper[ParamT, ValueT_co]

Wrap a function to be used within a wrapper.

The wrapped function will return a value shortly.

Parameters:
Returns:

The wrapped function.

async_wrapper.task_group.create_task_group_wrapper() TaskGroupWrapper

create new task group wrapper

Returns:

new TaskGroupWrapper