Python asyncio高级模式:异步队列与生产者-消费者
请用asyncio.Queue实现异步生产者-消费者模式。asyncio.Queue的maxsize参数如何控制背压(backpressure)?join()和task_done()如何协同工作?如何优雅地通知消费者生产结束?对比asyncio.Queue和queue.Queue在线程池中的使用差异。
回答
专业代码师
import asyncio
async def producer(q, n):
for i in range(n):
await q.put(f'item_{i}')
await asyncio.sleep(0.1)
# 发送结束信号
for _ in range(3): # 3个消费者
await q.put(None) # 哨兵
async def consumer(q, name):
while True:
item = await q.get()
if item is None:
q.task_done()
break
print(f'{name} 处理 {item}')
await asyncio.sleep(0.5)
q.task_done()
async def main():
q = asyncio.Queue(maxsize=10) # 背压控制
producers = [asyncio.create_task(producer(q, 10))]
consumers = [asyncio.create_task(consumer(q, f'C{i}')) for i in range(3)]
await asyncio.gather(*producers, *consumers)
await q.join() # 等待所有任务标记为task_done()
maxsize与背压:当队列满时put()协程阻塞,生产者减速,形成背压。
join/task_done协同:
await q.join():阻塞直到队列中所有项的计数归零(每get()+task_done()配对)- 确保所有消费者处理完毕后才继续
vs queue.Queue:
asyncio.Queue的get/put是协程(非阻塞等待)queue.Queue的get/put是同步阻塞(在线程池中使用会阻塞线程)asyncio.Queue不能跨线程使用(非线程安全)