Skip to content

Instantly share code, notes, and snippets.

@volcan01010
Last active December 21, 2021 10:20
Show Gist options
  • Save volcan01010/e41539d8f0eb5a8cff9278afab154b47 to your computer and use it in GitHub Desktop.
Save volcan01010/e41539d8f0eb5a8cff9278afab154b47 to your computer and use it in GitHub Desktop.
Python script to scan self-hosted Docker registry for the Log4Shell vulnerability (CVE-2021-44228) using Trivy and python-dxf

scan_registry.py

A Python script to scan all accessible containers in a self-hosted Docker registry for the Log4Shell vulnerability (CVE-2021-44228). It uses Trivy and python-dxf.

Dependencies

The script is compatible with Python 3.6+

# Upgrade pip and DXF
pip install --upgrade pip
pip install python-dxf

# Install Trivy (simple binary install for Linux)
curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sh -s -- -b /usr/local/bin v0.17.0

Environment setup

Requires environment variables for the registry details:

export DXF_HOST=xxxx
export DXF_USERNAME=xxxx
export DXF_PASSWORD=xxxx

Also requires user to be in docker group and to be logged into the Docker registry in the terminal.

docker login -u "$DXF_USERNAME" -p "$DXF_PASSWORD" "$DXF_HOST"

Running

python scan_registry.py

The script will print log messages to the console (stdout) and write the final result as vulnerable_images.json in the current directory.

"""
Script to list every container image in a self-hosted Docker registry and scan each one for the CVEs listed in BAD_CVES.
---
MIT License
Copyright (c) 2021 Dr John A Stevenson
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
from concurrent.futures import ThreadPoolExecutor
import datetime as dt
import json
import logging
import os
from pathlib import Path
from pprint import pformat
import subprocess
import sys
from tempfile import NamedTemporaryFile
from typing import Dict, Iterable, Iterator, List, Optional, Tuple, Union
# Define types (should use TypedDict but that isn't in standard library on Python 3.6)
# ScanResult: the parsed JSON report for one image. The code in this file
# treats it as a list of per-target dictionaries that may carry a
# 'Vulnerabilities' list whose entries have a 'VulnerabilityID' key.
ScanResult = List[Dict[str, Union[str, List[Dict[str, str]]]]]
# ScanResultList: (image name, report) pairs; the report is None when the
# scan of that image failed.
ScanResultList = List[Tuple[str, Optional[ScanResult]]]
# VulnerableImages: maps a CVE ID (or the 'Scan failed' key) to image names.
VulnerableImages = Dict[str, List[str]]
# Registry details are read from the environment at import time; a missing
# variable raises KeyError before anything else runs.
DXF_HOST = os.environ['DXF_HOST']
DXF_USERNAME = os.environ['DXF_USERNAME']
# CVE IDs that every scanned image is checked against.
BAD_CVES = {
'CVE-2021-44228', # RCE bug for Java's log4j logging library
'CVE-2021-45046', # Relates to CVE-2021-44228 log4j vulnerability
}
def scan_registry():
    """Scan every image on the DXF_HOST registry for the BAD_CVES.

    Lists the repositories, expands each one into its ``repo:alias`` image
    names, scans all of those images and groups the affected ones by CVE.
    Progress and the final summary are written to the log.

    Returns:
        The mapping built by ``list_vulnerable`` for the scanned images.
    """
    logging.info('Starting run at %s', dt.datetime.now())

    # Expand each repository into its tagged images as soon as it is listed
    images = []
    for repo in list_repos():
        images.extend(list_aliases(repo))

    results = scan_many(images)
    logging.info('%s containers scanned', len(results))

    # Group the affected images under each CVE of interest
    found = list_vulnerable(results, BAD_CVES)
    logging.info('Vulnerable images:\n%s', pformat(found))

    logging.info('Completing run at %s', dt.datetime.now())
    return found
def list_repos() -> Iterator[str]:
    """Yield the name of each repository reported by ``dxf list-repos``.

    The command is run once; its standard output is split on whitespace and
    every piece is decoded as UTF-8 before the first name is yielded.  The
    ``dxf`` tool is expected to take its connection details from the
    environment (this function only logs DXF_HOST and DXF_USERNAME).

    Raises:
        subprocess.CalledProcessError: if ``dxf`` exits with a non-zero
            status.
    """
    logging.info("Listing repos on %s (as %s)", DXF_HOST, DXF_USERNAME)
    completed = subprocess.run(
        ['dxf', 'list-repos'], check=True, stdout=subprocess.PIPE)

    # Decode everything up front so a bad name fails before anything is yielded
    names = [chunk.decode('utf-8') for chunk in completed.stdout.split()]
    for name in names:
        logging.debug("Repo: %s", name)
        yield name
def list_aliases(repo: str) -> Iterator[str]:
    """Yield ``repo:alias`` for each alias reported by ``dxf list-aliases``.

    Exit status 13 and 2 from ``dxf`` are logged as "Unauthorized" and
    "Not found" respectively, and nothing is yielded for that repository.

    Raises:
        subprocess.CalledProcessError: if ``dxf`` fails with any other exit
            status.
    """
    # Exit statuses that are reported and skipped rather than treated as fatal
    handled = {
        13: "Unauthorized for %s",
        2: "Not found for %s",
    }
    try:
        completed = subprocess.run(
            ['dxf', 'list-aliases', repo],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=True)
        tags = completed.stdout.decode('utf-8').split()
    except subprocess.CalledProcessError as exc:
        message = handled.get(exc.returncode)
        if message is None:
            raise
        logging.error(message, repo)
        return

    for tag in tags:
        yield f'{repo}:{tag}'
def scan_many(images: List[str]) -> ScanResultList:
    """Scan all the given images concurrently using a pool of 8 threads.

    Args:
        images: Image names (``repo:alias``) to pass to ``scan_image``.

    Returns:
        A list of ``(image, result)`` tuples in the same order as
        ``images``; ``result`` is whatever ``scan_image`` returned for that
        image (``None`` when the scan failed).
    """
    # Take a snapshot so the names can be paired with the results afterwards
    # even if the caller handed in a one-shot iterable.
    images = list(images)
    with ThreadPoolExecutor(max_workers=8) as executor:
        # Deliberately no `timeout=`: Executor.map measures it from the
        # map() call for the whole batch, not per task, so a short value
        # either aborts a long run with TimeoutError or has no effect.
        # Results are collected inside the `with` block so any exception
        # from a worker surfaces here.
        results = list(executor.map(scan_image, images))
    return list(zip(images, results))
def scan_image(image: str) -> Optional[ScanResult]:
    """Run a Trivy scan on a remote image and return the parsed JSON report.

    Trivy writes its report to a temporary file, which is read back and
    always removed afterwards.

    Args:
        image: Image name (``repo:alias``) relative to DXF_HOST.

    Returns:
        The decoded JSON report, or ``None`` if Trivy exited with a non-zero
        status or its report could not be parsed as JSON.
    """
    logging.info('Scanning %s', image)
    # Only the path is needed (Trivy writes the file itself), so close our
    # own handle straight away instead of leaving it open for the whole scan.
    with NamedTemporaryFile(delete=False) as handle:
        report_path = handle.name

    scan_result = None  # Null result unless the scan and the parse succeed
    try:
        # Trivy scan. --light uses smaller DB without descriptions
        subprocess.run(
            ['trivy', 'image', '--light', '-f', 'json', '-o', report_path,
             f'{DXF_HOST}/{image}'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True)
        # Read JSON data from temporary file
        with open(report_path, encoding='utf-8') as results_file:
            scan_result = json.load(results_file)
    except subprocess.CalledProcessError as exc:
        logging.error("Failed to scan %s (returncode %s)\n%s",
                      image, exc.returncode, exc)
    except json.JSONDecodeError as exc:
        # One unreadable report must not abort the scan of every other image
        logging.error("Failed to parse scan output for %s\n%s", image, exc)
    finally:
        os.remove(report_path)
    return scan_result
def list_vulnerable(results: ScanResultList,
                    bad_cves: Iterable[str]) -> VulnerableImages:
    """List the images vulnerable to each of the bad_cves.

    Args:
        results: ``(image, scan_result)`` pairs as returned by ``scan_many``.
        bad_cves: CVE IDs to look for; any iterable (list, set, ...) works.

    Returns:
        A dictionary with one key per CVE ID, holding the images whose
        report contains that CVE, plus a ``'Scan failed'`` key holding every
        image with a missing or empty report.  Each failed image is listed
        once, whatever the number of CVE IDs.
    """
    # Split the results once, outside the per-CVE loop, so a failed image is
    # not recorded again for every CVE (and is still recorded when bad_cves
    # is empty).
    failed = [alias for alias, scan_result in results if not scan_result]
    scanned = [(alias, scan_result)
               for alias, scan_result in results if scan_result]

    vulnerable_images: VulnerableImages = {'Scan failed': failed}
    # Sorted so the key order does not depend on set iteration order
    for cve in sorted(set(bad_cves)):
        vulnerable_images[cve] = [
            alias for alias, scan_result in scanned
            if has_cve(cve, scan_result)
        ]
    return vulnerable_images
def has_cve(cve: str, scan_result: ScanResult) -> bool:
    """Report whether the given CVE ID appears anywhere in a scan result.

    Every target in the result is inspected; targets whose
    'Vulnerabilities' entry is missing or empty contribute nothing.
    """
    # Gather the IDs from all targets first, then test for membership
    reported_ids = [
        finding['VulnerabilityID']
        for target in scan_result
        for finding in (target.get('Vulnerabilities') or [])
    ]
    return cve in reported_ids
if __name__ == "__main__":
    # Send log messages to the console (stdout) at INFO level and above
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)

    # Scan the registry, then save the findings as JSON in the current
    # working directory
    vulnerable_images = scan_registry()
    output_file = Path.cwd().joinpath('vulnerable_images.json')
    report_text = json.dumps(vulnerable_images)
    output_file.write_text(report_text)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment