Skip to content

Instantly share code, notes, and snippets.

@gfoss
Created February 14, 2022 22:00
Show Gist options
  • Save gfoss/593987a0514a6e06d7efa05724d028de to your computer and use it in GitHub Desktop.
Save gfoss/593987a0514a6e06d7efa05724d028de to your computer and use it in GitHub Desktop.
Execute LQL queries on the Lacework Platform
#!/usr/bin/env python3
# LQL Runner
# Lacework Labs
# v0.1 - February 2022
# greg.foss@lacework.net
#####################################################################################
# usage: lql-runner.py [-h] [-e LW_ENV] [-q QUERY] [-f FILE] [-t DAYS] [-c] [-j]
#                      [-o FILENAME]
#
# Execute LQL queries on the Lacework Platform
#
# optional arguments:
#   -h, --help    show this help message and exit
#   -e LW_ENV     Lacework environment
#   -q QUERY      Hunt by executing a raw LQL query
#   -f FILE       Hunt by executing a LQL query from a YAML file
#   -t DAYS       Hunt timeframe in days (default 7-days)
#   -c, --count   Hunt and only count the hits, do not print the details to the screen
#   -j, --JSON    View the results as raw JSON
#   -o FILENAME   Export the results in CSV format or JSON if -j argument is passed
#####################################################################################
'''
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import pandas as pd
from tabulate import tabulate
from subprocess import call
import os,sys,time,datetime,argparse,requests,json,yaml,csv,toml
class bcolors:
    """ANSI SGR escape sequences used to colour and style terminal output."""

    # Reset all attributes back to the terminal default.
    ENDC = '\033[0m'

    # Text attributes.
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'

    # Bright foreground colours.
    RED = '\033[91m'
    GREEN = '\033[92m'
    BLUE = '\033[94m'
    CYAN = '\033[96m'

    # Background colours.
    REDBG = '\033[41m'
    BLUEBG = '\033[44m'
# ASCII-art banner, coloured cyan with a red "Lacework Labs" tag line; it is
# printed together with the help text and with the missing-configuration error.
# NOTE(review): the art's internal spacing looks collapsed — confirm the exact
# whitespace against the upstream script before relying on its appearance.
banner = f'''{bcolors.CYAN}
| _ \\ | _ \\
| ( | | / | | \\ \\ -_) _|
____| \\__\\_\\ ____| _|_\\ \\_,_| _| _| _| _| \\___| _| {bcolors.RED}
Lacework Labs {bcolors.ENDC}
'''
def parse_the_things():
    """Build and return the command-line argument parser for the LQL runner.

    The parser is returned un-parsed so the caller can both parse ``sys.argv``
    and print the formatted help text.
    """
    cli = argparse.ArgumentParser(
        description='Execute LQL queries on the Lacework Platform'
    )

    # (flags, keyword arguments) in the order they should appear in --help.
    option_specs = (
        (('-e',), {'help': 'Lacework environment', 'default': 'default',
                   'action': 'store', 'dest': 'lw_env'}),
        (('-q',), {'help': 'Hunt by executing a raw LQL query',
                   'action': 'store', 'dest': 'query'}),
        (('-f',), {'help': 'Hunt by executing a LQL query from a YAML file',
                   'action': 'store', 'dest': 'file'}),
        (('-t',), {'help': 'Hunt timeframe in days (default 7-days)',
                   'action': 'store', 'dest': 'days'}),
        (('-c', '--count'), {'help': 'Hunt and only count the hits, do not print the details to the screen',
                             'action': 'store_true'}),
        (('-j', '--JSON'), {'help': 'View the results as raw JSON',
                            'action': 'store_true'}),
        (('-o',), {'help': 'Export the results in CSV format or JSON if -j argument is passed',
                   'action': 'store', 'dest': 'filename'}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    return cli
def configuration(lw_env):
    """Load Lacework CLI credentials for a profile and fetch an access token.

    Reads ``~/.lacework.toml``, looks up the table named ``lw_env`` and POSTs
    the profile's key to the account's ``/api/v2/access/tokens`` endpoint,
    requesting a token with an ``expiryTime`` of 3600.

    Args:
        lw_env: Name of the profile (top-level TOML table) to use.

    Side effects:
        Sets the module globals ``lw_account`` and ``authorization_token``.
        Prints an error and exits with status 1 when the configuration file
        is missing or unreadable, the profile is absent or incomplete, the
        token endpoint cannot be reached, or the response carries no token.
    """
    global lw_account
    global authorization_token

    def abort(message):
        # Single exit path so every configuration failure looks the same.
        print(f"[!] {bcolors.RED}{bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED}: {message}{bcolors.ENDC}")
        print()
        sys.exit(1)

    config_file = os.path.expanduser("~") + "/.lacework.toml"
    if not os.path.isfile(config_file):
        print(f"{bcolors.BOLD}{bcolors.CYAN} {banner} {bcolors.ENDC}")
        print(f"[!] {bcolors.RED}{bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED}: Missing ~/.lacework.toml configuration file{bcolors.ENDC}")
        print()
        print(f"{bcolors.RED}Please install and configure the Lacework CLI before proceeding...{bcolors.ENDC}")
        print()
        print("This can be installed with the following bash command:")
        print(f"{bcolors.BLUE}$ curl https://raw.githubusercontent.com/lacework/go-sdk/main/cli/install.sh | bash{bcolors.ENDC}")
        sys.exit(1)

    try:
        toml_data = toml.load(config_file)
    except (OSError, ValueError) as exc:
        # toml's decode error derives from ValueError.
        abort(f"Unable to read {config_file}: {exc}")

    # A mistyped -e value used to surface as an AttributeError on None.
    profile = toml_data.get(lw_env)
    if not isinstance(profile, dict):
        abort(f"Profile [{lw_env}] not found in {config_file}")

    lw_account = profile.get('account')
    key_id = profile.get('api_key')
    secret = profile.get('api_secret')
    missing = [
        name
        for name, value in (('account', lw_account), ('api_key', key_id), ('api_secret', secret))
        if not value
    ]
    if missing:
        abort(f"Profile [{lw_env}] is missing: {', '.join(missing)}")

    # Temporary Access Token Generation
    token_url = "https://{}.lacework.net/api/v2/access/tokens".format(lw_account)
    token_payload = json.dumps({
        "keyId": key_id,
        "expiryTime": 3600
    })
    token_headers = {
        'X-LW-UAKS': secret,
        'Content-Type': 'application/json'
    }
    try:
        token_response = requests.request("POST", token_url, headers=token_headers, data=token_payload)
    except requests.exceptions.RequestException as exc:
        abort(f"Unable to reach {token_url}: {exc}")

    try:
        authorization_token = json.loads(token_response.text)['token']
    except (ValueError, KeyError, TypeError):
        # Non-JSON body or a JSON error document without a 'token' field.
        abort(
            f"Authentication failed [{token_response.status_code} - "
            f"{token_response.reason}]: {token_response.text}"
        )
def validate_query(queryValidation):
    """Submit an LQL query to the validation endpoint; exit if it is rejected.

    POSTs ``queryValidation`` to ``/api/v2/Queries/validate`` for the account
    in the ``lw_account`` global, authenticating with the
    ``authorization_token`` global. When the ``cloud_trail_activity`` global
    is truthy the payload also carries ``"evaluatorId": "Cloudtrail"``.

    Args:
        queryValidation: The LQL query text to validate.

    Returns:
        None when the response is a JSON object with a top-level ``data`` key.
        Otherwise the request and response details are printed and the
        process exits with status 1.
    """
    validation_url = "https://{}.lacework.net/api/v2/Queries/validate".format(lw_account)

    body = {"queryText": "{}".format(queryValidation)}
    if cloud_trail_activity:
        body["evaluatorId"] = "Cloudtrail"
    payload = json.dumps(body)

    headers = {
        'Authorization': authorization_token,
        'Content-Type': 'application/json'
    }

    try:
        response = requests.request("POST", validation_url, headers=headers, data=payload)
    except requests.exceptions.RequestException as e:
        print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}Query Validation Error{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
        print("{}".format(e))
        print()
        # There is no response to inspect; continuing would raise NameError.
        sys.exit(1)

    # Check the parsed document, not the raw text: an error message that
    # merely contains the word "data" must not count as success.
    try:
        parsed = json.loads(response.text)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict) and "data" in parsed:
        return

    print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}Query Validation Error{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
    print()
    print(f"[POST] {response.url}")
    print(f"[{response.status_code} - {response.reason}]")
    print()
    print(response.text)
    print()
    sys.exit(1)
# (CSV column label, key path below each record's 'EVENT' object) for every
# CloudTrail field that is exported, in output order. The first seven columns
# are also the ones shown in the on-screen summary table.
_CLOUDTRAIL_COLUMNS = (
    ('Event', ('eventName',)),
    ('Region', ('awsRegion',)),
    ('Source', ('eventSource',)),
    ('Time', ('eventTime',)),
    ('Type', ('eventType',)),
    ('Username', ('userIdentity', 'userName')),
    ('Source IP', ('sourceIPAddress',)),
    ('User Agent', ('userAgent',)),
    ('Access Key ID', ('userIdentity', 'accessKeyId')),
    ('Account ID', ('userIdentity', 'accountId')),
    ('Recipient Account ID', ('recipientAccountId',)),
    ('ARN', ('userIdentity', 'arn')),
    ('Principal ID', ('userIdentity', 'principalId')),
    ('Session Context', ('userIdentity', 'sessionContext')),
    ('Type', ('userIdentity', 'type')),
    ('Category', ('eventCategory',)),
    ('Event ID', ('eventID',)),
    ('Request ID', ('requestID',)),
    ('Version', ('eventVersion',)),
    ('Management Event', ('managementEvent',)),
    ('Read Only', ('readOnly',)),
    ('User Identity', ('userIdentity',)),
    ('Resources', ('resources',)),
    ('Request Parameters', ('requestParameters',)),
    ('TLS Details', ('tlsDetails',)),
)
_SUMMARY_COLUMN_COUNT = 7    # Event .. Source IP
_DETAIL_COLUMN_COUNT = 18    # Event .. Request ID (Session Context is skipped)


def _lookup(container, *path, default="N/A"):
    """Return the value at ``path`` inside nested ``container``, else ``default``."""
    current = container
    for key in path:
        try:
            current = current[key]
        except (KeyError, IndexError, TypeError):
            return default
    return current


def _abort_with_response(response):
    """Print the request/response details of a failed API call and exit(1)."""
    print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
    print()
    print(f"[POST] {response.url}")
    print(f"[{response.status_code} - {response.reason}]")
    print()
    print(response.text)
    print()
    sys.exit(1)


def _print_export_hint():
    """Tell the user how to export full event details to a file."""
    print("For additional information, export event details to a file:")
    print(f"{bcolors.BLUE}$ ./{script_name} -q <query> -o <output_file.csv>{bcolors.ENDC}")
    print()


def execute(query):
    """Validate and run an LQL query, then print and/or export the results.

    The query is validated with ``validate_query`` and then POSTed to
    ``/api/v2/Queries/execute`` with ``StartTimeRange`` / ``EndTimeRange``
    arguments spanning the last ``time_in_days`` days. Queries that mention
    ``CloudTrailRawEvents`` are sent with ``"evaluatorId": "Cloudtrail"`` and
    their records are flattened into fixed columns; all other results are
    handled as a pandas DataFrame.

    Reads the module globals ``lw_account``, ``authorization_token``,
    ``time_in_days``, ``JSON``, ``filename``, ``count`` and ``script_name``,
    and sets ``cloud_trail_activity``.

    Args:
        query: LQL query text.

    Side effects:
        Prints results to stdout. When ``filename`` is set, writes JSON
        (``JSON`` mode) or CSV to it. Exits the process after JSON output
        (status 0) and on request/response errors (status 1).
    """
    # Declared global so validate_query(), which reads the module-level flag,
    # sees the same decision as the execute request below.
    global cloud_trail_activity
    cloud_trail_activity = "CloudTrailRawEvents" in query

    # Check if the query is valid
    validate_query(query)

    # The timestamps carry a 'Z' (UTC) suffix, so they must be built from UTC
    # rather than local time. A single now() keeps both ends on one instant.
    now = datetime.datetime.now(datetime.timezone.utc)
    window_start = now - datetime.timedelta(days=int(time_in_days))
    stamp_format = "%Y-%m-%dT%H:%M:%S.000Z"
    date_now = now.strftime(stamp_format)
    search_range = window_start.strftime(stamp_format)

    # Request
    execute_custom_url = "https://{}.lacework.net/api/v2/Queries/execute".format(lw_account)
    if cloud_trail_activity:
        query_spec = {"evaluatorId": "Cloudtrail", "queryText": "{}".format(query)}
    else:
        query_spec = {"queryText": "{}".format(query)}
    payload = json.dumps({
        "query": query_spec,
        "arguments": [
            {"name": "StartTimeRange", "value": search_range},
            {"name": "EndTimeRange", "value": date_now},
        ],
    })
    headers = {
        'Authorization': authorization_token,
        'Content-Type': 'application/json'
    }

    try:
        response = requests.request("POST", execute_custom_url, headers=headers, data=payload)
    except requests.exceptions.RequestException as exc:
        print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
        print("{}".format(exc))
        print()
        sys.exit(1)

    # Any failure to interpret the body is reported with the raw response.
    # `except Exception` (not a bare except) so SystemExit and
    # KeyboardInterrupt are never swallowed.
    try:
        json_data = json.loads(response.text)
        records = json_data['data']
        event_df = pd.DataFrame.from_dict(records, orient='columns')
        event_count = len(records) if cloud_trail_activity else event_df.shape[0]
    except Exception:
        _abort_with_response(response)

    if JSON:
        if filename:
            # NOTE: append mode is kept from the original; writing to an
            # existing file therefore yields concatenated JSON documents.
            with open(filename, 'a', encoding='utf-8') as outfile:
                json.dump(json_data, outfile, ensure_ascii=False, indent=4)
            print()
            print(f"{bcolors.BOLD}JSON Output written to [{bcolors.CYAN}{filename}{bcolors.ENDC}{bcolors.BOLD}]{bcolors.ENDC}")
            print()
        else:
            print(json.dumps(records, indent=4))
        sys.exit(0)

    rows = []
    if cloud_trail_activity:
        # CLOUDTRAIL SPECIFIC PARSING: one flat row per record; fields that
        # are absent from a record become "N/A".
        rows = [
            [_lookup(records, index, 'EVENT', *path) for _, path in _CLOUDTRAIL_COLUMNS]
            for index in range(event_count)
        ]
        if filename:
            header = [label for label, _ in _CLOUDTRAIL_COLUMNS] + ['Query']
            # Opened once (append mode, as before); newline='' is what the
            # csv module requires to avoid blank lines on Windows.
            with open(filename, "a", newline='') as csvfile:
                csvwriter = csv.writer(csvfile)
                csvwriter.writerow(header)
                csvwriter.writerows(row + [query] for row in rows)
    elif filename:
        # FOR EVERYTHING BUT CLOUDTRAIL
        event_df.to_csv(filename, index=False)

    if event_count == 0:
        print(f"[!] {bcolors.BOLD}{bcolors.RED}No Events found over a {bcolors.ENDC}{bcolors.BOLD}{time_in_days}{bcolors.RED}-day search period{bcolors.ENDC}")
        print()
        return

    if event_count == 1:
        print(f"[*] {bcolors.GREEN}1{bcolors.ENDC} Event returned over a {bcolors.GREEN}{time_in_days}{bcolors.ENDC}-day search period")
        print()
        if count:
            return
        print(f"{bcolors.BOLD}Event Details{bcolors.ENDC}")
        if cloud_trail_activity:
            labelled = zip(_CLOUDTRAIL_COLUMNS[:_DETAIL_COLUMN_COUNT], rows[0])
            event_table = [
                [f"{label}:", "{}".format(value)]
                for (label, _), value in labelled
                if label != 'Session Context'
            ]
        else:
            event_table = [
                [col, event_df[col].to_string(index=False)] for col in event_df
            ]
        print(tabulate(event_table))
        print()
        if filename:
            print(f"{bcolors.BOLD}Event written to [{bcolors.CYAN}{filename}{bcolors.ENDC}{bcolors.BOLD}]{bcolors.ENDC}")
            print()
        else:
            _print_export_hint()
        return

    # Two or more events.
    if count:
        print(f"[*] Found [{bcolors.GREEN}{event_count}{bcolors.ENDC}] events over a {bcolors.GREEN}{time_in_days}{bcolors.ENDC}-day search period")
        print()
        return

    print(f"[*] Found [{bcolors.GREEN}{event_count}{bcolors.ENDC}] events over a {bcolors.GREEN}{time_in_days}{bcolors.ENDC}-day search period:")
    print()
    if cloud_trail_activity:
        events_table = [[label for label, _ in _CLOUDTRAIL_COLUMNS[:_SUMMARY_COLUMN_COUNT]]]
        events_table += [row[:_SUMMARY_COLUMN_COUNT] for row in rows]
        print(tabulate(events_table, headers='firstrow'))
    else:
        print(event_df)
    print()
    if filename:
        print(f"{bcolors.BOLD}{bcolors.GREEN}{event_count}{bcolors.ENDC}{bcolors.BOLD} Events written to [{bcolors.CYAN}{filename}{bcolors.ENDC}{bcolors.BOLD}]{bcolors.ENDC}")
        print()
    else:
        _print_export_hint()
def _load_query_file(path):
    """Return the LQL text stored in ``path``; print an error and exit(1) on failure.

    Files ending in ``.yml`` / ``.yaml`` are parsed as YAML and their
    ``queryText`` value is returned; any other file is returned verbatim.
    """
    try:
        with open(path, "r") as stream:
            if os.path.splitext(path)[-1].lower() in ('.yml', '.yaml'):
                lql_query = yaml.safe_load(stream)['queryText']
                if not isinstance(lql_query, str):
                    raise TypeError("queryText is not a string")
                return lql_query
            return stream.read()
    except yaml.YAMLError as exc:
        print()
        print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
        print()
        print(exc)
        print()
    except (OSError, KeyError, TypeError, ValueError):
        # Unreadable/undecodable file, or YAML without a usable queryText.
        print()
        print(f"{bcolors.RED}[!] {bcolors.UNDERLINE}ERROR{bcolors.ENDC}{bcolors.RED} [!]{bcolors.ENDC}")
        print()
        print(f"Unable to parse [{path}]")
        print("Please ensure the file is in YAML or raw LQL format")
        print()
    sys.exit(1)


def main():
    """Command-line entry point: parse arguments, authenticate, run the query.

    Publishes the parsed options as the module globals that ``execute`` and
    ``validate_query`` read (``script_name``, ``time_in_days``, ``count``,
    ``filename``, ``JSON``, ``cloud_trail_activity``). It then runs the query
    given with ``-q`` or loaded from the ``-f`` file. With neither option the
    banner and help text are printed and the process exits.
    """
    global script_name, time_in_days, count, filename, JSON, cloud_trail_activity

    # Argument Parsing
    parser = parse_the_things()
    args = parser.parse_args()

    script_name = os.path.basename(__file__)

    # Timeframe: validated here so a bad -t value is reported as a usage
    # error instead of a ValueError traceback inside execute().
    if args.days:
        try:
            time_in_days = int(args.days)
        except ValueError:
            parser.error(f"-t expects a whole number of days, got {args.days!r}")
        if time_in_days < 0:
            parser.error("-t expects a non-negative number of days")
    else:
        time_in_days = 7

    # Unset options are stored as '' (falsy), matching what execute() tests.
    count = args.count or ''
    filename = args.filename or ''
    JSON = args.JSON or ''

    # Only query cloudtrail data if explicitly triggered
    cloud_trail_activity = False

    if args.query:
        configuration(args.lw_env)
        execute(args.query)
    elif args.file:
        configuration(args.lw_env)
        # Only the file loading is guarded. execute() exits via SystemExit in
        # normal operation, and a bare except around it used to misreport
        # that as "Unable to parse".
        execute(_load_query_file(args.file))
    else:
        print(banner)
        print(parser.format_help())
        sys.exit(0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment