#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Simple script for extracting the hash values within the `portal.html` file of a FortiGate image file.
The script automates the process for a directory containing multiple FortiGate images,
and was used during the investigation of CVE-2022-42475.

Example usage, extracting the hashes of all 7.0.x images within a directory and
writing the output to v7.0.x.json:

    python bulkcheck.py --dir fortigate_images/V7/ -o v7.0.x.json
"""
from __future__ import annotations

import argparse
import gzip
import json
from pathlib import Path
import re
import shlex
import shutil
import subprocess
import zipfile
from typing import Iterable, Tuple, TYPE_CHECKING

# External dependencies
from dissect.target import Target
from dissect.target.exceptions import FileNotFoundError as DissectFileNotFoundError
from dissect.util import cpio
from tqdm import tqdm

if TYPE_CHECKING:
    from dissect.util.cpio import CpioFile

__case__ = "DIVD-2022-00063"
__reference__ = (
    "https://csirt.divd.nl//2022/12/14/DIVD-2022-00063-Fortinet-sslvpnd-update/"
)
__author__ = "Axel Boesenach"
__email__ = "axel@divd.nl"
__version__ = "1.0.0"

PORTAL_PATH = "migadmin/sslvpn/portal.html.gz"
HASH_PATTERN = re.compile(r"/([a-f0-9]{32})/")
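# Illustration of what HASH_PATTERN captures: a 32-character lowercase hex
# string enclosed in forward slashes. The path below is a hypothetical example,
# not taken from a real image:
#   >>> HASH_PATTERN.search("/sslvpn/0123456789abcdef0123456789abcdef/portal.css").group(1)
#   '0123456789abcdef0123456789abcdef'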


class BulkCheck:
    """Gather the hashes that are present within the `portal.html` file of a FortiGate image.

    Args:
        image_dir: The directory containing the FortiGate images.
        forti_dir: The temporary directory used to store the files during the process.
        rootfspath: The location of the `rootfs.gz` file in the FortiGate image.
    """

    def __init__(self, image_dir: str, forti_dir: str, rootfspath: str) -> None:
        self.image_dir = image_dir
        self.forti_dir = forti_dir
        self.rootfspath = rootfspath
        self.pkginfo_json = f"{self.forti_dir}/migadmin/ng/vpn/map/pkginfo.json"
        self.image_zips = list(Path(self.image_dir).rglob("*.zip"))
        self.failed = {}
        self.versions = {}

    def hashes(self) -> None:
        """Main function responsible for gathering the hashes within the images."""
        total = len(self.image_zips)
        print(f"* found {total} images")

        for zfile, fortios_image in tqdm(
            self._get_images(), total=total, unit="images"
        ):
            if not fortios_image:
                self.failed[zfile] = "no fortios image found"
                continue

            Path(self.forti_dir).mkdir(parents=True, exist_ok=True)

            # Extract the rootfs
            rootfs = self._rootfs(target_path=fortios_image)
            if not rootfs:
                shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None)
                self.failed[zfile] = f"extracting {self.rootfspath} failed"
                continue

            try:
                rootfs.extractall(path=self.forti_dir)
            except Exception:
                # Corrupt .gz
                shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None)
                self.failed[zfile] = "rootfs decompress failed"
                continue

            # Extract the migadmin.tar.xz
            self._xz()

            # Grab the version from the pkginfo.json file
            version = self._version()
            if version not in self.versions:
                self.versions[version] = []

            version_hash = self._version_hash()
            if version_hash:
                self.versions[version].append(version_hash)
            else:
                shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None)
                self.failed[zfile] = "extracting version hash failed"
                continue

            # Remove the directory and the decompressed image
            shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None)

    def _get_images(self) -> Iterable[Tuple[str, str | None]]:
        """Retrieve FortiGate images from the zipfiles.

        Yields:
            A tuple of the zipfile name and the location of the extracted image,
            or `None` when no FortiOS image was found in the zipfile.
        """
        for zfile in self.image_zips:
            with zipfile.ZipFile(zfile, "r") as image_zip:
                fortios_images = [
                    f.filename
                    for f in image_zip.filelist
                    if f.filename.startswith("fortios")
                ]

                if not fortios_images:
                    yield zfile.name, None
                    continue

                for fortios_image in fortios_images:
                    image_zip.extract(fortios_image, path=self.forti_dir)
                    yield zfile.name, f"{self.forti_dir}/{fortios_image}"

    def _rootfs(self, target_path: str) -> CpioFile | None:
        """Extract the rootfs file within the fortios.vhd.

        The rootfs file is GZIP-compressed within the fortios.vhd file, so we open the
        `rootfs.gz` file directly using the CPIO implementation within dissect.

        Args:
            target_path: Path of the fortios disk image.

        Returns:
            A `CpioFile` object, or `None` when the rootfs could not be found.
        """
        t = Target.open(target_path)
        path = t.fs.path(self.rootfspath)
        try:
            return cpio.open(fileobj=path.open())
        except DissectFileNotFoundError:
            return None
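
    # For reference, a minimal standalone sketch of the same idea, assuming the
    # `rootfs.gz` has already been copied out of an image and that dissect's
    # `cpio.open` transparently handles the gzip layer, as `_rootfs` relies on:
    #
    #   with open("rootfs.gz", "rb") as fh:
    #       cpio.open(fileobj=fh).extractall(path="/tmp/forti")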

    def _xz(self) -> None:
        """Extract the migadmin `xz` archive using the FortiGate custom xz binary.

        Note that `chroot` requires root privileges; `sbin/xz` and `sbin/ftar`
        are the binaries shipped inside the extracted rootfs itself.
        """
        # We only need the migadmin.tar.xz
        for cmd_str in ["sbin/xz -d migadmin.tar.xz", "sbin/ftar -xf migadmin.tar"]:
            cmd = shlex.split(f"chroot {self.forti_dir} {cmd_str}")
            subprocess.run(cmd, cwd=self.forti_dir)

    def _version(self) -> str:
        """Retrieve the version of the image from the `pkginfo.json` file.

        Older versions do not seem to have a `version.json` in their image, so the
        version is read from `pkginfo.json` instead.

        Returns:
            A string object containing the concatenated major, minor and build version.
        """
        with open(self.pkginfo_json, "rb") as json_fh:
            version_json = json.load(json_fh)

        major = version_json["ver"]
        minor = version_json["mr"]
        build = version_json["build"]

        return f"{major}.{minor}.{build}"
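
    # The keys read in `_version` imply a pkginfo.json that looks roughly like
    # the following (values illustrative, other keys omitted):
    #   {"ver": 7, "mr": 0, "build": 1234}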

    def _version_hash(self) -> str | None:
        """Extract the hash from the portal.html file.

        Returns:
            The hash within the `portal.html` file, or `None` when no hash was found.
        """
        with gzip.open(f"{self.forti_dir}/{PORTAL_PATH}", "r") as gzip_fh:
            portal_contents = gzip_fh.read().decode()

        match = re.search(HASH_PATTERN, portal_contents)
        return match.group(1) if match else None


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d", "--dir", required=True, help="Directory containing the targets"
    )
    parser.add_argument(
        "-f",
        "--fdir",
        required=False,
        default="/tmp/forti",
        help="Temporary directory for the FortiGate files.",
    )
    parser.add_argument(
        "-p",
        "--rootfspath",
        required=False,
        default="/rootfs.gz",
        help="Path containing the rootfs we want",
    )
    parser.add_argument("-o", "--output", required=True, help="Output versions to JSON")
    args = parser.parse_args()

    if Path(args.fdir).exists():
        raise FileExistsError(
            f"{args.fdir} already exists on the system, set a different --fdir argument."
        )

    bulkcheck = BulkCheck(
        image_dir=args.dir, forti_dir=args.fdir, rootfspath=args.rootfspath
    )
    bulkcheck.hashes()

    success = sum(len(i) for i in bulkcheck.versions.values())
    failed = len(bulkcheck.failed)
    print(f"* success: {success}\t|\tfailed: {failed}")

    # Store the results in the JSON output file
    with open(args.output, "w") as result_fh:
        result_fh.write(json.dumps(bulkcheck.versions))
    print(f"+ results written to: {args.output}")

    if failed:
        # Store the failed images in failed.json
        with open("failed.json", "w") as failed_fh:
            failed_fh.write(json.dumps(bulkcheck.failed))
        print("+ failed images written to failed.json")


if __name__ == "__main__":
    main()
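
# Illustrative shape of the resulting JSON, following the structure built up in
# BulkCheck.hashes (version string and hash are hypothetical):
#   {"7.0.1234": ["0123456789abcdef0123456789abcdef"]}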