Created
July 8, 2013 22:25
-
-
Save c0ldlimit/5953014 to your computer and use it in GitHub Desktop.
# Python: multiple Tor workers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import httplib | |
import socks | |
import urllib2 | |
from Queue import Queue | |
from threading import Thread, Condition, Lock | |
from threading import active_count as threading_active_count | |
import time | |
from pymongo import Connection | |
import pymongo | |
url_format = 'http://www.imdb.com/user/ur{0}/ratings' | |
http_codes_counter = {} | |
MONGODB_HOSTNAME = '192.168.0.118' | |
""" | |
https://gist.github.com/869791 | |
SocksiPy + urllib handler | |
version: 0.2 | |
author: e | |
This module provides a Handler which you can use with urllib2 to allow it to tunnel your connection through a socks.sockssocket socket, without monkey patching the original socket... | |
""" | |
class SocksiPyConnection(httplib.HTTPConnection):
    """HTTPConnection whose socket is tunneled through a SOCKS proxy.

    Proxy settings are captured at construction time; every argument
    beyond them is forwarded unchanged to httplib.HTTPConnection.
    """
    def __init__(self, proxytype, proxyaddr, proxyport = None, rdns = True, username = None, password = None, *args, **kwargs):
        # Stash the proxy parameters; the actual socket is created lazily
        # in connect(), once per request.
        self.proxyargs = (proxytype, proxyaddr, proxyport, rdns, username, password)
        httplib.HTTPConnection.__init__(self, *args, **kwargs)

    def connect(self):
        """Open a SocksiPy socket to self.host:self.port via the stored proxy."""
        sock = socks.socksocket()
        sock.setproxy(*self.proxyargs)
        if isinstance(self.timeout, float):
            sock.settimeout(self.timeout)
        sock.connect((self.host, self.port))
        self.sock = sock
class SocksiPyHandler(urllib2.HTTPHandler):
    """urllib2 handler that builds its HTTP connections through a SOCKS proxy.

    Construction arguments are the proxy parameters for
    SocksiPyConnection and are replayed for every request.
    """
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)

    def http_open(self, req):
        # do_open() calls the factory with the connection parameters;
        # curry in the proxy arguments captured at construction time.
        def factory(host, port=None, strict=None, timeout=0):
            return SocksiPyConnection(*self.args, host=host, port=port,
                                      strict=strict, timeout=timeout, **self.kw)
        return self.do_open(factory, req)
class Monitor(Thread): | |
def __init__(self, queue, discovery): | |
Thread.__init__(self) | |
self.queue = queue | |
self.discovery = discovery | |
self.finish_signal = False | |
def finish(self): | |
self.finish_signal = True | |
def run(self): | |
while not self.finish_signal: | |
time.sleep(5) | |
print "Elements in Queue:", self.queue.qsize(), "Active Threads:", threading_active_count(), "Exceptions Counter:", self.discovery.exception_counter | |
class Worker(Thread): | |
def __init__(self, queue, discovery, socks_proxy_port): | |
Thread.__init__(self) | |
self.queue = queue | |
self.discovery = discovery | |
self.socks_proxy_port = socks_proxy_port | |
self.opener = urllib2.build_opener(SocksiPyHandler(socks.PROXY_TYPE_SOCKS4, 'localhost', self.socks_proxy_port)) | |
self.conn = Connection(MONGODB_HOSTNAME, 27017) | |
self.db = self.conn.scraping | |
self.coll = self.db.imdb.ratings | |
def get_url(self, url): | |
try: | |
#h = urllib2.urlopen(url) | |
h = self.opener.open(url) | |
return h.getcode() | |
except urllib2.URLError, e: | |
return e.code | |
def run(self): | |
while True: | |
try: | |
index = self.queue.get() | |
if index == None: | |
self.queue.put(None) # Notify the next worker | |
break | |
url = url_format.format(index) | |
code = self.get_url(url) | |
self.coll.update({'index':index}, {'$set': {'last_response':code}}) | |
self.discovery.lock.acquire() | |
self.discovery.records_to_process -= 1 | |
if self.discovery.records_to_process == 0: | |
self.discovery.lock.notify() | |
self.discovery.lock.release() | |
except (socks.Socks4Error, httplib.BadStatusLine), e: | |
# TypeError: 'Socks4Error' object is not callable | |
print e | |
self.discovery.exception_counter_lock.acquire() | |
self.discovery.exception_counter += 1 | |
self.discovery.exception_counter_lock.release() | |
pass # leave this element for the next cycle | |
time.sleep(1.5) | |
class Croupier(Thread): | |
Base = 0 | |
Top = 25000000 | |
#Top = 1000 | |
def __init__(self, queue, discovery): | |
Thread.__init__(self) | |
self.conn = Connection(MONGODB_HOSTNAME, 27017) | |
self.db = self.conn.scraping | |
self.coll = self.db.imdb.ratings | |
self.finish_signal = False | |
self.queue = queue | |
self.discovery = discovery | |
self.discovery.records_to_process = 0 | |
def run(self): | |
# Look if imdb collection is empty. Only if its empty we create all the items | |
c = self.coll.count() | |
if c == 0: | |
print "Inserting items" | |
self.coll.ensure_index([('index', pymongo.ASCENDING), ('last_response', pymongo.ASCENDING)]) | |
for i in xrange(Croupier.Base, Croupier.Top): | |
self.coll.insert({'index':i, 'url': url_format.format(i), 'last_response': 0}) | |
else: | |
print "Using #", c, " persisted items" | |
while True: | |
#items = self.coll.find({'last_response': {'$ne': 200}}) | |
items = self.coll.find({'$and': [{'last_response': {'$ne': 200}}, {'last_response' : {'$ne': 404}}]}, timeout = False) | |
self.discovery.records_to_process = items.count() | |
if self.discovery.records_to_process == 0: | |
break | |
for item in items: | |
self.queue.put(item['index']) | |
# Wait until the last item is updated on the db | |
self.discovery.lock.acquire() | |
while self.discovery.records_to_process != 0: | |
self.discovery.lock.wait() | |
self.discovery.lock.release() | |
# time.sleep(5) | |
# Send a 'signal' to workers to finish | |
self.queue.put(None) | |
def finish(self): | |
self.finish_signal = True | |
class Discovery: | |
NWorkers = 71 | |
SocksProxyBasePort = 9050 | |
Contention = 10000 | |
def __init__(self): | |
self.queue = Queue(Discovery.Contention) | |
self.workers = [] | |
self.lock = Condition() | |
self.exception_counter_lock = Lock() | |
self.records_to_process = 0 | |
self.exception_counter = 0 | |
def start(self): | |
croupier = Croupier(self.queue, self) | |
croupier.start() | |
for i in range(Discovery.NWorkers): | |
worker = Worker(self.queue, self, Discovery.SocksProxyBasePort + i) | |
self.workers.append(worker) | |
for w in self.workers: | |
w.start() | |
monitor = Monitor(self.queue, self) | |
monitor.start() | |
for w in self.workers: | |
w.join() | |
croupier.join() | |
print "Queue finished with:", self.queue.qsize(), "elements" | |
monitor.finish() | |
def main():
    """Entry point: build a Discovery and run the scraping pipeline."""
    Discovery().start()

if __name__ == '__main__':
    main()
# | |
# MISC NOTES | |
# | |
# - How many IMDB ratings pages are currently indexed by Google? query: inurl:www.imdb.com/user/*/ratings | |
# - [pymongo] cursor id '239432858681488351' not valid at server Options: http://groups.google.com/group/mongodb-user/browse_thread/thread/4ed6e3d77fb1c2cf?pli=1 | |
# That error generally means that the cursor timed out on the server - | |
# this could be the case if you are performing a long running operation | |
# while iterating over the cursor. The best bet is probably to turn off | |
# the timeout by passing "timeout=False" in your call to find: | |
# |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment