Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Using Boto S3 with overriding the actual connection endpoint, but not the apparent hostname.
# This how you can control where boto/requests are actually connecting to,
# while not otherwise mucking with ANY of the HTTP headers, which could invalidate AWS signatures
# The only DNS lookup that should be triggered by this is from the PATCH_new_conn function.
# No other DNS lookups should occur.
#
# For example, this allows you to configure a Ceph RGW instance to respond to the AWS hostname s3.amazonaws.com,
# and then test it, without any modification of your local DNS.
#
# Copyright Robin Johnson 2016 <robin.johnson@dreamhost.com>
import requests.packages.urllib3.connection
import boto
import boto.s3.connection
class CustomHTTPConnection_requests(requests.packages.urllib3.connection.HTTPConnection):
def __init__(self, *args, **kw):
self.connect_host = kw.pop('connect_host', None)
self.connect_port = kw.pop('connect_port', None)
#print('Spawned %s forced via %s:%s' % (self.__class__, self.connect_host, self.connect_port))
return super(CustomHTTPConnection_requests, self).__init__(*args, **kw)
class CustomHTTPSConnection_requests(requests.packages.urllib3.connection.HTTPSConnection):
def __init__(self, *args, **kw):
self.connect_host = kw.pop('connect_host', None)
self.connect_port = kw.pop('connect_port', None)
#print('Spawned %s forced via %s:%s' % (self.__class__, self.connect_host, self.connect_port))
return super(CustomHTTPSConnection_requests, self).__init__(*args, **kw)
def PATCH_new_conn(self):
""" Establish a socket connection, possibly to an overridden host/port.
:return: a new socket connection
"""
# resolve hostname to an ip address; use your own
# resolver here, as otherwise the system resolver will be used.
(host, port) = (self.host, self.port)
if self.connect_host is not None:
host = self.connect_host
if self.connect_port is not None:
port = self.connect_port
try:
conn = socket.create_connection(
(host, port),
self.timeout,
self.source_address,
)
except (AttributeError, TypeError) as e: # Python 2.6
conn = socket.create_connection(
(host, port),
self.timeout,
)
return conn
CustomHTTPConnection_requests._new_conn = PATCH_new_conn
CustomHTTPSConnection_requests._new_conn = PATCH_new_conn
# This is the "Factory" that boto.s3.connection.S3Connection expects.
def HTTPConnectionFactoryFactory(connect_host, connect_port, ssl=False):
cls = CustomHTTPConnection_requests
if ssl:
cls = CustomHTTPSConnection_requests
def f(*args, **kw):
#print('Factory: Prepared %s forced via %s:%s' % (cls, connect_host, connect_port))
(kw['connect_host'], kw['connect_port']) = (connect_host, connect_port)
return cls(*args, **kw)
return f
def __main__():
connect_host = 'some-random-hostname'
connect_port = 7480 # Ceph RGW directly!
connect_ssl = False
factory = HTTPConnectionFactoryFactory(connect_host, connect_port, ssl=connect_ssl)
factory_arg = (factory, ()) # second tuple element is for error handling
s3conn = boto.s3.connection.S3Connection('<aws access key>', '<aws secret key>', https_connection_factory=factory_arg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.