Skip to content

Instantly share code, notes, and snippets.

@jaegeral
Forked from hiddenillusion/timesketch_query.py
Created November 1, 2018 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaegeral/a96ae7743fc64f94b99531b950d348f7 to your computer and use it in GitHub Desktop.
Save jaegeral/a96ae7743fc64f94b99531b950d348f7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# created by Glenn P. Edwards Jr.
# https://hiddenillusion.github.io
# @hiddenillusion
# Date: 2017-07-08
# (while at FireEye)
import cmd
import os
import sys
from collections import OrderedDict
from dateutil.parser import parse as DTparser
from tabulate import tabulate as Tabulate
from .client import TimesketchApi
"""
[ ] ability to configure uri/user/pw
[ ] total number of events/rows within a single timeline
[ ] delete a timeline
[ ] delete a view
[ ] delete a sketch
[ ] filter (explore) for search terms/dates
[ ] created/updated for a Sketch (e.g. - list_sketches should display)
[x] can't get timeline details if it's not in a sketch ATM ?
[ ] ability to sort/filter output (e.g. sort by asc created_at or # of events in a timeline)
"""
class TimeSketchQuery(cmd.Cmd):
def __init__(self, cfg=None):
cmd.Cmd.__init__(self)
self.prompt = "(tsq) "
self.doc_header = "TimesketchQuery commands"
self.undoc_header = "Misc. commands"
self._ts_uri = u'http://127.0.0.1:5000' #cfg.get('uri')
self._ts_user = u'user' #cfg.get('user')
self._ts_pw = u'password' #cfg.get('pw')
self._api_client = TimesketchApi(self._ts_uri, self._ts_user, self._ts_pw)
def preloop(self):
intro_text = """This utility leverages the Timesketch API/Client to interact with one's instance in various ways. \nMost commands (show_*|get_*) require a Sketch ID so the best command to start with is `list_sketches` ."""
print("\n"+intro_text+"\n")
def format_output(self, data, write_vertical=False):
if write_vertical:
print('='*25)
for k, v in data.iteritems():
print('{0:25} : {1}'.format(k, v))
print('='*25+'\n')
elif isinstance(data, dict):
print(Tabulate(
[data],
headers="keys",
tablefmt="orgtbl",
missingval="-",
)+'\n')
else:
print(Tabulate(
data,
headers="keys",
missingval="-",
)+'\n')
def format_timestamp(self, ts_str):
return DTparser(ts_str)
def format_arg_error(self, missing_arg):
print("[!] Incorrect/Missing {0} provided".format(missing_arg)+"\n")
def format_error(self, error_msg):
print("[!] {0}".format(error_msg)+"\n")
# lame but don't see another way within Cmd unless repetetive code or
# expand to argparse and just wanted to test out Cmd here
def validate_arg(self, arg):
try:
int(arg)
return True
except ValueError:
return False
def do_add_timeline(self, searchindex):
"""
NEEDS TO BE FINISHED
Adds a Timeline to a Sketch.
Args:
searchindex = SearchIndex obj
"""
self._api_client.add_timeline()
#https://github.com/google/timesketch/pull/380/files#diff-cfc277cad8b430c4868c66b63ef6ded2R368
def get_all_searchindices(self):
""" Convenience function to iterate SearchIndexes, regardless of Sketches """
for search_index in self._api_client.list_searchindices():
yield search_index
# To-Do: should verify there are sketches returned with len(sketches)
def get_all_sketch_timelines(self):
""" Convenience function to iterate Timelines from all Sketches """
sketches = self._api_client.list_sketches()
for sketch in sketches:
for timeline in sketch.list_timelines():
yield {'sketch_name':sketch.name, 'timeline':timeline}
def get_sketch_data(self, sketch_id):
""" Convenience function to get an instance of a Sketch object. """
sketch_data = self._api_client.get_sketch(int(sketch_id))
if not sketch_data.data:
self.format_arg_error("Sketch ID")
return None
else:
return sketch_data
def get_sketch_events_count(self, sketch_id):
""" ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L144 """
if not sketch_id or not self.validate_arg(sketch_id):
self.format_arg_error("Sketch ID")
return
resource_url = u'sketches/{0}/count'.format(int(sketch_id))
count_data = self._api_client.fetch_resource_data(resource_url)
if count_data:
return count_data[u'meta'][u'count']
def do_exit(self, line):
return True
'''
def es_index_name_exists(self, es_index_name):
response = self._api_client.fetch_resource_data(u'searchindices/')
for index in response[u'objects'][0]:
if es_index_name == index.get(u'index_name'):
return True
return False
'''
def do_create_searchindex(self, searchindex_name):
"""
Manually creates a SearchIndex.
Args:
searchindex_name = Name of the searchindex in Timesketch.
#es_index_name = Name to use for ElasticSearch Index
#public = Boolean indicating if the searchindex should be public.
"""
if not searchindex_name:
self.format_arg_error("SearchIndex Name")
return
# test to handle duplicate:
#es_index_name="987ed0294e864596a39b03ebd7fd25f0"
es_index_name=None
created_searchindex, was_created = self._api_client.get_or_create_searchindex(
searchindex_name=searchindex_name,
es_index_name=es_index_name,
)
if was_created:
new_searchindex_dict = OrderedDict()
new_searchindex_dict['SearchIndex ID'] = created_searchindex.id
new_searchindex_dict['SearchIndex Name'] = created_searchindex.name
new_searchindex_dict['ElasticSearch Index Name'] = created_searchindex.index_name
self.format_output(new_searchindex_dict)
else:
self.format_error("ES Index Name '{0}' already exists".format(es_index_name))
#def do_delete_timeline(self, ):
#u'/sketches/<int:sketch_id>/timelines/<int:timeline_id>/')
#https://github.com/google/timesketch/blob/master/timesketch/api/v1/resources.py#L1147
def do_list_search_templates(self, line):
"""
Lists all Search Templates.
ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L130
"""
result = []
search_templates = self._api_client.fetch_resource_data(u'searchtemplate')
if len(search_templates.get(u'objects')):
for search_template in search_templates[u'objects'][0]:
search_template_dict = OrderedDict()
search_template_dict['Search Template ID'] = search_template.get(u'id')
search_template_dict['Search Template Name'] = search_template.get(u'name')
result.append(search_template_dict)
self.format_output({'Total Search Templates': len(result)} ,write_vertical=True)
self.format_output(result)
def do_list_sketches(self, line):
""" Lists the details for each Sketch said user has permission to."""
result = []
try:
sketches = self._api_client.list_sketches()
except IndexError as err:
self.format_error("No Sketches found")
return
for sketch in sketches:
sketch_dict = OrderedDict()
sketch_dict['Sketch ID'] = sketch.id
sketch_dict['Sketch Name'] = sketch.name
sketch_dict['URL'] = '{0}/{1}'.format(self._ts_uri, sketch.resource_uri)
result.append(sketch_dict)
self.format_output({'Total Sketches': len(result)} ,write_vertical=True)
self.format_output(result)
def do_list_search_indices(self, line):
"""
Lists every SearchIndex said user has permission for, regardless of it they're included in a Sketch
Note: Depending on how many Sketches/Timelines you have,
this may take a moment.
"""
result = []
for search_index in self.get_all_searchindices():
search_index_dict = OrderedDict()
search_index_dict['SearchIndex ID'] = search_index.id
search_index_dict['SearchIndex Name'] = search_index.name
search_index_dict['ES Index Name'] = search_index.index_name
search_index_dict['Timeline Search Index Name'] = search_index._searchindex_name
# doesn't make sense to include this ATM since you can't directly browse to it
search_index_dict['URL'] = '{0}/{1}'.format(self._ts_uri, search_index.resource_uri)
result.append(search_index_dict)
self.format_output({'Total Search Indices': len(result)} ,write_vertical=True)
self.format_output(result)
def do_list_sketch_timelines(self, line):
"""
Lists every Timeline said user has permission for. Currently, this only retrieves Timelines which are associated with a Sketch.
Note: Depending on how many Sketches/Timelines you have,
this may take a moment. Additionally, if a Timeline
isn't included in a Sketch, it won't be displayed here.
"""
# a timeline can be in more than one Sketch so need to unique them by index?
result = []
try:
for sketch_dict in self.get_all_sketch_timelines():
timeline_dict = OrderedDict()
timeline_dict['Timeline ID'] = sketch_dict['timeline'].id
timeline_dict['Sketch Name'] = sketch_dict.get('sketch_name')
timeline_dict['Timeline Name'] = sketch_dict['timeline'].name
timeline_dict['Timeline Index Name'] = sketch_dict['timeline'].index
result.append(timeline_dict)
self.format_output({'Total Timelines': len(result)} ,write_vertical=True)
self.format_output(result)
except IndexError as err:
self.format_error("No Sketches found")
return
'''
def do_make_timeline_public(self, index_name):
"""
Change a timeline from private to public
Args:
index_name = Name to use for ElasticSearch Index
"""
if not index_name:
self.format_arg_error("Timeline Index Name")
return
'''
def do_show_sketch_views(self, sketch_id):
"""
Lists basic details for each View within a Timesketch Sketch.
Args:
sketch_id = ID for a Sketch
"""
if not sketch_id or not self.validate_arg(sketch_id):
self.format_arg_error("Sketch ID")
return
result = []
sketch_data = self.get_sketch_data(sketch_id)
if sketch_data:
#if you create a timeline, but dont' provide any data the error isn't currently caught
try:
for view in sketch_data.list_views():
view_dict = OrderedDict()
view_dict['Sketch Name'] = sketch_data.name
view_dict['View ID'] = view.id
view_dict['View Name'] = view.name
view_dict['View URL'] = '{0}/{1}'.format(self._ts_uri, view.resource_uri)
result.append(view_dict)
self.format_output({'Total Views': len(result)} ,write_vertical=True)
self.format_output(result)
except KeyError as err:
self.format_error("No data for the requested Sketch was found")
def do_show_sketch_timelines(self, sketch_id):
"""
Lists all Timelines for a given Sketch.
Args:
sketch_id = ID for a Sketch
"""
if not sketch_id or not self.validate_arg(sketch_id):
self.format_arg_error("Sketch ID")
return
result = []
sketch_data = self.get_sketch_data(sketch_id)
if sketch_data:
#if you create a timeline, but dont' provide any data the error isn't currently caught
try:
for timeline in sketch_data.list_timelines():
timeline_dict = OrderedDict()
timeline_dict['Sketch ID'] = sketch_data.id
timeline_dict['Sketch Name'] = sketch_data.name
timeline_dict['Timeline ID'] = timeline.id
timeline_dict['Timeline Name'] = timeline.name
timeline_dict['Timeline Index'] = timeline.index
timeline_dict['URL'] = '{0}/{1}'.format(self._ts_uri, timeline.resource_uri)
result.append(timeline_dict)
self.format_output({'Total Timelines': len(result)} ,write_vertical=True)
self.format_output(result)
except KeyError as err:
self.format_error("No data for the requested Sketch was found")
def do_show_sketch_stories(self, sketch_id):
"""
Lists all Search Templates for a given Sketch.
Args:
sketch_id = ID for a Sketch
ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L136
"""
if not sketch_id or not self.validate_arg(sketch_id):
self.format_arg_error("Sketch ID")
return
resource_url = u'sketches/{0}/stories/'.format(int(sketch_id))
sketch_stories = self._api_client.fetch_resource_data(resource_url)
result = []
#if you create a timeline, but dont' provide any data the error isn't currently caught
try:
if len(sketch_stories.get(u'objects')):
for sketch_story in sketch_stories[u'objects'][0]:
sketch_story_dict = OrderedDict()
sketch_story_dict['Sketch ID'] = sketch_story[u'sketch'][u'id']
sketch_story_dict['Sketch Name'] = sketch_story[u'sketch'][u'name']
sketch_story_dict['Search Template ID'] = sketch_story.get(u'id')
sketch_story_dict['Created'] = self.format_timestamp(sketch_story.get(u'created_at'))
sketch_story_dict['Updated'] = self.format_timestamp(sketch_story.get(u'updated_at'))
sketch_story_dict['User'] = sketch_story.get(u'user')[u'username']
sketch_story_dict['Search Template Title'] = sketch_story.get(u'title')
result.append(sketch_story_dict)
self.format_output({'Total Stories': len(result)} ,write_vertical=True)
self.format_output(result)
except (KeyError,TypeError) as err:
self.format_error("No data for the requested Sketch was found")
def do_get_search_template_details(self, search_template_id):
"""
Displays details for a single Search Template said user has permission to.
Args:
search_template_id = ID for a Search Template
"""
# Suppose we could search by it's name too - but,
# - have to iterate all search templates initially vs. directly going to `seach_template_id`
# - may not be unique so display all matches or break on first?
if not search_template_id or not self.validate_arg(search_template_id):
self.format_arg_error("Search Template ID")
return
resource_url = u'searchtemplate/{0}'.format(int(search_template_id))
search_template_data = self._api_client.fetch_resource_data(resource_url)
if search_template_data:
search_template = search_template_data[u'objects'][0]
search_template_dict = OrderedDict()
search_template_dict['Search Template ID'] = search_template.get(u'id')
search_template_dict['Search Template Name'] = search_template.get(u'name')
search_template_dict['Created'] = self.format_timestamp(search_template.get(u'created_at'))
search_template_dict['Updated'] = self.format_timestamp(search_template.get(u'updated_at'))
search_template_dict['User'] = search_template.get(u'user')['username']
search_template_dict['Query DSL'] = search_template.get(u'query_dsl')
search_template_dict['Query String'] = search_template.get(u'query_string')
search_template_dict['Query Filter'] = search_template.get(u'query_filter')
self.format_output(search_template_dict, write_vertical=True)
else:
self.format_arg_error("Search Template ID")
return
def do_get_sketch_details(self, sketch_id):
"""
Displays details for a single Sketch said user has permission to.
Args:
sketch_id = ID for a Sketch
"""
if not sketch_id or not self.validate_arg(sketch_id):
self.format_arg_error("Sketch ID")
return
sketch_data = self.get_sketch_data(sketch_id)
try:
if sketch_data:
sketch_dict = OrderedDict()
sketch_dict['Sketch ID'] = sketch_data.id
sketch_dict['Sketch Name'] = sketch_data.name
sketch_dict['Total Events'] = self.get_sketch_events_count(sketch_id)
sketch_dict['Total Timelines'] = len(sketch_data.list_timelines())
sketch_dict['Total Views'] = len(sketch_data.list_views())
sketch_dict['Status'] = sketch_data.status
sketch_dict['Description'] = sketch_data.description
sketch_dict['URL'] = '{0}/{1}'.format(self._ts_uri, sketch_data.resource_uri)
self.format_output(sketch_dict)
except KeyError as err:
self.format_error("No data for the requested Sketch was found")
def do_get_search_index_details(self, searchindex_id):
""" add notes here """
if not searchindex_id:
self.format_arg_error("SearchIndex ID")
return
searchindex = self.api_client.get_searchindex(searchindex_id)
print(dir(searchindex))
def do_get_timeline_details(self, timeline_index_name_or_name):
"""
Displays details for a Timeline. If multiple timelines
are found based on duplicate `timeline_index_name_or_name`
then each of the matched timelines are displayed.
Args:
timeline_index_or_name = Elasticsearch index name for a Timeline or
the name (case sensitive) of the Timesketch Timeline.
"""
if not timeline_index_name_or_name:
self.format_arg_error("Timeline ID/Name")
return
else:
timeline_index_name_or_name = u'{0}'.format(timeline_index_name_or_name)
results = []
for timeline in self.get_all_timelines():
if timeline.index_name == timeline_index_name_or_name or timeline.name == timeline_index_name_or_name:
timeline_data = timeline.data[u'objects'][0]
timeline_dict = OrderedDict()
timeline_dict['Timeline ID'] = timeline.id
timeline_dict['Timeline Name'] = timeline.name
timeline_dict['Timeline Index Name'] = timeline.index_name
timeline_dict['Created'] = self.format_timestamp(timeline_data.get(u'created_at'))
timeline_dict['Updated'] = self.format_timestamp(timeline_data.get(u'updated_at'))
timeline_dict['Status'] = timeline_data[u'status'][0][u'status']
timeline_dict['Color'] = timeline_data.get(u'color')
timeline_dict['Description'] = timeline_data.get(u'description')
# doesn't make sense to include this ATM since you can't directly browse to it
#timeline_dict['URL'] = '{0}/{1}'.format(self._ts_uri, timeline.resource_uri)
results.append(timeline_dict)
if len(results):
if len(results) > 1:
self.format_output(results)
else:
self.format_output(results[0], write_vertical=True)
else:
self.format_arg_error('Timeline ID/Name')
def do_get_view_details(self, view_id_or_name):
"""
Displays details for a View/View Templates said user has permission to.
If multiple Views are found based on duplicate `view_id_or_name`
then each of the matched views are displayed.
Args:
view_id_or_name = ID of a View or the name
(case sensitive) of the Timesketch Timeline.
"""
if not view_id_or_name:
self.format_arg_error("View ID/Name")
return
else:
view_id_or_name = u'{0}'.format(view_id_or_name)
results = []
sketches = self._api_client.list_sketches()
for sketch in sketches:
for view in sketch.list_views():
if u'{0}'.format(view.id) == view_id_or_name or view.name == view_id_or_name:
view_dict = OrderedDict()
view_dict['Sketch Name'] = sketch.name
view_dict['View ID'] = view.id
view_dict['View Name'] = view.name
view_dict['Query String'] = view.query_string
view_dict['Query Filter'] = view.query_filter
view_dict['Query DSL'] = view.query_dsl
view_dict['URL'] = '{0}/{1}'.format(self._ts_uri, view.resource_uri)
results.append(view_dict)
if len(results):
#if len(results) > 1:
# self.format_output(results)
#else:
for result in results:
self.format_output(results[0], write_vertical=True)
else:
self.format_arg_error('View ID/Name')
#def do_query_sketch(self, sketch_id, query_string=None, query_dsl=None, query_filter=None):
"""
Args:
sketch_id
query_string
query_dsl
query_filter
"""
# https://github.com/google/timesketch/blob/347-python-api-client/api_client/python/timesketch_api_client/client.py#L283
# args =
if __name__ == "__main__":
TimeSketchQuery().cmdloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment