Last active
November 28, 2022 03:03
-
-
Save darkerego/dbc915b769b727e9a9cf880af90abe4d to your computer and use it in GitHub Desktop.
Etherum Keyscanner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
# modified fetch function with semaphore | |
import asyncio | |
import json | |
import pprint | |
import logging | |
import aiofiles | |
import aiohttp | |
from web3 import Web3, HTTPProvider | |
import os | |
from os.path import join, dirname | |
from dotenv import load_dotenv | |
dotenv_path = join(dirname(__file__), '.env') | |
load_dotenv(dotenv_path) | |
ethplorer_api_key = os.environ.get("ethplorer_api_key") | |
infura_api_key = os.environ.get("infura_api_key") | |
logging.basicConfig(filename='explorer.log', | |
filemode='a', | |
format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', | |
datefmt='%H:%M:%S', | |
level=logging.INFO) | |
class EtherScanner: | |
def __init__(self, api_key, infura_key, output=None, threshold=0, batch_size=10): | |
self.api_key = api_key | |
self.infura_key = infura_key | |
self.connection = Web3(HTTPProvider(f'https://mainnet.infura.io/v3/{infura_key}')) | |
self.endpoint = 'https://api.ethplorer.io' | |
self.log = output | |
self.logger = logging.Logger(name='Eth Explorer') | |
self.sem = asyncio.Semaphore(batch_size) | |
self.total_value = 0 | |
self.threshold = threshold | |
async def __ainit__(self): | |
self.session = aiohttp.ClientSession() | |
async def log_to_disc(self, data): | |
async with aiofiles.open(self.log, 'a') as f: | |
await f.write(pprint.pformat(data)) | |
def privkey_to_address(self, key): | |
acct = self.connection.eth.account.from_key(str(key)) | |
return acct.address | |
def override_price(self, contract): | |
if contract == '0x2b591e99afe9f32eaa6214f7b7629768c40eeb39': | |
return 0.0274 | |
return 0 | |
async def parse(self, resp): | |
resp = json.loads(resp.decode()) | |
eth_bal = resp['ETH']['balance'] | |
eth_address = resp['address'] | |
# print(f'ETH address: {eth_address} balance: {eth_bal}') | |
json_result = {} | |
json_result['address'] = eth_address | |
json_result['eth_bal'] = eth_bal | |
json_result['tokens'] = {} | |
if resp.get('tokens'): | |
tokens = resp['tokens'] | |
for token in tokens: | |
decimal_m = 1 | |
# print(token) | |
token_dec = token['tokenInfo']['decimals'] | |
for x in range(int(token_dec)): | |
decimal_m *= 10 | |
token_bal = float(token['balance']) / decimal_m | |
token_name = token['tokenInfo']['name'] | |
token_symbol = token['tokenInfo']['symbol'] | |
token_address = token['tokenInfo']['address'] | |
json_result['tokens'][token_symbol] = {} | |
json_result['tokens'][token_symbol]['Name'] = token_name | |
json_result['tokens'][token_symbol]['Symbol'] = token_symbol | |
json_result['tokens'][token_symbol]['Address'] = token_address | |
json_result['tokens'][token_symbol]['Balance'] = token_bal | |
result = f'Token: {token_name}\n' | |
result += f'Symbol: {token_symbol}\n' | |
result += f'Address: {token_address}\n' | |
result += f'Balance: {token_bal}\n' | |
if token['tokenInfo']['price']: | |
check_override = self.override_price(token_address) | |
if check_override == 0: | |
token_price = token['tokenInfo']['price']['bid'] | |
else: | |
token_price = check_override | |
token_value = float(token_price) * (float(token_bal)) | |
self.total_value += token_value | |
result += f'Value: ${token_value}\n' | |
json_result['tokens'][token_symbol]['Value'] = token_value | |
else: | |
result += 'Value: None/Unknown' | |
json_result['tokens'][token_symbol]['Value'] = 'None/Unknown' | |
pprint.pprint(json_result) | |
await self.log_to_disc(json_result) | |
async def fetch(self, url): | |
async with self.session.get(url) as response: | |
resp = await response.read() | |
stat = response.status | |
# print("{}:{} with delay {}".format(date, resp, stat)) | |
# pprint.pprint(resp.decode()) | |
return stat, resp | |
async def bound_fetch(self, url): | |
# Getter function with semaphore. | |
async with self.sem: | |
return await self.fetch(url) | |
async def fetch_parse(self, url): | |
stat, resp = await self.bound_fetch(url) | |
if stat == 200: | |
try: | |
await self.parse(resp) | |
except KeyError as err: | |
self.logger.error(f'Key Error: {err} with url {url}') | |
return 200, resp | |
else: | |
return 0, False | |
async def run(self, urls): | |
# url = "http://localhost:8080/{}" | |
tasks = [] | |
# create instance of Semaphore | |
# Create client session that will ensure we dont open new connection | |
# per each request. | |
for url in urls: | |
# pass Semaphore and session to every GET request | |
task = asyncio.ensure_future(self.fetch_parse(url)) | |
tasks.append(task) | |
responses = asyncio.gather(*tasks) | |
return await responses | |
async def address_info(self, addresses): | |
urls = [] | |
for address in addresses: | |
urls.append(f'{self.endpoint}/getAddressInfo/{address}?apiKey={self.api_key}') | |
return await self.run(urls) | |
def divide_chunks(self, l, n): | |
# looping till length l | |
for i in range(0, len(l), n): | |
yield l[i:i + n] | |
async def main(): | |
args = argparse.ArgumentParser() | |
args.add_argument('-f', '--file', dest='input', help='List of private keys') | |
args.add_argument('-o', '--output', dest='output', default='output.log', help='Output positive balances to this file') | |
args.add_argument('-b', '--batch', type=int, default=5, help='Send in batch of this size to api') | |
args.add_argument('-t', '--threshold', type=float, default=2, | |
help='Do not add to total if value not at least this size') | |
args = args.parse_args() | |
#load_dotenv() | |
#ethplorer_apikey = | |
keys = [] | |
api = EtherScanner(api_key=ethplorer_api_key, infura_key=infura_api_key, output=args.output, threshold=args.threshold) | |
await api.__ainit__() | |
with open(args.input, 'r') as f: | |
f = f.readlines() | |
for line in f: | |
keys.append(line.strip('\r\n')) | |
pubkeys = [] | |
for key in keys: | |
try: | |
addr = api.privkey_to_address(key) | |
except ValueError: | |
print(f'error with {key}') | |
else: | |
# print(f'Checking {addr}') | |
pubkeys.append(addr) | |
print(f'Loaded {len(pubkeys)} keys ... ') | |
if len(pubkeys) > args.batch: | |
batches = api.divide_chunks(pubkeys, args.batch) | |
for batch in batches: | |
await api.address_info(batch) | |
await asyncio.sleep(1) | |
print('-------------------') | |
print('TOTAL VALUE SO FAR:') | |
print(f'${api.total_value}') | |
print('-------------------') | |
else: | |
await api.address_info(pubkeys) | |
loop = asyncio.get_event_loop() | |
future = asyncio.ensure_future(main()) | |
loop.run_until_complete(future) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment