第 3 篇:频率控制与限流——别因为太贪心先被封

发布时间:2026/5/21 7:04:52

# 第 3 篇:频率控制与限流——别因为太贪心先被封

## 一、为什么一加多线程就被封

常见路径:for 循环太慢 → 上多线程/协程 → QPS 飙高 → 很快出现:

- 429 Too Many Requests
- 403 Forbidden
- 200 但内容是风控页/验证码提示

站点往往按 IP / 账号 / 接口 做时间窗口统计。你一分钟打几百次同一接口,和真人行为差太远,先封频率是最便宜的反爬手段。

结论:对抗频率层,目标不是「最快」,而是在可接受时间内平稳拉完数据。

## 二、三个词:限速、抖动、退避

| 概念 | 含义 |
| --- | --- |
| 限速 | 控制单位时间内请求次数(如每域名 2 QPS) |
| 抖动 | 间隔加随机数,避免固定 1.000s、2.000s 的「机器节奏」 |
| 退避 | 遇 429/风控先降温(sleep、降 QPS),不要死循环重试 |

## 三、最简单:按固定间隔 + 抖动

适合单进程、快速止血。

```python
import time
import random


class RateLimiter:
    def __init__(self, min_interval: float = 1.5):
        self.min_interval = min_interval
        self._last_ts = 0.0

    def wait(self):
        now = time.time()
        gap = now - self._last_ts
        if gap < self.min_interval:
            time.sleep(self.min_interval - gap + random.uniform(0, 0.3))
        self._last_ts = time.time()
```

用法:每次 `requests` 前调用 `limiter.wait()`。

## 四、令牌桶:控制平均 QPS,允许小幅突发

比固定间隔更灵活:长期平均不超过 rate,短时允许 capacity 个突发。

```python
import time
import threading


class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        """
        rate: 每秒补充令牌数(期望 QPS)
        capacity: 桶容量(最大突发)
        """
        self._rate = max(rate, 1e-6)
        self._capacity = capacity
        self._tokens = float(capacity)
        self._last_ts = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        delta = now - self._last_ts
        self._last_ts = now
        self._tokens = min(self._capacity, self._tokens + delta * self._rate)

    def consume(self, tokens: float = 1.0):
        while True:
            with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                need = tokens - self._tokens
                wait_time = need / self._rate
            time.sleep(min(wait_time, 2.0))
```

多线程共用一个桶,整体 QPS 会被压在约 `rate` 附近。

## 五、多站点分桶:A 站限流别拖死 B 站

按域名各放一个桶,避免一个站打满连累别的站。

```python
from urllib.parse import urlparse


class MultiSiteLimiter:
    def __init__(self, default_rate=1.5, default_capacity=5):
        self.default_rate = default_rate
        self.default_capacity = default_capacity
        self._buckets = {}
        self._lock = threading.Lock()

    def _host(self, url: str) -> str:
        h = urlparse(url).netloc
        return h if h else "default"

    def wait(self, url: str, rate: float = None, capacity: int = None):
        host = self._host(url)
        with self._lock:
            if host not in self._buckets:
                r = rate if rate is not None else self.default_rate
                c = capacity if capacity is not None else self.default_capacity
                self._buckets[host] = TokenBucket(r, c)
        self._buckets[host].consume(1)
```

可选:为特定域名单独配置,例如 `per_host = {"api.xxx.com": (0.5, 3)}`,在 `wait` 里查表即可(可自行扩展)。

## 六、限流参数怎么选(干货表)

| 场景 | 建议 rate(QPS) | capacity | 说明 |
| --- | --- | --- | --- |
| 个人博客、小站 | 0.3–1 | 3–5 | 宁可慢 |
| 一般内容站 | 1–2 | 5–8 | 先看 robots/用户协议 |
| 明显有风控的站 | 0.2–0.5 | 2–4 | 配合更长冷却 |
| 需登录的接口 | 每账号再限一层 | - | 账号池各带一桶 |

原则:从保守开始,根据成功率再慢慢调高;一旦出现 429/风控关键词,先减半 rate,别硬顶。

## 七、响应分类 + 遇 429/风控自动降温

```python
def is_block_page(html: str) -> bool:
    if not html:
        return False
    keys = ["验证码", "安全验证", "访问过于频繁", "访问受限", "verify you are human", "robot check"]
    low = html.lower()
    return any(k.lower() in low for k in keys)


def classify_response(status_code: int, text: str) -> str:
    if status_code == 429:
        return "rate_limited"
    if status_code == 403:
        return "forbidden"
    if status_code == 200:
        if not (text or "").strip():
            return "empty"
        if is_block_page(text):
            return "blocked"
        return "ok"
    return "unknown"
```

自适应降速:某 host 连续多次 `rate_limited`/`blocked`,对该 host 临时把 `rate * 0.5` 或进入冷却 60s(见下节完整 Worker)。

## 八、完整示例:队列 + 多 Worker + 多站限流 + 429 降速

下面是一段可直接改 URL 跑通结构的完整脚本(请替换为你自己的合法目标与频率)。

```python
import queue
import random
import threading
import time
import requests
from dataclasses import dataclass
from urllib.parse import urlparse


# TokenBucket + MultiSiteLimiter(同上,此处合并)
class TokenBucket:
    def __init__(self, rate: float, capacity: int):
        self._rate = max(rate, 1e-6)
        self._capacity = capacity
        self._tokens = float(capacity)
        self._last_ts = time.time()
        self._lock = threading.Lock()

    def _refill(self):
        now = time.time()
        delta = now - self._last_ts
        self._last_ts = now
        self._tokens = min(self._capacity, self._tokens + delta * self._rate)

    def consume(self, tokens: float = 1.0):
        while True:
            with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return
                need = tokens - self._tokens
                wait_time = need / self._rate
            time.sleep(min(wait_time, 2.0))

    def slow_down(self, factor: float = 0.5):
        with self._lock:
            self._rate = max(self._rate * factor, 0.05)


class HostRateState:
    def __init__(self, rate: float, capacity: int):
        self.bucket = TokenBucket(rate, capacity)
        self.bad_streak = 0
        self.cooldown_until = 0.0
        self.lock = threading.Lock()

    def on_bad(self):
        with self.lock:
            self.bad_streak += 1
            if self.bad_streak >= 3:
                self.bucket.slow_down(0.5)
                self.bad_streak = 0
                self.cooldown_until = time.time() + 30

    def on_ok(self):
        with self.lock:
            self.bad_streak = 0

    def in_cooldown(self) -> bool:
        return time.time() < self.cooldown_until


class MultiSiteLimiter:
    def __init__(self, default_rate=1.0, default_capacity=5):
        self.default_rate = default_rate
        self.default_capacity = default_capacity
        self._hosts = {}
        self._lock = threading.Lock()

    def _get_state(self, url: str) -> HostRateState:
        host = urlparse(url).netloc or "default"
        with self._lock:
            if host not in self._hosts:
                self._hosts[host] = HostRateState(self.default_rate, self.default_capacity)
            return self._hosts[host]

    def wait(self, url: str):
        st = self._get_state(url)
        if st.in_cooldown():
            time.sleep(min(st.cooldown_until - time.time(), 10) + random.uniform(0, 1))
        st.bucket.consume(1)

    def report(self, url: str, kind: str):
        st = self._get_state(url)
        if kind in ("rate_limited", "blocked", "forbidden"):
            st.on_bad()
        elif kind == "ok":
            st.on_ok()


def classify_response(status_code: int, text: str) -> str:
    if status_code == 429:
        return "rate_limited"
    if status_code == 403:
        return "forbidden"
    if status_code == 200:
        t = (text or "").strip()
        if not t:
            return "empty"
        low = t.lower()
        for k in ["验证码", "安全验证", "访问过于频繁", "访问受限", "verify you are human"]:
            if k.lower() in low:
                return "blocked"
        return "ok"
    return "unknown"


@dataclass
class Task:
    url: str


def worker(name: str, q: queue.Queue, limiter: MultiSiteLimiter, session: requests.Session):
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Accept-Language": "zh-CN,zh;q=0.9",
    }
    while True:
        try:
            task = q.get(timeout=5)
        except queue.Empty:
            print(f"[{name}] 队列空,退出")
            return
        try:
            limiter.wait(task.url)
            r = session.get(task.url, headers=headers, timeout=15)
            kind = classify_response(r.status_code, r.text)
            limiter.report(task.url, kind)
            print(f"[{name}] {task.url[:60]}... -> {kind} {r.status_code} len={len(r.text)}")
            if kind in ("rate_limited", "blocked"):
                time.sleep(5 + random.uniform(0, 3))
        except Exception as e:
            print(f"[{name}] err {e}")
        finally:
            q.task_done()


if __name__ == "__main__":
    # 示例:请换成你有权访问的 URL;禁止对未授权站点高频访问
    urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/status/429",
    ] * 5

    task_q = queue.Queue()
    for u in urls:
        task_q.put(Task(url=u))

    limiter = MultiSiteLimiter(default_rate=1.5, default_capacity=4)
    sess = requests.Session()

    threads = []
    for i in range(3):
        t = threading.Thread(target=worker, args=(f"W{i}", task_q, limiter, sess), daemon=True)
        t.start()
        threads.append(t)

    task_q.join()
    print("done")
```

说明:`httpbin.org/status/429` 用于本地看 429 → report → slow_down 行为;真实站点请降低 rate、减少并发。

## 九、排查清单

- 是否全站共用一个限流 → 改为按 host 分桶。
- 间隔是否完全固定 → 加 `random.uniform` 抖动。
- 429 后是否立即重试 → 先 sleep 再试,并降低 QPS。
- 多线程是否每线程各自限流 → 应共享同一站点的桶。
- 是否忽略 200 风控页 → 用 `classify_response` 识别关键词。
- 登录态是否多线程乱用 → 会话与账号池见系列第 4 篇。

## 十、系列导航 + 关注说明

- 第 1 篇:反爬全景图
- 第 2 篇:请求层(Header、Session)
- 第 3 篇(本篇):频率、令牌桶、多站分桶、队列 Worker、429 降速
- 第 4 篇:会话与 Cookie / 账号池
- 第 5 篇:Playwright + requests

相关新闻