:向量存储与检索系统)
一、向量检索是 RAG 系统的心脏1.1 为什么需要向量数据库传统关键词检索的局限性只能匹配字面相同的词无法理解语义相似性无法处理同义词、近义词、多义词对长文本和复杂查询的效果差向量检索的核心优势语义理解将文本转换为高维向量相似语义的文本在向量空间中距离更近模糊匹配支持同义词、近义词、上下文相关的匹配高效检索支持百万级甚至亿级向量的毫秒级相似性搜索1.2 向量检索的完整流程文本分块 → 嵌入模型 → 向量生成 → 向量数据库存储用户查询 → 查询向量生成 → 相似性搜索 → 返回Top-K相关分块1.3 核心概念详解1.3.1 嵌入模型Embedding Model作用将非结构化的文本转换为固定长度的稠密向量核心指标维度通常为 768/1024/1536、语义准确性、速度、支持的语言主流中文嵌入模型BGE 系列、M3E 系列、text2vec 系列1.3.2 相似度计算方法方法原理适用场景余弦相似度Cosine Similarity计算两个向量之间的夹角余弦值文本语义相似度最常用欧氏距离Euclidean Distance计算两个向量之间的直线距离数值型数据、聚类点积Dot Product计算两个向量的点积归一化后的向量速度最快1.3.3 向量索引类型Flat 索引暴力搜索精度 100%速度慢适合小数据集10 万条IVF 索引倒排文件索引速度快精度略有损失适合中等数据集10 万 - 1000 万条HNSW 索引层次化导航小世界图速度最快精度高内存占用大适合大规模数据集1000 万条1.4 主流向量数据库对比数据库部署方式开源性能适用场景Chroma嵌入式✅一般开发、原型、小项目FAISS嵌入式✅优秀大规模数据、高性能需求Pinecone云服务❌优秀生产环境、无需运维Milvus分布式✅优秀企业级、大规模生产环境Qdrant分布式 / 嵌入式✅优秀生产环境、功能丰富二、LangChain 向量存储核心 API 详解2.1 嵌入模型接口Embeddings所有嵌入模型都实现了统一的Embeddings接口embed_documents(texts: list[str])批量嵌入多个文档embed_query(text: str)嵌入单个查询文本2.1.1 OpenAI 兼容嵌入模型豆包from langchain_openai import OpenAIEmbeddings # 豆包嵌入模型完全兼容OpenAI接口 embeddings OpenAIEmbeddings( api_key你的豆包API密钥, base_urlhttps://ark.cn-beijing.volces.com/api/v3, modelbge-large-zh-v1.5 # 中文效果最好的开源嵌入模型 ) # 嵌入查询 query_vector embeddings.embed_query(什么是RAG技术) print(f向量维度{len(query_vector)}) # 批量嵌入文档 documents [RAG是检索增强生成技术, 大语言模型可以生成文本] doc_vectors embeddings.embed_documents(documents) print(f嵌入了{len(doc_vectors)}个文档)2.1.2 本地嵌入模型HuggingFacefrom langchain_huggingface import HuggingFaceEmbeddings # 本地BGE中文嵌入模型 embeddings HuggingFaceEmbeddings( model_nameBAAI/bge-large-zh-v1.5, model_kwargs{device: cpu}, # 有GPU可以改为cuda encode_kwargs{normalize_embeddings: True} )2.2 Chroma 向量数据库入门首选轻量级嵌入式向量数据库无需额外部署适合开发和原型。基本用法from langchain_chroma import Chroma from langchain_core.documents import Document # 初始化向量数据库 vector_store Chroma( collection_namerag_collection, embedding_functionembeddings, persist_directory./data/chroma_db # 持久化存储路径 ) # 添加单个文档 doc Document( page_contentRAG技术于2020年由Facebook提出, metadata{source: test.md, page: 1} ) vector_store.add_documents([doc]) # 批量添加文档 documents [ Document(page_contentRAG可以解决大模型的幻觉问题, metadata{source: test.md, page: 2}), Document(page_content向量数据库用于存储文本嵌入向量, metadata{source: test.md, page: 3}) ] vector_store.add_documents(documents) # 相似性搜索 results vector_store.similarity_search(RAG解决了什么问题, k2) for doc in results: print(f内容{doc.page_content}) print(f来源{doc.metadata[source]}) # 带分数的相似性搜索分数越小越相似 results_with_scores vector_store.similarity_search_with_score(什么是向量数据库, k2) for doc, score in results_with_scores: print(f内容{doc.page_content}相似度分数{score:.4f})元数据过滤# 只搜索来自test.md第1页的文档 results vector_store.similarity_search( RAG是什么时候提出的, k2, filter{source: test.md, page: 1} )持久化与加载# 数据会自动持久化到persist_directory # 重新加载数据库 vector_store Chroma( collection_namerag_collection, embedding_functionembeddings, persist_directory./data/chroma_db ) # 删除集合 Chroma.delete_collection(rag_collection, persist_directory./data/chroma_db)2.3 FAISS 向量数据库高性能首选Facebook 开发的高性能向量数据库适合大规模数据。from langchain_community.vectorstores import FAISS # 初始化FAISS vector_store FAISS.from_documents(documents, embeddings) # 持久化 vector_store.save_local(./data/faiss_db) # 加载 vector_store FAISS.load_local( ./data/faiss_db, embeddings, allow_dangerous_deserializationTrue ) # 相似性搜索 results vector_store.similarity_search(什么是RAG, k2)三、工业级向量检索最佳实践3.1 嵌入模型选择指南场景推荐模型维度特点中文通用场景BAAI/bge-large-zh-v1.51024综合效果最好中文轻量场景BAAI/bge-small-zh-v1.5512速度快效果不错多语言场景BAAI/bge-m31024支持 100 语言远程 API豆包 bge-large-zh-v1.51024无需本地部署速度快3.2 检索参数调优top_k返回的结果数量通常为 3-10太小丢失相关信息太大引入太多噪声增加大模型处理成本相似度阈值过滤掉相似度低于阈值的结果余弦相似度通常设置为 0.7-0.8欧氏距离通常设置为 0.3-0.5批量大小批量嵌入时的批次大小通常为 32-1283.3 检索结果后处理去重去除重复的文档片段过滤根据元数据过滤不需要的结果排序根据相似度分数和元数据进行二次排序截断如果结果太长截断到合适的长度3.4 向量数据库性能优化批量导入尽量使用批量导入而不是单个添加索引优化根据数据量选择合适的索引类型硬件加速使用 GPU 进行嵌入和检索分片存储大规模数据可以分片存储在多个节点上四、项目整合实现工业级 RAG 检索系统4.1 第一步新增依赖在requirements.txt中添加向量存储相关依赖# 向量存储依赖langchain-chromalangchain-communityfaiss-cpu # Windows/Linux CPU版本# faiss-gpu # 有GPU可以使用这个版本sentence-transformers # 本地嵌入模型所需pip install -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple4.2 第二步新增配置项在config/settings.py中添加向量存储配置# 向量存储配置 vector_store_type: str Field(defaultchroma, description向量数据库类型chroma/faiss) vector_store_path: Path Field(defaultPath(./data/vector_db), description向量数据库存储路径) vector_collection_name: str Field(defaultrag_collection, description向量集合名称) # 嵌入模型配置 embedding_provider: str Field(defaultdoubao, description嵌入模型提供商doubao/huggingface) embedding_model_name: str Field(defaultbge-large-zh-v1.5, description嵌入模型名称) embedding_device: str Field(defaultcpu, description嵌入模型运行设备cpu/cuda) # 检索配置 retrieval_top_k: int Field(default3, description检索返回的文档数量) retrieval_similarity_threshold: float Field(default0.7, description相似度阈值)4.3 第三步实现向量存储工厂在core/目录下创建vector_store_factory.pyfrom typing import List, Optional from langchain_core.documents import Document from langchain_core.vectorstores import VectorStore from core.vector_store_factory import VectorStoreFactory from core.document_processor import DocumentProcessor from config.settings import settings from utils.logger import logger from utils.exceptions import RetrievalError class RAGRetriever: RAG检索器封装文档处理和向量检索 def __init__( self, vector_store: Optional[VectorStore] None, document_processor: Optional[DocumentProcessor] None ): self.vector_store vector_store or VectorStoreFactory.get_vector_store() self.document_processor document_processor or DocumentProcessor() logger.info(✅ RAG检索器初始化完成) def add_document(self, file_path: str) - int: 添加单个文档到向量数据库 :param file_path: 文档路径 :return: 添加的分块数量 try: logger.info(f添加文档到知识库{file_path}) # 处理文档 chunks self.document_processor.process_file(file_path) if not chunks: logger.warning(f文档{file_path}没有有效内容) return 0 # 添加到向量数据库 self.vector_store.add_documents(chunks) logger.info(f✅ 文档添加成功共添加{len(chunks)}个分块) # 如果是FAISS保存索引 if settings.vector_store_type faiss: VectorStoreFactory.save_faiss_index(self.vector_store) return len(chunks) except Exception as e: raise RetrievalError(f添加文档失败{str(e)}) from e def add_directory(self, dir_path: str, recursive: bool False) - int: 批量添加目录下的所有文档 :param dir_path: 目录路径 :param recursive: 是否递归处理子目录 :return: 添加的总分块数量 try: logger.info(f批量添加目录到知识库{dir_path}) chunks self.document_processor.process_directory(dir_path, recursive) if not chunks: logger.warning(f目录{dir_path}没有有效文档) return 0 # 批量添加到向量数据库 self.vector_store.add_documents(chunks) logger.info(f✅ 批量添加成功共添加{len(chunks)}个分块) # 如果是FAISS保存索引 if settings.vector_store_type faiss: VectorStoreFactory.save_faiss_index(self.vector_store) return len(chunks) except Exception as e: raise RetrievalError(f批量添加文档失败{str(e)}) from e def retrieve( self, query: str, top_k: int None, similarity_threshold: float None, filter: dict None ) - List[Document]: 检索相关文档 :param query: 用户查询 :param top_k: 返回结果数量默认使用settings配置 :param similarity_threshold: 相似度阈值默认使用settings配置 :param filter: 元数据过滤条件 :return: 相关文档列表 try: top_k top_k or settings.retrieval_top_k similarity_threshold similarity_threshold or settings.retrieval_similarity_threshold logger.debug(f检索查询{query}top_k{top_k}阈值{similarity_threshold}) # 带分数的相似性搜索 results_with_scores self.vector_store.similarity_search_with_score( queryquery, ktop_k, filterfilter ) # 过滤相似度低于阈值的结果 filtered_results [] for doc, score in results_with_scores: # Chroma返回的是余弦距离0-20表示完全相同转换为相似度分数0-1 # 余弦距离转相似度similarity 1 - (distance / 2) # 这适用于 ChromaDB 默认使用的余弦相似度 similarity max(0.0, 1.0 - score / 2.0) doc.metadata[similarity_score] similarity logger.debug(f文档相似度{similarity:.4f}阈值{similarity_threshold}) if similarity similarity_threshold: filtered_results.append(doc) logger.debug(f检索完成找到{len(filtered_results)}个相关文档) return filtered_results except Exception as e: raise RetrievalError(f检索失败{str(e)}) from e def get_document_count(self) - int: 获取向量数据库中的文档数量 try: return self.vector_store._collection.count() except: # FAISS不支持直接获取数量返回-1 return -1 def clear_knowledge_base(self): 清空知识库 try: VectorStoreFactory.delete_collection() # 重新初始化向量存储 self.vector_store VectorStoreFactory.get_vector_store() logger.info(✅ 知识库已清空) except Exception as e: raise RetrievalError(f清空知识库失败{str(e)}) from e4.4 第四步实现 RAG 检索器在core/目录下创建rag_retriever.pyfrom typing import List, Optional from langchain_core.documents import Document from langchain_core.vectorstores import VectorStore from core.vector_store_factory import VectorStoreFactory from core.document_processor import DocumentProcessor from config.settings import settings from utils.logger import logger from utils.exceptions import RetrievalError class RAGRetriever: RAG检索器封装文档处理和向量检索 def __init__( self, vector_store: Optional[VectorStore] None, document_processor: Optional[DocumentProcessor] None ): self.vector_store vector_store or VectorStoreFactory.get_vector_store() self.document_processor document_processor or DocumentProcessor() logger.info(✅ RAG检索器初始化完成) def add_document(self, file_path: str) - int: 添加单个文档到向量数据库 :param file_path: 文档路径 :return: 添加的分块数量 try: logger.info(f添加文档到知识库{file_path}) # 处理文档 chunks self.document_processor.process_file(file_path) if not chunks: logger.warning(f文档{file_path}没有有效内容) return 0 # 添加到向量数据库 self.vector_store.add_documents(chunks) logger.info(f✅ 文档添加成功共添加{len(chunks)}个分块) # 如果是FAISS保存索引 if settings.vector_store_type faiss: VectorStoreFactory.save_faiss_index(self.vector_store) return len(chunks) except Exception as e: raise RetrievalError(f添加文档失败{str(e)}) from e def add_directory(self, dir_path: str, recursive: bool False) - int: 批量添加目录下的所有文档 :param dir_path: 目录路径 :param recursive: 是否递归处理子目录 :return: 添加的总分块数量 try: logger.info(f批量添加目录到知识库{dir_path}) chunks self.document_processor.process_directory(dir_path, recursive) if not chunks: logger.warning(f目录{dir_path}没有有效文档) return 0 # 批量添加到向量数据库 self.vector_store.add_documents(chunks) logger.info(f✅ 批量添加成功共添加{len(chunks)}个分块) # 如果是FAISS保存索引 if settings.vector_store_type faiss: VectorStoreFactory.save_faiss_index(self.vector_store) return len(chunks) except Exception as e: raise RetrievalError(f批量添加文档失败{str(e)}) from e def retrieve( self, query: str, top_k: int None, similarity_threshold: float None, filter: dict None ) - List[Document]: 检索相关文档 :param query: 用户查询 :param top_k: 返回结果数量默认使用settings配置 :param similarity_threshold: 相似度阈值默认使用settings配置 :param filter: 元数据过滤条件 :return: 相关文档列表 try: top_k top_k or settings.retrieval_top_k similarity_threshold similarity_threshold or settings.retrieval_similarity_threshold logger.debug(f检索查询{query}top_k{top_k}阈值{similarity_threshold}) # 带分数的相似性搜索 results_with_scores self.vector_store.similarity_search_with_score( queryquery, ktop_k, filterfilter ) # 过滤相似度低于阈值的结果 filtered_results [] for doc, score in results_with_scores: # Chroma返回的是L2距离越小越相似转换为相似度分数0-1 similarity 1.0 / (1.0 score) if similarity similarity_threshold: doc.metadata[similarity_score] similarity filtered_results.append(doc) logger.debug(f检索完成找到{len(filtered_results)}个相关文档) return filtered_results except Exception as e: raise RetrievalError(f检索失败{str(e)}) from e def get_document_count(self) - int: 获取向量数据库中的文档数量 try: return self.vector_store._collection.count() except: # FAISS不支持直接获取数量返回-1 return -1 def clear_knowledge_base(self): 清空知识库 try: VectorStoreFactory.delete_collection() # 重新初始化向量存储 self.vector_store VectorStoreFactory.get_vector_store() logger.info(✅ 知识库已清空) except Exception as e: raise RetrievalError(f清空知识库失败{str(e)}) from e4.5 第五步实现完整 RAG 问答服务在core/目录下创建rag_service.pyfrom typing import Iterator, AsyncIterator from core.rag_retriever import RAGRetriever from core.prompt_service import PromptService from config.settings import settings from utils.logger import logger class RAGService: 完整的RAG问答服务 def __init__(self): self.retriever RAGRetriever() logger.info(✅ RAG问答服务初始化完成) def query( self, question: str, top_k: int None, similarity_threshold: float None, use_anti_hallucination: bool True ) - str: 同步问答 :param question: 用户问题 :param top_k: 检索结果数量 :param similarity_threshold: 相似度阈值 :param use_anti_hallucination: 是否使用反幻觉提示模板 :return: 生成的回答 try: logger.info(f用户问题{question}) # 1. 检索相关文档 docs self.retriever.retrieve(question, top_k, similarity_threshold) if not docs: return 抱歉知识库中没有找到相关信息无法回答您的问题。 # 2. 构建上下文 context \n\n.join([f[{i1}] {doc.page_content} for i, doc in enumerate(docs)]) logger.debug(f检索到的上下文{context[:500]}...) # 3. 选择提示模板 template_version v2.0 if use_anti_hallucination else v1.0 # 4. 生成回答 answer PromptService.generate( template_namerag_answer, versiontemplate_version, inputs{ context: context, question: question } ) logger.info(f生成回答完成{answer[:200]}...) return answer except Exception as e: logger.error(f问答失败{str(e)}, exc_infoTrue) return 抱歉回答生成时发生错误请稍后重试。 def stream_query( self, question: str, top_k: int None, similarity_threshold: float None, use_anti_hallucination: bool True ) - Iterator[str]: 流式问答 try: logger.info(f用户流式问题{question}) # 1. 检索相关文档 docs self.retriever.retrieve(question, top_k, similarity_threshold) if not docs: yield 抱歉知识库中没有找到相关信息无法回答您的问题。 return # 2. 构建上下文 context \n\n.join([f[{i1}] {doc.page_content} for i, doc in enumerate(docs)]) # 3. 选择提示模板 template_version v2.0 if use_anti_hallucination else v1.0 # 4. 流式生成回答 yield from PromptService.stream_generate( template_namerag_answer, versiontemplate_version, inputs{ context: context, question: question } ) except Exception as e: logger.error(f流式问答失败{str(e)}, exc_infoTrue) yield 抱歉回答生成时发生错误请稍后重试。4.6 第六步更新项目入口main.pyfrom dotenv import load_dotenv load_dotenv() from core.rag_service import RAGService import os def test_rag_system(): 测试完整RAG系统 print( 第4天向量存储与检索系统测试\n) # 1. 初始化RAG服务 rag RAGService() # 2. 创建测试文档 os.makedirs(data, exist_okTrue) test_text # RAG技术详解 ## 什么是RAG RAG检索增强生成是一种将外部知识库检索与大模型生成相结合的技术。 它于2020年由Facebook AI研究院提出旨在解决大模型的知识过时和幻觉问题。 ## RAG的工作原理 RAG系统的工作流程分为三个核心步骤 1. **文档处理**将原始文档切割成小块转换为向量存储到向量数据库 2. **检索**根据用户查询从向量数据库中检索最相关的文档片段 3. **生成**将检索到的文档片段和用户查询一起喂给大模型生成回答 ## RAG的核心优势 - ✅ **知识实时更新**不需要重新训练模型只需更新知识库 - ✅ **减少幻觉**回答完全基于真实的文档内容 - ✅ **可解释性强**可以追溯回答的具体来源 - ✅ **成本低廉**比微调大模型便宜90%以上 with open(data/rag_guide.md, w, encodingutf-8) as f: f.write(test_text) # 3. 添加文档到知识库 print( 添加文档到知识库 ) chunk_count rag.retriever.add_document(data/rag_guide.md) print(f✅ 添加了{chunk_count}个分块到知识库) print(f知识库总文档数{rag.retriever.get_document_count()}\n) # 4. 测试问答 print( 测试RAG问答 ) questions [ RAG技术是什么时候提出的, RAG的工作流程分为哪几个步骤, RAG有哪些核心优势, 大模型的幻觉问题是什么 ] for i, question in enumerate(questions): print(f\n问题{i1}{question}) answer rag.query(question) print(f回答{answer}) # 5. 测试流式问答 print(\n *60) print( 测试流式问答 ) question 用自己的话解释什么是RAG技术 print(f问题{question}) print(回答, end, flushTrue) for chunk in rag.stream_query(question): print(chunk, end, flushTrue) print(\n) print( 所有RAG系统测试通过) if __name__ __main__: test_rag_system()