Skip to content

Instantly share code, notes, and snippets.

@achow101
Created September 18, 2019 21:18
Show Gist options
  • Save achow101/f24309bba501cc08c9a3a3d55fa6eec6 to your computer and use it in GitHub Desktop.
Save achow101/f24309bba501cc08c9a3a3d55fa6eec6 to your computer and use it in GitHub Desktop.
#! /usr/bin/env python3
import base64
import decimal
from http import HTTPStatus
import http.client
import json
import logging
import os
import socket
import time
import urllib.parse
import argparse
import atexit
import psutil
import random
parser = argparse.ArgumentParser()
parser.add_argument('datadir')
args = parser.parse_args()
HTTP_TIMEOUT = 30
USER_AGENT = "AuthServiceProxy/0.1"
log = logging.getLogger("BitcoinRPC")
class JSONRPCException(Exception):
def __init__(self, rpc_error, http_status=None):
try:
errmsg = '%(message)s (%(code)i)' % rpc_error
except (KeyError, TypeError):
errmsg = ''
super().__init__(errmsg)
self.error = rpc_error
self.http_status = http_status
def EncodeDecimal(o):
if isinstance(o, decimal.Decimal):
return str(o)
raise TypeError(repr(o) + " is not JSON serializable")
class AuthServiceProxy():
__id_count = 0
# ensure_ascii: escape unicode as \uXXXX, passed to json.dumps
def __init__(self, service_url, service_name=None, timeout=HTTP_TIMEOUT, connection=None, ensure_ascii=True):
self.__service_url = service_url
self._service_name = service_name
self.ensure_ascii = ensure_ascii # can be toggled on the fly by tests
self.__url = urllib.parse.urlparse(service_url)
user = None if self.__url.username is None else self.__url.username.encode('utf8')
passwd = None if self.__url.password is None else self.__url.password.encode('utf8')
authpair = user + b':' + passwd
self.__auth_header = b'Basic ' + base64.b64encode(authpair)
self.timeout = timeout
self._set_conn(connection)
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
# Python internal stuff
raise AttributeError
if self._service_name is not None:
name = "%s.%s" % (self._service_name, name)
return AuthServiceProxy(self.__service_url, name, connection=self.__conn)
def _request(self, method, path, postdata):
'''
Do a HTTP request, with retry if we get disconnected (e.g. due to a timeout).
This is a workaround for https://bugs.python.org/issue3566 which is fixed in Python 3.5.
'''
headers = {'Host': self.__url.hostname,
'User-Agent': USER_AGENT,
'Authorization': self.__auth_header,
'Content-type': 'application/json'}
if os.name == 'nt':
# Windows somehow does not like to re-use connections
# TODO: Find out why the connection would disconnect occasionally and make it reusable on Windows
self._set_conn()
try:
self.__conn.request(method, path, postdata, headers)
return self._get_response()
except http.client.BadStatusLine as e:
if e.line == "''": # if connection was closed, try again
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
else:
raise
except (BrokenPipeError, ConnectionResetError):
# Python 3.5+ raises BrokenPipeError instead of BadStatusLine when the connection was reset
# ConnectionResetError happens on FreeBSD with Python 3.4
self.__conn.close()
self.__conn.request(method, path, postdata, headers)
return self._get_response()
def get_request(self, *args, **argsn):
AuthServiceProxy.__id_count += 1
log.debug("-{}-> {} {}".format(
AuthServiceProxy.__id_count,
self._service_name,
json.dumps(args or argsn, default=EncodeDecimal, ensure_ascii=self.ensure_ascii),
))
if args and argsn:
raise ValueError('Cannot handle both named and positional arguments')
return {'version': '1.1',
'method': self._service_name,
'params': args or argsn,
'id': AuthServiceProxy.__id_count}
def __call__(self, *args, **argsn):
postdata = json.dumps(self.get_request(*args, **argsn), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
if response['error'] is not None:
raise JSONRPCException(response['error'], status)
elif 'result' not in response:
raise JSONRPCException({
'code': -343, 'message': 'missing JSON-RPC result'}, status)
elif status != HTTPStatus.OK:
raise JSONRPCException({
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
else:
return response['result']
def batch(self, rpc_call_list):
postdata = json.dumps(list(rpc_call_list), default=EncodeDecimal, ensure_ascii=self.ensure_ascii)
log.debug("--> " + postdata)
response, status = self._request('POST', self.__url.path, postdata.encode('utf-8'))
if status != HTTPStatus.OK:
raise JSONRPCException({
'code': -342, 'message': 'non-200 HTTP status code but no JSON-RPC error'}, status)
return response
def _get_response(self):
req_start_time = time.time()
try:
http_response = self.__conn.getresponse()
except socket.timeout:
raise JSONRPCException({
'code': -344,
'message': '%r RPC took longer than %f seconds. Consider '
'using larger timeout for calls that take '
'longer to return.' % (self._service_name,
self.__conn.timeout)})
if http_response is None:
raise JSONRPCException({
'code': -342, 'message': 'missing HTTP response from server'})
content_type = http_response.getheader('Content-Type')
if content_type != 'application/json':
raise JSONRPCException(
{'code': -342, 'message': 'non-JSON HTTP response with \'%i %s\' from server' % (http_response.status, http_response.reason)},
http_response.status)
responsedata = http_response.read().decode('utf8')
response = json.loads(responsedata, parse_float=decimal.Decimal)
elapsed = time.time() - req_start_time
if "error" in response and response["error"] is None:
log.debug("<-%s- [%.6f] %s" % (response["id"], elapsed, json.dumps(response["result"], default=EncodeDecimal, ensure_ascii=self.ensure_ascii)))
else:
log.debug("<-- [%.6f] %s" % (elapsed, responsedata))
return response, http_response.status
def __truediv__(self, relative_uri):
return AuthServiceProxy("{}/{}".format(self.__service_url, relative_uri), self._service_name, connection=self.__conn)
def _set_conn(self, connection=None):
port = 80 if self.__url.port is None else self.__url.port
if connection:
self.__conn = connection
self.timeout = connection.timeout
elif self.__url.scheme == 'https':
self.__conn = http.client.HTTPSConnection(self.__url.hostname, port, timeout=self.timeout)
else:
self.__conn = http.client.HTTPConnection(self.__url.hostname, port, timeout=self.timeout)
# Wait for cookie file to be created
while not os.path.exists(args.datadir + '/regtest/.cookie'):
time.sleep(0.5)
# Read .cookie file to get user and pass
with open(args.datadir + '/regtest/.cookie') as f:
userpass = f.readline().lstrip().rstrip()
rpc = AuthServiceProxy('http://{}@127.0.0.1:18443'.format(userpass))
# Wait for bitcoind to be ready
ready = False
while not ready:
try:
rpc.getblockchaininfo()
ready = True
except Exception:
time.sleep(0.5)
pass
print('ready')
for item in rpc.listwalletdir()['wallets']:
if 'big' == item['name']:
break;
else:
rpc.createwallet('big')
if 'big' not in rpc.listwallets():
rpc.loadwallet('big')
def_rpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/'.format(userpass))
big_rpc = AuthServiceProxy('http://{}@127.0.0.1:18443/wallet/big'.format(userpass))
print('mining')
gen_addr = big_rpc.getnewaddress()
if rpc.getblockcount() == 0:
big_rpc.generatetoaddress(200, gen_addr)
self_addr = big_rpc.getnewaddress()
send_addr = def_rpc.getnewaddress()
for i in range(0, 10000):
if i % 100 == 0:
print(i)
big_rpc.generatetoaddress(1, gen_addr)
for j in range(0, 10):
if big_rpc.getbalance() < 500:
print("rebalancing")
def_rpc.sendtoaddress(big_rpc.getnewaddress(), def_rpc.getbalance(), "", "", True)
big_rpc.generatetoaddress(6, gen_addr)
amt = round(random.randint(10000, 500 * 100000000) / 100000000, 8)
addr = random.choice([send_addr, self_addr])
big_rpc.sendtoaddress(addr, amt)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment