
B站评论数据获取终极指南如何用Python高效爬取4000万条评论而不被封【免费下载链接】bilibili-api哔哩哔哩常用API调用。支持视频、番剧、用户、频道、音频等功能。原仓库地址https://github.com/MoyuScript/bilibili-api项目地址: https://gitcode.com/gh_mirrors/bi/bilibili-api在数据驱动的时代B站作为中国最大的年轻人文化社区其海量用户评论数据蕴含着巨大的价值。无论是进行情感分析、内容优化还是用户行为研究获取B站评论数据都是每个数据分析师和开发者的必备技能。今天让我们一起探索如何通过bilibili-api这个强大的Python库安全高效地获取B站评论数据。 概念解析B站评论系统的三层架构要理解B站评论获取的技术原理我们需要先了解其背后的三层架构设计第一层资源类型系统B站将不同类型的评论内容抽象为统一的资源模型。无论是视频、专栏文章、动态还是音频都通过CommentResourceType枚举进行分类。这种设计让API调用变得异常简洁。第二层数据分页机制B站评论系统经历了从传统分页到懒加载的演进。早期的get_comments()函数使用页码分页而现代的get_comments_lazy()则采用偏移量机制这种设计能更好地应对海量数据的增量加载需求。第三层认证与权限体系评论数据的获取深度与用户认证状态密切相关。未登录状态下只能获取前20条评论而通过Credential认证后可以获取完整的评论数据这是B站反爬虫策略的重要组成部分。图B站API的模块化设计评论系统是其中的重要组成部分 实战演练从零开始构建评论爬虫环境准备与安装首先我们需要安装bilibili-api库。这个库提供了完整的异步支持能够高效处理网络请求# 克隆项目仓库 git clone https://gitcode.com/gh_mirrors/bi/bilibili-api cd bilibili-api # 安装依赖 pip install -r requirements.txt # 或者直接通过pip安装 pip install bilibili-api-python基础评论获取三步到位让我们从一个最简单的例子开始了解如何获取视频评论import asyncio from bilibili_api import comment, sync, Credential async def get_video_comments_basic(): 基础版视频评论获取 # 1. 准备认证信息可选但推荐 credential Credential( sessdatayour_sessdata_here, bili_jctyour_bili_jct_here, buvid3your_buvid3_here ) # 2. 获取评论数据 result await comment.get_comments_lazy( oid418788911, # 视频ID type_comment.CommentResourceType.VIDEO, offset, # 初始偏移量为空 credentialcredential ) # 3. 解析和处理数据 replies result.get(replies, []) cursor result.get(cursor, {}) print(f获取到 {len(replies)} 条评论) print(f下一页偏移量: {cursor.get(pagination_reply, {}).get(next_offset, )}) # 显示前3条评论 for i, reply in enumerate(replies[:3]): user_info reply[member] content reply[content][message] print(f{i1}. {user_info[uname]}: {content[:50]}...) # 运行示例 sync(get_video_comments_basic())进阶实现完整的评论爬取框架对于实际项目我们需要更健壮的实现import asyncio import json import time from typing import List, Dict, Optional from bilibili_api import comment, Credential from bilibili_api.exceptions import NetworkException, ResponseCodeException class BilibiliCommentCrawler: B站评论爬虫框架 def __init__(self, credential: Optional[Credential] None): self.credential credential self.request_count 0 self.comment_count 0 async def fetch_comments( self, oid: int, resource_type: comment.CommentResourceType, max_pages: int 100, batch_size: int 1000 ) - List[Dict]: 获取指定资源的所有评论 all_comments [] offset page_count 0 while True: try: # 使用指数退避策略处理网络异常 result await self._safe_request( oid, resource_type, offset ) if not result: break # 提取评论数据 replies result.get(replies, []) if replies: all_comments.extend(replies) self.comment_count len(replies) # 检查是否还有更多数据 cursor result.get(cursor, {}) is_end cursor.get(is_end, False) next_offset cursor.get(pagination_reply, {}).get(next_offset, ) if is_end or not next_offset: print(✅ 评论获取完成) break # 更新偏移量 offset next_offset page_count 1 # 进度显示 if page_count % 10 0: print(f 已获取 {page_count} 页共 {len(all_comments)} 条评论) # 安全限制 if max_pages and page_count max_pages: print(f⚠️ 达到最大页数限制: {max_pages} 页) break # 批处理保存 if len(all_comments) batch_size: await self._save_batch(all_comments, oid, page_count) all_comments [] # 请求间隔避免触发反爬 await asyncio.sleep(0.8) except Exception as e: print(f❌ 获取评论时出错: {str(e)}) await asyncio.sleep(3) # 错误后等待更长时间 continue return all_comments async def _safe_request( self, oid: int, resource_type: comment.CommentResourceType, offset: str, max_retries: int 3 ) - Optional[Dict]: 带重试机制的请求函数 for attempt in range(max_retries): try: self.request_count 1 return await comment.get_comments_lazy( oidoid, type_resource_type, offsetoffset, credentialself.credential ) except NetworkException as e: if attempt max_retries - 1: wait_time 2 ** attempt # 指数退避 print(f 网络异常{wait_time}秒后重试...) await asyncio.sleep(wait_time) else: raise e except ResponseCodeException as e: print(f API响应异常: {e.code} - {e.msg}) return None async def _save_batch(self, comments: List[Dict], oid: int, page: int): 批量保存评论数据 filename fcomments_oid_{oid}_page_{page}.json with open(filename, w, encodingutf-8) as f: json.dump(comments, f, ensure_asciiFalse, indent2) print(f 已保存第 {page} 页评论到 {filename}) async def get_statistics(self): 获取统计信息 return { total_requests: self.request_count, total_comments: self.comment_count, avg_comments_per_request: self.comment_count / self.request_count if self.request_count 0 else 0 } # 使用示例 async def main(): # 创建爬虫实例 crawler BilibiliCommentCrawler() # 获取视频评论 video_comments await crawler.fetch_comments( oid418788911, resource_typecomment.CommentResourceType.VIDEO, max_pages50 ) # 获取统计信息 stats await crawler.get_statistics() print(f\n 爬虫统计:) print(f总请求次数: {stats[total_requests]}) print(f总评论数: {stats[total_comments]}) print(f平均每请求评论数: {stats[avg_comments_per_request]:.2f}) # 分析评论数据 if video_comments: print(f\n 评论分析:) print(f热门评论按点赞排序:) sorted_comments sorted(video_comments, keylambda x: x.get(like, 0), reverseTrue) for i, cmt in enumerate(sorted_comments[:5]): print(f{i1}. {cmt[member][uname]}: {cmt[content][message][:60]}... ( {cmt.get(like, 0)})) # 运行主函数 sync(main()) 深度扩展多类型评论获取与高级技巧支持所有资源类型的通用爬虫B站评论系统支持多种内容类型我们可以创建一个通用的爬虫类from enum import Enum from dataclasses import dataclass from typing import Any, Callable dataclass class CommentSource: 评论来源配置 name: str resource_type: comment.CommentResourceType get_id_func: Callable[[Any], int] description: str class MultiSourceCommentCrawler: 多源评论爬虫 # 定义支持的评论源 SOURCES { video: CommentSource( name视频, resource_typecomment.CommentResourceType.VIDEO, get_id_funclambda x: x, # 视频直接使用aid description获取视频评论需要视频AV号 ), article: CommentSource( name专栏, resource_typecomment.CommentResourceType.ARTICLE, get_id_funclambda x: x, # 专栏使用cvid description获取专栏文章评论需要文章CV号 ), dynamic: CommentSource( name动态, resource_typecomment.CommentResourceType.DYNAMIC, get_id_funclambda x: x, # 动态使用rid description获取动态评论需要动态ID ), audio: CommentSource( name音频, resource_typecomment.CommentResourceType.AUDIO, get_id_funclambda x: x, # 音频使用auid description获取音频评论需要音频AU号 ) } def __init__(self, credential: Optional[Credential] None): self.credential credential async def crawl_by_source( self, source_type: str, source_id: int, max_pages: int 20 ) - List[Dict]: 根据来源类型爬取评论 if source_type not in self.SOURCES: raise ValueError(f不支持的来源类型: {source_type}) source self.SOURCES[source_type] print(f 开始爬取{source.name}评论ID: {source_id}) # 这里可以集成前面的fetch_comments逻辑 # ...评论数据清洗与分析获取到评论数据后我们需要进行清洗和分析import re from collections import Counter from datetime import datetime class CommentAnalyzer: 评论数据分析器 staticmethod def extract_mentions(text: str) - List[str]: 提取提及的用户 pattern r([\w\u4e00-\u9fa5\-]) return re.findall(pattern, text) staticmethod def extract_hashtags(text: str) - List[str]: 提取话题标签 pattern r#([^#\s])# return re.findall(pattern, text) staticmethod def analyze_sentiment_patterns(comments: List[Dict]) - Dict: 简单情感模式分析 positive_words [好, 赞, 支持, 喜欢, 棒, 厉害] negative_words [差, 不好, 垃圾, 讨厌, 失望] positive_count 0 negative_count 0 for cmt in comments: content cmt[content][message].lower() if any(word in content for word in positive_words): positive_count 1 if any(word in content for word in negative_words): negative_count 1 return { total_comments: len(comments), positive: positive_count, negative: negative_count, neutral: len(comments) - positive_count - negative_count } staticmethod def get_top_users(comments: List[Dict], top_n: int 10) - List[Dict]: 获取最活跃的用户 user_counter Counter() for cmt in comments: user_name cmt[member][uname] user_counter[user_name] 1 return [ {user: user, count: count} for user, count in user_counter.most_common(top_n) ] 场景扩展不同应用场景的实战方案场景一内容创作者的数据分析如果你是内容创作者需要分析自己视频的评论反馈async def analyze_creator_content(video_aids: List[int], credential: Credential): 创作者内容分析 crawler BilibiliCommentCrawler(credential) all_insights [] for aid in video_aids: print(f\n 分析视频 AV{aid}) comments await crawler.fetch_comments( oidaid, resource_typecomment.CommentResourceType.VIDEO, max_pages30 ) if comments: # 分析情感倾向 sentiment CommentAnalyzer.analyze_sentiment_patterns(comments) # 提取常见问题 question_pattern r[?] questions [c for c in comments if re.search(question_pattern, c[content][message])] # 获取活跃用户 top_users CommentAnalyzer.get_top_users(comments) insights { video_aid: aid, total_comments: len(comments), sentiment_analysis: sentiment, questions_count: len(questions), top_users: top_users[:5], avg_likes: sum(c.get(like, 0) for c in comments) / len(comments) } all_insights.append(insights) # 生成报告 print(f 评论总数: {insights[total_comments]}) print(f 积极评论: {insights[sentiment_analysis][positive]}) print(f 问题数量: {insights[questions_count]}) print(f 平均点赞: {insights[avg_likes]:.1f}) return all_insights场景二竞品分析的数据采集对于竞品分析我们需要同时监控多个频道的评论数据import asyncio from typing import Dict, List class CompetitiveAnalysis: 竞品分析数据采集 def __init__(self, credential: Credential): self.credential credential self.crawlers {} async def monitor_channels( self, channel_configs: List[Dict], interval_hours: int 24 ): 监控多个频道的评论数据 while True: print(f\n⏰ 开始新一轮数据采集: {datetime.now()}) tasks [] for config in channel_configs: task self._collect_channel_data(config) tasks.append(task) # 并发采集数据 results await asyncio.gather(*tasks, return_exceptionsTrue) # 处理结果 for config, result in zip(channel_configs, results): if isinstance(result, Exception): print(f❌ 采集失败 {config[name]}: {result}) else: print(f✅ 采集成功 {config[name]}: {len(result)} 条评论) # 等待下次采集 print(f⏳ 等待 {interval_hours} 小时后进行下次采集...) await asyncio.sleep(interval_hours * 3600) async def _collect_channel_data(self, config: Dict) - List[Dict]: 采集单个频道数据 crawler BilibiliCommentCrawler(self.credential) return await crawler.fetch_comments( oidconfig[source_id], resource_typeconfig[resource_type], max_pagesconfig.get(max_pages, 20) )⚠️ 避坑指南常见问题与解决方案问题1403 Forbidden错误症状请求频繁返回403状态码原因B站的反爬虫机制检测到异常请求模式解决方案添加有效的Credential认证信息使用get_comments_lazy()替代旧的get_comments()在请求间添加随机延迟0.5-2秒使用代理IP轮换# 正确的请求间隔设置 import random async def safe_request_with_delay(): # 添加随机延迟避免固定间隔 delay random.uniform(0.5, 2.0) await asyncio.sleep(delay) # 执行请求...问题2获取评论不全症状只能获取到前20条评论原因未登录状态下的限制解决方案确保提供完整的Credential信息验证sessdata、bili_jct、buvid3等参数的正确性使用login_v2模块重新获取认证信息问题3offset参数使用错误症状无法获取下一页评论或者重复获取相同数据原因offset参数使用不当正确用法# 第一次请求 result1 await comment.get_comments_lazy(oid, type_, offset) # 获取下一页的offset next_offset result1[cursor][pagination_reply][next_offset] # 第二次请求使用正确的offset result2 await comment.get_comments_lazy(oid, type_, offsetnext_offset)问题4内存占用过高症状处理大量评论时内存使用量激增解决方案使用分批处理每1000条评论保存一次使用生成器模式流式处理及时清理不需要的数据async def stream_comments(oid, type_, batch_size500): 流式处理评论数据 offset while True: result await comment.get_comments_lazy(oid, type_, offset) if not result.get(replies): break # 分批yield数据 for i in range(0, len(result[replies]), batch_size): batch result[replies][i:i batch_size] yield batch # 更新offset offset result[cursor][pagination_reply][next_offset] if not offset: break 下一步行动立即开始你的B站数据分析之旅现在你已经掌握了bilibili-api评论获取的核心技术是时候开始实践了第一步环境搭建克隆项目仓库git clone https://gitcode.com/gh_mirrors/bi/bilibili-api安装依赖pip install -r requirements.txt查看官方文档docs/modules/comment.md第二步获取认证信息使用bilibili_api.login_v2模块获取你的认证信息或者通过浏览器开发者工具提取cookies。第三步从简单示例开始从本文的基础示例代码开始先获取少量评论数据熟悉API的返回结构。第四步构建自己的分析工具根据你的具体需求扩展本文提供的框架添加数据可视化、情感分析、趋势预测等功能。第五步贡献与分享如果你发现了更好的使用方法或者修复了bug欢迎向项目贡献代码。bilibili-api是一个开源项目社区的力量让它变得更加强大。记住技术的学习是一个持续的过程。B站的API也在不断更新保持对bilibili_api/comment.py源码的关注及时了解最新的变化。祝你在B站数据分析的道路上取得成功图B站前端数据结构示例理解这些结构有助于更好地解析API返回的数据【免费下载链接】bilibili-api哔哩哔哩常用API调用。支持视频、番剧、用户、频道、音频等功能。原仓库地址https://github.com/MoyuScript/bilibili-api项目地址: https://gitcode.com/gh_mirrors/bi/bilibili-api创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考