Last active
August 29, 2015 13:56
-
-
Save petersohn/9197557 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from HTMLParser import HTMLParser | |
import re | |
import io | |
import sys | |
import datetime | |
import time | |
import httplib | |
from argparse import ArgumentParser | |
from threading import Thread, Lock, Condition | |
import threading | |
import signal | |
def printLevel(level, *args):
    """Debug helper: write args to stderr, prefixed by a nesting-depth ruler.

    The ruler is the digits 0,1,2,... (mod 10) repeated `level` times, so the
    column of the last digit shows how deep in the tag tree the event occurred.
    Does nothing unless the module-level `debugOn` flag is set.
    """
    global debugOn
    if not debugOn:
        return
    ruler = [str(depth % 10) for depth in range(level)]
    pieces = ruler + [" "] + [str(arg) for arg in args] + ["\n"]
    sys.stderr.write("".join(pieces))
def filterAscii(s):
    """Return `s` with every non-ASCII character (code point >= 128) removed.

    The original used filter(), which returns a string on Python 2 but a lazy
    filter object on Python 3; joining a generator yields a plain string on
    both, which is what HTMLParser.feed() expects.
    """
    return "".join(c for c in s if ord(c) < 128)
def logThread(s):
    """Write one log line to stderr: '<timestamp> <thread name>: <s>'."""
    timestamp = str(datetime.datetime.now())
    threadName = threading.current_thread().name
    sys.stderr.write("".join([timestamp, " ", threadName, ": ", s, "\n"]))
class ThreadPool:
    """Fixed-size pool of worker threads consuming callables from a shared
    LIFO task list, synchronized by a single condition variable.

    Lifecycle: construct with the number of workers, addTask() any number of
    callables, then join() once; join(stop=True) discards queued tasks.

    Bug fixes vs. the original:
      * addTask() checked self.running without holding the lock and, after
        logging "already stopped", appended the task anyway; it now refuses
        under the lock.
      * In _run(), a non-KeyboardInterrupt exception inside the locked
        section logged a message and then fell through to calling `task`
        while `task` was unbound (NameError). Exception handling now wraps
        the task invocation itself, so one failing task no longer kills a
        worker thread.
    """

    def __init__(self, numThreads):
        """Start `numThreads` daemon-less workers immediately."""
        self.threads = []
        self.tasks = []
        self.condition = Condition(Lock())
        self.running = True
        for _ in range(numThreads):
            worker = Thread(target=self._run)
            worker.start()
            self.threads.append(worker)

    def _taskLenStr(self):
        """Queue length, formatted for log lines."""
        return "(n=" + str(len(self.tasks)) + ")"

    def join(self, stop=False):
        """Shut the pool down and wait for every worker to exit.

        With stop=False, workers drain the remaining tasks first; with
        stop=True, queued tasks are discarded.
        """
        with self.condition:
            logThread("Finishing tasks, waking up threads " + self._taskLenStr())
            self.running = False
            if stop:
                self.tasks = []
            self.condition.notify_all()
        logThread("Joining threads")
        for thread in self.threads:
            thread.join()
            logThread(thread.name + " joined")
        logThread("All threads joined")

    def addTask(self, task):
        """Queue a zero-argument callable; ignored (with a log) after join()."""
        logThread("adding new task to thread pool " + self._taskLenStr())
        with self.condition:
            if not self.running:
                logThread("Cannot add task: already stopped.")
                return
            self.tasks.append(task)
            logThread("New task added to thread pool " + self._taskLenStr())
            self.condition.notify()

    def _run(self):
        """Worker loop: block for a task, run it, repeat until shut down."""
        while True:
            with self.condition:
                logThread("Waiting for task " + self._taskLenStr())
                while self.running and not self.tasks:
                    logThread("Task queue empty, blocking")
                    self.condition.wait()
                if not self.running and not self.tasks:
                    logThread("Quit")
                    return
                # LIFO: newest task first, matching the original pop().
                task = self.tasks.pop()
            logThread("Running next task " + self._taskLenStr())
            try:
                task()
            except KeyboardInterrupt:
                logThread("Interrupted.")
                return
            except Exception:
                logThread("Thread execution failed.")
            logThread("Task finished " + self._taskLenStr())
class Tagger:
    """Tracks whether parsing is currently inside one HTML tag of interest.

    A tagger activates when a `tagname` start tag whose attribute dict
    satisfies `condition` is seen, and deactivates when an end tag arrives at
    the same nesting level.  `nth` (0 = any) restricts isActive() to the nth
    matching sibling within the enclosing scope.  `beginAction` (called with
    the attribute dict) and `endAction` (no arguments) are optional hooks.
    """

    def __init__(self, tagname, condition, nth = 0, beginAction=None, endAction = None):
        self.tagname = tagname
        self.condition = condition
        self.nth = nth
        self.beginAction = beginAction
        self.endAction = endAction
        self.reset()

    def start(self, level, tag, attrs):
        """Activate if this start tag matches and we are not already active."""
        if self.active or tag != self.tagname or not self.condition(attrs):
            return
        self.activeLevel = level
        self.levelCounter += 1
        self.active = True
        if self.beginAction:
            self.beginAction(attrs)
        printLevel(level, tag, " activated (counter = ", self.levelCounter, ")")

    def end(self, level, tag):
        """Deactivate at the matching level; reset the sibling counter once
        the enclosing scope closes (end tag shallower than activation)."""
        if level < self.activeLevel:
            self.levelCounter = 0
        if not self.active or level != self.activeLevel:
            return
        printLevel(level, self.tagname, " deactivated (counter = ", self.levelCounter, ")")
        self.active = False
        if self.endAction:
            self.endAction()

    def isActive(self):
        """True when inside the tag and the nth-sibling constraint holds."""
        if not self.active:
            return False
        return self.nth == 0 or self.nth == self.levelCounter

    def reset(self):
        """Forget all activation state (new page)."""
        self.active = False
        self.activeLevel = 0
        self.levelCounter = 0
def hasAttrWithValue(attrName, value):
    """Build a predicate over an attribute dict: present and exactly equal."""
    def predicate(attrs):
        if attrName not in attrs:
            return False
        return attrs[attrName] == value
    return predicate
def hasAttrWithRegexMatch(attrName, regex):
    """Build a predicate over an attribute dict: attribute present and its
    value matches `regex` at the start (re.match semantics)."""
    def predicate(attrs):
        if attrName not in attrs:
            return False
        return regex.match(attrs[attrName])
    return predicate
def _and(func1, func2): | |
return lambda *args, **keys: func1(*args, **keys) and func2(*args, **keys) | |
def isTaggerActive(tagger):
    """Adapt a Tagger into a predicate that ignores its arguments and simply
    reports tagger.isActive() at call time (late-bound, not snapshotted)."""
    def predicate(*args, **keys):
        return tagger.isActive()
    return predicate
def readUrl(url):
    """Fetch an http:// URL with a GET request and return the response body.

    Raises Exception for a non-http scheme, connection failures, or any
    response status other than 200 OK.

    Fixes vs. the original: the connection is closed via try/finally even
    when request/getresponse/read raises; a URL with no path component
    ("http://host") now requests "/" instead of producing a truncated host;
    raised exceptions carry a message.
    """
    protocolPosition = url.find("://")
    if url[:protocolPosition] != "http":
        logThread("Bad protocol")
        raise Exception("unsupported protocol in URL: " + url)
    url = url[protocolPosition + 3:]
    slashPosition = url.find("/")
    if slashPosition == -1:
        # No path: the whole remainder is the host; request the root.
        address = url
        requestString = "/"
    else:
        address = url[:slashPosition]
        requestString = url[slashPosition:]
    logThread("readUrl: create connection: " + address)
    connection = httplib.HTTPConnection(address, timeout=60)
    try:
        logThread("readUrl: request: " + requestString)
        connection.request("GET", requestString)
        response = connection.getresponse()
        if response.status != httplib.OK:
            logThread("readUrl: error " + str(response.status) + " " + response.reason)
            raise Exception("HTTP error " + str(response.status))
        return response.read()
    finally:
        connection.close()
class PageParser(HTMLParser):
    """HTMLParser subclass that maintains a stack of open tags, drives a list
    of Taggers with (level, tag) events, and forwards text nodes to an
    optional data handler.  readPage() fetches a URL with retries and feeds
    the ASCII-filtered contents through the parser.
    """

    def __init__(self, taggers, dataHandler):
        """taggers: list of Tagger objects to drive; dataHandler: callable
        invoked with each text node (or None to ignore text)."""
        HTMLParser.__init__(self)
        self.taggers = taggers
        self.dataHandler = dataHandler
        self._reset()
        # Back-off schedule in seconds between retries of a failing fetch;
        # its length is also the maximum retry count.
        self.sleepTimes = [0, 0, 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32]

    def handle_starttag(self, tag, attrs):
        self.tagStack.append(tag)
        printLevel(self._getLevel(), "handling tag start: ", tag, " (", attrs, ")")
        attrDict = dict(attrs)
        for tagger in self.taggers:
            tagger.start(self._getLevel(), tag, attrDict)

    def handle_endtag(self, tag):
        # Forgiving HTML: pop tags that were never explicitly closed until
        # the matching start tag (or an empty stack) is reached.
        while self.tagStack and self.tagStack[-1] != tag:
            printLevel(self._getLevel(), "found unfinished tag: ", self.tagStack[-1])
            self._popTagStack()
        printLevel(self._getLevel(), "handling tag end: ", tag)
        for tagger in self.taggers:
            tagger.end(self._getLevel(), tag)
        self._popTagStack()

    def handle_data(self, data):
        if self.dataHandler:
            self.dataHandler(data)

    def _getLevel(self):
        """Current nesting depth = number of open tags."""
        return len(self.tagStack)

    def _popTagStack(self):
        # Guard keeps the original slicing behavior: popping an empty stack
        # is a silent no-op (happens on a stray end tag).
        if self.tagStack:
            self.tagStack.pop()

    def readPage(self, url):
        """Fetch `url` (retrying per sleepTimes) and parse its contents.

        Re-raises after the retry budget is exhausted; KeyboardInterrupt is
        never retried.
        """
        self._reset()
        logThread(url + " : reading page")
        retryNum = 0
        while True:
            try:
                pageContents = readUrl(url)
                break
            except KeyboardInterrupt:
                logThread(url + " : Interrupted")
                raise
            except Exception:
                logThread(url + " : Error reading page (retry #" + str(retryNum) + ")")
                if retryNum == len(self.sleepTimes):
                    logThread(url + " : Failed to read.")
                    raise
                sleepTime = self.sleepTimes[retryNum]
                logThread("Waiting " + str(sleepTime))
                time.sleep(sleepTime)
                retryNum += 1
        logThread(url + " : read finished, parsing")
        self.feed(filterAscii(pageContents))
        logThread(url + " : parse finished")

    def _reset(self):
        """Clear the tag stack and all tagger state before a new page."""
        self.tagStack = []
        for tagger in self.taggers:
            tagger.reset()
def urlSubpage(url, subpage):
    """Replace the last path component of `url` with `subpage`.

    E.g. ("http://h/forum/p1.html", "p2.html") -> "http://h/forum/p2.html".
    `url` must contain at least one slash.
    """
    directoryPattern = re.compile(r"^(.*/)[^/]*$")
    directory = directoryPattern.match(url).group(1)
    return directory + subpage
class ThreadParser:
    """Scrapes one forum thread: follows its pages via "Next Page" links,
    counts posts per date, and writes "prefix + date;count" lines to stdout
    once the final page has been parsed.

    Post detection relies on a chain of Taggers narrowing down to the <td>
    that holds each post's date: div#posts > table#post<N> > first <tr> >
    first <td>.  Dates are expected in MM-DD-YYYY form in the page text
    (presumably a vBulletin-style forum layout — TODO confirm).
    """
    def readThread(self, url):
        # Walk pages until a URL repeats (loop guard) or there is no next page.
        self.output = {}
        while url not in self.urlsRead:
            self._resetPage()
            self.pageParser.readPage(url)
            logThread(url + " : Posts found:" + str(self.foundCount))
            self.urlsRead.add(url)
            if self.noContinue or not self.nextPage:
                # Last page reached: dump the accumulated per-date counts.
                for postDate, postNum in self.output.items():
                    sys.stdout.write(self.prefix + postDate + ";" + str(postNum) + "\n")
                return
            url = urlSubpage(url, self.nextPage)
    def _resetPage(self):
        # Per-page state: relative next-page link and posts seen on this page.
        self.nextPage = None
        self.foundCount = 0
    def _setNextPage(self, title):
        # Callback target for the "Next Page" anchor's href.
        self.nextPage = title
    def handleData(self, data):
        # Text-node callback from PageParser; only text inside the first <td>
        # of a post row is examined for a date.
        if self.inTd.isActive():
            searchResult = self.parsedDateFormat.search(data)
            if searchResult:
                # Groups are (month, day, year) from the MM-DD-YYYY pattern.
                month, day, year = searchResult.group(1, 2, 3)
                date = datetime.date(int(year), int(month), int(day))
                self.foundCount += 1
                postDate = date.strftime(self.dateFormat)
                if not postDate in self.output:
                    self.output[postDate] = 0
                self.output[postDate] += 1
    def __init__(self, dateFormat = "%Y-%m-%d", noContinue = False, prefix = ""):
        """dateFormat: strftime format used for output keys;
        noContinue: stop after the first page;
        prefix: string prepended to every output line (e.g. "title;")."""
        self.dateFormat = dateFormat
        self.noContinue = noContinue
        self.prefix = prefix
        # Tagger chain: active only inside the date cell of a post table.
        self.inPosts = Tagger("div", hasAttrWithValue("id", "posts"))
        self.inTable = Tagger("table", _and(
            isTaggerActive(self.inPosts),
            hasAttrWithRegexMatch("id", re.compile('post\d+$'))))
        self.inTr = Tagger("tr", isTaggerActive(self.inTable), 1)
        self.inTd = Tagger("td", isTaggerActive(self.inTr), 1)
        # Anchor titled "Next Page..." supplies the relative next-page URL.
        self.inNextPage = Tagger("a",
            hasAttrWithRegexMatch("title", re.compile("Next Page")),
            beginAction = lambda attrs: self._setNextPage(attrs["href"]))
        self.pageParser = PageParser(
            [self.inPosts, self.inTable, self.inTr, self.inTd, self.inNextPage],
            self.handleData)
        # Matches dates written as MM-DD-YYYY anywhere in a text node.
        self.parsedDateFormat = re.compile("(\d{2})-(\d{2})-(\d{4})")
        self.output = {}
        self.urlsRead = set()
        self._resetPage()
def parseThread(urls, *args, **keys):
    """Parse each thread URL in `urls` with a single ThreadParser instance;
    extra arguments are forwarded to the ThreadParser constructor."""
    parser = ThreadParser(*args, **keys)
    for threadUrl in urls:
        parser.readThread(threadUrl)
class ForumParser:
    """Scrapes a forum index: walks index pages via "Next Page" links, finds
    thread-title anchors, and schedules each matching thread to be parsed on
    a ThreadPool worker via parseThread()."""
    def readForum(self, url):
        # Walk index pages until a URL repeats or there is no next page.
        while url not in self.urlsRead:
            self._setNextPage(None)
            # Remembered so relative thread links can be resolved in _endA.
            self.currentUrl = url
            self.pageParser.readPage(url)
            self.urlsRead.add(url)
            if self.noContinue or not self.nextPage:
                return
            url = urlSubpage(url, self.nextPage)
    def joinReaders(self, stop):
        # stop=True discards queued thread tasks before joining the workers.
        self.pool.join(stop)
    def _setNextPage(self, title):
        self.nextPage = title
    def _setThreadPage(self, title):
        self.threadPage = title
    def _beginA(self, link):
        # Thread-title anchor opened: remember its href and start collecting
        # the title text (accumulated in handleData).
        self._setThreadPage(link)
        self.title = ""
    def parseThread(self, url, title):
        # Runs on a pool worker thread; failures are logged, never propagated.
        try:
            parseThread([url], self.dateFormat, self.noContinue, title + ";")
        except KeyboardInterrupt:
            raise
        except:
            logThread("Failed to read thread: " + title + "(" + url + ")")
    def _endA(self):
        # Thread-title anchor closed: if the title passes the filter and the
        # thread was not already scheduled, queue it for parsing.
        if not self.threadPage:
            logThread("Oops! Thread URL not found")
            return
        if not self.threadFilter or re.search(self.threadFilter, self.title):
            logThread("Reading thread: " + self.title)
            global debugOn
            if debugOn:
                # Debug mode only lists matching threads, never fetches them.
                return
            url = urlSubpage(self.currentUrl, self.threadPage)
            if url in self.threadsRead:
                logThread("Thread already read, ignoring.")
                return
            self.threadsRead.add(url)
            # Snapshot the title: self.title keeps mutating as parsing goes on.
            title = self.title
            self.pool.addTask(lambda: self.parseThread(url, title))
        self._setThreadPage(None)
    def handleData(self, data):
        # Accumulate title text while inside a thread-title anchor.
        if self.inA.isActive():
            self.title += data
    def __init__(self, threadFilter, dateFormat, noContinue, numProcesses):
        """threadFilter: optional regex source filtering thread titles;
        dateFormat/noContinue: forwarded to each ThreadParser;
        numProcesses: number of worker threads in the pool."""
        self.dateFormat = dateFormat
        self.noContinue = noContinue
        self.threadFilter = None
        if threadFilter:
            self.threadFilter = re.compile(threadFilter)
        # Anchors with id "thread_title_*" wrap each thread link on the index.
        self.inA = Tagger("a",
            hasAttrWithRegexMatch("id", re.compile("thread_title_")),
            beginAction = lambda attrs: self._beginA(attrs["href"]),
            endAction = self._endA)
        self.inNextPage = Tagger("a",
            hasAttrWithRegexMatch("title", re.compile("Next Page")),
            beginAction = lambda attrs: self._setNextPage(attrs["href"]))
        self.pageParser = PageParser([self.inA, self.inNextPage], self.handleData)
        self.pool = ThreadPool(numProcesses)
        self.urlsRead = set()
        self.threadsRead = set()
def main():
    """Parse command-line arguments and scrape either a forum or a thread.

    Returns 0 on success, 1 if interrupted while parsing a forum.

    Fixes vs. the original: the ForumParser is constructed before the
    try/finally, so the `finally` can no longer hit an unbound name when
    construction fails; on KeyboardInterrupt, joinReaders() was called twice
    (in the except branch and again in finally) — now the workers are joined
    exactly once, dropping queued tasks only when interrupted.
    """
    argumentParser = ArgumentParser()
    argumentParser.add_argument('-f', '--date-format', dest='dateFormat',
            default = "%Y-%m-%d", help = "Date format for output")
    argumentParser.add_argument('--forum', default = False,
            action = "store_true", help = "Parse a forum instead of a thread")
    argumentParser.add_argument('--thread-filter', dest='threadFilter',
            default=None, help = "Regex to filter thread titles")
    argumentParser.add_argument('--num-processes', dest='numProcesses',
            default=32, type=int,
            help = "Number of processes used for forum parsing")
    argumentParser.add_argument('--no-continue', dest='noContinue',
            default = False, action = "store_true",
            help = "Do not continue to further pages")
    argumentParser.add_argument('-d', '--debug', dest='debugOn',
            default = False, action = "store_true",
            help = "Print debug information for parsing")
    argumentParser.add_argument('url', nargs='+', help = "The URLs to parse")
    args = argumentParser.parse_args()
    global debugOn
    debugOn = args.debugOn
    if args.forum:
        forumParser = ForumParser(args.threadFilter,
                args.dateFormat, args.noContinue,
                numProcesses = args.numProcesses)
        interrupted = False
        try:
            for url in args.url:
                forumParser.readForum(url)
        except KeyboardInterrupt:
            sys.stderr.write("Interrupted. Waiting for threads to finish...\n")
            interrupted = True
        finally:
            # Join the pool exactly once; discard queued tasks on interrupt.
            forumParser.joinReaders(stop=interrupted)
        if interrupted:
            return 1
    else:
        parseThread(args.url, dateFormat = args.dateFormat,
                noContinue = args.noContinue, prefix=";")
    return 0
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status (the
    # original discarded it, so the shell always saw exit code 0).
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.