Skip to content

Instantly share code, notes, and snippets.

@RobinLinus
Last active July 4, 2023 02:45
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RobinLinus/7af1838985a51cb395ffcb9b9aed202a to your computer and use it in GitHub Desktop.
Save RobinLinus/7af1838985a51cb395ffcb9b9aed202a to your computer and use it in GitHub Desktop.
#
# Basic implementation of the Sats4Files protocol.
# It allows a client and a server to perform an atomic swap of files for coins.
# Verifiable encryption lets the client check the file's integrity before buying the decryption key.
# For more information, see this Gist: https://gist.github.com/RobinLinus/bb1368ce812935c02948a193aedc386c/
#
import ecdsa
from ecdsa.ellipticcurve import INFINITY
import hashlib
import random
# secp256k1 parameters from the ecdsa package, used as module-level globals
# by the functions below.
curve = ecdsa.SECP256k1.curve
# Generator point; scalars are turned into curve points by multiplying it.
G = ecdsa.SECP256k1.generator
# Group order; used as the modulus for the scalar interpolation.
order = ecdsa.SECP256k1.order
import zlib
import time
def string_to_chunks(string):
    """Compress ``string`` and pack the result into a list of integers.

    The text is UTF-8 encoded, zlib-compressed and cut into slices of at
    most 31 bytes. Each slice is read as one big-endian unsigned integer.

    Args:
        string (str): Text to encode.

    Returns:
        list: One integer per slice, in stream order.
    """
    compressed = zlib.compress(string.encode())
    slice_size = 31  # 31 bytes = 248 bits per integer
    return [
        int.from_bytes(compressed[offset:offset + slice_size], byteorder='big')
        for offset in range(0, len(compressed), slice_size)
    ]
def chunks_to_string(int_chunks):
    """Rebuild the text from integers laid out as 31-byte big-endian slices.

    Every chunk except the last stands for exactly 31 bytes of the zlib
    stream, so it is left-padded with zero bytes to that width. Serialising
    with the minimal byte length would silently drop leading 0x00 bytes and
    corrupt the stream.

    The length of the last chunk is not recorded anywhere. It is first tried
    at its minimal length and then with an increasing number of leading zero
    bytes, until zlib accepts the stream (its checksum guards against a
    wrong guess).

    Args:
        int_chunks (iterable): Non-negative integers, each a big-endian
            slice of at most 31 bytes of one zlib stream, in order.

    Returns:
        str: The decompressed, UTF-8 decoded text.

    Raises:
        zlib.error: If no candidate byte string is a valid zlib stream
            (including when ``int_chunks`` is empty).
    """
    slice_size = 31
    chunks = list(int_chunks)
    if not chunks:
        # Same outcome as joining nothing: zlib rejects the empty stream.
        return zlib.decompress(b'').decode()

    def minimal_length(value):
        return (value.bit_length() + 7) // 8

    # Interior chunks are full slices. Oversized values keep their natural
    # width so that garbage input still ends in zlib.error, not OverflowError.
    head = b''.join(
        chunk.to_bytes(max(slice_size, minimal_length(chunk)), byteorder='big')
        for chunk in chunks[:-1]
    )
    tail = chunks[-1].to_bytes(minimal_length(chunks[-1]), byteorder='big')

    first_error = None
    for padding in range(max(0, slice_size - len(tail)) + 1):
        try:
            payload = zlib.decompress(head + bytes(padding) + tail)
        except zlib.error as exc:
            if first_error is None:
                first_error = exc
            continue
        return payload.decode()
    raise first_error
def extended_euclidean(a, b):
    """Run the extended Euclidean algorithm on ``a`` and ``b``.

    Implemented iteratively so that long remainder chains (large or
    Fibonacci-like inputs) cannot exhaust Python's recursion limit.

    Args:
        a (int): First integer.
        b (int): Second integer.

    Returns:
        tuple: ``(g, x, y)`` such that ``a * x + b * y == g``, where ``g`` is
        the last non-zero remainder (``a`` itself when ``b`` is 0).
    """
    prev_r, r = a, b
    prev_x, x = 1, 0
    prev_y, y = 0, 1
    while r != 0:
        quotient = prev_r // r
        # Same floor-division / remainder sequence as the recursive
        # formulation, so the returned coefficients are identical.
        prev_r, r = r, prev_r - quotient * r
        prev_x, x = x, prev_x - quotient * x
        prev_y, y = y, prev_y - quotient * y
    return prev_r, prev_x, prev_y
def inverse_mod(a, m):
    """Return the multiplicative inverse of ``a`` modulo ``m``.

    Args:
        a (int): Value to invert.
        m (int): Modulus.

    Returns:
        int: ``x`` in ``[0, m)`` with ``a * x % m == 1``.

    Raises:
        Exception: If ``a`` and ``m`` are not coprime.
    """
    gcd, coefficient, _ = extended_euclidean(a, m)
    if gcd == 1:
        return coefficient % m
    raise Exception("Modular inverse does not exist")
def add(a, b, p):
    """Return ``a + b`` reduced modulo ``p``."""
    total = a + b
    return total % p
def subtract(a, b, p):
    """Return ``a - b`` reduced modulo ``p``."""
    difference = a - b
    return difference % p
def multiply(a, b, p):
    """Return ``a * b`` reduced modulo ``p``."""
    product = a * b
    return product % p
def divide(a, b, p):
    """Return ``a / b`` modulo ``p``: ``a`` times the modular inverse of ``b``.

    Raises whatever ``inverse_mod`` raises when ``b`` has no inverse.
    """
    b_inverse = inverse_mod(b, p)
    return multiply(a, b_inverse, p)
def lagrange_interpolation(xs, ys, x, p):
    """Performs Lagrange interpolation over a finite field.

    For each known point, the numerator and denominator of its Lagrange
    basis polynomial are accumulated as separate products. Only one modular
    inversion per point is then needed, instead of one per factor.

    Args:
        xs (list): x-coordinates of the known points
        ys (list): y-coordinates of the known points
        x (int): x-coordinate of the unknown point
        p (int): Prime number representing the order of the finite field

    Returns:
        int: y-coordinate of the unknown point (0 when no points are given)

    Raises:
        Exception: From the modular inversion, when a difference of two
            x-coordinates has no inverse modulo ``p`` (e.g. duplicate xs).
    """
    assert len(xs) == len(ys), "xs and ys must have the same length"
    y = 0
    for i, (xi, yi) in enumerate(zip(xs, ys)):
        numer = 1
        denom = 1
        for j, xj in enumerate(xs):
            if i != j:
                numer = multiply(numer, subtract(x, xj, p), p)
                denom = multiply(denom, subtract(xi, xj, p), p)
        # A product is invertible exactly when every factor is, so this
        # fails in the same cases as inverting each factor separately.
        li = divide(numer, denom, p)
        y = add(y, multiply(yi, li, p), p)
    return y
def enumerate_integers(n, start=1):
    """Return the ``n`` consecutive integers beginning at ``start`` as a list."""
    stop = start + n
    return [*range(start, stop)]
def lagrange_interpolation_curve(curve, G, xs, ys, x):
    """Performs Lagrange interpolation over elliptic curve points.

    Each Lagrange coefficient is computed as a scalar modulo the order of
    ``G`` and then multiplied into the matching point. The scalar is kept
    reduced so that it never grows beyond the group order and is never
    negative. Only one modular inversion is done per known point.

    Args:
        curve (ecdsa.ellipticcurve.Curve): The elliptic curve to use
            (accepted for interface compatibility; not read here)
        G (ecdsa.ellipticcurve.Point): The generator point of the elliptic curve
        xs (list): x-coordinates of the known points
        ys (list): y-coordinates of the known points (elliptic curve points)
        x (int): x-coordinate of the unknown point

    Returns:
        ecdsa.ellipticcurve.Point: The y-coordinate (point) of the unknown
        point; the point at infinity when no points are given

    Raises:
        Exception: From ``inverse_mod`` when a difference of two
            x-coordinates is not invertible modulo the group order.
    """
    assert len(xs) == len(ys), "xs and ys must have the same length"
    group_order = G.order()
    y = INFINITY  # Initialize y as the point at infinity
    for i, (xi, Yi) in enumerate(zip(xs, ys)):
        numer = 1
        denom = 1
        for j, xj in enumerate(xs):
            if i != j:
                numer = numer * (x - xj) % group_order
                denom = denom * (xi - xj) % group_order
        li = numer * inverse_mod(denom, group_order) % group_order
        y = y + Yi * li
    return y
def map_scalars_to_points(scalars, generator):
    """Multiply ``generator`` by each scalar and return the results in order.

    Args:
        scalars (iterable): Scalars to map.
        generator: Object supporting ``generator * scalar``.

    Returns:
        list: ``generator * scalar`` for every scalar, in input order.
    """
    return [generator * scalar for scalar in scalars]
def hash_curve_points(point_list):
    """Return the SHA-256 hex digest of the concatenated point encodings.

    Args:
        point_list (iterable): Objects whose ``to_bytes()`` returns their
            serialized form.

    Returns:
        str: Hex digest over all serializations, concatenated in order.
    """
    serialized = b''.join(point.to_bytes() for point in point_list)
    return hashlib.sha256(serialized).hexdigest()
def encrypt(ys):
    """Encrypt file chunks by evaluating their polynomial out of domain.

    A fresh random value is appended to ``ys``. The resulting ``n`` values
    are treated as a polynomial's evaluations at x = 1..n, and that
    polynomial is evaluated at x = n+1..2n. The last evaluation becomes the
    secret; the others are the encrypted file.

    Args:
        ys (list): File chunks as integers. NOTE: the random blinding value
            is appended to this list in place, so the caller's list grows by
            one element.

    Returns:
        tuple: ``(secret, adaptor_point, ood_ys)`` where ``secret`` is the
        evaluation at x = 2n, ``adaptor_point`` is ``G * secret`` and
        ``ood_ys`` holds the evaluations at x = n+1..2n-1.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    # The blinding value hides the file behind the secret, so it comes from
    # a CSPRNG. randbelow keeps it strictly inside the 31-byte range.
    rand_int = secrets.randbelow(2 ** (8 * 31))
    ys.append(rand_int)
    num_chunks = len(ys)
    xs = enumerate_integers(num_chunks)
    ood_xs = enumerate_integers(num_chunks, num_chunks + 1)
    ood_ys = [lagrange_interpolation(xs, ys, x, order) for x in ood_xs]
    # The final out-of-domain value is withheld and sold as the key.
    secret = ood_ys.pop()
    adaptor_point = G * secret
    return secret, adaptor_point, ood_ys
def verify(ood_ys, adaptor_point):
    """Recompute the file id from the encrypted values and the adaptor point.

    The encrypted scalars are lifted to curve points and the adaptor point
    is appended as the last one. These points are then interpolated back to
    x = 1..n-1, and the resulting points are hashed.

    Args:
        ood_ys (list): Encrypted values (scalars).
        adaptor_point: Curve point standing in for the withheld last value.

    Returns:
        str: Hex digest of the interpolated points.
    """
    commitments = map_scalars_to_points(ood_ys, G)
    commitments.append(adaptor_point)
    num_chunks = len(commitments)
    xs = enumerate_integers(num_chunks)
    ood_xs = enumerate_integers(num_chunks, num_chunks + 1)
    YS = []
    for x in xs[:num_chunks - 1]:
        Y = lagrange_interpolation_curve(curve, G, ood_xs, commitments, x)
        YS.append(Y)
        # NOTE(review): the affine form is discarded; the call is kept in
        # case it normalises the point before hashing. Confirm and drop
        # if it has no effect.
        Y.to_affine()
    return hash_curve_points(YS)
def decrypt(ood_ys, secret):
    """Recover the file text from the encrypted values and the secret.

    The encrypted values plus the secret are the polynomial's evaluations at
    x = n+1..2n, where n is their total count. Interpolating them back to
    x = 1..n-1 yields the file chunks, which are then decoded to text.

    Only the arguments are used (no module-level state), and the caller's
    ``ood_ys`` list is left unmodified.

    Args:
        ood_ys (list): Encrypted values, as returned by ``encrypt``.
        secret (int): The purchased last out-of-domain value.

    Returns:
        str: The decrypted file contents.
    """
    ood_values = list(ood_ys) + [secret]
    num_chunks = len(ood_values)
    xs = enumerate_integers(num_chunks)
    # The known points live at the out-of-domain x-coordinates.
    ood_xs = enumerate_integers(num_chunks, num_chunks + 1)
    # The value at x = n is the random blinding value, not file data.
    chunks = [
        lagrange_interpolation(ood_xs, ood_values, x, order)
        for x in xs[:num_chunks - 1]
    ]
    return chunks_to_string(chunks)
# Example usage: runs the whole protocol once, in-process, and prints each step.
file = """Bitcoin: A Peer-to-Peer Electronic Cash System
Abstract. A purely peer-to-peer version of electronic cash would allow online
payments to be sent directly from one party to another without going through a
financial institution. Digital signatures provide part of the solution, but the main
benefits are lost if a trusted third party is still required to prevent double-spending.
We propose a solution to the double-spending problem using a peer-to-peer network.
The network timestamps transactions by hashing them into an ongoing chain of
hash-based proof-of-work, forming a record that cannot be changed without redoing
the proof-of-work. The longest chain not only serves as proof of the sequence of
events witnessed, but proof that it came from the largest pool of CPU power. As
long as a majority of CPU power is controlled by nodes that are not cooperating to
attack the network, they'll generate the longest chain and outpace attackers. The
network itself requires minimal structure. Messages are broadcast on a best effort
basis, and nodes can leave and rejoin the network at will, accepting the longest
proof-of-work chain as proof of what happened while they were gone."""

# The Author publishes their file and notifies the Client about file_id somehow
ys = string_to_chunks(file)
file_id = hash_curve_points( map_scalars_to_points(ys, G))
print(f'Step 0: Publish file {file_id}' )

# Client requests file_id from the Server
# The Server responds with the encrypted file and an adaptor point to be used in the Lightning invoice
start = time.time()
secret, adaptor_point, ood_ys = encrypt(ys)
print('\nStep 1: Server responds to Client request with encrypted file and adapator point')
print('ood_ys', ood_ys)
print('adaptor_point', adaptor_point.to_affine())
print(f'Encrypted in {time.time()-start:.2f} seconds')

# Client verifies the file_id of the encrypted file.
# The recomputed id is kept separate from the published one and compared,
# so a mismatching ciphertext is rejected instead of silently accepted.
start = time.time()
verified_file_id = verify(ood_ys, adaptor_point)
if verified_file_id != file_id:
    raise ValueError('Encrypted file does not match the published file id')
print(f'\nStep 2: Client verifies the file id {verified_file_id}')
print(f'Verified in {time.time()-start:.2f} seconds')

# Client purchases the secret via LN
print('\nStep 3: Client purchases via LN PTLC the secret dlog of the adaptor point', secret)

# Client decrypts the file using the secret.
# Verification already happened in step 2, so only decryption is timed here.
start = time.time()
print('\nStep 4: Client decrypts file:\n')
print( decrypt(ood_ys, secret) )
print(f'Decrypted in {time.time()-start:.2f} seconds')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment