Created
January 8, 2021 22:48
-
-
Save mikeshultz/aec0d5c94998be4ef827226b88ed8547 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
""" | |
Detect any contracts that selfdestruct that interacted with a contract. | |
1) Look for code before, during, and after contract, any transition from | |
having code to having no code is suspect. | |
2) If not marked as suspect and no code found, look through etherscan for a | |
contract creation transaction in the account's history for good measure | |
""" | |
import sys | |
import requests | |
from argparse import ArgumentParser | |
from web3 import Web3 | |
TX_ENDPOINT = 'https://api.etherscan.io/api?module=account&action=txlist&address={address}&startblock={startblock}&endblock=99999999&sort=asc&apikey={apikey}' | |
INTERNAL_TX_ENDPOINT = 'https://api.etherscan.io/api?module=account&action=txlistinternal&address={address}&startblock={startblock}&endblock=99999999&sort=asc&apikey={apikey}' | |
def parse_args(argv): | |
parser = ArgumentParser() | |
parser.add_argument('-e', '--epoch', dest='epoch', type=int, default=0, | |
help='Block number to use as epoch for given contract') | |
parser.add_argument('-c', '--contract', dest='contract', | |
help='Contract address to probe') | |
parser.add_argument('-u', '--url', dest='endpoint', type=str, | |
default="http://localhost:8545", | |
help='JSON-RPC endpoint to query') | |
parser.add_argument('-a', '--etherscan-key', dest='etherscan_key', | |
help='Etherscan API key') | |
return parser.parse_args(argv) | |
def get_endpoint(link_template, address, etherscan_api_key, startblock=0): | |
return link_template.format( | |
address=address, | |
apikey=etherscan_api_key, | |
startblock=startblock, | |
) | |
def get_normal_transactions(address, etherscan_key, epoch): | |
txr = requests.get(get_endpoint( | |
TX_ENDPOINT, | |
address, | |
etherscan_key, | |
startblock=epoch | |
)) | |
if txr.status_code != 200: | |
raise Exception( | |
'Unexpected HTTP status code from etherscan {}'.format( | |
txr.status_code | |
) | |
) | |
return txr.json() | |
def get_internal_transactions(address, etherscan_key, epoch): | |
inr = requests.get(get_endpoint( | |
INTERNAL_TX_ENDPOINT, | |
address, | |
etherscan_key, | |
startblock=epoch | |
)) | |
if inr.status_code != 200: | |
raise Exception( | |
'Unexpected HTTP status code from etherscan {}'.format( | |
inr.status_code | |
) | |
) | |
return inr.json() | |
def get_etherscan_transactions(args): | |
txjason = get_normal_transactions( | |
args.contract, | |
args.etherscan_key, | |
args.epoch | |
) | |
injason = get_internal_transactions( | |
args.contract, | |
args.etherscan_key, | |
args.epoch | |
) | |
combined = txjason['result'] | |
combined.extend(injason['result']) | |
return sorted( | |
combined, | |
key=lambda x: '{}-{}'.format( | |
x['blockNumber'].zfill(9), | |
x['transactionIndex'].zfill(3) | |
) | |
) | |
def ensure_list_append(l, k, v): | |
if l.get(k) is None: | |
l[k] = list() | |
l[k].append(v) | |
def anormal(v): | |
if not v or v == '0x': | |
return None | |
return Web3.toChecksumAddress(v) | |
def main(): | |
suspects = [] | |
account_transactions = {} | |
args = parse_args(sys.argv[1:]) | |
web3 = Web3(Web3.HTTPProvider(args.endpoint)) | |
latest_block = web3.eth.blockNumber | |
transactions = get_etherscan_transactions(args) | |
for tx in transactions: | |
ensure_list_append(account_transactions, tx['from'], tx) | |
ensure_list_append(account_transactions, tx['to'], tx) | |
for account in account_transactions.keys(): | |
has_code = False | |
is_suspect = False | |
blocks = [tx['blockNumber'] for tx in account_transactions[account]] | |
normal = anormal(account) | |
print('checking account: ', normal) | |
for block in blocks: | |
block = int(block) | |
try: | |
prev_has_code = web3.eth.getCode( | |
normal, | |
block_identifier=block-1 | |
) != b'' | |
except ValueError as err: | |
if 'not found' in str(err): | |
print('Unable to execute eth_getCode [{}, {}]'.format( | |
normal, | |
block-1 | |
)) | |
else: | |
raise err | |
try: | |
now_has_code = web3.eth.getCode( | |
normal, | |
block_identifier=block | |
) != b'' | |
except ValueError as err: | |
if 'not found' in str(err): | |
print('Unable to execute eth_getCode [{}, {}]'.format( | |
normal, | |
block-1 | |
)) | |
else: | |
raise err | |
try: | |
next_has_code = web3.eth.getCode( | |
normal, | |
block_identifier=block+1 | |
) != b'' | |
except ValueError as err: | |
if 'not found' in str(err): | |
print('Unable to execute eth_getCode [{}, {}]'.format( | |
normal, | |
block-1 | |
)) | |
else: | |
raise err | |
if ( | |
has_code and not prev_has_code | |
or prev_has_code and not now_has_code | |
or now_has_code and not next_has_code | |
): | |
suspects.append(account) | |
break | |
latest_has_code = web3.eth.getCode( | |
normal, | |
block_identifier=latest_block | |
) != b'' | |
if has_code and not latest_has_code: | |
suspects.append(account) | |
# For good measure, look through the account history to see if | |
# there's a contract creation transaction | |
if not is_suspect and not latest_has_code: | |
res = get_normal_transactions( | |
account, | |
args.etherscan_key, | |
0 | |
) | |
account_txs = res['result'] | |
to_txs = list(filter( | |
lambda x: normal == anormal(x['to']), | |
account_txs | |
)) | |
# Look for transactions that created this contract account | |
if len(to_txs) > 0 and any([ | |
x['contractAddress'] == anormal(account) | |
for x in to_txs | |
if x.get('contractAddress') is not None | |
]): | |
suspects.append(account) | |
print('Suspects:', suspects) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment