Skip to content

Instantly share code, notes, and snippets.

@debuss
Last active September 8, 2022 14:13
Show Gist options
  • Save debuss/4d332dbdec2697ee76f0bc7689801857 to your computer and use it in GitHub Desktop.
Save debuss/4d332dbdec2697ee76f0bc7689801857 to your computer and use it in GitHub Desktop.
Pega Hotfix Validator script in Python3
# Pega Hotfix Validator script.
# @author Alexandre Debusschère <alexandre.debusschere@hey.com>
# @see https://docs.pega.com/keeping-current-pega/86/manually-verifying-hotfix-files-using-third-party-tools
import base64
import json
import subprocess
from operator import itemgetter
from os import path, chdir
from shutil import rmtree
from sys import exit, argv
from zipfile import ZipFile
class HotfixValidator:
    """Validate a Pega hotfix ZIP archive using its embedded SIGFILE.JSON.

    The archive is extracted next to the ZIP file, the two certificates
    stored in SIGFILE.JSON are decoded and checked with ``openssl``, and
    every file listed under ``signatures`` is verified against the public
    key of the first certificate. Any failure prints a red message and
    terminates the process with ``EXIT_FAILURE``.

    Values read from SIGFILE.JSON are untrusted until the verification has
    succeeded, so they are never interpolated into a shell command line:
    base64 payloads are decoded in Python and ``openssl`` is always started
    with an argument list.
    """

    EXIT_SUCCESS = 0
    EXIT_FAILURE = 1

    # Subject the signing certificate is required to carry.
    EXPECTED_SUBJECT = 'C = US, ST = Massachusetts, L = Cambridge, O = Pegasystems Inc., CN = Pegasystems Inc.'

    # ANSI escape sequences used to colour terminal output.
    _GREEN = "\033[0;32m"
    _CYAN = "\033[0;36m"
    _RED = "\033[0;31m"
    _RESET = "\033[0m"

    def __init__(self, argv):
        """Store the command line (``argv[1]`` is the hotfix ZIP path)."""
        self.argc = len(argv)
        self.argv = argv
        self.working_directory = None
        # Directory to return to before the extraction folder is removed.
        self._original_directory = None
        self._sig = None

    @staticmethod
    def _out(messages):
        """Print one message, or each message of an iterable, on its own line."""
        if isinstance(messages, str):
            messages = [messages]
        for message in messages:
            print(message)

    def _colored(self, color, messages):
        """Print ``messages`` wrapped in the given ANSI colour sequence."""
        # A bare string must be wrapped first, otherwise it would be
        # iterated character by character.
        if isinstance(messages, str):
            messages = [messages]
        self._out([f'{color}{message}{self._RESET}' for message in messages])

    def _success(self, messages):
        """Print ``messages`` in green."""
        self._colored(self._GREEN, messages)

    def _info(self, messages):
        """Print ``messages`` in cyan."""
        self._colored(self._CYAN, messages)

    def _error(self, messages):
        """Print ``messages`` in red, then exit with ``EXIT_FAILURE``.

        Raises:
            SystemExit: always.
        """
        self._colored(self._RED, messages)
        exit(self.EXIT_FAILURE)

    def _help(self):
        """Print the usage text."""
        self._out([
            'Pega Hotfix Validator script', '',
            'Usage: python3 hf-validator.py [file.zip]', '',
            'Required argument:',
            '  file.zip',
            '      The hotfix zip file downloaded from Pega MSP'
        ])

    def _capture(self, args):
        """Run ``args`` without a shell and return its stripped output.

        stderr is merged into stdout so callers compare against everything
        the tool printed. Exits through ``_error`` when the program cannot
        be started.
        """
        try:
            completed = subprocess.run(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
            )
        except OSError as error:
            self._error(f'Unable to run {args[0]}: {error}')
        else:
            return completed.stdout.strip()

    def _run_to_file(self, args, output_path, failure_message):
        """Run ``args`` without a shell, writing its stdout to ``output_path``.

        Exits through ``_error`` with ``failure_message`` when the program
        cannot be started, the file cannot be written, or the exit status is
        not zero.
        """
        try:
            with open(output_path, 'wb') as sink:
                completed = subprocess.run(args, stdout=sink)
        except OSError as error:
            self._error(f'{failure_message}: {error}')
        else:
            if completed.returncode != 0:
                self._error(failure_message)

    def _write_base64(self, encoded, file_name, failure_message):
        """Decode the base64 text ``encoded`` and write the bytes to ``file_name``.

        Exits through ``_error`` with ``failure_message`` when ``encoded`` is
        not valid base64 text.
        """
        try:
            payload = base64.b64decode(encoded)
        except (TypeError, ValueError) as error:
            self._error(f'{failure_message}: {error}')
        with open(file_name, 'wb') as handle:
            handle.write(payload)

    @staticmethod
    def _normalize_dn(value):
        """Return ``value`` with the whitespace around ``=`` and ``,`` removed.

        This makes ``C = US, ST = X`` and ``C=US, ST=X`` compare equal.
        """
        parts = []
        for part in value.split(','):
            key, separator, rest = part.partition('=')
            parts.append(key.strip() + separator + rest.strip())
        return ','.join(parts)

    @classmethod
    def _subject_matches(cls, certificate_text):
        """Tell whether the ``Subject:`` line contains ``EXPECTED_SUBJECT``.

        Only the ``Subject:`` line of an ``openssl x509 -text`` dump is
        inspected, so the expected value appearing on another line (for
        example ``Issuer:``) does not count as a match.
        """
        expected = cls._normalize_dn(cls.EXPECTED_SUBJECT)
        for line in certificate_text.splitlines():
            label, separator, value = line.strip().partition(':')
            if separator and label == 'Subject':
                if expected in cls._normalize_dn(value):
                    return True
        return False

    def _extract_hotfix(self):
        """Extract the ZIP given as ``argv[1]`` into a sibling folder.

        The folder is the ZIP path without its ``.zip`` suffix; its absolute
        path is stored in ``self.working_directory`` so it stays valid after
        the working directory changes.
        """
        zip_path = self.argv[1]
        if not path.isfile(zip_path) or not zip_path.lower().endswith('.zip'):
            self._error('Provided hotfix is not a file or not ZIP file...')
        self.working_directory = path.abspath(zip_path[:-4])
        with ZipFile(zip_path) as archive:
            archive.extractall(self.working_directory)
        self._out('ZIP file extracted to '+self.working_directory)

    def _set_working_directory(self):
        """Change into the extraction folder, remembering where we came from."""
        self._original_directory = path.abspath(path.curdir)
        chdir(self.working_directory)
        self._out('Changed working directory to '+self.working_directory)

    def _extractJsonSigfile(self):
        """Load ``SIGFILE.JSON`` from the current directory into ``self._sig``."""
        try:
            # Binary mode lets the json module detect the encoding itself.
            with open('SIGFILE.JSON', 'rb') as handle:
                self._sig = json.load(handle)
        except (OSError, ValueError) as error:
            self._error(f'Unable to read SIGFILE.JSON: {error}')
        self._out('Extracted and decoded SIGFILE.JSON')

    def _decode_certificates_to_files(self):
        """Write the first two ``certificates`` entries as DER files."""
        try:
            pegasystems = self._sig['certificates'][0]
            intermediate = self._sig['certificates'][1]
        except (KeyError, IndexError, TypeError):
            self._error('SIGFILE.JSON does not contain the two expected certificates')
        self._write_base64(pegasystems, 'pegasystems.der', 'Unable to decode certificate pegasystems.der')
        self._out('Decoded certificate pegasystems.der')
        self._write_base64(intermediate, 'intermediate.der', 'Unable to decode certificate intermediate.der')
        self._out('Decoded certificates intermediate.der')

    def _translate_certificates_into_crt_format(self):
        """Convert both DER certificates to ``.crt`` files with ``openssl x509``."""
        for name in ('pegasystems', 'intermediate'):
            self._run_to_file(
                ['openssl', 'x509', '-in', f'{name}.der', '-inform', 'der'],
                f'{name}.crt',
                f'Unable to translate {name}.der to {name}.crt',
            )
            self._out(f'Translated {name}.der to {name}.crt')

    def _verify_subject_certificate(self):
        """Check that ``pegasystems.crt`` carries the expected subject."""
        output = self._capture(['openssl', 'x509', '-in', 'pegasystems.crt', '-text', '-noout'])
        if not self._subject_matches(output):
            self._error('Unable to match the certificate subject with the required one')
        self._out('Verified subject certificate')

    def _verify_certificate_chain(self):
        """Verify ``pegasystems.crt`` with ``intermediate.crt`` as untrusted chain."""
        output = self._capture([
            'openssl', 'verify', '-crl_download', '-crl_check',
            '-untrusted', 'intermediate.crt', 'pegasystems.crt',
        ])
        if output != 'pegasystems.crt: OK':
            self._error('Unable to verify certificate chain: '+output)
        self._out('Verified certificate chain')

    def _extract_public_key_from_pegasystems_certificate(self):
        """Write the public key of ``pegasystems.der`` to ``pubkey.pub``."""
        self._run_to_file(
            ['openssl', 'x509', '-pubkey', '-noout', '-in', 'pegasystems.der', '-inform', 'der'],
            'pubkey.pub',
            'Unable to extract public key from PegaSystems certificate',
        )
        self._out('Extracted public key from PegaSystems certificate')

    def _check_signatures(self):
        """Verify the SHA-256 signature of every file listed in SIGFILE.JSON."""
        try:
            entries = self._sig['signatures']
        except (KeyError, TypeError):
            self._error('SIGFILE.JSON does not contain a "signatures" list')
        # Without this guard an empty list would be reported as a success
        # although no file has been verified.
        if not entries:
            self._error('SIGFILE.JSON does not list any signature to verify')
        for element in entries:
            try:
                name, signature = itemgetter('path', 'signature')(element)
            except (KeyError, TypeError):
                self._error('Malformed signature entry in SIGFILE.JSON')
            if not isinstance(name, str) or not isinstance(signature, str):
                self._error('Malformed signature entry in SIGFILE.JSON')
            self._info('Checking file: '+name)
            self._write_base64(signature, 'signature.sig', f'Unable to decode signature of {name}')
            self._out('Decoded signature')
            result = self._capture([
                'openssl', 'dgst', '-verify', 'pubkey.pub', '-keyform', 'PEM',
                '-sha256', '-signature', 'signature.sig',
                # The "./" prefix keeps a name starting with "-" from being
                # parsed as an openssl option.
                path.join(path.curdir, name),
            ])
            if result != 'Verified OK':
                self._error(f'Unable to verify signature of {name}: {result}')
            self._out('Verified signature')

    def _delete_extract_folder(self):
        """Leave the extraction folder and delete it."""
        # Return to the directory we started from; a plain chdir('..') only
        # works when the folder is a direct child of that directory.
        if self._original_directory is not None:
            chdir(self._original_directory)
        rmtree(self.working_directory)
        self._out('Cleaned folder '+self.working_directory)

    def run(self):
        """Run the whole validation and exit with the matching status code.

        Raises:
            SystemExit: always; the code is ``EXIT_SUCCESS`` when every check
                passed and ``EXIT_FAILURE`` otherwise.
        """
        if self.argc < 2:
            self._help()
            self._error('Missing hotfix ZIP file...')
        self._extract_hotfix()
        self._set_working_directory()
        self._extractJsonSigfile()
        self._decode_certificates_to_files()
        self._translate_certificates_into_crt_format()
        self._verify_subject_certificate()
        self._verify_certificate_chain()
        self._extract_public_key_from_pegasystems_certificate()
        self._check_signatures()
        self._delete_extract_folder()
        self._success(f'Hotfix {self.argv[1]} validated successfully !')
        exit(self.EXIT_SUCCESS)
# Run the validation only when this file is executed as a script, so that
# importing the module does not start a validation or terminate the importing
# process through exit().
if __name__ == '__main__':
    validator = HotfixValidator(argv)
    validator.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment