Created
April 27, 2023 17:19
-
-
Save gimre-xymcity/60f7d09ff533971908de2bf1f5fc5154 to your computer and use it in GitHub Desktop.
symbol PLTE digester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from pathlib import Path | |
import time | |
import tqdm | |
from symbolchain.CryptoTypes import Hash256, Signature | |
from symbolchain.facade.SymbolFacade import SymbolFacade | |
from symbolchain.sc import BlockFactory, Height, TransactionType | |
from symbolchain.symbol.Merkle import MerkleHashBuilder | |
OBJECTS_PER_STORAGE_DIRECTORY = 10_000 | |
def parse_args():
	"""Parse command line arguments for the digester.

	Returns an argparse.Namespace with: data (str, required), bpf (int),
	start (int) and watch (bool).
	"""
	parser = argparse.ArgumentParser(description='Digester')
	parser.add_argument('--data', help='path to data directory', required=True)
	parser.add_argument('--bpf', help='blocks per file (from config-node fileDatabaseBatchSize)', default=100, type=int)
	# type=int so args.start is an int whether defaulted or user-supplied
	# (previously it was a str when passed on the command line)
	parser.add_argument('--start', help='start height', default=200_000, type=int)
	parser.add_argument('--watch', help='after processing, watch the chain for changes', action='store_true')
	return parser.parse_args()
class Digester:
	"""Reads serialized blocks out of a catapult node's file-based block database.

	Blocks are stored in batch files of `file_database_batch_size` blocks each,
	grouped into directories of OBJECTS_PER_STORAGE_DIRECTORY entries.  The most
	recently read batch file is cached on the instance.
	"""

	def __init__(self, data_path, file_database_batch_size):
		self.data_path = data_path
		self.file_database_batch_size = file_database_batch_size
		self.batch_file_path = None  # path of the currently cached batch file
		self.batch_file_data = None  # raw bytes of the currently cached batch file
		self.block_offsets = None  # absolute offsets of blocks inside the cached batch file
		self.end = 0

	def get_chain_height(self):
		"""Return the node's current chain height (deserialized from index.dat)."""
		with open(self.data_path / 'index.dat', 'rb') as input_file:
			data = input_file.read()

		return Height.deserialize(data)

	def get_semi_finalized_height(self):
		"""Return the height recorded in the latest prevote backup, or 0 if none exists.

		voting_status.dat holds the current (epoch, round) as little-endian uint32s;
		the matching votes_backup prevote file contains the voted height.
		"""
		with open(self.data_path / 'voting_status.dat', 'rb', buffering=0) as input_file:
			data = input_file.read()

		epoch = int.from_bytes(data[0:4], byteorder='little', signed=False)
		round_value = int.from_bytes(data[4:8], byteorder='little', signed=False)
		# bytes 8:12 hold a third status field that is not needed here

		prevote_path = self.data_path / 'votes_backup' / f'{epoch}' / f'{round_value}_prevote.dat'
		if not prevote_path.exists():
			# current round's backup might not be written yet; fall back to previous round
			prevote_path = self.data_path / 'votes_backup' / f'{epoch}' / f'{round_value - 1}_prevote.dat'

		if prevote_path.exists():
			with prevote_path.open(mode='rb', buffering=0) as input_file:
				data = input_file.read()

			# height is stored at fixed offset 0xD8 within the prevote message
			# (assumed fixed layout - TODO confirm against the node's vote format)
			offset = 0xD8
			return int.from_bytes(data[offset:offset + 8], byteorder='little', signed=False)

		return 0

	def parse_offsets(self, ignore_first_zero_offset=False):
		"""There are absolute offsets to blocks at the beginning of the file, parse and store them."""
		buffer = memoryview(self.batch_file_data)
		self.block_offsets = []
		self.num_blocks = 0
		for i in range(self.file_database_batch_size):
			offset = int.from_bytes(buffer[:8], byteorder='little', signed=False)
			# if this is very first batch file it will have 0 as a first entry
			# for any other batch file, 0 means all offsets have been read
			if not offset and not (ignore_first_zero_offset and 0 == i):
				break

			self.block_offsets.append(offset)
			buffer = buffer[8:]

	def read_batchfile(self, height, force_reread):
		"""Load (and cache) the batch file containing the block at `height`."""
		# `id` shadowed the builtin; renamed to batch_id
		batch_id = (height // self.file_database_batch_size) * self.file_database_batch_size
		directory = f'{batch_id // OBJECTS_PER_STORAGE_DIRECTORY:05}'
		name = f'{batch_id % OBJECTS_PER_STORAGE_DIRECTORY:05}.dat'
		file_path = self.data_path / directory / name

		# reuse the cached file unless the caller demands a fresh read
		if self.batch_file_path == file_path and not force_reread:
			return

		with file_path.open(mode='rb', buffering=0) as input_file:
			self.batch_file_data = input_file.read()

		self.batch_file_path = file_path
		self.parse_offsets(0 == batch_id)

	def get_block(self, height, force_reread):
		"""Return the deserialized block at `height`.

		Raises RuntimeError when the batch file does not (yet) contain that height.
		"""
		self.read_batchfile(height, force_reread)
		entry_in_batch = height % self.file_database_batch_size
		if entry_in_batch >= len(self.block_offsets):
			raise RuntimeError(f'block with given height ({height}) is not present in batch file')

		offset = self.block_offsets[entry_in_batch]
		return BlockFactory.deserialize(self.batch_file_data[offset:])
def process_blocks(digester, parser, start_height, end_height, force_reread=False):
	"""Run every block in [start_height, end_height) through parser, then finalize it.

	A tqdm progress bar tracks throughput; force_reread is forwarded to the digester
	so cached batch files can be refreshed while watching a live chain.
	"""
	heights = range(start_height, end_height)
	for height in tqdm.tqdm(heights, desc='block progress', mininterval=0.5, ascii=False):
		parser.parse_block(digester.get_block(height, force_reread))

	print('\n')
	parser.finalize()
class PltParser:
	"""Scans symbol mainnet transfer messages for embedded PLTE data.

	Matches are recorded in metadata.csv and dumped to per-transaction .dat files.
	"""

	def __init__(self):
		self.facade = SymbolFacade('mainnet')
		self.block = None  # block currently being parsed
		self.tx_index = None  # index of the current (parent) transaction within the block
		self.parent_tx = None  # top-level transaction containing the current transfer

	def store_metadata(self, data_type, sub_tx):
		"""Append a `type;parent-tx-hash;height;tx-index` record to metadata.csv."""
		record = f'{data_type};{self.facade.hash_transaction(self.parent_tx)};{self.block.height.value};{self.tx_index}\n'
		with open('metadata.csv', 'a', encoding='utf8') as output_file:
			output_file.write(record)

	def parse_transfer_message(self, tx, message):
		"""Look for a PLTE marker at the canonical offset (4) or anywhere in the message."""
		if b'PLTE' == message[4:8]:
			# definite hit - record it and dump the full message to disk
			self.store_metadata('PLTE', tx)
			with open(f'plte_{self.block.height.value}_{self.tx_index}.dat', 'wb') as out_file:
				out_file.write(message)
			print(f'got plte at height: {self.block.height}')
			return

		if b'PLTE' not in message:
			return

		# marker found elsewhere; check whether the preceding 4 bytes look like a
		# plausible little-endian chunk size before flagging it
		position = message.index(b'PLTE')
		if position > 4:
			size = int.from_bytes(message[position - 4:position], byteorder='little', signed=False)
			if size < 65536:
				print('slight chance', message)

	def parse_transaction(self, index, parent_tx, sub_tx):
		"""Dispatch a (possibly embedded) transaction if it is a transfer with a usable message."""
		if TransactionType.TRANSFER != sub_tx.type_:
			return None

		message = sub_tx.message
		if not message or len(message) < 4:
			return None

		self.tx_index = index
		self.parent_tx = parent_tx
		return self.parse_transfer_message(sub_tx, message)

	def parse_block(self, block):
		"""Walk all transactions in `block`, descending into aggregates."""
		if 0 == len(block.transactions):
			return

		self.block = block
		aggregate_types = [TransactionType.AGGREGATE_COMPLETE, TransactionType.AGGREGATE_BONDED]
		for index, tx in enumerate(self.block.transactions):
			if tx.type_ in aggregate_types:
				for sub_tx in tx.transactions:
					self.parse_transaction(index, tx, sub_tx)
			else:
				self.parse_transaction(index, tx, tx)

	def finalize(self):
		"""No end-of-run work is needed for this parser."""
def main(Parser):
	"""Digest blocks from a node's data directory using an instance of `Parser`.

	Processes blocks from --start up to the semi-finalized height; with --watch,
	keeps polling every 60s and processes newly finalized blocks as they appear.
	"""
	args = parse_args()
	data_path = Path(args.data)
	if not data_path.exists():
		print(f'{data_path} does not exist')
		# bug fix: previously execution fell through and crashed later trying
		# to read files under the nonexistent directory
		return

	digester = Digester(data_path, args.bpf)
	finalized_height = digester.get_semi_finalized_height()
	digester.end = finalized_height

	parser = Parser()
	start = int(args.start)
	print(f'finalized height {finalized_height}')
	process_blocks(digester, parser, start, finalized_height)

	if args.watch:
		while True:
			print('waiting for finalized height change', end='', flush=True)
			# poll once a minute until the semi-finalized height advances
			while True:
				temp_finalized_height = digester.get_semi_finalized_height()
				if temp_finalized_height != finalized_height:
					break

				time.sleep(60)
				print('.', end='', flush=True)

			print(f' ({temp_finalized_height}) ✅')
			# force_reread: the cached batch file may have grown on disk
			process_blocks(digester, parser, finalized_height, temp_finalized_height, force_reread=True)
			finalized_height = temp_finalized_height
if __name__ == '__main__':
	main(PltParser)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment