Customizable CurlMulti Wrapper with influxdb support.
#!/usr/bin/env python -u
# -*- coding: utf8 -*-
import json
import pycurl
from io import BytesIO
from urllib.parse import urlencode, urlparse
from influxdb import InfluxDBClient
# Configuration Defaults
DEFAULTS = { 'influxdb': { 'host': ''
, 'port': '8086'
, 'user': 'root'
, 'pass': 'root'
, 'db': 'test'
, 'retention': '31d'
, 'pycurl' : { 'connect_timeout': 10
, 'timeout': 10
, 'max_conns': 10
, 'reuse': True
, 'output': { 'console': True
, 'headers': True
, 'timeline': True
, 'verbose': False
#{{{ Non-configuration constants
ANSII = { 'bold': '\033[1m'
, 'black': '\033[30m', 'red': '\033[31m' , 'green': '\033[32m', 'orange': '\033[33m'
, 'blue': '\033[34m', 'purple': '\033[35m' , 'cyan': '\033[36m', 'lightgrey': '\033[37m'
, 'end': '\033[0m'
, 'darkgrey': '\033[90m', 'lightred': '\033[91m', 'lightgreen': '\033[92m', 'yellow': '\033[93m'
, 'lightblue': '\033[94m', 'pink': '\033[95m', 'lightcyan': '\033[96m'
, 'size' : [ 'SIZE_UPLOAD', 'SIZE_DOWNLOAD' ]
, 'header' : [ 'HEADER_SIZE', 'REQUEST_SIZE' ]
, 'code' : [ 'HTTP_CODE', 'HTTP_CONNECTCODE' ]
, 'count' : [ 'NUM_CONNECTS', 'REDIRECT_COUNT' ]
, 'speed' : [ 'SPEED_UPLOAD', 'SPEED_DOWNLOAD' ]
OUTPUT = """Timeline:
|--{yellow}NAMELOOKUP{end} {namelookup_time}ms
|--|--{cyan}CONNECT{end} {connect_time}ms ({yellow}CONNECT{end} +{connect}ms)
|--|--|--{cyan}APPCONNECT{end} {appconnect_time}ms ({yellow}SSL{end} +{ssl}ms)
|--|--|--|--{cyan}PRETRANSFER{end} {pretransfer_time}ms ({yellow}PRETRANSFER{end} +{pretransfer_time}ms)
|--|--|--|--|--{cyan}STARTTRANSFER{end} {starttransfer_time}ms ({yellow}WAIT{end} +{server}ms)
|--|--|--|--|--|--{cyan}TOTAL{end} {total_time}ms ({yellow}RECEIVE{end} +{transfer}ms)
\--|--|--|--|--|--{lightred}REDIRECT{end} {redirect_time}ms
class influxDBWrapper(object):
#{{{ def __init__
def __init__(self):
self.points = []
self.influxdb_host = DEFAULTS['influxdb']['host']
self.influxdb_port = DEFAULTS['influxdb']['port']
self.influxdb_user = DEFAULTS['influxdb']['user']
self.influxdb_pass = DEFAULTS['influxdb']['pass']
self.influxdb_db = DEFAULTS['influxdb']['db']
self.influxdb_retention = DEFAULTS['influxdb']['retention']
#{{{ def connect
def connect(self):
self.influxdb = InfluxDBClient(self.influxdb_host, self.influxdb_port, self.influxdb_user, self.influxdb_pass, self.influxdb_db)
self.influxdb.create_retention_policy(self.influxdb_retention, self.influxdb_retention, 1, default=True)
#{{{ def ping
def ping(self):
print(self.influxdb.request('ping', expected_response_code=204))
#{{{ def add
def add(self, p):
#{{{ def push
def push(self, points=None):
if not points:
points = self.points
#{{{ def print
def print(self):
#{{{ def drop_db
def drop_db(self):
class CurlWrapper(object):
#{{{ def __init__
def __init__(self):
self.requests = []
self.connect_timeout = DEFAULTS['pycurl']['connect_timeout']
self.timeout = DEFAULTS['pycurl']['timeout']
self.max_conns = DEFAULTS['pycurl']['max_conns']
self.reuse = DEFAULTS['pycurl']['reuse']
self.output = DEFAULTS['pycurl']['output']
#{{{def fetch
def fetch(self, r):
return self.multi_requests([r])
#{{{ def add
def add(self, r):
#{{{ def perform
def perform(self, max_conns=None):
return self.multi_requests(self.requests, max_conns)
#{{{ def multi_requests
def multi_requests(self, requests, max_conns=None):
responses = []
num_handles = 0
max_conns = max_conns or self.max_conns
cm = pycurl.CurlMulti()
#Loop while we got requests to launch or running handles
while requests or num_handles:
# Debug
if self.output['verbose']:
print( '\r'
, 'requests', len(self.requests)
, 'responses', len(responses)
, 'num_handles', num_handles
, end=''
# Feed CurlMulti with requests
while requests and num_handles < max_conns:
r = requests.pop(0)
c = self.prepare_curl_handle(**r)
# Add handle to cm
num_handles += 1
# Add to results to avoid c losing scope
# Call cm.perform forever
while True:
ret, num_handles = cm.perform()
# Break the loop when cm wants to wait
if ret != pycurl.E_CALL_MULTI_PERFORM:
# Get the results
_, ok_list, err_list = cm.info_read()
# Process the ok_list
for c in ok_list:
# Process the err_list
for c, errno, errmsg in err_list:
c.errno, c.errmsg = str(errno), str(errmsg)
# Generate timings and outputs
for c in responses:
if c.output['console']:
return responses
#{{{ def prepare_curl_handle
def prepare_curl_handle(self, url, c=None, method='GET', params=None, headers=None, reuse=None, resolve=None, ua=None, id=None, output={}, tags={}, **kwargs):
# Create the Curl Handle and set defaults
if not c:
c = pycurl.Curl()
c.res = {}
c.res['id'] = 'default'
c.res['tags'] = {}
c.res['reuse'] = self.reuse
c.errno, c.errmsg = None, None
c.output = dict(self.output)
# Don't want to share the DNS cache between handles
s = pycurl.CurlShare()
s.setopt(pycurl.SH_SHARE, pycurl.LOCK_DATA_DNS)
c.setopt(pycurl.SHARE, s)
# Set output options
for o, b in output.items():
if o in self.output and type(b) == bool:
c.output[o] = b
# Enable verbose mode if needed
if c.output['verbose']:
c.setopt(c.VERBOSE, 1)
# Avoid connection reuse if needed
if reuse == None:
c.res['reuse'] = self.reuse
if c.res['reuse'] == False:
c.setopt(pycurl.FORBID_REUSE, 1)
# Set request method
method = method.upper()
if method == 'GET':
c.setopt(c.HTTPGET, 1)
elif method == 'POST':
c.setopt(c.POST, 1)
c.setopt(c.CUSTOMREQUEST, method)
# Set request params
if params:
if not isinstance(params, str):
print('Request params must be a string.')
if method == 'GET':
url = url + '?' + params
c.setopt(c.POSTFIELDS, params)
# User Agent
if ua and isinstance(ua, str):
c.setopt(c.USERAGENT, ua)
# Fake DNS cache
if resolve and isinstance(resolve, list):
c.setopt(c.RESOLVE, resolve)
# Request Headers
if headers and isinstance(headers, dict):
c.setopt(c.HTTPHEADER, [ '{}: {}'.format(key, value) for key, value in headers.items() ])
# Response Headers
c.headers_text = ""
c.headers = {}
def headers_cb(x):
txt = x.decode('ascii')
c.headers_text += txt
if ': ' in txt:
k, v = txt.split(': ',1)
c.headers[k.strip()] = v.strip()
c.setopt(c.HEADERFUNCTION, headers_cb)
# Setup the curl object
c.buffer = BytesIO()
c.setopt(c.WRITEDATA, c.buffer)
c.setopt(c.URL, url)
c.setopt(c.CONNECTTIMEOUT, self.connect_timeout)
c.setopt(c.TIMEOUT, self.timeout)
c.setopt(c.FOLLOWLOCATION, 1)
c.setopt(c.MAXREDIRS, 5)
c.setopt(c.NOSIGNAL, 1)
c.setopt(c.COOKIEJAR, '/dev/null')
# Keep some info we'll need later
c.res['method'] = method
c.res['url'] = url
if id:
c.res['id'] = id
if tags:
c.res['tags'] = tags
# Store kwargs, they can be of use later
if hasattr(c, 'kwargs'):
c.kwargs = {**c.kwargs, **kwargs}
c.kwargs = kwargs
return c
#{{{ def get_timings
def get_timings(self, c):
# Populate tags
t = c.tags = {}
t['id'] = c.res['id']
t['method'] = c.res['method']
t['primary_ip'] = c.getinfo(c.PRIMARY_IP)
if hasattr(c, 'errno'):
t['errno'], t['errmsg'] = c.errno, c.errmsg
# Dismante the url
t['url'] = c.res['url']
uparse = urlparse(t['url'])
t['scheme'], t['hostname'], t['path'] = uparse.scheme, uparse.hostname, uparse.path or '/'
# Dismantle the effective url
t['effective_url'] = c.getinfo(pycurl.EFFECTIVE_URL)
euparse = urlparse(t['effective_url'])
t['effective_scheme'], t['effective_hostname'], t['effective_path'] = euparse.scheme, euparse.hostname, euparse.path or '/'
# Overwrite the tags dictionary with the provided one
for k, v in c.res['tags'].items():
t[k] = v
# Populate measurements
m = c.measurements = {}
for type, names in CURLOPTS.items():
m[type] = {}
for name in names:
func = getattr(c, name)
name = name.lower()
m[type][name] = c.getinfo(func)
# Generate the range between timings
r = ranges = {}
r['dns'], r['redirect'] = m['time']['namelookup_time'], m['time']['redirect_time']
r['connect'] = m['time']['connect_time'] - m['time']['namelookup_time']
r['ssl'] = 0.0
if m['time']['appconnect_time'] > 0:
r['ssl'] = m['time']['appconnect_time'] - m['time']['connect_time']
r['server'] = m['time']['starttransfer_time'] - m['time']['pretransfer_time']
r['transfer'] = m['time']['total_time'] - m['time']['starttransfer_time']
m['range'] = r
# Extra, non metric material
c.res['content_type'] = c.getinfo(c.CONTENT_TYPE) or None
c.res['encoding'] = 'iso-8859-1'
if c.res['content_type']:
content_type = c.res['content_type'].lower()
if 'charset=' in content_type:
c.res['encoding'] = content_type.split('=')[-1]
# Generate the Timeline
tr = time_and_ranges = {}
for k,v in tr.items():
tr[k] = int(v*1000)
c.res['timeline'] = OUTPUT.format(**tr)
# Store Body and Headers info
c.res['primary_ip'] = c.getinfo(c.PRIMARY_IP)
c.res['primary_port'] = c.getinfo(c.PRIMARY_PORT)
c.res['local_ip'] = c.getinfo(c.LOCAL_IP)
c.res['local_port'] = c.getinfo(c.LOCAL_PORT)
c.res['body'] = c.buffer.getvalue()
c.res['body_text'] = str(c.buffer.getvalue().decode(c.res['encoding']))
c.res['body_size'] = len(c.buffer.getvalue().decode(c.res['encoding']))
c.res['headers'] = c.headers
c.res['headers_text'] = c.headers_text
c.res['headers_size'] = len(c.headers_text)
#{{{ def print_info
def print_info(self, c):
t, m = c.tags, c.measurements
header_format = ( ANSII['bold']
, c.res['id'], c.res['method'], c.res['url']
, c.res['local_ip'] or None, c.res['local_port'] or None, c.res['primary_ip'] or None, c.res['primary_port'] or None
, c.res['reuse']
, ANSII['end']
print('{}[{}] {} \'{}\' {}:{} -> {}:{} Reuse:{}{}'.format(*header_format))
# Response Info
if m['code']['http_code'] == 200:
resp_color = ANSII['green']
elif m['code']['http_code'] == 0:
resp_color = ANSII['red']
resp_color = ANSII['yellow']
resp_format = ( resp_color
, m['code']['http_code']
, int(m['size']['size_download']) , int(m['time']['total_time']*1000)
, int(m['range']['dns']*1000), int(m['range']['connect']*1000), int(m['range']['ssl']*1000), int(m['range']['server']*1000), int(m['range']['transfer']*1000)
, int(m['range']['redirect']*1000)
, c.res['content_type']
, ANSII['end']
print('{}>> {} <{}B> {}ms [DNS(+{}ms) TCP(+{}ms) SSL(+{}ms) TTFB(+{}ms) TTLB(+{}ms)] RED(+{}ms) {}{}'.format(*resp_format))
# Headers
if c.output['headers']:
for k,v in c.headers.items():
print('{lightgrey}{}{end}: {cyan}{}{end}'.format(k,v,**ANSII))
# Error code and message
if m['code']['http_code'] == 0:
print('{}Error {}: {}{}'.format(ANSII['red'], t['errno'], t['errmsg'], ANSII['end']))
if c.output['timeline']:
#{{{ def getpoints
def getpoints(self, res):
ps = points = []
for c in res:
for m, f in c.measurements.items():
p = { 'measurement' : m
, 'tags' : c.tags
, 'fields' : f
#, 'time' : int(time.time())
#, 'time' : strftime ("%Y-%m-%d %H:%M:%S"))
return ps
if __name__ == '__main__':
# Params to send
params_dic = {'arg1':1,'arg2':2,'arg3':3}
params_str = str(params_dic)
params_enc = urlencode(params_dic)
# Create our CurlWrapper instance
cw = CurlWrapper()
# Set Some Globals
cw.output['influxdb'] = True
cw.output['console'] = True
cw.output['headers'] = True
cw.output['timeline'] = True
cw.output['verbose'] = False
cw.connect_timeout = 10
cw.reuse = True
# Add some queries
cw.add({'url':'notfound'}) # DNS not found
cw.add({'url':'lol://what'}) # protocol not found
cw.add({'url':''}) # Expired SSL Cert
cw.add({'url':''}) # Return JSON
cw.add({'url':''}) # simple get
cw.add({'url':''}) # 404
cw.add({'url':''}) # 500
cw.add({'url':''}) # get with inline args
cw.add({'url':'','params':params_enc}) # get with arg pairs
cw.add({'url':'','method':'POST','params':params_str,'headers':{'Content-type':'application/json'}}) # post json
cw.add({'url':'','method':'POST','params':params_enc}) # post form-urlencoded form data
cw.add({'url':'','output':{'timeline':False,'headers':False},'id':'concrete','ua':'IE6.0','reuse':False}) # Changing options for concrete query
cw.add({'url':'','tags':{'hostname':'httpbin_custom'},'id':'changing_tag'}) # Overwriting the hostname tag
# Run all queries at once
responses = cw.perform(max_conns=10)
# Get all the influxDB points
points = cw.getpoints(responses)
# Create a influxDBWrapper class
iw = influxDBWrapper()
# Send the points to influxDB
# Run a one-shot query
