Skip to content

Queue

queue

Queue

Queue(
    max_size: float | None = None,
    *,
    _stream: (
        tuple[
            MemoryObjectSendStream[_T],
            MemoryObjectReceiveStream[_T],
        ]
        | None
    ) = None
)

Bases: Generic[_T]

A queue whose interface is modeled on asyncio.Queue.

Examples:

from __future__ import annotations

from typing import Any

import anyio

from async_wrapper import Queue


async def aput(queue: Queue[Any], value: Any) -> None:
    """put `value` into `queue` from inside the queue's own async context."""
    async with queue:
        await queue.aput(value)


async def main() -> None:
    """put ten values from concurrent tasks, then drain and verify them."""
    queue: Queue[Any] = Queue(10)

    # every task is handed its own `queue.clone.putter` object
    # NOTE(review): presumably a put-only clone that each task can close
    # independently — confirm against the `clone` property
    async with anyio.create_task_group() as task_group:
        async with queue.aputter:
            for i in range(10):
                task_group.start_soon(aput, queue.clone.putter, i)

    # collect everything that was put; a set is used because the tasks'
    # completion order is not fixed
    async with queue.agetter:
        result = {x async for x in queue}

    assert result == set(range(10))


# run the example only when this file is executed as a script
if __name__ == "__main__":
    anyio.run(main)
Source code in src/async_wrapper/queue.py
def __init__(
    self,
    max_size: float | None = None,
    *,
    _stream: tuple[MemoryObjectSendStream[_T], MemoryObjectReceiveStream[_T]]
    | None = None,
) -> None:
    """
    set up the queue by delegating to `_init`.

    Args:
        max_size: forwarded to `_init`. Defaults to None.
            NOTE(review): presumably the capacity later reported by
            `maxsize` — confirm in `_init`.
        _stream: optional (send stream, receive stream) pair forwarded to
            `_init`. Defaults to None.
            NOTE(review): presumably an existing stream pair to reuse
            instead of creating a new one — confirm in `_init`.
    """
    self._init(max_size, _stream=_stream)

aputter property

aclose putter only

agetter property

aclose getter only

putter property

close putter only

getter property

close getter only

maxsize property

maxsize: float

number of items allowed in the queue.

clone property

clone: _Clone[_T]

Create a queue factory for generating RestrictedQueue instances.

Returns:

Type Description
_Clone[_T]

A queue factory.

.. versionadded:: 0.5.2

qsize

qsize() -> int

number of items in the queue.

Source code in src/async_wrapper/queue.py
def qsize(self) -> int:
    """number of items currently held in the getter stream's buffer."""
    buffered = self._getter._state.buffer  # noqa: SLF001
    return len(buffered)

empty

empty() -> bool

true if the queue is empty, false otherwise.

Source code in src/async_wrapper/queue.py
def empty(self) -> bool:
    """true when `qsize()` reports no items, false otherwise."""
    return not (self.qsize() > 0)

full

full() -> bool

return true if there are maxsize items in the queue.

Source code in src/async_wrapper/queue.py
def full(self) -> bool:
    """
    true when the queue holds at least `maxsize` items.

    a queue whose `maxsize` is infinite is never full.
    """
    limit = self.maxsize
    return limit != math.inf and self.qsize() >= limit

aget async

aget(*, timeout: float | None = None) -> _T

remove and return an item from the queue.

Parameters:

Name Type Description Default
timeout
float | None

an error is raised if the operation takes longer than this timeout. Defaults to None.

None

Returns:

Type Description
_T

item from queue

Source code in src/async_wrapper/queue.py
async def aget(self, *, timeout: float | None = None) -> _T:
    """
    asynchronously remove and return an item from the queue.

    Args:
        timeout: limit handed to `fail_after`; an error is raised when
            the operation takes longer. Defaults to None.

    Returns:
        the item taken from the queue
    """
    with fail_after(timeout):
        item = await self._aget()
    return item

aput async

aput(value: _T, *, timeout: float | None = None) -> None

put an item into the queue.

Parameters:

Name Type Description Default
value
_T

item

required
timeout
float | None

an error is raised if the operation takes longer than this timeout. Defaults to None.

None
Source code in src/async_wrapper/queue.py
async def aput(self, value: _T, *, timeout: float | None = None) -> None:
    """
    asynchronously put an item into the queue.

    Args:
        value: the item to put
        timeout: limit handed to `fail_after`; an error is raised when
            the operation takes longer. Defaults to None.
    """
    # the whole put runs under the timeout scope
    with fail_after(timeout):
        await self._aput(value)

get

get() -> _T

remove and return an item from the queue without blocking.

Source code in src/async_wrapper/queue.py
def get(self) -> _T:
    """
    remove and return an item from the queue without blocking.

    Raises:
        QueueEmptyError: the receive would block and the queue is empty.
        QueueBrokenError: the receive would block although the queue is
            not empty, or the stream is closed or broken.
    """
    try:
        item = self._getter.receive_nowait()
    except WouldBlock as exc:
        # an empty buffer is the ordinary reason for blocking; anything
        # else is reported as a broken queue
        error = QueueEmptyError if self.empty() else QueueBrokenError
        raise error from exc
    except (ClosedResourceError, BrokenResourceError) as exc:
        raise QueueBrokenError from exc
    else:
        return item

put

put(value: _T) -> None

put an item into the queue without blocking.

Source code in src/async_wrapper/queue.py
def put(self, value: _T) -> None:
    """
    put an item into the queue without blocking.

    Raises:
        QueueFullError: the send would block and the queue is full.
        QueueBrokenError: the send would block although the queue is not
            full, or the stream is closed or broken.
    """
    try:
        self._putter.send_nowait(value)
    except WouldBlock as exc:
        # a full buffer is the ordinary reason for blocking; anything
        # else is reported as a broken queue
        error = QueueFullError if self.full() else QueueBrokenError
        raise error from exc
    except (ClosedResourceError, BrokenResourceError) as exc:
        raise QueueBrokenError from exc

aclose async

aclose() -> None

close the stream asynchronously

Source code in src/async_wrapper/queue.py
async def aclose(self) -> None:
    """
    close the stream asynchronously.

    Raises:
        QueueBrokenError: the close raised ClosedResourceError or
            BrokenResourceError, either directly or as every member of
            an ExceptionGroup.
    """
    group: ExceptionGroup[Any]
    try:
        await self._aclose()
    except (ClosedResourceError, BrokenResourceError) as exc:
        raise QueueBrokenError from exc
    except ExceptionGroup as group:
        for inner in group.exceptions:
            if not isinstance(inner, (ClosedResourceError, BrokenResourceError)):
                # at least one unrelated failure: propagate the group as is
                raise
        raise QueueBrokenError from group

close

close() -> None

close the stream synchronously

Source code in src/async_wrapper/queue.py
def close(self) -> None:
    """
    close the stream synchronously.

    Raises:
        QueueBrokenError: the close raised ClosedResourceError or
            BrokenResourceError.
    """
    try:
        self._close()
    except (ClosedResourceError, BrokenResourceError) as exc:
        # report stream-level failures as the queue's own error type
        raise QueueBrokenError from exc

statistics

return statistics from stream

Source code in src/async_wrapper/queue.py
def statistics(self) -> MemoryObjectStreamStatistics:
    """return statistics from the getter stream's shared state"""
    state = self._getter._state  # noqa: SLF001
    return state.statistics()

create_queue

create_queue(max_size: float | None = None) -> Queue[Any]

create queue like asyncio.Queue

Parameters:

Name Type Description Default

max_size

float | None

queue size. Defaults to None.

None

Returns:

Type Description
Queue[Any]
Source code in src/async_wrapper/queue.py
def create_queue(max_size: float | None = None) -> Queue[Any]:
    """
    build a new [`Queue`][async_wrapper.Queue], similar to [`asyncio.Queue`][]

    Args:
        max_size: queue size, passed straight to the constructor.
            Defaults to None.

    Returns:
        new [`Queue`][async_wrapper.Queue] using [`anyio.abc.ObjectStream`][]
    """
    queue: Queue[Any] = Queue(max_size)
    return queue