Skip to content

Instantly share code, notes, and snippets.

@tombulled
Last active April 12, 2024 03:28
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tombulled/d313c54a0681fcf0ba6d8092f11411e6 to your computer and use it in GitHub Desktop.
Save tombulled/d313c54a0681fcf0ba6d8092f11411e6 to your computer and use it in GitHub Desktop.
YTApiaryDeviceCrypto implemented in Python 3. Original by @leptos-null (https://gist.github.com/leptos-null/8792b9c50fddc00cf525ed5055a872dc). A working version can be found at the end of the gist (https://gist.github.com/tombulled/d313c54a0681fcf0ba6d8092f11411e6#gistcomment-3069388)
# Go here for the working version: https://gist.github.com/tombulled/d313c54a0681fcf0ba6d8092f11411e6#gistcomment-3069388
import hashlib
from pprint import pprint
import base64
import bytebuffer # https://github.com/alon-sage/python-bytebuffer, pip install bytebuffer
# Other ByteBuffer classes I've found: https://github.com/iGio90/PyByteBuffer, https://github.com/aglyzov/bytebuffer
import secrets
import pyaes # https://github.com/ricmoo/pyaes, pip install pyaes
"""
This code is a Python translation of LMApiaryDeviceCrypto, originally written in Objective-C.
https://gist.github.com/leptos-null/8792b9c50fddc00cf525ed5055a872dc
The LMApiaryDeviceCrypto class was reverse engineered by Leptos from YouTube by Google.
This version does *not* work, however a working version can be found at the end of the gist
I used Teleriks Fiddler as a proxy, enabled https decrypting, installed and trusted their https certificate on my phone,
then connected to the proxy server from my phone to sniff the https packets.
Things I'm unsure of:
1) Is the HTTP Body the request body, or the response body?
2) (See code comments)
"""
PROJECT_KEY = b'WrM95onSB5FfXofSzKWgkNZGiosfmCCAcTH4htvkuj4=' # YouTube Music Base64 Encoded Project Key
API_KEY = 'AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # One of YouTube Musics API Keys
HMAC_LENGTH = 4 # LMApiaryDeviceCrypto.h specifies '@param hmacLength Specify 4'?
CC_SHA1_DIGEST_LENGTH = 20
URL = 'https://youtubei.googleapis.com/youtubei/v1/browse?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # The test url to sign
HTTP_BODY = [0x0A, 0xB8, 0x05, 0x0A, 0xCA, 0x02, ..., 0x92, 0x01, 0x00, 0xF8, 0x01, 0x03] # Unfortunately this also contained personal information, so has been redacted
HTTP_BODY = bytes(HTTP_BODY) # Not sure if this is the correct http body, but this is the (protobuf?) request body it sent to the api server.
class ByteBuffer2: # Is this better than @alon-sage's bytebuffer? (I found this on github somewhere, but can't find it again)
"""
Bytebuffer of flexible size
"""
def __init__(self, size=None):
self.__size = size
self.__bytebuffer = bytearray()
def put(self, b):
return self.__put(b)
def __put(self, b):
self.__bytebuffer.extend(b)
def put_int(self, i):
"""
Adding an integer to the bytebuffer.
Adds 3 bytes.
:param i: Integer
"""
self.__put(i.to_bytes(3, 'big'))
def put_long(self, l):
"""
Adding a 'Long' to the bytebuffer.
Adds 8 bytes.
(Python doesn't truly deal with longs, so give an integer with the same maxsize as a long.)
:param l: Integer 'Long'
"""
self.__put(l.to_bytes(8, 'big'))
def get_bytebuffer(self):
"""
Return the bytebuffer (type bytearray())
:return: bytearray() bytebuffer
"""
return bytes(self.__bytebuffer)
def get_length(self):
return len(self.__bytebuffer)
class NetCryptoError(Exception): pass
class ApiaryDeviceCrypto(object):
def __init__(self, project_key, sign_length): # project_key (bytearray), sign_length (int)
self.hmac_length = sign_length
internal_hmac_length = 0x10
project_key_length = len(project_key)
if project_key_length >= internal_hmac_length:
project_key = project_key[:internal_hmac_length]
self.hmac_key = project_key[internal_hmac_length:project_key_length - internal_hmac_length]
self.project_key = project_key
def set_device_components(self, id, key): # id (string), key (string)
self.device_key = self.decrypt_encoded_string(key)
self.device_id = id
def sign_connection(self, url, http_body): # url (bytes), http_body (bytes)
signed_url = self.sign_data(url, True, HMAC_LENGTH)
signed_content = self.sign_data(http_body, False, CC_SHA1_DIGEST_LENGTH)
compound_value = f'device_id={self.device_id},data={signed_url},content={signed_content}'
return {'X-Goog-Device-Auth': compound_value}
def sign_data(self, data, pad, hmac_length):
digest = hashlib.sha1()
digest.update(self.device_key)
hashed_data = digest.digest()[:4]
# Pad data here?
append_length = hmac_length # min(hmac_length, ...)?
zero_byte = bytes([0])
new_data = bytebuffer.ByteBuffer.allocate(len(hashed_data) + append_length + 1)
new_data.put(bytearray(zero_byte))
new_data.put(bytearray(hashed_data))
new_data.put(bytearray(hmac.new(self.device_key, data, hashlib.sha1)), 0, append_length)
encoded_data = base64.b64encode(new_data.get_bytes()).rstrip(b'=') # Is this equivelant to Base64 encoding 'without padding'?
return encoded_data.decode('utf-8')
def perform_crypto(self, data, output_len, iv, operation):
key = self.project_key[:0x10]
aes = pyaes.AESModeOfOperationCTR(key)
if operation == 'encrypt': return aes.encrypt(data)[:output_len] # Does the iv need to be used here?
elif operation == 'decrypt': return aes.decrypt(data)[:output_len] # Does the iv need to be used here?
def padded_data(self, data):
pad_mod = 0x10
data_length = len(data)
length_mod = data_length % pad_mod
if length_mod != 0:
pad_data = data + b'\x00' * (pad_mod - length_mod) # Is this the correct way of padding the data?
return pad_data
else:
return data
def project_key_signature(self):
magic = 0x10000000000000001
data = ByteBuffer2() # Note: using a different bytebuffer here
data.put_long(magic)
data.put(self.project_key)
data.put_long(magic)
data.put(self.hmac_key)
digest = hashlib.sha1()
digest.update(data.get_bytebuffer())
return digest.digest()[:4]
def decrypt_encoded_string(self, encoded):
decoded = base64.b64decode(encoded)
first_byte = decoded[0]
if int(first_byte) == 0:
if len(decoded) > 0xc:
low_pad = self.padded_data(decoded[5:8])
some_val = len(decoded) - self.hmac_length - 0xd
high_pad = self.padded_data(decoded[0xd:some_val])
if some_val >= 0:
if self.verify_signed_data(decoded):
high_pad = self.padded_data(decoded[0xd:some_val])
return self.perform_crypto(high_pad, some_val, low_pad, 'decrypt')
else:
raise NetCryptoError("Could not verify encrypted data")
else:
raise NetCryptoError("Could not determine cipher")
else:
raise NetCryptoError("Could not determine initializion vector")
else:
raise NetCryptoError("Could not determine key sign")
def encrypt_and_encode(self, data):
zero_byte = bytes([0])
project_sig = self.project_key_signature()
iv_data = secrets.token_bytes(8) # Is this a correct iv, is it needed?
crypto = self.perform_crypto(self.padded_data(data), len(data), self.padded_data(iv_data), 'encrypt')
ret_pre = len(project_sig) + len(iv_data) + len(crypto)
bytebuffer.ByteBuffer.allocate(ret_pre + self.hmac_length)
mut_data.put(bytearray(zero_byte))
mut_data.put(bytearray(project_sig))
mut_data.put(bytearray(iv_data))
mut_data.put(bytearray(crypto))
magic_byte = bytes([83])
more_data = bytebuffer.ByteBuffer.allocate(ret_pre + 9)
more_data.put(bytearray(magic_byte))
more_data.set_position(9)
more_data.put(mut_data._array, 0, ret_pre)
mut_data.put(hmac.new(self.hmac_key, more_data.get_bytes(), hashlib.sha1), 0, self.hmac_length)
encoded_data = base64.b64encode(mut_data.get_bytes())
return encoded_data.decode('utf-8')
def verify_signed_data(self, data):
project_hash = data[1:4]
if project_hash == self.project_key_signature:
length_diff = len(data) - self.hmac_length
if length_diff >= 0:
high_data = data[length_diff:self.hmac_length]
low_data = data[0:length_diff]
mut_data = bytebuffer.ByteBuffer.allocate(length_diff + 9)
magic_byte = bytes([83])
mut_data.put(bytearray(magic_byte))
mut_data.set_position(9)
mut_data.put(low_data)
check_data = hmac.new(self.hmac_key, mut_data.get_bytes(), hashlib.sha1)
return high_data == check_data
return False
def pad_b64(string):
remainder = len(string) % 4
if remainder:
return string + '=' * (4 - remainder)
return string
# Found by using 'requests' as follows: (pip install requests)
# requests.post('https://youtubei.googleapis.com/deviceregistration/v1/devices?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og&rawDeviceId=RAW_DEVICE_ID').json()
device = \
{
'id': '<<Not sure if private, so have redacted>>',
'key': '<<Not sure if private, so have redacted>>',
}
adc = ApiaryDeviceCrypto(base64.b64decode(PROJECT_KEY), HMAC_LENGTH)
adc.set_device_components(device['id'], pad_b64(device['key'])) # Haven't got past this yet, keep getting: NetCryptoError: Could not verify encrypted data
# signed_url = adc.sign_data(URL.encode(), True, HMAC_LENGTH)
# adc.sign_connection(URL.encode(), HTTP_BODY)
@leptos-null
Copy link

leptos-null commented Oct 29, 2019

Is the HTTP Body the request body, or the response body?

You'll be signing the request body.

Is the device dictionary (id and key) private?

It's not private so much as that it has personal information, but they are supposed to be unique to you, so you shouldn't really share it. As I describe here, you can request a device dictionary fairly easily (curl -X POST "https://youtubei.googleapis.com/deviceregistration/v1/devices?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og&rawDeviceId=DemonstrationDeviceID")

Edit: Sorry, I see your comment above that line now.

pad_data = data + b'\x00' * (pad_mod - length_mod) # Is this the correct way of padding the data?

Yep. That may be written as

if length_mod != 0:
    data += b'\x00' * (pad_mod - length_mod)
return data

iv_data = secrets.token_bytes(8) # Is this a correct iv, is it needed?

That's correct. It's not necessarily needed. That data is used in two places. The first is the iv (Initialization vector) for the AES routine [pyaes seems like it may be handling this internally (I have not checked if this is the case)]. The second is inserted directly into the data that's signed. This is to prevent an observer from determining patterns in the data. From the perspective of the routines working, this could be b'\x00' * 8.

bytes.rstrip(b'=') # Is this equivelant to Base64 encoding 'without padding'?

yes

append_length = hmac_length # min(hmac_length, ...)?

append_length = min(hmac_length, CC_SHA1_DIGEST_LENGTH)

# Pad data here?

if pad:
    data += b'\x00'

@leptos-null
Copy link

Got it working. There were a few problems.
The first was that Objective-C uses [start, length] for slices, and python uses [start, end].
The second is verify_signed_data compares project_hash against self.project_key_signature (should be self.project_key_signature())
The third is that hmac.new returns an HMAC object, not bytes.

My adjusted verify_signed_data method:

    def verify_signed_data(self, data: bytes) -> bool:
        project_hash = data[1:5]

        if project_hash == self.project_key_signature():
            length_diff = len(data) - self.hmac_length

            if length_diff >= 0:
                high_data = data[length_diff:]
                low_data = data[:length_diff]
                mut_data = bytes([83])

                mut_data += b'\x00' * 8
                mut_data += low_data

                check_data = hmac.new(self.hmac_key, mut_data, hashlib.sha1)

                return high_data == check_data.digest()[:self.hmac_length]

        return False

@tombulled
Copy link
Author

@leptos-null Thanks for the time and effort you've put into this, I really appreciate all of your help. I've taken on board your comments and adapted the program, however I keep getting the same error (Could not verify encrypted data). Please can you take another look at my updated code and compare it with yours to see what I'm still doing wrong as I see you managed to get your code working in the end.

@tombulled
Copy link
Author

import hashlib
from pprint import pprint
import base64
import bytebuffer # https://github.com/alon-sage/python-bytebuffer, pip install bytebuffer
# Other ByteBuffer classes I've found: https://github.com/iGio90/PyByteBuffer, https://github.com/aglyzov/bytebuffer
import secrets
import pyaes # https://github.com/ricmoo/pyaes, pip install pyaes
import requests # https://github.com/psf/requests, pip install requests

"""
This code is a Python translation of LMApiaryDeviceCrypto, originally written in Objective-C.
    https://gist.github.com/leptos-null/8792b9c50fddc00cf525ed5055a872dc

The LMApiaryDeviceCrypto class was reverse engineered by Leptos from YouTube by Google.

This currently does *not* work, and some values have been redacted due to privacy

I used Teleriks Fiddler as a proxy, enabled https decrypting, installed and trusted their https certificate on my phone,
    then connected to the proxy server from my phone to sniff the https packets.

"""

PROJECT_KEY = b'WrM95onSB5FfXofSzKWgkNZGiosfmCCAcTH4htvkuj4=' # YouTube Music Base64 Encoded Project Key
API_KEY = 'AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # One of YouTube Musics API Keys
HMAC_LENGTH = 4
CC_SHA1_DIGEST_LENGTH = 20
URL = 'https://youtubei.googleapis.com/youtubei/v1/browse?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # The test url to sign
# Note: I know I've redacted most of the http body, however does it look like it starts + ends with the correct bytes?
HTTP_BODY = [0x0A, 0xB8, 0x05, 0x0A, 0xCA, 0x02, 0x52, 0x02, ..., 0x92, 0x01, 0x00, 0xF8, 0x01, 0x03] # Been redacted
HTTP_BODY = bytes(HTTP_BODY)

class ByteBuffer2:
    """
    Bytebuffer of flexible size
    """

    def __init__(self, size=None):
        self.__size = size
        self.__bytebuffer = bytearray()

    def put(self, b):
        return self.__put(b)

    def __put(self, b):
        self.__bytebuffer.extend(b)

    def put_int(self, i):
        """
        Adding an integer to the bytebuffer.
        Adds 3 bytes.
        :param i: Integer
        """
        self.__put(i.to_bytes(3, 'big'))

    def put_long(self, l):
        """
        Adding a 'Long' to the bytebuffer.
        Adds 8 bytes.
        (Python doesn't truly deal with longs, so give an integer with the same maxsize as a long.)
        :param l: Integer 'Long'
        """
        self.__put(l.to_bytes(8, 'big'))

    def get_bytebuffer(self):
        """
        Return the bytebuffer (type bytearray())
        :return: bytearray() bytebuffer
        """
        return bytes(self.__bytebuffer)

    def get_length(self):
        return len(self.__bytebuffer)

class NetCryptoError(Exception): pass

class ApiaryDeviceCrypto(object):
    def __init__(self, project_key, sign_length): # project_key (bytearray), sign_length (int)
        self.hmac_length = sign_length

        internal_hmac_length = 0x10
        project_key_length = len(project_key)

        if project_key_length >= internal_hmac_length:
            project_key = project_key[:internal_hmac_length]

            self.hmac_key = project_key[internal_hmac_length:project_key_length - internal_hmac_length]

        self.project_key = project_key

    def set_device_components(self, id, key): # id (string), key (string)
        self.device_key = self.decrypt_encoded_string(key)
        self.device_id = id

    def sign_connection(self, url, http_body): # url (bytes), http_body (bytes)
        signed_url = self.sign_data(url, True, HMAC_LENGTH)
        signed_content = self.sign_data(http_body, False, CC_SHA1_DIGEST_LENGTH)

        compound_value = f'device_id={self.device_id},data={signed_url},content={signed_content}'

        return {'X-Goog-Device-Auth': compound_value}

    def sign_data(self, data, pad, hmac_length):
        digest = hashlib.sha1()

        digest.update(self.device_key)

        hashed_data = digest.digest()[:4]

        if pad:
            data += b'\x00'

        append_length = min(hmac_length, CC_SHA1_DIGEST_LENGTH)
        zero_byte = b'\x00'

        new_data = bytebuffer.ByteBuffer.allocate(len(hashed_data) + append_length + 1)
        new_data.put(bytearray(zero_byte))
        new_data.put(bytearray(hashed_data))
        new_data.put(bytearray(hmac.new(self.device_key, data, hashlib.sha1)), 0, append_length)

        encoded_data = base64.b64encode(new_data.get_bytes()).rstrip(b'=')

        return encoded_data.decode('utf-8')

    def perform_crypto(self, data, output_len, iv, operation):
        key = self.project_key[:0x10]
        aes = pyaes.AESModeOfOperationCTR(key)

        if operation == 'encrypt': return aes.encrypt(data)[:output_len]
        elif operation == 'decrypt': return aes.decrypt(data)[:output_len]

    def padded_data(self, data):
        pad_mod = 0x10
        data_length = len(data)
        length_mod = data_length % pad_mod

        if length_mod != 0:
            if length_mod != 0:
                data += b'\x00' * (pad_mod - length_mod)

        return data

    def project_key_signature(self):
        magic = 0x1000000000000000 # Had to change this

        data = ByteBuffer2() # Note: using a different bytebuffer here
        data.put_long(magic)
        data.put(self.project_key)
        data.put_long(magic)
        data.put(self.hmac_key)

        digest = hashlib.sha1()
        digest.update(data.get_bytebuffer())

        return digest.digest()[:4]

    def decrypt_encoded_string(self, encoded):
        decoded = base64.b64decode(encoded)
        first_byte = decoded[0]

        if int(first_byte) == 0:
            if len(decoded) > 0xc:
                low_pad = self.padded_data(decoded[5:8])
                some_val = len(decoded) - self.hmac_length - 0xd

                high_pad = self.padded_data(decoded[0xd:some_val])

                if some_val >= 0:
                    if self.verify_signed_data(decoded):
                        high_pad = self.padded_data(decoded[0xd:some_val])

                        return self.perform_crypto(high_pad, some_val, low_pad, 'decrypt')
                    else:
                        raise NetCryptoError("Could not verify encrypted data")
                else:
                    raise NetCryptoError("Could not determine cipher")
            else:
                raise NetCryptoError("Could not determine initializion vector")
        else:
            raise NetCryptoError("Could not determine key sign")

    def encrypt_and_encode(self, data):
        zero_byte = b'\x00'
        project_sig = self.project_key_signature()

        iv_data = b'\x00' * 8 # secrets.token_bytes(8)

        crypto = self.perform_crypto(self.padded_data(data), len(data), self.padded_data(iv_data), 'encrypt')

        ret_pre = len(project_sig) + len(iv_data) + len(crypto)

        bytebuffer.ByteBuffer.allocate(ret_pre + self.hmac_length)
        mut_data.put(bytearray(zero_byte))
        mut_data.put(bytearray(project_sig))
        mut_data.put(bytearray(iv_data))
        mut_data.put(bytearray(crypto))

        magic_byte = bytes([83])

        more_data = bytebuffer.ByteBuffer.allocate(ret_pre + 9)
        more_data.put(bytearray(magic_byte))
        more_data.set_position(9)
        more_data.put(mut_data._array, 0, ret_pre)

        mut_data.put(hmac.new(self.hmac_key, more_data.get_bytes(), hashlib.sha1), 0, self.hmac_length)

        encoded_data = base64.b64encode(mut_data.get_bytes())

        return encoded_data.decode('utf-8')

    def verify_signed_data(self, data: bytes) -> bool:
        project_hash = data[1:5]

        if project_hash == self.project_key_signature():
            length_diff = len(data) - self.hmac_length

            if length_diff >= 0:
                high_data = data[length_diff:]
                low_data = data[:length_diff]
                mut_data = bytes([83])

                mut_data += b'\x00' * 8
                mut_data += low_data

                check_data = hmac.new(self.hmac_key, mut_data, hashlib.sha1)

                return high_data == check_data.digest()[:self.hmac_length]

        return False

def pad_b64(string):
    remainder = len(string) % 4

    if remainder:
    	return string + '=' * (4 - remainder)

    return string

# I've changed this to make running the program easier
device = requests.post('https://youtubei.googleapis.com/deviceregistration/v1/devices?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og&rawDeviceId=RAW_DEVICE_ID').json()

adc = ApiaryDeviceCrypto(base64.b64decode(PROJECT_KEY), HMAC_LENGTH)

adc.set_device_components(device['id'], pad_b64(device['key'])) # Haven't got past this yet, keep getting: NetCryptoError: Could not verify encrypted data

# signed_url = adc.sign_data(URL.encode(), True, HMAC_LENGTH)

# adc.sign_connection(URL.encode(), HTTP_BODY)

@leptos-null
Copy link

leptos-null commented Oct 29, 2019

I didn't really like using the byte buffers that weren't apart of the standard library, so I removed those dependencies

import hashlib
import hmac
import secrets
import base64
import pyaes # https://github.com/ricmoo/pyaes, pip install pyaes

"""
This code is a Python translation of LMApiaryDeviceCrypto, originally written in Objective-C.
    https://gist.github.com/leptos-null/8792b9c50fddc00cf525ed5055a872dc

The LMApiaryDeviceCrypto class was reverse engineered by Leptos from YouTube by Google.

I used Teleriks Fiddler as a proxy, enabled https decrypting, installed and trusted their https certificate on my phone,
    then connected to the proxy server from my phone to sniff the https packets.
"""

PROJECT_KEY = b'WrM95onSB5FfXofSzKWgkNZGiosfmCCAcTH4htvkuj4=' # YouTube Music Base64 Encoded Project Key
API_KEY = 'AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # One of YouTube Musics API Keys
HMAC_LENGTH = 4 # LMApiaryDeviceCrypto.h specifies '@param hmacLength Specify 4'
CC_SHA1_DIGEST_LENGTH = 20
URL = 'https://youtubei.googleapis.com/youtubei/v1/browse?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og' # The test url to sign
HTTP_BODY = [0xB8, 0x05, 0x0A, 0xCA, 0x02, ..., 0x92, 0x01, 0x00, 0xF8, 0x01, 0x03] # Unfortunately this also contained personal information, so has been redacted
HTTP_BODY = bytes(HTTP_BODY)

def pad_b64(string: str) -> str:
    """
    Pad a given string with base64 padding characters, if needed
    """
    pad_size = 4
    remainder = len(string) % pad_size

    if remainder:
    	string += '=' * (pad_size - remainder)
    return string

class NetCryptoError(Exception): pass

class ApiaryDeviceCrypto(object):
    def __init__(self, project_key: bytes, hmac_length: int):
        self.hmac_length = hmac_length

        internal_hmac_length = 0x10
        project_key_length = len(project_key)

        if project_key_length >= internal_hmac_length:
            self.project_key = project_key[:internal_hmac_length]
            self.hmac_key = project_key[internal_hmac_length:]
    
    def set_device_components(self, id: str, key: str):
        self.device_key = self.decrypt_encoded_string(key)
        self.device_id = id
    
    def sign_url_request(self, url: bytes, http_body: bytes) -> dict:
        """
        The HTTP header field and value for signing a given request
        """
        signed_url = self.sign_data(url, True, 4)
        signed_content = self.sign_data(http_body, False, CC_SHA1_DIGEST_LENGTH)
        
        compound_value = f'device_id={self.device_id},data={signed_url},content={signed_content}'

        return {'X-Goog-Device-Auth': compound_value}
    
    def sign_data(self, data: bytes, pad: bool, hmac_length: int) -> str:
        digest = hashlib.sha1()
        digest.update(self.device_key)

        hashed_data = digest.digest()[:4]

        if pad:
            data += b'\x00'
        
        append_length = min(hmac_length, CC_SHA1_DIGEST_LENGTH)
        auth_code = hmac.new(self.device_key, data, hashlib.sha1)

        new_data = b'\x00'
        new_data += hashed_data
        new_data += auth_code.digest()[:append_length]

        encoded_data: bytes = base64.b64encode(new_data).rstrip(b'=')
        return encoded_data.decode('utf-8')
    
    def perform_crypto(self, data: bytes, output_len: int, iv: bytes, operation: str) -> bytes:
        key = self.project_key[:0x10]
        aes = pyaes.AESModeOfOperationCTR(key)
        # note: endianness, padding, iv not mentioned
        #   (original implementation specified big endian, no padding, and an iv)
        #   this seems to work fine right now
        if operation == 'encrypt': return aes.encrypt(data)[:output_len] # Does the iv need to be used here?
        elif operation == 'decrypt': return aes.decrypt(data)[:output_len] # Does the iv need to be used here?

    def padded_data(self, data: bytes) -> bytes:
        pad_mod = 0x10
        length_mod = len(data) % pad_mod

        if length_mod:
            data += b'\x00' * (pad_mod - length_mod)
        return data
    
    def project_key_signature(self) -> bytes:
        magic = bytes([00, 00, 00, 00, 00, 00, 00, 0x10])
        
        data = magic
        data += self.project_key
        data += magic
        data += self.hmac_key

        digest = hashlib.sha1()
        digest.update(data)

        return digest.digest()[:4]
    
    def decrypt_encoded_string(self, encoded) -> bytes:
        decoded = base64.b64decode(pad_b64(encoded))
        first_byte = decoded[0]

        if first_byte == 0:
            if len(decoded) >= 0xd:
                iv_data = self.padded_data(decoded[5:0xd])
                some_val = len(decoded) - self.hmac_length - 0xd

                if some_val >= 0:
                    if self.verify_signed_data(decoded):
                        cipher = self.padded_data(decoded[0xd:some_val + 0xd])

                        return self.perform_crypto(cipher, some_val, iv_data, 'decrypt')
                    else:
                        raise NetCryptoError("Could not verify encrypted data")
                else:
                    raise NetCryptoError("Could not determine cipher")
            else:
                raise NetCryptoError("Could not determine initializion vector")
        else:
            raise NetCryptoError("Could not determine key sign")
    
    def encrypt_and_encode(self, data: bytes) -> str:
        mut_data = bytes([0])
        mut_data += self.project_key_signature()

        iv_data = secrets.token_bytes(8)
        mut_data += iv_data

        crypto = self.perform_crypto(self.padded_data(data), len(data), self.padded_data(iv_data), 'encrypt')

        mut_data += crypto

        more_data = bytes([83])
        more_data += b'\x00' * 8
        more_data += mut_data
        
        auth_code = hmac.new(self.hmac_key, more_data, hashlib.sha1)
        mut_data += auth_code.digest()[:self.hmac_length]

        encoded_data = base64.b64encode(mut_data)

        return encoded_data.decode('utf-8')

    def verify_signed_data(self, data: bytes) -> bool:
        project_hash = data[1:5]

        if project_hash == self.project_key_signature():
            length_diff = len(data) - self.hmac_length

            if length_diff >= 0:
                high_data = data[length_diff:]
                low_data = data[:length_diff]
                mut_data = bytes([83])

                mut_data += b'\x00' * 8
                mut_data += low_data

                auth_code = hmac.new(self.hmac_key, mut_data, hashlib.sha1)
                check_data = auth_code.digest()[:self.hmac_length]
                return high_data == check_data

        return False

device = requests.post('https://youtubei.googleapis.com/deviceregistration/v1/devices?key=AIzaSyDK3iBpDP9nHVTk2qL73FLJICfOC3c51Og&rawDeviceId=RAW_DEVICE_ID').json()

adc = ApiaryDeviceCrypto(base64.b64decode(PROJECT_KEY), HMAC_LENGTH)
adc.set_device_components(device['id'], device['key'])
print(adc.sign_url_request(URL.encode(), HTTP_BODY))

@tombulled
Copy link
Author

@leptos-null Thank you so much, this got it working! I really hope this is able to help others beside myself, and I look forward to fully implementing this in my project. I'll make sure to properly update this gist and will be uploading all of my code in due course. Thanks again for everything!

@SuhatAkbulak
Copy link

hello sir How do we create an "HTTP BODY"?.I ran this library but "File" yt-apiary-device-crypto.py ", line 95 compound_value = f'device_id = {self.device_id}, data = {signed_url}, content = {signed_content} '" I get such an error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment