@treystout
Created January 23, 2014 22:08
#!/usr/bin/env python
from collections import deque, defaultdict

def findTerm( search, text ):
    # Unique set of search terms (lowercase)
    terms = set( search.lower().split( " " ) )
    # List of all the words (lowercase)
    words = text.lower().split( " " )
    # A dictionary of term -> positions
    tPositions = defaultdict( deque )
    # The last term seen in each iteration
    lastTerm = None
    # The best slice we've found so far
    bestSlice = []
    for i, w in enumerate( words ):
        # Is this one of the search terms?
        if w in terms:
            # Add its position to the hash
            tPositions[ w ].append( i )
            # Have we seen this term more than once?
            if len( tPositions[ w ] ) > 1:
                # If this is the only term we've seen so far, drop the old position and use the latest
                if len( tPositions ) == 1:
                    tPositions[ w ].popleft()
                elif lastTerm and lastTerm != w:
                    newDistance = i - tPositions[ lastTerm ][ -1 ]
                    oldDistance = tPositions[ lastTerm ][ -1 ] - tPositions[ w ][ -2 ]
                    # Keep whichever occurrence sits closer to the last term seen
                    if newDistance < oldDistance:
                        tPositions[ w ].popleft()
                    elif len( tPositions ) > 2:
                        tPositions[ w ].pop()
            # Once every term has at least one position, we have a candidate slice
            if len( tPositions ) == len( terms ):
                newSlice = sorted( p[ -1 ] for p in tPositions.values() )
                newSliceSize = newSlice[ -1 ] - newSlice[ 0 ]
                if not bestSlice:
                    bestSlice = newSlice
                elif newSliceSize < bestSlice[ -1 ] - bestSlice[ 0 ]:
                    bestSlice = newSlice
            # Update the lastTerm
            lastTerm = w
    # If some term never appeared, there is no matching slice
    if not bestSlice:
        return []
    return words[ bestSlice[ 0 ] : bestSlice[ -1 ] + 1 ]
text = "The George Washington Bridge in New York City is one of the oldest bridges ever constructed " \
"It is now being remodeled because the bridge is a landmark City officials say that the landmark " \
"bridge effort will create a lot of new jobs in the city " * 10000
search = "Landmark City Bridge"
result = findTerm( search, text )
print result
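
# The function above is a greedy heuristic and is not guaranteed to find the
# shortest slice. For comparison, here is a minimal sketch of the classic
# two-pointer sliding-window algorithm for the same problem; the name
# min_window and its structure are mine, not part of the original gist.

def min_window( search, text ):
    terms = set( search.lower().split() )
    words = text.lower().split()
    counts = defaultdict( int )  # occurrences of each term inside the window
    have, lo, best = 0, 0, None
    for hi, w in enumerate( words ):
        if w not in terms:
            continue
        counts[ w ] += 1
        if counts[ w ] == 1:
            have += 1
        # Shrink from the left for as long as the window still covers every term
        while have == len( terms ):
            if best is None or hi - lo < best[ 1 ] - best[ 0 ]:
                best = ( lo, hi )
            if words[ lo ] in terms:
                counts[ words[ lo ] ] -= 1
                if counts[ words[ lo ] ] == 0:
                    have -= 1
            lo += 1
    return words[ best[ 0 ] : best[ 1 ] + 1 ] if best else []
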
#!/usr/bin/env python
import re
import sys
import heapq
from collections import defaultdict
TEXT = """
The George Washington Bridge in New York City is one of the oldest bridges ever constructed. It is
now being remodeled because the bridge is a landmark. City officials say that the landmark bridge
effort will create a lot of new jobs in the city.
""".strip()
SEARCH_TERMS = ["Landmark", "City", "Bridge"]

class Solution( object ):
    NUM_INSTANCES = 0

    def __init__( self, sortedChoices, remainingClusters ):
        self.sortedChoices = sortedChoices
        self.remainingClusters = remainingClusters
        Solution.NUM_INSTANCES += 1

    # Order solutions by their optimistic lower-bound cost
    def __cmp__( self, other ):
        return cmp( self.minCost(), other.minCost() )

    # True if this solution is complete
    def isComplete( self ):
        return not self.remainingClusters

    # The distance between two scalar ranges (0 if they overlap)
    def rangeDist( self, lo1, hi1, lo2, hi2 ):
        if hi1 > lo2 and lo1 < hi2:
            return 0
        elif hi2 > lo1 and lo2 < hi1:
            return 0
        elif hi1 < lo2:
            return lo2 - hi1
        else:
            return lo1 - hi2
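
    # For example, rangeDist( 2, 5, 9, 12 ) == 4 (the gap between the ranges),
    # while rangeDist( 2, 5, 4, 9 ) == 0 because the ranges overlap.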

    # The minimum possible cost for this solution: the span of the choices made
    # so far, plus an optimistic estimate for the clusters still to choose from
    def minCost( self ):
        if not hasattr( self, "_minCost" ):
            realCost = self.sortedChoices[ -1 ] - self.sortedChoices[ 0 ]
            minRemainingCost = min(
                self.rangeDist( self.sortedChoices[ 0 ], self.sortedChoices[ -1 ], c[ 0 ], c[ -1 ] )
                for c in self.remainingClusters ) if self.remainingClusters else 0
            self._minCost = realCost + minRemainingCost
        return self._minCost
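
# minCost() never over-estimates, which is what lets the heap pop an optimal
# solution first (the same admissibility argument as in A* search). For
# example, with sortedChoices [10, 12] (realCost 2) and one remaining cluster
# [20, 30], rangeDist() gives 20 - 12 = 8, so minCost() == 10 -- and the best
# completion, picking 20, really does cost 20 - 10 = 10.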

def main():
    # Split up the input text
    words = TEXT.split()
    # A pattern to strip punctuation from words
    patt = re.compile( r"\W+" )
    # Map each cleaned-up word to the indices it appears at
    wordsByPos = defaultdict( list )
    for i, word in enumerate( words ):
        wordsByPos[ patt.sub( "", word.lower() ) ].append( i )
    # For each search term, make a list of the indices it appears at
    termsByPos = dict( (term, wordsByPos.get( term.lower(), [] )) for term in SEARCH_TERMS )
    # Each dict value is a list of positions; a solution picks one position from
    # each list. Sort the smallest lists first so the search branches on the
    # most constrained terms first.
    clusters = sorted( filter( bool, termsByPos.values() ), key = lambda c: len( c ) )
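    # With the sample TEXT this might look like (illustrative indices only):
    #   clusters = [[21, 30], [7, 24, 44], [3, 19, 28]]
    # so the search branches over the rarest term's two positions first.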
    optimalSolution = None
    heap = []
    iterations = 0
    # Seed the heap with one partial solution per choice in the first cluster
    for i in clusters[ 0 ]:
        heapq.heappush( heap, Solution( [ i ], clusters[ 1: ] ) )
    # Keep expanding the cheapest partial solution until one is complete
    while True:
        s = heapq.heappop( heap )
        # If it's complete, we're done!
        if s.isComplete():
            optimalSolution = s
            break
        # Otherwise make children of this solution and put them in the queue
        else:
            for i in s.remainingClusters[ 0 ]:
                childSolution = Solution( sorted( s.sortedChoices + [ i ] ), s.remainingClusters[ 1: ] )
                heapq.heappush( heap, childSolution )
        iterations += 1
    print words[ optimalSolution.sortedChoices[ 0 ] : optimalSolution.sortedChoices[ -1 ] + 1 ]
    print "Took %d total iterations, %d partial solutions" % (iterations, Solution.NUM_INSTANCES)
    return 0
if __name__ == "__main__":
# import hotshot
# import hotshot.stats
# prof = hotshot.Profile( "snippet.prof" )
# prof.start()
sys.exit( main() )
# prof.stop()
# prof.close()

from re import compile
import itertools

# Query parsing and index parsing must split words the same way
DELIMITERS = r'\s+|\.+|,+'

class Term:
    """
    Term represents a word in the index, its character offset from the
    beginning of the original document and its relative word offset.
    """
    def __init__(self, term, char_offset, word_offset):
        self.term = term.lower()
        self.char_offset = char_offset
        self.word_offset = word_offset

    def __str__(self):
        return self.term + "[" + str(self.word_offset) + "]"

class TermIndex:
    """
    TermIndex keeps track of all index terms, their character offsets
    (ascending) and their word offsets in the document.
    """
    def __init__(self, doc):
        self.document = doc
        self._index_terms = {}
        self._parse_document()

    def term_occurrences(self, term):
        """Return the occurrences of an indexed term, if found"""
        occurrences = None
        if term in self._index_terms:
            occurrences = self._index_terms[term]
        return occurrences

    # PRIVATE

    def _parse_document(self):
        """Parses the document to be indexed, identifying its Terms"""
        # Parse doc and build term index
        ignore = compile(DELIMITERS)
        index_term = None
        char_offset = 0
        word_offset = 0
        # Add a trailing space to delimit the document and simplify parsing
        doc_to_parse = self.document + ' '
        for c in doc_to_parse:
            if ignore.match(c) is None:
                if index_term is None:
                    # Start building our term
                    index_term = Term(c, char_offset, word_offset)
                    word_offset += 1
                else:
                    # Add to our term
                    index_term.term += c
            elif index_term:
                if index_term.term in self._index_terms:
                    # Subsequent Term appearances in the document
                    self._index_terms[index_term.term].append(index_term)
                else:
                    # Term appears for the first time
                    self._index_terms[index_term.term] = [index_term]
                index_term = None
            char_offset += 1
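
    # For example, TermIndex("a b a")._index_terms holds one Term per
    # occurrence: 'a' -> (char 0, word 0) and (char 4, word 2),
    # 'b' -> (char 2, word 1).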

class Query:
    """
    Query represents a query against an index. In effect, this class
    implements a poor man's proximity-search algorithm.
    """
    def __init__(self, query):
        self.original_query = query
        delimiters = compile(DELIMITERS)
        # Create a list of usable terms from the original query
        self.query_terms = filter(lambda x: len(x) > 0, delimiters.split(query.lower()))
        self.query_results = []

    def perform(self, term_index):
        """Performs the query on the provided term_index"""
        # No terms? No results
        if len(self.original_query) == 0:
            print "Nothing to search for."
            return
        # Try to find all terms in the index
        found_terms = []
        missing_term = False
        for query_term in self.query_terms:
            term_occurrences = term_index.term_occurrences(query_term)
            if term_occurrences:
                found_terms.extend(term_occurrences)
            else:
                missing_term = True
                break
        # All terms must be found
        if missing_term:
            print "Not all terms were found."
            return
        best_match = self._best_match(found_terms)
        self._highlight(best_match, term_index.document)

    # PRIVATE

    def _best_match(self, found_terms):
        """Return the best-matching phrase of all possible options"""
        combos = itertools.permutations(found_terms, len(self.query_terms))
        complete_results = filter(self._complete, combos)
        deduped_results = {}
        for result in complete_results:
            sorted_result = sorted(result, key=lambda x: x.word_offset)
            # Create a key for the deduped results dictionary
            key = ''
            for r in sorted_result:
                key += r.term + str(r.char_offset)
            if key not in deduped_results:
                self.query_results.append(sorted_result)
                deduped_results[key] = True
        best_distance = None
        best_match = None
        # The terms are now in order; we calculate their total word-distance
        # score by adding up each offset from the first to the second word,
        # second to third word, etc.
        #
        # The lowest distance score wins. Ties are broken by the earliest match.
        for qr in self.query_results:
            total_distance = 0
            word_offset = qr[0].word_offset
            for idx, term in enumerate(qr):
                if idx > 0:
                    total_distance += term.word_offset - word_offset
                    word_offset = term.word_offset
            if best_distance is None or best_distance > total_distance:
                best_distance = total_distance
                best_match = qr  # our best match so far
        return best_match
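
    # For example, a candidate whose sorted word offsets are [19, 21, 24]
    # scores (21 - 19) + (24 - 21) = 5, while one at [3, 40, 41] scores
    # (40 - 3) + (41 - 40) = 38 and loses.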

    def _complete(self, x):
        """Return True if the combination covers every query term exactly once"""
        keys = {}
        for t in x:
            keys[t.term] = t
        return len(keys) == len(self.query_terms)

    def _highlight(self, match, doc):
        """Simple highlighter indicating which block of text matched"""
        print ""
        print "Highlighted results below:"
        print ""
        start_offset = match[0].char_offset
        last_term = match[len(match) - 1]
        end_offset = last_term.char_offset + len(last_term.term)
        print doc[:start_offset] + '[' + \
              doc[start_offset:end_offset] + ']' + \
              doc[end_offset:]

def test():
    ti = TermIndex("""The George Washington Bridge in New York City is one of
the oldest bridges ever constructed. It is now being remodeled because the
bridge is a landmark. City officials say that the landmark bridge effort
will create a lot of new jobs in the city.""")
    default_query = "Landmark City Bridge"
    while True:
        print ""
        prompt = 'Enter your query (q to quit) [{}]: '.format(default_query)
        qt = raw_input(prompt)
        if len(qt) == 0:
            qt = default_query
        else:
            qt = qt.strip()
        if qt == 'q':
            break
        q = Query(qt)
        q.perform(ti)


if __name__ == "__main__":
    test()
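
The interactive loop aside, the classes can be driven directly; a minimal sketch (the variable names are mine, not part of the gist):

doc = "The George Washington Bridge in New York City is a landmark."
index = TermIndex(doc)
Query("landmark bridge").perform(index)  # prints doc with the tightest matching span in [brackets]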