Last active
November 5, 2019 11:03
-
-
Save sorce/396fe44739392756abac8c2e213bb11e to your computer and use it in GitHub Desktop.
Create, sign and send a counterparty transaction from an address controlled by a trezor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
if you find this script helpful and would like to give some crypto to | |
the cause, we would be very happy to receive it :) | |
donate bitcoin / counterparty: 3L19gTtMMJHpkAjYVduUZETdduwAfM7NGR | |
''' | |
import sys | |
import json | |
import requests | |
from requests.auth import HTTPBasicAuth | |
import struct | |
import bitcoin.base58 | |
from bitcoin.core import b2x, b2lx, x, lx, str_money_value | |
from bitcoin.core.serialize import Hash, Hash160 | |
from Crypto.Cipher import ARC4 | |
from decimal import Decimal | |
import logging | |
from logging.handlers import RotatingFileHandler | |
worklog = logging.getLogger('sign_trezor_tx') | |
#shandler = logging.handlers.RotatingFileHandler('sign_trezor_tx.log', maxBytes=1024*1024*10, backupCount=1000) | |
shandler = logging.StreamHandler(sys.stdout) | |
worklog.setLevel(logging.DEBUG) | |
worklog.addHandler(shandler) | |
shandler.setFormatter(logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s', '%Y/%m/%d %H:%M:%S')) | |
try: | |
from trezorlib.client import TrezorClient, TrezorClientVerbose | |
from trezorlib.transport import get_transport, enumerate_devices | |
from trezorlib.tx_api import TxApiInsight | |
from trezorlib import messages as proto | |
from trezorlib.ckd_public import PRIME_DERIVATION_FLAG | |
# patch trezorlib TxApiInsight to send txhashes across as plaintext string (instead of bytes) when using python3 | |
class pt_TxApiInsight(TxApiInsight): | |
def get_tx(self, txhash): | |
if type(txhash) is bytes: | |
txhash = txhash.decode('utf-8') | |
return super(pt_TxApiInsight, self).get_tx(txhash) | |
TxApiBitcoin = pt_TxApiInsight(network='insight_bitcoin', url='https://insight.bitpay.com/api/') | |
except ImportError: | |
worklog.exception('Error importing trezorlib, install it with: pip install trezor') | |
sys.exit(0) | |
try: | |
input = raw_input | |
except NameError: | |
pass | |
NETWORK = 'mainnet' | |
from bitcoin import SelectParams | |
def set_network(n): | |
global NETWORK | |
NETWORK = n | |
SelectParams(n) | |
def to_satoshis(x): | |
return int((Decimal(x) * Decimal(1e8)).quantize(Decimal('1.00000000'))) | |
def fetch_utxos_insight(address): | |
if NETWORK == 'mainnet': | |
url = 'https://insight.bitpay.com/api/addr/{}/utxo'.format(address) | |
else: | |
url = 'https://test-insight.bitpay.com/api/addr/{}/utxo'.format(address) | |
try: | |
resp = requests.get(url, verify=False, timeout=5) | |
return json.loads(resp.text) | |
except Exception as e: | |
worklog.exception('Exception fetching UTXOs from /addr/{}/utxo endpoint in insight'.format(address)) | |
return None | |
def fetch_utxos_smartbit(address): | |
if NETWORK == 'mainnet': | |
url = 'https://api.smartbit.com.au/v1/blockchain/address/{}/unspent'.format(address) | |
else: | |
url = 'https://testnet-api.smartbit.com.au/v1/blockchain/address/{}/unspent'.format(address) | |
try: | |
resp = requests.get(url, verify=False, timeout=5) | |
j = json.loads(resp.text) | |
return j['unspent'] | |
except Exception as e: | |
worklog.exception('Exception fetching UTXOs for address {}'.format(address)) | |
def broadcast_tx_smartbit(stx): | |
if NETWORK == 'mainnet': | |
url = 'https://api.smartbit.com.au/v1/blockchain/pushtx' | |
else: | |
url = 'https://testnet-api.smartbit.com.au/v1/blockchain/pushtx' | |
try: | |
resp = requests.post(url, data=json.dumps({'hex': stx}), verify=False, timeout=5) | |
try: | |
return json.loads(resp.text) | |
except json.decoder.JSONDecodeError: | |
worklog.error('Error broadcasting transaction to network: {}'.format(resp.text)) | |
return None | |
except Exception as e: | |
worklog.exception('Failed to broadcast transaction to network') | |
return None | |
b26_digits = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' | |
def b26encode(asset): | |
if asset == 'BTC': return 0 | |
if asset == 'XCP': return 1 | |
if asset[0] == 'A': | |
return int(asset_name[1 : ]) | |
else: | |
n = 0 | |
for c in asset: | |
n *= 26 | |
if c not in b26_digits: | |
return None | |
n += b26_digits.index(c) | |
return n | |
def address_pack(address): | |
return bitcoin.base58.decode(address)[ : -4] | |
def encode_counterparty_op_return_send(prevout_hash, asset, quantity, destination): | |
asset_id = b26encode(asset) | |
destination_short_bytes = address_pack(destination) | |
prevout_hash = x(prevout_hash) | |
data = b'CNTRPRTY' | |
data += struct.pack('B', 2) # enhanced send | |
data += struct.pack('>QQ21s', asset_id, quantity, destination_short_bytes) | |
data += b'' | |
obj1 = ARC4.new(prevout_hash) | |
encrypted = obj1.encrypt(data) | |
worklog.debug('Created counterparty OP_RETURN hex: {}'.format(b2x(encrypted))) | |
return encrypted | |
def create_tx(source, source_n, destination, change_n, asset, qty, divisible, fee, opt_in_rbf=False): | |
inputs = [] | |
outputs = [] | |
tx_sequence = 0xFFFFFFFF if not opt_in_rbf else 0xFFFFFFFF-2 | |
# get an UTXO for the source address | |
utxos = fetch_utxos_smartbit(source) | |
if not utxos: | |
worklog.error('Unable to find any UTXO for {}'.format(source)) | |
raise Exception('Could not find any UTXO for {}'.format(source)) | |
worklog.debug('Found {} UTXO on {}'.format(len(utxos), source)) | |
source_utxo = list(reversed(sorted(utxos, key=lambda x: x['value_int'])))[0] | |
worklog.debug('Using UTXO with most value ({} BTC)'.format(str_money_value(source_utxo['value_int']))) | |
if source_utxo['value_int'] < fee: | |
worklog.error('Unable to find any UTXO for {} with a value >= to fee ({})'.format(source, fee)) | |
return None | |
counterparty_op_return = encode_counterparty_op_return_send( | |
source_utxo['txid'], asset, | |
int(qty) if not divisible else to_satoshis(qty), | |
destination | |
) | |
def input_script_type(address_n): | |
t = proto.InputScriptType.SPENDADDRESS | |
if address_n is None: | |
pass | |
elif address_n[0] == (49 | PRIME_DERIVATION_FLAG): | |
t = proto.InputScriptType.SPENDP2SHWITNESS | |
return t | |
def output_script_type(address_n): | |
t = proto.OutputScriptType.PAYTOADDRESS | |
if address_n is None: | |
pass | |
elif address_n[0] == (49 | PRIME_DERIVATION_FLAG): | |
t = proto.OutputScriptType.PAYTOP2SHWITNESS | |
return t | |
inputs.append(proto.TxInputType( | |
prev_hash = x(source_utxo['txid']), | |
prev_index = int(source_utxo['n']), | |
amount = source_utxo['value_int'], | |
address_n = source_n, | |
sequence = tx_sequence, | |
script_type = input_script_type(source_n) | |
)) | |
# add the counterparty send output | |
outputs.append(proto.TxOutputType( | |
amount = 0, | |
script_type = proto.OutputScriptType.PAYTOOPRETURN, | |
op_return_data = counterparty_op_return | |
)) | |
# change output, if necessary | |
change_value = source_utxo['value_int'] - fee | |
if change_value > 543: | |
outputs.append(proto.TxOutputType( | |
amount = change_value, | |
script_type = output_script_type(change_n), | |
address_n = change_n | |
)) | |
return inputs, outputs | |
def get_trezor_client(verbose): | |
devices = enumerate_devices() | |
if not devices: | |
worklog.info('Cannot find any trezor device connected') | |
return None | |
# use the first trezor found | |
devpath = devices[0].get_path() | |
try: | |
transport = get_transport(devpath, prefix_search=True) | |
except Exception as e: | |
worklog.exception('Could not get trezor device, path={}'.format(devpath)) | |
return None | |
if verbose: | |
return TrezorClientVerbose(transport=transport) | |
else: | |
return TrezorClient(transport=transport) | |
def parser(): | |
import argparse | |
def str2bool(v): | |
# https://stackoverflow.com/a/43357954 | |
if v.lower() in ('yes', 'true', 't', 'y', '1'): | |
return True | |
elif v.lower() in ('no', 'false', 'f', 'n', '0'): | |
return False | |
else: | |
raise argparse.ArgumentTypeError('Boolean value expected: yes/no/true/false') | |
parser = argparse.ArgumentParser( | |
description = 'Create, sign and broadcast a counterparty transaction from a trezor device' | |
) | |
parser.add_argument('-v', '--verbose', help='verbose output', action='store_true', default=False) | |
parser.add_argument('-n', '--network', help='network to use', choices=['mainnet', 'testnet'], default='mainnet') | |
parser.add_argument('-t', '--type', help='script type used to derive addresses', choices=['address', 'segwit', 'p2shsegwit'], default='address') | |
parser.add_argument('-r', '--replaceable', help='opt transaction in to replace-by-fee', action='store_true', default=False) | |
parser.add_argument( | |
'-a', '--address-path', | |
help="node path of address containing asset to send out (e.g. 44'/0'/0'/0/4)", | |
type=str, required=True | |
) | |
parser.add_argument('-x', '--asset', help='asset you wish to send', type=str, required=True) | |
parser.add_argument('-q', '--quantity', help='amount of asset you wish to send', type=Decimal, required=True) | |
parser.add_argument( | |
'-d', '--divisible', | |
help='whether the asset is divisible or not, used for calculating the proper amount of asset to send as divisible asset quantities must be defined in satoshis', | |
type=str2bool, required=True | |
) | |
parser.add_argument('-o', '--destination', help='destination adddress where you want to send asset', type=str, required=True) | |
parser.add_argument( | |
'-c', '--change-path', | |
help="node path of address that you wish to use for change (e.g. 44'/0'/0'/0/5)", | |
type=str, required=True | |
) | |
parser.add_argument('-f', '--fee', help='static fee (in satoshis) to use for the transaction', type=int, required=True) | |
return parser | |
if __name__ == '__main__': | |
args = parser().parse_args() | |
try: | |
set_network(args.network.lower()) | |
coin = 'Bitcoin' | |
if args.network.lower() == 'testnet': | |
coin = 'Testnet' | |
tzcl = get_trezor_client(args.verbose) | |
if not tzcl: | |
worklog.error('Failed to get trezor device, aborting') | |
sys.exit(1) | |
tzcl.set_tx_api(TxApiBitcoin) | |
def name_to_type(name): | |
if name == 'segwit': | |
return proto.InputScriptType.SPENDWITNESS | |
elif name == 'p2shsegwit': | |
return proto.InputScriptType.SPENDP2SHWITNESS | |
else: | |
return proto.InputScriptType.SPENDADDRESS | |
script_type = name_to_type(args.type) | |
source_bip32_path = tzcl.expand_path(args.address_path) | |
address = tzcl.get_address(coin, source_bip32_path, False, script_type=script_type) | |
change_bip32_path = tzcl.expand_path(args.change_path) | |
change_address = tzcl.get_address(coin, change_bip32_path, False, script_type=script_type) | |
worklog.info('attempting to send {} {} from {} to {} with change address {} and fee of {} BTC'.format( | |
args.quantity, args.asset, address, args.destination, change_address, str_money_value(args.fee) | |
)) | |
inputs, outputs = create_tx( | |
address, source_bip32_path, args.destination, | |
change_bip32_path, args.asset.upper(), args.quantity, | |
args.divisible, args.fee, opt_in_rbf=args.replaceable | |
) | |
worklog.debug('Signing transaction, check your trezor device for input ...') | |
sigs, stx = tzcl.sign_tx(coin, inputs, outputs) | |
worklog.info('Signed transaction:\n{}'.format(b2x(stx))) | |
if input('\nDo you want broadcast this transaction?\n-> ').lower() in ['y', 'yes', 'yeah', 'sure']: | |
tx = broadcast_tx_smartbit(b2x(stx)) | |
if not tx: | |
worklog.info('Failed to broadcast transaction, if the only errors were network related you can try broadcasting the signed transaction again yourself') | |
else: | |
worklog.info('Broadcast transaction to network: {}'.format(tx)) | |
else: | |
worklog.info('Will not broadcast transaction') | |
except Exception as e: | |
worklog.exception('Error') | |
try: | |
tzcl.close() | |
except: | |
pass | |
sys.exit(1) | |
else: | |
tzcl.close() | |
worklog.info('Goodbye') | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment