async_wrapper.queue.py

class async_wrapper.queue.Queue(max_size: float | None = None, *, _stream: tuple[MemoryObjectSendStream[_T], MemoryObjectReceiveStream[_T]] | None = None) None[source]

Bases: Generic[_T]

obtained from asyncio.Queue

Example

from __future__ import annotations

from typing import Any

import anyio

from async_wrapper import Queue


async def aput(queue: Queue[Any], value: Any) -> None:
    async with queue:
        await queue.aput(value)


async def main() -> None:
    queue: Queue[Any] = Queue(10)

    async with anyio.create_task_group() as task_group:
        async with queue.aputter:
            for i in range(10):
                task_group.start_soon(aput, queue.clone.putter, i)

    async with queue.agetter:
        result = {x async for x in queue}

    assert result == set(range(10))


if __name__ == "__main__":
    anyio.run(main)
property aputter : AbstractAsyncContextManager[Self]

aclose putter only

property agetter : AbstractAsyncContextManager[Self]

aclose getter only

property putter : AbstractContextManager[Self]

close putter only

property getter : AbstractContextManager[Self]

close getter only

qsize() int[source]

number of items in the queue.

property maxsize : float

number of items allowed in the queue.

empty() bool[source]

true if the queue is empty, false otherwise.

full() bool[source]

return true if there are maxsize items in the queue.

async aget(*, timeout: float | None = None) _T[source]

remove and return an item from the queue.

Parameters:
timeout: float | None = None

error occurs when over timeout. Defaults to None.

Returns:

item from queue

async aput(value: _T, *, timeout: float | None = None) None[source]

put an item into the queue.

Parameters:
value: _T

item

timeout: float | None = None

error occurs when over timeout. Defaults to None.

get() _T[source]

remove and return an item from the queue without blocking.

put(value: _T) None[source]

put an item into the queue without blocking.

async aclose() None[source]

close the stream as async

close() None[source]

close the stream as sync

property clone : _Clone[_T]

Create a queue factory for generating RestrictedQueue instances.

Returns:

A queue factory.

Added in version 0.5.2.

statistics() MemoryObjectStreamStatistics[source]

return statstics from stream

async_wrapper.queue.create_queue(max_size: float | None = None) Queue[Any][source]

create queue like asyncio.Queue

Parameters:
max_size: float | None = None

queue size. Defaults to None.

Returns:

new Queue using anyio.abc.ObjectStream