Skip to content

Instantly share code, notes, and snippets.

@tonymorony
Created June 30, 2019 19:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tonymorony/3194c70f1f2c6b303fe22c59a2894080 to your computer and use it in GitHub Desktop.
Save tonymorony/3194c70f1f2c6b303fe22c59a2894080 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import re
import os
import requests
import json
import platform
import configparser
import sys
def config(filename, section):
    """Read one section of an INI-style file into a plain dict.

    ``filename`` is joined onto the current working directory before it is
    read. Returns a dict mapping every option name in ``section`` to its
    string value, or an empty dict when the parser has no such section.
    """
    ini = configparser.ConfigParser()
    ini.read(os.path.join(os.getcwd(), filename))
    if not ini.has_section(section):
        return {}
    return {name: value for name, value in ini.items(section)}
# define function that fetches rpc creds from .conf
def def_credentials(chain):
    """Build the local RPC URL for ``chain`` from its daemon .conf file.

    The data directory is selected from ``platform.system()``. ``'KMD'``
    reads ``komodo.conf`` directly inside that directory; any other chain
    reads ``<chain>/<chain>.conf`` below it.

    Args:
        chain: Chain name, e.g. ``'KMD'``.

    Returns:
        A string of the form
        ``http://<rpcuser>:<rpcpassword>@127.0.0.1:<rpcport>``.

    Raises:
        RuntimeError: If the operating system is not Darwin, Linux or Windows.
        KeyError: If ``HOME`` (``APPDATA`` on Windows) is not set.
        OSError: If the .conf file cannot be opened.
        ValueError: If rpcuser, rpcpassword or rpcport is missing from the file.
    """
    operating_system = platform.system()
    if operating_system == 'Darwin':
        ac_dir = os.path.join(
            os.environ['HOME'], 'Library', 'Application Support', 'Komodo')
    elif operating_system == 'Linux':
        ac_dir = os.path.join(os.environ['HOME'], '.komodo')
    elif operating_system == 'Windows':
        ac_dir = os.path.join(os.environ['APPDATA'], 'komodo')
    else:
        # Fail with a clear message instead of an UnboundLocalError below.
        raise RuntimeError(
            'Unsupported operating system: %r' % operating_system)

    if chain == 'KMD':
        coin_config_file = os.path.join(ac_dir, 'komodo.conf')
    else:
        coin_config_file = os.path.join(ac_dir, chain, chain + '.conf')

    wanted = ('rpcuser', 'rpcpassword', 'rpcport')
    settings = {}
    with open(coin_config_file, 'r') as f:
        for line in f:
            entry = line.strip()
            if not entry or entry.startswith('#'):
                continue
            # Match the key exactly: a substring search would let a comment
            # or a password that merely contains "rpcuser" clobber a setting.
            key, sep, value = entry.partition('=')
            key = key.strip()
            if sep and key in wanted:
                # Later occurrences override earlier ones.
                settings[key] = value.strip()

    missing = [key for key in wanted if key not in settings]
    if missing:
        raise ValueError(
            'Missing %s in %s' % (', '.join(missing), coin_config_file))

    return ('http://' + settings['rpcuser'] + ':' + settings['rpcpassword']
            + '@127.0.0.1:' + settings['rpcport'])
# define function that posts json data
def post_rpc(url, payload, auth=None):
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    Args:
        url: Endpoint to post to.
        payload: JSON-serialisable request body.
        auth: Passed through unchanged as the ``auth`` argument of
            ``requests.post``.

    Returns:
        The response body parsed with ``json.loads``.

    Raises:
        TypeError, ValueError: If ``payload`` cannot be serialised to JSON.
        Exception: If the HTTP request fails, or if the response body is not
            valid JSON. The underlying error is chained as ``__cause__``.
    """
    print(payload)
    # Serialise outside the try block so that a bad payload is not reported
    # as a connection problem.
    body = json.dumps(payload)
    try:
        r = requests.post(url, data=body, auth=auth)
    except Exception as err:
        # NOTE(review): rpc() passes the URL built by def_credentials(), which
        # embeds rpcuser/rpcpassword, so this message can expose credentials.
        raise Exception("Couldn't connect to " + url + ": ", err) from err
    try:
        return json.loads(r.text)
    except ValueError as err:
        raise Exception("Invalid JSON response from " + url + ": ", err) from err
def payload(method, paramlist):
    """Assemble a JSON-RPC 1.0 request dict for ``method``.

    When ``paramlist`` is non-empty, its first element is used as the
    ``"params"`` value; when it is empty, ``paramlist`` itself is used.
    """
    rpc_params = paramlist if len(paramlist) == 0 else paramlist[0]
    request = {"jsonrpc": "1.0", "id": "python"}
    request["method"] = method
    request["params"] = rpc_params
    return request
# RPC method names grouped into lookup lists.
# NOTE(review): int_methods and json_methods are not referenced anywhere in
# the visible part of this file -- confirm whether other code relies on them.
int_methods = [
    'getblockhash',
]
json_methods = [
    'getaddressbalance',
    'getaddressdeltas',
]
# Methods for which rpc() hands back the whole response when it contains
# neither a "result" nor an "error" key.
non_json_result = [
    'getnewaddress',
    'getbalance',
    'dumpprivkey',
]
def rpc(chain, method, *args):
    """Call ``method`` on the local daemon for ``chain`` and unwrap the reply.

    The URL comes from ``def_credentials(chain)``. The extra positional
    arguments are handed to ``payload()`` as a tuple, and ``payload()`` uses
    the first of them as the JSON-RPC ``params`` value, so callers pass the
    parameter list as a single argument: ``rpc('KMD', 'getblockhash', [100])``.

    Args:
        chain: Chain name understood by ``def_credentials``.
        method: RPC method name.
        *args: RPC parameters, as described above.

    Returns:
        * the ``"result"`` value when the reply has one that is not ``None``;
        * otherwise the ``"error"`` value when the reply has that key;
        * ``None`` when ``"result"`` is ``None`` and there is no ``"error"`` key;
        * the whole reply when it has neither key and ``method`` is listed in
          ``non_json_result``;
        * otherwise the string ``"info: "`` followed by ``str(reply)``.
    """
    # Pass the variadic arguments straight through. They used to be fished
    # out of a reversed locals() list with an OS-dependent index, which picks
    # `chain` rather than `args` wherever locals() keeps declaration order.
    response = post_rpc(def_credentials(chain), payload(method, args))
    print(response)

    # Only a JSON object can carry "result"/"error" keys; a membership test on
    # a string would be a substring search and on a number would raise.
    is_object = isinstance(response, dict)

    if is_object and "result" in response:
        if response["result"] is not None:
            return response["result"]
        if "error" in response:
            return response["error"]
        print("none result")
        return None
    if is_object and "error" in response:
        print("error result")
        return response["error"]
    if method in non_json_result:
        print("-- JSON RESULT --")
        return response
    # The old expression added a str to the list from splitlines(), which
    # always raised TypeError; return a readable string instead.
    return "info: " + str(response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment