-
-
Save sud0woodo/36a3177de819621ed162b222c6f861e7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
""" | |
Simple script for extracting the hash values within the `portal.html` file of a FortiGate image file. | |
This script was made to automate the process when a directory containing multiple FortiGate images is given, | |
and was used during the investigation of CVE-2022-42475. | |
Example usage extracting the hashes of all 7.0.x images within a directory, writing the output to v7.0.x.json: | |
python bulkcheck.py --dir fortigate_images/V7/ -o v7.0.x.json | |
""" | |
from __future__ import annotations | |
import argparse | |
import gzip | |
import json | |
from pathlib import Path | |
import re | |
import shlex | |
import shutil | |
import subprocess | |
import zipfile | |
from typing import Iterable, Tuple, TYPE_CHECKING | |
# External dependencies | |
from dissect.target import Target | |
from dissect.target.exceptions import FileNotFoundError as DissectFileNotFoundError | |
from dissect.util import cpio | |
from tqdm import tqdm | |
if TYPE_CHECKING: | |
from dissect.util.cpio import CpioFile | |
__case__ = "DIVD-2022-00063" | |
__reference__ = ( | |
"https://csirt.divd.nl//2022/12/14/DIVD-2022-00063-Fortinet-sslvpnd-update/" | |
) | |
__author__ = "Axel Boesenach" | |
__email__ = "axel@divd.nl" | |
__version__ = "1.0.0" | |
PORTAL_PATH = "migadmin/sslvpn/portal.html.gz" | |
HASH_PATTERN = re.compile("\/([a-f0-9]{32})\/") | |
class BulkCheck: | |
"""Gather the hashes that are present within the `portal.html` file of a FortiGate image. | |
Args: | |
image_dir: The directory containing the FortiGate images. | |
forti_dir: The temporary directory used to store the files during the process. | |
rootfspath: The location of the `rootfs.gz` file in the FortiGate image. | |
""" | |
def __init__(self, image_dir: str, forti_dir: str, rootfspath: str) -> None: | |
self.image_dir = image_dir | |
self.forti_dir = forti_dir | |
self.rootfspath = rootfspath | |
self.pkginfo_json = f"{self.forti_dir}/migadmin/ng/vpn/map/pkginfo.json" | |
self.image_zips = list(Path(self.image_dir).rglob("*.zip")) | |
self.failed = {} | |
self.versions = {} | |
def hashes(self) -> None: | |
"""Main function responsible for gathering the hashes within the images.""" | |
total = len(self.image_zips) | |
print(f"* found {total} images") | |
for zfile, fortios_image in tqdm( | |
self._get_images(), total=total, unit="images" | |
): | |
if not fortios_image: | |
self.failed[zfile] = f"no fortios image found" | |
continue | |
Path(self.forti_dir).mkdir(parents=True, exist_ok=True) | |
# Extract the rootfs | |
rootfs = self._rootfs(target_path=fortios_image) | |
if not rootfs: | |
shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None) | |
self.failed[zfile] = f"extracting {self.rootfspath} failed" | |
continue | |
try: | |
rootfs.extractall(path=self.forti_dir) | |
except: | |
# Corrupt .gz | |
shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None) | |
self.failed[zfile] = "rootfs decompress failed" | |
continue | |
# Extract the migadmin.tar.xz | |
self._xz() | |
# Grab the version from the pkginfo.json file | |
version = self._version() | |
if version not in self.versions: | |
self.versions[version] = [] | |
version_hash = self._version_hash() | |
if version_hash: | |
self.versions[version].append(version_hash) | |
else: | |
shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None) | |
self.failed[zfile] = f"extracting version hash failed" | |
# Remove the directory and the decompressed image | |
shutil.rmtree(self.forti_dir, ignore_errors=False, onerror=None) | |
def _get_images(self) -> Iterable[Tuple[str, str]]: | |
"""Retrieve FortiGate images from the zipfiles. | |
Yields: | |
A string object holding the location of the image. | |
""" | |
for zfile in self.image_zips: | |
with zipfile.ZipFile(zfile, "r") as image_zip: | |
fortios_images = [ | |
f.filename | |
for f in image_zip.filelist | |
if f.filename.startswith("fortios") | |
] | |
for fortios_image in fortios_images: | |
if fortios_image: | |
image_zip.extract(fortios_image, path=self.forti_dir) | |
yield zfile.name, f"{self.forti_dir}/{fortios_image}" | |
else: | |
yield zfile.name, None | |
def _rootfs(self, target_path: str) -> CpioFile: | |
"""Extract the rootfs file within the fortios.vhd. | |
The rootfs file is GZIP-ped within the fortios.vhd file, we open the `rootfs.gz` file | |
directly using the CPIO implementation within dissect. | |
Args: | |
target_path: Path of the fortios disk image. | |
Returns: | |
A `CpioFile` object. | |
""" | |
t = Target.open(target_path) | |
path = t.fs.path(self.rootfspath) | |
try: | |
return cpio.open(fileobj=path.open()) | |
except DissectFileNotFoundError: | |
return | |
def _xz(self) -> None: | |
"""Extract the migadmin `xz` archive using the FortiGate custom xz binary.""" | |
# We only need the migadmin.tar.xz | |
for cmd_str in ["sbin/xz -d migadmin.tar.xz", "sbin/ftar -xf migadmin.tar"]: | |
cmd = shlex.split(f"chroot {self.forti_dir} {cmd_str}") | |
ps = subprocess.Popen(cmd, cwd=self.forti_dir) | |
ps.wait() | |
def _version(self) -> str: | |
"""Retrieve the version of the image from the `version.json` file. | |
Older versions do not seem to have the `version.json` in their image, try and retrieve the | |
`pkginfo.json` instead. | |
Returns: | |
A string object containing the concatenated major, minor and build version. | |
""" | |
with open(self.pkginfo_json, "rb") as json_fh: | |
version_json = json.load(json_fh) | |
major = version_json["ver"] | |
minor = version_json["mr"] | |
build = version_json["build"] | |
return f"{major}.{minor}.{build}" | |
def _version_hash(self) -> str: | |
"""Extract the hash from the portal.html file. | |
Returns: | |
The hash within the `portal.html` file. | |
""" | |
with gzip.open(f"{self.forti_dir}/{PORTAL_PATH}", "r") as gzip_fh: | |
portal_contents = gzip_fh.read().decode() | |
version = re.search(HASH_PATTERN, portal_contents).group(1) | |
return version | |
def main(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"-d", "--dir", required=True, help="Directory containing the targets" | |
) | |
parser.add_argument( | |
"-f", | |
"--fdir", | |
required=False, | |
default="/tmp/forti", | |
help="Temporary directory for the FortiGate files.", | |
) | |
parser.add_argument( | |
"-p", | |
"--rootfspath", | |
required=False, | |
default="/rootfs.gz", | |
help="Path containing the rootfs we want", | |
) | |
parser.add_argument("-o", "--output", required=True, help="Output versions to JSON") | |
args = parser.parse_args() | |
if Path(f"{args.fdir}").exists(): | |
raise FileExistsError( | |
f"{args.fdir} already exists on the system, set different --fdir argument." | |
) | |
bulkcheck = BulkCheck( | |
image_dir=args.dir, forti_dir=args.fdir, rootfspath=args.rootfspath | |
) | |
bulkcheck.hashes() | |
success = sum([len(i) for i in bulkcheck.versions.values()]) | |
failed = len(bulkcheck.failed) | |
print(f"* success: {success}\t|\tfailed: {failed}") | |
# Store the results in the JSON files | |
with open(args.output, "w") as result_fh, open("failed.json", "w") as failed_fh: | |
result_fh.write(json.dumps(bulkcheck.versions)) | |
print(f"+ results written to: {args.output}") | |
if failed: | |
# Store the failed images in failed.json | |
failed_fh.write(json.dumps(bulkcheck.failed)) | |
print("+ failed images written to failed.json") | |
if __name__ == "__main__": | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment