Skip to content

Instantly share code, notes, and snippets.

@kleptog
Created October 29, 2018 08:04
Show Gist options
  • Save kleptog/7f79132d59217e07976231a97149d577 to your computer and use it in GitHub Desktop.
Save kleptog/7f79132d59217e07976231a97149d577 to your computer and use it in GitHub Desktop.
Python implementation for boolean expression index
# coding=utf8
# This is a python implementation of the boolean expression algorithm
# described in the paper "Indexing Boolean Expressions" by Whong et al.
# https://theory.stanford.edu/~sergei/papers/vldb09-indexing.pdf
# Note this really only uses the index ideas from the paper, because the
# ctual algorithm it describes involves shuffling through lists, something
# which is extremely difficult to do efficiently in python. I went for the
# easier route of simply counting entries. As such, I don't give any
# guarentees about the complexity of the algorithm, but it seems it should
# be comparable.
# This file implements the CNF algorithm in nthe paper.
import pprint
from collections import Counter
import itertools
class Condition(object):
var = None
val = None
inv = None
def __init__(self, var, val, inv=False):
self.var = var
self.val = val
self.inv = (inv is not False)
def __str__(self):
if self.inv:
return "(%s ∉ %r)" % (self.var, self.val)
else:
return "(%s ∈ %r)" % (self.var, self.val)
class And(list):
def __init__(self, *args):
return list.__init__(self, args)
def __str__(self):
return " & ".join(str(c) for c in self)
class Or(list):
def __init__(self, *args):
return list.__init__(self, args)
def __str__(self):
return " | ".join("(%s)" % c for c in self)
class BE(object):
""" Represents a Boolean Expression to be indexed and evaluated IN CNF form"""
def __init__(self, name, be):
if isinstance(be, Or):
be = And(be)
assert isinstance(be, And)
self.name = name
self.be = be
def cnf_size(self):
""" The size of a CNF BE is defined as the number of disjunctions in the BE with no ∉ predicates """
size = 0
for c in self.be:
assert isinstance(c, Or)
for d in c:
if d.inv:
break
else:
size += 1
return size
def cnf_index_terms(self):
for i, c in enumerate(self.be):
for d in c:
for val in d.val:
yield ((d.var, val), (self, d.inv, i))
def get_disjunction_counts(self):
return [sum(1 for term in and_term if term.inv) for and_term in self.be]
def num_terms(self):
return len(self.be)
def eval(self, ass):
ass = dict(ass)
def eval_term(term):
if isinstance(term, And):
return all(eval_term(t) for t in term)
if isinstance(term, Or):
return any(eval_term(t) for t in term)
if isinstance(term, Condition):
val = ass.get(term.var)
if val is None:
return term.inv
if term.inv:
return val not in term.val
return val in term.val
return eval_term(self.be)
def __repr__(self):
return "<BE %r>" % self.name
c1 = BE('c1', And(Or(Condition('A', [1]), Condition('B',[1])), Or(Condition('C', [1]), Condition('D', [1]))))
c2 = BE('c2', And(Or(Condition('A', [1]), Condition('C',[2])), Or(Condition('B', [1]), Condition('D', [1]))))
c3 = BE('c3', And(Or(Condition('A', [1]), Condition('B',[1])), Or(Condition('C', [2]), Condition('D', [1]))))
c4 = BE('c4', And(Or(Condition('A', [1, 2]), Condition('B',[1])), Or(Condition('A', [1]), Condition('D', [1])))) # Typo in paper
c5 = BE('c5', And(Or(Condition('A', [1]), Condition('B',[1])), Or(Condition('C', [1, 2], True), Condition('D', [1], True), Condition('E', [1]))))
c6 = BE('c6', Or(Condition('A', [1], True), Condition('B', [1])))
exprs = [c1, c2, c3, c4, c5, c6]
class CNFIndex(object):
def __init__(self, exprs):
self.index, self.zeros = self.build_index(exprs)
# Index conditions
def build_index(self, exprs):
index = dict()
zeros = set()
for be in exprs:
k = be.cnf_size()
if k not in index:
index[k] = dict()
if k == 0:
zeros.add(be)
for key, val in be.cnf_index_terms():
if key not in index[k]:
index[k][key] = []
index[k][key].append(val)
return index, zeros
def dump_index(self):
pprint.pprint(self.index)
def match(self, ass):
result = set()
for k in self.index:
counts = Counter(itertools.chain.from_iterable(self.index[k].get(a, []) for a in ass))
rules = set(rule for rule, inv, iid in counts)
if k == 0:
rules.update(self.zeros) # Always do zeros
for rule in rules:
disjunction_count_list = rule.get_disjunction_counts()
for dis_id in range(rule.num_terms()):
if counts.get((rule, False, dis_id), 0) == 0 and counts.get((rule, True, dis_id), 0) >= disjunction_count_list[dis_id]:
break
else:
result.add(rule)
return result
class NoIndex(object):
def __init__(self, exprs):
self.exprs = exprs
def match(self, ass):
result = set()
for expr in self.exprs:
if expr.eval(ass):
result.add(expr)
return result
no_index = NoIndex(exprs)
cnf_index = CNFIndex(exprs)
#print cnf_index.match((('A', 1), ('C', 2)))
#print no_index.match((('A', 1), ('C', 2)))
all_ass = itertools.product(
(('A', x) for x in (None, 1, 2)),
(('B', x) for x in (None, 1, 2)),
(('C', x) for x in (None, 1, 2)),
(('D', x) for x in (None, 1, 2)),
(('E', x) for x in (None, 1, 2)),
)
errors = Counter()
for ass in all_ass:
ass = [x for x in ass if x[1] is not None]
r1 = cnf_index.match(ass)
r2 = no_index.match(ass)
if r1 != r2:
errors.update(r1 ^ r2)
print r1, r2, ass
import pdb; pdb.set_trace()
pprint.pprint(errors)
import timeit
def testcnf():
for ass in all_ass:
cnf_index(ass)
def testno():
for ass in all_ass:
no_index(ass)
print timeit.timeit(testcnf, number=10000)
print timeit.timeit(testno, number=10000)
# coding=utf8
# This is a python implementation of the boolean expression algorithm
# described in the paper "Indexing Boolean Expressions" by Whong et al.
# https://theory.stanford.edu/~sergei/papers/vldb09-indexing.pdf
# Note this really only uses the index ideas from the paper, because the
# ctual algorithm it describes involves shuffling through lists, something
# which is extremely difficult to do efficiently in python. I went for the
# easier route of simply counting entries. As such, I don't give any
# guarentees about the complexity of the algorithm, but it seems it should
# be comparable.
# This file implements the DNF algorithm in the paper
import pprint
from collections import Counter
import itertools
class Condition(object):
var = None
val = None
inv = None
def __init__(self, var, val, inv=False):
self.var = var
self.val = val
self.inv = (inv is not False)
def __str__(self):
if self.inv:
return "(%s ∉ %r)" % (self.var, self.val)
else:
return "(%s ∈ %r)" % (self.var, self.val)
class And(list):
def __init__(self, *args):
return list.__init__(self, args)
def __str__(self):
return " & ".join(str(c) for c in self)
class Or(list):
def __init__(self, *args):
return list.__init__(self, args)
def __str__(self):
return " | ".join("(%s)" % c for c in self)
class BE(object):
""" Represents a Boolean Expression to be indexed and evaluated IN DNF form"""
def __init__(self, name, be):
if isinstance(be, Or):
be = And(be)
assert isinstance(be, And)
self.name = name
self.be = be
def dnf_size(self):
""" The size of a DNF BE is defined as the number of ∈ predicates """
size = 0
for c in self.be:
assert isinstance(c, Condition)
if not c.inv:
size += 1
return size
def dnf_index_terms(self):
for i, d in enumerate(self.be):
for val in d.val:
yield ((d.var, val), (self, d.inv))
def eval(self, ass):
ass = dict(ass)
def eval_term(term):
if isinstance(term, And):
return all(eval_term(t) for t in term)
if isinstance(term, Or):
return any(eval_term(t) for t in term)
if isinstance(term, Condition):
val = ass.get(term.var)
if val is None:
return term.inv
if term.inv:
return val not in term.val
return val in term.val
return eval_term(self.be)
def __repr__(self):
return "<BE %r>" % self.name
c1 = BE('c1', And(Condition('age', [3]), Condition('state', ['NY'])))
c2 = BE('c2', And(Condition('age', [3]), Condition('gender', ['F'])))
c3 = BE('c3', And(Condition('age', [3]), Condition('gender', ['M']), Condition('state', ['CA'], True)))
c4 = BE('c4', And(Condition('state', ['CA']), Condition('gender', ['M'])))
c5 = BE('c5', And(Condition('age', [3, 4])))
c6 = BE('c6', And(Condition('state', ['CA', 'NY'], True)))
exprs = [c1, c2, c3, c4, c5, c6]
class DNFIndex(object):
def __init__(self, exprs):
self.index, self.zeros = self.build_index(exprs)
# Index conditions
def build_index(self, exprs):
index = dict()
zeros = set()
for be in exprs:
k = be.dnf_size()
if k not in index:
index[k] = dict()
if k == 0:
zeros.add(be)
for key, val in be.dnf_index_terms():
if key not in index[k]:
index[k][key] = []
index[k][key].append(val)
return index, zeros
def dump_index(self):
pprint.pprint(self.index)
def match(self, ass):
result = set()
for k in self.index:
counts = Counter(itertools.chain.from_iterable(self.index[k].get(a, []) for a in ass))
rules = set(rule for rule, inv in counts)
if k == 0:
rules.update(self.zeros) # Always do zeros
for rule in rules:
if counts.get((rule, False), 0) == k and counts.get((rule, True), 0) == 0:
result.add(rule)
return result
class NoIndex(object):
def __init__(self, exprs):
self.exprs = exprs
def match(self, ass):
result = set()
for expr in self.exprs:
if expr.eval(ass):
result.add(expr)
return result
no_index = NoIndex(exprs)
dnf_index = DNFIndex(exprs)
#print dnf_index.match((('A', 1), ('C', 2)))
#print no_index.match((('A', 1), ('C', 2)))
all_ass = itertools.product(
(('state', x) for x in (None, 'NY', 'CA')),
(('gender', x) for x in (None, 'M', 'F')),
(('age', x) for x in (None, 3, 4)),
)
errors = Counter()
for ass in all_ass:
ass = [x for x in ass if x[1] is not None]
r1 = dnf_index.match(ass)
r2 = no_index.match(ass)
if r1 != r2:
errors.update(r1 ^ r2)
print r1, r2, ass
import pdb; pdb.set_trace()
pprint.pprint(errors)
import timeit
def testdnf():
for ass in all_ass:
dnf_index(ass)
def testno():
for ass in all_ass:
no_index(ass)
print timeit.timeit(testdnf, number=10000)
print timeit.timeit(testno, number=10000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment