Python布隆过滤器实现

发布时间:2026/5/28 17:51:10

Python布隆过滤器实现 Python布隆过滤器实现 — 位数组/哈希函数/误判率/变体布隆过滤器是一种概率型数据结构用于判断一个元素是否可能在集合中或绝对不在集合中。它以牺牲一定的准确性来换取极低的内存占用。1. BloomFilter 基础实现import mathimport mmh3 # 第三方库提供murmurhash3算法from bitarray import bitarray # 高效的位数组实现class BloomFilter:标准布隆过滤器实现def __init__(self, n, false_positive_rate0.01):n: 预计要存储的元素数量false_positive_rate: 期望的误判率默认1%# 计算位数组大小m和哈希函数数量kself.size self._optimal_size(n, false_positive_rate)self.hash_count self._optimal_hash_count(self.size, n)self.bit_array bitarray(self.size)self.bit_array.setall(0) # 初始化所有位为0self.inserted_count 0 # 已插入元素计数def _optimal_size(self, n, p):根据预期元素数n和误判率p计算最优位数组大小return int(-n * math.log(p) / (math.log(2) ** 2))def _optimal_hash_count(self, m, n):根据位数组大小m和元素数n计算最优哈希函数数量return int(m / n * math.log(2))def _get_hash_values(self, item):使用双重哈希模拟多个哈希函数# 使用mmh3的两种seed来生成两个基哈希值h1 mmh3.hash(str(item), 0, signedFalse)h2 mmh3.hash(str(item), 1, signedFalse)# 通过线性组合生成k个不同的哈希值return [(h1 i * h2) % self.size for i in range(self.hash_count)]def add(self, item):向布隆过滤器中添加元素for idx in self._get_hash_values(item):self.bit_array[idx] 1self.inserted_count 1def __contains__(self, item):检查元素是否在过滤器中可能有误判for idx in self._get_hash_values(item):if self.bit_array[idx] 0:return False # 一定不存在return True # 可能存在有一定误判率def get_false_positive_rate(self):计算当前的误判率if self.size 0:return 0.0# p ≈ (1 - e^(-k*n/m))^kk self.hash_countm self.sizen self.inserted_countreturn (1 - math.exp(-k * n / m)) ** kdef __repr__(self):return (fBloomFilter(size{self.size}, hash_count{self.hash_count}, finserted{self.inserted_count}))2. 布隆过滤器的误判率分析# 误判率公式p ≈ (1 - e^(-k*n/m))^k# 其中# m 位数组大小bit数# n 已插入元素数量# k 哈希函数数量## 最优哈希函数数量k (m/n) * ln(2)# 最优位数组大小m -n * ln(p) / (ln(2))^2## 典型场景# p1%时每个元素约需9.6 bits# p0.1%时每个元素约需14.4 bits3. 使用示例与误判验证def demo_bloom_filter():演示布隆过滤器的使用和误判率bf BloomFilter(1000, 0.01) # 预计1000个元素1%误判率print(f创建过滤器: {bf})# 插入1000个元素for i in range(1000):bf.add(fitem_{i})print(f已插入: {bf.inserted_count} 个元素)# 测试已存在的元素应该都能找到false_negatives 0for i in range(1000):if fitem_{i} not in bf:false_negatives 1print(f假阴性不应出现: {false_negatives})# 测试不存在的元素可能出现误判false_positives 0test_count 10000for i in range(1000, 1000 test_count):if fitem_{i} in bf:false_positives 1actual_fpr false_positives / test_countprint(f误判数: {false_positives}/{test_count})print(f实际误判率: {actual_fpr:.4%})print(f理论误判率: {bf.get_false_positive_rate():.4%})4. 最优哈希函数数量计算器def bloom_optimizer(n, p):根据元素数和目标误判率计算最优参数m int(-n * math.log(p) / (math.log(2) ** 2))k int(m / n * math.log(2))bytes_per_elem m / 8 / nreturn {expected_elements: n,target_fpr: p,bit_array_size_bits: m,bit_array_size_bytes: m // 8,hash_functions: k,bytes_per_element: round(bytes_per_elem, 2)}params bloom_optimizer(1_000_000, 0.01)print(100万元素1%误判率的参数:)for k, v in params.items():print(f {k}: {v})5. 计数布隆过滤器Counting Bloom Filterclass CountingBloomFilter:计数布隆过滤器支持删除操作def __init__(self, n, false_positive_rate0.01):self.size int(-n * math.log(false_positive_rate) / (math.log(2) ** 2))self.hash_count int(self.size / n * math.log(2))# 使用数组存储计数器每个位置4位最大计数15self.counters [0] * self.sizedef _get_hashes(self, item):h1 mmh3.hash(str(item), 0, signedFalse)h2 mmh3.hash(str(item), 1, signedFalse)return [(h1 i * h2) % self.size for i in range(self.hash_count)]def add(self, item):for idx in self._get_hashes(item):if self.counters[idx] 15: # 防止溢出self.counters[idx] 1def remove(self, item):从计数布隆过滤器中删除元素for idx in self._get_hashes(item):if self.counters[idx] 0:self.counters[idx] - 1def __contains__(self, item):return all(self.counters[idx] 0for idx in self._get_hashes(item))6. 可扩展布隆过滤器Scalable Bloom Filterclass ScalableBloomFilter:可扩展布隆过滤器动态增长以适应更多元素def __init__(self, initial_capacity1000, error_rate0.01):self.filters []self.initial_capacity initial_capacityself.error_rate error_rateself._add_filter()def _add_filter(self):添加一个新的布隆过滤器# 每个新过滤器的误判率等比缩小保证整体误判率不变scale 0.8 ** len(self.filters)new_filter BloomFilter(self.initial_capacity * (2 ** len(self.filters)),self.error_rate * (1 - scale))self.filters.append(new_filter)def add(self, item):向最新的过滤器中添加元素self.filters[-1].add(item)# 如果最新的过滤器已满添加新的过滤器if (self.filters[-1].inserted_count self.initial_capacity * (2 ** (len(self.filters) - 1))):self._add_filter()def __contains__(self, item):for f in self.filters:if item in f:return Truereturn False7. 布隆过滤器的应用场景# 1. 缓存系统防止缓存穿透# 查询前先经布隆过滤器判断不存在的key直接返回避免查询数据库## 2. 垃圾邮件过滤器# 将已知垃圾邮件发件人地址存入布隆过滤器快速过滤## 3. 爬虫URL去重# 记录已爬取的URL避免重复爬取允许少量重复## 4. 推荐系统# 记录已推荐给用户的物品ID避免重复推荐## 5. 防止缓存穿透的典型代码class CacheWithBloom:使用布隆过滤器防止缓存穿透def __init__(self, cache, db, expected_keys100000):self.cache cacheself.db dbself.bloom BloomFilter(expected_keys, 0.01)def preload_bloom(self, keys):预热将数据库中的key加载到布隆过滤器for k in keys:self.bloom.add(k)def get(self, key):if key not in self.bloom: # 布隆过滤器说不在return None # 直接返回不查数据库value self.cache.get(key)if value is None:value self.db.query(key) # 查数据库if value:self.cache.set(key, value)return value

相关新闻