Skip to content

Instantly share code, notes, and snippets.

@itherunder
Last active April 12, 2022 07:29
Show Gist options
  • Save itherunder/c2f1e1565c40988385114fb92fc9dd65 to your computer and use it in GitHub Desktop.
Save itherunder/c2f1e1565c40988385114fb92fc9dd65 to your computer and use it in GitHub Desktop.
获取以太坊上开源的代码

usage

  1. 将待获取的合约地址(含有0x)保存在./contracts.txt
  2. 如果是windows下的话,需要先创建./bytecodes以及./sourcecodes这两个文件夹
  3. python3 getContracts.py

函数

  1. getByteCode 获取字节码
  2. getSrcCode 获取源码
  3. getABI 获取ABI
  4. getCode 多线程获取xxx
import time, requests, json, os
from threading import Thread, Lock
__author__ = "Zhou.Liao"
if not os.path.exists('./gettedAddress'):
fp = open('./gettedAddress', 'w')
fp.close()
if not os.path.exists('bytecodes/'):
os.system('mkdir bytecodes/')
if not os.path.exists('sourcecodes/'):
os.system('mkdir sourcecodes/')
apis = [
'R8GU49QUDUBJHDEMAWDYQTQRSU1F5UE67N',
'AMKHQ2XEHNYEKDA6Y4XN7Y1TKAA2EGGUGV',
'W6Q3NZ4M2JZ38GF5AW2DF6X5VUKFH94VR4'
]
print('initializing contracts...')
contracts = []
with open('contracts.txt', 'r', encoding='utf-8') as rf:
for line in rf.readlines():
if line.startswith('0x'): contracts.append(line.strip())
print('initialized finished! total: %d' % len(contracts))
mutex = Lock()
gettedAddress = {}
with open('gettedAddress', 'r', encoding='utf-8') as rf:
for line in rf.readlines():
gettedAddress[line.strip()] = 1
globalCnt = len(gettedAddress)
def mutexWriteFile(file, type, message):
mutex.acquire()
with open(file, type, encoding='utf-8') as fd:
fd.write(message)
if file == 'sleep':
with open(file, 'r', encoding='utf-8') as rf:
flag = len(rf.readlines()) > 951116
if flag:
os.remove(file)
mutex.release()
def mutexGetContract():
global contracts
if not contracts: return None
mutex.acquire()
contract = contracts.pop()
mutex.release()
return contract
def getByteCode(contract):
res = requests.get('http://api-cn.etherscan.com/api?module=proxy&action=eth_getCode&address=%s&apikey=%s' % (contract, apis[ord(contract[-1]) % 3]))
# print(res.text)
res = json.loads(res.text)
if not res:
return False
if not res['result'].startswith('0x'):
return False
bytecode = res['result']
if bytecode == '0x':
mutexWriteFile('destroyedContract.txt', 'a', contract+'\n')
else:
with open('bytecodes/%s' % contract, 'w') as wf:
wf.write(bytecode)
return True
def getSrcCode(contract):
res = requests.get('http://api-cn.etherscan.com/api?module=contract&action=getsourcecode&address=%s&apikey=%s' % (contract, apis[ord(contract[-1]) % 3]))
# print(res.text)
res = json.loads(res.text)
if not res:
return False
if res['status'] == '0':
if res['result'] == 'Invalid Address format':
mutexWriteFile('invalid', 'a', contract+'\n')
return True
return False
if res['result'][0]['SourceCode']:
with open('sourcecodes/%s' % contract, 'w', encoding='utf-8') as wf:
wf.write(json.dumps(res))
return True
def getABI(contract):
res = requests.get('http://api-cn.etherscan.com/api?module=contract&action=getabi&address=%s&apikey=%s' % (contract, apis[ord(contract[-1]) % 3]))
# print(res.text)
res = json.loads(res.text)
if not res:
return False
if res['status'] == '0':
if res['result'] == 'Invalid Address format':
mutexWriteFile('invalid', 'a', contract+'\n')
return True
return False
if res['result'][0]['SourceCode']:
with open('sourcecodes/%s' % contract, 'w', encoding='utf-8') as wf:
wf.write(json.dumps(res))
return True
def getCode():
global globalCnt, gettedAddress
with open('startinfo', 'a', encoding='utf-8') as af:
af.write(time.asctime(time.localtime(time.time()))+'\n')
while True:
contract = mutexGetContract()
if not contract: break
if contract in gettedAddress: continue
print('%s [INFO] %d contract: %s' % (time.strftime("%y-%m-%d %H:%M:%S", time.localtime()), globalCnt, contract))
try:
#while not getByteCode(contract): pass
while not getSrcCode(contract): pass
mutex.acquire()
globalCnt += 1
mutex.release()
mutexWriteFile('gettedAddress', 'a', contract+'\n')
except Exception as e:
mutexWriteFile('error', 'a', contract + '#' + str(e)+'\n')
if __name__ == "__main__":
startTime = time.asctime(time.localtime(time.time()))
threads = []
for _ in range(4):
thread = Thread(target=getCode)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
endTime = time.asctime(time.localtime(time.time()))
print('end: ', endTime, '\nstart:', startTime)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment