Last active
January 31, 2020 02:55
-
-
Save achow101/51751f10f394ec1a5fb1cd77a3e30181 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
import argparse | |
import binascii | |
import hashlib | |
import io | |
import struct | |
# RPC connection stuff, from test/functional/test_framework/authproxy.py | |
# Copyright (c) 2011 Jeff Garzik | |
# | |
# Previous copyright, from python-jsonrpc/jsonrpc/proxy.py: | |
# | |
# Copyright (c) 2007 Jan-Klaas Kollhof | |
# | |
# This file is part of jsonrpc. | |
# | |
# jsonrpc is free software; you can redistribute it and/or modify | |
# it under the terms of the GNU Lesser General Public License as published by | |
# the Free Software Foundation; either version 2.1 of the License, or | |
# (at your option) any later version. | |
# | |
# This software is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU Lesser General Public License for more details. | |
# | |
# You should have received a copy of the GNU Lesser General Public License | |
# along with this software; if not, write to the Free Software | |
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA | |
"""HTTP proxy for opening RPC connection to bitcoind. | |
AuthServiceProxy has the following improvements over python-jsonrpc's | |
ServiceProxy class: | |
- HTTP connections persist for the life of the AuthServiceProxy object | |
(if server supports HTTP/1.1) | |
- sends protocol 'version', per JSON-RPC 1.1 | |
- sends proper, incrementing 'id' | |
- sends Basic HTTP authentication headers | |
- parses all JSON numbers that look like floats as Decimal | |
- uses standard Python json lib | |
""" | |
import base64 | |
import decimal | |
from http import HTTPStatus | |
import http.client | |
import json | |
import logging | |
import os | |
import socket | |
import time | |
import urllib.parse | |
# Default timeout, in seconds, handed to the HTTP connection of a new proxy.
HTTP_TIMEOUT = 30
# Value sent in the User-Agent header of every RPC request.
USER_AGENT = "AuthServiceProxy/0.1"
# Logger that receives the request/response debug traces.
log = logging.getLogger("BitcoinRPC")
class JSONRPCException(Exception):
    """Error raised for a failed JSON-RPC exchange.

    Attributes:
        error: the raw ``rpc_error`` object handed to the constructor.
        http_status: HTTP status of the response, or None when not supplied.
    """

    def __init__(self, rpc_error, http_status=None):
        # Render "<message> (<code>)" when rpc_error is a mapping carrying
        # both keys; anything else results in an empty exception message.
        try:
            text = '%(message)s (%(code)i)' % rpc_error
        except (KeyError, TypeError):
            text = ''
        super().__init__(text)
        self.error = rpc_error
        self.http_status = http_status
def EncodeDecimal(o):
    """``json.dumps`` ``default=`` hook: render Decimal values as strings.

    Raises:
        TypeError: for any object that is not a ``decimal.Decimal``.
    """
    if not isinstance(o, decimal.Decimal):
        raise TypeError(repr(o) + " is not JSON serializable")
    return str(o)
class AuthServiceProxy():
    """JSON-RPC client proxy that talks to a server over HTTP(S).

    Attribute access returns a new proxy bound to the RPC method of that
    name (``proxy.getblockcount``); calling a proxy sends the request and
    returns the ``result`` member of the reply.  Proxies derived from one
    another share the same underlying HTTP connection object.
    """

    # Request id counter, shared by every proxy instance.
    __id_count = 0

    # ensure_ascii: escape unicode as \uXXXX, passed to json.dumps
    def __init__(self, service_url, service_name=None, timeout=HTTP_TIMEOUT, connection=None, ensure_ascii=True):
        """Create a proxy for *service_url*.

        Args:
            service_url: endpoint URL, optionally with ``user:password@``.
            service_name: RPC method this proxy is bound to, if any.
            timeout: timeout used when a new connection has to be created.
            connection: existing ``http.client`` connection to reuse.
            ensure_ascii: forwarded to ``json.dumps``.
        """
        self.__service_url = service_url
        self._service_name = service_name
        self.ensure_ascii = ensure_ascii  # can be toggled on the fly by tests
        self.__url = urllib.parse.urlparse(service_url)
        user = self.__url.username
        passwd = self.__url.password
        if user is None and passwd is None:
            # No credentials in the URL: send no Authorization header at all
            # instead of failing on ``None + b':'``.
            self.__auth_header = None
        else:
            authpair = (user or '').encode('utf8') + b':' + (passwd or '').encode('utf8')
            self.__auth_header = b'Basic ' + base64.b64encode(authpair)
        self.timeout = timeout
        self._set_conn(connection)

    def __getattr__(self, name):
        """Return a proxy bound to RPC method *name* (dotted under any current name)."""
        if name.startswith('__') and name.endswith('__'):
            # Python internal stuff
            raise AttributeError
        if self._service_name is not None:
            name = "%s.%s" % (self._service_name, name)
        return AuthServiceProxy(self.__service_url, name, connection=self.__conn)

    def _request(self, method, path, postdata):
        '''
        Do a HTTP request, with retry if we get disconnected (e.g. due to a timeout).
        This is a workaround for https://bugs.python.org/issue3566 which is fixed in Python 3.5.

        Returns the ``(response, status)`` pair produced by ``_get_response``.
        '''
        headers = {'Host': self.__url.hostname,
                   'User-Agent': USER_AGENT,
                   'Content-type': 'application/json'}
        if self.__auth_header is not None:
            headers['Authorization'] = self.__auth_header
        if os.name == 'nt':
            # Windows somehow does not like to re-use connections
            # TODO: Find out why the connection would disconnect occasionally and make it reusable on Windows
            self._set_conn()
        try:
            self.__conn.request(method, path, postdata, headers)
            return self._get_response()
        except http.client.BadStatusLine as e:
            if e.line == "''":  # if connection was closed, try again
                self.__conn.close()
                self.__conn.request(method, path, postdata, headers)
                return self._get_response()
            else:
                raise
        except (BrokenPipeError, ConnectionResetError):
            # Python 3.5+ raises BrokenPipeError instead of BadStatusLine when the connection was reset
            # ConnectionResetError happens on FreeBSD with Python 3.4
            self.__conn.close()
            self.__conn.request(method, path, postdata, headers)
            return self._get_response()

    def get_request(self, *args, **argsn):
        """Build the JSON-RPC request dict for this proxy's method.

        Every call consumes a new request id.

        Raises:
            ValueError: if both positional and named arguments are given.
        """
        AuthServiceProxy.__id_count += 1

        log.debug("-{}-> {} {}".format(
            AuthServiceProxy.__id_count,
            self._service_name,
            json.dumps(args or argsn, default=EncodeDecimal, ensure_ascii=self.ensure_ascii),
        ))
        if args and argsn:
            raise ValueError('Cannot handle both named and positional arguments')
        return {'version': '1.1',
                'method': self._service_name,
                'params': args or argsn,
                'id': AuthServiceProxy.__id_count}

    def __call__(self, *args, **argsn):
        """Invoke the bound RPC method and return its ``result``.

        Raises:
            JSONRPCException: when the reply carries an error, lacks a
                ``result`` member, or arrives with a non-200 HTTP status.
        """
        postdata = json.dumps(self.get_request(*args, **argsn), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
        response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
        if response['error'] is not None:
            raise JSONRPCException(response['error'], status)
        elif 'result' not in response:
            raise JSONRPCException({
                'code': -343, 'message': 'missing JSON-RPC result'}, status)
        elif status != HTTPStatus.OK:
            raise JSONRPCException({
                'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
        else:
            return response['result']

    def batch(self, rpc_call_list):
        """POST a list of request dicts in one HTTP request and return the decoded reply.

        Raises:
            JSONRPCException: when the HTTP status is not 200.
        """
        postdata = json.dumps(list(rpc_call_list), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
        log.debug("--> " + postdata)
        response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
        if status != HTTPStatus.OK:
            raise JSONRPCException({
                'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
        return response

    def _get_response(self):
        """Read and decode the pending HTTP response.

        Returns:
            ``(response, status)``: the decoded JSON body (floats parsed as
            ``decimal.Decimal``) and the HTTP status code.

        Raises:
            JSONRPCException: on socket timeout, a missing response, or a
                Content-Type other than ``application/json``.
        """
        req_start_time = time.time()
        try:
            http_response = self.__conn.getresponse()
        except socket.timeout:
            raise JSONRPCException({
                'code': -344,
                'message': '%r RPC took longer than %f seconds. Consider '
                           'using larger timeout for calls that take '
                           'longer to return.' % (self._service_name,
                                                  self.__conn.timeout)})
        if http_response is None:
            raise JSONRPCException({
                'code': -342, 'message': 'missing HTTP response from server'})

        content_type = http_response.getheader('Content-Type')
        if content_type != 'application/json':
            raise JSONRPCException(
                {'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
                http_response.status)

        responsedata = http_response.read().decode('utf8')
        response = json.loads(responsedata, parse_float=decimal.Decimal)
        elapsed = time.time() - req_start_time
        if "error" in response and response["error"] is None:
            log.debug("<-%s- [%.6f] %s" % (response["id"], elapsed, json.dumps(response["result"], default=EncodeDecimal, ensure_ascii=self.ensure_ascii)))
        else:
            log.debug("<-- [%.6f] %s" % (elapsed, responsedata))
        return response, http_response.status

    def __truediv__(self, relative_uri):
        """Return a proxy for ``<service_url>/<relative_uri>`` sharing this connection."""
        return AuthServiceProxy("{}/{}".format(self.__service_url, relative_uri), self._service_name, connection=self.__conn)

    def _set_conn(self, connection=None):
        """Adopt *connection*, or open a fresh one matching the URL scheme."""
        if connection:
            self.__conn = connection
            self.timeout = connection.timeout
            return
        is_https = self.__url.scheme == 'https'
        port = self.__url.port
        if port is None:
            # Fall back to the scheme's well-known port; an https URL
            # without an explicit port must not be dialled on port 80.
            port = 443 if is_https else 80
        if is_https:
            self.__conn = http.client.HTTPSConnection(self.__url.hostname, port, timeout=self.timeout)
        else:
            self.__conn = http.client.HTTPConnection(self.__url.hostname, port, timeout=self.timeout)
# from test/functional/test_framework/messages.py | |
# Serialization/deserialization tools | |
def sha256(s):
    """Return the raw SHA-256 digest of the bytes *s*."""
    hasher = hashlib.sha256(s)
    return hasher.digest()
def hash256(s):
    """Return SHA-256 applied twice to the bytes *s*."""
    first_pass = sha256(s)
    return sha256(first_pass)
def ser_compact_size(l):
    """Encode the integer *l* as a variable-length compact size.

    Values below 253 take a single byte; larger values get a marker byte
    (253, 254 or 255) followed by a little-endian 2-, 4- or 8-byte integer.
    """
    if l < 253:
        return struct.pack("B", l)
    if l < 0x10000:
        return struct.pack("<BH", 253, l)
    if l < 0x100000000:
        return struct.pack("<BI", 254, l)
    return struct.pack("<BQ", 255, l)
def deser_compact_size(f):
    """Read one compact-size integer from the binary stream *f*."""
    (marker,) = struct.unpack("<B", f.read(1))
    # Marker bytes 253/254/255 announce a wider little-endian integer.
    wide_formats = {253: ("<H", 2), 254: ("<I", 4), 255: ("<Q", 8)}
    if marker not in wide_formats:
        return marker
    fmt, width = wide_formats[marker]
    (value,) = struct.unpack(fmt, f.read(width))
    return value
def deser_string(f):
    """Read a compact-size length prefix from *f*, then that many bytes."""
    length = deser_compact_size(f)
    payload = f.read(length)
    return payload
def ser_string(s):
    """Return the bytes *s* preceded by its compact-size length."""
    length_prefix = ser_compact_size(len(s))
    return length_prefix + s
def deser_uint256(f):
    """Read 32 bytes from *f* as a little-endian 256-bit unsigned integer."""
    value = 0
    shift = 0
    # Consume the stream one 32-bit little-endian word at a time.
    while shift < 256:
        (word,) = struct.unpack("<I", f.read(4))
        value |= word << shift
        shift += 32
    return value
def ser_uint256(u):
    """Serialize the low 256 bits of the integer *u* as 32 little-endian bytes."""
    return b"".join(
        struct.pack("<I", (u >> shift) & 0xFFFFFFFF)
        for shift in range(0, 256, 32)
    )
def uint256_from_str(s):
    """Interpret the first 32 bytes of *s* as a little-endian 256-bit integer."""
    words = struct.unpack("<IIIIIIII", s[:32])
    return sum(word << (32 * position) for position, word in enumerate(words))
def uint256_from_compact(c):
    """Expand a compact ("nBits") value into the integer it encodes.

    The top byte of *c* is a size in bytes and the low 24 bits are the
    mantissa; the result is ``mantissa * 256 ** (size - 3)``.

    Sizes below 3 shift the mantissa right instead of left.  Without that
    branch the shift count would be negative and Python raises ValueError,
    for instance for ``c == 0``.
    """
    size = (c >> 24) & 0xFF
    mantissa = c & 0xFFFFFF
    if size < 3:
        return mantissa >> (8 * (3 - size))
    return mantissa << (8 * (size - 3))
def deser_vector(f, c):
    """Read a compact-size count from *f*, then that many ``c`` objects.

    Each element is created with ``c()`` and filled in by calling its
    ``deserialize(f)`` method.
    """
    count = deser_compact_size(f)
    items = []
    while len(items) < count:
        element = c()
        element.deserialize(f)
        items.append(element)
    return items
# ser_function_name: Allow for an alternate serialization function on the | |
# entries in the vector (we use this for serializing the vector of transactions | |
# for a witness block). | |
def ser_vector(l, ser_function_name=None):
    """Serialize the sequence *l* as a compact-size count plus each element.

    Elements are serialized with their ``serialize()`` method unless
    *ser_function_name* names another zero-argument method to call.
    """
    header = ser_compact_size(len(l))
    if ser_function_name:
        body = b"".join(getattr(item, ser_function_name)() for item in l)
    else:
        body = b"".join(item.serialize() for item in l)
    return header + body
def deser_uint256_vector(f):
    """Read a compact-size count from *f*, then that many 256-bit integers."""
    count = deser_compact_size(f)
    return [deser_uint256(f) for _ in range(count)]
def ser_uint256_vector(l):
    """Serialize a sequence of 256-bit integers with a compact-size count prefix."""
    header = ser_compact_size(len(l))
    return header + b"".join(ser_uint256(value) for value in l)
def deser_string_vector(f):
    """Read a compact-size count from *f*, then that many length-prefixed byte strings."""
    count = deser_compact_size(f)
    return [deser_string(f) for _ in range(count)]
def ser_string_vector(l):
    """Serialize a sequence of byte strings with a compact-size count prefix."""
    header = ser_compact_size(len(l))
    return header + b"".join(ser_string(entry) for entry in l)
class COutPoint:
    """Pair of a 256-bit hash and a 32-bit index ``n``."""

    __slots__ = ("hash", "n")

    def __init__(self, hash=0, n=0):
        self.hash = hash
        self.n = n

    def deserialize(self, f):
        """Load the hash (32 bytes) and ``n`` (little-endian uint32) from *f*."""
        self.hash = deser_uint256(f)
        (self.n,) = struct.unpack("<I", f.read(4))

    def serialize(self):
        """Return the 36-byte encoding: hash followed by ``n``."""
        return ser_uint256(self.hash) + struct.pack("<I", self.n)

    def __repr__(self):
        return "COutPoint(hash=%064x n=%i)" % (self.hash, self.n)
class CTxIn:
    """Transaction input: a previous outpoint, a scriptSig and a sequence number."""

    __slots__ = ("nSequence", "prevout", "scriptSig")

    def __init__(self, outpoint=None, scriptSig=b"", nSequence=0):
        # A missing outpoint is replaced by a default-constructed COutPoint.
        self.prevout = COutPoint() if outpoint is None else outpoint
        self.scriptSig = scriptSig
        self.nSequence = nSequence

    def deserialize(self, f):
        """Load the outpoint, the length-prefixed scriptSig and the uint32 sequence from *f*."""
        self.prevout = COutPoint()
        self.prevout.deserialize(f)
        self.scriptSig = deser_string(f)
        (self.nSequence,) = struct.unpack("<I", f.read(4))

    def serialize(self):
        """Return the encoding in the same field order that ``deserialize`` reads."""
        return b"".join((
            self.prevout.serialize(),
            ser_string(self.scriptSig),
            struct.pack("<I", self.nSequence),
        ))

    def __repr__(self):
        fields = (repr(self.prevout), self.scriptSig.hex(), self.nSequence)
        return "CTxIn(prevout=%s scriptSig=%s nSequence=%i)" % fields
# Divisor used by CTxOut.__repr__ to split an integer amount into a whole
# part and an 8-digit fractional part.  The name was referenced but never
# defined in this file, so every repr() of an output raised NameError.
COIN = 100000000


class CTxOut:
    """Transaction output: an integer amount ``nValue`` and a ``scriptPubKey``."""

    __slots__ = ("nValue", "scriptPubKey")

    def __init__(self, nValue=0, scriptPubKey=b""):
        self.nValue = nValue
        self.scriptPubKey = scriptPubKey

    def deserialize(self, f):
        """Load the little-endian int64 amount and the length-prefixed script from *f*."""
        (self.nValue,) = struct.unpack("<q", f.read(8))
        self.scriptPubKey = deser_string(f)

    def serialize(self):
        """Return the encoding: int64 amount followed by the length-prefixed script."""
        return struct.pack("<q", self.nValue) + ser_string(self.scriptPubKey)

    def __repr__(self):
        whole, fraction = divmod(self.nValue, COIN)
        return "CTxOut(nValue=%i.%08i scriptPubKey=%s)" \
            % (whole, fraction, self.scriptPubKey.hex())
class CScriptWitness:
    """Holder for a witness stack, stored as a list of byte strings."""

    __slots__ = ("stack",)

    def __init__(self):
        # stack is a vector of strings
        self.stack = []

    def __repr__(self):
        hex_items = [item.hex() for item in self.stack]
        return "CScriptWitness(%s)" % ",".join(hex_items)

    def is_null(self):
        """Return True when the stack is empty."""
        return not self.stack
class CTxInWitness:
    """Witness data of one input: a wrapper around a ``CScriptWitness``."""

    __slots__ = ("scriptWitness",)

    def __init__(self):
        self.scriptWitness = CScriptWitness()

    def deserialize(self, f):
        """Replace the wrapped stack with a vector of byte strings read from *f*."""
        stack_items = deser_string_vector(f)
        self.scriptWitness.stack = stack_items

    def serialize(self):
        """Return the wrapped stack encoded as a vector of byte strings."""
        stack_items = self.scriptWitness.stack
        return ser_string_vector(stack_items)

    def __repr__(self):
        return repr(self.scriptWitness)

    def is_null(self):
        """Return whatever the wrapped script witness reports from ``is_null()``."""
        witness = self.scriptWitness
        return witness.is_null()
class CTxWitness:
    """Witness section of a transaction: a list of per-input witnesses."""

    __slots__ = ("vtxinwit",)

    def __init__(self):
        self.vtxinwit = []

    def deserialize(self, f):
        """Fill every entry already present in ``vtxinwit`` from *f*, in order.

        No count is read from the stream, so the list must be sized by the
        caller beforehand.
        """
        for input_witness in self.vtxinwit:
            input_witness.deserialize(f)

    def serialize(self):
        """Return the concatenated encodings of all entries."""
        # This is different than the usual vector serialization --
        # we omit the length of the vector, which is required to be
        # the same length as the transaction's vin vector.
        return b"".join(input_witness.serialize() for input_witness in self.vtxinwit)

    def __repr__(self):
        rendered = [repr(input_witness) for input_witness in self.vtxinwit]
        return "CTxWitness(%s)" % ';'.join(rendered)

    def is_null(self):
        """Return True when every entry is null (also when there are none)."""
        return all(input_witness.is_null() for input_witness in self.vtxinwit)
class CTransaction:
    """Transaction with inputs, outputs, optional witness data and a lock time.

    ``sha256`` (integer) and ``hash`` (hex string of the reversed digest)
    are derived from the serialization without witness; both stay None
    until ``calc_sha256()`` or ``rehash()`` is called.
    """

    __slots__ = ("hash", "nLockTime", "nVersion", "sha256", "vin", "vout",
                 "wit")

    # Upper bound for a single output's nValue accepted by is_valid().
    # Spelled out with literals because this file defines no COIN constant.
    MAX_MONEY = 21000000 * 100000000

    def __init__(self, tx=None):
        """Create an empty version-1 transaction, or a deep copy of *tx*."""
        if tx is None:
            self.nVersion = 1
            self.vin = []
            self.vout = []
            self.wit = CTxWitness()
            self.nLockTime = 0
            self.sha256 = None
            self.hash = None
        else:
            # Imported here because the module never imports ``copy`` at
            # top level, which made this branch fail with NameError.
            import copy

            self.nVersion = tx.nVersion
            self.vin = copy.deepcopy(tx.vin)
            self.vout = copy.deepcopy(tx.vout)
            self.nLockTime = tx.nLockTime
            self.sha256 = tx.sha256
            self.hash = tx.hash
            self.wit = copy.deepcopy(tx.wit)

    def deserialize(self, f):
        """Load the transaction from the binary stream *f*.

        An empty first input vector is treated as the marker of the
        extended format: the following byte holds the flags and, when it is
        non-zero, the real inputs and outputs follow, then the witness data.
        Cached hashes are reset.
        """
        self.nVersion = struct.unpack("<i", f.read(4))[0]
        self.vin = deser_vector(f, CTxIn)
        flags = 0
        if len(self.vin) == 0:
            flags = struct.unpack("<B", f.read(1))[0]
            # Not sure why flags can't be zero, but this
            # matches the implementation in bitcoind
            if flags != 0:
                self.vin = deser_vector(f, CTxIn)
                self.vout = deser_vector(f, CTxOut)
            else:
                # No outputs are read on this path; clear any left over
                # from an earlier use of this object.
                self.vout = []
        else:
            self.vout = deser_vector(f, CTxOut)
        if flags != 0:
            self.wit.vtxinwit = [CTxInWitness() for _ in self.vin]
            self.wit.deserialize(f)
        else:
            self.wit = CTxWitness()
        self.nLockTime = struct.unpack("<I", f.read(4))[0]
        self.sha256 = None
        self.hash = None

    def serialize_without_witness(self):
        """Return the encoding without witness: version, inputs, outputs, lock time."""
        return b"".join((
            struct.pack("<i", self.nVersion),
            ser_vector(self.vin),
            ser_vector(self.vout),
            struct.pack("<I", self.nLockTime),
        ))

    # Only serialize with witness when explicitly called for
    def serialize_with_witness(self):
        """Return the encoding including witness data when any is present.

        With a null witness the output has the same layout as
        ``serialize_without_witness``.  As a side effect ``wit.vtxinwit``
        is truncated or padded to the length of ``vin``.
        """
        flags = 0
        if not self.wit.is_null():
            flags |= 1
        r = b""
        r += struct.pack("<i", self.nVersion)
        if flags:
            # An empty vector followed by the flags byte marks the extended format.
            r += ser_vector([])
            r += struct.pack("<B", flags)
        r += ser_vector(self.vin)
        r += ser_vector(self.vout)
        if flags & 1:
            if len(self.wit.vtxinwit) != len(self.vin):
                # vtxinwit must have the same length as vin
                self.wit.vtxinwit = self.wit.vtxinwit[:len(self.vin)]
                missing = len(self.vin) - len(self.wit.vtxinwit)
                self.wit.vtxinwit.extend(CTxInWitness() for _ in range(missing))
            r += self.wit.serialize()
        r += struct.pack("<I", self.nLockTime)
        return r

    # Regular serialization is with witness -- must explicitly
    # call serialize_without_witness to exclude witness data.
    def serialize(self):
        return self.serialize_with_witness()

    # Recalculate the txid (transaction hash without witness)
    def rehash(self):
        """Drop the cached value, recompute it, and return the hex ``hash``."""
        self.sha256 = None
        self.calc_sha256()
        return self.hash

    # We will only cache the serialization without witness in
    # self.sha256 and self.hash -- those are expected to be the txid.
    def calc_sha256(self, with_witness=False):
        """Compute the transaction hash.

        With ``with_witness=True`` the hash of the witness serialization is
        returned as an integer and nothing is stored.  Otherwise
        ``self.sha256`` is filled in if it is still None, ``self.hash`` is
        refreshed, and None is returned.
        """
        if with_witness:
            # Don't cache the result, just return it
            return uint256_from_str(hash256(self.serialize_with_witness()))

        # Serialize and hash once and reuse the digest for both fields.
        digest = hash256(self.serialize_without_witness())
        if self.sha256 is None:
            self.sha256 = uint256_from_str(digest)
        self.hash = binascii.hexlify(digest[::-1]).decode()

    def is_valid(self):
        """Return False if any output value is negative or above ``MAX_MONEY``."""
        self.calc_sha256()
        return all(0 <= tout.nValue <= self.MAX_MONEY for tout in self.vout)

    def __repr__(self):
        return "CTransaction(nVersion=%i vin=%s vout=%s wit=%s nLockTime=%i)" \
            % (self.nVersion, repr(self.vin), repr(self.vout), repr(self.wit), self.nLockTime)
class CBlockHeader:
    """Block header: version, previous-block hash, merkle root, time, bits, nonce.

    ``sha256`` (integer) and ``hash`` (hex string of the reversed digest)
    cache the double-SHA256 of the serialized header; both are None until
    ``calc_sha256()`` or ``rehash()`` runs.
    """

    __slots__ = ("hash", "hashMerkleRoot", "hashPrevBlock", "nBits", "nNonce",
                 "nTime", "nVersion", "sha256")

    def __init__(self, header=None):
        """Create a null header, or copy every field from *header*."""
        if header is None:
            self.set_null()
        else:
            self.nVersion = header.nVersion
            self.hashPrevBlock = header.hashPrevBlock
            self.hashMerkleRoot = header.hashMerkleRoot
            self.nTime = header.nTime
            self.nBits = header.nBits
            self.nNonce = header.nNonce
            self.sha256 = header.sha256
            self.hash = header.hash
            # Only computes something when the source had no cached hash.
            self.calc_sha256()

    def set_null(self):
        """Reset all fields to their defaults and drop the cached hash."""
        self.nVersion = 1
        self.hashPrevBlock = 0
        self.hashMerkleRoot = 0
        self.nTime = 0
        self.nBits = 0
        self.nNonce = 0
        self.sha256 = None
        self.hash = None

    def deserialize(self, f):
        """Load the six header fields from *f* and drop the cached hash."""
        self.nVersion = struct.unpack("<i", f.read(4))[0]
        self.hashPrevBlock = deser_uint256(f)
        self.hashMerkleRoot = deser_uint256(f)
        self.nTime = struct.unpack("<I", f.read(4))[0]
        self.nBits = struct.unpack("<I", f.read(4))[0]
        self.nNonce = struct.unpack("<I", f.read(4))[0]
        self.sha256 = None
        self.hash = None

    def serialize(self):
        """Return the 80-byte header encoding."""
        return b"".join((
            struct.pack("<i", self.nVersion),
            ser_uint256(self.hashPrevBlock),
            ser_uint256(self.hashMerkleRoot),
            struct.pack("<I", self.nTime),
            struct.pack("<I", self.nBits),
            struct.pack("<I", self.nNonce),
        ))

    def calc_sha256(self):
        """Fill ``sha256`` and ``hash`` from the header bytes if not cached yet."""
        if self.sha256 is None:
            # Call CBlockHeader.serialize explicitly: a subclass may override
            # serialize() to append more than the header, and only the
            # header bytes must be hashed.  The digest is computed once and
            # shared by both cached fields.
            digest = hash256(CBlockHeader.serialize(self))
            self.sha256 = uint256_from_str(digest)
            self.hash = binascii.hexlify(digest[::-1]).decode()

    def rehash(self):
        """Drop the cached value, recompute it, and return the integer ``sha256``."""
        self.sha256 = None
        self.calc_sha256()
        return self.sha256

    def __repr__(self):
        return "CBlockHeader(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x)" \
            % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot,
               time.ctime(self.nTime), self.nBits, self.nNonce)
class CBlock(CBlockHeader):
    """Block: a header plus the list of transactions ``vtx``."""

    __slots__ = ("vtx",)

    def __init__(self, header=None):
        super().__init__(header)
        self.vtx = []

    def deserialize(self, f):
        """Load the header, then the transaction vector, from *f*."""
        super().deserialize(f)
        self.vtx = deser_vector(f, CTransaction)

    def serialize(self, with_witness=True):
        """Return the header followed by the transactions.

        *with_witness* selects which serialization method is called on each
        transaction.
        """
        tx_method = "serialize_with_witness" if with_witness else "serialize_without_witness"
        return super().serialize() + ser_vector(self.vtx, tx_method)

    # Calculate the merkle root given a vector of transaction hashes
    @classmethod
    def get_merkle_root(cls, hashes):
        """Reduce a list of 32-byte hashes to a single root, returned as an integer.

        Adjacent entries are hashed pairwise level by level; the last entry
        of an odd-sized level is paired with itself.
        """
        level = hashes
        while len(level) > 1:
            last = len(level) - 1
            level = [
                hash256(level[left] + level[min(left + 1, last)])
                for left in range(0, len(level), 2)
            ]
        return uint256_from_str(level[0])

    def calc_merkle_root(self):
        """Return the merkle root over the transactions' ``sha256`` values."""
        for tx in self.vtx:
            tx.calc_sha256()
        leaves = [ser_uint256(tx.sha256) for tx in self.vtx]
        return self.get_merkle_root(leaves)

    def calc_witness_merkle_root(self):
        """Return the merkle root over the with-witness transaction hashes."""
        # For witness root purposes, the hash of the
        # coinbase, with witness, is defined to be 0...0
        leaves = [ser_uint256(0)]
        # Calculate the hashes with witness data
        leaves.extend(ser_uint256(tx.calc_sha256(True)) for tx in self.vtx[1:])
        return self.get_merkle_root(leaves)

    def is_valid(self):
        """Check the header hash against the target, each transaction, and the merkle root."""
        self.calc_sha256()
        target = uint256_from_compact(self.nBits)
        if self.sha256 > target:
            return False
        if not all(tx.is_valid() for tx in self.vtx):
            return False
        return self.calc_merkle_root() == self.hashMerkleRoot

    def solve(self):
        """Increment ``nNonce`` until the header hash no longer exceeds the target."""
        self.rehash()
        target = uint256_from_compact(self.nBits)
        while target < self.sha256:
            self.nNonce += 1
            self.rehash()

    def __repr__(self):
        fields = (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot,
                  time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx))
        return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" % fields
def _bip34_height_script(height):
    """Return the scriptSig bytes that encode *height* for a coinbase input.

    Heights 0..16 use the single-byte small-integer opcodes followed by a
    one-byte filler; larger heights are pushed as the shortest
    little-endian encoding whose top bit is clear.
    """
    if height < 0:
        raise ValueError("block height must be non-negative")
    if height <= 16:
        # 0x00 encodes zero and 0x51..0x60 encode 1..16.  The trailing 0x51
        # keeps the script at two bytes.
        # NOTE(review): presumably needed for the node's minimum coinbase
        # script length -- confirm against the BIP34 / consensus rules.
        opcode = 0x00 if height == 0 else 0x50 + height
        return bytes([opcode, 0x51])
    # One byte more than bit_length // 8 leaves the sign bit of the final
    # byte clear.  For heights 32768..8388607 this yields the same 3-byte
    # push the script previously hard-coded.
    size = height.bit_length() // 8 + 1
    return bytes([size]) + height.to_bytes(size, byteorder='little')


def create_coinbase(height):
    """Create a coinbase transaction with OP_TRUE output, assuming no miner fees.

    The single input has outpoint hash 0, index 0xffffffff and sequence
    0xffffffff, and its scriptSig encodes *height*.  The single output has
    value 0 and script 0x51.  The transaction hash is computed before
    returning.

    The height used to be pushed as exactly three bytes, which raised
    OverflowError from 2**24 on and is not the minimal encoding below
    32768; it is now encoded minimally.
    """
    coinbase = CTransaction()
    coinbase.vin.append(CTxIn(COutPoint(0, 0xffffffff), _bip34_height_script(height), 0xffffffff))
    coinbaseoutput = CTxOut()
    coinbaseoutput.nValue = 0
    coinbaseoutput.scriptPubKey = b'\x51'
    coinbase.vout = [coinbaseoutput]
    coinbase.calc_sha256()
    return coinbase
def main():
    """Submit the given transactions to a node as a block proposal.

    Builds a block on top of the node's current block template containing
    a coinbase plus the transactions given on the command line, then sends
    it with ``getblocktemplate`` in ``proposal`` mode.  Prints the node's
    answer when it is non-empty, otherwise a success message.
    """
    parser = argparse.ArgumentParser(description='Check whether transactions are consensus valid by submitting a block proposal to bitcoind')
    parser.add_argument('rpcuser', help='The RPC username')
    parser.add_argument('rpcpassword', help='The password for the RPC user')
    parser.add_argument('txs', metavar='tx', nargs='+', help='The transactions to test')
    # Optional overrides; the defaults are the endpoint that was hard-coded before.
    parser.add_argument('--rpchost', default='127.0.0.1', help='Host of the RPC server (default: %(default)s)')
    parser.add_argument('--rpcport', type=int, default=8332, help='Port of the RPC server (default: %(default)s)')
    args = parser.parse_args()

    # Decode every transaction before contacting the node so that malformed
    # input is reported as a usage error instead of a traceback.
    transactions = []
    for tx in args.txs:
        try:
            ctx = CTransaction()
            ctx.deserialize(io.BytesIO(binascii.unhexlify(tx)))
        except (binascii.Error, struct.error) as exc:
            parser.error("could not decode transaction %r: %s" % (tx, exc))
        ctx.rehash()
        transactions.append(ctx)

    node = AuthServiceProxy("http://%s:%s@%s:%d" % (args.rpcuser, args.rpcpassword, args.rpchost, args.rpcport))
    tmpl = node.getblocktemplate({'rules': ['segwit']})

    block = CBlock()
    block.nVersion = tmpl["version"]
    block.hashPrevBlock = int(tmpl["previousblockhash"], 16)
    block.nTime = tmpl["curtime"]
    block.nBits = int(tmpl["bits"], 16)
    block.nNonce = 0
    block.vtx = [create_coinbase(int(tmpl["height"]))]
    block.vtx.extend(transactions)

    # Commitment output: hash of the witness merkle root followed by 32
    # zero bytes, appended to a fixed 6-byte script prefix.
    withash = hash256(ser_uint256(block.calc_witness_merkle_root()) + (b'\x00' * 32))
    witout = CTxOut()
    witout.nValue = 0
    witout.scriptPubKey = b'\x6a\x24\xaa\x21\xa9\xed' + withash
    block.vtx[0].vout.append(witout)

    # The coinbase input's witness stack holds the same 32 zero bytes.
    witstack = CScriptWitness()
    witstack.stack.append(b'\x00' * 32)
    witin = CTxInWitness()
    witin.scriptWitness = witstack
    block.vtx[0].wit.vtxinwit.append(witin)

    # The coinbase changed, so its hash and the block's merkle root must be refreshed.
    block.vtx[0].rehash()
    block.hashMerkleRoot = block.calc_merkle_root()

    rsp = node.getblocktemplate(template_request={'data': block.serialize().hex(), 'mode': 'proposal', 'rules': ['segwit']})
    if rsp:
        print(rsp)
    else:
        print("Transactions are consensus valid")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment