02-LlamaIndex数据连接:多源数据加载与Data Connectors生态

发布时间:2026/7/4 21:11:06

02-LlamaIndex数据连接:多源数据加载与Data Connectors生态 02-LlamaIndex数据连接多源数据加载与Data Connectors生态系列导航01 核心概念与RAG处理管线02 多源数据加载与Data Connectors ← 当前03 文本分块策略与NodeParser04 向量存储与混合索引策略05 Retriever、Query Engine与Chat Engine06 Agent与Workflow编排07 多模态与企业级部署08 生态集成与LLM协同关键字LlamaIndex、Data Connector、SimpleDirectoryReader、数据加载器、元数据增强、企业级数据接入、增量同步、数据脱敏做过知识库项目的人都有这样的经历用户的需求永远是把我们的所有文档都接进来——Wiki页面、Confluence知识库、飞书文档、数据库里的运维记录、S3上的报告PDF、还有一堆散落在各处的Markdown笔记。每一类数据源都有自己的认证方式、文件格式、增量更新机制接起来比搭RAG本身还累。LlamaIndex的Data Connectors生态就是冲着这个问题来的。它用统一的Reader接口把100多种数据源抽象成一样的Document输出让你不用关心数据从哪来的差异只需关注数据怎么处理。本文从工程视角详细拆解这套连接体系。一、数据连接器的架构设计LlamaIndex把数据加载抽象为一个清晰的读取-解析-输出三层模型┌──────────────────────────────────────────────────────────────────┐ │ Data Connector 三层模型 │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 第一层数据源适配器 (Source Adapters) │ │ │ │ │ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ │ │ 本地文件 │ │ 数据库 │ │ 云存储 │ │ SaaS平台 │ │ │ │ │ │ PDF/MD/ │ │ PG/Mongo │ │ S3/GCS/ │ │ Notion/ │ │ │ │ │ │ DOCX/CSV │ │ Redis │ │ Azure │ │ Confluence│ │ │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ └───────┼────────────┼────────────┼────────────┼──────────────┘ │ │ │ │ │ │ │ │ v v v v │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 第二层统一Reader接口 (BaseReader) │ │ │ │ │ │ │ │ load_data() ── Document[] │ │ │ │ async_load_data() ── Document[] │ │ │ │ lazy_load_data() ── Generator[Document] │ │ │ └──────────────────────────┬──────────────────────────────────┘ │ │ │ │ │ v │ │ ┌─────────────────────────────────────────────────────────────┐ │ │ │ 第三层文档标准化 (Document Standardization) │ │ │ │ │ │ │ │ text: 提取的文本内容 │ │ │ │ metadata: {filename, created_at, source_type, ...} │ │ │ │ doc_id: 唯一标识符 │ │ │ └─────────────────────────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────┘所有Reader都继承自BaseReader基类核心接口只有两个方法load_data()→ 返回Document[]列表lazy_load_data()→ 返回生成器适合大文件流式处理这种设计意味着不管你从哪种数据源读取拿到的都是统一的Document对象后续的解析、分块、索引逻辑完全不感知数据来源。二、SimpleDirectoryReader万能入口对于本地文件加载SimpleDirectoryReader是首选工具。它的设计哲学是自动判断、开箱即用。fromllama_index.coreimportSimpleDirectoryReader# 最简形式自动识别目录下所有支持格式的文件documentsSimpleDirectoryReader(./data).load_data()格式自动识别SimpleDirectoryReader内置了文件类型到Reader的映射表文件扩展名自动使用的Reader说明.pdfPDFReader支持文本提取和基础表格解析.docxDocxReaderWord文档保留段落结构.xlsx/.csvPandasCSVReader表格数据自动转DataFrame.mdMarkdownReader保留标题层级和链接结构.html/.htmUnstructuredReaderHTML页面基于beautifulsoup解析.txtDefaultReader纯文本原样读取.jsonJSONReader结构化JSON数据精细控制实际项目中你几乎一定会用到更精细的配置fromllama_index.coreimportSimpleDirectoryReader readerSimpleDirectoryReader(input_dir./company_docs,required_exts[.md,.pdf,.docx],# 只加载指定格式exclude[.gitignore,README.md],# 排除特定文件recursiveTrue,# 递归子目录filename_as_idTrue,# 用文件名作为doc_idnum_files_limit100,# 限制文件数量调试用)documentsreader.load_data()print(f加载了{len(documents)}个文档)按需指定Reader当自动识别不够用时可以手动指定fromllama_index.readers.fileimportPDFReader,MarkdownReader readerSimpleDirectoryReader(input_dir./docs,file_extractor{.pdf:PDFReader(),.md:MarkdownReader(),})工程提示SimpleDirectoryReader适合开发和小规模数据。生产环境建议使用专用的Reader配合元数据增强后面会讲因为SimpleDirectoryReader的元数据比较有限。三、数据库连接器企业数据大量存在于结构化存储中。LlamaIndex为主流数据库提供了专门的ReaderSQL数据库fromllama_index.readers.databaseimportDatabaseReader# PostgreSQLreaderDatabaseReader(uripostgresql://user:passlocalhost:5432/mydb)# 查询特定表documentsreader.load_data(querySELECT title, content, created_at FROM articles WHERE status published)MongoDBfromllama_index.readers.databaseimportMongoReader readerMongoReader(urimongodb://localhost:27017,db_nameknowledge_base)documentsreader.load_data(collection_namedocs,field_names[title,body,tags],query_dict{status:active})数据库Reader的关键设计数据库Reader有一个特殊之处你需要自己写SQL/查询语句来控制数据的提取范围。这不是缺陷而是必要的灵活性——因为数据库表结构千差万别框架不可能猜出你要查哪些字段。┌───────────────────────────────────────────────────┐ │ 数据库Reader使用流程 │ │ │ │ ① 确定目标表和字段 │ │ SELECT title, content, category │ │ FROM articles │ │ WHERE department engineering │ │ │ │ ② 字段映射到Document结构 │ │ content ── Document.text │ │ 其他字段 ── Document.metadata │ │ │ │ ③ 生成Document列表 │ │ 每行结果 一个Document对象 │ │ │ │ ④ 进入标准RAG管线 │ │ 分块 ── 索引 ── 检索 ── 生成 │ │ │ └───────────────────────────────────────────────────┘工程建议对于大型数据库表务必加上WHERE条件限制数据范围并考虑分批加载num_rows_limit。一次性加载百万行数据进内存会导致OOM。四、云存储与SaaS平台连接器对象存储S3 / Azure Blob / GCSfromllama_index.readers.s3importS3Reader readerS3Reader(bucketmy-company-docs,prefixknowledge_base/,# 指定目录前缀aws_access_key_id...,aws_secret_access_key...,)documentsreader.load_data()Notionfromllama_index.readers.notionimportNotionPageReader readerNotionPageReader(integration_tokenntn_xxx...)documentsreader.load_data(page_ids[page_id_1,page_id_2])Google Drivefromllama_index.readers.googleimportGoogleDriveReader readerGoogleDriveReader(service_account_key/path/to/service_account.json,folder_ids[folder_id_1],)documentsreader.load_data()Web爬取fromllama_index.readers.webimportSimpleWebPageReader readerSimpleWebPageReader()documentsreader.load_data(urls[https://docs.example.com/guide/intro])LlamaIndex还支持深度爬取BeautifulSoupWebReader、TrafilaturaWebReader以及通过AsyncWebPageReader实现并发爬取适合批量抓取站内文档。五、元数据设计数据加载的灵魂文档加载出来之后元数据的质量直接决定了后续检索的效果。LlamaIndex的Document元数据是一个普通的Python字典但它承载了过滤、溯源、权限控制等关键功能。元数据增强模式fromllama_index.coreimportSimpleDirectoryReader documentsSimpleDirectoryReader(./data).load_data()# 为每个文档追加自定义元数据fordocindocuments:# 保留原始元数据文件名、创建时间等original_metadoc.metadata# 追加业务元数据doc.metadata{**original_meta,category:classify_document(doc.text),# 自动分类department:extract_department(doc.text),# 提取所属部门confidence:compute_confidence(doc.text),# 内容可信度评分tags:extract_keywords(doc.text),# 关键词标签version:2.0,access_level:internal,# 权限标记}元数据驱动的检索过滤元数据的价值在检索阶段才真正体现。你可以基于元数据进行预过滤大幅减少向量搜索的计算量fromllama_index.core.vector_storesimportMetadataFilters,FilterCondition# 只在技术文档类别中搜索且必须是2025年之后的filtersMetadataFilters(filters[{key:category,value:技术文档},{key:created_date,value:2025-01-01,operator:},],conditionFilterCondition.AND,)query_engineindex.as_query_engine(similarity_top_k5,filtersfilters,# 先元数据过滤再做向量检索)┌─────────────────────────────────────────────────────┐ │ 元数据过滤 vs 纯向量检索 │ │ │ │ 纯向量检索: │ │ 全量10万个节点 ── Embedding计算 ── Top-5 │ │ 耗时: ~200ms (所有节点都要计算相似度) │ │ │ │ 元数据预过滤 向量检索: │ │ 全量10万个节点 ── 元数据过滤 ── 500个候选 ── Top-5 │ │ 耗时: ~20ms (只对候选集计算相似度) │ │ │ │ 性能提升: ~10x │ └─────────────────────────────────────────────────────┘工程实践如果你的文档库超过10万个节点元数据过滤几乎是必需的。建议至少维护category分类、source_type来源类型、created_date创建时间三个基础字段。六、企业级数据接入方案当数据规模和企业管控要求上来了SimpleDirectoryReader就不太够用了。以下是几个企业级场景的处理方案6.1 增量同步全量加载百万文档再建索引太慢了。生产环境需要增量同步机制importhashlibfromdatetimeimportdatetimeclassIncrementalLoader:def__init__(self,checkpoint_filesync_checkpoint.json):self.checkpoint_filecheckpoint_file self.last_syncself._load_checkpoint()def_load_checkpoint(self):# 从检查点文件读取上次同步时间ifos.path.exists(self.checkpoint_file):withopen(self.checkpoint_file)asf:returnjson.load(f).get(last_sync)returnNonedefload_new_documents(self,directory):只加载上次同步之后新增或修改的文件all_filesglob(f{directory}/**/*,recursiveTrue)new_documents[]forfilepathinall_files:ifnotos.path.isfile(filepath):continuemod_timedatetime.fromtimestamp(os.path.getmtime(filepath))ifself.last_syncandmod_timedatetime.fromisoformat(self.last_sync):continuereaderSimpleDirectoryReader(input_files[filepath])docsreader.load_data()new_documents.extend(docs)# 更新检查点self._save_checkpoint()returnnew_documents6.2 数据脱敏企业数据接入时敏感信息身份证号、手机号、财务数据必须在进入RAG管线前脱敏importredefsanitize_document(text,patternsNone):对文档文本进行脱敏处理ifpatternsisNone:patterns{r\d{17}[\dXx]:[身份证号],# 身份证r1[3-9]\d{9}:[手机号],# 手机号r\d{4}[- ]?\d{4}[- ]?\d{4}:[银行卡号],# 银行卡r[a-zA-Z0-9._%-][a-zA-Z0-9.-]\.[a-zA-Z]{2,}:[邮箱],# 邮箱}forpattern,replacementinpatterns.items():textre.sub(pattern,replacement,text)returntext# 在文档加载后立即脱敏fordocindocuments:doc.textsanitize_document(doc.text)doc.metadata[sanitized]True6.3 版本管理同一份文档可能被多次更新索引中应该只保留最新版本defdeduplicate_documents(documents):按doc_id去重只保留每个文档的最新版本latest{}fordocindocuments:doc_iddoc.metadata.get(doc_id,doc.doc_id)versiondoc.metadata.get(version,0)ifdoc_idnotinlatestorversionlatest[doc_id].metadata.get(version,0):latest[doc_id]docreturnlist(latest.values())6.4 权限控制企业级场景中不同用户只能搜索自己有权限查看的文档defbuild_user_filters(user_roles,user_department):根据用户权限构建元数据过滤器filters_list[]ifadminnotinuser_roles:# 非管理员只能看本部门 公开文档filters_listMetadataFilters(filters[{key:access_level,value:public},{key:department,value:user_department},],conditionFilterCondition.OR,)returnfilters_list# 查询时注入权限过滤器user_filtersbuild_user_filters([developer],engineering)query_engineindex.as_query_engine(similarity_top_k5,filtersuser_filters,)七、数据预处理管线一个生产级的文档加载流程远不止读取文件这么简单。完整的数据预处理管线通常包含五个阶段┌──────────────────────────────────────────────────────────────────┐ │ 数据预处理管线五个阶段 │ ├──────────────────────────────────────────────────────────────────┤ │ │ │ ① 加载 (Loading) │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 原始文件 ──Reader── Document[] │ │ │ │ 关键格式自动识别、错误文件跳过、并发加载 │ │ │ └──────────────────────────┬─────────────────────────────────┘ │ │ │ │ │ ② 解析 (Parsing) v │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ Document[] ──NodeParser── Node[] │ │ │ │ 关键格式适配、结构提取表格/代码块/标题 │ │ │ └──────────────────────────┬─────────────────────────────────┘ │ │ │ │ │ ③ 清洗 (Cleaning) v │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 去除HTML标签、多余空白、页眉页脚、水印文本 │ │ │ │ 修复编码错误、断行、特殊字符 │ │ │ └──────────────────────────┬─────────────────────────────────┘ │ │ │ │ │ ④ 标准化 (Standardization) v │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 统一编码格式(UTF-8)、日期格式、命名规范 │ │ │ │ 标注语言标记、分类标签、关键词 │ │ │ └──────────────────────────┬─────────────────────────────────┘ │ │ │ │ │ ⑤ 元数据增强 (Metadata v │ │ Enhancement) │ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 追加权限标记、版本号、数据来源、可信度评分 │ │ │ │ 脱敏敏感信息替换、PII检测 │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────┘LlamaIndex的Transformations机制支持你把这五个阶段串成一条处理链fromllama_index.coreimportVectorStoreIndex,SimpleDirectoryReaderfromllama_index.core.ingestionimportIngestionPipelinefromllama_index.core.node_parserimportSentenceSplitterfromllama_index.core.extractorsimport(TitleExtractor,KeywordExtractor,)# 构建完整的摄取管线pipelineIngestionPipeline(transformations[SentenceSplitter(chunk_size1024,chunk_overlap100),TitleExtractor(nodes5),# 自动提取文档标题KeywordExtractor(keywords10),# 自动提取关键词# embed_model 会在pipeline内部自动注入])# 一键执行nodespipeline.run(documentsdocuments)indexVectorStoreIndex(nodes)IngestionPipeline的好处是把所有处理步骤封装在一起你可以随时插入新的步骤比如自定义的清洗器、元数据增强器也可以缓存中间结果避免重复计算。八、连接器选型速查表数据源推荐Reader适用规模注意事项本地混合格式SimpleDirectoryReader1万文件注意exclude排除无关文件大量PDFPDFReader 并发加载1万-100万需要自建分批加载逻辑PostgreSQLDatabaseReader全量/增量复杂查询需手写SQLMongoDBMongoReader全量/增量注意field_names映射S3/Azure BlobS3Reader/AzureBlobReader海量文件按prefix分区加载NotionNotionPageReader5000页API有速率限制ConfluenceConfluenceReader企业级需配置空间权限Web页面SimpleWebPageReader / Trafilatura站点级注意robots.txt九、总结数据加载是RAG系统的第一步也是最容易踩坑累积的地方。文件格式识别不准、元数据缺失、增量同步逻辑写错、敏感数据未脱敏——这些问题在开发阶段可能不明显但上了生产后就会一个接一个地爆发。LlamaIndex的Data Connectors生态通过统一的Reader接口把100多种数据源标准化为Document对象IngestionPipeline则把加载后的预处理流程串成了可配置的处理链。这两者结合让数据接入从每次从头写变成了配置组装。但要注意Reader只是帮你解决了格式适配的问题业务层面的逻辑权限控制、增量同步、数据脱敏仍然需要你自己实现。第3篇我们会进入NodeParser的世界看看Document变成Node之后分块策略如何影响检索质量。参考资料LlamaIndex Data Connectors 官方文档https://docs.llama_index.ai/en/stable/understanding/loading/loading/LlamaIndex SimpleDirectoryReaderhttps://docs.llama_index.ai/en/stable/module_guides/loading/simpledirectoryreader/LlamaIndex IngestionPipelinehttps://docs.llama_index.ai/en/stable/module_guides/ingestion/ingestion_pipeline/

相关新闻