-
-
Save RobinLinus/7af1838985a51cb395ffcb9b9aed202a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Basic Implementation of the Sats4Files protocol | |
# It allows a client and a server to perform an atomic swap on files against coins. | |
# Verifiable encryption allows the client to check the file integrity before buying the decryption key | |
# For more infos, see this Gist https://gist.github.com/RobinLinus/bb1368ce812935c02948a193aedc386c/ | |
# | |
import ecdsa | |
from ecdsa.ellipticcurve import INFINITY | |
import hashlib | |
import random | |
curve = ecdsa.SECP256k1.curve | |
G = ecdsa.SECP256k1.generator | |
order = ecdsa.SECP256k1.order | |
import zlib | |
import time | |
def string_to_chunks(string): | |
# Compress string to bytes | |
byte_string = zlib.compress(string.encode()) | |
# Chunk the byte string into 31-byte chunks | |
byte_chunks = [byte_string[i:i+31] for i in range(0, len(byte_string), 31)] | |
# Convert each byte chunk to an integer | |
int_chunks = [] | |
for chunk in byte_chunks: | |
# Convert each byte to an integer and shift it to the correct position | |
# in the resulting integer | |
int_chunk = 0 | |
for i in range(len(chunk)): | |
int_chunk += chunk[i] << (8 * (len(chunk) - i - 1)) | |
int_chunks.append(int_chunk) | |
return int_chunks | |
def chunks_to_string(int_chunks): | |
# Convert each integer to a byte string | |
byte_strings = [int_chunk.to_bytes((int_chunk.bit_length() + 7) // 8, byteorder='big') for int_chunk in int_chunks] | |
# Join the byte strings to form the original byte string | |
byte_string = b''.join(byte_strings) | |
# Decompress the byte string to a string | |
string = zlib.decompress(byte_string).decode() | |
return string | |
def extended_euclidean(a, b): | |
if b == 0: | |
return a, 1, 0 | |
else: | |
g, x, y = extended_euclidean(b, a % b) | |
return g, y, x - (a // b) * y | |
def inverse_mod(a, m): | |
g, x, _ = extended_euclidean(a, m) | |
if g != 1: | |
raise Exception("Modular inverse does not exist") | |
else: | |
return x % m | |
def add(a, b, p): | |
return (a + b) % p | |
def subtract(a, b, p): | |
return (a - b) % p | |
def multiply(a, b, p): | |
return (a * b) % p | |
def divide(a, b, p): | |
return multiply(a, inverse_mod(b, p), p) | |
def lagrange_interpolation(xs, ys, x, p): | |
"""Performs Lagrange interpolation over a finite field. | |
Args: | |
xs (list): x-coordinates of the known points | |
ys (list): y-coordinates of the known points | |
x (int): x-coordinate of the unknown point | |
p (int): Prime number representing the order of the finite field | |
Returns: | |
int: y-coordinate of the unknown point | |
""" | |
assert len(xs) == len(ys), "xs and ys must have the same length" | |
y = 0 | |
for i in range(len(xs)): | |
li = 1 | |
for j in range(len(xs)): | |
if i != j: | |
numer = subtract(x, xs[j], p) | |
denom = subtract(xs[i], xs[j], p) | |
li = multiply(li, divide(numer, denom, p), p) | |
y = add(y, multiply(ys[i], li, p), p) | |
return y | |
def enumerate_integers(n, start=1): | |
return list(range(start, start+n)) | |
def lagrange_interpolation_curve(curve, G, xs, ys, x): | |
"""Performs Lagrange interpolation over elliptic curve points. | |
Args: | |
curve (ecdsa.ellipticcurve.Curve): The elliptic curve to use | |
G (ecdsa.ellipticcurve.Point): The generator point of the elliptic curve | |
xs (list): x-coordinates of the known points | |
ys (list): y-coordinates of the known points (elliptic curve points) | |
x (int): x-coordinate of the unknown point | |
Returns: | |
ecdsa.ellipticcurve.Point: The y-coordinate (point) of the unknown point | |
""" | |
assert len(xs) == len(ys), "xs and ys must have the same length" | |
y = INFINITY # Initialize y as the point at infinity | |
for i in range(len(xs)): | |
li = 1 | |
for j in range(len(xs)): | |
if i != j: | |
numer = x - xs[j] | |
denom = xs[i] - xs[j] | |
li *= numer * inverse_mod(denom, G.order()) | |
y = y + ys[i] * li | |
return y | |
def map_scalars_to_points(scalars, generator): | |
points = [] | |
for scalar in scalars: | |
point = generator * scalar | |
points.append(point) | |
return points | |
def hash_curve_points(point_list): | |
# Create a SHA-256 hash object | |
sha256 = hashlib.sha256() | |
# Iterate through each curve point in the list | |
for point in point_list: | |
# Serialize the curve point to a byte string | |
point_bytes = point.to_bytes() | |
# Update the SHA-256 hash with the serialized curve point bytes | |
sha256.update(point_bytes) | |
# Get the digest of the hash as a byte string | |
hash_bytes = sha256.digest() | |
# Return the hash digest as a hex string | |
return hash_bytes.hex() | |
def encrypt(ys): | |
# Generate a random 31-byte integer unique to this request | |
rand_int = random.randint(0, (2**8)**31) | |
ys.append(rand_int) | |
num_chunks = len(ys) | |
xs = enumerate_integers(num_chunks) | |
ood_xs = enumerate_integers(num_chunks, num_chunks + 1) | |
ood_ys = [] | |
for i in range( num_chunks ): | |
x = ood_xs[i] | |
y = lagrange_interpolation(xs, ys, x, order) | |
ood_ys.append(y) | |
secret = ood_ys.pop() | |
adaptor_point = G * secret | |
return secret, adaptor_point, ood_ys | |
def verify(ood_ys, adaptor_point): | |
num_chunks = len(ood_ys) + 1 | |
OOD_YS = map_scalars_to_points(ood_ys, G) | |
OOD_YS.append(adaptor_point) | |
xs = enumerate_integers(num_chunks) | |
ood_xs = enumerate_integers(num_chunks, num_chunks + 1) | |
YS = [] | |
for i in range(num_chunks - 1): | |
x = xs[i] | |
Y = lagrange_interpolation_curve(curve, G, ood_xs, OOD_YS, x) | |
YS.append(Y) | |
y_affine = Y.to_affine() | |
file_id = hash_curve_points( YS ) | |
return file_id | |
def decrypt(ood_ys, secret): | |
ood_ys.append(secret) | |
num_chunks = len(ood_ys) | |
xs = enumerate_integers(num_chunks) | |
file = [] | |
for i in range(num_chunks - 1): | |
x = xs[i] | |
y = lagrange_interpolation(xs, ys, x, order) | |
file.append(y) | |
file = chunks_to_string(file) | |
return file | |
# Example usage | |
file = """Bitcoin: A Peer-to-Peer Electronic Cash System | |
Abstract. A purely peer-to-peer version of electronic cash would allow online | |
payments to be sent directly from one party to another without going through a | |
financial institution. Digital signatures provide part of the solution, but the main | |
benefits are lost if a trusted third party is still required to prevent double-spending. | |
We propose a solution to the double-spending problem using a peer-to-peer network. | |
The network timestamps transactions by hashing them into an ongoing chain of | |
hash-based proof-of-work, forming a record that cannot be changed without redoing | |
the proof-of-work. The longest chain not only serves as proof of the sequence of | |
events witnessed, but proof that it came from the largest pool of CPU power. As | |
long as a majority of CPU power is controlled by nodes that are not cooperating to | |
attack the network, they'll generate the longest chain and outpace attackers. The | |
network itself requires minimal structure. Messages are broadcast on a best effort | |
basis, and nodes can leave and rejoin the network at will, accepting the longest | |
proof-of-work chain as proof of what happened while they were gone.""" | |
# The Author publishes their file and notifies the Client about file_id somehow | |
ys = string_to_chunks(file) | |
file_id = hash_curve_points( map_scalars_to_points(ys, G)) | |
print(f'Step 0: Publish file {file_id}' ) | |
# Client requests file_id from the Server | |
# The Server responds with the encrypted file and an adaptor point to be used in the Lightning invoice | |
start = time.time() | |
secret, adaptor_point, ood_ys = encrypt(ys) | |
print('\nStep 1: Server responds to Client request with encrypted file and adapator point') | |
print('ood_ys', ood_ys) | |
print('adaptor_point', adaptor_point.to_affine()) | |
print(f'Encrypted in {time.time()-start:.2f} seconds') | |
# Client verifies the file_id of the encrypted file | |
start = time.time() | |
file_id = verify(ood_ys, adaptor_point) | |
print(f'\nStep 2: Client verifies the file id {file_id}') | |
print(f'Verified in {time.time()-start:.2f} seconds') | |
# Client purchases the secret via LN | |
print('\nStep 3: Client purchases via LN PTLC the secret dlog of the adaptor point', secret) | |
# Client decrypts the file using the secret | |
start = time.time() | |
print('\nStep 4: Client decrypts file:\n') | |
file_id = verify(ood_ys, adaptor_point) | |
print( decrypt(ood_ys, secret) ) | |
print(f'Decrypted in {time.time()-start:.2f} seconds') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment