Skip to content

Instantly share code, notes, and snippets.

@gfoss
Last active May 24, 2022 14:40
Show Gist options
  • Save gfoss/8864d51f3d6bf9ea7b0e46a273b0bea8 to your computer and use it in GitHub Desktop.
Save gfoss/8864d51f3d6bf9ea7b0e46a273b0bea8 to your computer and use it in GitHub Desktop.
Correlate Parent and Child Process Events via the Lacework Query Language (LQL)
#!/usr/bin/env python3
# Happy Fam
# LQL-Driven Parent and Child Process Analyzer
# Lacework Labs
# v0.1 - May 2022
# greg.foss@lacework.net
'''
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import pandas as pd
from tabulate import tabulate
from subprocess import call
import os,sys,time,datetime,argparse,requests,json,yaml,csv,toml
class bcolors:
BLUE = '\033[94m'
CYAN = '\033[96m'
GREEN = '\033[92m'
RED = '\033[91m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
BLUEBG = '\033[44m'
REDBG = '\033[41m'
ENDC = '\033[0m'
banner = f'''{bcolors.BOLD}{bcolors.CYAN}
O ~O {bcolors.ENDC}{bcolors.BOLD}Happy Fam{bcolors.CYAN}
<|\\ /|\\
| ~o/ | \\o ~o/ _o
|\\ /| |\\ |\\ /| |\\
/ | / \\ |// > / \\ / >
{bcolors.BOLD}{bcolors.RED}Lacework Labs{bcolors.ENDC}
'''
def parse_the_things():
parser = argparse.ArgumentParser(description = 'Correlate Parent and Child Process Events via the Lacework Query Language (LQL)')
parser.add_argument('-e', '--env', help = 'Lacework environment (will be set to "default" if not specified)', default = 'default', action = 'store', dest = 'env')
parser.add_argument('-p', '--proc', help = 'Search for childs spawned from a given process', action = 'store', dest = 'proc')
parser.add_argument('-t', '--time', help ='Hunt timeframe in days (default 7-days)', action = 'store', dest = 'days')
parser.add_argument('-o', '--out', help = 'Export the results in CSV format', action = 'store', dest = 'filename')
'''
usage: happy-fam.py [-h] [-e ENV] [-p PROC] [-t DAYS] [-o FILENAME]
Correlate Parent and Child Process Events via the Lacework Query Language (LQL)
optional arguments:
-h, --help show this help message and exit
-e ENV, --env ENV Lacework environment (will be set to "default" if not specified)
-p PROC, --proc PROC Search for childs spawned from a given process
-t DAYS, --time DAYS Hunt timeframe in days (default 7-days)
-o FILENAME, --out FILENAME
Export the results in CSV format or JSON if -j argument is passed
'''
return parser
def configuration(env):
global lw_account
global sub_account
global authorization_token
config_file = os.path.expanduser("~") + "/.lacework.toml"
if os.path.isfile(config_file):
toml_data = toml.load(config_file)
lw_account = toml_data.get(env).get('account')
keyId = toml_data.get(env).get('api_key')
secret = toml_data.get(env).get('api_secret')
api_version = toml_data.get(env).get('version')
try:
sub_account = toml_data.get(env).get('subaccount')
except:
sub_account = False
# Temporary Access Token Generation
token_url = f"https://{lw_account}.lacework.net/api/v2/access/tokens"
token_payload = json.dumps({
"keyId": keyId,
"expiryTime": 3600
})
if sub_account:
token_headers = {
'X-LW-UAKS': secret,
'Content-Type': 'application/json',
'User-Agent': 'Lacework-Labs_Cloud-Hunter_v1',
'Account-Name': f'{sub_account}'
}
else:
token_headers = {
'X-LW-UAKS': secret,
'Content-Type': 'application/json',
'User-Agent': 'Lacework-Labs_Cloud-Hunter_v1'
}
token_response = requests.request("POST", token_url, headers=token_headers, data=token_payload)
json_data = json.loads(token_response.text)
authorization_token = json_data['token']
else:
print(f"{bcolors.BOLD}{bcolors.CYAN} {banner} {bcolors.ENDC}")
print(f"[!] {bcolors.RED}{bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED}: Missing ~/.lacework.toml configuration file{bcolors.ENDC}")
print()
print(f"{bcolors.RED}Please install and configure the Lacework CLI before proceeding...{bcolors.ENDC}")
print()
print("This can be installed with the following bash command:")
print(f"{bcolors.BLUE}$ curl https://raw.githubusercontent.com/lacework/go-sdk/main/cli/install.sh | bash{bcolors.ENDC}")
quit()
def validate_query(queryValidation):
validation_url = "https://{}.lacework.net/api/v2/Queries/validate".format(lw_account)
payload = json.dumps({
"queryText": "{}".format(queryValidation),
})
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
try:
response = requests.request("POST", validation_url, headers=headers, data=payload)
except requests.exceptions.RequestException as e:
print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}Query Validation Error{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
print("{}".format(e))
if "data" in response.text:
pass
else:
print()
print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}Query Validation Error{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
print()
print(f"[POST] {response.url}")
print(f"[{response.status_code} - {response.reason}]")
print()
print(response.text)
print()
quit()
def execute(process_name, query, query_stage):
global MID
global parent_process_path
# Check if the query is valid
validate_query(query)
# Obtain and format the Current Date and Time
current_date = datetime.datetime.now().strftime("%Y-%m-%d")
current_time = datetime.datetime.now().strftime("%H:%M:%S")
date_now = current_date + "T" + current_time + ".000Z"
# Back to the Future
search_window = datetime.datetime.now() - datetime.timedelta(int(time_in_days))
search_window_format = search_window.strftime("%Y-%m-%d")
search_range = search_window_format + "T" + current_time + ".000Z"
# Request
execute_custom_url = "https://{}.lacework.net/api/v2/Queries/execute".format(lw_account)
payload = json.dumps({
"query": {
"queryText": "{}".format(query)
},
"arguments": [
{
"name": "StartTimeRange",
"value": "{}".format(search_range)
},
{
"name": "EndTimeRange",
"value": "{}".format(date_now)
}
]
})
headers = {
'Authorization': authorization_token,
'Content-Type': 'application/json'
}
response = requests.request("POST", execute_custom_url, headers=headers, data=payload)
json_data = json.loads(response.text)
try:
event_df = pd.DataFrame.from_dict(json_data['data'], orient='columns')
except:
print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
print()
print(f"[POST] {response.url}")
print(f"[{response.status_code} - {response.reason}]")
print()
print(response.text)
print()
quit()
try:
event_count = event_df.shape[0]
event_row_count = event_df.shape[1]
except:
print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
print()
print(f"[POST] {response.url}")
print(f"[{response.status_code} - {response.reason}]")
print()
print(response.text)
print()
quit()
# Parent Process Hunt
if query_stage == 'parent_process':
machine_id_array = event_df.MID.unique()
machine_id_count = event_df.MID.nunique(dropna = True)
a = 0
while a < machine_id_count:
new_MID_array = event_df[event_df['MID'] == event_df['MID'][a]]
MID = new_MID_array.MID[a]
process_path_array = new_MID_array.EXE_PATH.unique()
process_path_count = new_MID_array.EXE_PATH.nunique(dropna = True)
a += 1
for b in range(process_path_count):
parent_process_path = process_path_array[b]
new_proc_array = new_MID_array[new_MID_array['EXE_PATH'] == parent_process_path]
PID_hash_array = new_proc_array.PID_HASH.unique()
PID_hash_count = new_proc_array.PID_HASH.nunique(dropna = True)
if PID_hash_count >= 1:
pid_hashes = ", ".join(str(x) for x in PID_hash_array)
else:
pid_hashes = str(PID_hash_array)
# Hunt
child_process(MID, pid_hashes, process_name)
# Child Process Hunt
elif query_stage == 'child_process':
if filename:
event_df.to_csv(filename, index=False)
if event_count == 0:
print(f'{bcolors.GREEN}0{bcolors.ENDC} Events | Machine: {bcolors.CYAN}{MID}{bcolors.ENDC} | Parent Process: {bcolors.CYAN}{parent_process_path}{bcolors.ENDC}')
elif event_count == 1:
print(f'{bcolors.BOLD}{bcolors.RED}1{bcolors.ENDC} {bcolors.BOLD}Event | Machine: {bcolors.RED}{MID}{bcolors.ENDC}{bcolors.BOLD} | Parent Process: {bcolors.RED}{parent_process_path}{bcolors.ENDC}')
print(f'Spawned Child Process:')
event_table = []
for col in event_df:
event_value = event_df[col]
event_table += [[col, event_value.to_string(index=False)]]
print(tabulate(event_table))
elif event_count >= 2:
if event_count >= 100:
print(f'{bcolors.BOLD}{bcolors.RED}{event_count}{bcolors.ENDC} {bcolors.BOLD}Events | Machine: {bcolors.RED}{MID}{bcolors.ENDC}{bcolors.BOLD} | Parent Process: {bcolors.RED}{parent_process_path}{bcolors.ENDC}')
elif event_count >= 10:
print(f'{bcolors.BOLD}{bcolors.RED}{event_count}{bcolors.ENDC} {bcolors.BOLD}Events | Machine: {bcolors.RED}{MID}{bcolors.ENDC}{bcolors.BOLD} | Parent Process: {bcolors.RED}{parent_process_path}{bcolors.ENDC}')
else:
print(f'{bcolors.BOLD}{bcolors.RED}{event_count}{bcolors.ENDC} {bcolors.BOLD}Events | Machine: {bcolors.RED}{MID}{bcolors.ENDC}{bcolors.BOLD} | Parent Process: {bcolors.RED}{parent_process_path}{bcolors.ENDC}')
print(f'Summarized Child Process Details:')
unique_host = event_df.HOSTNAME.unique()
unique_os = event_df.OS.unique()
unique_user = event_df.USERNAME.unique()
unique_procs = event_df.EXE_PATH.unique()
unique_cli = event_df.CMDLINE.unique()
unique_table = []
unique_table += [['Hostname:', f'{unique_host}']]
unique_table += [['OS:', f'{unique_os}']]
unique_table += [['User:', f'{unique_user}']]
unique_table += [['Process:', f'{unique_procs}']]
#unique_table += [['CLI:', f'{unique_cli}']]
print(tabulate(unique_table))
if filename:
print(f"{bcolors.BOLD}{bcolors.GREEN}{{}}{bcolors.ENDC}{bcolors.BOLD} Events written to [{bcolors.CYAN}{{}}{bcolors.ENDC}{bcolors.BOLD}]{bcolors.ENDC}".format(event_count,filename))
print()
def child_process(machine_id, pid_hashes, process_name):
query_stage = 'child_process'
process_length = len(process_name)
query_args = f"process.MID = {machine_id} AND process.PPID_HASH IN ({pid_hashes}) AND NOT (RIGHT(process.EXE_PATH, {process_length}) = '{process_name}')"
c_process_query = """LW_Child_Process_Hunt {
source {
LW_HE_PROCESSES process with (
LW_HE_MACHINES host
)
}
filter {
%s
}
return {
process.MID,
process.CMDLINE,
process.EXE_PATH,
process.PID,
process.PID_HASH,
process.PPID,
process.PPID_HASH,
process.SID,
process.OS,
process.PROCESS_START_TIME,
process.USERNAME,
host.HOSTNAME
}
}""" % query_args
execute(process_name, c_process_query, query_stage)
def parent_process(process_name):
print()
query_stage = 'parent_process'
process_length = len(process_name)
query_args = f"(RIGHT(process.EXE_PATH, {process_length}) = '{process_name}')"
p_process_query = """LW_Parent_Process_Hunt {
source {
LW_HE_PROCESSES process with (
LW_HE_MACHINES host
)
}
filter {
%s
}
return {
process.MID,
process.CMDLINE,
process.EXE_PATH,
process.PID,
process.PID_HASH,
process.PPID,
process.PPID_HASH,
process.SID,
process.OS,
process.PROCESS_START_TIME,
process.USERNAME,
host.HOSTNAME
}
}""" % query_args
execute(process_name, p_process_query, query_stage)
def main():
# Argument Parsing
parser = parse_the_things()
args = parser.parse_args()
# cloud-hunter script
global script_name
script_name = os.path.basename(__file__)
# Global timeframe
global time_in_days
if args.days:
time_in_days = args.days
else:
time_in_days = 7
# Global File Writer
if args.filename:
global filename
filename = args.filename
else:
filename = ''
if args.proc:
# Authentication
configuration(args.env)
parent_process(args.proc)
else:
print(f'{banner}')
print(parser.format_help())
quit()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment