async_wrapper.wait.py

class async_wrapper.wait.Waiter(func: collections.abc.Callable[_P, Awaitable[Any]], *args: _P.args, **kwargs: _P.kwargs) -> None [source]

Bases: Event

Wait wrapper: wraps a coroutine function as an Event that other tasks can wait on.

Example

import anyio

from async_wrapper import Waiter


async def test() -> None:
    print("test: start")
    await anyio.sleep(1)
    print("test: end")


async def test2(event: anyio.Event) -> None:
    print("test2: start")
    await event.wait()
    print("test2: end")


async def main() -> None:
    async with anyio.create_task_group() as task_group:
        event = Waiter(test)(task_group)
        task_group.start_soon(test2, event)


if __name__ == "__main__":
    anyio.run(main)
__call__(task_group: TaskGroup, *, name: Any = None) -> Self [source]

Start the wrapped function soon in the given task group.

copy(*args: Any, **kwargs: Any) -> Self [source]

Create a new Waiter event.

Returns:

A new Waiter.

set() -> None [source]

Set the flag, notifying all listeners.

is_set() -> bool [source]

Return True if the flag is set, False if not.

async wait() -> None [source]

Wait until the flag has been set.

If the flag has already been set when this method is called, it returns immediately.

statistics() -> EventStatistics [source]

Return statistics about the current state of this event.

class async_wrapper.wait.Completed(task_group: TaskGroup | None = None) -> None [source]

Bases: object

Yields the results of started tasks in the order they complete, like asyncio.as_completed().

Example

from __future__ import annotations

import anyio

from async_wrapper import Completed


async def test(
    x: int,
    sleep: float,
    result: list[int] | None = None,
) -> int:
    print(f"[{x}] test: start")
    await anyio.sleep(sleep)
    print(f"[{x}] test: end")
    if result is not None:
        result.append(x)
    return x


async def main() -> None:
    result: list[int] = []
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(test, 1, 1, result)
        async with Completed(task_group) as completed:
            completed.start_soon(None, test, 2, 0.2)
            completed.start_soon(None, test, 3, 0.1)
            completed.start_soon(None, test, 4, 0.3)

            result.extend([value async for value in completed])

    assert result == [3, 2, 4, 1]

    result = []
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(test, 1, 1, result)
        async with Completed() as completed:
            completed.start_soon(task_group, test, 2, 0.2)
            completed.start_soon(task_group, test, 3, 0.1)
            completed.start_soon(task_group, test, 4, 0.3)

            result.extend([value async for value in completed])

    assert result == [3, 2, 4, 1]


if __name__ == "__main__":
    anyio.run(main)
start_soon(task_group: TaskGroup | None, func: Callable[[...], Awaitable[Any]], *args: Any, name: Any = None) -> None [source]

Start a coroutine in a task group, similar to anyio.abc.TaskGroup.start_soon().

If a task group was already provided to Completed, the task_group argument should be either None or that same task group object.

Parameters:
task_group: TaskGroup | None

An anyio.abc.TaskGroup, or None to use the task group that was provided to Completed.

func: Callable[[...], Awaitable[Any]]

The target coroutine function.

*args : Any

The arguments to pass to the coroutine function.

name: Any = None

The name used in anyio.abc.TaskGroup.start_soon(). Defaults to None.

async async_wrapper.wait.wait_for(event: Event | Iterable[Event], func: Callable[[_P], Awaitable[_T]], *args: _P.args, **kwargs: _P.kwargs) -> _T [source]

Wait for an event before executing an awaitable function.

like asyncio.wait_for()

Parameters:
event: Event | Iterable[Event]

An anyio.Event or an iterable of events.

func: Callable[[_P], Awaitable[_T]]

An awaitable function to be executed.

*args : ParamSpecArgs

The arguments to pass to the awaitable function.

**kwargs : ParamSpecKwargs

The keyword arguments to pass to the awaitable function.

Returns:

The result of the executed function.

Example

import anyio

from async_wrapper import wait_for


async def test() -> None:
    print("test: start")
    await anyio.sleep(1)
    print("test: end")


async def test2(event: anyio.Event) -> None:
    print("test2: start")
    await event.wait()
    print("test2: end")


async def main() -> None:
    event = anyio.Event()
    async with anyio.create_task_group() as task_group:
        task_group.start_soon(wait_for, event, test)
        task_group.start_soon(test2, event)


if __name__ == "__main__":
    anyio.run(main)