Create a gist now

Instantly share code, notes, and snippets.

@orf /pypi_counter.py Secret
Created Dec 21, 2013

# Don't hate on the code. It was made quickly.
from progressbar import ProgressBar
from progressbar.widgets import ETA, Counter, Bar, Percentage
from archive import extract
import xmlrpclib
import multiprocessing
import requests
import io
import urlparse
import subprocess
import tempfile
import os
import csv
import shutil
import json
import rfc6266
import zipfile
import tarfile
# Parent directory for the per-package temporary working directories; it is
# passed as `dir=` to tempfile.mkdtemp() in process_package(), which downloads
# and unpacks each package underneath it.
DOWNLOAD_DIRECTORY = "E:\\PyPi\\"
def _collect_downloads(package_info):
    """Return the candidate downloads for a package, most preferred first.

    Each candidate is a dict with at least "url" and "filename" keys. Hosted
    files come first (sdist, then bdist_dumb, then bdist_egg), followed by the
    package's "download_url" when it looks like a file rather than a page.
    """
    info = package_info["info"]
    candidates = []
    for ptype in ("sdist", "bdist_dumb", "bdist_egg"):
        candidates.extend(x for x in package_info["urls"] if x["packagetype"] == ptype)

    # Sometimes packages only have a "download_url" and not any actual packages hosted on PyPi.
    # These links are either to a file (an archive or a .py file) or to a download page.
    download_url = info.get("download_url", None)
    if download_url not in (None, "UNKNOWN", ""):
        parsed = urlparse.urlparse(download_url)
        filename = os.path.split(parsed.path)[1]
        url = download_url
        # Sometimes the URLs don't have a scheme. Assume it's http:// in that case.
        if not parsed.scheme:
            url = "http://%s" % download_url
        # Skip HTML files: most likely a download page rather than a file.
        if filename and not filename.endswith(".html"):
            candidates.append({"url": url, "filename": filename})
    return candidates


def _download(download_info, processing_dir):
    """Download one candidate into processing_dir and return the saved file's path.

    Raises whatever requests / rfc6266 / the file system raise; the caller
    records any failure as DOWNLOAD_FAILED.
    """
    resp = requests.get(download_info["url"], stream=True)
    # Without this an error page (404, 500, ...) would be saved as the "archive".
    resp.raise_for_status()

    # Get the filename from the server. In some cases (like the github tarball service) the path is
    # somesite.com/somepath with no extension, and the real name is only in the content-disposition
    # header. The archive library needs the extension to work out how to extract the file.
    header_name = None
    if "content-disposition" in resp.headers:
        # filename_unsafe is server-controlled; basename() below strips any directory part.
        header_name = rfc6266.parse_requests_response(resp).filename_unsafe
    # Fall back to the name from download_info when the header yields nothing usable
    # (assumes filename_unsafe can be None/empty when the header carries no filename).
    filename = os.path.basename(header_name or download_info["filename"])

    archive_path = os.path.join(processing_dir, filename)
    with open(archive_path, "wb") as download_fd:
        for chunk in resp.iter_content(chunk_size=2028):
            download_fd.write(chunk)
    return archive_path


def _add_missing_extension(archive_path, url):
    """Rename an extension-less download to .zip / .tar.gz if its content matches.

    Returns the (possibly new) path. Files that already have an extension, or
    that are neither a zip nor a tar archive, are returned unchanged.
    """
    if os.path.splitext(archive_path)[1]:
        return archive_path
    for is_format, ext in ((zipfile.is_zipfile, ".zip"),
                           (tarfile.is_tarfile, ".tar.gz")):
        if is_format(archive_path):
            print("Detected that %s has extension %s" % (url, ext))
            os.rename(archive_path, archive_path + ext)
            return archive_path + ext
    return archive_path


def _count_source_lines(source_directory):
    """Run clock.exe over source_directory and return its per-language counts.

    The result maps language name -> {"files", "blank", "comment", "code"},
    with the values kept as the strings found in the CSV output.
    """
    output = subprocess.check_output(
        [os.path.abspath("clock.exe"), "--quiet", "--csv", source_directory])
    languages = {}
    # The output will be a newline, followed by a header, followed by the actual results in CSV
    # format, so skip the first two lines and feed the rest into a csv.reader.
    for row in csv.reader(output.split("\n")[2:]):
        # Skip blank lines and anything too short to be a result row.
        if len(row) < 5:
            continue
        languages[row[1]] = {
            "files": row[0],
            "blank": row[2],
            "comment": row[3],
            "code": row[4]
        }
    return languages


def process_package(package_name):
    """Download one PyPI package and count its lines of code per language.

    Returns a (package_name, info) tuple. `info` is None when the package's
    JSON metadata could not be fetched or parsed; otherwise it is a dict with
    metadata about the package plus:

    * errors: a list of things that went wrong while processing this package
      ("DOWNLOAD_FAILED", or the class name of an exception raised while
      extracting or counting, e.g. for a corrupt or unsafe archive).
    * languages: a dict keyed by language name whose values are dicts holding
      the number of files, blank lines, comment lines and code lines.

    Candidates are tried in preference order and processing stops at the first
    one that is downloaded, unpacked and counted successfully.
    """
    # Stage 1: Retrieve the package info from pypi. Some packages from list_packages() don't seem
    # to exist. In this case the server returns a non-json response, which we catch and return None.
    try:
        package_info = requests.get("https://pypi.python.org/pypi/%s/json" % package_name).json()
    except Exception:
        return package_name, None
    info = package_info["info"]

    returner = {"home_page_host": urlparse.urlparse(info["home_page"] or "").hostname,
                "downloads": info["downloads"]["last_month"],
                "version": info["version"],
                "package_types": [x["packagetype"] for x in package_info["urls"]],
                "download_url": info.get("download_url", None),
                "license": info.get("license", None),
                "docs_url": info.get("docs_url", None),
                "source_size": None,
                "errors": [],
                "languages": {}}

    # Let's make a unique temporary directory to work with.
    processing_dir = tempfile.mkdtemp(dir=DOWNLOAD_DIRECTORY)
    try:
        source_directory = os.path.join(processing_dir, "s")
        for download_info in _collect_downloads(package_info):
            # Drop anything a previous, failed candidate left behind.
            shutil.rmtree(source_directory, ignore_errors=True)

            try:
                source_archive_path = _download(download_info, processing_dir)
            except Exception:
                returner["errors"].append("DOWNLOAD_FAILED")
                continue
            returner["source_size"] = os.path.getsize(source_archive_path)

            # If the download doesn't have an extension for whatever reason then we need to guess it.
            source_archive_path = _add_missing_extension(source_archive_path, download_info["url"])

            if not source_archive_path.endswith(".py"):
                # Attempt to extract the archive, storing any exceptions in the "errors" key.
                try:
                    extract(source_archive_path, source_directory, safe=True)
                except Exception as e:
                    print(source_archive_path)
                    print("%s while extracting download %s for %s" % (type(e), download_info["url"], package_name))
                    returner["errors"].append(type(e).__name__)
                    continue
            else:
                # It's just a single .py file. Put it inside the source directory under its own
                # name so that it keeps its .py extension (presumably clock.exe is cloc, which
                # classifies files by extension -- an extension-less file would not be counted).
                os.mkdir(source_directory)
                os.rename(source_archive_path,
                          os.path.join(source_directory, os.path.basename(source_archive_path)))

            # Execute clock.exe on the source directory. A failure here is recorded like any
            # other processing error instead of aborting the whole run.
            try:
                returner["languages"] = _count_source_lines(source_directory)
            except (subprocess.CalledProcessError, OSError) as e:
                returner["errors"].append(type(e).__name__)
                continue
            break
    finally:
        # Attempt to remove the processing_dir, even if something above raised.
        # This fails sometimes on Windows and I don't know why (likely permission based, often
        # with .git index files).
        shutil.rmtree(processing_dir, ignore_errors=True)
        if os.path.isdir(processing_dir):
            print("Could not remove %s" % processing_dir)
    return package_name, returner
if __name__ == "__main__":
    # Ask PyPI's XML-RPC interface for the name of every registered package.
    client = xmlrpclib.ServerProxy('http://pypi.python.org/pypi')
    all_packages = client.list_packages()
    print("Found %s packages" % len(all_packages))

    pool = multiprocessing.Pool()
    bar = ProgressBar(maxval=len(all_packages),
                      widgets=["Processing (", Percentage(), "): ", Bar(), " ", Counter(), " ", ETA()]).start()
    try:
        # One "<package name>: <json>" line per package, written as results arrive
        # (imap_unordered yields in completion order, not input order).
        with io.open("results.txt", "wb") as output_file:
            for result in pool.imap_unordered(process_package, all_packages):
                bar.update(bar.currval + 1)
                # The file is opened in binary mode, so build the line as text and encode it
                # explicitly; this also copes with package names that arrive as unicode objects.
                line = u"%s: %s\n" % (result[0], json.dumps(result[1]))
                output_file.write(line.encode("utf-8"))
    finally:
        # Always shut the worker processes down, including when a result raised.
        pool.terminate()
        pool.join()
    bar.finish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment