Skip to content

Instantly share code, notes, and snippets.

@rafaelhenrique
Created November 4, 2015 17:44
Show Gist options
  • Save rafaelhenrique/2ba5153031fcd63caa52 to your computer and use it in GitHub Desktop.
Save rafaelhenrique/2ba5153031fcd63caa52 to your computer and use it in GitHub Desktop.
https_request_wrapper to determine encryption of HTTPS
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import ssl
import socket
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.poolmanager import PoolManager
class HTTPSAdapter(HTTPAdapter):
    """
    Transport adapter that allows us to use different encryption.

    The adapter probes ``server:port`` with each SSL/TLS protocol constant
    exposed by the running ``ssl`` module and configures its pool manager
    with the first protocol whose handshake succeeds.
    """

    def __init__(self, server, port, *args, **kwargs):
        """Store the probe target, then run the base-class initialiser.

        :param server: host name or IPv4 address to probe.
        :param port: TCP port to probe.

        Any further positional/keyword arguments are forwarded unchanged
        to ``HTTPAdapter.__init__``.
        """
        # Assigned before delegating to the parent: init_poolmanager() reads
        # these through verify_encryption, and the parent initialiser is
        # expected to call init_poolmanager() (confirm against the installed
        # requests version).
        self.server = server
        self.port = port
        super(HTTPSAdapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, connections, maxsize, block=False):
        """Create the pool manager pinned to the negotiated SSL/TLS version.

        :param connections: passed to ``PoolManager`` as ``num_pools``.
        :param maxsize: passed to ``PoolManager`` as ``maxsize``.
        :param block: passed to ``PoolManager`` as ``block``.
        :raises Exception: propagated from ``verify_encryption`` when no
            protocol could be negotiated.
        """
        # verify_encryption returns (cipher, ssl_version); only the protocol
        # constant is needed here.
        ssl_version = self.verify_encryption[1]
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       ssl_version=ssl_version)

    @staticmethod
    def _wrap_client_socket(sock, version):
        """Wrap ``sock`` as an unverified client-side SSL socket.

        Prefers ``ssl.SSLContext`` because the module-level
        ``ssl.wrap_socket`` helper is deprecated and absent from recent
        Python releases; falls back to it on interpreters that predate
        ``SSLContext``.
        """
        if hasattr(ssl, 'SSLContext'):
            context = ssl.SSLContext(version)
            # Same policy as the legacy call: no certificate validation,
            # this connection only probes protocol support.
            context.verify_mode = ssl.CERT_NONE
            return context.wrap_socket(
                sock,
                server_side=False,
                do_handshake_on_connect=True,
                suppress_ragged_eofs=True,
            )
        return ssl.wrap_socket(
            sock,
            server_side=False,
            cert_reqs=ssl.CERT_NONE,
            ca_certs=None,
            do_handshake_on_connect=True,
            suppress_ragged_eofs=True,
            ssl_version=version,
        )

    @property
    def verify_encryption(self):
        """Probe the server and report the first protocol that works.

        Every access opens a new TCP connection (5 second timeout) per
        candidate protocol until one handshake succeeds. Failures are
        printed and the next candidate is tried.

        :returns: ``(cipher, ssl_version)`` where ``cipher`` is the value of
            ``SSLSocket.cipher()`` for the successful connection and
            ``ssl_version`` is the ``ssl.PROTOCOL_*`` constant that was used.
        :raises Exception: if no candidate protocol could be negotiated.
        """
        # NOTE(review): candidates are tried in this order, so TLSv1 wins
        # whenever the server still accepts it. Reorder if the strongest
        # protocol should be preferred.
        versions = [
            getattr(ssl, 'PROTOCOL_TLSv1', None),
            getattr(ssl, 'PROTOCOL_SSLv23', None),
            getattr(ssl, 'PROTOCOL_TLSv1_1', None),
            getattr(ssl, 'PROTOCOL_TLSv1_2', None),
        ]
        # Constants missing from this Python build come back as None.
        candidates = [version for version in versions if version is not None]

        for version in candidates:
            # Both handles start as None so the cleanup below never touches
            # an unbound name or a socket left over from a previous attempt.
            raw_socket = None
            socket_ssl = None
            try:
                raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                socket_ssl = self._wrap_client_socket(raw_socket, version)
                socket_ssl.settimeout(5)
                # The handshake runs as part of connect().
                socket_ssl.connect((self.server, self.port))
                # Return the protocol we asked for rather than reading an
                # ``ssl_version`` attribute off the socket: newer Python 3
                # SSLSocket objects do not expose one.
                return socket_ssl.cipher(), version
            except ssl.SSLError as e:
                print(e)
            except socket.error as e:
                print(e)
            finally:
                # Closing an already closed/detached socket is harmless, so
                # release whichever handles were actually created.
                for opened in (socket_ssl, raw_socket):
                    if opened is not None:
                        opened.close()

        raise Exception("Not encryption verified.")
def https_request_wrapper(url, server, port):
    """Build a ``requests`` session that uses ``HTTPSAdapter`` for ``url``.

    :param url: URL prefix the adapter is mounted on.
    :param server: host handed to ``HTTPSAdapter`` as ``server``.
    :param port: port handed to ``HTTPSAdapter`` as ``port``.
    :returns: the ``requests.Session`` with the adapter mounted.
    """
    session = requests.Session()
    session.mount(url, HTTPSAdapter(server=server, port=port))
    return session
if __name__ == "__main__":
    # Usage example: post an XML payload through a session whose adapter is
    # mounted for the target host.
    from xml.dom import minidom

    session = https_request_wrapper(
        url='https://127.0.0.1',
        server='127.0.0.1',
        port=443,
    )

    endpoint = 'https://127.0.0.1/GenericAction.svc'
    request_headers = {
        'Content-Type': 'text/xml; charset=utf-8',
        'SOAPAction': 'http://127.0.0.1/GenericAction',
    }

    # Load the request body from disk (SOAP XML here; per the original
    # author a REST payload is accepted as well).
    with open('payload_test.xml') as payload_file:
        raw_payload = payload_file.read()

    # Strip newlines and encode to UTF-8 bytes before sending.
    body = raw_payload.replace("\n", "").encode('utf-8')

    # verify=False: certificate validation is disabled for this request.
    response = session.post(
        endpoint, data=body, headers=request_headers, verify=False)

    # Pretty-print the XML response, then show the HTTP status code.
    pretty_xml = minidom.parseString(response.content).toprettyxml()
    print(pretty_xml)
    print(response.status_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment