Skip to content

Instantly share code, notes, and snippets.

@nmoinvaz
Created December 23, 2022 01:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nmoinvaz/64c440b6adf5ff4dadd9fdc682b536cd to your computer and use it in GitHub Desktop.
Save nmoinvaz/64c440b6adf5ff4dadd9fdc682b536cd to your computer and use it in GitHub Desktop.
Apple Notarization Submission Script
#!/usr/bin/env python
import argparse
import boto3
import jwt
import requests
import time
import os
import hashlib
from datetime import timezone
from datetime import datetime
from datetime import timedelta
# Notary API endpoint used both to create submissions and to query their status.
ASC_ENDPOINT = "https://appstoreconnect.apple.com/notary/v2/submissions"
# Lifetime of each App Store Connect JWT, in seconds.
TOKEN_LIFETIME_SECONDS = 300
# Delay between two status checks while a submission is still in progress.
POLL_INTERVAL_SECONDS = 30
# Upper bound for a single HTTP request so a stalled connection cannot hang the script.
HTTP_TIMEOUT_SECONDS = 60


def build_parser():
    """Return the command-line parser for the notary submission script."""
    parser = argparse.ArgumentParser(description='Notary submission')
    parser.add_argument('--key_id', help='Apple store connect API key', action='store', required=True)
    parser.add_argument('--issuer_id', help='Apple store connect Issuer ID', action='store', required=True)
    parser.add_argument('--private_key', help='Apple store Connect private key', action='store', required=True)
    parser.add_argument('--path', help='Submission file', action='store', required=True)
    return parser


def file_sha256(path, chunk_size=1024 * 1024):
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in pieces of *chunk_size* bytes so that large
    artifacts are never loaded into memory in one go.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def make_auth_headers(key_id, issuer_id, private_key_data):
    """Return request headers carrying a freshly signed ES256 JWT.

    A new token is minted on every call; it expires
    TOKEN_LIFETIME_SECONDS after creation. The token is deliberately never
    printed, because it is a bearer credential.
    """
    now = datetime.now(tz=timezone.utc)
    jwt_header = {
        "kid": key_id,
    }
    jwt_payload = {
        "iss": issuer_id,
        "iat": now,
        "exp": now + timedelta(seconds=TOKEN_LIFETIME_SECONDS),
        "aud": "appstoreconnect-v1",
    }
    asc_token = jwt.encode(jwt_payload, private_key_data, algorithm='ES256', headers=jwt_header)
    # PyJWT releases before 2.0 return bytes; formatting those into the header
    # would yield "Bearer b'...'" instead of the token itself.
    if isinstance(asc_token, bytes):
        asc_token = asc_token.decode("ascii")
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {asc_token}",
    }


def create_submission(path, asc_headers):
    """Register the file at *path* as a new submission.

    Returns the "data" object of the JSON response (the caller reads its
    "id" and "attributes"). Exits with status 1 when the response carries
    no "data" member.
    """
    sha256 = file_sha256(path)
    asc_data = {
        "submissionName": os.path.basename(path),
        "sha256": sha256,
    }
    print(f"Submission info {asc_data}")
    print(f"Real sha256: {sha256}")
    sub_res = requests.post(
        ASC_ENDPOINT, json=asc_data, headers=asc_headers, timeout=HTTP_TIMEOUT_SECONDS
    ).json()
    if "data" not in sub_res:
        print(f"Failed to create submission {sub_res}")
        raise SystemExit(1)
    return sub_res["data"]


def upload_submission(path, aws_info):
    """Upload the file at *path* to S3 using the credentials in *aws_info*.

    *aws_info* is the "attributes" object of the submission response; the
    access key, secret key, session token, bucket and object key are read
    from it.
    """
    s3 = boto3.client(
        "s3",
        aws_access_key_id=aws_info["awsAccessKeyId"],
        aws_secret_access_key=aws_info["awsSecretAccessKey"],
        aws_session_token=aws_info["awsSessionToken"],
    )
    s3.upload_file(path, aws_info["bucket"], aws_info["object"])


def wait_for_submission(sub_id, key_id, issuer_id, private_key_data):
    """Poll submission *sub_id* until it leaves "In Progress"; return the final status.

    A fresh token is signed for every request: a single token expires after
    TOKEN_LIFETIME_SECONDS, while polling can run for much longer than that.
    Exits with status 1 when a response carries no "data" member.
    """
    while True:
        print("Checking submission status...")
        asc_headers = make_auth_headers(key_id, issuer_id, private_key_data)
        sub_res = requests.get(
            f"{ASC_ENDPOINT}/{sub_id}", headers=asc_headers, timeout=HTTP_TIMEOUT_SECONDS
        ).json()
        if "data" not in sub_res:
            print(f"Failed to query submission {sub_res}")
            raise SystemExit(1)
        status = sub_res["data"]["attributes"]["status"]
        print(f"Submission status: {status}")
        if status != "In Progress":
            return status
        print(f"Waiting {POLL_INTERVAL_SECONDS} seconds...")
        time.sleep(POLL_INTERVAL_SECONDS)


def main(argv=None):
    """Submit a file for notarization and wait for the verdict.

    *argv* defaults to the process arguments; unrecognised arguments are
    ignored. Returns 0 when the final status is "Accepted" and 1 for any
    other final status, so callers and CI jobs can detect a failed
    notarization.
    """
    args, _unknown = build_parser().parse_known_args(argv)

    print("Creating Apple Store Connect JWT")
    with open(args.private_key, "rb") as f:
        private_key_data = f.read()
    asc_headers = make_auth_headers(args.key_id, args.issuer_id, private_key_data)

    print("Creating new submission")
    submission = create_submission(args.path, asc_headers)
    sub_id = submission["id"]
    print(f"New submission created: {sub_id}")

    print(f"Uploading submission file {args.path} to S3...")
    upload_submission(args.path, submission["attributes"])

    status = wait_for_submission(sub_id, args.key_id, args.issuer_id, private_key_data)
    print("Submission complete")
    # Per Apple's Notary API documentation, "Accepted" is the success status;
    # every other final status is reported as a failure.
    if status != "Accepted":
        print(f"Submission was not accepted (status: {status})")
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment