Skip to content

Instantly share code, notes, and snippets.

@v1nc
Last active November 24, 2022 07:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save v1nc/1a4ecf7cc868d5683198453bebce581d to your computer and use it in GitHub Desktop.
Save v1nc/1a4ecf7cc868d5683198453bebce581d to your computer and use it in GitHub Desktop.
Bitcoin Heuristics 1+2
import requests
import sys
import time
# add a list of addresses to the clusters list
def add_address_list_to_clusters(clusters, new_list):
    """Add a group of addresses to ``clusters``, merging on overlap.

    Args:
        clusters: list of sets of address strings; modified in place.
        new_list: iterable of address strings that belong together
            (for example all input addresses of one transaction).

    Behavior:
        * Empty address strings are ignored.
        * If no existing cluster shares an address with ``new_list``, a new
          cluster (a copy, so the caller's set is never aliased) is appended.
        * Otherwise the new addresses are added to the first overlapping
          cluster, and every other overlapping cluster is folded into it and
          removed from ``clusters``.

    Returns:
        None. ``clusters`` is updated in place.
    """
    # empty address strings carry no information and must not link clusters
    new_addresses = {address for address in new_list if address}
    if not new_addresses:
        return

    # every existing cluster that shares at least one address with the new set
    overlapping = [
        cluster for cluster in clusters
        if not new_addresses.isdisjoint(cluster)
    ]
    if not overlapping:
        clusters.append(new_addresses)
        return

    # update the stored set in place; rebinding a local name would leave the
    # set held by `clusters` untouched
    target, *absorbed = overlapping
    target.update(new_addresses)
    for cluster in absorbed:
        target.update(cluster)

    if absorbed:
        # drop the absorbed clusters by identity, keeping the list object
        # itself so that callers holding a reference see the change
        clusters[:] = [
            cluster for cluster in clusters
            if not any(cluster is gone for gone in absorbed)
        ]
# check if two addresses are in the same cluster
def check_if_addresses_in_same_cluster(clusters, a1, a2):
    """Return True if some cluster contains both ``a1`` and ``a2``.

    Args:
        clusters: iterable of collections of addresses.
        a1: first address.
        a2: second address.

    Returns:
        True when at least one cluster holds both addresses, False otherwise
        (including when ``clusters`` is empty).
    """
    # look at every cluster instead of stopping at the first one that holds
    # only one of the two addresses
    return any(a1 in cluster and a2 in cluster for cluster in clusters)
# checks if an address was part of a transaction before a given timestamp
def check_if_transaction_before_time(address, timestamp):
    """Check whether ``address`` appears in a transaction older than ``timestamp``.

    Fetches ``https://blockchain.info/rawaddr/<address>`` and compares the
    ``"time"`` field of every entry in the response's ``"txs"`` list with
    ``timestamp``.

    Args:
        address: address string inserted into the request URL.
        timestamp: value compared against each transaction's ``"time"``.

    Returns:
        True if any listed transaction has ``time < timestamp``; False if
        there is none, or if the response has no ``"txs"`` entry.

    Raises:
        requests.RequestException: on timeout, connection failure or an HTTP
            error status.

    Note:
        Only the transactions contained in this single response are examined.
        NOTE(review): the API may paginate long histories - confirm whether
        older transactions can be missed.
    """
    url = f"https://blockchain.info/rawaddr/{address}"
    # a timeout keeps an unresponsive server from blocking the run forever
    resp = requests.get(url=url, timeout=30)
    time.sleep(10)  # respect API limit
    # fail loudly on HTTP errors instead of reading an error payload as
    # "address has no earlier transactions"
    resp.raise_for_status()
    data = resp.json()
    if "txs" not in data:
        return False
    return any(transaction["time"] < timestamp for transaction in data["txs"])
# merge clusters of the input of a transaction and the cluster of a given address
def merge_clusters(clusters, transaction, address):
    """Merge the cluster of ``address`` into the cluster of the transaction's input.

    The address of the first input (``transaction["inputs"][0]["prev_out"]["addr"]``)
    is used to locate the input cluster.

    Args:
        clusters: list of sets of address strings; modified in place.
        transaction: mapping describing a transaction; only ``"inputs"`` is read.
        address: address whose cluster should be merged into the input cluster.

    Returns:
        None. Nothing happens when the transaction has no usable first input
        address, when either address is in no cluster, or when both addresses
        are already in the same cluster.
    """
    inputs = transaction.get("inputs") or []
    if not inputs:
        return
    # get one input address
    input_address = (inputs[0].get("prev_out") or {}).get("addr")
    if not input_address:
        return

    address_cluster = next((c for c in clusters if address in c), None)
    input_cluster = next((c for c in clusters if input_address in c), None)
    if address_cluster is None or input_cluster is None:
        return
    # already the same cluster: removing it would delete all its addresses
    if input_cluster is address_cluster:
        return

    # extend the stored set in place so the merge is visible in `clusters`
    input_cluster.update(address_cluster)
    # drop the now-redundant cluster by identity, keeping the list object
    clusters[:] = [c for c in clusters if c is not address_cluster]
URL = "https://blockchain.info/rawblock/"
BLOCK = 200000
# an optional first command line argument overrides the default block
if len(sys.argv) > 1:
    BLOCK = int(sys.argv[1])
FULL_URL = f"{URL}{BLOCK}"

# get block data
# a timeout keeps an unresponsive server from blocking the script forever
resp = requests.get(url=FULL_URL, timeout=30)
time.sleep(10)  # respect API limit
# stop on HTTP errors instead of trying to parse an error page
resp.raise_for_status()
data = resp.json()

addresses = set()
clusters = []

# do heuristic 1: all input addresses of one transaction form one cluster
# loop over all transactions
for transaction in data["tx"]:
    # get all inputs of the transaction
    if "inputs" in transaction:
        current_inputs = set()
        # loop over all input addresses of the transaction
        for inputs in transaction["inputs"]:
            if "prev_out" in inputs:
                if "addr" in inputs["prev_out"] and len(inputs["prev_out"]["addr"]) > 0:
                    # add address to the set of all addresses
                    addresses.add(inputs["prev_out"]["addr"])
                    # add address to the set of addresses of the current transaction
                    current_inputs.add(inputs["prev_out"]["addr"])
        # add set of input addresses of the current transaction to the clusters;
        # transactions without any input address contribute nothing
        if current_inputs:
            add_address_list_to_clusters(clusters, current_inputs)
    # loop over all out addresses
    if "out" in transaction:
        for outputs in transaction["out"]:
            if "addr" in outputs and len(outputs["addr"]) > 0:
                # add address to the set of all addresses
                addresses.add(outputs["addr"])
                # add set only containing the address to the clusters
                add_address_list_to_clusters(clusters, {outputs["addr"]})

single_clusters_count = 0
total = len(addresses)
cluster_count_1 = 0
cluster_count_2 = 0
multi_cluster_count = 0

# count clusters for heuristic 1 (empty clusters are not counted)
for cluster in clusters:
    if len(cluster) > 0:
        cluster_count_1 += 1

# do heuristic 2: in a two-output transaction, the output address that never
# appeared before is attributed to the owner of the inputs
# loop over all transactions
for transaction in data["tx"]:
    # find transactions with out length 2 where both outputs carry an address
    if not ("out" in transaction and len(transaction["out"]) == 2
            and "addr" in transaction["out"][0]
            and "addr" in transaction["out"][1]):
        continue
    a1 = transaction["out"][0]["addr"]
    a2 = transaction["out"][1]["addr"]
    if check_if_addresses_in_same_cluster(clusters, a1, a2):
        # addresses are already in the same cluster, no need to check them again
        continue
    a1_appeared_before = check_if_transaction_before_time(a1, transaction["time"])
    a2_appeared_before = check_if_transaction_before_time(a2, transaction["time"])
    if a1_appeared_before and not a2_appeared_before:
        # a2 and input are same owner, merge cluster of a2 with input cluster
        merge_clusters(clusters, transaction, a2)
    if not a1_appeared_before and a2_appeared_before:
        # a1 and input are same owner, merge cluster of a1 with input cluster
        merge_clusters(clusters, transaction, a1)

# count clusters for heuristic 2 (empty clusters are not counted)
for cluster in clusters:
    if len(cluster) == 1:
        single_clusters_count += 1
        cluster_count_2 += 1
    elif len(cluster) > 1:
        multi_cluster_count += 1
        cluster_count_2 += 1

print(f'Output for block of height: {BLOCK}')
print(f'---')
print(f'Number of addresses in total: \t\t\t\t {total}')
print(f'Number of clusters after heuristic 1: \t\t\t {cluster_count_1}')
print(f'Number of clusters after heuristic 2: \t\t\t {cluster_count_2}')
print(f'---')
print(f'Number of these clusters with exactly one address: \t {single_clusters_count}')
print(f'Number of these clusters with more than one address: \t {multi_cluster_count}')
print(f'---')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment