把布隆过滤器用起来 本文偏应用和代码实践,理论请参考本文末尾参考文章
简介 一句话简介: 过滤器,判断这个元素在与不在,不在则100%不在;在则去查询,确认在不在。
详细简介: BloomFilter,中文名称叫做布隆过滤器,是1970年由 Bloom 提出的,它可以被用来检测一个元素是否在一个集合中,它的空间利用效率很高,使用它可以大大节省存储空间。BloomFilter 使用位数组表示一个待检测集合,并可以快速地通过概率算法判断一个元素是否存在于这个集合中,所以利用这个算法我们可以实现去重效果。
它的优点是空间效率和查询时间都远远超过一般算法,缺点是有一定的误识别率和删除困难。
场景 1、大量爬虫数据去重
2、保护数据安全: 广告精确投放 :广告主通过设备id,计算hash算法,在数据包(数据提供方)中去查找,如果在存在,则证明该设备id属于目标人群,进行投放广告,同时保证设备id不泄露。数据提供方和广告主都没有暴露自己拥有的设备id。间接用户画像且不违数据安全法。详见:https://zhuanlan.zhihu.com/p/37847480
3、比特币网络转账确认
SPV节点:SPV是“Simplified Payment Verification”(简单支付验证)的缩写。中本聪论文简要地提及了这一概念,指出:不运行完全节点也可验证支付,用户只需要保存所有的block header就可以了。用户虽然不能自己验证交易,但如果能够从区块链的某处找到相符的交易,他就可以知道网络已经认可了这笔交易,而且得到了网络的多少个确认。
先去访问布隆过滤器,去判断交易记录是否在某个block(区块)里存在。从海量数据(十亿个区块,每个区块1-2M的交易记录,),快速得到结果。 详见:https://www.youtube.com/watch?v=uC6Q5m0SSQ0
4、分布式系统(Map-Reduce) 把大任务切分成块,分配和验证一个子任务是否在一个子系统上。
必要性 省空间,提升效率
我们首先来回顾一下 ScrapyRedis 的去重机制,它将 Request 的指纹存储到了 Redis 集合中,每个指纹的长度为 40,例如 27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61 就是一个指纹,它的每一位都是 16 进制数。
让我们来计算一下用这种方式耗费的存储空间,每个 16 进制数占用 4b,1 个指纹用 40 个 16 进制数表示,占用空间为 20B,所以 1 万个指纹即占用空间 200KB,1 亿个指纹即占用 2G,所以当我们的爬取数量达到上亿级别时,Redis 的占用的内存就会变得很高,而且这仅仅是指纹的存储,另外 Redis 还存储了爬取队列,内存占用会进一步提高,更别说有多个 Scrapy 项目同时爬取的情况了。所以当爬取达到亿级别规模时 ScrapyRedis 提供的集合去重已经不能满足我们的要求,所以在这里我们需要使用一个更加节省内存的去重算法,它叫做 BloomFilter。
(内存版)Python实现的内存版布隆过滤器pybloom https://github.com/jaybaird/python-bloomfilter 安装:
该模块包含两个类实现布隆过滤器功能。BloomFilter 是定容。ScalableBloomFilter 可以自动扩容
使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 >>> from pybloom import BloomFilter>>> f = BloomFilter(capacity=1000 , error_rate=0.001 ) >>> f.add('Traim304' ) False >>> f.add('Traim304' ) True >>> 'Traim304' in f True >>> 'Jacob' in f False >>> len (f) 1 >>> f = BloomFilter(capacity=1000 , error_rate=0.001 ) >>> from pybloom import ScalableBloomFilter>>> sbf = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)>>>
超过误报率时抛出异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 >>> f = BloomFilter(capacity=1000 , error_rate=0.0000001 )>>> for a in range (1000 ):... _ = f.add(a)... >>> len (a)Traceback (most recent call last): File "<stdin>" , line 1 , in <module> TypeError: object of type 'int' has no len () >>> len (f)1000 >>> f.add(1000 )False >>> f.add(1001 ) Traceback (most recent call last): File "<stdin>" , line 1 , in <module> File "/usr/local/lib/python2.7/site-packages/pybloom/pybloom.py" , line 182 , in add raise IndexError("BloomFilter is at capacity" ) IndexError: BloomFilter is at capacity
(持久化)手动实现的redis版布隆过滤器 大数据量,多用Redis持久化版本的布隆过滤器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 import hashlibimport redisimport sixclass MultipleHash (object ): '''根据提供的原始数据,和预定义的多个salt,生成多个hash函数值''' def __init__ (self, salts, hash_func_name="md5" ): self .hash_func = getattr (hashlib, hash_func_name) if len (salts) < 3 : raise Exception("请至少提供3个salt" ) self .salts = salts def get_hash_values (self, data ): '''根据提供的原始数据, 返回多个hash函数值''' hash_values = [] for i in self .salts: hash_obj = self .hash_func() hash_obj.update(self ._safe_data(data)) hash_obj.update(self ._safe_data(i)) ret = hash_obj.hexdigest() hash_values.append(int (ret, 16 )) return hash_values def _safe_data (self, data ): ''' python2 str === python3 bytes python2 uniocde === python3 str :param data: 给定的原始数据 :return: 二进制类型的字符串数据 ''' if six.PY3: if isinstance (data, bytes ): return data elif isinstance (data, str ): return data.encode() else : raise Exception("请提供一个字符串" ) else : if isinstance (data, str ): return data elif isinstance (data, unicode): return data.encode() else : raise Exception("请提供一个字符串" ) class BloomFilter (object ): '''''' def __init__ (self, salts, redis_host="localhost" , redis_port=6379 , redis_db=0 , redis_key="bloomfilter" ): self .redis_host = redis_host self .redis_port = redis_port self .redis_db = redis_db self .redis_key = redis_key self .client = self ._get_redis_client() self .multiple_hash = MultipleHash(salts) def _get_redis_client (self ): '''返回一个redis连接对象''' pool = redis.ConnectionPool(host=self .redis_host, port=self .redis_port, db=self .redis_db) client = redis.StrictRedis(connection_pool=pool) return client def save (self, data ): '''''' hash_values = self .multiple_hash.get_hash_values(data) for hash_value in hash_values: offset = self ._get_offset(hash_value) self .client.setbit(self .redis_key, offset, 1 ) return True def is_exists (self, data ): hash_values = self .multiple_hash.get_hash_values(data) for hash_value in hash_values: offset = self ._get_offset(hash_value) v = self .client.getbit(self .redis_key, offset) if v == 0 : return False return True def _get_offset (self, hash_value ): return hash_value % (2 **8 * 2 **20 * 2 *3 ) if __name__ == '__main__' : data = ["asdfasdf" , "123" , "123" , "456" ,"asf" , "asf" ] bm = BloomFilter(salts=["1" ,"2" ,"3" , "4" ],redis_host="172.17.0.2" ) for d in data: if not bm.is_exists(d): bm.save(d) print ("映射数据成功: " , d) else : print ("发现重复数据:" , d)
应用在scrapy-redis中 代码已经打包成了一个 Python 包并发布到了 PyPi,链接为:https://pypi.python.org/pypi/scrapy-redis-bloomfilter,因此我们以后如果想使用 ScrapyRedisBloomFilter 直接使用就好了,不需要再自己实现一遍。
我们可以直接使用Pip来安装,命令如下:
1 pip3 install scrapy-redis-bloomfilter
使用的方法和 ScrapyRedis 基本相似,在这里说明几个关键配置:
1 2 3 4 5 6 DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter" BLOOMFILTER_HASH_NUMBER = 6 BLOOMFILTER_BIT = 30
DUPEFILTER_CLASS 是去重类,如果要使用 BloomFilter需要将 DUPEFILTER_CLASS 修改为该包的去重类。
BLOOMFILTER_HASH_NUMBER 是 BloomFilter 使用的哈希函数的个数,默认为 6,可以根据去重量级自行修改。
BLOOMFILTER_BIT 即前文所介绍的 BloomFilter 类的 bit 参数,它决定了位数组的位数,如果 BLOOMFILTER_BIT 为 30,那么位数组位数为 2 的 30次方,将占用 Redis 128MB 的存储空间,去重量级在 1 亿左右,即对应爬取量级 1 亿左右。如果爬取量级在 10亿、20 亿甚至 100 亿,请务必将此参数对应调高。
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Spider 文件: from scrapy import Request, Spiderclass TestSpider (Spider ): name = 'test' base_url = 'https://www.baidu.com/s?wd=' def start_requests (self ): for i in range (10 ): url = self .base_url + str (i) yield Request(url, callback=self .parse) for i in range (100 ): url = self .base_url + str (i) yield Request(url, callback=self .parse) def parse (self, response ): self .logger.debug('Response of ' + response.url)
在 start_requests() 方法中首先循环 10 次,构造参数为 0-9 的 URL,然后重新循环了 100 次,构造了参数为 0-99 的 URL,那么这里就会包含 10 个重复的 Request,我们运行项目测试一下:
可以看到最后的输出结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 {'bloomfilter/filtered' : 10 , 'downloader/request_bytes' : 34021 , 'downloader/request_count' : 100 , 'downloader/request_method_count/GET' : 100 , 'downloader/response_bytes' : 72943 , 'downloader/response_count' : 100 , 'downloader/response_status_count/200' : 100 , 'finish_reason' : 'finished' , 'finish_time' : datetime.datetime(2017 , 8 , 11 , 9 , 34 , 30 , 419597 ), 'log_count/DEBUG' : 202 , 'log_count/INFO' : 7 , 'memusage/max' : 54153216 , 'memusage/startup' : 54153216 , 'response_received_count' : 100 , 'scheduler/dequeued/redis' : 100 , 'scheduler/enqueued/redis' : 100 , 'start_time' : datetime.datetime(2017 , 8 , 11 , 9 , 34 , 26 , 495018 )}
可以看到最后统计的第一行的结果:
1 'bloomfilter/filtered' : 10 ,
这就是 BloomFilter 过滤后的统计结果,可以看到它的过滤个数为 10 个,也就是它成功将重复的 10 个 Reqeust 识别出来了,测试通过。
原理 本文偏应用,难以描述的原理,最后说。 一个很长的二进制向量和一个映射函数。
参考资料 1、https://zhuanlan.zhihu.com/p/37847480 2、https://www.youtube.com/watch?v=uC6Q5m0SSQ0 3、《python3网络爬虫开发实战》崔庆才 4、https://www.jianshu.com/p/f57187e2b5b9