Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Upload RPM to Pulp Repository and Publish
#!/usr/bin/env python
import os
import sys
import requests
import argparse
import json
import time
pulp_user = 'admin'
pulp_pass = 'admin'
def read_in_chunks(file_object, chunk_size=65536):
while True:
data =
if not data:
yield data
def progress_bar(progress, total, last_update):
# Print a friendly progress bar
target_percent = last_update * 5
current_percent = (float(progress)/total) * 100
if current_percent > target_percent or target_percent == 100:
while current_percent > (last_update * 5):
last_update += 1
sys.stdout.write("[%-20s] %d%%" % ('='*last_update, 5*last_update))
return last_update + 1
return last_update
def import_upload(base_url, upload_id, repo):
# Import the upload to the repo
global pulp_user
global pulp_pass
url = "%s/pulp/api/v2/repositories/%s/actions/import_upload/" % (base_url, repo)
payload = {
"override_config": {},
"unit_type_id": "rpm",
"upload_id": "%s" % upload_id,
"unit_key": {}
res =, data=json.dumps(payload), verify=False, auth=(pulp_user, pulp_pass))
return json.loads(res.text)['spawned_tasks'][0]['_href']
def publish_repo(base_url, repo):
# Publish the repo
global pulp_user
global pulp_pass
url = "%s/pulp/api/v2/repositories/%s/actions/publish/" % (base_url, repo)
payload = { 'id': 'yum_distributor' }
res =, data=json.dumps(payload), verify=False, auth=(pulp_user, pulp_pass))
return json.loads(res.text)['spawned_tasks'][0]['_href']
def check_task_status(base_url, task_href):
# Retrieve the task status
global pulp_user
global pulp_pass
url = base_url + task_href
res = requests.get(url, verify=False, auth=(pulp_user, pulp_pass))
res = json.loads(res.text)
# Check for errors
if res['state'] == 'error':
if 'traceback' in res.keys():
print res['traceback']
print "Unknown error"
elif res['state'] == 'finished':
if type(res['result']['details']) == list:
return res['state']
if 'errors' in res['result']['details'].keys():
print "Task could not complete:"
print "\n".join(res['result']['details']['errors'])
return res['state']
def request_upload(base_url):
# Request an upload id
global pulp_user
global pulp_pass
url = "%s/pulp/api/v2/content/uploads/" % base_url
res =, verify=False, auth=(pulp_user, pulp_pass))
return json.loads(res.text)['upload_id']
def delete_upload(base_url, upload_id):
# Delete the upload request
url = "%s/pulp/api/v2/content/uploads/%s" % (base_url, upload_id)
res = requests.delete(url, verify=False, auth=(pulp_user, pulp_pass))
return res.status_code
def chunk_upload(base_url, upload_id, filename):
global pulp_user
global pulp_pass
index = 0
headers = {'Content-Type': 'application/octet-stream'}
url = "%s/pulp/api/v2/content/uploads/%s" % (base_url, upload_id)
# Determine file information
if not os.path.isfile(filename):
print "Cannot open file '%s'" % filename
fh = open(filename, 'r')
content_path = os.path.abspath(filename)
content_size = os.stat(content_path).st_size
# Upload the file
print "Upload ID: %s" % upload_id
print "Uploading: %s [%s]" % (filename, content_size)
for chunk in read_in_chunks(fh):
offset = index + len(chunk)
headers['Content-length'] = content_size
headers['Content-Range'] = 'bytes %s-%s/%s' % (index, offset, content_size)
url_with_offset = "%s/%i/" % (url, index)
res = requests.put(url_with_offset, data=chunk, headers=headers, verify=False, auth=(pulp_user, pulp_pass))
if res.status_code != 200:
print "Error uploading chunk to %s: %s [%s]" % (url_with_offset, res.reason, res.status_code)
progress_update = progress_bar(offset, content_size, progress_update)
index = offset
def main(args):
global pulp_user
global pulp_pass
pulp_user = args.username
pulp_pass = args.password
base_url = "https://%s" % args.server
import_res = ''
publish_res = ''
# Request an upload_id
upload_id = request_upload(base_url)
chunk_upload(base_url, upload_id, args.file)
print ""
# Import the file into the repo
print "Importing %s into %s repository..." % (args.file, args.repo)
import_task_href = import_upload(base_url, upload_id, args.repo)
while import_res != "finished":
import_res = check_task_status(base_url, import_task_href)
# Publish the repo
print "Publishing %s repository..." % args.repo
publish_task_href = publish_repo(base_url, args.repo)
while publish_res != "finished":
publish_res = check_task_status(base_url, publish_task_href)
# Delete the upload request
print "Cleaning up..."
delete_upload(base_url, upload_id)
if __name__ == '__main__':
# create the top-level parser
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--username', type=str, default='admin')
parser.add_argument('-p', '--password', type=str, default='admin')
parser.add_argument('-s', '--server', type=str, required=True)
parser.add_argument('-f', '--file', type=str, required=True)
parser.add_argument('-r', '--repo', type=str, required=True)
args = parser.parse_args()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.