Skip to content

Instantly share code, notes, and snippets.

@mikeshultz
Created January 8, 2021 22:48
Show Gist options
  • Save mikeshultz/aec0d5c94998be4ef827226b88ed8547 to your computer and use it in GitHub Desktop.
Save mikeshultz/aec0d5c94998be4ef827226b88ed8547 to your computer and use it in GitHub Desktop.
#!/bin/env python3
"""
Detect any contracts that selfdestruct that interacted with a contract.
1) Look for code before, during, and after contract, any transition from
having code to having no code is suspect.
2) If not marked as suspect and no code found, look through etherscan for a
contract creation transaction in the account's history for good measure
"""
import sys
import requests
from argparse import ArgumentParser
from web3 import Web3
# Etherscan URL template for an account's normal transactions (action=txlist).
# Placeholders filled in by get_endpoint(): {address}, {startblock}, {apikey}.
TX_ENDPOINT = 'https://api.etherscan.io/api?module=account&action=txlist&address={address}&startblock={startblock}&endblock=99999999&sort=asc&apikey={apikey}'
# Same template for an account's internal transactions (action=txlistinternal).
INTERNAL_TX_ENDPOINT = 'https://api.etherscan.io/api?module=account&action=txlistinternal&address={address}&startblock={startblock}&endblock=99999999&sort=asc&apikey={apikey}'
def parse_args(argv):
    """Parse the command-line options of this script.

    ``argv`` is the list of arguments without the program name. The returned
    namespace carries ``epoch`` (int, default 0), ``contract``, ``endpoint``
    (default ``http://localhost:8545``) and ``etherscan_key``.
    """
    # Each entry is (flags, keyword settings) for one add_argument() call.
    option_table = (
        (
            ('-e', '--epoch'),
            {
                'dest': 'epoch',
                'type': int,
                'default': 0,
                'help': 'Block number to use as epoch for given contract',
            },
        ),
        (
            ('-c', '--contract'),
            {
                'dest': 'contract',
                'help': 'Contract address to probe',
            },
        ),
        (
            ('-u', '--url'),
            {
                'dest': 'endpoint',
                'type': str,
                'default': "http://localhost:8545",
                'help': 'JSON-RPC endpoint to query',
            },
        ),
        (
            ('-a', '--etherscan-key'),
            {
                'dest': 'etherscan_key',
                'help': 'Etherscan API key',
            },
        ),
    )

    cli = ArgumentParser()
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)
    return cli.parse_args(argv)
def get_endpoint(link_template, address, etherscan_api_key, startblock=0):
    """Return ``link_template`` with its URL placeholders filled in.

    The template is formatted with the keyword fields ``address``,
    ``apikey`` and ``startblock``.
    """
    substitutions = {
        'address': address,
        'apikey': etherscan_api_key,
        'startblock': startblock,
    }
    return link_template.format(**substitutions)
def get_normal_transactions(address, etherscan_key, epoch, timeout=30):
    """Fetch the normal transaction list of ``address`` from Etherscan.

    Args:
        address: Account address to query.
        etherscan_key: Etherscan API key.
        epoch: First block number to include (sent as ``startblock``).
        timeout: Seconds to wait for Etherscan before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: If Etherscan answers with an HTTP status other than 200.
    """
    url = get_endpoint(TX_ENDPOINT, address, etherscan_key, startblock=epoch)
    # Without an explicit timeout requests waits indefinitely on a stalled
    # connection, which would hang the whole scan.
    txr = requests.get(url, timeout=timeout)
    if txr.status_code != 200:
        raise Exception(
            'Unexpected HTTP status code from etherscan {}'.format(
                txr.status_code
            )
        )
    return txr.json()
def get_internal_transactions(address, etherscan_key, epoch, timeout=30):
    """Fetch the internal transaction list of ``address`` from Etherscan.

    Args:
        address: Account address to query.
        etherscan_key: Etherscan API key.
        epoch: First block number to include (sent as ``startblock``).
        timeout: Seconds to wait for Etherscan before giving up.

    Returns:
        The decoded JSON response body.

    Raises:
        Exception: If Etherscan answers with an HTTP status other than 200.
    """
    url = get_endpoint(
        INTERNAL_TX_ENDPOINT,
        address,
        etherscan_key,
        startblock=epoch
    )
    # Without an explicit timeout requests waits indefinitely on a stalled
    # connection, which would hang the whole scan.
    inr = requests.get(url, timeout=timeout)
    if inr.status_code != 200:
        raise Exception(
            'Unexpected HTTP status code from etherscan {}'.format(
                inr.status_code
            )
        )
    return inr.json()
def get_etherscan_transactions(args):
    """Return the contract's normal and internal transactions in chain order.

    Args:
        args: Parsed command-line namespace; ``contract``, ``etherscan_key``
            and ``epoch`` are read from it.

    Returns:
        A new list of transaction dicts sorted by block number, then by
        transaction index.

    Raises:
        Exception: If either Etherscan response carries a ``result`` that is
            not a list.
    """
    responses = (
        get_normal_transactions(
            args.contract,
            args.etherscan_key,
            args.epoch
        ),
        get_internal_transactions(
            args.contract,
            args.etherscan_key,
            args.epoch
        ),
    )

    combined = []
    for response in responses:
        result = response['result']
        # NOTE(review): Etherscan appears to report API errors (e.g. rate
        # limiting) as a string in 'result'; fail clearly instead of crashing
        # later while extending or sorting.
        if not isinstance(result, list):
            raise Exception(
                'Unexpected result from etherscan: {}'.format(result)
            )
        combined.extend(result)

    def chain_position(tx):
        # Compare numerically so ordering does not depend on padding width.
        # Internal transaction entries may not carry 'transactionIndex';
        # treat a missing or empty index as 0 rather than raising KeyError.
        return (
            int(tx['blockNumber']),
            int(tx.get('transactionIndex') or 0),
        )

    return sorted(combined, key=chain_position)
def ensure_list_append(l, k, v):
    """Append ``v`` to the list stored at ``l[k]``.

    A fresh list is stored first when the key is missing or maps to None.
    """
    bucket = l.get(k)
    if bucket is None:
        bucket = []
        l[k] = bucket
    bucket.append(v)
def anormal(v):
    """Normalize an address to its checksum form.

    Returns None for a falsy value or the bare ``'0x'`` string; every other
    value is passed to ``Web3.toChecksumAddress``.
    """
    is_blank = not v or v == '0x'
    return None if is_blank else Web3.toChecksumAddress(v)
def _has_code_at(web3, address, block):
    """Return whether ``address`` has code at ``block``, or None if unknown.

    None is returned (after printing a notice) when the node raises a
    ValueError whose message contains 'not found'; any other ValueError
    propagates.
    """
    try:
        return web3.eth.getCode(address, block_identifier=block) != b''
    except ValueError as err:
        if 'not found' not in str(err):
            raise
        print('Unable to execute eth_getCode [{}, {}]'.format(
            address,
            block
        ))
        return None


def _was_created_as_contract(account, normal, etherscan_key):
    """Return True if the account's history has a tx that created it."""
    res = get_normal_transactions(account, etherscan_key, 0)
    history = res['result']
    if not isinstance(history, list):
        # NOTE(review): a non-list result looks like an Etherscan error
        # message; report it and treat the history as inconclusive.
        print('Unexpected etherscan result for {}: {}'.format(
            account,
            history
        ))
        return False
    # Normalize both sides before comparing: `normal` is a checksum address.
    # Creation transactions are matched on contractAddress alone, since their
    # 'to' field is expected to be empty.
    return any(
        anormal(tx.get('contractAddress')) == normal
        for tx in history
    )


def main():
    """Print the accounts that look like selfdestructed contracts.

    Every account that sent to, or received from, the probed contract is
    checked in three steps:

    1) Code presence is sampled at the block before, the block of, and the
       block after each of its transactions; going from code to no code
       marks the account as suspect.
    2) If code was seen at some sampled block but none exists at the latest
       block, the account is suspect.
    3) If no code was ever seen and none exists now, the account's Etherscan
       history is searched for a transaction that created it as a contract.

    Each suspect is reported once.
    """
    suspects = []
    account_transactions = {}
    args = parse_args(sys.argv[1:])
    web3 = Web3(Web3.HTTPProvider(args.endpoint))
    latest_block = web3.eth.blockNumber
    transactions = get_etherscan_transactions(args)

    for tx in transactions:
        ensure_list_append(account_transactions, tx['from'], tx)
        ensure_list_append(account_transactions, tx['to'], tx)

    for account, account_txs in account_transactions.items():
        normal = anormal(account)
        if normal is None:
            # An empty 'to' (e.g. a contract creation) is not an account
            # that can be probed.
            continue
        print('checking account: ', normal)

        # Unique, ascending block numbers to sample, clamped to blocks that
        # can exist on the chain.
        probes = sorted({
            int(tx['blockNumber']) + offset
            for tx in account_txs
            for offset in (-1, 0, 1)
            if 0 <= int(tx['blockNumber']) + offset <= latest_block
        })

        has_code = False  # code has been observed at an earlier probe
        is_suspect = False
        for probe in probes:
            state = _has_code_at(web3, normal, probe)
            if state is None:
                # The node could not answer for this block; skip it rather
                # than reuse a stale or unset value.
                continue
            if has_code and not state:
                is_suspect = True
                break
            has_code = has_code or state

        if not is_suspect:
            latest_has_code = web3.eth.getCode(
                normal,
                block_identifier=latest_block
            ) != b''
            if has_code and not latest_has_code:
                is_suspect = True
            elif not latest_has_code:
                # For good measure, look through the account history to see
                # if there's a contract creation transaction
                is_suspect = _was_created_as_contract(
                    account,
                    normal,
                    args.etherscan_key
                )

        if is_suspect:
            suspects.append(account)

    print('Suspects:', suspects)
# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment