Last active
September 1, 2022 14:09
-
-
Save Vyryn/21bdfe0d3ec66baf62eeadc60fc6db41 to your computer and use it in GitHub Desktop.
Utilities for reliably publishing transactions to the WAX network despite its notoriously unreliable API endpoints.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Utilities for interacting with unreliable Wax endpoints for transactions on that network. | |
Copyright (C) 2021 Vyryn | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU Affero General Public License as | |
published by the Free Software Foundation, either version 3 of the | |
License, or (at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU Affero General Public License for more details. | |
You should have received a copy of the GNU Affero General Public License | |
along with this program. If not, see <https://www.gnu.org/licenses/>. | |
""" | |
import asyncio
import binascii
import hashlib
import random
import typing
from json import JSONDecodeError, dumps
from typing import List, Union

import aiohttp
from aioeos import EosTransaction, EosKey, EosJsonRpc, EosAction, serializer
from aioeos.contracts import eosio_token
from aioeos.exceptions import EosAssertMessageException, EosRpcException
from aioeos.rpc import ERROR_NAME_MAP

from wax_contracts import atomicassets, atomictoolsx
# Default base URLs of the WAX API nodes. WaxConnection falls back to this list when it is
# constructed without explicit addresses and creates one RPC wrapper per entry.
wax_apis = [
    'https://wax.pink.gg',
    'https://api.waxsweden.org',
    'https://wax.greymass.com',
    'https://wax.eosn.io',
    'https://wax.blokcrafters.io',
    'https://api-wax.eosarabia.net',
    'https://wax.eoseoul.io',
    'https://chain.wax.io'
]
# Path appended to a node's base URL to look up a transaction by id when confirming a claimlink.
wax_history_api = '/v2/history/get_transaction'
def log(message: typing.Any, severity='INFO') -> None:
    """
    Print message to the console unless its severity label is suppressed.

    Severity is a free-form label rather than one of the classic critical/error/warn/info/debug
    levels, which allows finer control over what is and is not logged.

    :param message: Any object; its repr() is printed.
    :param severity: Label printed in brackets before the message. Messages whose label appears
        in settings.DONT_LOG are dropped.
    """
    try:
        suppressed = settings.DONT_LOG
    except NameError:
        # NOTE(review): `settings` is not imported anywhere in this file. Until the project's
        # settings module is imported, suppress nothing rather than crash on every log call.
        suppressed = ()
    if severity in suppressed:
        return
    print(f'[{severity}] {repr(message)}')
def format_wax_amount(amount):
    """Render a numeric amount as a WAX quantity string with eight decimal places.

    Example: 1.5 becomes '1.50000000 WAX'.
    """
    return '{:.8f} WAX'.format(amount)
# Base classes are declared first: a subclass statement evaluates its bases immediately, so
# defining InvalidWaxCardSend before InvalidResponse would raise NameError at import time.
class InvalidInput(UserWarning):
    """The user provided input is invalid."""


class InvalidResponse(ValueError):
    """A response from an external API was invalid."""


class UnableToCompleteRequestedAction(Exception):
    """I was unable to complete the requested action and it was my fault."""


class InvalidWaxCardSend(InvalidResponse):
    """A WAX transaction could not be pushed to, or confirmed through, any API node."""


class NoCardsException(UnableToCompleteRequestedAction):
    """The bot account does not hold enough cards to fulfil the request."""
class EosJsonRpcWrapper(EosJsonRpc):
    """EosJsonRpc subclass that can reuse a caller-supplied aiohttp session for its requests."""

    def __init__(self, url: str, ses: aiohttp.ClientSession = None) -> None:
        """Remember the optional shared session, then initialise the base RPC client with url."""
        self.ses = ses
        super().__init__(url)

    async def post(self, endpoint: str, json=None) -> dict:
        """POST json to `<URL>/v1<endpoint>` and return the decoded JSON body.

        Raises the exception class mapped from the error name in ERROR_NAME_MAP (EosRpcException
        when the name is unmapped) if the HTTP status or the body's 'code' field is 500.
        """
        payload = {} if json is None else json
        if self.ses is None:
            # Without a shared session, defer to the base implementation, which creates a session
            # and cleans it up on every call. No session is created here so that proper cleanup
            # never depends on this object being used as a context manager.
            return await super().post(endpoint, json=payload)
        target = f'{self.URL}/v1{endpoint}'
        async with self.ses.post(target, json=payload) as res:
            try:
                resp_dict = await res.json(content_type=None)
            except JSONDecodeError:
                # An undecodable body is reported like a server-side failure.
                resp_dict = {'code': 500, 'error': {'name': 'JSONDecodeError'}}
            # The library only inspects the body's 'code'; the HTTP status is checked as well,
            # while the 'code' check is kept for parity with the library.
            failed = res.status == 500 or resp_dict.get('code') == 500
            if not failed:
                return resp_dict
            error = resp_dict.get('error', {})
            exc_type = ERROR_NAME_MAP.get(error.get('name'), EosRpcException)
            raise exc_type(error)
def get_resp_code(response):
    """Extract an integer status code from an API response mapping.

    The 'code' key is consulted first, then 'statusCode'. Values that are missing, zero,
    boolean, or not convertible to int are skipped.

    :param response: Mapping decoded from an API's JSON reply.
    :return: The first usable non-zero code as an int, or 0 when none is present, so callers
        can always compare the result numerically.
    """
    for key in ('code', 'statusCode'):
        raw = response.get(key, 0)
        if isinstance(raw, bool):
            # bool is an int subclass, but True/False is never a meaningful status code.
            continue
        try:
            code = int(raw)
        except (TypeError, ValueError):
            # None, nested objects, or non-numeric text: try the next key.
            continue
        if code:
            return code
    return 0
class WaxConnection:
    """Manager class for idempotently coordinating multiple EosJsonRpcWrappers to increase robustness of
    connection to the network with unreliable APIs. Replaces sign_and_push_transaction with a custom
    execute_transaction, so this is a generalized class, but it also provides methods for some specialized
    high level interactions.

    The bot object must expose the attributes this class reads:
      * session: an aiohttp client session, shared by every RPC wrapper.
      * wax_ac: a wax account object providing name, key.sign(digest) and authorization(permission).
      * loop: the event loop used to create the result future in execute_transaction.
      * cached_card_ids and cl_reentrancy_guard: only needed by get_random_cryptomonkey_link.
    """

    def __init__(self, bot, addrs: List[str] = None) -> None:
        """Create one RPC wrapper per address, defaulting to the module-level wax_apis list."""
        self.adders = addrs or wax_apis
        self.wax_ac = bot.wax_ac
        self.bot = bot
        self.session = self.bot.session
        self.rpc = [EosJsonRpcWrapper(addr, ses=self.session) for addr in self.adders]
        # A semi-random sampling that can evolve over time as errors come up to help ensure the most reliable api is
        # the most frequently used.
        self.weighted_rpc = self.rpc.copy()
        self.weighted_rpc.extend([random.choice(self.rpc) for _ in range(20)])
        self.log = lambda x: log(x, 'DBUG')

    async def execute_transaction(
            self, actions: Union[EosAction, List[EosAction]], context_free_bytes=bytes(32)) -> dict:
        """Sign a transaction once and push it to every configured API, returning the first accepted response.

        :param actions: A single EosAction or a list of them. Actions whose data is still a dict are
            converted in place to their binary ABI form.
        :param context_free_bytes: Bytes appended to the chain id and serialized transaction before hashing.
        :return: The response of the first API whose reply looks like an accepted transaction.
        :raises InvalidWaxCardSend: If no API can supply a head block, or no API accepts the
            transaction within 10 seconds.
        """
        # Convert to list if it isn't one already
        if not isinstance(actions, list):
            actions = [actions]
        failed_rpcs, suc = set(), 'None'
        block, chain_id = None, None
        # Try each rpc in turn until one supplies the head block, the chain id and the packed action data.
        for rpc in self.rpc:
            try:
                self.log(f'Attempting to get head block from {rpc.URL}')
                head = await rpc.get_head_block()
                node_chain_id = await rpc.get_chain_id()
                for action in actions:
                    if isinstance(action.data, dict):
                        # Pack through the node currently being tried, so that a dead first node
                        # cannot make every failover attempt fail as well.
                        abi_bin = await rpc.abi_json_to_bin(
                            action.account, action.name, action.data
                        )
                        action.data = binascii.unhexlify(abi_bin['binargs'])
            except (IndexError, aiohttp.ClientError, asyncio.TimeoutError):
                failed_rpcs.add(rpc.URL)
                continue
            # Commit only after every step succeeded, so a half-failed node never supplies the block.
            block, chain_id, suc = head, node_chain_id, rpc.URL
            break
        if block is None:
            raise InvalidWaxCardSend(f'Failed to get head block from any of my {len(self.rpc)} RPC urls.')
        if failed_rpcs:
            self.log(f'Failed to get head block from {failed_rpcs} but eventually got {block} from {suc}.')
        transaction = EosTransaction(
            ref_block_num=block['block_num'] & 65535,
            ref_block_prefix=block['ref_block_prefix'],
            actions=actions)
        # Serialize transaction just once for all APIs so it will be idempotent, very important if sending to
        # several nodes.
        serialized_transaction = serializer.serialize(transaction)
        digest = hashlib.sha256(
            b''.join((chain_id, serialized_transaction, context_free_bytes))
        ).digest()
        signatures = [self.wax_ac.key.sign(digest)]
        serialized_transaction = binascii.hexlify(serialized_transaction).decode()
        future = self.bot.loop.create_future()  # resolved by the first successful push
        # Send the transaction to all connected nodes simultaneously.
        tasks = [asyncio.create_task(self.tx(future, rpc, signatures, serialized_transaction)) for rpc in self.rpc]
        try:
            # Bounded wait so this doesn't hang indefinitely if no APIs are working.
            result = await asyncio.wait_for(future, timeout=10)
        except asyncio.TimeoutError:
            result = None
        finally:
            # Stop the remaining pushes on success and on failure alike.
            for task in tasks:
                task.cancel()
        # Retrieve every task's outcome so failed pushes are logged instead of surfacing later as
        # "Task exception was never retrieved".
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for rpc, outcome in zip(self.rpc, outcomes):
            if isinstance(outcome, Exception) and not isinstance(outcome, asyncio.CancelledError):
                self.log(f'Push to {rpc.URL} failed: {outcome!r}')
        if not result:
            raise InvalidWaxCardSend('Hmm, all the APIs I am connected to seem to be down at the moment.')
        return result

    async def tx(self, fut: asyncio.Future, rpc: EosJsonRpcWrapper, signatures: list, serialized_transaction: str) \
            -> None:
        """Push the signed transaction to a single EosJsonRpcWrapper and, if the reply looks like an
        accepted transaction, resolve the callback future with it.

        :raises InvalidInput: If the node rejects the transaction with an assertion failure.
        """
        try:
            resp = await rpc.push_transaction(signatures=signatures, serialized_transaction=serialized_transaction)
        except EosAssertMessageException as e:
            raise InvalidInput(e) from e
        content = dumps(resp).replace('\\', '')
        if 'authorization' in content and 'block_num' in content:
            self.log(f'I think {content} is a valid tx.')
            # Several nodes may accept the same transaction; only the first may resolve the future.
            if not fut.done():
                fut.set_result(resp)

    async def transfer_funds(self, receiver: str, amount, sender: str = 'Unknown'):
        """Transfer amount WAX from the bot account to receiver and return the push response."""
        prep_amount = format_wax_amount(amount)
        actions = [
            eosio_token.transfer(
                from_addr=self.wax_ac.name,
                to_addr=receiver,
                quantity=prep_amount,
                memo=f'Funds transfer by {sender} on behalf of NFT Tip Bot.',
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    async def transfer_assets(self, receiver: str, asset_ids: List[int], sender: str = 'Unknown',
                              memo: str = None):
        """Transfer the given atomicassets asset ids from the bot account to receiver.

        A default memo naming sender is used when memo is empty or None.
        """
        if not memo:
            memo = f'Asset transfer by {sender} on behalf of NFT Tip Bot.'
        actions = [
            atomicassets.transfer(
                from_addr=self.wax_ac.name,
                to_addr=receiver,
                asset_ids=asset_ids,
                memo=memo,
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    async def mint_asset(self, to_addr: str, template_id: int, test=False):
        """Mint one asset of template_id to to_addr, using the test collection when test is true."""
        if test:
            collection = 'testcol12345'
            schema = 'testcard'
        else:
            collection = 'crptomonkeys'
            schema = 'crptomonkeys'
        actions = [
            atomicassets.mintasset(
                minter=self.wax_ac.name,
                to_addr=to_addr,
                collection=collection,
                schema=schema,
                template_id=template_id,
                authorization=[self.wax_ac.authorization('active')]
            )
        ]
        return await self.execute_transaction(actions)

    def update_weighted_rpc(self, faulty: EosJsonRpcWrapper) -> None:
        """Does one update of the semi-random weighted rpc list by removing the faulty entry and adding a random one
        if the selection is too thin or too small."""
        try:
            self.weighted_rpc.remove(faulty)
        except ValueError:
            # The faulty rpc has already been removed from the weighted list entirely.
            pass
        if len(self.weighted_rpc) < 20 or len(set(self.weighted_rpc)) < 3:
            self.weighted_rpc.append(random.choice(self.rpc))

    async def get_link_id_and_confirm_claimlink_creation(self, tx_id) -> str:
        """Attempts to confirm that a claimlink was successfully created and get its link_id to present to the
        recipient in a claimlink.

        Polls the history endpoint of a randomly chosen weighted rpc up to 30 times, with exponential
        backoff capped at 64 seconds per wait.

        :raises InvalidWaxCardSend: If the transaction cannot be confirmed within all attempts.
        """
        params = {
            'id': tx_id
        }
        # The attempt counter must advance each round: it both bounds the loop and drives the backoff.
        for cycle in range(30):
            await asyncio.sleep(min(2 ** cycle, 64))  # Exponential backoff
            selected = random.choice(self.weighted_rpc)
            host = selected.URL
            try:
                async with self.session.get(host + wax_history_api, params=params) as resp:
                    response = await resp.json()
                    self.log(f'Response to attempt to get history for {tx_id}: {response}')
                    code = get_resp_code(response)
                    if code < 400 and response.get('executed', False):
                        link_id = response['actions'][1]['act']['data']['link_id']
                        return link_id
            except (aiohttp.ContentTypeError, JSONDecodeError, IndexError, KeyError, TypeError, AttributeError):
                # A malformed or unexpected reply counts double against this node.
                self.update_weighted_rpc(selected)
                self.update_weighted_rpc(selected)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                # Connection-level trouble: penalise the node once and pause briefly before retrying.
                self.update_weighted_rpc(selected)
                await asyncio.sleep(random.randrange(1, 5))
        raise InvalidWaxCardSend('I submitted the transaction, but WAX failed to process it within'
                                 f' 27 minutes, so the attempt has timed out. I have tried submitting'
                                 f' to {len(self.rpc)} different APIs during this time but was unable '
                                 f'to confirm the transaction through any of them.')

    async def create_claimlink(self, asset_ids: List[int], memo=None, wait_for_confirmation=True) -> str:
        """Creates and returns a claimlink URL for the specified asset ids.

        :param asset_ids: Assets to place behind the link.
        :param memo: Optional memo; a default is used when empty. A cancellation warning is always appended.
        :param wait_for_confirmation: When true, the link id is read from the confirmed transaction
            history; otherwise it is parsed as a best guess from the push response.
        """
        if not memo:
            memo = 'NFT Tip Bot reward claimlink.'
        memo += ' WARNING: Tip bot claimlinks may be cancelled 91 days after issuance.'
        # Generate the link's keypair
        keypair = EosKey()
        priv_key = keypair.to_wif()
        key = keypair.to_public()
        actions = [
            atomictoolsx.announcelink(
                creator=self.wax_ac.name,
                key=key,
                asset_ids=asset_ids,
                memo=memo,
                authorization=[self.wax_ac.authorization('active')]),
            atomicassets.transfer(
                from_addr=self.wax_ac.name,
                to_addr='atomictoolsx',
                asset_ids=asset_ids,
                memo='link',
                authorization=[self.wax_ac.authorization('active')])
        ]
        result = await self.execute_transaction(actions)
        tx_id = result['transaction_id']
        self.log(f'Claimlink submission. Result is: {result}')
        # Need to confirm tx_id to know what link to give user. If wait_for_confirmation is off then take best guess.
        if wait_for_confirmation:
            link_id = await self.get_link_id_and_confirm_claimlink_creation(tx_id)
        else:
            link_id = str(result).split("link_id': ")[1].split(',')[0]
            link_id = link_id[1:].split("'")[0]
        link = f'https://wax.atomichub.io/trading/link/{link_id}?key={priv_key}'
        return link

    async def get_random_cryptomonkey_link(self, user: str, memo='', num=1) -> str:
        """Take num cards from the bot's cached card ids and return a claimlink for them.

        :param user: Name of the recipient, included in the memo.
        :param memo: Optional memo; the user name is appended to it, or a default memo is used when empty.
        :param num: Number of cards to put behind the link.
        :raises NoCardsException: If the cache holds fewer than num cards (or none at all).
        :raises InvalidInput: If the final memo is longer than 256 characters.
        """
        if memo == '':
            memo = f'Random NFT reward for {user}.'
        else:
            memo += f' ({user})'
        if len(self.bot.cached_card_ids) < 1:
            raise NoCardsException(f"The NFT Tip Bot account is empty, so I can't send {user} any more cards")
        if len(memo) > 256:
            raise InvalidInput(f'Your memo must be less than 256 characters long. With the bit I add,'
                               f'it is currently {len(memo)} characters long.')
        # Choose an asset and make a claim link. Ensure parallel executions can't choose the same asset.
        while self.bot.cl_reentrancy_guard:
            await asyncio.sleep(0.2)
        self.bot.cl_reentrancy_guard = True
        try:
            n_cards = len(self.bot.cached_card_ids)
            if n_cards < num:
                raise NoCardsException(f"The bot account only has {n_cards} cards at the moment, "
                                       f"so I can't send {user} {num} cards.")
            # bot.cached_card_ids is refreshed every minute and shuffled at fetch time.
            selected_asset_ids = [self.bot.cached_card_ids.pop() for _ in range(num)]
        finally:
            # Always release the guard; otherwise one failed request would block all later ones forever.
            self.bot.cl_reentrancy_guard = False
        link = await self.create_claimlink(selected_asset_ids, memo=memo)
        return link
'''atomicassets.py'''
"""Helpers for creating actions on atomicassets contract. By Vyryn""" | |
from aioeos import types | |
# Account name of the contract targeted by every action this module builds.
contract = 'atomicassets'
def mintasset(minter: str, to_addr: str, collection: str, schema: str, template_id: int = -1,
              immutable_data=None, mutable_data=None, authorization=None) -> types.EosAction:
    """Build a 'mintasset' action for the atomicassets contract.

    :param minter: Account passed as authorized_minter.
    :param to_addr: Account that will own the new asset.
    :param collection: Collection name.
    :param schema: Schema name.
    :param template_id: Template to mint from; defaults to -1.
    :param immutable_data: Immutable attribute entries; an empty list when omitted.
    :param mutable_data: Mutable attribute entries; an empty list when omitted.
    :param authorization: Authorizations for the action; an empty list when omitted.
    :return: The unsigned EosAction. No backing tokens are attached.
    """
    # Fresh lists on every call: list defaults in the signature would be shared by every action
    # ever built, so mutating one action's data would silently change the others.
    immutable_data = [] if immutable_data is None else immutable_data
    mutable_data = [] if mutable_data is None else mutable_data
    authorization = [] if authorization is None else authorization
    return types.EosAction(
        account=contract,
        name='mintasset',
        authorization=authorization,
        data={
            'authorized_minter': minter,
            'collection_name': collection,
            'schema_name': schema,
            'template_id': template_id,
            'new_asset_owner': to_addr,
            'immutable_data': immutable_data,
            'mutable_data': mutable_data,
            'tokens_to_back': []
        }
    )
def transfer(from_addr: str, to_addr: str, asset_ids: [int], memo: str = '', authorization=None) -> types.EosAction:
    """Build a 'transfer' action that moves the given asset ids between two accounts.

    :param from_addr: Current owner of the assets.
    :param to_addr: Receiving account.
    :param asset_ids: Ids of the assets to move.
    :param memo: Memo attached to the transfer.
    :param authorization: Authorizations for the action; an empty list when omitted.
    :return: The unsigned EosAction.
    """
    # A fresh list per call avoids sharing one default list between every action built.
    authorization = [] if authorization is None else authorization
    return types.EosAction(
        account=contract,
        name='transfer',
        authorization=authorization,
        data={
            'from': from_addr,
            'to': to_addr,
            'asset_ids': asset_ids,
            'memo': memo
        }
    )
'''atomictoolsx.py'''
"""Helpers for creating actions on atomictoolsx contract. By Vyryn""" | |
from aioeos import types | |
# Account name of the contract targeted by every action this module builds.
# NOTE(review): this looks like the start of a separate module; if it is ever merged into one file
# with the atomicassets helpers, this assignment would rebind the name they read. Confirm the layout.
contract = 'atomictoolsx'
def announcelink(creator: str, key: str, asset_ids: [int], memo: str, authorization=None) -> types.EosAction:
    """Build an 'announcelink' action for the atomictoolsx contract.

    :param creator: Account creating the link.
    :param key: Public key of the link's keypair.
    :param asset_ids: Ids of the assets the link will hold.
    :param memo: Memo attached to the link.
    :param authorization: Authorizations for the action; an empty list when omitted.
    :return: The unsigned EosAction.
    """
    # A fresh list per call avoids sharing one default list between every action built.
    authorization = [] if authorization is None else authorization
    return types.EosAction(
        account=contract,
        name='announcelink',
        authorization=authorization,
        data={
            'creator': creator,
            'key': key,
            'asset_ids': asset_ids,
            'memo': memo
        }
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment