Skip to content

Instantly share code, notes, and snippets.

@brizzbane
Created February 9, 2016 23:58
Show Gist options
  • Save brizzbane/8282c10ff39d8d4ed147 to your computer and use it in GitHub Desktop.
Save brizzbane/8282c10ff39d8d4ed147 to your computer and use it in GitHub Desktop.
import collections
import logging
import time
from io import BytesIO

import gruvi
import pycurl
import pyuv
from gruvi.futures import Future
from tornado import httputil
from tornado.escape import utf8, native_str

from net.config import configure_logging

configure_logging()
logger = logging.getLogger('cURLclient')
# logger.warn is a deprecated alias for logger.warning (removed in 3.13);
# the local alias is still exported under the short name `warn`.
debug, info, warn, error, critical = (logger.debug, logger.info,
                                      logger.warning, logger.error,
                                      logger.critical)
class cURLSMTPClient(object):
    """Asynchronous SMTP client driving libcurl's multi/socket interface.

    libcurl tells us which sockets to watch and when to fire a timeout via
    the M_SOCKETFUNCTION / M_TIMERFUNCTION callbacks; we register those
    sockets with the pyuv loop through gruvi Pollers and feed the resulting
    events back into the CurlMulti handle with socket_action().
    """

    def __init__(self, ioloop, max_clients=500):
        """ioloop: a pyuv event loop; max_clients: size of the easy-handle pool."""
        # Per-request defaults, merged in through _RequestProxy.
        self.defaults = dict(cURLSMTPRequest._DEFAULTS)
        self.ioloop = ioloop
        self._timer = pyuv.Timer(self.ioloop)
        self._fd_map = {}  # fd -> gruvi Poller watching that socket
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._sock_state_cb)
        self._curls = [self._curl_create() for _ in range(max_clients)]
        self._free_list = self._curls[:]
        self._requests = collections.deque()

    def check_handle(self, check_handle):
        # Debug hook for a pyuv Check handle; currently only logs it.
        error('check handle %s' % check_handle)

    def prepare_handle(self, prepare_handle):
        # Debug hook for a pyuv Prepare handle; logs it and stops itself.
        error('prepare handle %s' % prepare_handle)
        prepare_handle.stop()
        return 0

    def req(self, request, raise_error=True):
        """Start an SMTP transfer for `request` and return a Future.

        The future is resolved with a cURLResponse, or (when raise_error is
        true and the transfer failed) with its cURLError as the exception.
        """
        future = Future()

        def handle_response(response):
            debug('response %s', response)
            if raise_error and response.error:
                future.set_exception(response.error)
            else:
                future.set_result(response)

        # NOTE(review): this path creates a fresh easy handle per request
        # and bypasses the _free_list pool / _process_queue, so max_clients
        # is not enforced here.
        curl = pycurl.Curl()
        request = _RequestProxy(request, self.defaults)
        curl.info = {
            "buffer": BytesIO(),
            "request": request,
            "callback": handle_response,
            "curl_start_time": time.time(),
        }
        self._curl_setup_request(curl, request, curl.info['buffer'])
        self._multi.add_handle(curl)
        # Kick libcurl immediately so the transfer starts.
        self._set_timeout(0)
        return future

    def _process_queue(self):
        """Drain queued (request, callback) pairs onto free easy handles."""
        while True:
            started = 0
            while self._free_list and self._requests:
                started += 1
                curl = self._free_list.pop()
                (request, callback) = self._requests.popleft()
                request = _RequestProxy(request, self.defaults)
                curl.info = {
                    "buffer": BytesIO(),
                    "request": request,
                    "callback": callback,
                    "curl_start_time": time.time(),
                }
                self._curl_setup_request(curl, request, curl.info['buffer'])
                self._multi.add_handle(curl)
            if not started:
                break

    def _set_timeout(self, msecs):
        """libcurl M_TIMERFUNCTION callback: (re)arm our pyuv timer.

        Per the libcurl docs, msecs == -1 means "delete the timeout".
        """
        debug(msecs)
        if msecs >= 0:
            timeout = msecs / 1000.0
            debug('timeout %s' % timeout)
            self._timer.start(self._timer_cb, timeout, 0)
        else:
            # The original ignored -1, leaving a stale timer armed.
            self._timer.stop()

    def _timer_cb(self, timer):
        """pyuv timer callback: tell libcurl that a timeout expired."""
        self._timer.stop()
        debug('timer obj %s' % timer)
        while True:
            try:
                ret, num_handles = self._multi.socket_action(
                    pycurl.SOCKET_TIMEOUT, 0)
            except pycurl.error as e:
                ret = e.args[0]
            # Test outside the try: the original only broke from inside the
            # try block, so a pycurl.error could spin this loop forever.
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _curl_create(self):
        """Create one verbose easy handle wired to our debug logger."""
        curl = pycurl.Curl()
        curl.setopt(pycurl.VERBOSE, 1)
        curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
        return curl

    def _curl_setup_request(self, curl, request, buffer):
        """Configure an easy handle for one SMTP upload.

        Uses the sender/recipients/message carried by the request; the
        original hard-coded test addresses and passed a str to BytesIO,
        which fails on Python 3.
        """
        curl.setopt(pycurl.URL, native_str(request.url))
        curl.setopt(pycurl.UPLOAD, True)
        curl.setopt(pycurl.MAIL_FROM, native_str(request.mail_from))
        curl.setopt(pycurl.MAIL_RCPT, [native_str(r) for r in request.mail_rcpt])
        # request.message is a file-like object (BytesIO) libcurl reads from.
        curl.setopt(pycurl.READDATA, request.message)

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        self._process_queue()

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Recycle a finished easy handle and invoke its callback."""
        curlinfo = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = curlinfo['buffer']
        if curl_error:
            # Named `exc` (not `error`) so the module-level error() logging
            # alias is not shadowed.
            exc = cURLError(curl_error, curl_message)
            code = exc.code
            effective_url = None
            buffer.close()
            buffer = None
        else:
            exc = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=curlinfo["curl_start_time"] - curlinfo["request"].start_time,
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            # Pass the local `buffer` (None on error) — the original passed
            # curlinfo['buffer'], a BytesIO it had just closed.
            curlinfo["callback"](cURLResponse(
                request=curlinfo["request"], code=code,
                buffer=buffer, effective_url=effective_url, error=exc,
                request_time=time.time() - curlinfo["curl_start_time"],
                time_info=time_info))
        except Exception as e:
            critical('exception %s' % e)

    def _handle_force_timeout(self, timer):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        info(timer)
        while True:
            try:
                ret, num_handles = self._multi.socket_all()
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _sock_state_cb(self, event, fd, multi, data):
        """libcurl M_SOCKETFUNCTION callback: (un)register fd watchers."""
        from gruvi.poll import Poller
        if event == pycurl.POLL_REMOVE:
            handle = self._fd_map.pop(fd, None)
            if handle is not None:
                handle.close()
            return
        # Look up (or lazily create) the poller for this fd. The original
        # only bound `handle` for brand-new fds, so POLL_IN/OUT/INOUT on an
        # already-registered fd raised NameError.
        handle = self._fd_map.get(fd)
        if handle is None:
            handle = Poller(self.ioloop)
            self._fd_map[fd] = handle
        if event == pycurl.POLL_IN:
            handle.add_callback(fd, pyuv.UV_READABLE, self._poll_in_cb)
        elif event == pycurl.POLL_OUT:
            handle.add_callback(fd, pyuv.UV_WRITABLE, self._poll_out_cb)
        elif event == pycurl.POLL_INOUT:
            # Pass fd like the other branches (the original omitted it) and
            # use the dedicated in/out callback.
            handle.add_callback(fd, pyuv.UV_READABLE | pyuv.UV_WRITABLE,
                                self._poll_inout_cb)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        # The original tested tornado's ioloop.IOLoop.WRITE here — a
        # NameError in this pyuv-based file.
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _poll_in_cb(self, handle, events, error):
        """Poller callback for readable sockets."""
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        ret, num_handles = self._multi.socket_action(handle.fileno(), action)
        # Reap transfers completed by this event rather than waiting for
        # the next timer tick (matches _poll_cb).
        self._finish_pending_requests()

    def _poll_out_cb(self, handle, events, error):
        """Poller callback for writable sockets."""
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        ret, num_handles = self._multi.socket_action(handle.fileno(), action)
        self._finish_pending_requests()

    def _poll_inout_cb(self, handle, events, error):
        """Poller callback for sockets watched for both directions.

        The original's if/elif could only report one direction per event;
        both flags are now folded into a single socket_action call.
        """
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        if action:
            self._multi.socket_action(handle.fileno(), action)
            self._finish_pending_requests()

    def _poll_cb(self, handle, events, error):
        """Generic poller callback; folds both directions into one call
        (the original issued two separate socket_action calls)."""
        if error is not None:
            return
        action = 0
        if events & pyuv.UV_READABLE:
            action |= pycurl.CSELECT_IN
        if events & pyuv.UV_WRITABLE:
            action |= pycurl.CSELECT_OUT
        self._multi.socket_action(handle.fd, action)
        self._finish_pending_requests()

    def _curl_debug(self, debug_type, debug_msg):
        """libcurl DEBUGFUNCTION: route libcurl's trace output to our logger."""
        debug('debug type %s debug msg %s' % (debug_type, debug_msg))
        debug_types = ('I', '<', '>', '<', '>')
        if debug_type == 0:
            debug('%s', debug_msg.strip())
        elif debug_type in (1, 2):
            for line in debug_msg.splitlines():
                debug('%s %s', debug_types[debug_type], line)
        elif debug_type == 4:
            debug('%s %r', debug_types[debug_type], debug_msg)
class cURLSMTPRequest(object):
    """SMTP client request object (modelled on tornado's HTTPRequest)."""

    # Default values for request parameters.
    # Merged with the values on the request object by the client
    # implementation via _RequestProxy.
    _DEFAULTS = dict(
        connect_timeout=60.0,
        request_timeout=15.0,
        follow_redirects=True,
        max_redirects=5,
        decompress_response=True,
        proxy_password='',
        allow_nonstandard_methods=True,
        validate_cert=True)

    def __init__(self, url, mail_from, mail_rcpt, message, headers=None, body=None,
                 auth_username=None, auth_password=None, auth_mode=None,
                 connect_timeout=None, request_timeout=None,
                 if_modified_since=None, follow_redirects=None,
                 max_redirects=None, user_agent=None, use_gzip=None,
                 network_interface=None, streaming_callback=None,
                 header_callback=None, prepare_curl_callback=None,
                 proxy_host=None, proxy_port=None, proxy_username=None,
                 proxy_password=None, allow_nonstandard_methods=None,
                 validate_cert=None, ca_certs=None,
                 allow_ipv6=None,
                 client_key=None, client_cert=None, body_producer=None,
                 expect_100_continue=False, decompress_response=None,
                 ssl_options=None):
        """url: smtp:// URL of the server; mail_from: envelope sender;
        mail_rcpt: a single recipient address (stored as a one-element
        list); message: message payload as str or bytes.
        Unset keyword options fall back to _DEFAULTS via _RequestProxy.
        """
        self.headers = headers
        # The original never stored `body`, so reading .body raised
        # AttributeError; route it through the property setter.
        self.body = body
        self.proxy_host = proxy_host
        self.proxy_port = proxy_port
        self.proxy_username = proxy_username
        self.proxy_password = proxy_password
        self.url = url
        self.mail_from = mail_from
        self.mail_rcpt = [mail_rcpt]
        # BytesIO requires bytes on Python 3; accept str for convenience.
        if isinstance(message, str):
            message = message.encode('utf-8')
        self.message = BytesIO(message)
        self.body_producer = body_producer
        self.auth_username = auth_username
        self.auth_password = auth_password
        self.auth_mode = auth_mode
        self.connect_timeout = connect_timeout
        self.request_timeout = request_timeout
        self.follow_redirects = follow_redirects
        self.max_redirects = max_redirects
        self.user_agent = user_agent
        # decompress_response supersedes the deprecated use_gzip flag.
        if decompress_response is not None:
            self.decompress_response = decompress_response
        else:
            self.decompress_response = use_gzip
        self.network_interface = network_interface
        self.streaming_callback = streaming_callback
        self.header_callback = header_callback
        self.prepare_curl_callback = prepare_curl_callback
        self.allow_nonstandard_methods = allow_nonstandard_methods
        self.validate_cert = validate_cert
        self.ca_certs = ca_certs
        self.allow_ipv6 = allow_ipv6
        self.client_key = client_key
        self.client_cert = client_cert
        self.ssl_options = ssl_options
        self.expect_100_continue = expect_100_continue
        # Used by the client to compute queueing delay in time_info.
        self.start_time = time.time()

    @property
    def headers(self):
        return self._headers

    @headers.setter
    def headers(self, value):
        self._headers = value

    @property
    def body(self):
        return self._body

    @body.setter
    def body(self, value):
        self._body = value
class _RequestProxy(object):
"""Combines an object with a dictionary of defaults.
Used internally by AsyncHTTPClient implementations.
"""
def __init__(self, request, defaults):
self.request = request
self.defaults = defaults
def __getattr__(self, name):
request_attr = getattr(self.request, name)
if request_attr is not None:
return request_attr
elif self.defaults is not None:
return self.defaults.get(name, None)
else:
return None
class cURLResponse(object):
    """Result of one transfer.

    Attributes:
        * request: the originating request object (unwrapped from any
          _RequestProxy)
        * code: numeric status code reported by libcurl, e.g. 200 or 404
        * reason: human-readable reason phrase, if supplied
        * headers: header object; only set when one is supplied
        * effective_url: final location after any redirects (falls back to
          request.url)
        * buffer: file-like object holding the response body, if any
        * body: response body as a string, created on demand from ``buffer``
        * error: Exception object, if any
        * request_time: seconds from request start to finish
        * time_info: dict of diagnostic timing information from the request.
          Available data are subject to change, but currently uses timings
          available from http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html,
          plus ``queue``, which is the delay (if any) introduced by waiting
          for a slot under `AsyncHTTPClient`'s ``max_clients`` setting.
    """

    def __init__(self, request, code, headers=None, buffer=None,
                 effective_url=None, error=None, request_time=None,
                 time_info=None, reason=None):
        # Store the raw request so callers never see the proxy wrapper.
        self.request = request.request if isinstance(request, _RequestProxy) else request
        self.code = code
        self.reason = reason
        if headers is not None:
            self.headers = headers
        self.buffer = buffer
        self._body = None
        self.effective_url = request.url if effective_url is None else effective_url
        self.error = error
        self.request_time = request_time
        self.time_info = time_info or {}

    @property
    def body(self):
        # Materialized lazily from the buffer on first access, then cached.
        if self.buffer is None:
            return None
        if self._body is None:
            self._body = self.buffer.getvalue()
        return self._body

    def rethrow(self):
        """If there was an error on the request, raise an `HTTPError`."""
        if self.error:
            raise self.error

    def __repr__(self):
        pairs = sorted(self.__dict__.items())
        rendered = ",".join("%s=%r" % pair for pair in pairs)
        return "%s(%s)" % (self.__class__.__name__, rendered)
class cURLError(Exception):
    """Exception thrown for an unsuccessful HTTP request.

    Attributes:
        * ``code`` - HTTP error integer error code, e.g. 404. Error code
          599 is used when no HTTP response was received, e.g. for a
          timeout.
        * ``response`` - `HTTPResponse` object, if any.

    Note that if ``follow_redirects`` is False, redirects become
    HTTPErrors, and you can look at
    ``error.response.headers['Location']`` to see the destination of the
    redirect.
    """

    def __init__(self, code, message=None, response=None):
        super(cURLError, self).__init__(code, message, response)
        self.code = code
        self.message = message
        self.response = response

    def __str__(self):
        return "cURL %d: %s" % (self.code, self.message)
if __name__ == '__main__':
    # Demo: send one message through the gruvi hub's pyuv loop.
    hub_loop = gruvi.get_hub().loop
    client = cURLSMTPClient(hub_loop)
    request = cURLSMTPRequest(
        url='smtp://mail.mailinator.com/',
        mail_from='a@yahoo.com',
        mail_rcpt='test@mailinator.com',
        message='hi',
    )
    fiber = gruvi.Fiber(client.req, (request,))
    fiber.start()
    # Hand control to the hub so the fiber and I/O callbacks can run.
    gruvi.get_hub().switch()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment