Skip to content

Instantly share code, notes, and snippets.

@santeri3700
Created January 8, 2024 12:56
Show Gist options
  • Save santeri3700/89766bd3bc4ea6af618f697c04e7689b to your computer and use it in GitHub Desktop.
Save santeri3700/89766bd3bc4ea6af618f697c04e7689b to your computer and use it in GitHub Desktop.
A script for regenerating Uyuni DEB channel repodata for channels with invalid Packages files (uyuni#7788)
#!/usr/bin/python3
import datetime
import os
try:
    from spacewalk.server import rhnSQL
except ImportError:
    # Without rhnSQL the regeneration queue cannot be written later on, so stop
    # before doing any work and tell the operator where the script belongs.
    print("## Could not import spacewalk.server.rhnSQL! This script needs to be run on the Uyuni Server.")
    # Raise SystemExit directly: the exit() helper is injected by the site
    # module for interactive sessions and is not available under `python -S`.
    raise SystemExit(1)
# This script is published under public domain.
# Use at your own risk! No warranties provided!
# Author: Santeri Pikarinen (github.com/santeri3700)
repodata_cache_dir = "/var/cache/rhn/repodata/"  # Trailing slash is required
# Get list of channel labels (subdirectories) under the repodata directory
# These directories and Packages files are generated by Taskomatic DebPackageWriter
try:
    # os.listdir() returns entries in arbitrary order; sort them so the report
    # and the interactive prompts appear in the same order on every run.
    repodata_channel_labels = sorted(os.listdir(repodata_cache_dir))
except (FileNotFoundError, NotADirectoryError):
    # NotADirectoryError covers the case where the path exists but is a file.
    print(f"## The {repodata_cache_dir} directory does not exist. Are you not running this on the Uyuni Server?")
    # SystemExit instead of exit(): exit() is a site-module convenience that is
    # not guaranteed to exist in non-interactive interpreters.
    raise SystemExit(1)
# print(f"Checking Packages files under the following channel labels: {repodata_channel_labels}\n")
# Loop through each channel label and open the Packages file for reading only.
# Results are collected in two module-level lists that later steps consume:
#   all_invalid_packages - every "Filename:" entry that points to another channel
#   channels_to_be_regen - labels the operator agreed to regenerate
all_invalid_packages = []
channels_to_be_regen = []
_FILENAME_PREFIX = "Filename: "


def _mtime_iso8601(path):
    """Return the modification time of *path* as an ISO 8601 string.

    Returns the string "unavailable" when the file cannot be stat'ed (for
    example because it does not exist), so a missing companion file never
    aborts the scan of a channel.
    """
    try:
        mtime = os.path.getmtime(path)
    except OSError:
        return "unavailable"
    return datetime.datetime.fromtimestamp(mtime).isoformat()


def _find_invalid_packages(lines, channel_label):
    """Collect "Filename:" entries in *lines* that point to a different channel.

    lines: iterable of text lines from a Packages file.
    channel_label: label of the channel whose repodata directory holds the file.

    Returns a list of dicts with the keys 'real_channel', 'wrong_channel',
    'package_name' and 'filepath', one per mismatching entry.
    """
    found = []
    for line in lines:
        if not line.startswith(_FILENAME_PREFIX):
            continue
        # Example line: "Filename: mychannel-ubuntu2204_amd64/getPackage/mypackage_1.2.3-4.amd64-deb.deb"
        # Cut off only the leading prefix; str.replace() would also remove any
        # later occurrence of the same text inside the path.
        filepath = line[len(_FILENAME_PREFIX):].strip()
        filepath_parts = filepath.split("/")
        filepath_channel_label = filepath_parts[0]
        # The entry is fine when it points into the channel that owns this file
        if filepath_channel_label == channel_label:
            continue
        # The package file name is normally the third path component; fall back
        # to the last component so a short/malformed path cannot raise IndexError.
        if len(filepath_parts) > 2:
            filepath_package_name = filepath_parts[2]
        else:
            filepath_package_name = filepath_parts[-1]
        found.append({
            'real_channel': channel_label,
            'wrong_channel': filepath_channel_label,
            'package_name': filepath_package_name,
            'filepath': filepath
        })
    return found


for repodata_channel_label in repodata_channel_labels:
    channel_dir = os.path.join(repodata_cache_dir, repodata_channel_label)
    packages_path = os.path.join(channel_dir, "Packages")
    try:
        # Only the "Filename:" lines matter, so undecodable bytes elsewhere in
        # the file are replaced instead of aborting the scan.
        with open(packages_path, "r", encoding="utf-8", errors="replace") as packages_file:
            invalid_packages = _find_invalid_packages(packages_file, repodata_channel_label)
    except (FileNotFoundError, NotADirectoryError):
        # This channel is probably not a DEB channel if the "Packages" file doesn't exist
        # (NotADirectoryError: the entry in the repodata directory is a plain file)
        continue

    if not invalid_packages:
        continue
    all_invalid_packages.extend(invalid_packages)

    # Each timestamp is read from its own file; a missing Packages.gz is
    # reported as "unavailable" rather than hiding the whole channel.
    packages_file_iso8601 = _mtime_iso8601(packages_path)
    packagesgz_file_iso8601 = _mtime_iso8601(os.path.join(channel_dir, "Packages.gz"))

    print(f"### {repodata_channel_label} ###")
    print(f"# Packages file last modified: {packages_file_iso8601}")
    print(f"# Packages.gz file last modified: {packagesgz_file_iso8601}")
    print("# Downloading these packages will fail if not subscribed to the channel they are pointing to.")
    for p in invalid_packages:
        print(
            "# - Package "
            f"'{p['package_name']}' "
            "present in "
            f"'{p['real_channel']}'"
            " but points to "
            f"'{p['wrong_channel']}'"
        )
    print("#"*15+"\n")

    # Prompt for regenerating the Packages file; anything but "y" means no
    regen_answer = input(
        "Do you want to regenerate the Packages file for channel "
        f"'{repodata_channel_label}'"
        "? [y/N] ")
    if regen_answer.strip().lower() == "y":
        channels_to_be_regen.append(repodata_channel_label)
# Summarize how many Packages entries reference a channel other than their own.
# Nothing is printed when the scan found no such entries.
invalid_package_total = len(all_invalid_packages)
if invalid_package_total > 0:
    summary_line = (
        f"## There are total of {invalid_package_total} packages "
        "which are pointing to a wrong channel in the repodata/Packages file."
    )
    print(summary_line)
# Queue a repodata regeneration for every channel the operator selected, then
# list the current contents of the rhnRepoRegenQueue table.
# The database is only contacted when at least one channel was selected.
if channels_to_be_regen:
    print(
        f"## There are total of {len(channels_to_be_regen)} channels "
        "which will be regenerated as per your request."
    )

    # Connect to database
    try:
        rhnSQL.initDB()
    except Exception as e:
        print("## Could not connect to the database!")
        print(e)
        # SystemExit instead of exit(): exit() is a site-module convenience
        # that is not guaranteed to exist in non-interactive interpreters.
        raise SystemExit(1)

    # Add the channel to the rhnRepoRegenQueue table
    # https://github.com/uyuni-project/uyuni/blob/Uyuni-2023.12/python/spacewalk/server/taskomatic.py#L36-L50
    # The statement text is constant, so it is defined once outside the loop.
    add_query = """insert into rhnRepoRegenQueue
        (id, channel_label, client, reason, force, bypass_filters,
         next_action, created, modified)
        values (
            sequence_nextval('rhn_repo_regen_queue_id_seq'),
            :channel, :client, :reason, :force, :bypass_filters,
            current_timestamp, current_timestamp, current_timestamp
        )
    """

    # Loop through each channel to be regenerated
    for channel_label in channels_to_be_regen:
        print(f"## Regenerating Packages file for channel '{channel_label}'")
        statement = rhnSQL.prepare(add_query)
        result = statement.execute(
            channel=channel_label,
            client="uyuni_invalid_deb_repodata_regen.py",
            reason="Invalid Packages file",
            force="t",
            bypass_filters="f"
        )
        # Commit per channel, and only when execute() reported exactly one row
        if result == 1:
            rhnSQL.commit()
        else:
            print("## ERROR: Could not add the channel to the rhnRepoRegenQueue table! Continuing...")

    # Get rhnRepoRegenQueue contents
    get_query = """select id, channel_label, client, reason, force, bypass_filters,
        next_action, created, modified
        from rhnRepoRegenQueue
        order by id desc
    """
    statement = rhnSQL.prepare(get_query)
    statement.execute()
    rows = statement.fetchall()
    if rows:
        print("## rhnRepoRegenQueue:")
        # Column positions follow the select list above:
        # 0=id, 1=channel_label, 2=client, 3=reason, 8=modified
        for row in rows:
            print(
                f"- [{row[0]}] "
                f"{row[2]}: "
                f"'{row[1]}'"
                " - "
                f"\"{row[3]}\""
                " - "
                f"{row[8]}"
            )
    else:
        print("## rhnRepoRegenQueue is empty!")
print("Done!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment