
RAG 系统构建实战从零到生产级别的检索增强生成前言RAGRetrieval-Augmented Generation检索增强生成是当前大模型应用的主流架构之一。它通过结合外部知识库来弥补大模型知识陈旧、容易产生幻觉等问题。我之前负责的一个企业知识库问答系统就是基于 RAG 架构构建的从最初的原型到生产级别的系统踩过了不少坑。今天想系统性地分享 RAG 系统的构建经验包括架构设计、核心组件、常见问题与优化策略。RAG 核心原理RAG 的核心思想很直观在生成回答之前先从外部知识库检索相关信息然后用这些信息来增强模型的生成能力。用户问题 → 检索 → 相关文档 → 拼接上下文 → LLM 生成 → 最终回答这种方式有几个关键优势知识时效性可以随时更新知识库不需要重新训练模型可解释性答案的来源可以追溯到具体文档成本效率相比微调成本更低减少幻觉模型主要基于检索到的内容回答系统架构一个完整的 RAG 系统包含以下组件┌─────────────────────────────────────────────────────────┐ │ RAG 系统架构 │ ├─────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 数据摄入 │ │ 查询处理 │ │ │ │ (Ingestion) │ │ (Query) │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌──────────────┐ ┌──────────────┐ │ │ │ 文档解析 │ │ 向量检索 │ │ │ │ 文本分块 │ │ 重排序 │ │ │ │ Embedding │ │ 上下文组装 │ │ │ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ ▼ ▼ │ │ ┌──────────────────────────────────┐ │ │ │ 向量数据库 │ │ │ │ (Milvus/Qdrant/Pinecone) │ │ │ └──────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────┐ │ │ │ LLM 生成模块 │ │ │ └──────────────────────────────────┘ │ │ │ └─────────────────────────────────────────────────────────┘数据摄入流程文档解析from dataclasses import dataclass from typing import List, Optional import re dataclass class Document: 文档数据结构 id: str content: str metadata: dict class DocumentParser: 文档解析器 def parse(self, file_path: str) - Document: 解析文档 if file_path.endswith(.pdf): return self._parse_pdf(file_path) elif file_path.endswith(.docx): return self._parse_docx(file_path) elif file_path.endswith(.txt): return self._parse_txt(file_path) else: raise ValueError(fUnsupported file type: {file_path}) def _parse_pdf(self, file_path: str) - Document: 解析 PDF from pypdf import PdfReader reader PdfReader(file_path) text_parts [] for page in reader.pages: text page.extract_text() text_parts.append(text) content \n\n.join(text_parts) return Document( idfile_path, contentcontent, metadata{ source: file_path, pages: len(reader.pages) } ) def _parse_docx(self, file_path: str) - Document: 解析 Word 文档 from docx import Document as DocxDocument doc DocxDocument(file_path) paragraphs [p.text for p in doc.paragraphs if p.text.strip()] content \n\n.join(paragraphs) return Document( idfile_path, contentcontent, metadata{source: file_path} ) def _parse_txt(self, file_path: str) - Document: 解析文本文件 with open(file_path, r, encodingutf-8) as f: content f.read() return Document( idfile_path, contentcontent, metadata{source: file_path} )文本分块文本分块Chunking是 RAG 中非常关键的步骤分块策略直接影响检索效果class TextSplitter: 文本分块器 def __init__( self, chunk_size: int 500, chunk_overlap: int 50, min_chunk_size: int 50 ): self.chunk_size chunk_size self.chunk_overlap chunk_overlap self.min_chunk_size min_chunk_size def split_text(self, text: str, metadata: dict None) - List[Document]: 分割文本为多个 chunk if not text or not text.strip(): return [] # 尝试按段落分割 paragraphs self._split_by_paragraph(text) chunks [] current_chunk current_size 0 for para in paragraphs: para_size len(para) # 如果段落本身就超过 chunk_size需要进一步分割 if para_size self.chunk_size: # 先保存当前 chunk if current_chunk: chunks.append(self._create_chunk(current_chunk, metadata)) current_chunk current_size 0 # 分割大段落 sub_chunks self._split_long_paragraph(para, metadata) chunks.extend(sub_chunks) continue # 检查是否需要开始新 chunk if current_size para_size self.chunk_size: if current_size self.min_chunk_size: chunks.append(self._create_chunk(current_chunk, metadata)) # 带 overlap 的滑动窗口 current_chunk current_chunk[-self.chunk_overlap:] \n para current_size len(current_chunk) else: current_chunk \n para current_size para_size else: current_chunk \n para current_size para_size # 处理最后一个 chunk if current_chunk and len(current_chunk) self.min_chunk_size: chunks.append(self._create_chunk(current_chunk, metadata)) return chunks def _split_by_paragraph(self, text: str) - List[str]: 按段落分割 # 尝试多种分隔符 for sep in [\n\n, \n, . , 。]: if sep in text: parts text.split(sep) # 过滤空段落 return [p.strip() for p in parts if p.strip()] return [text.strip()] def _split_long_paragraph(self, text: str, metadata: dict) - List[Document]: 分割过长的段落 chunks [] start 0 while start len(text): end start self.chunk_size chunk_text text[start:end] # 尝试在句子边界分割 if end len(text): sentence_breaks [ chunk_text.rfind(。), chunk_text.rfind(.), chunk_text.rfind(?), chunk_text.rfind(!), chunk_text.rfind(\n) ] for br in sorted(sentence_breaks, reverseTrue): if br self.chunk_size // 2: end start br 1 break chunks.append(self._create_chunk(text[start:end], metadata)) start end - self.chunk_overlap return chunks def _create_chunk(self, content: str, metadata: dict) - Document: 创建 chunk 文档 import hashlib chunk_id hashlib.md5(content.encode()).hexdigest() return Document( idchunk_id, contentcontent.strip(), metadatametadata or {} )Embedding 与存储class EmbeddingPipeline: Embedding 处理流水线 def __init__(self, model_name: str BAAI/bge-large-zh): from transformers import AutoModel, AutoTokenizer import torch self.tokenizer AutoTokenizer.from_pretrained(model_name) self.model AutoModel.from_pretrained(model_name) self.model.eval() def encode(self, texts: List[str], batch_size: int 32) - List[List[float]]: 编码文本为向量 embeddings [] for i in range(0, len(texts), batch_size): batch texts[i:i batch_size] inputs self.tokenizer( batch, paddingTrue, truncationTrue, max_length512, return_tensorspt ) with torch.no_grad(): outputs self.model(**inputs) # Mean pooling attention_mask inputs[attention_mask] token_embeddings outputs.last_hidden_state input_mask_expanded attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float() embeddings_tensor torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min1e-9) # L2 归一化 embeddings_tensor torch.nn.functional.normalize(embeddings_tensor, p2, dim1) embeddings.extend(embeddings_tensor.numpy().tolist()) return embeddings class VectorStore: 向量存储 def __init__(self, store_type: str qdrant): self.store_type store_type if store_type qdrant: from qdrant_client import QdrantClient self.client QdrantClient(hostlocalhost, port6333) elif store_type milvus: # Milvus 连接代码 pass def upsert(self, collection_name: str, documents: List[Document], embeddings: List[List[float]]): 批量插入文档 if self.store_type qdrant: from qdrant_client.models import PointStruct points [ PointStruct( idi, vectoremb, payload{ content: doc.content, metadata: doc.metadata } ) for i, (doc, emb) in enumerate(zip(documents, embeddings)) ] self.client.upsert( collection_namecollection_name, pointspoints )查询流程检索class RAGRetriever: RAG 检索器 def __init__( self, vector_store: VectorStore, embedding_pipeline: EmbeddingPipeline, collection_name: str ): self.vector_store vector_store self.embedding_pipeline embedding_pipeline self.collection_name collection_name def retrieve( self, query: str, top_k: int 5, score_threshold: float 0.5 ) - List[dict]: 检索相关文档 # 1. 查询向量 query_embedding self.embedding_pipeline.encode([query])[0] # 2. 检索 search_results self.vector_store.client.search( collection_nameself.collection_name, query_vectorquery_embedding, limittop_k ) # 3. 过滤和格式化 results [] for result in search_results: if result.score score_threshold: results.append({ content: result.payload[content], metadata: result.payload.get(metadata, {}), score: result.score }) return results重排序初始检索的结果可能不够精确需要进行重排序class Reranker: 文档重排序 def __init__(self, model_name: str BAAI/bge-reranker-base): from transformers import AutoModelForSequenceClassification, AutoTokenizer import torch self.tokenizer AutoTokenizer.from_pretrained(model_name) self.model AutoModelForSequenceClassification.from_pretrained(model_name) self.model.eval() def rerank( self, query: str, documents: List[dict], top_k: int 3 ) - List[dict]: 重排序文档 if not documents: return [] # 构建文本对 pairs [(query, doc[content]) for doc in documents] # 批量编码 inputs self.tokenizer( pairs, paddingTrue, truncationTrue, max_length512, return_tensorspt ) with torch.no_grad(): scores self.model(**inputs).logits.squeeze(-1) # 按分数排序 scored_docs [ (doc, score.item()) for doc, score in zip(documents, scores) ] scored_docs.sort(keylambda x: x[1], reverseTrue) return [ {**doc, rerank_score: score} for doc, score in scored_docs[:top_k] ]上下文组装class ContextBuilder: 上下文组装器 def __init__(self, max_context_length: int 3000): self.max_context_length max_context_length def build_context( self, query: str, retrieved_docs: List[dict], include_metadata: bool True ) - str: 组装检索结果为上下文 context_parts [f问题{query}\n] current_length len(context_parts[0]) for i, doc in enumerate(retrieved_docs, 1): doc_text doc[content] doc_length len(doc_text) # 检查是否超出长度限制 if current_length doc_length 50 self.max_context_length: # 尝试截断 remaining self.max_context_length - current_length - 50 if remaining 100: doc_text doc_text[:remaining] ... else: break # 添加来源信息 if include_metadata and doc.get(metadata): source doc[metadata].get(source, 未知来源) doc_text f[文档 {i}] (来源: {source})\n{doc_text} else: doc_text f[文档 {i}]\n{doc_text} context_parts.append(doc_text) current_length len(doc_text) return \n\n.join(context_parts)生成模块class RAGGenerator: RAG 生成器 def __init__(self, llm_model): self.llm_model llm_model def generate( self, query: str, context: str, system_prompt: str None ) - str: 生成回答 if system_prompt is None: system_prompt 你是一个知识库问答助手。你的任务是根据提供的参考资料回答用户的问题。 要求 1. 只根据参考资料中的信息回答不要编造信息 2. 如果参考资料中没有相关信息请明确告知用户 3. 回答要准确、清晰、有条理 4. 如果有多条参考资料综合它们的信息给出完整回答 5. 在回答末尾标注参考了哪些文档 格式要求 - 使用中文回答 - 重要的信息点要突出 - 保持回答的连贯性 prompt f参考资料 {context} --- 用户问题{query} 请根据以上参考资料回答用户的问题 response self.llm_model.generate( promptprompt, system_promptsystem_prompt ) return response完整 RAG Pipelineclass RAGPipeline: 完整的 RAG Pipeline def __init__( self, llm_model, vector_store: VectorStore, embedding_pipeline: EmbeddingPipeline, reranker: Reranker None, collection_name: str knowledge_base ): self.llm_model llm_model self.retriever RAGRetriever(vector_store, embedding_pipeline, collection_name) self.reranker reranker self.context_builder ContextBuilder() self.generator RAGGenerator(llm_model) def ingest(self, documents: List[Document]): 摄入文档 # 分块 splitter TextSplitter() chunks [] for doc in documents: doc_chunks splitter.split_text(doc.content, doc.metadata) chunks.extend(doc_chunks) # Embedding embeddings self.embedding_pipeline.encode([c.content for c in chunks]) # 存储 self.retriever.vector_store.upsert( self.retriever.collection_name, chunks, embeddings ) def query(self, question: str, top_k: int 5, use_rerank: bool True) - dict: 查询 # 1. 检索 retrieved self.retriever.retrieve(question, top_ktop_k * 2 if use_rerank else top_k) # 2. 重排序 if use_rerank and self.reranker: retrieved self.reranker.rerank(question, retrieved, top_ktop_k) # 3. 组装上下文 context self.context_builder.build_context(question, retrieved) # 4. 生成 answer self.generator.generate(question, context) return { question: question, answer: answer, sources: [ { content: doc[content][:200] ..., score: doc.get(score, doc.get(rerank_score, 0)) } for doc in retrieved ] }常见问题与优化问题 1检索不到相关内容可能原因分块策略不当重要信息被切断Embedding 模型不匹配中文 vs 英文向量维度不匹配解决方案尝试不同的 chunk_size使用针对中文优化的 Embedding 模型检查向量数据库的配置问题 2上下文超出限制解决方案class SmartContextBuilder(ContextBuilder): 智能上下文组装器 def build_context(self, query, documents, max_context_lengthNone): max_context max_context_length or self.max_context_length # 策略优先保留高分文档对低分文档进行摘要 if len(documents) 3: high_score_docs [d for d in documents if d[score] 0.8] low_score_docs [d for d in documents if d[score] 0.8] # 对低分文档进行摘要 for doc in low_score_docs: doc[content] self._summarize(doc[content], max_words100)问题 3生成内容与检索内容不相关解决方案加强 system prompt增加验证步骤使用更精确的重排序模型总结构建一个生产级别的 RAG 系统需要关注以下几个关键点数据处理文档解析要全分块策略要合理检索优化选择合适的 Embedding 模型必要时使用重排序上下文组装合理控制长度保留关键信息生成质量设计好的 prompt进行输出验证持续迭代根据实际效果不断调整优化RAG 不是一次性就能做好的系统需要在实践中不断优化。希望这篇分享对大家有帮助。