async_wrapper.task_group module

class async_wrapper.task_group.TaskGroupWrapper(task_group: TaskGroup) None[source]

Bases: TaskGroup

wrap anyio.abc.TaskGroup

Example

import anyio

from async_wrapper import TaskGroupWrapper


async def test(x: int) -> int:
    await anyio.sleep(0.1)
    return x


async def main() -> None:
    async with anyio.create_task_group() as task_group:
        async with TaskGroupWrapper(task_group) as tg:
            func = tg.wrap(test)
            soon_1 = func(1)
            soon_2 = func(2)

    assert soon_1.is_ready
    assert soon_2.is_ready
    assert soon_1.value == 1
    assert soon_2.value == 2


if __name__ == "__main__":
    anyio.run(main)
property cancel_scope : CancelScope
start_soon(func: Callable[[...], Awaitable[Any]], *args: Any, name: Any = None) None[source]

Start a new task in this task group.

Parameters:
func: Callable[[...], Awaitable[Any]]

a coroutine function

*args: Any

positional arguments to call the function with

name: Any = None

name of the task, for the purposes of introspection and debugging

Added in version 3.0.

wrap(func: Callable[[_P], Awaitable[_T]], semaphore: Semaphore | None = None, limiter: CapacityLimiter | None = None, lock: Lock | None = None) SoonWrapper[_P, _T][source]

Wrap a function to be used within a wrapper.

The wrapped function will return a value shortly.

Parameters:
func: Callable[[_P], Awaitable[_T]]

The target function to be wrapped.

semaphore: Semaphore | None = None

An anyio.abc.Semaphore. Defaults to None.

limiter: CapacityLimiter | None = None

An anyio.abc.CapacityLimiter. Defaults to None.

lock: Lock | None = None

An anyio.abc.Lock. Defaults to None.

Returns:

The wrapped function.

class async_wrapper.task_group.SoonValue None[source]

Bases: Generic[_T]

A class representing a value that will be available soon.

property value : _T

Gets the soon-to-be available value.

Raises:

PendingError – Raised if the value is not yet available.

Returns:

The soon-to-be available value.

property is_ready : bool

Checks if the value is ready.

Returns:

True if the value is not pending, False otherwise.

async_wrapper.task_group.create_task_group_wrapper() TaskGroupWrapper[source]

create new task group wrapper

Returns:

new TaskGroupWrapper