async_wrapper.pipe.py

class async_wrapper.pipe.Disposable(*args, **kwargs)

Bases: Protocol[_T, _T2]

Defines the interface for a disposable resource.

property is_disposed : bool

Whether the resource has been disposed.

async next(value: _T) → _T2

Processes the next input value and produces an output value.

Parameters:
value: _T

The input value.

Returns:

The output value.

async dispose() → Any

Disposes the resource and releases any associated resources.
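
The three members above can be satisfied structurally by any class. The following is a minimal sketch, assuming nothing beyond what is listed here: `DisposableLike` is a local stand-in protocol (it is not imported from `async_wrapper`), `Doubler` is a hypothetical implementation, and raising `RuntimeError` after disposal is an assumption rather than documented behaviour.

```python
import asyncio
from typing import Any, Protocol, TypeVar, runtime_checkable

T_contra = TypeVar("T_contra", contravariant=True)
T_co = TypeVar("T_co", covariant=True)


@runtime_checkable
class DisposableLike(Protocol[T_contra, T_co]):
    """Local stand-in mirroring the members documented for Disposable."""

    @property
    def is_disposed(self) -> bool: ...

    async def next(self, value: T_contra) -> T_co: ...

    async def dispose(self) -> Any: ...


class Doubler:
    """Hypothetical implementation: doubles each integer it receives."""

    def __init__(self) -> None:
        self._disposed = False

    @property
    def is_disposed(self) -> bool:
        return self._disposed

    async def next(self, value: int) -> int:
        if self._disposed:
            # Assumption: using a disposed resource is treated as an error.
            raise RuntimeError("already disposed")
        return value * 2

    async def dispose(self) -> None:
        self._disposed = True


async def main() -> tuple[int, bool, bool]:
    doubler = Doubler()
    # Structural check: Doubler never inherits from DisposableLike.
    conforms = isinstance(doubler, DisposableLike)
    result = await doubler.next(21)
    await doubler.dispose()
    return result, conforms, doubler.is_disposed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the interface is a protocol, conformance depends only on the members a class provides, not on its base classes.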

class async_wrapper.pipe.DisposableWithCallback(*args, **kwargs)

Bases: Disposable[_T, _T2], Protocol[_T, _T2]

A disposable that can also prepare a callback to run when it is disposed.

prepare_callback(subscribable: Subscribable[_T, _T2]) → Any

Prepares a callback to run when dispose() is executed.

Parameters:
subscribable: Subscribable[_T, _T2]

The subscribable object.
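
The reference does not say what the prepared callback does. One plausible use, sketched below purely as an assumption, is for a disposable to detach itself from the subscribable when it is disposed. `ToySubscribable` and `SelfRemovingDisposable` are hypothetical names, and the choice to call `prepare_callback` from inside `subscribe` is also an assumption; none of this is taken from the library's source.

```python
import asyncio
from typing import Any, Callable


class ToySubscribable:
    """Toy stand-in that only tracks its subscribers."""

    def __init__(self) -> None:
        self.subscribers: list[Any] = []

    def subscribe(self, disposable: "SelfRemovingDisposable") -> None:
        self.subscribers.append(disposable)
        # Assumption: the subscribable lets the subscriber register
        # its dispose-time callback at subscription time.
        disposable.prepare_callback(self)

    def unsubscribe(self, disposable: Any) -> None:
        if disposable in self.subscribers:
            self.subscribers.remove(disposable)


class SelfRemovingDisposable:
    """Hypothetical disposable that unsubscribes itself on dispose."""

    def __init__(self) -> None:
        self.is_disposed = False
        self._callbacks: list[Callable[[], Any]] = []

    async def next(self, value: Any) -> Any:
        return value

    def prepare_callback(self, subscribable: ToySubscribable) -> None:
        # Assumption: the callback removes this object from the subscribable.
        self._callbacks.append(lambda: subscribable.unsubscribe(self))

    async def dispose(self) -> None:
        # Run every prepared callback once, then mark as disposed.
        for callback in self._callbacks:
            callback()
        self._callbacks.clear()
        self.is_disposed = True


async def main() -> tuple[int, int, bool]:
    hub = ToySubscribable()
    item = SelfRemovingDisposable()
    hub.subscribe(item)
    before = len(hub.subscribers)
    await item.dispose()
    after = len(hub.subscribers)
    return before, after, item.is_disposed


if __name__ == "__main__":
    print(asyncio.run(main()))
```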

class async_wrapper.pipe.Subscribable(*args, **kwargs)

Bases: Disposable[_T, _T2], Protocol[_T, _T2]

A disposable that also accepts subscribers.

property size : int

The number of subscribed listeners.

subscribe(disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]], *, dispose: bool = True) → Any

Subscribes a disposable.

Parameters:
disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]]

The disposable to subscribe.

dispose: bool = True

Whether to dispose the disposable when the pipe is disposed.

unsubscribe(disposable: Disposable[Any, Any]) → None

Unsubscribes a disposable.

Parameters:
disposable: Disposable[Any, Any]

The disposable to unsubscribe.
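
To illustrate how the `dispose` flag and `unsubscribe` interact, here is a minimal sketch using toy stand-ins. `ToyHub` and `Recorder` are hypothetical names, not part of `async_wrapper`, and the bookkeeping shown (a dict mapping each subscriber to its flag) is an assumption made only for the example.

```python
import asyncio
from typing import Any


class Recorder:
    """Hypothetical subscriber that records values and its own disposal."""

    def __init__(self) -> None:
        self.seen: list[Any] = []
        self.is_disposed = False

    async def next(self, value: Any) -> None:
        self.seen.append(value)

    async def dispose(self) -> None:
        self.is_disposed = True


class ToyHub:
    """Toy stand-in for a subscribable; not the library's implementation."""

    def __init__(self) -> None:
        # Maps each subscriber to the dispose flag it was registered with.
        self._subscribers: dict[Recorder, bool] = {}
        self.is_disposed = False

    @property
    def size(self) -> int:
        return len(self._subscribers)

    def subscribe(self, disposable: Recorder, *, dispose: bool = True) -> None:
        self._subscribers[disposable] = dispose

    def unsubscribe(self, disposable: Recorder) -> None:
        self._subscribers.pop(disposable, None)

    async def dispose(self) -> None:
        # Only subscribers registered with dispose=True are disposed.
        for subscriber, should_dispose in self._subscribers.items():
            if should_dispose:
                await subscriber.dispose()
        self._subscribers.clear()
        self.is_disposed = True


async def main() -> tuple[int, bool, bool, bool]:
    hub = ToyHub()
    owned, shared, removed = Recorder(), Recorder(), Recorder()
    hub.subscribe(owned)                  # disposed together with the hub
    hub.subscribe(shared, dispose=False)  # survives the hub's disposal
    hub.subscribe(removed)
    hub.unsubscribe(removed)              # no longer tracked by the hub
    size_before = hub.size
    await hub.dispose()
    return size_before, owned.is_disposed, shared.is_disposed, removed.is_disposed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing `dispose=False` is useful when a subscriber is shared with other owners and should outlive the object it is subscribed to.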

class async_wrapper.pipe.SimpleDisposable(func: Callable[[_T], Awaitable[_T2]]) → None

Bases: DisposableWithCallback[_T, _T2], Generic[_T, _T2]

A simple implementation of DisposableWithCallback.

property is_disposed : bool

Whether the resource has been disposed.

async next(value: _T) → _T2

Processes the next input value and produces an output value.

Parameters:
value: _T

The input value.

Returns:

The output value.

async dispose() → Any

Disposes the resource and releases any associated resources.

prepare_callback(subscribable: Subscribable[_T, _T2]) → Any

Prepares a callback to run when dispose() is executed.

Parameters:
subscribable: Subscribable[_T, _T2]

The subscribable object.

class async_wrapper.pipe.Pipe(listener: Callable[[_T], Awaitable[_T2]], context: Synchronization | None = None, dispose: Callable[[], Awaitable[Any]] | None = None) → None

Bases: Subscribable[_T, _T2], Generic[_T, _T2]

Implements a pipe that can be used to communicate data between coroutines.

Parameters:
listener: Callable[[_T], Awaitable[_T2]]

The function that will be called to process each input value.

context: Synchronization | None = None

An optional synchronization context to use.

dispose: Callable[[], Awaitable[Any]] | None = None

An optional function that will be called to dispose the pipe.

property is_disposed : bool

Whether the resource has been disposed.

property size : int

The number of subscribed listeners.

async next(value: _T) → _T2

Processes the next input value and produces an output value.

Parameters:
value: _T

The input value.

Returns:

The output value.

async dispose() → None

Disposes the resource and releases any associated resources.

subscribe(disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]], *, dispose: bool = True) → None

Subscribes a disposable.

Parameters:
disposable: Disposable[_T2, Any] | Callable[[_T2], Awaitable[Any]]

The disposable to subscribe.

dispose: bool = True

Whether to dispose the disposable when the pipe is disposed.

unsubscribe(disposable: Disposable[Any, Any]) → None

Unsubscribes a disposable.

Parameters:
disposable: Disposable[Any, Any]

The disposable to unsubscribe.
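
The type signatures suggest the data flow: `listener` turns a `_T` into a `_T2`, and subscribers accept that `_T2`. The sketch below models this with a toy class, under stated assumptions. `ToyPipe` is hypothetical and is not the library's `Pipe`: it omits the `context` parameter, accepts only plain async callables as subscribers, and forwards to them sequentially, all of which are simplifications chosen for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class ToyPipe:
    """Toy model of the documented data flow; not the library's Pipe."""

    def __init__(
        self,
        listener: Callable[[Any], Awaitable[Any]],
        dispose: Optional[Callable[[], Awaitable[Any]]] = None,
    ) -> None:
        self._listener = listener
        self._on_dispose = dispose
        self._subscribers: list[Callable[[Any], Awaitable[Any]]] = []
        self.is_disposed = False

    @property
    def size(self) -> int:
        return len(self._subscribers)

    def subscribe(self, func: Callable[[Any], Awaitable[Any]]) -> None:
        self._subscribers.append(func)

    async def next(self, value: Any) -> Any:
        # Process the input, then hand the output to every subscriber.
        output = await self._listener(value)
        for subscriber in self._subscribers:
            await subscriber(output)
        return output

    async def dispose(self) -> None:
        # Run the optional dispose function passed to the constructor.
        if self._on_dispose is not None:
            await self._on_dispose()
        self.is_disposed = True


async def main() -> tuple[int, list[Any]]:
    log: list[Any] = []

    async def to_int(text: str) -> int:
        return int(text)

    async def square(number: int) -> int:
        return number * number

    async def collect(number: int) -> None:
        log.append(number)

    async def on_dispose() -> None:
        log.append("disposed")

    second = ToyPipe(square)
    second.subscribe(collect)
    first = ToyPipe(to_int, dispose=on_dispose)
    # Chain the pipes: the output of `first` becomes the input of `second`.
    first.subscribe(second.next)

    result = await first.next("7")
    await first.dispose()
    return result, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that `next` returns the listener's own output (here `7`), while the squared value only reaches the downstream subscriber.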

async_wrapper.pipe.create_disposable(func: Callable[[_T], Awaitable[_T2]]) → SimpleDisposable[_T, _T2]

Shortcut for creating a SimpleDisposable.

Parameters:
func: Callable[[_T], Awaitable[_T2]]

A function that returns an awaitable.

Returns:

A SimpleDisposable object.