目录
1. 数据集导入观察
2. Apriori算法、FP-Growth算法 使用方式
3. 朴素 brute-force
4. grid_search 参数比较 + 三种算法比较
5. main 函数 与 报告生成
6. 实验结果总结
– 通过调整不同级别的支持度与置信度,比较 Apriori 算法、FP-Growth 算法以及一种进行穷举搜索的基准方法。
比较维度包括:生成的频繁项集数量、规则挖掘所用的存储空间、计算成本
– 发现一些有趣的关联规则,并就这些规则所揭示的洞察进行讨论。


1. 数据集导入观察
表格读入&信息查看
1import pandas as pd 2 3# 加载数据集 4df = pd.read_csv('Groceries.csv') 5 6print('数据基本信息:') 7df.info() 8 9# 查看数据集行数和列数 10rows, columns = df.shape 11 12print(df.head().to_csv(sep='\t', na_rep='nan'))
item列数据 拆分为单项,并从大到小排序。
1from collections import Counter 2 3# 提取物品并统计 4all_items = [] 5for item_str in df['items']: 6 # 去除大括号,按逗号分割 7 items = item_str.strip('{}').split(',') 8 all_items.extend([item.strip() for item in items]) 9 10# 统计词频 11item_counts = dict(Counter(all_items)) 12 13sorted_item_counts = dict(sorted(item_counts.items(), key=lambda x: x[1], reverse=True)) 14 15# 输出结果 16sorted_item_counts
{'whole milk': 2513, 'other vegetables': 1903, 'rolls/buns': 1809, 'soda': 1715,
'yogurt': 1372, 'bottled water': 1087 ……}
2. Apriori算法、FP-Growth算法 使用方式


rules 格式: A -> B 以及置信度 confidence。
(不输出支持度,因为支持度作为一个是否热门的阈值,关联性重点由置信度)
1# pip install apriori_python 2from apriori_python import apriori 3itemSetList = [['eggs', 'bacon', 'soup'], 4 ['eggs', 'bacon', 'apple'], 5 ['soup', 'bacon', 'banana']] 6freqItemSet, rules = apriori(itemSetList, minSup=0.5, minConf=0.5) 7print(freqItemSet, rules) 8 9# 输出频繁项 [{'eggs'}, {'eggs', 'bacon'}, {'soup'}, {'bacon', 'soup'}, {'bacon'}] 10# 输出 P(Y|X) 置信度 11# [[{'bacon'}, {'eggs'}, 0.6666], [{'bacon'}, {'soup'}, 0.6666], [{'eggs'}, {'bacon'}, 1.0], [{'soup'}, {'bacon'}, 1.0]]
1# pip install fpgrowth_py 2from fpgrowth_py import fpgrowth 3itemSetList = [['eggs', 'bacon', 'soup'], 4 ['eggs', 'bacon', 'apple'], 5 ['soup', 'bacon', 'banana']] 6freqItemSet, rules = fpgrowth(itemSetList, minSupRatio=0.5, minConf=0.5) 7print(freqItemSet, rules)
对数据集初步用 fp_growth
0.074支持率阈值时,{'whole milk'}, {'other vegetables'} 互为关联。
1from fpgrowth_py import fpgrowth 2import pandas as pd 3 4# 加载数据集 5df = pd.read_csv('Groceries.csv') 6 7# 提取物品 8all_items = [] 9for item_str in df['items']: 10 # 去除大括号,按逗号分割 11 items = item_str.strip('{}').split(',') 12 all_items.extend([[item.strip() for item in items]]) 13 14freqItemSet, rules = fpgrowth(all_items, minSupRatio=0.074, minConf=0) 15print(freqItemSet) 16print(rules)
[[{'whole milk'}, {'other vegetables'}, 0.293], [{'other vegetables'}, {'whole milk'}, 0.387]]
把 whole milk 记为 A,other vegetables 记为 B。
P(B|A) = 0.293 > 0.1935; P(A|B) = 0.387 > 0.256 均为促进作用。
3. 朴素 brute-force
1. 筛出频繁单项
1def brute_force_bitset(transactions: List[List[Any]], minSupRatio: float, minConf: float): 2 3 num_transactions = len(transactions) 4 5 # Step 1: 统计单项支持度并按 minSupRatio 过滤 6 # 使用向上取整来与常见库语义对齐(支持度 >= minSupRatio) 7 min_support_count = max(1, math.ceil(minSupRatio * num_transactions)) 8 item_count: Dict[Any, int] = {} 9 for tx in transactions: 10 # 去重防止同一事务重复计数同一物品 11 for item in set(tx): 12 item_count[item] = item_count.get(item, 0) + 1 13 14 # 筛选出频繁单项(达到最小支持度的物品) 15 kept_items: List[Any] = sorted([it for it, c in item_count.items() if c >= min_support_count]) 16 if not kept_items: 17 return [], [] # 如果没有频繁单项,直接返回空结果
2. 每个单项出现在哪些 transaction -> 交集支持度
每个单项 用长为 len(transaction) 的二进制表示出现在哪些小票上,多项就是与运算,“1”的个数。
因为前四名的商品出现频率 2513*1903*1809*1715/9835^4 = 0.0016
支持率不低于 0.0016时频繁项集只考虑大小为1、2、3.
1 # Step 2: 只对保留的频繁单项建立索引与位集编码 2 item_to_index: Dict[Any, int] = {item: idx for idx, item in enumerate(kept_items)} 3 index_to_item: List[Any] = kept_items 4 # 初始化位集列表,每个物品对应一个位集 5 item_bitsets: List[int] = [0] * len(kept_items) 6 for tx_idx, tx in enumerate(transactions): 7 bit = 1 << tx_idx # 为每个事务创建一个位掩码 8 for item in tx: 9 idx = item_to_index.get(item) 10 if idx is not None: # 只处理频繁物品 11 item_bitsets[idx] |= bit # 在位集中设置对应位 12 13 # 用于缓存频繁项集的支持度计数 14 freq_support_count: Dict[FrozenSet[int], int] = {} 15 16 # 所有频繁单项的索引列表 17 all_indices = list(range(len(kept_items))) 18 19 # Step 3: 按项集大小枚举所有可能的组合 20 # 从1项集开始,逐步增加到最大项集大小 21 for k in range(1, 4): 22 # 枚举所有k项组合 23 for combo in itertools.combinations(all_indices, k): 24 # 通过位集交集计算共同支持的事务 25 bits = (1 << num_transactions) - 1 # 初始化为全1掩码,表示所有事务 26 for idx in combo: 27 bits &= item_bitsets[idx] # 逐项求交集 28 29 # '1'的数量支持度计数 30 support_count = bits.bit_count() 31 # 检查是否达到最小支持度比例(添加小量避免浮点误差) 32 if (support_count / num_transactions) + 1e-12 >= minSupRatio: 33 freq_support_count[frozenset(combo)] = support_count
3. 关联规则生成。2~3项的频繁项集,拆成两部分,算条件概率。
在上一步提前存了 每个频繁项集支持度大小。
1 # Step 4: 关联规则生成 2 rules: List[Tuple[Tuple[FrozenSet[Any], FrozenSet[Any]], float, float, float]] = [] 3 # 预计算所有频繁项集的支持度比例,用于快速查找 4 idx_support_ratio: Dict[FrozenSet[int], float] = { 5 idx_set: supp / num_transactions for idx_set, supp in freq_support_count.items() 6 } 7 8 # 遍历所有频繁项集(至少包含2个物品的项集才能生成规则) 9 for idx_set, supp_ratio_union in idx_support_ratio.items(): 10 if len(idx_set) < 2: 11 continue # 单项集无法生成关联规则 12 13 idx_list = list(idx_set) 14 # 枚举所有非空真子集作为规则前件A 15 for r in range(1, len(idx_list)): # r从1到len-1,确保前后件都不为空 16 for lhs_indices in itertools.combinations(idx_list, r): 17 lhs = frozenset(lhs_indices) # 规则前件A 18 rhs = idx_set - lhs # 规则后件B 19 20 supp_lhs = idx_support_ratio[lhs] 21 supp_rhs = idx_support_ratio[rhs] 22 23 # 计算置信度 = P(B|A) = support(A∪B) / support(A) 24 confidence = supp_ratio_union / supp_lhs 25 if confidence + 1e-12 < minConf: # 添加小量避免浮点误差 26 continue # 置信度不达标,跳过 27 28 # 计算提升度 = confidence / support(B) = P(B|A) / P(B) 29 lift = confidence / supp_rhs if supp_rhs > 0 else 0.0 30 31 # 将索引转换回原始物品 32 lhs_items = frozenset(index_to_item[i] for i in lhs) 33 rhs_items = frozenset(index_to_item[i] for i in rhs) 34 35 # 添加到规则列表:((前件,后件), 支持度, 置信度, 提升度) 36 rules.append(((lhs_items, rhs_items), supp_ratio_union, confidence, lift)) 37 38 return len(freq_support_count), rules
4. grid_search 参数比较 + 三种算法比较
初始化传入交易列表;
1class AlgorithmComparator: 2 """算法比较器""" 3 4 def __init__(self, transactions): 5 self.transactions = transactions 6 self.results = [] 7 # 记录 Apriori 在不同支持度下的各项集大小计数 8 self.apriori_k_rows = []
对给定支持度、置信度,跑三个算法,记录结果。
1def grid_search(self, min_sup_ratios, min_confs): 2 """执行网格搜索""" 3 for min_sup_ratio in min_sup_ratios: 4 for min_conf in min_confs: 5 print(f"测试参数: min_sup_ratio={min_sup_ratio}, min_conf={min_conf}") 6 7 # 测试FP-Growth算法 8 fp_growth_result = self.test_algorithm('fpgrowth', min_sup_ratio, min_conf) 9 10 # 测试Apriori算法 11 apriori_result = self.test_algorithm('apriori', min_sup_ratio, min_conf) 12 13 # 测试优化穷举(位集)算法 14 bruteforce_result = self.test_algorithm('bruteforce', min_sup_ratio, min_conf) 15 16 # 保存结果 17 result = { 18 'min_sup_ratio': min_sup_ratio, 19 'min_conf': min_conf, 20 'fp_growth': fp_growth_result, 21 'apriori': apriori_result, 22 'bruteforce': bruteforce_result 23 } 24 self.results.append(result)
每个算法需要传出来记录的参数:
algorithm、freq、rules、time、memory
tracemalloc 记录空间、time.perf_counter() 计时
1def test_algorithm(self, algorithm: str, min_sup_ratio: float, min_conf: float): 2 3 # 1. 预处理:清理现有垃圾 4 gc.collect() 5 6 # 2. 开始内存追踪 7 tracemalloc.start() 8 start_time = time.perf_counter() 9 10 try: 11 if algorithm == 'fpgrowth': 12 freq_item_set, rules = fpgrowth(self.transactions, minSupRatio=min_sup_ratio, minConf=min_conf) 13 freq_itemsets_count = len(freq_item_set) if freq_item_set else 0 14 rules_count = len(rules) if rules else 0 15 print(f"FP-Growth 生成频繁项集数: {freq_itemsets_count}, 规则数: {rules_count}") 16 17 elif algorithm == 'apriori': 18 freq_item_set, rules = apriori(self.transactions, minSup=min_sup_ratio, minConf=min_conf) 19 freq_itemsets_count = sum(len(itemsets) for itemsets in freq_item_set.values()) 20 rules_count = len(rules) if rules else 0 21 22 # 收集Apriori特有的k项集数据 23 for k, itemsets in freq_item_set.items(): 24 try: 25 k_int = int(k) 26 except Exception: 27 k_int = k 28 self.apriori_k_rows.append( 29 {'min_sup_ratio': min_sup_ratio, 'k': k_int, 'count': int(len(itemsets))}) 30 print(f"Apriori 生成频繁项集数: {freq_itemsets_count}, 规则数: {rules_count}") 31 32 elif algorithm == 'bruteforce': 33 freq_itemsets_count, rules = brute_force_bitset(self.transactions, minSupRatio=min_sup_ratio, 34 minConf=min_conf) 35 rules_count = len(rules) if rules else 0 36 print(f"BruteForce-Bitset 生成频繁项集数: {freq_itemsets_count}, 规则数: {rules_count}") 37 38 else: 39 raise ValueError(f"不支持的算法: {algorithm}") 40 41 finally: 42 # 计算执行时间和内存使用 43 execution_time = time.perf_counter() - start_time 44 current_memory, peak_memory = tracemalloc.get_traced_memory() 45 tracemalloc.stop() 46 return { 47 'algorithm': algorithm, 48 'freq_itemsets_count': freq_itemsets_count, 49 'rules_count': rules_count, 50 'execution_time': execution_time, 51 'memory_usage': peak_memory / 1024 / 1024, # 转换为MB 52 }
5. main 函数 与 报告生成
原数据每个交易中,不同物品用逗号间隔,将其转换为列表形式。
(适合于apriori和fp-growth的输入)
1def main(): 2 # 加载数据集 3 df = pd.read_csv('Groceries.csv') 4 5 # 提取物品 6 all_items = [] 7 for item_str in df['items']: 8 items = item_str.strip('{}').split(',') 9 all_items.append([item.strip() for item in items]) 10 11 print(f"数据集大小: {len(all_items)} 条交易记录") 12 print(f"示例交易: {all_items[:3]}")
不同的参数 grid_search 并生成
1. k-频繁项集数量 2. 三种算法结果比较
1# 创建比较器 2comparator = AlgorithmComparator(all_items) 3 4# 定义参数网格 5min_sup_ratios = [0.01, 0.02, 0.05, 0.074, 0.075] 6min_confs = [0.0, 0.1, 0.25, 0.4, 0.55] 7 8# 执行网格搜索 9comparator.grid_search(min_sup_ratios, min_confs) 10 11# 生成报告表格 12report_df = comparator.generate_report() 13 14# 保存到CSV文件 15report_df.to_csv('algorithm_comparison_results.csv', index=False) 16 17# 生成 Apriori 的各k计数表并保存 18apriori_k_df = comparator.generate_apriori_k_report() 19apriori_k_df.to_csv('apriori_k_size_breakdown.csv', index=False)
生成结果表格:
1def generate_report(self): 2 """生成结果表格""" 3 report_data = [] 4 5 for result in self.results: 6 row = { 7 'min_sup_ratio': result['min_sup_ratio'], 8 'min_conf': result['min_conf'], 9 10 # FP-Growth 结果 11 'FP_Growth_Freq_Itemsets': result['fp_growth']['freq_itemsets_count'], 12 'FP_Growth_Rules': result['fp_growth']['rules_count'], 13 'FP_Growth_Time(s)': round(result['fp_growth']['execution_time'], 4), 14 'FP_Growth_Memory(MB)': round(result['fp_growth']['memory_usage'], 2), 15 16 # Apriori 结果 17 'Apriori_Freq_Itemsets': result['apriori']['freq_itemsets_count'], 18 'Apriori_Rules': result['apriori']['rules_count'], 19 'Apriori_Time(s)': round(result['apriori']['execution_time'], 4), 20 'Apriori_Memory(MB)': round(result['apriori']['memory_usage'], 2), 21 22 # BruteForce-Bitset 结果 23 'BF_Freq_Itemsets': result['bruteforce']['freq_itemsets_count'], 24 'BF_Rules': result['bruteforce']['rules_count'], 25 'BF_Time(s)': round(result['bruteforce']['execution_time'], 4), 26 'BF_Memory(MB)': round(result['bruteforce']['memory_usage'], 2), 27 } 28 report_data.append(row) 29 30 # 创建DataFrame 31 report_df = pd.DataFrame(report_data) 32 33 # 按支持度和置信度排序 34 report_df = report_df.sort_values(['min_sup_ratio', 'min_conf']).reset_index(drop=True) 35 36 return report_df 37 38def generate_apriori_k_report(self): 39 return pd.DataFrame(self.apriori_k_rows)
6. 实验结果总结
1. 如何计时?
末 - 初 不行,因为中间会有空间的调度和释放。
中间采样找峰值不行,因为如果算法本身执行实际时间很短,没来得及采样。
tracemalloc() 追踪 peak_memory。但是会影响运行速度,所以时间要分开记录。
2. 热门商品的核心关联模式
全脂牛奶(出现频率:2513次,占25.5%的交易)和其他蔬菜(出现频率:1903次,占19.3%的交易)作为数据集中最频繁的两个商品,在关联规则中占据主导地位。在置信度排名前31的规则中,规则的后件均为全脂牛奶或其他蔬菜。
3. 统计假象识别
需要谨慎区分真实的商业洞察与统计假象:
• 假关联案例:{soda} → {whole milk} (22.97%)的置信度低于全脂牛奶本身的支持度(25.5%),这表明该规则更多反映的是全脂牛奶的普遍性而非真实的关联关系。
• 基础商品效应:许多单商品指向全脂牛奶或其他蔬菜的高置信度规则,主要源于这两种商品在家用食材购买时,本身的高频次。
4. 有意义的关联规则识别(排名较高,条件概率相对单项翻倍)
1)早餐组合模式:
• {sausage} → {rolls/buns} (32.58%)
2)膳食搭配模式:
• {whole milk, other vegetables} → {root vegetables} (30.98%)
• {whole milk, other vegetables} → {yogurt} (29.76%)
3)乳制品组合模式:
• {whipped/sour cream} → {yogurt} (28.94%)
4)水果组合模式:
• {tropical fruit} → {yogurt} (27.91%)
• {pip fruit} → {tropical fruit} (27.02%)
5. 算法效率评估
由于观察到 0.01 支持度下,只有98个单项,并且四项集的概率一定小于0.0016,所以应用位运算的枚举法效率很高。
FP-growth 相对于 Apriori算法 进行空间换时间。
《复杂结构数据挖掘(三)关联规则挖掘实验》 是转载文章,点击查看原文。