
Python 并发三剑客多线程、多进程与协程的实战抉择写在前面这篇文章源于我在一次生产事故后的深度反思。当时我们用多线程处理图片压缩任务CPU 跑满了但吞吐量却上不去。那一刻我意识到很多开发者对并发模型的选择凭的是感觉而不是理解。这篇文章就是我想和你聊清楚的事。一、先把概念摆平三者到底是什么在动手选型之前我们得先搞清楚这三个东西的本质区别而不是背定义。多线程Threading多个线程共享同一进程的内存空间切换开销小但在 CPython 中受 GIL全局解释器锁限制同一时刻只有一个线程在执行 Python 字节码。多进程Multiprocessing每个进程拥有独立的内存空间和 Python 解释器真正实现并行计算但进程间通信IPC和内存开销更大。协程Asyncio/Coroutine单线程内的协作式并发通过await主动让出控制权切换开销极小适合大量 I/O 等待场景。用一张表格直观对比维度多线程多进程协程并行能力受 GIL 限制伪并行真并行单线程并发非并行内存开销小共享内存大独立内存极小切换开销中大极小适合场景I/O 密集兼容性好CPU 密集高并发 I/O编程复杂度中需处理竞态中需处理 IPC中需 async/await调试难度高竞态条件中中二、GIL 是什么为什么它让多线程名不副实这是理解 Python 并发的核心。GIL 是 CPython 解释器的一把全局锁确保同一时刻只有一个线程执行 Python 字节码目的是保护内存管理的线程安全。importthreadingimporttime# 验证 GIL 对 CPU 密集任务的影响defcpu_task(n):纯 CPU 计算count0for_inrange(n):count1returncount# 单线程starttime.time()cpu_task(50_000_000)cpu_task(50_000_000)print(f单线程耗时:{time.time()-start:.2f}s)# 多线程你会发现并没有快多少甚至更慢starttime.time()t1threading.Thread(targetcpu_task,args(50_000_000,))t2threading.Thread(targetcpu_task,args(50_000_000,))t1.start();t2.start()t1.join();t2.join()print(f多线程耗时:{time.time()-start:.2f}s)运行结果通常会让你大吃一惊多线程版本不仅没快反而因为 GIL 争抢和线程切换开销耗时相近甚至更长。但 GIL 在 I/O 等待时会释放这是多线程在 I/O 场景依然有价值的根本原因。三、三种负载类型的选型策略3.1 CPU 密集型 → 多进程典型场景图片压缩/转码、视频处理、大规模数值计算、加密解密、机器学习推理。frommultiprocessingimportPoolfromPILimportImageimportosdefcompress_image(args):CPU 密集图片压缩input_path,output_path,qualityargswithImage.open(input_path)asimg:# 转换色彩模式去除 alpha 通道ifimg.modein(RGBA,P):imgimg.convert(RGB)img.save(output_path,JPEG,qualityquality,optimizeTrue)returnoutput_pathdefbatch_compress(image_list,quality75):使用多进程池并行压缩# cpu_count() 自动获取 CPU 核心数withPool(processesos.cpu_count())aspool:resultspool.map(compress_image,image_list)returnresults# 使用示例tasks[(input/photo1.png,output/photo1.jpg,75),(input/photo2.png,output/photo2.jpg,75),# ... 更多任务]batch_compress(tasks)多进程的关键注意点进程间传递数据有序列化开销pickle避免传递大对象使用Pool.map而非手动管理进程更安全进程数通常设为 CPU 核心数过多反而因调度开销下降3.2 I/O 密集型 → 协程首选或多线程典型场景HTTP 请求、数据库查询、文件读写、消息队列消费。协程在高并发 I/O 场景下是绝对的王者因为它的切换开销接近于零importasyncioimportaiohttpimporttimeasyncdeffetch_url(session,url):异步 HTTP 请求asyncwithsession.get(url,timeoutaiohttp.ClientTimeout(total10))asresp:returnawaitresp.text()asyncdeffetch_all(urls):并发抓取所有 URLasyncwithaiohttp.ClientSession()assession:tasks[fetch_url(session,url)forurlinurls]# gather 并发执行所有任务resultsawaitasyncio.gather(*tasks,return_exceptionsTrue)returnresults# 对比顺序请求 vs 协程并发urls[fhttps://httpbin.org/delay/1for_inrange(10)]starttime.time()asyncio.run(fetch_all(urls))print(f协程并发 10 个请求耗时:{time.time()-start:.2f}s)# 结果约 1~2s而顺序请求需要 10s什么时候用多线程而不是协程当你依赖的库不支持 async比如老版本的数据库驱动、某些 SDK多线程是更务实的选择fromconcurrent.futuresimportThreadPoolExecutorimportrequestsdeffetch_sync(url):同步请求用线程池并发returnrequests.get(url,timeout10).textwithThreadPoolExecutor(max_workers20)asexecutor:resultslist(executor.map(fetch_sync,urls))3.3 混合型负载 → 分层架构这是最考验设计能力的场景。混合型负载的核心思路是不同类型的任务交给最擅长的并发模型处理通过队列解耦。四、实战案例图片处理 网络请求 数据库存储的任务系统这是一个真实的业务场景用户上传图片 → 下载原图 → 压缩处理 → 存储到数据库。三个阶段分别是 I/O 密集、CPU 密集、I/O 密集如何拆分架构设计[协程层] 网络下载 (asyncio aiohttp) ↓ 队列传递原始图片数据 [进程层] 图片压缩 (multiprocessing.Pool) ↓ 队列传递压缩结果 [协程层] 数据库存储 (asyncio asyncpg)完整实现importasyncioimportaiohttpimportasyncpgfrommultiprocessingimportPoolfromconcurrent.futuresimportProcessPoolExecutorfromioimportBytesIOfromPILimportImageimportos# CPU 密集层图片压缩运行在子进程中defcompress_image_bytes(image_data:bytes,quality:int75)-bytes: 在子进程中执行图片压缩 注意此函数必须是模块级别的才能被 pickle 序列化 withImage.open(BytesIO(image_data))asimg:ifimg.modein(RGBA,P):imgimg.convert(RGB)outputBytesIO()img.save(output,formatJPEG,qualityquality,optimizeTrue)returnoutput.getvalue()# I/O 层下载与存储运行在协程中asyncdefdownload_image(session:aiohttp.ClientSession,url:str)-bytes:异步下载图片asyncwithsession.get(url)asresp:resp.raise_for_status()returnawaitresp.read()asyncdefsave_to_db(pool:asyncpg.Pool,url:str,data:bytes):异步写入数据库asyncwithpool.acquire()asconn:awaitconn.execute(INSERT INTO images (source_url, compressed_data, size) VALUES ($1, $2, $3),url,data,len(data))# 核心调度器串联三个阶段 asyncdefprocess_pipeline(urls:list[str],db_dsn:str): 混合并发管道 - 下载协程并发 - 压缩进程池CPU 密集 - 存储协程并发 # 初始化资源db_poolawaitasyncpg.create_pool(db_dsn,min_size5,max_size20)process_executorProcessPoolExecutor(max_workersos.cpu_count())loopasyncio.get_event_loop()asyncwithaiohttp.ClientSession()assession:# 阶段一并发下载所有图片print(f开始下载{len(urls)}张图片...)download_tasks[download_image(session,url)forurlinurls]raw_imagesawaitasyncio.gather(*download_tasks,return_exceptionsTrue)# 阶段二提交到进程池压缩CPU 密集不阻塞事件循环print(提交图片压缩任务到进程池...)compress_tasks[]valid_pairs[]# (url, raw_data) 过滤掉下载失败的forurl,rawinzip(urls,raw_images):ifisinstance(raw,Exception):print(f下载失败:{url}-{raw})continuevalid_pairs.append(url)# run_in_executor 将阻塞调用包装为协程不阻塞事件循环taskloop.run_in_executor(process_executor,compress_image_bytes,raw,75)compress_tasks.append(task)compressed_imagesawaitasyncio.gather(*compress_tasks,return_exceptionsTrue)# 阶段三并发写入数据库print(写入数据库...)save_tasks[]forurl,compressedinzip(valid_pairs,compressed_images):ifisinstance(compressed,Exception):print(f压缩失败:{url}-{compressed})continuesave_tasks.append(save_to_db(db_pool,url,compressed))awaitasyncio.gather(*save_tasks)# 清理资源process_executor.shutdown(waitTrue)awaitdb_pool.close()print(全部任务完成)# 入口 if__name____main__:image_urls[https://example.com/image1.jpg,https://example.com/image2.jpg,# ...]DB_DSNpostgresql://user:passwordlocalhost/mydbasyncio.run(process_pipeline(image_urls,DB_DSN))关键设计决策解析为什么用run_in_executor而不是直接调用进程池loop.run_in_executor是协程与阻塞代码之间的桥梁。它把阻塞调用放到线程池或进程池执行同时返回一个可await的 Future事件循环在等待期间可以继续处理其他协程不会被阻塞。为什么数据库连接用连接池每次acquire()从池中借用连接用完自动归还。避免了频繁建立/断开连接的开销也防止并发过高时连接数耗尽。五、性能调优几个容易被忽视的细节5.1 进程池的 worker 数量importos# CPU 密集worker 数 CPU 核心数cpu_workersos.cpu_count()# I/O 密集线程池可以适当放大但不是越多越好# 经验值核心数 * 2 到 核心数 * 5io_workersos.cpu_count()*4# 协程并发数由 semaphore 控制防止资源耗尽semaphoreasyncio.Semaphore(100)# 最多 100 个并发请求asyncdeffetch_with_limit(session,url):asyncwithsemaphore:returnawaitfetch_url(session,url)5.2 避免协程中的意外阻塞这是协程最常见的坑在 async 函数里调用了同步阻塞操作整个事件循环都会卡住。importasyncioimportaiofiles# 异步文件 I/O# ❌ 错误在协程中使用同步文件操作asyncdefbad_read(path):withopen(path,r)asf:# 这会阻塞事件循环returnf.read()# ✅ 正确使用异步文件库asyncdefgood_read(path):asyncwithaiofiles.open(path,r)asf:returnawaitf.read()# ✅ 或者用 run_in_executor 包装asyncdefalso_good_read(path):loopasyncio.get_event_loop()returnawaitloop.run_in_executor(None,open(path).read)5.3 多进程间通信的序列化开销frommultiprocessingimportPoolimportnumpyasnp# ❌ 传递大型 numpy 数组pickle 序列化开销巨大defprocess_array(arr):returnarr.sum()large_arraynp.random.rand(10_000_000)withPool(4)aspool:# 每次传递都要序列化/反序列化非常慢resultpool.map(process_array,[large_array]*4)# ✅ 使用共享内存Python 3.8frommultiprocessingimportshared_memoryimportnumpyasnp shmshared_memory.SharedMemory(createTrue,sizelarge_array.nbytes)shared_arrnp.ndarray(large_array.shape,dtypelarge_array.dtype,buffershm.buf)np.copyto(shared_arr,large_array)# 子进程通过 shm.name 访问无需序列化六、选型决策树面对一个新任务我的选型思路是这样的任务是否涉及大量等待网络/磁盘/数据库 ├── 是 → 是否有成熟的 async 库支持 │ ├── 是 → 用协程asyncio │ └── 否 → 用多线程ThreadPoolExecutor └── 否 → 是否是纯 CPU 计算 ├── 是 → 用多进程ProcessPoolExecutor └── 混合 → 分层架构协程处理 I/O进程池处理 CPU七、总结回到文章开头那次生产事故根因很清晰用多线程处理 CPU 密集的图片压缩GIL 让多线程形同虚设换成多进程后吞吐量提升了近 4 倍。三个模型没有绝对的优劣只有适不适合CPU 密集→ 多进程绕开 GIL真正并行I/O 密集→ 协程优先多线程备选混合负载→ 分层架构用队列解耦各司其职最后想问问你你在项目中遇到过并发选型踩坑的经历吗是 GIL 的问题还是协程里不小心写了阻塞调用欢迎在评论区聊聊这类血泪教训往往比任何教程都有价值。参考资料Python 官方文档 - asyncioPython 官方文档 - multiprocessingPEP 3156 - Asynchronous IO Support书籍推荐《流畅的Python第2版》第19章并发部分、《Python Cookbook》第12章