async_wrapper.wait.py¶
- class async_wrapper.wait.Waiter(func: collections.abc.Callable[_P, Awaitable[Any]], *args: _P.args, **kwargs: _P.kwargs) None[source]¶
Bases: Event
wait wrapper
Example
    import anyio

    from async_wrapper import Waiter


    async def test() -> None:
        print("test: start")
        await anyio.sleep(1)
        print("test: end")


    async def test2(event: anyio.Event) -> None:
        print("test2: start")
        await event.wait()
        print("test2: end")


    async def main() -> None:
        async with anyio.create_task_group() as task_group:
            event = Waiter(test)(task_group)
            task_group.start_soon(test2, event)


    if __name__ == "__main__":
        anyio.run(main)
-
class async_wrapper.wait.Completed(task_group: TaskGroup | None = None) None[source]¶
Bases: object
Example
    from __future__ import annotations

    import anyio

    from async_wrapper import Completed


    async def test(
        x: int,
        sleep: float,
        result: list[int] | None = None,
    ) -> int:
        print(f"[{x}] test: start")
        await anyio.sleep(sleep)
        print(f"[{x}] test: end")
        if result is not None:
            result.append(x)
        return x


    async def main() -> None:
        result: list[int] = []
        async with anyio.create_task_group() as task_group:
            task_group.start_soon(test, 1, 1, result)
            async with Completed(task_group) as completed:
                completed.start_soon(None, test, 2, 0.2)
                completed.start_soon(None, test, 3, 0.1)
                completed.start_soon(None, test, 4, 0.3)

                result.extend([value async for value in completed])

        assert result == [3, 2, 4, 1]

        result = []
        async with anyio.create_task_group() as task_group:
            task_group.start_soon(test, 1, 1, result)
            async with Completed() as completed:
                completed.start_soon(task_group, test, 2, 0.2)
                completed.start_soon(task_group, test, 3, 0.1)
                completed.start_soon(task_group, test, 4, 0.3)

                result.extend([value async for value in completed])

        assert result == [3, 2, 4, 1]


    if __name__ == "__main__":
        anyio.run(main)
- async async_wrapper.wait.wait_for(event: Event | Iterable[Event], func: Callable[_P, Awaitable[_T]], *args: _P.args, **kwargs: _P.kwargs) _T[source]¶
Wait for an event before executing an awaitable function.
Like asyncio.wait_for().
Example
    import anyio

    from async_wrapper import wait_for


    async def test() -> None:
        print("test: start")
        await anyio.sleep(1)
        print("test: end")


    async def test2(event: anyio.Event) -> None:
        print("test2: start")
        await event.wait()
        print("test2: end")


    async def main() -> None:
        event = anyio.Event()
        async with anyio.create_task_group() as task_group:
            task_group.start_soon(wait_for, event, test)
            task_group.start_soon(test2, event)


    if __name__ == "__main__":
        anyio.run(main)