Skip to content

Instantly share code, notes, and snippets.

@anthonyprintup
Last active September 15, 2022 08:01
Show Gist options
  • Save anthonyprintup/e6be7e0eb8defc3437c1a8f8ae2f53e3 to your computer and use it in GitHub Desktop.
Save anthonyprintup/e6be7e0eb8defc3437c1a8f8ae2f53e3 to your computer and use it in GitHub Desktop.
Script for managing a matrix-synapse server
from __future__ import annotations
from enum import IntEnum, auto as _enum_auto
# noinspection PyArgumentList
class ExitCode(IntEnum):
# No errors occurred
SUCCESS: ExitCode = 0
# Certificate related errors
CERTIFICATE_ERROR: ExitCode = _enum_auto()
# Synapse server related errors
SYNAPSE_ERROR: ExitCode = _enum_auto()
import sys
import logging
import asyncio
from asyncio import AbstractEventLoop
from subprocess import PIPE
# noinspection PyProtectedMember
from asyncio.subprocess import Process
from hashlib import md5
from pathlib import Path
from datetime import datetime
import yaml
import httpx
import psutil
from cryptography import x509
from cryptography.x509 import Certificate
from cryptography.x509.extensions import Extension, AuthorityInformationAccess, AccessDescription
from cryptography.hazmat.primitives.serialization import Encoding
from cli.exit_codes import ExitCode
# Synapse paths
SYNAPSE_PATH: Path = Path.home() / "synapse"
SYNAPSE_CONFIG_PATH: Path = SYNAPSE_PATH / "homeserver.yaml"
SYNAPSE_ACTIVATE_ENVIRONMENT_SCRIPT_PATH: Path = SYNAPSE_PATH / "env" / "bin" / "activate"
# Parse the homeserver config
SYNAPSE_CONFIG: dict = yaml.safe_load(SYNAPSE_CONFIG_PATH.read_bytes())
SYNAPSE_HOMESERVER_PID_PATH: Path = Path(SYNAPSE_CONFIG["pid_file"])
SYNAPSE_CERTIFICATE_PATH: Path = Path(SYNAPSE_CONFIG["tls_certificate_path"])
SYNAPSE_CERTIFICATE_KEY_PATH: Path = Path(SYNAPSE_CONFIG["tls_private_key_path"])
# cPanel SSL paths
SSL_CERTIFICATES_PATH: Path = Path.home() / "ssl" / "certs"
SSL_CERTIFICATES_KEYS_PATH: Path = Path.home() / "ssl" / "keys"
# Local cache path
SSL_CERTIFICATES_CACHE_PATH: Path = Path(__file__).parent.parent.absolute() / "certificate_cache"
SSL_CERTIFICATES_CACHE_PATH.mkdir(exist_ok=True)
def find_current_certificate() -> Certificate:
return x509.load_pem_x509_certificate(SYNAPSE_CERTIFICATE_PATH.read_bytes())
def find_latest_valid_certificate() -> Certificate:
now: datetime = datetime.now()
# Iterate over the certificates
valid_certificates: list[Certificate] = []
for certificate_file_path in (path for path in SSL_CERTIFICATES_PATH.iterdir()
if not path.is_dir() and path.suffix == ".crt"):
# Parse the certificate
certificate: Certificate = x509.load_pem_x509_certificate(certificate_file_path.read_bytes())
# Filter the certificates by issuer
if b"Let's Encrypt" not in certificate.issuer.public_bytes():
continue
# Filter expired certificates
if not certificate.not_valid_before <= now <= certificate.not_valid_after:
continue
valid_certificates.append(certificate)
# Check if any valid certificates were found
if not valid_certificates:
raise RuntimeError("Couldn't find any valid certificates.")
# Return the latest certificate
return max(valid_certificates, key=lambda cert: cert.not_valid_after)
def find_certificate_key(certificate: Certificate) -> bytes:
# Compute the key file prefix
public_key_n: str = f"{certificate.public_key().public_numbers().n:x}"
key_file_prefix: str = f"{public_key_n[:5]}_{public_key_n[-5:]}"
# Iterate all the keys
for key_path in (path for path in SSL_CERTIFICATES_KEYS_PATH.iterdir()
if not path.is_dir() and path.suffix == ".key"):
if not key_path.name.startswith(key_file_prefix):
continue
# Return the matching key
return key_path.read_bytes()
raise RuntimeError(f"Failed to find a key file for {certificate}.")
def lookup_issuer_certificate(access_location: str) -> Certificate:
if not access_location:
raise RuntimeError("Invalid access location provided.")
access_location_hash: str = md5(access_location.encode()).hexdigest()
# Check if the access location exists in the cache
for certificate_path in SSL_CERTIFICATES_CACHE_PATH.iterdir():
if certificate_path.name == access_location_hash:
return x509.load_pem_x509_certificate(certificate_path.read_bytes())
# Download the certificate, parse it, and save it to the cache
response: httpx.Response = httpx.get(access_location)
if not response.is_success:
raise RuntimeError(f"Failed to download the issuer certificate: {access_location!r}.")
# Parse the certificate
issuer_certificate: Certificate = x509.load_der_x509_certificate(response.content)
# Save the certificate to the cache
cached_access_location_path: Path = SSL_CERTIFICATES_CACHE_PATH / access_location_hash
cached_access_location_path.write_bytes(issuer_certificate.public_bytes(encoding=Encoding.PEM))
# Return the issuer certificate
return issuer_certificate
def generate_certificate_chain(certificate: Certificate, depth: int = 1) -> tuple[Certificate, ...]:
certificate_chain: list[Certificate] = [certificate]
current_certificate = certificate
for _ in range(depth, 0, -1):
# Attempt to get "AuthorityInformationAccess", will raise a TypeError if it doesn't exist
authority_information_access: Extension = (
current_certificate.extensions.get_extension_for_class(AuthorityInformationAccess))
ca_issuers_index: int = 1
ca_issuers_description: AccessDescription = authority_information_access.value[ca_issuers_index]
access_location: str = ca_issuers_description.access_location.value
# Download the certificate or fetch it from cache
issuer_certificate: Certificate = lookup_issuer_certificate(access_location)
# Append it to the certificate chain
certificate_chain.append(issuer_certificate)
# Parse the next certificate
current_certificate = issuer_certificate
return tuple(certificate_chain)
def regenerate_certificate_data(regenerate_if_valid: bool = False) -> None:
# Find the current certificate
current_certificate: Certificate = find_current_certificate()
# Find the latest valid certificate
valid_certificate: Certificate = find_latest_valid_certificate()
# Skip regenerating if the certificates match
if not regenerate_if_valid and current_certificate == valid_certificate:
logging.info("Current certificate matches latest valid certificate, skipping regeneration.")
return
# Generate a certificate chain
valid_certificate_key: bytes = find_certificate_key(certificate=valid_certificate)
certificate_chain: tuple[Certificate, ...] = generate_certificate_chain(certificate=valid_certificate)
# Save the certificate chain to the proper location
logging.info(f"Writing the certificate chain to {SYNAPSE_CERTIFICATE_PATH}.")
SYNAPSE_CERTIFICATE_PATH.write_bytes(
data=b"".join(certificate.public_bytes(encoding=Encoding.PEM) for certificate in certificate_chain))
# Save the certificate key to the proper location
logging.info(f"Writing the certificate key to {SYNAPSE_CERTIFICATE_KEY_PATH}.")
SYNAPSE_CERTIFICATE_KEY_PATH.write_bytes(data=valid_certificate_key)
def is_synapse_running() -> bool:
if SYNAPSE_HOMESERVER_PID_PATH.exists():
synapse_pid: int = int(SYNAPSE_HOMESERVER_PID_PATH.read_text())
return psutil.pid_exists(synapse_pid)
return False
async def start_synapse_server() -> None:
# Spawn a shell process
shell_process: Process = await asyncio.create_subprocess_exec(
program="/bin/bash", stdin=PIPE, stdout=PIPE, stderr=PIPE)
# Set up the shell environment
shell_process.stdin.write(data=f"cd {SYNAPSE_PATH}\n".encode())
shell_process.stdin.write(data=f"source {SYNAPSE_ACTIVATE_ENVIRONMENT_SCRIPT_PATH}\n".encode())
# Run the Synapse server
shell_process.stdin.write(data=b"synctl start && exit\n")
# Fetch the output from synctl
stdout, stderr = await shell_process.communicate()
if stdout:
logging.info(f"Output from synctl: {stdout.rstrip().decode()}")
if stderr:
logging.error(f"Errors from synctl: {stderr.rstrip().decode()}")
def main() -> ExitCode:
# Setup logging
logging.basicConfig(format="[%(asctime)s, %(levelname)s] %(message)s",
datefmt="%H:%M", level=logging.DEBUG,
stream=sys.stdout)
logging.getLogger("asyncio").setLevel(logging.WARNING)
# Regenerate certificates
logging.info("Attempting to regenerate certificate data.")
try:
regenerate_certificate_data()
except (RuntimeError, TypeError) as exception:
logging.error("Exception thrown in regenerate_certificate_data.")
logging.critical(exception, exc_info=True)
return ExitCode.CERTIFICATE_ERROR
# Manage synapse
if not is_synapse_running():
logging.info("Attempting to start the Synapse server.")
try:
event_loop: AbstractEventLoop = asyncio.new_event_loop()
event_loop.run_until_complete(start_synapse_server())
event_loop.close()
except (OSError, IOError) as exception:
logging.error("Exception thrown in start_synapse_server.")
logging.critical(exception, exc_info=True)
return ExitCode.SYNAPSE_ERROR
else:
logging.info("The Synapse server is already running.")
# Success
return ExitCode.SUCCESS
if __name__ == "__main__":
raise SystemExit(main())
@anthonyprintup
Copy link
Author

The script is intended to be used a cron job.

This is a proof of concept for generating certificate files and managing the Synapse server under the following configuration:

Automated steps:

  • find the current certificate which is being used (see find_current_certificate),
  • find the latest certificate which is valid (filters for Let's Encrypt certificates by default; see find_latest_valid_certificate),
  • check to see if the current certificate matches the latest certificate, and skip regeneration if it does,
  • generate a certificate chain (see generate_certificate_chain),
  • locate the private key for the certificate (see find_certificate_key),
  • write the certificate chain and the private key to the proper Synapse paths (provided by the configuration file - homeserver.yaml),
  • see if the Synapse server is already running, and skip management if it is (see is_synapse_running),
  • manage the Synapse server by starting it if necessary (see start_synapse_server).

Dependencies:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment