Skip to content

Instantly share code, notes, and snippets.

@Vyryn
Last active September 1, 2022 14:09
Show Gist options
  • Save Vyryn/21bdfe0d3ec66baf62eeadc60fc6db41 to your computer and use it in GitHub Desktop.
Save Vyryn/21bdfe0d3ec66baf62eeadc60fc6db41 to your computer and use it in GitHub Desktop.
Utilities for reliably publishing transactions to the WAX network despite its notoriously unreliable API endpoints.
"""
Utilities for interacting with unreliable Wax endpoints for transactions on that network.
Copyright (C) 2021 Vyryn
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
import asyncio
import binascii
import hashlib
import random
from typing import List, Union
from json import JSONDecodeError, dumps
import aiohttp
from aioeos import EosTransaction, EosKey, EosJsonRpc, EosAction, serializer
from aioeos.contracts import eosio_token
from aioeos.exceptions import EosAssertMessageException, EosRpcException
from aioeos.rpc import ERROR_NAME_MAP
from wax_contracts import atomicassets, atomictoolsx
# Base URLs of public WAX API nodes. WaxConnection uses this list as its endpoint pool when it is not given
# an explicit list of addresses.
wax_apis = [
'https://wax.pink.gg',
'https://api.waxsweden.org',
'https://wax.greymass.com',
'https://wax.eosn.io',
'https://wax.blokcrafters.io',
'https://api-wax.eosarabia.net',
'https://wax.eoseoul.io',
'https://chain.wax.io'
]
# Path appended to a node's base URL to look up a transaction by id (sent as the `id` query parameter).
wax_history_api = '/v2/history/get_transaction'
def log(message: object, severity='INFO') -> None:
    """
    Prints message to console if bot's severity level warrants it. Allows more customizability in what to log and
    what not to log than classic critical, error, warn, info, debug.

    :param message: Any object; its ``repr`` is what gets printed.
    :param severity: Free-form severity tag. Messages whose tag is listed in ``settings.DONT_LOG`` are dropped.
    """
    # The annotation is `object` rather than `typing.Any`: the `typing` module itself is never imported in this
    # file (only names from it), so `typing.Any` raised NameError as soon as this function was defined.
    # NOTE(review): `settings` is not defined or imported in the visible source - confirm where it comes from.
    if severity in settings.DONT_LOG:
        return
    print(f'[{severity}] {repr(message)}')
def format_wax_amount(amount):
    """Render *amount* as a WAX quantity string with exactly eight decimal places, e.g. ``'1.50000000 WAX'``."""
    return '{:.8f} WAX'.format(amount)
# The base exceptions are declared first: a class statement evaluates its bases immediately, so declaring the
# subclasses before their parents raised NameError when the module was imported.
class InvalidInput(UserWarning):
    """The user provided input is invalid."""


class InvalidResponse(ValueError):
    """A response from an external API was invalid."""


class UnableToCompleteRequestedAction(Exception):
    """I was unable to complete the requested action and it was my fault."""


class InvalidWaxCardSend(InvalidResponse):
    """A transaction could not be pushed to, or confirmed through, any of the configured WAX APIs."""


class NoCardsException(UnableToCompleteRequestedAction):
    """The bot account does not hold enough cards to fulfil the request."""
class EosJsonRpcWrapper(EosJsonRpc):
    """EosJsonRpc variant that can route its POST requests through a caller-supplied, reusable aiohttp session."""

    def __init__(self, url: str, ses: aiohttp.ClientSession = None) -> None:
        """Remember the optional shared session, then let EosJsonRpc set itself up for *url*."""
        self.ses = ses
        super().__init__(url)

    async def post(self, endpoint: str, json=None) -> dict:
        """POST *json* to ``{URL}/v1{endpoint}`` and return the decoded response body.

        Raises the exception class that ERROR_NAME_MAP associates with the error name in the response
        (EosRpcException when the name is unknown) if the HTTP status or the body's ``code`` field is 500.
        """
        payload = {} if json is None else json

        if self.ses is None:
            # Without a shared session, defer to the parent implementation, which opens and cleans up a session
            # of its own on every call. A session is deliberately not created here so that proper cleanup never
            # depends on this object being used through a context manager.
            return await super().post(endpoint, json=payload)

        target = f'{self.URL}/v1{endpoint}'
        async with self.ses.post(target, json=payload) as res:
            try:
                body = await res.json(content_type=None)
            except JSONDecodeError:
                body = {'code': 500, 'error': {'name': 'JSONDecodeError'}}

            # The library signals failure through a `code` field in the body; the HTTP status is checked as well.
            failed = res.status == 500 or body.get('code') == 500
            if not failed:
                return body

            error = body.get('error', {})
            exc_type = ERROR_NAME_MAP.get(error.get('name'), EosRpcException)
            raise exc_type(error)
def get_resp_code(response):
    """Extract a numeric status code from an API response dict.

    The ``code`` field is used when it is a non-zero int; otherwise ``statusCode`` is consulted and coerced
    to int.

    :param response: Decoded JSON response (a mapping supporting ``.get``).
    :return: The status code as an int, or 0 when no usable code is present.
    """
    code = response.get('code', 0)
    # Exact type check on purpose: anything that is not a plain int (strings, None, bools) defers to statusCode.
    if code == 0 or type(code) is not int:
        code = response.get('statusCode', 0)
    try:
        return int(code)
    except (TypeError, ValueError):
        # None, containers and non-numeric strings cannot be coerced. Fall back to 0 so callers can always
        # compare the result numerically instead of crashing on a non-int.
        return 0
class WaxConnection:
    """Manager class for idempotently coordinating multiple EosJsonRpcWrappers to increase robustness of
    connection to the network with unreliable APIs. Replaces sign_and_push_transaction with custom
    execute_transaction so this is a generalized class but also provides methods for some specialized
    high level interactions. I suppose this is what you do when you really want to use a library you don't like.

    The bot object must expose ``session`` (an aiohttp client session), ``wax_ac`` (a wax account object
    providing ``name``, ``key.sign`` and ``authorization``) and ``loop`` (the running event loop).
    get_random_cryptomonkey_link additionally uses ``bot.cached_card_ids`` and ``bot.cl_reentrancy_guard``.
    """

    def __init__(self, bot, addrs: List[str] = None) -> None:
        """Create one RPC wrapper per address, falling back to the module-level ``wax_apis`` list."""
        self.adders = addrs or wax_apis
        self.wax_ac = bot.wax_ac
        self.bot = bot
        self.session = self.bot.session
        self.rpc = [EosJsonRpcWrapper(addr, ses=self.session) for addr in self.adders]
        # A semi-random sampling that can evolve over time as errors come up to help ensure the most reliable api is
        # the most frequently used.
        self.weighted_rpc = self.rpc.copy()
        self.weighted_rpc.extend([random.choice(self.rpc) for _ in range(20)])
        self.log = lambda x: log(x, 'DBUG')

    async def execute_transaction(
            self, actions: Union[EosAction, List[EosAction]], context_free_bytes=bytes(32)) -> dict:
        """Attempts to sign and push a transaction to one API. Failing that, it goes to another all the way down the
        list. Pass a list of EosActions or a single EosAction.

        :return: The response of the first push that looks like a valid transaction.
        :raises InvalidWaxCardSend: If no endpoint could prepare the transaction, or none accepted it within
            10 seconds.
        """
        # Convert to list if it isn't one already
        if not isinstance(actions, list):
            actions = [actions]
        failed_rpcs, suc = set(), 'None'
        block, chain_id = None, None
        # Try preparing the transaction against each rpc until one succeeds
        for rpc in self.rpc:
            try:
                self.log(f'Attempting to get head block from {rpc.URL}')
                head_block = await rpc.get_head_block()
                head_chain_id = await rpc.get_chain_id()
                for action in actions:
                    if isinstance(action.data, dict):
                        # Use the endpoint currently being tried. Always asking self.rpc[0] meant a single dead
                        # first node defeated the whole failover.
                        abi_bin = await rpc.abi_json_to_bin(
                            action.account, action.name, action.data
                        )
                        action.data = binascii.unhexlify(abi_bin['binargs'])
            except (IndexError, aiohttp.ClientConnectorError):
                failed_rpcs.add(rpc.URL)
                continue
            # Commit only once every step succeeded, so a half-finished attempt can never leave a block
            # without a matching chain id or with unserialized actions.
            block, chain_id, suc = head_block, head_chain_id, rpc.URL
            break
        if block is None:
            raise InvalidWaxCardSend(f'Failed to get head block from any of my {len(self.rpc)} RPC urls.')
        if failed_rpcs:
            self.log(f'Failed to get head block from {failed_rpcs} but eventually got {block} from {suc}.')
        transaction = EosTransaction(
            ref_block_num=block['block_num'] & 65535,
            ref_block_prefix=block['ref_block_prefix'],
            actions=actions)
        # Serialize transaction just once for all APIs so it will be idempotent, very important if sending to
        # several nodes.
        serialized_transaction = serializer.serialize(transaction)
        digest = hashlib.sha256(
            b''.join((chain_id, serialized_transaction, context_free_bytes))
        ).digest()
        signatures = [self.wax_ac.key.sign(digest)]
        serialized_transaction = binascii.hexlify(serialized_transaction).decode()
        future = self.bot.loop.create_future()  # for the callback upon success
        # Send the transaction to all connected nodes simultaneously.
        tasks = [asyncio.create_task(self.tx(future, rpc, signatures, serialized_transaction)) for rpc in self.rpc]

        # Add a timeout so this doesn't hang indefinitely if no APIs are working
        async def timeout(fut: asyncio.Future, length: int):
            await asyncio.sleep(length)
            if not fut.done():
                fut.set_result(None)

        tasks.append(asyncio.create_task(timeout(future, 10)))
        try:
            result = await future
        finally:
            # Cancel outstanding pushes on every exit path, including the timeout, so no request keeps
            # running after the caller has been told the outcome.
            for task in tasks:
                task.cancel()
        if not result:
            raise InvalidWaxCardSend('Hmm, all the APIs I am connected to seem to be down at the moment.')
        return result

    async def tx(self, fut: asyncio.Future, rpc: EosJsonRpcWrapper, signatures: list, serialized_transaction: str) \
            -> None:
        """Does a single transaction push to a single EosJsonRpcWrapper object, if it is successful sets the
        callback future.

        :raises InvalidInput: If the node rejects the transaction with an assert message.
        """
        # NOTE(review): execute_transaction runs this in a task it never awaits, so this exception does not
        # reach its caller - confirm whether it should be forwarded through the future instead.
        try:
            resp = await rpc.push_transaction(signatures=signatures, serialized_transaction=serialized_transaction)
        except EosAssertMessageException as e:
            raise InvalidInput(e) from e
        content = dumps(resp).replace('\\', '')
        if 'authorization' in content and 'block_num' in content:
            self.log(f'I think {content} is a valid tx.')
            # Several nodes may accept the same transaction; only the first may resolve the future, a second
            # set_result would raise InvalidStateError.
            if not fut.done():
                fut.set_result(resp)

    async def transfer_funds(self, receiver: str, amount, sender: str = 'Unknown'):
        """Transfer *amount* WAX from the bot account to *receiver* and return the push response."""
        prep_amount = format_wax_amount(amount)
        actions = [
            eosio_token.transfer(
                from_addr=self.wax_ac.name,
                to_addr=receiver,
                quantity=prep_amount,
                memo=f'Funds transfer by {sender} on behalf of NFT Tip Bot.',
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    async def transfer_assets(self, receiver: str, asset_ids: List[int], sender: str = 'Unknown', memo: str = None):
        """Transfer the given atomicassets asset ids from the bot account to *receiver*.

        A default memo naming *sender* is used when *memo* is empty or None.
        """
        if not memo:
            memo = f'Asset transfer by {sender} on behalf of NFT Tip Bot.'
        actions = [
            atomicassets.transfer(
                from_addr=self.wax_ac.name,
                to_addr=receiver,
                asset_ids=asset_ids,
                memo=memo,
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    async def mint_asset(self, to_addr: str, template_id: int, test=False):
        """Mint one asset of *template_id* to *to_addr*.

        Uses the 'testcol12345'/'testcard' collection and schema when *test* is true, 'crptomonkeys' otherwise.
        """
        if test:
            collection = 'testcol12345'
            schema = 'testcard'
        else:
            collection = 'crptomonkeys'
            schema = 'crptomonkeys'
        actions = [
            atomicassets.mintasset(
                minter=self.wax_ac.name,
                to_addr=to_addr,
                collection=collection,
                schema=schema,
                template_id=template_id,
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    def update_weighted_rpc(self, faulty: EosJsonRpcWrapper) -> None:
        """Does one update of the semi-random weighted rpc list by removing the faulty entry and adding a random one
        if the selection is too thin or too small."""
        try:
            self.weighted_rpc.remove(faulty)
        except ValueError:
            # The entry is not in the weighted list; nothing to remove.
            pass
        if len(self.weighted_rpc) < 20 or len(set(self.weighted_rpc)) < 3:
            self.weighted_rpc.append(random.choice(self.rpc))

    async def get_link_id_and_confirm_claimlink_creation(self, tx_id) -> str:
        """Attempts to confirm that a claimlink was successfully created and get its link_id to present to the
        recipient in a claimlink.

        :raises InvalidWaxCardSend: If the transaction could not be confirmed after 30 polling attempts.
        """
        params = {
            'id': tx_id
        }
        selected = ''
        # The attempt counter has to advance: it drives both the backoff and the give-up condition. Without
        # it the loop polled once a second forever and the error below was unreachable.
        for cycles in range(30):
            await asyncio.sleep(min(2 ** cycles, 64))  # Exponential backoff, capped at 64 seconds
            selected = random.choice(self.weighted_rpc)
            host = selected.URL
            try:
                async with self.session.get(host + wax_history_api, params=params) as resp:
                    response = await resp.json()
                    self.log(f'Response to attempt to get history for {tx_id}: {response}')
                    code = get_resp_code(response)
                    # Guard the comparison so a malformed, non-numeric code cannot raise TypeError here.
                    if isinstance(code, int) and code < 400 and response.get('executed', False):
                        link_id = response['actions'][1]['act']['data']['link_id']
                        return link_id
            except aiohttp.ServerDisconnectedError:
                self.update_weighted_rpc(selected)
                await asyncio.sleep(random.randrange(1, 5))
            except (aiohttp.ContentTypeError, IndexError, KeyError):
                self.update_weighted_rpc(selected)
        # NOTE(review): the original indentation of this call was lost; it is placed after the loop because
        # `selected` is pre-initialised above - confirm against the upstream source.
        self.update_weighted_rpc(selected)
        raise InvalidWaxCardSend('I submitted the transaction, but WAX failed to process it within'
                                 f' 27 minutes, so the attempt has timed out. I have tried submitting'
                                 f' to {len(self.rpc)} different APIs during this time but was unable '
                                 f'to confirm the transaction through any of them.')

    async def create_claimlink(self, asset_ids: List[int], memo=None, wait_for_confirmation=True) -> str:
        """Creates and returns a claimlink for the specified asset ids.

        :param asset_ids: Assets to move from the bot account into the link.
        :param memo: Optional memo; a default is used when empty, and a cancellation warning is always appended.
        :param wait_for_confirmation: When true, the link id is read from the transaction history once the
            transaction is confirmed; otherwise it is parsed out of the push response as a best guess.
        :return: The claimlink URL, including the link's private key.
        """
        if not memo:
            memo = 'NFT Tip Bot reward claimlink.'
        memo += ' WARNING: Tip bot claimlinks may be cancelled 91 days after issuance.'
        # Generate the link's keypair
        keypair = EosKey()
        priv_key = keypair.to_wif()
        key = keypair.to_public()
        actions = [
            atomictoolsx.announcelink(
                creator=self.wax_ac.name,
                key=key,
                asset_ids=asset_ids,
                memo=memo,
                authorization=[self.wax_ac.authorization('active')]),
            atomicassets.transfer(
                from_addr=self.wax_ac.name,
                to_addr='atomictoolsx',
                asset_ids=asset_ids,
                memo='link',
                authorization=[self.wax_ac.authorization('active')])
        ]
        result = await self.execute_transaction(actions)
        tx_id = result['transaction_id']
        self.log(f'Claimlink submission. Result is: {result}')
        # Need to confirm tx_id to know what link to give user. If wait_for_confirmation is off then take best guess.
        if wait_for_confirmation:
            link_id = await self.get_link_id_and_confirm_claimlink_creation(tx_id)
        else:
            link_id = str(result).split("link_id': ")[1].split(',')[0]
            link_id = link_id[1:].split("'")[0]
        link = f'https://wax.atomichub.io/trading/link/{link_id}?key={priv_key}'
        return link

    async def get_random_cryptomonkey_link(self, user: str, memo='', num=1) -> str:
        """Take *num* cards from the bot's cached card ids and return a claimlink for them.

        :param user: Recipient name, used in the memo and in error messages.
        :param memo: Optional memo; the user is appended to it, or a default memo is used when empty.
        :param num: Number of cards to put in the link.
        :raises NoCardsException: If the cache holds fewer than *num* cards.
        :raises InvalidInput: If the resulting memo is longer than 256 characters.
        """
        if memo == '':
            # The default memo has to be assigned; the bare f-string expression was evaluated and discarded.
            memo = f'Random NFT reward for {user}.'
        else:
            memo += f' ({user})'
        if len(self.bot.cached_card_ids) < 1:
            raise NoCardsException(f"The NFT Tip Bot account is empty, so I can't send {user} any more cards")
        if len(memo) > 256:
            raise InvalidInput(f'Your memo must be less than 256 characters long. With the bit I add,'
                               f'it is currently {len(memo)} characters long.')
        # Choose an asset and make a claim link. Ensure parallel executions can't choose the same asset.
        selected_asset_ids = []
        while self.bot.cl_reentrancy_guard:
            await asyncio.sleep(0.2)
        self.bot.cl_reentrancy_guard = True
        try:
            n_cards = len(self.bot.cached_card_ids)
            if n_cards < num:
                raise NoCardsException(f"The bot account only has {n_cards} cards at the moment, "
                                       f"so I can't send {user} {num} cards.")
            for _ in range(num):
                # bot.cached_card_ids is refreshed every minute and shuffled at fetch time.
                selected_asset_ids.append(self.bot.cached_card_ids.pop())
        finally:
            # Always release the guard. Leaving it set after an exception made every later call wait forever.
            self.bot.cl_reentrancy_guard = False
        link = await self.create_claimlink(selected_asset_ids, memo=memo)
        return link
'''atomicassets.py'''
"""Helpers for creating actions on atomicassets contract. By Vyryn"""
from aioeos import types
contract = 'atomicassets'
def mintasset(minter: str, to_addr: str, collection: str, schema: str, template_id: int = -1,
              immutable_data=None, mutable_data=None, authorization=None) -> types.EosAction:
    """Build a ``mintasset`` action for the contract named by the module-level ``contract``.

    :param minter: Account authorized to mint in the collection.
    :param to_addr: Account that will own the new asset.
    :param collection: Collection name.
    :param schema: Schema name.
    :param template_id: Template to mint from; defaults to -1.
    :param immutable_data: Immutable attribute list; defaults to a new empty list.
    :param mutable_data: Mutable attribute list; defaults to a new empty list.
    :param authorization: Authorizations for the action; defaults to a new empty list.
    :return: The action, with its data still as a dict.
    """
    # Defaults are None rather than []: a list default is created once and would be shared by every action
    # built without that argument.
    return types.EosAction(
        account=contract,
        name='mintasset',
        authorization=[] if authorization is None else authorization,
        data={
            'authorized_minter': minter,
            'collection_name': collection,
            'schema_name': schema,
            'template_id': template_id,
            'new_asset_owner': to_addr,
            'immutable_data': [] if immutable_data is None else immutable_data,
            'mutable_data': [] if mutable_data is None else mutable_data,
            'tokens_to_back': []
        }
    )
def transfer(from_addr: str, to_addr: str, asset_ids: [int], memo: str = '', authorization=None) -> types.EosAction:
    """Build a ``transfer`` action for the contract named by the module-level ``contract``.

    :param from_addr: Sending account.
    :param to_addr: Receiving account.
    :param asset_ids: Ids of the assets to move.
    :param memo: Memo attached to the transfer.
    :param authorization: Authorizations for the action; defaults to a new empty list.
    :return: The action, with its data still as a dict.
    """
    # None default instead of []: a list default would be one object shared by every action built without it.
    return types.EosAction(
        account=contract,
        name='transfer',
        authorization=[] if authorization is None else authorization,
        data={
            'from': from_addr,
            'to': to_addr,
            'asset_ids': asset_ids,
            'memo': memo
        }
    )
'''atomictoolsx.py'''
"""Helpers for creating actions on atomictoolsx contract. By Vyryn"""
from aioeos import types
contract = 'atomictoolsx'
def announcelink(creator: str, key: str, asset_ids: [int], memo: str, authorization=None) -> types.EosAction:
    """Build an ``announcelink`` action for the contract named by the module-level ``contract``.

    :param creator: Account creating the link.
    :param key: Public key of the link's keypair.
    :param asset_ids: Ids of the assets the link will hold.
    :param memo: Memo attached to the link.
    :param authorization: Authorizations for the action; defaults to a new empty list.
    :return: The action, with its data still as a dict.
    """
    # None default instead of []: a list default would be one object shared by every action built without it.
    return types.EosAction(
        account=contract,
        name='announcelink',
        authorization=[] if authorization is None else authorization,
        data={
            'creator': creator,
            'key': key,
            'asset_ids': asset_ids,
            'memo': memo
        }
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment