@sveetch
Created June 14, 2024 15:53
Python script to get information about package activity against project requirements. Work in progress.
"""
A commandline script to get project dependencies informations using Libraries.io API.
This requires a valid API key from 'Libraries.io' that you can get just by register
to the site itself, you will need to write it to a file to give as command argument.
Requirements:
* requests>=2.32.3;
* semantic-version>=2.10.0;
* humanize>=4.9.0;
* packaging>=24.0;
TODO:
* Implements 'major_lateness' package info that should only list the major version
lateness;
* Method 'get_package_data' should manage non success response status;
* cache directory may be automatically created if given but does not exist yet;
* Add support for requirements file format (setup.cfg, pip requirements, poetry?,
etc..) to read a batch of packages to analyze;
"""
import json
import datetime
import time
from operator import itemgetter
from pathlib import Path

import humanize
import requests
import semantic_version
from packaging.requirements import Requirement


PACKAGE_DETAIL_ENDPOINT = (
    "https://libraries.io/api/{platform}/{name}?api_key={key}"
)


class ExtendedJsonEncoder(json.JSONEncoder):
    """
    Additional opinionated support for more basic object types.

    Usage sample: ::

        json.dumps(..., cls=ExtendedJsonEncoder)
    """
    def default(self, obj):
        # Support for pathlib.Path to a string
        if isinstance(obj, Path):
            return str(obj)
        # Support for set to a list
        if isinstance(obj, set):
            return list(obj)
        # Support for date and time objects to ISO format strings
        if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
            return obj.isoformat()
        # Support for semantic version objects to strings
        if isinstance(obj, semantic_version.Version):
            return str(obj)
        # Let the base class default method raise the TypeError
        return json.JSONEncoder.default(self, obj)


class DependenciesAnalyzer:
    def __init__(self, api_key, cachedir=None):
        self.api_key = api_key
        self.cachedir = cachedir
        self.now_date = datetime.datetime.now()
        # Time in seconds to pause before an API request (to respect the
        # limit of 60 requests max per minute)
        self.api_pause = 1
    def endpoint_package_info(self, name):
        """
        Request package detail API endpoint for given package name.
        """
        # Pause to stay under the API rate limit
        time.sleep(self.api_pause)

        endpoint_url = PACKAGE_DETAIL_ENDPOINT.format(
            platform="Pypi",
            name=name,
            key=self.api_key,
        )

        return requests.get(endpoint_url)
    def get_package_data(self, name):
        """
        Get package detail either from API or from cache if any.
        """
        print("🐛 Package:", name)

        # Build expected cache file name if cache is enabled
        cache_file = None
        if self.cachedir:
            cache_file = self.cachedir / "{}.json".format(name)

        # Use cache if exists without any condition
        if cache_file and cache_file.exists():
            print(" - Loading data from cache")
            output = json.loads(cache_file.read_text())
        else:
            # Get payload from API
            response = self.endpoint_package_info(name)
            # TODO: Non success response status should raise an error
            print(" [{}]".format(response.status_code), "GET", response.url)
            output = response.json()

            # Build cache if cache is enabled
            if self.cachedir:
                print(" - Writing cache:", str(cache_file))
                cache_file.write_text(json.dumps(output, indent=4))

        return output
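
    # As an illustration (hypothetical values): with 'cachedir' set to
    # Path(".cache"), requesting details for the "requests" package would
    # read or write the ".cache/requests.json" file.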
    def compute_lateness(self, target, versions):
        """
        Compute version lateness for a given version target.

        Lateness only counts versions higher than the targeted version that
        are not build releases or pre releases.

        Arguments:
            target (string or semantic_version.Version): The targeted version
                to check against package released versions. If a string it
                will be coerced to a semantic version.
            versions (list): List of dictionaries (as computed from
                ``build_package_informations()``) for all existing release
                versions.

        Returns:
            list: A list of tuples for all existing versions higher than
            given target release version. Tuple first item is the version
            number (as a string) and second item is its release publishing
            datetime.
        """
        if not isinstance(target, semantic_version.Version):
            target = semantic_version.Version.coerce(target)

        return [
            (str(item["number"]), item["published_at"])
            for item in versions
            if (
                item["number"] > target and
                len(item["number"].prerelease) == 0 and
                len(item["number"].build) == 0
            )
        ]
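
    # A behavior sketch on hypothetical data: with target "1.0.0" and
    # released versions 1.0.0, 1.1.0 and 2.0.0-rc.1, 'compute_lateness' would
    # return only [("1.1.0", <its publish datetime>)] since pre releases and
    # the target itself are excluded.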
    def build_package_informations(self, name, target=None):
        """
        Build package information with useful collected data.

        Arguments:
            name (string): The package name to search on Pypi.

        Keyword Arguments:
            target (string or semantic_version.Version): The targeted version
                to check against package released versions. If a string it
                will be coerced to a semantic version.

        Returns:
            dict: The computed package information.
        """
        data = self.get_package_data(name)

        # Rebuild the version list to patch some values in useful types
        versions = []
        for item in data["versions"]:
            # Enforce real datetime
            item["published_at"] = datetime.datetime.fromisoformat(
                item["published_at"].split(".")[0]
            )
            # Original release number from Pypi
            item["_number"] = item["number"]
            # Coerce original number to a semantic version
            item["number"] = semantic_version.Version.coerce(item["number"])
            versions.append(item)

        # Once number has been coerced it can be used to properly reorder
        # versions on number
        versions = sorted(versions, key=itemgetter("number"))

        highest_published = versions[-1]["published_at"]
        published_delta = humanize.naturaldelta(self.now_date - highest_published)

        # Compute version lateness if a version has been given
        all_lateness = self.compute_lateness(target, versions) if target else None

        return {
            "name": name,
            "highest_version": semantic_version.Version.coerce(
                data["latest_release_number"]
            ),
            "version_target": target,
            "pypi_url": data["package_manager_url"],
            "repository_url": data["repository_url"],
            # "versions": versions,
            # Computed from highest release version
            "highest_published": highest_published,
            # Include all version releases higher than targeted version.
            # Will be None if no targeted version has been given, else a
            # list. Can be an empty list if targeted version matches the
            # latest release.
            "all_lateness": all_lateness,
            # Delay since the latest package release
            "last_activity": published_delta,
            # TODO: May only list the major version lateness
            "major_lateness": None,
        }
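
    # A sample of the resulting structure (hypothetical values, shortened):
    #
    #   {
    #       "name": "django",
    #       "highest_version": Version("5.0.6"),
    #       "version_target": "3.2",
    #       "highest_published": datetime(...),
    #       "all_lateness": [("4.0.0", datetime(...)), ...],
    #       "last_activity": "2 months",
    #       ...
    #   }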
    def analyze(self, requirements):
        """
        Analyze every package item from given Pip requirements file. A
        minimal sketch of the planned steps: parse each non comment line
        with 'packaging' and use a pinned '==' version as lateness target.
        """
        for line in requirements.read_text().splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            req = Requirement(line)
            target = next(
                (s.version for s in req.specifier if s.operator == "=="), None
            )
            infos = self.build_package_informations(req.name, target=target)
            print(json.dumps(infos, indent=4, cls=ExtendedJsonEncoder))


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description=(
            "Check project dependency files (currently a Pip requirements "
            "file) to know about their stability."
        ),
    )
    parser.add_argument(
        "source",
        type=Path,
        default=None,
        help=(
            "Pip requirements file."
        )
    )
    parser.add_argument(
        "--destination",
        default=None,
        help=(
            "If given, a file path where to write the report, else it is "
            "printed to the standard output."
        )
    )
    parser.add_argument(
        "--cachedir",
        default=None,
        type=Path,
        help=(
            "A directory where to look for Libraries.io package details "
            "cache. It is expected to be in the JSON format returned from "
            "the package details endpoint. The script looks for a file "
            "named after the package name and ended by '.json'. If found, "
            "no attempt to request the API will be done for the package. "
            "This is mostly for debugging purposes since you should always "
            "use the most recent package details."
        )
    )
    parser.add_argument(
        "--filekey",
        default=None,
        type=Path,
        help=(
            "A simple text file which only contains the Libraries.io API "
            "key to use to request the API. It is required since you cannot "
            "request the API without an API key."
        )
    )

    args = parser.parse_args()
    destination_filepath = None
    if args.destination:
        destination_filepath = Path(args.destination)

    if not args.source.exists():
        raise OSError("Given source path does not exist: {}".format(args.source))
    elif not args.source.is_file():
        raise OSError("Given source path is not a file: {}".format(
            args.source
        ))

    if args.cachedir:
        if not args.cachedir.exists():
            raise OSError("Given cachedir path does not exist: {}".format(
                args.cachedir
            ))
        if not args.cachedir.is_dir():
            raise OSError("Given cachedir path is not a directory: {}".format(
                args.cachedir
            ))

    if not args.filekey:
        raise ValueError("The '--filekey' argument is required.")
    elif not args.filekey.exists():
        raise OSError("Given filekey path does not exist: {}".format(args.filekey))
    elif not args.filekey.is_file():
        raise OSError("Given filekey path is not a file: {}".format(
            args.filekey
        ))
    else:
        # Strip possible whitespace around the key (like a trailing newline)
        api_key = args.filekey.read_text().strip()
    analyzer = DependenciesAnalyzer(api_key, cachedir=args.cachedir)

    #print()
    #infos = analyzer.build_package_informations(
    #    "django-browser-reload",
    #    target="1.12.1",
    #)
    #print(
    #    json.dumps(infos, indent=4, cls=ExtendedJsonEncoder)
    #)
    #print()
    #infos = analyzer.build_package_informations("diskette")
    #print(
    #    json.dumps(infos, indent=4, cls=ExtendedJsonEncoder)
    #)
    #print()
    #infos = analyzer.build_package_informations(
    #    "django-blog-zinnia",
    #    target="0.18",
    #)
    #print(
    #    json.dumps(infos, indent=4, cls=ExtendedJsonEncoder)
    #)
    #print()
    #infos = analyzer.build_package_informations(
    #    "django",
    #    target="3.2",
    #)
    #print(
    #    json.dumps(infos, indent=4, cls=ExtendedJsonEncoder)
    #)

    analyzer.analyze(args.source)