Skip to content

Instantly share code, notes, and snippets.

@carlosmcevilly
Last active August 30, 2018 08:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Save carlosmcevilly/9966523 to your computer and use it in GitHub Desktop.
A bloom filter implementation with auto-resizing buckets, and the ability to also store histograms.
#!/usr/local/bin/python3
"""
store and check items in a bloom filter
limitations:
- not optimized for speed.
- data structures on disk are not protected from multiple writers... so, use accordingly.
- some other rough edges... under development. example: no quiet option.
- mixing conventional bloom filter and vector treatment complicates things somewhat.
features:
- resizes filters on the fly without user intervention as more items are added.
- can store vectors in bloom filters.
- decent test coverage.
apologies:
- this was my first Python project so the coding conventions are... evolving.
"""
"""
todo
- auditing (optional, with 2nd filter or other method)
"""
import os
import sys
import math
import json
import bitstring
from operator import itemgetter
from operator import attrgetter
sys.path.append(os.environ['BLOOM_DIR'])
import hasher
VECTOR_DEFAULT_CAPACITY=509 # prime number just short of the number of bits in target bit array size for vectors
BLOOM_VERSION="0.5.0"
# global helper for printing to STDERR
def eprint(*args, **kwargs):
    """Write a diagnostic message to STDERR; same calling convention as print()."""
    print(*args, file=sys.stderr, **kwargs)
class Filter(object):
    """
    A bloom filter persisted on disk under $BLOOM_DIR/buckets/<bucket>/.

    In the default mode it is a conventional bloom filter backed by
    "filter.bits", resizing itself automatically as items are added.  When
    constructed with both doc_id and capacity it instead acts as a
    fixed-size, single-hash per-document vector filter backed by
    "<doc_id>.bits" (see VectorFilter).
    """

    DEFAULT_CAPACITY = 200
    COLLISION_PROBABILITY = 0.0000001
    # COLLISION_PROBABILITY = 0.00000000000001  # doubles the disk space

    def __init__(self, bucket, string, doc_id=None, capacity=None):
        """
        Initialize a filter object.

        bucket   -- bucket name; its directory is created if missing
        string   -- the item this instance will add/check
        doc_id   -- together with capacity, switches to vector mode
        capacity -- fixed capacity used in vector mode
        """
        file_name = "filter.bits"
        self._configuration = None
        self.isVector = False
        self.bloom_dir = os.environ['BLOOM_DIR']
        self.bucket_dir = os.path.join(self.bloom_dir, "buckets", bucket)
        if not os.path.isdir(self.bucket_dir):
            os.mkdir(self.bucket_dir)
        if doc_id and capacity:
            # Vector mode: one filter file per document, fixed capacity.
            self.isVector = True
            self.capacity = capacity
            file_name = doc_id + '.bits'
        else:
            # Conventional mode: sizing comes from the bucket configuration.
            self.capacity = self.configuration['capacity']
            self.error_rate = self.configuration['collision_probability']
        self.bit_file = os.path.join(self.bucket_dir, file_name)
        self.string_file = os.path.join(self.bucket_dir, "strings.txt")
        self.string = string
        self.initialize_hasher()
        slots_used = self.configuration['slots_used']
        if self.configuration['capacity'] <= slots_used:
            self.upsize()

    def get_byte_boundary(self, n):
        """Round n up to a multiple of 8 so the bit array fills whole bytes."""
        mod = n % 8
        if mod:
            return n + (8 - mod)
        else:
            return n

    def is_prime(self, n):
        """
        We can get away with this super stupid brute force approach
        because we are checking only this one number. That and the
        fact that our input numbers are not very huge.
        """
        if n < 2:
            return False
        if 2 == n:
            return True
        for i in range(2, int(n ** 0.5) + 1):
            if n % i == 0:
                return False
        return True

    def get_next_prime_brute(self, n):
        """Return the smallest prime >= n (2 for anything below 3)."""
        if n < 3:
            return 2
        if n > 2 and n % 2 == 0:
            n += 1
        while not self.is_prime(n):
            n += 2
        return n

    def get_sizes_for_minimum_bitsize(self, starting_bitsize):
        """
        Return (actual_bit_size, hash_mod_prime): the first prime at or
        above starting_bitsize, and that prime rounded up to a whole byte.
        """
        hash_mod_prime = self.get_next_prime_brute(starting_bitsize)
        actual_bit_size = self.get_byte_boundary(hash_mod_prime)
        return int(actual_bit_size), int(hash_mod_prime)

    def initialize_hasher(self):
        """Compute hashing parameters and load (or create) the bit array."""
        if self.isVector:
            # Vector documents use one hash per token (see VectorFilter).
            (min_bitsize, num_hashes) = (VECTOR_DEFAULT_CAPACITY, 1)
        else:
            (min_bitsize, num_hashes) = self.calculate_bits_and_num_hashes()
        (bit_size, hash_mod_size) = self.get_sizes_for_minimum_bitsize(min_bitsize)
        self.bit_size = bit_size
        self.hash_mod_size = hash_mod_size
        self.num_hashes = num_hashes
        self.bitarray = self.filter

    @property
    def configuration(self):
        """
        Lazily-loaded bucket configuration (config.json), created with
        defaults on first use.

        Fix: the original getter declared an extra `isVector` parameter,
        which a property getter can never receive; it has been removed.
        """
        if self._configuration:
            return self._configuration
        json_path = os.path.join(self.bucket_dir, 'config.json')
        try:
            with open(json_path, 'r', encoding='utf-8') as json_file:
                text = json_file.read()
            self._configuration = json.loads(text)
        except FileNotFoundError:
            eprint("file not found, will create")
            capacity = self.DEFAULT_CAPACITY
            if self.isVector:
                capacity = VECTOR_DEFAULT_CAPACITY
            self._configuration = {"version": BLOOM_VERSION,
                                   "capacity": capacity,
                                   "sorted": True,
                                   "slots_used": 0,
                                   "collision_probability": self.COLLISION_PROBABILITY}
            self.save_configuration()
        return self._configuration

    def save_configuration(self):
        """Write the configuration dict back to config.json, pretty-printed."""
        self._configuration['version'] = BLOOM_VERSION
        json_path = os.path.join(self.bucket_dir, 'config.json')
        with open(json_path, 'w', encoding='utf-8') as json_file:
            json_file.write(json.dumps(self._configuration, sort_keys=True, indent=2, separators=(',', ':')) + "\n")

    def upsize(self):
        """
        Create a larger bit array and transfer all items to it
        This is periodic housekeeping, called automatically when
        needed (when our number of items to store exceeds the
        capacity of our existing array). Only possible because we
        take the rather inefficient step of storing to a strings
        file every input string we add. This isn't intended to
        be used with vector inputs.
        """
        upsize_factor = 1.5
        eprint("upsizing from capacity", self.capacity, "to capacity", int(self.capacity * upsize_factor))
        self.capacity = int(self.capacity * upsize_factor)
        self.configuration['capacity'] = self.capacity
        self.configuration['bit_size'] = self.bit_size
        self.configuration['mod_size'] = self.hash_mod_size
        self.configuration['num_hashes'] = self.num_hashes
        self.configuration['byte_size'] = int(self.bit_size / 8)
        self.configuration['slots_used'] = 0
        self.configuration['version'] = BLOOM_VERSION
        self.save_configuration()
        # Keep the old files around until the rebuild succeeds, so a failed
        # upsize can be rolled back.
        bit_file_temp_filename = self.bit_file + ".previous"
        os.rename(self.bit_file, bit_file_temp_filename)
        strings = self.get_strings()
        if self.string not in strings:
            strings.append(self.string)
        string_file_temp_filename = self.string_file + ".previous"
        os.rename(self.string_file, string_file_temp_filename)
        try:
            for string in strings:
                self.string = string
                self.initialize_hasher()
                self.update()
            self.save_array()
            os.unlink(bit_file_temp_filename)
            os.unlink(string_file_temp_filename)
        except OSError:
            print("error: unable to upsize.")
            os.rename(bit_file_temp_filename, self.bit_file)
            os.rename(string_file_temp_filename, self.string_file)

    def update(self):
        """Add the current string to the filter (no-op if already present)."""
        if not self.isVector:
            if not self.check():
                self.bitarray.set(True, self.hash_producer)
                self.increment_config_key('slots_used')
                self.save_string()
        elif not self.check():
            self.bitarray.set(True, self.hash_producer)
        self.save_configuration()

    def increment_config_key(self, configKey):
        """Increment a counter in the configuration, creating it at 1 if absent."""
        if configKey in self.configuration:
            self.configuration[configKey] += 1
        else:
            self.configuration[configKey] = 1

    @property
    def hash_producer(self):
        """A fresh iterable of bit positions for the current string."""
        return hasher.HashIterable(self.hash_mod_size, self.num_hashes, self.string)

    def save_string(self):
        """ append string to our string file. don't de-dupe at this point. """
        try:
            with open(self.string_file, 'a', encoding='utf-8') as string_file:
                string_file.write(self.string + '\n')
        except IOError:
            # Fix: the original referenced self.stringfile here, which does
            # not exist (AttributeError inside the error handler).
            print("Error! unable to write file", self.string_file)

    def get_strings(self):
        """ Return current set of strings """
        strings = []
        try:
            with open(self.string_file, 'r', encoding='utf-8') as string_file:
                for line in string_file:
                    strings.append(line[:-1])  # drop the trailing newline
        except IOError:
            pass  # It's ok if the file didn't exist; we will create it.
        return self.uniq(strings)

    # from Raymond Hettinger
    # code.activestate.com/recipes/52560-remove-duplicates-from-a-sequence/
    def uniq(self, alist):
        """
        Remove duplicates while preserving first-occurrence order.
        Fix: renamed the accumulator so it no longer shadows the builtin set.
        """
        seen = {}
        return [seen.setdefault(e, e) for e in alist if e not in seen]

    def check(self):
        """Return True if every bit for the current string is set."""
        return self.bitarray.all(True, self.hash_producer)

    @property
    def filter(self):
        """The on-disk bit array, or a fresh zeroed one if none exists yet."""
        if not os.path.isfile(self.bit_file):
            eprint("made new filter with capacity:", self.capacity)
            return bitstring.BitArray(length=self.bit_size)
        else:
            return bitstring.BitArray(length=self.bit_size, filename=self.bit_file)

    def save_array(self):
        """Persist the in-memory bit array to the filter's bit file."""
        with open(self.bit_file, 'wb') as bit_file:
            self.bitarray.tofile(bit_file)

    def calculate_bits_and_num_hashes(self):
        """
        Standard bloom filter sizing: return (m, k), the bit count and hash
        count for n = capacity items at false-positive probability p.
        """
        n = self.capacity  # number of items
        p = self.error_rate  # probability of false positives
        m = math.ceil((n * math.log(p)) / math.log(1.0 / (math.pow(2.0, math.log(2.0)))))
        k = math.ceil(math.log(2.0) * m / n)
        return (m, k)
class VectorFilter(object):
def __init__(self, bucket):
""" initialize a filter object """
#file_name = "filter.bits"
self.bloom_dir = os.environ['BLOOM_DIR']
self.bucket = bucket
self.target = VECTOR_DEFAULT_CAPACITY
self.bucket_dir = os.path.join(self.bloom_dir, "buckets", bucket)
#self.bit_file = os.path.join(self.bucket_dir, file_name)
def format_vector(self, vector_as_string):
vector_as_string = vector_as_string.strip()
raw=vector_as_string.split(' ')
eprint("raw is [" + str(raw) + "]")
word = raw[0]
data = raw[1:]
formatted = '{"docid":"' + word + '","vector":['
item_num = 1
items = []
for value in data:
num_str = str(item_num)
item = '["item' + num_str + '",' + str(value) + ']'
items.append(item)
item_num += 1
# '{"docid":"1","vector":[["t1",1],["t2",4],["tN",2]]}'
formatted = formatted + ",".join(items) + ']}'
eprint(formatted)
return formatted
@property
def bits(self):
_bits = None
try:
with open(self.bit_file, 'rb') as bit_file:
_bits = bit_file.read()
except FileNotFoundError:
eprint("file not found")
return _bits
def save_vector(self, vector_as_string):
data=json.loads(vector_as_string)
docid = data['docid']
# Use a multi-key sort to boost according to the heuristic:
# - priority for boosting given to: values closer to zero,
# longer strings
# - if those are equal, we go alphabetically after that.
# There could be another level of sorting introduced instead
# of alphabetical, to add, to take textual data as an example,
# a preference for words containing unusual n-grams.
items = self.sort_by_magnitude_then_len_then_alpha(data['vector'])
keys = [items[i][0] for i in range(0, len(items))]
values = [items[i][1] for i in range(0, len(items))]
# normalize
#scale = 1 / sum(values)
scale = 1 / sum(abs(v) for v in values) # handle negative numbers
values = [f*scale for f in values]
# scale with fudging
scaled = self.values_to_fudged_counts(values)
"""
Set up a filter for this bucket and docid
Save n strings for each item, total is <target> strings.
Each string will have multiple hashes but that is taken
care of by the bloom filter. So here it's:
for item in range(0, len(items))
and
for count in range(0, int(scaled[i]))
but in the called code there's another level:
for hash..
Vector Hashing
=================
For vector documents, we set self.num_hashes to 1
because each hash is being done with a different token,
the result of concatenating an input feature with some
filler bits and a stringified counter incrementing up
to the normalized magnitude for the feature.
Setting half the bits intuitively would seem to (and as I
recall, has also been shown with mathematical analysis to,
though I don't have the reference... Canavan?) maximize
the information available not only about what data is
represented, but also what data is un- or under-
represented in the vector.
We create a new filter (bit array) for each and every
document (vector). For the moment, in the current
state as something of an experimental system, we
store these filters in files named by document ID
under the 'documents' folder, which itself is in
the buckets folder.
"""
#print("found items:", items)
#print("found keys:", keys)
#print("found vector:", vector)
#print("found scaled:", scaled)
filter = None
for i in range(0, len(items)):
# The inner for loop below is not the one-iteration-per-bit loop.
# That's at another level down, encapsulated in filter.update().
# print("doing word", keys[i], scaled[i], "times")
scaled_value = int(scaled[i])
filler = '^s^'
if scaled_value < 0:
# filler assigned according to sign gives varying hash input for
# positive versus negative values, differentiating them in the output.
filler = 'vsv'
scaled_value = abs(scaled_value)
for n in range(0, scaled_value): # do once for each count
string = keys[i] + filler + str(n)
filter = Filter(self.bucket, string, doc_id=docid, capacity=self.target)
filter.update()
filter.save_array()
filter.configuration['slots_used'] = 0
filter.save_configuration()
def non_zero_round(self, n):
""" round toward zero, but never all the way to zero, preserving sign """
negative = False
if n < 0:
negative = True
working_value = abs(n)
result = None
if int(working_value) < 1:
result = 1
else:
result = int(working_value)
if negative:
return result * -1
else:
return result
def values_to_fudged_counts(self, values):
"""
Convert a list of numeric values to a list of integers where the
absolute values of the values sum to a preset target integer value.
This is mathematically impossible for many integer lists (example:
3, 3, 3 cannot be fairly normalized to a sum of 10 while retaining
whole numbers), so we fudge by preferentially increasing or
decreasing the output counts of selected items.
In doing this, we start with the items with a value closest to zero,
on the theory that unusual items are likely to have semantic
significance. For items having the same value, we increase the output
counts for the longest ones first.
The caller should sort the input values list so that items we would
want boosted appear first in the list. Likewise items for which we
would accept decimation of their values, or even their very
presence in the data, should appear at the end of the values list.
The order of the list is preserved. Our calling code relies on this
being the case.
"""
target = self.target
total, chance, factor = 0, 0, 1
if not values: return []
#total = sum(values)
total = sum(abs(v) for v in values) # handle negative numbers
if not total: return []
# scale everything so it totals up to roughly the target
factor = target / total;
# 'scaled' gets a return list of scaled values with sign preserved
scaled = [self.non_zero_round(f*factor) for f in values]
# if target was overshot, decimate some counts. In our meaning
# of decimate, some lucky numbers whether positive or negative
# are potentially brought closer to zero.
scaled = self.decimate_values_to_target(scaled, target)
# if target was undershot, boost counts until we reach it. In our
# meaning of boost, the chosen few numbers get farther from zero.
scaled = self.boost_values_to_target(scaled, target)
return scaled
def boost_values_to_target(self, values, target):
boost = 0
prev_value = 0
# total = sum(values)
total = sum(abs(v) for v in values) # handle negative numbers
length = len(values)
if 0 == length:
return []
while total < target:
for pos in range(0, length):
#print("boosting", values[pos])
value = values[pos]
if abs(value) > abs(prev_value * 2):
boost += 1
prev_value = value
while total + boost > target:
boost -= 1
# 'boost' for us means 'move farther from zero in the current sign direction'...
if value > 0:
values[pos] += boost # ...whether positive
else:
values[pos] -= boost # ...or negative
# note we didn't need a case for == 0 because the non-zero
# round performed earlier ruled that out.
total += boost
if total == target:
break
return values
def decimate_values_to_target(self, values, target):
# total = sum(values)
total = sum(abs(v) for v in values) # handle negative numbers
length = len(values)
if 0 == length:
return []
while total > target:
for pos in range(length-1, -1, -1): # backwards
if values[pos] > 0:
values[pos] -= 1
else:
values[pos] += 1
total -= 1 # always decrement regardless of value's sign
if total <= target:
break
return values
def sort_by_magnitude_then_len_then_alpha(self, unsorted):
"""
Sort descending by magnitude, for which we use abs(count) as a proxy,
then by length of key, then alphabetically by key. See unit tests for
examples. We do the sorts in reverse order of significance.
"""
# alphabetical
mylist = sorted(unsorted, key=(itemgetter(0)))
# by length
mylist = sorted(mylist, key=lambda k : len(k[0]), reverse=True)
# by value
mylist = sorted(mylist, key=lambda k: abs(k[1]))
#mylist = sorted(mylist, key=(itemgetter(1)))
return mylist
def distance(self, other):
return (self.bits^other).count(True)
def features():
    """
    Return a fixed table of 16-bit feature patterns as BitArrays.

    Fix: the original referenced the bare name `BitArray`, which is never
    imported (the module only does `import bitstring`), so every call
    raised NameError; the constructor is now fully qualified.
    NOTE(review): this function is not called anywhere in this file.
    """
    patterns = ['0b0000100001000010',
                '0b1110111011101110',
                '0b0010010010010010',
                '0b1010101010101010',
                '0b0011001100110011',
                '0b0001110001110001',
                '0b0000111100001111',
                '0b1000011110000111',
                '0b1100001111000011',
                '0b1110000111100001',
                '0b0001111000011110',
                '0b0011110000111100',
                '0b0111100001111000',
                '0b1100011100011100',
                '0b1001100110011001',
                '0b1100110011001100',
                '0b0011010100010100',
                '0b0010100010101100']
    return [bitstring.BitArray(p) for p in patterns]
class UnexpectedZeroValueException(Exception):
    """Marker exception for a value that must not be zero.

    NOTE(review): not raised anywhere in this file — presumably reserved
    for callers; confirm it is still needed.
    """
def main():
    """
    CLI dispatcher: -a add a string, -c check a string, -d vector distance
    (untested), -h save a JSON vector, -e save an embedded word line.
    """
    if len(sys.argv) < 2:
        usage_exit()
    if not sys.argv[1].startswith('-'):
        # Fix: the original only bound `arg` when argv[1] started with '-',
        # so any other first argument hit a NameError below.
        usage_exit()
    arg = sys.argv[1][1]
    if len(sys.argv) < 4 or not legal_argument(arg):
        usage_exit()
    bucket = sys.argv[2]
    string = sys.argv[3]
    if arg == 'a':
        filter = Filter(bucket, string)
        filter.update()
        filter.save_array()
    elif arg == 'c':
        filter = Filter(bucket, string)
        if filter.check():
            eprint("item", string, "found")
            exit(1)
        else:
            eprint("item", string, "not found")
            exit(0)
    elif arg == 'd':
        # NOTE(review): currently unreachable — legal_argument() rejects
        # 'd'; both arg processing and the feature are untested.
        doc1 = string
        doc2 = sys.argv[4]
        vfilter1 = VectorFilter(bucket)
        vfilter2 = VectorFilter(bucket)
        distance = vfilter1.distance(vfilter2)
        # Fix: the original concatenated the int distance into a string,
        # raising TypeError; format it explicitly.
        eprint("distance is [" + str(distance) + "]\n")
    elif arg == 'h':
        # NOTE(review): currently unreachable — legal_argument() rejects
        # 'h', and the usage text documents -v for this format.  Confirm
        # which flag is intended.
        vfilter = VectorFilter(bucket)
        vfilter.save_vector(string)
    elif arg == 'e':
        vfilter = VectorFilter(bucket)
        formatted = vfilter.format_vector(string)
        vfilter.save_vector(formatted)
def legal_argument(arg):
    """Return True if arg is one of the accepted CLI command letters."""
    # 'd' removed: difference not implemented.
    # NOTE(review): main() also has branches for 'h' and 'd', which this
    # gate rejects, and no branch for 'v' — confirm the intended set.
    return arg in 'acev'
def usage_exit():
    """Print the version banner and usage help, then exit with status -1."""
    # NOTE(review): the help documents -v, but main() dispatches on
    # -a, -c, -d, -h and -e — confirm which flags are current.
    print("\nvec2bloom version " + BLOOM_VERSION)
    print("\nUsage as a normal bloom filter:")
    print(" " + sys.argv[0] + " -a bucket string # add a string to a bucket")
    print(" " + sys.argv[0] + " -c bucket string # check a bucket for a string")
    print("\nUsage for encoding a vector as a bloom filter:")
    print(" " + sys.argv[0] + " -v bucket <JSON-like vector data (see below)>")
    print(" " + sys.argv[0] + " -e bucket <embedded word data (see below)>")
    # print(" " + sys.argv[0] + " -d bucket doc1 doc2")
    # next line removed from after the -e line in the triple-quoted doc string
    # -d <bucket> <doc1> <doc2>
    print("""
Format details for vector options:
-v <bucket> '{"docid":"<an_id>","vector":[["<token1>",<value1>],["<token2>",<value2>]]}'
-e <bucket> 'word -0.001234 0.001234 0.002345 -0.003456 ...'
For the -v option, the words 'docid' and 'vector' are fixed literal
strings that need to be present; everything else is your strings and
data. The vector contains token:value pairs. Tokens are strings and
values can be signed integers or signed floating point numbers. An
example of vector data would be words and their counts, where the
words are the tokens and the counts are the values.
There is no rectification of negative values. Their magnitude is
just as significant as that of positive values, but in the opposite
direction. In other words the information is not thrown away, and
the values (for example) -0.5 and 0.5 are distinct and give different
results, as would be expected.
For the -e option, the data after the bucket name is a quoted string
containing a single line entry from a word2vec text-format data file.
Exit code:
After a check the exit errorlevel is 1 if the string is found, 0 if not.
""")
    exit(-1)


# Script entry point: run the CLI only when executed directly.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3
"""
calculate size and number of hashes for bloom filter
"""
import sys
import math
def is_prime(n):
    """
    Trial-division primality test.  Brute force is acceptable here: we only
    ever check a single, modestly-sized number at a time.
    """
    if n < 2:
        return False
    if n == 2:
        return True
    return all(n % divisor for divisor in range(2, int(n ** 0.5) + 1))
def calculate(number_of_items, probability_of_false_positives):
    """
    Return (m, k) for a bloom filter: m bits and k hash functions sized for
    the given item count and false-positive probability.
    """
    n = number_of_items
    p = probability_of_false_positives
    denominator = math.log(1.0 / (math.pow(2.0, math.log(2.0))))
    m = math.ceil((n * math.log(p)) / denominator)
    k = math.ceil(math.log(2.0) * m / n)
    return (m, k)
def is_prime(self, n=None):
    """
    Trial-division primality test (brute force is fine for our small inputs).

    Fixes two defects in the original duplicate definition: it fell off the
    end and returned None for primes greater than 2 (the final
    `return True` was missing), and its stray `self` parameter broke the
    one-argument calls made by get_next_prime_brute().  Accepting either
    `is_prime(x)` or `is_prime(obj, x)` keeps both call styles working.
    """
    if n is None:
        # Called as a plain function: the first positional arg is the number.
        n = self
    if n < 2:
        return False
    if 2 == n:
        return True
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True
def get_next_prime_brute(n):
    """Return the smallest prime >= n (with 2 returned for anything below 3)."""
    if n < 3:
        return 2
    candidate = n + 1 if n % 2 == 0 else n
    while not is_prime(candidate):
        candidate += 2
    return candidate
def get_byte_boundary(n):
    """Round n up to the next multiple of 8, so bits fill whole bytes."""
    remainder = n % 8
    return n if remainder == 0 else n + (8 - remainder)
def get_sizes_for_minimum_bitsize(starting_bitsize):
    """
    Return (actual_bit_size, hash_mod_prime): the first prime at or above
    starting_bitsize, and that prime rounded up to a whole-byte bit count.

    Fix: the original file contained two identical definitions of this
    function back to back; the duplicate has been removed.
    """
    hash_mod_prime = get_next_prime_brute(starting_bitsize)
    actual_bit_size = get_byte_boundary(hash_mod_prime)
    return int(actual_bit_size), int(hash_mod_prime)
def main():
    """CLI: report bloom filter sizing for a given capacity and error rate."""
    if len(sys.argv) < 3:
        print("usage: ", sys.argv[0], "<number of items> <probability of false positives>\n (generally the second argument should be well less than 0.01)\n")
        print("example:", sys.argv[0], "100000 0.000001\n")
        exit(-1)
    item_count = float(sys.argv[1])
    false_positive_rate = float(sys.argv[2])
    bits_required, num_hashes = calculate(item_count, false_positive_rate)
    print("bits required:", bits_required, "(" + str(int(bits_required / 8)), "bytes)")
    print("num_hashes:", num_hashes)
    bit_size, hash_mod_size = get_sizes_for_minimum_bitsize(bits_required)
    byte_size = int(bit_size / 8)
    print("bit_size:", str(bit_size))
    print("byte_size:", str(byte_size))
    print("mod_size:", str(hash_mod_size))
    print("bits_per_item:", str(bit_size / item_count))
    print("bytes_per_item:", str(byte_size / item_count))


# Script entry point: run only when executed directly.
if __name__ == '__main__':
    main()
#import sha3 # https://github.com/maandree/sha3sum/tree/master/python3
import hashlib
class HashIterator(object):
    """
    Iterator yielding `number_of_hashes` hash values, each reduced modulo
    `rangesize`, derived from the string given at initialization time.
    On exhaustion it resets itself, so the same object can be iterated again.
    """

    def __init__(self, rangesize, number_of_hashes, string):
        self.rangesize = rangesize
        self.n = int(number_of_hashes)
        self.string = string
        self.count = 0
        self.hashes = []

    def __iter__(self):
        return self

    def __next__(self):
        if self.count >= self.n:
            # Reset state so this iterator can be reused.
            self.count = 0
            self.hashes = []
            raise StopIteration
        if not self.hashes:
            # Lazily compute the full hash list on first use.
            self._hashstring(self.string)
        value = self.hashes[self.count] % self.rangesize
        self.count += 1
        return value

    def _hashstring(self, input_string):
        """
        SHA-1 the input (re-hashing to extend the digest when more values
        are needed), slice the hex digest into 6-hex-digit (24-bit) chunks,
        and store the chunks as integers in self.hashes.
        """
        digest = hashlib.sha1()
        encoded = input_string.encode('utf-8')
        digest.update(encoded)
        hexhash = digest.hexdigest()
        while int(len(hexhash) / 6) < self.n:
            digest.update(encoded + hexhash.encode('utf-8'))
            hexhash = hexhash + digest.hexdigest()
        chunks = [hexhash[i * 6:i * 6 + 6] for i in range(0, int(len(hexhash) / 6))]
        self.hashes = [int(chunk, base=16) for chunk in chunks if len(chunk) == 6]
class HashIterable(object):
    """
    Reusable iterable wrapper: every iter() call produces a fresh
    HashIterator over the same rangesize / hash count / string.
    """

    def __init__(self, rangesize, number_of_hashes, string):
        self.rangesize = rangesize
        self.n = number_of_hashes
        self.string = string

    def __iter__(self):
        return HashIterator(self.rangesize, self.n, self.string)
# this file is no longer needed -- the values are now calculated on the fly.
#!/usr/bin/env python3
"""
test bloom filter functions
"""
import os
import unittest
import random
from bloom import Filter
from bloom import HistogramFilter
"""
Helper function which returns a tuple of two corresponding lists:
1) a list of positive numbers (either integers, or floats)
2) a copy of the same list with randomly chosen members flipped to negative
This is used to test our scaling function, which for these two lists as
input should give two lists whose members have the same absolute values.
The list can be up to max_length members long, and the values in an
integer list can be up to highest_max_value, but most lists have a lower
max_int value which is randomly chosen to be between lowest_max_value
and highest_max_value.
In a floating point list, the values are between 0 and 1 for the positive
list, and between -1 and 1 for the mixed sign list.
"""
def make_lists_of_random_numbers():
    """
    Return a tuple of two parallel random lists: one of non-negative numbers
    (all ints or all floats), and a copy of it with each member's sign
    flipped at random.  Used to check that scaling treats magnitudes the
    same regardless of sign.
    """
    max_length = 2000
    lowest_max_value = 5
    highest_max_value = 2000
    length = random.randint(1, max_length)
    max_int = random.randint(lowest_max_value, highest_max_value)
    is_floating_point = random.randint(0, 1)
    positives = []
    for _ in range(length):
        if is_floating_point:
            positives.append(random.random())
        else:
            positives.append(random.randint(0, max_int))
    mixed = []
    for value in positives:
        sign = -1 if random.randint(0, 1) == 1 else 1
        mixed.append(value * sign)
    return (positives, mixed)
class TestBloom(unittest.TestCase):
    """
    Integration tests for the conventional (non-vector) Filter, run against
    an on-disk scratch bucket named "testbucket".
    NOTE(review): cleanup uses the relative "buckets" path, so this assumes
    BLOOM_DIR resolves to the working directory — confirm.
    """

    def cleanup_files_in_bucket(self, bucket):
        """ helper method """
        config_file = os.path.join("buckets", bucket, "config.json")
        filter_file = os.path.join("buckets", bucket, "filter.bits")
        strings_file = os.path.join("buckets", bucket, "strings.txt")
        for file in [config_file, filter_file, strings_file]:
            if os.path.exists(file):
                os.unlink(file)

    def setUp(self):
        # Start every test from a pristine bucket with one (not yet added)
        # string primed in the filter.
        bucket = "testbucket"
        self.cleanup_files_in_bucket(bucket)
        self.bloom = Filter(bucket, "teststring")

    def test_get_byte_boundary(self):
        # Rounds bit counts up to whole bytes; 0 stays 0.
        self.assertEqual(self.bloom.get_byte_boundary(0), 0)
        self.assertEqual(self.bloom.get_byte_boundary(1), 8)
        self.assertEqual(self.bloom.get_byte_boundary(1.5), 8)
        self.assertEqual(self.bloom.get_byte_boundary(2), 8)
        self.assertEqual(self.bloom.get_byte_boundary(7), 8)
        self.assertEqual(self.bloom.get_byte_boundary(8), 8)
        self.assertEqual(self.bloom.get_byte_boundary(9), 16)
        self.assertEqual(self.bloom.get_byte_boundary(12), 16)
        self.assertEqual(self.bloom.get_byte_boundary(15), 16)
        self.assertEqual(self.bloom.get_byte_boundary(16), 16)
        self.assertEqual(self.bloom.get_byte_boundary(17), 24)

    def test_is_prime(self):
        self.assertEqual(self.bloom.is_prime(0), False)
        self.assertEqual(self.bloom.is_prime(1), False)
        self.assertEqual(self.bloom.is_prime(2), True)
        self.assertEqual(self.bloom.is_prime(3), True)
        self.assertEqual(self.bloom.is_prime(4), False)
        self.assertEqual(self.bloom.is_prime(5), True)
        self.assertEqual(self.bloom.is_prime(6), False)
        self.assertEqual(self.bloom.is_prime(7), True)
        self.assertEqual(self.bloom.is_prime(8), False)
        self.assertEqual(self.bloom.is_prime(9), False)

    def test_get_next_prime_brute(self):
        self.assertEqual(self.bloom.get_next_prime_brute(0), 2)
        self.assertEqual(self.bloom.get_next_prime_brute(1), 2)
        self.assertEqual(self.bloom.get_next_prime_brute(2), 2)
        self.assertEqual(self.bloom.get_next_prime_brute(3), 3)
        self.assertEqual(self.bloom.get_next_prime_brute(4), 5)
        self.assertEqual(self.bloom.get_next_prime_brute(8), 11)

    def test_get_sizes_for_minimum_bitsize(self):
        # get a tuple with first (multiple of 8, prime) equal or above a given number
        self.assertEqual(self.bloom.get_sizes_for_minimum_bitsize(6), (8,7))
        self.assertEqual(self.bloom.get_sizes_for_minimum_bitsize(19), (24,19))
        self.assertEqual(self.bloom.get_sizes_for_minimum_bitsize(20), (24,23))

    def test_calculate_bits_and_num_hashes(self):
        # Standard bloom sizing at two error rates for 100 items.
        self.bloom.capacity = 100
        self.bloom.error_rate = 0.1
        self.assertEqual(self.bloom.calculate_bits_and_num_hashes(), (480,4))
        self.bloom.error_rate = 0.01
        self.assertEqual(self.bloom.calculate_bits_and_num_hashes(), (959,7))

    def test_update(self):
        self.assertEqual(self.bloom.check(), False)
        self.bloom.update()
        # true here on next line because it checks the (unsaved) bit array in memory
        self.assertEqual(self.bloom.check(), True)
        self.bloom.save_array()
        self.assertEqual(self.bloom.check(), True)

    def test_check(self):
        self.assertEqual(self.bloom.check(), False)
        self.bloom.update()
        self.bloom.save_array()
        self.assertEqual(self.bloom.check(), True) # must do update() and save_array() first

    def test_get_strings(self):
        self.assertEqual(self.bloom.get_strings(), list())
        self.bloom.update() # doing update again should not add string again
        self.bloom.save_array()
        # changed from using set to using list here and in above assert after adding uniq in code
        self.assertEqual(self.bloom.get_strings(), ["teststring"])
class TestHist(unittest.TestCase):
def setUp(self):
self.hist = HistogramFilter("testbucket")
def test_non_zero_round(self):
self.assertEqual(self.hist.non_zero_round(-10), -10)
self.assertEqual(self.hist.non_zero_round(-1), -1)
self.assertEqual(self.hist.non_zero_round(-1.00000000000000000000000001), -1)
self.assertEqual(self.hist.non_zero_round(0), 1)
self.assertEqual(self.hist.non_zero_round(-0), 1)
self.assertEqual(self.hist.non_zero_round(-9999999999999999999999999), -9999999999999999999999999)
self.assertEqual(self.hist.non_zero_round(0.0000001), 1)
self.assertEqual(self.hist.non_zero_round(0.000000000000000000000001), 1)
self.assertEqual(self.hist.non_zero_round(1.000000000000000000000001), 1)
# in python3 int(1.999999999999999) returns 1 but int(1.9999999999999999) returns 2
self.assertEqual(self.hist.non_zero_round(1.999999999999999), 1)
self.assertEqual(self.hist.non_zero_round(1.9999999999999999), 2)
self.assertEqual(self.hist.non_zero_round(-1.999999999999999), -1)
self.assertEqual(self.hist.non_zero_round(-1.9999999999999999), -2)
self.assertEqual(self.hist.non_zero_round(1.9), 1)
def test_boost_values_to_target(self):
self.assertEqual(self.hist.boost_values_to_target([1,1,1], 100), [34,33,33])
self.assertEqual(self.hist.boost_values_to_target([-1,-1,-1], 100), [-34,-33,-33])
self.assertEqual(self.hist.boost_values_to_target([-1,1,1], 100), [-34,33,33])
self.assertEqual(self.hist.boost_values_to_target([1,1,-1], 100), [34,33,-33])
self.assertEqual(self.hist.boost_values_to_target([1,-1,1], 100), [34,-33,33])
self.assertEqual(self.hist.boost_values_to_target([1,-1,-1], 100), [34,-33,-33])
self.assertEqual(self.hist.boost_values_to_target([-1,1,-1], 100), [-34,33,-33])
self.assertEqual(self.hist.boost_values_to_target([-1,-1,1], 100), [-34,-33,33])
self.assertEqual(self.hist.boost_values_to_target([22,22,22], 100), [34,33,33])
self.assertEqual(self.hist.boost_values_to_target([1,1,1], 4), [2,1,1])
self.assertEqual(self.hist.boost_values_to_target([100,100,100], 400), [134,133,133])
# bad test because it's testing handling of overshoot, which is something
# the method under test is not intended to handle:
# self.assertEqual(self.hist.boost_values_to_target([2,2,2], 4), [2,1,1])
self.assertEqual(self.hist.boost_values_to_target([], 4), [])
def test_decimate_values_to_target(self):
# The rule is decimate proportionately, but in reverse order. In other words hit the
# items at the end of each list first.
self.assertEqual(self.hist.decimate_values_to_target([40,40,40], 100), [34,33,33])
self.assertEqual(self.hist.decimate_values_to_target([26,26,26,26], 100), [25,25,25,25])
self.assertEqual(self.hist.decimate_values_to_target([13,13,26,52], 100), [12,12,25,51])
self.assertEqual(self.hist.decimate_values_to_target([52,26,13,13], 100), [51,25,12,12])
# include some negatives
self.assertEqual(self.hist.decimate_values_to_target([-52,26,13,13], 100), [-51,25,12,12])
self.assertEqual(self.hist.decimate_values_to_target([52,26,13,-13], 100), [51,25,12,-12])
self.assertEqual(self.hist.decimate_values_to_target([-52,26,13,-13], 100), [-51,25,12,-12])
self.assertEqual(self.hist.decimate_values_to_target([-52,-26,-13,-13], 100), [-51,-25,-12,-12])
self.assertEqual(self.hist.decimate_values_to_target([52,-26,-13,13], 100), [51,-25,-12,12])
self.assertEqual(self.hist.decimate_values_to_target([52,-26,13,13], 100), [51,-25,12,12])
self.assertEqual(self.hist.decimate_values_to_target([52,26,-13,13], 100), [51,25,-12,12])
self.assertEqual(self.hist.decimate_values_to_target([-52,-26,13,13], 100), [-51,-25,12,12])
self.assertEqual(self.hist.decimate_values_to_target([-52,26,-13,13], 100), [-51,25,-12,12])
self.assertEqual(self.hist.decimate_values_to_target([52,-26,13,-13], 100), [51,-25,12,-12])
self.assertEqual(self.hist.decimate_values_to_target([52,26,-13,-13], 100), [51,25,-12,-12])
self.assertEqual(self.hist.decimate_values_to_target([-52,26,13,12], 100), [-52,25,12,11])
self.assertEqual(self.hist.decimate_values_to_target([12,13,26,-52], 100), [12,12,25,-51])
self.assertEqual(self.hist.decimate_values_to_target([-52,26,13,-12], 100), [-52,25,12,-11])
self.assertEqual(self.hist.decimate_values_to_target([-12,-13,26,52], 100), [-12,-12,25,51])
self.assertEqual(self.hist.decimate_values_to_target([52,26,-13,-12], 100), [52,25,-12,-11])
self.assertEqual(self.hist.decimate_values_to_target([12,-13,26,-52], 100), [12,-12,25,-51])
self.assertEqual(self.hist.decimate_values_to_target([52,-26,-13,12], 100), [52,-25,-12,11])
self.assertEqual(self.hist.decimate_values_to_target([-12,-13,-26,-52], 100), [-12,-12,-25,-51])
# This is subtle. The order of the input determines which item gets the hit in the
# decimation. In this case only three of the four items gets decimated. We do the
# decimation in reverse order (for the first list, we go 12, 13, 26, 52) so the 0th
# item of the list gets spared the decrement in each of the next two cases below.
self.assertEqual(self.hist.decimate_values_to_target([52,26,13,12], 100), [52,25,12,11])
self.assertEqual(self.hist.decimate_values_to_target([12,13,26,52], 100), [12,12,25,51])
self.assertEqual(self.hist.decimate_values_to_target([50,50,10], 100), [47,47,6])
self.assertEqual(self.hist.decimate_values_to_target([50,50,-10], 100), [47,47,-6])
self.assertEqual(self.hist.decimate_values_to_target([50,-50,-10], 100), [47,-47,-6])
self.assertEqual(self.hist.decimate_values_to_target([-50,-50,10], 100), [-47,-47,6])
self.assertEqual(self.hist.decimate_values_to_target([-50,50,-10], 100), [-47,47,-6])
self.assertEqual(self.hist.decimate_values_to_target([-50,-50,-10], 100), [-47,-47,-6])
self.assertEqual(self.hist.decimate_values_to_target([], 100), [])
self.assertEqual(self.hist.decimate_values_to_target([], 10), [])
def test_values_to_fudged_counts(self):
for i in range(0, 1000): # have tried this with 1000000 (1 million) and it passed
positive, mixed = make_lists_of_random_numbers()
mixed_scaled = self.hist.values_to_fudged_counts(mixed)
positive_scaled = self.hist.values_to_fudged_counts(positive)
abs_scaled = [abs(x) for x in mixed_scaled]
#print("positive: " + str(positive_scaled))
#print("mixedabs: " + str(abs_scaled))
self.assertEqual(positive_scaled, abs_scaled)
positive_total = sum(abs(v) for v in positive_scaled)
abs_total = sum(abs(v) for v in abs_scaled)
self.assertEqual(positive_total, abs_total)
self.assertEqual(positive_total, self.hist.target)
#print("positive_total is " + str(positive_total))
#print("abs_total is " + str(abs_total))
def test_sort_by_value_then_len_then_alpha(self):
# What we want in the results is the lowest values (counts) first. Then
# for any runs of items that have identical counts, we want the longest
# ones first. Then if there's a run of items where both count and length
# are identical, we want the items within that run sorted alphabetically.
unsorted = [["a",1],["b",2],["cc",2],["d",3]]
expected = [["a",1],["cc",2],["b",2],["d",3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",1],["bb",2],["c",2],["d",3]]
expected = [["a",1],["bb",2],["c",2],["d",3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",1],["b",2],["c",2],["d",3]]
expected = [["a",1],["b",2],["c",2],["d",3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["x",1],["z",1],["y",1]]
expected = [["x",1],["y",1],["z",1]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
# mix in some negatives
unsorted = [["a",1],["bb",2],["c",2],["d",-3]]
expected = [["a",1],["bb",2],["c",2],["d",-3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",1],["bb",2],["c",2],["d",-3],["e",-1]]
expected = [["a",1],["e",-1],["bb",2],["c",2],["d",-3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",-1],["bb",2],["c",2],["d",-3],["e",1]]
expected = [["a",-1],["e",1],["bb",2],["c",2],["d",-3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",-1],["bb",-2],["c",-2],["d",-3],["e",-1]]
expected = [["a",-1],["e",-1],["bb",-2],["c",-2],["d",-3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
unsorted = [["a",-1],["bb",2],["c",-2],["d",-3],["e",-1]]
expected = [["a",-1],["e",-1],["bb",2],["c",-2],["d",-3]]
self.assertEqual(self.hist.sort_by_value_then_len_then_alpha(unsorted), expected)
def main():
    """Run the full test suite via unittest's discovery of this module.

    unittest.main() finds TestBloom and TestHist on its own, so no instances
    are created here; the previous explicit TestBloom()/TestHist() calls were
    unused (and could raise ValueError on Pythons where TestCase.__init__
    validates the default 'runTest' method name).
    """
    unittest.main()

if __name__ == '__main__':
    main()
# (GitHub page footer captured by the scrape, commented out so the file parses:)
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment