Skip to content

Instantly share code, notes, and snippets.

@hoov
Created May 6, 2016 17:38
Show Gist options
  • Save hoov/c633db9609b27c4a44dc4069673c70bb to your computer and use it in GitHub Desktop.
Save hoov/c633db9609b27c4a44dc4069673c70bb to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import logging
import ssl
import select
import urllib3.contrib.pyopenssl
from OpenSSL import SSL
from ndg.httpsclient.https import HTTPSConnection
from ndg.httpsclient.ssl_peer_verification import ServerSSLCertVerification
from urllib3.connection import HTTPSConnection as HTTPSConnection_
from urllib3.contrib.pyopenssl import WrappedSocket, _openssl_verify, _verify_callback, DEFAULT_SSL_CIPHER_LIST
# Module-level logger: named after the module when imported, and after the
# script path when run directly.  Strings are compared with ``!=`` (equality);
# ``is not`` tests object identity and only works by accident of interning.
logger = logging.getLogger(__name__) if __name__ != "__main__" else logging.getLogger(__file__)
# This was shamefully lifted from urllib3.connection, but we need it to do slightly different things.
def ssl_wrap_socket(sock, keyfile=None, certfile=None, cert_reqs=ssl.CERT_NONE,
                    ca_certs=None, server_hostname=None,
                    ssl_version=None, ca_cert_dir=None):
    """Wrap *sock* in a pyOpenSSL connection and run the client handshake.

    Args:
        sock: Connected socket to wrap.
        keyfile: Path to the private key file; defaults to *certfile* when
            only a certificate is given.
        certfile: Path to the client certificate file.
        cert_reqs: One of the ``ssl.CERT_*`` constants.  Anything other than
            ``ssl.CERT_NONE`` installs the pyOpenSSL verify callback.
        ca_certs: CA bundle file passed to ``load_verify_locations``.
        server_hostname: Host name sent via SNI; skipped when empty.
        ssl_version: A pyOpenSSL ``SSL.*_METHOD`` constant.  ``None`` falls
            back to ``SSL.SSLv23_METHOD``.
        ca_cert_dir: CA directory passed to ``load_verify_locations``.

    Returns:
        A ``WrappedSocket`` built from the pyOpenSSL connection and *sock*.

    Raises:
        ssl.SSLError: If the CA locations cannot be loaded or the handshake
            fails.
        socket.timeout: If the socket does not become readable within
            ``sock.gettimeout()`` while the handshake is waiting for data.
    """
    # Local import: the timeout exception raised below lives in ``socket``,
    # which this file does not import at module level.
    import socket

    if ssl_version is None:
        # SSL.Context() rejects None, so pick the negotiate-anything method.
        ssl_version = SSL.SSLv23_METHOD
    ctx = SSL.Context(ssl_version)

    if certfile:
        keyfile = keyfile or certfile  # Match behaviour of the normal python ssl library
        ctx.use_certificate_file(certfile)
    if keyfile:
        ctx.use_privatekey_file(keyfile)

    if cert_reqs != ssl.CERT_NONE:
        ctx.set_verify(_openssl_verify[cert_reqs], _verify_callback)

    if ca_certs or ca_cert_dir:
        try:
            ctx.load_verify_locations(ca_certs, ca_cert_dir)
        except SSL.Error as e:
            raise ssl.SSLError('bad ca_certs: %r' % ca_certs, e)
    else:
        ctx.set_default_verify_paths()

    # Disable TLS compression to mitigate CRIME attack (issue #309)
    OP_NO_COMPRESSION = 0x20000
    ctx.set_options(OP_NO_COMPRESSION)

    # Set list of supported ciphersuites.
    ctx.set_cipher_list(DEFAULT_SSL_CIPHER_LIST)

    cnx = SSL.Connection(ctx, sock)
    if server_hostname:
        # Encode text host names only; a value that is already bytes is
        # passed through unchanged (bytes has no .encode on Python 3).
        if not isinstance(server_hostname, bytes):
            server_hostname = server_hostname.encode('utf-8')
        cnx.set_tlsext_host_name(server_hostname)
    cnx.set_connect_state()

    while True:
        try:
            cnx.do_handshake()
        except SSL.WantReadError:
            # The handshake needs more data: wait for the socket to become
            # readable, honouring the socket's own timeout, then retry.
            readable, _, _ = select.select([sock], [], [], sock.gettimeout())
            if not readable:
                raise socket.timeout('select timed out')
            continue
        except SSL.Error as e:
            raise ssl.SSLError('bad handshake: %r' % e)
        break

    return WrappedSocket(cnx, sock)
class OurHTTPSConnection(HTTPSConnection_):
    """urllib3 HTTPS connection whose TLS layer is set up by ``ssl_wrap_socket``."""

    def connect(self):
        """Open the raw connection and wrap it with TLS 1.2.

        Certificate verification is disabled (``ssl.CERT_NONE``); the
        connection's own key and certificate files are handed to the wrapper.
        """
        raw_sock = self._new_conn()
        self._prepare_conn(raw_sock)
        tls_options = {
            "cert_reqs": ssl.CERT_NONE,
            "ssl_version": SSL.TLSv1_2_METHOD,
        }
        self.sock = ssl_wrap_socket(raw_sock, self.key_file, self.cert_file,
                                    **tls_options)
def make_ssl_request_urllib3(timeout=None, verify=False):
    """Issue ``GET /login/`` over HTTPS using ``OurHTTPSConnection``.

    Args:
        timeout: Socket timeout in seconds for the connection.  ``None``
            leaves the connection class's default timeout in place.
        verify: Currently unused; ``OurHTTPSConnection.connect`` always
            wraps the socket with ``ssl.CERT_NONE``.

    The connection is closed even when connecting, the handshake, or the
    request itself fails.
    """
    HOSTNAME = "thoover.dev.insightsquared.com"
    logger.info("Making SSL request with timeout %r", timeout)

    # Forward the timeout only when one was given, so that omitting it keeps
    # exactly the default the connection class would otherwise use.
    conn_kwargs = {} if timeout is None else {"timeout": timeout}
    conn = OurHTTPSConnection(HOSTNAME, port=9443, **conn_kwargs)
    try:
        # connect() is inside the try so a failed connect/handshake still
        # reaches conn.close() below.
        conn.connect()
        logger.info("Socket class is %r", conn.sock.socket.__module__)
        conn.request("GET", "/login/")
        resp = conn.getresponse()
        logger.info("Got a %d %s back", resp.status, resp.reason)
        resp.read()
    finally:
        conn.close()
def gevent_main():
    """Apply gevent's monkey patches, then run ``main``."""
    import gevent.monkey

    gevent.monkey.patch_all()
    main()
def main():
    """Log the client under test and make the urllib3-based request."""
    logger.info("Trying urllib3")
    make_ssl_request_urllib3(timeout=2000)
# Script entry point: pick the plain or the gevent-patched run from the
# required "mode" sub-command.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    subparser = parser.add_subparsers(dest="mode")
    # Python 3 treats sub-commands as optional, which would let the script
    # exit silently without doing anything; require one explicitly.  Set as
    # an attribute so this also works on interpreters whose
    # add_subparsers() has no ``required`` keyword.
    subparser.required = True
    subparser.add_parser("gevent")
    subparser.add_parser("normal")
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)
    logging.captureWarnings(True)

    if args.mode == "normal":
        main()
    elif args.mode == "gevent":
        gevent_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment