async_wrapper.wait.py

class async_wrapper.wait.Waiter(func: Callable[ParamT, Awaitable[Any]], *args: ParamT.args, **kwargs: ParamT.kwargs)[source]

Bases: Event

wait wrapper

Example

>>> import anyio
>>>
>>> from async_wrapper import Waiter
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     async with anyio.create_task_group() as task_group:
>>>         event = Waiter(test)(task_group)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
$ poetry run python main.py
test: start
test2: start
test: end
test2: end
copy(*args: Any, **kwargs: Any) Self[source]

create new event

Returns:

new Waiter

set() None[source]

Set the flag, notifying all listeners.

is_set() bool[source]

Return True if the flag is set, False if not.

async wait() None[source]

Wait until the flag has been set.

If the flag has already been set when this method is called, it returns immediately.

statistics() EventStatistics[source]

Return statistics about the current state of this event.

class async_wrapper.wait.Completed(task_group: TaskGroup | None = None)[source]

Bases: object

like asyncio.as_completed()

Example

>>> from __future__ import annotations
>>>
>>> import anyio
>>>
>>> from async_wrapper import Completed
>>>
>>>
>>> async def test(
>>>     x: int,
>>>     sleep: float,
>>>     result: list[int] | None = None,
>>> ) -> int:
>>>     print(f"[{x}] test: start")
>>>     await anyio.sleep(sleep)
>>>     print(f"[{x}] test: end")
>>>     if result is not None:
>>>         result.append(x)
>>>     return x
>>>
>>>
>>> async def main() -> None:
>>>     result: list[int] = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed(task_group) as completed:
>>>             completed.start_soon(None, test, 2, 0.2)
>>>             completed.start_soon(None, test, 3, 0.1)
>>>             completed.start_soon(None, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>     result = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed() as completed:
>>>             completed.start_soon(task_group, test, 2, 0.2)
>>>             completed.start_soon(task_group, test, 3, 0.1)
>>>             completed.start_soon(task_group, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
start_soon(task_group: TaskGroup | None, func: Callable[..., Awaitable[Any]], *args: Any, name: Any = None) None[source]

Start a coroutine in a task group, similar to anyio.abc.TaskGroup.start_soon().

If a task group is already provided, the task_group parameter should be the same object.

Parameters:
async async_wrapper.wait.wait_for(event: Event | Iterable[Event], func: Callable[ParamT, Awaitable[ValueT]], *args: ParamT.args, **kwargs: ParamT.kwargs) ValueT[source]

Wait for an event before executing an awaitable function.

like asyncio.wait_for()

Parameters:
  • event – An anyio.Event or an iterable of events.

  • func – An awaitable function to be executed.

  • *args – The arguments to pass to the awaitable function.

  • **kwargs – The keyword arguments to pass to the awaitable function.

Returns:

The result of the executed function.

Example

>>> import anyio
>>>
>>> from async_wrapper import wait_for
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     event = anyio.Event()
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(wait_for, event, test)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
$ poetry run python main.py
test: start
test2: start
test: end
test2: end