# Source: GitHub Gist andreif/7597c434fd93f28fb3fed02030e8e774
# Author: @andreif (gist last active May 27, 2018).
# NOTE: the GitHub page navigation text that was captured along with the
# code has been removed; it was not part of the script.
import base64
import datetime
import hashlib
import hmac
import io
import json
import mimetypes
import os
import uuid
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
def guess_content_type(path):
    """Return a MIME type for *path*, guessed from its file name only.

    The file is never opened; the guess comes from
    ``mimetypes.guess_type``.

    Args:
        path: File name or path whose extension is inspected.

    Returns:
        The guessed MIME type as a string. ``'application/octet-stream'``
        is returned when nothing can be guessed.
    """
    content_type, encoding = mimetypes.guess_type(path)
    if encoding is not None:
        # guess_type() reports compression separately: 'x.tar.gz' yields
        # ('application/x-tar', 'gzip').  The bytes on disk are the
        # compressed stream, so describing them with the inner type would
        # be wrong unless a matching Content-Encoding were sent as well.
        return {
            'gzip': 'application/gzip',
            'bzip2': 'application/x-bzip',
            'xz': 'application/x-xz',
        }.get(encoding, 'application/octet-stream')
    return content_type or 'application/octet-stream'
class FormData(object):
    """In-memory builder for a ``multipart/form-data`` request body.

    Parts are appended to an internal ``io.BytesIO`` buffer; ``body``
    returns the buffer followed by the closing boundary, and
    ``content_type`` returns the matching request header value.
    """

    def __init__(self, data=None, files=None):
        """Create the form and write the initial parts.

        Args:
            data: Optional mapping of field name -> value.  Each entry
                becomes one plain part.
            files: Optional mapping of field name -> local file path.
                Each file is read fully into memory and written as a part
                carrying its base name as ``filename`` and a content type
                obtained from ``guess_content_type``.

        Raises:
            OSError: If one of the files cannot be opened or read.
        """
        # A random hex boundary, generated once per form.
        self.boundary = uuid.uuid4().hex
        self._content = io.BytesIO()
        for name, value in (data or {}).items():
            self.write_part(value=value, name=name)
        for name, path in (files or {}).items():
            with open(path, 'rb') as fd:
                value = fd.read()
            self.write_part(value=value, name=name,
                            filename=os.path.basename(path),
                            content_type=guess_content_type(path))

    @staticmethod
    def _escape_param(value):
        """Make *value* safe inside a quoted Content-Disposition parameter.

        A double quote would end the parameter early and CR/LF would start
        a new header line, so the three characters are percent-encoded.
        """
        return (str(value)
                .replace('"', '%22')
                .replace('\r', '%0D')
                .replace('\n', '%0A'))

    def write_part(self, value, content_type=None, **kwargs):
        """Append one part to the form.

        Args:
            value: Part payload.  ``str`` is encoded with the default
                codec (UTF-8), bytes-like objects are written as-is, and
                anything else is converted with ``str()`` first.
            content_type: Optional ``Content-Type`` header for the part.
            **kwargs: Parameters of the ``Content-Disposition`` header,
                e.g. ``name=...`` and ``filename=...``.
        """
        params = ''.join(
            f'; {k}="{self._escape_param(v)}"' for k, v in kwargs.items())
        ct = f'\r\nContent-Type: {content_type}' if content_type else ''
        self._content.write(
            f'--{self.boundary}\r\n'
            f'Content-Disposition: form-data{params}{ct}\r\n\r\n'.encode())
        if isinstance(value, (bytearray, memoryview)):
            # Without this, str() below would write the repr
            # ("bytearray(b'...')") instead of the payload itself.
            value = bytes(value)
        elif not isinstance(value, (str, bytes)):
            value = str(value)
        if isinstance(value, str):
            value = value.encode()
        self._content.write(value + b'\r\n')

    @property
    def content_type(self):
        """Value for the request's ``Content-Type`` header."""
        return f'multipart/form-data; boundary={self.boundary}'

    @property
    def body(self):
        """All parts written so far plus the closing boundary, as bytes."""
        return self._content.getvalue() + \
            f'--{self.boundary}--\r\n'.encode()
class IAM(object):
    """Plain holder for the AWS credentials used when signing requests.

    Attributes:
        key: Access key id.
        secret: Secret access key.
        token: Optional session token; ``None`` when not supplied.
    """

    def __init__(self, key, secret, token=None):
        self.key = key
        self.secret = secret
        self.token = token

    def __repr__(self):
        # Never show the secret or the token, so that printing or logging
        # this object cannot leak credentials.
        token = None if self.token is None else '***'
        return (f'{type(self).__name__}'
                f'(key={self.key!r}, secret=***, token={token})')
class S3Bucket(object):
    """Upload local files to one S3 bucket through a signed HTTP POST form.

    The request carries a base64-encoded POST policy that is signed with
    the AWS4-HMAC-SHA256 scheme, using only the standard library.
    """

    def __init__(self, name, region, iam):
        """Store the bucket *name*, its *region* and the credentials.

        *iam* must expose ``key``, ``secret`` and ``token`` attributes.
        """
        self.name = name
        self.region = region
        self.iam = iam

    def upload(self, path, key,
               acl='public-read',  # 'private'
               expires_in=60 * 5,
               content_type=None,
               cache_control=f'max-age={60 * 60 * 24 * 30}',
               content_disposition='attachment',
               encryption='AES256',
               content_length_range=(1000, 20 * 1000 * 1000),  # default: any
               ):
        """POST the file at *path* to this bucket.

        Args:
            path: Local file to upload.
            key: Object key.  A key ending in ``/`` gets the literal
                ``${filename}`` appended (an S3 form-field variable);
                leading slashes are stripped.
            acl: Value of the ``acl`` form field and policy condition.
            expires_in: Lifetime of the signed policy, in seconds.
            content_type: MIME type; guessed from *path* when falsy.
            cache_control: ``Cache-Control`` value; omitted when falsy.
            content_disposition: ``Content-Disposition`` value; omitted
                when falsy.
            encryption: ``x-amz-server-side-encryption`` value; omitted
                when falsy.
            content_length_range: ``(min, max)`` size bounds written into
                the policy; no size condition is added when falsy.

        Returns:
            The HTTP response object with an extra ``content`` attribute
            holding the response body.  For an HTTP error status the
            error's response object (``e.fp``) is returned the same way.
            ``None`` is returned when the request fails with a
            ``URLError``; the error is printed in that case.

        Raises:
            OSError: If *path* cannot be read.
        """
        fields = self._signed_fields(
            path, key,
            acl=acl,
            expires_in=expires_in,
            content_type=content_type,
            cache_control=cache_control,
            content_disposition=content_disposition,
            encryption=encryption,
            content_length_range=content_length_range)
        # FormData writes the `file` part after all plain fields.
        form = FormData(data=fields, files={'file': path})
        request = Request(
            url=self.url,
            data=form.body,
            headers={'Content-Type': form.content_type})
        try:
            with urlopen(request) as res:
                # Read before the context manager closes the response.
                res.content = res.read()
                return res
        except HTTPError as e:
            # HTTPError subclasses URLError, so it must be handled first.
            e.fp.content = e.fp.read()
            return e.fp
        except URLError as e:
            # Best effort: report the failure and signal it with None.
            print(e)
            return None

    def _signed_fields(self, path, key, acl, expires_in, content_type,
                       cache_control, content_disposition, encryption,
                       content_length_range, now=None):
        """Build the form fields (policy and signature included).

        *now* is the signing time; it defaults to the current UTC time
        and exists so the output can be reproduced.
        """
        # The literal string '${filename}' is an S3 field variable for key.
        # https://aws.amazon.com/articles/1434#aws-table
        if key.endswith('/'):
            key += '${filename}'
        key = key.lstrip('/')

        if now is None:
            # utcnow() is deprecated; an aware UTC time formats the same.
            now = datetime.datetime.now(datetime.timezone.utc)
        expires = now + datetime.timedelta(seconds=expires_in)
        # x-amz-date uses the ISO 8601 basic format YYYYMMDD'T'HHMMSS'Z'
        # with no fractional part.
        iso_now = now.strftime('%Y%m%dT%H%M%SZ')
        raw_date = now.strftime('%Y%m%d')
        cred = f'{self.iam.key}/{raw_date}/{self.region}/s3/aws4_request'
        algorithm = 'AWS4-HMAC-SHA256'
        content_type = content_type or guess_content_type(path)

        with open(path, 'rb') as f:
            md5 = base64.b64encode(hashlib.md5(f.read()).digest()).decode()

        conditions = [
            {'bucket': self.name},
            {'acl': acl},
            ['starts-with', '$key', ''],
            {'success_action_status': '201'},
            {'x-amz-credential': cred},
            {'x-amz-algorithm': algorithm},
            {'x-amz-date': iso_now},
            {'content-type': content_type},
        ]
        fields = {
            'success_action_status': 201,
            'x-amz-credential': cred,
            'x-amz-date': iso_now,
            'x-amz-algorithm': algorithm,
            'key': key,
            'acl': acl,
            'content-type': content_type,
        }

        # Each of these appears both as a form field and as a policy
        # condition with the same value.  Content-MD5 is always sent.
        mirrored = (
            ('x-amz-security-token', self.iam.token, bool(self.iam.token)),
            ('Cache-Control', cache_control, bool(cache_control)),
            ('Content-Disposition', content_disposition,
             bool(content_disposition)),
            ('Content-MD5', md5, True),
            ('x-amz-server-side-encryption', encryption, bool(encryption)),
        )
        for field_name, field_value, enabled in mirrored:
            if enabled:
                conditions.append({field_name: field_value})
                fields[field_name] = field_value

        if content_length_range:
            conditions.append(
                ['content-length-range'] + list(content_length_range))

        policy = base64.b64encode(json.dumps({
            'expiration': expires.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
            'conditions': conditions,
        }).encode())
        fields['policy'] = policy
        # The signature is computed over the base64 text of the policy.
        fields['x-amz-signature'] = self.hmac(
            key=self._signing_key(raw_date), msg=policy).hexdigest()
        return fields

    def _signing_key(self, raw_date):
        """Derive the signing key for *raw_date* (``YYYYMMDD``).

        The secret is chained through HMAC-SHA256 over the date, the
        region, the service name and the ``aws4_request`` terminator.
        """
        key = 'AWS4' + self.iam.secret
        for scope_part in (raw_date, self.region, 's3', 'aws4_request'):
            key = self.hmac(key=key, msg=scope_part).digest()
        return key

    def hmac(self, key, msg):
        """Return an HMAC-SHA256 object; ``str`` inputs are UTF-8 encoded."""
        if isinstance(key, str):
            key = key.encode('utf-8')
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        return hmac.new(key=key, msg=msg, digestmod=hashlib.sha256)

    @property
    def url(self):
        """Path-style endpoint URL of the bucket.

        An empty region or ``us-east-1`` uses ``s3.amazonaws.com``; any
        other region uses ``s3-<region>.amazonaws.com``.
        """
        # NOTE(review): the dash form is a legacy endpoint style; confirm
        # it is available for the regions this is used with.
        return 'https://s3{0}.amazonaws.com/{1}'.format(
            '' if not self.region or self.region == 'us-east-1'
            else '-' + self.region,
            self.name)
if __name__ == '__main__':
    # Demo: upload this very script to the bucket described by the
    # S3_BUCKET, S3_REGION, S3_KEY and S3_SECRET environment variables
    # (a missing variable raises KeyError).
    bucket = S3Bucket(
        name=os.environ['S3_BUCKET'],
        region=os.environ['S3_REGION'],
        iam=IAM(key=os.environ['S3_KEY'],
                secret=os.environ['S3_SECRET']))
    # key='/' becomes '${filename}', i.e. the object is named after the
    # uploaded file.
    r = bucket.upload(path=__file__, key='/')
    if r is None:
        # upload() has already printed the network error; make sure the
        # failure is also visible in the exit status.
        raise SystemExit('upload failed: no response received')
    print(r)
    print(r.status)
    print(r.content.decode())
# End of gist (GitHub page footer removed; it was not part of the script).