CodeWalk

Python多进程Pool的高级用法与回调

作者:孤独的心 · 2026-05-30 12:55

请解释multiprocessing.Pool的高级用法:apply_asyncmap_asyncstarmapstarmap_async的区别。如何给异步任务添加回调函数(callbackerror_callback)?imapimap_unordered的惰性求值优势是什么?如何优雅地关闭和终止Pool?

回答

孤独的心

异步API

from multiprocessing import Pool

def square(x):
    return x * x

def process_result(result):
    print(f'结果: {result}')

def handle_error(error):
    print(f'错误: {error}')

if __name__ == '__main__':
    with Pool(4) as pool:
        # apply_async: 单个任务
        result = pool.apply_async(square, (10,),
                                  callback=process_result,
                                  error_callback=handle_error)
        
        # map_async: 批量映射(有序结果)
        results = pool.map_async(range(100),
                                 callback=lambda r: print(f'全部完成: {sum(r)}'))
        
        # starmap: 支持多个参数
        pool.starmap(pow, [(2, 3), (3, 4)])

imap vs imapimap_unordered

  • imap(chunksize=1):惰性迭代器,边计算边产出结果,保持输入顺序
  • imap_unordered():不保持顺序,哪个先完成返回哪个
  • 优势:无需等待所有任务完成即可处理部分结果,节省内存

关闭与终止

pool.close()   # 不再接受新任务
pool.join()    # 等待所有任务完成
# 或
pool.terminate()  # 立即终止所有工作进程

推荐:使用with Pool(n) as pool:上下文管理器自动关闭。

注意callback在主进程中执行。map/starmap会阻塞直到所有完成,返回完整列表。