Skip to content

Instantly share code, notes, and snippets.

@itherunder
Created December 30, 2021 04:16
Show Gist options
  • Save itherunder/bb522a3a2d4aa04d24a75e0798f3819e to your computer and use it in GitHub Desktop.
Save itherunder/bb522a3a2d4aa04d24a75e0798f3819e to your computer and use it in GitHub Desktop.
监听以太坊和bsc 上面的Transfer 或者Swap 事件的,还能监听特定地址
__author__ = 'itherunder'
'''
https://etherscan.io/tokens?ps=100&p=1,从这里拿的
coins = document.getElementsByClassName('text-primary')
res = []
for (let i = 0; i < 100; i++) {
var name = coins[i].text
var href = coins[i].href
console.log(name, href)
res.push([name, href])
}
console.log(res)
只关注一些常用swap的token的大额转入转出:
big_tokens.txt
https://api.etherscan.io/api?module=proxy&action=eth_blockNumber&apikey=YourApiKeyToken
这个获取最新块高
https://api.etherscan.io/api?module=logs&action=getLogs&fromBlock=13794127&toBlock=latest&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef&apikey=YourApiKeyToken
这个获取Transfer日志
https://api.bscscan.com/api?module=proxy&action=eth_blockNumber&apikey=YourApiKeyToken
这个获取最新块高
https://api.bscscan.com/api?module=logs&action=getLogs&fromBlock=13794127&toBlock=latest&topic0=0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef&apikey=YourApiKeyToken
这个获取Transfer日志
'''
import logging, logging.handlers
from disco.types.base import enum
from discord import channel, message
from discord.errors import ClientException
from requests.api import get
from web3 import Web3
import time, requests, json, asyncio
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib, traceback, sys, redis
from disco.api.client import APIClient
# Load runtime settings from config.txt. Each setting is one "key#value" line;
# blank lines and lines starting with '#' are ignored.
config = {}
with open('config.txt', 'r') as r:
    for line in r:
        line = line.strip()
        if not line or line.startswith('#'):
            continue
        # Split on the first '#' only, so a value may itself contain '#'.
        k, v = line.split('#', 1)
        config[k] = v
# API keys for the two block explorers, taken from the parsed config.
api_key = config['etherscan_api_key']
bsc_api_key = config['bscscan_api_key']

# Etherscan endpoints. The getLogs template keeps two placeholders that are
# filled per request: %d = fromBlock, %s = topic0.
block_number_url = (
    'https://api.etherscan.io/api?module=proxy&action=eth_blockNumber&apikey='
    + api_key
)
get_log_url = (
    'https://api.etherscan.io/api?module=logs&action=getLogs&fromBlock=%d&toBlock=latest&topic0=%s&apikey='
    + api_key
)

# BscScan endpoints, same layout as the Etherscan ones.
bsc_block_number_url = (
    'https://api.bscscan.com/api?module=proxy&action=eth_blockNumber&apikey='
    + bsc_api_key
)
bsc_get_log_url = (
    'https://api.bscscan.com/api?module=logs&action=getLogs&fromBlock=%d&toBlock=latest&topic0=%s&apikey='
    + bsc_api_key
)

# topic0 values used to filter logs (named after the Transfer and Swap events).
transfer_id = '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
swap_id = '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822'

# Mutable polling state shared by the functions below.
before_block_number = 0      # block number the next eth log query starts from
bsc_before_block_number = 0  # same, for bsc
messages = []                # mail bodies waiting to be sent as one batch
messages_cnt = 0
cur_messages_cnt = 0
# Logging: every record goes to the console and to a rotating file
# (10 MiB per file, 30 backups), both at INFO level with the same format.
import os

formatter = logging.Formatter('[%(asctime)s] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)

# RotatingFileHandler does not create missing parent directories, so make sure
# the log directory exists before opening the file.
os.makedirs('logs', exist_ok=True)
file = logging.handlers.RotatingFileHandler('logs/info.log', mode='a', encoding='utf-8', maxBytes=10*1024*1024, backupCount=30)
file.setLevel(logging.INFO)
file.setFormatter(formatter)

logger = logging.getLogger('logger')
logger.setLevel(logging.INFO)
logger.addHandler(console)
logger.addHandler(file)
# Mail settings: sender account, '&'-separated recipient list, and the
# password used for the SMTP login.
mail_from = config['mail_from']
mail_to = config['mail_to'].split('&')
pswd = config['pswd']

# Discord settings; the client itself is currently disabled.
discord_token, channel_id = config['discord_token'], config['channel_id']
# discord_client = APIClient(discord_token)
def _read_hash_pairs(path, encoding=None):
    """Yield (left, right) for every non-blank "left#right" line of *path*.

    The line is split on the first '#' only, so the right-hand part may itself
    contain '#'. A non-blank line without any '#' raises ValueError.
    """
    with open(path, 'r', encoding=encoding) as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry:
                continue
            left, right = entry.split('#', 1)
            yield left, right


# token contract address (lower-case) -> coin name (lower-case)
big_tokens = {
    addr.lower(): coin.lower()
    for coin, addr in _read_hash_pairs('big_tokens.txt')
}

# token contract address (lower-case) -> number of decimals
token_decimals = {
    addr.lower(): int(decimal)
    for addr, decimal in _read_hash_pairs('token_decimals.txt')
}

# token contract address (lower-case) -> stored price
token_prices = {
    addr.lower(): float(price)
    for addr, price in _read_hash_pairs('token_prices.txt')
}

# Addresses of SOS holders: address (lower-case) -> evaluated holding info.
# NOTE(review): eval() executes whatever the file contains; this is only safe
# while holders_info.txt is fully trusted. Consider ast.literal_eval if the
# values are plain literals.
holders_address = {
    addr.lower(): eval(hold)
    for addr, hold in _read_hash_pairs('../listen_address/holders_info.txt')
}

# Tags for notable addresses: address (lower-case) -> tag text.
address_tags = {
    addr.lower(): tag
    for addr, tag in _read_hash_pairs('../listen_address/taged_holders_address.txt', encoding='utf-8')
}
# redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
# CoinGecko markets endpoint; %s is replaced with the coin id.
price_url = 'https://api.coingecko.com/api/v3/coins/markets?vs_currency=usd&ids=%s'

# tokens.txt holds a JSON list of objects with at least 'id' and 'symbol'.
with open('tokens.txt', 'r', encoding='utf-8') as tokens_file:
    all_tokens = json.load(tokens_file)

# symbol (lower-case) -> coin id. When several coins share a symbol, the
# first one listed in the file wins.
symbol_to_id = {}
for token_info in all_tokens:
    symbol_to_id.setdefault(token_info['symbol'].lower(), token_info['id'])
def get_price(symbol, timeout=30):
    """Fetch the current USD price of *symbol* from the CoinGecko markets API.

    Args:
        symbol: Token symbol; matched case-insensitively against symbol_to_id.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        (current_price, price_change_percentage_24h) on success, or
        (0, http_status_code) when the request did not return 200 or the
        response contained no market entry.

    Raises:
        KeyError: if *symbol* is not present in symbol_to_id.
    """
    coin_id = symbol_to_id[symbol.lower()]
    # Without a timeout a stalled connection would block the caller forever.
    res = requests.get(price_url % coin_id, timeout=timeout)
    if res.status_code != 200:
        return (0, res.status_code)
    markets = json.loads(res.text)
    if not markets:
        # Empty result list: report it like a failed lookup instead of
        # raising IndexError.
        return (0, res.status_code)
    obj = markets[0]
    return (obj['current_price'], obj['price_change_percentage_24h'])
# def send_discord_message(msg):
# global messages_cnt, cur_messages_cnt
# messages_cnt += 1
# cur_messages_cnt += 1
# if cur_messages_cnt % 5 == 0:
# log('[info] 还有%d 条消息待发送, 已发送%d 条消息' % (len(messages) // 5, messages_cnt), False)
# time.sleep(3)
# # return
# # LINE()
# discord_client.channels_get(channel_id)
# # LINE()
# if len(msg) > 4000: msg = msg[:4000]
# discord_client.channels_messages_create(channel_id, msg)
# # LINE()
def LINE():
    """Debug aid: print the source line number of the code that called LINE()."""
    caller_frame = sys._getframe(1)
    print('############### LINE', caller_frame.f_lineno)
# def send_discord(msg, chain='eth', tx=None):
# # LINE()
# if type(msg) != str:
# receipt = msg
# # title = '%s上监听到大额交易: Dollar: %f, Token: %s, From: %s, To: %s, Value: %d' % (chain, receipt['value'], receipt['token'], receipt['f'], receipt['t'], receipt['v'])
# title = '%s: Dollar: %f, Token: %s, Value: %d' % (chain, receipt['value'], receipt['token'], receipt['v'])
# # content = '\r\n详细信息见: \nhttps://%s/tx/%s \nhttps://etherchain.org/tx/%s' % ('etherscan.io' if chain == 'eth' else 'bscscan.com', receipt['transactionHash'], receipt['transactionHash']) # + '\r\n详细信息: %s' % tx
# # content = '\n详细信息见: https://%s/tx/%s\r\n' % ('etherscan.io' if chain == 'eth' else 'bscscan.com', receipt['transactionHash']) # + '\r\n详细信息: %s' % tx
# content = '详见: https://%s/tx/%s\r\n' % ('etherscan.io' if chain == 'eth' else 'bscscan.com', receipt['transactionHash']) # + '\r\n详细信息: %s' % tx
# else:
# title, content = msg[:32], msg
# message = title + '\r\n' + content
# messages.append(message)
# if len(messages) >= 10:
# send_discord_message('\r\n'.join(messages))
# messages.clear()
def send_mail(msg, chain='eth', tx=None):
    """Queue one notification and mail the whole queue once it holds 100 entries.

    Args:
        msg: Either a plain string (queued as-is) or a receipt dict that
            provides 'holder_address', 'v' and 'transactionHash'.
        chain: 'eth' links the transaction to etherscan.io; any other value
            links to bscscan.com.
        tx: Accepted for call compatibility; not used.

    A receipt is dropped silently when its 'holder_address' is neither exactly
    40 characters long nor contains '.eth'. Nothing is sent until the shared
    `messages` queue reaches 100 entries; at that point the queue is joined
    into one mail body, emptied, and sent to every address in `mail_to`. The
    subject is the title derived from the message that triggered the send.
    """
    if not isinstance(msg, str):
        receipt = msg
        addr = receipt['holder_address']
        if len(addr) != 40 and '.eth' not in addr:
            return
        title = '检测到几个大佬交易,详见content'
        content = '大佬地址: %s, value: %f \n 详见: https://%s/tx/%s\r\n' % (addr, receipt['v'], 'etherscan.io' if chain == 'eth' else 'bscscan.com', receipt['transactionHash'])
    else:
        title, content = msg[:32], msg

    messages.append('\r\n' + content)
    if len(messages) < 100:
        return
    message = '\r\n'.join(messages)
    messages.clear()

    mail = MIMEMultipart()
    mail.attach(MIMEText(message, 'plain', 'utf-8'))
    mail['Subject'], mail['From'] = title, mail_from
    payload = mail.as_string()  # serialize once, reuse for every recipient

    # The context manager closes the SMTP connection even when login or
    # sending fails; previously the connection was never closed.
    with smtplib.SMTP_SSL('smtp.qq.com', 465) as s:
        s.login(mail_from, pswd)
        for recipient in mail_to:
            s.sendmail(mail_from, recipient, payload)
def log(msg, is_send=True):
    """Write *msg* to the logger and, unless is_send is false, queue it for mail."""
    logger.info(msg)
    if not is_send:
        return
    send_mail(msg)
def get_cur_block_number(chain='eth', timeout=30):
    """Return the latest block number reported by the chain's explorer API.

    Args:
        chain: 'eth' queries Etherscan; any other value queries BscScan.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The block number as an int. On any failure (non-200 response, an
        API-level error status, or a missing/non-hex result) the problem is
        logged and the previously stored block number for that chain is
        returned instead.
    """
    is_eth = chain == 'eth'
    # Value handed back whenever the fresh number cannot be obtained.
    fallback = before_block_number if is_eth else bsc_before_block_number

    res = requests.get(block_number_url if is_eth else bsc_block_number_url, timeout=timeout)
    if res.status_code != 200:
        log('[%s error] get block number error: wrong status code: %d' % (chain, res.status_code))
        return fallback

    info = json.loads(res.text)
    # A 'status' field only appears in error replies here; the check treats
    # any value other than '1' as a failure.
    if 'status' in info and info['status'] != '1':
        log('[%s error] get block number error: wrong block number: %d detail: %s' % (chain, fallback, info))
        return fallback

    try:
        return int(info['result'], 16)
    except (KeyError, TypeError, ValueError):
        # Reply without a usable hex 'result': keep polling from the old block.
        log('[%s error] get block number error: unexpected response: %s' % (chain, info))
        return fallback
# # 监听大额交易的
# def listen_big_transfer(addr, chain, txhash, receipts, receipt, threshold, f, t, v):
# if addr in big_tokens:
# # 根据token_decimals 和价格来判断是不是很多
# symbol = big_tokens[addr].lower()
# if redis_client.exists(symbol):
# price = float(redis_client.get(symbol))
# else:
# try:
# price, _ = get_price(symbol) # 使用实时的请求,如果请求出问题就使用以前获取的
# redis_client.setex(symbol, 3600, price)
# except Exception as e:
# # log('[error] request %s price error.' % symbol)
# price = 0
# if price == 0:
# log('[%s error] request %s price 429 for coingecko api: too many requests.' % (chain, symbol), True)
# price = token_prices[addr]
# value = v / (10 ** token_decimals[addr]) * price
# # log('[%s debug] the value of tx %s is %f' % (chain, txhash, value), False)
# if value > threshold:
# receipt['token'], receipt['f'], receipt['t'], receipt['v'], receipt['value'] = big_tokens[addr], f, t, v, value
# # log('[info] %s' % receipt, False)
# # log('[info] txhash: %s, value: $%f, coin: %s, addr: %s, f: %s, t: %s, v: %d' % (txhash, value, symbol, addr, f, t, v), False)
# log('[%s info] txhash: %s, value: $%f, coin: %s, v: %d' % (chain, txhash, value, symbol, v), False)
# receipts.append(receipt)
# Watches for transfers that involve one of the saved holder addresses
def listen_holders_address(receipts, receipt, chain, txhash, f, t, v):
    """Collect *receipt* when its sender or recipient is a watched holder.

    Always stores the transferred amount in receipt['v']. When *f* (sender) or
    *t* (recipient) is a key of holders_address, the transfer is logged,
    receipt['holder_address'] is set to the matching address (followed by its
    tag in parentheses when one exists in address_tags) and the receipt is
    appended to *receipts*. The sender is checked first.
    """
    receipt['v'] = v

    # First of (sender, recipient) that is being watched, if any.
    watched = next((party for party in (f, t) if party in holders_address), None)
    if watched is None:
        return

    log('[%s info] txhash: %s, f: %s, t: %s, v: %d' % (chain, txhash, f, t, v), False)
    if watched in address_tags:
        receipt['holder_address'] = '%s (%s)' % (watched, address_tags[watched])
    else:
        receipt['holder_address'] = watched
    receipts.append(receipt)
# threshold is expressed in US dollars
def _decode_transfer_log(receipt):
    """Return (token_address, sender, recipient, amount) for one raw log entry.

    Addresses are lower-case hex without the '0x' prefix; amount is an int.
    Two layouts are handled:
      * sender and recipient in topics[1] and topics[2], amount in the data
        field (or in topics[3] when data is empty, else 0);
      * older style with sender, recipient and amount packed into the data
        field as three consecutive 64-hex-digit words.
    Raises ValueError/IndexError/KeyError when the entry is malformed.
    """
    topics, data = receipt['topics'], receipt['data']
    if len(topics) > 2:
        # Each topic is '0x' + 64 hex digits; the address is the last 40.
        sender, recipient = topics[1][26:], topics[2][26:]
        if data == '0x':
            amount_hex = topics[3][2:] if len(topics) > 3 else '0'
        else:
            amount_hex = data[2:]
    else:
        # Drop the '0x' prefix first so the 64-digit words line up; slicing
        # the raw string shifted every field by two characters.
        payload = data[2:]
        sender, recipient = payload[24:64], payload[88:128]
        amount_hex = payload[128:192]
    return (receipt['address'].lower()[2:], sender.lower(), recipient.lower(), int(amount_hex, 16))


def get_latest_transfer_logs(chain='eth', threshold=100000, logid=transfer_id):
    """Fetch new event logs for *chain* and mail those touching watched holders.

    Args:
        chain: 'eth' uses the Etherscan endpoints and state; any other value
            uses the BscScan ones.
        threshold: Dollar threshold; only used by the disabled big-transfer
            check, kept for call compatibility.
        logid: topic0 value to filter on (defaults to the Transfer id).

    Logs are requested from the chain's stored start block up to 'latest'.
    On an HTTP or API error the problem is logged and the start block is left
    unchanged. Otherwise each log is decoded and passed to
    listen_holders_address; every collected receipt is mailed once per
    transaction hash, and the start block is moved to the block number read
    at the beginning of the call.
    """
    global before_block_number, bsc_before_block_number
    is_eth = chain == 'eth'
    cur_block_number = get_cur_block_number(chain)
    from_block = before_block_number if is_eth else bsc_before_block_number

    # Without a timeout a stalled connection would freeze the polling loop.
    res = requests.get((get_log_url if is_eth else bsc_get_log_url) % (from_block, logid), timeout=30)
    if res.status_code != 200:
        log('[%s error] get Transfer Log error: wrong status code: %d' % (chain, res.status_code))
        return
    info = json.loads(res.text)
    if info['status'] != '1':
        # "No records found" is routine, so it is logged but not mailed.
        log('[%s error] get Transfer Log error: wrong status code: %s, detail: %s, at blocknumber %d' % (chain, info['status'], info, from_block), info['message'] != 'No records found')
        return
    log('[%s info] ############# there has %d Transfer Event from %d to latest' % (chain, len(info['result']), from_block), False)

    receipts, tx_transfer = [], {}
    for receipt in info['result']:
        txhash = receipt['transactionHash'].lower()
        try:
            _token, f, t, v = _decode_transfer_log(receipt)
        except (KeyError, IndexError, ValueError) as err:
            # One malformed entry must not abort the whole batch; otherwise
            # the same block range would fail again on every retry.
            log('[%s error] skip malformed log in tx %s: %s' % (chain, txhash, err), False)
            continue
        # Collects the receipt when it involves a watched address.
        listen_holders_address(receipts, receipt, chain, txhash, f, t, v)
        tx_transfer.setdefault(txhash, []).append(receipt)

    for receipt in receipts:
        txhash = receipt['transactionHash'].lower()
        # pop() makes sure each transaction is reported only once even when
        # several of its logs matched.
        transfers = tx_transfer.pop(txhash, None)
        if transfers is not None:
            send_mail(receipt, chain, transfers)

    # NOTE(review): the next query starts at cur_block_number itself, so logs
    # of that block may be seen twice — confirm whether that is intended.
    if is_eth:
        before_block_number = cur_block_number
    else:
        bsc_before_block_number = cur_block_number
def start():
    """Poll eth and bsc alternately forever, pausing 7.5 seconds after each poll."""
    # (chain, threshold) pairs, visited in this order on every round.
    schedule = (('eth', 100000), ('bsc', 10000))
    while True:
        for chain_name, dollar_threshold in schedule:
            get_latest_transfer_logs(chain_name, dollar_threshold)
            time.sleep(7.5)
def main():
    """Run the polling loop forever, restarting it after any Exception.

    Each failure is logged with its traceback, the pending mail queue is
    cleared, and polling resumes after a short pause. The restart is a loop
    rather than a recursive call, so repeated failures cannot exhaust the
    call stack, and KeyboardInterrupt/SystemExit end the program instead of
    triggering another restart.
    """
    while True:
        try:
            start()
        except Exception as e:
            exc_type, exc_value, exc_traceback = sys.exc_info()
            detail = str(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
            try:
                log('[error] catch Exception: %s, current block number: %d, detail: %s' % (str(e), before_block_number, detail))
            except Exception:
                # Reporting itself failed (e.g. mail error); keep the watcher
                # alive and record the problem locally.
                logger.exception('[error] failed to report the previous exception')
        finally:
            messages.clear()  # drop previously queued messages before restarting
        # Brief pause so a persistent failure does not hammer the remote APIs.
        time.sleep(7.5)
if __name__ == '__main__':
    # for i in range(20):
    #     send_mail('2333')
    # Seed both chains with their current block number (eth first, then bsc)
    # so the first poll only looks at new blocks, then start the watcher.
    before_block_number, bsc_before_block_number = (
        get_cur_block_number('eth'),
        get_cur_block_number('bsc'),
    )
    main()
def test_token_is_in_list():
    """Print index, name and coin id for every entry of big_tokens.

    Raises KeyError at the first name that has no entry in symbol_to_id.
    """
    for position, coin_name in enumerate(big_tokens.values()):
        coin_id = symbol_to_id[coin_name.lower()]
        print(position, coin_name, coin_id)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment