Skip to content

Instantly share code, notes, and snippets.

@vsc55
Created August 5, 2023 08:12
Show Gist options
  • Save vsc55/b5828e5e4e2c51e506b0da4de6ca0c06 to your computer and use it in GitHub Desktop.
Save vsc55/b5828e5e4e2c51e506b0da4de6ca0c06 to your computer and use it in GitHub Desktop.
Auto generador de ebuilds para Plex Media Server
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# Utilidad para generar ebuild gentoo para las nuevas versiones de Plex.
#
# Copyright © 2023 Javier Pastor (aka VSC55)
# <jpastor at cerebelum dot net>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
__author__ = "VSC55"
__copyright__ = "Copyright © 2023, Javier Pastor"
__credits__ = "Javier Pastor"
__license__ = "GPL"
__version__ = "1.0.0"
__maintainer__ = 'Javier Pastor'
__email__ = "python@cerebelum.net"
__status__ = "Development"
# Dependencias necesarias:
# python -m pip install colorama
# python -m pip install requests
import os
import re
import requests
import subprocess
import shutil
import glob
from datetime import datetime
from colorama import init, Fore, Style
# Inicializar colorama
init()
# Definir configuración
# URL con info para obtener Token: https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/
url_plex = "https://plex.tv/api/downloads/5.json?channel=plexpass&X-Plex-Token=TOKENQUI"
path_ebuils = r"/usr/local/portage/media-tv/plex-media-server/"
path_changelog = r"/usr/local/portage/media-tv/plex-media-server/ChangeLog"
log_file = r"/var/log/updatePlexEbuild.log"
ebuild_name = "ebuild"
recipient_email = "you@mail.domain"
changelog_marker = "********************* Changelog *********************"
distfiles_src = r"/usr/portage/distfiles/"
distfiles_dst = r"/usr/local/portage/distfiles/"
# Logs
class Logger:
# Definición de tipos de mensajes para el log
class MsgType:
OK = 1
INFO = 2
WARNING = 3
ERROR = 4
def __init__(self, log_file = None):
"""
Inicializa la clase Logger con el archivo de registro especificado.
Args:
log_file (str): Ruta del archivo de registro donde se almacenarán los mensajes. Puede ser None si no se desea almacenar los mensajes en un archivo.
"""
self.log_file = log_file
def log(self, message, message_type=MsgType.INFO, send_mail=None, force_exit = None, subject = None, body = None):
"""
Registra un mensaje en el log y lo imprime en la consola con un color específico.
Puede enviar un correo electrónico opcionalmente.
Args:
message (str): El mensaje que se registrará en el log.
message_type (int): El tipo de mensaje. Debe ser uno de los valores de MsgType.
send_mail (bool): Si es True, enviará un correo electrónico con el mensaje.
force_exit (bool): Si es True, forzará la salida del programa después de imprimir el mensaje.
subject (str): El asunto del correo electrónico. Solo se utiliza si send_mail es True.
body (str): El cuerpo del correo electrónico. Solo se utiliza si send_mail es True.
"""
timestamp = self._formatted_timestamp()
# Diccionario que mapea message_type a las cadenas de prefijo y colores
type_mapping = {
self.MsgType.ERROR: ("[ERROR]", Fore.RED),
self.MsgType.WARNING: ("[WARNING]", Fore.YELLOW),
self.MsgType.INFO: ("[INFO]", Fore.GREEN),
}
# Obtener el valor predeterminado para message_type (opción por defecto)
prefix, color = type_mapping.get(message_type, ("[OK]", Fore.CYAN))
if send_mail is None:
send_mail = message_type == self.MsgType.ERROR
if force_exit is None:
force_exit = message_type == self.MsgType.ERROR
# ** EL CODIGO SIGUIENTE SE HA REMPLAZADO POR type_mapping **
# if message_type == self.MsgType.ERROR:
# prefix = "[ERROR]"
# color = Fore.RED
# if send_mail is None:
# send_mail = True
# if force_exit is None:
# force_exit = True
# elif message_type == self.MsgType.WARNING:
# prefix = "[WARNING]"
# color = Fore.YELLOW
# elif message_type == self.MsgType.INFO:
# prefix = "[INFO]"
# color = Fore.GREEN
# else:
# prefix = "[OK]"
# color = Fore.CYAN
# ** EL CODIGO SIGUIENTE SE HA REMPLAZADO POR type_mapping **
prefix_message = f"{timestamp} {prefix} - {message}"
if self.log_file is not None:
with open(self.log_file, "a") as file:
file.write(prefix_message + "\n")
print(color + prefix_message + Style.RESET_ALL)
if send_mail == True:
if subject is None:
subject = f"Plex Nueva Version - {prefix}"
if body is None:
body = message
send_email(subject, body, recipient_email)
if force_exit == True:
exit();
def _formatted_timestamp(self):
"""
Devuelve una cadena de tiempo formateada.
Returns:
str: Cadena de tiempo con formato "[%H:%M:%S - %d/%m/%Y]".
"""
now = datetime.now()
return now.strftime("[%H:%M:%S - %d/%m/%Y]")
# Datos JSON Web
class OnlineVersion:
def __init__(self, url, logger=None, get_data_on_init=True):
"""
Inicializa la clase OnlineVersion con la URL del JSON, un objeto Logger opcional y la opción para obtener los datos del JSON al inicializar.
Args:
url (str): URL del JSON que contiene la información de la versión en línea.
logger (Logger): Objeto Logger para registrar mensajes. Por defecto, es None.
get_data_on_init (bool): Si se deben obtener los datos del JSON al inicializar la clase. Por defecto, es True.
"""
self.url = url
self.version_full = None
self.version = None
self.hash = None
self.info_new = None
self.info_fix = None
self.logger = logger
if get_data_on_init:
self.refresh()
def _extract_hash(self, version_str = None):
"""
Extrae el HASH de la versión a partir de una cadena que contiene el número de versión y el HASH.
Args:
version_str (str): La cadena que contiene el número de versión y el HASH.
Returns:
str: El HASH extraído de la cadena. Si no se encuentra un HASH, devuelve None.
"""
if version_str is not None:
# Utilizamos una expresión regular para extraer el HASH omitido después del "-"
match = re.search(r'-(\w+)$', version_str)
if match:
return match.group(1)
return None
def _extract_version(self, version_str = None):
"""
Extrae el número de versión de una cadena que contiene el número de versión y el HASH.
Args:
version_str (str): La cadena que contiene el número de versión y el HASH.
Returns:
str: El número de versión extraído de la cadena. Si no se encuentra un número de versión, devuelve None.
"""
if version_str is not None:
# Utilizamos una expresión regular para extraer la versión sin el identificador después del "-"
match = re.match(r'(\d+\.\d+\.\d+\.\d+)', version_str)
if match:
return match.group(1)
return None
def refresh(self):
"""
Obtiene los datos del JSON en línea y actualiza los atributos de la clase con la versión más reciente, el HASH y otra información relevante.
Returns:
bool: True si la actualización de los datos fue exitosa, False en caso contrario.
"""
self.version_full = None
self.version = None
self.hash = None
logMsg = ""
try:
# Realizar la solicitud GET a la URL
response = requests.get(self.url)
# Comprobar si la solicitud fue exitosa (código de estado 200)
if response.status_code == 200:
data = response.json()
if "computer" in data and "Linux" in data["computer"] and "version" in data["computer"]["Linux"]:
version_full = data["computer"]["Linux"]["version"]
if version_full.strip(): # Comprobar que la versión no está vacía
self.version_full = version_full
self.version = self._extract_version(self.get_version_full())
self.hash = self._extract_hash(self.get_version_full())
self.info_new = data["computer"]["Linux"]["items_added"]
self.info_fix = data["computer"]["Linux"]["items_fixed"]
return True
else:
logMsg = "La versión en el JSON está vacía!"
else:
logMsg = "La estructura del JSON no es la esperada!"
else:
logMsg = f"Error al obtener la URL. Código de estado: {response.status_code}"
except requests.RequestException as e:
logMsg = f"Error en la solicitud: {e}"
except KeyError:
logMsg = "La estructura del JSON no es la esperada!"
except Exception as e:
logMsg = f"Error inesperado: {e}"
if self.logger:
self.logger.log(logMsg, self.logger.MsgType.ERROR)
return False
def get_version_full(self):
"""
Devuelve la versión completa que contiene el número de versión y el HASH.
Returns:
str: Versión completa que contiene el número de versión y el HASH.
"""
return self.version_full
def get_version(self):
"""
Devuelve el número de versión extraído de la versión completa.
Returns:
str: Número de versión extraído de la versión completa.
"""
return self.version
def get_hash(self):
"""
Devuelve el HASH extraído de la versión completa.
Returns:
str: HASH extraído de la versión completa.
"""
return self.hash
def get_info_new(self):
"""
Devuelve la información sobre las novedades de la versión más reciente.
Returns:
str: Información sobre las novedades de la versión más reciente.
"""
return self.info_new
def get_info_fix(self):
"""
Devuelve la información sobre las correcciones de la versión más reciente.
Returns:
str: Información sobre las correcciones de la versión más reciente.
"""
return self.info_fix
# Ficheros ebuild locales
class VersionManager:
def __init__(self, ebuilds_path = None, new_version = None, logger=None):
"""
Inicializa la clase VersionManager con la ruta de los ebuilds locales, la nueva versión y un objeto Logger opcional.
Args:
ebuilds_path (str): Ruta de la carpeta que contiene los ebuilds locales.
new_version (str): Nueva versión del ebuild que se está actualizando.
logger (Logger): Objeto Logger para registrar mensajes. Por defecto, es None.
"""
self.logger = logger
self.ebuilds_path = ebuilds_path
self.new_version = new_version
self.ebuild_latest_version = None
self.latest_version = None
self.refresh_latest_version()
def refresh_latest_version(self):
"""
Actualiza la versión más reciente del ebuild local y la nueva versión especificada en la clase.
Returns:
bool: True si se encontró la versión más reciente del ebuild, False en caso contrario.
"""
self.ebuild_latest_version = None
self.latest_version = None
self.ebuild_latest_version = self.find_latest_version_file()
self.latest_version = self.get_version_from_filename(self.get_ebuild_latest_version())
if self.ebuild_latest_version is not None:
return True
else:
return False
def get_version_from_filename(self, filename = None):
"""
Extrae el número de versión del nombre del archivo ebuild.
Args:
filename (str): Nombre del archivo ebuild.
Returns:
str: Número de versión extraído del nombre del archivo. Si no se encuentra un número de versión, devuelve None.
"""
if filename is not None:
# Utilizamos una expresión regular para extraer la versión del nombre del archivo
match = re.search(r'(\d+\.\d+\.\d+\.\d+)', filename)
if match:
return match.group(1)
return None
def find_latest_version_file(self):
"""
Busca y devuelve el nombre del archivo ebuild que tiene la versión más reciente en la carpeta especificada.
Returns:
str: Nombre del archivo ebuild con la versión más reciente. Si no se encuentra ningún ebuild, devuelve None.
"""
latest_version = None
latest_file = None
directory = self.ebuilds_path
# Recorremos los archivos en el directorio
for filename in os.listdir(directory):
if filename.endswith(".ebuild"):
version = self.get_version_from_filename(filename)
if version:
if latest_version is None or version > latest_version:
latest_version = version
latest_file = filename
return latest_file
def get_ebuild_file(self, version = None):
"""
Genera el nombre del archivo ebuild basado en la versión especificada o la nueva versión de la clase.
Args:
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase.
Returns:
str: Nombre del archivo ebuild.
"""
if not version:
version = self.new_version
file_ebuild = f"plex-media-server-{version}.ebuild"
return file_ebuild
def get_ebuild_path(self, version = None):
"""
Genera la ruta completa del archivo ebuild basada en la versión especificada o la nueva versión de la clase.
Args:
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase.
Returns:
str: Ruta completa del archivo ebuild.
"""
file_ebuild = self.get_ebuild_file(version)
path_ebuild = os.path.join(self.ebuilds_path, file_ebuild)
return path_ebuild
def is_exist_version(self, version = None):
"""
Comprueba si el archivo ebuild con la versión especificada o la nueva versión de la clase existe.
Args:
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase.
Returns:
bool: True si el archivo ebuild existe, False en caso contrario.
"""
path_ebuild = self.get_ebuild_path(version)
return os.path.exists(path_ebuild)
def get_ebuild_latest_version(self):
"""
Obtiene el nombre del archivo ebuild correspondiente a la última versión disponible.
Returns:
str: Nombre del archivo ebuild de la última versión disponible.
"""
return self.ebuild_latest_version
def get_latest_version(self):
"""
Devuelve la versión más reciente del ebuild.
Returns:
str: Versión más reciente del ebuild.
"""
return self.latest_version
def is_exist_latest_version(self):
"""
Comprueba si el archivo ebuild con la versión más reciente existe.
Returns:
bool: True si el archivo ebuild con la versión más reciente existe, False en caso contrario.
"""
return self.is_exist_version(self.get_latest_version())
def is_program_installed(program_name):
"""
Comprueba si un programa está instalado en el sistema.
Args:
program_name (str): Nombre del programa a verificar.
Returns:
bool: True si el programa está instalado, False en caso contrario.
"""
try:
# Verificar si el programa existe utilizando el comando 'which' en sistemas Unix/Linux
if os.name == 'posix': # Si el sistema es Unix/Linux
subprocess.check_output(['which', program_name], stderr=subprocess.STDOUT)
else: # Si el sistema es Windows
# Verificar si el programa existe utilizando el comando 'where'
subprocess.check_output(['where', program_name], stderr=subprocess.STDOUT)
return True
except subprocess.CalledProcessError:
return False
def send_email(subject, body, recipient_email):
"""
Envía un correo electrónico utilizando el comando 'mail' de Linux.
Args:
subject (str): Asunto del correo electrónico.
body (str): Cuerpo del correo electrónico.
recipient_email (str): Dirección de correo electrónico del destinatario.
Returns:
bool: True si el correo electrónico se envió correctamente, False en caso contrario.
"""
try:
if not recipient_email:
logger.log("La dirección de correo electrónico del destinatario no se proporcionó. No se enviará el correo.", message_type=logger.MsgType.WARNING)
return False
# Construir el comando mail
command = f'echo "{body}" | mail -s "{subject}" {recipient_email}'
# Ejecutar el comando mail
subprocess.run(command, shell=True)
logger.log("Correo electrónico enviado con éxito." , message_type=logger.MsgType.OK)
return True
except Exception as e:
logger.log(f"Error al enviar el correo electrónico: {e}", message_type=logger.MsgType.ERROR, force_exit=False)
return False
if __name__ == "__main__":
print("Update ebuild Plex ({0}) By {1}".format(__version__, __author__))
print("")
# Iniciamos log
logger = Logger(log_file)
# Comprobamos si el path de los ebuils existe
if not os.path.exists(path_ebuils):
logger.log(f"La ruta {path_ebuils} no existe!", message_type=logger.MsgType.ERROR)
# Obtener la versión del JSON desde la web
OnlineVer = OnlineVersion(url_plex, logger=logger, get_data_on_init=False)
OnlineVer.refresh()
json_version = OnlineVer.get_version_full()
version = OnlineVer.get_version()
hash = OnlineVer.get_hash()
if not json_version or not version or not hash:
logger.log("No se pudo obtener la versión o el hash!", message_type=logger.MsgType.ERROR)
else:
logger.log(f"Online: {json_version}")
# Obtenemos versiones locales
files_manager = VersionManager(path_ebuils, version, logger)
file_ebuild = files_manager.get_ebuild_file()
path_ebuild = files_manager.get_ebuild_path()
if files_manager.is_exist_version():
logger.log(f"La versión {version} existe, no hay nada que hacer.", message_type=logger.MsgType.OK, force_exit=True)
else:
logger.log(f"El archivo de la versión {version} no existe, inicio proceso de creacion...")
latest_version = files_manager.get_latest_version()
if not files_manager.is_exist_latest_version():
logger.log("No se encontro ninguna versión en la carpeta!", message_type=logger.MsgType.ERROR)
else:
logger.log(f"La versión más moderna es: {latest_version}")
# Copiar el archivo y realizar el reemplazo
try:
path_ebuild_source = files_manager.get_ebuild_path(latest_version)
with open(path_ebuild_source, "r") as source:
content = source.read()
# Verificar si MAGIC2 existe en el contenido original
if 'MAGIC2="' not in content:
logger.log("MAGIC2 no encontrado en el archivo origen!", message_type=logger.MsgType.ERROR)
# Utilizamos una expresión regular para buscar y reemplazar MAGIC2 con el nuevo valor
content = re.sub(r'MAGIC2="[^"]*"', f'MAGIC2="{hash}"', content)
# Verificar si MAGIC2 fue reemplazado correctamente (no quedó vacío)
if 'MAGIC2=""' in content:
logger.log("El reemplazo de MAGIC2 resultó en un valor vacío!", message_type=logger.MsgType.ERROR)
with open(path_ebuild, "w", newline='\n') as destination:
destination.write(content)
logger.log(f"Nuevo archivo creado: {file_ebuild}", message_type=logger.MsgType.OK)
except Exception as e:
logger.log(f"Error al copiar y reemplazar el archivo: {e}", message_type=logger.MsgType.ERROR)
# Procesamos Changelog
new = OnlineVer.get_info_new()
fix = OnlineVer.get_info_fix()
try:
with open(path_changelog, "r", encoding="utf-8") as file_changelog:
content = file_changelog.read()
# Buscar la posición de la versión en el contenido del archivo
version_position = content.find(version)
# Comprobar si la versión existe en el archivo
if version_position == -1:
logger.log(f"La versión {version} no existe en el changelog, añadiendo...", logger.MsgType.INFO)
new_text_changelog = f"""\n\n\n
Plex Media Server {version}
-----------------------------------------
NEW:
{new}
FIX:
{fix}
"""
changelog_position = content.find(changelog_marker)
if changelog_position != -1:
# Encontrar el inicio del siguiente párrafo después de "********************* Changelog *********************"
next_paragraph_position = content.find("\n", changelog_position + len(changelog_marker))
if next_paragraph_position == -1:
# Si no hay otro párrafo después de "********************* Changelog *********************", insertar el nuevo texto al final
content = content[:changelog_position] + changelog_marker + new_text_changelog + content[changelog_position:]
else:
# Insertar el nuevo texto justo después de "********************* Changelog *********************"
content = content[:next_paragraph_position] + new_text_changelog + content[next_paragraph_position:]
else:
# Si no se encontró "********************* Changelog *********************", agregarlo al final del archivo
content += "\n" + changelog_marker + "\n" + new_text_changelog
with open(path_changelog, "w", encoding="utf-8") as file:
file.write(content)
logger.log("Info ChangeLog añadido correctamente.", logger.MsgType.OK)
else:
logger.log(f"La versión {version} ya existe en el ChangeLog!", logger.MsgType.INFO)
except FileNotFoundError:
logger.log("El archivo no fue encontrado.", logger.MsgType.WARNING)
except Exception as e:
logger.log(f"Error al procesar el archivo: {e}", logger.MsgType.WARNING)
# Verificar si el programa está instalado
if not is_program_installed(ebuild_name):
logger.log(f"El programa {ebuild_name} no está instalado en el sistema, no estamos en Gentoo!", message_type=logger.MsgType.WARNING, force_exit=True)
# Comando que deseas ejecutar
command_to_execute = f"ebuild {path_ebuild} digest"
try:
logger.log(f"Actualizamos ebuild...")
with open(log_file, "a") as cmdrunlog:
# Ejecutar el comando y esperar a que termine
subprocess.run(command_to_execute, shell=True, check=True, stdout=cmdrunlog, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
logger.log(f"Error al ejecutar el comando: {e}", message_type=logger.MsgType.ERROR)
except FileNotFoundError as e:
logger.log(f"Comando no encontrado: {e}", message_type=logger.MsgType.ERROR)
# Copias los archivos descargados al hacer digest
# plexmediaserver_1.32.5.7349-8f4248874_i386.deb
files_downloads = f"plexmediaserver_{json_version}_*.deb"
matching_files = glob.glob(os.path.join(distfiles_src, files_downloads))
# Copiar los archivos que coinciden con el patrón a la carpeta de destino
for file_path in matching_files:
file_name = os.path.basename(file_path)
destination_path = os.path.join(distfiles_dst, file_name)
if not os.path.exists(destination_path):
try:
shutil.copy(file_path, distfiles_dst)
logger.log(f"Archivo {file_name} copiado con éxito.", message_type=logger.MsgType.OK)
except Exception as e:
logger.log(f"Error al copiar el archivo {file_name}: {e}", message_type=logger.MsgType.WARNING, send_mail= True)
else:
logger.log(f"El archivo {file_name} ya existe.", message_type=logger.MsgType.INFO)
subject = f"Plex Nueva Version ({version}) - [OK]"
body = f"New:\n {new}\n\nFix:\n{fix}"
logger.log("Proceso finalizado correctamente!", message_type=logger.MsgType.OK, send_mail=True, subject=subject, body=body)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment