Skip to content

Instantly share code, notes, and snippets.

@ryancdotorg
Created November 28, 2022 21:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryancdotorg/575e65a973357e8c34c7812f7cf8397f to your computer and use it in GitHub Desktop.
Save ryancdotorg/575e65a973357e8c34c7812f7cf8397f to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
def _pylib():
    """Make the user's personal library directory importable.

    If ``~/code/pylib`` exists and is a directory, its resolved absolute
    path is appended to ``sys.path``; otherwise nothing happens.
    """
    from pathlib import Path
    import sys

    candidate = Path.home() / 'code' / 'pylib'
    if not candidate.is_dir():
        return
    sys.path.append(str(candidate.resolve()))


# Run immediately so the imports below can find modules that live in pylib.
_pylib()
from sys import argv, stderr, stdout, version_info
from functools import partial
# print() variant that writes to stderr; used for progress messages so that
# stdout carries only the final results.
eprint = partial(print, file=stderr)
import concatjson
import re, json
import requests
from requests.sessions import Session
import time
from threading import Thread,local
from queue import Queue
# Shared state, filled in while scanning accounts and probing servers.
domains = set()   # every candidate domain found in any account
instances = {}    # domain -> parsed NodeInfo document for that domain
results = []      # one {'id', 'username', 'hits'} dict per account with hits

# A `user@domain.tld` handle (optionally written `@user@domain.tld`) embedded
# in free-form text. Group 1 is the local part, group 2 the full domain.
RE1 = re.compile(
    r'\b(@?[a-z0-9_]+)@([a-z0-9.-]+[.](xn--[a-z0-9-]+|[A-Za-z]+))\b',
    re.IGNORECASE,
)

# A URL that is exactly `http(s)://domain/@user`. Group 1 is the domain,
# group 2 the local part including its leading '@'.
RE2 = re.compile(
    r'^https?://([a-z0-9.-]+)/(@[a-z0-9_]+)$',
    re.IGNORECASE,
)
# Scan every account record in the input files and collect candidate
# fediverse handles as (localpart, domain) pairs. Accounts with at least one
# hit are appended to `results`, and every hit's domain is added to `domains`.
for filename in argv[1:]:
    for account in concatjson.load(filename):
        uid, username, hits = account['id'], account['username'], []

        # Expanded (un-shortened) URLs attached to the profile's URL entity.
        urls = []
        try:
            for u in account['entities']['url']['urls']:
                if u['expanded_url']:
                    urls.append(u['expanded_url'])
        except (KeyError, TypeError):
            # The profile has no usable URL entities (key missing or null).
            # Only these lookup failures are ignored, not every exception.
            pass

        # Profile links of the form https://domain/@user.
        for u in urls:
            m = RE2.search(u)
            if m is not None:
                domain, localpart = m.group(1).lower(), m.group(2).lower()
                hits.append((localpart, domain))

        # user@domain handles in the free-text fields. finditer() collects
        # every match, so an e-mail address appearing earlier in a field no
        # longer hides a real handle that follows it.
        for field in (account['name'], account['location'], account['description']):
            if field is None:
                continue
            for m in RE1.finditer(field):
                hits.append((m.group(1).lower(), m.group(2).lower()))

        if hits:
            results.append({'id': uid, 'username': username, 'hits': hits})
            domains.update(domain for _, domain in hits)
# Per-thread attribute storage (holds each worker's HTTP session).
thread_local = local()

# Unbounded work queues: q1 carries domains still to be probed, q2 carries
# (domain, nodeinfo_url) pairs produced by that probing.
q1, q2 = Queue(), Queue()

# Seed the first queue with every candidate domain.
for d in domains:
    q1.put(d)
def get_session():
    """Return the calling thread's ``requests.Session``.

    The session is stored on ``thread_local`` and created the first time a
    given thread asks for it, so each thread reuses its own session.
    """
    try:
        return thread_local.session
    except AttributeError:
        # First call on this thread: create the session and remember it.
        thread_local.session = requests.Session()
        return thread_local.session
def check_domain():
    """Worker: probe queued domains for a NodeInfo discovery document.

    Takes domains from ``q1`` until it is empty. For each one, fetches
    ``https://<domain>/.well-known/nodeinfo`` and, for every link whose
    ``rel`` is the NodeInfo 2.0 schema and that has an ``href``, puts
    ``(domain, href)`` on ``q2``. Progress and failures are reported on
    stderr. Each dequeued item is always marked done, so ``q1.join()``
    cannot be left waiting by a failed request.
    """
    from queue import Empty  # local import: the file only imports Queue

    nodeinfo_2_0 = 'http://nodeinfo.diaspora.software/ns/schema/2.0'
    session = get_session()
    while True:
        # get_nowait() rather than `while not q1.empty(): q1.get()`: with
        # several workers another thread can take the last item between the
        # emptiness check and a blocking get(), which would leave this
        # thread blocked forever.
        try:
            domain = q1.get_nowait()
        except Empty:
            return
        try:
            eprint('q1', domain, q1.qsize())
            url = f'https://{domain}/.well-known/nodeinfo'
            with session.get(url, timeout=3) as response:
                if response.status_code == 200:
                    obj = json.loads(response.content)
                    for link in obj.get('links', []):
                        rel, href = link.get('rel'), link.get('href')
                        # A link without an href cannot be fetched later.
                        if rel == nodeinfo_2_0 and href:
                            q2.put((domain, href))
        except Exception as exc:
            # Best effort: most candidate domains are not fediverse servers,
            # so network and parse failures are expected. Report and move on.
            eprint('q1', domain, 'failed:', repr(exc))
        finally:
            # Always balance the get(), even on unexpected errors.
            q1.task_done()
def check_nodeinfo():
    """Worker: fetch the NodeInfo documents discovered by ``check_domain``.

    Takes ``(domain, url)`` pairs from ``q2``, fetches each URL and, on an
    HTTP 200 response, stores the parsed JSON in ``instances[domain]``.
    Progress and failures are reported on stderr.

    The worker keeps running until every ``q1`` item has been marked done
    and ``q2`` has been drained. Stopping as soon as ``q1`` merely looks
    empty would abandon pairs that in-flight producers enqueue afterwards,
    and ``q2.join()`` would then never return.
    """
    from queue import Empty  # local import: the file only imports Queue

    poll_seconds = 0.1
    session = get_session()
    while True:
        # Sample producer state BEFORE polling q2. Producers put onto q2
        # before marking their q1 item done, so if no q1 work was outstanding
        # at this point and q2 then turns out empty, nothing more can arrive.
        # `unfinished_tasks` is the counter Queue.join() waits on (a CPython
        # queue.Queue attribute).
        producers_done = q1.unfinished_tasks == 0
        try:
            # A timed get() avoids both busy-spinning on an empty queue and
            # the race between an empty() check and a blocking get().
            domain, url = q2.get(timeout=poll_seconds)
        except Empty:
            if producers_done:
                return
            continue
        try:
            eprint('q2', domain, q2.qsize())
            with session.get(url, timeout=3) as response:
                if response.status_code == 200:
                    instances[domain] = json.loads(response.content)
        except Exception as exc:
            # Best effort: an unreachable or malformed NodeInfo document
            # just means the domain is not recorded as an instance.
            eprint('q2', domain, 'failed:', repr(exc))
        finally:
            # Always balance the get(), even on unexpected errors.
            q2.task_done()
def check_all_domains(domains, thread_num=8):
    """Probe all queued domains concurrently, then print confirmed handles.

    Starts ``thread_num`` ``check_domain`` workers and ``thread_num``
    ``check_nodeinfo`` workers, waits for both queues to be fully processed,
    and prints one line per hit whose domain ended up in ``instances``:
    ``<username> <id> @<localpart>@<domain>``.

    Args:
        domains: Not read here; the workers consume ``q1``, which is filled
            at module level. Kept so existing callers keep working.
        thread_num: Number of workers to start for each stage (default 8).

    Raises:
        ValueError: If ``thread_num`` is less than 1. With no workers the
            queue joins below could never complete.
    """
    if thread_num < 1:
        raise ValueError('thread_num must be at least 1')

    # Daemon threads: the queue joins below are the real completion signal,
    # so a worker left blocked on a queue must not keep the interpreter
    # alive after the results have been printed.
    for _ in range(thread_num):
        for target in (check_domain, check_nodeinfo):
            Thread(target=target, daemon=True).start()

    q1.join()
    eprint('q1 joined')
    q2.join()
    eprint('q2 joined')

    # Report only handles whose domain answered with a NodeInfo document.
    for acct in results:
        for localpart, domain in acct.get('hits', []):
            if domain not in instances:
                continue
            # Hits taken from URLs already carry the leading '@'.
            if not localpart.startswith('@'):
                localpart = '@' + localpart
            print(acct['username'], acct['id'], localpart + '@' + domain)


check_all_domains(domains)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment