Skip to content

Instantly share code, notes, and snippets.

@yihuang
Created May 30, 2018 06:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save yihuang/d6fb374bb0786a0c64eb7a1d3cf1703e to your computer and use it in GitHub Desktop.
Save yihuang/d6fb374bb0786a0c64eb7a1d3cf1703e to your computer and use it in GitHub Desktop.
Demonstrates how to use the Cardano v1 API to receive funds.
from pyswagger import App, Security
from pyswagger.contrib.client.requests import Client
from pyswagger.primitives import Primitive, create_str, validate_str
from mnemonic import Mnemonic
def validate_hex_base16(obj, ret, val, ctx):
    """Pass-through handler registered for the ``hex|base16`` string format.

    Prints the value it receives and returns it unchanged; no hex/base16
    checking is actually performed. ``obj``, ``ret`` and ``ctx`` are accepted
    only to satisfy the callback signature and are ignored.

    Args:
        obj: Unused.
        ret: Unused.
        val: The value being processed.
        ctx: Unused.

    Returns:
        ``val``, untouched.
    """
    print('validate', val)
    return val
# Primitive factory with a handler for string values whose format is
# 'hex|base16' (the handler just echoes the value, see validate_hex_base16).
prim = Primitive()
prim.register(
    'string',
    'hex|base16',
    create_str,
    validate_hex_base16,
)

# Location of the API description that the pyswagger App is built from.
json_url = 'https://gist.githubusercontent.com/yihuang/358e0b1944e8c0cac204d20eb6efd35e/raw/e6ffc159d098e7fabfce71965dba1e708f66e3bf/gistfile1.txt'

app = App.load(json_url, prim=prim)
app.prepare(strict=True)

auth = Security(app)
# NOTE(review): send_opt is presumably forwarded to the underlying `requests`
# call, making `verify` the CA bundle used to check the node's TLS
# certificate; the path is relative to the working directory -- confirm.
client = Client(auth, send_opt=dict(verify='scripts/tls-files/ca.crt'))
def create_wallet(name):
    """Create a wallet called *name* from a freshly generated mnemonic.

    A new English backup phrase is generated and printed, then POSTed to
    ``/api/v1/wallets`` with ``operation='restore'``,
    ``assuranceLevel='strict'`` and no spending password.

    Args:
        name: Wallet name sent in the request body.

    Returns:
        The ``id`` of the wallet found in the response payload.

    Raises:
        AssertionError: If the response status is not 201.
    """
    phrase = Mnemonic('english').generate()
    # NOTE(review): this prints the wallet's recovery phrase in clear text;
    # acceptable for a demo, not for a wallet holding real funds.
    print('try to create wallet: ', phrase)
    args = dict(
        operation='restore',
        name=name,
        assuranceLevel='strict',
        spendingPassword=None,
        backupPhrase=phrase.split(),
    )
    req_rsp = app.root.paths['/api/v1/wallets'].post(body=args)
    rsp = client.request(req_rsp)
    # Raised explicitly instead of via `assert` so the check still runs under
    # `python -O`; the exception type and message are unchanged.
    if rsp.status != 201:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.id
def get_first_wallet():
    """Return the id of the first listed wallet, creating one if none exist.

    Sends GET ``/api/v1/wallets``. When the returned list is empty, falls
    back to ``create_wallet('default')`` and returns the new wallet's id.
    The response status is not checked here.
    """
    req_rsp = app.root.paths['/api/v1/wallets'].get()
    rsp = client.request(req_rsp)
    wallets = rsp.data.data
    if len(wallets) > 0:
        return wallets[0].id
    # No wallet on the node yet: create one so the caller always gets an id.
    return create_wallet('default')
def create_account(wallet_id):
    """Create an account named ``'default'`` inside wallet *wallet_id*.

    POSTs ``{'name': 'default'}`` to ``/api/v1/wallets/{walletId}/accounts``.

    Args:
        wallet_id: Id of the wallet that will own the new account; used to
            fill the ``{walletId}`` path parameter.

    Returns:
        The ``index`` of the account found in the response payload.

    Raises:
        AssertionError: If the response status is not 200.
    """
    # The `{walletId}` placeholder in the path must be supplied explicitly;
    # previously `wallet_id` was accepted but never sent with the request.
    req_rsp = app.root.paths['/api/v1/wallets/{walletId}/accounts'].post(
        walletId=wallet_id,
        body={'name': 'default'},
    )
    rsp = client.request(req_rsp)
    # Raised explicitly instead of via `assert` so the check still runs under
    # `python -O`; the exception type and message are unchanged.
    if rsp.status != 200:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.index
def get_first_account(wallet_id):
    """Return the index of the wallet's first account, creating one if needed.

    Sends GET ``/api/v1/wallets/{walletId}/accounts`` for *wallet_id*. When
    the returned list is empty, falls back to ``create_account(wallet_id)``.
    The response status is not checked here.

    Args:
        wallet_id: Id of the wallet whose accounts are listed.

    Returns:
        The ``index`` of the first listed (or newly created) account.
    """
    req_rsp = app.root.paths['/api/v1/wallets/{walletId}/accounts'].get(
        walletId=wallet_id,
    )
    rsp = client.request(req_rsp)
    accounts = rsp.data.data
    if len(accounts) > 0:
        return accounts[0].index
    # Wallet has no account yet: create one so the caller always gets an index.
    return create_account(wallet_id)
def poll_transaction_list(wallet_id, account_index, last_tx_id='0'):
    """Fetch transactions after *last_tx_id* and report the incoming ones.

    Sends GET ``/api/v1/transactions`` filtered by ``wallet_id``,
    ``account_index`` and ``id='GT[<last_tx_id>]'``. Every returned item whose
    ``type`` is ``'foreign'`` and ``direction`` is ``'incoming'`` is printed
    (id, address of its first output, amount).

    Args:
        wallet_id: Wallet to query.
        account_index: Account within the wallet to query.
        last_tx_id: Transaction id used as the lower bound of the ``id``
            filter; defaults to ``'0'``.

    Returns:
        The id of the last item in the response, or *last_tx_id* unchanged
        when the response contains no items.
    """
    req_rsp = app.root.paths['/api/v1/transactions'].get(
        wallet_id=wallet_id,
        account_index=account_index,
        id='GT[%s]' % last_tx_id,
    )
    rsp = client.request(req_rsp)
    items = rsp.data.data
    for item in items:
        if item.type == 'foreign' and item.direction == 'incoming':
            # Only the first output's address is shown.
            print('incoming tx', item.id, item.outputs[0].address, item.amount)
    if not items:
        # Nothing new: keep polling from the same position.
        return last_tx_id
    newest_id = items[-1].id
    print('last tx id', newest_id)
    return newest_id
def new_address(wallet_id, account_index):
    """Request a new address for the given wallet account.

    POSTs ``{'walletId': ..., 'accountIndex': ...}`` to ``/api/v1/addresses``.

    Args:
        wallet_id: Id of the wallet that owns the account.
        account_index: Index of the account the address is created for.

    Returns:
        The ``id`` field of the response payload.

    Raises:
        AssertionError: If the response status is not 200.
    """
    args = {'walletId': wallet_id, 'accountIndex': account_index}
    req_rsp = app.root.paths['/api/v1/addresses'].post(body=args)
    rsp = client.request(req_rsp)
    # Raised explicitly instead of via `assert` so the check still runs under
    # `python -O`; the exception type and message are unchanged.
    if rsp.status != 200:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.id
import sys
import time

# Pick (or create) the wallet and account to watch, then request one fresh
# address for that account and print it.
wallet_id = get_first_wallet()
account_index = get_first_account(wallet_id)
print(wallet_id, account_index)
print('test new accress', new_address(wallet_id, account_index))

# An optional first command-line argument gives the transaction id to resume
# from; otherwise polling starts from '0'.
last_tx_id = '0'
if len(sys.argv) > 1:
    last_tx_id = sys.argv[1]

print('start monitor incoming transactions')
# Poll once per second, forever; each call returns the id to continue from.
while True:
    time.sleep(1)
    last_tx_id = poll_transaction_list(wallet_id, account_index, last_tx_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment