Created
August 5, 2023 08:12
-
-
Save vsc55/b5828e5e4e2c51e506b0da4de6ca0c06 to your computer and use it in GitHub Desktop.
Auto generador de ebuilds para Plex Media Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Utilidad para generar ebuild gentoo para las nuevas versiones de Plex. | |
# | |
# Copyright © 2023 Javier Pastor (aka VSC55) | |
# <jpastor at cerebelum dot net> | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
__author__ = "VSC55" | |
__copyright__ = "Copyright © 2023, Javier Pastor" | |
__credits__ = "Javier Pastor" | |
__license__ = "GPL" | |
__version__ = "1.0.0" | |
__maintainer__ = 'Javier Pastor' | |
__email__ = "python@cerebelum.net" | |
__status__ = "Development" | |
# Dependencias necesarias: | |
# python -m pip install colorama | |
# python -m pip install requests | |
import os | |
import re | |
import requests | |
import subprocess | |
import shutil | |
import glob | |
from datetime import datetime | |
from colorama import init, Fore, Style | |
# Inicializar colorama | |
init() | |
# Definir configuración | |
# URL con info para obtener Token: https://support.plex.tv/articles/204059436-finding-an-authentication-token-x-plex-token/ | |
url_plex = "https://plex.tv/api/downloads/5.json?channel=plexpass&X-Plex-Token=TOKENQUI" | |
path_ebuils = r"/usr/local/portage/media-tv/plex-media-server/" | |
path_changelog = r"/usr/local/portage/media-tv/plex-media-server/ChangeLog" | |
log_file = r"/var/log/updatePlexEbuild.log" | |
ebuild_name = "ebuild" | |
recipient_email = "you@mail.domain" | |
changelog_marker = "********************* Changelog *********************" | |
distfiles_src = r"/usr/portage/distfiles/" | |
distfiles_dst = r"/usr/local/portage/distfiles/" | |
# Logs | |
class Logger: | |
# Definición de tipos de mensajes para el log | |
class MsgType: | |
OK = 1 | |
INFO = 2 | |
WARNING = 3 | |
ERROR = 4 | |
def __init__(self, log_file = None): | |
""" | |
Inicializa la clase Logger con el archivo de registro especificado. | |
Args: | |
log_file (str): Ruta del archivo de registro donde se almacenarán los mensajes. Puede ser None si no se desea almacenar los mensajes en un archivo. | |
""" | |
self.log_file = log_file | |
def log(self, message, message_type=MsgType.INFO, send_mail=None, force_exit = None, subject = None, body = None): | |
""" | |
Registra un mensaje en el log y lo imprime en la consola con un color específico. | |
Puede enviar un correo electrónico opcionalmente. | |
Args: | |
message (str): El mensaje que se registrará en el log. | |
message_type (int): El tipo de mensaje. Debe ser uno de los valores de MsgType. | |
send_mail (bool): Si es True, enviará un correo electrónico con el mensaje. | |
force_exit (bool): Si es True, forzará la salida del programa después de imprimir el mensaje. | |
subject (str): El asunto del correo electrónico. Solo se utiliza si send_mail es True. | |
body (str): El cuerpo del correo electrónico. Solo se utiliza si send_mail es True. | |
""" | |
timestamp = self._formatted_timestamp() | |
# Diccionario que mapea message_type a las cadenas de prefijo y colores | |
type_mapping = { | |
self.MsgType.ERROR: ("[ERROR]", Fore.RED), | |
self.MsgType.WARNING: ("[WARNING]", Fore.YELLOW), | |
self.MsgType.INFO: ("[INFO]", Fore.GREEN), | |
} | |
# Obtener el valor predeterminado para message_type (opción por defecto) | |
prefix, color = type_mapping.get(message_type, ("[OK]", Fore.CYAN)) | |
if send_mail is None: | |
send_mail = message_type == self.MsgType.ERROR | |
if force_exit is None: | |
force_exit = message_type == self.MsgType.ERROR | |
# ** EL CODIGO SIGUIENTE SE HA REMPLAZADO POR type_mapping ** | |
# if message_type == self.MsgType.ERROR: | |
# prefix = "[ERROR]" | |
# color = Fore.RED | |
# if send_mail is None: | |
# send_mail = True | |
# if force_exit is None: | |
# force_exit = True | |
# elif message_type == self.MsgType.WARNING: | |
# prefix = "[WARNING]" | |
# color = Fore.YELLOW | |
# elif message_type == self.MsgType.INFO: | |
# prefix = "[INFO]" | |
# color = Fore.GREEN | |
# else: | |
# prefix = "[OK]" | |
# color = Fore.CYAN | |
# ** EL CODIGO SIGUIENTE SE HA REMPLAZADO POR type_mapping ** | |
prefix_message = f"{timestamp} {prefix} - {message}" | |
if self.log_file is not None: | |
with open(self.log_file, "a") as file: | |
file.write(prefix_message + "\n") | |
print(color + prefix_message + Style.RESET_ALL) | |
if send_mail == True: | |
if subject is None: | |
subject = f"Plex Nueva Version - {prefix}" | |
if body is None: | |
body = message | |
send_email(subject, body, recipient_email) | |
if force_exit == True: | |
exit(); | |
def _formatted_timestamp(self): | |
""" | |
Devuelve una cadena de tiempo formateada. | |
Returns: | |
str: Cadena de tiempo con formato "[%H:%M:%S - %d/%m/%Y]". | |
""" | |
now = datetime.now() | |
return now.strftime("[%H:%M:%S - %d/%m/%Y]") | |
# Datos JSON Web | |
class OnlineVersion: | |
def __init__(self, url, logger=None, get_data_on_init=True): | |
""" | |
Inicializa la clase OnlineVersion con la URL del JSON, un objeto Logger opcional y la opción para obtener los datos del JSON al inicializar. | |
Args: | |
url (str): URL del JSON que contiene la información de la versión en línea. | |
logger (Logger): Objeto Logger para registrar mensajes. Por defecto, es None. | |
get_data_on_init (bool): Si se deben obtener los datos del JSON al inicializar la clase. Por defecto, es True. | |
""" | |
self.url = url | |
self.version_full = None | |
self.version = None | |
self.hash = None | |
self.info_new = None | |
self.info_fix = None | |
self.logger = logger | |
if get_data_on_init: | |
self.refresh() | |
def _extract_hash(self, version_str = None): | |
""" | |
Extrae el HASH de la versión a partir de una cadena que contiene el número de versión y el HASH. | |
Args: | |
version_str (str): La cadena que contiene el número de versión y el HASH. | |
Returns: | |
str: El HASH extraído de la cadena. Si no se encuentra un HASH, devuelve None. | |
""" | |
if version_str is not None: | |
# Utilizamos una expresión regular para extraer el HASH omitido después del "-" | |
match = re.search(r'-(\w+)$', version_str) | |
if match: | |
return match.group(1) | |
return None | |
def _extract_version(self, version_str = None): | |
""" | |
Extrae el número de versión de una cadena que contiene el número de versión y el HASH. | |
Args: | |
version_str (str): La cadena que contiene el número de versión y el HASH. | |
Returns: | |
str: El número de versión extraído de la cadena. Si no se encuentra un número de versión, devuelve None. | |
""" | |
if version_str is not None: | |
# Utilizamos una expresión regular para extraer la versión sin el identificador después del "-" | |
match = re.match(r'(\d+\.\d+\.\d+\.\d+)', version_str) | |
if match: | |
return match.group(1) | |
return None | |
def refresh(self): | |
""" | |
Obtiene los datos del JSON en línea y actualiza los atributos de la clase con la versión más reciente, el HASH y otra información relevante. | |
Returns: | |
bool: True si la actualización de los datos fue exitosa, False en caso contrario. | |
""" | |
self.version_full = None | |
self.version = None | |
self.hash = None | |
logMsg = "" | |
try: | |
# Realizar la solicitud GET a la URL | |
response = requests.get(self.url) | |
# Comprobar si la solicitud fue exitosa (código de estado 200) | |
if response.status_code == 200: | |
data = response.json() | |
if "computer" in data and "Linux" in data["computer"] and "version" in data["computer"]["Linux"]: | |
version_full = data["computer"]["Linux"]["version"] | |
if version_full.strip(): # Comprobar que la versión no está vacía | |
self.version_full = version_full | |
self.version = self._extract_version(self.get_version_full()) | |
self.hash = self._extract_hash(self.get_version_full()) | |
self.info_new = data["computer"]["Linux"]["items_added"] | |
self.info_fix = data["computer"]["Linux"]["items_fixed"] | |
return True | |
else: | |
logMsg = "La versión en el JSON está vacía!" | |
else: | |
logMsg = "La estructura del JSON no es la esperada!" | |
else: | |
logMsg = f"Error al obtener la URL. Código de estado: {response.status_code}" | |
except requests.RequestException as e: | |
logMsg = f"Error en la solicitud: {e}" | |
except KeyError: | |
logMsg = "La estructura del JSON no es la esperada!" | |
except Exception as e: | |
logMsg = f"Error inesperado: {e}" | |
if self.logger: | |
self.logger.log(logMsg, self.logger.MsgType.ERROR) | |
return False | |
def get_version_full(self): | |
""" | |
Devuelve la versión completa que contiene el número de versión y el HASH. | |
Returns: | |
str: Versión completa que contiene el número de versión y el HASH. | |
""" | |
return self.version_full | |
def get_version(self): | |
""" | |
Devuelve el número de versión extraído de la versión completa. | |
Returns: | |
str: Número de versión extraído de la versión completa. | |
""" | |
return self.version | |
def get_hash(self): | |
""" | |
Devuelve el HASH extraído de la versión completa. | |
Returns: | |
str: HASH extraído de la versión completa. | |
""" | |
return self.hash | |
def get_info_new(self): | |
""" | |
Devuelve la información sobre las novedades de la versión más reciente. | |
Returns: | |
str: Información sobre las novedades de la versión más reciente. | |
""" | |
return self.info_new | |
def get_info_fix(self): | |
""" | |
Devuelve la información sobre las correcciones de la versión más reciente. | |
Returns: | |
str: Información sobre las correcciones de la versión más reciente. | |
""" | |
return self.info_fix | |
# Ficheros ebuild locales | |
class VersionManager: | |
def __init__(self, ebuilds_path = None, new_version = None, logger=None): | |
""" | |
Inicializa la clase VersionManager con la ruta de los ebuilds locales, la nueva versión y un objeto Logger opcional. | |
Args: | |
ebuilds_path (str): Ruta de la carpeta que contiene los ebuilds locales. | |
new_version (str): Nueva versión del ebuild que se está actualizando. | |
logger (Logger): Objeto Logger para registrar mensajes. Por defecto, es None. | |
""" | |
self.logger = logger | |
self.ebuilds_path = ebuilds_path | |
self.new_version = new_version | |
self.ebuild_latest_version = None | |
self.latest_version = None | |
self.refresh_latest_version() | |
def refresh_latest_version(self): | |
""" | |
Actualiza la versión más reciente del ebuild local y la nueva versión especificada en la clase. | |
Returns: | |
bool: True si se encontró la versión más reciente del ebuild, False en caso contrario. | |
""" | |
self.ebuild_latest_version = None | |
self.latest_version = None | |
self.ebuild_latest_version = self.find_latest_version_file() | |
self.latest_version = self.get_version_from_filename(self.get_ebuild_latest_version()) | |
if self.ebuild_latest_version is not None: | |
return True | |
else: | |
return False | |
def get_version_from_filename(self, filename = None): | |
""" | |
Extrae el número de versión del nombre del archivo ebuild. | |
Args: | |
filename (str): Nombre del archivo ebuild. | |
Returns: | |
str: Número de versión extraído del nombre del archivo. Si no se encuentra un número de versión, devuelve None. | |
""" | |
if filename is not None: | |
# Utilizamos una expresión regular para extraer la versión del nombre del archivo | |
match = re.search(r'(\d+\.\d+\.\d+\.\d+)', filename) | |
if match: | |
return match.group(1) | |
return None | |
def find_latest_version_file(self): | |
""" | |
Busca y devuelve el nombre del archivo ebuild que tiene la versión más reciente en la carpeta especificada. | |
Returns: | |
str: Nombre del archivo ebuild con la versión más reciente. Si no se encuentra ningún ebuild, devuelve None. | |
""" | |
latest_version = None | |
latest_file = None | |
directory = self.ebuilds_path | |
# Recorremos los archivos en el directorio | |
for filename in os.listdir(directory): | |
if filename.endswith(".ebuild"): | |
version = self.get_version_from_filename(filename) | |
if version: | |
if latest_version is None or version > latest_version: | |
latest_version = version | |
latest_file = filename | |
return latest_file | |
def get_ebuild_file(self, version = None): | |
""" | |
Genera el nombre del archivo ebuild basado en la versión especificada o la nueva versión de la clase. | |
Args: | |
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase. | |
Returns: | |
str: Nombre del archivo ebuild. | |
""" | |
if not version: | |
version = self.new_version | |
file_ebuild = f"plex-media-server-{version}.ebuild" | |
return file_ebuild | |
def get_ebuild_path(self, version = None): | |
""" | |
Genera la ruta completa del archivo ebuild basada en la versión especificada o la nueva versión de la clase. | |
Args: | |
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase. | |
Returns: | |
str: Ruta completa del archivo ebuild. | |
""" | |
file_ebuild = self.get_ebuild_file(version) | |
path_ebuild = os.path.join(self.ebuilds_path, file_ebuild) | |
return path_ebuild | |
def is_exist_version(self, version = None): | |
""" | |
Comprueba si el archivo ebuild con la versión especificada o la nueva versión de la clase existe. | |
Args: | |
version (str): Versión del ebuild. Si no se proporciona, se utilizará la nueva versión de la clase. | |
Returns: | |
bool: True si el archivo ebuild existe, False en caso contrario. | |
""" | |
path_ebuild = self.get_ebuild_path(version) | |
return os.path.exists(path_ebuild) | |
def get_ebuild_latest_version(self): | |
""" | |
Obtiene el nombre del archivo ebuild correspondiente a la última versión disponible. | |
Returns: | |
str: Nombre del archivo ebuild de la última versión disponible. | |
""" | |
return self.ebuild_latest_version | |
def get_latest_version(self): | |
""" | |
Devuelve la versión más reciente del ebuild. | |
Returns: | |
str: Versión más reciente del ebuild. | |
""" | |
return self.latest_version | |
def is_exist_latest_version(self): | |
""" | |
Comprueba si el archivo ebuild con la versión más reciente existe. | |
Returns: | |
bool: True si el archivo ebuild con la versión más reciente existe, False en caso contrario. | |
""" | |
return self.is_exist_version(self.get_latest_version()) | |
def is_program_installed(program_name): | |
""" | |
Comprueba si un programa está instalado en el sistema. | |
Args: | |
program_name (str): Nombre del programa a verificar. | |
Returns: | |
bool: True si el programa está instalado, False en caso contrario. | |
""" | |
try: | |
# Verificar si el programa existe utilizando el comando 'which' en sistemas Unix/Linux | |
if os.name == 'posix': # Si el sistema es Unix/Linux | |
subprocess.check_output(['which', program_name], stderr=subprocess.STDOUT) | |
else: # Si el sistema es Windows | |
# Verificar si el programa existe utilizando el comando 'where' | |
subprocess.check_output(['where', program_name], stderr=subprocess.STDOUT) | |
return True | |
except subprocess.CalledProcessError: | |
return False | |
def send_email(subject, body, recipient_email): | |
""" | |
Envía un correo electrónico utilizando el comando 'mail' de Linux. | |
Args: | |
subject (str): Asunto del correo electrónico. | |
body (str): Cuerpo del correo electrónico. | |
recipient_email (str): Dirección de correo electrónico del destinatario. | |
Returns: | |
bool: True si el correo electrónico se envió correctamente, False en caso contrario. | |
""" | |
try: | |
if not recipient_email: | |
logger.log("La dirección de correo electrónico del destinatario no se proporcionó. No se enviará el correo.", message_type=logger.MsgType.WARNING) | |
return False | |
# Construir el comando mail | |
command = f'echo "{body}" | mail -s "{subject}" {recipient_email}' | |
# Ejecutar el comando mail | |
subprocess.run(command, shell=True) | |
logger.log("Correo electrónico enviado con éxito." , message_type=logger.MsgType.OK) | |
return True | |
except Exception as e: | |
logger.log(f"Error al enviar el correo electrónico: {e}", message_type=logger.MsgType.ERROR, force_exit=False) | |
return False | |
if __name__ == "__main__": | |
print("Update ebuild Plex ({0}) By {1}".format(__version__, __author__)) | |
print("") | |
# Iniciamos log | |
logger = Logger(log_file) | |
# Comprobamos si el path de los ebuils existe | |
if not os.path.exists(path_ebuils): | |
logger.log(f"La ruta {path_ebuils} no existe!", message_type=logger.MsgType.ERROR) | |
# Obtener la versión del JSON desde la web | |
OnlineVer = OnlineVersion(url_plex, logger=logger, get_data_on_init=False) | |
OnlineVer.refresh() | |
json_version = OnlineVer.get_version_full() | |
version = OnlineVer.get_version() | |
hash = OnlineVer.get_hash() | |
if not json_version or not version or not hash: | |
logger.log("No se pudo obtener la versión o el hash!", message_type=logger.MsgType.ERROR) | |
else: | |
logger.log(f"Online: {json_version}") | |
# Obtenemos versiones locales | |
files_manager = VersionManager(path_ebuils, version, logger) | |
file_ebuild = files_manager.get_ebuild_file() | |
path_ebuild = files_manager.get_ebuild_path() | |
if files_manager.is_exist_version(): | |
logger.log(f"La versión {version} existe, no hay nada que hacer.", message_type=logger.MsgType.OK, force_exit=True) | |
else: | |
logger.log(f"El archivo de la versión {version} no existe, inicio proceso de creacion...") | |
latest_version = files_manager.get_latest_version() | |
if not files_manager.is_exist_latest_version(): | |
logger.log("No se encontro ninguna versión en la carpeta!", message_type=logger.MsgType.ERROR) | |
else: | |
logger.log(f"La versión más moderna es: {latest_version}") | |
# Copiar el archivo y realizar el reemplazo | |
try: | |
path_ebuild_source = files_manager.get_ebuild_path(latest_version) | |
with open(path_ebuild_source, "r") as source: | |
content = source.read() | |
# Verificar si MAGIC2 existe en el contenido original | |
if 'MAGIC2="' not in content: | |
logger.log("MAGIC2 no encontrado en el archivo origen!", message_type=logger.MsgType.ERROR) | |
# Utilizamos una expresión regular para buscar y reemplazar MAGIC2 con el nuevo valor | |
content = re.sub(r'MAGIC2="[^"]*"', f'MAGIC2="{hash}"', content) | |
# Verificar si MAGIC2 fue reemplazado correctamente (no quedó vacío) | |
if 'MAGIC2=""' in content: | |
logger.log("El reemplazo de MAGIC2 resultó en un valor vacío!", message_type=logger.MsgType.ERROR) | |
with open(path_ebuild, "w", newline='\n') as destination: | |
destination.write(content) | |
logger.log(f"Nuevo archivo creado: {file_ebuild}", message_type=logger.MsgType.OK) | |
except Exception as e: | |
logger.log(f"Error al copiar y reemplazar el archivo: {e}", message_type=logger.MsgType.ERROR) | |
# Procesamos Changelog | |
new = OnlineVer.get_info_new() | |
fix = OnlineVer.get_info_fix() | |
try: | |
with open(path_changelog, "r", encoding="utf-8") as file_changelog: | |
content = file_changelog.read() | |
# Buscar la posición de la versión en el contenido del archivo | |
version_position = content.find(version) | |
# Comprobar si la versión existe en el archivo | |
if version_position == -1: | |
logger.log(f"La versión {version} no existe en el changelog, añadiendo...", logger.MsgType.INFO) | |
new_text_changelog = f"""\n\n\n | |
Plex Media Server {version} | |
----------------------------------------- | |
NEW: | |
{new} | |
FIX: | |
{fix} | |
""" | |
changelog_position = content.find(changelog_marker) | |
if changelog_position != -1: | |
# Encontrar el inicio del siguiente párrafo después de "********************* Changelog *********************" | |
next_paragraph_position = content.find("\n", changelog_position + len(changelog_marker)) | |
if next_paragraph_position == -1: | |
# Si no hay otro párrafo después de "********************* Changelog *********************", insertar el nuevo texto al final | |
content = content[:changelog_position] + changelog_marker + new_text_changelog + content[changelog_position:] | |
else: | |
# Insertar el nuevo texto justo después de "********************* Changelog *********************" | |
content = content[:next_paragraph_position] + new_text_changelog + content[next_paragraph_position:] | |
else: | |
# Si no se encontró "********************* Changelog *********************", agregarlo al final del archivo | |
content += "\n" + changelog_marker + "\n" + new_text_changelog | |
with open(path_changelog, "w", encoding="utf-8") as file: | |
file.write(content) | |
logger.log("Info ChangeLog añadido correctamente.", logger.MsgType.OK) | |
else: | |
logger.log(f"La versión {version} ya existe en el ChangeLog!", logger.MsgType.INFO) | |
except FileNotFoundError: | |
logger.log("El archivo no fue encontrado.", logger.MsgType.WARNING) | |
except Exception as e: | |
logger.log(f"Error al procesar el archivo: {e}", logger.MsgType.WARNING) | |
# Verificar si el programa está instalado | |
if not is_program_installed(ebuild_name): | |
logger.log(f"El programa {ebuild_name} no está instalado en el sistema, no estamos en Gentoo!", message_type=logger.MsgType.WARNING, force_exit=True) | |
# Comando que deseas ejecutar | |
command_to_execute = f"ebuild {path_ebuild} digest" | |
try: | |
logger.log(f"Actualizamos ebuild...") | |
with open(log_file, "a") as cmdrunlog: | |
# Ejecutar el comando y esperar a que termine | |
subprocess.run(command_to_execute, shell=True, check=True, stdout=cmdrunlog, stderr=subprocess.STDOUT) | |
except subprocess.CalledProcessError as e: | |
logger.log(f"Error al ejecutar el comando: {e}", message_type=logger.MsgType.ERROR) | |
except FileNotFoundError as e: | |
logger.log(f"Comando no encontrado: {e}", message_type=logger.MsgType.ERROR) | |
# Copias los archivos descargados al hacer digest | |
# plexmediaserver_1.32.5.7349-8f4248874_i386.deb | |
files_downloads = f"plexmediaserver_{json_version}_*.deb" | |
matching_files = glob.glob(os.path.join(distfiles_src, files_downloads)) | |
# Copiar los archivos que coinciden con el patrón a la carpeta de destino | |
for file_path in matching_files: | |
file_name = os.path.basename(file_path) | |
destination_path = os.path.join(distfiles_dst, file_name) | |
if not os.path.exists(destination_path): | |
try: | |
shutil.copy(file_path, distfiles_dst) | |
logger.log(f"Archivo {file_name} copiado con éxito.", message_type=logger.MsgType.OK) | |
except Exception as e: | |
logger.log(f"Error al copiar el archivo {file_name}: {e}", message_type=logger.MsgType.WARNING, send_mail= True) | |
else: | |
logger.log(f"El archivo {file_name} ya existe.", message_type=logger.MsgType.INFO) | |
subject = f"Plex Nueva Version ({version}) - [OK]" | |
body = f"New:\n {new}\n\nFix:\n{fix}" | |
logger.log("Proceso finalizado correctamente!", message_type=logger.MsgType.OK, send_mail=True, subject=subject, body=body) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment