Skip to content

Instantly share code, notes, and snippets.

@mbabinski
Created May 24, 2023 19:45
Demonstration tool showing how to bulk close investigations in the Rapid7 Insight IDR platform using InsightIDR4Py. Created for the Medium post "Button-Pusher to MasterBuilder: Automating SIEM Workflows."
import InsightIDR4Py as idr
import argparse
import sys
# collect user choices
parser = argparse.ArgumentParser(description="Closes investigations in bulk depending on user selections.",
epilog='Example usage: `python InvestigationClosure.py --assignee-email \
swilliams@acme.com --days-since-last-access 30 --source ALERT --disposition BENIGN`')
parser.add_argument("-p", "--priority",
dest="priority",
help="Comma-separated list of priority values for the investigation. Options include [CRITICAL,HIGH,MEDIUM,LOW].",
required=False,
type=str)
parser.add_argument("-d", "--disposition",
dest="disposition",
help="Comma-separated list of disposition values for the investigation. Options include [BENIGN,MALICIOUS,NOT_APPLICABLE,UNDECIDED] or ALL.",
required=False,
type=str,
default="BENIGN,NOT_APPLICABLE")
parser.add_argument("-s", "--source",
dest="source",
help="Comma-separated list of source values for the investigation. Options include [USER,HUNT,ALERT] or ALL.",
required=False,
type=str)
parser.add_argument("-ae", "--assignee-email",
dest="assignee_email",
help="Email address of the investigation's assignee.",
required=False,
type=str)
parser.add_argument("-dlac", "--days-since-last-access",
dest="days_since_last_access",
help="Minimum number of days since investigation was last viewed or modified.",
required=False,
type=int)
parser.add_argument("-dlal", "--days-since-last-alert",
dest="days_since_last_alert",
help="Minimum number of days since the last alert associated with the investigation.",
required=False,
type=int)
parser.add_argument("-cc", "--close-comment",
dest="close_comment",
help="The comment message to add to the investigation when closing.",
required=False,
type=str)
# parse inputs
args = parser.parse_args()
priority = args.priority
disposition = args.disposition
source = args.source
assignee_email = args.assignee_email
days_since_last_access = args.days_since_last_access
days_since_last_alert = args.days_since_last_alert
close_comment = args.close_comment
# validate user 'priority' selection
if priority:
priorities = [item.upper().strip() for item in priority.split(",")]
for item in priorities:
if item not in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
raise ValueError("Error, the priority selection {} is not a valid choice!".format(item))
else:
priorities = ["CRITICAL", "HIGH", "MEDIUM", "LOW"]
# validate user 'disposition' selection
if disposition.upper() != "ALL":
dispositions = [item.upper().strip() for item in disposition.split(",")]
for item in dispositions:
if item not in ["BENIGN", "MALICIOUS", "NOT_APPLICABLE", "UNDECIDED"]:
raise ValueError("Error, the disposition selection {} is not a valid choice!".format(item))
else:
dispositions = ["BENIGN", "MALICIOUS", "NOT_APPLICABLE", "UNDECIDED"]
# validate user 'source' selection
if source:
sources = [item.upper().strip() for item in source.split(",")]
for item in sources:
if item not in ["USER", "HUNT", "ALERT"]:
raise ValueError("Error, the source selection {} is not a valid choice!".format(item))
else:
sources = ["USER", "HUNT", "ALERT"]
# connect to InsightIDR and list investigations
print("Connecting to InsightIDR.")
api_key = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
api = idr.InsightIDR(api_key)
# use a start time of one year ago to catch (hopefully) all investigations
start_time = (idr.datetime.now(idr.timezone.utc) - idr.timedelta(365)).strftime("%Y-%m-%dT%H:%M:%SZ")
# list investigations
print("\nListing investigations.")
all_investigations = api.ListInvestigations(assignee_email,
start_time,
multi_customer=True,
priorities=priorities,
statuses=["OPEN", "INVESTIGATING"])
# filter by disposition
print("\nFiltering investigations.")
filtered_investigations = [inv for inv in all_investigations if inv["disposition"] in dispositions]
# filter by source
filtered_investigations = [inv for inv in filtered_investigations if inv["source"] in sources]
# filter by last access date
if days_since_last_access != None:
# get the threshold last access date
threshold_access_date = idr.datetime.now(idr.timezone.utc) - idr.timedelta(days_since_last_access)
for investigation in filtered_investigations:
# get the investigation last access date and compare
last_access_date = idr.datetime.strptime(investigation["last_accessed"], "%Y-%m-%dT%H:%M:%S.%f%z")
if last_access_date < threshold_access_date:
filtered_investigations.remove(investigation)
# filter by last alert date
if days_since_last_alert != None:
# get the threshold last alert date
threshold_alert_date = idr.datetime.now(idr.timezone.utc) - idr.timedelta(days_since_last_alert)
for investigation in filtered_investigations:
# check whethere the investigation has a last alert time
if investigation["latest_alert_time"]:
# get the investigation last alert date and compare
last_alert_date = idr.datetime.strptime(investigation["latest_alert_time"], "%Y-%m-%dT%H:%M:%S.%f%z")
if last_alert_date < threshold_alert_date:
filtered_investigations.remove(investigation)
# confirm the choice
print("\nInvestigations slated for closure:")
for investigation in filtered_investigations:
print("- {} (Created {})".format(investigation["title"], investigation["created_time"]))
choice = ""
while choice.lower() != "y":
choice = input("\nYou have chosen to close the investigations listed above. Continue? (y/n)")
if choice.lower() == "n":
print("Exiting!")
sys.exit()
# close the investigations and add the comment
print("\nClosing investigations.")
for investigation in filtered_investigations:
print("- Closing {}.".format(investigation["title"]))
inv_id = investigation["rrn"]
result = api.UpdateInvestigation(inv_id, status="CLOSED")
if close_comment:
comment = api.CreateComment(inv_id, close_comment)
print("\nWay to go - everything is awesome!!!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment