Skip to content

Instantly share code, notes, and snippets.

@darkerego
Last active November 28, 2022 03:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darkerego/dbc915b769b727e9a9cf880af90abe4d to your computer and use it in GitHub Desktop.
Save darkerego/dbc915b769b727e9a9cf880af90abe4d to your computer and use it in GitHub Desktop.
Ethereum Keyscanner
import argparse
# modified fetch function with semaphore
import asyncio
import json
import pprint
import logging
import aiofiles
import aiohttp
from web3 import Web3, HTTPProvider
import os
from os.path import join, dirname
from dotenv import load_dotenv
# Resolve the .env file sitting next to this script and load it, so the two
# API keys below can be supplied via environment variables.
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
# NOTE(review): either key may be None if the .env file is absent or
# incomplete — os.environ.get does not raise. Verify before making requests.
ethplorer_api_key = os.environ.get("ethplorer_api_key")
infura_api_key = os.environ.get("infura_api_key")
# Append-mode file logging with millisecond timestamps; INFO and above.
logging.basicConfig(filename='explorer.log',
                    filemode='a',
                    format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                    datefmt='%H:%M:%S',
                    level=logging.INFO)
class EtherScanner:
    """Scan Ethereum addresses for ETH and ERC-20 token balances.

    Balances are fetched from the Ethplorer ``getAddressInfo`` endpoint;
    the Infura Web3 connection is used only to derive addresses from
    private keys. Results are pretty-printed and appended to ``output``.

    Parameters
    ----------
    api_key : str
        Ethplorer API key.
    infura_key : str
        Infura project key (mainnet).
    output : str, optional
        Path of the file positive results are appended to.
    threshold : float, optional
        Token values below this are not added to ``total_value``.
    batch_size : int, optional
        Maximum number of concurrent API requests (semaphore size).
    """

    def __init__(self, api_key, infura_key, output=None, threshold=0, batch_size=10):
        self.api_key = api_key
        self.infura_key = infura_key
        # Only used by privkey_to_address(); no on-chain calls are made here.
        self.connection = Web3(HTTPProvider(f'https://mainnet.infura.io/v3/{infura_key}'))
        self.endpoint = 'https://api.ethplorer.io'
        self.log = output
        # FIX: use getLogger so the handlers/format configured by
        # logging.basicConfig apply. Instantiating logging.Logger directly
        # bypasses the logging hierarchy and its handlers.
        self.logger = logging.getLogger('Eth Explorer')
        # Caps the number of simultaneous HTTP requests.
        self.sem = asyncio.Semaphore(batch_size)
        # Running USD total of all token values at or above `threshold`.
        self.total_value = 0
        self.threshold = threshold

    async def __ainit__(self):
        """Async half of construction: create the shared HTTP session.

        Must be awaited once before any fetch; aiohttp sessions have to be
        created inside a running event loop.
        """
        self.session = aiohttp.ClientSession()

    async def aclose(self):
        """Close the shared HTTP session (new, optional; fixes the leak)."""
        session = getattr(self, 'session', None)
        if session is not None:
            await session.close()

    async def log_to_disc(self, data):
        """Append `data` (pretty-printed) to the output file."""
        async with aiofiles.open(self.log, 'a') as f:
            # FIX: trailing newline so successive entries do not run together.
            await f.write(pprint.pformat(data) + '\n')

    def privkey_to_address(self, key):
        """Return the checksummed address for the given private key.

        Raises ValueError for malformed keys (propagated from web3).
        """
        acct = self.connection.eth.account.from_key(str(key))
        return acct.address

    def override_price(self, contract):
        """Return a hard-coded price for known-bad Ethplorer quotes, else 0.

        0x2b59...eb39 is HEX, whose Ethplorer bid price is unreliable.
        """
        if contract == '0x2b591e99afe9f32eaa6214f7b7629768c40eeb39':
            return 0.0274
        return 0

    async def parse(self, resp):
        """Parse a raw getAddressInfo response body and record holdings.

        Prints the structured result and appends it to the output file.
        Token values >= self.threshold are added to self.total_value.
        """
        resp = json.loads(resp.decode())
        eth_bal = resp['ETH']['balance']
        eth_address = resp['address']
        json_result = {'address': eth_address, 'eth_bal': eth_bal, 'tokens': {}}
        if resp.get('tokens'):
            for token in resp['tokens']:
                info = token['tokenInfo']
                # Scale the raw integer balance down by the token's decimals.
                token_bal = float(token['balance']) / (10 ** int(info['decimals']))
                token_address = info['address']
                entry = {
                    'Name': info['name'],
                    'Symbol': info['symbol'],
                    'Address': token_address,
                    'Balance': token_bal,
                }
                json_result['tokens'][info['symbol']] = entry
                if info['price']:
                    override = self.override_price(token_address)
                    token_price = info['price']['bid'] if override == 0 else override
                    token_value = float(token_price) * float(token_bal)
                    # FIX: honor the threshold — previously every value was
                    # added to the total, contradicting the CLI help text.
                    if token_value >= self.threshold:
                        self.total_value += token_value
                    entry['Value'] = token_value
                else:
                    entry['Value'] = 'None/Unknown'
        pprint.pprint(json_result)
        await self.log_to_disc(json_result)

    async def fetch(self, url):
        """GET `url`; return (status, raw body bytes)."""
        async with self.session.get(url) as response:
            body = await response.read()
            return response.status, body

    async def bound_fetch(self, url):
        """fetch() gated by the semaphore to bound concurrency."""
        async with self.sem:
            return await self.fetch(url)

    async def fetch_parse(self, url):
        """Fetch one URL and parse it; return (200, body) or (0, False)."""
        stat, resp = await self.bound_fetch(url)
        if stat == 200:
            try:
                await self.parse(resp)
            except KeyError as err:
                # Unexpected response shape; log and keep going.
                self.logger.error(f'Key Error: {err} with url {url}')
            return 200, resp
        return 0, False

    async def run(self, urls):
        """Fetch and parse all URLs concurrently (semaphore-bounded)."""
        return await asyncio.gather(*(self.fetch_parse(url) for url in urls))

    async def address_info(self, addresses):
        """Query Ethplorer getAddressInfo for each address in `addresses`."""
        urls = [
            f'{self.endpoint}/getAddressInfo/{address}?apiKey={self.api_key}'
            for address in addresses
        ]
        return await self.run(urls)

    def divide_chunks(self, l, n):
        """Yield successive n-sized slices of list `l`."""
        for i in range(0, len(l), n):
            yield l[i:i + n]
async def main():
    """CLI entry point: derive addresses from a file of private keys and
    scan each one for ETH/token balances, printing a running USD total.
    """
    parser = argparse.ArgumentParser()
    # FIX: mark the input file required — previously a missing -f crashed
    # later with TypeError on open(None).
    parser.add_argument('-f', '--file', dest='input', required=True,
                        help='List of private keys')
    parser.add_argument('-o', '--output', dest='output', default='output.log',
                        help='Output positive balances to this file')
    parser.add_argument('-b', '--batch', type=int, default=5,
                        help='Send in batch of this size to api')
    parser.add_argument('-t', '--threshold', type=float, default=2,
                        help='Do not add to total if value not at least this size')
    args = parser.parse_args()
    # FIX: forward the batch size so the request semaphore matches the
    # chunk size (it was previously left at its default).
    api = EtherScanner(api_key=ethplorer_api_key, infura_key=infura_api_key,
                       output=args.output, threshold=args.threshold,
                       batch_size=args.batch)
    await api.__ainit__()
    try:
        with open(args.input, 'r') as f:
            keys = [line.strip('\r\n') for line in f]
        pubkeys = []
        for key in keys:
            try:
                pubkeys.append(api.privkey_to_address(key))
            except ValueError:
                print(f'error with {key}')
        print(f'Loaded {len(pubkeys)} keys ... ')
        # FIX: always chunk — the original skipped batching (and never
        # printed the total) when the key count was <= the batch size.
        for batch in api.divide_chunks(pubkeys, args.batch):
            await api.address_info(batch)
            await asyncio.sleep(1)  # rate-limit courtesy pause between batches
            print('-------------------')
            print('TOTAL VALUE SO FAR:')
            print(f'${api.total_value}')
            print('-------------------')
    finally:
        # FIX: close the aiohttp session, which was previously leaked.
        await api.session.close()
# FIX: guard the entry point and use asyncio.run(), which creates and
# cleanly closes its own event loop. The old get_event_loop()/
# ensure_future()/run_until_complete() pattern is deprecated since
# Python 3.10 and also ran unconditionally on import.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment