Skip to content

Instantly share code, notes, and snippets.

@mrtrkmn
Last active January 8, 2019 17:49
Show Gist options
  • Save mrtrkmn/01c2423c988f9e58455fd3c7395d0e55 to your computer and use it in GitHub Desktop.
Save mrtrkmn/01c2423c988f9e58455fd3c7395d0e55 to your computer and use it in GitHub Desktop.
Creating and Managing Iroha Assets and Accounts
import sys

import iroha
import block_pb2
import endpoint_pb2
import endpoint_pb2_grpc
import queries_pb2
import grpc
## Getting Iroha Objects
# Builders and helpers shared by every transaction/query in this script.
tx_builder = iroha.ModelTransactionBuilder()
query_builder = iroha.ModelQueryBuilder()
crypto = iroha.ModelCrypto()
proto_tx_helper = iroha.ModelProtoTransaction()
proto_query_helper = iroha.ModelProtoQuery()
## Read public and private keys
# Context managers close the key files as soon as their content is read,
# instead of leaving the handles open until garbage collection.
with open("admin@test.priv", "r") as priv_file:
    admin_priv = priv_file.read()
with open("admin@test.pub", "r") as pub_file:
    admin_pub = pub_file.read()
key_pair = crypto.convertFromExisting(admin_pub, admin_priv)
## Function which prints the transaction status with a single, simple synchronous call.
def print_status(tx):
    """Print the status of *tx* and exit the process unless it is committed.

    Prints the transaction hash, sends a ``TxStatusRequest`` for that hash
    over an insecure gRPC channel to ``IP + ':' + port`` and prints the name
    of the returned status.

    Args:
        tx: Transaction object exposing ``hash()``, whose result provides
            ``hex()`` and ``blob()``.

    Raises:
        SystemExit: With exit code 1 when the reported status is not
            ``"COMMITTED"``.
    """
    # NOTE(review): `IP` and `port` are module-level names that are not
    # defined in the visible part of this file — confirm they are set.

    # Create status request
    print("Hash of the transaction: ", tx.hash().hex())
    tx_hash = tx.hash().blob()

    # The work with byte arrays is different in Python 2 and 3
    if sys.version_info[0] == 2:
        # Python 2 version
        tx_hash = ''.join(map(chr, tx_hash))
    else:
        # Python 3 version
        tx_hash = bytes(tx_hash)

    # Create request
    request = endpoint_pb2.TxStatusRequest()
    request.tx_hash = tx_hash

    # Create connection to Iroha
    channel = grpc.insecure_channel(IP + ':' + port)
    stub = endpoint_pb2_grpc.CommandServiceStub(channel)

    # Send request
    response = stub.Status(request)
    status = endpoint_pb2.TxStatus.Name(response.tx_status)
    print("Status of transaction is:", status)

    if status != "COMMITTED":
        print("Your transaction wasn't committed")
        # sys.exit is always available; the bare `exit` helper is only
        # injected by the `site` module.
        sys.exit(1)
## Sending transactions to Iroha
def send_tx(tx, key_pair):
    """Sign *tx* with *key_pair* and submit it to Iroha.

    The signed transaction blob is parsed into a ``block_pb2.Transaction``
    and passed to the ``Torii`` call of the command service over an insecure
    gRPC channel to ``IP + ':' + port``.

    Args:
        tx: Unsigned transaction accepted by
            ``proto_tx_helper.signAndAddSignature``.
        key_pair: Key pair used to sign the transaction.
    """
    # NOTE(review): `IP` and `port` are module-level names that are not
    # defined in the visible part of this file — confirm they are set.
    tx_blob = proto_tx_helper.signAndAddSignature(tx, key_pair).blob()
    proto_tx = block_pb2.Transaction()

    # The work with byte arrays is different in Python 2 and 3
    if sys.version_info[0] == 2:
        tmp = ''.join(map(chr, tx_blob))
    else:
        tmp = bytes(tx_blob)
    proto_tx.ParseFromString(tmp)

    channel = grpc.insecure_channel(IP + ':' + port)
    stub = endpoint_pb2_grpc.CommandServiceStub(channel)
    stub.Torii(proto_tx)
## Send query to Iroha and receive a response
def send_query(query, key_pair):
    """Sign *query* with *key_pair*, send it to Iroha and return the response.

    The signed query blob is parsed into a ``queries_pb2.Query`` and passed
    to the ``Find`` call of the query service over an insecure gRPC channel
    to ``IP + ':' + port``.

    Args:
        query: Unsigned query accepted by
            ``proto_query_helper.signAndAddSignature``.
        key_pair: Key pair used to sign the query.

    Returns:
        The response object returned by ``QueryServiceStub.Find``.
    """
    # NOTE(review): `IP` and `port` are module-level names that are not
    # defined in the visible part of this file — confirm they are set.
    query_blob = proto_query_helper.signAndAddSignature(query, key_pair).blob()
    proto_query = queries_pb2.Query()

    # The work with byte arrays is different in Python 2 and 3
    if sys.version_info[0] == 2:
        tmp = ''.join(map(chr, query_blob))
    else:
        tmp = bytes(query_blob)
    proto_query.ParseFromString(tmp)

    channel = grpc.insecure_channel(IP + ':' + port)
    query_stub = endpoint_pb2_grpc.QueryServiceStub(channel)
    query_response = query_stub.Find(proto_query)
    return query_response
# NOTE(review): `creator` and `current_time` are not defined in the visible
# part of this file — confirm they are set before this point.

## Creating domain and assets in python
tx = (
    tx_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .createDomain("domain", "user")
    .createAsset("coin", "domain", 2)
    .build()
)
send_tx(tx, key_pair)
print_status(tx)

## Creating asset quantity:
tx = (
    tx_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .addAssetQuantity("coin#domain", "1000.2")
    .build()
)
send_tx(tx, key_pair)
print_status(tx)

## Creating account:
user1_kp = crypto.generateKeypair()
tx = (
    tx_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .createAccount("userone", "domain", user1_kp.publicKey())
    .build()
)
send_tx(tx, key_pair)
print_status(tx)

## Sending assets:
tx = (
    tx_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .transferAsset(
        "admin@test", "userone@domain", "coin#domain", "Some message", "2.0"
    )
    .build()
)
send_tx(tx, key_pair)
print_status(tx)

## Querying assets info:
query = (
    query_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .queryCounter(1)
    .getAssetInfo("coin#domain")
    .build()
)
query_response = send_query(query, key_pair)

# Stop early when the response does not carry the expected asset payload.
if not query_response.HasField("asset_response"):
    print("Query response error")
    sys.exit(1)

print("Query responded with asset response")
asset_info = query_response.asset_response.asset
print("Asset Id =", asset_info.asset_id)
print("Precision =", asset_info.precision)

## Querying account assets.
query = (
    query_builder.creatorAccountId(creator)
    .createdTime(current_time)
    .queryCounter(11)
    .getAccountAssets("userone@domain", "coin#domain")
    .build()
)
query_response = send_query(query, key_pair)
print(query_response)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment