async_wrapper.pipe.py

class async_wrapper.pipe.Disposable(*args, **kwargs)[source]

Bases: Protocol[InputT, OutputT]

Defines the interface for a disposable resource.

property is_disposed: bool

Check if disposed

async next(value: InputT) OutputT[source]

Processes the next input value and produces an output value.

Parameters:

value – The input value.

Returns:

The output value.

async dispose() Any[source]

Disposes the resource and releases any associated resources.

class async_wrapper.pipe.DisposableWithCallback(*args, **kwargs)[source]

Bases: Disposable[InputT, OutputT], Protocol[InputT, OutputT]

disposable & callback

prepare_callback(subscribable: Subscribable) Any[source]

Prepare a callback to use when dispose is executed.

Parameters:

subscribable – subscribable object

class async_wrapper.pipe.Subscribable(*args, **kwargs)[source]

Bases: Disposable[InputT, OutputT], Protocol[InputT, OutputT]

subscribable & disposable

property size: int

listener size

subscribe(disposable: Disposable[OutputT, Any] | Callable[[OutputT], Awaitable[Any]], *, dispose: bool = True) Any[source]

Subscribes a disposable

Parameters:
  • disposable – The disposable to subscribe.

  • dispose – Whether to dispose the disposable when the pipe is disposed.

unsubscribe(disposable: Disposable[Any, Any]) None[source]

Unsubscribes a disposable

Parameters:

disposable – The disposable to unsubscribe.

class async_wrapper.pipe.SimpleDisposable(func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True)[source]

Bases: DisposableWithCallback[InputT, OutputT], Generic[InputT, OutputT]

simple disposable impl.

property is_disposed: bool

Check if disposed

async next(value: InputT) OutputT[source]

Processes the next input value and produces an output value.

Parameters:

value – The input value.

Returns:

The output value.

async dispose() Any[source]

Disposes the resource and releases any associated resources.

prepare_callback(subscribable: Subscribable) Any[source]

Prepare a callback to use when dispose is executed.

Parameters:

subscribable – subscribable object

class async_wrapper.pipe.Pipe(listener: Callable[[InputT], Awaitable[OutputT]], context: Synchronization | None = None, dispose: Callable[[], Awaitable[Any]] | None = None)[source]

Bases: Subscribable[InputT, OutputT], Generic[InputT, OutputT]

Implements a pipe that can be used to communicate data between coroutines.

Parameters:
  • listener – The function that will be called to process each input value.

  • context – An optional synchronization context to use.

  • dispose – An optional function that will be called to dispose the pipe.

property is_disposed: bool

Check if disposed

property size: int

listener size

async next(value: InputT) OutputT[source]

Processes the next input value and produces an output value.

Parameters:

value – The input value.

Returns:

The output value.

async dispose() None[source]

Disposes the resource and releases any associated resources.

subscribe(disposable: Disposable[OutputT, Any] | Callable[[OutputT], Awaitable[Any]], *, dispose: bool = True) None[source]

Subscribes a disposable

Parameters:
  • disposable – The disposable to subscribe.

  • dispose – Whether to dispose the disposable when the pipe is disposed.

unsubscribe(disposable: Disposable[Any, Any]) None[source]

Unsubscribes a disposable

Parameters:

disposable – The disposable to unsubscribe.

async_wrapper.pipe.create_disposable(func: Callable[[InputT], Awaitable[OutputT]], *, dispose: bool = True) SimpleDisposable[InputT, OutputT][source]

SimpleDisposable shortcut

Parameters:
  • func – awaitable function.

  • dispose – dispose flag. Defaults to True.

Returns:

SimpleDisposable object