用Python手把手教你实现Apriori算法:从超市购物篮数据到关联规则实战
在零售行业,理解顾客购买行为模式是提升销售额的关键。想象一下,当顾客将啤酒和尿布放入购物车时,你是否能捕捉到这种看似不相关商品之间的隐藏联系?这正是关联规则分析的魅力所在。本文将带你从零开始,用Python实现经典的Apriori算法,通过超市购物篮数据挖掘商品间的有趣关联。
1. 关联规则与Apriori算法基础
关联规则分析是数据挖掘中用于发现大型数据集中变量间有趣关系的技术。最著名的例子当属"啤酒与尿布"的故事——超市发现购买尿布的年轻父亲们常常会顺便购买啤酒。这种分析的核心指标包括:
- 支持度(Support):项集在所有交易中出现的频率
- 置信度(Confidence):在包含X的交易中,也包含Y的条件概率
- 提升度(Lift):规则中X和Y的相关性度量
Apriori算法基于"频繁项集的所有子集也必须是频繁的"这一先验性质,通过逐层搜索的迭代方法发现频繁项集。其核心流程可分为三步:
- 生成候选项集
- 计算候选项集支持度并筛选
- 基于频繁项集生成关联规则
2. 数据准备与预处理
在开始算法实现前,我们需要准备合适的超市购物篮数据。以下是一个简化的交易数据集示例:
dataset = [ ['牛奶', '面包', '尿布'], ['可乐', '面包', '尿布', '啤酒'], ['牛奶', '尿布', '啤酒', '鸡蛋'], ['面包', '牛奶', '尿布', '啤酒'], ['面包', '牛奶', '尿布', '可乐'] ]提示:实际应用中,数据通常来自数据库或CSV文件。可以使用pandas的read_csv函数加载,然后转换为算法所需的列表格式。
对于更复杂的数据,我们可能需要预处理步骤:
import pandas as pd def load_and_preprocess(filepath): # 读取原始数据 raw_data = pd.read_csv(filepath) # 按交易ID分组,生成交易列表 transactions = raw_data.groupby('transaction_id')['item'].apply(list).values.tolist() return transactions3. Apriori算法核心实现
3.1 生成初始候选项集
首先实现createC1函数,用于生成大小为1的所有候选项集:
def createC1(dataset): """生成初始候选项集(大小为1的项集)""" C1 = [] for transaction in dataset: for item in transaction: if [item] not in C1: C1.append([item]) # 使用frozenset以便后续作为字典键使用 return list(map(frozenset, C1))3.2 扫描数据集计算支持度
接下来实现scanD函数,用于筛选满足最小支持度的项集:
def scanD(D, Ck, min_support): """扫描数据集,计算候选项集的支持度""" ss_cnt = {} for tid in D: for can in Ck: if can.issubset(tid): ss_cnt[can] = ss_cnt.get(can, 0) + 1 num_items = float(len(D)) ret_list = [] support_data = {} for key in ss_cnt: support = ss_cnt[key] / num_items if support >= min_support: ret_list.insert(0, key) support_data[key] = support return ret_list, support_data3.3 生成更高阶候选项集
实现aprioriGen函数,用于根据频繁k-1项集生成候选k项集:
def aprioriGen(Lk, k): """根据频繁(k-1)项集生成候选k项集""" ret_list = [] len_Lk = len(Lk) for i in range(len_Lk): for j in range(i+1, len_Lk): L1 = list(Lk[i])[:k-2] L2 = list(Lk[j])[:k-2] L1.sort() L2.sort() # 如果前k-2个项相同,则合并 if L1 == L2: ret_list.append(Lk[i] | Lk[j]) return ret_list3.4 完整Apriori算法实现
整合上述函数,实现完整的Apriori算法:
def apriori(dataset, min_support=0.5): """Apriori算法主函数""" C1 = createC1(dataset) D = list(map(set, dataset)) L1, support_data = scanD(D, C1, min_support) L = [L1] k = 2 while len(L[k-2]) > 0: Ck = aprioriGen(L[k-2], k) Lk, supK = scanD(D, Ck, min_support) support_data.update(supK) L.append(Lk) k += 1 return L, support_data4. 从频繁项集生成关联规则
有了频繁项集后,我们可以进一步挖掘关联规则:
def generate_rules(L, support_data, min_confidence=0.7): """从频繁项集生成关联规则""" big_rule_list = [] for i in range(1, len(L)): for freq_set in L[i]: H1 = [frozenset([item]) for item in freq_set] if i > 1: rules_from_conseq(freq_set, H1, support_data, big_rule_list, min_confidence) else: calc_confidence(freq_set, H1, support_data, big_rule_list, min_confidence) return big_rule_list def calc_confidence(freq_set, H, support_data, brl, min_confidence=0.7): """计算规则置信度并筛选""" prunedH = [] for conseq in H: conf = support_data[freq_set] / support_data[freq_set - conseq] if conf >= min_confidence: brl.append((freq_set - conseq, conseq, conf)) prunedH.append(conseq) return prunedH def rules_from_conseq(freq_set, H, support_data, brl, min_confidence=0.7): """生成候选规则集""" m = len(H[0]) if len(freq_set) > (m + 1): Hmp1 = aprioriGen(H, m + 1) Hmp1 = calc_confidence(freq_set, Hmp1, support_data, brl, min_confidence) if len(Hmp1) > 1: rules_from_conseq(freq_set, Hmp1, support_data, brl, min_confidence)5. 实际应用与结果分析
让我们用完整流程分析示例数据集:
# 加载数据 dataset = [ ['牛奶', '面包', '尿布'], ['可乐', '面包', '尿布', '啤酒'], ['牛奶', '尿布', '啤酒', '鸡蛋'], ['面包', '牛奶', '尿布', '啤酒'], ['面包', '牛奶', '尿布', '可乐'] ] # 设置最小支持度为60%,最小置信度为80% min_support = 0.6 min_confidence = 0.8 # 运行Apriori算法 L, support_data = apriori(dataset, min_support) # 生成关联规则 rules = generate_rules(L, support_data, min_confidence) # 打印结果 print("发现的关联规则:") for rule in rules: print(f"{tuple(rule[0])} => {tuple(rule[1])} (置信度: {rule[2]:.2f})")典型输出可能如下:
发现的关联规则: ('尿布',) => ('牛奶',) (置信度: 0.80) ('牛奶',) => ('尿布',) (置信度: 1.00) ('尿布',) => ('面包',) (置信度: 0.80) ('面包',) => ('尿布',) (置信度: 1.00) ('啤酒',) => ('尿布',) (置信度: 1.00) ('尿布', '牛奶') => ('面包',) (置信度: 0.80)这些结果可以转化为实际的商业决策:
- 将牛奶和尿布摆放在相邻货架
- 对面包和尿布进行捆绑促销
- 为购买啤酒的顾客推荐尿布折扣
6. 性能优化与实用技巧
原始Apriori算法在处理大型数据集时可能效率较低,以下是几种优化策略:
基于哈希的优化:
def scanD_hash(D, Ck, min_support, hash_table_size=100): """使用哈希表优化支持度计算""" hash_table = [0] * hash_table_size for tid in D: for can in Ck: if can.issubset(tid): # 简单的哈希函数示例 h = hash(can) % hash_table_size hash_table[h] += 1 # ...其余部分与原始scanD类似并行计算优化:
from multiprocessing import Pool def parallel_scanD(args): """并行计算支持度的辅助函数""" D_chunk, Ck = args ss_cnt = {} for tid in D_chunk: for can in Ck: if can.issubset(tid): ss_cnt[can] = ss_cnt.get(can, 0) + 1 return ss_cnt def scanD_parallel(D, Ck, min_support, num_processes=4): """并行版本的支持度计算""" chunk_size = len(D) // num_processes chunks = [(D[i*chunk_size:(i+1)*chunk_size], Ck) for i in range(num_processes)] with Pool(num_processes) as p: results = p.map(parallel_scanD, chunks) # 合并结果 ss_cnt = {} for res in results: for key in res: ss_cnt[key] = ss_cnt.get(key, 0) + res[key] # ...其余部分与原始scanD相同其他实用技巧:
- 对数据进行适当采样以减少计算量
- 使用更高效的数据结构如位图表示项集
- 考虑使用FP-growth等替代算法处理超大规模数据
7. 算法局限性与替代方案
虽然Apriori算法直观易懂,但也存在一些局限性:
| 局限性 | 说明 | 解决方案 |
|---|---|---|
| 多次扫描数据集 | 需要反复扫描数据计算支持度 | 使用FP-growth等只需扫描两次的算法 |
| 候选项集爆炸 | 当频繁1项集很多时,候选项集数量急剧增加 | 设置更高的最小支持度或使用垂直数据格式 |
| 处理稀疏数据效率低 | 对于稀疏数据集效率不高 | 使用压缩技术或基于图的方法 |
FP-growth是Apriori的流行替代方案,它使用称为FP树的数据结构,通常比Apriori快一个数量级。以下是简单对比:
from mlxtend.frequent_patterns import fpgrowth # 使用mlxtend库实现FP-growth def fpgrowth_analysis(transactions, min_support): # 需要先将数据转换为one-hot编码格式 te = TransactionEncoder() te_ary = te.fit(transactions).transform(transactions) df = pd.DataFrame(te_ary, columns=te.columns_) # 运行FP-growth return fpgrowth(df, min_support=min_support, use_colnames=True)在实际项目中,根据数据规模和特点选择合适的算法至关重要。对于教学和理解关联规则基本原理,Apriori仍是绝佳选择;而对于生产环境中的大规模数据,FP-growth或基于采样的方法可能更为合适。