@lukaszb
Created December 16, 2013 00:02
Small app for creating PyPI stats (mainly extracting the Python version classifiers). It's generally hacky and smelly but it gets the job done. Also, I needed some indexes, so I'm using Django for the db.
#!/usr/bin/env python
# encoding: utf-8
import os
import sys
from django.conf import settings
abspath = lambda *p: os.path.abspath(os.path.join(*p))
ROOT_DIR = abspath(os.path.dirname(__file__))
APP = os.path.splitext(os.path.basename(__file__))[0]
sys.path.insert(0, ROOT_DIR)
DBNAME = os.path.expanduser('~/.pypistats.sqlite')
SETTINGS = {
    'DATABASES': {
        'default': {
            'ENGINE': 'django.db.backends.sqlite3',
            'NAME': DBNAME,
        }
    },
    'INSTALLED_APPS': [APP],
}
if not settings.configured:
    settings.configure(**SETTINGS)
# =============================================================================
# Here the real app starts
# =============================================================================
from concurrent import futures
from django.core.management import call_command
from django.db import models
from jsonfield import JSONField
from pkgtools.pypi import PyPIXmlRpc
import argparse
import datetime
import frogress
BATCH_SIZE = 500
WORKERS = 75


def AppMeta(table_name, **kwargs):
    # builds a model Meta class bound to this script's app label and the given table
    return type('Meta', (), dict({
        'app_label': APP,
        '__module__': APP,
        'db_table': table_name,
    }, **kwargs))


class Package(models.Model):
    name = models.CharField(max_length=1024, unique=True)
    versions = JSONField(default=list)

    class Meta:
        app_label = APP
        __module__ = APP
        db_table = 'package'

    def __str__(self):
        return self.name


class PackageRelease(models.Model):
    package = models.ForeignKey(Package, related_name='releases')
    version = models.CharField(max_length=128)
    python2 = models.BooleanField(default=False)
    python3 = models.BooleanField(default=False)
    uploaded_at = models.DateTimeField(null=True)

    Meta = AppMeta('package_release', unique_together=('package', 'version'))

    def __str__(self):
        return '%s | %s' % (self.package, self.version)


def log(msg):
    print(" * %s" % msg)


def has_python_support(classifiers, python_ver):
    # startswith, so "Programming Language :: Python :: 3" also matches
    # the more specific ":: 3.x" classifiers
    expected = 'Programming Language :: Python :: %s' % python_ver
    for classifier in classifiers:
        if classifier.startswith(expected):
            return True
    return False


def fetch_package(name):
    pypi = PyPIXmlRpc()
    versions = pypi.package_releases(name)
    return Package(name=name, versions=versions)


def fetch_new_packages():
    log("Fetching packages ...")
    pypi = PyPIXmlRpc()
    packages = pypi.list_packages()
    # packages = open('/tmp/pypi.packages').readlines()
    existing = Package.objects.values_list('name', flat=True)
    missing = sorted(set(packages) - set(existing))
    if not missing:
        return
    log("There are %s packages missing. Adding them ..." % len(missing))
    with futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
        packages_versions = executor.map(fetch_package, missing)
        iterator = frogress.bar(packages_versions, steps=len(missing))
        Package.objects.bulk_create(iterator)


def to_datetime(dt):
    # convert an xmlrpc DateTime (anything with timetuple()) into datetime.datetime
    return datetime.datetime(*dt.timetuple()[:6])


def fetch_package_releases(package_version):
    # need to accept tuple as argument - required by executor api
    package, version = package_version
    pypi = PyPIXmlRpc()
    data = pypi.release_data(package.name, version)
    classifiers = data.get('classifiers', [])
    urls = pypi.release_urls(package.name, version)
    upload_times = [to_datetime(url['upload_time']) for url in urls]
    if upload_times:
        upload_time = sorted(upload_times)[0]
    else:
        upload_time = None
    return PackageRelease(
        package=package,
        version=version,
        python2=has_python_support(classifiers, 2),
        python3=has_python_support(classifiers, 3),
        uploaded_at=upload_time,
    )


def fetch_packages_releases(packages=None):
    packages = packages or Package.objects.all()
    simple_releases = set((p, ver) for p in packages for ver in p.versions)
    qs = PackageRelease.objects.select_related('package')
    existing = set((r.package, r.version) for r in qs)
    missing = sorted(simple_releases - existing)
    if not missing:
        log("There are no missing releases to fetch")
        return
    with futures.ThreadPoolExecutor(max_workers=WORKERS) as executor:
        log("There are %s missing package releases ..." % len(missing))
        releases = executor.map(fetch_package_releases, missing)
        iterator = frogress.bar(releases, steps=len(missing))
        for pr in iterator:
            pr.save()


def main(recreate=False, fetch_packages=False):
    # Hack: point Django's app cache at this module so it is treated as the
    # APP application and syncdb can create tables for the models above
    get_app_orig = models.get_app

    def get_app(app_label, *a, **kw):
        if app_label == APP:
            return sys.modules[__name__]
        return get_app_orig(app_label, *a, **kw)

    models.get_app = get_app
    models.loading.cache.app_store[type(APP + '.models', (), {'__file__': __file__})] = APP

    if recreate:
        log('Removing database ...')
        os.remove(DBNAME)
    if not os.path.isfile(DBNAME):
        log('Creating database ...')
        call_command('syncdb', interactive=False, verbosity=0)
    if recreate or fetch_packages:
        fetch_new_packages()
    fetch_packages_releases()


if __name__ == '__main__':
    first_run = not os.path.isfile(DBNAME)
    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--recreate', action='store_true', default=False,
                        help='Recreates database')
    parser.add_argument('-f', '--fetch-packages', action='store_true',
                        default=False, help='Fetch new packages')
    parser.add_argument('-s', '--shell', action='store_true', default=False)
    namespace = parser.parse_args()
    if namespace.shell:
        import ipdb; ipdb.set_trace()
    else:
        main(
            recreate=namespace.recreate,
            fetch_packages=first_run or namespace.fetch_packages,
        )
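For a quick sense of usage: the first run creates ~/.pypistats.sqlite and fetches all packages plus their releases; -f additionally picks up packages added to PyPI since the last run, -r throws the database away and rebuilds it from scratch, and -s drops into an ipdb session with settings and models already configured.

Once the database is populated, the stats themselves are ordinary Django ORM queries. A minimal sketch (assuming the script above is saved as pypistats.py, since the filename doubles as the Django app label, and has already been run):

# Not part of the script above: just a sketch of querying the collected data.
# Assumes the file is saved as pypistats.py and ~/.pypistats.sqlite is populated;
# importing the module configures the Django settings as a side effect.
import pypistats

releases = pypistats.PackageRelease.objects
total = releases.count()
py3 = releases.filter(python3=True).count()
py2_only = releases.filter(python2=True, python3=False).count()
print("%d of %d releases declare a Python 3 classifier" % (py3, total))
print("%d releases declare Python 2 but not Python 3" % py2_only)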