Skip to content

Instantly share code, notes, and snippets.

@evaisse
Last active January 25, 2022 18:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save evaisse/d1b85f2a9bf9d8a2ac77ec43404fed7f to your computer and use it in GitHub Desktop.
Save evaisse/d1b85f2a9bf9d8a2ac77ec43404fed7f to your computer and use it in GitHub Desktop.
A log4j version scan over a given path.
#!/usr/bin/python
"""
sudo python3 log4jscan.py evaisse -d /System/Volumes/Data/Applications/Transporter.app/Contents/itms/share
Produces a report like this:
Script version: 2.1 (scans jar/war/ear/zip files)
Start scanning on "/System/Volumes/Data/Applications/Transporter.app/"
========================================================================
md5: c8bd8b5c5aaaa07a3dcbf57de01c9266
Source: META-INF/maven/org.apache.logging.log4j/log4j-core/pom.xml
JNDI-Class: JNDI Class Found
Path= /System/Volumes/Data/Applications/Transporter.app/Contents/itms/share/OSGi-Bundles/org.apache.logging.log4j.core-2.11.2.jar
log4j 1.2.17
------------------------------------------------------------------------
md5: 78b1ecd14d44aaae25481d5c473eda7f
Source: META-INF/maven/org.apache.tika/tika-parsers/pom.xml
JNDI-Class: JNDI Class Not Found
...
...
...
========================================================================
Scan 2624 JAR/ZIP/WAR/EAR(s) files with 82 unique(s) checksum(s) in 1 second.
Found 3 differents versions of log4j in 6 files : {'2.11.2', '1.2.17', 'Unknown'}
"""
from email.mime import base
import os
import hashlib
from datetime import datetime
import re
import argparse
import zipfile
import timeit
# Banner text: embedded in the --help description and written as the first
# line of the report.
desc = 'Script version: 2.1 (scans jar/war/ear/zip files)'

# Rule written between two report entries; the script swaps '-' for '=' to
# frame the report header and the final summary.
separator = '------------------------------------------------------------------------'

# Command line: a mandatory login (used to build the report file name) and an
# optional directory the scan starts from (defaults to the filesystem root).
parser = argparse.ArgumentParser(
    description='Process JAR files to extract info about log4j versions.\n%s' % desc,
)
parser.add_argument('login', help='user login', type=str)
parser.add_argument('-d', '--base_dir', default="/", help='base scan directory', type=str)

# scans:           md5 -> list of every scanned archive path with that checksum.
# suspicious_file: md5 -> list of report entries (dicts) for flagged archives.
scans, suspicious_file = {}, {}
def write_report(path, source, jdi, log4jv, md5):
    """Register one flagged archive and append its entry to the report.

    Uses three module-level names: the ``report`` file object opened by the
    script entry point, the ``separator`` rule and the ``suspicious_file``
    registry (md5 -> list of entries).

    path   -- path of the archive on disk
    source -- archive member the version was extracted from
    jdi    -- text telling whether a JndiLookup class was found
    log4jv -- detected log4j version string
    md5    -- checksum of the archive, used as the registry key
    """
    print(' -> Log4j version %s' % log4jv)

    # Every entry except the very first one is preceded by a separator line.
    if suspicious_file:
        report.write('%s\n' % separator)

    # store for stats
    entry = {"path": path, "source": source, "jdi": jdi, "version": log4jv, "md5": md5}
    suspicious_file.setdefault(md5, []).append(entry)

    fields = (
        ('md5: %s\n', md5),
        ('Source: %s\n', source),
        ('JNDI-Class: %s\n', jdi),
        ('Path= %s\n', path),
        ('log4j %s\n', log4jv),
    )
    for template, value in fields:
        report.write(template % value)
def relative_time(time_diff_secs):
    """Format a duration in seconds as a short phrase such as '2 hours'.

    The sign of *time_diff_secs* is ignored. The duration is promoted to the
    next larger unit only while the converted amount is at least 2, then
    truncated to an integer; the unit is pluralised unless that integer is 1.

    Adapted from
    https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
    """
    weeks_per_month = 365.242 / 12 / 7
    # (unit name, how many of the previous unit make up one of this unit)
    ladder = (
        ('minute', 60),
        ('hour', 60),
        ('day', 24),
        ('week', 7),
        ('month', weeks_per_month),
        ('year', 12),
    )

    label = 'second'
    amount = abs(time_diff_secs)
    for next_label, factor in ladder:
        scaled = float(amount) / factor
        if scaled >= 2:
            label, amount = next_label, scaled
        else:
            # Less than two of the larger unit: stay on the current one.
            break

    whole = int(amount)
    suffix = '' if whole == 1 else 's'
    return '{} {}'.format(whole, label + suffix)
def md5file(filepath):
    """Return the hexadecimal MD5 digest identifying the file at *filepath*.

    The file is read in fixed-size chunks so that large archives are never
    loaded into memory at once, and the handle is always closed.

    When the file cannot be opened or read (missing, permission denied,
    broken symlink, ...), the digest of the path itself is returned instead,
    so the caller still gets a stable key for that path.

    filepath -- path of the file to fingerprint
    Returns a 32-character lowercase hex string.
    """
    digest = hashlib.md5()
    try:
        with open(filepath, 'rb') as handle:
            # iter(callable, sentinel) stops when read() returns b'' at EOF.
            for chunk in iter(lambda: handle.read(1024 * 1024), b''):
                digest.update(chunk)
    except OSError:
        # hashlib only accepts bytes: encode the path with the filesystem
        # encoding before hashing it as a fallback key.
        return hashlib.md5(os.fsencode(filepath)).hexdigest()
    return digest.hexdigest()
def read_zip_file(jar_file, readfile=False):
    """List the members of a zip-like archive, or read one of them.

    jar_file -- path of the archive (jar/war/ear/zip)
    readfile -- False to list the archive; otherwise the exact member name
                whose content must be returned

    Returns:
      * listing mode: the list of member names, directory entries excluded;
      * read mode: the raw bytes of the member named *readfile*;
      * False when the archive cannot be opened, or, in read mode, when the
        member does not exist or cannot be extracted.
    """
    try:
        zf = zipfile.ZipFile(jar_file, 'r')
    except Exception:
        # Best effort on arbitrary files found on disk: any failure to open
        # means "skip this archive". Catching Exception (not a bare except)
        # keeps Ctrl-C and SystemExit working during a long scan.
        print(' --> Unable to read zip file ')
        return False

    with zf:
        if not readfile:
            return [zi.filename for zi in zf.infolist() if not zi.is_dir()]

        for zi in zf.infolist():
            if zi.is_dir() or zi.filename != readfile:
                continue
            try:
                return zf.read(zi.filename)
            except Exception:
                # Corrupt, encrypted or unsupported member: report it as
                # unreadable instead of aborting the whole scan.
                print(' --> Unable to read zip member %s' % readfile)
                return False

    # Read mode and no member carries the requested name.
    return False
def scan_jar_file(path):
    """Inspect one archive and report it when it embeds a log4j descriptor.

    Steps:
      1. fingerprint the archive and record it in the ``scans`` registry;
      2. list its members, noting whether a ``JndiLookup.class`` is present
         and collecting every member whose basename contains ``pom.xml`` or
         ``log4j``;
      3. keep the candidate whose content holds
         ``<artifactId>log4j</artifactId>`` and extract the version that
         follows it; fall back to the version in the archive file name;
      4. hand the result to ``write_report``.

    Nothing is reported when the archive is unreadable or empty, or when no
    candidate member contains the log4j artifact marker.

    path -- path of the archive to inspect
    """
    md5 = md5file(path)
    scans.setdefault(md5, []).append(path)

    path = os.path.realpath(path)
    members = read_zip_file(path)
    if not members:
        return

    jdi = "JNDI Class Not Found"
    candidates = []
    for member in members:
        member_basename = os.path.basename(member)
        if member_basename.lower() == "JndiLookup.class".lower():
            jdi = "JNDI Class Found"
        if 'pom.xml' in member_basename or 'log4j' in member_basename:
            candidates.append(member)

    # Walk the candidates from last to first and keep the first one holding
    # the marker. The last candidate is still preferred, but a later
    # unrelated "log4j*" file (properties, dtd, ...) can no longer hide an
    # earlier pom.xml and make the archive go unreported.
    manifest = None
    text = ''
    for candidate in reversed(candidates):
        content = read_zip_file(path, candidate)
        if not content:
            continue
        # Decode the raw bytes rather than using str(), which on bytes
        # yields the "b'...'" repr with escaped newlines.
        decoded = content.decode('utf-8', 'replace')
        if "<artifactId>log4j</artifactId>" in decoded:
            manifest, text = candidate, decoded
            break
    if manifest is None:
        return

    log4jv = 'Unknown'
    match = re.search(
        r'.*<artifactId>log4j</artifactId>([^<]+)<version>(?P<version>[^<]+)<.*',
        text,
        re.DOTALL,
    )
    if match:
        # Tolerate whitespace/newlines inside the <version> element.
        log4jv = match.group('version').strip()

    # No x.y.z in the descriptor (e.g. a ${property} placeholder): try the
    # version embedded in the archive file name.
    archive_name = os.path.basename(path)
    if not re.search(r'\d+\.\d+\.\d+', log4jv) and 'log4j' in archive_name:
        match = re.search(r'(?P<version>\d+\.\d+\.\d+)\.(jar|zip|war|ear)$', archive_name)
        if match:
            log4jv = match.group('version')

    if log4jv != 'Unknown' and not re.search(r'^\d+\.\d+\.\d+$', log4jv):
        print('Invalid version %s in file : %s' % (log4jv, path))
        log4jv = 'Unknown(%s)' % log4jv

    write_report(path, manifest, jdi, log4jv, md5)
if __name__ == '__main__':
    # Script entry point: walk the base directory, scan every archive found,
    # then print and append a summary to the report file.
    args = parser.parse_args()
    report_filename = os.path.join(os.path.dirname(__file__), 'log4jscan.%s.txt' % args.login)
    archive_suffixes = ('.jar', '.zip', '.war', '.ear')
    frame = separator.replace('-', '=')

    # ``report`` stays a module-level name (write_report() relies on it); the
    # with-statement guarantees the file is flushed and closed on exit.
    with open(report_filename, 'w+') as report:
        report.write(desc + '\n')
        report.write('Start scanning on "%s"\n' % args.base_dir)
        report.write("%s\n" % frame)
        # make it readable for all
        os.chmod(report_filename, 0o664)

        start = timeit.default_timer()
        for root, _dirs, files in os.walk(args.base_dir):
            for f in files:
                # Case-insensitive so that "FOO.JAR" is scanned as well.
                if f.lower().endswith(archive_suffixes):
                    fullpath = os.path.join(root, f)
                    print('SCAN: %s' % fullpath)
                    scan_jar_file(fullpath)
        stop = timeit.default_timer()

        # Flagged files and the distinct versions found among them.
        fcount = 0
        versions = set()
        for entries in suspicious_file.values():
            fcount += len(entries)
            for entry in entries:
                versions.add(entry['version'])

        # Number of scanned files: sum the path lists, not the md5 keys
        # (iterating the dict and taking len() of each key counted 32 per
        # checksum).
        allscans = sum(len(paths) for paths in scans.values())

        summary = [
            "%s" % frame,
            'Scan %d JAR/ZIP/WAR/EAR(s) files with %d unique(s) checksum(s) in %s.' % (allscans, len(scans), relative_time(stop - start))
        ]
        if versions:
            # Report the number of flagged files (fcount), not the number of
            # distinct checksums.
            summary.append('Found %d differents versions of log4j in %d files : %s' % (len(versions), fcount, versions))
        else:
            summary.append('Scan clear !')

        # write & summarize report header
        for line in summary:
            print(line)
            report.write(line + "\n")

    print('Report file wrote in : %s' % report_filename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment