# parallel HTTP requests with pycurl (CurlMulti + CurlShare)
# gist by @dalf, created March 2, 2016

import logging
import pycurl
import threading
from itertools import cycle
# cStringIO: C implementation of StringIO, faster for accumulating response bodies
from cStringIO import StringIO
from time import time
from urllib import urlencode
from multiprocessing import Pool, TimeoutError
from multiprocessing.pool import ThreadPool
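
# share the DNS cache across all curl easy handles, so each hostname is
# resolved only once per process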
CURL_SHARE = pycurl.CurlShare()
CURL_SHARE.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
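# a single multi handle multiplexes every transfer in one select()/perform() loop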
MULTI_HANDLER = pycurl.CurlMulti()


def __test_callback(*args):
    # demo callback used in __main__
    print "callback called"


def no_callback(*args):
    # default no-op callback
    pass


def get_connection(source_address=None):
    # pycurl initialization
    h = pycurl.Curl()
    # follow redirects
    h.setopt(h.FOLLOWLOCATION, True)
    # h.setopt(h.VERBOSE, True)
    # consistently use IPv4
    h.setopt(h.IPRESOLVE, pycurl.IPRESOLVE_V4)
    h.setopt(pycurl.SHARE, CURL_SHARE)
    # make sure cookies are sent back when there is a redirect
    h.setopt(pycurl.COOKIEFILE, '')
    # enable compression: faster downloads
    h.setopt(pycurl.ENCODING, 'gzip, deflate')
    # disabling Nagle's algorithm and enabling keep-alive seems to be faster
    h.setopt(pycurl.TCP_NODELAY, 1)
    h.setopt(pycurl.TCP_KEEPALIVE, 1)
    if source_address:
        # INTERFACE accepts an interface name, an IP address or a host name
        h.setopt(h.INTERFACE, source_address)
    return h
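
# Hedged standalone sketch (not part of the gist's flow): how one of these
# easy handles would be used on its own; `buf` and `h` are local names.
# buf = StringIO()
# h = get_connection()
# h.setopt(h.URL, 'http://httpbin.org/get')
# h.setopt(h.WRITEFUNCTION, buf.write)
# h.perform()
# print buf.getvalue()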


class RequestContainer(object):

    def __init__(self,
                 url,
                 curl_handler,
                 method='GET',
                 headers=None,
                 cookies=None,
                 callback=None,
                 data=None,
                 timeout=2.0,
                 ssl_verification=False):
        if headers is None:
            headers = {}
        if cookies is None:
            cookies = {}
        if callback is None:
            callback = no_callback
        if data is not None:
            # setting POSTFIELDS implicitly switches libcurl to POST,
            # so the `method` argument is currently unused
            curl_handler.setopt(curl_handler.POSTFIELDS, urlencode(data))
        self.url = url
        self.headers = headers
        self.cookies = cookies
        self.timeout = int(timeout * 1000)  # in milliseconds
        self.callback = callback
        self.curl_handler = curl_handler
        self._response_buffer = StringIO()
        self.response = None
        curl_handler.setopt(curl_handler.WRITEFUNCTION, self._response_buffer.write)
        curl_handler.setopt(curl_handler.SSL_VERIFYPEER, int(ssl_verification))

    def extract_response(self):
        infos = (
            ("TOTAL_TIME", pycurl.TOTAL_TIME),
            # ("NAMELOOKUP_TIME", pycurl.NAMELOOKUP_TIME),
            # ("CONNECT_TIME", pycurl.CONNECT_TIME),
            # ("PRETRANSFER_TIME", pycurl.PRETRANSFER_TIME),
            # ("STARTTRANSFER_TIME", pycurl.STARTTRANSFER_TIME),
            # ("REDIRECT_TIME", pycurl.REDIRECT_TIME),
            # ("REDIRECT_COUNT", pycurl.REDIRECT_COUNT)
        )
        for name, info in infos:
            print "{name} {time} {url}".format(name=name, time=self.curl_handler.getinfo(info), url=self.url)
        body = self._response_buffer.getvalue()
        status_code = self.curl_handler.getinfo(pycurl.HTTP_CODE)
        self.response = ResponseContainer(body, status_code, self.url)

    def finish(self):
        print "finish"
        if self.callback:
            # return self.callback(self.response)
            self.callback(self.response)
        return True


class ResponseContainer(object):
    # minimal response object mirroring the attributes of a
    # requests.Response (text, content, status_code, url)

    def __init__(self, body, status_code, url):
        self.text = self.content = body
        self.status_code = status_code
        self.url = url


class MultiRequest(object):

    def __init__(self, multi_handler=None, source_ips=None):
        self.requests = {}
        if multi_handler:
            self._curl_multi_handler = multi_handler
        else:
            self._curl_multi_handler = MULTI_HANDLER
        # rotate over the configured source addresses, one per new handle
        self.source_ips = cycle(source_ips) if source_ips else cycle([None])
        # note: this lock is currently unused
        self.lock = threading.RLock()

    def add(self, url, **kwargs):
        handle = get_connection(next(self.source_ips))
        request_container = RequestContainer(url, handle, **kwargs)
        try:
            self._curl_multi_handler.add_handle(handle)
        except pycurl.error:
            logging.exception('unable to add the handle for %s', url)
        self.requests[handle] = request_container

    def send_requests(self):
        select_timeout = 0.5
        # use the largest per-request timeout for the whole batch
        timeout = max(c.timeout for c in self.requests.values())
        for h, c in self.requests.iteritems():
            h.setopt(h.CONNECTTIMEOUT_MS, timeout)
            h.setopt(h.TIMEOUT_MS, timeout)
            h.setopt(h.URL, c.url)
            c.headers['Connection'] = 'keep-alive'
            # c.headers['Accept-Encoding'] = 'gzip, deflate'
            h.setopt(h.HTTPHEADER,
                     ['{0}: {1}'.format(k, v)
                      for k, v in c.headers.iteritems()])
            if c.cookies:
                h.setopt(h.COOKIE, '; '.join('{0}={1}'.format(k, v)
                                             for k, v in c.cookies.iteritems()))
            else:
                h.unsetopt(h.COOKIE)
        search_start = time()
        handles_num = len(self.requests)
        async_results = []
        while handles_num:
            ret = self._curl_multi_handler.select(select_timeout)
            if ret == -1:
                continue
            while 1:
                ret, new_handles_num = self._curl_multi_handler.perform()
                # at least one handle has finished
                if new_handles_num < handles_num:
                    _, success_list, error_list = self._curl_multi_handler.info_read()
                    for h in success_list:
                        req = self.requests[h]
                        # build req.response here, in the main thread, to avoid
                        # concurrent use of the curl handle
                        req.extract_response()
                        # run the callback on the pool
                        async_results.append(POOL.apply_async(req.callback, (req.response, )))
                    for h, err_code, err_string in error_list:
                        logging.warning('Error on %s: "%s"', self.requests[h].url, err_string)
                    handles_num -= len(success_list) + len(error_list)
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
        for a in async_results:
            # `timeout` is in milliseconds, time() in seconds
            remaining_time = max(0.0, timeout / 1000.0 - (time() - search_start))
            try:
                a.get(timeout=remaining_time)
            except TimeoutError:
                logging.warning('callback timed out for one request')
        # self._curl_multi_handler.close()
        return self.requests.values()


# thread pool: callbacks run concurrently but are still limited by the GIL
POOL = ThreadPool(8)
# process pool: doesn't work for now, the callback has to be rewritten so it
# is picklable (see the hypothetical sketch below)
# POOL = Pool(8)
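

# A minimal sketch of what a process-pool-compatible callback could look
# like, assuming the rest of the gist is unchanged: multiprocessing pickles
# the callable and its arguments, so the callback must be a module-level
# function (no lambdas, bound methods or closures) and the ResponseContainer
# passed to it must stay picklable. `save_body` is a hypothetical name, not
# part of the original gist.
def save_body(response):
    # persist the body, keyed by status code, e.g. response-200.txt
    with open('response-{0}.txt'.format(response.status_code), 'w') as f:
        f.write(response.content)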


if __name__ == '__main__':
    r = MultiRequest()
    r.add('http://httpbin.org/delay/0', headers={'User-Agent': 'x'})
    r.add('http://127.0.0.1:7777/', headers={'User-Agent': 'x'})
    r.add('http://httpbin.org/delay/0', cookies={'as': 'sa', 'bb': 'cc'})
    r.add('http://httpbin.org/delay/0', callback=__test_callback, timeout=1.0, headers={'User-Agent': 'x'})
    for v in r.send_requests():
        print v.url
        # failed requests (e.g. the connection-refused one above) never get
        # a response, so guard against None
        if v.response is not None:
            print v.response.text
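
# Hedged example (not exercised above) of the remaining kwargs: POSTing form
# data via `data`, which is urlencoded into POSTFIELDS, and rotating source
# addresses; '192.0.2.1' is a placeholder address, not a real setup.
# r2 = MultiRequest(source_ips=['192.0.2.1'])
# r2.add('http://httpbin.org/post', data={'q': 'test'}, callback=save_body)
# r2.send_requests()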