Skip to content

Instantly share code, notes, and snippets.

@SHolzhauer
Created June 2, 2022 18:32
Show Gist options
  • Save SHolzhauer/d4ab739423240df6c2f9c70f6319dd56 to your computer and use it in GitHub Desktop.
Save SHolzhauer/d4ab739423240df6c2f9c70f6319dd56 to your computer and use it in GitHub Desktop.
Python script to use in CI-CD to update custom detection rules in Elastic Stack
stages:
- test
- deploy
test_rules:
stage: test
image: python:3.8
script:
- python -V # Print out python version for debugging
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
- pip install -r requirements.txt
- python -m detection_rules test
deploy_rules:
stage: deploy
image: python:3.8
only:
- main
script:
- python -V # Print out python version for debugging
- pip install virtualenv
- virtualenv venv
- source venv/bin/activate
- pip install -r requirements.txt
- python update_custom_rules.py
import json
import requests
import toml
import os
from uuid import uuid4
kbnuser = os.environ["kibana_usr"]
kbnpwd = os.environ["kibana_pwd"]
def create_rules(createbody, kbnuser,kbnpwd):
resp = requests.post(
url="https://{}/api/detection_engine/rules/_bulk_create".format(os.environ["kibana_url"]),
json=createbody,
headers={
"Content-Type": "application/json",
"kbn-xsrf": uuid4()
},
auth=(kbnuser,kbnpwd),
verify=False
)
for response in resp.json():
failure = False
try:
if response["statusCode"] in range(400, 599):
response["statusCode"]
print(resp.json())
print("=====================================================================")
print(createbody)
failure = True
if failure:
print(response)
raise ValueError("Failed to create rule")
except Exception as err:
print("Exception: {}".format(err))
print(response)
raise ValueError("Failed to create rule")
custom_rules = []
# Get all the custom rules; aka those prefixed with your custom prefix
for root, dirs, files in os.walk("rules/"):
for file in files:
if file.startswith("custom_prefix_"):
custom_rules.append(os.path.join(root, file))
# read in toml
toml_rules = []
for rulefile in custom_rules:
try:
with open(rulefile, "r") as f:
rule = f.read()
t_rule = toml.loads(rule)
toml_rules.append(t_rule)
except Exception as err:
pass
#print("Failed to parse {} with error: {}".format(rulefile, err))
updatebody = []
for r in toml_rules:
rule = r["rule"]
if "rule_id" not in rule:
continue
else:
updatebody.append(rule)
# bulk request to update
resp = requests.put(
url="https://{}/api/detection_engine/rules/_bulk_update".format(os.environ["kibana_url"]),
json=updatebody,
headers={
"Content-Type": "application/json",
"kbn-xsrf": uuid4()
},
auth=(kbnuser,kbnpwd),
verify=False
)
response = resp.json()
if "error" in response:
print(response["message"])
exit(1)
createbody = []
for rule_resp in resp.json():
try:
if "error" in rule_resp and "not found" in rule_resp["error"]["message"]:
print(rule_resp["error"]["message"])
# find rule in body and create the rule
for r in updatebody:
if r["rule_id"] in rule_resp["error"]["message"]:
createbody.append(r)
except TypeError:
print(rule_resp)
created = False
while not created:
try:
create_rules(createbody, kbnuser, kbnpwd)
except Exception:
pass
else:
created = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment