Skip to content

Instantly share code, notes, and snippets.

@adamf
Created June 1, 2016 03:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamf/cedf53a50ee2c025dfe637c8fb4add82 to your computer and use it in GitHub Desktop.
Save adamf/cedf53a50ee2c025dfe637c8fb4add82 to your computer and use it in GitHub Desktop.
A script that parses exported Splunk channel archives into plain text with ISO 8601 timestamps
#!/usr/bin/env python3
import argparse
import json
import sys
import os
import datetime
import re
# usage:
# python3 parse_splunk.py --archive_dir channel_archives --channels general,random,cat_gifs
def parse_users(users_file):
    """Load the user list and map each user id to its user name.

    Args:
        users_file: Path to a JSON file containing a list of objects, each
            with at least an ``id`` and a ``name`` key.

    Returns:
        A dict mapping every entry's ``id`` to its ``name``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
        KeyError: If an entry lacks ``id`` or ``name``.
    """
    # The context manager closes the handle even if json.load() raises.
    # JSON is read as UTF-8 explicitly instead of relying on the locale.
    with open(users_file, 'r', encoding='utf-8') as handle:
        user_records = json.load(handle)
    return {user['id']: user['name'] for user in user_records}
def parse_channel_archives(channel_log_directory, channels, users):
    """Print every channel's messages as ``<ISO time> <user>: <text>`` lines.

    For each channel, all ``*.json`` files in
    ``<channel_log_directory>/<channel>/`` are read. Each file must hold a
    JSON list of event dicts. Events are handled as follows:

    * no ``subtype``: a regular message, stored under its ``ts``;
    * ``subtype == 'message_changed'``: the nested ``message`` replaces
      whatever is stored under that message's own ``ts``;
    * any other subtype: ignored.

    ``<@Uxxxxxxxx>`` mentions in the text are rewritten to ``@<name>`` using
    ``users``. After all files of a channel are read, a ``#<channel>`` header
    is followed by the messages in ascending ``ts`` order.

    Args:
        channel_log_directory: Directory containing one sub-directory per
            channel.
        channels: Iterable of channel names (sub-directory names).
        users: Dict mapping user id to display name.

    Returns:
        None. Output is written to stdout.

    Raises:
        OSError: If a channel directory or log file cannot be read.
        ValueError: If a log file does not contain valid JSON.
        KeyError: If a handled event lacks ``ts`` or ``text``.
    """
    # Compiled once; the pattern itself is unchanged from the original.
    mention_pattern = re.compile(r'<@(U........)>')

    def expand_mention(match):
        # Leave the raw mention in place when the id is not in `users`
        # rather than aborting the whole run with a KeyError.
        user_id = match.group(1)
        if user_id in users:
            return '@' + users[user_id]
        return match.group(0)

    for channel in channels:
        channel_dir = os.path.join(channel_log_directory, channel)
        # ts -> (user id or None, text with mentions expanded)
        messages = {}
        # os.listdir() returns names in arbitrary order. Sorting makes the
        # run deterministic, so an edit event in a later-named file is
        # applied after the message it edits instead of being overwritten.
        json_files = sorted(
            name for name in os.listdir(channel_dir) if name.endswith('.json')
        )
        print('\n\n#' + channel)
        for file_name in json_files:
            log_path = os.path.join(channel_dir, file_name)
            with open(log_path, 'r', encoding='utf-8') as handle:
                events = json.load(handle)
            for event in events:
                subtype = event.get('subtype')
                if subtype == 'message_changed':
                    # Replace the stored message with its edited version.
                    message = event['message']
                elif subtype is None:
                    message = event
                else:
                    continue
                text = mention_pattern.sub(expand_mention, message['text'])
                messages[message['ts']] = (message.get('user'), text)
        for ts in sorted(messages):
            user_id, text = messages[ts]
            if user_id is None:
                # Some events carry no 'user' key (e.g. an edited message
                # without an author id); print a placeholder.
                author = 'unknown'
            else:
                author = users.get(user_id, user_id)
            # Use the message's own timestamp (also its sort key). The
            # original printed the edit event's time for edited messages,
            # which made the printed times run out of order.
            iso_time = datetime.datetime.fromtimestamp(int(float(ts))).isoformat()
            print("%s %s: %s" % (iso_time, author, text))
def get_parser():
    """Build the command-line parser for this script.

    Registers two required options, ``--archive_dir`` and ``--channels``,
    and returns the configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # (flag, help text) pairs, registered in declaration order.
    required_options = (
        (
            '--archive_dir',
            'The directory containing the users.json and the directories of channel logs.',
        ),
        (
            '--channels',
            'Comma seperated list of channels (without the leading hash) to parse.',
        ),
    )
    for flag, description in required_options:
        cli.add_argument(flag, required=True, help=description)
    return cli
def get_args():
    """Parse the process command line and return the resulting namespace."""
    return get_parser().parse_args()
def main(args):
    """Load the user map, then print the requested channels' logs.

    Args:
        args: Parsed command-line namespace providing ``archive_dir`` (the
            directory holding ``users.json`` and the channel directories)
            and ``channels`` (a comma-separated string of channel names).

    Returns:
        None, which ``sys.exit`` treats as a successful exit status.
    """
    users = parse_users(os.path.join(args.archive_dir, 'users.json'))
    # Tolerate "general, random" and trailing commas. An empty channel name
    # would otherwise resolve to the archive root, and users.json would be
    # parsed as if it were a channel log.
    stripped_names = (name.strip() for name in args.channels.split(','))
    channels = [name for name in stripped_names if name]
    parse_channel_archives(args.archive_dir, channels, users)
if __name__ == '__main__':
    # Script entry point: parse the command line, run the conversion, and
    # pass main()'s return value to sys.exit() as the exit status.
    cli_args = get_args()
    sys.exit(main(cli_args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment