Skip to content

Instantly share code, notes, and snippets.

@rfguimaraes
Created February 14, 2024 07:25
Snippet showing a tentative distribution update with pulp deb
import time
import pulpcore
from pulpcore.client.pulp_deb import (
AptRepositorySyncURL,
DebAptDistribution,
DebAptRemote,
DebAptRepository,
DistributionsAptApi,
RemotesAptApi,
RepositoriesAptApi,
DebVerbatimPublication,
PublicationsVerbatimApi,
PatcheddebAptDistribution,
)
from pulpcore.client.pulpcore import Configuration
def wait_for_href(core_client, response, *, tasks_api=None, poll_interval=1):
    """Return the href of the resource produced by a Pulp API call.

    A response without a ``task`` attribute is treated as synchronous and its
    ``pulp_href`` is returned directly. Otherwise the referenced task is
    polled until it leaves the "waiting"/"running" states.

    Args:
        core_client: pulpcore ``ApiClient`` used to build a ``TasksApi`` when
            *tasks_api* is not supplied.
        response: Object returned by an API call; either exposes
            ``pulp_href`` or a ``task`` href.
        tasks_api: Optional object exposing ``read(task_href)``. Lets callers
            reuse a single tasks API instance across several waits.
        poll_interval: Seconds to sleep between two polls of the task.

    Returns:
        ``response.pulp_href`` for synchronous responses, otherwise the first
        entry of the finished task's ``created_resources``.

    Raises:
        RuntimeError: The task ended in a state other than "completed".
        IndexError: The task completed but reported no created resource.
    """
    if not hasattr(response, "task"):
        return response.pulp_href

    if tasks_api is None:
        tasks_api = pulpcore.client.pulpcore.TasksApi(api_client=core_client)

    while True:
        task = tasks_api.read(response.task)
        if task.state in ("waiting", "running"):
            time.sleep(poll_interval)
            continue
        if task.state != "completed":
            # Report the real outcome instead of failing later with an
            # unrelated IndexError on an empty resource list.
            raise RuntimeError(
                f"Task {response.task} ended in state {task.state!r}: "
                f"{getattr(task, 'error', None)!r}"
            )
        if not task.created_resources:
            raise IndexError(
                f"Task {response.task} completed without creating any resource"
            )
        return task.created_resources[0]
def main():
    """Create, sync, publish and distribute a test APT repository in Pulp.

    Steps, in order: create a repository, a remote and a distribution named
    ``kitware-test``; sync the repository from the remote; create a verbatim
    publication of the repository; point the distribution at the publication.
    """
    # Note: tested using the suggested setup of OCI images with docker-compose
    repo_name = "kitware-test"
    configuration = Configuration(
        username="admin",
        password="password",
        host="http://localhost:8080",
    )
    core_client = pulpcore.client.pulpcore.ApiClient(configuration)
    deb_client = pulpcore.client.pulp_deb.ApiClient(configuration)

    # Create repository
    repo_api = RepositoriesAptApi(api_client=deb_client)
    repo_href = wait_for_href(
        core_client,
        repo_api.create(DebAptRepository(name=repo_name)),
    )

    # Create remote
    remote_api = RemotesAptApi(api_client=deb_client)
    remote = DebAptRemote(
        name=repo_name,
        url="http://apt.kitware.com/ubuntu",
        distributions="jammy",
        architectures="amd64",
        download_concurrency=1,
        sync_sources=False,
        policy="on_demand",
    )
    remote_href = wait_for_href(core_client, remote_api.create(remote))

    # Create distribution
    dist_api = DistributionsAptApi(api_client=deb_client)
    deb_dist = DebAptDistribution(name=repo_name, base_path=repo_name)
    dist_href = wait_for_href(core_client, dist_api.create(deb_dist))

    # Sync repository with remote
    sync_url = AptRepositorySyncURL(
        remote=remote_href,
        mirror=False,
        optimize=True,
    )
    sync_response = repo_api.sync(
        deb_apt_repository_href=repo_href,
        apt_repository_sync_url=sync_url,
    )
    # Block until the sync task has finished before publishing. Previously
    # the sync response was discarded and the publication was created right
    # away.
    # NOTE(review): publishing before the sync finishes may build the
    # publication from a repository without synced content, which would
    # explain the "404: Not Found" described below -- confirm.
    wait_for_href(core_client, sync_response)

    # Create publication
    publication_api = PublicationsVerbatimApi(api_client=deb_client)
    publication = DebVerbatimPublication(
        repository=repo_href,
    )
    publication_href = wait_for_href(
        core_client,
        publication_api.create(publication),
    )

    # Originally reported: everything up to this point works as expected.
    # One can check the existence of the remote, repository, publication, and
    # synced metadata via the cli.
    # Additionally, trying to access
    # http://localhost:8080/pulp/content/kitware-test/ gives
    # "404: Distribution is not pointing to a publication, repository,
    # repository version, or remote."

    # Update distribution
    # This is the step where the problem was originally observed.
    patched_dist = PatcheddebAptDistribution(
        publication=publication_href,
    )
    # NOTE(review): the response of this call is not awaited; presumably the
    # update also runs as a task -- verify before relying on its completion.
    dist_api.partial_update(
        deb_apt_distribution_href=dist_href,
        patcheddeb_apt_distribution=patched_dist,
    )
    # Originally reported: after this, accessing
    # http://localhost:8080/pulp/content/kitware-test/ gives
    # 404: Not Found, but if one does the update of the distribution via the
    # cli, all the contents are there correctly.
    #
    # Also tested using full updates instead of only partial updates and had
    # the same issue.


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment