Skip to content

Instantly share code, notes, and snippets.

@ismaelsadeeq
Last active May 15, 2024 17:48
Show Gist options
  • Save ismaelsadeeq/e2767040b720f65986976b3f8c6b7b20 to your computer and use it in GitHub Desktop.
Save ismaelsadeeq/e2767040b720f65986976b3f8c6b7b20 to your computer and use it in GitHub Desktop.
import configparser
import concurrent.futures
import json
import requests
# JSON-RPC connection settings are read from the RPC_INFO section of
# rpc_config.ini (a relative path, so it is resolved against the current
# working directory). ConfigParser.read() silently skips a missing file; in
# that case the Config.get() calls below raise because the section is absent.
Config = configparser.ConfigParser()
Config.read("rpc_config.ini")
URL = Config.get("RPC_INFO", "URL")
RPCUSER = Config.get("RPC_INFO", "RPCUSER")
RPCPASSWORD = Config.get("RPC_INFO", "RPCPASSWORD")
# Block-number window to analyze. It is iterated with range(START, END), so
# START is included and END itself is excluded.
END = 843457
START = 842456
# Path of the JSON file the collected statistics are written to.
FILE_PATH = "clusters.json"
def make_request(payload):
    """POST a JSON-RPC payload to the configured node and return its result.

    Args:
        payload: The JSON-encoded request body (a string).

    Returns:
        The value of the "result" member of the JSON response.

    Raises:
        requests.HTTPError: The response body is not JSON and the HTTP status
            signals a failure (for example rejected credentials).
        ValueError: The response body is not JSON although the HTTP status
            was successful.
        RuntimeError: The response carries a non-null "error" member.
        KeyError: The response has no "result" member.
    """
    headers = {'content-type': "application/json", 'cache-control': "no-cache"}
    response = requests.request("POST", URL, data=payload, headers=headers, auth=(RPCUSER, RPCPASSWORD))
    try:
        body = json.loads(response.text)
    except ValueError:
        # A non-JSON body usually means the request never reached the RPC
        # handler; report the HTTP status instead of a bare decode error.
        response.raise_for_status()
        raise
    # Surface RPC-level failures here instead of returning None and letting
    # the caller trip over it with an unrelated TypeError later on.
    if isinstance(body, dict) and body.get("error") is not None:
        raise RuntimeError(f"RPC request failed: {body['error']}")
    return body["result"]
def getblockhash(block_number):
    """Issue the ``getblockhash`` RPC for ``block_number`` and return its result."""
    rpc_call = {"method": "getblockhash", "params": [block_number]}
    return make_request(json.dumps(rpc_call))
def get_block(number):
    """Look up the hash of block ``number`` and return the ``getblock`` result.

    The hash is obtained through ``getblockhash`` and passed to the
    ``getblock`` RPC together with the constant second parameter ``2``.
    """
    rpc_call = {"method": "getblock", "params": [getblockhash(number), 2]}
    return make_request(json.dumps(rpc_call))
def analyze_block(block_number):
    """Classify the transactions of one block by in-block dependencies.

    A transaction counts as belonging to a cluster of size > 1 when it spends
    an output of a transaction that appears earlier in the same block, or
    when a later transaction in the same block spends one of its outputs.
    Every other transaction counts as a cluster of size 1.

    Args:
        block_number: Number of the block to fetch through ``get_block``.

    Returns:
        A dict with the block number, the transaction counts and the
        corresponding percentages (truncated to integers), or ``None`` when
        the block could not be fetched or analyzed, or holds no transactions.
    """
    try:
        print(f"processing block {block_number}")
        seen_transactions = set()
        txs_in_package = set()
        block_txs = get_block(block_number)['tx']
        for tx in block_txs:
            seen_transactions.add(tx["txid"])
            for parent in tx["vin"]:
                # Inputs without a "txid" key are skipped (presumably the
                # coinbase input - confirm against the RPC output).
                parent_txid = parent.get("txid")
                if parent_txid in seen_transactions:
                    # Both ends of an in-block spend share a cluster, so the
                    # parent must be marked along with the spending child.
                    txs_in_package.add(tx["txid"])
                    txs_in_package.add(parent_txid)
        number_of_block_txs = len(block_txs)
        number_of_txs_in_package = len(txs_in_package)
        individual_txs = number_of_block_txs - number_of_txs_in_package
        if number_of_block_txs > 0:
            return {
                "block": block_number,
                "txs_in_block": number_of_block_txs,
                "cluster_1_txs": individual_txs,
                "perc_of_cluster_1_txs": int((individual_txs / number_of_block_txs) * 100),
                "txs_in_cluster_size_more_than_1": number_of_txs_in_package,
                "perc_of_txs_in_cluster_size_more_than_1": int((number_of_txs_in_package / number_of_block_txs) * 100)
            }
        return None
    except Exception as e:
        # Best effort per block: report the failure and let the caller skip it.
        print(f"Failed to analyze block {block_number}: {e}")
        return None
def analyze_blocks_parallel(start=None, end=None):
    """Analyze a range of blocks concurrently and aggregate the statistics.

    Args:
        start: First block number to analyze; defaults to ``START``.
        end: Block number at which to stop (exclusive); defaults to ``END``.

    Returns:
        A list holding one ``analyze_block`` result per successfully analyzed
        block, in ascending block order. When at least one transaction was
        counted, a final dict with the totals and their truncated integer
        percentages is appended.
    """
    first_block = START if start is None else start
    stop_block = END if end is None else end
    response = []
    total_txs = 0
    total_package_txs = 0
    total_individual = 0
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # executor.map yields results in submission order, which keeps the
        # output in block order no matter which request finishes first.
        for result in executor.map(analyze_block, range(first_block, stop_block)):
            if not result:
                # analyze_block returns None for blocks it could not process.
                continue
            response.append(result)
            total_txs += result["txs_in_block"]
            total_individual += result["cluster_1_txs"]
            total_package_txs += result["txs_in_cluster_size_more_than_1"]
    if total_txs > 0:
        response.append({
            "total_txs_in_block": total_txs,
            "total_cluster_1_txs": total_individual,
            "total_perc_of_cluster_1_txs": int((total_individual / total_txs) * 100),
            "total_txs_in_cluster_size_more_than_1": total_package_txs,
            "total_perc_of_txs_in_cluster_size_more_than_1": int((total_package_txs / total_txs) * 100)
        })
    return response
def dump_data_to_file(file_path, logs):
    """Serialize ``logs`` as 4-space-indented JSON and write it to ``file_path``.

    Serialization happens before the file is opened, so an unserializable
    ``logs`` raises to the caller. Failures while opening or writing the file
    are caught and reported on stdout instead of being raised.
    """
    serialized = json.dumps(logs, indent=4)
    try:
        with open(file_path, "w") as out:
            out.write(serialized)
    except Exception as err:
        print(f"Failed to write analyzed mempool txs to {file_path}: {err}")
if __name__ == "__main__":
    # Script entry point: analyze the configured block range and persist
    # the collected statistics to FILE_PATH.
    dump_data_to_file(FILE_PATH, analyze_blocks_parallel())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment