Skip to content

TaskGroup

task_group

TaskGroupWrapper

TaskGroupWrapper(task_group: TaskGroup)

Bases: TaskGroup

wrap anyio.abc.TaskGroup

Examples:

import anyio

from async_wrapper import TaskGroupWrapper


async def test(x: int) -> int:
    await anyio.sleep(0.1)
    return x


async def main() -> None:
    async with anyio.create_task_group() as task_group:
        async with TaskGroupWrapper(task_group) as tg:
            func = tg.wrap(test)
            soon_1 = func(1)
            soon_2 = func(2)

    assert soon_1.is_ready
    assert soon_2.is_ready
    assert soon_1.value == 1
    assert soon_2.value == 2


if __name__ == "__main__":
    anyio.run(main)
Source code in src/async_wrapper/task_group/task_group.py
def __init__(self, task_group: _TaskGroup) -> None:
    self._task_group = task_group
    self._active_self = False

wrap

wrap(
    func: Callable[_P, Awaitable[_T]],
    semaphore: Semaphore | None = None,
    limiter: CapacityLimiter | None = None,
    lock: Lock | None = None,
) -> SoonWrapper[_P, _T]

Wrap a function to be used within a wrapper.

The wrapped function will return a value shortly.

Parameters:

Name Type Description Default
func
Callable[_P, Awaitable[_T]]

The target function to be wrapped.

required
semaphore
Semaphore | None

An anyio.Semaphore. Defaults to None.

None
limiter
CapacityLimiter | None

An anyio.CapacityLimiter. Defaults to None.

None
lock
Lock | None

An anyio.Lock. Defaults to None.

None

Returns:

Type Description
SoonWrapper[_P, _T]

The wrapped function.

Source code in src/async_wrapper/task_group/task_group.py
def wrap(
    self,
    func: Callable[_P, Awaitable[_T]],
    semaphore: Semaphore | None = None,
    limiter: CapacityLimiter | None = None,
    lock: Lock | None = None,
) -> SoonWrapper[_P, _T]:
    """
    Wrap a function to be used within a wrapper.

    The wrapped function will return a value shortly.

    Args:
        func: The target function to be wrapped.
        semaphore: An [`anyio.Semaphore`][]. Defaults to None.
        limiter: An [`anyio.CapacityLimiter`][]. Defaults to None.
        lock: An [`anyio.Lock`][]. Defaults to None.

    Returns:
        The wrapped function.
    """
    return SoonWrapper(func, self, semaphore=semaphore, limiter=limiter, lock=lock)

SoonValue

SoonValue()

Bases: Generic[_T]

A class representing a value that will be available soon.

Source code in src/async_wrapper/task_group/value.py
def __init__(self) -> None:
    self._value: _T | local = Pending

value property

value: _T

Gets the soon-to-be available value.

Raises:

Type Description
PendingError

Raised if the value is not yet available.

Returns:

Type Description
_T

The soon-to-be available value.

is_ready property

is_ready: bool

Checks if the value is ready.

Returns:

Type Description
bool

True if the value is not pending, False otherwise.

create_task_group_wrapper

create_task_group_wrapper() -> TaskGroupWrapper

create new task group wrapper

Returns:

Type Description
TaskGroupWrapper
Source code in src/async_wrapper/task_group/task_group.py
def create_task_group_wrapper() -> TaskGroupWrapper:
    """
    create new task group wrapper

    Returns:
        new [`TaskGroupWrapper`][async_wrapper.task_group.task_group.TaskGroupWrapper]
    """
    return TaskGroupWrapper(_create_task_group())