
前言在大规模分页采集、分布式长任务爬虫运行过程中程序崩溃、服务器重启、网络中断、IP 封禁、进程意外退出等问题时有发生。常规爬虫一旦中断只能从头重新全量爬取不仅造成大量重复请求、浪费带宽与服务器资源还会拉长采集周期、增加目标站点反爬风险。断点续爬作为企业级爬虫必备核心机制核心设计思想是实时持久化记录当前采集进度、已完成任务、待执行任务程序异常终止或手动停止后再次启动可自动读取历史进度从中断位置继续采集无需从头遍历从根本上规避重复采集、资源浪费与低效重启问题。本文涉及断点续爬开发所需核心依赖库官方超链接redis-py Redis 进度持久化库pymysql MySQL 进度与数据存储库json Python 内置序列化库pickle Python 对象持久化库filelock 文件锁防止多进程进度冲突库本文系统讲解断点续爬底层原理、进度存储介质选型、文件型断点、Redis 分布式断点、数据库断点三种完整实现方案配套全套可运行代码、逐行原理剖析、异常断电测试、生产级容错优化适配单机爬虫、多进程爬虫、分布式集群爬虫各类场景。一、断点续爬核心原理与介质选型1.1 断点续爬核心工作逻辑爬虫启动时优先读取历史断点进度获取已采集页码、已完成 URL 列表、待执行任务队列运行过程中实时定时持久化进度快照记录当前分页号、任务偏移量、成功 / 失败任务程序异常退出、手动停止后进度数据永久保留在存储介质中下次启动自动加载断点跳过已完成部分从中断位置接续执行剩余任务。1.2 三种断点存储介质对比表格存储介质适用场景优点缺点本地 JSON 文件单机小型爬虫、简单分页采集无需额外中间件、部署简单、轻量无依赖多进程易冲突、分布式无法共享、读写性能一般Redis 缓存分布式爬虫、多节点集群、高并发采集读写速度快、天然支持多节点共享、原子性强、支持队列断点依赖 Redis 服务、增加部署组件MySQL 数据库海量任务、需要持久化归档、任务状态精细化管理数据永久落地、可记录失败任务、支持复杂状态查询读写速度低于 Redis、配置相对复杂1.3 断点续爬核心记录字段无论采用哪种存储介质断点数据必须包含核心字段保证接续准确性当前采集页码 / 任务偏移量已完成 URL 唯一集合待执行任务队列列表失败任务单独存储列表最后更新时间戳二、基础工具封装2.1 文件锁工具防止多进程进度文件覆盖python运行from filelock import FileLock # 进度文件锁避免多进程同时读写冲突 lock FileLock(crawl_progress.lock)代码原理多进程爬虫同时读写本地进度文件时极易出现内容覆盖、进度错乱FileLock 实现文件互斥锁同一时间仅允许一个进程操作进度文件保证断点数据完整性。2.2 通用请求工具python运行import requests from bs4 import BeautifulSoup def get_page_html(url): 通用页面请求封装 try: resp requests.get(url, timeout10) resp.raise_for_status() return resp.text except Exception as e: print(f请求失败{url}错误{e}) return None代码原理统一封装请求逻辑加入超时与状态码异常捕获适配后续所有断点续爬案例调用减少代码冗余。三、方案一本地 JSON 文件实现单机断点续爬3.1 进度文件结构定义采用 JSON 文件持久化保存断点信息文件内容结构如下json{ current_page: 5, finished_urls: [], fail_urls: [], update_time: 2025-01-01 12:00:00 }3.2 JSON 断点读写封装类python运行import json import os from datetime import datetime class JsonBreakPoint: def __init__(self, file_pathcrawl_progress.json): self.file_path file_path self.lock FileLock(crawl_progress.lock) # 初始化进度文件不存在则创建默认结构 self.init_progress() def init_progress(self): if not os.path.exists(self.file_path): default_data { current_page: 1, finished_urls: [], fail_urls: [], update_time: datetime.now().strftime(%Y-%m-%d %H:%M:%S) } self.save_progress(default_data) def save_progress(self, data): 保存断点进度到JSON文件 with self.lock: data[update_time] datetime.now().strftime(%Y-%m-%d %H:%M:%S) with open(self.file_path, w, encodingutf-8) as f: json.dump(data, f, ensure_asciiFalse, indent2) def load_progress(self): 读取历史断点进度 with self.lock: with open(self.file_path, r, encodingutf-8) as f: return json.load(f) def reset_progress(self): 重置断点从头开始采集 if os.path.exists(self.file_path): os.remove(self.file_path) self.init_progress()代码原理初始化自动检测进度文件不存在则生成默认起始进度读写全程加文件锁解决多进程并发冲突封装保存、读取、重置三大核心方法统一管理断点数据自动刷新更新时间戳便于人工排查任务运行状态。3.3 完整断点续爬业务实现python运行def json_breakpoint_crawl(max_page20): bp JsonBreakPoint() progress bp.load_progress() # 从断点记录的当前页码开始而非从1开始 start_page progress[current_page] base_url https://static.scrape.center/page/{} print(f检测到历史断点从第{start_page}页开始续爬) for page in range(start_page, max_page 1): url base_url.format(page) # 跳过已完成URL if url in progress[finished_urls]: print(f已完成跳过{url}) continue html get_page_html(url) if html: # 模拟解析业务 soup BeautifulSoup(html, lxml) title soup.find(h2).get_text(stripTrue) print(f正在采集第{page}页{title}) # 标记为已完成 progress[finished_urls].append(url) else: # 记录失败URL progress[fail_urls].append(url) # 实时更新断点进度 progress[current_page] page 1 bp.save_progress(progress) if __name__ __main__: # 中途强制终止程序再次运行将从断点继续 json_breakpoint_crawl(max_page20)代码原理程序启动优先加载 JSON 断点读取上次终止的页码作为起始页循环采集过程中实时记录当前页码、已完成 URL、失败 URL每采集完一页立即落地保存进度保证异常断电不丢失进度手动停止或程序崩溃后再次运行自动从记录页码接续爬取无需从头执行。四、方案二Redis 实现分布式断点续爬生产主推4.1 Redis 断点设计思路利用 Redis 字符串存储当前页码、集合存储已完成 URL、列表存储待爬任务支持多台服务器、多进程、多协程共享断点进度是分布式爬虫标准断点方案。4.2 Redis 断点封装类python运行import redis from datetime import datetime class RedisBreakPoint: def __init__(self): self.r redis.Redis( host127.0.0.1, port6379, db2, decode_responsesTrue ) # 断点Key定义 self.CURR_PAGE_KEY crawl:break:current_page self.FINISHED_SET_KEY crawl:break:finished_urls self.FAIL_LIST_KEY crawl:break:fail_urls def init_breakpoint(self, start_page1): 初始化断点无数据则设置初始页码 if not self.r.get(self.CURR_PAGE_KEY): self.r.set(self.CURR_PAGE_KEY, start_page) def set_current_page(self, page): 更新当前采集页码 self.r.set(self.CURR_PAGE_KEY, page) def get_current_page(self): 获取断点起始页码 return int(self.r.get(self.CURR_PAGE_KEY) or 1) def add_finished_url(self, url): 添加已完成URL self.r.sadd(self.FINISHED_SET_KEY, url) def is_url_finished(self, url): 判断URL是否已采集完成 return self.r.sismember(self.FINISHED_SET_KEY, url) def add_fail_url(self, url): 添加失败任务URL self.r.rpush(self.FAIL_LIST_KEY, url) def reset_breakpoint(self): 重置所有断点数据 self.r.delete(self.CURR_PAGE_KEY) self.r.delete(self.FINISHED_SET_KEY) self.r.delete(self.FAIL_LIST_KEY)代码原理采用 Redis String 存储当前页码、Set 存储已完成 URL天然去重、List 存储失败任务所有节点共享同一个 Redis 配置进度全局同步提供初始化、读写页码、标记完成、记录失败、重置断点全套方法Redis 读写为内存级操作速度远超文件与数据库高并发无压力。4.3 分布式断点续爬完整实现python运行def redis_breakpoint_crawl(max_page20): bp RedisBreakPoint() bp.init_breakpoint() start_page bp.get_current_page() base_url https://static.scrape.center/page/{} print(f分布式断点续爬从第{start_page}页开始) for page in range(start_page, max_page 1): url base_url.format(page) if bp.is_url_finished(url): print(f集群已完成跳过{url}) continue html get_page_html(url) if html: soup BeautifulSoup(html, lxml) title soup.find(h2).get_text(stripTrue) print(f分布式采集第{page}页{title}) bp.add_finished_url(url) else: bp.add_fail_url(url) # 更新断点页码 bp.set_current_page(page 1) if __name__ __main__: redis_breakpoint_crawl(20)代码原理多台服务器同时运行爬虫共享 Redis 断点进度一台采集完成后其他节点自动跳过每完成一页实时更新页码与已完成集合断电后任意节点启动均可接续失败 URL 存入 Redis 列表后续可单独批量重试完善任务闭环。五、方案三MySQL 数据库持久化断点续爬5.1 断点进度表结构sqlCREATE TABLE crawl_breakpoint ( id INT AUTO_INCREMENT PRIMARY KEY, task_name VARCHAR(100) NOT NULL COMMENT 任务名称, current_page INT DEFAULT 1 COMMENT 当前断点页码, finished_urls TEXT COMMENT 已完成URL逗号分隔, fail_urls TEXT COMMENT 失败URL, update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;5.2 MySQL 断点核心封装python运行import pymysql class MysqlBreakPoint: def __init__(self): self.conn pymysql.connect( host127.0.0.1, userroot, password123456, databasecrawl_db, charsetutf8mb4 ) self.cursor self.conn.cursor() def get_task_break(self, task_name): 读取任务断点进度 sql SELECT current_page,finished_urls,fail_urls FROM crawl_breakpoint WHERE task_name%s self.cursor.execute(sql, (task_name,)) return self.cursor.fetchone() def save_task_break(self, task_name, page, finished, fail): 保存任务断点进度 sql INSERT INTO crawl_breakpoint(task_name,current_page,finished_urls,fail_urls) VALUES(%s,%s,%s,%s) ON DUPLICATE KEY UPDATE current_page%s,finished_urls%s,fail_urls%s self.cursor.execute(sql, (task_name, page, finished, fail, page, finished, fail)) self.conn.commit() def close(self): self.cursor.close() self.conn.close()代码原理采用 MySQL 主键存在则更新、不存在则插入语法实时持久化任务断点适合需要长期归档、后台管理查看进度的业务场景断点数据永久落地不丢失。六、断点续爬生产级容错优化6.1 定时快照保存不依赖单页保存进度每采集 5 页或 10 页做一次进度快照写入减少 IO 频次提升运行效率。6.2 断点数据容错兜底加载断点文件或 Redis 数据异常损坏时自动初始化默认起始进度避免程序直接崩溃。6.3 失败任务单独重试将断点中记录的失败 URL 单独拉取启动二次重试爬虫补齐漏采数据。6.4 多任务隔离断点不同采集任务使用独立断点 Key、独立文件、独立数据库记录避免任务之间进度互相干扰。6.5 自动过期清理对历史已完成任务的断点数据设置过期清理避免文件、Redis、数据库数据无限堆积。七、三种断点方案适用场景总结表格实现方案推荐场景优点概括JSON 文件断点单机小项目、无中间件依赖、本地简单采集零部署、轻量便捷Redis 断点分布式集群、多进程高并发、生产环境主力速度快、集群共享、原子性强MySQL 断点任务归档、后台管理、需要永久保存进度数据落地、可溯源、易管理