# Update your PyPI index in S3
"""
This job can be run on mirror instance(s) to update the local PyPI index.
To use it with S3, create ~/.boto or set BOTO_CONFIG when running, e.g.:
[sudo] BOTO_CONFIG=/etc/boto_pypi.cfg python update_pypi.py flask -b your-pypi-s3-bucket
"""
import json
import logging
import os
import uuid
import datetime
from argparse import ArgumentParser
import requests
import BeautifulSoup
import boto
from boto.s3.key import Key
# Root directory of the local PyPI index; used when files are not sent to S3
PYPI_PATH = '/centos/pypi/web'
# When True, files are uploaded to S3 instead of being kept locally.
# main() switches this on when an S3 bucket is passed on the command line.
USE_S3 = False
# Prefix of the temporary files used while moving data to/from S3
# (a unique uuid suffix is appended for every file)
TMP_FILE_FOR_S3 = os.path.join(PYPI_PATH, 'tos3.file.')
# File that tracks when the index was last modified (local path and S3 key)
LAST_MODIFIED_FILE = os.path.join(PYPI_PATH, 'last-modified')
S3_LAST_MODIFIED_FILE = 'last-modified'
# Package files, e.g. full path /centos/pypi/web/packages/py2.py3/D/Django
PACKAGE_PATH = os.path.join(PYPI_PATH, 'packages') # Below it: python version, then first letter of the package (e.g. d or D)
FULL_PACKAGE_PATH = PACKAGE_PATH + '/{python_version}/{first_letter}/{package_name}'
S3_FULL_PACKAGE_PATH = 'packages/{python_version}/{first_letter}/{package_name}'
# Index (simple)
INDEX_PATH = os.path.join(PYPI_PATH, 'simple')
# Per-package index directory, e.g. /centos/pypi/web/simple/Django (holds index.html)
FULL_INDEX_PATH = INDEX_PATH + '/{package_name}'
S3_FULL_INDEX_PATH = 'simple/{package_name}'
# Link written into a package's index.html for every mirrored file
LINK_HTML = '<a href="../../packages/{python_version}/{first_letter}/{package_name}/{filename}#md5={md5_digest}" ' \
'rel="internal">{filename}</a><br/>'
# Package info url (JSON description of a package and its releases)
PYPI_API_URL = 'https://pypi.python.org/pypi/{package_name}/json'
# Skip files whose package type contains '_win'
SKIP_WINDOWS_PACKAGES = True
# Respect the >= == <= when passed with package version
# This is not fully implemented, only == functionality is working
LIMIT_PACKAGE_VERSIONS = True
# Skip releases whose version contains 'rc', 'dev', 'alpha' or 'beta'
SKIP_DEV_ALPHA_BETA_RC_PACKAGES = True
# Minimum upload time; compared as text against each file's upload_time. Set to None to ignore
PYPI_MIN_UPLOAD_TIME = '2013-01-01T00:00:00'
# S3 bucket and connection, filled in by main() when S3 storage is requested
S3_BUCKET = None
S3_CONN = None
def log_output(message, log_level=logging.INFO):
    """
    Report a message on stdout and record it in the log.

    :param message: text to report
    :param log_level: ``logging`` level of the record; DEBUG messages are
        only written to the log and are not printed
    """
    if log_level != logging.DEBUG:
        print(message)
    # Record at the requested level rather than flattening everything to INFO
    logging.log(log_level, "PyPi updater> {}".format(message))
def write_last_modified(date_string):
    """
    Store ``date_string`` as the whole content of LAST_MODIFIED_FILE.

    Best effort: a failure is reported through log_output and not re-raised.
    """
    try:
        with open(LAST_MODIFIED_FILE, 'w') as marker_file:
            marker_file.write(date_string)
    except Exception as error:
        template = "Unable to write last modified file. {} {}"
        log_output(template.format(type(error).__name__, error))
def write_last_modified_to_s3(date_string):
    """
    Store ``date_string`` under the S3_LAST_MODIFIED_FILE key of S3_BUCKET
    and make that object publicly readable.
    """
    marker_key = Key(S3_BUCKET, S3_LAST_MODIFIED_FILE)
    marker_key.set_contents_from_string(date_string)
    marker_key.set_acl('public-read')
def save_to_local(filename, python_version, package_name, md5_digest, package_url):
    """
    Download a release file into the local index and link it from the
    package's simple index page.

    :param filename: name of the release file
    :param python_version: python version tag of the file, used as a directory level
    :param package_name: package name; its first character is a directory level too
    :param md5_digest: md5 of the file, embedded in the index link
    :param package_url: url the file is downloaded from
    :return: True if the file was downloaded and added to the index, False if
        it was already present or could not be downloaded
    """
    first_letter = package_name[0]
    # Make dirs for packages
    cur_package_path = FULL_PACKAGE_PATH.format(python_version=python_version,
                                                first_letter=first_letter,
                                                package_name=package_name)
    if not os.path.exists(cur_package_path):
        os.makedirs(cur_package_path)
    # Download the package file to correct place if not already exists
    file_path = os.path.join(cur_package_path, filename)
    if os.path.exists(file_path):
        log_output("Already downloaded package {}, continuing ...".format(file_path))
        return False  # The file already exists
    log_output("Downloading package {}".format(file_path))
    r = requests.get(package_url)
    if r.status_code != 200:
        # Never store an error page as if it were the package file
        log_output("Could not download package {} from {} (HTTP status {})".format(
            filename, package_url, r.status_code), log_level=logging.ERROR)
        return False
    with open(file_path, 'wb') as package_file:
        package_file.write(r.content)
    # Make dirs for the index.html file for the current package
    cur_index_path = FULL_INDEX_PATH.format(package_name=package_name)
    if not os.path.exists(cur_index_path):
        os.makedirs(cur_index_path)
    index_file_path = os.path.join(cur_index_path, 'index.html')
    # The link is the same whether the index page is created or updated
    package_html = LINK_HTML.format(first_letter=first_letter,
                                    python_version=python_version,
                                    package_name=package_name,
                                    filename=filename,
                                    md5_digest=md5_digest)
    if not os.path.exists(index_file_path):
        # First file of this package: write a complete page around the link
        log_output("Creating new index file {}".format(index_file_path))
        first_line = '<html><head><title>Links for {0}</title><meta name="api-version" value="2"/>' \
                     '</head><body><h1>Links for {0}</h1>'.format(package_name)
        last_line = '</body></html>'
        with open(index_file_path, 'w') as index_file:
            index_file.write(first_line)
            index_file.write(package_html)
            index_file.write(last_line)
    else:
        # Page already exists: append the new link to its body
        log_output("Updating index file {}".format(index_file_path))
        with open(index_file_path) as index_file:
            txt = index_file.read()
        soup = BeautifulSoup.BeautifulSoup(txt)
        soup.body.append(BeautifulSoup.BeautifulSoup(package_html))
        # Save the file again, keeping it pretty
        with open(index_file_path, "w") as index_file:
            index_file.write(str(soup.prettify()))
    return True
def save_to_s3(filename, python_version, package_name, md5_digest, package_url):
    """
    Download a release file, upload it to S3_BUCKET and link it from the
    package's simple index page stored in the same bucket.

    Data passes through temporary files named TMP_FILE_FOR_S3 + uuid, which
    are removed again even when a step fails.

    :param filename: name of the release file
    :param python_version: python version tag of the file, part of the S3 key
    :param package_name: package name; its first character is part of the S3 key too
    :param md5_digest: md5 of the file, embedded in the index link
    :param package_url: url the file is downloaded from
    :return: True if the file was uploaded and added to the index, False if
        it was already present or could not be downloaded
    """
    first_letter = package_name[0]
    cur_package_path = S3_FULL_PACKAGE_PATH.format(python_version=python_version,
                                                   first_letter=first_letter,
                                                   package_name=package_name)
    # Upload the package file if the bucket does not have it yet
    file_path = os.path.join(cur_package_path, filename)
    if S3_BUCKET.get_key(file_path):
        log_output("Already downloaded package {}, continuing ...".format(file_path))
        return False  # The file already exists
    log_output("Downloading package {}".format(file_path))
    r = requests.get(package_url)
    if r.status_code != 200:
        # Never upload an error page as if it were the package file
        log_output("Could not download package {} from {} (HTTP status {})".format(
            filename, package_url, r.status_code), log_level=logging.ERROR)
        return False
    tmp_file = TMP_FILE_FOR_S3 + str(uuid.uuid4())
    try:
        with open(tmp_file, 'wb') as package_file:
            package_file.write(r.content)
        s3_key = Key(S3_BUCKET)
        s3_key.key = file_path
        s3_key.set_contents_from_filename(tmp_file)
        s3_key.set_acl('public-read')
    finally:
        # Cleanup temporary file, also when the upload failed
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
    # Index page of the current package
    cur_index_path = S3_FULL_INDEX_PATH.format(package_name=package_name)
    index_key_name = os.path.join(cur_index_path, 'index.html')
    # The link is the same whether the index page is created or updated
    package_html = LINK_HTML.format(first_letter=first_letter,
                                    python_version=python_version,
                                    package_name=package_name,
                                    filename=filename,
                                    md5_digest=md5_digest)
    tmp_file = TMP_FILE_FOR_S3 + str(uuid.uuid4())
    try:
        if not S3_BUCKET.get_key(index_key_name):
            # First file of this package: write a complete page around the link
            log_output("Creating new index file {}".format(index_key_name))
            first_line = '<html><head><title>Links for {0}</title><meta name="api-version" value="2"/>' \
                         '</head><body><h1>Links for {0}</h1>'.format(package_name)
            last_line = '</body></html>'
            with open(tmp_file, 'w') as index_file:
                index_file.write(first_line)
                index_file.write(package_html)
                index_file.write(last_line)
        else:
            # Page already exists: fetch it and append the new link to its body
            log_output("Updating index file {}".format(index_key_name))
            s3_key = Key(S3_BUCKET)
            s3_key.key = index_key_name
            s3_key.get_contents_to_filename(tmp_file)
            with open(tmp_file) as index_file:
                txt = index_file.read()
            soup = BeautifulSoup.BeautifulSoup(txt)
            soup.body.append(BeautifulSoup.BeautifulSoup(package_html))
            # Save the file again, keeping it pretty
            with open(tmp_file, "w") as index_file:
                index_file.write(str(soup.prettify()))
        # Upload the new or updated page
        s3_key = Key(S3_BUCKET)
        s3_key.key = index_key_name
        s3_key.set_contents_from_filename(tmp_file)
        s3_key.set_acl('public-read')
    finally:
        # Cleanup the temporary file, also when a step above failed
        if os.path.exists(tmp_file):
            os.remove(tmp_file)
    return True
def _parse_requirement(requirement):
    """Split a requirement line into (package name, compare symbol, version)."""
    # Environment markers (everything after ';') say where a requirement
    # applies, not which release is wanted, so they are dropped
    spec = requirement.split(';')[0]
    compare_symbol = None
    if LIMIT_PACKAGE_VERSIONS:
        for symbol in ('<=', '>=', '=='):
            if symbol in spec:
                compare_symbol = symbol
                break
    # Turn every operator/grouping character into '=' so that one split
    # isolates the name (first part) from the version (last part)
    for separator in ('>', '<', '!', '~', ',', '(', ')'):
        spec = spec.replace(separator, '=')
    parts = [part for part in ''.join(spec.split()).split('=') if part]
    # Extras such as "requests[security]" are not part of the package name
    name = parts[0].split('[')[0] if parts else ''
    version = parts[-1] if compare_symbol and len(parts) > 1 else None
    return name, compare_symbol, version


def _version_key(version):
    """Return the leading numeric components of a version as a comparable tuple."""
    numbers = []
    for part in version.split('.'):
        digits = ''
        for char in part:
            if not char.isdigit():
                break
            digits += char
        if not digits:
            break
        numbers.append(int(digits))
        if digits != part:
            # A suffix such as 'rc1' ends the numeric part of the version
            break
    # Trailing zeros carry no meaning: 1.0 and 1.0.0 compare equal
    while numbers and numbers[-1] == 0:
        numbers.pop()
    return tuple(numbers)


def _release_matches(release, compare_symbol, wanted_version):
    """Tell whether a release satisfies the requested version constraint."""
    if compare_symbol == '==':
        return release == wanted_version.lower()
    if compare_symbol == '>=':
        return _version_key(release) >= _version_key(wanted_version)
    if compare_symbol == '<=':
        return _version_key(release) <= _version_key(wanted_version)
    return True


def process_package(package_name, dependency_for=None, _seen=None):
    """
    Processes an individual package or line in a requirements.txt file

    The line may carry one version constraint (``==``, ``>=`` or ``<=``),
    honoured when LIMIT_PACKAGE_VERSIONS is set. ``>=`` and ``<=`` compare the
    numeric parts of the versions; ``==`` needs the exact version string.
    Dependencies listed in the package's ``requires_dist`` are processed
    recursively.

    :param package_name: package name or requirement line
    :param dependency_for: name of the package that requires this one, if any
    :param _seen: internal; requirements already handled in this call tree,
        so that circular dependencies do not recurse forever
    :return: bool success, True if at least one new file was stored for the
        package or one of its dependencies
    """
    if _seen is None:
        _seen = set()
    success = False
    if dependency_for:
        log_output("Checking dependency package {}".format(package_name))
    package_name, release_compare_symbol, package_version = _parse_requirement(package_name)
    # Skip blank lines and comments in file
    if not package_name or package_name.startswith('#'):
        log_output("PyPi updater> Skipping comment in file.", log_level=logging.DEBUG)
        return False
    seen_key = (package_name.lower(), release_compare_symbol, package_version)
    if seen_key in _seen:
        log_output("Already processed package {}, skipping".format(package_name),
                   log_level=logging.DEBUG)
        return False
    _seen.add(seen_key)
    package_details_url = PYPI_API_URL.format(package_name=package_name)
    response = requests.get(package_details_url)
    if response.status_code != 200:
        log_output("Could not find package {}".format(package_name), log_level=logging.ERROR)
        return False
    try:
        data = json.loads(response.content)
    except ValueError:
        message = "No JSON for package {} at url {}, continuing...".format(package_name, package_details_url)
        log_output(message, log_level=logging.ERROR)
        return False
    # Updated package name with correct case
    package_name = data['info']['name']
    releases = data.get('releases') or {}
    # Recursively get each dependency, every distinct requirement once
    dependencies = sorted(set(data['info'].get('requires_dist') or []))
    for dependency in dependencies:
        if process_package(package_name=dependency, dependency_for=package_name, _seen=_seen):
            success = True
    save_to_file = save_to_s3 if USE_S3 else save_to_local
    for release, release_data in releases.items():
        release = release.lower()
        # Nothing to fetch when there are no files for the release
        if not release_data:
            continue
        # Skip dev releases, if skip set and specific version not specified
        if SKIP_DEV_ALPHA_BETA_RC_PACKAGES and release_compare_symbol != '==' and \
                ('rc' in release or 'dev' in release or 'alpha' in release or 'beta' in release):
            message = "Skipping dev release {} for package {}".format(release, package_name)
            log_output(message, log_level=logging.DEBUG)
            continue
        # Only keep releases inside the requested range or version
        if package_version and release_compare_symbol and \
                not _release_matches(release, release_compare_symbol, package_version):
            continue
        log_output("Checking package {} release {}".format(package_name, release))
        # Get all the different packages for the current release
        for release_package in release_data:
            # Relevant information about the release
            package_type = release_package['packagetype']
            python_version = release_package['python_version']
            package_url = release_package['url']
            filename = release_package['filename']
            md5_digest = release_package['md5_digest']
            upload_time = release_package['upload_time']
            # Skip windows package release
            if '_win' in package_type and SKIP_WINDOWS_PACKAGES:
                log_output("Skipping windows package {}".format(filename), log_level=logging.DEBUG)
                continue
            # Skip old packages if date is set and not looking for a specific package version ==
            if PYPI_MIN_UPLOAD_TIME and upload_time < PYPI_MIN_UPLOAD_TIME and release_compare_symbol != '==':
                log_output("Skipping old package {}, dated {}".format(filename, upload_time),
                           log_level=logging.INFO)
                continue
            # Package types: source distribution (sdist) or built distribution (bdist_*)
            log_output("The package type is {}, version {}, url {}, file {}, md5 {}"
                       "".format(package_type, python_version, package_url, filename, md5_digest),
                       log_level=logging.DEBUG)
            # One stored file is enough to report success; a later file that
            # already exists must not reset the flag
            if save_to_file(filename=filename, python_version=python_version,
                            package_name=package_name, md5_digest=md5_digest,
                            package_url=package_url):
                success = True
    return success
def process_package_or_file(req_file_path=None, package_name=None):
    """
    Process a requirements file or a single package, then refresh the
    last-modified marker if anything new was added to the index.

    :param req_file_path: path of a requirements file, processed line by line
    :param package_name: single package (or requirement) to process; only
        used when no requirements file is given
    :return: None
    """
    log_output("Starting index update")
    index_modified = False
    # Determine if processing single package or file path
    if req_file_path:
        # Read everything first so the file is closed before the (slow) downloads start
        with open(req_file_path) as req_file:
            lines = req_file.readlines()
        for line in lines:
            if process_package(package_name=line):
                index_modified = True
    elif package_name:
        # Process single package
        if process_package(package_name=package_name):
            index_modified = True
    # Update the timestamp for the last-modified file if new packages added to index
    if index_modified:
        now_string = datetime.datetime.now().isoformat()
        log_output("Updates detected, updating last-modified file")
        if USE_S3:
            write_last_modified_to_s3(now_string)
        else:
            write_last_modified(now_string)
def main():
    """
    Command line entry point: configure logging, parse the arguments, select
    the storage backend and run the index update.

    :raises Exception: if neither or both of package name and requirements
        file are given, or if the requirements file does not exist
    """
    global USE_S3, S3_CONN, S3_BUCKET
    # Setup logging. The index directory has to exist before the log file can
    # be opened, and the root logger has to be configured before the first
    # logging call: an earlier logging.warning() would install a default
    # handler and turn basicConfig() into a no-op.
    index_path_created = not os.path.exists(PYPI_PATH)
    if index_path_created:
        os.makedirs(PYPI_PATH)
    log_filename = os.path.join(PYPI_PATH, 'indexer.log')
    logging.basicConfig(filename=log_filename, level=logging.DEBUG)
    if index_path_created:
        message = "Base PyPi index path '{}' not found, creating...".format(PYPI_PATH)
        print(message)
        logging.warning("PyPi updater> {}".format(message))
    # Setup arg parser, takes a single package for input or requirements file using -r flag
    parser = ArgumentParser()
    parser.add_argument('package_name', nargs='?')
    parser.add_argument('-r', '--requirement', dest='req_file_path', required=False, help='Optional requirement file')
    parser.add_argument('-b', '--s3-bucket', '--s3', dest='s3_bucket', required=False, help='Use S3 bucket as storage')
    args = vars(parser.parse_args())
    req_file_path = args.get('req_file_path')
    package_name = args.get('package_name')
    s3_bucket = args.get('s3_bucket')
    if not (req_file_path or package_name):
        raise Exception("Requirements file or package name is required, none given")
    if req_file_path and package_name:
        raise Exception("Requirements file or package name is required, not both")
    if req_file_path and not os.path.exists(req_file_path):
        raise Exception("Requirements file not found")
    # Use S3, by default uses local storage
    if s3_bucket:
        USE_S3 = True
        # NOTE(review): is_secure=False talks to S3 without SSL - confirm this is intended
        S3_CONN = boto.connect_s3(is_secure=False)
        S3_BUCKET = S3_CONN.get_bucket(s3_bucket)
    # Run main with either requirements file or package name
    process_package_or_file(req_file_path=req_file_path, package_name=package_name)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment