Skip to content

Pipe

pipe

Disposable

Bases: Protocol[_T, _T2]

Defines the interface for a disposable resource.

is_disposed property

is_disposed: bool

Check if disposed

next async

next(value: _T) -> _T2

Processes the next input value and produces an output value.

Parameters:

Name Type Description Default
value
_T

The input value.

required

Returns:

Type Description
_T2

The output value.

Source code in src/async_wrapper/pipe.py
async def next(self, value: _T) -> _T2:
    """
    Consume one input value and produce the corresponding output value.

    This is a protocol stub; implementers supply the actual processing.

    Args:
        value: The input value to process.

    Returns:
        The output value produced for ``value``.
    """
    ...  # pragma: no cover

dispose async

dispose() -> Any

Disposes the resource and releases any associated resources.

Source code in src/async_wrapper/pipe.py
async def dispose(self) -> Any:
    """Release this resource together with anything associated with it.

    This is a protocol stub; the return value is left to the implementer.
    """

DisposableWithCallback

Bases: Disposable[_T, _T2], Protocol[_T, _T2]

A disposable that can also prepare a callback to use when it is disposed.

is_disposed property

is_disposed: bool

Check if disposed

prepare_callback

prepare_callback(
    subscribable: Subscribable[_T, _T2],
) -> Any

Prepare a callback to use when dispose is executed.

Parameters:

Name Type Description Default
subscribable
Subscribable[_T, _T2]

subscribable object

required
Source code in src/async_wrapper/pipe.py
def prepare_callback(self, subscribable: Subscribable[_T, _T2]) -> Any:
    """Register what is needed to call back into ``subscribable`` on dispose.

    This is a protocol stub; implementers decide how the callback is stored.

    Args:
        subscribable: The subscribable object this disposable is attached to.
    """

next async

next(value: _T) -> _T2

Processes the next input value and produces an output value.

Parameters:

Name Type Description Default
value
_T

The input value.

required

Returns:

Type Description
_T2

The output value.

Source code in src/async_wrapper/pipe.py
async def next(self, value: _T) -> _T2:
    """
    Consume one input value and produce the corresponding output value.

    This is a protocol stub; implementers supply the actual processing.

    Args:
        value: The input value to process.

    Returns:
        The output value produced for ``value``.
    """
    ...  # pragma: no cover

dispose async

dispose() -> Any

Disposes the resource and releases any associated resources.

Source code in src/async_wrapper/pipe.py
async def dispose(self) -> Any:
    """Release this resource together with anything associated with it.

    This is a protocol stub; the return value is left to the implementer.
    """

Subscribable

Bases: Disposable[_T, _T2], Protocol[_T, _T2]

A disposable to which other disposables can be subscribed.

size property

size: int

The number of listeners.

is_disposed property

is_disposed: bool

Check if disposed

subscribe

subscribe(
    disposable: (
        Disposable[_T2, Any]
        | Callable[[_T2], Awaitable[Any]]
    ),
    *,
    dispose: bool = True
) -> Any

Subscribes a disposable

Parameters:

Name Type Description Default
disposable
Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]]

The disposable to subscribe.

required
dispose
bool

Whether to dispose the disposable when the pipe is disposed.

True
Source code in src/async_wrapper/pipe.py
def subscribe(
    self,
    disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]],
    *,
    dispose: bool = True,
) -> Any:
    """
    Attach a disposable (or a plain async callable) to this object.

    This is a protocol stub; implementers decide how subscribers are stored.

    Args:
        disposable: The disposable, or awaitable-returning callable, to attach.
        dispose: Whether the subscriber should also be disposed when the
            pipe is disposed. Keyword-only; defaults to ``True``.
    """

unsubscribe

unsubscribe(disposable: Disposable[Any, Any]) -> None

Unsubscribes a disposable

Parameters:

Name Type Description Default
disposable
Disposable[Any, Any]

The disposable to unsubscribe.

required
Source code in src/async_wrapper/pipe.py
def unsubscribe(self, disposable: Disposable[Any, Any]) -> None:
    """
    Detach a previously subscribed disposable.

    This is a protocol stub; implementers decide how subscribers are removed.

    Args:
        disposable: The disposable to detach.
    """

next async

next(value: _T) -> _T2

Processes the next input value and produces an output value.

Parameters:

Name Type Description Default
value
_T

The input value.

required

Returns:

Type Description
_T2

The output value.

Source code in src/async_wrapper/pipe.py
async def next(self, value: _T) -> _T2:
    """
    Consume one input value and produce the corresponding output value.

    This is a protocol stub; implementers supply the actual processing.

    Args:
        value: The input value to process.

    Returns:
        The output value produced for ``value``.
    """
    ...  # pragma: no cover

dispose async

dispose() -> Any

Disposes the resource and releases any associated resources.

Source code in src/async_wrapper/pipe.py
async def dispose(self) -> Any:
    """Release this resource together with anything associated with it.

    This is a protocol stub; the return value is left to the implementer.
    """

SimpleDisposable

SimpleDisposable(func: Callable[[_T], Awaitable[_T2]])

Bases: DisposableWithCallback[_T, _T2], Generic[_T, _T2]

A simple disposable implementation.

Source code in src/async_wrapper/pipe.py
def __init__(self, func: Callable[[_T], Awaitable[_T2]]) -> None:
    """Wrap ``func`` so it can be used as a disposable.

    Args:
        func: The awaitable-returning callable invoked by ``next``.
    """
    # Locks: one for the async dispose path, one for the sync
    # prepare_callback path.
    self._thread_lock = threading.Lock()
    self._async_lock = anyio.Lock()
    # Subscribables this disposable has been attached to, oldest first.
    self._journals = deque()
    self._is_disposed = False
    self._func = func

is_disposed property

is_disposed: bool

Check if disposed

next async

next(value: _T) -> _T2

Processes the next input value and produces an output value.

Parameters:

Name Type Description Default
value
_T

The input value.

required

Returns:

Type Description
_T2

The output value.

Source code in src/async_wrapper/pipe.py
@override
async def next(self, value: _T) -> _T2:
    """Pass ``value`` to the wrapped callable and return its awaited result.

    Args:
        value: The input value.

    Returns:
        Whatever the wrapped callable produces for ``value``.

    Raises:
        AlreadyDisposedError: If this disposable has already been disposed.
    """
    if not self._is_disposed:
        return await self._func(value)
    raise AlreadyDisposedError("disposable already disposed")

dispose async

dispose() -> Any

Disposes the resource and releases any associated resources.

Source code in src/async_wrapper/pipe.py
@override
async def dispose(self) -> Any:
    """Detach from every recorded subscribable, then mark as disposed."""
    async with self._async_lock:
        pending = self._journals
        # Drain from the left so subscribables are released in the order
        # they were recorded.
        while pending:
            pending.popleft().unsubscribe(self)
    self._is_disposed = True

prepare_callback

prepare_callback(
    subscribable: Subscribable[_T, _T2],
) -> Any

Prepare a callback to use when dispose is executed.

Parameters:

Name Type Description Default
subscribable
Subscribable[_T, _T2]

subscribable object

required
Source code in src/async_wrapper/pipe.py
@override
def prepare_callback(self, subscribable: Subscribable[_T, _T2]) -> Any:
    """Record ``subscribable`` so ``dispose`` can unsubscribe from it later.

    Args:
        subscribable: The subscribable this disposable was attached to.

    Raises:
        AlreadyDisposedError: If this disposable has already been disposed.
    """
    if self._is_disposed:
        raise AlreadyDisposedError("disposable already disposed")

    journals = self._journals
    with self._thread_lock:
        journals.append(subscribable)

Pipe

Pipe(
    listener: Callable[[_T], Awaitable[_T2]],
    context: Synchronization | None = None,
    dispose: Callable[[], Awaitable[Any]] | None = None,
)

Bases: Subscribable[_T, _T2], Generic[_T, _T2]

Implements a pipe that can be used to communicate data between coroutines.

Parameters:

Name Type Description Default

listener

Callable[[_T], Awaitable[_T2]]

The function that will be called to process each input value.

required

context

Synchronization | None

An optional synchronization context to use.

None

dispose

Callable[[], Awaitable[Any]] | None

An optional function that will be called to dispose the pipe.

None
Source code in src/async_wrapper/pipe.py
def __init__(
    self,
    listener: Callable[[_T], Awaitable[_T2]],
    context: Synchronization | None = None,
    dispose: Callable[[], Awaitable[Any]] | None = None,
) -> None:
    """Create a pipe around ``listener``.

    Args:
        listener: The function called to process each input value.
        context: An optional synchronization context; an empty dict is
            used when none (or a falsy one) is given.
        dispose: An optional function called when the pipe is disposed.
    """
    self._listener = listener
    self._dispose = dispose
    self._context = context if context else {}
    # Maps each subscribed disposable to its "dispose with the pipe" flag.
    self._listeners = {}
    self._is_disposed = False
    self._dispose_lock = anyio.Lock()

is_disposed property

is_disposed: bool

Check if disposed

size property

size: int

The number of listeners.

next async

next(value: _T) -> _T2

Processes the next input value and produces an output value.

Parameters:

Name Type Description Default
value
_T

The input value.

required

Returns:

Type Description
_T2

The output value.

Source code in src/async_wrapper/pipe.py
@override
async def next(self, value: _T) -> _T2:
    """Process ``value`` and forward the output to every subscriber.

    Args:
        value: The input value.

    Returns:
        The output of the pipe's own listener for ``value``.

    Raises:
        AlreadyDisposedError: If the pipe has already been disposed.
    """
    if self._is_disposed:
        raise AlreadyDisposedError("pipe already disposed")

    result = await self._listener(value)

    # The task group block does not exit until every scheduled call is done.
    async with anyio.create_task_group() as tg:
        context = self._context
        for subscriber in self._listeners:
            tg.start_soon(_call_next, context, subscriber, result)

    return result

dispose async

dispose() -> None

Disposes the resource and releases any associated resources.

Source code in src/async_wrapper/pipe.py
@override
async def dispose(self) -> None:
    """Dispose the pipe; a no-op if it is already disposed.

    Schedules the optional dispose function and the disposal of every
    subscriber that was subscribed with ``dispose=True``, then marks the
    pipe as disposed once the task group has finished.
    """
    async with self._dispose_lock:
        if not self._is_disposed:
            async with anyio.create_task_group() as tg:
                custom_dispose = self._dispose
                if custom_dispose is not None:
                    tg.start_soon(_call_dispose, self._context, custom_dispose)

                for subscriber, should_dispose in self._listeners.items():
                    if should_dispose:
                        tg.start_soon(_call_dispose, self._context, subscriber)

            self._is_disposed = True

subscribe

subscribe(
    disposable: (
        Disposable[_T2, Any]
        | Callable[[_T2], Awaitable[Any]]
    ),
    *,
    dispose: bool = True
) -> None

Subscribes a disposable

Parameters:

Name Type Description Default
disposable
Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]]

The disposable to subscribe.

required
dispose
bool

Whether to dispose the disposable when the pipe is disposed.

True
Source code in src/async_wrapper/pipe.py
@override
def subscribe(
    self,
    disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]],
    *,
    dispose: bool = True,
) -> None:
    """Subscribe a disposable (or a plain async callable) to this pipe.

    A callable that is not a ``Disposable`` is wrapped in a
    ``SimpleDisposable`` first.

    Args:
        disposable: The disposable, or awaitable-returning callable, to
            subscribe.
        dispose: Whether to dispose the disposable when the pipe is disposed.

    Raises:
        AlreadyDisposedError: If the pipe has already been disposed, or if
            the disposable's ``prepare_callback`` rejects the subscription.
    """
    if self._is_disposed:
        raise AlreadyDisposedError("pipe already disposed")

    if not isinstance(disposable, Disposable):
        disposable = SimpleDisposable(disposable)

    # Run the callback hook before registering: if it raises (for example
    # because the disposable is already disposed), the pipe must not be left
    # holding a listener that can never be used.
    if isinstance(disposable, DisposableWithCallback):
        disposable.prepare_callback(self)
    self._listeners[disposable] = dispose

unsubscribe

unsubscribe(disposable: Disposable[Any, Any]) -> None

Unsubscribes a disposable

Parameters:

Name Type Description Default
disposable
Disposable[Any, Any]

The disposable to unsubscribe.

required
Source code in src/async_wrapper/pipe.py
@override
def unsubscribe(self, disposable: Disposable[Any, Any]) -> None:
    """Remove ``disposable`` from this pipe's listeners.

    Args:
        disposable: The disposable to unsubscribe.
    """
    listeners = self._listeners
    # The default makes removing an unknown disposable a silent no-op.
    listeners.pop(disposable, None)

create_disposable

create_disposable(
    func: Callable[[_T], Awaitable[_T2]],
) -> SimpleDisposable[_T, _T2]

Shortcut for creating a SimpleDisposable.

Parameters:

Name Type Description Default

func

Callable[[_T], Awaitable[_T2]]

awaitable function.

required

Returns:

Type Description
SimpleDisposable[_T, _T2]

SimpleDisposable object

Source code in src/async_wrapper/pipe.py
def create_disposable(
    func: Callable[[_T], Awaitable[_T2]],
) -> SimpleDisposable[_T, _T2]:
    """Wrap an awaitable function in a ``SimpleDisposable``.

    Args:
        func: The awaitable function to wrap.

    Returns:
        A new ``SimpleDisposable`` built from ``func``.
    """
    disposable = SimpleDisposable(func)
    return disposable