Created
January 11, 2020 06:38
-
-
Save Cyb3rWard0g/91ca28ac1238313ac90365987996b949 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Author: Roberto Rodriguez (@Cyb3rWard0g) | |
# License: GPL-3.0 | |
import nbformat as nbf | |
import yaml | |
import subprocess | |
import argparse | |
from os import path | |
# Initial description | |
text = "This script allows you to create a Jupyter notebook with informati a sigma rule converted to an Elasticsearch string to query an Elasticsearch database" | |
# Initiate the parser | |
parser = argparse.ArgumentParser(description=text) | |
# Add arguments (store_true means no argument needed) | |
parser.add_argument("-r", "--sigma-rule", help="sigma rule", type=str , required=True) | |
parser.add_argument("-c", "--helk-mappings-config", help="sigma rule", type=str , required=True) | |
args = parser.parse_args() | |
# convert sigma rule to Elasticsearch query string | |
rule_name = path.basename(args.sigma_rule) | |
print("[+] Converting rule {} to Elasticsearch query string..".format(rule_name)) | |
sigmacCommand = ["sigmac", "-t", "es-qs", "-c", args.helk_mappings_config, "{}".format(args.sigma_rule)] | |
p = subprocess.run(sigmacCommand, stdout=subprocess.PIPE, text=True) | |
elasticsearch_query_string = (p.stdout).strip() | |
# Parse yaml file | |
print("[+] Reading rule {} as a python dictionary..".format(rule_name)) | |
rule = yaml.safe_load(open(args.sigma_rule).read()) | |
# Initializing Notebooks Cells | |
nb = nbf.v4.new_notebook() | |
nb['cells'] = [] | |
# *** Title and Rule Markdown Cell*** | |
nb['cells'].append(nbf.v4.new_markdown_cell("# {}".format(rule['title']))) | |
RULE_CONTENT = yaml.dump(rule, sort_keys=False) | |
nb['cells'].append(nbf.v4.new_markdown_cell("""## Rule Content | |
``` | |
{} | |
```""".format(RULE_CONTENT))) | |
# *** Import Libraries Markdown Cell *** | |
nb['cells'].append(nbf.v4.new_markdown_cell("## Import Libraries")) | |
# *** Import Libraries Code Cell *** | |
nb['cells'].append(nbf.v4.new_code_cell("""from elasticsearch import Elasticsearch | |
from elasticsearch_dsl import Search | |
import pandas as pd""")) | |
# *** Initialize Elasticsearch Markdown Cell *** | |
nb['cells'].append(nbf.v4.new_markdown_cell("## Initialize Elasticsearch client")) | |
# *** Initialize Elasticsearch Code Cell *** | |
nb['cells'].append(nbf.v4.new_code_cell("""es = Elasticsearch(['http://helk-elasticsearch:9200']) | |
searchContext = Search(using=es, index='logs-*', doc_type='doc')""")) | |
# *** Set search query context Makdown Cell *** | |
nb['cells'].append(nbf.v4.new_markdown_cell("## Set Elasticsearch Query Context")) | |
# *** Set search query context Code Cell *** | |
nb['cells'].append(nbf.v4.new_code_cell("s = searchContext.query('query_string', query='{}')".format(elasticsearch_query_string))) | |
# *** Run query and explore response Markdown Cell *** | |
nb['cells'].append(nbf.v4.new_markdown_cell("## Run query and explore results")) | |
# *** Run query and explore response Code Cell *** | |
nb['cells'].append(nbf.v4.new_code_cell("""response = s.execute() | |
if response.success(): | |
df = pd.DataFrame((d.to_dict() for d in s.scan())) | |
df""")) | |
# *** Create/write Jupyter Notebook ** | |
notebook_name = path.splitext(path.basename(args.sigma_rule))[0] | |
nbf.write(nb, "{}.ipynb".format(notebook_name)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment