from __future__ import annotations
from collections import deque
from contextlib import AsyncExitStack, suppress
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Generic,
Protocol,
runtime_checkable,
)
import anyio
from typing_extensions import TypedDict, TypeVar, override
from async_wrapper.exception import AlreadyDisposedError
if TYPE_CHECKING:
from anyio.abc import CapacityLimiter, Lock, Semaphore
class Synchronization(TypedDict, total=False):
semaphore: Semaphore
lock: Lock
limiter: CapacityLimiter
__all__ = ["Disposable", "SimpleDisposable", "Pipe", "create_disposable"]
InputT = TypeVar("InputT", infer_variance=True)
OutputT = TypeVar("OutputT", infer_variance=True)
[docs]@runtime_checkable
class Disposable(Protocol[InputT, OutputT]):
"""
Defines the interface for a disposable resource.
Type Parameters:
InputT: The type of input data.
OutputT: The type of output data.
"""
[docs] async def next(self, value: InputT) -> OutputT:
"""
Processes the next input value and produces an output value.
Args:
value: The input value.
Returns:
The output value.
"""
... # pragma: no cover
[docs] async def dispose(self) -> Any:
"""Disposes the resource and releases any associated resources."""
[docs]class SimpleDisposable(Disposable[InputT, OutputT], Generic[InputT, OutputT]):
"""simple disposable impl."""
__slots__ = ("_func", "_dispose", "_is_disposed")
def __init__(
self, func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True
) -> None:
self._func = func
self._dispose = dispose
self._is_disposed = False
@property
def is_disposed(self) -> bool:
"""is disposed"""
return self._is_disposed
[docs] @override
async def next(self, value: InputT) -> OutputT:
if self._is_disposed:
raise AlreadyDisposedError("disposable already disposed")
return await self._func(value)
[docs] @override
async def dispose(self) -> Any:
self._is_disposed = True
[docs]class Pipe(Disposable[InputT, OutputT], Generic[InputT, OutputT]):
"""
Implements a pipe that can be used to communicate data between coroutines.
Type Parameters:
InputT: The type of input data.
OutputT: The type of output data.
Args:
listener: The function that will be called to process each input value.
context: An optional synchronization context to use.
dispose: An optional function that will be called to dispose the pipe.
"""
_context: Synchronization
_listener: Callable[[InputT], Awaitable[OutputT]]
_listeners: deque[tuple[Disposable[OutputT, Any], bool]]
_dispose: Callable[[], Awaitable[Any]] | None
_is_disposed: bool
_dispose_lock: Lock
__slots__ = (
"_context",
"_listener",
"_listeners",
"_dispose",
"_is_disposed",
"_dispose_lock",
)
def __init__(
self,
listener: Callable[[InputT], Awaitable[OutputT]],
context: Synchronization | None = None,
dispose: Callable[[], Awaitable[Any]] | None = None,
) -> None:
self._listener = listener
self._context = context or {}
self._listeners = deque()
self._dispose = dispose
self._is_disposed = False
self._dispose_lock = anyio.Lock()
@property
def is_disposed(self) -> bool:
"""is disposed"""
return self._is_disposed
[docs] @override
async def next(self, value: InputT) -> OutputT:
if self._is_disposed:
raise AlreadyDisposedError("pipe already disposed")
output = await self._listener(value)
async with anyio.create_task_group() as task_group:
for listener, _ in self._listeners:
task_group.start_soon(_call_next, self._context, listener, output)
return output
[docs] @override
async def dispose(self) -> None:
async with self._dispose_lock:
if self._is_disposed:
return
async with anyio.create_task_group() as task_group:
if self._dispose is not None:
task_group.start_soon(_call_dispose, self._context, self._dispose)
for listener, do_dispose in self._listeners:
if not do_dispose:
continue
task_group.start_soon(_call_dispose, self._context, listener)
self._is_disposed = True
[docs] def subscribe(
self,
listener: Disposable[OutputT, Any] | Callable[[OutputT], Awaitable[Any]],
*,
dispose: bool = True,
) -> None:
"""
Subscribes a listener to the pipe.
Args:
listener: The listener to subscribe.
dispose: Whether to dispose the listener when the pipe is disposed.
"""
if self._is_disposed:
raise AlreadyDisposedError("pipe already disposed")
if not isinstance(listener, Disposable):
listener = SimpleDisposable(listener)
self._listeners.append((listener, dispose))
[docs]def create_disposable(
func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True
) -> SimpleDisposable[InputT, OutputT]:
"""SimpleDisposable shortcut
Args:
func: awaitable function.
dispose: dispose flag. Defaults to True.
Returns:
SimpleDisposable object
"""
return SimpleDisposable(func, dispose=dispose)
async def _enter_context(stack: AsyncExitStack, context: Synchronization) -> None:
semaphore = context.get("semaphore")
if semaphore is not None:
await stack.enter_async_context(semaphore)
limiter = context.get("limiter")
if limiter is not None:
await stack.enter_async_context(limiter)
lock = context.get("lock")
if lock is not None:
await stack.enter_async_context(lock)
async def _call_next(
context: Synchronization, disposable: Disposable[InputT, Any], value: InputT
) -> None:
async with AsyncExitStack() as stack:
await _enter_context(stack, context)
with suppress(AlreadyDisposedError):
await disposable.next(value)
async def _call_dispose(
context: Synchronization,
disposable: Disposable[Any, Any] | Callable[[], Awaitable[Any]],
) -> None:
async with AsyncExitStack() as stack:
await _enter_context(stack, context)
if isinstance(disposable, Disposable):
await disposable.dispose()
else:
await disposable()