Last active
January 25, 2022 18:57
-
-
Save evaisse/d1b85f2a9bf9d8a2ac77ec43404fed7f to your computer and use it in GitHub Desktop.
A log4j version scan over a given path.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
"""
Usage:
sudo python3 log4jscan.py evaisse -d /System/Volumes/Data/Applications/Transporter.app/Contents/itms/share
Writes a report like this:
Script version: 2.1 (scans jar/war/ear/zip files)
Start scanning on "/System/Volumes/Data/Applications/Transporter.app/"
========================================================================
md5: c8bd8b5c5aaaa07a3dcbf57de01c9266
Source: META-INF/maven/org.apache.logging.log4j/log4j-core/pom.xml
JNDI-Class: JNDI Class Found
Path= /System/Volumes/Data/Applications/Transporter.app/Contents/itms/share/OSGi-Bundles/org.apache.logging.log4j.core-2.11.2.jar
log4j 1.2.17
------------------------------------------------------------------------
md5: 78b1ecd14d44aaae25481d5c473eda7f
Source: META-INF/maven/org.apache.tika/tika-parsers/pom.xml
JNDI-Class: JNDI Class Not Found
...
...
...
========================================================================
Scan 2624 JAR/ZIP/WAR/EAR(s) files with 82 unique(s) checksum(s) in 1 second.
Found 3 differents versions of log4j in 6 files : {'2.11.2', '1.2.17', 'Unknown'}
"""
from email.mime import base | |
import os | |
import hashlib | |
from datetime import datetime | |
import re | |
import argparse | |
import zipfile | |
import timeit | |
# Banner written as the first line of the report and appended to the CLI help.
desc = 'Script version: 2.1 (scans jar/war/ear/zip files)'

# Rule written between two report entries; the script swaps '-' for '=' to
# frame the report header and the final summary.
separator = '------------------------------------------------------------------------'

# Command line: a mandatory login (it ends up in the report file name) and an
# optional directory from which the scan starts.
parser = argparse.ArgumentParser(
    description='Process JAR files to extract info about log4j versions.\n' + desc,
)
parser.add_argument('login', type=str, help='user login')
parser.add_argument(
    '-d', '--base_dir',
    type=str,
    default="/",
    help='base scan directory',
)

# md5 checksum -> list of scanned paths sharing that checksum.
scans = dict()

# md5 checksum -> list of report entries (dicts) for archives flagged as log4j.
suspicious_file = dict()
def write_report(path, source, jdi, log4jv, md5):
    """Record one log4j finding in memory and append it to the report file.

    Args:
        path: Location of the archive on disk.
        source: Name of the archive member the version was read from.
        jdi: Human-readable note telling whether a JndiLookup class was seen.
        log4jv: Detected log4j version string.
        md5: Checksum of the archive, used as the grouping key.

    Relies on the module-level ``report`` file object, ``separator`` and
    ``suspicious_file``; ``report`` must be open before this is called.
    """
    print(' -> Log4j version %s' % log4jv)

    # A rule goes between entries, so none is written before the first one.
    if suspicious_file:
        report.write('%s\n' % separator)

    # store for stats
    entry = {"path": path, "source": source, "jdi": jdi, "version": log4jv, "md5": md5}
    suspicious_file.setdefault(md5, []).append(entry)

    report_lines = (
        ('md5: %s\n', md5),
        ('Source: %s\n', source),
        ('JNDI-Class: %s\n', jdi),
        ('Path= %s\n', path),
        ('log4j %s\n', log4jv),
    )
    for template, value in report_lines:
        report.write(template % value)
def relative_time(time_diff_secs):
    """Render a duration given in seconds as a short human-readable string.

    The sign is ignored. The value is promoted to the next larger unit only
    while at least two of that unit fit, then truncated to an integer, e.g.
    ``119 -> '119 seconds'`` and ``120 -> '2 minutes'``.

    Adapted from
    https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
    """
    # Each step: (unit name, how many of the previous unit make one of it).
    ladder = (
        ('minute', 60),
        ('hour', 60),
        ('day', 24),
        ('week', 7),
        ('month', 365.242 / 12 / 7),
        ('year', 12),
    )

    label = 'second'
    amount = abs(time_diff_secs)
    for bigger_label, factor in ladder:
        scaled = float(amount) / factor
        # Fewer than two of the bigger unit: stay on the current one.
        if scaled < 2:
            break
        label = bigger_label
        amount = scaled

    whole = int(amount)
    plural = '' if whole == 1 else 's'
    return '{} {}'.format(whole, label + plural)
def md5file(filepath):
    """Return the hexadecimal MD5 digest of a file's contents.

    The file is hashed in fixed-size chunks so large archives are not loaded
    into memory at once, and the handle is always closed.

    Args:
        filepath: Path of the file to hash.

    Returns:
        The hex digest of the file contents. If the file cannot be opened or
        read (missing, permission denied, broken link, ...), the digest of
        the path itself, so the caller still gets a stable key.
    """
    digest = hashlib.md5()
    try:
        with open(filepath, 'rb') as handle:
            for chunk in iter(lambda: handle.read(1024 * 1024), b''):
                digest.update(chunk)
    except OSError:
        # hashlib only accepts bytes; fsencode also copes with the
        # surrogate-escaped names a directory walk may yield.
        return hashlib.md5(os.fsencode(filepath)).hexdigest()
    return digest.hexdigest()
def read_zip_file(jar_file, readfile=False):
    """List the members of a zip-format archive, or read one of them.

    Args:
        jar_file: Path (or file object) of the archive to open.
        readfile: When falsy, the member names are listed. Otherwise the
            exact name of the member whose content should be returned.

    Returns:
        * ``readfile`` falsy: list of the names of all non-directory members.
        * ``readfile`` given: the member's content as ``bytes``, or ``False``
          when no non-directory member has that name.
        * ``False`` whenever the archive or the member cannot be read.
    """
    try:
        with zipfile.ZipFile(jar_file, 'r') as archive:
            names = []
            for info in archive.infolist():
                if info.is_dir():
                    continue
                names.append(info.filename)
                if readfile and readfile == info.filename:
                    return archive.read(info.filename)
    except Exception:
        # Best effort: the scan walks arbitrary files, and zipfile reports
        # damage through many exception types (BadZipFile, OSError,
        # zlib.error, RuntimeError for encrypted members, ...). Unlike a bare
        # ``except``, this still lets Ctrl-C and SystemExit stop the scan.
        print(' --> Unable to read zip file ')
        return False
    if readfile:
        return False
    return names
def scan_jar_file(path):
    """Inspect one archive and report it when it embeds a log4j descriptor.

    Steps:
      1. Hash the archive and register its path in ``scans``.
      2. List its members, noting whether a ``JndiLookup.class`` is present
         and collecting every member whose base name contains ``pom.xml`` or
         ``log4j``.
      3. Look for a collected member containing
         ``<artifactId>log4j</artifactId>`` and read the version that follows.
         If no ``x.y.z`` version is found there and the archive's own file
         name contains ``log4j``, take the version from that file name.
      4. Hand the finding to ``write_report``.

    Args:
        path: Path of the jar/war/ear/zip file to inspect.

    Returns:
        None. Findings are recorded through ``write_report``; archives that
        cannot be read or do not reference log4j are silently skipped.
    """
    md5 = md5file(path)
    scans.setdefault(md5, []).append(path)

    path = os.path.realpath(path)
    members = read_zip_file(path)
    if not members:
        return

    jdi = "JNDI Class Not Found"
    candidates = []
    for member in members:
        member_basename = os.path.basename(member)
        if member_basename.lower() == "JndiLookup.class".lower():
            jdi = "JNDI Class Found"
        if 'pom.xml' in member_basename or 'log4j' in member_basename:
            candidates.append(member)

    # Try every candidate, last listed first, and keep the first one that
    # really declares the log4j artifact. Checking only a single candidate
    # would let an unrelated member (e.g. a log4j.properties file) hide the
    # descriptor and produce a false negative.
    manifest = None
    content = None
    for candidate in reversed(candidates):
        raw = read_zip_file(path, candidate)
        if not raw:
            continue
        # Decode to real text instead of searching the repr of the bytes.
        text = raw.decode('utf-8', errors='replace')
        if "<artifactId>log4j</artifactId>" in text:
            manifest, content = candidate, text
            break
    if manifest is None:
        return

    log4jv = 'Unknown'
    # The greedy leading '.*' makes this pick the last matching declaration.
    match = re.search(
        r'.*<artifactId>log4j</artifactId>([^<]+)<version>(?P<version>[^<]+)<.*',
        content,
        re.DOTALL,
    )
    if match:
        log4jv = match.group('version').strip()

    # No usable version in the descriptor: fall back to the archive file name.
    archive_name = os.path.basename(path)
    if not re.search(r'\d+\.\d+\.\d+', log4jv) and 'log4j' in archive_name:
        match = re.search(r'(?P<version>\d+\.\d+\.\d+)\.(jar|zip|war|ear)$', archive_name)
        if match:
            log4jv = match.group('version')

    # Anything that is not a plain x.y.z (e.g. a Maven property placeholder)
    # is reported as unknown, keeping the raw text for the reader.
    if log4jv != 'Unknown' and not re.search(r'^\d+\.\d+\.\d+$', log4jv):
        print('Invalid version %s in file : %s' % (log4jv, path))
        log4jv = 'Unknown(%s)' % log4jv

    write_report(path, manifest, jdi, log4jv, md5)
if __name__ == '__main__':
    # Script entry point: walk the base directory, scan every archive found,
    # then print a summary and append it to the report file.
    args = parser.parse_args()
    report_filename = os.path.join(os.path.dirname(__file__), 'log4jscan.%s.txt' % args.login)
    archive_suffixes = ('.jar', '.zip', '.war', '.ear')

    # `report` has to remain a module-level name because write_report() writes
    # to it; the `with` block guarantees the file is flushed and closed.
    with open(report_filename, 'w+') as report:
        report.write(desc + '\n')
        report.write('Start scanning on "%s"\n' % args.base_dir)
        report.write("%s\n" % separator.replace('-', '='))
        # make it readable for all
        os.chmod(report_filename, 0o664)

        start = timeit.default_timer()
        for root, _subdirs, files in os.walk(args.base_dir):
            for filename in files:
                # Compare case-insensitively so FOO.JAR is scanned as well.
                if filename.lower().endswith(archive_suffixes):
                    fullpath = os.path.join(root, filename)
                    print('SCAN: %s' % fullpath)
                    scan_jar_file(fullpath)

        # Number of flagged files and the distinct versions among them.
        fcount = 0
        versions = set()
        for entries in suspicious_file.values():
            fcount += len(entries)
            for entry in entries:
                versions.add(entry['version'])

        # Count scanned paths (the dict values), not the length of the md5 keys.
        allscans = sum(len(paths) for paths in scans.values())
        stop = timeit.default_timer()

        summary = [
            "%s" % separator.replace('-', '='),
            'Scan %d JAR/ZIP/WAR/EAR(s) files with %d unique(s) checksum(s) in %s.' % (allscans, len(scans), relative_time(stop - start))
        ]
        if versions:
            # Report the number of flagged files, not the number of checksums.
            summary.append('Found %d differents versions of log4j in %d files : %s' % (len(versions), fcount, versions))
        else:
            summary.append('Scan clear !')

        # Print the summary and append it to the report.
        for line in summary:
            print(line)
            report.write(line + "\n")

    print('Report file wrote in : %s' % report_filename)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment