Created
May 30, 2018 06:53
-
-
Save yihuang/d6fb374bb0786a0c64eb7a1d3cf1703e to your computer and use it in GitHub Desktop.
Demonstrates how to use the Cardano v1 API to receive funds.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyswagger import App, Security | |
from pyswagger.contrib.client.requests import Client | |
from pyswagger.primitives import Primitive, create_str, validate_str | |
from mnemonic import Mnemonic | |
def validate_hex_base16(obj, ret, val, ctx):
    """Pass-through validator for the ``hex|base16`` string format.

    Performs no real check: the value is echoed to stdout and handed back
    untouched.  ``obj``, ``ret`` and ``ctx`` are accepted only to satisfy the
    validator call signature and are ignored.

    :param val: the value being validated.
    :return: ``val``, unmodified.
    """
    print('validate', val)
    return val
# Register a handler for the custom 'hex|base16' string format: values are
# built with pyswagger's create_str and checked by validate_hex_base16.
prim = Primitive()
prim.register('string', 'hex|base16', create_str, validate_hex_base16)

# Load the API description from a pinned gist revision and prepare it in
# strict mode.
json_url = 'https://gist.githubusercontent.com/yihuang/358e0b1944e8c0cac204d20eb6efd35e/raw/e6ffc159d098e7fabfce71965dba1e708f66e3bf/gistfile1.txt'
app = App.load(json_url, prim=prim)
app.prepare(strict=True)

# Client used by every helper below.
# NOTE(review): 'verify' is presumably forwarded to requests as the CA bundle
# path (relative to the working directory) -- confirm.
auth = Security(app)
client = Client(auth, send_opt={'verify': 'scripts/tls-files/ca.crt'})
def create_wallet(name):
    """Create a wallet from a freshly generated English mnemonic.

    The generated backup phrase is printed to stdout before the request is
    sent; it is not stored anywhere else by this function.

    :param name: name to give the new wallet.
    :return: the ``id`` field of the wallet returned by the API.
    :raises AssertionError: if the response status is not 201.
    """
    phrase = Mnemonic('english').generate()
    print('try to create wallet: ', phrase)
    args = dict(
        operation='restore',
        name=name,
        assuranceLevel='strict',
        spendingPassword=None,
        backupPhrase=phrase.split(),
    )
    req_rsp = app.root.paths['/api/v1/wallets'].post(body=args)
    rsp = client.request(req_rsp)
    # Raise explicitly rather than using `assert`, which is removed under
    # `python -O` and would let a failed request fall through to the
    # attribute access below.  AssertionError is kept for compatibility.
    if rsp.status != 201:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.id
def get_first_wallet():
    """Return the id of the first wallet listed by the API.

    When the wallet list is empty, a wallet named ``'default'`` is created
    via :func:`create_wallet` and its id is returned instead.
    """
    listing = client.request(app.root.paths['/api/v1/wallets'].get())
    wallets = listing.data.data
    if len(wallets) == 0:
        return create_wallet('default')
    return wallets[0].id
def create_account(wallet_id):
    """Create an account named ``'default'`` inside the given wallet.

    :param wallet_id: id of the wallet that will own the account; it fills
        the ``{walletId}`` placeholder of the endpoint path.
    :return: the ``index`` field of the account returned by the API.
    :raises AssertionError: if the response status is not 200.
    """
    # The path template contains {walletId}; it must be supplied here,
    # otherwise the request is built without a target wallet.
    req_rsp = app.root.paths['/api/v1/wallets/{walletId}/accounts'].post(
        walletId=wallet_id,
        body={'name': 'default'},
    )
    rsp = client.request(req_rsp)
    # Raise explicitly rather than using `assert`, which is removed under
    # `python -O`.  AssertionError is kept for compatibility.
    if rsp.status != 200:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.index
def get_first_account(wallet_id):
    """Return the index of the first account of ``wallet_id``.

    When the wallet has no accounts, one is created via
    :func:`create_account` and its index is returned instead.

    :param wallet_id: id of the wallet whose accounts are listed.
    """
    accounts_op = app.root.paths['/api/v1/wallets/{walletId}/accounts']
    listing = client.request(accounts_op.get(walletId=wallet_id))
    accounts = listing.data.data
    if len(accounts) == 0:
        return create_account(wallet_id)
    return accounts[0].index
def poll_transaction_list(wallet_id, account_index, last_tx_id='0'):
    """Fetch transactions newer than ``last_tx_id`` and report incoming ones.

    The transactions endpoint is queried with an ``id`` filter of the form
    ``GT[<last_tx_id>]``.  Every returned transaction whose type is
    ``'foreign'`` and direction is ``'incoming'`` is printed with its id,
    the address of its first output and its amount.

    :param wallet_id: wallet to query.
    :param account_index: account within the wallet to query.
    :param last_tx_id: id used as the lower bound of the ``GT[...]`` filter.
    :return: the id of the last transaction in the response, or
        ``last_tx_id`` unchanged when the response is empty.
    """
    id_filter = 'GT[%s]' % last_tx_id
    query = app.root.paths['/api/v1/transactions'].get(
        wallet_id=wallet_id,
        account_index=account_index,
        id=id_filter,
    )
    transactions = client.request(query).data.data

    for tx in transactions:
        if tx.type != 'foreign' or tx.direction != 'incoming':
            continue
        print('incoming tx', tx.id, tx.outputs[0].address, tx.amount)

    if not transactions:
        return last_tx_id

    print('last tx id', transactions[-1].id)
    return transactions[-1].id
def new_address(wallet_id, account_index):
    """Request a new address for the given wallet account.

    :param wallet_id: sent as ``walletId`` in the request body.
    :param account_index: sent as ``accountIndex`` in the request body.
    :return: the ``id`` field of the address returned by the API.
    :raises AssertionError: if the response status is not 200.
    """
    args = {'walletId': wallet_id, 'accountIndex': account_index}
    req_rsp = app.root.paths['/api/v1/addresses'].post(body=args)
    rsp = client.request(req_rsp)
    # Raise explicitly rather than using `assert`, which is removed under
    # `python -O`.  AssertionError is kept for compatibility.
    if rsp.status != 200:
        raise AssertionError('invalid rsp %s %s' % (rsp.status, rsp.raw))
    return rsp.data.data.id
import sys
import time

# Pick (or create) the wallet and account to watch, then exercise address
# creation once as a smoke test.
wallet_id = get_first_wallet()
account_index = get_first_account(wallet_id)
print(wallet_id, account_index)
print('test new accress', new_address(wallet_id, account_index))

# An optional first command-line argument sets the transaction id to resume
# from; otherwise polling starts from '0'.
last_tx_id = sys.argv[1] if len(sys.argv) > 1 else '0'

print('start monitor incoming transactions')
# Poll once per second, forever, carrying the newest seen id forward.
while True:
    time.sleep(1)
    last_tx_id = poll_transaction_list(wallet_id, account_index, last_tx_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment