工业级大模型学习之路024:LangChain零基础入门教程(第七篇):RAG 系统评估、全链路调优

发布时间:2026/5/25 2:03:47

工业级大模型学习之路024:LangChain零基础入门教程(第七篇):RAG 系统评估、全链路调优 一、理论基础RAG 系统评估体系1.1 为什么必须做 RAG 评估1.1.1 原型与生产的核心差距原型只要能回答几个测试问题就算成功生产必须保证 99% 以上的场景稳定可靠回答准确率≥90%幻觉率≤5%核心差距可量化、可迭代、可复现1.1.2 RAG 失败的三大根源按影响程度排序检索失败占比 60%-70%没有找到正确的信息生成失败占比 20%-30%找到了正确的信息但没有正确使用提示词失败占比 5%-10%提示词没有明确约束大模型的行为1.1.3 只看 能否回答 的三大陷阱回答正确但来源错误大模型用自己的知识回答而不是知识库的知识回答流畅但存在幻觉编造了知识库中没有的信息简单问题能答复杂问题全错系统没有泛化能力1.2 RAG 评估的三大核心维度评估维度评估对象核心问题关键指标评估方法检索质量评估向量检索 重排序有没有找到正确的信息Recallk、Precisionk、F1k、MRR自动评估生成质量评估LLM 生成环节有没有正确使用找到的信息忠实度、相关性、完整性、幻觉率LLM 自动评估 人工抽样端到端评估整个 RAG 系统用户是否满意自然度、流畅度、响应延迟人工评估 性能测试1.3 核心评估指标详解1.3.1 检索指标Recallk召回率 k定义前 k 个检索结果中包含相关文档的比例公式Recallk 检索到的相关文档数 / 总相关文档数意义衡量系统 有没有漏掉有用信息 的能力工业级要求Recall3 ≥ 90%Recall5 ≥ 95%Precisionk精确率 k定义前 k 个检索结果中相关文档的比例公式Precisionk 检索到的相关文档数 / k意义衡量系统 有没有混入无用信息 的能力工业级要求Precision3 ≥ 80%F1kF1 分数 k定义召回率和精确率的调和平均数公式F1k 2 * (Precisionk * Recallk) / (Precisionk Recallk)意义综合衡量检索效果工业级要求F13 ≥ 85%MRR平均倒数排名定义第一个相关文档出现位置的倒数的平均值公式MRR (1/n) * Σ(1/rank_i)其中 rank_i 是第 i 个查询第一个相关文档的排名意义衡量系统 把最相关的文档排在前面 的能力工业级要求MRR ≥ 0.81.3.2 生成指标忠实度Faithfulness定义生成内容与检索上下文的一致程度评分标准0-1 分1 分表示完全基于上下文0 分表示完全编造工业级要求平均忠实度 ≥ 0.9相关性Relevance定义生成内容与用户问题的相关程度评分标准0-1 分1 分表示完全回答了问题0 分表示完全不相关工业级要求平均相关性 ≥ 0.9完整性Completeness定义生成内容覆盖问题要点的比例评分标准0-1 分1 分表示覆盖了所有要点0 分表示一个要点都没覆盖工业级要求平均完整性 ≥ 0.85幻觉率Hallucination Rate定义生成内容中编造信息的比例公式幻觉率 存在幻觉的回答数 / 总回答数工业级要求幻觉率 ≤ 5%1.3.3 性能指标平均响应时间从用户发送请求到收到完整回答的时间工业级要求 ≤ 3 秒95 分位响应时间95% 的请求能在该时间内完成工业级要求 ≤ 5 秒并发处理能力系统每秒能处理的请求数QPS资源占用CPU、内存、GPU 的平均使用率1.4 评估数据集构建工业级最佳实践1.4.1 黄金标准数据集Gold Dataset定义包含 问题 - 标准答案 - 相关文档 ID 三元组的数据集构建方法自动生成 人工审核效率和质量的最佳平衡规模至少 100 个问题覆盖系统的主要使用场景1.4.2 自动生成评估数据集从知识库中随机抽取 N 个文档分块用大模型基于每个分块生成 3-5 个问题和对应的标准答案记录每个问题对应的相关文档 ID人工审核并修正错误的问题和答案1.4.3 评估数据集格式[ { question: RAG技术是什么时候提出的, gold_answer: 2020年由Facebook AI研究院提出, relevant_chunk_ids: [rag_guide_md_0] }, { question: RAG的工作流程分为哪几个步骤, gold_answer: RAG的工作流程分为三个核心步骤1. 文档处理2. 检索3. 生成, relevant_chunk_ids: [rag_guide_md_1] } ]二、检索与生成质量自动评估2.1 第一步构建第一个 RAG 评估数据集代码位置core/rag_evaluator.pyimport json import random from typing import List, Dict from pathlib import Path from langchain_core.documents import Document from core.rag_retriever import RAGRetriever from core.llm_factory import LLMFactory from config.settings import settings from utils.logger import logger class RAGEvaluator: RAG系统评估器 def __init__(self, retriever: RAGRetriever None): self.retriever retriever or RAGRetriever() self.llm LLMFactory.get_llm() self.dataset_path Path(./data/evaluation_dataset.json) def generate_evaluation_dataset(self, num_samples: int 100) - List[Dict]: 自动生成评估数据集 logger.info(f正在生成评估数据集样本数{num_samples}) # 获取所有文档分块 all_chunks self.retriever.vector_store.get()[documents] all_ids self.retriever.vector_store.get()[ids] # 随机抽取样本 samples random.sample(list(zip(all_ids, all_chunks)), min(num_samples, len(all_chunks))) dataset [] for chunk_id, chunk_content in samples: # 基于分块生成问题和答案 prompt f 基于以下文档内容生成3个不同的问题和对应的标准答案。 要求 1. 问题要清晰、具体、有意义 2. 答案必须完全来自文档内容 3. 输出格式为JSON数组每个元素包含question和answer字段 4. 只输出JSON不要添加任何其他内容 文档内容 {chunk_content} try: response self.llm.invoke(prompt) qa_pairs json.loads(response.content) for qa in qa_pairs: dataset.append({ question: qa[question], gold_answer: qa[answer], relevant_chunk_ids: [chunk_id] }) except Exception as e: logger.warning(f生成问题失败{e}) continue # 保存数据集 with open(self.dataset_path, w, encodingutf-8) as f: json.dump(dataset, f, ensure_asciiFalse, indent2) logger.info(f评估数据集生成完成共{len(dataset)}个样本) return dataset def load_evaluation_dataset(self) - List[Dict]: 加载评估数据集 if not self.dataset_path.exists(): logger.info(评估数据集不存在正在自动生成...) return self.generate_evaluation_dataset() with open(self.dataset_path, r, encodingutf-8) as f: return json.load(f)2.2 第二步检索质量自动评估# 在RAGEvaluator类中添加以下方法 def evaluate_retrieval(self, k_list: List[int] [1, 3, 5]) - Dict: 评估检索质量 dataset self.load_evaluation_dataset() logger.info(f开始检索质量评估共{len(dataset)}个样本) results {} for k in k_list: total_recall 0 total_precision 0 total_mrr 0 valid_samples 0 for sample in dataset: question sample[question] gold_ids set(sample[relevant_chunk_ids]) try: # 检索前k个结果 retrieved_docs self.retriever.retrieve(question, top_kk) retrieved_ids set([doc.metadata[chunk_id] for doc in retrieved_docs]) # 计算召回率 recall len(retrieved_ids gold_ids) / len(gold_ids) total_recall recall # 计算精确率 precision len(retrieved_ids gold_ids) / k total_precision precision # 计算MRR mrr 0 for i, doc in enumerate(retrieved_docs): if doc.metadata[chunk_id] in gold_ids: mrr 1 / (i 1) break total_mrr mrr valid_samples 1 except Exception as e: logger.warning(f评估样本失败{e}) continue if valid_samples 0: continue # 计算平均值 avg_recall total_recall / valid_samples avg_precision total_precision / valid_samples avg_mrr total_mrr / valid_samples avg_f1 2 * (avg_precision * avg_recall) / (avg_precision avg_recall) if (avg_precision avg_recall) 0 else 0 results[fk{k}] { recall: round(avg_recall, 4), precision: round(avg_precision, 4), f1: round(avg_f1, 4), mrr: round(avg_mrr, 4) } logger.info(检索质量评估完成) for k, metrics in results.items(): logger.info(f{k}: Recall{metrics[recall]}, Precision{metrics[precision]}, F1{metrics[f1]}, MRR{metrics[mrr]}) return results2.3 第三步生成质量自动评估# 在RAGEvaluator类中添加以下方法 def evaluate_generation(self) - Dict: 评估生成质量 dataset self.load_evaluation_dataset() logger.info(f开始生成质量评估共{len(dataset)}个样本) total_faithfulness 0 total_relevance 0 total_completeness 0 hallucination_count 0 valid_samples 0 for sample in dataset: question sample[question] gold_answer sample[gold_answer] try: # 获取检索上下文和生成回答 docs self.retriever.retrieve(question) context \n\n.join([doc.page_content for doc in docs]) answer self.llm.invoke(f基于以下上下文回答问题{question}\n上下文{context}).content # 用LLM评估生成质量 evaluation_prompt f 请评估以下回答的质量从三个维度打分0-1分 1. 忠实度回答是否完全基于上下文没有编造信息 2. 相关性回答是否与问题相关是否回答了问题 3. 完整性回答是否覆盖了问题的所有要点 问题{question} 上下文{context} 回答{answer} 输出格式为JSON包含faithfulness、relevance、completeness三个字段值为0-1的数字。 只输出JSON不要添加任何其他内容。 evaluation json.loads(self.llm.invoke(evaluation_prompt).content) total_faithfulness evaluation[faithfulness] total_relevance evaluation[relevance] total_completeness evaluation[completeness] if evaluation[faithfulness] 0.5: hallucination_count 1 valid_samples 1 except Exception as e: logger.warning(f评估样本失败{e}) continue if valid_samples 0: return {} # 计算平均值 avg_faithfulness total_faithfulness / valid_samples avg_relevance total_relevance / valid_samples avg_completeness total_completeness / valid_samples hallucination_rate hallucination_count / valid_samples results { faithfulness: round(avg_faithfulness, 4), relevance: round(avg_relevance, 4), completeness: round(avg_completeness, 4), hallucination_rate: round(hallucination_rate, 4) } logger.info(生成质量评估完成) logger.info(f忠实度{results[faithfulness]}, 相关性{results[relevance]}, 完整性{results[completeness]}, 幻觉率{results[hallucination_rate]}) return results2.4 第四步生成完整评估报告# 在RAGEvaluator类中添加以下方法 def generate_full_report(self) - Dict: 生成完整的评估报告 logger.info(正在生成完整评估报告...) report { timestamp: int(time.time()), retrieval_metrics: self.evaluate_retrieval(), generation_metrics: self.evaluate_generation(), system_config: { chunk_size: settings.chunk_size, chunk_overlap: settings.chunk_overlap, embedding_model: settings.embedding_model_name, retrieval_top_k: settings.retrieval_top_k, similarity_threshold: settings.retrieval_similarity_threshold, reranker_enabled: settings.reranker_enabled, reranker_model: settings.reranker_model_name } } # 保存报告 report_path Path(f./data/evaluation_report_{report[timestamp]}.json) with open(report_path, w, encodingutf-8) as f: json.dump(report, f, ensure_asciiFalse, indent2) logger.info(f完整评估报告已保存到{report_path}) return report2.5 测试from dotenv import load_dotenv load_dotenv() from core.rag_evaluator import RAGEvaluator from core.rag_service import RAGService import os def test_day6_production_rag(): print( 第6天RAG系统评估、调优与生产化测试\n) # 1. 初始化 evaluator RAGEvaluator() rag RAGService() # 2. 生成评估数据集 print( 正在生成评估数据集...) dataset evaluator.generate_evaluation_dataset(num_samples50) print(f✅ 生成了{len(dataset)}个评估样本\n) # 3. 检索质量评估 print( 正在进行检索质量评估...) retrieval_metrics evaluator.evaluate_retrieval() print(\n检索质量评估结果) for k, metrics in retrieval_metrics.items(): print(f {k}: Recall{metrics[recall]}, Precision{metrics[precision]}, F1{metrics[f1]}, MRR{metrics[mrr]}) # 4. 生成质量评估 print(\n✍️ 正在进行生成质量评估...) generation_metrics evaluator.evaluate_generation() print(\n生成质量评估结果) print(f 忠实度{generation_metrics[faithfulness]}) print(f 相关性{generation_metrics[relevance]}) print(f 完整性{generation_metrics[completeness]}) print(f 幻觉率{generation_metrics[hallucination_rate]}) # 5. 生成完整评估报告 print(\n 正在生成完整评估报告...) report evaluator.generate_full_report() print(f✅ 评估报告已保存到data/evaluation_report_{report[timestamp]}.json\n) if __name__ __main__: test_day6_production_rag()这时候先不要着急启动测试会反复调用大模型我们可以使用缓存将前面已经查询的结果缓存起来。2.6 多级缓存机制代码位置core/cache.pyimport json import hashlib from pathlib import Path from typing import Any, Optional from utils.logger import logger class LocalCache: 本地文件缓存轻量、无外部依赖 def __init__(self, cache_dir: str ./data/cache): self.cache_dir Path(cache_dir) self.cache_dir.mkdir(parentsTrue, exist_okTrue) def _get_cache_key(self, key: str) - str: 生成缓存键 return hashlib.md5(key.encode(utf-8)).hexdigest() def get(self, key: str) - Optional[Any]: 获取缓存 cache_key self._get_cache_key(key) cache_file self.cache_dir / f{cache_key}.json if not cache_file.exists(): return None try: with open(cache_file, r, encodingutf-8) as f: return json.load(f) except Exception as e: logger.warning(f读取缓存失败{e}) return None def set(self, key: str, value: Any, ttl: int 3600): 设置缓存ttl单位秒 cache_key self._get_cache_key(key) cache_file self.cache_dir / f{cache_key}.json try: with open(cache_file, w, encodingutf-8) as f: json.dump({ value: value, expire_time: int(time.time()) ttl }, f, ensure_asciiFalse) except Exception as e: logger.warning(f写入缓存失败{e}) def get_or_set(self, key: str, func, ttl: int 3600) - Any: 获取缓存如果不存在则执行函数并缓存结果 value self.get(key) if value is not None and value[expire_time] time.time(): return value[value] value func() self.set(key, value, ttl) return value # 全局缓存实例 query_cache LocalCache(./data/cache/query) embedding_cache LocalCache(./data/cache/embedding) retrieval_cache LocalCache(./data/cache/retrieval)集成到 RAGRetriever# 在RAGRetriever.__init__中添加 from core.cache import query_cache, embedding_cache, retrieval_cache # 修改retrieve方法 def retrieve(self, query: str, top_k: int None, similarity_threshold: float None, filter: dict None): # 缓存键 cache_key f{query}_{top_k}_{similarity_threshold}_{str(filter)} # 尝试从缓存获取 cached retrieval_cache.get(cache_key) if cached is not None and cached[expire_time] time.time(): logger.debug(f从缓存获取检索结果{query}) return [Document(**doc) for doc in cached[value]] # 原检索逻辑... # 缓存结果 retrieval_cache.set(cache_key, [doc.dict() for doc in final_docs], ttl3600) return final_docs上面的缓存键保留了top_k,所以缓存的时候会有很多缓存文件评估过程中每个问题在评估过程中被调用 4 次 优化如果想减少缓存文件可以修改缓存键策略不包含 top_k # 缓存键不包含 top_k提高缓存命中率 cache_key f{query}_{similarity_threshold}_{str(filter)}获取时# 尝试从缓存获取 cached retrieval_cache.get(cache_key) if cached is not None and cached[expire_time] time.time(): logger.debug(f从缓存获取检索结果{query}) all_cached_docs [Document(**doc) for doc in cached[value]] return all_cached_docs[:top_k] if top_k else all_cached_docs

相关新闻