Skip to content

Instantly share code, notes, and snippets.

@escuccim
Last active February 8, 2019 10:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save escuccim/546a456f2b1240658e07439eddefd033 to your computer and use it in GitHub Desktop.
Save escuccim/546a456f2b1240658e07439eddefd033 to your computer and use it in GitHub Desktop.
PatternFusion
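# Depends on the FPTree class from https://gist.github.com/escuccim/dc9aa979d54af69234f508538af7ee1a
# FPTree is not imported here: it must be defined or imported before calling
# PatternFusion.mine(), which uses it to mine the initial set of candidates for the pool.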
import numpy as np
import pandas as pd
import random
from itertools import combinations
class PatternFusion():
    """Approximate miner for long frequent patterns using a pattern-fusion loop.

    Short frequent patterns are mined first (via an external ``FPTree``),
    then randomly chosen seed patterns are repeatedly merged ("fused") with
    the patterns that lie close to them in terms of pattern distance.

    Attributes:
        min_support: a pattern counts as frequent when its support is
            strictly greater than this value (see ``check_pattern``).
        f_patterns: support cache laid out as
            ``{pattern_length: {sorted_item_tuple: support}}``.
    """

    def __init__(self, min_support=10):
        """Store the support threshold and start with an empty support cache."""
        self.min_support = min_support
        # Created here so check_pattern / pattern_distance also work on an
        # instance on which mine() has not been called yet.
        self.f_patterns = {}

    @staticmethod
    def _pattern_key(pattern):
        """Return the canonical cache key of a pattern: its sorted, unique items."""
        return tuple(sorted(set(pattern)))

    @staticmethod
    def _in_pool(pattern, pattern_pool):
        """Return True if the pool already holds a pattern with the same items.

        The comparison ignores item order and container type, because the pool
        can mix tuples and lists.
        """
        wanted = set(pattern)
        return any(set(existing) == wanted for existing in pattern_pool)

    def _support(self, pattern, data):
        """Return the number of transactions in ``data`` containing ``pattern``.

        The result is memoised in ``self.f_patterns`` so each distinct pattern
        triggers at most one scan of ``data``.
        """
        key = self._pattern_key(pattern)
        by_length = self.f_patterns.setdefault(len(key), {})
        if key not in by_length:
            items = set(key)
            by_length[key] = sum(
                1 for transaction in data if items.issubset(transaction)
            )
        return by_length[key]

    def check_pattern(self, pattern, data):
        """Return True if ``pattern`` is frequent in ``data``.

        Args:
            pattern: iterable of items; order and duplicates are ignored.
            data: re-iterable collection of transactions (iterables of items).

        Returns:
            ``True`` when the support is strictly greater than
            ``self.min_support``.
        """
        # NOTE(review): the comparison is strict (">"), so a pattern whose
        # support equals min_support is rejected - confirm that this matches
        # the threshold convention of the FPTree implementation.
        return self._support(pattern, data) > self.min_support

    def pattern_distance(self, P1, P2, db):
        """Return the pattern distance between ``P1`` and ``P2`` over ``db``.

        The distance is ``1 - |T(P1) & T(P2)| / |T(P1) | T(P2)|`` where
        ``T(P)`` is the set of transactions of ``db`` that contain ``P``.

        Args:
            P1, P2: iterables of items.
            db: re-iterable collection of transactions.

        Returns:
            A float in ``[0, 1]``. When neither pattern occurs in ``db`` the
            ratio is undefined and the maximal distance ``1.0`` is returned.
        """
        items_1, items_2 = set(P1), set(P2)

        # Shortcut: when one pattern contains the other, every transaction
        # that supports the larger pattern also supports the smaller one, so
        # the intersection is the larger pattern's support and the union is
        # the smaller pattern's support. Both come from the cache.
        if items_1 <= items_2 or items_2 <= items_1:
            support_1 = self._support(P1, db)
            support_2 = self._support(P2, db)
            if items_1 <= items_2:
                both, either = support_2, support_1
            else:
                both, either = support_1, support_2
            if either == 0:
                return 1.0
            return 1 - (both / either)

        # General case: one pass over the database counting the transactions
        # containing both patterns (intersection) and at least one (union).
        both = 0
        either = 0
        for transaction in db:
            trans_set = set(transaction)
            has_1 = items_1 <= trans_set
            has_2 = items_2 <= trans_set
            if has_1 and has_2:
                both += 1
            if has_1 or has_2:
                either += 1
        if either == 0:
            return 1.0
        return 1 - (both / either)

    def fusion(self, seed, new_pattern, data, pattern_pool, verbose=False):
        """Fuse ``seed`` with the items of ``new_pattern`` and update the pool.

        The full super-pattern ``seed + new_pattern`` is tried first. If it is
        not frequent, frequent sub-patterns made of the seed plus a random
        combination of the new items are searched for instead.

        Args:
            seed: the seed pattern (iterable of items).
            new_pattern: list of extra items gathered from nearby patterns;
                items already in the seed and duplicates are ignored. The
                list is not modified.
            data: re-iterable collection of transactions.
            pattern_pool: list of patterns built so far. It may be modified in
                place; callers must use the returned list.
            verbose: print progress information.

        Returns:
            The updated pattern pool. Patterns added by this method are
            sorted lists of items.
        """
        if verbose:
            print("Fusion:", seed, new_pattern)
            print("Pattern pool:", len(pattern_pool))

        seed_set = set(seed)
        seed_items = sorted(seed_set)
        # Keep first-seen order but drop duplicates and items of the seed, so
        # no item is counted twice in a pattern's length.
        extra_items = [
            item for item in dict.fromkeys(new_pattern) if item not in seed_set
        ]

        # Nothing to fuse: the seed itself survives into the pool.
        if not extra_items:
            if not self._in_pool(seed_items, pattern_pool):
                return pattern_pool + [seed_items]
            return pattern_pool

        complete_pattern = sorted(seed_set.union(extra_items))

        if self.check_pattern(complete_pattern, data):
            # The super-pattern is frequent: keep it (once). Its sub-patterns
            # would be redundant, so no sub-pattern search is done.
            if not self._in_pool(complete_pattern, pattern_pool):
                if verbose:
                    print("Superpattern is frequent!")
                pattern_pool.append(complete_pattern)
        elif len(extra_items) >= 2:
            # Otherwise try to make some smaller frequent patterns, starting
            # with the largest combination sizes.
            original_pool_size = len(pattern_pool)
            if verbose:
                print("Superpattern is not frequent, trying subpatterns")

            # Only a coarse schedule of combination sizes is tried: from a
            # third of the new items downwards in steps of 4 * len(seed),
            # stopping above 2 * len(seed). With few new items the schedule is
            # empty and no sub-pattern is searched.
            start_size = len(extra_items) // 3
            step = max(1, 4 * len(seed_items))  # a zero step would raise
            shuffled = list(extra_items)  # never shuffle the caller's list
            for size in range(start_size, 2 * len(seed_items), -step):
                if verbose:
                    print("Combinations size:", size)
                # Shuffle so different calls explore different combinations.
                random.shuffle(shuffled)
                for combo in combinations(shuffled, size):
                    # The seed is always part of the candidate.
                    candidate = sorted(seed_set.union(combo))
                    if (self.check_pattern(candidate, data)
                            and not self._in_pool(candidate, pattern_pool)):
                        if verbose:
                            print("Candidate OK:", candidate)
                        pattern_pool.append(candidate)
                    # A handful of sub-patterns is enough to keep the pool
                    # from shrinking; stop once three have been added.
                    if len(pattern_pool) > original_pool_size + 2:
                        break
                if len(pattern_pool) > original_pool_size + 2:
                    if verbose:
                        print("Sufficient patterns generated, breaking")
                    break

        # Never hand back an empty pool: fall back to the seed itself.
        if len(pattern_pool) == 0:
            pattern_pool = [seed_items]
        if verbose:
            print("Size of pattern pool:", len(pattern_pool))
        return pattern_pool

    def mine(self, data, min_length=6, k=20, tau=0.1, max_iterations=50, verbose=False):
        """Run the pattern-fusion loop on ``data`` and return long patterns.

        Args:
            data: re-iterable collection of transactions.
            min_length: minimum number of items a returned pattern must have.
            k: number of seed patterns drawn per iteration; iteration stops as
                soon as the pool holds fewer than ``k`` patterns.
            tau: patterns whose distance to a seed is below this value are
                fused with that seed.
            max_iterations: upper bound on the number of fusion iterations, so
                the loop terminates even if the pool never shrinks below ``k``.
            verbose: print progress information.

        Returns:
            List of distinct patterns from the final pool that contain at
            least ``min_length`` items.

        Raises:
            NameError: if no ``FPTree`` class is available in this module.
        """
        # Initialise by mining all frequent patterns up to length 3.
        # FPTree is not defined in this file; it must be provided separately.
        fp_tree = FPTree(min_support=self.min_support)
        fp_tree.create_tree(data)
        self.f_patterns = fp_tree.mine(max_length=3)
        if verbose:
            print("Tree created")

        # Build the initial pool, skipping length 1 patterns: they probably
        # won't be too useful for merging. The list is materialised now because
        # the support cache grows while the loop below runs.
        full_pattern_pool = [
            pattern
            for patt_len, patterns in self.f_patterns.items()
            if patt_len > 1
            for pattern in patterns
        ]
        pattern_pool = full_pattern_pool
        if verbose:
            print("Pattern pool created", len(pattern_pool))

        # Iterate the fusion step until the pool is smaller than k or the
        # iteration budget is used up.
        iteration = 0
        while len(pattern_pool) >= k and iteration < max_iterations:
            iteration += 1
            # Pick k seed patterns from the pool at random.
            seeds = random.sample(pattern_pool, k)
            pattern_pool = []
            for seed in seeds:
                new_pattern = []
                seen = set(seed)  # seed items must not be fused onto the seed
                # Collect the items of every pattern close to the seed, both
                # among the initial patterns and those built this iteration.
                for candidates in (full_pattern_pool, pattern_pool):
                    for other_pattern in candidates:
                        if self.pattern_distance(seed, other_pattern, data) < tau:
                            for item in other_pattern:
                                if item not in seen:
                                    seen.add(item)
                                    new_pattern.append(item)
                pattern_pool = self.fusion(seed, new_pattern, data, pattern_pool, verbose)
            if verbose:
                print("New pool:", pattern_pool, "size:", len(pattern_pool))
                print()

        # Keep only the distinct patterns that reach the requested length.
        frequent_patterns = []
        for pattern in pattern_pool:
            if len(pattern) >= min_length and pattern not in frequent_patterns:
                frequent_patterns.append(pattern)
        return frequent_patterns
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment