Skip to content

Instantly share code, notes, and snippets.

@henryiii
Last active February 7, 2024 03:21
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save henryiii/432868baf4a69432b6d1c0220592af3d to your computer and use it in GitHub Desktop.
Save henryiii/432868baf4a69432b6d1c0220592af3d to your computer and use it in GitHub Desktop.
Download all pyproject.tomls
import contextlib
import sqlite3
import tomllib
from collections import Counter
def main():
counter = Counter()
with contextlib.closing(sqlite3.connect("pyproject_contents.db")) as con:
cursor = con.cursor()
for row in cursor.execute("SELECT contents FROM pyproject"):
contents, = row
try:
toml = tomllib.loads(contents)
backend = toml.get("build-system", {}).get("build-backend", "unknown")
if isinstance(backend, str):
counter[backend] += 1
else:
counter["busted"] += 1
except tomllib.TOMLDecodeError:
counter["broken"] += 1
for i, (k, v) in enumerate(counter.most_common()):
print(f"{i:3} {k}: {v}")
# Script entry point: only run the tally when executed directly.
if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import argparse
import pickle
from collections import Counter
from collections.abc import Generator
from pathlib import Path
from typing import Any
def dig(value: Any, key: str, *keys: str) -> Any:
    """Follow a chain of keys through nested tables.

    Args:
        value: The outermost table (normally a parsed TOML document).
        key: First key to look up.
        *keys: Further keys, each applied to the result of the previous one.

    Returns:
        The value found at the end of the chain, or an empty dict when any
        key along the way is missing *or* when an intermediate value is not
        a table (for example ``tool.black = "x"`` while digging for
        ``tool.black.line-length``).
    """
    current = value
    for name in (key, *keys):
        # A scalar or list cannot be descended into; treat it like a miss
        # instead of failing with AttributeError on ``.get``.
        if not isinstance(current, dict):
            return {}
        current = current.get(name, {})
    return current
def all_keys(
    d: dict[str, Any], level: int, *prefixes: str
) -> Generator[str, None, None]:
    """Yield dotted key names of *d*, expanding nested tables *level* deep.

    With ``level == 0`` only the keys of *d* itself are produced. Each extra
    level replaces a key whose value is a dict by the keys of that dict
    (so an empty nested dict contributes nothing). Every yielded name is
    prefixed with *prefixes*, joined by dots.
    """
    may_descend = level > 0
    for name, child in d.items():
        path = (*prefixes, name)
        if may_descend and isinstance(child, dict):
            yield from all_keys(child, level - 1, *path)
            continue
        yield ".".join(path)
def get_tomls_cached(db: str) -> Generator[tuple[str, str, dict[str, Any]], None, None]:
    """Yield the entries stored in the pickle cache that sits next to *db*.

    The cache file is ``<db>.pkl``; whatever iterable it contains is loaded
    in one go and its items are yielded one by one. Pickle executes code on
    load, so the cache must come from a trusted source.
    """
    cache_file = Path(f"{db}.pkl")
    with cache_file.open("rb") as stream:
        for entry in pickle.load(stream):
            yield entry
def main(tool: str, get_contents: bool, level: int = 0) -> None:
    """Print how often keys (or whole values) occur under a TOML section.

    Args:
        tool: Dotted path of the section to inspect (e.g. ``tool.black``);
            an empty string means the document root.
        get_contents: When true, count the ``repr`` of the whole section
            value instead of its key names.
        level: Number of nested table levels to expand when listing keys.

    Raises:
        AssertionError: If *get_contents* is requested without a section, or
            together with a non-zero *level*.
    """
    # Reject invalid combinations before printing anything.
    if get_contents and not tool:
        raise AssertionError("Can't get contents with no section")
    if get_contents and level > 0:
        raise AssertionError("Can't use level with contents")

    if not tool:
        print("*:")
    elif get_contents:
        print(f"{tool} contents:")
    else:
        print(tool + ".*" * (level + 1) + ":")

    # The path is the same for every document, so split it once.
    path = tool.split(".") if tool else []
    counter = Counter()
    for _, _, toml in get_tomls_cached("pyproject_contents.db"):
        item = dig(toml, *path) if path else toml
        if not item:
            continue
        if get_contents:
            counter[repr(item)] += 1
        else:
            # update() adds in place; ``counter += Counter(...)`` would also
            # rescan the whole counter on every document.
            counter.update(all_keys(item, level=level))

    for k, v in counter.most_common():
        print(f"{k}: {v}")
def blame(tool: str, string: str) -> None:
    """Print the projects whose value at *tool* matches *string*.

    With a non-empty *string*, a project is listed (name and version) when
    the ``repr`` of its value equals *string*. With an empty *string*, every
    project having a truthy value is listed together with that value.
    """
    print(tool, "=", string if string else "...")

    path = tool.split(".") if tool else None
    for project, release, document in get_tomls_cached("pyproject_contents.db"):
        found = document if path is None else dig(document, *path)
        if string:
            if repr(found) == string:
                print(project, release)
        elif found:
            print(project, release, "=", repr(found))
# Command-line entry point: report key/content frequencies, or with --blame
# list the projects that carry a given value.
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("tool", help="Tool to processes")
    parser.add_argument("-c", "--contents", action="store_true")
    parser.add_argument(
        "-l", "--level", type=int, default=0, help="Unpack nested levels"
    )
    parser.add_argument(
        "-b",
        "--blame",
        help="print matching project names, empty string to print any value (careful)",
    )
    args = parser.parse_args()

    if args.blame is not None:
        # Validate explicitly: ``assert`` is stripped under ``python -O`` and
        # would let these invalid combinations through silently.
        if args.level != 0:
            parser.error("--level cannot be combined with --blame")
        if args.contents:
            parser.error("--contents cannot be combined with --blame")
        blame(args.tool, args.blame)
    else:
        main(args.tool, args.contents, args.level)
#!/usr/bin/env python
import contextlib
import sqlite3
import tomllib
from collections import Counter
def main():
counter = Counter()
with contextlib.closing(sqlite3.connect("pyproject_contents.db")) as con:
cursor = con.cursor()
for row in cursor.execute("SELECT contents FROM pyproject"):
contents, = row
with contextlib.suppress(tomllib.TOMLDecodeError):
toml = tomllib.loads(contents)
tools = toml.get("tool", {}).keys()
counter += Counter(f"tool.{k}" for k in tools)
for i, (k, v) in enumerate(counter.most_common()):
print(f"{i:3} {k}: {v}")
# Script entry point: only run the tally when executed directly.
if __name__ == "__main__":
    main()
#!/usr/bin/env python3
import contextlib
import pickle
import sqlite3
from collections.abc import Generator
from pathlib import Path
from typing import Any
import tomllib
def get_tomls(db: str) -> Generator[tuple[str, str, dict[str, Any]], None, None]:
with contextlib.closing(sqlite3.connect(db)) as con:
cursor = con.cursor()
for row in cursor.execute(
"SELECT project_name, project_version, contents FROM pyproject"
):
project_name, project_version, contents = row
with contextlib.suppress(tomllib.TOMLDecodeError):
yield project_name, project_version, tomllib.loads(contents)
def make_cache(db: str) -> None:
    """Parse every pyproject.toml in *db* and pickle the result to ``<db>.pkl``.

    The pickle holds a list of ``(project_name, project_version, parsed_toml)``
    tuples as produced by ``get_tomls``.
    """
    pkl = Path(f"{db}.pkl")
    # Read everything first so a database error cannot leave behind a
    # truncated cache file.
    entries = list(get_tomls(db))
    with pkl.open("wb") as f:
        # The protocol belongs to pickle.dump; as the second positional
        # argument of Path.open it would be taken as the buffer size.
        pickle.dump(entries, f, pickle.HIGHEST_PROTOCOL)
# Script entry point: build the pickle cache for the default database.
if __name__ == "__main__":
    make_cache("pyproject_contents.db")
MIT License
Copyright (c) 2023 Françoise CONIL
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# /// script
# dependencies = ["aiohttp", "packaging"]
# requires-python = ">=3.11"
# ///
"""
Downloads all pyproject.toml files and puts them in a database. Doesn't talk to
GitHub if the database already holds the same or a newer version of the package.
You need an input CSV to work on. To prepare one, you can use
pyproject-latest-to-csv.py.
"""
import asyncio
import contextlib
import csv
import itertools
import logging
import sqlite3
import sys
import time
from collections.abc import Iterator
import aiohttp
from packaging.version import Version
# Module-level logger for the downloader.
LOG = logging.getLogger(__name__)

# One row per project: its name, the stored version, and the raw file text.
PYPROJECT_CREATE = """CREATE TABLE IF NOT EXISTS
pyproject(project_name TEXT PRIMARY KEY, project_version TEXT, contents TEXT)
"""
# Insert a row using named parameters.
INSERT_CONTENTS = """INSERT INTO pyproject
VALUES (:project_name, :project_version, :contents)
"""
# Look up the version currently stored for a project.
GET_CONTENTS = """SELECT project_version FROM pyproject WHERE project_name=?
"""
# Remove a project's row (done before inserting a replacement).
DELETE_CONTENTS = """DELETE FROM pyproject WHERE project_name=?
"""

# Lift the csv module's per-field size cap so very long fields can be read.
# Where C ``long`` is 32-bit (e.g. Windows) sys.maxsize does not fit and the
# call raises OverflowError, so fall back to the largest 32-bit value.
try:
    csv.field_size_limit(sys.maxsize)
except OverflowError:
    csv.field_size_limit(2**31 - 1)
async def get_data(
    session: aiohttp.ClientSession, path: str, repo: int, name: str
) -> str | None:
    """Fetch one pyproject.toml from the pypi-data mirror on GitHub.

    Args:
        session: HTTP session used for the request.
        path: Path of the file inside the mirror; must contain exactly four
            slashes and end in ``/pyproject.toml``.
        repo: Number of the ``pypi-mirror-<repo>`` repository holding the file.
        name: Project name, used only in log messages.

    Returns:
        The file's text, or ``None`` when the path is rejected, the response
        status is not 200, the body cannot be decoded, or the HTTP exchange
        fails with one of the handled aiohttp errors.

    Raises:
        RuntimeError: When the server answers 429 (rate limited).
    """
    top_level = path.count("/") == 4 and path.endswith("/pyproject.toml")
    if not top_level:
        LOG.warning("Project %s has non-top-level path %s", name, path)
        return None

    url = f"https://raw.githubusercontent.com/pypi-data/pypi-mirror-{repo}/code/{path}"
    try:
        async with session.get(url) as response:
            status = response.status
            if status == 429:
                LOG.error("Rate limited when accessing %s", name)
                raise RuntimeError("Rate limited")
            if status != 200:
                LOG.error("pycodeorg.get_data failed to retrieve %s", name)
                return None
            try:
                return await response.text()
            except UnicodeDecodeError:
                LOG.exception("Unicode decode error on %s", name)
                return None
    except (
        aiohttp.http_exceptions.BadHttpMessage,
        aiohttp.client_exceptions.ClientResponseError,
    ):
        LOG.exception("Failed reading %s", name)
        return None
async def worker(
    iterator: Iterator[dict[str, str]],
    session: aiohttp.ClientSession,
    cursor: sqlite3.Cursor,
    thread: int,
) -> None:
    """Consume CSV rows from a shared iterator and store their pyproject.toml.

    Args:
        iterator: Shared source of CSV rows; each row is a mapping with the
            keys ``project_name``, ``project_version``, ``path`` and
            ``repository``. Several workers pull from the same iterator.
        session: HTTP session handed to ``get_data``.
        cursor: Cursor on the contents database.
        thread: Worker number, used only in progress log lines.

    A row is skipped when the database already holds the same or a newer
    version of the project, or when nothing could be downloaded for it.
    """
    for i, line in enumerate(iterator):
        if i and i % 200 == 0:
            LOG.info("PROGRESS %d: %d", thread, i)

        name = line["project_name"]
        version = line["project_version"]

        with cursor.connection:
            stored = cursor.execute(GET_CONTENTS, (name,)).fetchone()
        # NOTE(review): Version() raises InvalidVersion for strings that are
        # not PEP 440 compliant and nothing here catches it - confirm the CSV
        # and stored versions are always valid.
        if stored and Version(version) <= Version(stored[0]):
            continue

        data = await get_data(session, line["path"], line["repository"], name)
        if not data:
            continue

        # Replace any older row for this project within one transaction.
        with cursor.connection:
            cursor.execute(DELETE_CONTENTS, (name,))
            cursor.execute(
                INSERT_CONTENTS,
                {
                    "project_name": name,
                    "project_version": version,
                    "contents": data,
                },
            )
async def main() -> None:
    """Download the latest pyproject.toml of every project listed in the CSV.

    Ensures the ``pyproject`` table exists in ``pyproject_contents.db``,
    prints the number of rows in ``extract-pyproject-all-versions.csv``, then
    feeds the rows of ``extract-pyproject-latest.csv`` (in reverse order) to
    eight concurrent workers sharing one HTTP session and one DB cursor.
    """
    with contextlib.closing(sqlite3.connect("pyproject_contents.db")) as connection:
        db_cursor = connection.cursor()
        db_cursor.execute(PYPROJECT_CREATE)

        # Only the row count of the all-versions export is needed.
        with open("extract-pyproject-all-versions.csv", newline="") as all_versions:
            total = sum(1 for _ in csv.DictReader(all_versions))
        print(f"Processing {total} projects")

        with open("extract-pyproject-latest.csv", newline="") as latest:
            rows = list(csv.DictReader(latest))
        rows.reverse()
        iterator = iter(rows)

        async with aiohttp.ClientSession() as session, asyncio.TaskGroup() as tg:
            for worker_id in range(8):
                tg.create_task(worker(iterator, session, db_cursor, worker_id))
# Script entry point: run the download and report how long it took.
if __name__ == "__main__":
    # perf_counter is monotonic, so the measured duration is unaffected by
    # wall-clock adjustments during a long run.
    start_time = time.perf_counter()
    logging.basicConfig(filename="pyproject_contents.log", level=logging.INFO)
    asyncio.run(main())
    end_time = time.perf_counter()
    # ``:0.3f`` gives fixed-point seconds; a bare ``:0.3`` means three
    # significant digits and renders long runs as e.g. ``1.23e+03``.
    duration_msg = f"Getting files took : {end_time - start_time:0.3f} seconds."
    LOG.info(duration_msg)
    print(duration_msg)
# /// script
# dependencies = ["duckdb"]
# ///
"""
Originally from https://framapiaf.org/@fcodvpt/111540079686191842
https://gitlab.liris.cnrs.fr/fconil-small-programs/packaging/get-pypi-packages-backends
https://sethmlarson.dev/security-developer-in-residence-weekly-report-18
https://gist.github.com/sethmlarson/852341a9b7899eda7d22d8c362c0a095
curl -L --remote-name-all $(curl -L "https://github.com/pypi-data/data/raw/main/links/dataset.txt")
MIT licensed.
"""
import duckdb
# Per-project summary over every row read from the '*.parquet' files:
# number of matching uploads, the MAX and the distinct list of versions and
# upload timestamps, and the distinct repositories and paths.
# Rows are kept when the upload year is >= 2018, the path matches
# 'pyproject.toml$' and skip_reason is empty.
# NOTE(review): MAX(project_version) presumably compares strings, so it may
# not be the highest version in version order - confirm the column type.
# NOTE(review): the '.' in the pattern is an unescaped regex wildcard, so
# paths such as '...pyproject_toml' would also match - confirm this is fine.
ALL_VERSIONS_QUERY = """SELECT project_name, COUNT(project_name) AS nb_uploads,
MAX(project_version) AS max_version,
LIST(DISTINCT project_version) AS all_versions,
MAX(uploaded_on) AS max_uploaded_on,
LIST(DISTINCT uploaded_on) AS all_uploaded_on,
LIST(DISTINCT repository) AS all_repository,
LIST(DISTINCT path) AS all_path
FROM '*.parquet'
WHERE (date_part('year', uploaded_on) >= '2018') AND regexp_matches(path, 'pyproject.toml$') AND skip_reason == ''
GROUP BY project_name;
"""
# Run the summary query and export it, with a header row, to the CSV that
# the downloader opens to count projects.
res = duckdb.sql(ALL_VERSIONS_QUERY)
res.to_csv("extract-pyproject-all-versions.csv", header=True)
# Latest pyproject.toml per project.
# The CTE ``lpv`` computes, per project, the upload count and the most recent
# uploaded_on among matching rows from 2018 onwards. The outer query joins the
# parquet rows back on (project_name, max_uploaded_on) to recover the
# repository, version and path of that most recent upload.
# NOTE(review): if several pyproject.toml paths share the same project and
# uploaded_on, each of them is returned - confirm consumers expect that.
LATEST_QUERY = """WITH lpv AS (SELECT project_name, COUNT(project_name) AS nb_uploads,
MAX(uploaded_on) AS max_uploaded_on,
LIST(DISTINCT uploaded_on) AS all_uploaded_on
FROM '*.parquet'
WHERE (date_part('year', uploaded_on) >= '2018') AND regexp_matches(path, 'pyproject.toml$') AND skip_reason == ''
GROUP BY project_name)
SELECT ip.repository, ip.project_name, ip.project_version, lpv.nb_uploads,
ip.uploaded_on, date_part('year', ip.uploaded_on) AS year, ip.path
FROM '*.parquet' as ip
JOIN lpv ON ip.project_name=lpv.project_name AND ip.uploaded_on=lpv.max_uploaded_on
WHERE regexp_matches(path, 'pyproject.toml$') AND skip_reason == '';
"""
# Debugging aid: print the result to the console instead of exporting it.
# res = duckdb.sql(LATEST_QUERY).show()
# Run the query and export it, with a header row, to the CSV whose rows the
# downloader processes.
res = duckdb.sql(LATEST_QUERY)
res.to_csv("extract-pyproject-latest.csv", header=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment