
# 【Python进阶】异步编程完全指南：从asyncio到实战应用

## 引言

异步编程是现代Python开发中不可或缺的技能，特别是在处理IO密集型任务时。Python的asyncio库提供了强大的异步编程能力，让我们能够编写高效的并发代码。本文将详细介绍Python异步编程的核心概念和实战应用。

## 一、异步编程基础

### 1.1 同步 vs 异步

```python
# 同步编程
import time

def sync_task(name, delay):
    print(f"Task {name} started")
    time.sleep(delay)
    print(f"Task {name} completed")

# 同步执行（串行）
start = time.time()
sync_task("A", 2)
sync_task("B", 2)
print(f"Total time: {time.time() - start:.2f}s")  # ~4秒
```

```python
# 异步编程
import asyncio
import time

async def async_task(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")

# 异步执行（并发）
async def main():
    start = time.time()
    await asyncio.gather(
        async_task("A", 2),
        async_task("B", 2)
    )
    print(f"Total time: {time.time() - start:.2f}s")  # ~2秒

asyncio.run(main())
```

### 1.2 核心概念对比

| 概念 | 同步编程 | 异步编程 |
| --- | --- | --- |
| 执行方式 | 串行执行 | 并发执行 |
| 阻塞方式 | 阻塞等待 | 非阻塞等待 |
| CPU利用率 | 低（等待时空闲） | 高（等待时执行其他任务） |
| 适用场景 | 计算密集型 | IO密集型 |

## 二、asyncio核心概念

### 2.1 协程（Coroutine）

```python
# 定义协程
async def greet(name):
    print(f"Hello, {name}")
    await asyncio.sleep(1)
    print(f"Goodbye, {name}")

# 运行协程
asyncio.run(greet("World"))
```

### 2.2 事件循环（Event Loop）

```python
# 创建事件循环
# （日常开发推荐直接使用 asyncio.run()；在没有运行中的事件循环时调用
#  asyncio.get_event_loop() 已不推荐，新版本Python会发出弃用警告甚至报错）
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # 创建任务
    task1 = loop.create_task(async_task("A", 2))
    task2 = loop.create_task(async_task("B", 2))

    # 运行直到完成
    loop.run_until_complete(asyncio.gather(task1, task2))
finally:
    # 关闭事件循环
    loop.close()
```

### 2.3 任务（Task）

```python
async def main():
    # 创建任务
    task = asyncio.create_task(async_task("A", 2))

    # 取消任务
    await asyncio.sleep(1)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
```

## 三、异步IO操作

### 3.1 异步文件IO

```python
import aiofiles

async def read_file(filepath):
    async with aiofiles.open(filepath, mode="r") as f:
        content = await f.read()
        return content

async def write_file(filepath, content):
    async with aiofiles.open(filepath, mode="w") as f:
        await f.write(content)

async def main():
    content = await read_file("input.txt")
    await write_file("output.txt", content)
    print("Files processed")

asyncio.run(main())
```

### 3.2 异步HTTP请求

```python
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch(session, "https://api.example.com/data/1"),
            fetch(session, "https://api.example.com/data/2"),
            fetch(session, "https://api.example.com/data/3")
        ]
        results = await asyncio.gather(*tasks)

        for result in results:
            print(result)

asyncio.run(main())
```

### 3.3 异步数据库操作

```python
import asyncpg

async def connect_to_db():
    conn = await asyncpg.connect(
        user="user",
        password="password",
        database="mydb",
        host="localhost"
    )
    return conn

async def query_users(conn):
    users = await conn.fetch("SELECT * FROM users LIMIT 10")
    return users

async def main():
    conn = await connect_to_db()
    try:
        users = await query_users(conn)
        for user in users:
            print(user)
    finally:
        # 即使查询出错也要关闭连接
        await conn.close()

asyncio.run(main())
```

## 四、常用异步模式

### 4.1 生产者-消费者模式

```python
async def producer(queue):
    for i in range(10):
        await asyncio.sleep(0.5)
        await queue.put(f"Item {i}")
        print(f"Produced: Item {i}")
    await queue.put(None)  # 结束信号

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(1)
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)

    # 创建任务
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))

    await producer_task
    await consumer_task

asyncio.run(main())
```

### 4.2 超时控制

```python
async def slow_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())
# 输出: Task timed out
```

### 4.3 并发限制

```python
async def fetch_with_limit(session, url, semaphore):
    async with semaphore:
        async with session.get(url) as response:
            return await response.json()

async def main():
    semaphore = asyncio.Semaphore(10)  # 最多10个并发

    async with aiohttp.ClientSession() as session:
        urls = [f"https://api.example.com/data/{i}" for i in range(100)]
        tasks = [
            fetch_with_limit(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print(f"Fetched {len(results)} items")

asyncio.run(main())
```

## 五、异步Web框架

### 5.1 FastAPI异步端点

```python
from fastapi import FastAPI
import asyncio

app = FastAPI()

@app.get("/")
async def root():
    await asyncio.sleep(1)
    return {"message": "Hello World"}

@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str | None = None):
    await asyncio.sleep(0.5)
    return {"item_id": item_id, "q": q}
```

### 5.2 Starlette异步应用

```python
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
import asyncio

async def homepage(request):
    await asyncio.sleep(1)
    return JSONResponse({"hello": "world"})

async def async_endpoint(request):
    await asyncio.sleep(2)
    return JSONResponse({"async": True})

# 通过 routes 参数注册路由（@app.route 装饰器写法已被 Starlette 弃用）
app = Starlette(routes=[
    Route("/", homepage),
    Route("/async", async_endpoint),
])
```

## 六、异步编程最佳实践

### 6.1 避免阻塞调用

```python
# 错误示例：在异步代码中使用同步阻塞调用
async def bad_example():
    # time.sleep是阻塞的
    time.sleep(1)  # 这会阻塞整个事件循环
    return "done"

# 正确示例：使用异步版本
async def good_example():
    await asyncio.sleep(1)  # 非阻塞
    return "done"
```

### 6.2 合理使用并发

```python
async def process_batch(items):
    # 分批处理，避免一次性创建过多任务
    batch_size = 100
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
```

### 6.3 错误处理

```python
async def safe_fetch(session, url):
    try:
        async with session.get(url) as response:
            if response.status != 200:
                raise Exception(f"HTTP error: {response.status}")
            return await response.json()
    except aiohttp.ClientError as e:
        print(f"Request failed: {e}")
        return None
    except Exception as e:
        print(f"Unexpected error: {e}")
        return None
```

### 6.4 调试异步代码

```python
# 启用调试模式
asyncio.run(main(), debug=True)

# 或者设置环境变量
# PYTHONASYNCIODEBUG=1 python script.py
```

## 七、性能对比

### 7.1 同步vs异步性能测试

```python
import asyncio
import time
import requests
import aiohttp

# 同步版本
def sync_fetch(urls):
    results = []
    for url in urls:
        results.append(requests.get(url).text)
    return results

# 异步版本
async def fetch_text(session, url):
    # 使用 async with 确保响应被释放、连接归还连接池
    async with session.get(url) as response:
        return await response.text()

async def async_fetch(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_text(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# 测试
urls = ["https://httpbin.org/get"] * 10

# 同步执行
start = time.time()
sync_fetch(urls)
print(f"Sync time: {time.time() - start:.2f}s")

# 异步执行
start = time.time()
asyncio.run(async_fetch(urls))
print(f"Async time: {time.time() - start:.2f}s")
```

### 7.2 性能对比结果

| 并发数 | 同步时间 | 异步时间 | 提升倍数 |
| --- | --- | --- | --- |
| 10 | 2.5s | 0.5s | 5x |
| 50 | 12.3s | 1.2s | 10x |
| 100 | 24.1s | 2.1s | 11x |

> 注：以上为示例测试结果，实际提升倍数取决于网络延迟和服务端响应速度。

## 八、实战案例：异步爬虫

### 8.1 爬取网页内容

```python
import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch_page(session, url):
    """获取网页内容"""
    async with session.get(url) as response:
        return await response.text()

async def parse_page(html):
    """解析网页"""
    soup = BeautifulSoup(html, "html.parser")
    title = soup.title.string if soup.title else "No title"
    links = [a["href"] for a in soup.find_all("a", href=True)]
    return {"title": title, "links": links}

async def crawl(url, session, visited, depth=0, max_depth=2):
    """递归爬取（用 max_depth 限制递归深度，避免无休止地爬下去）"""
    if url in visited or depth > max_depth:
        return

    visited.add(url)
    print(f"Crawling: {url}")

    try:
        html = await fetch_page(session, url)
        page_data = await parse_page(html)

        # 提取并爬取链接
        tasks = []
        for link in page_data["links"][:5]:  # 限制数量
            if link.startswith("http"):
                tasks.append(crawl(link, session, visited, depth + 1, max_depth))

        await asyncio.gather(*tasks)
    except Exception as e:
        print(f"Failed to crawl {url}: {e}")

async def main():
    visited = set()
    async with aiohttp.ClientSession() as session:
        await crawl("https://example.com", session, visited)
    print(f"Total pages crawled: {len(visited)}")

asyncio.run(main())
```

### 8.2 异步数据处理管道

```python
import time

async def download_data(session, url, queue):
    """下载数据"""
    async with session.get(url) as response:
        data = await response.json()
    await queue.put(data)

async def process_data(queue):
    """处理数据"""
    while True:
        data = await queue.get()
        try:
            # 处理逻辑
            processed = {
                "id": data.get("id"),
                "name": data.get("name"),
                "processed_at": time.time()
            }
            print(f"Processed: {processed}")
        except Exception as e:
            print(f"Failed to process {data}: {e}")
        finally:
            # 无论处理成功与否都要调用 task_done()，否则 queue.join() 会一直等待
            queue.task_done()

async def main():
    queue = asyncio.Queue()

    # 启动处理器
    processor = asyncio.create_task(process_data(queue))

    # 下载数据（所有下载任务共享同一个 ClientSession，而不是每个请求新建一个）
    urls = [
        "https://api.example.com/data/1",
        "https://api.example.com/data/2",
        "https://api.example.com/data/3"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [download_data(session, url, queue) for url in urls]
        await asyncio.gather(*tasks)

    await queue.join()
    processor.cancel()
    try:
        await processor
    except asyncio.CancelledError:
        pass

asyncio.run(main())
```

## 九、常见问题与解决方案

### 9.1 阻塞事件循环

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

# 问题：CPU密集型任务阻塞事件循环
async def cpu_intensive_task():
    result = 0
    for i in range(10**8):  # 循环中没有 await，期间事件循环被完全占用
        result += i
    return result

# 解决方案：把计算写成普通函数（def），交给执行器运行
# 注意：run_in_executor 需要的是同步可调用对象；
# 传入 async def 函数只会得到一个从未被 await 的协程对象
def cpu_bound_work():
    result = 0
    for i in range(10**8):
        result += i
    return result

async def run_cpu_task():
    loop = asyncio.get_running_loop()
    # executor 传 None 会使用默认线程池，可以避免阻塞事件循环；
    # 但受 GIL 限制，纯Python计算放在线程中并不会更快，CPU密集型任务更适合用进程池。
    # （使用进程池时，程序入口需放在 if __name__ == "__main__": 之下）
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_work)
    return result
```

### 9.2 共享状态管理

```python
# 使用锁保护共享状态
import asyncio

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        async with self.lock:
            self.value += 1
            return self.value

async def worker(counter):
    for _ in range(1000):
        await counter.increment()

async def main():
    counter = Counter()
    tasks = [worker(counter) for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Final value: {counter.value}")  # 应该是 10000

asyncio.run(main())
```

说明：协程只会在 `await` 处切换。上例的临界区内没有 `await`，因此即使不加锁，结果也是 10000。当“读取—修改—写回”之间存在 `await` 时（例如先读取值、再 await 一次IO、最后写回），锁才是必不可少的。

### 9.3 调试技巧

```python
# 设置日志级别
import logging
logging.basicConfig(level=logging.DEBUG)

# 监控任务
async def monitor_tasks():
    while True:
        tasks = [t for t in asyncio.all_tasks() if not t.done()]
        print(f"Active tasks: {len(tasks)}")
        await asyncio.sleep(1)
```

## 十、结语

Python异步编程是处理IO密集型任务的利器。通过合理使用asyncio，可以显著提高程序的并发性能。但需要注意避免阻塞调用、合理控制并发数量，并做好错误处理。希望本文能帮助你掌握Python异步编程的核心技能。

#Python #异步编程 #asyncio #并发