# PowSwap Code
# GitHub gist JeremyRubin/8bacafdb997c588ef2a203ffeb726b83 by @JeremyRubin,
# created October 30, 2021 21:11.
#!/usr/bin/python3
# Originally shared ~ 7/1/19 with Dan Elitzer
# Copyright 2019 Jeremy Rubin
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
import bitcointx
import hashlib
import os
from bitcointx.wallet import CBitcoinSecret
from bitcointx.core.script import CScript, OP_CHECKMULTISIG, CScriptOp, OP_2, OP_CHECKSIGVERIFY, OP_1, OP_CHECKMULTISIGVERIFY, OP_CHECKSIG
import time
CHAIN_SIZE = 100
# SecretChain is a hash chain H(H(H(...H(x))))
# Each H(...) also has an associated pk = H(...)xG
class SecretChain:
    """Hash chain of secrets together with the public key of each link.

    ``secrets[0]`` is SHA256(seed) and every later entry is the SHA256 of the
    entry before it.  ``pks[i]`` is the public key obtained by treating
    ``secrets[i]`` as a private key.
    """

    def __init__(self, seed = None):
        """Build the chain from ``seed`` (32 random bytes when omitted)."""
        if seed is None:
            self.seed = os.urandom(32)
        else:
            self.seed = seed
        length = int(CHAIN_SIZE)
        link = hashlib.sha256(self.seed).digest()
        chain = [0] * length
        chain[0] = link
        for position in range(1, length):
            # Each link is the hash of the previous link.
            link = hashlib.sha256(link).digest()
            chain[position] = link
        self.secrets = chain
        self.pks = [CBitcoinSecret.from_secret_bytes(s).pub for s in self.secrets]

    def pop(self):
        """Remove and return the last remaining secret, or None when empty."""
        if self.secrets:
            return self.secrets.pop()
        return None
# SecretTraversal lets us receive new preimages from Secret Chain
# It is initialized with all the Pks from the entire chain
# Step by step, we check that the latest secret reveals
# the next PK, and that the hash of the latest secret is the prior secret.
class SecretTraversal:
    """Verifies secrets revealed one at a time against a known list of pks.

    ``pks`` is consumed from its end: ``next`` holds the public key the next
    revealed secret must produce, ``last`` the most recently accepted secret
    and ``count`` the number of secrets accepted so far.
    """

    def __init__(self, pks):
        """Take ownership of ``pks`` and pop its last entry as the first target."""
        self.pks = pks
        self.count = 0
        self.last = None
        self.next = pks.pop()

    def update(self, secret):
        """Try to accept ``secret``; return True on success, False otherwise.

        The secret must derive the expected public key and, except for the
        very first reveal, its SHA256 must equal the previously accepted
        secret.
        """
        if secret is None:
            return False
        derived_pk = CBitcoinSecret.from_secret_bytes(secret).pub
        secret_hash = hashlib.sha256(secret).digest()
        first_reveal = self.last is None
        # After the first reveal the new secret must hash to the prior one.
        if not first_reveal and self.last != secret_hash:
            print('no')
            return False
        if self.next != derived_pk:
            print('nope1' if first_reveal else 'nope')
            return False
        print("Learned Secret SHA256(%s) == %s:"%(secret, secret_hash))
        print("%s x G == %s"%(secret, self.next))
        self.last = secret
        if self.pks:
            self.next = self.pks.pop()
        else:
            self.next = None
        print("Next Secret: ", self.next, "\n")
        self.count += 1
        return True

    def get_next(self):
        """Return the pk at the end of the remaining list without removing it."""
        remaining = self.pks
        return remaining[-1]

    def seek(self, pk):
        """Return the already-learned secret whose public key is ``pk``.

        Starts at the latest accepted secret and hashes forward at most
        ``count`` times; returns None when no match is found.
        """
        candidate = self.last
        steps_left = self.count
        while steps_left > 0:
            if CBitcoinSecret.from_secret_bytes(candidate).pub == pk:
                return candidate
            candidate = hashlib.sha256(candidate).digest()
            steps_left -= 1
        return None
class Player:
    """One channel participant: a signing key plus revocation bookkeeping."""

    def __init__(self):
        """Create a random signing key and a fresh revocation chain."""
        random_key_bytes = os.urandom(32)
        self.sec = CBitcoinSecret.from_secret_bytes(random_key_bytes)
        self.my_revoke_chain = SecretChain()
        # Set later through set_counterparty_revokes().
        self.counterparty_revokes = None

    # --- own keys and scripts -------------------------------------------

    def scriptKey(self):
        """Return a CScript containing a single push of this player's pubkey."""
        pushed_pubkey = CScriptOp.encode_op_pushdata(self.sec.pub)
        return CScript(pushed_pubkey)

    def get_my_address(self):
        """Return this player's payout script (same as scriptKey())."""
        return self.scriptKey()

    def get_my_key(self):
        """Return this player's public key."""
        secret = self.sec
        return secret.pub

    # --- revocation chains ----------------------------------------------

    def get_my_public_revokes(self):
        """Return a copy of the public keys of this player's revoke chain."""
        own_pks = self.my_revoke_chain.pks
        return own_pks.copy()

    def set_counterparty_revokes(self, pks):
        """Start tracking the counterparty's revocation public keys."""
        self.counterparty_revokes = SecretTraversal(pks)

    def get_counterparty_revoke(self):
        """Return the counterparty revocation pk reported by get_next()."""
        traversal = self.counterparty_revokes
        return traversal.get_next()

    def revoke(self):
        """Release the next secret from this player's own revoke chain."""
        return self.my_revoke_chain.pop()

    def receive_revoke(self, secret):
        """Check a secret revealed by the counterparty; True when accepted."""
        return self.counterparty_revokes.update(secret)

    # --- counterparty placeholders --------------------------------------

    def get_counterparty_address(self):
        """Return a placeholder value; no real counterparty address is kept."""
        return b"DUMMY ADDRESS"

    def get_counterparty_key(self):
        """Return a placeholder value; no real counterparty key is kept."""
        return b"DUMMY KEY"

    # --- channel state --------------------------------------------------

    def set_channel_state(self, cs):
        """Store the current channel state on ``self.cs``."""
        self.cs = cs

    def set_latest_resolution(self, latest_resolution):
        """Store the most recently received resolution transactions."""
        self.latest_resolution = latest_resolution
import copy
class OpenChannel:
    """Simulates a two-party channel between two ``Player`` objects.

    On construction both players receive a mirrored ``ChannelState`` and the
    public keys of the other player's revocation chain.  ``run_channel`` then
    repeatedly derives new states, builds the matching transactions and
    exchanges revocation secrets.
    """

    def __init__(self, player_one, player_two, amt):
        """Wire the two players together.

        ``amt`` is used as both the "my" and "ur" starting balance of each
        player's channel state.
        """
        self.player_one = player_one
        self.player_two = player_two
        self.input = self.agree_on_input()
        self.channel_is_open = False
        # mirror channel states
        cs1 = ChannelState(self.input, amt, amt)
        cs2 = ChannelState(self.input, amt, amt)
        # load each players revoke chains
        player_one.set_counterparty_revokes(player_two.get_my_public_revokes())
        player_two.set_counterparty_revokes(player_one.get_my_public_revokes())
        # load channel states
        player_one.set_channel_state(cs1)
        player_two.set_channel_state(cs2)

    def agree_on_input(self):
        """Return the funding outpoint (a dummy one in this simulation)."""
        # We don't have an actual input...
        return COutPoint(b"0"*32, 0)

    def sign_and_broadcast_open_channel(self):
        """Mark the channel as open; no signing or broadcasting happens here."""
        self.channel_is_open = True

    def run_channel(self, max_rounds=None):
        """Run state-update rounds between the two players.

        ``max_rounds`` limits the number of rounds.  With the default of
        ``None`` the loop has no exit condition of its own and only stops
        when an exception propagates out of a round.

        Raises ``AssertionError`` when either player rejects the other's
        revealed revocation secret.
        """
        # First two states are a null update
        next_state = [(0, 0), (0, 0), (-1*COIN, -2*COIN), (1*COIN, 2*COIN)]
        count = 0
        while max_rounds is None or count < max_rounds:
            # generate the next state
            cs1 = self.player_one.cs.update(next_state.pop(0))
            cs2 = self.player_two.cs.update(next_state.pop(0))
            # Flip between these two payment types: the two remaining deltas
            # are extended with their reverse, so the queue is back to four
            # entries and the next pair is dealt the other way round.
            next_state += next_state[::-1]
            # create the transactions for the next state
            txns_for_p2 = cs1.get_txns(
                self.player_one.get_my_address(),
                self.player_one.get_my_key(),
                self.player_one.get_counterparty_address(),
                self.player_one.get_counterparty_key(),
                self.player_one.get_counterparty_revoke()
            )
            txns_for_p1 = cs2.get_txns(
                self.player_two.get_my_address(),
                self.player_two.get_my_key(),
                self.player_two.get_counterparty_address(),
                self.player_two.get_counterparty_key(),
                self.player_two.get_counterparty_revoke()
            )
            # TODO: Sign the TXNs
            # We should half-sign the start transaction
            # fully sign resolution transactions
            # not sign the revoke transaction
            # exchange the transactions & set them as latest resolution
            self.player_one.set_latest_resolution(txns_for_p1)
            self.player_two.set_latest_resolution(txns_for_p2)
            # Once exchanged, ratchet up to eliminate the prior state
            # Order doesn't matter
            # The exchange is done outside an ``assert`` statement so it is
            # not skipped when Python runs with -O.  As before, player two's
            # secret is only released if player one's secret was accepted.
            revokes_accepted = (
                self.player_two.receive_revoke(self.player_one.revoke()) and
                self.player_one.receive_revoke(self.player_two.revoke()))
            if not revokes_accepted:
                raise AssertionError("revocation secret exchange failed")
            # set the state to the update
            self.player_one.set_channel_state(cs1)
            self.player_two.set_channel_state(cs2)
            # If this is our first pass, we still have yet to open the channel, so do that sub-protocol
            if not self.channel_is_open:
                self.sign_and_broadcast_open_channel()
            print("Player One Estimated Value", self.player_one.cs.estimate_my_value())
            print("Player Two Estimated Value", self.player_two.cs.estimate_my_value())
            count += 1

    def get_output_script(self):
        """Return the funding script: both players' keys must sign."""
        return CScript([self.player_one.get_my_key(), OP_CHECKSIGVERIFY, self.player_two.get_my_key(), OP_CHECKSIG])
def revokable_script(signer_one, signer_two, revoke_key):
    """Build a script spendable by signer_one plus one of the other two keys.

    Allowed:     signer_one + revoke_key, or signer_one + signer_two.
    Not allowed: signer_two + revoke_key (signer_one must always sign).
    """
    # 1-of-2 multisig over (signer_two, revoke_key) that must verify ...
    one_of_two = [OP_1, signer_two, revoke_key, OP_2, OP_CHECKMULTISIGVERIFY]
    # ... followed by a mandatory signature from signer_one.
    mandatory = [signer_one, OP_CHECKSIG]
    return CScript(one_of_two + mandatory)
def segwit(script):
    """Return ``script`` unchanged (placeholder; no wrapping is performed)."""
    unchanged = script
    return unchanged
class ChannelState:
    """Balances of one channel state under both possible outcomes.

    Four payouts are tracked: "my" and "ur" amounts for the case where the
    block-count condition is met first (``*_nblocks_first``) and for the case
    where the timeout is reached first (``*_timeout_first``).
    """

    def __init__(self, _input, my_coins, ur_coins, revoke_timeout = 24, contract_duration = 365*24, block_offset = 100):
        """Start with identical payouts for both outcomes."""
        self.input = _input
        # Timing parameters are stored as given and handed to get_txns().
        self.revoke_timeout = revoke_timeout
        self.contract_duration = contract_duration
        self.block_offset = block_offset
        # Both outcomes begin with the same split.
        self.my_payout_nblocks_first = self.my_payout_timeout_first = my_coins
        self.ur_payout_nblocks_first = self.ur_payout_timeout_first = ur_coins

    def update(self, delta, contract_delta=None):
        """Return a shallow copy with ``delta`` applied; self is untouched.

        ``delta`` is a pair (nblocks change, timeout change).  Each change is
        added to "my" payout and subtracted from "ur" payout.
        ``contract_delta`` is accepted but not used.
        """
        nblocks_shift, timeout_shift = delta
        successor = copy.copy(self)
        successor.my_payout_nblocks_first = successor.my_payout_nblocks_first + nblocks_shift
        successor.ur_payout_nblocks_first = successor.ur_payout_nblocks_first - nblocks_shift
        successor.my_payout_timeout_first = successor.my_payout_timeout_first + timeout_shift
        successor.ur_payout_timeout_first = successor.ur_payout_timeout_first - timeout_shift
        return successor

    def get_txns(self, my_address, my_key, ur_address, ur_key, ur_revoke_key):
        """Build the transactions for this state via DiffChanUpdate."""
        builder = DiffChanUpdate.update_from_me_to_you
        return builder(
            self.input,
            self.my_payout_nblocks_first,
            self.my_payout_timeout_first,
            my_address, my_key,
            self.ur_payout_nblocks_first,
            self.ur_payout_timeout_first,
            ur_address, ur_key, ur_revoke_key,
            self.revoke_timeout, self.contract_duration, self.block_offset)

    def estimate_my_value(self):
        """Return my expected payout, weighting both outcomes by probability."""
        p = self.estimate_p_nblocks_first()
        q = 1.0 - p
        nblocks_term = p * self.my_payout_nblocks_first
        timeout_term = q * self.my_payout_timeout_first
        return nblocks_term + timeout_term

    def estimate_p_nblocks_first(self):
        """Return the assumed probability of the nblocks-first outcome."""
        return 0.99
class DiffChanUpdate:
    """Builds the transaction set for one channel-state update."""

    # returns three sets of transactions:
    # 1 start txn
    # 1 revoke
    # 1 set of contested resolutions
    @staticmethod
    def update_from_me_to_you(input_,
            my_payout_nblocks_first, my_payout_timeout_first, my_address, my_key,
            ur_payout_nblocks_first, ur_payout_timeout_first, ur_address, ur_key, ur_revoke_key,
            revoke_timeout = 24, contract_duration = 365*24, block_offset = 100
            ):
        """Return ``[start, resolutions, revoked]`` for the given payouts.

        ``start`` spends ``input_`` into two outputs: output 0 pays my
        immediate amount to ``my_address`` and output 1 locks the remainder
        in a revokable script.  ``resolutions`` is a list of builders that
        spend output 1 after the relative timelock derived from
        ``revoke_timeout``: one builder when both outcomes pay the same, two
        (timeout first, then nblocks) otherwise.  ``revoked`` is a builder
        that sends all of output 1 to ``my_address``.

        Only ``start`` has had ``finish()`` called; the other builders are
        returned unfinished.
        """
        until_block = expected_blocks_hours_from_now(contract_duration) + block_offset
        rth = relative_timelock_hours(revoke_timeout)
        revokable_address = segwit(revokable_script(my_key, ur_key, ur_revoke_key))
        if my_payout_nblocks_first == my_payout_timeout_first:
            # Both outcomes pay the same, so no timelocked alternatives are needed.
            my_payout = my_payout_nblocks_first
            assert(ur_payout_nblocks_first == ur_payout_timeout_first)
            ur_payout = ur_payout_nblocks_first
            start = TxnBuilder()\
                .input(input_)\
                .output(my_payout, my_address)\
                .output(ur_payout, revokable_address)\
                .finish()
            finish = TxnBuilder()\
                .input(start.outpoint(1), rth)\
                .output(ur_payout, ur_address)
            revoked = TxnBuilder()\
                .input(start.outpoint(1))\
                .output(ur_payout, my_address)
            return [start, [finish], revoked]
        elif my_payout_nblocks_first > my_payout_timeout_first:
            # I receive more in the nblocks outcome: pay my smaller (timeout)
            # amount immediately and lock the rest in the revokable output.
            my_payout = my_payout_nblocks_first - my_payout_timeout_first
            my_immediate_payout = my_payout_timeout_first
            # The revoke transaction sweeps the whole revokable output.
            ur_payout = ur_payout_timeout_first
            start = TxnBuilder()\
                .input(input_)\
                .output(my_immediate_payout, my_address)\
                .output(ur_payout_timeout_first, revokable_address)\
                .finish()
            finish_timeout = TxnBuilder()\
                .absolute_timelock_time(contract_duration)\
                .input(start.outpoint(1), rth)\
                .output(ur_payout_timeout_first, ur_address)
            finish_nblocks = TxnBuilder()\
                .absolute_timelock_blocks(until_block)\
                .input(start.outpoint(1), rth)\
                .output(my_payout, my_address)\
                .output(ur_payout_nblocks_first, ur_address)
            revoked = TxnBuilder()\
                .input(start.outpoint(1))\
                .output(ur_payout, my_address)
            return [start, [finish_timeout, finish_nblocks], revoked]
        else:
            # I receive more in the timeout outcome: pay my smaller (nblocks)
            # amount immediately and lock the rest in the revokable output.
            my_payout = my_payout_timeout_first - my_payout_nblocks_first
            my_immediate_payout = my_payout_nblocks_first
            # The revoke transaction sweeps the whole revokable output.
            ur_payout = ur_payout_nblocks_first
            start = TxnBuilder()\
                .input(input_)\
                .output(my_immediate_payout, my_address)\
                .output(ur_payout_nblocks_first, revokable_address)\
                .finish()
            # Resolution inputs carry the same relative timelock as in the
            # other branches.
            finish_timeout = TxnBuilder()\
                .absolute_timelock_time(contract_duration)\
                .input(start.outpoint(1), rth)\
                .output(my_payout, my_address)\
                .output(ur_payout_timeout_first, ur_address)
            finish_nblocks = TxnBuilder()\
                .absolute_timelock_blocks(until_block)\
                .input(start.outpoint(1), rth)\
                .output(ur_payout_nblocks_first, ur_address)
            revoked = TxnBuilder()\
                .input(start.outpoint(1))\
                .output(ur_payout, my_address)
            return [start, [finish_timeout, finish_nblocks], revoked]
# nSequence value with bit 31 set, which switches relative lock-time off.
SEQUENCE_DISABLED = 0x80000000
def relative_timelock_hours(h):
    """Return an nSequence value encoding a relative timelock of ``h`` hours.

    Follows the BIP 68 layout: bit 31 (disable flag) is left clear, bit 22
    selects time-based locking, and the low 16 bits hold the duration in
    units of 512 seconds (rounded down).

    Raises ValueError when the duration is negative or does not fit in the
    16-bit value field.
    """
    type_flag_time = 1 << 22
    value_mask = 0xFFFF
    # hours -> seconds, then seconds -> 512-second units.
    units = (h * 60 * 60) >> 9
    if units < 0 or units > value_mask:
        raise ValueError("relative timelock of %r hours is out of range" % (h,))
    return type_flag_time | units
def abs_hours_from_now(h):
    """Return ``h`` unchanged (placeholder; no conversion is performed)."""
    hours = h
    return hours
def expected_blocks_hours_from_now(h):
    """Return the number of blocks expected to be mined in ``h`` hours.

    Uses the 10-minute target block interval, i.e. 6 blocks per hour, and
    returns a whole number of blocks so the result can be used as a block
    count.
    """
    blocks_per_hour = 6
    return int(h * blocks_per_hour)
from bitcointx.core import b2x, lx, COIN, COutPoint, CMutableTxOut, CMutableTxIn, CMutableTransaction, Hash160
class TxnBuilder:
    """Small fluent builder around ``CMutableTransaction``.

    Every configuration method returns ``self`` so calls can be chained.
    ``finish()`` creates the transaction and must be called before
    ``outpoint()``.
    """

    def __init__(self):
        self.outputs = []
        self.inputs = []
        self.locktime = 0
        # Set by finish(); None until then.
        self.finalized = None

    def input(self, input_, seq = SEQUENCE_DISABLED):
        """Append an input spending ``input_`` with sequence number ``seq``."""
        self.inputs.append(CMutableTxIn(input_, nSequence=seq))
        return self

    def output(self, amount, to):
        """Append an output paying ``amount`` to the script ``to``."""
        self.outputs.append(CMutableTxOut(amount, to))
        return self

    def absolute_timelock_time(self, l):
        """Set the transaction locktime to ``l`` (a time-based lock)."""
        # Stored on ``locktime`` because that is what finish() reads.
        self.locktime = l
        return self

    def absolute_timelock_blocks(self, l):
        """Set the transaction locktime to ``l`` (a block-height lock)."""
        # Stored on ``locktime`` because that is what finish() reads.
        self.locktime = l
        return self

    def finish(self):
        """Create the transaction from the inputs, outputs and locktime."""
        # NOTE(review): the trailing 2 is presumably the transaction version - confirm.
        self.finalized = CMutableTransaction(self.inputs, self.outputs, self.locktime, 2)
        return self

    def outpoint(self, n):
        """Return the outpoint for output ``n`` of the finished transaction.

        Raises RuntimeError when finish() has not been called yet.
        """
        assert 0 <= n < len(self.outputs)
        if self.finalized is None:
            raise RuntimeError("finish() must be called before outpoint()")
        return COutPoint(self.finalized.GetTxid(), n)
# Demo driver: create two players and simulate a channel between them.
Alice = Player()
Bob = Player()
# 100*COIN is passed as ``amt``, the starting balance used for both sides.
oc = OpenChannel(Alice, Bob, 100*COIN)
# NOTE(review): called without arguments, run_channel has no exit condition of
# its own, so the print below appears to be reached only if it returns - confirm.
oc.run_channel()
# Print the script that requires both players' signatures.
print(CScript(list(oc.get_output_script())))
# (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)