Created
May 24, 2023 19:45
Demonstration tool showing how to bulk close investigations in the Rapid7 Insight IDR platform using InsightIDR4Py. Created for the Medium post "Button-Pusher to MasterBuilder: Automating SIEM Workflows."
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import InsightIDR4Py as idr | |
import argparse | |
import sys | |
# collect user choices | |
parser = argparse.ArgumentParser(description="Closes investigations in bulk depending on user selections.", | |
epilog='Example usage: `python InvestigationClosure.py --assignee-email \ | |
swilliams@acme.com --days-since-last-access 30 --source ALERT --disposition BENIGN`') | |
parser.add_argument("-p", "--priority", | |
dest="priority", | |
help="Comma-separated list of priority values for the investigation. Options include [CRITICAL,HIGH,MEDIUM,LOW].", | |
required=False, | |
type=str) | |
parser.add_argument("-d", "--disposition", | |
dest="disposition", | |
help="Comma-separated list of disposition values for the investigation. Options include [BENIGN,MALICIOUS,NOT_APPLICABLE,UNDECIDED] or ALL.", | |
required=False, | |
type=str, | |
default="BENIGN,NOT_APPLICABLE") | |
parser.add_argument("-s", "--source", | |
dest="source", | |
help="Comma-separated list of source values for the investigation. Options include [USER,HUNT,ALERT] or ALL.", | |
required=False, | |
type=str) | |
parser.add_argument("-ae", "--assignee-email", | |
dest="assignee_email", | |
help="Email address of the investigation's assignee.", | |
required=False, | |
type=str) | |
parser.add_argument("-dlac", "--days-since-last-access", | |
dest="days_since_last_access", | |
help="Minimum number of days since investigation was last viewed or modified.", | |
required=False, | |
type=int) | |
parser.add_argument("-dlal", "--days-since-last-alert", | |
dest="days_since_last_alert", | |
help="Minimum number of days since the last alert associated with the investigation.", | |
required=False, | |
type=int) | |
parser.add_argument("-cc", "--close-comment", | |
dest="close_comment", | |
help="The comment message to add to the investigation when closing.", | |
required=False, | |
type=str) | |
# parse inputs | |
args = parser.parse_args() | |
priority = args.priority | |
disposition = args.disposition | |
source = args.source | |
assignee_email = args.assignee_email | |
days_since_last_access = args.days_since_last_access | |
days_since_last_alert = args.days_since_last_alert | |
close_comment = args.close_comment | |
# validate user 'priority' selection | |
if priority: | |
priorities = [item.upper().strip() for item in priority.split(",")] | |
for item in priorities: | |
if item not in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]: | |
raise ValueError("Error, the priority selection {} is not a valid choice!".format(item)) | |
else: | |
priorities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"] | |
# validate user 'disposition' selection | |
if disposition.upper() != "ALL": | |
dispositions = [item.upper().strip() for item in disposition.split(",")] | |
for item in dispositions: | |
if item not in ["BENIGN", "MALICIOUS", "NOT_APPLICABLE", "UNDECIDED"]: | |
raise ValueError("Error, the disposition selection {} is not a valid choice!".format(item)) | |
else: | |
dispositions = ["BENIGN", "MALICIOUS", "NOT_APPLICABLE", "UNDECIDED"] | |
# validate user 'source' selection | |
if source: | |
sources = [item.upper().strip() for item in source.split(",")] | |
for item in sources: | |
if item not in ["USER", "HUNT", "ALERT"]: | |
raise ValueError("Error, the source selection {} is not a valid choice!".format(item)) | |
else: | |
sources = ["USER", "HUNT", "ALERT"] | |
# connect to InsightIDR and list investigations | |
print("Connecting to InsightIDR.") | |
api_key = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" | |
api = idr.InsightIDR(api_key) | |
# use a start time of one year ago to catch (hopefully) all investigations | |
start_time = (idr.datetime.now(idr.timezone.utc) - idr.timedelta(365)).strftime("%Y-%m-%dT%H:%M:%SZ") | |
# list investigations | |
print("\nListing investigations.") | |
all_investigations = api.ListInvestigations(assignee_email, | |
start_time, | |
multi_customer=True, | |
priorities=priorities, | |
statuses=["OPEN", "INVESTIGATING"]) | |
# filter by disposition | |
print("\nFiltering investigations.") | |
filtered_investigations = [inv for inv in all_investigations if inv["disposition"] in dispositions] | |
# filter by source | |
filtered_investigations = [inv for inv in filtered_investigations if inv["source"] in sources] | |
# filter by last access date | |
if days_since_last_access != None: | |
# get the threshold last access date | |
threshold_access_date = idr.datetime.now(idr.timezone.utc) - idr.timedelta(days_since_last_access) | |
for investigation in filtered_investigations: | |
# get the investigation last access date and compare | |
last_access_date = idr.datetime.strptime(investigation["last_accessed"], "%Y-%m-%dT%H:%M:%S.%f%z") | |
if last_access_date < threshold_access_date: | |
filtered_investigations.remove(investigation) | |
# filter by last alert date | |
if days_since_last_alert != None: | |
# get the threshold last alert date | |
threshold_alert_date = idr.datetime.now(idr.timezone.utc) - idr.timedelta(days_since_last_alert) | |
for investigation in filtered_investigations: | |
# check whethere the investigation has a last alert time | |
if investigation["latest_alert_time"]: | |
# get the investigation last alert date and compare | |
last_alert_date = idr.datetime.strptime(investigation["latest_alert_time"], "%Y-%m-%dT%H:%M:%S.%f%z") | |
if last_alert_date < threshold_alert_date: | |
filtered_investigations.remove(investigation) | |
# confirm the choice | |
print("\nInvestigations slated for closure:") | |
for investigation in filtered_investigations: | |
print("- {} (Created {})".format(investigation["title"], investigation["created_time"])) | |
choice = "" | |
while choice.lower() != "y": | |
choice = input("\nYou have chosen to close the investigations listed above. Continue? (y/n)") | |
if choice.lower() == "n": | |
print("Exiting!") | |
sys.exit() | |
# close the investigations and add the comment | |
print("\nClosing investigations.") | |
for investigation in filtered_investigations: | |
print("- Closing {}.".format(investigation["title"])) | |
inv_id = investigation["rrn"] | |
result = api.UpdateInvestigation(inv_id, status="CLOSED") | |
if close_comment: | |
comment = api.CreateComment(inv_id, close_comment) | |
print("\nWay to go - everything is awesome!!!") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment