Skip to content

Instantly share code, notes, and snippets.

@AdrienHorgnies
Last active December 20, 2021 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AdrienHorgnies/2d46d2c7f0672f804f35597060cb19da to your computer and use it in GitHub Desktop.
Save AdrienHorgnies/2d46d2c7f0672f804f35597060cb19da to your computer and use it in GitHub Desktop.
Dummy implementation of a blockchain system with a single miner, demonstrating how the mining process works.
import time
from collections import Counter
from dataclasses import dataclass, field
from hashlib import sha512
from typing import List
import matplotlib.pyplot as plt
import numpy as np
from tqdm import trange
@dataclass
class Block:
    """Mutable candidate block; its nonce is incremented while mining."""

    # Hash of the block this one is chained onto.
    previous_block_hash: bytes
    # Messages carried by this block.
    messages: tuple
    # Counter varied by the miner until the block hash meets the difficulty.
    nonce: int
    # Creation timestamp (the miner supplies int(time.time())).
    time: int
    # Difficulty the block hash has to satisfy.
    target_difficulty: int
@dataclass(frozen=True)
class FrozenBlock:
    """Immutable block, as stored in the chain once mining is finished."""

    # Hash of the block this one is chained onto.
    previous_block_hash: bytes
    # Messages carried by this block.
    messages: tuple
    # Nonce value for which the block hash met the difficulty.
    nonce: int
    # Creation timestamp (the miner supplies int(time.time())).
    time: int
    # Difficulty the block hash had to satisfy.
    target_difficulty: int
def _make_genesis() -> FrozenBlock:
    """Build the first block of a chain: no parent, no messages, difficulty 0."""
    return FrozenBlock(
        previous_block_hash=bytes(0),
        messages=tuple(),
        nonce=0,
        time=int(time.time()),
        target_difficulty=0,
    )


@dataclass(frozen=True)
class BlockchainSystem:
    """Immutable parameters shared by the participants of the simulation."""

    # Reference block time the difficulty adjustment aims for.
    block_time: float = 6
    # The difficulty is re-evaluated whenever the chain length is a multiple of this.
    adaptive_difficulty_interval: int = 10
    # Relative tolerance around block_time; also the size of one difficulty step.
    block_time_sensitivity: float = .10
    # Genesis block, created (and timestamped) when the system is instantiated.
    genesis: FrozenBlock = field(default_factory=_make_genesis)
class Miner:
    """Single miner: queues messages, builds candidate blocks and mines them.

    The miner keeps its own copy of the chain (starting with the system's
    genesis block) and a queue of pending messages. ``select`` turns the
    queue into a candidate block and ``mine`` searches for a nonce that
    satisfies the block's difficulty before appending it to the chain.
    """

    # Number of leading digest bytes kept by ``hash``.
    HASH_SIZE = 10
    # Largest difficulty ``is_mined`` can evaluate on a HASH_SIZE-byte hash:
    # it requires every byte of the hash to be zero.
    MAX_DIFFICULTY = HASH_SIZE * 256 - 1

    def __init__(self, system: "BlockchainSystem"):
        """Start a chain containing only ``system.genesis`` and an empty queue."""
        self.blockchain: List["FrozenBlock"] = [system.genesis]
        self.system: "BlockchainSystem" = system
        self.queue: List[str] = []

    @property
    def last_block(self):
        """Most recently appended block of the chain."""
        return self.blockchain[-1]

    def receive(self, message: str):
        """Queue ``message`` for inclusion in the next selected block."""
        self.queue.append(message)

    @staticmethod
    def hash(block):
        """Return the first ``HASH_SIZE`` bytes of the SHA-512 of ``block``.

        The digest input is the string formatting of the block's fields
        (previous hash, nonce, messages, time, difficulty) joined without
        separators, encoded with the default ``str.encode`` encoding.
        """
        payload = (
            f"{block.previous_block_hash}{block.nonce}{block.messages}"
            f"{block.time}{block.target_difficulty}"
        )
        return sha512(payload.encode()).digest()[:Miner.HASH_SIZE]

    def compute_difficulty(self):
        """Return the difficulty the next block has to satisfy.

        The difficulty of the last block is reused unless the chain length is
        a multiple of ``adaptive_difficulty_interval``. In that case the mean
        time between the last ``adaptive_difficulty_interval`` blocks (up to
        now) is compared with ``block_time``: when it lies outside the
        ``block_time_sensitivity`` tolerance, the difficulty moves by one step
        per ``block_time_sensitivity`` of relative deviation.

        The result is always a plain ``int`` within ``[0, MAX_DIFFICULTY]``:
        a negative value would make ``is_mined`` demand nine zero bytes
        (mining would effectively never end) and a value above
        ``MAX_DIFFICULTY`` would index past the end of the hash.
        """
        interval = self.system.adaptive_difficulty_interval
        current = self.last_block.target_difficulty
        if len(self.blockchain) % interval > 0:
            return current

        recent_times = [b.time for b in self.blockchain[-interval:]]
        # Append "now" so the time spent since the last block counts as well.
        duration_avg = float(
            np.ediff1d(recent_times + [int(time.time())]).mean())
        ref = self.system.block_time
        sensitivity = self.system.block_time_sensitivity
        if np.isclose(duration_avg, ref, rtol=sensitivity):
            return current

        # int(...) guarantees an integer step whatever numeric type the
        # system parameters have; the difficulty is used as a slice bound.
        steps = int(round(abs(ref - duration_avg) / ref / sensitivity))
        if duration_avg < ref:
            adjusted = current + steps  # blocks come too fast: harder
        else:
            adjusted = current - steps  # blocks come too slowly: easier
        return max(0, min(self.MAX_DIFFICULTY, adjusted))

    def select(self):
        """Build a mutable candidate block holding every queued message."""
        return Block(
            previous_block_hash=self.hash(self.last_block),
            messages=tuple(self.queue),
            nonce=0,
            time=int(time.time()),
            target_difficulty=self.compute_difficulty())

    def is_mined(self, block):
        """Tell whether the hash of ``block`` satisfies its difficulty.

        With ``n = block.target_difficulty // 256`` and
        ``e = 256 - block.target_difficulty % 256``, the block is mined when
        the first ``n`` bytes of its hash are zero and the following byte is
        strictly lower than ``e``.
        """
        digest = self.hash(block)
        zero_bytes = block.target_difficulty // 256
        if any(digest[:zero_bytes]):
            return False
        ceiling = 256 - block.target_difficulty % 256
        return digest[zero_bytes] < ceiling

    def mine(self, block):
        """Increment ``block.nonce`` until the block is mined, then store it.

        The message queue is reset and a frozen copy of ``block`` is appended
        to the chain.
        """
        while not self.is_mined(block):
            block.nonce += 1
        self.queue = []
        self.blockchain.append(FrozenBlock(**vars(block)))
def main(warmup_blocks: int = 1000, measured_blocks: int = 1000):
    """Run the single-miner simulation and plot difficulty and block times.

    ``warmup_blocks`` blocks are mined first so the difficulty can settle,
    then ``measured_blocks`` more blocks are mined. Mean and median of the
    difficulty and of the block durations of the measured blocks are printed
    and both distributions are shown as histograms.

    Raises:
        ValueError: if ``warmup_blocks`` is negative or ``measured_blocks``
            is lower than 1.
    """
    if warmup_blocks < 0 or measured_blocks < 1:
        raise ValueError(
            "warmup_blocks must be >= 0 and measured_blocks must be >= 1")

    system = BlockchainSystem()
    m = Miner(system)

    print('searching stationary state')
    for _ in trange(warmup_blocks):
        m.mine(m.select())

    print('simulating under stationary state')
    for _ in trange(measured_blocks):
        m.mine(m.select())

    # Index 0 of the chain is the genesis block, so the measured blocks start
    # at warmup_blocks + 1.
    first_measured = warmup_blocks + 1
    difficulties = [b.target_difficulty for b in m.blockchain[first_measured:]]
    print(np.mean(difficulties), np.median(difficulties))
    counter = Counter(difficulties)

    # Start one block earlier for the timestamps: the duration of the first
    # measured block is the difference with the block that precedes it.
    block_times = [b.time for b in m.blockchain[warmup_blocks:]]
    durations = np.ediff1d(block_times)
    print(np.mean(durations), np.median(durations))

    fig, (ax1, ax2) = plt.subplots(ncols=2)
    ax1.set(xlabel="Difficulty", ylabel="Block count",
            title="Difficulty")
    ax2.set(xlabel="Block time",
            title="Block time")
    # One bin per distinct difficulty value observed.
    ax1.hist(difficulties, bins=len(counter))
    ax2.hist(durations, bins=50)
    plt.show()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment