Created
February 9, 2016 23:58
-
-
Save brizzbane/8282c10ff39d8d4ed147 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import time | |
import pyuv | |
import gruvi | |
import pycurl | |
from io import BytesIO | |
from gruvi.futures import Future | |
from tornado import httputil | |
from tornado.escape import utf8, native_str | |
import collections | |
from net.config import configure_logging | |
# Wire up project-wide logging and expose short aliases for this
# module's logger methods.
configure_logging()
logger = logging.getLogger('cURLclient')
# Bind the `warn` alias to logger.warning: Logger.warn has been
# deprecated since Python 3.3 in favor of Logger.warning.
debug, info, warn, error, critical = logger.debug, logger.info, logger.warning, logger.error, logger.critical
class cURLSMTPClient(object):
    """Asynchronous SMTP client driving libcurl's multi interface from a
    pyuv/gruvi event loop.

    The structure mirrors tornado's curl_httpclient: libcurl reports the
    sockets it wants watched through M_SOCKETFUNCTION and the wakeups it
    wants through M_TIMERFUNCTION; those are translated into pyuv Poller
    and Timer handles, and readiness is fed back via socket_action().
    """

    def __init__(self, ioloop, max_clients=500):
        self.defaults = dict(cURLSMTPRequest._DEFAULTS)
        self.ioloop = ioloop
        self._timer = pyuv.Timer(self.ioloop)
        # Maps a socket fd to the gruvi Poller currently watching it.
        self._fd_map = {}
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._sock_state_cb)
        # Pre-created easy-handle pool; _process_queue() draws from it.
        self._curls = [self._curl_create() for _ in range(max_clients)]
        self._free_list = self._curls[:]
        self._requests = collections.deque()

    def check_handle(self, check_handle):
        # Diagnostic hook for a pyuv Check handle; logs only.
        error('check handle %s' % check_handle)

    def prepare_handle(self, prepare_handle):
        # Diagnostic hook for a pyuv Prepare handle; stops the handle
        # after its first invocation so it does not fire every tick.
        error('prepare handle %s' % prepare_handle)
        prepare_handle.stop()
        return 0

    def req(self, request, raise_error=True):
        """Start *request* and return a Future for its cURLResponse.

        If *raise_error* is true, a transfer error is set as the
        Future's exception; otherwise the error-carrying response is set
        as its result.
        """
        future = Future()

        def handle_response(response):
            # Resolve the future exactly once (the original looped the
            # debug print and result-setting 10 times; a second
            # set_result/set_exception on a resolved Future raises).
            print("response %s" % response)
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)

        # NOTE(review): this path creates a fresh easy handle per call,
        # bypassing the _free_list/max_clients pool; the pooled path is
        # _process_queue(). Confirm which behavior is intended.
        curl = pycurl.Curl()
        request = _RequestProxy(request, self.defaults)
        curl.info = {
            "buffer": BytesIO(),
            "request": request,
            "callback": handle_response,
            "curl_start_time": time.time(),
        }
        self._curl_setup_request(curl, request, curl.info['buffer'])
        self._multi.add_handle(curl)
        # Kick libcurl immediately so the transfer starts now.
        self._set_timeout(0)
        return future

    def _process_queue(self):
        """Assign queued requests to free easy handles until either the
        free list or the queue is exhausted."""
        while True:
            started = 0
            while self._free_list and self._requests:
                started += 1
                curl = self._free_list.pop()
                (request, callback) = self._requests.popleft()
                request = _RequestProxy(request, self.defaults)
                curl.info = {
                    "buffer": BytesIO(),
                    "request": request,
                    "callback": callback,
                    "curl_start_time": time.time(),
                }
                self._curl_setup_request(curl, request, curl.info['buffer'])
                self._multi.add_handle(curl)
            if not started:
                break

    def _set_timeout(self, msecs):
        """M_TIMERFUNCTION callback: schedule a wakeup in *msecs* ms.

        libcurl passes -1 to mean "cancel the timer"; negative values
        are therefore ignored here.
        """
        debug(msecs)
        if msecs >= 0:
            timeout = msecs / 1000.0
            debug('timeout %s' % timeout)
            self._timer.start(self._timer_cb, timeout, 0)

    def _timer_cb(self, timer):
        """Timer fired: tell libcurl a timeout elapsed, then reap any
        completed transfers."""
        self._timer.stop()
        debug('timer obj %s' % timer)
        while True:
            try:
                ret, num_handles = self._multi.socket_action(
                    pycurl.SOCKET_TIMEOUT, 0)
            except pycurl.error as e:
                ret = e.args[0]
            # Check outside the try so a pycurl.error cannot spin this
            # loop forever (the original only broke on the success path).
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _curl_create(self):
        """Create one easy handle with verbose debug logging attached."""
        curl = pycurl.Curl()
        curl.setopt(pycurl.VERBOSE, 1)
        curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
        return curl

    def _curl_setup_request(self, curl, request, buffer):
        """Configure *curl* for the SMTP upload described by *request*.

        Envelope addresses and payload come from the request object
        (the prototype hard-coded placeholder addresses and a str-typed
        BytesIO, which fails on Python 3).
        """
        curl.setopt(pycurl.URL, native_str(request.url))
        curl.setopt(pycurl.UPLOAD, True)
        curl.setopt(pycurl.MAIL_FROM, native_str(request.mail_from))
        curl.setopt(pycurl.MAIL_RCPT,
                    [native_str(rcpt) for rcpt in request.mail_rcpt])
        # request.message is a file-like object; libcurl reads the
        # payload from it during the upload.
        curl.setopt(pycurl.READDATA, request.message)

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        self._process_queue()

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Detach a completed easy handle, build the response, and
        invoke the request's callback."""
        curlinfo = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        response_buffer = curlinfo['buffer']
        if curl_error:
            response_error = cURLError(curl_error, curl_message)
            code = response_error.code
            effective_url = None
            response_buffer.close()
            response_buffer = None
        else:
            response_error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            response_buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=curlinfo["curl_start_time"] - curlinfo["request"].start_time,
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            # Pass the local buffer (None after an error): the original
            # handed the callback the just-closed BytesIO on the error
            # path.
            curlinfo["callback"](cURLResponse(
                request=curlinfo["request"], code=code,
                buffer=response_buffer, effective_url=effective_url,
                error=response_error,
                request_time=time.time() - curlinfo["curl_start_time"],
                time_info=time_info))
        except Exception as e:
            critical('exception %s' % e)

    def _handle_force_timeout(self, timer):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        info(timer)
        while True:
            try:
                ret, num_handles = self._multi.socket_all()
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _sock_state_cb(self, event, fd, multi, data):
        """M_SOCKETFUNCTION callback: register/unregister *fd* with the
        event loop according to *event*."""
        from gruvi.poll import Poller
        # Always bind `handle`: the original only assigned it when the
        # fd was new, so the POLL_IN/OUT/INOUT branches raised
        # NameError for fds already in the map.
        if fd not in self._fd_map:
            self._fd_map[fd] = Poller(self.ioloop)
        handle = self._fd_map[fd]
        if event == pycurl.POLL_NONE:
            # NOTE(review): replaces (and leaks) any existing Poller for
            # this fd, matching the original behavior — confirm intent.
            handle = Poller(self.ioloop)
            self._fd_map[fd] = handle
        elif event == pycurl.POLL_REMOVE:
            if fd in self._fd_map:
                handle = self._fd_map.pop(fd)
                handle.close()
        elif event == pycurl.POLL_IN:
            handle.add_callback(fd, pyuv.UV_READABLE, self._poll_in_cb)
        elif event == pycurl.POLL_OUT:
            handle.add_callback(fd, pyuv.UV_WRITABLE, self._poll_out_cb)
        elif event == pycurl.POLL_INOUT:
            # Pass fd like the sibling branches (the original omitted it).
            handle.add_callback(fd, pyuv.UV_WRITABLE | pyuv.UV_READABLE,
                                self._poll_out_cb)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        # The original tested the tornado constant ioloop.IOLoop.WRITE
        # here, which is not defined in this module; use the pyuv flag.
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _poll_in_cb(self, handle, events, error):
        # Readability on a watched fd: forward CSELECT_IN to libcurl.
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        ret, num_handles = self._multi.socket_action(handle.fileno(), action)

    def _poll_out_cb(self, handle, events, error):
        # Writability on a watched fd: forward CSELECT_OUT to libcurl.
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        ret, num_handles = self._multi.socket_action(handle.fileno(), action)

    def _poll_inout_cb(self, handle, events, error):
        # Either-direction readiness; only one direction is forwarded
        # per invocation (write preferred), matching the original.
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
            ret, num_handles = self._multi.socket_action(handle.fileno(), action)
        elif events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
            ret, num_handles = self._multi.socket_action(handle.fileno(), action)

    def _poll_cb(self, handle, events, error):
        # Combined readiness callback that also reaps finished requests.
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
            self._multi.socket_action(handle.fd, action)
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
            self._multi.socket_action(handle.fd, action)
        self._finish_pending_requests()

    def _curl_debug(self, debug_type, debug_msg):
        """DEBUGFUNCTION callback: route libcurl debug output to our
        logger, one line per protocol line."""
        debug('debug type %s debug msg %s' % (debug_type, debug_msg))
        # Index 0 = informational, 1/2 = header in/out, 4 = data out.
        debug_types = ('I', '<', '>', '<', '>')
        if debug_type == 0:
            debug('%s', debug_msg.strip())
        elif debug_type in (1, 2):
            for line in debug_msg.splitlines():
                debug('%s %s', debug_types[debug_type], line)
        elif debug_type == 4:
            debug('%s %r', debug_types[debug_type], debug_msg)
class cURLSMTPRequest(object):
    """SMTP client request object.

    Carries the target ``url``, envelope addresses (``mail_from``,
    ``mail_rcpt``) and the ``message`` payload, plus the HTTP-client
    style tuning knobs inherited from tornado's HTTPRequest.
    """

    # Default values for request parameters, merged with the values on
    # the request object by the client via _RequestProxy.
    _DEFAULTS = dict(
        connect_timeout=60.0,
        request_timeout=15.0,
        follow_redirects=True,
        max_redirects=5,
        decompress_response=True,
        proxy_password='',
        allow_nonstandard_methods=True,
        validate_cert=True)

    def __init__(self, url, mail_from, mail_rcpt, message, headers=None, body=None,
                 auth_username=None, auth_password=None, auth_mode=None,
                 connect_timeout=None, request_timeout=None,
                 if_modified_since=None, follow_redirects=None,
                 max_redirects=None, user_agent=None, use_gzip=None,
                 network_interface=None, streaming_callback=None,
                 header_callback=None, prepare_curl_callback=None,
                 proxy_host=None, proxy_port=None, proxy_username=None,
                 proxy_password=None, allow_nonstandard_methods=None,
                 validate_cert=None, ca_certs=None,
                 allow_ipv6=None,
                 client_key=None, client_cert=None, body_producer=None,
                 expect_100_continue=False, decompress_response=None,
                 ssl_options=None):
        self.headers = headers
        # Store the body: the original accepted the parameter but never
        # assigned it, so reading .body raised AttributeError.
        self.body = body
        self.proxy_host = proxy_host
        self.proxy_port = proxy_port
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.url = url
        self.mail_from = mail_from
        # Single recipient wrapped in a list, as pycurl.MAIL_RCPT expects.
        self.mail_rcpt = [mail_rcpt]
        # NOTE(review): BytesIO requires bytes on Python 3 — callers
        # should pass a bytes payload; confirm call sites.
        self.message = BytesIO(message)
        self.body_producer = body_producer
        self.auth_username = auth_username
        self.auth_password = auth_password
        self.auth_mode = auth_mode
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self.follow_redirects = follow_redirects
        self.max_redirects = max_redirects
        self.user_agent = user_agent
        # decompress_response supersedes the legacy use_gzip flag.
        if decompress_response is not None:
            self.decompress_response = decompress_response
        else:
            self.decompress_response = use_gzip
        self.network_interface = network_interface
        self.streaming_callback = streaming_callback
        self.header_callback = header_callback
        self.prepare_curl_callback = prepare_curl_callback
        self.allow_nonstandard_methods = allow_nonstandard_methods
        self.validate_cert = validate_cert
        self.ca_certs = ca_certs
        self.allow_ipv6 = allow_ipv6
        self.client_key = client_key
        self.client_cert = client_cert
        self.ssl_options = ssl_options
        self.expect_100_continue = expect_100_continue
        # Used by the client to compute the queue-wait portion of
        # time_info.
        self.start_time = time.time()

    @property
    def headers(self):
        return self._headers

    @headers.setter
    def headers(self, value):
        self._headers = value

    @property
    def body(self):
        return self._body

    @body.setter
    def body(self, value):
        self._body = value
class _RequestProxy(object): | |
"""Combines an object with a dictionary of defaults. | |
Used internally by AsyncHTTPClient implementations. | |
""" | |
def __init__(self, request, defaults): | |
self.request = request | |
self.defaults = defaults | |
def __getattr__(self, name): | |
request_attr = getattr(self.request, name) | |
if request_attr is not None: | |
return request_attr | |
elif self.defaults is not None: | |
return self.defaults.get(name, None) | |
else: | |
return None | |
class cURLResponse(object):
    """Response object handed to a request's callback.

    Attributes:

    * request: the original request object (unwrapped from
      _RequestProxy)
    * code: numeric response code reported by libcurl
    * reason: human-readable reason phrase describing the status code
    * headers: header object supplied by the client, or None
    * effective_url: final location of the resource after following any
      redirects
    * buffer: ``BytesIO`` object for the response body (None on error)
    * body: response body as bytes (created on demand from ``buffer``)
    * error: Exception object, if any
    * request_time: seconds from request start to finish
    * time_info: dictionary of diagnostic timing information from the
      request — libcurl timings (see
      http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html) plus
      ``queue``, the delay introduced while waiting for a free handle.
    """

    def __init__(self, request, code, headers=None, buffer=None,
                 effective_url=None, error=None, request_time=None,
                 time_info=None, reason=None):
        # Store the caller's request, not the internal defaults proxy.
        if isinstance(request, _RequestProxy):
            self.request = request.request
        else:
            self.request = request
        self.code = code
        self.reason = reason
        # Always bind headers: the original only set the attribute when
        # one was supplied, so reading .headers could raise
        # AttributeError on header-less responses.
        self.headers = headers
        self.buffer = buffer
        self._body = None  # cache for the body property
        if effective_url is None:
            self.effective_url = request.url
        else:
            self.effective_url = effective_url
        self.error = error
        self.request_time = request_time
        self.time_info = time_info or {}

    def _get_body(self):
        # Lazily materialize (and cache) the body from the buffer.
        if self.buffer is None:
            return None
        elif self._body is None:
            self._body = self.buffer.getvalue()
        return self._body
    body = property(_get_body)

    def rethrow(self):
        """If there was an error on the request, raise it."""
        if self.error:
            raise self.error

    def __repr__(self):
        args = ",".join("%s=%r" % i for i in sorted(self.__dict__.items()))
        return "%s(%s)" % (self.__class__.__name__, args)
class cURLError(Exception):
    """Exception raised for an unsuccessful transfer.

    Attributes:

    * ``code`` - numeric error code (e.g. an HTTP status such as 404,
      or 599 when no response was received, e.g. for a timeout).
    * ``response`` - response object associated with the error, if any.

    Note that if ``follow_redirects`` is False, redirects surface as
    errors, and ``error.response.headers['Location']`` shows the
    destination of the redirect.
    """

    def __init__(self, code, message=None, response=None):
        super(cURLError, self).__init__(code, message, response)
        self.code = code
        self.message = message
        self.response = response

    def __str__(self):
        return "cURL %d: %s" % (self.code, self.message)
if __name__ == '__main__':
    # Smoke test: push a single message through mailinator's public
    # SMTP endpoint from a gruvi fiber.
    ioloop = gruvi.get_hub().loop
    smtp_client = cURLSMTPClient(ioloop)
    smtp_request = cURLSMTPRequest(
        url='smtp://mail.mailinator.com/',
        mail_from='a@yahoo.com',
        mail_rcpt='test@mailinator.com',
        message='hi')
    sender = gruvi.Fiber(smtp_client.req, (smtp_request,))
    sender.start()
    # Hand control to the hub so the fiber and the event loop run.
    gruvi.get_hub().switch()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment