|
from cryptography import x509 |
|
from cryptography.hazmat.primitives.asymmetric import ec |
|
from cryptography.hazmat.primitives import hashes, serialization |
|
from cryptography.x509.oid import NameOID |
|
import jwt |
|
import cattrs |
|
|
|
import datetime |
|
from base64 import b64encode |
|
from uuid import uuid4 |
|
|
|
from typing import List, Union |
|
|
|
from appstoreserverlibrary.signed_data_verifier import ( |
|
SignedDataVerifier, |
|
VerificationException, |
|
VerificationStatus, |
|
_ChainVerifier, |
|
) |
|
from appstoreserverlibrary.models.Environment import Environment |
|
from appstoreserverlibrary.models.JWSTransactionDecodedPayload import ( |
|
JWSTransactionDecodedPayload, |
|
) |
|
from appstoreserverlibrary.models.JWSRenewalInfoDecodedPayload import ( |
|
JWSRenewalInfoDecodedPayload, |
|
) |
|
from appstoreserverlibrary.models.ResponseBodyV2DecodedPayload import ( |
|
ResponseBodyV2DecodedPayload, |
|
) |
|
from appstoreserverlibrary.models.AppTransaction import AppTransaction |
|
|
|
|
|
class JWSMocker:
    """Sign App Store payload models locally and build verifiers that accept them.

    Each instance generates its own P-256 private key and a matching
    self-signed certificate. Tokens returned by :meth:`sign_jws` are signed
    with that key, and the verifier returned by :meth:`verifier` has its chain
    verifier replaced with one that accepts only this instance's ``x5c``
    header.
    """

    def __init__(self):
        """Generate the signing key, the self-signed certificate and the x5c header."""
        self._private_key = ec.generate_private_key(ec.SECP256R1())
        # Adapted from https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate
        subject = issuer = x509.Name(
            [
                x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
                x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "Illinois"),
                x509.NameAttribute(NameOID.LOCALITY_NAME, "Chicago"),
                x509.NameAttribute(
                    NameOID.ORGANIZATION_NAME, "AppStoreServerLibrary Testing"
                ),
            ]
        )

        # Read the clock once so both validity bounds derive from the same instant.
        issued_at = datetime.datetime.now(datetime.timezone.utc)
        self._cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(self._private_key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(issued_at)
            .not_valid_after(issued_at + datetime.timedelta(days=10))
            .sign(self._private_key, hashes.SHA256())
        )

        # An x5c entry is the standard-base64 encoding of a DER certificate
        # (RFC 7515, section 4.1.6), so encode the certificate itself rather
        # than only its public key.
        self._x5c = [
            b64encode(
                self._cert.public_bytes(serialization.Encoding.DER)
            ).decode("utf8")
        ]

    def sign_jws(
        self,
        payload: Union[
            JWSTransactionDecodedPayload,
            JWSRenewalInfoDecodedPayload,
            ResponseBodyV2DecodedPayload,
            AppTransaction,
        ],
    ) -> str:
        """Serialize ``payload`` and return it as an ES256-signed compact JWS.

        Args:
            payload: The decoded-payload model to sign. It is converted to
                plain data with ``cattrs.unstructure`` before encoding.

        Returns:
            The encoded token, signed with this mocker's private key and
            carrying this mocker's ``kid`` and ``x5c`` headers.
        """
        data = cattrs.unstructure(payload)
        return jwt.encode(
            data,
            self._private_key,
            algorithm="ES256",
            headers={"kid": "AppStoreServerLibrary Testing", "x5c": self._x5c},
        )

    def verifier(
        self,
        environment: Environment,
        bundle_id: str,
        app_apple_id: Union[str, None] = None,
    ) -> SignedDataVerifier:
        """Build a ``SignedDataVerifier`` that trusts tokens from this mocker.

        Args:
            environment: Environment passed through to the verifier.
            bundle_id: Bundle identifier passed through to the verifier.
            app_apple_id: Optional app Apple ID passed through to the verifier.

        Returns:
            A verifier with online checks disabled whose chain verifier has
            been swapped for ``_TestingChainVerifier`` bound to this mocker.
        """
        v = SignedDataVerifier(
            # Supply the root as DER bytes rather than a Certificate object.
            # NOTE(review): assumes the constructor takes DER-encoded bytes - confirm against the library.
            root_certificates=[self._cert.public_bytes(serialization.Encoding.DER)],
            enable_online_checks=False,
            environment=environment,
            bundle_id=bundle_id,
            app_apple_id=app_apple_id,
        )
        # Replace the real chain validation with the stub that only accepts
        # this mocker's x5c header.
        v._chain_verifier = _TestingChainVerifier(self)
        return v
|
|
|
|
|
class _TestingChainVerifier(_ChainVerifier):
    """Chain verifier stub that trusts exactly one ``JWSMocker`` instance.

    Instead of validating a certificate chain, it checks that the presented
    ``x5c`` list equals the one the mocker emits and hands back the mocker's
    public key.
    """

    def __init__(self, mock: JWSMocker):
        """Remember the mocker whose tokens should be accepted.

        The parent initializer is not invoked here; this stub only needs the
        mocker reference.
        """
        self._mock = mock

    def verify_chain(
        self, certificates: List[str], perform_online_checks: bool, effective_date: int
    ) -> str:
        """Return the mocker's public key if ``certificates`` is its x5c header.

        Args:
            certificates: The ``x5c`` header values taken from the token.
            perform_online_checks: Unused by this stub.
            effective_date: Unused by this stub.

        Returns:
            The mocker's public key as a PEM-encoded string
            (SubjectPublicKeyInfo format).

        Raises:
            VerificationException: With ``VERIFICATION_FAILURE`` when
                ``certificates`` differs from the mocker's x5c header.
        """
        if certificates != self._mock._x5c:
            raise VerificationException(VerificationStatus.VERIFICATION_FAILURE)
        pem = self._mock._cert.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        # public_bytes() yields bytes; decode so the result matches the
        # declared ``str`` return type.
        return pem.decode("utf8")
|
|
|
if __name__ == "__main__":
    # Demo: sign a sample sandbox transaction with the mocker, then decode it
    # again through a verifier built from the same mocker.
    from appstoreserverlibrary.models.Type import Type
    from appstoreserverlibrary.models.InAppOwnershipType import InAppOwnershipType
    from appstoreserverlibrary.models.TransactionReason import TransactionReason

    mock = JWSMocker()

    # Use a timezone-aware clock, as JWSMocker does, and compute each date
    # once as whole epoch milliseconds so no fractional values end up in the
    # token.
    now = datetime.datetime.now(datetime.timezone.utc)
    now_ms = int(now.timestamp() * 1000)
    expires_ms = int((now + datetime.timedelta(days=365)).timestamp() * 1000)

    transaction = mock.sign_jws(
        JWSTransactionDecodedPayload(
            originalTransactionId="123456",
            transactionId="123456",
            webOrderLineItemId="0000001",
            bundleId="com.example.app",
            productId="com.example.app.premium.annual",
            subscriptionGroupIdentifier="com.example.app.premium",
            purchaseDate=now_ms,
            originalPurchaseDate=now_ms,
            expiresDate=expires_ms,
            quantity=1,
            type=Type.AUTO_RENEWABLE_SUBSCRIPTION,
            appAccountToken=None,
            inAppOwnershipType=InAppOwnershipType.PURCHASED,
            signedDate=now_ms,
            revocationReason=None,
            revocationDate=None,
            isUpgraded=False,
            offerType=None,
            offerIdentifier=None,
            environment=Environment.SANDBOX,
            storefront="USA",
            storefrontId="143441",
            transactionReason=TransactionReason.PURCHASE,
        )
    )
    verifier = mock.verifier(
        environment=Environment.SANDBOX, bundle_id="com.example.app"
    )
    decoded = verifier.verify_and_decode_signed_transaction(transaction)
    print(decoded)