Created
December 17, 2019 22:50
-
-
Save andiac/319536a853103792f9e32f4f68285922 to your computer and use it in GitHub Desktop.
Aho-Corasick Automaton
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
class TrieNode:
    """One node of the trie that backs the Aho-Corasick automaton."""

    def __init__(self, endWord=False, parent=None, char=''):
        """Create a node.

        Args:
            endWord: Truthy when a concept ends at this node; converted with
                ``int()`` to seed the occurrence counter.
            parent: The node one step closer to the root, or ``None``.
            char: The character this node stands for.
        """
        # Kept so that a node can be printed as the character it represents.
        self.char = char
        # Link back up the trie; used to spell out a matched concept.
        self.parent = parent
        # Fail pointer of the Aho-Corasick automaton; unset until it is built.
        self.fail = None
        # Outgoing edges, keyed by character.
        self.children = {}
        # How many concepts end here (more than one when duplicates are kept).
        self.count = int(endWord)
class AhoCorasickAutomaton:
    """Multi-pattern matcher: finds every stored concept inside a sentence.

    Concepts are stored in a trie rooted at ``self.trieRoot``. Fail pointers
    are (re)built lazily, so ``query`` is safe to call right after
    construction or after further ``addConcept`` calls.
    """

    def __init__(self, concepts=(), buildFailPointers=False, allowDuplicateConcept=True):
        """Build the trie from ``concepts``.

        Args:
            concepts: Iterable of strings to store (default: none).
            buildFailPointers: When true, build the fail pointers immediately
                instead of waiting for the first ``query``.
            allowDuplicateConcept: When true, a concept added k times is
                reported k times per match; otherwise it is reported once.
        """
        self.trieRoot = TrieNode()
        self.allowDuplicateConcept = allowDuplicateConcept
        # True whenever the trie holds nodes whose fail pointer is not set yet.
        self._failPointersStale = True
        for concept in concepts:
            self.addConcept(concept)
        if buildFailPointers:
            self.buildFailPointers()

    def addConcept(self, concept):
        """Insert ``concept`` into the trie.

        An empty concept leaves the trie unchanged, because there is no
        character to walk.
        """
        node = self.trieRoot
        lastIndex = len(concept) - 1
        for i, char in enumerate(concept):
            child = node.children.get(char)
            if child is None:
                child = TrieNode(i == lastIndex, node, char)
                node.children[char] = child
                # A fresh node has no fail pointer; force a rebuild before
                # the next query.
                self._failPointersStale = True
            elif i == lastIndex:
                if self.allowDuplicateConcept:
                    child.count += 1
                else:
                    child.count = 1
            node = child

    def buildFailPointers(self):
        """Set the fail pointer of every non-root node with a BFS over the trie.

        The root keeps ``fail = None``; its direct children fail to the root.
        """
        root = self.trieRoot
        queue = deque([root])
        while queue:
            head = queue.popleft()
            for char, child in head.children.items():
                if head is root:
                    child.fail = root
                else:
                    # Follow the parent's fail chain until some node has an
                    # edge labelled ``char``; fall back to the root if none.
                    candidate = head.fail
                    while candidate is not None and char not in candidate.children:
                        candidate = candidate.fail
                    child.fail = root if candidate is None else candidate.children[char]
                queue.append(child)
        self._failPointersStale = False

    def query(self, sentence):
        """Return every stored concept occurring in ``sentence``.

        Matches are listed in the order of the position where they end; for
        one end position the fail chain is followed from the current node, so
        longer matches come first. A concept stored with count k is appended
        k times per occurrence.
        """
        # Without this, nodes with ``fail = None`` would crash the walk below.
        if getattr(self, '_failPointersStale', True):
            self.buildFailPointers()

        root = self.trieRoot
        result = []
        node = root
        for char in sentence:
            # Lost: back off along fail pointers until the char can be consumed.
            while node is not root and char not in node.children:
                node = node.fail
            # Either step forward, or stay at the root when nothing matches.
            node = node.children.get(char, root)

            # Every node on the fail chain is a suffix of the text read so far.
            suffix = node
            while suffix is not root:
                if suffix.count > 0:
                    # Spell the concept by climbing to the root, then reverse.
                    chars = []
                    cursor = suffix
                    while cursor is not root:
                        chars.append(cursor.char)
                        cursor = cursor.parent
                    concept = ''.join(reversed(chars))
                    result.extend([concept] * suffix.count)
                suffix = suffix.fail
        return result
# Demo: run the same sentences through a de-duplicating and a counting automaton.
if __name__ == '__main__':
    demoConcepts = ['she', 'he', 'he', 'he', 'say', 'shr', 'her']
    # Reports each concept once per match, however often it was added.
    uniqueMatcher = AhoCorasickAutomaton(demoConcepts, buildFailPointers=True, allowDuplicateConcept=False)
    # Reports a concept once for every time it was added.
    countingMatcher = AhoCorasickAutomaton(demoConcepts, buildFailPointers=True)
    for sentence in ('yasherhs', 'yasherhs yasherhs'):
        for matcher in (uniqueMatcher, countingMatcher):
            print(matcher.query(sentence))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment