Last active
March 5, 2022 09:07
-
-
Save horstle/f3680902708084a11abb6428fb5fcff4 to your computer and use it in GitHub Desktop.
Python script to extract libwidevinecdm.so from a Chrome OS recovery image (as .bin or .zip) in the same directory as the script.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
from __future__ import absolute_import, division, unicode_literals

import os
import sys
from io import UnsupportedOperation
from struct import calcsize, unpack
from zipfile import ZipFile
class ChromeOSImage:
    """
    The main class handling a Chrome OS image

    The image is read as a GPT-partitioned disk; the partition named ``ROOT-A`` is
    parsed as an ext2 file system and a single file can be copied out of it.
    Information related to ext2 is sourced from here: https://www.nongnu.org/ext2-doc/ext2.html
    """

    _LBA_SIZE = 512  # assuming LBA size
    _CHUNK_SIZE = 4 * 1024**2  # bytes read per step while scanning or skipping forward
    _DIR_HEADER_LEN = 8  # inode (4) + rec_len (2) + name_len (1) + file_type (1)

    def __init__(self, imgpath):
        """Prepares the image

        :param imgpath: path of the image; a path ending in ``.zip`` is opened as a zip
                        archive, anything else as a raw binary file
        """
        self.imgpath = imgpath
        # [stream, position]; the position is what seek_stream falls back on when the
        # stream itself cannot seek
        self.bstream = self.get_bstream(imgpath)
        self.part_offset = None  # byte offset of the ROOT-A partition (set by extract_file)
        self.sb_dict = None  # superblock fields (set by extract_file)
        self.blocksize = None  # ext2 block size in bytes (set by superblock)
        self.blk_groups = None  # block group descriptors (set by extract_file)

    @staticmethod
    def _ceil_div(numerator, denominator):
        """Integer division rounded up, without going through floats"""
        return -(-numerator // denominator)

    def gpt_header(self):
        """Returns the needed parts of the GPT header, can be easily expanded if necessary

        :return: tuple (start_lba_part_entries, num_part_entries, size_part_entry)
        """
        # GPT Header entries: signature, revision, header_size, header_crc32, (reserved 4x skipped,) current_lba,
        # backup_lba, first_usable_lba, last_usable_lba, disk_guid, start_lba_part_entries, num_part_entries,
        # size_part_entry, crc32_part_entries
        header_fmt = '<8s4sII4x4Q16sQ3I'
        self.seek_stream(self._LBA_SIZE)  # the header is read from the second LBA
        fields = unpack(header_fmt, self.read_stream(calcsize(header_fmt)))
        start_lba_part_entries, num_part_entries, size_part_entry = fields[9:12]
        return (start_lba_part_entries, num_part_entries, size_part_entry)

    def chromeos_offset(self):
        """Calculate the Chrome OS losetup start offset

        :return: byte offset of the partition named ROOT-A, or 0 (after printing a
                 message) when the entry size is unexpected or no such partition exists
        """
        part_format = '<16s16sQQQ72s'
        entries_start, entries_num, entry_size = self.gpt_header()  # assuming partition table is GPT
        self.seek_stream(entries_start * self._LBA_SIZE)

        if calcsize(part_format) != entry_size:
            print('Partition table entries are not 128 bytes long')
            return 0

        offset = 0  # stays 0 when the loop finds no ROOT-A entry
        for _ in range(entries_num):
            # Entry: type_guid, unique_guid, first_lba, last_lba, attr_flags, part_name
            _, _, first_lba, _, _, part_name = unpack(part_format, self.read_stream(entry_size))
            # GPT stores names as little-endian UTF-16 without a byte order mark
            part_name = part_name.decode('utf-16-le').strip('\x00')
            if part_name == 'ROOT-A':  # assuming partition name is ROOT-A
                offset = first_lba * self._LBA_SIZE
                break

        if not offset:
            print('Failed to calculate losetup offset.')
            return 0
        return offset

    def extract_file(self, filename, extract_path):
        """Extracts the file from the image

        :param filename: bare file name to look for (must be ASCII)
        :param extract_path: directory the file is written to, under the same name
        :return: True when the file was written completely, False otherwise
        """
        self.part_offset = self.chromeos_offset()
        self.sb_dict = self.superblock()
        self.blk_groups = self.block_groups()

        # The scan starts where block_groups() left the stream.
        dir_dict = self._find_dir_entry(filename)
        if dir_dict is None:
            print('File {filename} not found in the ChromeOS image'.format(filename=filename))
            return False

        # Inode numbers start at 1, hence the "- 1" before locating group and slot.
        blk_group_num, i_index_in_group = divmod(dir_dict['inode'] - 1, self.sb_dict['s_inodes_per_group'])
        blk_group = self.blk_groups[blk_group_num]
        inode_pos = (self.part_offset
                     + self.blocksize * blk_group['bg_inode_table']
                     + self.sb_dict['s_inode_size'] * i_index_in_group)
        inode_dict, _ = self.inode_table(inode_pos)
        return self.write_file(inode_dict, os.path.join(extract_path, filename))

    def _find_dir_entry(self, filename):
        """Scans the stream forward for a plausible directory entry named filename

        Every occurrence of the name is checked: the 8 bytes in front of it are parsed as
        a directory entry header and accepted when the inode number is in range and
        name_len equals the length of the name.

        :return: the directory entry dict, or None when the stream ends without a match
        """
        bin_filename = filename.encode('ascii')
        header_len = self._DIR_HEADER_LEN
        # Bytes carried over between reads so an entry straddling two reads is still seen whole.
        keep = header_len + len(bin_filename) - 1
        tail = b''
        while True:
            data = self.read_stream(self._CHUNK_SIZE)
            if not data:
                return None
            window = tail + data
            name_pos = window.find(bin_filename)
            while name_pos != -1:
                # An occurrence closer than 8 bytes to the window start has no complete header here.
                if name_pos >= header_len:
                    dir_dict = self.dir_entry(window[name_pos - header_len:name_pos + len(bin_filename)])
                    if (0 < dir_dict['inode'] <= self.sb_dict['s_inodes_count']
                            and dir_dict['name_len'] == len(bin_filename)):
                        return dir_dict
                name_pos = window.find(bin_filename, name_pos + 1)
            tail = window[-keep:]

    def superblock(self):
        """Get relevant info from the superblock, assert it's an ext2 fs

        Also sets self.blocksize.

        :return: dict of superblock fields plus 'block_groups_count'; 's_magic' is
                 replaced by its hex() string
        """
        names = ('s_inodes_count', 's_blocks_count', 's_r_blocks_count', 's_free_blocks_count', 's_free_inodes_count',
                 's_first_data_block', 's_log_block_size', 's_log_frag_size', 's_blocks_per_group', 's_frags_per_group',
                 's_inodes_per_group', 's_mtime', 's_wtime', 's_mnt_count', 's_max_mnt_count', 's_magic', 's_state',
                 's_errors', 's_minor_rev_level', 's_lastcheck', 's_checkinterval', 's_creator_os', 's_rev_level',
                 's_def_resuid', 's_def_resgid', 's_first_ino', 's_inode_size', 's_block_group_nr',
                 's_feature_compat', 's_feature_incompat', 's_feature_ro_compat', 's_uuid', 's_volume_name',
                 's_last_mounted', 's_algorithm_usage_bitmap', 's_prealloc_block', 's_prealloc_dir_blocks')
        fmt = '<13I6H4I2HI2H3I16s16s64sI2B818x'
        self.seek_stream(self.part_offset + 1024)  # superblock starts after 1024 byte
        sb_dict = dict(zip(names, unpack(fmt, self.read_stream(calcsize(fmt)))))

        sb_dict['s_magic'] = hex(sb_dict['s_magic'])
        assert sb_dict['s_magic'] == '0xef53', 'unexpected superblock magic'  # assuming/checking this is an ext2 fs

        # The group count derived from blocks and from inodes has to agree.
        block_groups_count1 = self._ceil_div(sb_dict['s_blocks_count'], sb_dict['s_blocks_per_group'])
        block_groups_count2 = self._ceil_div(sb_dict['s_inodes_count'], sb_dict['s_inodes_per_group'])
        assert block_groups_count1 == block_groups_count2, 'inconsistent block group count'
        sb_dict['block_groups_count'] = block_groups_count1

        self.blocksize = 1024 << sb_dict['s_log_block_size']
        return sb_dict

    def block_group(self):
        """Get info about a block group

        Reads one 32-byte descriptor at the current stream position.
        """
        names = ('bg_block_bitmap', 'bg_inode_bitmap', 'bg_inode_table', 'bg_free_blocks_count',
                 'bg_free_inodes_count', 'bg_used_dirs_count', 'bg_pad')
        fmt = '<3I4H12x'
        return dict(zip(names, unpack(fmt, self.read_stream(calcsize(fmt)))))

    def block_groups(self):
        """Get info about all block groups

        :return: list with one descriptor dict per block group, in on-disk order
        """
        # The descriptors are read from the third block for 1024-byte blocks,
        # otherwise from the second block.
        first_block = 2 if self.blocksize == 1024 else 1
        self.seek_stream(self.part_offset + first_block * self.blocksize)
        return [self.block_group() for _ in range(self.sb_dict['block_groups_count'])]

    def inode_table(self, inode_pos):
        """Reads and returns an inode table and inode size

        :param inode_pos: absolute byte position of the inode inside the image
        :return: tuple (inode dict, inode size from the superblock); the dict gets an
                 extra 'blocks' key (i_size in blocks, rounded up) and 'i_mode' as hex() string
        """
        names = ('i_mode', 'i_uid', 'i_size', 'i_atime', 'i_ctime', 'i_mtime', 'i_dtime', 'i_gid', 'i_links_count',
                 'i_blocks', 'i_flags', 'i_osd1', 'i_block0', 'i_block1', 'i_block2', 'i_block3', 'i_block4',
                 'i_block5', 'i_block6', 'i_block7', 'i_block8', 'i_block9', 'i_block10', 'i_block11', 'i_blocki',
                 'i_blockii', 'i_blockiii', 'i_generation', 'i_file_acl', 'i_dir_acl', 'i_faddr')
        fmt = '<2HI4I2H3I15I4I12x'  # i_size is unpacked unsigned so sizes of 2 GiB and more stay positive
        fmt_len = calcsize(fmt)
        inode_size = self.sb_dict['s_inode_size']

        self.seek_stream(inode_pos)
        inode_dict = dict(zip(names, unpack(fmt, self.read_stream(fmt_len))))
        inode_dict['i_mode'] = hex(inode_dict['i_mode'])
        inode_dict['blocks'] = self._ceil_div(inode_dict['i_size'], self.blocksize)

        # Skip the rest of a larger inode; a negative count must never reach read(),
        # where it would mean "read to the end of the stream".
        if inode_size > fmt_len:
            self.read_stream(inode_size - fmt_len)
        return inode_dict, inode_size

    @staticmethod
    def dir_entry(chunk):
        """Returns the directory entry found in chunk

        :param chunk: the 8 header bytes of the entry followed by the name bytes
        :return: dict with keys inode, rec_len, name_len, file_type, name
        """
        dir_names = ('inode', 'rec_len', 'name_len', 'file_type', 'name')
        dir_fmt = '<IHBB' + str(len(chunk) - 8) + 's'
        return dict(zip(dir_names, unpack(dir_fmt, chunk)))

    def iblock_ids(self, blk_id, ids_to_read):
        """Reads the block indices/IDs from an indirect block

        :return: tuple (list of IDs stored in the block, ids_to_read reduced by that many)
        """
        self.seek_stream(self.part_offset + self.blocksize * blk_id)
        fmt = '<' + str(self.blocksize // 4) + 'I'
        ids = list(unpack(fmt, self.read_stream(self.blocksize)))
        return ids, ids_to_read - len(ids)

    def iiblock_ids(self, blk_id, ids_to_read):
        """Reads the block indices/IDs from a doubly-indirect block

        Indirect blocks are followed in order until ids_to_read is used up.

        :return: tuple (collected IDs, remaining ids_to_read)
        """
        self.seek_stream(self.part_offset + self.blocksize * blk_id)
        fmt = '<' + str(self.blocksize // 4) + 'I'
        iids = unpack(fmt, self.read_stream(self.blocksize))

        ids = []
        for iid in iids:
            if ids_to_read <= 0:
                break
            ind_block_ids, ids_to_read = self.iblock_ids(iid, ids_to_read)
            ids += ind_block_ids
        return ids, ids_to_read

    def seek_stream(self, seek_pos):
        """Move position of bstream to seek_pos

        A stream that raises UnsupportedOperation on seek() is advanced by reading and
        discarding data; to go backwards it is closed and reopened from imgpath first.
        """
        try:
            self.bstream[0].seek(seek_pos)
            self.bstream[1] = seek_pos
            return
        except UnsupportedOperation:
            pass

        if seek_pos < self.bstream[1]:
            # No way back in a forward-only stream: start over from the beginning.
            self.bstream[0].close()
            self.bstream = self.get_bstream(self.imgpath)
        self._skip_to(seek_pos)

    def _skip_to(self, seek_pos):
        """Reads and discards data until the tracked position reaches seek_pos or the stream ends"""
        while self.bstream[1] < seek_pos:
            step = min(self._CHUNK_SIZE, seek_pos - self.bstream[1])
            if not self.read_stream(step):
                break  # end of stream, nothing more to skip

    def read_stream(self, num_of_bytes):
        """Read and return a chunk of the bytestream

        The tracked position advances by the number of bytes actually returned, so it
        stays correct when the stream ends early.
        """
        data = self.bstream[0].read(num_of_bytes)
        self.bstream[1] += len(data)
        return data

    def get_block_ids(self, inode_dict):
        """Get all block indices/IDs of an inode

        Direct, indirect and doubly-indirect blocks are followed; 'i_blockiii' is not.

        :return: list of block IDs in file order, cut to inode_dict['blocks'] entries
        """
        ids_to_read = inode_dict['blocks']
        block_ids = [inode_dict['i_block' + str(i)] for i in range(12)]
        ids_to_read -= 12

        if inode_dict['i_blocki'] != 0:
            iblocks, ids_to_read = self.iblock_ids(inode_dict['i_blocki'], ids_to_read)
            block_ids += iblocks
        if inode_dict['i_blockii'] != 0:
            iiblocks, ids_to_read = self.iiblock_ids(inode_dict['i_blockii'], ids_to_read)
            block_ids += iiblocks

        return block_ids[:inode_dict['blocks']]

    def read_file(self, block_ids):
        """Read blocks specified by IDs into a dict

        :param block_ids: block IDs, read in the order given
        :return: dict mapping block ID to the bytes of that block
        """
        block_dict = {}
        for block_id in block_ids:
            self.seek_stream(self.part_offset + self.blocksize * block_id)
            block_dict[block_id] = self.read_stream(self.blocksize)
        return block_dict

    @staticmethod
    def write_file_chunk(opened_file, chunk, bytes_to_write):
        """Writes bytes to file in chunks

        At most bytes_to_write bytes of chunk are written.

        :return: number of bytes still to be written after this chunk
        """
        if len(chunk) > bytes_to_write:
            opened_file.write(chunk[:bytes_to_write])
            return 0
        opened_file.write(chunk)
        return bytes_to_write - len(chunk)

    def write_file(self, inode_dict, filepath):
        """Writes file specified by its inode to filepath

        :return: True when exactly i_size bytes were written (including an empty file),
                 False when the blocks ran out first
        """
        bytes_to_write = inode_dict['i_size']
        block_ids = self.get_block_ids(inode_dict)
        # Blocks are fetched in ascending order so the stream mostly moves forward,
        # then written in file order.
        block_dict = self.read_file(sorted(set(block_ids)))

        with open(filepath, 'wb') as opened_file:
            for block_id in block_ids:
                bytes_to_write = self.write_file_chunk(opened_file, block_dict[block_id], bytes_to_write)
                if bytes_to_write == 0:
                    return True
        return bytes_to_write == 0

    @staticmethod
    def get_bstream(imgpath):
        """Get a bytestream of the image

        For a path ending in '.zip' the archive member named like the archive without
        that suffix is opened; any other path is opened as a binary file.

        :return: list [stream, 0] - the stream and its starting position
        """
        zip_suffix = '.zip'
        if imgpath.endswith(zip_suffix):
            # Cut the suffix off; str.strip() would remove any of the characters
            # '.', 'z', 'i', 'p' from both ends of the name instead.
            member = os.path.basename(imgpath)[:-len(zip_suffix)]
            bstream = ZipFile(imgpath, 'r').open(member, 'r')  # pylint: disable=consider-using-with
        else:
            bstream = open(imgpath, 'rb')  # pylint: disable=consider-using-with
        return [bstream, 0]
def main():
    """Extract libwidevinecdm.so from a 'chromeos*' file in the current directory.

    The directory listing is sorted and the first entry whose name starts with
    'chromeos' is used as the image; the library is written to the current directory.

    :return: process exit status - 0 when extract_file reported success, 1 when it
             did not or when no matching file exists
    """
    workdir = os.getcwd()
    # sorted() makes the choice deterministic when several images are present
    for filename in sorted(os.listdir(workdir)):
        if filename.startswith('chromeos'):
            image = ChromeOSImage(os.path.join(workdir, filename))
            return 0 if image.extract_file('libwidevinecdm.so', workdir) else 1
    print("No file starting with 'chromeos' found in {workdir}".format(workdir=workdir))
    return 1


if __name__ == '__main__':
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment