Created
November 4, 2015 17:44
-
-
Save rafaelhenrique/2ba5153031fcd63caa52 to your computer and use it in GitHub Desktop.
https_request_wrapper to determine encryption of HTTPS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import | |
import ssl | |
import socket | |
import requests | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.poolmanager import PoolManager | |
class HTTPSAdapter(HTTPAdapter):
    """Transport adapter that allows us to use different encryption.

    The adapter probes ``server:port`` with several SSL/TLS protocol
    versions (see :attr:`verify_encryption`) and configures its connection
    pool with the first version whose handshake succeeds.
    """

    def __init__(self, server, port, *args, **kwargs):
        """Remember the host to probe, then initialise the base adapter.

        :param server: host name or IP address used for the protocol probe.
        :param port: TCP port used for the protocol probe.

        Remaining positional and keyword arguments are forwarded to
        ``HTTPAdapter``.
        """
        # These must be assigned before the parent constructor runs: it
        # calls init_poolmanager(), which reads them via verify_encryption.
        self.server = server
        self.port = port
        super(HTTPSAdapter, self).__init__(*args, **kwargs)

    def init_poolmanager(self, connections, maxsize, block=False,
                         **pool_kwargs):
        """Create the pool manager pinned to the negotiated protocol version.

        :param connections: number of connection pools to cache.
        :param maxsize: maximum number of connections kept per pool.
        :param block: whether the pool blocks when no connection is free.
        :param pool_kwargs: extra keyword arguments passed to ``PoolManager``.
        """
        # verify_encryption returns (cipher, ssl_version); only the
        # protocol version is needed here.
        ssl_version = self.verify_encryption[1]
        self.poolmanager = PoolManager(num_pools=connections,
                                       maxsize=maxsize,
                                       block=block,
                                       ssl_version=ssl_version,
                                       **pool_kwargs)

    @staticmethod
    def _wrap_client_socket(raw_socket, version):
        """Wrap *raw_socket* as a client TLS socket without cert checks."""
        if hasattr(ssl, 'wrap_socket'):
            return ssl.wrap_socket(
                raw_socket,
                server_side=False,
                cert_reqs=ssl.CERT_NONE,
                ca_certs=None,
                do_handshake_on_connect=True,
                suppress_ragged_eofs=True,
                ssl_version=version
            )
        # ssl.wrap_socket() was removed in Python 3.12. A bare SSLContext
        # for these protocol constants defaults to CERT_NONE with hostname
        # checking disabled, matching the arguments used above.
        context = ssl.SSLContext(version)
        return context.wrap_socket(
            raw_socket,
            server_side=False,
            do_handshake_on_connect=True,
            suppress_ragged_eofs=True
        )

    @property
    def verify_encryption(self):
        """Probe the server and return the first protocol it accepts.

        Every access opens a new connection to ``(self.server, self.port)``
        for each candidate protocol, in the order listed below, with a
        5 second timeout.

        :returns: tuple ``(cipher, ssl_version)`` where ``cipher`` is the
            value of ``SSLSocket.cipher()`` for the successful handshake and
            ``ssl_version`` is the ``ssl.PROTOCOL_*`` constant that worked.
        :raises Exception: if no candidate protocol could complete a
            handshake with the server.
        """
        # getattr() keeps this working on builds where a protocol constant
        # is missing; the missing ones are filtered out below.
        versions = [
            getattr(ssl, 'PROTOCOL_TLSv1', None),
            getattr(ssl, 'PROTOCOL_SSLv23', None),
            getattr(ssl, 'PROTOCOL_TLSv1_1', None),
            getattr(ssl, 'PROTOCOL_TLSv1_2', None),
        ]
        for version in filter(None, versions):
            raw_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            socket_ssl = None
            try:
                socket_ssl = self._wrap_client_socket(raw_socket, version)
                socket_ssl.settimeout(5)
                socket_ssl.connect((self.server, self.port))
                # Return the version we asked for: SSLSocket no longer
                # exposes an ``ssl_version`` attribute on Python 3.7+.
                return socket_ssl.cipher(), version
            except ssl.SSLError as e:
                print(e)
            except socket.error as e:
                print(e)
            finally:
                # socket_ssl stays None when wrapping itself failed; the raw
                # socket is closed regardless so the descriptor never leaks.
                if socket_ssl is not None:
                    socket_ssl.close()
                raw_socket.close()
        raise Exception("Not encryption verified.")
def https_request_wrapper(url, server, port):
    """Build a ``requests`` session that routes *url* through HTTPSAdapter.

    :param url: URL prefix the adapter is mounted on.
    :param server: host handed to ``HTTPSAdapter`` for its protocol probe.
    :param port: port handed to ``HTTPSAdapter`` for its protocol probe.
    :returns: the configured ``requests.Session``.
    """
    session = requests.Session()
    # Requests whose URL starts with ``url`` will use the custom adapter.
    session.mount(url, HTTPSAdapter(server=server, port=port))
    return session
if __name__ == "__main__":
    # Usage example: post an XML payload to a local HTTPS service through a
    # session whose adapter has probed the server's protocol version.
    session = https_request_wrapper(
        url='https://127.0.0.1',
        server='127.0.0.1',
        port=443,
    )

    endpoint = 'https://127.0.0.1/GenericAction.svc'
    request_headers = {
        'Content-Type': 'text/xml; charset=utf-8',
        'SOAPAction': 'http://127.0.0.1/GenericAction',
    }

    # The body is read from disk; SOAP XML or a REST payload both work.
    with open('payload_test.xml') as payload_file:
        raw_payload = payload_file.read()

    # Strip newlines and send the payload as UTF-8 bytes.
    single_line_payload = raw_payload.replace("\n", "")
    body = single_line_payload.encode('utf-8')

    # verify=False: certificate validation is skipped for this request.
    response = session.post(
        endpoint, data=body, headers=request_headers, verify=False)

    import xml.dom.minidom
    document = xml.dom.minidom.parseString(response.content)
    print(document.toprettyxml())
    print(response.status_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment