Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demonstrate use cardano v1 api to receive funds.
from pyswagger import App, Security
from pyswagger.contrib.client.requests import Client
from pyswagger.primitives import Primitive, create_str, validate_str
from mnemonic import Mnemonic
def validate_hex_base16(obj, ret, val, ctx):
print('validate', val)
return val
prim = Primitive()
prim.register('string', 'hex|base16', create_str, validate_hex_base16)
json_url = 'https://gist.githubusercontent.com/yihuang/358e0b1944e8c0cac204d20eb6efd35e/raw/e6ffc159d098e7fabfce71965dba1e708f66e3bf/gistfile1.txt'
app = App.load(json_url, prim=prim)
app.prepare(strict=True)
auth = Security(app)
client = Client(auth, send_opt={'verify': 'scripts/tls-files/ca.crt'})
def create_wallet(name):
phrase = Mnemonic('english').generate()
print('try to create wallet: ', phrase)
args = dict(
operation='restore',
name=name,
assuranceLevel='strict',
spendingPassword=None,
backupPhrase=phrase.split()
)
req_rsp = app.root.paths['/api/v1/wallets'].post(body=args)
rsp = client.request(req_rsp)
assert rsp.status == 201, 'invalid rsp %s %s' % (rsp.status, rsp.raw)
return rsp.data.data.id
def get_first_wallet():
req_rsp = app.root.paths['/api/v1/wallets'].get()
rsp = client.request(req_rsp)
if len(rsp.data.data) > 0:
return rsp.data.data[0].id
else:
return create_wallet('default')
def create_account(wallet_id):
req_rsp = app.root.paths['/api/v1/wallets/{walletId}/accounts'].post(body={'name': 'default'})
rsp = client.request(req_rsp)
assert rsp.status == 200, 'invalid rsp %s %s' % (rsp.status, rsp.raw)
return rsp.data.data.index
def get_first_account(wallet_id):
req_rsp = app.root.paths['/api/v1/wallets/{walletId}/accounts'].get(walletId=wallet_id)
rsp = client.request(req_rsp)
if len(rsp.data.data) > 0:
return rsp.data.data[0].index
else:
return create_account(wallet_id)
def poll_transaction_list(wallet_id, account_index, last_tx_id='0'):
req_rsp = app.root.paths['/api/v1/transactions'].get(wallet_id=wallet_id, account_index=account_index, id=('GT[%s]'%last_tx_id))
rsp = client.request(req_rsp)
items = rsp.data.data
for item in items:
if item.type == 'foreign' and item.direction == 'incoming':
print('incoming tx', item.id, item.outputs[0].address, item.amount)
if items:
print('last tx id', items[-1].id)
return items[-1].id
else:
return last_tx_id
def new_address(wallet_id, account_index):
args = {'walletId': wallet_id, 'accountIndex': account_index}
req_rsp = app.root.paths['/api/v1/addresses'].post(body=args)
rsp = client.request(req_rsp)
assert rsp.status == 200, 'invalid rsp %s %s' % (rsp.status, rsp.raw)
return rsp.data.data.id
wallet_id = get_first_wallet()
account_index = get_first_account(wallet_id)
print(wallet_id, account_index)
print('test new accress', new_address(wallet_id, account_index))
import sys, time
last_tx_id = '0'
if len(sys.argv) > 1 :
last_tx_id = sys.argv[1]
print('start monitor incoming transactions')
while True:
time.sleep(1)
last_tx_id = poll_transaction_list(wallet_id, account_index, last_tx_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment