从入门到实战:基于Python的Locust性能压测全攻略

发布时间:2026/7/1 23:23:14

从入门到实战:基于Python的Locust性能压测全攻略 1. 项目概述为什么选择Locust进行压测如果你正在寻找一个既能模拟真实用户行为又能用你熟悉的Python代码来定义压测场景的工具那么Locust几乎是不二之选。我最初接触压测是从JMeter开始的图形化界面上手快但一旦遇到复杂的逻辑、动态参数或者需要和现有Python测试框架集成时就感到束手束脚。Locust的出现完美解决了这个问题——它让你用代码定义一切从简单的HTTP请求到复杂的业务流程都像写普通Python脚本一样自然。这个项目或者说这篇分享源于我最近为一个内部API网关项目做性能评估的经历。我需要模拟上千个用户每个用户登录后以不同的时间间隔查询订单、提交评论并且用户行为需要有一定的随机性。用JMeter实现这个场景需要捣鼓各种插件和BeanShell脚本调试起来非常痛苦。而用Locust我只需要几十行Python代码逻辑清晰维护起来也极其方便。更重要的是Locust的分布式压测能力非常轻量级一个Master节点就能轻松协调上百个Worker资源消耗远小于同级别的其他工具。所以这篇内容不仅仅是又一个“Hello World”式的教程。我会带你从一个最基础的demo开始手把手搭建一个完整的、可扩展的压测场景然后深入剖析Locust那些最常用但也最容易让人困惑的核心参数和配置项。无论你是想验证一个新接口的吞吐量还是需要对一个复杂系统进行全链路压测这里面的思路和技巧都能直接套用。我们不止讲“怎么做”更会重点讲清楚“为什么这么做”以及我在实际压测中踩过的那些坑。2. 从零搭建你的第一个Locust压测Demo很多人学新工具喜欢直接看复杂案例但我的经验是一个能跑起来的最简例子才是建立信心和理解框架设计的关键。我们先抛开所有高级功能看看如何用最少代码启动一次压测。2.1 环境准备与极简脚本编写首先确保你的Python环境3.6及以上已经就绪。安装Locust只需要一条命令pip install locust接下来创建一个名为locustfile.py的文件。这个文件名是Locust默认寻找的入口文件当然你也可以用其他名字并通过参数指定。我们先写一个最基础的脚本from locust import HttpUser, task, between class QuickstartUser(HttpUser): # 模拟用户等待时间介于1到2.5秒之间 wait_time between(1, 2.5) task def hello_world(self): # 发起一个GET请求到根路径 self.client.get(/) task(3) # 这个任务的权重是3意味着它被执行的频率是hello_world的3倍 def view_items(self): # 模拟查看物品列表并捕获响应 with self.client.get(/items, catch_responseTrue) as response: if response.status_code 200: # 可以在这里对响应内容做断言 if total not in response.text: response.failure(Response did not contain total) else: response.failure(fStatus code was {response.status_code})这个脚本定义了一类虚拟用户QuickstartUser。它有两个行为task访问首页和查看物品列表。wait_time定义了用户每次执行任务后的思考时间这很重要因为真正的用户不会毫秒不差地连续发送请求。task(3)中的权重参数3意味着在负载生成时view_items任务被选中的概率是hello_world任务的3倍。注意locustfile.py必须放在你启动命令的当前目录或者通过-f参数指定路径。这是新手最容易犯的错之一启动后发现 “No Locustfile found”。2.2 启动压测与Web UI解读保存好脚本后在终端中进入该文件所在目录运行locust默认情况下Locust会启动一个Web UI服务地址是http://localhost:8089。打开浏览器访问这个地址你会看到Locust的启动界面。关键参数第一填Number of users (peak concurrency)这是你希望模拟的最大并发用户数。Locust会以你设置的孵化速率Spawn rate逐步增加到这个数量。注意这不是每秒请求数RPS而是同时“存活”的虚拟用户数量。Spawn rate (users spawned/second)孵化速率即每秒启动多少个虚拟用户直到达到“Number of users”设置的总数。设置为1就意味着每秒增加1个用户平稳上升。Host被测试系统的根地址比如http://api.your-service.com。我们脚本中的self.client.get(“/items”)会拼接到这个Host后面形成完整的请求URLhttp://api.your-service.com/items。填写好这些例如Users: 100 Spawn rate: 10 Host:http://localhost:8080点击 “Start swarming”压测就开始了。Web UI的仪表盘是监控压测的核心你需要关注这几个指标RPS (Requests per second)每秒发送的请求数。这是衡量系统吞吐量的黄金指标。Failure/s每秒失败的请求数。任何非零值都需要警惕。Response Times (ms)响应时间的分布。重点关注中位数Median和95分位值95%ile。中位数代表大多数用户的体验95分位值则反映了尾部延迟即最慢的那5%请求的情况这对用户体验至关重要。Users当前活跃的虚拟用户数。2.3 无头模式Headless运行与结果导出Web UI适合调试和实时监控但在自动化测试或CI/CD流水线中我们更需要命令行模式。这就是--headless参数的用武之地。locust -f locustfile.py --headless --users 100 --spawn-rate 10 -H http://localhost:8080 --run-time 1m30s这条命令的意思是以无头模式启动最终用户数100每秒孵化10个用户压测持续1分30秒后自动停止。更实用的功能是结果导出方便你生成报告或进行后续分析locust -f locustfile.py --headless --users 100 --spawn-rate 10 -H http://localhost:8080 --run-time 30s --csvresult --htmlreport.html这个命令会在运行后生成两个文件result_stats.csv详细统计数据和report.html一个简洁的HTML报告。--csv参数是很多教程里没提但实际工作中极其重要的功能它让你能轻松地将压测数据导入到Excel或数据分析工具中进行更深入的性能趋势分析。实操心得在调试脚本阶段我强烈建议先用--headless模式配合--run-time 10s快速运行一下看看脚本是否有语法错误或逻辑问题这比启动Web UI再点开始要快得多。确认脚本无误后再使用Web UI进行细致的观察和调参。3. Locust核心参数与配置深度解析掌握了基本流程后我们来拆解Locust里那些决定压测行为和质量的关键参数。理解它们你才能精准控制你的“虚拟大军”。3.1 用户行为定义wait_time、task与权重1.wait_time模拟用户思考时间这是让压测场景变得“真实”的关键。Locust提供了几种策略between(min, max) 最常用在最小值和最大值之间随机取一个等待时间如between(1, 5)表示等1到5秒。constant(n) 每次等待固定的秒数。constant_pacing(n)这是一个高级且非常有用的参数。它试图保证每个任务执行周期任务执行时间等待时间至少为n秒。如果你的任务执行很快它会自动增加等待时间以确保节奏稳定。这对于模拟固定节奏的业务如每5秒查询一次非常有用。错误示例与修正# 可能的问题如果任务执行时间不稳定单纯用between可能导致RPS波动巨大。 class MyUser(HttpUser): wait_time between(0.1, 0.3) # 思考时间很短 task def fast_api(self): self.client.get(“/fast”) # 这个接口可能10ms就返回 task def slow_api(self): self.client.get(“/slow”) # 这个接口可能2秒才返回在这种情况下执行slow_api时会长时间占用一个虚拟用户导致整体并发用户数中实际在发送请求的比例降低RPS会下降。更合理的做法是为不同响应时间的接口设计不同的用户类或者使用constant_pacing来平滑请求间隔。2.task装饰器与权重task标记一个方法是用户的任务。权重决定了任务被选择的概率。task 权重默认为1。task(3) 权重为3。在一个用户类中所有任务的权重会被归一化。例如一个权重1的任务和一个权重3的任务它们被选中的概率分别是 1/(13)25% 和 75%。3. 任务集TaskSet的嵌套与调度对于复杂的、有顺序的业务流例如必须先登录然后才能下单可以使用TaskSet。from locust import HttpUser, task, TaskSet, between class UserBehavior(TaskSet): # 在TaskSet内部同样可以用task定义子任务 task(2) def view_profile(self): self.client.get(“/profile”) task(1) def stop(self): # 调用self.interrupt()可以中断当前TaskSet的执行返回到父级 self.interrupt() class WebsiteUser(HttpUser): wait_time between(5, 9) # tasks是一个列表可以包含元组 (TaskSet类, 权重) tasks [UserBehavior] # 等价于 [(UserBehavior, 1)] task def index(self): self.client.get(“/”)TaskSet让用户行为的组织更加模块化。self.interrupt()是控制流程的关键它允许用户跳出当前的TaskSet回到父级的任务选择中从而模拟用户跳出某个流程的行为。3.2 性能压测控制参数--users、--spawn-rate与--run-time这几个参数共同决定了负载模型。--users总用户数。它代表的是模拟的“用户池”大小即同时存在的虚拟用户上限。这些用户会根据wait_time在任务间休眠因此活跃并发请求数通常小于总用户数。--spawn-rate孵化速率。它控制负载爬升的斜率。设置为10意味着每秒有10个新用户被创建并开始执行任务直到达到总用户数。一个平滑的爬升如--spawn-rate 5有助于观察系统在压力逐步增加时的表现而一个陡峭的爬升如--spawn-rate 100则可以测试系统的瞬时抗冲击能力。--run-time压测持续时间。格式如1m30s、10h。设置一个明确的运行时间对于获取稳定、可比较的性能数据至关重要。我通常会先快速运行1-2分钟检查脚本和系统基本状态然后进行至少10-15分钟的稳态压测以获得更可靠的平均值。负载模型设计示例假设你想模拟一个“周末大促”的场景预热期5分钟--users 500 --spawn-rate 10。用5分钟时间缓慢将用户增加到500观察系统在压力逐步增大时的性能变化。峰值期10分钟--users 2000 --spawn-rate 100。快速将用户冲到2000模拟抢购开始时的洪峰。稳定期30分钟 保持2000用户持续运行检验系统在高压下的稳定性。衰退期5分钟 停止孵化新用户让现有用户自然完成任务并退出。在Locust中你需要分次运行或编写更复杂的脚本来实现这个分段模型。一种常见做法是使用–-users和–-spawn-rate的组合或者利用Locust的事件钩子如test_start在运行时动态调整。3.3 请求客户端与认证配置self.client HttpSession在任务中使用的self.client是HttpSession的一个实例它本质上是requests.Session的封装。这意味着它自动保持了Cookie适合模拟需要登录态的会话。处理认证对于需要Token认证的API通常在on_start方法中登录并保存Token。class ApiUser(HttpUser): wait_time between(0.5, 2) host “http://api.example.com” def on_start(self): # 每个虚拟用户开始运行时先执行登录 response self.client.post(“/auth/login”, json{“username”: “test”, “password”: “test”}) if response.status_code 200: self.token response.json().get(“token”) # 将token设置到后续所有请求的header中 self.client.headers.update({“Authorization”: f”Bearer {self.token}”}) else: # 登录失败可以标记该用户为失败或停止运行 self.environment.runner.quit() task def get_secret(self): # 此时的请求会自动带上Authorization头 self.client.get(“/protected/resource”)重要提示on_start是每个虚拟用户实例在开始运行其任务循环前都会执行一次的方法。如果你有10个用户就会执行10次登录。这模拟了10个不同的用户会话是符合真实场景的。切勿误以为它只在压测开始时执行一次。3.4 分布式压测与Master-Worker模式当单台机器无法模拟足够多的用户受限于CPU、网络或端口数时就需要分布式压测。Locust的分布式模式采用一个Master节点和多个Worker节点的架构。Master节点负责协调、收集数据、提供Web UI。它本身不产生任何负载。Worker节点负责实际执行任务脚本、生成负载。启动步骤启动Master指定一个开放的端口例如5557locust -f locustfile.py --master --master-bind-host0.0.0.0 --master-bind-port5557在每台Worker机器上启动Worker指向Master的IP和端口locust -f locustfile.py --worker --master-host192.168.1.100 --master-port5557关键参数与避坑指南--master/--worker 明确节点角色。--master-bind-host/--master-bind-port Master节点监听的地址和端口默认为*:5557。如果Master有防火墙需要开放此端口。--master-host/--master-port Worker节点需要连接的Master地址和端口。--expect-workersMaster端参数这是一个非常有用的参数。在启动Master时使用--expect-workers 4告诉Master等待4个Worker连接。当所有Worker就绪后Web UI上才会出现“Start”按钮避免了你手动去数Worker是否都连上了的麻烦。文件一致性所有Master和Worker节点上的locustfile.py必须完全一致这是分布式压测最大的坑。不一致的脚本会导致不可预知的行为。建议使用版本控制系统如Git或共享存储来确保文件同步。主机资源 Master节点资源消耗很小但Worker节点是负载生成器需要足够的CPU和网络资源。监控Worker节点的资源使用情况避免其自身成为瓶颈。4. 构建高阶压测场景动态参数、关联与检查点基础请求只是开始真实的业务场景复杂得多。我们需要让虚拟用户“智能”起来。4.1 参数化与数据驱动让每个虚拟用户使用不同的数据比如不同的用户名、商品ID。我们可以从文件中读取或者动态生成。方法一从CSV/JSON文件读取适用于固定数据集import csv from locust import HttpUser, task, between class ParameterizedUser(HttpUser): wait_time between(1, 3) def on_start(self): # 假设我们有一个users.csv文件内容为username,password with open(‘users.csv’, ‘r’) as f: reader csv.DictReader(f) self.user_list list(reader) # 加载到内存 self.current_user None task def login(self): if not self.user_list: self.environment.runner.quit() return # 每次任务取一个用户用完后移除模拟用户池 self.current_user self.user_list.pop() self.client.post(“/login”, json{ “username”: self.current_user[‘username’], “password”: self.current_user[‘password’] })注意这种方式在分布式运行时每个Worker都会读取完整文件可能导致数据重复使用。更佳实践是在Master节点准备数据并通过消息队列分发给Worker或者使用数据库并确保每个Worker连接后获取不同的数据段。方法二使用Faker库动态生成适用于海量随机数据from locust import HttpUser, task, between from faker import Faker class FakeDataUser(HttpUser): wait_time between(1, 5) def on_start(self): # 每个用户实例有自己的Faker生成器 self.fake Faker(locale’zh_CN’) task def create_order(self): order_data { “product_id”: self.fake.random_int(min1000, max9999), “customer_name”: self.fake.name(), “address”: self.fake.address(), “phone”: self.fake.phone_number() } self.client.post(“/order”, jsonorder_data)使用Faker可以轻松生成符合业务逻辑的、逼真的测试数据且数据量无限非常适合大规模压测。4.2 请求关联与状态保持模拟一个下单流程加入购物车 - 结算 - 支付。后一个请求依赖于前一个请求的响应结果如订单ID。class ShoppingUser(HttpUser): wait_time between(2, 5) task def purchase_flow(self): # 1. 加入购物车 cart_resp self.client.post(“/cart/add”, json{“item_id”: 123}) if cart_resp.status_code ! 200: return # 失败则终止此流程 cart_id cart_resp.json().get(“cart_id”) # 2. 创建订单依赖cart_id order_resp self.client.post(“/order/create”, json{“cart_id”: cart_id}) if order_resp.status_code ! 200: return order_id order_resp.json().get(“order_id”) # 3. 模拟支付依赖order_id pay_resp self.client.post(f”/order/{order_id}/pay”, json{“method”: “credit_card”}) # 可以在这里对支付结果进行断言 if pay_resp.status_code 200 and pay_resp.json().get(“status”) “success”: pay_resp.success() else: pay_resp.failure(“Payment failed”)这种线性的请求关联清晰地模拟了用户的完整操作链。关键在于妥善处理每一步的响应提取必要参数并传递给下一步。4.3 响应断言与复杂校验Locust的catch_responseTrue和with语句提供了强大的响应校验能力。task def check_complex_response(self): with self.client.get(“/api/complex”, catch_responseTrue, name”/api/complex [校验]”) as response: # 1. 检查状态码 if response.status_code ! 200: response.failure(f”Bad status code: {response.status_code}”) return # 状态码不对后面的校验就不做了 # 2. 尝试解析JSON try: json_data response.json() except JSONDecodeError: response.failure(“Response is not valid JSON”) return # 3. 校验JSON结构中的特定字段 if not isinstance(json_data, dict): response.failure(“Response is not a JSON object”) return expected_fields [“status”, “data”, “code”] for field in expected_fields: if field not in json_data: response.failure(f”Missing field: {field}”) return # 4. 校验业务逻辑例如code必须为0 if json_data.get(“code”) ! 0: response.failure(f”Business error, code: {json_data.get(‘code’)}”) return # 5. 校验数据内容例如data是一个列表且不为空 data json_data.get(“data”, []) if not isinstance(data, list) or len(data) 0: response.failure(“Data field is empty or not a list”) return # 所有检查通过标记为成功 response.success()通过多层级的校验你可以确保接口不仅返回了HTTP 200而且在业务逻辑和数据格式上都是正确的。name参数可以自定义该请求在统计中的名称便于区分同一个URL但不同校验逻辑的请求。5. 实战问题排查与性能调优经验录压测过程中问题往往不在被测系统而在压测脚本或工具本身。这里记录几个我高频遇到的坑和解决思路。5.1 Locust自身成为瓶颈 “Socket” 与 “CPU” 问题现象 当模拟的用户数--users达到几千时Locust Worker节点的CPU占用率飙升接近100%而实际的RPS却上不去甚至开始出现 “ConnectionError” 或 “Socket” 相关错误。根因分析默认HTTP客户端限制 Locust底层使用的requests库同步或geventhttpclient在极高并发下创建和管理大量连接会消耗大量CPU和内存。每个虚拟用户User都是一个greenlet微线程虽然轻量但数量巨大时切换和网络IO处理仍会成为负担。操作系统限制 每个客户端连接都需要一个本地端口。当短时间内建立大量连接时可能会耗尽本地可用端口尤其是在短连接场景下导致 “Address already in use” 错误。解决方案增加Worker节点 这是最直接的方法。将负载分散到多台机器上降低单机压力。使用--expect-workers确保所有节点就绪。调整Locust配置--html和--csv日志输出 在压测期间生成详细的HTML和CSV报告会消耗额外I/O和CPU。在最终正式压测时可以考虑先不生成或者生成在内存盘上。减少日志级别 通过环境变量LOCUST_LOGLEVELERROR减少控制台日志输出。优化脚本精简wait_time 如果是为了追求极限RPS可以适当减少思考时间如between(0.1, 0.3)但要注意这不符合真实用户行为。检查任务逻辑 避免在任务循环中执行复杂的计算或耗时的本地操作如频繁读写大文件。调整系统参数Linux增加本地端口范围sysctl -w net.ipv4.ip_local_port_range“1024 65535”减少TIME_WAIT时间sysctl -w net.ipv4.tcp_tw_reuse1和sysctl -w net.ipv4.tcp_fin_timeout30警告 修改系统参数有风险请在测试环境操作并充分理解其含义。5.2 结果分析中的“长尾”与“毛刺”现象 平均响应时间看起来不错但95分位或99分位值95%ile, 99%ile非常高或者响应时间曲线图上出现频繁的尖峰毛刺。排查思路检查被测系统 这是首要怀疑对象。查看应用日志、数据库监控、中间件如Redis、Kafka监控寻找慢查询、Full GC、锁竞争等迹象。使用APM工具如SkyWalking, Pinpoint定位具体慢的代码链路。检查压测机资源 使用top,vmstat,dstat等命令监控压测Worker的CPU、内存、网络带宽和磁盘I/O。如果压测机资源饱和它发出的请求本身就会不稳定产生毛刺。检查网络 在压测机和被压测服务器之间是否存在网络抖动、带宽不足或防火墙策略限制可以使用ping看延迟和丢包或mtr工具进行排查。分析Locust日志 在启动Locust时加上--logfilelocust.log --loglevelDEBUG然后分析失败请求的具体错误信息看是否是连接超时、连接被重置等网络层问题。审视压测脚本 是否存在某些特定请求如一个巨大的文件上传或一个复杂的查询本身就很慢这些请求会拉高整体分位值。可以考虑将它们分离到单独的用户类或任务中进行独立分析。5.3 分布式压测中的数据同步与一致性难题问题 在分布式模式下如果每个Worker都独立读取同一个测试数据文件如CSV可能会导致多个虚拟用户使用了同一条数据例如同一个用户名这在测试需要唯一性约束的业务时如注册、下单会引发问题。解决方案策略预分区数据 将总测试数据文件提前分割成N份N等于Worker数量每台Worker加载自己的那一份。这种方法简单但需要提前规划好数据量。使用中央数据源 所有Worker从一个共享的数据源获取数据例如Redis队列 在Master或一个独立服务中将测试数据如用户ID推入Redis的List或Set。每个Worker在虚拟用户的on_start方法中使用BRPOP或SPOP命令原子性地弹出一条数据。这能保证数据不重复。# 示例在Master或一个初始化脚本中向Redis填充数据 # r redis.Redis(...) # for user_id in range(10000): # r.lpush(‘user_queue’, user_id) # 在locustfile.py中 import redis class RedisUser(HttpUser): def on_start(self): # 每个Worker连接同一个Redis self.redis_client redis.Redis(host‘shared-redis’, port6379, decode_responsesTrue) # 阻塞弹出保证唯一性 self.user_id self.redis_client.brpop(‘user_queue’, timeout5) if not self.user_id: self.environment.runner.quit() # 数据用完了数据库带原子操作 使用数据库如MySQL的表通过SELECT ... FOR UPDATE或版本号机制来分配数据。动态生成数据 如前所述使用Faker库。只要确保每个虚拟用户实例的随机种子不同就能生成海量且不重复的数据。这是解决数据一致性最简单有效的方法尤其适合不需要严格预定义数据集的场景。5.4 如何设计一个有说服力的压测报告压测的最终产出是报告。一份好的报告不仅要罗列数据更要讲清上下文、过程和结论。你的报告应该包含测试目标 本次压测要解决什么问题例如验证系统在1000并发用户下的API响应时间是否满足200ms的SLA寻找系统的最大吞吐量瓶颈。测试环境压测环境 Locust Master/Worker的机器配置CPU、内存、网络、数量、位置。被测环境 应用服务器的配置、数量、架构图、依赖的中间件和数据库版本。务必确保环境与生产环境尽可能一致至少是等比例缩容。测试场景与脚本 简要描述模拟的用户行为用户类、任务权重、思考时间。可以附上核心脚本片段。负载模型 清晰说明并发用户数是如何增长的爬升曲线稳态持续了多久。关键性能指标KPI数据以表格形式呈现稳态期间的数据 | 指标 | 平均值 | 中位数 (50%ile) | 90%ile | 95%ile | 99%ile | 最小值 | 最大值 | | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | | 响应时间 (ms) | 45 | 32 | 89 | 150 | 420 | 10 | 1200 | | RPS (Req/s) | 1250 | - | - | - | - | 1100 | 1300 |失败率 必须低于预设目标如0.1%。资源监控图表 附上被测系统服务器在压测期间的CPU、内存、磁盘I/O、网络带宽的使用率图表。数据库的监控图表QPS、连接数、慢查询也至关重要。结果分析与结论目标是否达成例如“在1000并发用户稳态压力下所有API的95%ile响应时间均低于150ms满足200ms的目标且错误率为0。”发现了哪些瓶颈例如“当并发用户超过1500时数据库连接池出现等待应用服务器CPU利用率达到90%此时95%ile响应时间上升至500ms。初步判断瓶颈在于应用服务处理能力。”后续建议 例如“建议1. 对X服务进行代码性能优化2. 考虑将数据库连接池参数从50调整为803. 对Y接口增加缓存。”记住压测不是为了把系统打挂而是为了了解系统的行为找到其性能边界和薄弱点为容量规划和性能优化提供数据支撑。每一次压测都应该带着明确的问题开始以清晰的结论和 actionable 的建议结束。

相关新闻