Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from klein import Klein
from twisted.python.filepath import FilePath
import logging
import time
import treq
class pypiProxyService(object):
    """
    Caching proxy for the PyPI "simple" index, served through Klein.

    Package index pages are kept in memory for ``indexCacheTime`` seconds;
    package downloads are kept as files inside ``downloadCache``. Only
    successful (HTTP 200) upstream responses are ever cached.
    """

    # Base URL that the path of each incoming request is appended to.
    pypiURL = "https://pypi.python.org/simple/"
    # Directory that holds the cached package files.
    downloadCache = FilePath("cache/")
    indexCacheTime = 86400  # 1 day
    app = Klein()
    # Maps package name -> {"data": index body, "time": time.time() when it
    # was stored}. Defined on the class, so it is shared by all instances.
    packageData = {}

    def _fetchPyPIURL(self, path):
        """
        Start a GET request for ``path`` below ``pypiURL``.

        ``path`` may be given with or without leading slashes; they are
        stripped so the joined URL never contains ``simple//``.

        Returns the Deferred produced by ``treq.get``.
        """
        # On Python 3 ``request.uri`` is bytes while ``pypiURL`` is text;
        # on Python 2 ``bytes is str`` and no conversion takes place.
        if isinstance(path, bytes) and not isinstance(path, str):
            path = path.decode("utf-8")
        return treq.get(self.pypiURL + path.lstrip("/"))

    def _fetchAndCache(self, request, store):
        """
        Fetch ``request.uri`` from PyPI and relay the body to the client.

        ``store`` is called with the body (and must return it) only when the
        upstream status is 200. For any other status the upstream status
        code is passed through to the client and nothing is cached, so a
        transient 404/5xx is never remembered as valid content.

        Returns a Deferred that fires with the response body.
        """
        def _relay(response):
            body = treq.content(response)
            if response.code == 200:
                body.addCallback(store)
            else:
                request.setResponseCode(response.code)
            return body

        d = self._fetchPyPIURL(request.uri)
        d.addCallback(_relay)
        return d

    @app.route("/<string:package>/")
    def _package(self, request, package):
        """
        Serve the index page for ``package``.

        Returns the in-memory copy while it is younger than
        ``indexCacheTime`` seconds; otherwise returns a Deferred that
        fetches the page from PyPI and refreshes the cache entry.
        """
        def _cache(result):
            self.packageData[package] = {"data": result, "time": time.time()}
            print("Saved cached index for {}".format(package))
            return result

        cached = self.packageData.get(package)
        if cached:
            if cached["time"] > time.time() - self.indexCacheTime:
                print("Served cached index for " + package)
                return cached["data"]
            print("Cached index for {} is stale".format(package))
        return self._fetchAndCache(request, _cache)

    @app.route("/../packages/<string:version>/<string:letter>/<string:package>/<string:packagename>")
    def _download(self, request, version, letter, package, packagename):
        """
        Serve a package file, from the on-disk cache when it is present.

        The cache file is named ``<version>-<packagename>``. On a miss,
        returns a Deferred that downloads the file from PyPI, stores it and
        fires with its content.
        """
        cachedFile = self.downloadCache.child(version + "-" + packagename)

        def _cache(result):
            # The cache directory may not exist yet (e.g. on first run).
            if not self.downloadCache.isdir():
                self.downloadCache.makedirs()
            # setContent writes a sibling temp file and renames it, so an
            # interrupted write never leaves a truncated package behind.
            cachedFile.setContent(result)
            print("Saved package {} to cache.".format(packagename))
            return result

        # Check the one file directly; listing the whole directory is slower
        # and raises when the directory is missing.
        if cachedFile.exists():
            print("Served cached package " + packagename)
            return cachedFile.getContent()
        return self._fetchAndCache(request, _cache)
# Build the proxy service and serve it on localhost:8080.
p = pypiProxyService()
p.app.run(host="localhost", port=8080)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.