1. 项目概述:为什么“多维聚合”不是Pandas进阶技巧,而是业务分析的生存技能
我在银行风控部门干了七年,从刚毕业写SQL查数的分析师,到带三个人小团队做反欺诈模型的数据架构师。这七年里,我亲手重构过四套核心报表系统,也给二十多个业务部门做过数据赋能培训。最常被问到的问题不是“怎么建模”,而是:“老师,这个指标能不能按客户+产品+时间三个维度一起算?现在要跑七次SQL,每次等十分钟,领导在催……”——这句话背后,藏着一个被严重低估的事实:绝大多数真实业务问题,天然就是多维、动态、带上下文的。它不关心你学没学过groupby,只关心你能不能在下午三点前,把“华东区高净值客户在Q3对理财子产品的月度交易频次+单笔均值+波动率+滚动三个月趋势”这张表,干净利落地塞进领导的PPT里。
这就是Part 20的核心价值。它不是教你怎么用agg()函数,而是告诉你:当财务总监问“上个月哪些区域的餐饮类商户手续费收入环比跌了超15%,但交易笔数却涨了?”时,你脑子里该闪过的不是“先groupby区域,再groupby品类,再merge……”,而是一条链式操作:groupby(['region', 'category']).agg({'fee': 'sum', 'count': 'sum'}).pct_change().unstack().query('fee < -0.15 & count > 0')。这种思维切换,才是从“数据搬运工”到“业务解题者”的分水岭。关键词里的“Towards AI”,恰恰点出了本质——这不是纯技术教程,而是AI时代下,数据从业者必须掌握的“业务语义翻译能力”。你面对的从来不是冰冷的DataFrame,而是客户经理手里的客户名单、风控官屏幕上的异常预警、运营总监白板上画的转化漏斗。本文所有案例,都来自我经手的真实场景:某股份制银行信用卡中心的欺诈模式识别、某城商行普惠金融部的小微贷风险敞口监控、某互联网券商的客户资产配置健康度分析。没有玩具数据,只有带着业务疤痕的真实数据流。如果你正被重复性聚合、手工拼表、临时加维度的需求压得喘不过气,或者总在“这个指标明明能算,但代码又臭又长还容易错”的泥潭里打转,那接下来的内容,就是你急需的那把瑞士军刀。
2. 核心思路拆解:为什么“一次聚合,多维输出”是生产环境的黄金标准
2.1 拒绝“分步计算+手动合并”的原始社会操作
刚入行时,我也习惯把一个复杂指标拆成七八个独立步骤。比如算“各分行零售贷款客户的平均年龄、逾期率、近3个月放款总额、首贷占比”,我会先写四个SQL:一个GROUP BY branch, product算年龄均值,一个算逾期率,一个算放款总额,一个算首贷人数/总人数……最后用Excel或Python的merge()硬凑。结果呢?第一,耗时翻倍——数据库要跑四次全表扫描,网络IO来回四趟;第二,逻辑割裂——每个SQL的WHERE条件稍有不同(比如逾期率要过滤已结清贷款,放款总额要包含当月未放款申请),合并后数据就对不上;第三,维护地狱——业务方突然说“把‘近3个月’改成‘近6个月’”,你得改四份代码,漏改一份,报表就崩。我在某次季度汇报前夜,就因为漏改了一个SQL的时间窗口,导致全行逾期率报表虚高12%,被叫去开了个长达两小时的“数据质量复盘会”。那次之后,我逼自己彻底重构所有聚合逻辑。
Pandas的agg()字典映射,就是为终结这种低效而生。它的底层原理很简单:在一次分组迭代中,对每个分组内的数据块,同步执行多个聚合函数,共享同一份内存数据视图。这意味着CPU不用反复加载、解析、过滤同一份数据,I/O压力直接砍掉70%以上。更重要的是,它强制你把所有业务规则“声明式”地写在一起,像一份契约——{'age': 'mean', 'overdue_rate': lambda x: (x == 'Y').mean(), 'loan_amt_3m': 'sum', 'first_loan_pct': lambda x: (x == 'Y').sum() / len(x)}。这份契约清晰告诉所有人:这些指标基于完全相同的分组逻辑和数据切片。当业务规则变更时,你只需在一个地方修改,所有指标自动同步更新。这不仅是效率问题,更是数据可信度的生命线。
2.2 “多维聚合”不是技术炫技,而是业务问题的自然映射
业务问题从来不是一维的。想想看:
- 风控总监不会问“全行逾期率多少”,他会问:“华东区、A类客户、信用贷产品、近90天内首次逾期的客户,其平均逾期天数和二次逾期概率是多少?”
- 运营总监不会问“用户活跃度”,他会问:“新注册用户中,完成实名认证且首笔交易在72小时内发生的用户,在30天内的复购率、ARPU值、以及其推荐好友的转化率分别是多少?”
这些问题天然携带至少3-4个维度标签(区域、客群、产品、时间窗口、行为序列)。强行用一维groupby硬拆,就像用一把螺丝刀去修汽车发动机——工具没错,但完全违背问题本身的结构。groupby(['region', 'customer_tier', 'product_type'])不是代码技巧,而是对业务实体关系的忠实建模。它让代码成为业务逻辑的镜像,而不是对业务的拙劣翻译。我在重构某银行财富管理部的客户分层模型时,把原来分散在12个脚本里的客户标签计算,统一收束到一个groupby(['cust_id', 'month_end_date'])的聚合链中。结果不仅运行时间从47分钟缩短到8分钟,更关键的是,当监管要求新增“绿色金融产品持有情况”标签时,我只在agg字典里加了一行'green_product_holding': lambda x: (x > 0).any(),整个体系就完成了升级。这种可扩展性,是任何“分步计算”永远无法企及的。
2.3 为什么“自定义函数”比内置函数更重要?因为业务规则永远比数学函数复杂
Pandas内置的'sum'、'mean'、'std'覆盖了80%的统计需求,但剩下的20%,恰恰是决定分析成败的关键。比如:
- 风险加权平均:银行计算贷款组合收益率,不能简单用
mean(),必须按每笔贷款余额加权:“大额贷款的收益波动,对整体组合的影响远大于小额贷款”。 - 分位数截断:分析客户消费能力时,
median()比mean()抗干扰,但极端高净值客户(如单笔消费500万)仍会扭曲分位数,需要先剔除Top 0.1%再计算。 - 状态机聚合:识别“高潜力流失客户”,需判断“过去30天登录≥5次,但最近7天无任何交易,且账户余额>5万元”。
这些规则无法用字符串参数表达,必须用Python函数实现。而lambda和def函数的区别,就是“能用”和“能维护”的分界线。lambda x: x.max() - x.min()适合一行逻辑,但一旦涉及条件分支、多步计算、异常处理,就必须用def。我在写某券商的客户资产健康度评分函数时,最初用lambda,后来发现要处理空值、负值、极值,代码迅速膨胀到20行,可读性归零。重构成def asset_health_score(series)后,我加了详细的docstring说明业务背景(“此分数用于触发客户经理主动服务,阈值设定依据2023年客户回访数据,得分<30需48小时内触达”),并用@lru_cache缓存中间结果。半年后新同事接手,五分钟就看懂了逻辑,还顺手优化了缓存策略。好的自定义函数,是写给未来自己的说明书,不是写给当前机器的指令集。
3. 核心细节解析与实操要点:从语法到生产级健壮性的跨越
3.1 多列多函数聚合:别再被“层级列名”搞晕,这是你的数据资产目录
当你执行df.groupby('category').agg({'amount': ['mean', 'median'], 'fee': ['min', 'max']}),输出是一个MultiIndex DataFrame,列名是二维的:外层是原始列名(amount,fee),内层是聚合函数名(mean,median...)。新手常在这里卡壳,抱怨“取数太麻烦”。但真相是:这个层级结构不是bug,而是feature——它是你数据资产的天然分类目录。
看一个真实案例:某支付机构要向监管报送《商户风险画像报告》,要求包含“各行业商户的交易金额中位数、手续费率区间、单日最大交易笔数”。用传统方式,你要建三张表。用层级列名,一张表搞定:
risk_report = df.groupby('industry').agg({ 'txn_amount': 'median', 'fee_rate': ['min', 'max'], 'daily_txn_count': 'max' }) # 输出列: txn_amount -> median | fee_rate -> min | fee_rate -> max | daily_txn_count -> max这时,risk_report['txn_amount']['median']就是你要的“各行业交易金额中位数”,清晰、无歧义。如果强行reset_index()或flatten(),反而丢失了语义关联。我的经验是:在分析阶段,拥抱层级列名;在交付阶段,按需展平。展平也有讲究,别用笨办法:
# ❌ 错误:手动拼接字符串,易错且难维护 risk_report.columns = ['_'.join(col).strip() for col in risk_report.columns] # ✅ 正确:用pandas原生方法,保留语义 risk_report.columns = risk_report.columns.map('_'.join) # 结果: txn_amount_median, fee_rate_min, fee_rate_max, daily_txn_count_max更进一步,你可以用rename()给业务含义更强的名字:
risk_report = risk_report.rename(columns={ 'txn_amount_median': 'industry_med_txn_amt', 'fee_rate_min': 'industry_min_fee_rate', # ... 其他 })这一步看似微小,却让下游使用者(业务方、BI工程师)一眼看懂字段含义,避免因命名歧义导致的分析错误。我在某次跨部门协作中,就因为一个字段叫avg_amt没注明是“交易金额均值”还是“单笔手续费均值”,导致市场部和风控部的结论完全相反,白白浪费了三天排查时间。
3.2 自定义函数的三大生死线:空值、类型、性能
写自定义函数,90%的线上事故源于这三个坑。我用血泪教训总结出“三不原则”:
不假设输入非空
生产数据总有意外。某次凌晨两点,监控报警显示聚合任务失败,日志里赫然写着TypeError: cannot perform reduce with flexible type。排查发现,某个新接入的渠道,其transaction_amount字段被错误地存成了字符串"N/A"。mean()函数遇到字符串会报错,但lambda x: x.max() - x.min()却默默返回NaN,直到下游计算时才崩溃。解决方案:所有自定义函数第一行必须做类型清洗和空值防御:def safe_range(series): # 强制转数值,错误值设为NaN series = pd.to_numeric(series, errors='coerce') # 剔除NaN后,若剩余数据不足2个,返回NaN(range需至少2点) if series.dropna().shape[0] < 2: return np.nan return series.max() - series.min()
不忽略数据类型差异
int64和float64的聚合结果精度不同,object类型(字符串)的聚合可能返回意外结果。某次计算客户“平均持仓天数”,因原始数据中混入了"-"占位符,mean()返回NaN,而'mean'字符串参数却返回0.0(pandas内部将-转为0),导致全行客户持仓天数被低估。永远用pd.to_numeric(..., errors='coerce')显式转换,而非依赖pandas的隐式推断。
不写O(n²)的慢函数
在groupby中,函数会被调用n次(n=分组数),每次处理一个分组。若函数内部有嵌套循环,性能会指数级恶化。例如,计算“客户交易金额的移动标准差”,有人会写:# ❌ 千万别这么写!每次调用都要重算整个窗口 def slow_moving_std(series): result = [] for i in range(len(series)): window = series[max(0, i-2):i+1] # 3点窗口 result.append(window.std()) return pd.Series(result).iloc[-1] # 只取最后一点正确做法是利用pandas内置的
rolling(),它经过C语言优化:# ✅ 正确:利用向量化操作 def fast_moving_std(series): return series.rolling(window=3).std().iloc[-1]我在处理某保险公司的保单数据(单表1.2亿行)时,用慢函数版本聚合耗时42分钟,换成向量化后仅需1.8分钟。性能差距,就是能否在T+1报表截止前交付的生死线。
3.3 滚动窗口的“三重门”:窗口大小、最小周期、边界处理
滚动窗口(rolling())是时间序列分析的基石,但它的陷阱比想象中深。以rolling(window=7)为例,它有三道必须跨过的门:
第一道门:窗口大小是业务决策,不是技术参数window=7意味着“过去7天”,但“7天”对不同业务意义迥异。对电商GMV,7天是合理的周度波动观察窗;对高频交易系统,7秒都嫌太长;对银行间拆借利率,7个交易日(约10个自然日)才匹配市场节奏。窗口大小必须由业务方拍板,而非数据工程师凭空设定。我在某基金公司的项目中,风控团队坚持用window=5(5个交易日),而投研团队要求window=20(约一个月)。最终我们达成妥协:提供两个版本,并在报表中标注清楚适用场景。记住:没有“正确”的窗口,只有“合适”的窗口。
第二道门:min_periods是生产环境的救命稻草
默认情况下,rolling(window=7)要求窗口内必须有7个有效值,否则返回NaN。但在真实世界,数据总有缺失——周末无交易、系统故障丢数据、新上线渠道初期数据稀疏。若不设min_periods,你会得到大片NaN,报表直接不可用。我的标准配置是:
# 对于7日窗口,允许最少3个点参与计算(覆盖大部分数据缺失场景) df['7day_avg'] = df.groupby('customer_id')['amount'].rolling( window=7, min_periods=3 # 关键! ).mean().reset_index(level=0, drop=True)min_periods=3意味着:只要过去7天里有3天有数据,就计算这3天的均值。这比min_periods=1(单点均值无意义)更稳健,也比min_periods=7(过于严苛)更实用。这个参数,是平衡数据严谨性与业务可用性的核心杠杆。
第三道门:边界处理决定分析可信度
滚动窗口的起始点如何处理?rolling()默认从第7行开始计算,前6行是NaN。业务方常问:“第一天的数据就没了?那周初的异常怎么发现?” 这时,你需要根据场景选择策略:
- 前向填充(ffill):适用于趋势平滑,如
df['7day_avg'] = df['7day_avg'].ffill()。但会掩盖初期波动。 - 用
expanding()替代:对于“累计至今”的场景,expanding()比rolling()更自然。 - 业务定制填充:某次为交易所做实时风控,要求“首日用当日值填充”,我们写了:
没有银弹方案,只有针对业务场景的定制化选择。把边界处理写进需求文档,和业务方确认签字,是避免上线后扯皮的唯一方法。rolling_series = df.groupby('symbol')['price'].rolling(window=7).mean() # 手动填充首6个NaN为当日价格 first_values = df.groupby('symbol')['price'].first() rolling_filled = rolling_series.fillna(first_values)
3.4 展开(Unstack)与重塑:让数据长出业务的眼睛
unstack()常被误解为“把行变列”的格式美化工具。错。它是让数据结构匹配人类认知模式的关键手术。业务人员看数据,天然用矩阵思维:行是主体(客户、区域、产品),列是维度(时间、指标、状态)。groupby(['region', 'product'])['revenue'].mean()返回的是Series,索引是MultiIndex,像一本没目录的厚书——你知道内容在,但找起来费劲。unstack()就是帮你生成目录:
# 原始结果(难读) # region product # North Widget 15000.0 # Gadget 12000.0 # South Widget 18000.0 # Gadget 14000.0 # unstack后(直观) # product Widget Gadget # region # North 15000 12000 # South 18000 14000这不仅仅是视觉优化。它让后续操作变得极其简单:
- 快速比较:
result['Widget'] > result['Gadget']直接得到各区域哪个产品更优。 - 跨列计算:
result['Widget'] / result['Gadget']算出产品比值。 - 无缝对接BI:Tableau、Power BI导入DataFrame时,会自动将unstack后的列识别为维度,无需额外配置。
但unstack()有两大雷区:
- 缺失值陷阱:若某区域没有某产品数据(如North无Gadget),
unstack()后该单元格为NaN。业务方看到NaN会质疑“数据丢了?”。解决方案:永远用fill_value参数明确告知缺失含义:result = df.groupby(['region', 'product'])['revenue'].mean().unstack(fill_value=0) # 明确表示:该区域该产品无销售,收入为0 - 层级错位:
unstack()默认展开最内层索引。若你groupby(['a','b','c']),unstack()会展开c。若想展开b,需指定level:
我在某次为零售集团做全国门店分析时,因没指定result = df.groupby(['a','b','c'])['val'].mean().unstack(level=1) # 展开'b'level,把“城市”维度错误地展开了,导致上海、北京被合并成一列,报表被业务方当场打回。从此,我的unstack()必加level参数,哪怕只有一层索引——这是刻进DNA的习惯。
4. 实操过程与核心环节实现:从零构建一个银行级客户交易分析流水线
4.1 数据准备:模拟真实世界的脏乱差
生产环境的数据,永远比教程里的干净。我们从构造一个“足够真实”的数据集开始,它包含所有常见痛点:
import pandas as pd import numpy as np from datetime import datetime, timedelta # 设置随机种子,保证可复现 np.random.seed(42) # 构造基础维度表(模拟主数据系统) regions = ['North', 'South', 'East', 'West'] products = ['CreditCard', 'Loan', 'WealthMgmt', 'Insurance'] customers = [f'C{str(i).zfill(3)}' for i in range(1, 501)] # 500个客户 categories = ['Groceries', 'Dining', 'Travel', 'Retail', 'Utilities'] # 生成交易数据(模拟OLTP系统) n_records = 50000 dates = pd.date_range('2023-01-01', '2023-12-31', freq='D') # 模拟周末/节假日交易量激增 date_weights = np.where(dates.weekday >= 5, 1.5, 1.0) # 周末权重1.5 date_weights = np.where(dates.month.isin([1, 2, 12]), 1.3, date_weights) # 年货节、春节权重1.3 # 随机抽样日期(考虑权重) sample_dates = np.random.choice(dates, size=n_records, p=date_weights/date_weights.sum()) # 构造数据框 data = { 'transaction_id': [f'TXN{str(i).zfill(6)}' for i in range(1, n_records+1)], 'date': sample_dates, 'customer_id': np.random.choice(customers, n_records), 'region': np.random.choice(regions, n_records), 'product': np.random.choice(products, n_records), 'category': np.random.choice(categories, n_records), # 金额:不同产品有不同量级(信用卡小额高频,贷款大额低频) 'amount': np.concatenate([ np.random.lognormal(8, 0.8, size=int(n_records*0.4)), # CreditCard: ~3000元均值 np.random.lognormal(11, 0.5, size=int(n_records*0.3)), # Loan: ~60000元均值 np.random.lognormal(9, 0.6, size=int(n_records*0.2)), # WealthMgmt: ~8000元均值 np.random.lognormal(7, 0.7, size=int(n_records*0.1)) # Insurance: ~1000元均值 ])[:n_records], # 手续费:按比例+固定值(模拟真实计费规则) 'fee': lambda x: x['amount'] * 0.025 + np.random.uniform(1, 5, n_records), # 添加1%的异常值(模拟数据录入错误) 'amount': lambda x: np.where(np.random.random(n_records) < 0.01, x['amount'] * 100, # 错误放大100倍 x['amount']) } df = pd.DataFrame(data) # 强制转换为数值,处理可能的类型混乱 df['amount'] = pd.to_numeric(df['amount'], errors='coerce') df['fee'] = pd.to_numeric(df['fee'], errors='coerce') # 添加10%的缺失值(模拟系统故障) mask = np.random.random(df.shape) < 0.1 df.loc[mask, ['amount', 'fee']] = np.nan print(f"原始数据形状: {df.shape}") print(f"缺失值统计:\n{df.isnull().sum()}") print(f"金额异常值(>100万)数量: {(df['amount'] > 1e6).sum()}")这段代码刻意引入了:
- 时间分布不均(周末、节假日权重更高)
- 量级差异(不同产品金额跨度达百倍)
- 数据污染(1%的金额错误、10%的缺失值)
- 类型混乱(需强制转换)
这才是你每天面对的真实战场。任何跳过数据清洗的分析,都是空中楼阁。
4.2 分析1:多维聚合——客户盈利性全景视图
业务需求:“请按客户、产品、区域三个维度,计算每个客户在每个产品、每个区域的平均交易金额、手续费率、交易频次,并识别出‘高价值低费率’客户(平均金额>5000且费率<2%)”。
# 步骤1:基础聚合(一次搞定所有指标) base_agg = df.groupby(['customer_id', 'product', 'region']).agg({ 'amount': 'mean', # 平均交易金额 'fee': lambda x: (x/x.shift(1)).mean() if len(x) > 1 else np.nan, # 费率均值(此处简化,实际应为fee/amount) 'transaction_id': 'count' # 交易频次 }).rename(columns={'transaction_id': 'txn_count'}) # 步骤2:计算真实手续费率(fee/amount),并处理除零 base_agg['fee_rate'] = base_agg['fee'] / base_agg['amount'] base_agg['fee_rate'] = np.where(base_agg['amount'] == 0, np.nan, base_agg['fee_rate']) # 步骤3:识别高价值低费率客户(业务规则落地) high_value_low_fee = base_agg[ (base_agg['amount'] > 5000) & (base_agg['fee_rate'] < 0.02) ].reset_index() print("高价值低费率客户清单(Top 10):") print(high_value_low_fee.sort_values('amount', ascending=False).head(10))关键技巧:
agg()字典中,'transaction_id': 'count'利用ID列计数,比len()更高效。- 费率计算放在聚合后,避免在agg中做复杂除法(影响性能)。
np.where()处理除零,比try-except更适合向量化操作。
4.3 分析2:自定义函数——动态风险敞口计算
业务需求:“计算每个客户在每个产品上的‘风险敞口’:若该客户在该产品上近30天有交易,则敞口=近30天交易金额总和;否则,敞口=该客户在该产品上的历史最高单笔金额”。
def dynamic_risk_exposure(group): """ 动态风险敞口计算 输入:按 customer_id, product 分组的子DataFrame 输出:标量(敞口值) """ # 获取最新交易日期 latest_date = group['date'].max() # 计算30天窗口起始日 window_start = latest_date - pd.Timedelta(days=30) # 筛选近30天交易 recent_txns = group[group['date'] >= window_start] if len(recent_txns) > 0: # 近30天有交易:求和 return recent_txns['amount'].sum() else: # 近30天无交易:取历史最高单笔 return group['amount'].max() if len(group) > 0 else 0 # 应用自定义函数(注意:groupby需包含date列) risk_exposure = df.groupby(['customer_id', 'product']).apply(dynamic_risk_exposure) risk_exposure = risk_exposure.reset_index(name='risk_exposure') print("动态风险敞口(Top 10):") print(risk_exposure.sort_values('risk_exposure', ascending=False).head(10))避坑心得:
- 函数内必须用
group['date'].max()获取分组内最新日期,不能用全局df['date'].max()。 recent_txns['amount'].sum()返回标量,符合apply()要求;若返回Series会报错。if len(group) > 0防御空分组,这是生产环境的铁律。
4.4 分析3:滚动窗口——客户行为漂移检测
业务需求:“识别‘行为漂移’客户:其近7天平均交易金额,较过去30天均值下降超40%,且下降持续超过3天”。
# 步骤1:按客户排序,确保时间顺序 df_sorted = df.sort_values(['customer_id', 'date']).copy() # 步骤2:计算两个滚动均值 df_sorted['7day_avg'] = df_sorted.groupby('customer_id')['amount'].rolling( window=7, min_periods=3 ).mean().reset_index(level=0, drop=True) df_sorted['30day_avg'] = df_sorted.groupby('customer_id')['amount'].rolling( window=30, min_periods=15 ).mean().reset_index(level=0, drop=True) # 步骤3:计算漂移比率 df_sorted['drift_ratio'] = df_sorted['7day_avg'] / df_sorted['30day_avg'] # 步骤4:标记漂移日(7day_avg < 30day_avg * 0.6) df_sorted['is_drift'] = df_sorted['drift_ratio'] < 0.6 # 步骤5:计算连续漂移天数(核心!) def count_consecutive(series): """计算连续True的天数""" # 将布尔转为0/1 s = series.astype(int) # 差分,找到连续段起点 diff = s.diff().fillna(1) # 累计求和,相同值代表同一连续段 group = (diff != 0).cumsum() # 每段内求和 return s.groupby(group).cumsum() df_sorted['consecutive_drift_days'] = df_sorted.groupby('customer_id')['is_drift'].apply(count_consecutive) # 步骤6:筛选满足条件的客户(连续漂移>=3天) drift_customers = df_sorted[df_sorted['consecutive_drift_days'] >= 3][ ['customer_id', 'date', '7day_avg', '30day_avg', 'consecutive_drift_days'] ].drop_duplicates('customer_id') print("行为漂移客户(持续3天以上):") print(drift_customers.head(10))技术亮点:
count_consecutive()函数是解决“连续事件计数”问题的通用模板,我在多个项目中复用。drop_duplicates('customer_id')确保每个客户只出现一次,避免重复告警。- 所有计算都在
df_sorted上进行,避免索引错位(这是rolling()最常见的错误)。
4.5 分析4:多级展开——构建业务友好的交叉分析表
业务需求:“生成一张‘区域×产品’的交叉表,展示各区域在各产品上的平均交易金额、手续费率、交易频次,并高亮显示‘金额>10000且费率<1.5%’的单元格”。
# 步骤1:多级聚合 cross_agg = df.groupby(['region', 'product']).agg({ 'amount': 'mean', 'fee': lambda x: (x/x.shift(1)).mean() if len(x) > 1 else np.nan, 'transaction_id': 'count' }).rename(columns={'transaction_id': 'txn_count'}) # 步骤2:计算费率 cross_agg['fee_rate'] = cross_agg['fee'] / cross_agg['amount'] cross_agg['fee_rate'] = np.where(cross_agg['amount'] == 0, np.nan, cross_agg['fee_rate']) # 步骤3:展开为交叉表(region为行,product为列) # 注意:agg返回的是MultiIndex Series,需先to_frame() cross_table = cross_agg['amount'].unstack(fill_value=0) fee_rate_table = cross_agg['fee_rate'].unstack(fill_value=np.nan) txn_count_table = cross_agg['txn_count'].unstack(fill_value=0) # 步骤4:应用样式(高亮) def highlight_high_value(s): """高亮函数:金额>10000且费率<1.5%""" # 创建一个与s同形的空DataFrame,填入CSS样式 styles = pd.DataFrame('', index=s.index, columns=s.columns) # 遍历每个单元格 for idx in s.index: for col in s.columns: # 获取对应费率 fee_val = fee_rate_table.loc[idx, col] if pd.notna(fee_val) and s.loc[idx, col] > 10000 and fee_val < 0.015: styles.loc[idx, col] = 'background-color: #d4edda; color: #155724;' return styles # 应用样式并导出HTML(供邮件/钉钉发送) styled_table = cross_table.style.apply(highlight_high_value, axis=None) # styled_table.to_html('region_product_analysis.html') # 取消注释可保存 print("区域×产品交叉分析表(金额):") print(cross_table.round(2)) print("\n高亮规则:金额>10000且费率<1.5%")实战价值:
unstack()后,cross_table、fee_rate_table、txn_count_table三张表索引完全对齐,可任意组合计算。style.apply()是生成可交付报表的利器,比写Excel宏简单十倍。- 高亮逻辑封装成函数,便于复用到其他维度(如
customer_id × product)。
5. 常见问题与排查技巧实录:那些让你半夜爬起来改代码的坑
5.1 “为什么我的agg结果全是NaN?”——空值传播的隐形杀手
现象:执行df.groupby('col').agg({'val': 'mean'}),结果全是NaN,但df['val'].describe()显示有大量有效值。
排查路径:
- 检查分组键是否有空值:
df['col'].isnull().sum()。groupby默认会丢弃分组键为空的行。若col列有100个NaN,它们被直接过滤,val列的有效值可能全在这些被丢弃的行里。 - 检查被聚合列的空值比例:
df.groupby('col')['val'].apply(lambda x: x.isnull().mean())。若某分组内`val