Skip to content

Instantly share code, notes, and snippets.

@amb
Created February 13, 2021 21:06
Show Gist options
  • Save amb/c1ade483248eb090992cc69241aa66db to your computer and use it in GitHub Desktop.
Save amb/c1ade483248eb090992cc69241aa66db to your computer and use it in GitHub Desktop.
Simple chain analysis with Python and BlockCypher
from blockcypher import get_address_details, get_transaction_details
import json
import datetime
def open_chain_info(addr, func):
    """Return the API data for ``addr``, using a local JSON file as a cache.

    The cache file is ``store/<addr>.json``. If it exists and parses as JSON,
    its contents are returned and ``func`` is not called. Otherwise
    ``func(addr)`` is called, its result is written to the cache file, and
    that result is returned.

    Args:
        addr: Lookup key; used both as the cache file name and as the
            argument passed to ``func``.
        func: Callable taking ``addr`` and returning data that ``json.dump``
            can serialise (``datetime.datetime`` values are stored as
            strings).

    Returns:
        The parsed cache contents on a cache hit, or the object returned by
        ``func`` on a miss. On a miss any ``datetime`` values are returned
        as-is, while a later cache hit returns them as strings.
    """
    import os

    def _datetime_conv(o):
        # json.dump fallback: datetimes are stored in their str() form.
        # Any other unsupported type falls through and is stored as null.
        if isinstance(o, datetime.datetime):
            return str(o)

    filepath = "store/" + addr + ".json"
    try:
        # The context manager closes the handle even when parsing fails.
        with open(filepath, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        print("File not found, creating:", filepath)
    except json.decoder.JSONDecodeError:
        print("Faulty JSON, reloading...")

    # Fetch before opening the file for writing, so a failed fetch does not
    # leave an empty (and therefore faulty) cache file behind.
    contents = func(addr)
    os.makedirs(os.path.dirname(filepath), exist_ok=True)
    with open(filepath, "w") as f:
        json.dump(contents, f, default=_datetime_conv)
    return contents
class BitcoinAddressInputs:
    """Collects the input transactions recorded for one destination address.

    Attributes (declared via ``__slots__``):
        address: The destination address given to the constructor.
        transactions: List of ``(input_address, value)`` tuples.
        input_addresses: Set of every input address added so far.
        total_received: Total used as the scaling target by
            ``normalize_inputs``; starts at 1.0.
        all_addresses: Declared slot that this class never assigns.
    """

    __slots__ = "transactions", "input_addresses", "total_received", "address", "all_addresses"

    def __init__(self, address):
        self.address = address
        self.transactions = []
        self.input_addresses = set()
        self.total_received = 1.0

    def pretty_transactions(self):
        """Return one ``input => address value`` line per transaction."""
        return "".join(
            f"{src} \t=> {self.address} \t{value}\n" for src, value in self.transactions
        )

    def list_transactions(self):
        """Return the transactions as ``(input, output, amount)`` tuples."""
        return [(src, self.address, value) for src, value in self.transactions]

    def add_input(self, a, val):
        """Record that address ``a`` contributed ``val`` to this address."""
        self.input_addresses.add(a)
        self.transactions.append((a, val))

    def normalize_inputs(self):
        """Rescale all input values so that they sum to ``total_received``.

        When the recorded values sum to zero there is nothing to scale by, so
        the transactions are left unchanged instead of dividing by zero.
        """
        inputs_sum = sum(value for _, value in self.transactions)
        if inputs_sum == 0:
            return
        target = self.total_received
        self.transactions = [
            (src, value * target / inputs_sum) for src, value in self.transactions
        ]

    def get_all_addresses(self):
        """Return a new set holding this address and all its input addresses."""
        return {self.address} | self.input_addresses
def address_all_inputs(walk_address, max_total=2.0, filtered=frozenset()):
    """Read all inputs that paid into a given Bitcoin address.

    Address and transaction details are loaded through ``open_chain_info``
    (and therefore cached locally). For every referenced transaction that has
    an output to ``walk_address``, each input address is recorded together
    with that output's value; the values are then normalised so they sum to
    the address' total received.

    Args:
        walk_address: The address whose inputs are collected.
        max_total: Addresses whose total received (after dividing the API
            value by 100000000) exceeds this are skipped.
        filtered: Collection of input addresses to ignore.

    Returns:
        A ``BitcoinAddressInputs`` instance, or ``None`` when the address
        received more than ``max_total``.

    Raises:
        AssertionError: If a transaction has more than one output to
            ``walk_address`` or an input lists more than one address.
    """
    # presumably satoshi per BTC — API values are divided by this throughout
    unit_divisor = 100000000

    adi = BitcoinAddressInputs(walk_address)
    res = open_chain_info(walk_address, get_address_details)
    total_received = res["total_received"] / unit_divisor
    if total_received > max_total:
        return None
    adi.total_received = total_received

    # "txrefs" may be absent (or empty) for an address without references.
    for txref in res.get("txrefs") or []:
        deets = open_chain_info(txref["tx_hash"], get_transaction_details)

        # outputs of this transaction that pay walk_address
        matching_outputs = [
            out
            for out in deets["outputs"]
            if walk_address in (out.get("addresses") or ())
        ]
        assert len(matching_outputs) < 2
        if not matching_outputs:
            continue
        received = matching_outputs[0]["value"] / unit_divisor

        # walk through all inputs
        for tx_input in deets["inputs"]:
            addresses = tx_input.get("addresses")
            if not addresses or addresses[0] in filtered:
                # empty input, or address in filter list
                continue
            assert len(addresses) == 1
            adi.add_input(addresses[0], received)

    adi.normalize_inputs()
    return adi
# Addresses that are never followed or recorded as inputs.
filter_these = {""}

# backtrack N steps
# 14 max
# Root address the walk starts from (fill in before running).
addr = ""

# input transactions of the root address
btc_i = address_all_inputs(addr, filtered=filter_these)
if btc_i is None:
    # address_all_inputs returns None when the address exceeds max_total
    raise SystemExit("Start address received more than the allowed maximum: " + addr)

all_addr = btc_i.get_all_addresses()

# Parallel lists describing every link: source, destination, value.
psource, pdest, pvalue = [], [], []
for src, dst, val in btc_i.list_transactions():
    psource.append(src)
    pdest.append(dst)
    pvalue.append(val)

already_scanned = {addr}
inputs_loop = btc_i.input_addresses

# Walk backwards through the inputs, one level per iteration.
for _ in range(5):
    new_inputs = set()
    for candidate in inputs_loop:
        if candidate in already_scanned:
            continue
        already_scanned.add(candidate)

        new_i = address_all_inputs(candidate, filtered=filter_these)
        if new_i is None:
            # hit max received
            continue

        all_addr |= new_i.get_all_addresses()
        new_inputs |= new_i.input_addresses

        # Record every transaction of this address (not only the first one);
        # an address without transactions simply adds nothing.
        for src, dst, val in new_i.list_transactions():
            psource.append(src)
            pdest.append(dst)
            pvalue.append(val)
    inputs_loop = new_inputs

# format data for plotting: links refer to nodes by integer index
indices = {address: position for position, address in enumerate(all_addr)}
plabel = list(indices)
psource = [indices[address] for address in psource]
pdest = [indices[address] for address in pdest]
def transform_a(tx_inputs, value):
    """Build the input map for the module-level start address ``addr``.

    Args:
        tx_inputs: Input addresses of the start address.
        value: Value paired with every input address.

    Returns:
        A dict with the single key ``addr`` mapping to a list of
        ``(input_address, value)`` tuples, one per entry of ``tx_inputs``.
    """
    # NOTE: an earlier multi-level expansion loop lived here under
    # ``range(0)``, so it never executed; its body also unpacked the result
    # of ``address_all_inputs`` as a 2-tuple, which that function does not
    # return. It was removed as dead code; the returned value is unchanged.
    return {addr: [(tx_input, value) for tx_input in tx_inputs]}
def plot_transactions(labels, psource, pdest, pvalue, filename="html/transactions.html"):
    """Render the transactions as a vertical Sankey diagram in an HTML file.

    Args:
        labels: Node labels; link endpoints refer to positions in this list.
        psource: Source node index of each link.
        pdest: Target node index of each link.
        pvalue: Value of each link.
        filename: Path of the HTML file to write. Its parent directory is
            created when missing.
    """
    import os

    import plotly
    import plotly.graph_objects as go

    sankey = go.Sankey(
        orientation="v",
        node=dict(pad=15, thickness=20, label=labels),
        link=dict(source=psource, target=pdest, value=pvalue),
    )
    fig = go.Figure(data=[sankey])
    fig.update_layout(title_text="Blockchain transactions", font_size=10)

    # Make sure the output directory exists before plotly writes the file.
    out_dir = os.path.dirname(filename)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    plotly.offline.plot(fig, filename=filename)
def plot_input_graph(g_inputs):
    """Plot an input map as a Sankey diagram via ``plot_transactions``.

    Args:
        g_inputs: Dict mapping a destination address to an iterable of
            ``(input_address, value)`` pairs.
    """
    indexing = {}
    labels = []

    def _index(address):
        # Give each address exactly one node index, in discovery order.
        if address not in indexing:
            indexing[address] = len(labels)
            labels.append(address)

    # index all discovered addresses as numbers; a destination that already
    # appeared as an input of an earlier key keeps its existing index instead
    # of getting a duplicate label
    for dest, inputs in g_inputs.items():
        _index(dest)
        for source, _ in inputs:
            _index(source)

    # create lists for plotly
    psource, pdest, pvalue = [], [], []
    for dest, inputs in g_inputs.items():
        for source, value in inputs:
            psource.append(indexing[source])
            pdest.append(indexing[dest])
            pvalue.append(value)

    plot_transactions(labels, psource, pdest, pvalue)
# Plot the links collected by the module-level walk above: plabel holds the
# node labels, psource/pdest the node indices of each link, pvalue its value.
plot_transactions(plabel, psource, pdest, pvalue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment