Skip to content

Wait

wait

Waiter

Waiter(
    func: Callable[_P, Awaitable[Any]],
    *args: args,
    **kwargs: kwargs
)

Bases: Event

Event wrapper that runs a coroutine function in a task group and is set once that function finishes.

Examples:

import anyio

from async_wrapper import Waiter


async def test() -> None:
    """Print a start marker, sleep for one second, then print an end marker."""
    print("test: start")
    await anyio.sleep(1)
    print("test: end")


async def test2(event: anyio.Event) -> None:
    """Print a start marker, block until ``event`` is set, then print the end."""
    print("test2: start")
    await event.wait()
    print("test2: end")


async def main() -> None:
    """Run ``test`` through a ``Waiter`` and make ``test2`` wait on that waiter."""
    async with anyio.create_task_group() as task_group:
        # Calling the Waiter with a task group schedules ``test`` in that group
        # and returns the Waiter itself, which is then passed on as the event.
        event = Waiter(test)(task_group)
        task_group.start_soon(test2, event)


if __name__ == "__main__":
    anyio.run(main)
Source code in src/async_wrapper/wait.py
def __init__(
    self,
    func: Callable[_P, Awaitable[Any]],
    *args: _P.args,
    **kwargs: _P.kwargs,
) -> None:
    """
    Store the function and the arguments it will later be called with.

    Nothing is started here; the values are only kept on the instance.

    Args:
        func: The awaitable-returning function to wrap.
        *args: Positional arguments to pass to `func`.
        **kwargs: Keyword arguments to pass to `func`.
    """
    self._func, self._args, self._kwargs = func, args, kwargs

__call__

__call__(
    task_group: TaskGroup, *, name: Any = None
) -> Self

Schedule the wrapped function to start soon in the given task group and return this waiter.

Source code in src/async_wrapper/wait.py
def __call__(self, task_group: TaskGroup, *, name: Any = None) -> Self:
    """
    Start the wrapped function soon in `task_group`.

    The function is not called directly: it is bound to the stored arguments
    and handed to `wait_for` together with this waiter.

    Args:
        task_group: The task group whose `start_soon` is used.
        name: Forwarded as the `name` argument of `start_soon`.
            Defaults to None.

    Returns:
        This same waiter.
    """
    bound_func = partial(self._func, *self._args, **self._kwargs)
    task_group.start_soon(wait_for, self, bound_func, name=name)
    return self

copy

copy(*args: Any, **kwargs: Any) -> Self

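Create a new Waiter for the same function; positional or keyword arguments that are not supplied are reused from this waiter.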

Returns:

Type Description
Self

new Waiter

Source code in src/async_wrapper/wait.py
def copy(self, *args: Any, **kwargs: Any) -> Self:
    """
    Build another waiter around the same function.

    Args:
        *args: Positional arguments for the new waiter. When none are given,
            the positional arguments stored on this waiter are reused.
        **kwargs: Keyword arguments for the new waiter. When none are given,
            a shallow copy of the keyword arguments stored on this waiter
            is used.

    Returns:
        new [`Waiter`][async_wrapper.wait.Waiter]
    """
    # Positional and keyword arguments fall back independently of each other.
    next_args = args or tuple(self._args)
    next_kwargs = kwargs or self._kwargs.copy()
    return Waiter(self._func, *next_args, **next_kwargs)  # type: ignore

Completed

Completed(task_group: TaskGroup | None = None)

like asyncio.as_completed

Examples:

from __future__ import annotations

import anyio

from async_wrapper import Completed


async def test(
    x: int,
    sleep: float,
    result: list[int] | None = None,
) -> int:
    """Sleep for ``sleep`` seconds, optionally record ``x`` in ``result``, return ``x``."""
    print(f"[{x}] test: start")
    await anyio.sleep(sleep)
    print(f"[{x}] test: end")
    # Only the task that was handed a list records itself; the others just
    # return their value.
    if result is not None:
        result.append(x)
    return x


async def main() -> None:
    """Show both ways of giving ``Completed`` a task group."""
    # Variant 1: the task group is passed to Completed, so start_soon gets None.
    result: list[int] = []
    async with anyio.create_task_group() as task_group:
        # Task 1 is started directly on the task group and appends to
        # ``result`` itself after sleeping for one second.
        task_group.start_soon(test, 1, 1, result)
        async with Completed(task_group) as completed:
            # Tasks 2-4 get no result list; their return values are collected
            # by iterating ``completed`` below.
            completed.start_soon(None, test, 2, 0.2)
            completed.start_soon(None, test, 3, 0.1)
            completed.start_soon(None, test, 4, 0.3)

            result.extend([value async for value in completed])

    # Expected: tasks 3, 2, 4 ordered by their sleep times (0.1, 0.2, 0.3),
    # followed by the 1 appended by the one-second task.
    assert result == [3, 2, 4, 1]

    # Variant 2: Completed is created without a task group, so the task group
    # is passed to every start_soon call instead.
    result = []
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(test, 1, 1, result)
        async with Completed() as completed:
            completed.start_soon(task_group, test, 2, 0.2)
            completed.start_soon(task_group, test, 3, 0.1)
            completed.start_soon(task_group, test, 4, 0.3)

            result.extend([value async for value in completed])

    # Same expected order as in the first variant.
    assert result == [3, 2, 4, 1]


if __name__ == "__main__":
    anyio.run(main)
Source code in src/async_wrapper/wait.py
def __init__(self, task_group: TaskGroup | None = None) -> None:
    """
    Initialise the instance state.

    Args:
        task_group: Optional task group to remember on the instance.
            Defaults to None.
    """
    # Kept exactly as given (possibly None).
    self.__task_group: TaskGroup | None = task_group
    # Both stream ends start out unset.
    # NOTE(review): presumably created when the instance is entered - confirm.
    self.__getter: MemoryObjectReceiveStream[Waiter] | None = None
    self.__setter: MemoryObjectSendStream[Waiter] | None = None
    # Maps each waiter to the receive stream stored alongside it.
    self._events: dict[Waiter, MemoryObjectReceiveStream[Any]] = {}

start_soon

start_soon(
    task_group: TaskGroup | None,
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    name: Any = None
) -> None

Start a coroutine in a task group, similar to anyio.abc.TaskGroup.start_soon.

If a task group is already provided, the task_group parameter should be the same object.

Parameters:

Name Type Description Default
task_group
TaskGroup | None

An anyio.abc.TaskGroup, or None when the task group was already given to Completed. This parameter has no default and must always be passed.

required
func
Callable[..., Awaitable[Any]]

The target coroutine function.

required
*args
Any

The arguments to pass to the coroutine function.

()
name
Any

The name used in anyio.abc.TaskGroup.start_soon. Defaults to None.

None
Source code in src/async_wrapper/wait.py
def start_soon(
    self,
    task_group: TaskGroup | None,
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    name: Any = None,
) -> None:
    """
    Start a coroutine in a task group.

    Similar to [`anyio.abc.TaskGroup.start_soon`][].

    If a task group is already provided,
    the task_group parameter should be the same object.

    Args:
        task_group: An [`anyio.abc.TaskGroup`][] or None.
            Must always be passed; it has no default.
        func: The target coroutine function.
        *args: The arguments to pass to the coroutine function.
        name: The name used in [`anyio.abc.TaskGroup.start_soon`][].
            Defaults to None.

    Raises:
        PendingError: If this instance is not active.
    """
    if not self._is_active:
        raise PendingError("enter first")

    resolved_group = self._task_group(task_group)
    new_waiter, receiver = _create_waiter(func, *args)
    # Calling the waiter schedules it in the resolved task group.
    new_waiter(resolved_group, name=name)
    # Keep the receive stream under its waiter for later lookup.
    self._events[new_waiter] = receiver

wait_for async

wait_for(
    event: Event | Iterable[Event],
    func: Callable[_P, Awaitable[_T]],
    *args: args,
    **kwargs: kwargs
) -> _T

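Execute an awaitable function and set the given event(s) once it finishes, whether it returns or raises.

Unlike asyncio.wait_for, no timeout is applied; completion is signalled through the event(s) instead.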

Parameters:

Name Type Description Default

event

Event | Iterable[Event]

An anyio.Event or an iterable of events; each one is set when func finishes.

required

func

Callable[_P, Awaitable[_T]]

An awaitable function to be executed.

required

*args

args

The arguments to pass to the awaitable function.

()

**kwargs

kwargs

The keyword arguments to pass to the awaitable function.

{}

Returns:

Type Description
_T

The result of the executed function.

Examples:

import anyio

from async_wrapper import wait_for


async def test() -> None:
    """Print a start marker, sleep for one second, then print an end marker."""
    print("test: start")
    await anyio.sleep(1)
    print("test: end")


async def test2(event: anyio.Event) -> None:
    """Print a start marker, block until ``event`` is set, then print the end."""
    print("test2: start")
    await event.wait()
    print("test2: end")


async def main() -> None:
    """Run ``test`` via ``wait_for`` while ``test2`` waits on the shared event."""
    event = anyio.Event()
    async with anyio.create_task_group() as task_group:
        # ``wait_for`` receives the event and ``test`` as its arguments;
        # ``test2`` is started with the same event and waits on it.
        task_group.start_soon(wait_for, event, test)
        task_group.start_soon(test2, event)


if __name__ == "__main__":
    anyio.run(main)
Source code in src/async_wrapper/wait.py
async def wait_for(
    event: Event | Iterable[Event],
    func: Callable[_P, Awaitable[_T]],
    *args: _P.args,
    **kwargs: _P.kwargs,
) -> _T:
    """
    Execute an awaitable function and set the given event(s) when it is done.

    The function is awaited first. Afterwards every event is set, both when
    the function returns normally and when it raises. No timeout is applied,
    unlike [`asyncio.wait_for`][].

    Args:
        event: An [`anyio.Event`][] or an iterable of events
            to set once `func` has finished.
        func: An awaitable function to be executed.
        *args: The arguments to pass to the awaitable function.
        **kwargs: The keyword arguments to pass to the awaitable function.

    Returns:
        The result of the executed function.

    Examples:
        ```python
        import anyio

        from async_wrapper import wait_for


        async def test() -> None:
            print("test: start")
            await anyio.sleep(1)
            print("test: end")


        async def test2(event: anyio.Event) -> None:
            print("test2: start")
            await event.wait()
            print("test2: end")


        async def main() -> None:
            event = anyio.Event()
            async with anyio.create_task_group() as task_group:
                task_group.start_soon(wait_for, event, test)
                task_group.start_soon(test2, event)


        if __name__ == "__main__":
            anyio.run(main)
        ```
    """
    # A single event is wrapped in a tuple; an iterable is collected into a
    # set, so an event that appears more than once is only set once.
    targets = (event,) if isinstance(event, Event) else set(event)
    try:
        return await func(*args, **kwargs)
    finally:
        # Runs on success and on failure alike, so waiters are always released.
        for target in targets:
            target.set()