Skip to content

Instantly share code, notes, and snippets.

@0300dbdd1b
Last active November 10, 2023 01:49
Show Gist options
  • Save 0300dbdd1b/91fa8b64613b9d0d0f1c712389c8e27f to your computer and use it in GitHub Desktop.
Save 0300dbdd1b/91fa8b64613b9d0d0f1c712389c8e27f to your computer and use it in GitHub Desktop.
This code is a refactoring of UTXOracle.py, enabling remote connection to the bitoin node using RPC
##############################################################
# remote_utxoracle.py #
# This code is a refactoring of UTXOracle.py #
# Enabling remote connection to the bitcoin node using RPC #
##############################################################
# Based on UTXOracle.py version 7 (https://utxo.live/oracle/UTXOracle.py)
# Using python-bitcoinrpc (https://github.com/jgarzik/python-bitcoinrpc)
# Import required libraries
from bitcoinrpc.authproxy import AuthServiceProxy
from datetime import datetime, timezone
from math import log10
# Define connection parameters for the Bitcoin client
RPC_USER = ""
RPC_PASS = ""
RPC_HOST = "xxx.xxx.xxx.xxx"
RPC_PORT = 8332
DEFAULT_TIMEOUT = 120
# Try to establish a connection to the Bitcoin client
try:
client = AuthServiceProxy(f"http://{RPC_USER}:{RPC_PASS}@{RPC_HOST}:{RPC_PORT}", timeout=DEFAULT_TIMEOUT)
except Exception as e:
# Exit the program if the connection fails and inform the user of the error
print(f"Error initializing the client: {e}")
exit()
def get_time_of_block(client, block_height):
"""
Retrieves the timestamp of a block given its height.
Args:
- client: Authenticated connection to the Bitcoin client.
- block_height: The height of the block whose timestamp is to be retrieved.
Returns:
- Block timestamp in seconds since Unix Epoch.
"""
block_hash = client.getblockhash(int(block_height))
block_header = client.getblockheader(block_hash)
return block_header['time']
def get_block_data(client, block_height):
"""
Retrieves block data and its corresponding timestamp.
Args:
- client: Authenticated connection to the Bitcoin client.
- block_height: The height of the block whose data is to be retrieved.
Returns:
- Tuple containing block data and its timestamp as a datetime object.
"""
block_hash = client.getblockhash(int(block_height))
block = client.getblock(block_hash, 2)
time_in_seconds = int(block['time'])
time_datetime = datetime.fromtimestamp(time_in_seconds, tz=timezone.utc)
return block, time_datetime
# Get the current block height from the local Bitcoin node
block_count = int(client.getblockcount())
# Retrieve the timestamp of the current block height
latest_time_in_seconds = get_time_of_block(client, block_count)
time_datetime = datetime.fromtimestamp(latest_time_in_seconds, tz=timezone.utc)
# Calculate the timestamp for UTC midnight of the latest day
latest_utc_midnight = datetime(time_datetime.year, time_datetime.month, time_datetime.day, 0, 0, 0, tzinfo=timezone.utc)
# Determine the latest possible price date as the day before today
seconds_in_a_day = 86400 # 60 seconds * 60 minutes * 24 hours
yesterday_seconds = latest_time_in_seconds - seconds_in_a_day
latest_price_day = datetime.fromtimestamp(yesterday_seconds, tz=timezone.utc)
latest_price_date = latest_price_day.strftime("%Y-%m-%d")
# Inform the user about the connection status and available price dates
print(f"Connected to local node at block #: {block_count}")
print(f"Latest available price date is: {latest_price_date}")
print("Earliest available price date is: 2020-07-26 (full node)")
#########################
# Get a date input from the user for which the BTC price is to be estimated
date_entered = input("\nEnter date in YYYY-MM-DD (or 'q' to quit):")
# If the user chooses to quit, exit the program
if date_entered == 'q':
exit()
# Validate the entered date
try:
year, month, day = [int(part) for part in date_entered.split('-')]
datetime_entered = datetime(year, month, day, tzinfo=timezone.utc)
# Ensure the entered date is before today
if datetime_entered >= latest_utc_midnight:
print("\nThe date entered is not before the current date, please try again")
exit()
# Ensure the entered date is after the earliest acceptable date (2020-07-26)
july_26_2020 = datetime(2020, 7, 26, tzinfo=timezone.utc)
if datetime_entered < july_26_2020:
print("\nThe date entered is before 2020-07-26, please try again")
exit()
except ValueError:
# Handle invalid date format
print("\nError interpreting date. Likely not entered in format YYYY-MM-DD")
print("Please try again\n")
exit()
# Convert the entered date to a timestamp and a formatted string for display
price_day_seconds = int(datetime_entered.timestamp())
price_day_date_utc = datetime_entered.strftime("%B %d, %Y")
#########################
# Estimate the block height corresponding to the entered date
# This is done by first making an approximate guess and then refining it
seconds_since_price_day = latest_time_in_seconds - price_day_seconds
blocks_ago_estimate = round(144 * seconds_since_price_day / seconds_in_a_day) # Average 144 blocks per day
price_day_block_estimate = block_count - blocks_ago_estimate
# Refine the estimated block height by iteratively adjusting
# the block height until it matches the timestamp of the entered date.
time_in_seconds = get_time_of_block(client, int(price_day_block_estimate))
# Calculate the difference in timestamps to estimate how many blocks we should adjust by
# We use the average of 144 blocks per day for this estimate.
seconds_difference = time_in_seconds - price_day_seconds
block_jump_estimate = round(144 * seconds_difference / seconds_in_a_day)
# Refine our block estimate by iteratively adjusting the block height until the timestamp matches our target.
# We continue this process until the adjustment is less than 6 blocks or until we oscillate between two values.
last_estimate = 0
last_last_estimate = 0
while block_jump_estimate > 6 and block_jump_estimate != last_last_estimate:
last_last_estimate = last_estimate
last_estimate = block_jump_estimate
price_day_block_estimate -= block_jump_estimate
time_in_seconds = get_time_of_block(client, int(price_day_block_estimate))
seconds_difference = time_in_seconds - price_day_seconds
block_jump_estimate = round(144 * seconds_difference / seconds_in_a_day)
# Further refine the block height by adjusting one block at a time until we find the exact first block of the target day
if time_in_seconds > price_day_seconds:
while time_in_seconds > price_day_seconds:
price_day_block_estimate -= 1
time_in_seconds = get_time_of_block(client, int(price_day_block_estimate))
price_day_block_estimate += 1
elif time_in_seconds < price_day_seconds:
while time_in_seconds < price_day_seconds:
price_day_block_estimate += 1
time_in_seconds = get_time_of_block(client, int(price_day_block_estimate))
price_day_block = price_day_block_estimate
#########################
# Now, we'll analyze the transaction outputs on the target day to build a distribution (bell curve) of BTC amounts.
# We're using a logarithmic scale for the bell curve to capture both small and large BTC amounts.
# Define the range (in log10) of BTC amounts to use for bins.
first_bin_value = -6
last_bin_value = 6
# Create the bins for the bell curve.
output_bell_curve_bins = [0.0] # Start with 0 for satoshis
# Create bins logarithmically from 100 sats (1e-6 BTC) to 100k (1e5) BTC.
for exponent in range(first_bin_value, last_bin_value):
bins_in_range = [10 ** (exponent + b / 200) for b in range(200)]
output_bell_curve_bins.extend(bins_in_range)
# Initialize a list to keep track of the number of transaction outputs in each bin.
output_bell_curve_bin_counts = [float(0.0) for _ in range(len(output_bell_curve_bins))]
number_of_bins = len(output_bell_curve_bins)
#########################
# Retrieve all transaction output amounts from all blocks on the target day.
# Display progress to the user as we process each block.
print(f"\nReading all blocks on {price_day_date_utc}...")
print("\nThis will take a few minutes (~144 blocks)...")
print("\nHeight\tTime(utc)\t\tTime(32bit)\t\tCompletion %")
block, time_datetime = get_block_data(client, price_day_block)
target_day_of_month = time_datetime.day
# Continue reading blocks until we reach a block on the day after the target day.
while target_day_of_month == time_datetime.day:
# Show progress to the user
progress_estimate = 100.0 * (time_datetime.hour + time_datetime.minute / 60) / 24.0
time_32bit = f"{int(time_datetime.timestamp()) & 0b11111111111111111111111111111111:32b}"
print(f"{price_day_block}\t{time_datetime.strftime('%H:%M:%S')}\t{time_32bit}\t{progress_estimate:.2f}%")
# Process each transaction output in the block and place it in the appropriate bin
for tx in block['tx']:
for output in tx['vout']:
amount = float(output['value'])
if 1e-6 < amount < 1e6:
amount_log = log10(amount)
percent_in_range = (amount_log - first_bin_value) / (last_bin_value - first_bin_value)
bin_number_est = int(percent_in_range * number_of_bins)
while output_bell_curve_bins[bin_number_est] <= amount:
bin_number_est += 1
output_bell_curve_bin_counts[bin_number_est - 1] += 1.0
# Fetch the next block to process
price_day_block += 1
block, time_datetime = get_block_data(client, price_day_block)
#########################
# Filter out the noise in the bell curve.
# This includes very small (below 1k sats) and very large (above 10 BTC) outputs.
output_bell_curve_bin_counts[:401] = [0] * 401
output_bell_curve_bin_counts[1601:] = [0] * (number_of_bins - 1601)
# Identify and smooth out bins corresponding to common round BTC amounts.
round_btc_bins = [
201, 401, 461, 496, 540, 601, 661, 696, 740, 801,
861, 896, 940, 1001, 1061, 1096, 1140, 1201
]
for r in round_btc_bins:
output_bell_curve_bin_counts[r] = 0.5 * (output_bell_curve_bin_counts[r+1] + output_bell_curve_bin_counts[r-1])
# Normalize the bell curve so that the area under it sums to 1.
curve_sum = sum(output_bell_curve_bin_counts[201:1601])
normalized_counts = [count / curve_sum for count in output_bell_curve_bin_counts[201:1601]]
# Filter out any extremely high values that might distort the curve.
filtered_counts = [min(count, 0.008) for count in normalized_counts]
output_bell_curve_bin_counts[201:1601] = filtered_counts
#########################
# Construct a stencil that represents common round USD output amounts.
# We'll slide this stencil over the bell curve to estimate the BTC price in USD.
# Create an empty stencil the same size as the bell curve.
round_usd_stencil = [0.0] * number_of_bins
# Define bin locations and their corresponding values for the stencil.
# These locations and values are based on historical analysis of BTC transaction data (2020-2023).
stencil_values = {
401: 0.0005957955691168063, 402: 0.0004454790662303128, 429: 0.0001763099393598914,
430: 0.0001851801497144573, 461: 0.0006205616481885794, 462: 0.0005985696860584984,
496: 0.0006919505728046619, 497: 0.0008912933078342840, 540: 0.0009372916238804205,
541: 0.0017125522985034724, 600: 0.0021702347223143030, 601: 0.0037018622326411380,
602: 0.0027322168706743802, 603: 0.0016268322583097678, 604: 0.0012601953416497664,
661: 0.0041425242880295460, 662: 0.0039247767475640830, 696: 0.0032399441632017228,
697: 0.0037112959007355585, 740: 0.0049921908828370000, 741: 0.0070636869018197105,
801: 0.0080000000000000000, 802: 0.0065431388282424440, 803: 0.0044279509203361735,
861: 0.0046132440551747015, 862: 0.0043647851395531140, 896: 0.0031980892880846567,
897: 0.0034237641632481910, 939: 0.0025995335505435034, 940: 0.0032631930982226645,
941: 0.0042753262790881080, 1001: 0.0037699501474772350, 1002: 0.0030872891064215764,
1003: 0.0023237040836798163, 1061: 0.0023671764210889895, 1062: 0.0020106877104798474,
1140: 0.0009099214128654502, 1141: 0.0012008546799361498, 1201: 0.0007862586076341524,
1202: 0.0006900048077192579
}
# Populate the stencil with the defined values.
for bin_location, value in stencil_values.items():
round_usd_stencil[bin_location] = value
#########################
# Slide the stencil over the bell curve to determine where it fits best.
# The best fit indicates the most likely BTC price in USD.
# Initialize variables to keep track of the best fit
best_slide = 0
best_slide_score = 0.0
total_score = 0.0
# Create a list to store the scores for each slide of the stencil over the bell curve.
slide_scores = []
# Iterate over a range, sliding the stencil over the bell curve.
for slide in range(-200, 200):
shifted_curve = output_bell_curve_bin_counts[201+slide:1401+slide]
slide_score = sum([shifted_curve[n] * round_usd_stencil[n+201] for n in range(len(shifted_curve))])
slide_scores.append(slide_score)
total_score += slide_score
if slide_score > best_slide_score:
best_slide_score = slide_score
best_slide = slide
# Calculate the estimated BTC price in USD based on the best fit.
usd100_in_btc_best = output_bell_curve_bins[801+best_slide]
btc_in_usd_best = 100 / usd100_in_btc_best
# Calculate scores for the neighboring positions of the best fit.
neighbor_up_score = sum([output_bell_curve_bin_counts[201+best_slide+1+n] * round_usd_stencil[n+201] for n in range(1200)])
neighbor_down_score = sum([output_bell_curve_bin_counts[201+best_slide-1+n] * round_usd_stencil[n+201] for n in range(1200)])
# Determine which neighboring position has a higher score.
best_neighbor = -1 if neighbor_down_score > neighbor_up_score else 1
usd100_in_btc_2nd = output_bell_curve_bins[801+best_slide+best_neighbor]
btc_in_usd_2nd = 100 / usd100_in_btc_2nd
# Compute a weighted average of the two BTC price estimates (best fit and its best neighbor).
avg_score = total_score / len(slide_scores)
a1, a2 = best_slide_score - avg_score, abs((neighbor_down_score if neighbor_down_score > neighbor_up_score else neighbor_up_score) - avg_score)
w1, w2 = a1 / (a1 + a2), a2 / (a1 + a2)
price_estimate = int(w1 * btc_in_usd_best + w2 * btc_in_usd_2nd)
# Display the final estimated BTC price in USD.
print(f"\nThe {price_day_date_utc} BTC price estimate is: ${price_estimate:,}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment