Skip to content

Instantly share code, notes, and snippets.

@inhzus inhzus/apriori.py
Created Apr 22, 2019

Embed
What would you like to do?
Frequent itemsets Algorithm: Apriori
# -*- coding: utf-8 -*-
# created by inhzus
import typing
from collections import defaultdict
from itertools import combinations
from numbers import Number
from pprint import pprint
from bitarray import bitarray
T = typing.TypeVar('T')
def _join(itemsets: typing.List[tuple]) -> typing.Iterable[tuple]:
"""Join k length itemsets into k + 1 length itemsets"""
i = 0
while i < len(itemsets):
step = 1
*heads, tail = itemsets[i]
extras = [tail]
for j in range(i + 1, len(itemsets)):
*inner_heads, inner_tail = itemsets[j]
if heads == inner_heads:
extras.append(inner_tail)
step += 1
else:
break
heads = tuple(heads)
for m, n in sorted(combinations(extras, 2)):
yield heads + (m, n,)
i += step
def _prune(
itemsets: typing.List[tuple],
possibilities: typing.Iterable[tuple]
) -> typing.Iterable[tuple]:
itemsets = set(itemsets)
for possible in possibilities:
# yield possible
for i in range(len(possible) - 2):
unchecked = possible[:i] + possible[i + 1:]
if unchecked not in itemsets:
break
else:
yield possible
def _generate(itemsets: typing.List[tuple]) -> typing.Iterable[tuple]:
    """Yield the (k + 1)-length candidates derived from k-length itemsets.

    Candidates are first produced by ``_join`` and then filtered by
    ``_prune`` against the same ``itemsets``.
    """
    joined = _join(itemsets)
    surviving = _prune(itemsets, joined)
    yield from surviving
def frequent_itemsets(
        transactions: typing.List[tuple],
        min_support: float
) -> typing.Tuple[typing.Dict[int, typing.Dict[tuple, int]], int]:
    """Find every itemset whose support is at least ``min_support``.

    Each item is mapped to a bitarray holding one bit per transaction; bit
    ``i`` is set when transaction ``i`` contains the item. The support count
    of a candidate itemset is the number of bits still set after AND-ing the
    bitarrays of all its items.

    :param transactions: list of transactions, each an iterable of hashable
        items. Items must be mutually orderable because itemsets are sorted.
    :param min_support: minimum fraction of transactions that must contain an
        itemset for it to be kept.
    :return: ``(itemsets, num_transactions)``. ``itemsets`` maps the itemset
        length ``k`` to ``{itemset: support count}``. Only non-empty levels
        are stored, so the mapping is empty when no single item is frequent.
    """
    num_transactions = len(transactions)
    empty_bits = '0' * num_transactions

    # item -> bitarray marking the transactions in which the item occurs
    counter: typing.Dict[typing.Hashable, bitarray] = defaultdict(
        lambda: bitarray(empty_bits))
    for idx, transaction in enumerate(transactions):
        for item in transaction:
            counter[item][idx] = 1

    # Level 1: count every item once and keep the frequent ones.
    singles: typing.Dict[tuple, int] = {}
    for item, item_bits in counter.items():
        occurrences = item_bits.count()
        if occurrences / num_transactions >= min_support:
            singles[(item,)] = occurrences
    if not singles:
        return dict(), num_transactions

    itemsets: typing.Dict[int, typing.Dict[tuple, int]] = {1: singles}
    # Template with every transaction selected; copied for each candidate.
    all_bits = bitarray('1' * num_transactions)
    k = 1
    while True:
        children = sorted(itemsets[k].keys())
        level = {}
        for candidate in _generate(children):
            bits = all_bits.copy()
            for item in candidate:
                bits &= counter[item]
            c = bits.count()
            if c / num_transactions >= min_support:
                level[candidate] = c
        # Stop as soon as a level has no frequent itemset (this also covers
        # the case where no candidate could be generated at all).
        if not level:
            break
        itemsets[k + 1] = level
        k += 1
    return itemsets, num_transactions
class Rule:
    """Association rule ``lhs => rhs`` together with the counts behind it.

    The metric properties (``confidence``, ``support``, ``lift``,
    ``conviction``) return ``None`` whenever the value is undefined, e.g.
    because a denominator is zero or a count attribute is missing.

    Two rules are equal when their left-hand sides hold the same items and
    their right-hand sides hold the same items, regardless of item order.
    """

    def __init__(self,
                 lhs: tuple,
                 rhs: tuple,
                 num_both: int = 0,
                 num_lhs: int = 0,
                 num_rhs: int = 0,
                 num_transactions: int = 0):
        """Store the rule sides and their occurrence counts.

        :param lhs: items of the antecedent (X).
        :param rhs: items of the consequent (Y).
        :param num_both: number of transactions containing both X and Y.
        :param num_lhs: number of transactions containing X.
        :param num_rhs: number of transactions containing Y.
        :param num_transactions: total number of transactions.
        """
        self.lhs = lhs
        self.rhs = rhs
        self.num_both = num_both
        self.num_lhs = num_lhs
        self.num_rhs = num_rhs
        self.num_transactions = num_transactions

    @property
    def confidence(self) -> typing.Optional[float]:
        """``num_both / num_lhs``, or ``None`` when undefined."""
        try:
            return self.num_both / self.num_lhs
        # A tuple is required here: ``except A or B`` only catches ``A``.
        except (ZeroDivisionError, AttributeError):
            return None

    @property
    def support(self) -> typing.Optional[float]:
        """``num_both / num_transactions``, or ``None`` when undefined."""
        try:
            return self.num_both / self.num_transactions
        except (ZeroDivisionError, AttributeError):
            return None

    @property
    def lift(self) -> typing.Optional[float]:
        """P(X and Y) / (P(X) * P(Y)), or ``None`` when undefined."""
        try:
            expected = (self.num_lhs * self.num_rhs) / self.num_transactions ** 2
            observed = self.num_both / self.num_transactions
            return observed / expected
        except (ZeroDivisionError, AttributeError):
            return None

    @property
    def conviction(self) -> typing.Optional[float]:
        """P(not Y) / P(not Y | X), or ``None`` when undefined.

        A small epsilon is added to the denominator so that a confidence of
        exactly 1 yields a large finite value instead of a division by zero.
        """
        try:
            eps = 10e-10
            not_rhs = 1 - self.num_rhs / self.num_transactions
            confidence = self.confidence
            # Without a defined confidence there is no P(not Y | X) either.
            if confidence is None:
                return None
            not_rhs_given_lhs = 1 - confidence
            return not_rhs / (not_rhs_given_lhs + eps)
        except (ZeroDivisionError, AttributeError):
            return None

    @staticmethod
    def _p(_s):
        """Format an iterable of items as ``{a, b, c}``."""
        return '{{{}}}'.format(', '.join(str(k) for k in _s))

    def __repr__(self):
        return '{} => {}'.format(self._p(self.lhs), self._p(self.rhs))

    def __eq__(self, other):
        # Let Python fall back to its default handling for foreign types
        # instead of failing on a missing ``lhs``/``rhs`` attribute.
        if not isinstance(other, Rule):
            return NotImplemented
        return set(self.lhs) == set(other.lhs) and set(self.rhs) == set(other.rhs)

    def __hash__(self):
        # Equal rules always share the same union of items, so hashing the
        # union is consistent with ``__eq__``.
        return hash(frozenset(self.lhs + self.rhs))

    def __len__(self):
        """Total number of items on both sides of the rule."""
        return len(self.lhs + self.rhs)
def _ap_gen_rules(
itemset: tuple,
met: typing.List[tuple],
itemsets: typing.Dict[int, typing.Dict[tuple, int]],
min_conf: float,
num_transactions: int
) -> Rule:
def count(_set):
return itemsets[len(_set)][_set]
if len(itemset) <= (len(met[0]) + 1):
return
met = list(_generate(met))
met_copy = met.copy()
for item in met:
lhs = tuple(sorted(list(set(itemset).difference(set(item)))))
if (count(itemset) / count(lhs)) >= min_conf:
yield Rule(
lhs, item, count(itemset),
count(lhs), count(item), num_transactions)
else:
met_copy.remove(item)
if met_copy:
yield from _ap_gen_rules(
itemset, met_copy, itemsets, min_conf, num_transactions
)
def rules(
        itemsets: typing.Dict[int, typing.Dict[tuple, int]],
        min_confidence: float,
        num_transactions: int
) -> typing.Iterable['Rule']:
    """Yield every association rule that reaches ``min_confidence``.

    For each itemset with two or more items, the rules with a single-item
    consequent are produced first, followed by the rules with longer
    consequents from ``_ap_gen_rules``.

    :param itemsets: ``{length: {itemset: support count}}`` mapping.
    :param min_confidence: minimum confidence, a number in ``[0, 1]``.
    :param num_transactions: total number of transactions, a number ``>= 0``.
    :raises ValueError: if ``min_confidence`` or ``num_transactions`` is not
        a number in the accepted range. Because this is a generator, the
        error is raised when iteration starts, not when ``rules`` is called.
    """
    # The type is checked before the range so that a non-numeric argument is
    # reported as ValueError rather than a TypeError from the comparison.
    if not (isinstance(min_confidence, Number) and 0 <= min_confidence <= 1):
        raise ValueError('min_confidence must be a number between 0 and 1')
    if not (isinstance(num_transactions, Number) and num_transactions >= 0):
        raise ValueError('num_transactions must be a non-negative number')

    def count(_set):
        return itemsets[len(_set)][_set]

    for size, counter in itemsets.items():
        # Single items cannot be split into two non-empty sides.
        if size < 2:
            continue
        for itemset in sorted(counter.keys()):
            items = set(itemset)
            num_both = count(itemset)
            single_consequents = list(combinations(itemset, 1))
            for removed in single_consequents:
                lhs = tuple(sorted(items.difference(removed)))
                num_lhs = count(lhs)
                if num_both / num_lhs >= min_confidence:
                    yield Rule(
                        lhs, removed, num_both,
                        num_lhs, count(removed), num_transactions
                    )
            yield from _ap_gen_rules(
                itemset, single_consequents,
                itemsets, min_confidence, num_transactions
            )
def apriori(
        transactions: typing.List[tuple],
        min_support: float = 0.5,
        min_confidence: float = 0.5
) -> typing.Tuple[typing.Dict[int, typing.Dict[tuple, int]],
                  typing.List['Rule']]:
    """Mine frequent itemsets and the association rules derived from them.

    :param transactions: list of transactions, each a tuple of items.
    :param min_support: minimum support passed to ``frequent_itemsets``.
    :param min_confidence: minimum confidence passed to ``rules``.
    :return: ``(itemsets, rules)`` where ``itemsets`` is the mapping returned
        by ``frequent_itemsets`` and ``rules`` is the fully evaluated list of
        rules generated from it.
    """
    itemsets, num_transactions = frequent_itemsets(transactions, min_support)
    found_rules = list(rules(itemsets, min_confidence, num_transactions))
    return itemsets, found_rules
if __name__ == '__main__':
    # Demo run: mine nine small transactions with a minimum support of 0.2
    # and a minimum confidence of 0.5, then pretty-print the result.
    demo_transactions = [
        (1, 2, 5), (2, 4), (2, 3), (1, 2, 4),
        (1, 3), (2, 3), (1, 3), (1, 2, 3, 5), (1, 2, 3),
    ]
    pprint(apriori(demo_transactions, 0.2, 0.5))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.