@jossef
Created February 13, 2023 19:15
Part of a research effort to hunt an attacker's activity.
import concurrent.futures
import csv
import logging
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import requests
from bs4 import BeautifulSoup
from PIL import Image
from requests.adapters import HTTPAdapter
from tqdm import tqdm
from urllib3 import Retry

# Download this image from:
# https://secure.gravatar.com/avatar/default?size=225
DEFAULT_GRAVATAR_IMAGE = Image.open('gravatar_template_image.jpg')
def get_pypi_user_info(username, session):
    url = f'https://pypi.org/user/{username}/'
    r = session.get(url)
    r.raise_for_status()
    soup = BeautifulSoup(r.content, 'html.parser')
    profile_info_element = soup.find(class_='author-profile__info')

    # Fetch the profile image and compare it pixel-by-pixel against the
    # default Gravatar placeholder.
    author_profile = soup.find(class_='author-profile')
    profile_image_url = author_profile.find('img')['src']
    r = session.get(profile_image_url)
    r.raise_for_status()
    image = Image.open(BytesIO(r.content))
    is_gravatar_default_image = list(image.getdata()) == list(DEFAULT_GRAVATAR_IMAGE.getdata())

    # PyPI serves profile images through its camo proxy; the last path
    # segment is the hex-encoded original image URL.
    if profile_image_url and profile_image_url.startswith('https://warehouse-camo.ingress.cmh1.psfhosted.org'):
        hex_encoded_url = profile_image_url.split('/')[-1]
        profile_image_url = bytes.fromhex(hex_encoded_url).decode()

    registration_date = ''
    registration_date_element = profile_info_element.find('time')
    if registration_date_element:
        registration_date = registration_date_element.attrs.get('datetime', '')

    display_name = ''
    profile_name_element = soup.find(class_='author-profile__name')
    if profile_name_element:
        display_name = profile_name_element.text

    published_packages = soup.findAll(class_='package-snippet')
    published_packages_count = len(published_packages)
    return display_name, registration_date, published_packages_count, profile_image_url, is_gravatar_default_image
def get_all_pypi_usernames(session):
    # The sitemap index lists per-page sitemaps; usernames are extracted
    # from the /user/ URLs found in each one.
    url = 'https://pypi.org/sitemap.xml'
    r = session.get(url)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, 'xml')
    sitemaps = soup.findAll('sitemap')
    sitemap_urls = map(lambda x: x.find('loc').text, sitemaps)

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(session.get, sitemap_url) for sitemap_url in sitemap_urls]
        for future in concurrent.futures.as_completed(futures):
            r = future.result()
            r.raise_for_status()
            soup = BeautifulSoup(r.text, 'xml')
            url_elements = soup.findAll('url')
            pypi_urls = map(lambda x: x.find('loc').text, url_elements)
            pypi_urls = filter(lambda x: x.startswith('https://pypi.org/user/'), pypi_urls)
            pypi_urls = map(lambda x: x.replace('https://pypi.org/user/', '').strip('/'), pypi_urls)
            pypi_urls = map(lambda x: x.lower(), pypi_urls)
            yield from pypi_urls
def main():
    # Session with retries so transient connection errors don't abort the crawl.
    session = requests.session()
    retry = Retry(connect=3, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)

    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s - %(message)s')
    logging.info('loading usernames...')
    pypi_usernames = list(get_all_pypi_usernames(session))
    logging.info(f'loaded {len(pypi_usernames)} usernames')

    with open('users.csv', 'w+', newline='') as f, tqdm(total=len(pypi_usernames)) as progress:
        writer = csv.writer(f)
        writer.writerow(['username', 'display name', 'registration date', 'number of packages', 'image url', 'is default image'])
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures_map = {executor.submit(get_pypi_user_info, pypi_username, session): pypi_username for pypi_username in pypi_usernames}
            for future in concurrent.futures.as_completed(futures_map):
                pypi_username = futures_map[future]
                try:
                    display_name, registration_date, number_of_packages, image_url, is_gravatar_default_image = future.result()
                    writer.writerow([pypi_username, display_name, registration_date, number_of_packages, image_url, is_gravatar_default_image])
                    progress.update()
                    f.flush()
                except Exception:
                    logging.exception(f'failed to process {pypi_username}')


if __name__ == '__main__':
    main()
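
The script expects a gravatar_template_image.jpg file next to it, as noted in the comment near the top. A minimal sketch (not part of the original gist) of fetching that template from the URL the comment references, using the same requests library the gist already imports:

# One-off helper (illustrative, not in the original gist): download the default
# Gravatar placeholder and save it under the filename the script expects.
# Pillow detects the image format from the file contents, not the extension.
import requests

r = requests.get('https://secure.gravatar.com/avatar/default?size=225', timeout=30)
r.raise_for_status()
with open('gravatar_template_image.jpg', 'wb') as f:
    f.write(r.content)

With the template in place, running the script writes users.csv with one row per PyPI user: username, display name, registration date, number of packages, the decoded profile image URL, and whether the avatar is the default Gravatar image.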