async_wrapper.wait.py
- class async_wrapper.wait.Waiter(func: Callable[ParamT, Awaitable[Any]], *args: ParamT.args, **kwargs: ParamT.kwargs) [source]
Bases: Event
Wait wrapper.
Example
>>> import anyio
>>>
>>> from async_wrapper import Waiter
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     async with anyio.create_task_group() as task_group:
>>>         event = Waiter(test)(task_group)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)

$ poetry run python main.py
test: start
test2: start
test: end
test2: end
- class async_wrapper.wait.Completed(task_group: TaskGroup | None = None)[source]
Bases: object
Like asyncio.as_completed().
Example
>>> from __future__ import annotations
>>>
>>> import anyio
>>>
>>> from async_wrapper import Completed
>>>
>>>
>>> async def test(
>>>     x: int,
>>>     sleep: float,
>>>     result: list[int] | None = None,
>>> ) -> int:
>>>     print(f"[{x}] test: start")
>>>     await anyio.sleep(sleep)
>>>     print(f"[{x}] test: end")
>>>     if result is not None:
>>>         result.append(x)
>>>     return x
>>>
>>>
>>> async def main() -> None:
>>>     result: list[int] = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed(task_group) as completed:
>>>             completed.start_soon(None, test, 2, 0.2)
>>>             completed.start_soon(None, test, 3, 0.1)
>>>             completed.start_soon(None, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>     result = []
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(test, 1, 1, result)
>>>         async with Completed() as completed:
>>>             completed.start_soon(task_group, test, 2, 0.2)
>>>             completed.start_soon(task_group, test, 3, 0.1)
>>>             completed.start_soon(task_group, test, 4, 0.3)
>>>
>>>             result.extend([value async for value in completed])
>>>
>>>     assert result == [3, 2, 4, 1]
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)
- start_soon(task_group: TaskGroup | None, func: Callable[..., Awaitable[Any]], *args: Any, name: Any = None) -> None [source]
Start a coroutine in a task group, similar to anyio.abc.TaskGroup.start_soon().
If a task group is already provided, the task_group parameter should be the same object.
- Parameters:
task_group – An anyio.abc.TaskGroup. Defaults to None.
func – The target coroutine function.
name – The name used in anyio.abc.TaskGroup.start_soon(). Defaults to None.
- async async_wrapper.wait.wait_for(event: anyio.Event | Iterable[anyio.Event], func: Callable[ParamT, Awaitable[ValueT_co]], *args: ParamT.args, **kwargs: ParamT.kwargs) -> ValueT_co [source]
Wait for an event before executing an awaitable function.
Like asyncio.wait_for().
- Parameters:
event – An anyio.Event or an iterable of events.
func – An awaitable function to be executed.
- Returns:
The result of the executed function.
Example
>>> import anyio
>>>
>>> from async_wrapper import wait_for
>>>
>>>
>>> async def test() -> None:
>>>     print("test: start")
>>>     await anyio.sleep(1)
>>>     print("test: end")
>>>
>>>
>>> async def test2(event: anyio.Event) -> None:
>>>     print("test2: start")
>>>     await event.wait()
>>>     print("test2: end")
>>>
>>>
>>> async def main() -> None:
>>>     event = anyio.Event()
>>>     async with anyio.create_task_group() as task_group:
>>>         task_group.start_soon(wait_for, event, test)
>>>         task_group.start_soon(test2, event)
>>>
>>>
>>> if __name__ == "__main__":
>>>     anyio.run(main)

$ poetry run python main.py
test: start
test2: start
test: end
test2: end