python使用布隆过滤器

   日期:2020-08-20     浏览:114    评论:0    
核心提示:使用库pybloom_livefrom pybloom_live import ScalableBloomFilter,BloomFilter# 可自动伸缩的布隆过滤器bloom = ScalableBloomFilter(initial_capacity=100,error_rate=0.001)# 添加内容bloom.add(daqi)print(daqiin bloom)# 定长的布隆过滤器bloom1 = BloomFilter(capacity=10000)bloo

使用库pybloom_live

from pybloom_live import ScalableBloomFilter,BloomFilter

# 可自动伸缩的布隆过滤器
bloom = ScalableBloomFilter(initial_capacity=100,error_rate=0.001)

# 添加内容
bloom.add('daqi')
print('daqi'in bloom)

# 定长的布隆过滤器
bloom1 = BloomFilter(capacity=10000)
bloom1.add('daqi')
print('daqi'in bloom1)

手动实现一个简单的布隆过滤器

使用bitarray实现,将初始数组置为0,根据hash计算出节点置为1,同时写了一个生成随机码的函数用于测试。

import random
import mmh3
from bitarray import bitarray
import os.path
import re


# bitarray长度
BIT_SIZE = 50000

class BloomFilter():

    def __init__(self):
        bit_array = bitarray(BIT_SIZE)
        bit_array.setall(0)
        self.bit_array = bit_array
        self.bit_size = self.length()


    def get_points(self, url):
        """ 生成需要插入的位置 :param url: :return:节点的列表 """
        point_list = []
        for i in range(7):
            point = mmh3.hash(url,30+i) % self.bit_size
            point_list.append(point)
        return point_list

    def add(self, url):
        """ 添加url到bitarray中 :param url: :return: """
        res = self.bitarray_expand()
        points = self.get_points(url)
        try:
            for point in points:
                self.bit_array[point] = 1
            return '注册完成!'
        except Exception as e:
            return e

    def contains(self,url):
        """ 验证url是否存在 :param url: :return:True or False """
        points = self.get_points(url)
        # 在bitarray中查找对应的点,如果有一个点值为0就说明该url不存在
        for p in points:
            if self.bit_array[p] == 0:
                return False
        return True


    def count(self):
        """ 获取bitarrray中使用的节点数 :return: bitarray长度 """
        return self.bit_array.count()


    def length(self):
        """ 获取bitarray的长度 :return:bitarray的长度 """
        return len(self.bit_array)


    def bitarray_expand(self):
        """ 扩充bitarray长度 :return:bitarray的长度或使用率,布隆过滤器的bitarray的使用最好不要超过50%,这样误判率低一些 """
        isusespace = round(int(self.count()) / int(self.length()),4)
        if 0.50 < isusespace:
            # 新建bitarray
            expand_bitarray = bitarray(BIT_SIZE)
            expand_bitarray.setall(0)
            # 增加新建的bitarray
            self.bit_array = self.bit_array + expand_bitarray
            self.bit_size = self.length()
            return self.bit_size
        else:
            return f'长度尚可,{round(isusespace * 100,2)}%'



def get_captcha():
    """ 生成用于测试的随机码 :return: """
    seed = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    captcha = ""
    for i in range(10):
        captcha += random.choice(seed)
    print(captcha)
    return captcha


if __name__ == '__main__':
    bloom = BloomFilter()
    for i in  range(100000):
        bloom.add(f'www.{get_captcha()}.com')
        print(bloom.length())
        print(bloom.count())
    print(bloom.count())

 
打赏
 本文转载自:网络 
所有权利归属于原作者,如文章来源标示错误或侵犯了您的权利请联系微信13520258486
更多>最近资讯中心
更多>最新资讯中心
0相关评论

推荐图文
推荐资讯中心
点击排行
最新信息
新手指南
采购商服务
供应商服务
交易安全
关注我们
手机网站:
新浪微博:
微信关注:

13520258486

周一至周五 9:00-18:00
(其他时间联系在线客服)

24小时在线客服