Skip to content

Instantly share code, notes, and snippets.

@chespinoza
Last active April 26, 2021 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chespinoza/7c6dd9c8b51154147faf56c738483933 to your computer and use it in GitHub Desktop.
Save chespinoza/7c6dd9c8b51154147faf56c738483933 to your computer and use it in GitHub Desktop.
import os
from urllib import parse
import boto3
import requests
from aws_requests_auth.aws_auth import AWSRequestsAuth
from dotenv import load_dotenv
NA_AUTH_URI = "https://api.amazon.com/auth/o2/token"
EU_AUTH_URI = "https://api.amazon.co.uk/auth/o2/token"
FE_AUTH_URI = "https://api.amazon.co.jp/auth/o2/token"
#ENDPOINT = "https://sandbox.sellingpartnerapi-eu.amazon.com"
ENDPOINT = "https://sellingpartnerapi-eu.amazon.com"
load_dotenv()
TOKEN_URL = EU_AUTH_URI
REFRESH_TOKEN = os.environ["REFRESH_TOKEN"]
CLIENT_ID = os.environ["CLIENT_ID"]
CLIENT_SECRET = os.environ["CLIENT_SECRET"]
REGION = os.environ["REGION"]
MARKETPLACES = os.environ["MARKETPLACES"]
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
AWS_ROLE_ARN = os.environ["AWS_ROLE_ARN"]
AWS_SESSION_NAME = os.environ["AWS_SESSION_NAME"]
REQUEST_URI = f"/orders/v0/orders?MarketplaceIds={MARKETPLACES}&CreatedAfter=2021-01-05"
url = parse.urlparse(ENDPOINT+REQUEST_URI)
# request lwa access token
lwa_payload = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"refresh_token": REFRESH_TOKEN,
"grant_type": "refresh_token",
}
access_token = requests.post(TOKEN_URL, data=lwa_payload).json()["access_token"]
# AWS Auth
client = boto3.client(
"sts",
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=REGION,
)
credentials = client.assume_role(
RoleArn=AWS_ROLE_ARN,
RoleSessionName=AWS_SESSION_NAME,
)["Credentials"]
auth = AWSRequestsAuth(
aws_access_key=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_token=credentials["SessionToken"],
aws_host=url.hostname,
aws_region=REGION,
aws_service="execute-api",
)
if __name__ == "__main__":
headers = {"x-amz-access-token": access_token}
result = requests.get(url.geturl(), auth=auth, headers=headers)
breakpoint()
print(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment