Skip to content

Instantly share code, notes, and snippets.

@jsok
Created February 11, 2015 01:05
Show Gist options
  • Save jsok/da688c60302287d39e52 to your computer and use it in GitHub Desktop.
Save jsok/da688c60302287d39e52 to your computer and use it in GitHub Desktop.
Using boto.s3.bucket when your bucket name as dot(s) in it
import logging
import socket, ssl
import re
import boto
from boto.s3.connection import S3Connection, OrdinaryCallingFormat
from boto.https_connection import CertValidatingHTTPSConnection
logging.basicConfig(level=logging.WARNING)
class TestHttpConntection(CertValidatingHTTPSConnection):
# !! Unsafe on Python < 2.7.9
def __init__(self, *args, **kwargs):
CertValidatingHTTPSConnection.__init__(self, *args, **kwargs) # No super, it's an old-style class
self.ssl_ctx = ssl.create_default_context(cafile=self.ca_certs) # Defaults to cert validation
if self.cert_file is not None:
self.ssl_ctx.load_cert_chain(certfile=self.cert_file, keyfile=self.key_file)
def connect(self):
"Connect to a host on a given (SSL) port."
if hasattr(self, "timeout"):
sock = socket.create_connection((self.host, self.port), self.timeout)
else:
sock = socket.create_connection((self.host, self.port))
if re.match(".*\.s3.*\.amazonaws\.com", self.host):
patched_host = ".".join(self.host.rsplit(".", 4)[1:])
boto.log.warn("Connecting to '%s', validated as '%s'", self.host, patched_host)
self.sock = self.ssl_ctx.wrap_socket(sock, server_hostname=patched_host)
conn = S3Connection(https_connection_factory=(TestHttpConntection, ()))
bucket = con.get_bucket('sub.sub.domain.org')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment