小红书数据采集实战:3种高效使用Python SDK的完整方案

发布时间:2026/6/12 11:10:05

小红书数据采集实战:3种高效使用Python SDK的完整方案 小红书数据采集实战3种高效使用Python SDK的完整方案【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs小红书数据采集已成为数据分析师和开发者获取市场洞察的重要途径。xhs是一个基于小红书Web端请求封装的Python SDK为开发者提供了完整的小红书数据采集解决方案。这个工具包封装了复杂的网络请求和签名逻辑让开发者能够高效、稳定地获取平台公开数据包括笔记内容、用户信息和搜索数据等。 快速安装与环境配置基础安装方法xhs可以通过多种方式快速安装满足不同开发场景的需求# 使用pip从PyPI安装稳定版本 pip install xhs # 从Git仓库安装最新开发版本 pip install githttps://gitcode.com/gh_mirrors/xh/xhs # 从源码安装进行本地开发 git clone https://gitcode.com/gh_mirrors/xh/xhs cd xhs pip install -e .核心依赖与环境要求项目主要依赖以下Python库requests- HTTP请求处理playwright- 浏览器自动化签名lxml- HTML解析pytest- 测试框架配置环境时确保安装Python 3.7版本并按照示例代码example/basic_usage.py中的指引设置必要的签名函数。 核心功能深度解析1. 客户端初始化与认证配置小红书数据采集的核心在于正确的客户端初始化。xhs SDK提供了灵活的配置选项from xhs import XhsClient def custom_sign(uri, dataNone, a1, web_session): 自定义签名函数实现 # 实现签名逻辑 return {x-s: signature, x-t: timestamp} # 初始化客户端 cookie your_cookie_string xhs_client XhsClient( cookiecookie, signcustom_sign, # 自定义签名函数 timeout30, # 请求超时时间 proxiesNone # 代理设置 )签名函数是小红书数据采集的关键组件它负责生成请求所需的加密参数。参考核心源码xhs/core.py了解完整的签名实现细节。2. 笔记数据采集技术获取小红书笔记数据是SDK的核心功能之一# 获取单个笔记详情 note_id 6505318c000000001f03c5a6 note_data xhs_client.get_note_by_id(note_id) # 解析笔记内容 note_info { 标题: note_data.get(title, ), 作者: note_data.get(user, {}).get(nickname, ), 点赞数: note_data.get(likes, 0), 收藏数: note_data.get(collects, 0), 评论数: note_data.get(comments, 0), 发布时间: note_data.get(time, 0) } # 提取多媒体内容 from xhs.help import get_imgs_url_from_note, get_video_url_from_note image_urls get_imgs_url_from_note(note_data) video_url get_video_url_from_note(note_data)3. 搜索与内容发现功能xhs SDK支持多种搜索条件和排序方式from xhs import SearchSortType, SearchNoteType # 基础关键词搜索 search_results xhs_client.search( keywordPython编程教程, sortSearchSortType.GENERAL, note_typeSearchNoteType.ALL ) # 高级搜索配置 advanced_search xhs_client.search( keyword数据分析, sortSearchSortType.TIME_DESCENDING, # 按时间排序 note_typeSearchNoteType.VIDEO, # 仅视频笔记 page1, # 页码 page_size20 # 每页数量 ) # 处理搜索结果 for item in search_results.get(items, []): print(f笔记ID: {item[id]}) print(f标题: {item.get(title, 无标题)}) print(f互动数据: {item.get(likes, 0)}赞) 高级功能应用场景1. 内容分类数据采集小红书内容按分类组织xhs SDK支持按分类获取内容from xhs import FeedType # 获取不同分类的内容流 feed_types { 美食: FeedType.FOOD, 穿搭: FeedType.FASION, 旅行: FeedType.TRAVEL, 健身: FeedType.FITNESS, 游戏: FeedType.GAME } def fetch_category_feed(category_name, feed_type): 获取指定分类的内容 try: feed_data xhs_client.get_home_feed(feed_typefeed_type) print(f成功获取{category_name}分类内容共{len(feed_data.get(items, []))}条) return feed_data except Exception as e: print(f获取{category_name}分类失败: {str(e)}) return None # 批量获取多个分类内容 for category, feed_type in feed_types.items(): data fetch_category_feed(category, feed_type)2. 用户数据采集与分析获取用户信息和用户发布的内容def get_user_info(user_id): 获取用户详细信息 user_data xhs_client.get_user_info(user_id) user_profile { 用户ID: user_data.get(user_id), 昵称: user_data.get(nickname), 粉丝数: user_data.get(fans_count, 0), 关注数: user_data.get(follows_count, 0), 笔记数: user_data.get(notes_count, 0), 获赞数: user_data.get(liked_count, 0) } return user_profile def get_user_notes(user_id, limit50): 获取用户发布的笔记列表 notes [] page 1 while len(notes) limit: user_notes xhs_client.get_user_notes(user_id, pagepage) if not user_notes or not user_notes.get(items): break notes.extend(user_notes[items]) page 1 # 避免请求过于频繁 import time time.sleep(1) return notes[:limit]⚡ 性能优化与最佳实践1. 请求频率控制策略避免被平台限制的关键在于合理的请求频率控制import time from datetime import datetime from collections import deque class RateLimiter: 请求频率限制器 def __init__(self, max_requests5, time_window60): self.max_requests max_requests self.time_window time_window self.request_times deque() def wait_if_needed(self): 如果需要则等待 now time.time() # 清理过期请求记录 while self.request_times and now - self.request_times[0] self.time_window: self.request_times.popleft() if len(self.request_times) self.max_requests: wait_time self.time_window - (now - self.request_times[0]) print(f[{datetime.now()}] 达到频率限制等待 {wait_time:.1f} 秒) time.sleep(wait_time 0.5) self.request_times.append(time.time()) # 使用示例 limiter RateLimiter(max_requests3, time_window60) def safe_request(func, *args, **kwargs): 安全的请求包装器 limiter.wait_if_needed() return func(*args, **kwargs)2. 错误处理与重试机制健壮的错误处理是数据采集系统的关键import random from functools import wraps def retry_on_failure(max_retries3, base_delay1): 失败重试装饰器 def decorator(func): wraps(func) def wrapper(*args, **kwargs): last_exception None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception e if attempt max_retries - 1: break # 指数退避策略 delay base_delay * (2 ** attempt) random.uniform(0, 0.5) print(f第{attempt1}次尝试失败: {str(e)}等待{delay:.1f}秒后重试) time.sleep(delay) raise Exception(f所有{max_retries}次尝试均失败: {str(last_exception)}) return wrapper return decorator retry_on_failure(max_retries3, base_delay2) def robust_get_note(note_id): 健壮的笔记获取函数 return xhs_client.get_note_by_id(note_id)3. 数据持久化存储方案import json import sqlite3 from datetime import datetime class XhsDataStorage: 小红书数据存储管理器 def __init__(self, db_pathxhs_data.db): self.db_path db_path self._init_database() def _init_database(self): 初始化数据库结构 conn sqlite3.connect(self.db_path) cursor conn.cursor() # 创建笔记数据表 cursor.execute( CREATE TABLE IF NOT EXISTS notes ( note_id TEXT PRIMARY KEY, title TEXT, content TEXT, user_id TEXT, likes INTEGER DEFAULT 0, collects INTEGER DEFAULT 0, comments INTEGER DEFAULT 0, publish_time DATETIME, category TEXT, tags TEXT, raw_data TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) # 创建用户数据表 cursor.execute( CREATE TABLE IF NOT EXISTS users ( user_id TEXT PRIMARY KEY, nickname TEXT, avatar_url TEXT, notes_count INTEGER DEFAULT 0, fans_count INTEGER DEFAULT 0, following_count INTEGER DEFAULT 0, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ) conn.commit() conn.close() def save_note(self, note_data): 保存笔记数据 conn sqlite3.connect(self.db_path) cursor conn.cursor() cursor.execute( INSERT OR REPLACE INTO notes (note_id, title, content, user_id, likes, collects, comments, publish_time, category, tags, raw_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) , ( note_data.get(id), note_data.get(title, ), note_data.get(desc, ), note_data.get(user, {}).get(user_id), note_data.get(likes, 0), note_data.get(collects, 0), note_data.get(comments, 0), datetime.fromtimestamp(note_data.get(time, 0) / 1000), note_data.get(category, ), json.dumps(note_data.get(tag_list, [])), json.dumps(note_data, ensure_asciiFalse) )) conn.commit() conn.close()️ 常见问题解决方案1. 签名验证失败处理签名失败是小红书数据采集中最常见的问题def enhanced_sign_function(uri, dataNone, a1, web_session): 增强版签名函数 import time from playwright.sync_api import sync_playwright for retry_count in range(3): # 最多重试3次 try: with sync_playwright() as playwright: browser playwright.chromium.launch(headlessTrue) context browser.new_context() page context.new_page() # 访问小红书首页初始化环境 page.goto(https://www.xiaohongshu.com) # 设置必要的cookie if a1: context.add_cookies([ { name: a1, value: a1, domain: .xiaohongshu.com, path: / } ]) page.reload() # 增加等待时间确保页面加载完成 time.sleep(2) # 执行签名函数 encrypt_params page.evaluate( ([url, data]) window._webmsxyw(url, data), [uri, data] ) browser.close() return { x-s: encrypt_params.get(X-s, ), x-t: str(encrypt_params.get(X-t, )) } except Exception as e: if retry_count 2: # 最后一次尝试 raise Exception(f签名失败: {str(e)}) # 指数退避等待 wait_time (retry_count 1) * 2 print(f第{retry_count1}次签名失败等待{wait_time}秒后重试) time.sleep(wait_time)2. IP限制与代理配置处理IP限制问题的策略class ProxyRotator: 代理轮换管理器 def __init__(self, proxy_list): self.proxy_list proxy_list self.current_proxy None self.failed_proxies set() def get_next_proxy(self): 获取下一个可用代理 available_proxies [ p for p in self.proxy_list if p not in self.failed_proxies ] if not available_proxies: # 重置失败代理列表 self.failed_proxies.clear() available_proxies self.proxy_list self.current_proxy available_proxies[0] return self.current_proxy def mark_proxy_failed(self, proxy): 标记代理失败 self.failed_proxies.add(proxy) print(f代理 {proxy} 标记为失败剩余可用代理: {len(self.proxy_list) - len(self.failed_proxies)}) # 使用代理的客户端配置 proxies [ http://proxy1.example.com:8080, http://proxy2.example.com:8080, http://proxy3.example.com:8080 ] proxy_rotator ProxyRotator(proxies) def create_client_with_proxy(): 创建带代理的客户端 proxy proxy_rotator.get_next_proxy() return XhsClient( cookieyour_cookie, proxies{ http: proxy, https: proxy }, timeout30 )3. 数据格式兼容性处理处理小红书API数据格式变化def safe_data_parser(response_data, data_typenote): 安全的数据解析器 try: if data_type note: return parse_note_data(response_data) elif data_type user: return parse_user_data(response_data) elif data_type search: return parse_search_data(response_data) else: return response_data except Exception as e: print(f数据解析失败: {str(e)}) return None def parse_note_data(note_response): 解析笔记数据兼容不同格式 # 尝试多种可能的响应格式 data note_response.get(data) or note_response if not data: return None # 兼容不同字段名 result { id: data.get(id) or data.get(note_id) or , title: data.get(title) or data.get(note_title) or data.get(desc, )[:200], desc: data.get(desc) or data.get(content) or , user: { user_id: data.get(user, {}).get(user_id) or data.get(author, {}).get(user_id) or data.get(user_id, ), nickname: data.get(user, {}).get(nickname) or data.get(author, {}).get(nickname) or 未知用户 }, stats: { likes: data.get(likes) or data.get(like_count) or 0, collects: data.get(collects) or data.get(collect_count) or 0, comments: data.get(comments) or data.get(comment_count) or 0, shares: data.get(share_count) or 0 }, media: { images: data.get(image_list) or data.get(images) or [], video: data.get(video) or data.get(video_info) or {} }, time: data.get(time) or data.get(create_time) or 0, raw_data: data # 保留原始数据 } return result 实战应用案例1. 竞品内容监控系统构建自动化竞品监控平台import schedule import time from datetime import datetime class CompetitorMonitor: def __init__(self, xhs_client, competitors, check_interval3600): self.xhs_client xhs_client self.competitors competitors self.check_interval check_interval self.storage XhsDataStorage() def monitor_competitor(self, competitor_name): 监控指定竞品 print(f[{datetime.now()}] 开始监控竞品: {competitor_name}) try: # 搜索竞品相关内容 search_results self.xhs_client.search( keywordcompetitor_name, sortSearchSortType.TIME_DESCENDING, page_size20 ) new_notes 0 for note in search_results.get(items, []): # 检查是否为新内容 if self.is_new_content(note[id]): self.storage.save_note(note) self.analyze_content(note) new_notes 1 print(f[{datetime.now()}] 发现 {new_notes} 条新内容) except Exception as e: print(f[{datetime.now()}] 监控失败: {str(e)}) def is_new_content(self, note_id): 检查是否为新的内容 # 实现数据库查询逻辑 return True def analyze_content(self, note_data): 分析内容数据 engagement_rate ( note_data.get(likes, 0) note_data.get(collects, 0) ) / max(note_data.get(views, 1), 1) print(f内容分析 - 互动率: {engagement_rate:.2%}) print(f关键词: {self.extract_keywords(note_data.get(title, ))}) def start_monitoring(self): 启动监控服务 for competitor in self.competitors: schedule.every(self.check_interval).seconds.do( self.monitor_competitor, competitor ) print(f[{datetime.now()}] 监控服务已启动共监控 {len(self.competitors)} 个竞品) while True: schedule.run_pending() time.sleep(60)2. 内容趋势分析工具分析小红书内容趋势import pandas as pd from collections import Counter from datetime import datetime, timedelta class ContentTrendAnalyzer: def __init__(self, xhs_client): self.xhs_client xhs_client def analyze_category_trends(self, category, days7): 分析分类趋势 trends_data [] # 获取分类内容 feed_type self.get_feed_type_by_category(category) if not feed_type: return None notes self.xhs_client.get_home_feed(feed_typefeed_type) for note in notes.get(items, [])[:50]: # 分析前50条 trends_data.append({ title: note.get(title, ), likes: note.get(likes, 0), collects: note.get(collects, 0), comments: note.get(comments, 0), publish_time: datetime.fromtimestamp(note.get(time, 0) / 1000), category: category }) # 创建分析数据框 df pd.DataFrame(trends_data) # 计算关键指标 analysis_result { category: category, total_notes: len(df), avg_likes: df[likes].mean(), avg_collects: df[collects].mean(), avg_comments: df[comments].mean(), top_keywords: self.extract_trending_keywords(df[title]), trending_posts: df.nlargest(5, likes)[[title, likes]].to_dict(records) } return analysis_result def extract_trending_keywords(self, titles): 提取热门关键词 all_words [] for title in titles: if isinstance(title, str): # 简单的关键词提取逻辑 words title.replace(#, ).split() all_words.extend([w for w in words if len(w) 1]) word_counts Counter(all_words) return word_counts.most_common(10) def get_feed_type_by_category(self, category): 根据分类获取FeedType category_map { 美食: FeedType.FOOD, 穿搭: FeedType.FASION, 旅行: FeedType.TRAVEL, 健身: FeedType.FITNESS, 游戏: FeedType.GAME, 影视: FeedType.MOVIE, 职场: FeedType.CAREER, 情感: FeedType.EMOTION, 家居: FeedType.HOURSE, 彩妆: FeedType.COSMETICS } return category_map.get(category) 性能优化建议1. 异步请求处理对于大规模数据采集建议使用异步处理import asyncio import aiohttp from concurrent.futures import ThreadPoolExecutor class AsyncXhsClient: 异步小红书客户端 def __init__(self, cookie, max_concurrent10): self.cookie cookie self.max_concurrent max_concurrent self.semaphore asyncio.Semaphore(max_concurrent) async def fetch_multiple_notes(self, note_ids): 批量获取笔记数据 tasks [] for note_id in note_ids: task asyncio.create_task( self.get_note_with_semaphore(note_id) ) tasks.append(task) results await asyncio.gather(*tasks, return_exceptionsTrue) return results async def get_note_with_semaphore(self, note_id): 使用信号量控制并发 async with self.semaphore: return await self._get_note_async(note_id) async def _get_note_async(self, note_id): 异步获取笔记 # 实现异步请求逻辑 await asyncio.sleep(0.1) # 模拟网络延迟 return {id: note_id, status: success}2. 缓存策略优化import hashlib import pickle from functools import lru_cache from datetime import datetime, timedelta class DataCache: 数据缓存管理器 def __init__(self, ttl_hours24): self.cache {} self.ttl timedelta(hoursttl_hours) def get_cache_key(self, func_name, *args, **kwargs): 生成缓存键 key_data f{func_name}:{str(args)}:{str(kwargs)} return hashlib.md5(key_data.encode()).hexdigest() def get(self, key): 获取缓存数据 if key in self.cache: data, timestamp self.cache[key] if datetime.now() - timestamp self.ttl: return data else: del self.cache[key] return None def set(self, key, data): 设置缓存数据 self.cache[key] (data, datetime.now()) lru_cache(maxsize100) def cached_get_note(self, note_id): 带缓存的笔记获取 cache_key self.get_cache_key(get_note_by_id, note_id) cached_data self.get(cache_key) if cached_data: print(f从缓存获取笔记 {note_id}) return cached_data # 实际获取数据 data self.xhs_client.get_note_by_id(note_id) self.set(cache_key, data) return data 开始你的小红书数据采集项目现在你已经掌握了xhs Python SDK的核心功能和最佳实践。要开始你的项目安装依赖按照快速安装指南设置环境配置认证获取有效的小红书cookie并配置签名函数测试连接使用示例代码example/basic_usage.py验证连接开发功能根据你的需求选择合适的API方法优化性能应用本文中的优化策略提升采集效率记住技术是工具合规使用是关键。合理运用xhs SDK进行小红书数据采集将为你的数据分析项目提供强有力的支持帮助你在小红书内容生态中获得有价值的市场洞察。如需了解更多高级功能和配置选项请参考核心源码xhs/core.py和官方文档docs/source/xhs.rst。开始构建你的小红书数据采集系统吧【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考

相关新闻