Skip to content

Instantly share code, notes, and snippets.

@BlinkyStitt
Last active March 29, 2022 18:27
Show Gist options
  • Save BlinkyStitt/608d522ff68fd06ccc2d9c057ba4c206 to your computer and use it in GitHub Desktop.
Save BlinkyStitt/608d522ff68fd06ccc2d9c057ba4c206 to your computer and use it in GitHub Desktop.
force debug_traceTransaction
import logging
from brownie import accounts, chain, network, web3
from brownie._config import CONFIG
from requests.exceptions import ReadTimeout
from time import sleep
from flashprofits.transaction_helpers import get_transaction
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def quick_change_network(name, block=None):
    """Connect to the network `name`, discarding any existing connection.

    Nothing (like accounts) is preserved across the switch.

    Args:
        name: Network id passed to ``network.connect`` and used as the key
            into ``CONFIG.networks``.
        block: Optional block number to fork at; it is stored in the
            network's ``cmd_settings["fork_block"]``. A negative value is
            treated as an offset from the latest block
            (``chain[-1].number + block``). A falsy value (``None`` or ``0``)
            leaves the stored setting untouched.

    Returns:
        None. The function returns early, without reconnecting, only when
        the active network is already `name` and its configured
        ``fork_block`` equals the requested block.
    """
    relative_block = bool(block) and block < 0

    if not network.is_connected():
        if relative_block:
            # connect just long enough to turn the offset into a block number
            network.connect(name)
            block = chain[-1].number + block
            network.disconnect()
    else:
        # web3 responded
        if relative_block:
            block = chain[-1].number + block

        on_requested_network = network.show_active() == name
        if on_requested_network and block:
            # same network: see whether it is also forked at the same block
            # TODO: something is wrong here. we are getting hardhat's --fork-block-number flag even though we are trying to use ganache
            try:
                current_fork_block = CONFIG.active_network["cmd_settings"]["fork_block"]
            except KeyError:
                # no fork block configured; `block` is truthy here so None never matches it
                current_fork_block = None

            if current_fork_block == block:
                # already forked at the right block. nothing to do
                # TODO: actually, maybe we should check that chain[-1] is our block too? otherwise there might be state changes from other tests
                return

        # TODO: if we are on hardhat, we can send an rpc request instead of disconnecting
        # different network or wrong fork block: drop the current connection
        network.disconnect()

    if block:
        CONFIG.networks[name]["cmd_settings"]["fork_block"] = block

    # TODO: do something so that we get a unique port. i think brownie's parallel runner code should be able to do this for us
    network.connect(name)
def make_transaction_traceable(tx):
    """Return a version of `tx` that works with debug_traceTransaction.

    The function first tries to read ``tx.internal_transfers``. If that
    succeeds, `tx` is returned unchanged. Otherwise it:

    1. fetches every transaction in `tx`'s block up to and including `tx`,
    2. switches to the "-fork" variant of the active network, forked at the
       block before `tx`'s block,
    3. turns automine off and re-sends each fetched transaction from its
       (force-unlocked) original sender,
    4. mines them with the original block's timestamp, and
    5. returns the replayed copy of `tx`, looked up by its new txid.

    Side effects on the replay path: the active network is changed through
    ``quick_change_network`` and is left connected, and automine is never
    turned back on. ``CONFIG.settings["autofetch_sources"]`` is disabled
    while fetching and is always restored, even when fetching fails.

    Args:
        tx: The transaction to make traceable. The code reads its ``txid``,
            ``block_number`` and ``internal_transfers`` attributes.

    Returns:
        `tx` itself when it is already traceable, otherwise the transaction
        returned by ``chain.get_transaction`` for the replayed copy. The
        replayed copy's txid will very likely differ from the original.

    Raises:
        ValueError: If `tx` is not found among its block's transactions, or
            if `tx` is a validator deposit, which the replay skips and
            therefore cannot reproduce.
    """
    # first, try to do something that gets the trace. if that works, we don't need to do anything special
    try:
        logger.debug("Attempting to get internal_transfers for %s", tx.txid)
        tx.internal_transfers
    except Exception as e:
        # deliberately broad: any failure here just means we fall back to replaying
        # TODO: warning?
        # TODO: if error is f"Network id {chain_id} does not correspond to a network that Hardhat can trace", then we can't do this method either
        logger.debug("Unable to fetch internal transfers for %s: %s", tx.txid, e)
    else:
        # we were able to trace it. no need to do anything fancy
        # if you run your own node, this will be the common case. nodes almost always limit debug_traceTransaction
        return tx

    # receiver of the special deposit transactions that the test nodes do not handle
    validator_contract = "0x0000000000000000000000000000000000001000"

    # figure out what network to use
    # TODO: think more about this. we might need overrides
    active_network = network.show_active()
    if active_network.endswith("-fork"):
        fork_network = active_network
    else:
        fork_network = active_network + "-fork"

    # load data about the block
    block = tx.block_number
    target_timestamp = chain[block].timestamp

    # turn off autofetch sources. we don't need the sources for every transaction in the block.
    # that slows us down and we don't use them
    old_autofetch = CONFIG.settings.get("autofetch_sources")
    CONFIG.settings["autofetch_sources"] = False

    # get the transactions that we need to rebuild the block
    # fetching from a forked node would hit their caches, but we have our own caches on get_transaction
    block_txs = []
    found = False
    try:
        logger.debug("fetching txs in block %s", block)
        for i, other_txid in enumerate(chain[block].transactions):
            logger.debug(f"fetching #{i} {other_txid.hex()}...")

            # i would prefer to web3.eth.get_raw_transaction, but if we don't have debug_traceTransaction, we probably don't have it either
            # TODO: try anyways? query the explorer for the raw transaction?
            block_txs.append(get_transaction(other_txid, fix_unknown_events=False))

            # NOTE(review): assumes other_txid and tx.txid are directly comparable types - confirm
            if other_txid == tx.txid:
                # no need to fetch anything past our target transaction
                found = True
                break
    finally:
        # return autofetch_sources to the previous value, even if fetching raised
        CONFIG.settings["autofetch_sources"] = old_autofetch

    if not found:
        # this shouldn't happen
        raise ValueError(f"transaction {tx.txid} was not found in block {block}")

    # the fetch loop stops at the target, so the target is always the last entry.
    # the replay below skips validator deposits; if the target is one, there would be no
    # replayed copy to return, so fail before touching the network
    if block_txs[-1].receiver == validator_contract:
        raise ValueError(
            f"transaction {tx.txid} is a validator deposit and cannot be replayed"
        )

    logger.info(f"{len(block_txs)} transaction(s) to replay")

    # fork at the block before the target transaction's block
    quick_change_network(fork_network, block=block - 1)

    # turn off automine so that all the replayed transactions are in the same block
    # TODO: different providers might do this differently. does brownie have a helper for this?
    web3.provider.make_request("evm_setAutomine", [False])

    # replay the transactions
    unlocked = set()
    new_txid = None
    for replay_tx in block_txs:
        if replay_tx.receiver == validator_contract:
            # this is a special deposit transaction and our test nodes do not handle them
            # TODO: only skip on bsc?
            # TODO: is skipping going to cause issues?
            logger.debug("skipped validator deposit %s", replay_tx.txid)
            continue

        # unlock the account
        if replay_tx.sender not in unlocked:
            logger.debug(f"unlocking {replay_tx.sender}")
            accounts.at(replay_tx.sender, force=True)
            unlocked.add(replay_tx.sender)

        # send the transaction from the account
        # these params probably don't perfectly match what the original sender gave, but hopefully they are close enough to get the same result
        # TODO: this is probably different with EIP1559
        tx_params = {
            "from": replay_tx.sender,
            "gas": replay_tx.gas_limit,
            "gasPrice": replay_tx.gas_price,
            "to": replay_tx.receiver,
            "data": replay_tx.input,
            "nonce": replay_tx.nonce,
            "value": replay_tx.value,
        }

        # this is an intentional typo so the logs line up
        logger.debug(f"replayin {replay_tx.txid}...")

        # txids are very likely not going to match
        # allow reverts. don't wait for confirmations
        new_txid = web3.eth.send_transaction(tx_params).hex()
        logger.debug(f"replayed {replay_tx.txid} as {new_txid}")

    # new_txid is the last tx's new txid. and the last tx is the one we care about
    # (the validator-deposit check above guarantees the last tx was actually replayed)

    # mine all the replayed transactions
    # TODO: this almost always times out, but it seems to work fine. investigate
    try:
        chain.mine(timestamp=target_timestamp)
    except ReadTimeout as e:
        logger.warning("%s", e)
        # TODO: sleep until our block is mined?

    # fetch the original transaction with its new id. this transaction will work with debug_traceTransaction
    replayed = chain.get_transaction(new_txid)

    # TODO: compare all the transactions (at least while testing)
    # TODO: make sure chain[-1].block_number == block
    logger.warning("do something so that the caller knows they need to web3.disconnect() at the end of their querying")
    return replayed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment