# Source code for async_wrapper.convert._async

from __future__ import annotations

from functools import cached_property, partial, wraps
from typing import TYPE_CHECKING, Any, Generic

from anyio import to_thread
from typing_extensions import ParamSpec, TypeVar

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable, Coroutine

_T = TypeVar("_T", infer_variance=True)
_P = ParamSpec("_P")

__all__ = ["sync_to_async"]


class Async(Generic[_P, _T]):
    """Awaitable-call wrapper around a synchronous callable.

    Calling an instance hands the stored callable, with the given arguments,
    to ``anyio.to_thread.run_sync`` and awaits the result.
    """

    def __init__(self, func: Callable[_P, _T]) -> None:
        """Store the callable that will be executed on each call.

        Args:
            func: The synchronous callable to wrap.
        """
        self._func = func

    @cached_property
    def _wrapped(self) -> Callable[_P, Coroutine[Any, Any, _T]]:
        """Build (once per instance) the coroutine function used by ``__call__``.

        The returned coroutine function carries the metadata of the stored
        callable via ``functools.wraps``.
        """

        async def run_in_thread(*args: _P.args, **kwargs: _P.kwargs) -> _T:
            # Bind the arguments up front; run_sync is given a single
            # zero-argument callable.
            bound_call = partial(self._func, *args, **kwargs)
            return await to_thread.run_sync(bound_call)

        return wraps(self._func)(run_in_thread)

    async def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _T:
        """Run the stored callable with the given arguments and return its result."""
        pending = self._wrapped(*args, **kwargs)
        return await pending


def sync_to_async(func: Callable[_P, _T]) -> Callable[_P, Awaitable[_T]]:
    """
    Convert a synchronous function to an asynchronous function.

    If ``func`` is a ``Sync`` instance, the object stored on its ``_func``
    attribute is returned as-is instead of being wrapped again.

    Args:
        func: The synchronous function to be converted.

    Returns:
        An asynchronous function that behaves equivalently
        to the input synchronous function.

    Example:
        .. code-block:: python

            import time

            import anyio

            from async_wrapper import sync_to_async


            @sync_to_async
            def test(x: int) -> int:
                print(f"[{x}] test: start")
                time.sleep(1)
                print(f"[{x}] test: end")
                return x


            async def main() -> None:
                start = time.perf_counter()
                async with anyio.create_task_group() as task_group:
                    for i in range(4):
                        task_group.start_soon(test, i)
                end = time.perf_counter()
                assert end - start < 1.1


            if __name__ == "__main__":
                anyio.run(main)
    """
    # NOTE(review): imported at call time rather than at module top,
    # presumably to avoid a circular import with the _sync package; confirm.
    from async_wrapper.convert._sync.main import Sync

    if isinstance(func, Sync):
        # Unwrap rather than stacking an Async on top of a Sync wrapper.
        return func._func  # noqa: SLF001
    return Async(func)