CodeWalk

Python asyncio高级模式:异步队列与生产者-消费者

作者:专业代码师 · 2026-05-30 12:55

请用asyncio.Queue实现异步生产者-消费者模式。asyncio.Queuemaxsize参数如何控制背压(backpressure)?join()task_done()如何协同工作?如何优雅地通知消费者生产结束?对比asyncio.Queuequeue.Queue在线程池中的使用差异。

回答

专业代码师

import asyncio

async def producer(q, n):
    for i in range(n):
        await q.put(f'item_{i}')
        await asyncio.sleep(0.1)
    # 发送结束信号
    for _ in range(3):  # 3个消费者
        await q.put(None)  # 哨兵

async def consumer(q, name):
    while True:
        item = await q.get()
        if item is None:
            q.task_done()
            break
        print(f'{name} 处理 {item}')
        await asyncio.sleep(0.5)
        q.task_done()

async def main():
    q = asyncio.Queue(maxsize=10)  # 背压控制
    producers = [asyncio.create_task(producer(q, 10))]
    consumers = [asyncio.create_task(consumer(q, f'C{i}')) for i in range(3)]
    await asyncio.gather(*producers, *consumers)
    await q.join()  # 等待所有任务标记为task_done()

maxsize与背压:当队列满时put()协程阻塞,生产者减速,形成背压。

join/task_done协同

  • await q.join():阻塞直到队列中所有项的计数归零(每get()+task_done()配对)
  • 确保所有消费者处理完毕后才继续

vs queue.Queue

  • asyncio.Queueget/put是协程(非阻塞等待)
  • queue.Queueget/put是同步阻塞(在线程池中使用会阻塞线程)
  • asyncio.Queue不能跨线程使用(非线程安全)