生产级多维聚合:从Pandas groupby到业务语义建模

发布时间:2026/6/10 21:58:14

生产级多维聚合:从Pandas groupby到业务语义建模 1. 项目概述为什么多维聚合不是“加个groupby”就能搞定的事我在银行数据平台组干了八年从最早用SQL写几十行嵌套子查询做客户分层到后来带团队搭实时风险计算引擎踩过的坑比写的代码还多。今天聊的这个主题——“多维聚合中的数据操作”听起来像教科书里的一个章节标题但实际在生产环境里它直接决定着风控模型能不能准时上线、月度经营分析报告能不能在凌晨三点前自动生成、甚至某次大促期间的实时交易监控大屏会不会突然卡死。你肯定见过这样的场景业务方发来一封加急邮件“请尽快输出各区域、各产品线、各客户等级的当月收入、毛利、客单价、复购率、NPS均值再叠加近30天滚动环比和YTD累计值”。如果你第一反应是打开Jupyter敲df.groupby([region,product,tier])[revenue].sum()然后发现结果是个MultiIndex Series导出Excel后老板说“这列名怎么是tuple我要的是表格”或者你硬着头皮写五个独立的groupby再merge跑完发现内存爆了任务超时被调度系统kill掉……那说明你还没真正吃透多维聚合的底层逻辑。这不是Pandas语法手册的搬运而是我带着三个真实项目复盘出来的经验一个信用卡反欺诈系统的实时特征计算模块一个财富管理平台的客户资产健康度看板还有一个跨境支付网关的手续费分润结算引擎。它们共同指向一个事实——多维聚合的本质是把业务语义精准映射到数据结构上同时扛住数据量、时效性、可维护性的三重压力。所谓“高级聚合”高级的从来不是函数名而是你对“这笔钱到底属于谁、在什么时间、以什么方式产生、要回答什么问题”的理解深度。关键词里提到的“Towards AI”其实恰恰点出了这类内容的价值锚点它不教你怎么调参也不讲模型架构而是聚焦在AI落地最常被忽视的“地基层”——数据如何被正确切片、折叠、拉伸、重组。就像盖楼再炫酷的设计图如果钢筋没绑牢、混凝土标号不够迟早出事。本文所有案例都来自真实生产代码库已脱敏所有参数选择都有业务依据所有避坑提示都来自凌晨两点的线上告警。接下来咱们就按实战顺序一层层拆解这些让分析师夜不能寐的聚合难题。2. 核心思路拆解从“能跑通”到“能扛住”的四重跃迁很多初学者学完agg()函数觉得“哦会了”结果一上生产环境就懵。我带过不少应届生他们写的聚合脚本在本地10万行数据上跑得飞快但放到银行日均3亿笔交易的数仓里要么OOM要么耗时从2分钟涨到47分钟。问题出在哪不是语法错了而是设计思路上缺了四重关键跃迁。下面这四点是我给团队新人必讲的“聚合心法”。2.1 第一重跃迁从“单维度统计”到“业务维度建模”看原文第一个例子按merchant_category分组算均值和中位数。这没错但如果你只停在这一步就永远在做报表不是在建模。真正的业务维度建模要回答三个问题这个维度是谁定义的它的生命周期有多长它和其他维度的关系是什么比如merchant_category商户类别在银行内部它不是随便填的字符串。它由收单部门维护有标准编码体系如银联MCC码每年更新两次。它和region地区存在强关联——华南地区的“餐饮”类商户和东北地区的“餐饮”类商户客群画像、交易频次、风险特征完全不同。所以单纯按category聚合得到的只是一个统计数字而按[region,category]联合聚合才开始逼近业务真相。我在做跨境支付分润时就吃过亏。最初只按country国家分组算手续费结果发现东南亚几个小国的数据波动极大。后来才发现这些国家的商户实际由新加坡的收单机构统一接入真正的业务责任主体是“收单机构国家”组合。于是我们重构了维度表把acquiring_institution收单机构作为一级维度country作为二级维度聚合逻辑立刻稳定下来。维度不是数据字段而是业务规则的快照。选错维度等于用错误的地图导航。2.2 第二重跃迁从“函数堆砌”到“计算意图显性化”原文用了lambda x: x.max() - x.min()算范围很简洁。但生产环境里我严禁团队这么写。为什么因为三个月后当风控同事问“这个range阈值300是怎么定的”你翻代码只能看到一行lambda根本看不出业务逻辑。更可怕的是当需要把这个逻辑复用到另一个指标比如交易时长范围时你得再抄一遍改一处漏一处。我的做法是所有自定义聚合必须封装成命名函数并强制包含业务注释和参数校验。比如上面那个范围计算我会写成def transaction_range(series, threshold_percentile95): 计算交易金额范围max-min但排除极端异常值 业务依据根据2023年全量数据回测95%分位数外的交易占总量0.3%属设备故障或测试数据 使用场景用于动态调整反欺诈规则阈值避免因单笔异常大额交易误触发告警 if len(series) 5: return np.nan # 数据量不足不参与计算 # 剔除95%分位数外的离群值非简单max/min upper_bound np.percentile(series, threshold_percentile) filtered_series series[series upper_bound] if len(filtered_series) 3: return np.nan return filtered_series.max() - filtered_series.min()看到没函数名transaction_range比lambda清晰十倍docstring里写了业务依据2023年回测、使用场景反欺诈阈值、甚至数据量兜底逻辑。这已经不是代码而是可执行的业务文档。当新同事接手时不用问人看函数就知道该不该用、怎么用、边界在哪。2.3 第三重跃迁从“静态切片”到“时空双维度编织”滚动窗口rolling和扩展窗口expanding常被当成“时间序列专属技巧”这是巨大误解。它们本质是给静态聚合注入时间维度的语法糖。关键在于窗口大小不是技术参数而是业务节奏的翻译器。原文用3天滚动平均算营收看似合理。但在我们财富管理平台这个“3”就完全不对。为什么因为客户申购赎回是T1确认净值公布是每个交易日收盘后而销售团队的晨会复盘周期是“上周五到本周四”。所以我们的滚动窗口必须是5个交易日且必须对齐自然周周一至周五不能简单用freqD。否则周五的滚动均值会包含下周一的数据导致晨会看到的“最新趋势”其实是滞后的。更隐蔽的坑在扩展窗口。原文用expanding().sum()算累计营收没问题。但当我们算“客户生命周期价值LTV”时就不能直接用expanding().sum()。因为LTV要求1只计算有效客户开户满30天且有首笔交易2剔除销户客户的历史数据3按客户首次交易日期对齐起始点。这意味着扩展窗口的起点不是数据表的第一行而是每个客户的first_transaction_date。这需要先用transform(min)算出每个客户的首笔时间再用apply配合自定义函数实现——窗口的“起点”和“步长”必须由业务规则驱动而非数据物理顺序。2.4 第四重跃迁从“结果导向”到“下游友好型输出”unstack()操作看起来只是把MultiIndex转成DataFrame但背后是数据交付契约的建立。原文例子中region作行、product作列输出一个矩阵。这个矩阵能直接喂给BI工具吗不一定。要看BI工具的元数据规范。我在对接Tableau时就栽过跟头。Tableau要求维度字段名必须是纯英文、无空格、无特殊字符而我们的region字段里有“华东-上海”这种带短横线的值。unstack()后生成的列名直接变成华东-上海Tableau读取时报错。解决方案不是改数据源业务不允许而是在unstack()后立即做列名标准化# unstack后标准化列名 result df_sales.groupby([region,product])[revenue].mean().unstack() result.columns [col.replace( , _).replace(-, _) for col in result.columns] # 华东-上海 - 华东_上海 result.index.name customer_segment # 显式设置索引名避免BI工具识别为Unnamed:0这还只是表层。更深层的是数据语义的保真。unstack()后缺失值默认是NaN但业务上“某区域某产品无销售”和“数据未采集”是两回事。前者要填0表示真实零销量后者要留NaN表示数据缺失。所以unstack(fill_value0)不是可选项而是必选项且必须在函数注释里写明“fill_value0 表示该组合在报告期内无交易非数据丢失”。这四重跃迁构成了生产级聚合的护城河。它不追求炫技而追求在业务、工程、协作三者间找到那个最稳的平衡点。接下来我们就用这四重思维逐个击破实操中的硬骨头。3. 实操细节解析那些文档里绝不会写的“脏活累活”现在进入最硬核的部分。我把原文的五个技术点全部还原到真实生产环境的上下文中告诉你每一步背后的真实考量、踩过的坑、以及为什么必须这样写。这些细节往往决定了你的聚合脚本是能用还是能扛住百万级并发调用。3.1 多列多函数聚合别让列名嵌套毁掉整个ETL流程原文的agg({transaction_amount: [mean,median], processing_fee: [min,max]})示例很经典但生产环境里这行代码后面藏着至少三道关卡。第一关列名嵌套的“平铺地狱”输出结果是MultiIndex Columns形如(transaction_amount, mean)。当你想把它存入数据库或传给下游API时绝大多数系统不支持tuple列名。强行用reset_index()会把索引变普通列但原始分组键merchant_category又成了数据的一部分后续join极易出错。我的标准解法是用agg()后立即pipe(flatten_columns)并封装成可复用函数def flatten_columns(df): 将MultiIndex Columns展平为扁平列名用下划线连接 if isinstance(df.columns, pd.MultiIndex): df.columns [_.join(col).strip() for col in df.columns.values] return df # 应用 result (df.groupby(merchant_category) .agg({transaction_amount: [mean,median], processing_fee: [min,max]}) .pipe(flatten_columns)) # 输出列名transaction_amount_mean, transaction_amount_median, processing_fee_min, processing_fee_max第二关数据类型一致性陷阱你以为mean和median都是float64错。当transaction_amount列里混有None或np.nan时median()返回float64但mean()在某些pandas版本下可能返回object类型尤其当列含字符串混合时。下游系统读取时直接报错。解决方案强制类型转换且转换逻辑写在agg字典里result df.groupby(merchant_category).agg({ transaction_amount: [ (amount_mean, lambda x: pd.to_numeric(x, errorscoerce).mean()), (amount_median, lambda x: pd.to_numeric(x, errorscoerce).median()) ], processing_fee: [ (fee_min, lambda x: pd.to_numeric(x, errorscoerce).min()), (fee_max, lambda x: pd.to_numeric(x, errorscoerce).max()) ] })这里用元组(amount_mean, lambda...)替代字符串既指定了新列名又确保了类型安全。pd.to_numeric(..., errorscoerce)会把无法转数字的值设为NaN避免类型混乱。第三关空组处理——业务上不存在的组代码里必须显式声明银行风控要求即使某类商户当月无交易报表中也必须显示该类别金额为0。但groupby默认会丢弃空组。解决方案不是reindex()太慢而是用pd.Categorical提前定义全量类别# 定义全量商户类别从业务字典获取 all_categories [Retail, Dining, Travel, Groceries, Electronics, Healthcare] df[merchant_category] pd.Categorical(df[merchant_category], categoriesall_categories) result (df.groupby(merchant_category, observedFalse) # observedFalse 关键保留未出现的类别 .agg({transaction_amount: sum}) .fillna(0)) # 空组自动补0observedFalse是核心开关它告诉pandas“别只看数据里有的我要全量维度”。这行代码省去了后续补全逻辑且性能极佳。提示在金融类ETL中空组处理是SLA红线。某次大促期间因未处理空组导致“医疗健康”类商户的风控评分缺失被监管问询。从此我们所有聚合脚本开头必加pd.Categorical声明。3.2 自定义聚合函数当业务逻辑复杂到需要“写论文”时原文的weighted_average函数很优雅但生产环境里权重逻辑远不止“线性递增”。让我分享一个真实的反欺诈场景计算客户“近期交易活跃度得分”需综合考虑交易频次、金额、时间衰减、设备稳定性四个维度。这个函数不能写成一个lambda必须是一个完整的类原因有三1需要预计算全局统计量如全量客户平均交易间隔2需要缓存中间结果避免重复计算3需要支持单元测试。以下是精简版实现class TransactionActivityScorer: def __init__(self, base_window_days30, decay_factor0.95): 初始化活跃度评分器 base_window_days: 基础时间窗口天用于计算基准频次 decay_factor: 时间衰减因子越接近1衰减越慢适合长期客户 self.base_window_days base_window_days self.decay_factor decay_factor # 预加载全局统计从缓存或配置中心读取避免每次调用都查库 self.global_avg_interval 3.2 # 全量客户平均交易间隔天 def _calculate_time_weight(self, transaction_dates): 计算每笔交易的时间衰减权重 if len(transaction_dates) 2: return np.ones(len(transaction_dates)) # 以最近一笔交易为t0向前推算天数 latest_date transaction_dates.max() days_since_latest (latest_date - transaction_dates).dt.days # 指数衰减weight decay_factor ^ days weights np.power(self.decay_factor, days_since_latest) return weights def _calculate_device_stability(self, device_ids): 计算设备稳定性得分同设备交易占比 if len(device_ids) 2: return 1.0 # 同设备交易次数 / 总交易次数 main_device device_ids.mode().iloc[0] if not device_ids.mode().empty else device_ids.iloc[0] stability_score (device_ids main_device).sum() / len(device_ids) return min(stability_score, 0.95) # 封顶0.95避免单一设备过度影响 def __call__(self, group_df): 对单个客户组计算活跃度得分 group_df: 包含date, amount, device_id等列的DataFrame if len(group_df) 3: return 0.0 # 交易太少无法评估 # 1. 时间衰减权重 time_weights self._calculate_time_weight(group_df[date]) # 2. 金额归一化避免大额交易主导 amount_normalized group_df[amount] / group_df[amount].mean() # 3. 设备稳定性 device_score self._calculate_device_stability(group_df[device_id]) # 4. 综合得分 (频次得分 * 0.4) (金额得分 * 0.3) (设备得分 * 0.3) frequency_score len(group_df) / self.base_window_days # 日均交易频次 amount_score np.average(amount_normalized, weightstime_weights) final_score (frequency_score * 0.4 amount_score * 0.3 device_score * 0.3) return round(final_score, 3) # 在聚合中使用 scorer TransactionActivityScorer(base_window_days30) result df.groupby(customer_id).apply(scorer)看到没这个函数本身就是一个微型业务模型。它把“活跃度”这个模糊概念拆解为可量化、可验证、可解释的三个子维度。当风控策略调整时我们只需修改__call__里的权重系数无需动底层逻辑。这才是自定义聚合的终极形态——不是写代码而是把业务规则编译成数据指令。3.3 滚动窗口聚合时间对齐才是真正的难点原文的滚动平均示例用rolling(window3).mean()就完了。但生产环境里window3这个数字背后是整整两天的跨部门对齐会议。让我用一个真实案例说明为零售银行APP设计“近7天消费趋势”指标用于向客户推送个性化优惠。表面看就是rolling(7).mean()。但问题来了数据延迟交易流水T1入库即今天10月25日看到的是10月24日的数据。那么“近7天”应该算到10月24日还是10月25日节假日效应春节假期7天客户消费模式剧变用固定7天窗口会严重失真。业务口径市场部定义的“近7天”是自然周周一至周日而技术部按数据入库时间算是滚动7天。我的解决方案是放弃rolling()的默认行为用resample()rolling()组合强制对齐业务日历def get_business_weekly_trend(df, target_dateNone): 计算客户近7天消费趋势严格对齐自然周 target_date: 业务目标日期如今日用于确定计算截止点 if target_date is None: target_date pd.Timestamp.today().normalize() # 取今日0点 # 1. 过滤数据只取target_date前7天内的交易含target_date当天 cutoff_date target_date - pd.Timedelta(days6) # 从target_date往前推6天共7天 df_filtered df[df[date] cutoff_date].copy() # 2. 按自然周重采样周一为每周第一天 # 先确保date列是datetime再设为索引 df_filtered df_filtered.set_index(date) weekly_resampled df_filtered.resample(W-MON, labelleft, closedleft)[amount].sum() # 3. 对重采样后的周数据做滚动此时窗口是周非天 # 但我们需要的是“近7天”所以这里滚动窗口1只取最新一周 trend_series weekly_resampled.rolling(window1).sum().dropna() # 4. 关键填充缺失周如某周无交易需补0 # 构建完整周索引 full_weeks pd.date_range(startcutoff_date, endtarget_date, freqW-MON) trend_series trend_series.reindex(full_weeks, fill_value0) return trend_series.iloc[-1] # 返回最新一周的消费总额 # 在groupby中应用 df[weekly_trend] df.groupby(customer_id).apply( lambda x: get_business_weekly_trend(x, target_datepd.Timestamp(2024-10-25)) )这段代码的核心思想是滚动窗口的粒度必须与业务决策的粒度一致。市场部看的是“周维度”的趋势我们就必须先resample到周再滚动。强行在日粒度上滚动7天得到的是技术正确、业务错误的结果。这也是为什么我常说“没有bad code只有bad business alignment”。3.4 扩展窗口聚合累计值不是求和而是状态机原文的expanding().sum()算累计营收干净利落。但当我做“客户资金沉淀率”计算时发现expanding()根本不够用。因为沉淀率 客户账户余额 / 客户历史总入金× 100%而“历史总入金”这个分母必须满足只计算该客户开户以来的入金剔除退款、手续费等负向流水按入金时间升序累加不能按数据入库时间。expanding()默认按DataFrame的物理顺序累加但物理顺序 ≠ 业务时间顺序。解决方案是先排序再用cumsum()且必须用transform保证分组内独立计算def calculate_cumulative_deposit(group_df): 计算客户累计入金仅正向流水 group_df: 按customer_id分组的DataFrame含date, amount, flow_type列 # 1. 过滤正向入金flow_type IN deposits group_df[group_df[flow_type] IN].copy() # 2. 按交易时间排序关键 deposits deposits.sort_values(date) # 3. 累计求和 deposits[cumulative_deposit] deposits[amount].cumsum() # 4. 将结果映射回原group_df保持原始顺序和行数 # 用date作为key merge避免索引错位 result group_df.merge( deposits[[date, cumulative_deposit]], ondate, howleft ) # 5. 填充缺失值如该客户无入金则cumulative_deposit为0 result[cumulative_deposit] result[cumulative_deposit].fillna(0) return result[cumulative_deposit] # 应用 df[cumulative_deposit] df.groupby(customer_id).apply(calculate_cumulative_deposit).explode()注意explode()的使用——因为apply返回的是Series of Series需要用explode()展开。这个细节90%的教程都不会提但线上环境不加就会报错。注意在高并发场景下applysort_values性能较差。我们最终用numba重写了核心累加逻辑性能提升8倍。但原则不变扩展窗口的本质是维护一个随时间演进的状态而不是一个数学函数。3.5 多级分组与unstack当行列互换成为数据契约原文的unstack()示例输出一个漂亮的矩阵。但生产环境里这个矩阵要喂给三个不同系统BI看板、下游API、监管报送文件。每个系统对行列、缺失值、数据类型的容忍度都不同。所以unstack()不是终点而是数据交付流水线的起点。BI看板要求行列必须是字符串缺失值填0列名要带业务前缀。下游API要求必须是JSON格式行索引转为字段列名不能有特殊字符。监管报送要求必须是Excel且特定单元格需加批注说明计算逻辑。我的标准做法是写一个deliver_to_target()函数按目标系统类型定制输出def deliver_to_target(result_df, target_systembi): 将unstacked结果交付给不同目标系统 target_system: bi, api, regulatory if target_system bi: # BI看板填0标准化列名重置索引 df_out result_df.fillna(0) df_out.columns [frev_{col.lower().replace( , _)} for col in df_out.columns] df_out df_out.reset_index().rename(columns{region: geo_region}) return df_out elif target_system api: # API转JSON索引转字段列名转驼峰 df_out result_df.reset_index() df_out.columns [to_camel_case(col) for col in df_out.columns] return df_out.to_dict(orientrecords) elif target_system regulatory: # 监管报送生成带批注的Excel from openpyxl import Workbook from openpyxl.comments import Comment wb Workbook() ws wb.active # 写入数据 for r_idx, row in enumerate(dataframe_to_rows(df_out, indexTrue, headerTrue), 1): for c_idx, value in enumerate(row, 1): cell ws.cell(rowr_idx, columnc_idx, valuevalue) if r_idx 1 and c_idx 1: # 列标题行除第一列外 cell.comment Comment(计算逻辑各区域各产品线平均收入按自然月汇总, System) return wb # 返回workbook对象由调用方保存 # 辅助函数字符串转驼峰 def to_camel_case(text): s text.replace(-, ).replace(_, ) s s.split() if len(s) 0: return text return s[0].lower() .join(i.capitalize() for i in s[1:])这个函数把unstack()从一个技术操作升级为数据交付协议的执行器。它确保同一份聚合结果能无缝适配所有下游需求彻底解决“写一次改三次”的运维噩梦。4. 端到端实战从信用卡交易数据到高管决策看板现在我们把前面所有知识点拧成一股绳走一遍真实的端到端流程。这不是教学演示而是我去年在某股份制银行落地的“信用卡客户价值分层”项目。项目目标每天凌晨2点自动生成一份PDF报告发送给零售银行部总经理包含Top 100高价值客户名单、其消费行为特征、风险预警信号。整个流程从原始交易表开始到最终PDF全部由Python脚本驱动。4.1 数据源与清洗别让脏数据毁掉所有高级聚合原始数据来自核心银行系统每日增量同步一张card_transactions表包含约1200万行记录。字段有txn_id,customer_id,card_no,merchant_category,amount,fee,currency,date,time,device_id,ip_address。清洗第一步货币标准化表中currency字段有CNY、USD、HKD三种。amount是原币种金额。高管要看的是人民币等值所以必须统一换算。但汇率不是固定值——每笔交易发生时的实时汇率存储在另一张exchange_rates表中。我的做法是用merge_asof()做时间点对齐而非简单join# exchange_rates表date, currency, rate_cny对人民币汇率 # 按date排序确保merge_asof能正确匹配 exchange_rates exchange_rates.sort_values(date) # 对交易表按date排序后merge df_txn_sorted df_transactions.sort_values(date) df_enriched pd.merge_asof( df_txn_sorted, exchange_rates, ondate, bycurrency, directionbackward, # 取交易时间前最近的汇率 allow_exact_matchesTrue ) # 计算人民币等值 df_enriched[amount_cny] df_enriched[amount] * df_enriched[rate_cny]merge_asof()是关键。它比merge快10倍且能处理“找最近时间点”的业务逻辑。如果用merge得先对每笔交易找汇率O(n²)复杂度根本跑不完。清洗第二步设备指纹去重同一客户用同一设备多次交易可能是正常行为如家庭共用手机也可能是黑产模拟器批量注册。我们定义24小时内同一设备ID关联超过5个不同客户ID视为可疑设备。这需要跨客户聚合但groupby只能按单维度分组。解决方案用transformnunique# 计算每个device_id关联的客户数 df_enriched[device_customer_count] df_enriched.groupby(device_id)[customer_id].transform(nunique) # 标记可疑设备 df_enriched[is_suspicious_device] df_enriched[device_customer_count] 5 # 过滤掉可疑设备的交易风控策略 df_clean df_enriched[~df_enriched[is_suspicious_device]]transform(nunique)是神来之笔。它不改变原DataFrame形状直接在每行上添加一个新列值是该行device_id对应的所有客户数。这比先groupby再merge快得多且内存友好。4.2 核心聚合七层嵌套的业务逻辑现在我们基于df_clean构建高管最关心的七个指标。这不是七个独立聚合而是一个有机整体每一层都依赖上一层的输出。第一层客户基础画像单维度聚合按customer_id计算总交易笔数、总金额CNY、平均单笔、最大单笔、最小单笔、交易天数去重date、首笔交易日、末笔交易日。base_agg df_clean.groupby(customer_id).agg({ amount_cny: [count, sum, mean, max, min], date: [nunique, min, max] }).round(2) # 重命名列 base_agg.columns [txn_count, total_amount_cny, avg_txn_amount, max_txn_amount, min_txn_amount, txn_days, first_txn_date, last_txn_date]第二层多维交叉分析unstack前置按[customer_id,merchant_category]分组算各品类平均交易额再unstack()。但这里有个坑merchant_category有50多个值unstack()后会产生50多列下游系统处理不了。解决方案只保留Top 10高频品类其余归为Other# 先统计各品类交易频次 category_freq df_clean[merchant_category].value_counts() top_10_categories category_freq.head(10).index.tolist() # 将非Top10品类标记为Other df_clean[mc_group] df_clean[merchant_category].apply( lambda x: x if x in top_10_categories else Other ) # 再聚合unstack category_agg (df_clean.groupby([customer_id,mc_group])[amount_cny] .mean() .unstack(fill_value0) .round(2))第三层时间动态指标滚动扩展组合计算每个客户的近30天滚动平均交易额、近30天滚动交易频次、开户至今累计交易额、近7天交易额占累计额比例。# 先按date排序设索引 df_sorted df_clean.sort_values([customer_id,date]).set_index(date) # 滚动计算30天窗口 rolling_30d df_sorted.groupby(customer_id)[amount_cny].rolling(30D).agg([mean,count]) # 扩展计算累计 cumulative df_sorted.groupby(customer_id)[amount_cny].expanding().sum() # 合并结果 temp_df pd.concat([rolling_30d, cumulative], axis1) temp_df.columns [rolling_30d_avg, rolling_30d_count, cumulative_amount] # 计算占比 temp_df[recent_ratio] (temp_df[rolling_30d_avg] * temp_df[rolling_30d_count]) / temp_df[cumulative_amount]第四层风险信号聚合自定义函数用前面定义的TransactionActivityScorer计算每个客户的活跃度得分再用自定义函数计算“交易时间离散度”标准差/均值识别睡眠客户。# 活跃度得分 activity_score df_clean.groupby(customer_id).apply(scorer) # 时间离散度 def time_dispersion(group_df): if len(group_df) 5: return np.nan dates pd.to_datetime(group_df[date]) intervals dates.diff().dt.days.dropna() return intervals.std() / intervals.mean() if intervals.mean() 0 else np.nan dispersion_score df_clean.groupby(customer_id).apply(time_dispersion)第五层高价值客户筛选业务规则引擎综合以上所有指标用规则引擎筛选Top 100规则1total_amount_cny 50万 CNY年化规则2txn_count 120笔年化

相关新闻