Implementing Scalable Async I/O with Python Asyncio Queues

Mar 25, 2024 · 3 min read

Asyncio queues provide a great way to pass data between asynchronous tasks in Python. They enable building scalable asynchronous I/O flows without some of the downsides of threads or processes. In this post, we'll dive into how to use asyncio queues effectively in your Python applications.

What are Asyncio Queues?

An asyncio queue is a first in, first out (FIFO) data structure designed for asynchronous programming with Python's asyncio module. Unlike queue.Queue, asyncio queues are not thread-safe: they are designed for passing data between coroutines and tasks running on a single event loop.

Some key properties of asyncio queues:

  • FIFO ordering - the first item added is the first item retrieved.
  • Size limiting - queues can have a maximum size.
  • Awaitable operations - put and get suspend the calling coroutine rather than blocking the event loop, and gets can be combined with timeouts.

Creating a queue takes a single line:

    import asyncio

    queue = asyncio.Queue()

Asyncio queues unlock asynchronous producer/consumer patterns without many of the downsides of threads or subprocesses.

Producer/Consumer With an Asyncio Queue

A simple async queue usage pattern is:

  • Producer coroutines add data to the queue
  • Consumer coroutines process data from the queue
    async def producer(queue):
        for i in range(10):
            await queue.put(i)
            print(f"Added {i} to queue")

    async def consumer(queue):
        while True:
            item = await queue.get()
            print(f"Got {item} off the queue")
            await asyncio.sleep(1)
            queue.task_done()

    async def main():
        queue = asyncio.Queue()

        producer_task = asyncio.create_task(producer(queue))
        consumer_task = asyncio.create_task(consumer(queue))

        await producer_task     # wait until every item has been queued
        await queue.join()      # wait until the consumer has processed them all
        consumer_task.cancel()  # the consumer loops forever, so cancel it

    asyncio.run(main())

This shows the basics - producers adding data, consumers processing the data. Note that await is only valid inside a coroutine, so the tasks are created and awaited inside main() and driven by asyncio.run().

Queue Size Limiting

By default asyncio queues can grow without bound, which can lead to runaway memory usage.

We can set a maxsize when creating the queue to limit memory usage:

    queue = asyncio.Queue(maxsize=100)

Now if a producer calls await queue.put() when the queue already holds 100 items, the coroutine is suspended until a consumer frees up space. This gives you backpressure for free.

Checking Queue Size

It can be useful for consumers to know the queue size to make processing decisions:

    print(f"Queue size: {queue.qsize()}")

The consumer can adjust parallelism based on the current depth.
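As a sketch of that idea (the one-worker-per-ten-items ratio here is arbitrary, not a recommendation), a dispatcher might size its worker pool from qsize() before fanning out:

```python
import asyncio

async def worker(name, queue):
    # Drain items until the queue is empty
    while not queue.empty():
        item = queue.get_nowait()
        print(f"{name} processed {item}")
        await asyncio.sleep(0)  # yield so other workers get a turn

async def main():
    queue = asyncio.Queue()
    for i in range(25):
        queue.put_nowait(i)

    # Scale the number of workers to the current queue depth
    n_workers = max(1, queue.qsize() // 10)  # here: 2 workers
    print(f"Starting {n_workers} workers for {queue.qsize()} items")
    await asyncio.gather(*(worker(f"worker-{i}", queue)
                           for i in range(n_workers)))

asyncio.run(main())
```

Because there is no await between the empty() check and get_nowait(), no other worker can steal the item in between - one of the simplifications single-threaded asyncio buys you.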

Blocking Gets with Timeouts

By default queue.get() waits forever for an item. We can add a timeout so the consumer can do other work instead of waiting indefinitely:

    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=1.0)
            print(f"Got {item}")
        except asyncio.TimeoutError:
            print("No item after 1s, doing other work")

Note that await queue.get() never blocks the event loop - it only suspends the calling coroutine. The timeout is about keeping this particular consumer responsive, not about protecting the loop.

Async Iteration

asyncio.Queue is not directly iterable with async for, but we can wrap it in an async generator to get the same effect:

    async def iter_queue(queue):
        while True:
            yield await queue.get()

    async for item in iter_queue(queue):
        print(f"Got {item}")
        await asyncio.sleep(0.5)

This consumes the queue asynchronously as items become available.

Closing a Queue

asyncio.Queue has no close() method (Python 3.13 added Queue.shutdown(), but on earlier versions the usual convention is a sentinel value). Call queue.put(None) to indicate no more items will be added, and have consumers exit when they see None.
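A minimal sketch of the sentinel pattern (the names and item count are illustrative):

```python
import asyncio

SENTINEL = None  # marker meaning "no more items"

async def producer(queue):
    for i in range(3):
        await queue.put(i)
    await queue.put(SENTINEL)  # signal that production is finished

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            print("Producer finished, shutting down")
            break
        print(f"Processing {item}")

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())
```

If None is a legitimate payload in your application, use a dedicated object() as the sentinel instead and compare with is.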

Key Benefits Over Threads and Processes

Some key benefits of asyncio queues:

  • Avoid the shared-state bugs that plague threaded code
  • Lower overhead than processes
  • Integrate well with other asyncio code
  • Awaitable operations that never block the event loop

The main downside is that asyncio queues only work within a single process. For interprocess communication, look at multiprocessing queues.

Practical Tips

Here are some key lessons learned from using asyncio queues in production:

  • Set a maxsize limit to avoid unbounded growth
  • Use timeouts and qsize() in consumers
  • Signal completion with a sentinel when done producing
  • Wrap the queue in an async generator where iteration reads more naturally

Asyncio queues are a simple but powerful tool for async workflows in Python. By following a few best practices, they can be used to build highly scalable applications.
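Putting those tips together, one possible shape (the sizes, worker count, and names here are illustrative, not prescriptive) is a bounded queue feeding several consumers, with one sentinel per consumer to shut everything down cleanly:

```python
import asyncio

async def producer(queue, n_items, n_consumers):
    for i in range(n_items):
        await queue.put(i)     # suspends when the bounded queue is full
    for _ in range(n_consumers):
        await queue.put(None)  # one sentinel per consumer

async def consumer(name, queue):
    while True:
        item = await queue.get()
        if item is None:       # sentinel: no more work
            break
        print(f"{name} handled {item}")

async def main():
    queue = asyncio.Queue(maxsize=5)  # bounded to avoid runaway memory
    n_consumers = 3
    consumers = [asyncio.create_task(consumer(f"worker-{i}", queue))
                 for i in range(n_consumers)]
    await producer(queue, 20, n_consumers)
    await asyncio.gather(*consumers)

asyncio.run(main())
```

Each consumer stops after its first sentinel, so exactly n_consumers sentinels drain all the workers without any explicit cancellation.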
