Skip to content

Instantly share code, notes, and snippets.

@sjperkins
Last active March 29, 2023 15:24
Show Gist options
  • Save sjperkins/15d3d196387cff8b6dc0dcca7f756fe8 to your computer and use it in GitHub Desktop.
Save sjperkins/15d3d196387cff8b6dc0dcca7f756fe8 to your computer and use it in GitHub Desktop.
PUT operation in the S3 REST API with AWS Signature generation
import argparse
from configparser import ConfigParser
from collections import namedtuple
from datetime import datetime
from difflib import diff_bytes
import os
import os.path
from pprint import pprint
from urllib.parse import urlparse, quote
from hashlib import sha256
import hashlib
import hmac
from xml.etree import ElementTree
import requests
# Credentials and region needed to sign a request.
AwsConfig = namedtuple(
    "AwsConfig",
    ["aws_access_key", "aws_secret_access_key", "aws_region"],
)


def read_aws_config() -> AwsConfig:
    """Read the ``default`` profile from the user's AWS configuration files.

    The access key id and secret access key come from ``~/.aws/credentials``;
    the region comes from ``~/.aws/config``.

    Returns:
        An ``AwsConfig`` of (access key id, secret access key, region).

    Raises:
        configparser.NoSectionError: a file has no ``default`` section
            (``ConfigParser.read`` silently skips files that do not exist).
        configparser.NoOptionError: the section lacks a required option.
    """

    def _load(path: str) -> ConfigParser:
        # One parser per file so the two "default" sections never mix.
        parser = ConfigParser()
        parser.read(os.path.expanduser(path))
        return parser

    profile = "default"

    creds = _load("~/.aws/credentials")
    key_id = creds.get(profile, "aws_access_key_id")
    secret = creds.get(profile, "aws_secret_access_key")

    region = _load("~/.aws/config").get(profile, "region")

    return AwsConfig(key_id, secret, region)
# Components of an ``s3://bucket/key`` URL.  The typename matches the binding
# so that reprs (e.g. from pprint) show the name used in the code.
UriBits = namedtuple("UriBits", ["bucket", "path", "query"])


def parse_url(url: str) -> UriBits:
    """Split an ``s3://bucket/key`` URL into its bucket and path.

    Intended for use as an ``argparse`` ``type=`` callable, hence the
    ``ArgumentTypeError`` failures.

    Args:
        url: URL of the form ``s3://<bucket>/<key>``.

    Returns:
        ``UriBits(bucket, path, query)`` where ``path`` keeps its leading
        ``/`` and ``query`` is always an empty dict (query strings are
        rejected for now).

    Raises:
        argparse.ArgumentTypeError: the scheme is not ``s3``, a query string
            is present, or no bucket is named.
    """
    url_bits = urlparse(url)

    if url_bits.scheme != "s3":
        raise argparse.ArgumentTypeError(f"{url} scheme '{url_bits.scheme}' != 's3'")

    if url_bits.query != "":
        raise argparse.ArgumentTypeError(f"Query strings not yet handled '{url_bits.query}'")

    # "s3:///key" parses with an empty netloc; without a bucket no valid
    # host name can be built later, so fail here with a clear message.
    if not url_bits.netloc:
        raise argparse.ArgumentTypeError(f"{url} does not name a bucket")

    return UriBits(url_bits.netloc, url_bits.path, {})
def parse_args() -> argparse.Namespace:
    """Parse the process command line.

    The only argument is the positional ``url``, which is converted (and
    validated) by ``parse_url``.

    Returns:
        The populated namespace; ``namespace.url`` holds ``parse_url``'s result.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("url", type=parse_url)
    namespace = parser.parse_args()
    return namespace
def canonical_request(http_method: str, uri: str, query: dict, headers: dict, payload: bytes) -> str:
    """Build the Signature Version 4 canonical request (Task 1).

    https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html

    Args:
        http_method: One of ``HEAD``, ``GET``, ``PUT`` or ``DELETE``.
        uri: Absolute path of the request (the part after the host name).
            An empty path is canonicalised to ``/``.  Characters that need
            escaping are percent-encoded; existing ``%XX`` escapes are left
            untouched so an already-encoded path is not encoded twice.
        query: Mapping of query parameter names to values (both ``str``).
        headers: Headers to sign.  Names are matched case-insensitively and
            must include ``host``.
        payload: Request body; its SHA-256 hex digest ends the request.

    Returns:
        The canonical request: method, URI, query string, canonical headers,
        signed header names and payload hash, newline separated.

    Raises:
        ValueError: unsupported ``http_method`` or no ``host`` header.
    """
    # Raise rather than assert: asserts vanish under ``python -O``.
    if http_method not in {"HEAD", "GET", "PUT", "DELETE"}:
        raise ValueError(f"Unsupported HTTP method '{http_method}'")

    # Lower-case the names *before* looking values up, so that a caller
    # passing "Host" or "X-Amz-Date" does not hit a KeyError.
    canonical_headers = {name.lower(): value.strip() for name, value in headers.items()}
    if "host" not in canonical_headers:
        raise ValueError("A 'host' header is required in the signed headers")
    header_names = sorted(canonical_headers)

    canonical_uri = quote(uri, safe="/%") if uri else "/"

    # Query parameters are encoded (including "/") and then sorted by name.
    encoded_query = sorted(
        (quote(name, safe=""), quote(value, safe="")) for name, value in query.items()
    )
    query_str = "&".join(f"{name}={value}" for name, value in encoded_query)

    header_str = "\n".join(f"{name}:{canonical_headers[name]}" for name in header_names)
    signed_header_str = ";".join(header_names)

    # The header section is terminated by its own newline, which shows up as
    # a blank line before the signed header list.
    parts = (
        http_method,
        canonical_uri,
        query_str,
        f"{header_str}\n",
        signed_header_str,
        sha256(payload).hexdigest(),
    )
    return "\n".join(parts)
def signing_str(canonical_request: str, today: datetime, aws_region: str) -> str:
    """Build the Signature Version 4 string to sign (Task 2).

    https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html

    Args:
        canonical_request: The canonical request text; only its SHA-256 hex
            digest appears in the result.
        today: Request time.  It is formatted as-is with a literal ``Z``
            suffix, so callers are expected to pass a UTC time.
        aws_region: Region used in the credential scope.

    Returns:
        Four newline-separated lines: algorithm, timestamp, credential scope
        (for the ``s3`` service) and the canonical request digest.
    """
    timestamp = today.strftime('%Y%m%dT%H%M%SZ')
    scope = f"{today.strftime('%Y%m%d')}/{aws_region}/s3/aws4_request"
    request_digest = sha256(canonical_request.encode('utf-8')).hexdigest()
    return "\n".join(("AWS4-HMAC-SHA256", timestamp, scope, request_digest))
def auth_header(aws_config: "AwsConfig", headers: dict, today: datetime, signature: str) -> str:
    """Format the value of the ``Authorization`` request header.

    https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html

    Args:
        aws_config: Supplies ``aws_access_key`` and ``aws_region`` for the
            credential scope.
        headers: The headers that were signed; only their names are used.
        today: Request time; its date forms part of the credential scope.
        signature: Hex signature computed over the string to sign.

    Returns:
        ``AWS4-HMAC-SHA256 Credential=...,SignedHeaders=...,Signature=...``
    """
    credential = f"{aws_config.aws_access_key}/{today.strftime('%Y%m%d')}/{aws_config.aws_region}/s3/aws4_request"
    # Lower-case first, then sort: this is the order the canonical request
    # uses.  Sorting the raw names would put "X-Amz-Date" before "host"
    # (upper-case sorts first) and advertise a different SignedHeaders list
    # from the one that was actually signed.
    signed_header_str = ";".join(sorted(name.lower() for name in headers))
    return ",".join((
        f"AWS4-HMAC-SHA256 Credential={credential}",
        f"SignedHeaders={signed_header_str}",
        f"Signature={signature}",
    ))
def hmac_256(key: bytes, data: str) -> hmac.HMAC:
    """Create an HMAC-SHA256 object for *data* keyed with *key*.

    Args:
        key: Raw key bytes (``hmac.new`` rejects ``str`` keys).
        data: Message text; it is UTF-8 encoded before hashing.

    Returns:
        The ``hmac.HMAC`` object, not a digest: callers pick ``.digest()``
        (to chain into the next key) or ``.hexdigest()`` (final signature).
    """
    return hmac.new(key, msg=data.encode("utf-8"), digestmod=sha256)
def signature(aws_config: AwsConfig, today: datetime, signing_str: str) -> str:
    """Derive the signing key and sign the string to sign.

    https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
    See https://stackoverflow.com/q/13019203/1611416

    Args:
        aws_config: Supplies the secret access key and the region.
        today: Request time; only its ``YYYYMMDD`` date is used.
        signing_str: The string to sign.

    Returns:
        The hex-encoded HMAC-SHA256 signature.
    """
    # The key is derived by chaining HMACs: each step's raw digest becomes
    # the key for the next scope component (date, region, service, terminator).
    key = f"AWS4{aws_config.aws_secret_access_key}".encode("utf-8")
    scope_parts = (today.strftime("%Y%m%d"), aws_config.aws_region, "s3", "aws4_request")
    for part in scope_parts:
        key = hmac_256(key, part).digest()
    return hmac_256(key, signing_str).hexdigest()
def test_signature_calculation():
    """Check ``signature()`` against AWS's published worked example.

    https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html
    The values come from the worked example in the "Example: GET Object"
    section.

    Raises:
        AssertionError: the computed signature differs from the published one.
    """
    aws_config = AwsConfig(
        aws_access_key="AKIAIOSFODNN7EXAMPLE",
        aws_secret_access_key="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
        aws_region="us-east-1",
    )
    today = datetime(2013, 5, 24)
    string_to_sign = "\n".join((
        "AWS4-HMAC-SHA256",
        "20130524T000000Z",
        "20130524/us-east-1/s3/aws4_request",
        "7344ae5b7ee6c3e7e6b0fe0640412a37625d1fbfff95c48bbb2dc43964946972",
    ))
    expected = "f0e8bdb87c964420e857bd35b5d6ed310bd44f0170aba48dd91039c6036bdb41"

    # Go through signature() itself instead of repeating the key derivation
    # here, so that a regression in the real code path is caught.
    actual = signature(aws_config, today, string_to_sign)
    assert actual == expected, f"{actual} != {expected}"
def _print_section(title: str, body: str) -> None:
    """Print *body* under *title*, framed by dashed rules as wide as the title."""
    rule = "-" * len(title)
    print(f"{title}\n{rule}\n{body}\n{rule}\n")


def _hex_field(node) -> bytes:
    """Decode an XML element whose text is a space-separated hex dump."""
    return bytes.fromhex((node.text or "").replace(" ", ""))


def _report_rejection(response, creq: str, sstr: str) -> None:
    """Print diagnostics for a request that the server rejected.

    If the error body is XML containing ``StringToSignBytes`` or
    ``CanonicalRequestBytes`` elements, they are compared with the locally
    computed values and any differences are printed.  The full exchange is
    then dumped.
    """
    local_sstr = sstr.encode("utf-8")
    local_creq = creq.encode("utf-8")

    try:
        tree = ElementTree.fromstring(response.text)
    except ElementTree.ParseError:
        # An error body need not be XML; still show the raw exchange below.
        print("Response body is not XML; skipping signature diagnostics")
        tree = None

    if tree is not None:
        for node in tree.findall("StringToSignBytes"):
            remote_sstr = _hex_field(node)
            if remote_sstr != local_sstr:
                print("Mismatched String to Sign")
                print(remote_sstr)
                print(local_sstr)

        for node in tree.findall("CanonicalRequestBytes"):
            remote_creq = _hex_field(node)
            if remote_creq != local_creq:
                print("Mismatched Canonical Strings")
                print(remote_creq)
                print(local_creq)
                for i, (a, b) in enumerate(zip(remote_creq, local_creq)):
                    if a != b:
                        print(f"Bytes {i} differs {chr(a)} != {chr(b)}")

    try:
        # Optional third-party helper; imported lazily because it is only
        # needed when a request fails.
        import requests_toolbelt.utils.dump
    except ImportError:
        print("\n\n")
        print(f"{response.status_code} {response.reason}")
        print(response.text)
        return

    data = requests_toolbelt.utils.dump.dump_all(response)
    print("\n\n")
    print(data.decode("utf-8", errors="ignore"))


def main() -> None:
    """PUT 32 random bytes to the S3 URL given on the command line.

    Runs the signature self-test, signs the request with Signature Version 4
    using the ``default`` AWS profile, prints the intermediate signing
    artefacts, sends the request and reports the outcome.
    """
    from datetime import timezone

    test_signature_calculation()

    # Parse arguments before touching credentials so that usage errors and
    # --help work on machines without AWS configuration.
    args = parse_args()
    aws_config = read_aws_config()

    # Never echo the secret access key to the terminal or logs.
    pprint(aws_config._replace(aws_secret_access_key="<redacted>"))
    pprint(args)

    payload = os.urandom(32)
    host = f"{args.url.bucket}.s3.{aws_config.aws_region}.amazonaws.com"
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # formatted timestamps are identical.
    today = datetime.now(timezone.utc)

    headers = {
        "host": host,
        "x-amz-content-sha256": sha256(payload).hexdigest(),
        "x-amz-date": today.strftime("%Y%m%dT%H%M%SZ"),
        "x-amz-expires": "3600",
    }

    creq = canonical_request("PUT", args.url.path, args.url.query, headers, payload)
    sstr = signing_str(creq, today, aws_config.aws_region)
    sig = signature(aws_config, today, sstr)

    _print_section("Canonical Request", creq)
    _print_section("Signing String", sstr)
    _print_section("Signature", sig)

    # The Authorization header is added after signing; it is not itself signed.
    request_headers = {
        "Authorization": auth_header(aws_config, headers, today, sig),
        **headers,
    }

    # NOTE(review): the request is sent over plain HTTP, so the payload is not
    # encrypted in transit -- confirm whether HTTPS can be used instead.
    url = f"http://{host}{args.url.path}"
    # Without a timeout a stalled connection would hang the script forever.
    response = requests.put(url, data=payload, headers=request_headers, timeout=30)

    if response.ok:
        print(response)
    else:
        _report_rejection(response, creq, sstr)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment