Skip to content

Instantly share code, notes, and snippets.

@leehinman
Created April 8, 2021 20:36
Show Gist options
  • Save leehinman/154d02b8b72506f1b418d62631eeda54 to your computer and use it in GitHub Desktop.
Save leehinman/154d02b8b72506f1b418d62631eeda54 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import json
import os
import pprint
import difflib
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("beats_dir",
help="Full path to beats directory")
parser.add_argument("integrations_dir",
help="Full path to integrations directory")
return parser.parse_args()
def get_data_streams(integrations_dir):
zeek_dir = os.path.join(integrations_dir, "packages/zeek/data_stream")
return [name for name in os.listdir(zeek_dir) if os.path.isdir(os.path.join(zeek_dir, name))]
def get_beats_expected(beats_dir, data_stream):
expected_filename = "{0}-json.log-expected.json".format(data_stream)
full_path = os.path.join(beats_dir, "x-pack/filebeat/module/zeek", data_stream, "test", expected_filename)
if os.path.isfile(full_path):
return full_path
else:
return ""
def get_integrations_expected(integrations_dir, data_stream):
expected_filename = "test-{0}.log-expected.json".format(data_stream)
full_path = os.path.join(integrations_dir, "packages/zeek/data_stream", data_stream, "_dev/test/pipeline", expected_filename)
if os.path.isfile(full_path):
return full_path
else:
return ""
def get_beats_json(filename):
if not filename:
return
data = ""
with open(filename) as f_handle:
data = json.load(f_handle)
return json.dumps(data, sort_keys=True, indent=2)
def get_integrations_json(filename):
if not filename:
return
data = ""
with open(filename) as f_handle:
data = json.load(f_handle)
results = []
for o in data['expected']:
results.append(flatten_object(o, []))
return json.dumps(results, sort_keys=True, indent=2)
def flatten_object(obj, dict_fields, prefix=""):
result = {}
for key, value in obj.items():
if isinstance(value, dict) and prefix + key not in dict_fields:
new_prefix = prefix + key + "."
result.update(flatten_object(value, dict_fields, new_prefix))
else:
result[prefix + key] = value
return result
if __name__ == '__main__':
args = parse_args()
for ds in get_data_streams(args.integrations_dir):
print("Diff in {0}".format(ds))
beats_expected = get_beats_json(get_beats_expected(args.beats_dir, ds))
if beats_expected is None:
print("No beats expected file")
continue
integrations_expected = get_integrations_json(get_integrations_expected(args.integrations_dir, ds))
if integrations_expected is None:
print("No integrations expected file")
continue
d = difflib.Differ()
result = list(d.compare(beats_expected.splitlines(keepends=True),
integrations_expected.splitlines(keepends=True)))
pprint.pprint(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment