Skip to content

Instantly share code, notes, and snippets.

@Ragnoroct
Last active February 17, 2023 22:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ragnoroct/940dfc3146bb3ee4887cb29841bb2d68 to your computer and use it in GitHub Desktop.
Save Ragnoroct/940dfc3146bb3ee4887cb29841bb2d68 to your computer and use it in GitHub Desktop.
aws signed requests v4 using stdlib
"""
MIT License
Copyright (c) 2023 Will Bender
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from urllib.parse import urlparse, quote
from re import search as re_search
from hashlib import new as hashlib_new
from hmac import new as hmac_new
from datetime import datetime as datetimeclass
# Characters accepted verbatim in a query-string value: ASCII letters, digits
# and "_", ".", "-", "~".
QUERYSTR_SAFE = (
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    + "abcdefghijklmnopqrstuvwxyz"
    + "0123456789"
    + "_.-~"
)


def hash_func(msg):
    """Return a SHA-256 hash object fed with the bytes ``msg``."""
    return hashlib_new(name="sha256", data=msg)


def hmac_func(key, msg):
    """Return an HMAC-SHA256 object over the bytes ``msg`` keyed with ``key``."""
    return hmac_new(digestmod="sha256", key=key, msg=msg)


def urllib_quote(val):
    """Percent-encode ``val`` with no safe characters, so "/" becomes "%2F"."""
    return quote(val, "")
def aws_sign_headers_v4(
    method,
    url,
    aws_access_key_id,
    aws_secret_access_key,
    aws_security_token="",
    aws_service="",
    aws_region="",
    body=None,
    optional_headers=None,
):
    """
    Sign a request to authenticate iam requests against api gateway.

    Args:
        method: HTTP method; upper-cased before signing.
        url: Full request url. When it looks like
            <gateway-id>.execute-api.<region>.amazonaws.com the service and
            region are taken from it unless given explicitly.
        aws_access_key_id: Access key id placed in the Credential field.
        aws_secret_access_key: Secret key the signing key is derived from.
        aws_security_token: Optional session token; when truthy it is returned
            as x-amz-security-token and signed.
        aws_service: Service name; only "execute-api" is accepted.
        aws_region: Region name used in the credential scope.
        body: Request payload handed to get_payload_hash. Must not be None for
            POST, PUT and PATCH (pass "" for an empty body).
        optional_headers: Mapping of extra header names to values to sign in
            addition to host and the x-amz-* headers. Names are lower-cased.

    Returns:
        dict with "authorization", "x-amz-content-sha256", "x-amz-date" and,
        when a token was given, "x-amz-security-token". Every returned header
        is part of the signature, so all of them (plus any optional_headers)
        must be sent unchanged with the request.

    Raises:
        ValueError: body is None for POST/PUT/PATCH, service or region cannot
            be determined, or the service is not "execute-api".

    AWS Signature Version 4 Documentation:
    https://docs.aws.amazon.com/general/latest/gr/create-signed-request.html
    References:
    https://github.com/iksteen/aws-request-signer
    https://github.com/DavidMuller/aws-requests-auth
    https://stackoverflow.com/questions/39352648/access-aws-api-gateway-with-iam-roles-from-python/39357370#39357370
    """
    # function-scope import: the module only imports the datetime class itself
    from datetime import timezone

    http_method = method.upper()
    # prevent self footgun when you forget to post body when body is expected
    if body is None and http_method in {"POST", "PUT", "PATCH"}:
        raise ValueError(
            "body must explicitly be a string or an empty string for POST, PUT, PATCH"
        )
    # optional to sign headers other than x-amz and host
    if optional_headers is None:
        optional_headers = {}
    # format: <gateway-id>.execute-api.<region>.amazonaws.com
    url_match = re_search(r"[^.]+\.([^.]+)\.([^.]+)\.amazonaws.com", url)
    if url_match is not None:
        aws_service = aws_service or url_match[1]
        aws_region = aws_region or url_match[2]
    if not aws_service:
        raise ValueError("aws_service is missing value and unable reference from url")
    if not aws_region:
        raise ValueError("aws_region is missing value and unable reference from url")
    url_parsed = urlparse(url)
    if aws_service != "execute-api":
        raise ValueError("currently only execute-api service is supported")

    # timezone-aware replacement for the deprecated datetime.utcnow()
    now = datetimeclass.now(timezone.utc)
    aws_host = url_parsed.hostname
    amz_algorithm = "AWS4-HMAC-SHA256"  # signature version 4
    request_date = now.strftime("%Y%m%d")
    request_date_time_z = now.strftime("%Y%m%dT%H%M%SZ")
    credential_scope = f"{request_date}/{aws_region}/{aws_service}/aws4_request"
    payload_hash = get_payload_hash(body)

    # headers handed back to the caller; they travel with the request
    extra_headers = {
        "x-amz-content-sha256": payload_hash,
        "x-amz-date": request_date_time_z,
    }
    if aws_security_token:
        extra_headers["x-amz-security-token"] = aws_security_token

    # Lower-case the names up front so SignedHeaders and the canonical headers
    # always agree, then add host and every x-amz-* header: the SigV4 rules
    # require all x-amz-* headers present on the request to be signed.
    headers_to_sign = {
        name.lower(): value for name, value in optional_headers.items()
    }
    headers_to_sign["host"] = aws_host
    headers_to_sign.update(extra_headers)

    # step 1: create a canonical request
    canonical_uri = get_canonical_uri(url_parsed.path)
    canonical_query_str = get_canonical_query_str(url_parsed.query, urlpresign=False)
    canonical_headers = get_canonical_headers(headers_to_sign)
    signed_headers = get_signed_headers(headers_to_sign.keys())
    canonical_request = "\n".join(
        [
            http_method,
            canonical_uri,
            canonical_query_str,
            canonical_headers,
            signed_headers,
            payload_hash,
        ]
    )

    # step 2: create a hash of the canonical request
    canonical_request_hash = (
        hash_func(canonical_request.encode("utf-8")).hexdigest().lower()
    )

    # step 3: create a string to sign
    signature_components_str = "\n".join(
        [
            amz_algorithm,
            request_date_time_z,
            credential_scope,
            canonical_request_hash,
        ]
    )

    # step 4: calculate the signature (chained HMACs: date, region, service)
    key_init = ("AWS4" + aws_secret_access_key).encode("utf-8")
    k_date = hmac_func(key_init, request_date.encode("utf-8")).digest()
    k_region = hmac_func(k_date, aws_region.encode("utf-8")).digest()
    k_service = hmac_func(k_region, aws_service.encode("utf-8")).digest()
    k_signing = hmac_func(k_service, b"aws4_request").digest()  # signature version 4
    signature_hex_digest = (
        hmac_func(k_signing, signature_components_str.encode("utf-8"))
        .hexdigest()
        .lower()
    )

    authorization_header = "{} Credential={}/{}, SignedHeaders={}, Signature={}".format(
        amz_algorithm,
        aws_access_key_id,
        credential_scope,
        signed_headers,
        signature_hex_digest,
    )
    return {"authorization": authorization_header, **extra_headers}
def aws_sign_url_v4(
    method,
    url,
    aws_access_key_id,
    aws_secret_access_key,
    expires_seconds=60,
    aws_security_token="",
    aws_service="",
    aws_region="",
    body="",
    optional_headers=None,
):
    """
    Sign url with credentials to authenticate iam requests against api gateway.

    Args:
        method: HTTP method; upper-cased before signing.
        url: Full request url. When it looks like
            <gateway-id>.execute-api.<region>.amazonaws.com the service and
            region are taken from it unless given explicitly.
        aws_access_key_id: Access key id placed in X-Amz-Credential.
        aws_secret_access_key: Secret key the signing key is derived from.
        expires_seconds: Value written to X-Amz-Expires.
        aws_security_token: Optional session token; when set it is added as
            X-Amz-Security-Token and signed.
        aws_service: Service name; only "execute-api" is accepted.
        aws_region: Region name used in the credential scope.
        body: Request payload handed to get_payload_hash. Must not be None for
            POST, PUT and PATCH.
        optional_headers: Mapping of extra header names to values to sign in
            addition to host. Names are lower-cased; whoever uses the url must
            send these headers with the same values.

    Returns:
        The url with the X-Amz-* parameters and X-Amz-Signature in its query
        string (the presigned url).

    Raises:
        ValueError: body is None for POST/PUT/PATCH, service or region cannot
            be determined, or the service is not "execute-api".

    AWS Signature Version 4 Documentation:
    https://docs.aws.amazon.com/general/latest/gr/create-signed-request.html
    References:
    https://github.com/iksteen/aws-request-signer
    https://github.com/DavidMuller/aws-requests-auth
    https://stackoverflow.com/questions/39352648/access-aws-api-gateway-with-iam-roles-from-python/39357370#39357370
    """
    # function-scope import: the module only imports the datetime class itself
    from datetime import timezone

    http_method = method.upper()
    # prevent self footgun when you forget to post body when body is expected
    if body is None and http_method in {"POST", "PUT", "PATCH"}:
        raise ValueError(
            "body must explicitly be a string or an empty string for POST, PUT, PATCH"
        )
    # optional to sign headers other than x-amz and host
    if optional_headers is None:
        optional_headers = {}
    # format: <gateway-id>.execute-api.<region>.amazonaws.com
    url_match = re_search(r"[^.]+\.([^.]+)\.([^.]+)\.amazonaws.com", url)
    if url_match is not None:
        aws_service = aws_service or url_match[1]
        aws_region = aws_region or url_match[2]
    if not aws_service:
        raise ValueError("aws_service is missing value and unable reference from url")
    if not aws_region:
        raise ValueError("aws_region is missing value and unable reference from url")
    url_parsed = urlparse(url)
    if aws_service != "execute-api":
        raise ValueError("currently only execute-api service is supported")

    # timezone-aware replacement for the deprecated datetime.utcnow()
    now = datetimeclass.now(timezone.utc)
    aws_host = url_parsed.hostname
    amz_algorithm = "AWS4-HMAC-SHA256"  # signature version 4
    request_date = now.strftime("%Y%m%d")
    request_date_time_z = now.strftime("%Y%m%dT%H%M%SZ")
    credential_scope = f"{request_date}/{aws_region}/{aws_service}/aws4_request"
    payload_hash = get_payload_hash(body)

    # Lower-case the names up front so X-Amz-SignedHeaders, the SignedHeaders
    # line and the canonical headers all list exactly the same names.
    headers_to_sign = {
        name.lower(): value for name, value in optional_headers.items()
    }
    headers_to_sign["host"] = aws_host

    # step 1: create a canonical request
    canonical_uri = get_canonical_uri(url_parsed.path)
    canonical_query_str = get_canonical_query_str(
        url_parsed.query,
        urlpresign=True,
        presign_expires=expires_seconds,
        signed_headers=headers_to_sign.keys(),
        amz_algorithm=amz_algorithm,
        payload_hash=payload_hash,
        aws_access_key_id=aws_access_key_id,
        credential_scope=credential_scope,
        request_date_time_z=request_date_time_z,
        aws_security_token=aws_security_token,
    )
    canonical_headers = get_canonical_headers(headers_to_sign)
    signed_headers = get_signed_headers(headers_to_sign.keys())
    canonical_request = "\n".join(
        [
            http_method,
            canonical_uri,
            canonical_query_str,
            canonical_headers,
            signed_headers,
            payload_hash,
        ]
    )

    # step 2: create a hash of the canonical request
    canonical_request_hash = (
        hash_func(canonical_request.encode("utf-8")).hexdigest().lower()
    )

    # step 3: create a string to sign
    signature_components_str = "\n".join(
        [
            amz_algorithm,
            request_date_time_z,
            credential_scope,
            canonical_request_hash,
        ]
    )

    # step 4: calculate the signature (chained HMACs: date, region, service)
    key_init = ("AWS4" + aws_secret_access_key).encode("utf-8")
    k_date = hmac_func(key_init, request_date.encode("utf-8")).digest()
    k_region = hmac_func(k_date, aws_region.encode("utf-8")).digest()
    k_service = hmac_func(k_region, aws_service.encode("utf-8")).digest()
    k_signing = hmac_func(k_service, b"aws4_request").digest()  # signature version 4
    signature_hex_digest = (
        hmac_func(k_signing, signature_components_str.encode("utf-8"))
        .hexdigest()
        .lower()
    )

    # step 5: add the signature to the request; it goes last because it must
    # not be part of the canonical query string it was computed from
    new_query = canonical_query_str
    if not new_query.endswith("&"):
        new_query += "&"
    new_query += "X-Amz-Signature={}".format(urllib_quote(signature_hex_digest))
    # noinspection PyProtectedMember
    return url_parsed._replace(query=new_query).geturl()
def get_payload_hash(payload):
    """
    Return the lowercase hexadecimal SHA-256 digest of the request payload.

    Args:
        payload: The HTTP request body. A str is hashed as its UTF-8 encoding,
            bytes-like objects are hashed as-is, and None is treated as an
            empty payload (hash of the empty string).

    Returns:
        64-character lowercase hex string.
    """
    if payload is None:
        payload = b""
    elif isinstance(payload, str):
        payload = payload.encode("utf-8")
    # bytes / bytearray / memoryview bodies go straight to the hash; calling
    # .encode() on them would raise AttributeError
    return hashlib_new(name="sha256", data=payload).hexdigest().lower()
def get_signed_headers(signed_headers):
    """
    Build the SignedHeaders value: the names of the headers that are part of
    the signing process, separated by semicolons (;).

    Header names must use lowercase characters and must appear in alphabetical
    order, so each name is stripped and lower-cased before sorting. This keeps
    the result in step with the canonical headers, whose names are lower-cased
    as well.

    Args:
        signed_headers: Iterable of header names (e.g. dict keys).

    Returns:
        The lower-cased names, sorted and joined with ";". An empty iterable
        gives "".
    """
    return ";".join(sorted(name.strip().lower() for name in signed_headers))
def get_canonical_headers(headers_to_sign):
    """
    Build the CanonicalHeaders section of the canonical request.

    The headers that will be signed are written one per line as "name:value".
    Header names use lowercase characters and appear in alphabetical order of
    the *name*; values have leading and trailing whitespace trimmed and
    sequential whitespace collapsed to a single space. You must include the
    host header (HTTP/1.1) or the :authority header (HTTP/2), and any x-amz-*
    headers in the signature. Other standard headers, such as content-type,
    may optionally be included.

    undocumented: canonical_headers end with newline

    Args:
        headers_to_sign: Mapping of header name to header value.

    Returns:
        The canonical header lines joined by newlines, with a trailing newline.
    """
    # Sort (name, value) pairs rather than the formatted "name:value" lines:
    # ":" sorts after "-", so sorting the lines would put "a-b" ahead of "a"
    # and disagree with the name-ordered SignedHeaders list.
    normalized = sorted(
        (name.strip().lower(), " ".join(str(value).split()))
        for name, value in headers_to_sign.items()
    )
    return "\n".join(f"{name}:{value}" for name, value in normalized) + "\n"
def get_canonical_uri(url_path):
    """
    Return the URI-encoded absolute path component of the url (the part
    between the host and the "?" that starts the query string).

    The path is passed through quote(), which leaves "/" alone and
    percent-encodes "%" itself. An empty path becomes a single forward
    slash (/).
    """
    return quote(url_path) or "/"
def get_canonical_query_str(
    query_str,
    urlpresign=False,
    presign_expires=60,
    signed_headers=None,
    amz_algorithm="",
    payload_hash="",
    aws_access_key_id="",
    credential_scope="",
    request_date_time_z="",
    aws_security_token="",
):
    """
    Build the CanonicalQueryString section of the canonical request.

    The URL-encoded query string parameters, separated by ampersands (&).
    Names and values are expected to arrive already percent-encoded. A
    parameter without a value is written as "name=". Parameters are sorted by
    name (and by value for repeated names). If there is no query string the
    result is an empty string ("").

    undocumented*: all the x-amz-* querystring params need to be included in
    canonical query string.
    notes: X-Amz-Security-Token should be included in the canonical query
    string or appended after the signature is calculated depending on the
    service. X-Amz-Signature is added after the signature is calculated
    (makes sense).

    Args:
        query_str: Raw query string of the url (without the leading "?").
        urlpresign: When true, append the X-Amz-* presigning parameters built
            from the remaining arguments.
        presign_expires: Value for X-Amz-Expires.
        signed_headers: Iterable of header names for X-Amz-SignedHeaders;
            names are lower-cased and sorted. None is treated as empty.
        amz_algorithm: Value for X-Amz-Algorithm.
        payload_hash: Value for X-Amz-Content-Sha256.
        aws_access_key_id: First part of X-Amz-Credential.
        credential_scope: Second part of X-Amz-Credential.
        request_date_time_z: Value for X-Amz-Date.
        aws_security_token: When truthy, added as X-Amz-Security-Token.

    Returns:
        The canonical query string.

    Raises:
        ValueError: a parameter value contains characters that should have
            been percent-encoded.
    """
    raw_params = query_str.split("&")
    if urlpresign:
        # add all amazon params to query string
        signed_header_names = ";".join(
            sorted(name.lower() for name in (signed_headers or ()))
        )
        raw_params += [
            "X-Amz-Algorithm={}".format(urllib_quote(amz_algorithm)),
            "X-Amz-Content-Sha256={}".format(urllib_quote(payload_hash)),
            "X-Amz-Credential={}".format(
                urllib_quote(aws_access_key_id + "/" + credential_scope)
            ),
            "X-Amz-Date={}".format(urllib_quote(request_date_time_z)),
            "X-Amz-Expires={}".format(urllib_quote(str(presign_expires))),
            "X-Amz-SignedHeaders={}".format(urllib_quote(signed_header_names)),
        ]
        if aws_security_token:
            raw_params.append(
                "X-Amz-Security-Token={}".format(urllib_quote(aws_security_token))
            )

    pairs = []
    for raw_param in raw_params:
        key, _, val = raw_param.partition("=")
        if not is_val_unquoted(val):
            raise ValueError(
                f"query string is not quoted properly: query='{query_str}' val='{val}'"
            )
        # empty names come from an empty query string or stray "&"; drop them
        if key:
            pairs.append((key, val))

    # Sort on (name, value), not on the joined "name=value" text: "=" sorts
    # after digits, "-" and ".", so sorting the text would place "a1=x" ahead
    # of "a=x" and break the required ordering by parameter name.
    pairs.sort()
    return "&".join(f"{key}={val}" for key, val in pairs)
def is_val_unquoted(val):
    """
    Return True when every character of ``val`` is in QUERYSTR_SAFE or is
    "/" or "%" (an empty string also returns True).

    Callers use a False result to reject query values that still contain
    characters needing percent-encoding.
    """
    allowed = QUERYSTR_SAFE + "/%"
    return all(char in allowed for char in val)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment