Source code for async_wrapper.convert._sync

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor, wait
from contextvars import ContextVar
from functools import partial, wraps
from typing import Awaitable, Callable

import anyio
from sniffio import AsyncLibraryNotFoundError, current_async_library
from typing_extensions import ParamSpec, TypeVar

ValueT = TypeVar("ValueT", infer_variance=True)
ParamT = ParamSpec("ParamT")

__all__ = ["async_to_sync"]

current_async_lib_var = ContextVar("current_async_lib", default="asyncio")
use_uvloop_var = ContextVar("use_uvloop", default=False)


[docs]def async_to_sync( func: Callable[ParamT, Awaitable[ValueT]], ) -> Callable[ParamT, ValueT]: """ Convert an awaitable function to a synchronous function. If used within an asynchronous context, attempts to use the same backend. Defaults to asyncio. Args: func: An awaitable function. Returns: A synchronous function. Example: >>> import asyncio >>> import time >>> >>> import anyio >>> import sniffio >>> >>> from async_wrapper import async_to_sync >>> >>> >>> @async_to_sync >>> async def test(x: int) -> int: >>> backend = sniffio.current_async_library() >>> if backend == "asyncio": >>> loop = asyncio.get_running_loop() >>> print(backend, loop) >>> else: >>> print(backend) >>> await anyio.sleep(1) >>> return x >>> >>> >>> def main() -> None: >>> start = time.perf_counter() >>> result = test(1) >>> end = time.perf_counter() >>> assert result == 1 >>> assert end - start < 1.1 >>> >>> >>> async def async_main() -> None: >>> start = time.perf_counter() >>> result = test(1) >>> end = time.perf_counter() >>> assert result == 1 >>> assert end - start < 1.1 >>> >>> >>> if __name__ == "__main__": >>> main() >>> anyio.run( >>> async_main, >>> backend="asyncio", >>> backend_options={"use_uvloop": True}, >>> ) >>> anyio.run( >>> async_main, >>> backend="asyncio", >>> backend_options={"use_uvloop": True}, >>> ) >>> anyio.run(async_main, backend="trio") $ poetry run python main.py asyncio <_UnixSelectorEventLoop running=True closed=False debug=False> asyncio <_UnixSelectorEventLoop running=True closed=False debug=False> asyncio <uvloop.Loop running=True closed=False debug=False> trio """ sync_func = _as_sync(func) @wraps(func) def inner(*args: ParamT.args, **kwargs: ParamT.kwargs) -> ValueT: backend = _get_current_backend() use_uvloop = _check_uvloop() with ThreadPoolExecutor( 1, initializer=_init, initargs=(backend, use_uvloop) ) as pool: future = pool.submit(sync_func, *args, **kwargs) wait([future]) return future.result() return inner
def _as_sync(func: Callable[ParamT, Awaitable[ValueT]]) -> Callable[ParamT, ValueT]: @wraps(func) def inner(*args: ParamT.args, **kwargs: ParamT.kwargs) -> ValueT: return _run(func, *args, **kwargs) return inner def _run( func: Callable[ParamT, Awaitable[ValueT]], *args: ParamT.args, **kwargs: ParamT.kwargs, ) -> ValueT: backend = _get_current_backend() new_func = partial(func, *args, **kwargs) backend_options = {} if backend == "asyncio": backend_options["use_uvloop"] = _check_uvloop() return anyio.run(new_func, backend=backend, backend_options=backend_options) def _check_uvloop() -> bool: if use_uvloop_var.get(): return True try: import uvloop # type: ignore except ImportError: return False import asyncio policy = asyncio.get_event_loop_policy() return isinstance(policy, uvloop.EventLoopPolicy) def _get_current_backend() -> str: try: return current_async_library() except AsyncLibraryNotFoundError: return current_async_lib_var.get() def _init(backend: str, use_uvloop: bool) -> None: # noqa: FBT001 current_async_lib_var.set(backend) use_uvloop_var.set(use_uvloop)