Skip to content

Instantly share code, notes, and snippets.

@sorce
Last active November 5, 2019 11:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sorce/396fe44739392756abac8c2e213bb11e to your computer and use it in GitHub Desktop.
Save sorce/396fe44739392756abac8c2e213bb11e to your computer and use it in GitHub Desktop.
Create, sign and send a counterparty transaction from an address controlled by a trezor
#!/usr/bin/env python
'''
if you find this script helpful and would like to give some crypto to
the cause, we would be very happy to receive it :)
donate bitcoin / counterparty: 3L19gTtMMJHpkAjYVduUZETdduwAfM7NGR
'''
import sys
import json
import requests
from requests.auth import HTTPBasicAuth
import struct
import bitcoin.base58
from bitcoin.core import b2x, b2lx, x, lx, str_money_value
from bitcoin.core.serialize import Hash, Hash160
from Crypto.Cipher import ARC4
from decimal import Decimal
import logging
from logging.handlers import RotatingFileHandler
worklog = logging.getLogger('sign_trezor_tx')
#shandler = logging.handlers.RotatingFileHandler('sign_trezor_tx.log', maxBytes=1024*1024*10, backupCount=1000)
shandler = logging.StreamHandler(sys.stdout)
worklog.setLevel(logging.DEBUG)
worklog.addHandler(shandler)
shandler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s', '%Y/%m/%d %H:%M:%S'))
try:
from trezorlib.client import TrezorClient, TrezorClientVerbose
from trezorlib.transport import get_transport, enumerate_devices
from trezorlib.tx_api import TxApiInsight
from trezorlib import messages as proto
from trezorlib.ckd_public import PRIME_DERIVATION_FLAG
# patch trezorlib TxApiInsight to send txhashes across as plaintext string (instead of bytes) when using python3
class pt_TxApiInsight(TxApiInsight):
def get_tx(self, txhash):
if type(txhash) is bytes:
txhash = txhash.decode('utf-8')
return super(pt_TxApiInsight, self).get_tx(txhash)
TxApiBitcoin = pt_TxApiInsight(network='insight_bitcoin', url='https://insight.bitpay.com/api/')
except ImportError:
worklog.exception('Error importing trezorlib, install it with: pip install trezor')
sys.exit(0)
try:
input = raw_input
except NameError:
pass
NETWORK = 'mainnet'
from bitcoin import SelectParams
def set_network(n):
global NETWORK
NETWORK = n
SelectParams(n)
def to_satoshis(x):
return int((Decimal(x) * Decimal(1e8)).quantize(Decimal('1.00000000')))
def fetch_utxos_insight(address):
if NETWORK == 'mainnet':
url = 'https://insight.bitpay.com/api/addr/{}/utxo'.format(address)
else:
url = 'https://test-insight.bitpay.com/api/addr/{}/utxo'.format(address)
try:
resp = requests.get(url, verify=False, timeout=5)
return json.loads(resp.text)
except Exception as e:
worklog.exception('Exception fetching UTXOs from /addr/{}/utxo endpoint in insight'.format(address))
return None
def fetch_utxos_smartbit(address):
if NETWORK == 'mainnet':
url = 'https://api.smartbit.com.au/v1/blockchain/address/{}/unspent'.format(address)
else:
url = 'https://testnet-api.smartbit.com.au/v1/blockchain/address/{}/unspent'.format(address)
try:
resp = requests.get(url, verify=False, timeout=5)
j = json.loads(resp.text)
return j['unspent']
except Exception as e:
worklog.exception('Exception fetching UTXOs for address {}'.format(address))
def broadcast_tx_smartbit(stx):
if NETWORK == 'mainnet':
url = 'https://api.smartbit.com.au/v1/blockchain/pushtx'
else:
url = 'https://testnet-api.smartbit.com.au/v1/blockchain/pushtx'
try:
resp = requests.post(url, data=json.dumps({'hex': stx}), verify=False, timeout=5)
try:
return json.loads(resp.text)
except json.decoder.JSONDecodeError:
worklog.error('Error broadcasting transaction to network: {}'.format(resp.text))
return None
except Exception as e:
worklog.exception('Failed to broadcast transaction to network')
return None
b26_digits = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
def b26encode(asset):
if asset == 'BTC': return 0
if asset == 'XCP': return 1
if asset[0] == 'A':
return int(asset_name[1 : ])
else:
n = 0
for c in asset:
n *= 26
if c not in b26_digits:
return None
n += b26_digits.index(c)
return n
def address_pack(address):
return bitcoin.base58.decode(address)[ : -4]
def encode_counterparty_op_return_send(prevout_hash, asset, quantity, destination):
asset_id = b26encode(asset)
destination_short_bytes = address_pack(destination)
prevout_hash = x(prevout_hash)
data = b'CNTRPRTY'
data += struct.pack('B', 2) # enhanced send
data += struct.pack('>QQ21s', asset_id, quantity, destination_short_bytes)
data += b''
obj1 = ARC4.new(prevout_hash)
encrypted = obj1.encrypt(data)
worklog.debug('Created counterparty OP_RETURN hex: {}'.format(b2x(encrypted)))
return encrypted
def create_tx(source, source_n, destination, change_n, asset, qty, divisible, fee, opt_in_rbf=False):
inputs = []
outputs = []
tx_sequence = 0xFFFFFFFF if not opt_in_rbf else 0xFFFFFFFF-2
# get an UTXO for the source address
utxos = fetch_utxos_smartbit(source)
if not utxos:
worklog.error('Unable to find any UTXO for {}'.format(source))
raise Exception('Could not find any UTXO for {}'.format(source))
worklog.debug('Found {} UTXO on {}'.format(len(utxos), source))
source_utxo = list(reversed(sorted(utxos, key=lambda x: x['value_int'])))[0]
worklog.debug('Using UTXO with most value ({} BTC)'.format(str_money_value(source_utxo['value_int'])))
if source_utxo['value_int'] < fee:
worklog.error('Unable to find any UTXO for {} with a value >= to fee ({})'.format(source, fee))
return None
counterparty_op_return = encode_counterparty_op_return_send(
source_utxo['txid'], asset,
int(qty) if not divisible else to_satoshis(qty),
destination
)
def input_script_type(address_n):
t = proto.InputScriptType.SPENDADDRESS
if address_n is None:
pass
elif address_n[0] == (49 | PRIME_DERIVATION_FLAG):
t = proto.InputScriptType.SPENDP2SHWITNESS
return t
def output_script_type(address_n):
t = proto.OutputScriptType.PAYTOADDRESS
if address_n is None:
pass
elif address_n[0] == (49 | PRIME_DERIVATION_FLAG):
t = proto.OutputScriptType.PAYTOP2SHWITNESS
return t
inputs.append(proto.TxInputType(
prev_hash = x(source_utxo['txid']),
prev_index = int(source_utxo['n']),
amount = source_utxo['value_int'],
address_n = source_n,
sequence = tx_sequence,
script_type = input_script_type(source_n)
))
# add the counterparty send output
outputs.append(proto.TxOutputType(
amount = 0,
script_type = proto.OutputScriptType.PAYTOOPRETURN,
op_return_data = counterparty_op_return
))
# change output, if necessary
change_value = source_utxo['value_int'] - fee
if change_value > 543:
outputs.append(proto.TxOutputType(
amount = change_value,
script_type = output_script_type(change_n),
address_n = change_n
))
return inputs, outputs
def get_trezor_client(verbose):
devices = enumerate_devices()
if not devices:
worklog.info('Cannot find any trezor device connected')
return None
# use the first trezor found
devpath = devices[0].get_path()
try:
transport = get_transport(devpath, prefix_search=True)
except Exception as e:
worklog.exception('Could not get trezor device, path={}'.format(devpath))
return None
if verbose:
return TrezorClientVerbose(transport=transport)
else:
return TrezorClient(transport=transport)
def parser():
import argparse
def str2bool(v):
# https://stackoverflow.com/a/43357954
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0'):
return False
else:
raise argparse.ArgumentTypeError('Boolean value expected: yes/no/true/false')
parser = argparse.ArgumentParser(
description = 'Create, sign and broadcast a counterparty transaction from a trezor device'
)
parser.add_argument('-v', '--verbose', help='verbose output', action='store_true', default=False)
parser.add_argument('-n', '--network', help='network to use', choices=['mainnet', 'testnet'], default='mainnet')
parser.add_argument('-t', '--type', help='script type used to derive addresses', choices=['address', 'segwit', 'p2shsegwit'], default='address')
parser.add_argument('-r', '--replaceable', help='opt transaction in to replace-by-fee', action='store_true', default=False)
parser.add_argument(
'-a', '--address-path',
help="node path of address containing asset to send out (e.g. 44'/0'/0'/0/4)",
type=str, required=True
)
parser.add_argument('-x', '--asset', help='asset you wish to send', type=str, required=True)
parser.add_argument('-q', '--quantity', help='amount of asset you wish to send', type=Decimal, required=True)
parser.add_argument(
'-d', '--divisible',
help='whether the asset is divisible or not, used for calculating the proper amount of asset to send as divisible asset quantities must be defined in satoshis',
type=str2bool, required=True
)
parser.add_argument('-o', '--destination', help='destination adddress where you want to send asset', type=str, required=True)
parser.add_argument(
'-c', '--change-path',
help="node path of address that you wish to use for change (e.g. 44'/0'/0'/0/5)",
type=str, required=True
)
parser.add_argument('-f', '--fee', help='static fee (in satoshis) to use for the transaction', type=int, required=True)
return parser
if __name__ == '__main__':
args = parser().parse_args()
try:
set_network(args.network.lower())
coin = 'Bitcoin'
if args.network.lower() == 'testnet':
coin = 'Testnet'
tzcl = get_trezor_client(args.verbose)
if not tzcl:
worklog.error('Failed to get trezor device, aborting')
sys.exit(1)
tzcl.set_tx_api(TxApiBitcoin)
def name_to_type(name):
if name == 'segwit':
return proto.InputScriptType.SPENDWITNESS
elif name == 'p2shsegwit':
return proto.InputScriptType.SPENDP2SHWITNESS
else:
return proto.InputScriptType.SPENDADDRESS
script_type = name_to_type(args.type)
source_bip32_path = tzcl.expand_path(args.address_path)
address = tzcl.get_address(coin, source_bip32_path, False, script_type=script_type)
change_bip32_path = tzcl.expand_path(args.change_path)
change_address = tzcl.get_address(coin, change_bip32_path, False, script_type=script_type)
worklog.info('attempting to send {} {} from {} to {} with change address {} and fee of {} BTC'.format(
args.quantity, args.asset, address, args.destination, change_address, str_money_value(args.fee)
))
inputs, outputs = create_tx(
address, source_bip32_path, args.destination,
change_bip32_path, args.asset.upper(), args.quantity,
args.divisible, args.fee, opt_in_rbf=args.replaceable
)
worklog.debug('Signing transaction, check your trezor device for input ...')
sigs, stx = tzcl.sign_tx(coin, inputs, outputs)
worklog.info('Signed transaction:\n{}'.format(b2x(stx)))
if input('\nDo you want broadcast this transaction?\n-> ').lower() in ['y', 'yes', 'yeah', 'sure']:
tx = broadcast_tx_smartbit(b2x(stx))
if not tx:
worklog.info('Failed to broadcast transaction, if the only errors were network related you can try broadcasting the signed transaction again yourself')
else:
worklog.info('Broadcast transaction to network: {}'.format(tx))
else:
worklog.info('Will not broadcast transaction')
except Exception as e:
worklog.exception('Error')
try:
tzcl.close()
except:
pass
sys.exit(1)
else:
tzcl.close()
worklog.info('Goodbye')
sys.exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment