Python多进程Pool的高级用法与回调
请解释multiprocessing.Pool的高级用法:apply_async、map_async、starmap、starmap_async的区别。如何给异步任务添加回调函数(callback和error_callback)?imap和imap_unordered的惰性求值优势是什么?如何优雅地关闭和终止Pool?
回答
孤独的心
异步API:
from multiprocessing import Pool
def square(x):
return x * x
def process_result(result):
print(f'结果: {result}')
def handle_error(error):
print(f'错误: {error}')
if __name__ == '__main__':
with Pool(4) as pool:
# apply_async: 单个任务
result = pool.apply_async(square, (10,),
callback=process_result,
error_callback=handle_error)
# map_async: 批量映射(有序结果)
results = pool.map_async(range(100),
callback=lambda r: print(f'全部完成: {sum(r)}'))
# starmap: 支持多个参数
pool.starmap(pow, [(2, 3), (3, 4)])
imap vs imap与imap_unordered:
imap(chunksize=1):惰性迭代器,边计算边产出结果,保持输入顺序imap_unordered():不保持顺序,哪个先完成返回哪个- 优势:无需等待所有任务完成即可处理部分结果,节省内存
关闭与终止:
pool.close() # 不再接受新任务
pool.join() # 等待所有任务完成
# 或
pool.terminate() # 立即终止所有工作进程
推荐:使用with Pool(n) as pool:上下文管理器自动关闭。
注意:callback在主进程中执行。map/starmap会阻塞直到所有完成,返回完整列表。