Last active
March 29, 2023 15:24
-
-
Save sjperkins/15d3d196387cff8b6dc0dcca7f756fe8 to your computer and use it in GitHub Desktop.
PUT operation in S3 Rest API with AWS Signature Generation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from configparser import ConfigParser | |
from collections import namedtuple | |
from datetime import datetime | |
from difflib import diff_bytes | |
import os | |
import os.path | |
from pprint import pprint | |
from urllib.parse import urlparse, quote | |
from hashlib import sha256 | |
import hashlib | |
import hmac | |
from xml.etree import ElementTree | |
import requests | |
AwsConfig = namedtuple("AwsConfig", "aws_access_key,aws_secret_access_key,aws_region") | |
def read_aws_config() -> AwsConfig: | |
credentials = ConfigParser() | |
credentials.read(os.path.expanduser("~/.aws/credentials")) | |
aws_access_key = credentials.get("default", "aws_access_key_id") | |
aws_secret_key = credentials.get("default", "aws_secret_access_key") | |
config = ConfigParser() | |
config.read(os.path.expanduser("~/.aws/config")) | |
aws_region = config.get("default", "region") | |
return AwsConfig(aws_access_key, aws_secret_key, aws_region) | |
UriBits = namedtuple("UrlBits", "bucket,path,query") | |
def parse_url(url: str) -> UriBits: | |
url_bits = urlparse(url) | |
if url_bits.scheme != "s3": | |
raise argparse.ArgumentTypeError(f"{url} scheme '{url_bits.scheme}' != 's3'") | |
if url_bits.query != "": | |
raise argparse.ArgumentTypeError(f"Query strings not yet handled '{url_bits.query}'") | |
return UriBits(url_bits.netloc, url_bits.path, {}) | |
def parse_args() -> argparse.Namespace: | |
p = argparse.ArgumentParser() | |
p.add_argument("url", type=parse_url) | |
return p.parse_args() | |
def canonical_request(http_method: str, uri: str, query: dict, headers: dict, payload: bytes) -> str: | |
# https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html | |
# Task 1: Create a Canonical Request | |
assert http_method in {"HEAD", "GET", "PUT", "DELETE"} | |
assert "host" in headers | |
lower_keys = sorted(k.lower() for k in headers.keys()) | |
query_str = "&".join(f"{quote(k.encode('utf-8'))}={quote(v.encode('utf-8'))}" for k, v in query.items()) | |
header_str = "\n".join(f"{k}:{headers[k].strip()}" for k in lower_keys) | |
signed_header_str = ";".join(lower_keys) | |
parts = (http_method, uri, query_str, f"{header_str}\n", signed_header_str, sha256(payload).hexdigest()) | |
return "\n".join(item for item in parts) | |
def signing_str(canonical_request: str, today: datetime, aws_region: str) -> str: | |
# https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html | |
# Task 2: Create a String to Sign | |
return (f"AWS4-HMAC-SHA256\n" | |
f"{today.strftime('%Y%m%dT%H%M%SZ')}\n" | |
f"{today.strftime('%Y%m%d')}/{aws_region}/s3/aws4_request\n" | |
f"{sha256(canonical_request.encode('utf-8')).hexdigest()}") | |
def auth_header(aws_config, headers, today, signature): | |
# https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html | |
credential = f"{aws_config.aws_access_key}/{today.strftime('%Y%m%d')}/{aws_config.aws_region}/s3/aws4_request" | |
signed_header_str = ";".join(k.lower() for k in sorted(headers.keys())) | |
return ",".join(( | |
f"AWS4-HMAC-SHA256 Credential={credential}", | |
f"SignedHeaders={signed_header_str}", | |
f"Signature={signature}" | |
)) | |
def hmac_256(key: str, data: str) -> str: | |
return hmac.new(key, msg=data.encode("utf-8"), digestmod=sha256) | |
def signature(aws_config: AwsConfig, today: datetime, signing_str: str) -> str: | |
# https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html | |
# See https://stackoverflow.com/q/13019203/1611416 | |
secret_key = f"AWS4{aws_config.aws_secret_access_key}".encode("utf-8") | |
date_key = hmac_256(secret_key, today.strftime("%Y%m%d")).digest() | |
date_region_key = hmac_256(date_key, aws_config.aws_region).digest() | |
date_region_service_key = hmac_256(date_region_key, "s3").digest() | |
signing_key = hmac_256(date_region_service_key, "aws4_request").digest() | |
return hmac_256(signing_key, signing_str).hexdigest() | |
def test_signature_calculation(): | |
# https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-header-based-auth.html | |
# These values from worked exapmle in "Example: GET Object" Section | |
aws_access_key = "AKIAIOSFODNN7EXAMPLE" | |
aws_secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" | |
aws_region = "us-east-1" | |
aws_service = "s3" | |
date = "20130524" | |
string_to_sign = """AWS4-HMAC-SHA256 | |
20130524T000000Z | |
20130524/us-east-1/s3/aws4_request | |
7344ae5b7ee6c3e7e6b0fe0640412a37625d1fbfff95c48bbb2dc43964946972""" | |
expected = "f0e8bdb87c964420e857bd35b5d6ed310bd44f0170aba48dd91039c6036bdb41" | |
# See https://stackoverflow.com/q/13019203/1611416 | |
date_key = hmac_256(f"AWS4{aws_secret_access_key}".encode("utf-8"), date).digest() | |
date_region_key = hmac_256(date_key, aws_region).digest() | |
date_region_service_key = hmac_256(date_region_key, aws_service).digest() | |
signing_key = hmac_256(date_region_service_key, "aws4_request").digest() | |
signature = hmac_256(signing_key, string_to_sign).hexdigest() | |
assert signature == expected, f"{signature} != {expected}" | |
if __name__ == "__main__": | |
test_signature_calculation() | |
aws_config = read_aws_config() | |
args = parse_args() | |
pprint(aws_config) | |
pprint(args) | |
payload = os.urandom(32) | |
host = f"{args.url.bucket}.s3.{aws_config.aws_region}.amazonaws.com" | |
today = datetime.utcnow() | |
headers = { | |
"host": host, | |
"x-amz-content-sha256": sha256(payload).hexdigest(), | |
"x-amz-date": today.strftime("%Y%m%dT%H%M%SZ"), | |
"x-amz-expires": "3600", | |
} | |
creq = canonical_request( | |
"PUT", | |
args.url.path, | |
args.url.query, | |
headers, | |
payload) | |
sstr = signing_str(creq, today, aws_config.aws_region) | |
sig = signature(aws_config, today, sstr) | |
print(f"Canonical Request\n" | |
f"-----------------\n" | |
f"{creq}\n" | |
f"-----------------\n") | |
print(f"Signing String\n" | |
f"--------------\n" | |
f"{sstr}\n" | |
f"--------------\n") | |
print(f"Signature\n" | |
f"---------\n" | |
f"{sig}\n" | |
f"---------\n") | |
headers = { | |
"Authorization": auth_header(aws_config, headers, today, sig), | |
**headers | |
} | |
url = f"http://{headers['host']}{args.url.path}" | |
response = requests.put(url, data=payload, headers=headers) | |
if not response.ok: | |
tree = ElementTree.fromstring(response.text) | |
for sign_bytes in tree.findall("StringToSignBytes"): | |
aws_signing_str = bytes.fromhex(sign_bytes.text.replace(" ", "")) | |
if aws_signing_str != sstr.encode("utf-8"): | |
print("Mismatched String to Sign") | |
print(aws_signing_str) | |
print(sstr.encode('utf-8')) | |
for canonical_bytes in tree.findall("CanonicalRequestBytes"): | |
aws_canonical_str = bytes.fromhex(canonical_bytes.text.replace(" ", "")) | |
if aws_canonical_str != creq.encode('utf-8'): | |
print("Mismatched Canonical Strings") | |
print(aws_canonical_str) | |
print(creq.encode('utf-8')) | |
for i, (a, b) in enumerate(zip(aws_canonical_str, creq.encode("utf-8"))): | |
if a != b: | |
print(f"Bytes {i} differs {chr(a)} != {chr(b)}") | |
import requests_toolbelt.utils.dump | |
data = requests_toolbelt.utils.dump.dump_all(response) | |
print("\n\n") | |
print(data.decode("utf-8", errors="ignore")) | |
else: | |
print(response) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment