async_wrapper.wait.py

class async_wrapper.wait.Waiter(func: Callable[ParamT, Awaitable[Any]], *args: ParamT.args, **kwargs: ParamT.kwargs)

Bases: Event

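Wait wrapper: wraps an awaitable function in an event that is set once the function has finished, so other tasks can wait for its completion.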

Example

>>> import anyio
>>>
>>> from async_wrapper import Waiter
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     async with anyio.create_task_group() as task_group:
>>>         event = Waiter(test)(task_group)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
$ poetry run python main.py
test: start
test2: start
test: end
test2: end
copy(*args: Any, **kwargs: Any) → Self

Create a new event.

Returns:

A new Waiter.

set() → Awaitable[Any]

Set the flag, notifying all listeners.

is_set() → bool

Return True if the flag is set, False if not.

async wait() → None

Wait until the flag has been set.

If the flag has already been set when this method is called, it returns immediately.

statistics() → EventStatistics

Return statistics about the current state of this event.

class async_wrapper.wait.Completed(task_group: TaskGroup | None = None)

Bases: object

Yields the results of the started tasks in the order in which they complete, like asyncio.as_completed().

Example

>>> from __future__ import annotations
>>>
>>> import anyio
>>>
>>> from async_wrapper import Completed
>>>
>>>
>>> async def test(
>>>     x: int,
>>>     sleep: float,
>>>     result: list[int] | None = None,
>>> ) -> int:
>>>     print(f"[{x}] test: start")
>>>     await anyio.sleep(sleep)
>>>     print(f"[{x}] test: end")
>>>     if result is not None:
>>>         result.append(x)
>>>     return x
>>>
>>>
>>> async def main() -> None:
>>>     result: list[int] = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed(task_group) as completed:
>>>             completed.start_soon(None, test, 2, 0.2)
>>>             completed.start_soon(None, test, 3, 0.1)
>>>             completed.start_soon(None, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>     result = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed() as completed:
>>>             completed.start_soon(task_group, test, 2, 0.2)
>>>             completed.start_soon(task_group, test, 3, 0.1)
>>>             completed.start_soon(task_group, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
start_soon(task_group: TaskGroup | None, func: Callable[..., Awaitable[Any]], *args: Any, name: Any = None) → None

Start a coroutine in a task group, similar to anyio.abc.TaskGroup.start_soon().

If a task group was already provided when creating the Completed object, the task_group parameter should be either None or that same object.

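Parameters:
  • task_group – The task group in which to start the task, or None to use the task group given to Completed.

  • func – An awaitable function to be executed.

  • *args – Positional arguments passed to func.

  • name – Name of the task, as in anyio.abc.TaskGroup.start_soon().
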
async async_wrapper.wait.wait_for(event: Event | Iterable[Event], func: Callable[ParamT, Awaitable[ValueT_co]], *args: ParamT.args, **kwargs: ParamT.kwargs) → ValueT_co

Execute an awaitable function and set the given event(s) once it has finished, so that tasks waiting on the event(s) can proceed (see the example below).

like asyncio.wait_for()

Parameters:
  • event – An anyio.Event or an iterable of events.

  • func – An awaitable function to be executed.

Returns:

The result of the executed function.

Example

>>> import anyio
>>>
>>> from async_wrapper import wait_for
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     event = anyio.Event()
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(wait_for, event, test)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
$ poetry run python main.py
test: start
test2: start
test: end
test2: end