Skip to content

Instantly share code, notes, and snippets.

@countercept
Last active August 22, 2023 16:02
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 11 You must be signed in to fork a gist
  • Save countercept/7765ba05ad00255bcf6a4a26d7647f6e to your computer and use it in GitHub Desktop.
Save countercept/7765ba05ad00255bcf6a4a26d7647f6e to your computer and use it in GitHub Desktop.
A research aid for tracing security-relevant events in the CLR via ETW, in order to detect malicious assemblies.
import time
import etw
import etw.evntrace
import sys
import argparse
import threading
class RundownDotNetETW(etw.ETW):
    """ETW capture for the 'DotNet Runtime Rundown' provider.

    Subscribes to the loader-rundown and start-rundown keywords and routes
    every received event to :meth:`print_event`.

    Attributes:
        verbose: when true, matching events are printed in full (the raw
            event object) and event types without a dedicated handler are
            shown as well.
        high_risk_only: when true, an event is printed only if one of the
            high-risk heuristics matches.
        should_stop_capture: set to True by :meth:`print_event` once the
            DCStartComplete_V1 event (id 145) is seen; callers poll it to
            know when the capture can be stopped.
    """

    def __init__(self, verbose, high_risk_only):
        """Store the output options and register the rundown provider.

        Args:
            verbose: print full event details instead of the short summary.
            high_risk_only: restrict output to events flagged as high risk.
        """
        # set options
        self.verbose = verbose
        self.high_risk_only = high_risk_only
        self.should_stop_capture = False

        # define capture provider info
        providers = [
            etw.ProviderInfo(
                'DotNet Runtime Rundown',
                etw.GUID("{A669021C-C450-4609-A035-5AF59AF4DF18}"),
                level=etw.evntrace.TRACE_LEVEL_INFORMATION,
                any_keywords=[
                    "LoaderRundownKeyword",
                    "StartRundownKeyword",
                ],
            ),
        ]
        super().__init__(providers=providers, event_callback=self.print_event)

    def start(self):
        """Run the pre-capture hook, then start the underlying capture."""
        # do pre-capture setup
        self.do_capture_setup()
        super().start()

    def stop(self):
        """Stop the underlying capture, then run the post-capture hook."""
        super().stop()
        # do post-capture teardown
        self.do_capture_teardown()

    def do_capture_setup(self):
        """Hook called before the capture starts; does nothing by default."""
        # do whatever setup for capture here
        pass

    def do_capture_teardown(self):
        """Hook called after the capture stops; does nothing by default."""
        # do whatever for capture teardown here
        pass

    @staticmethod
    def _is_high_risk_module(il_path, module_flags):
        """Return True when a module record looks suspicious.

        A module is flagged when either:
          * its IL path does not end in ``.exe``/``.dll`` and its flags do
            not contain ``"Dynamic"`` (looks like a byte-stream load), or
          * its IL path mentions ``system.management.automation`` (shows
            use of PowerShell from within the process).

        ``None`` values are treated as empty strings so that a field which
        could not be decoded does not raise inside the event callback.
        NOTE(review): presumably the ETW library yields None for fields it
        fails to decode - confirm against the library in use.
        """
        path = "" if il_path is None else il_path.lower()
        flags = "" if module_flags is None else module_flags
        byte_stream_load = (
            not path.endswith((".exe", ".dll"))
            and "Dynamic" not in flags
        )
        return byte_stream_load or "system.management.automation" in path

    def print_event(self, event):
        """Event callback: filter one rundown event and print it.

        Args:
            event: indexable pair where ``event[0]`` is the numeric event id
                and ``event[1]`` is a mapping of the event's fields
                (including ``EventHeader``).

        Output is a comma separated summary line, or the raw event when
        ``verbose`` is set. Event types without a dedicated handler are only
        shown in verbose mode.
        """
        # Common info
        event_id = event[0]
        fields = event[1]
        to_print = [str(fields["EventHeader"]["ProcessId"])]
        # Without --high-risk-only everything handled below is printable;
        # with it, a heuristic has to opt the event in.
        should_print = not self.high_risk_only

        # Event specific
        if event_id == 155:  # AssemblyDCStart_V1
            to_print += [
                "AssemblyDCStart_V1",
                fields["AssemblyID"],
                fields["AssemblyFlags"],
                fields["FullyQualifiedAssemblyName"],
            ]
        elif event_id == 153:  # ModuleDCStart_V2
            to_print += [
                "ModuleDCStart_V2",
                fields["AssemblyID"],
                fields["ModuleFlags"],
                fields["ModuleILPath"],
                fields["ModuleNativePath"],
                fields["ManagedPdbBuildPath"],
            ]
            # Same heuristic as for domain modules, so byte stream loads and
            # PowerShell automation are caught on either event type.
            if self._is_high_risk_module(fields["ModuleILPath"], fields["ModuleFlags"]):
                should_print = True
        elif event_id == 151:  # DomainModuleDCStart_V1
            to_print += [
                "DomainModuleDCStart_V1",
                fields["AssemblyID"],
                fields["ModuleFlags"],
                fields["ModuleILPath"],
                fields["ModuleNativePath"],
            ]
            if self._is_high_risk_module(fields["ModuleILPath"], fields["ModuleFlags"]):
                should_print = True
        elif event_id == 145:  # DCStartComplete_V1
            # Signal whoever is polling that the rundown has finished.
            self.should_stop_capture = True
            if not self.verbose:
                return
        elif not self.verbose:
            # Unhandled event types are only of interest in verbose mode.
            return

        # Print information
        if should_print:
            if self.verbose:
                print(event)
            else:
                print(", ".join(str(item) for item in to_print))
        sys.stdout.flush()
class RuntimeDotNetETW(etw.ETW):
    """ETW capture for the 'DotNet Runtime' provider.

    Always subscribes to the loader keyword; with ``method_tracing`` the JIT,
    JIT-tracing and interop keywords are added and the trace level is raised
    to verbose. Every received event is routed to :meth:`print_event`.

    Attributes:
        verbose: when true, matching events are printed in full (the raw
            event object) and event types without a dedicated handler are
            shown as well.
        high_risk_only: when true, an event is printed only if one of the
            high-risk heuristics matches.
        method_tracing: whether JIT/interop method events were requested.
    """

    # Method names that mark a JIT, inlining or IL-stub event as high risk.
    high_risk_method_names = [
        "VirtualAlloc",
        "VirtualAllocEx",
        "CreateThread",
        "CreateRemoteThread",
        "WriteProcessMemory",
        "FromBase64String",
        "DownloadFile",
        "RunPS",
        "SetThreadContext",
        "MiniDumpWriteDump",
        "LoadLibrary",
        "GetProcAddress",
        "WaitForSingleObject"
    ]

    # Namespaces that mark a JIT or inlining event as high risk.
    high_risk_namespaces = [
        "System.IO.MemoryStream"
    ]

    def __init__(self, verbose, high_risk_only, method_tracing):
        """Store the output options and register the runtime provider.

        Args:
            verbose: print full event details instead of the short summary.
            high_risk_only: restrict output to events flagged as high risk.
            method_tracing: also enable JIT and interop keywords at verbose
                trace level.
        """
        # set options
        self.verbose = verbose
        self.high_risk_only = high_risk_only
        self.method_tracing = method_tracing

        keywords = [
            "LoaderKeyword"
        ]
        if self.method_tracing:
            keywords.extend(
                [
                    "JitKeyword",
                    "JitTracingKeyword",
                    "InteropKeyword"
                ]
            )
        # Shows on stdout which keywords this capture was configured with.
        print(keywords)

        # define capture provider info
        if self.method_tracing:
            trace_level = etw.evntrace.TRACE_LEVEL_VERBOSE
        else:
            trace_level = etw.evntrace.TRACE_LEVEL_INFORMATION
        providers = [
            etw.ProviderInfo(
                'DotNet Runtime',
                etw.GUID("{E13C0D23-CCBC-4E12-931B-D9CC2EEE27E4}"),
                level=trace_level,
                any_keywords=keywords
            ),
        ]
        super().__init__(providers=providers, event_callback=self.print_event)

    def start(self):
        """Run the pre-capture hook, then start the underlying capture."""
        # do pre-capture setup
        self.do_capture_setup()
        super().start()

    def stop(self):
        """Stop the underlying capture, then run the post-capture hook."""
        super().stop()
        # do post-capture teardown
        self.do_capture_teardown()

    def do_capture_setup(self):
        """Hook called before the capture starts; does nothing by default."""
        # do whatever setup for capture here
        pass

    def do_capture_teardown(self):
        """Hook called after the capture stops; does nothing by default."""
        # do whatever for capture teardown here
        pass

    @staticmethod
    def _is_high_risk_module(il_path, module_flags):
        """Return True when a module load looks suspicious.

        A module is flagged when either:
          * its IL path does not end in ``.exe``/``.dll`` and its flags do
            not contain ``"Dynamic"`` (looks like a byte-stream load), or
          * its IL path mentions ``system.management.automation`` (shows
            use of PowerShell from within the process).

        ``None`` values are treated as empty strings so that a field which
        could not be decoded does not raise inside the event callback.
        NOTE(review): presumably the ETW library yields None for fields it
        fails to decode - confirm against the library in use.
        """
        path = "" if il_path is None else il_path.lower()
        flags = "" if module_flags is None else module_flags
        byte_stream_load = (
            not path.endswith((".exe", ".dll"))
            and "Dynamic" not in flags
        )
        return byte_stream_load or "system.management.automation" in path

    def print_event(self, event):
        """Event callback: filter one runtime event and print it.

        Args:
            event: indexable pair where ``event[0]`` is the numeric event id
                and ``event[1]`` is a mapping of the event's fields
                (including ``EventHeader``).

        Output is a comma separated summary line, or the raw event when
        ``verbose`` is set. Event types without a dedicated handler are only
        shown in verbose mode.
        """
        # Common info
        event_id = event[0]
        fields = event[1]
        to_print = [str(fields["EventHeader"]["ProcessId"])]
        # Without --high-risk-only everything handled below is printable;
        # with it, a heuristic has to opt the event in.
        should_print = not self.high_risk_only

        # Event specific
        if event_id == 154:  # AssemblyLoad_V1
            to_print += [
                "AssemblyLoad_V1",
                fields["AssemblyID"],
                fields["AssemblyFlags"],
                fields["FullyQualifiedAssemblyName"],
            ]
        elif event_id == 152:  # ModuleLoad_V2
            to_print += [
                "ModuleLoad_V2",
                fields["AssemblyID"],
                fields["ModuleFlags"],
                fields["ModuleILPath"],
                fields["ModuleNativePath"],
                fields["ManagedPdbBuildPath"],
            ]
            if self._is_high_risk_module(fields["ModuleILPath"], fields["ModuleFlags"]):
                should_print = True
        elif event_id == 151:  # DomainModuleLoad_V1
            to_print += [
                "DomainModuleLoad_V1",
                fields["AssemblyID"],
                fields["ModuleFlags"],
                fields["ModuleILPath"],
                fields["ModuleNativePath"],
            ]
            # Same heuristic as for ModuleLoad_V2, so byte stream loads and
            # PowerShell automation are caught on either event type.
            if self._is_high_risk_module(fields["ModuleILPath"], fields["ModuleFlags"]):
                should_print = True
        elif event_id == 145:  # MethodJittingStarted
            to_print += [
                "MethodJittingStarted",
                fields["ModuleID"],
                fields["MethodNamespace"],
                fields["MethodName"],
            ]
            if fields["MethodName"] in self.high_risk_method_names \
                    or fields["MethodNamespace"] in self.high_risk_namespaces:
                should_print = True
        elif event_id in (185, 186):  # MethodJitInliningSucceeded and MethodJitInliningFailed
            if event_id == 185:
                to_print.append("MethodJitInliningSucceeded")
            else:
                to_print.append("MethodJitInliningFailed")
            to_print += [
                fields["MethodBeingCompiledNamespace"],
                fields["MethodBeingCompiledName"],
                fields["InlinerNamespace"],
                fields["InlinerName"],
                fields["InlineeNamespace"],
                fields["InlineeName"],
            ]
            # Only the inlinee (the method being pulled in) is checked.
            if fields["InlineeName"] in self.high_risk_method_names \
                    or fields["InlineeNamespace"] in self.high_risk_namespaces:
                should_print = True
        elif event_id == 88:  # ILStubGenerated
            to_print += [
                "ILStubGenerated",
                fields["StubFlags"],
                fields["ManagedInteropMethodNamespace"],
                fields["ManagedInteropMethodName"],
            ]
            if fields["ManagedInteropMethodName"] in self.high_risk_method_names:
                should_print = True
        elif not self.verbose:
            # Unhandled event types are only of interest in verbose mode.
            return

        # Print information
        if should_print:
            if self.verbose:
                print(event)
            else:
                print(", ".join(str(item) for item in to_print))
        sys.stdout.flush()
def rundown_capture(args):
    """Run the rundown capture until it reports completion, then stop it.

    Args:
        args: parsed command-line options; ``verbose`` and
            ``high_risk_only`` are forwarded to the capture object.
    """
    session = RundownDotNetETW(
        verbose=args.verbose,
        high_risk_only=args.high_risk_only,
    )
    session.start()

    # Poll the completion flag every 100 ms; the capture's event callback
    # sets it.
    while True:
        if session.should_stop_capture:
            break
        time.sleep(0.1)

    session.stop()
def runtime_capture(args):
    """Run the runtime capture until the user presses Enter.

    Args:
        args: parsed command-line options; ``verbose``, ``high_risk_only``
            and ``enable_method_tracing`` are forwarded to the capture
            object.

    Any exception raised while waiting for input (for example EOFError when
    stdin is closed, or KeyboardInterrupt) still propagates to the caller,
    but only after the capture has been stopped.
    """
    # instantiate class
    capture = RuntimeDotNetETW(
        verbose=args.verbose,
        high_risk_only=args.high_risk_only,
        method_tracing=args.enable_method_tracing
    )

    # start capture
    capture.start()
    try:
        # wait for return
        input()
    finally:
        # stop capture, even if the wait was interrupted, so a started
        # capture is never left running
        capture.stop()
# Command-line interface: every switch is a boolean flag, declared here as
# (option string, help text) pairs.
_CLI_SWITCHES = (
    (
        "--verbose",
        "Displays full details for all event types, as opposed to limited details for specific events",
    ),
    (
        "--high-risk-only",
        "Only show high risk events (byte stream loaded assemblies, high risk method calls etc)",
    ),
    (
        "--disable-rundown-provider",
        "Do not capture current system state - only show events post-startup",
    ),
    (
        "--enable-method-tracing",
        "Enables relevant JIT and Interop events for providing method call visibility",
    ),
)

parser = argparse.ArgumentParser()
for _option, _help_text in _CLI_SWITCHES:
    parser.add_argument(_option, help=_help_text, action="store_true")
args = parser.parse_args()

# Unless disabled, run the rundown capture on its own thread while the
# runtime capture occupies the main thread.
if not args.disable_rundown_provider:
    _rundown_thread = threading.Thread(target=rundown_capture, args=(args,))
    _rundown_thread.start()

runtime_capture(args)
@kujo2019
Copy link

On a Win10 x64 box, I am running this python script from the article https://www.countercept.com/blog/detecting-malicious-use-of-net-part-1/ (https://gist.github.com/countercept/7765ba05ad00255bcf6a4a26d7647f6e). I am running it with the --high-risk-only flag. It gets a lot of "Failed to get data field for AssemblyFlags, incrementing by reported size" error messages.

What would cause this? Is this a bug? How can I fix it?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment