
# Python 并发编程:多线程与多进程

## 1. 技术分析

### 1.1 并发编程概述

并发编程可以提升程序性能。并发类型:

- **多线程**: 共享内存,适合IO密集型
- **多进程**: 独立内存,适合CPU密集型
- **异步IO**: 事件驱动,适合高IO场景

### 1.2 并发模型对比

| 模型 | 适用场景 | 优势 | 劣势 |
|------|----------|------|------|
| 多线程 | IO密集型 | 低开销 | GIL限制 |
| 多进程 | CPU密集型 | 真正并行 | 高开销 |
| 异步IO | 高IO场景 | 高吞吐量 | 复杂度高 |

### 1.3 GIL影响

GIL (Global Interpreter Lock):

- 同一时刻只有一个线程执行Python字节码
- 多线程在CPU密集型任务上无法真正并行
- IO密集型任务可以通过切换获得并发收益

## 2. 核心功能实现

### 2.1 多线程编程

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThreadPoolManager:
    """Thin wrapper around a ThreadPoolExecutor for submitting/mapping tasks."""

    def __init__(self, max_workers=None):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit_task(self, func, *args, **kwargs):
        """Submit a single callable; returns a Future."""
        return self.executor.submit(func, *args, **kwargs)

    def map_tasks(self, func, iterable):
        """Apply func over iterable concurrently; returns an iterator of results."""
        return self.executor.map(func, iterable)

    def shutdown(self, wait=True):
        self.executor.shutdown(wait=wait)


class ThreadSafeCounter:
    """Counter guarded by a Lock so increments from multiple threads don't race."""

    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

    def get_count(self):
        with self.lock:
            return self.count


def fetch_url(url):
    """Fetch one URL and return its body text (blocking IO)."""
    import requests
    response = requests.get(url)
    return response.text


def fetch_urls_concurrent(urls):
    """Fetch many URLs concurrently with a 10-worker thread pool."""
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(executor.map(fetch_url, urls))
    return results
```

### 2.2 多进程编程

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


class ProcessPoolManager:
    """Thin wrapper around a ProcessPoolExecutor, mirroring ThreadPoolManager."""

    def __init__(self, max_workers=None):
        self.executor = ProcessPoolExecutor(max_workers=max_workers)

    def submit_task(self, func, *args, **kwargs):
        return self.executor.submit(func, *args, **kwargs)

    def map_tasks(self, func, iterable):
        return self.executor.map(func, iterable)

    def shutdown(self, wait=True):
        self.executor.shutdown(wait=wait)


def compute_intensive_task(data):
    """CPU-bound demo workload: accumulate i * data over a large range."""
    result = 0
    for i in range(1, 10000000):
        result += i * data
    return result


def compute_parallel(data_list):
    """Run the CPU-bound task across processes for true parallelism."""
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(compute_intensive_task, data_list))
    return results


class SharedMemoryManager:
    """Dict shared between processes via a multiprocessing.Manager proxy."""

    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.shared_dict = self.manager.dict()

    def set_value(self, key, value):
        self.shared_dict[key] = value

    def get_value(self, key):
        return self.shared_dict.get(key)
```

### 2.3 异步编程

```python
import asyncio
import aiohttp


class AsyncIOManager:
    """Helper that runs coroutines on an event loop and fetches URLs with aiohttp."""

    def __init__(self):
        self.loop = asyncio.get_event_loop()

    async def fetch_url(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.text()

    async def fetch_all_urls(self, urls):
        # gather schedules all fetches concurrently and preserves input order
        tasks = [self.fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)

    def run(self, coroutine):
        return self.loop.run_until_complete(coroutine)


async def process_data_async(data):
    """Process items sequentially, awaiting each one."""
    results = []
    for item in data:
        result = await process_item(item)
        results.append(result)
    return results


async def process_item(item):
    await asyncio.sleep(0.1)  # simulate async IO latency
    return item * 2
```

## 3. 性能对比

### 3.1 并发模型性能

| 任务类型 | 单线程 | 多线程 | 多进程 | 异步IO |
|----------|--------|--------|--------|--------|
| IO密集型(100请求) | 100s | 10s | 15s | 5s |
| CPU密集型(4核) | 100s | 95s | 25s | 100s |

### 3.2 线程/进程开销

| 操作 | 线程 | 进程 |
|------|------|------|
| 创建开销 | 1ms | 100ms |
| 内存开销 | 1MB | 100MB |
| 通信开销 | 低 | 高 |

### 3.3 并发框架对比

| 框架 | 适用场景 | 复杂度 | 性能 |
|------|----------|--------|------|
| threading | 简单并发 | 低 | 中 |
| concurrent.futures | 任务池 | 低 | 中 |
| asyncio | 高IO | 高 | 高 |
| multiprocessing | CPU密集 | 中 | 高 |

## 4. 最佳实践

### 4.1 并发模式选择

```python
def choose_concurrency_model(task_type, data_size):
    """Pick a concurrency model name from the task type and data volume."""
    if task_type == "io_bound":
        if data_size > 1000:
            return "asyncio"
        return "threading"
    elif task_type == "cpu_bound":
        return "multiprocessing"
    else:
        return "sequential"


class ConcurrencyStrategySelector:
    """Dispatch a workload to a runner based on a strategy name."""

    @staticmethod
    def select(strategy, func, data):
        strategies = {
            "threading": lambda: run_with_threads(func, data),
            "multiprocessing": lambda: run_with_processes(func, data),
            "asyncio": lambda: run_with_async(func, data),
            "sequential": lambda: [func(item) for item in data],
        }
        return strategies[strategy]()
```

### 4.2 并发编程模式

```python
class ConcurrentDataProcessor:
    """Apply func over data using the configured concurrency mode."""

    def __init__(self, mode="threading"):
        self.mode = mode

    def process(self, data, func):
        if self.mode == "threading":
            return self._process_threading(data, func)
        elif self.mode == "multiprocessing":
            return self._process_multiprocessing(data, func)
        elif self.mode == "asyncio":
            return self._process_asyncio(data, func)

    def _process_threading(self, data, func):
        with ThreadPoolExecutor() as executor:
            return list(executor.map(func, data))

    def _process_multiprocessing(self, data, func):
        with ProcessPoolExecutor() as executor:
            return list(executor.map(func, data))

    def _process_asyncio(self, data, func):
        # Wrap the sync func in a coroutine so gather can schedule it.
        async def async_func(item):
            return func(item)

        async def process_all():
            tasks = [async_func(item) for item in data]
            return await asyncio.gather(*tasks)

        return asyncio.run(process_all())
```

## 5. 总结

- 并发编程可以显著提升性能
- 多线程适合IO密集型任务
- 多进程适合CPU密集型任务
- 异步IO适合高IO吞吐量场景

选择原则:根据任务类型选择合适模型。对比数据如下:

- IO密集型任务:异步IO > 多线程 > 多进程
- CPU密集型任务:多进程 > 其他
- 线程创建开销比进程小100倍
- 推荐使用 concurrent.futures 简化并发编程