# Gist by @nishnik, created October 28, 2016.
# Apriori frequent-item-set mining and association-rule generation; the
# highest-confidence rules then seed a simple genetic algorithm that searches
# for rules whose lift is close to a target value.

def load_dataset():
    "Load the sample dataset; items must be numbered from 1."
    return [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]

def createC1(dataset):
    "Create a list of candidate item sets of size one."
    c1 = []
    for transaction in dataset:
        for item in transaction:
            if not [item] in c1:
                c1.append([item])
    c1.sort()
    # frozenset because each item set will be a key of a dictionary.
    return list(map(frozenset, c1))
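
# Illustrative check (the _demo_ name is local to this example): for the sample
# transactions the candidates of size one are the five singleton frozensets
# {1} .. {5}, in sorted order.
_demo_c1 = createC1([[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]])
print(_demo_c1)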

def scanD(dataset, candidates, min_support):
    "Return all candidates that meet a minimum support level."
    sscnt = {}
    for tid in dataset:
        for can in candidates:
            if can.issubset(tid):
                sscnt.setdefault(can, 0)
                sscnt[can] += 1
    num_items = float(len(dataset))
    retlist = []
    support_data = {}
    for key in sscnt:
        support = sscnt[key] / num_items
        if support >= min_support:
            retlist.insert(0, key)
        support_data[key] = support
    return retlist, support_data
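
# Illustrative check: with min_support = 0.5, item 4 (present in only 1 of the
# 4 sample baskets, support 0.25) is filtered out, while items 1, 2, 3 and 5
# survive. The _demo_ names are local to this example.
_demo_data = [[1, 3, 4], [2, 3, 5], [1, 2, 3, 5], [2, 5]]
_demo_L1, _demo_support = scanD(list(map(set, _demo_data)), createC1(_demo_data), 0.5)
print(_demo_L1)        # the four surviving singleton item sets
print(_demo_support)   # support of every counted candidate, e.g. frozenset({4}): 0.25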

def aprioriGen(freq_sets, k):
    "Generate candidate item sets of size k by joining frequent (k-1)-item sets."
    retList = []
    lenLk = len(freq_sets)
    for i in range(lenLk):
        for j in range(i + 1, lenLk):
            # Join two sets only when their first k-2 items match, so each
            # k-item candidate is produced exactly once.
            L1 = list(freq_sets[i])[:k - 2]
            L2 = list(freq_sets[j])[:k - 2]
            L1.sort()
            L2.sort()
            if L1 == L2:
                retList.append(freq_sets[i] | freq_sets[j])
    return retList
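
# Illustrative check: joining the 1-item sets {1}, {2}, {3} with k = 2 yields
# every pair, because for k = 2 the first k - 2 = 0 elements always match.
print(aprioriGen([frozenset([1]), frozenset([2]), frozenset([3])], 2))
# -> the three pairs {1, 2}, {1, 3}, {2, 3}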

def apriori(dataset, C1, minsupport=0):
    "Generate the lists of frequent item sets of every size."
    D = list(map(set, dataset))
    L1, support_data = scanD(D, C1, minsupport)
    L = [L1]
    k = 2
    while len(L[k - 2]) > 0:
        Ck = aprioriGen(L[k - 2], k)
        Lk, supK = scanD(D, Ck, minsupport)
        support_data.update(supK)
        L.append(Lk)
        k += 1
    return L, support_data
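
# Illustrative run: mining the sample data with a 0.5 support threshold keeps
# frequent item sets up to size three; {2, 3, 5} is the largest, appearing in
# 2 of the 4 baskets. The _demo_ names are local to this example.
_demo_data = load_dataset()
_demo_L, _demo_sd = apriori(_demo_data, createC1(_demo_data), minsupport=0.5)
print(_demo_L)                           # one list per item-set size; the last list is empty
print(_demo_sd[frozenset([2, 3, 5])])    # 0.5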

def generateRules(L, support_data, min_confidence=0.3):
    """Create the association rules.

    L: list of frequent item sets
    support_data: support data for those item sets
    min_confidence: minimum confidence threshold
    """
    rules = []
    for i in range(1, len(L)):  # only item sets with two or more items can form rules
        for freqSet in L[i]:
            H1 = [frozenset([item]) for item in freqSet]
            # print("freqSet", freqSet, "H1", H1)
            if i > 1:
                rules_from_conseq(freqSet, H1, support_data, rules, min_confidence)
            else:
                calc_confidence(freqSet, H1, support_data, rules, min_confidence)
    return rules

def calc_confidence(freqSet, H, support_data, rules, min_confidence=0.3):
    "Evaluate candidate rules, keeping those that meet the confidence threshold."
    pruned_H = []
    for conseq in H:
        conf = support_data[freqSet] / support_data[freqSet - conseq]
        lift = support_data[freqSet] / (support_data[freqSet - conseq] * support_data[conseq])
        if conf >= min_confidence:
            # print(freqSet - conseq, '--->', conseq, 'conf:', conf)
            rules.append((freqSet - conseq, conseq, conf, lift))
            pruned_H.append(conseq)
    return pruned_H
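
# Worked example of the two metrics above, using supports from the sample data
# (support({2, 5}) = support({2}) = support({5}) = 0.75):
#   confidence({2} -> {5}) = 0.75 / 0.75          = 1.0
#   lift({2} -> {5})       = 0.75 / (0.75 * 0.75) ~= 1.33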

def rules_from_conseq(freqSet, H, support_data, rules, min_confidence=0.3):
    "Generate a set of candidate rules"
    m = len(H[0])
    if len(freqSet) > (m + 1):
        Hmp1 = aprioriGen(H, m + 1)
        Hmp1 = calc_confidence(freqSet, Hmp1, support_data, rules, min_confidence)
        if len(Hmp1) > 1:
            rules_from_conseq(freqSet, Hmp1, support_data, rules, min_confidence)
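
# Illustrative run (the _demo_ names are local to this example): with a 0.7
# confidence threshold, chosen here so that only perfectly confident rules
# survive, the sample data yields {1} -> {3}, {2} -> {5} and {5} -> {2},
# each with confidence 1.0 and lift ~1.33.
_demo_data = load_dataset()
_demo_L, _demo_sd = apriori(_demo_data, createC1(_demo_data), minsupport=0.5)
for _demo_r in generateRules(_demo_L, _demo_sd, min_confidence=0.7):
    print(_demo_r[0], '->', _demo_r[1], 'conf:', _demo_r[2], 'lift:', _demo_r[3])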

def from_apriori_rule_to_genetic_data(rule):
    "Encode a rule (antecedent, consequent, ...) as a binary chromosome of length 2 * len_data."
    to_ret = [0] * (2 * len_data)
    for a in rule[0]:
        to_ret[a - 1] = 1              # antecedent items fill the first len_data positions
    for a in rule[1]:
        to_ret[len_data + a - 1] = 1   # consequent items fill the last len_data positions
    return to_ret

def from_genetic_data_to_frozen(gen_d):
    "Decode a binary chromosome back into (antecedent, consequent) frozensets."
    first = []
    second = []
    for i in range(2 * len_data):
        if gen_d[i] == 1:
            if i < len_data:
                first.append(i + 1)
            else:
                second.append(i - len_data + 1)
    return (frozenset(sorted(first)), frozenset(sorted(second)))

dataset = load_dataset()
C1 = createC1(dataset)
len_data = len(C1)  # number of distinct items; must be global for the encoders above
L, support_data = apriori(dataset, C1)
rules = generateRules(L, support_data)
# Seed the genetic algorithm with the five rules of highest confidence.
initial_population_rules = sorted(rules, key=lambda x: x[2], reverse=True)[0:5]
initial_population = []
for a in initial_population_rules:
    tmp = from_apriori_rule_to_genetic_data(a)
    initial_population.append(tmp)
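
# Illustrative round trip of the two encoders above: with len_data = 5, the
# rule ({2}, {5}) becomes the 10-bit chromosome [0, 1, 0, 0, 0, 0, 0, 0, 0, 1]
# and decodes back to the same pair of frozensets.
print(from_apriori_rule_to_genetic_data((frozenset([2]), frozenset([5]))))
print(from_genetic_data_to_frozen([0, 1, 0, 0, 0, 0, 0, 0, 0, 1]))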

def fitness(gen_d, target):
    "Distance of a chromosome's lift from the target lift (lower is better)."
    tmp = from_genetic_data_to_frozen(gen_d)
    try:
        lift = support_data[tmp[0].union(tmp[1])] / (support_data[tmp[0]] * support_data[tmp[1]])
    except KeyError:
        # The decoded item sets were never counted (e.g. a mutated chromosome),
        # so penalise the individual heavily.
        lift = -100
    return target - lift

def add(x, y):
    return x + y

from functools import reduce

def grade(pop, target):
    'Find average fitness for a population.'
    summed = reduce(add, (fitness(x, target) for x in pop), 0)
    return summed / (len(pop) * 1.0)
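
# Illustrative check: the encoded rule {2} -> {5} has lift
# 0.75 / (0.75 * 0.75) ~= 1.33, so with a target of 1.5 its fitness is ~0.17,
# while a chromosome whose item sets were never counted is penalised with
# fitness target + 100.
print(fitness(from_apriori_rule_to_genetic_data((frozenset([2]), frozenset([5]))), 1.5))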

from random import random, randint

def evolve(pop, target, retain=0.5, random_select=0.05, mutate=0.01):
    # rank the population by fitness (best, i.e. lowest, first)
    graded = [(fitness(x, target), x) for x in pop]
    graded = [x[1] for x in sorted(graded)]
    retain_length = int(len(graded) * retain)
    parents = graded[:retain_length]
    # randomly add other individuals to promote genetic diversity
    for individual in graded[retain_length:]:
        if random_select > random():
            parents.append(individual)
    # mutate some individuals
    for individual in parents:
        if mutate > random():
            pos_to_mutate = randint(0, len(individual) - 1)
            # this mutation is not ideal, because it
            # restricts the range of possible values,
            # but the function is unaware of the min/max
            # values used to create the individuals
            individual[pos_to_mutate] = randint(
                min(individual), max(individual))
    # crossover parents to create children
    parents_length = len(parents)
    desired_length = len(pop) - parents_length
    children = []
    while len(children) < desired_length:
        male = randint(0, parents_length - 1)
        female = randint(0, parents_length - 1)
        if male != female:
            male = parents[male]
            female = parents[female]
            half = len(male) // 2
            child = male[:half] + female[half:]
            children.append(child)
    parents.extend(children)
    return parents
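
# Illustrative single step (on a deep copy, so the real run below starts from
# the unmodified seeds): one call to evolve keeps the better half of the
# population, occasionally mutates a bit, and refills to the original size of
# five via crossover.
_demo_next = evolve([list(x) for x in initial_population], 1.5)
print(len(_demo_next))   # 5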

target = 1.5
p = initial_population
fitness_history = [grade(p, target)]
for i in range(100):
    p = evolve(p, target)
    fitness_history.append(grade(p, target))
# fitness_history should decrease as the population's lift approaches the target.
# One shortcoming: the population can become redundant (filled with near-identical chromosomes).
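
# Illustrative follow-up: decode the fittest chromosome of the final population
# to see which rule the search converged on.
_demo_best = sorted(p, key=lambda x: fitness(x, target))[0]
print(from_genetic_data_to_frozen(_demo_best), 'fitness:', fitness(_demo_best, target))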