Skip to content

Instantly share code, notes, and snippets.

@veqtor
Last active October 11, 2021 22:07
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save veqtor/3b6fb636b1e150f9f0f0ff1c11d96132 to your computer and use it in GitHub Desktop.
Save veqtor/3b6fb636b1e150f9f0f0ff1c11d96132 to your computer and use it in GitHub Desktop.
# Looks for wallets that appear to be puppet accounts of a given Tezos address
# by following its larger transfers on TzKT, then writes TSV reports of those
# wallets, the transfers involved and the hic et nunc tokens they collected.
import aiohttp
event_loop = None
import asyncio
from pprint import pprint
aio_session = None
import time
import pandas as pd
import json
from dataclasses import dataclass
from tqdm import tqdm
import requests
# Addresses of known exchanges. They are subtracted from every set of
# candidate puppet wallets, so transfers to or from them never mark a
# wallet as a puppet.
whitelist = {
    'tz1MHDcPPMZsK9mPA8XwUSw5kNqoJG3pXJ2f',
    'tz1SiPXX4MYGNJNDsRc7n8hkvUqFzg8xqF9m',
    'tz1ch7nvDDAchbAPfTio3yBsdvN1SZJZj1wY',
    'tz1TmBz5CkiMrrcpxATTg1nDtd1CUoBd17DC',
    'tz1NcoDFXMAfB26mpBhVrdSHmppyTeccT6Fi',
    'tz1Z7eWGw18LqUgRqmDqNZFQx7f8GEHXRfT8',
    'tz1bDhCGNZLQw1QXgf6MCzo6EtAVSGkqEB11',
    'tz1bDXD6nNSrebqmAnnKKwnX1QdePSMCj4MX',
    'tz1NH4A2kRyFQUYhyi9aL8jrrySQUrCgNsX9',
    'tz1YpWsMwc4gSyjtxEF3JbmN6YrGiDidaSmg',
    'tz1cgJLxcS8ZTgq5dXPtojJJR7Bp8fKDyxPK',
    'tz1VeaJWkdr2m5YaKFgeAafenhHhDcMxWfHC',
}
@dataclass
class TzpClient:
    """Small GraphQL client for looking up ``tzprofiles`` records.

    Attributes:
        url: GraphQL endpoint that queries are POSTed to.
        headers: HTTP headers sent with every request.
    """

    url: str
    headers: dict

    def run_query(self, query: str, variables: dict, extract=False):
        """POST a GraphQL query and return the decoded JSON response.

        Args:
            query: GraphQL document to execute.
            variables: Values for the variables declared in ``query``.
            extract: Unused; kept so existing call sites stay valid.

        Returns:
            The complete JSON body of the response.

        Raises:
            AssertionError: If the response status is not a success status.
        """
        response = requests.post(
            self.url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not response.ok:
            raise AssertionError(f"Failed with code {response.status_code}")
        return response.json()

    def get_tzp(self, account):
        """Query the ``tzprofiles`` rows whose account equals ``account``.

        Returns:
            The complete JSON response as returned by :meth:`run_query`; the
            matching rows are expected under ``data.tzprofiles``.
        """
        # GraphQL variables carry a ``$`` sigil both where they are declared
        # and where they are used; without it the document is not valid.
        return self.run_query(
            """
            query MyQuery($account: String!) {
              tzprofiles(where: {account: {_eq: $account}}) {
                account
                invalid_claims
                valid_claims
                contract
              }
            }
            """,
            {"account": account},
        )
# Lazily created client shared by all get_tzp_proofs() calls.
tzp_client = None


async def get_tzp_proofs(acc):
    """Return the valid profile claims recorded for ``acc``.

    The lookup is best effort: any failure is reported on stdout and treated
    as "no proofs".

    Args:
        acc: Account address to look up.

    Returns:
        A flat list with the ``valid_claims`` of every matching profile, or an
        empty list when there are none or the lookup failed.
    """
    global tzp_client
    if tzp_client is None:
        tzp_client = TzpClient(url="https://indexer.tzprofiles.com/v1/graphql", headers={})
    try:
        # get_tzp() is an ordinary (blocking) method, so its result must not
        # be awaited, and it returns the whole GraphQL response: the rows
        # live under data.tzprofiles.
        response = tzp_client.get_tzp(acc)
        profiles = response['data']['tzprofiles']
        total_proofs = []
        for profile in profiles:
            total_proofs += profile.get('valid_claims') or []
        return total_proofs
    except Exception as exc:
        # Still best effort, but no longer silent, and interpreter-level
        # signals such as KeyboardInterrupt are no longer swallowed.
        print('get_tzp_proofs', exc, acc)
        return []
@dataclass
class HicDexClient:
    """Small GraphQL client for looking up hic et nunc token records.

    Attributes:
        url: GraphQL endpoint that queries are POSTed to.
        headers: HTTP headers sent with every request.
    """

    url: str
    headers: dict

    def run_query(self, query: str, variables: dict, extract=False):
        """POST a GraphQL query and return the decoded JSON response.

        Args:
            query: GraphQL document to execute.
            variables: Values for the variables declared in ``query``.
            extract: Unused; kept so existing call sites stay valid.

        Returns:
            The complete JSON body of the response.

        Raises:
            AssertionError: If the response status is not a success status.
        """
        response = requests.post(
            self.url,
            headers=self.headers,
            json={"query": query, "variables": variables},
        )
        # Raised explicitly (rather than via ``assert``) so the check is not
        # stripped when Python runs with -O; the exception type is unchanged.
        if not response.ok:
            raise AssertionError(f"Failed with code {response.status_code}")
        return response.json()

    # The parameter keeps the name ``id`` (shadowing the builtin) because
    # callers pass it by keyword.
    def get_objkt(self, id):
        """Fetch one token by primary key.

        Args:
            id: Token id, sent as the ``$id`` query variable.

        Returns:
            The ``hic_et_nunc_token_by_pk`` entry of the response.

        Raises:
            KeyError: If the response has no ``data.hic_et_nunc_token_by_pk``.
        """
        response = self.run_query(
            """
            query Objkt($id: bigint!) {
              hic_et_nunc_token_by_pk(id: $id) {
                id
                title
                creator {
                  address
                  name
                }
                description
                mime
              }
            }
            """,
            {"id": id},
        )
        return response['data']['hic_et_nunc_token_by_pk']
def get_session():
    """Return the shared aiohttp session, creating it on first use.

    The session is stored in the module-level ``aio_session`` so that every
    request reuses it.
    """
    global aio_session
    if aio_session is not None:
        return aio_session

    import aiohttp
    aio_session = aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=None, connect=1., sock_connect=2., sock_read=15.),
    )
    return aio_session
async def http_request(session: aiohttp.ClientSession, method: str, **kwargs):
    """Send a request through ``session`` and return the body parsed as JSON.

    Args:
        session: Session used to send the request.
        method: Name of the session method to call, e.g. ``'get'``.
        **kwargs: Forwarded to that method (``url``, ``params``, ...). A
            ``headers`` entry is merged with the fixed User-Agent below.

    Returns:
        The decoded JSON body; the response content type is not checked.
    """
    # Listed last so the fixed User-Agent wins over a caller-supplied one.
    headers = {
        **kwargs.pop('headers', {}),
        'User-Agent': 'versum_dipdup',
    }
    send = getattr(session, method)
    async with send(
        skip_auto_headers={'User-Agent'},
        headers=headers,
        **kwargs,
    ) as response:
        return await response.json(content_type=None)
async def get_incoming_txs(wallet: str):
    """Fetch all transactions targeting ``wallet`` that match ``parameter.null=true``.

    Results are read from TzKT page by page until a page comes back shorter
    than the page size.

    Args:
        wallet: Address used as the ``target`` filter.

    Returns:
        A list with the transaction objects of every page.

    Raises:
        RuntimeError: If a page is not a JSON list.
    """
    page_size = 25
    txs = []
    page = 0
    while True:
        url = f'https://api.tzkt.io/v1/operations/transactions?target={wallet}&parameter.null=true&offset.pg={page}&limit={page_size}'
        res = await http_request(get_session(), 'get', url=url)
        if not isinstance(res, list):
            # A JSON object here would be merged into the result key by key
            # and would never look like an empty page, so fail loudly.
            raise RuntimeError(f'Unexpected TzKT response for {wallet}: {res!r}')
        txs += res
        # A short (or empty) page is the last one; no need to ask for more.
        if len(res) < page_size:
            return txs
        page += 1
async def get_outgoing_txs(wallet: str):
    """Fetch all transactions sent by ``wallet`` that match ``parameter.null=true``.

    Results are read from TzKT page by page until a page comes back shorter
    than the page size.

    Args:
        wallet: Address used as the ``sender`` filter.

    Returns:
        A list with the transaction objects of every page.

    Raises:
        RuntimeError: If a page is not a JSON list.
    """
    page_size = 25
    txs = []
    page = 0
    while True:
        url = f'https://api.tzkt.io/v1/operations/transactions?sender={wallet}&parameter.null=true&offset.pg={page}&limit={page_size}'
        res = await http_request(get_session(), 'get', url=url)
        if not isinstance(res, list):
            # A JSON object here would be merged into the result key by key
            # and would never look like an empty page, so fail loudly.
            raise RuntimeError(f'Unexpected TzKT response for {wallet}: {res!r}')
        txs += res
        # A short (or empty) page is the last one; no need to ask for more.
        if len(res) < page_size:
            return txs
        page += 1
def _is_token_transfer(tx):
    """Tell whether ``tx`` is a non-empty ``transfer`` call on the token contract."""
    # NOTE(review): KT1RJ6... is presumably the hic et nunc token contract;
    # this code only relies on it being the target of the transfer call.
    target = tx.get('target') or {}
    parameter = tx.get('parameter') or {}
    return (
        target.get('address') == 'KT1RJ6PbjHpwc3M5rw5s2Nbmefwbuwbdxton'
        and parameter.get('entrypoint', '') == 'transfer'
        and bool(parameter.get('value'))
    )


async def get_collect_transfer_operation(ophash):
    """Find the token transfer that belongs to the operation ``ophash``.

    Args:
        ophash: Hash of the operation to inspect.

    Returns:
        A one-element list holding the transfer entry, with the operation
        hash added under ``'ophash'``. ``None`` when the operation contains no
        matching transfer, or when the first matching transfer does not carry
        exactly one entry in its ``txs`` list.

    Raises:
        Exception: Any error raised while fetching or inspecting the
            operation is re-raised after the intermediate state is printed.
    """
    res = None
    send_op = None
    col_op = None
    try:
        url = f'https://api.tzkt.io/v1/operations/transactions/{ophash}?entrypoint=transfer&status=applied&sender=KT1HbQepzV1nVGg8QVznG7z4RcHseD5kwqBn'
        res = await http_request(get_session(), 'get', url=url)
        send_op = [tx for tx in res if _is_token_transfer(tx)]
        if not send_op:
            return None
        col_op = send_op[0]['parameter']['value'][0]['txs']
        if len(col_op) != 1:
            return None
        col_op = col_op[0]
        col_op['ophash'] = ophash
        return [col_op]
    except Exception as e:
        # Dump what was gathered so far to help diagnose the failure, then
        # let the caller decide whether to retry.
        print(e)
        print(res, send_op, col_op)
        pprint(send_op)
        raise
async def get_collect_operations(acc):
    """Collect the token transfers behind the ``collect`` calls made by ``acc``.

    Each applied ``collect`` operation of the account is resolved to its
    transfer entry. A failing lookup is retried up to five times with
    exponential back-off; operations without a usable transfer are skipped.

    Args:
        acc: Account whose ``collect`` operations are listed.

    Returns:
        A list of transfer entries as returned by
        ``get_collect_transfer_operation``.
    """
    url = f'https://api.tzkt.io/v1/accounts/{acc}/operations?entrypoint=collect&status=applied&limit=1000&target=KT1HbQepzV1nVGg8QVznG7z4RcHseD5kwqBn'
    res = await http_request(get_session(), 'get', url=url)
    real_ops = []
    for tx in res:
        for attempt in range(5):
            try:
                ctf = await get_collect_transfer_operation(tx['hash'])
            except Exception as e:
                print('get_collect_operations', e, tx['hash'])
                # asyncio.sleep, unlike time.sleep, does not block the event loop.
                await asyncio.sleep(2 ** attempt)
                continue
            # None means "nothing usable in this operation": not worth retrying.
            if ctf is not None:
                real_ops += ctf
            break
        # Short pause between operations to go easy on the API.
        await asyncio.sleep(0.1)
    return real_ops
async def check_sus(acc):
    """Decide whether ``acc`` looks suspicious.

    An account is suspicious when it has no valid profile proofs and its TzKT
    record either has no metadata at all or has ``person`` metadata with
    fewer than three fields. Accounts whose metadata kind is anything other
    than ``person`` are never suspicious.

    Args:
        acc: Account address to check.

    Returns:
        ``True`` if the account is suspicious, ``False`` otherwise.
    """
    url = f'https://api.tzkt.io/v1/accounts/{acc}?metadata=true'
    account = await http_request(get_session(), 'get', url=url)

    if 'metadata' in account:
        metadata = account['metadata']
        if metadata['kind'] != 'person':
            return False
        if len(metadata) >= 3:
            return False

    proofs = await get_tzp_proofs(acc)
    return len(proofs) < 1
async def recursive_check_puppets(acc, depth=0, max_depth=5, current_puppets=None, with_incoming=True):
    """Recursively collect the wallets connected to ``acc`` by larger transfers.

    Contract addresses (``KT...``) and accounts that ``check_sus`` does not
    flag end the recursion. For a flagged account, every counterparty of a
    transfer above the amount threshold becomes a candidate, unless it is
    whitelisted, already known or a contract. Candidates are checked the same
    way, and those that turn out not to be suspicious are dropped again.

    Args:
        acc: Account to inspect.
        depth: Current recursion depth.
        max_depth: Depth at which candidates are no longer followed.
        current_puppets: Addresses already found further up the recursion.
        with_incoming: Also treat senders of incoming transfers as candidates.

    Returns:
        A tuple ``(puppets, funding_ops, suspicious)``: the candidate
        addresses, the transfers gathered along the way, and whether ``acc``
        itself was flagged.
    """
    print(f'Checking {acc}')
    if acc.lower().startswith('kt'):
        return [], [], False
    if current_puppets is None:
        current_puppets = set()
    if not await check_sus(acc):
        return [], [], False

    # Transfers at or below this amount are ignored.
    # NOTE(review): presumably mutez, i.e. 1 tez - confirm against TzKT.
    min_amount = 1000000

    outgoing = [tx for tx in await get_outgoing_txs(acc) if tx['amount'] > min_amount]
    puppets = {tx['target']['address'] for tx in outgoing}
    puppets -= whitelist
    puppets -= current_puppets
    print(f'Outgoing puppet accounts {len(puppets)}')
    funding_ops = outgoing

    if with_incoming:
        incoming = [tx for tx in await get_incoming_txs(acc) if tx['amount'] > min_amount]
        inc_puppets = {tx['sender']['address'] for tx in incoming}
        inc_puppets -= whitelist
        inc_puppets -= current_puppets
        print(f'Incoming funds accounts {len(inc_puppets)}')
        puppets |= inc_puppets
        # NOTE(review): as before, only the incoming transfers are reported
        # when with_incoming is set; the outgoing ones are dropped here.
        # Confirm whether both should be kept.
        funding_ops = incoming

    total_puppets = [p for p in puppets if not p.lower().startswith('kt')]
    puppets = set(total_puppets)

    if depth < max_depth:
        known = current_puppets | puppets
        for puppet in list(puppets):
            pupps, fops, puppet_sus = await recursive_check_puppets(
                puppet, depth + 1, max_depth=max_depth,
                current_puppets=known, with_incoming=with_incoming)
            total_puppets += pupps
            funding_ops += fops
            # asyncio.sleep, unlike time.sleep, does not block the event loop.
            await asyncio.sleep(0.1)
            if not puppet_sus:
                total_puppets.remove(puppet)
    return total_puppets, funding_ops, True
# Column order of the per-token report written by checkit().
_OBJKT_REPORT_COLUMNS = ['id', 'title', 'creator', 'total_amount', 'creator_name', 'collects', 'description', 'mime']


def _aggregate_collects(collection):
    """Sum collected amounts per token and receiving wallet, counting each operation once."""
    collects = {coll['token_id']: {} for coll in collection}
    collect_amts = {token_id: 0 for token_id in collects}
    used_ophashes = set()
    for coll in collection:
        if coll['ophash'] in used_ophashes:
            continue
        try:
            amount = int(coll['amount'])
            per_wallet = collects[coll['token_id']]
            entry = per_wallet.setdefault(coll['to_'], {'amount': 0, 'txs': 0})
            entry['amount'] += amount
            entry['txs'] += 1
            collect_amts[coll['token_id']] += amount
            used_ophashes.add(coll['ophash'])
        except Exception as e:
            # Show the offending record before giving up.
            print(e)
            print(coll)
            raise
    return collects, collect_amts


async def _fetch_objkt(objkt_id, clients, max_retries=5):
    """Fetch a token record from the first client that has it, retrying only when every client fails."""
    for attempt in range(max_retries):
        answered = False
        for client in clients:
            try:
                objkt = client.get_objkt(id=objkt_id)
            except Exception as exc:
                print('get_objkt', exc, objkt_id)
                continue
            if objkt is not None:
                return objkt
            answered = True
        # A client answered but does not know the token: retrying would loop
        # on the same answer, so give up on this token.
        if answered:
            return None
        await asyncio.sleep(2 ** attempt)
    return None


def _objkt_report_row(objkt, collectors, total_amount):
    """Flatten a token record and its collect statistics into one report row."""
    creator = objkt['creator']
    row = dict(objkt)
    row['collects'] = json.dumps(collectors)
    row['total_amount'] = total_amount
    row['creator'] = creator['address']
    row['creator_name'] = creator['name']
    # Description and title may be null in the source data.
    row['description'] = (objkt['description'] or '').replace('\n', ' - ')[:16] + '...'
    row['title'] = (objkt['title'] or '').replace('\n', ' - ')
    return row


async def checkit(acc):
    """Investigate ``acc`` and write the findings to three TSV files.

    The connected wallets are discovered with ``recursive_check_puppets``,
    their ``collect`` operations are gathered and summed per token, and the
    token records are looked up. The results go to ``{acc}.tsv`` (tokens),
    ``{acc}_puppet_funding_ops.tsv`` (transfers) and
    ``{acc}_puppet_accounts.tsv`` (wallet addresses).

    Args:
        acc: Account address to investigate.
    """
    global aio_session
    hdclient = HicDexClient(url="https://api.hicdex.com/v1/graphql", headers={})
    ttclient = HicDexClient(url="https://hdapi.teztools.io/v1/graphql", headers={})
    try:
        puppets, funding_ops, _ = await recursive_check_puppets(acc)
        puppets = list(set(puppets))
        pprint(puppets)

        collection = []
        for puppet in tqdm(puppets, desc='Gathering puppets'):
            collection += await get_collect_operations(puppet)
            # asyncio.sleep, unlike time.sleep, does not block the event loop.
            await asyncio.sleep(1)
        # Sample record for eyeballing; there may be none at all.
        if collection:
            pprint(collection[0])

        collects, collect_amts = _aggregate_collects(collection)

        collected_objkts = []
        for objkt_id in tqdm(list(collects), desc='gathering botted objkts'):
            objkt = await _fetch_objkt(objkt_id, (hdclient, ttclient))
            if objkt is None:
                continue
            collected_objkts.append(
                _objkt_report_row(objkt, collects[objkt_id], collect_amts[objkt_id]))

        # Passing the columns up front keeps the report well-formed even when
        # no token could be resolved.
        df = pd.DataFrame(collected_objkts, columns=_OBJKT_REPORT_COLUMNS)
        df = df.sort_values(by=['creator', 'total_amount', 'id'], ascending=[False, False, True])
        print(df)
        df.to_csv(f'{acc}.tsv', sep='\t')

        pd.DataFrame(funding_ops).to_csv(f'{acc}_puppet_funding_ops.tsv', sep='\t')
        pd.DataFrame(puppets, columns=['address']).to_csv(f'{acc}_puppet_accounts.tsv', sep='\t')
    finally:
        # Release the shared HTTP session; get_session() creates a new one
        # if it is needed again.
        if aio_session is not None:
            await aio_session.close()
            aio_session = None
# Script entry point: run the investigation for one hard-coded account.
if __name__ == '__main__':
    target_account = 'tz1LLQ63fE66fdACEa9MGPYcUsKtiCHDPmSD'
    asyncio.run(checkit(target_account))
# See PyCharm help at https://www.jetbrains.com/help/pycharm/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment