Skip to content

Instantly share code, notes, and snippets.

@gimre-xymcity
Created April 27, 2023 17:19
Show Gist options
  • Save gimre-xymcity/60f7d09ff533971908de2bf1f5fc5154 to your computer and use it in GitHub Desktop.
symbol PLTE digester
import argparse
from pathlib import Path
import time
import tqdm
from symbolchain.CryptoTypes import Hash256, Signature
from symbolchain.facade.SymbolFacade import SymbolFacade
from symbolchain.sc import BlockFactory, Height, TransactionType
from symbolchain.symbol.Merkle import MerkleHashBuilder
OBJECTS_PER_STORAGE_DIRECTORY = 10_000
def parse_args():
    """Parse command-line arguments for the digester.

    Returns:
        argparse.Namespace with `data` (str), `bpf` (int), `start` (int) and `watch` (bool).
    """
    parser = argparse.ArgumentParser(description='Digester')
    parser.add_argument('--data', help='path to data directory', required=True)
    parser.add_argument('--bpf', help='blocks per file (from config-node fileDatabaseBatchSize)', default=100, type=int)
    # fix: --start previously defaulted to an int but parsed user input as str; make it int like --bpf
    parser.add_argument('--start', help='start height', default=200_000, type=int)
    parser.add_argument('--watch', help='after processing, watch the chain for changes', action='store_true')
    return parser.parse_args()
class Digester:
    """Random-access reader over a catapult block file database (batch .dat files)."""

    def __init__(self, data_path, file_database_batch_size):
        """
        Args:
            data_path: Path to the node data directory.
            file_database_batch_size: blocks per batch file (config-node fileDatabaseBatchSize).
        """
        self.data_path = data_path
        self.file_database_batch_size = file_database_batch_size
        # cache of the most recently loaded batch file, so sequential reads avoid re-reading
        self.batch_file_path = None
        self.batch_file_data = None
        self.block_offsets = None
        self.end = 0

    def get_chain_height(self):
        """Return the chain height (catbuffer Height) stored in index.dat."""
        with open(self.data_path / 'index.dat', 'rb') as input_file:
            data = input_file.read()
        return Height.deserialize(data)

    def get_semi_finalized_height(self):
        """Return the height recorded in the latest prevote backup, or 0 if no backup exists."""
        with open(self.data_path / 'voting_status.dat', 'rb', buffering=0) as input_file:
            data = input_file.read()

        epoch = int.from_bytes(data[0:4], byteorder='little', signed=False)
        voting_round = int.from_bytes(data[4:8], byteorder='little', signed=False)

        prevote_path = self.data_path / 'votes_backup' / f'{epoch}' / f'{voting_round}_prevote.dat'
        if not prevote_path.exists():
            # current round's prevote may not be written yet; fall back to the previous round
            prevote_path = self.data_path / 'votes_backup' / f'{epoch}' / f'{voting_round - 1}_prevote.dat'

        if not prevote_path.exists():
            return 0

        with prevote_path.open(mode='rb', buffering=0) as input_file:
            data = input_file.read()

        # NOTE(review): 0xD8 is assumed to be the fixed offset of the height field inside a
        # prevote message — confirm against the voting message format
        offset = 0xD8
        return int.from_bytes(data[offset:offset + 8], byteorder='little', signed=False)

    def parse_offsets(self, ignore_first_zero_offset=False):
        """There are absolute offsets to blocks at the beginning of the file, parse and store them."""
        buffer = memoryview(self.batch_file_data)
        self.block_offsets = []
        self.num_blocks = 0
        for i in range(self.file_database_batch_size):
            offset = int.from_bytes(buffer[:8], byteorder='little', signed=False)
            # the very first batch file legitimately stores 0 as its first entry;
            # any other zero offset means all valid offsets have been read
            # (fix: previously only a zero at i == 0 terminated the scan, so trailing
            # zero offsets in partially-filled batch files were appended as valid entries)
            if not offset and not (ignore_first_zero_offset and 0 == i):
                break
            self.block_offsets.append(offset)
            buffer = buffer[8:]

    def read_batchfile(self, height, force_reread):
        """Load (and cache) the batch file containing `height`; re-read when `force_reread` is set."""
        # first height stored in the batch file, also used as the file id
        group_id = (height // self.file_database_batch_size) * self.file_database_batch_size
        directory = f'{group_id // OBJECTS_PER_STORAGE_DIRECTORY:05}'
        name = f'{group_id % OBJECTS_PER_STORAGE_DIRECTORY:05}.dat'
        file_path = self.data_path / directory / name
        if self.batch_file_path == file_path and not force_reread:
            return

        with file_path.open(mode='rb', buffering=0) as input_file:
            self.batch_file_data = input_file.read()

        self.batch_file_path = file_path
        self.parse_offsets(0 == group_id)

    def get_block(self, height, force_reread):
        """Return the deserialized block at `height`.

        Raises:
            RuntimeError: when the block is not (yet) present in its batch file.
        """
        self.read_batchfile(height, force_reread)
        entry_in_batch = height % self.file_database_batch_size
        if entry_in_batch >= len(self.block_offsets):
            raise RuntimeError(f'block with given height ({height}) is not present in batch file')

        offset = self.block_offsets[entry_in_batch]
        return BlockFactory.deserialize(self.batch_file_data[offset:])
def process_blocks(digester, parser, start_height, end_height, force_reread=False):
    """Feed every block in [start_height, end_height) through `parser`, showing a progress bar."""
    heights = range(start_height, end_height)
    for height in tqdm.tqdm(heights, desc='block progress', mininterval=0.5, ascii=False):
        parser.parse_block(digester.get_block(height, force_reread))

    print('\n')
    parser.finalize()
class PltParser:
    """Scans mainnet blocks for transfer messages that carry PLTE payloads."""

    def __init__(self):
        self.facade = SymbolFacade('mainnet')
        # per-block / per-transaction context, populated during parsing
        self.block = None
        self.tx_index = None
        self.parent_tx = None

    def store_metadata(self, data_type, sub_tx):
        """Append a `type;parent-hash;height;index` record to metadata.csv."""
        parent_hash = self.facade.hash_transaction(self.parent_tx)
        record = f'{data_type};{parent_hash};{self.block.height.value};{self.tx_index}\n'
        with open('metadata.csv', 'a', encoding='utf8') as output_file:
            output_file.write(record)

    def parse_transfer_message(self, tx, message):
        """Detect a PLTE payload inside a transfer message and dump it to a .dat file."""
        marker = b'PLTE'
        if marker == message[4:8]:
            self.store_metadata('PLTE', tx)
            filename = f'plte_{self.block.height.value}_{self.tx_index}.dat'
            with open(filename, 'wb') as out_file:
                out_file.write(message)
            print(f'got plte at height: {self.block.height}')
        elif marker in message:
            # marker at an unexpected offset; the 4 bytes preceding it would hold
            # the chunk size if this were a real payload — flag plausible candidates
            position = message.index(marker)
            if position > 4:
                size = int.from_bytes(message[position - 4:position], byteorder='little', signed=False)
                if size < 65536:
                    print('slight chance', message)

    def parse_transaction(self, index, parent_tx, sub_tx):
        """Inspect one (possibly embedded) transaction; only transfers with messages matter."""
        if TransactionType.TRANSFER != sub_tx.type_:
            return None
        if not sub_tx.message or len(sub_tx.message) < 4:
            return None

        self.tx_index = index
        self.parent_tx = parent_tx
        return self.parse_transfer_message(sub_tx, sub_tx.message)

    def parse_block(self, block):
        """Walk all transactions in a block, descending into aggregates."""
        if not block.transactions:
            return

        self.block = block
        aggregate_types = (TransactionType.AGGREGATE_COMPLETE, TransactionType.AGGREGATE_BONDED)
        for index, tx in enumerate(self.block.transactions):
            if tx.type_ in aggregate_types:
                for sub_tx in tx.transactions:
                    self.parse_transaction(index, tx, sub_tx)
            else:
                self.parse_transaction(index, tx, tx)

    def finalize(self):
        """End-of-run hook; nothing to do."""
def main(Parser):
    """Digest blocks up to the semi-finalized height using `Parser`, optionally watching for more.

    Args:
        Parser: zero-argument callable producing a parser with parse_block()/finalize().
    """
    args = parse_args()
    data_path = Path(args.data)
    if not data_path.exists():
        print(f'{data_path} does not exist')
        return  # fix: previously fell through and crashed opening files under the missing directory

    digester = Digester(data_path, args.bpf)
    finalized_height = digester.get_semi_finalized_height()
    digester.end = finalized_height

    parser = Parser()
    start = int(args.start)
    print(f'finalized height {finalized_height}')
    process_blocks(digester, parser, start, finalized_height)

    if args.watch:
        while True:
            print('waiting for finalized height change', end='', flush=True)
            # poll every minute until the finalized height moves
            while True:
                temp_finalized_height = digester.get_semi_finalized_height()
                if temp_finalized_height != finalized_height:
                    break
                time.sleep(60)
                print('.', end='', flush=True)
            print(f' ({temp_finalized_height}) ✅')
            # force_reread: a previously cached partial batch file may have been extended
            process_blocks(digester, parser, finalized_height, temp_finalized_height, force_reread=True)
            finalized_height = temp_finalized_height
if __name__ == '__main__':
    main(PltParser)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment