
AI 驱动的智能数据清洗从规则引擎到大模型辅助的工程化链路一、数据清洗的日常困境数据分析师大部分时间都在做数据清洗。缺失值填充、异常值识别、格式标准化、重复记录去重——这些操作看似简单但实际处理时经常遇到需要人工判断的边界情况。比如一个地址字段里可能混入电话号码、邮编甚至乱码一个金额列可能同时存在1,234.56、1234.56和1234.56元三种格式。传统方法用正则表达式和硬编码规则数据源多了之后维护起来很麻烦。规则引擎能处理格式问题但遇到语义层面的异常就无能为力了——它知道年龄999是异常值却无法判断年龄22、职业退休的组合是否合理。大模型的优势在于能理解语义找出规则引擎发现不了的深层异常。二、规则引擎与大模型协作的清洗架构智能数据清洗不是用大模型替代规则而是让规则引擎处理确定性高、频次高的问题大模型处理语义模糊、需要上下文推理的问题。两者配合才能兼顾效率与准确性。flowchart TB A[原始数据输入] -- B{规则引擎预检} B --|格式异常| C[正则修复] B --|缺失值| D[统计填充] B --|确定性异常| E[规则标记] B --|语义模糊| F[大模型推理层] F --|上下文推理| G[异常识别] F --|语义匹配| H[字段修正] F --|跨列关联| I[一致性校验] C -- J[清洗后数据] D -- J E -- J G -- J H -- J I -- J J -- K{质量校验} K --|通过| L[输出] K --|未通过| B2.1 规则引擎确定性清洗的第一道防线规则引擎处理的问题有明确的判断标准执行速度快、成本极低。典型场景包括格式标准化统一日期格式、去除千分位逗号、标准化电话号码缺失值填充数值列用中位数填充、分类列用众数填充确定性异常超出物理范围的值如年龄 0 或 1502.2 大模型推理层语义异常的第二道防线大模型擅长处理需要上下文推理的清洗任务跨列一致性校验判断年龄22、职业退休是否矛盾语义模糊字段修正将北京市朝阳区xxx路中的地址标准化自由文本结构化从大概3月份买了台电脑花了5000多中提取结构化信息三、生产级智能数据清洗的代码实现3.1 规则引擎模块import re import pandas as pd import numpy as np from typing import Optional class RuleEngine: 规则引擎处理确定性数据清洗任务 def __init__(self): self.rules [] self.stats {applied: 0, fixed: 0} def clean_currency(self, series: pd.Series) - pd.Series: 统一金额格式去除货币符号和千分位逗号 def parse_amount(val): if pd.isna(val): return np.nan val_str str(val).strip() # 去除人民币符号、千分位逗号、元等后缀 cleaned re.sub(r[¥$,元], , val_str) try: return float(cleaned) except ValueError: return np.nan # 无法解析的标记为缺失 result series.apply(parse_amount) fixed_count result.notna().sum() - series.notna().sum() self.stats[fixed] max(0, fixed_count) return result def clean_phone(self, series: pd.Series) - pd.Series: 标准化手机号格式保留纯数字校验位数 def parse_phone(val): if pd.isna(val): return np.nan digits re.sub(r[^\d], , str(val)) # 中国大陆手机号11 位1 开头 if re.match(r^1[3-9]\d{9}$, digits): return digits return np.nan # 格式不合法的标记为缺失 return series.apply(parse_phone) def fill_missing_numeric(self, series: pd.Series, strategy: str median) - pd.Series: 数值列缺失值填充 if strategy median: fill_value series.median() elif strategy mean: fill_value series.mean() else: raise ValueError(f不支持的填充策略: {strategy}) missing_count series.isna().sum() result series.fillna(fill_value) self.stats[fixed] missing_count self.stats[applied] 1 return result def clip_outliers(self, series: pd.Series, lower: Optional[float] None, upper: Optional[float] None) - pd.Series: 截断超出物理范围的异常值 return series.clip(lowerlower, upperupper)3.2 大模型推理模块import json from dataclasses import dataclass from openai import OpenAI dataclass class LLMCleanConfig: model: str deepseek-chat temperature: float 0.1 # 为了减少错误温度参数设得很低 batch_size: int 20 # 单次推理处理的最大行数 max_retries: int 2 class LLMCleaner: 大模型推理层处理语义模糊的数据清洗任务 def __init__(self, config: LLMCleanConfig None): self.config config or LLMCleanConfig() self.client OpenAI() def check_cross_column_consistency(self, row: dict, columns: list[str]) - dict: 跨列一致性校验识别语义矛盾 prompt f你是一个数据质量检查专家。请判断以下数据行中各字段之间是否存在语义矛盾。 数据行 {json.dumps(row, ensure_asciiFalse, indent2)} 请以 JSON 格式返回结果 {{ is_consistent: true/false, inconsistencies: [矛盾描述1, 矛盾描述2], suggested_fix: {{列名: 修正值}} }} 仅返回 JSON不要添加其他内容。 response self.client.chat.completions.create( modelself.config.model, messages[{role: user, content: prompt}], temperatureself.config.temperature, ) try: result json.loads(response.choices[0].message.content) return result except json.JSONDecodeError: return {is_consistent: True, inconsistencies: [], suggested_fix: {}} def standardize_free_text(self, text: str, target_schema: dict) - dict: 自由文本结构化从非结构化文本中提取字段 prompt f请从以下文本中提取结构化信息按照目标 schema 填充。 文本{text} 目标 Schema{json.dumps(target_schema, ensure_asciiFalse)} 请以 JSON 格式返回提取结果缺失字段填 null。仅返回 JSON。 response self.client.chat.completions.create( modelself.config.model, messages[{role: user, content: prompt}], temperatureself.config.temperature, ) try: return json.loads(response.choices[0].message.content) except json.JSONDecodeError: return {k: None for k in target_schema} def batch_consistency_check(self, df: pd.DataFrame, columns: list[str]) - pd.DataFrame: 批量跨列一致性校验 results [] for i in range(0, len(df), self.config.batch_size): batch df.iloc[i:i self.config.batch_size] for _, row in batch.iterrows(): row_dict {col: row[col] for col in columns if col in row.index} result self.check_cross_column_consistency(row_dict, columns) results.append(result) # 将校验结果合并回原始 DataFrame result_df df.copy() result_df[_is_consistent] [r[is_consistent] for r in results] result_df[_inconsistencies] [json.dumps(r[inconsistencies], ensure_asciiFalse) for r in results] return result_df3.3 编排层规则引擎与大模型的协同class SmartDataCleaner: 智能数据清洗编排器规则引擎优先大模型兜底 def __init__(self): self.rule_engine RuleEngine() self.llm_cleaner LLMCleaner() def clean(self, df: pd.DataFrame, config: dict) - pd.DataFrame: 执行完整清洗流程 config 示例 { currency_columns: [amount, price], phone_columns: [phone], numeric_fill: {age: median, salary: mean}, clip_columns: {age: {lower: 0, upper: 150}}, consistency_check: [age, occupation, education] } result df.copy() # 第一步规则引擎处理确定性清洗 for col in config.get(currency_columns, []): if col in result.columns: result[col] self.rule_engine.clean_currency(result[col]) for col in config.get(phone_columns, []): if col in result.columns: result[col] self.rule_engine.clean_phone(result[col]) for col, strategy in config.get(numeric_fill, {}).items(): if col in result.columns: result[col] self.rule_engine.fill_missing_numeric( result[col], strategystrategy ) for col, bounds in config.get(clip_columns, {}).items(): if col in result.columns: result[col] self.rule_engine.clip_outliers( result[col], **bounds ) # 第二步大模型处理语义模糊的清洗任务 consistency_cols config.get(consistency_check, []) if consistency_cols: result self.llm_cleaner.batch_consistency_check( result, consistency_cols ) return result四、规则引擎与大模型协作的架构权衡维度规则引擎大模型推理执行速度毫秒级可处理百万行秒级受 API 限流约束单行成本几乎为零约 0.01–0.05 元/行准确率确定性场景 100%语义场景约 90%–95%维护成本规则随数据源增多而膨胀Prompt 调优成本高但通用性强可解释性规则透明可审计推理过程为黑盒关键权衡成本控制大模型推理的成本是规则引擎的数千倍。生产环境中应先用规则引擎过滤掉 80% 的确定性异常仅将剩余 20% 的模糊数据送入大模型。延迟容忍大模型推理的延迟在秒级不适合实时清洗流水线。对于流式数据建议采用异步清洗模式——规则引擎实时处理大模型离线批处理。幻觉风险大模型可能过度修正将正确的数据误判为异常。解决方案是在大模型输出后增加人工审核环节对修正比例超过阈值的批次进行抽样复核。五、总结智能数据清洗的核心思路是规则引擎优先、大模型兜底——确定性任务交给规则引擎保证速度和成本语义模糊任务交给大模型保证准确性。两者协同的关键在于编排层的分流逻辑规则引擎处理格式异常、缺失值填充和确定性异常大模型处理跨列一致性校验和自由文本结构化。落地步骤第一步梳理现有清洗规则将确定性规则迁移至规则引擎模块第二步识别规则引擎无法覆盖的语义异常类型设计对应的 Prompt 模板第三步构建编排层实现自动分流并设置成本监控与质量校验机制。关键原则是——能用规则解决的不要用大模型大模型只处理规则无法覆盖的语义层问题。