Skip to content

Instantly share code, notes, and snippets.

@petersohn
Last active August 29, 2015 13:56
Show Gist options
  • Save petersohn/9197557 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from HTMLParser import HTMLParser
import re
import io
import sys
import datetime
import time
import httplib
from argparse import ArgumentParser
from threading import Thread, Lock, Condition
import threading
import signal
def printLevel(level, *args):
    """Write an indented debug line to stderr, prefixed with a level ruler.

    The first *level* columns are marked with their index modulo 10 so the
    nesting depth is visible at a glance.  No-op unless the module-level
    debugOn flag is truthy.
    """
    # Default to "off" when main() has not assigned the flag yet; the
    # original raised NameError if called before debugOn existed.
    if not globals().get("debugOn", False):
        return
    ruler = "".join(str(i % 10) for i in range(level))
    body = "".join(str(arg) for arg in args)
    sys.stderr.write(ruler + " " + body + "\n")
def filterAscii(s):
    """Return *s* with every non-ASCII character removed.

    Uses str.join so the result is always a string: Python 2's filter()
    returned a str here, but Python 3's returns a lazy iterator, which
    would break callers that feed the result to the HTML parser.
    """
    return "".join(c for c in s if ord(c) < 128)
def logThread(s):
    """Log *s* to stderr, stamped with the current time and thread name."""
    timestamp = str(datetime.datetime.now())
    threadName = threading.current_thread().name
    sys.stderr.write(timestamp + " " + threadName + ": " + s + "\n")
class ThreadPool:
    """A fixed-size pool of worker threads consuming a shared task queue.

    Tasks are zero-argument callables.  Workers block on a condition
    variable until a task arrives or join() signals shutdown.
    """

    def __init__(self, numThreads):
        self.threads = []
        self.tasks = []
        # One lock guards both the task list and the running flag.
        self.condition = Condition(Lock())
        self.running = True
        while len(self.threads) < numThreads:
            newThread = Thread(target = self._run)
            newThread.start()
            self.threads += [newThread]

    def _taskLenStr(self):
        # Consistent "(n=...)" suffix for queue-length log lines.
        return "(n=" + str(len(self.tasks)) + ")"

    def join(self, stop = False):
        """Shut the pool down and wait for every worker to exit.

        With stop=True pending tasks are discarded; otherwise workers
        drain the queue before quitting.
        """
        self.condition.acquire()
        logThread("Finishing tasks, waking up threads " + self._taskLenStr())
        try:
            self.running = False
            if stop:
                self.tasks = []
            self.condition.notify_all()
        finally:
            self.condition.release()
        logThread("Joining threads")
        for thread in self.threads:
            thread.join()
            logThread(thread.name + " joined")
        logThread("All threads joined")

    def addTask(self, task):
        """Queue *task* (a zero-argument callable) for execution."""
        logThread("adding new task to thread pool " + self._taskLenStr())
        if not self.running:
            # BUGFIX: the original logged this but appended the task anyway.
            logThread("Cannot add task: already stopped.")
            return
        self.condition.acquire()
        try:
            self.tasks.append(task)
            logThread("New task added to thread pool " + self._taskLenStr())
            self.condition.notify()
        finally:
            self.condition.release()

    def _run(self):
        # Worker loop: take one task under the lock, run it outside the lock.
        while True:
            self.condition.acquire()
            logThread("Waiting for task " + self._taskLenStr())
            try:
                while self.running and len(self.tasks) == 0:
                    logThread("Task queue empty, blocking")
                    self.condition.wait()
                if not self.running and len(self.tasks) == 0:
                    logThread("Quit")
                    return
                task = self.tasks.pop()
            except KeyboardInterrupt:
                logThread("Interrupted.")
                return
            except:
                # BUGFIX: the original fell through and called a stale (or
                # undefined) `task` after a failure here; skip this round.
                # (finally still releases the lock before the continue.)
                logThread("Thread execution failed.")
                continue
            finally:
                self.condition.release()
            logThread("Running next task " + self._taskLenStr())
            task()
            logThread("Task finished " + self._taskLenStr())
class Tagger:
    """Tracks whether the parser is currently inside a matching HTML tag.

    A tag matches when its name equals *tagname* and *condition(attrs)*
    holds.  *nth* restricts isActive() to the nth matching occurrence
    (0 means any occurrence).  Optional callbacks fire on open
    (*beginAction*, receives the attribute dict) and close (*endAction*).
    """

    def __init__(self, tagname, condition, nth = 0, beginAction=None, endAction = None):
        self.tagname = tagname
        self.condition = condition
        self.nth = nth
        self.beginAction = beginAction
        self.endAction = endAction
        self.reset()

    def start(self, level, tag, attrs):
        """Notify about an opening tag seen at nesting depth *level*."""
        # Guard clause: only a matching tag while not already active counts.
        if self.active or tag != self.tagname or not self.condition(attrs):
            return
        self.activeLevel = level
        self.levelCounter += 1
        self.active = True
        if self.beginAction:
            self.beginAction(attrs)
        printLevel(level, tag, " activated (counter = ", self.levelCounter, ")")

    def end(self, level, tag):
        """Notify about a closing tag seen at nesting depth *level*."""
        if self.active and level == self.activeLevel:
            printLevel(level, self.tagname, " deactivated (counter = ", self.levelCounter, ")")
            self.active = False
            if self.endAction:
                self.endAction()
        if level < self.activeLevel:
            # Left the scope where matches were counted; start counting over.
            self.levelCounter = 0

    def isActive(self):
        """True while inside the tracked tag (and it is the nth match)."""
        if not self.active:
            return False
        return self.nth == 0 or self.nth == self.levelCounter

    def reset(self):
        """Forget all state; called before parsing a new page."""
        self.active = False
        self.activeLevel = 0
        self.levelCounter = 0
def hasAttrWithValue(attrName, value):
    """Build a predicate over an attribute dict: attrName maps to value."""
    def predicate(attrs):
        return attrName in attrs and attrs[attrName] == value
    return predicate
def hasAttrWithRegexMatch(attrName, regex):
    """Build a predicate: attrName is present and its value matches *regex*.

    Returns the (truthy) match object on success, a falsy value otherwise,
    so the result is usable in boolean context.
    """
    def predicate(attrs):
        return attrName in attrs and regex.match(attrs[attrName])
    return predicate
def _and(func1, func2):
return lambda *args, **keys: func1(*args, **keys) and func2(*args, **keys)
def isTaggerActive(tagger):
    """Wrap tagger.isActive() as a predicate that ignores its arguments."""
    def predicate(*args, **keys):
        return tagger.isActive()
    return predicate
def readUrl(url):
    """Fetch *url* over plain HTTP with a GET and return the response body.

    Raises Exception for a non-"http" scheme or a non-200 status.
    The connection is always closed, even when the request fails.
    """
    protocolPosition = url.find("://")
    if url[:protocolPosition] != "http":
        logThread("Bad protocol")
        raise Exception()
    url = url[protocolPosition + 3:]
    slashPosition = url.find("/")
    if slashPosition == -1:
        # BUGFIX: a URL with no path (http://host) used to drop the last
        # character of the host and request a one-character garbage path.
        address = url
        requestString = "/"
    else:
        address = url[0:slashPosition]
        requestString = url[slashPosition:]
    logThread("readUrl: create connection: " + address)
    connection = httplib.HTTPConnection(address, timeout=60)
    try:
        logThread("readUrl: request: " + requestString)
        connection.request("GET", requestString)
        response = connection.getresponse()
        if response.status != httplib.OK:
            logThread("readUrl: error " + str(response.status) + " " + response.reason)
            raise Exception()
        return response.read()
    finally:
        # Close on every path; the original leaked the socket when
        # request()/getresponse() raised.
        connection.close()
class PageParser(HTMLParser):
    """HTML parser that maintains a tag stack and drives a set of Taggers.

    The current nesting depth (length of the tag stack) is reported to
    every Tagger on each open/close tag; character data is forwarded to
    *dataHandler*.  readPage() downloads a URL with retries, then parses it.
    """

    def handle_starttag(self, tag, attrs):
        # Push first so _getLevel() reflects the depth inside this tag.
        self.tagStack += [tag]
        printLevel(self._getLevel(), "handling tag start: ", tag, " (", attrs, ")")
        attrDict = dict(attrs)
        for tagger in self.taggers:
            tagger.start(self._getLevel(), tag, attrDict)

    def handle_endtag(self, tag):
        # Tolerate unclosed tags: pop until the matching open tag is on top.
        while len(self.tagStack) > 0 and self.tagStack[len(self.tagStack) - 1] != tag:
            printLevel(self._getLevel(), "found unfinished tag: ", self.tagStack[len(self.tagStack) - 1])
            self._popTagStack()
        printLevel(self._getLevel(), "handling tag end: ", tag)
        for tagger in self.taggers:
            tagger.end(self._getLevel(), tag)
        self._popTagStack()

    def handle_data(self, data):
        # Forward raw character data to the configured handler, if any.
        if self.dataHandler:
            self.dataHandler(data)

    def _getLevel(self):
        # Current nesting depth == number of open tags.
        return len(self.tagStack)

    def _popTagStack(self):
        # Drop the innermost open tag (no-ops on an empty stack via slicing).
        self.tagStack = self.tagStack[0:len(self.tagStack) - 1]

    def readPage(self, url):
        """Download *url*, retrying with the back-off table, then parse it."""
        self._reset()
        logThread(url + " : reading page")
        retryNum = 0
        while True:
            try:
                pageContents = readUrl(url)
            except KeyboardInterrupt:
                logThread(url + " : Interrupted")
                raise
            except:
                # Retry on any read error until the back-off table runs out.
                logThread(url + " : Error reading page (retry #" + str(retryNum) + ")")
                if retryNum == len(self.sleepTimes):
                    logThread(url + " : Failed to read.")
                    raise
                sleepTime = self.sleepTimes[retryNum]
                logThread("Waiting " + str(sleepTime))
                time.sleep(sleepTime)
                retryNum += 1
                continue
            break
        logThread(url + " : read finished, parsing")
        # Strip non-ASCII bytes before feeding — presumably to avoid
        # encoding issues in Python 2's HTMLParser (TODO confirm).
        self.feed(filterAscii(pageContents))
        logThread(url + " : parse finished")

    def _reset(self):
        # Fresh parse state for a new page.
        self.tagStack = []
        for tagger in self.taggers:
            tagger.reset()

    def __init__(self, taggers, dataHandler):
        HTMLParser.__init__(self)
        self.taggers = taggers
        self.dataHandler = dataHandler
        self._reset()
        # Seconds to wait before each successive retry (doubling back-off).
        self.sleepTimes = [0, 0, 1, 1, 2, 2, 4, 4, 8, 8, 16, 16, 32, 32]
        #self.sleepTimes = [0, 0, 1, 1]
def urlSubpage(url, subpage):
    """Replace the final path component of *url* with *subpage*."""
    base = re.match("^(.*/)[^/]*$", url).group(1)
    return base + subpage
class ThreadParser:
    """Parses one forum thread, counting posts per date.

    Follows "Next Page" links until a page repeats (or noContinue is set),
    then writes one "<prefix><date>;<count>" line per date to stdout.
    """

    def readThread(self, url):
        """Read the thread starting at *url* and print the per-date counts."""
        self.output = {}
        while url not in self.urlsRead:
            self._resetPage()
            self.pageParser.readPage(url)
            logThread(url + " : Posts found:" + str(self.foundCount))
            self.urlsRead.add(url)
            if self.noContinue or not self.nextPage:
                # Last page reached: dump the accumulated date -> count map.
                for postDate, postNum in self.output.items():
                    sys.stdout.write(self.prefix + postDate + ";" + str(postNum) + "\n")
                return
            url = urlSubpage(url, self.nextPage)

    def _resetPage(self):
        # Per-page state: next-page link and posts counted on this page.
        self.nextPage = None
        self.foundCount = 0

    def _setNextPage(self, title):
        # Callback for the "Next Page" tagger.
        self.nextPage = title

    def handleData(self, data):
        """Character-data callback: count a post when a date shows up in a post cell."""
        if self.inTd.isActive():
            searchResult = self.parsedDateFormat.search(data)
            if searchResult:
                # Page dates are MM-DD-YYYY (see parsedDateFormat below).
                month, day, year = searchResult.group(1, 2, 3)
                date = datetime.date(int(year), int(month), int(day))
                self.foundCount += 1
                postDate = date.strftime(self.dateFormat)
                if not postDate in self.output:
                    self.output[postDate] = 0
                self.output[postDate] += 1

    def __init__(self, dateFormat = "%Y-%m-%d", noContinue = False, prefix = ""):
        self.dateFormat = dateFormat
        self.noContinue = noContinue
        self.prefix = prefix
        # Tagger chain narrowing down to the cell holding the post date:
        # <div id="posts"> > <table id="postNNN"> > 1st <tr> > 1st <td>.
        self.inPosts = Tagger("div", hasAttrWithValue("id", "posts"))
        self.inTable = Tagger("table", _and(
            isTaggerActive(self.inPosts),
            hasAttrWithRegexMatch("id", re.compile('post\d+$'))))
        self.inTr = Tagger("tr", isTaggerActive(self.inTable), 1)
        self.inTd = Tagger("td", isTaggerActive(self.inTr), 1)
        self.inNextPage = Tagger("a",
            hasAttrWithRegexMatch("title", re.compile("Next Page")),
            beginAction = lambda attrs: self._setNextPage(attrs["href"]))
        self.pageParser = PageParser(
            [self.inPosts, self.inTable, self.inTr, self.inTd, self.inNextPage],
            self.handleData)
        # MM-DD-YYYY dates as they appear in the page body.
        self.parsedDateFormat = re.compile("(\d{2})-(\d{2})-(\d{4})")
        self.output = {}  # formatted date string -> post count
        self.urlsRead = set()  # pages already processed (loop guard)
        self._resetPage()
def parseThread(urls, *args, **keys):
    """Parse each thread URL in *urls* with one shared ThreadParser.

    Extra positional/keyword arguments are forwarded to ThreadParser().
    """
    threadParser = ThreadParser(*args, **keys)
    for threadUrl in urls:
        threadParser.readThread(threadUrl)
class ForumParser:
    """Parses a forum index: finds thread links and farms each thread out
    to a ThreadPool worker that runs parseThread().
    """

    def readForum(self, url):
        """Walk forum index pages starting at *url*, scheduling thread reads."""
        while url not in self.urlsRead:
            self._setNextPage(None)
            self.currentUrl = url
            self.pageParser.readPage(url)
            self.urlsRead.add(url)
            if self.noContinue or not self.nextPage:
                return
            url = urlSubpage(url, self.nextPage)

    def joinReaders(self, stop):
        """Wait for the worker pool; stop=True discards queued threads."""
        self.pool.join(stop)

    def _setNextPage(self, title):
        self.nextPage = title

    def _setThreadPage(self, title):
        self.threadPage = title

    def _beginA(self, link):
        # A thread link opened: remember its URL and start collecting the title.
        self._setThreadPage(link)
        self.title = ""

    def parseThread(self, url, title):
        """Worker-side wrapper: parse one thread, logging (not raising) failures."""
        try:
            parseThread([url], self.dateFormat, self.noContinue, title + ";")
        except KeyboardInterrupt:
            raise
        except:
            logThread("Failed to read thread: " + title + "(" + url + ")")

    def _endA(self):
        # A thread link closed: schedule the thread for parsing if it passes
        # the title filter and has not been seen before.
        if not self.threadPage:
            logThread("Oops! Thread URL not found")
            return
        if not self.threadFilter or re.search(self.threadFilter, self.title):
            logThread("Reading thread: " + self.title)
            global debugOn
            if debugOn:
                # Debug mode: log the thread titles without fetching them.
                return
            url = urlSubpage(self.currentUrl, self.threadPage)
            if url in self.threadsRead:
                logThread("Thread already read, ignoring.")
                return
            self.threadsRead.add(url)
            # Bind the title now; self.title is clobbered by the next link.
            title = self.title
            self.pool.addTask(lambda: self.parseThread(url, title))
        self._setThreadPage(None)

    def handleData(self, data):
        # Accumulate the thread title text while inside its <a> tag.
        if self.inA.isActive():
            self.title += data

    def __init__(self, threadFilter, dateFormat, noContinue, numProcesses):
        self.dateFormat = dateFormat
        self.noContinue = noContinue
        self.threadFilter = None
        if threadFilter:
            self.threadFilter = re.compile(threadFilter)
        # Thread links carry id="thread_title_..." in the forum markup.
        self.inA = Tagger("a",
            hasAttrWithRegexMatch("id", re.compile("thread_title_")),
            beginAction = lambda attrs: self._beginA(attrs["href"]),
            endAction = self._endA)
        self.inNextPage = Tagger("a",
            hasAttrWithRegexMatch("title", re.compile("Next Page")),
            beginAction = lambda attrs: self._setNextPage(attrs["href"]))
        self.pageParser = PageParser([self.inA, self.inNextPage], self.handleData)
        self.pool = ThreadPool(numProcesses)
        self.urlsRead = set()  # forum index pages already processed
        self.threadsRead = set()  # thread URLs already scheduled
def main():
    """Command-line entry point.

    Parses either a whole forum (--forum, multi-threaded via ThreadPool)
    or a list of thread URLs, printing per-date post counts to stdout.
    Returns 1 when interrupted during forum parsing, None otherwise.
    """
    argumentParser = ArgumentParser()
    argumentParser.add_argument('-f', '--date-format', dest='dateFormat',
        default = "%Y-%m-%d", help = "Date format for output")
    argumentParser.add_argument('--forum', default = False,
        action = "store_true", help = "Parse a forum instead of a thread")
    argumentParser.add_argument('--thread-filter', dest='threadFilter',
        default=None, help = "Regex to filter thread titles")
    argumentParser.add_argument('--num-processes', dest='numProcesses',
        default=32, type=int,
        help = "Number of processes used for forum parsing")
    argumentParser.add_argument('--no-continue', dest='noContinue',
        default = False, action = "store_true",
        help = "Do not continue to further pages")
    argumentParser.add_argument('-d', '--debug', dest='debugOn',
        default = False, action = "store_true",
        help = "Print debug information for parsing")
    argumentParser.add_argument('url', nargs='+', help = "The URLs to parse")
    args = argumentParser.parse_args()
    # Publish the debug flag for printLevel() and ForumParser._endA().
    global debugOn
    debugOn = args.debugOn
    if args.forum:
        try:
            forumParser = ForumParser(args.threadFilter,
                args.dateFormat, args.noContinue,
                numProcesses = args.numProcesses)
            for url in args.url:
                forumParser.readForum(url)
        except KeyboardInterrupt:
            sys.stderr.write("Interrupted. Waiting for threads to finish...\n");
            forumParser.joinReaders(stop=True)
            return 1
        finally:
            # Always drain the worker pool before exiting.
            # NOTE(review): if ForumParser() itself raises, this hits an
            # unbound forumParser — confirm whether that path matters.
            forumParser.joinReaders(stop=False)
    else:
        parseThread(args.url, dateFormat = args.dateFormat,
            noContinue = args.noContinue, prefix=";")
if __name__ == "__main__":
    # Propagate main()'s return value (1 on interrupt, None -> 0 otherwise)
    # as the process exit status; the original discarded it and exited 0.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment