Skip to content

Instantly share code, notes, and snippets.

@ca0abinary
Created February 2, 2023 21:34
Show Gist options
  • Save ca0abinary/25191485044a7ccbe7d147cde2c4717e to your computer and use it in GitHub Desktop.
Save ca0abinary/25191485044a7ccbe7d147cde2c4717e to your computer and use it in GitHub Desktop.
Reads the partition table of an EBS snapshot using the EBS direct APIs
# Install the dependencies: python -m pip install pyreadpartitions boto3 mypy-boto3-ec2 mypy-boto3-ebs
# Set volume_id in main() to a volume that has at least one snapshot you have access to.
# Then run the script.
from pyreadpartitions import get_disk_partitions_info
def main():
    """Print snapshot metadata and the partition table of a volume's latest snapshot.

    Opens an ``EbsDirectIO`` for the hard-coded volume id, prints a few fields
    of the selected snapshot, then hands the object to
    ``get_disk_partitions_info`` and reports whether an MBR or GPT table was
    found.
    """
    ebs = EbsDirectIO(volume_id='vol-0ec018d77cf9af4dd')
    try:
        print(f'''
Got data for volume {ebs.snapshot["VolumeId"]}
Volume Size: {ebs.snapshot["VolumeSize"]} GiB
Snapshot Id: {ebs.snapshot["SnapshotId"]}
Snapshot Start: {ebs.snapshot["StartTime"]}
Snapshot State: {ebs.snapshot["State"]}
''')
        info = get_disk_partitions_info(ebs)
        # Anything without an MBR is reported as GPT, as in the original script.
        has_mbr = info.mbr is not None
        table_kind = "MBR" if has_mbr else "GPT"
        table = info.mbr if has_mbr else info.gpt
        print(f' Uses {table_kind} partition table {table}')
    finally:
        # Release the SQLite cache connection even if reading or parsing fails.
        ebs.close()
#####
# EbsDirectIO.py
#####
from collections import namedtuple
from typing import List
import boto3
import sqlite3
from mypy_boto3_ec2 import EC2Client
from mypy_boto3_ebs import EBSClient
from mypy_boto3_ec2.type_defs import SnapshotTypeDef
from mypy_boto3_ebs.type_defs import BlockTypeDef
class EbsDirectIO:
    """Read-only, seekable, file-like view of an EBS snapshot.

    Block tokens and block contents are fetched through the EBS client
    (``list_snapshot_blocks`` / ``get_snapshot_block``) and cached in a local
    SQLite database so that each block is downloaded at most once.

    Only the subset of the file protocol needed by simple binary parsers is
    implemented: ``read``, ``seek``, ``tell``, ``fileno`` and ``close``.  The
    object can also be used as a context manager.
    """

    # The boto3 stub types are quoted so that these annotation-only names are
    # never evaluated when the class body or the method signatures execute.
    ec2_client: "EC2Client" = None
    ebs_client: "EBSClient" = None
    snapshot: "SnapshotTypeDef" = None
    # Current absolute read position, in bytes from the start of the volume.
    offset: int = 0
    # Size of one snapshot block in bytes (512 KiB).
    block_size: int = 524288
    # Optional override for the cache database location (None -> '<SnapshotId>.db').
    db_path: str = None
    db_connection: sqlite3.Connection = None
    # Never assigned by this class; kept so existing attribute access keeps working.
    db_cursor: sqlite3.Cursor = None
    # One row of the [blocks] cache table.
    DataBlock = namedtuple("DataBlock", "block_index block_token data")

    def __init__(self, ec2_client: "EC2Client" = None,
                 ebs_client: "EBSClient" = None,
                 snapshot: "SnapshotTypeDef" = None,
                 volume_id: str = None,
                 db_path: str = None
                 ) -> None:
        """Select a snapshot and open (or create) its local block cache.

        Args:
            ec2_client: EC2 client used to look up snapshots; created with
                ``boto3.client('ec2')`` when omitted.
            ebs_client: EBS client used to list and fetch blocks; created with
                ``boto3.client('ebs')`` when omitted.
            snapshot: Snapshot description to read from.  When omitted, the
                most recently started snapshot of ``volume_id`` is used.
            volume_id: Volume whose newest snapshot should be read; required
                when ``snapshot`` is not given.
            db_path: Path of the SQLite cache file.  Defaults to
                ``'<SnapshotId>.db'`` in the current directory; ``':memory:'``
                keeps the cache in memory only.

        Raises:
            ValueError: If neither ``snapshot`` nor ``volume_id`` is given, or
                if no snapshot exists for ``volume_id``.
        """
        self.ebs_client = ebs_client
        if self.ebs_client is None:
            self.ebs_client = boto3.client('ebs')
        self.ec2_client = ec2_client
        if self.ec2_client is None:
            self.ec2_client = boto3.client('ec2')
        self.offset = 0
        self.db_path = db_path
        self.snapshot = snapshot
        if self.snapshot is None:
            if not volume_id:
                raise ValueError('either snapshot or volume_id must be provided')
            # NOTE(review): only the first page of results is inspected; a
            # volume with very many snapshots may need NextToken pagination.
            snapshots = self.ec2_client.describe_snapshots(Filters=[{
                'Name': 'volume-id',
                'Values': [volume_id]
            }])['Snapshots']
            if not snapshots:
                raise ValueError(f'no snapshots found for volume {volume_id}')
            # Newest snapshot wins.
            self.snapshot = max(snapshots, key=lambda x: x['StartTime'])
        self.db_connect()
        # Prime the cache with the first page of block tokens only when the
        # cache is empty.  Probing for one particular block index would
        # re-list on every start for snapshots that do not contain that block.
        with self.db_connection as con:
            cache_is_empty = con.execute('SELECT 1 FROM [blocks] LIMIT 1').fetchone() is None
        if cache_is_empty:
            initial_blocks = self.ebs_client.list_snapshot_blocks(
                SnapshotId=self.snapshot["SnapshotId"], MaxResults=100)["Blocks"]
            self.db_put_block_defs(initial_blocks)

    def __enter__(self) -> "EbsDirectIO":
        """Return ``self`` so the object can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the cache connection when the ``with`` block ends."""
        self.close()

    def db_connect(self) -> None:
        """Open the SQLite cache and create the ``blocks`` table if missing."""
        path = self.db_path or f'{self.snapshot["SnapshotId"]}.db'
        self.db_connection = sqlite3.connect(path)
        with self.db_connection as con:
            # block_token is declared TEXT because the tokens are strings.
            # Databases created earlier keep whatever schema they already have.
            con.execute('CREATE TABLE IF NOT EXISTS [blocks] (block_index INT NOT NULL PRIMARY KEY, block_token TEXT NOT NULL, data BLOB NULL)')

    def db_put_block_defs(self, blocks: "List[BlockTypeDef]") -> None:
        """Insert or refresh the token of each listed block; cached data is kept."""
        with self.db_connection as con:
            con.executemany('''
                INSERT INTO [blocks] (block_index, block_token) VALUES (?, ?)
                ON CONFLICT(block_index) DO UPDATE SET block_token=excluded.block_token
            ''', [(x['BlockIndex'], x['BlockToken']) for x in blocks])

    def db_get_block(self, block_index: int) -> DataBlock:
        """Return the cached row for ``block_index``, or ``None`` if it is not cached."""
        with self.db_connection as con:
            # Columns are named explicitly so the result always matches DataBlock.
            res = con.execute(
                'SELECT block_index, block_token, data FROM [blocks] WHERE block_index = ?',
                (block_index,))
            result = res.fetchone()
        if result is None:
            return None
        return self.DataBlock(*result)

    def db_put_block(self, block_index: int, block_data: bytes) -> None:
        """Store the downloaded contents of an already-listed block."""
        with self.db_connection as con:
            con.execute('UPDATE [blocks] SET data = ? WHERE block_index = ?', (block_data, block_index,))

    def tell(self) -> int:
        """Return the current read position in bytes."""
        return self.offset

    def fileno(self) -> int:
        """Return a placeholder descriptor number (always 0).

        NOTE(review): there is no real OS-level descriptor behind this object;
        presumably this exists only to satisfy callers that expect the method.
        """
        return 0

    def seek(self, offset: int = 0, whence: int = 0) -> int:
        """Move the read position and return the new absolute position.

        Args:
            offset: Byte offset, interpreted relative to ``whence``.
            whence: 0 = from the start (default), 1 = from the current
                position, 2 = from the end of the volume.

        Raises:
            ValueError: For an unknown ``whence``, a negative resulting
                position, or ``whence=2`` when the volume size is unknown.
        """
        if whence == 0:
            target = offset
        elif whence == 1:
            target = self.offset + offset
        elif whence == 2:
            total = self._volume_size_bytes()
            if total is None:
                raise ValueError('cannot seek from end: snapshot has no VolumeSize')
            target = total + offset
        else:
            raise ValueError(f'invalid whence value: {whence}')
        if target < 0:
            raise ValueError(f'negative seek position: {target}')
        self.offset = target
        return self.offset

    def close(self) -> None:
        """Close the SQLite cache connection, if one was opened."""
        if self.db_connection is not None:
            self.db_connection.close()

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the current position.

        The read may start and end anywhere inside a block and may span any
        number of blocks.  Blocks that the snapshot does not list are returned
        as zero bytes.  The position advances by the number of bytes returned.

        Args:
            size: Number of bytes to read.  A negative value (or ``None``)
                reads to the end of the volume, which can be very large.

        Returns:
            The bytes read; empty at or past the end of the volume or when
            ``size`` is 0.

        Raises:
            ValueError: If asked to read to the end while the snapshot
                description has no ``VolumeSize``.
        """
        total = self._volume_size_bytes()
        if size is None or size < 0:
            if total is None:
                raise ValueError('cannot read to end: snapshot has no VolumeSize')
            size = total - self.offset
        elif total is not None:
            # Never read past the end of the volume.
            size = min(size, total - self.offset)
        if size <= 0:
            return b''

        # Integer arithmetic only: float division loses precision on large offsets.
        first_block, skip = divmod(self.offset, self.block_size)
        last_block = (self.offset + size - 1) // self.block_size
        chunks = [self._load_block(index) for index in range(first_block, last_block + 1)]
        data = b''.join(chunks)[skip:skip + size]
        self.offset += len(data)
        return data

    def _volume_size_bytes(self):
        """Return the volume size in bytes, or ``None`` when it is unknown."""
        size_gib = (self.snapshot or {}).get("VolumeSize")
        if size_gib is None:
            return None
        # VolumeSize is printed as GiB elsewhere in this file.
        return int(size_gib) * 1024 ** 3

    def _load_block(self, block_index: int) -> bytes:
        """Return the contents of one block, downloading and caching it if needed."""
        block = self.db_get_block(block_index)
        if block is None:
            # Token unknown: list blocks starting at the one that is needed.
            listed = self.ebs_client.list_snapshot_blocks(
                SnapshotId=self.snapshot["SnapshotId"],
                StartingBlockIndex=block_index,
                MaxResults=100)["Blocks"]
            self.db_put_block_defs(listed)
            block = self.db_get_block(block_index)
            if block is None:
                # Still not listed: the snapshot holds no data for this block.
                return bytes(self.block_size)
        if block.data is not None:
            return block.data
        # NOTE(review): tokens cached by an earlier run may have expired by
        # now - confirm, and re-list before fetching if that proves to be so.
        response = self.ebs_client.get_snapshot_block(
            SnapshotId=self.snapshot['SnapshotId'],
            BlockIndex=block.block_index,
            BlockToken=block.block_token)
        blob = response['BlockData'].read()
        self.db_put_block(block.block_index, blob)
        return blob
####
# Run
####
# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment