import logging
import pycurl
import threading
from itertools import cycle
# cStringIO: C implementation of StringIO, faster for buffering response bodies
from cStringIO import StringIO
from time import time
from urllib import urlencode
from multiprocessing import Pool, TimeoutError
from multiprocessing.pool import ThreadPool
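# one shared DNS cache for every handle, and one multi handle
# that drives all transfers from a single loop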
CURL_SHARE = pycurl.CurlShare()
CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
MULTI_HANDLER = pycurl.CurlMulti()


def __test_callback(*args):
    print "callback called"


def no_callback(*args):
    pass
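# build a Curl handle tuned for parallel fetching: IPv4 only,
# shared DNS cache, compression and TCP keep-alive enabled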
def get_connection(source_address=None):
    # pycurl initialization
    h = pycurl.Curl()
    # follow redirects
    h.setopt(h.FOLLOWLOCATION, True)
    # h.setopt(h.VERBOSE, True)
    # consistently use ipv4
    h.setopt(h.IPRESOLVE, pycurl.IPRESOLVE_V4)
    h.setopt(pycurl.SHARE, CURL_SHARE)
    # make sure cookies are sent back when there is a redirect
    h.setopt(pycurl.COOKIEFILE, '')
    # enable compression: faster downloads
    h.setopt(pycurl.ENCODING, 'gzip, deflate')
    # seems to be faster
    h.setopt(pycurl.TCP_NODELAY, 1)
    h.setopt(pycurl.TCP_KEEPALIVE, 1)
    if source_address:
        h.setopt(h.INTERFACE, source_address)
    return h


class RequestContainer(object):
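    # bundles one curl handle with its request parameters and, once the
    # transfer finishes, the resulting ResponseContainer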
    def __init__(self,
                 url,
                 curl_handler,
                 method='GET',
                 headers=None,
                 cookies=None,
                 callback=None,
                 data=None,
                 timeout=2.0,
                 ssl_verification=False):
        if headers is None:
            headers = {}
        if cookies is None:
            cookies = {}
        if callback is None:
            callback = no_callback
        if data is not None:
            curl_handler.setopt(curl_handler.POSTFIELDS, urlencode(data))
        self.url = url
        self.headers = headers
        self.cookies = cookies
        self.timeout = int(timeout * 1000)  # in milliseconds
        self.callback = callback
        self.curl_handler = curl_handler
        self._response_buffer = StringIO()
        self.response = None
        curl_handler.setopt(curl_handler.WRITEFUNCTION, self._response_buffer.write)
        curl_handler.setopt(curl_handler.SSL_VERIFYPEER, int(ssl_verification))
    def extract_response(self):
        infos = (
            ("TOTAL_TIME", pycurl.TOTAL_TIME),
            # ("NAMELOOKUP_TIME", pycurl.NAMELOOKUP_TIME),
            # ("CONNECT_TIME", pycurl.CONNECT_TIME),
            # ("PRETRANSFER_TIME", pycurl.PRETRANSFER_TIME),
            # ("STARTTRANSFER_TIME", pycurl.STARTTRANSFER_TIME),
            # ("REDIRECT_TIME", pycurl.REDIRECT_TIME),
            # ("REDIRECT_COUNT", pycurl.REDIRECT_COUNT)
        )
        for name, info in infos:
            print "{name} {time} {url}".format(name=name,
                                               time=self.curl_handler.getinfo(info),
                                               url=self.url)
        body = self._response_buffer.getvalue()
        status_code = self.curl_handler.getinfo(pycurl.HTTP_CODE)
        self.response = ResponseContainer(body, status_code, self.url)

    def finish(self):
        print "finish"
        if self.callback:
            # return self.callback(self.response)
            self.callback(self.response)
        return True


class ResponseContainer(object):
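    # minimal requests-like response: body, status code and the URL it came from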
    def __init__(self, body, status_code, url):
        self.text = self.content = body
        self.status_code = status_code
        self.url = url


class MultiRequest(object):
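    # queues several requests on one CurlMulti handle and performs them
    # concurrently, optionally rotating the source IP per request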
    def __init__(self, multi_handler=None, source_ips=None):
        self.requests = {}
        if multi_handler:
            self._curl_multi_handler = multi_handler
        else:
            self._curl_multi_handler = MULTI_HANDLER
        self.source_ips = cycle(source_ips) if source_ips else cycle([None])
        self.lock = threading.RLock()

    def add(self, url, **kwargs):
        handle = get_connection(next(self.source_ips))
        request_container = RequestContainer(url, handle, **kwargs)
        try:
            self._curl_multi_handler.add_handle(handle)
        except pycurl.error as e:
            logging.error('unable to add handle for %s: %s', url, e)
        self.requests[handle] = request_container
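    # drive all pending transfers: select() waits for socket activity,
    # perform() advances every handle, info_read() drains finished ones,
    # and callbacks run on the thread pool while other transfers continue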
    def send_requests(self):
        select_timeout = 0.5
        # use the largest per-request timeout for the whole batch
        timeout = max(c.timeout for c in self.requests.values())
        for h, c in self.requests.iteritems():
            h.setopt(h.CONNECTTIMEOUT_MS, timeout)
            h.setopt(h.TIMEOUT_MS, timeout)
            h.setopt(h.URL, c.url)
            c.headers['Connection'] = 'keep-alive'
            # c.headers['Accept-Encoding'] = 'gzip, deflate'
            h.setopt(h.HTTPHEADER,
                     ['{0}: {1}'.format(k, v)
                      for k, v in c.headers.iteritems()])
            if c.cookies:
                h.setopt(h.COOKIE, '; '.join('{0}={1}'.format(k, v)
                                             for k, v in c.cookies.iteritems()))
            else:
                h.unsetopt(h.COOKIE)
        search_start = time()
        handles_num = len(self.requests)
        async_results = []
        while handles_num:
            ret = self._curl_multi_handler.select(select_timeout)
            if ret == -1:
                continue
            while 1:
                ret, new_handles_num = self._curl_multi_handler.perform()
                # at least one handle finished
                if new_handles_num < handles_num:
                    _, success_list, error_list = self._curl_multi_handler.info_read()
                    for h in success_list:
                        # init self.requests[h].response
                        # call in the main thread to avoid conflicting usage of the curl handler
                        self.requests[h].extract_response()
                        # calling callbacks
                        async_results.append(POOL.apply_async(self.requests[h].callback,
                                                              (self.requests[h].response, )))
                    for h, err_code, err_string in error_list:
                        logging.warning('Error on %s: "%s"', self.requests[h].url, err_string)
                    handles_num -= len(success_list) + len(error_list)
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
        # wait for the callbacks, but never longer than the batch timeout
        # (timeout is in milliseconds, time() in seconds)
        for a in async_results:
            remaining_time = max(0.0, timeout / 1000.0 - (time() - search_start))
            try:
                a.get(timeout=remaining_time)
            except TimeoutError:
                logging.warning('callback timeout')
        # self._curl_multi_handler.close()
        return self.requests.values()


# thread pool for the callbacks (subject to the GIL)
POOL = ThreadPool(8)
# process pool: doesn't work for now, the callbacks would have to be rewritten
# POOL = Pool(8)


if __name__ == '__main__':
    r = MultiRequest()
    r.add('http://httpbin.org/delay/0', headers={'User-Agent': 'x'})
    r.add('http://127.0.0.1:7777/', headers={'User-Agent': 'x'})
    r.add('http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'})
    r.add('http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'})
    for v in r.send_requests():
        print v.url
        # failed requests (like the 127.0.0.1:7777 one above) never get a response
        if v.response is not None:
            print v.response.text
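    # sketch: rotate outgoing connections across several local IPs;
    # the addresses below are placeholders from the documentation range
    # and only work if this machine actually owns them
    # r = MultiRequest(source_ips=['192.0.2.10', '192.0.2.11'])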