-
-
Save jaegeral/a96ae7743fc64f94b99531b950d348f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# created by Glenn P. Edwards Jr. | |
# https://hiddenillusion.github.io | |
# @hiddenillusion | |
# Date: 2017-07-08 | |
# (while at FireEye) | |
import cmd | |
import os | |
import sys | |
from collections import OrderedDict | |
from dateutil.parser import parse as DTparser | |
from tabulate import tabulate as Tabulate | |
from .client import TimesketchApi | |
""" | |
[ ] ability to configure uri/user/pw | |
[ ] total number of events/rows within a single timeline | |
[ ] delete a timeline | |
[ ] delete a view | |
[ ] delete a sketch | |
[ ] filter (explore) for search terms/dates | |
[ ] created/updated for a Sketch (e.g. - list_sketches should display) | |
[x] can't get timeline details if it's not in a sketch ATM ? | |
[ ] ability to sort/filter output (e.g. sort by asc created_at or # of events in a timeline) | |
""" | |
class TimeSketchQuery(cmd.Cmd):
    """Interactive shell for querying a Timesketch instance.

    Every ``do_*`` method is exposed by ``cmd.Cmd`` as a shell command and
    its docstring is what ``help <command>`` prints.  All commands talk to
    Timesketch through ``self._api_client`` and print their results; none
    of them return data to the caller.
    """

    # Connection defaults used when ``cfg`` does not supply a value.
    _DEFAULT_URI = u'http://127.0.0.1:5000'
    _DEFAULT_USER = u'user'
    _DEFAULT_PW = u'password'

    def __init__(self, cfg=None):
        """Set up the prompt and create the Timesketch API client.

        Args:
            cfg: optional mapping with the keys ``uri``, ``user`` and ``pw``.
                Missing/empty keys (or ``cfg=None``) fall back to the
                built-in defaults.
        """
        # cmd.Cmd is an old-style class on Python 2, so call it explicitly.
        cmd.Cmd.__init__(self)
        self.prompt = "(tsq) "
        self.doc_header = "TimesketchQuery commands"
        self.undoc_header = "Misc. commands"
        cfg = cfg or {}
        self._ts_uri = cfg.get('uri') or self._DEFAULT_URI
        self._ts_user = cfg.get('user') or self._DEFAULT_USER
        self._ts_pw = cfg.get('pw') or self._DEFAULT_PW
        self._api_client = TimesketchApi(self._ts_uri, self._ts_user, self._ts_pw)

    def preloop(self):
        """Print the introduction text once, before the first prompt."""
        intro_text = """This utility leverages the Timesketch API/Client to interact with one's instance in various ways. \nMost commands (show_*|get_*) require a Sketch ID so the best command to start with is `list_sketches` ."""
        print("\n"+intro_text+"\n")

    # ------------------------------------------------------------------
    # Output helpers
    # ------------------------------------------------------------------
    def format_output(self, data, write_vertical=False):
        """Print ``data`` either as "key : value" lines or as a table.

        Args:
            data: a dict (one record) or a list of dicts (many records).
            write_vertical: when True, ``data`` must be a dict and is
                printed one "key : value" pair per line.
        """
        if write_vertical:
            print('='*25)
            # .items() (not .iteritems()) so this works on Python 2 and 3.
            for k, v in data.items():
                print('{0:25} : {1}'.format(k, v))
            print('='*25+'\n')
        elif isinstance(data, dict):
            # A single record is rendered as a one-row table.
            print(Tabulate(
                [data],
                headers="keys",
                tablefmt="orgtbl",
                missingval="-",
            )+'\n')
        else:
            print(Tabulate(
                data,
                headers="keys",
                missingval="-",
            )+'\n')

    def format_timestamp(self, ts_str):
        """Parse a timestamp string with ``dateutil`` and return the result."""
        return DTparser(ts_str)

    def format_arg_error(self, missing_arg):
        """Print the standard "incorrect/missing argument" message."""
        print("[!] Incorrect/Missing {0} provided".format(missing_arg)+"\n")

    def format_error(self, error_msg):
        """Print a generic error message."""
        print("[!] {0}".format(error_msg)+"\n")

    def _print_listing(self, total_label, rows):
        """Print the number of rows under ``total_label``, then the rows."""
        self.format_output({total_label: len(rows)}, write_vertical=True)
        self.format_output(rows)

    def _url(self, resource_uri):
        """Join the configured Timesketch URI with a resource path."""
        return '{0}/{1}'.format(self._ts_uri, resource_uri)

    # lame but don't see another way within Cmd unless repetitive code or
    # expand to argparse and just wanted to test out Cmd here
    def validate_arg(self, arg):
        """Return True when ``arg`` can be converted to an int."""
        try:
            int(arg)
        except (TypeError, ValueError):
            return False
        return True

    # ------------------------------------------------------------------
    # Convenience accessors
    # ------------------------------------------------------------------
    def get_all_searchindices(self):
        """ Convenience function to iterate SearchIndexes, regardless of Sketches """
        for search_index in self._api_client.list_searchindices():
            yield search_index

    # To-Do: should verify there are sketches returned with len(sketches)
    def get_all_sketch_timelines(self):
        """ Convenience function to iterate Timelines from all Sketches """
        for sketch in self._api_client.list_sketches():
            for timeline in sketch.list_timelines():
                yield {'sketch_name': sketch.name, 'timeline': timeline}

    def get_sketch_data(self, sketch_id):
        """ Convenience function to get an instance of a Sketch object.

        Returns the Sketch, or None (after printing an error) when the
        returned object has no data.  Callers validate ``sketch_id`` first.
        """
        sketch_data = self._api_client.get_sketch(int(sketch_id))
        if not sketch_data.data:
            self.format_arg_error("Sketch ID")
            return None
        return sketch_data

    def get_sketch_events_count(self, sketch_id):
        """ ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L144

        Returns the event count reported for the Sketch, or None when the
        ID is invalid or no data came back.
        """
        if not sketch_id or not self.validate_arg(sketch_id):
            self.format_arg_error("Sketch ID")
            return None
        resource_url = u'sketches/{0}/count'.format(int(sketch_id))
        count_data = self._api_client.fetch_resource_data(resource_url)
        if count_data:
            return count_data[u'meta'][u'count']
        return None

    # ------------------------------------------------------------------
    # Shell commands
    # ------------------------------------------------------------------
    def do_add_timeline(self, searchindex):
        """
        NEEDS TO BE FINISHED
        Adds a Timeline to a Sketch.
        Args:
        searchindex = SearchIndex obj
        """
        # The original body called self._api_client.add_timeline() with no
        # arguments, which cannot succeed and would abort the whole shell.
        # Report the unfinished state instead of crashing.
        # NOTE(review): implement against the API described in
        # https://github.com/google/timesketch/pull/380/files#diff-cfc277cad8b430c4868c66b63ef6ded2R368
        self.format_error("add_timeline is not implemented yet")

    # Deliberately no docstring: keeps `exit` listed under "Misc. commands".
    def do_exit(self, line):
        # Returning True tells cmd.Cmd to leave the command loop.
        return True

    # Disabled helper kept for reference:
    # def es_index_name_exists(self, es_index_name):
    #     response = self._api_client.fetch_resource_data(u'searchindices/')
    #     for index in response[u'objects'][0]:
    #         if es_index_name == index.get(u'index_name'):
    #             return True
    #     return False

    def do_create_searchindex(self, searchindex_name):
        """
        Manually creates a SearchIndex.
        Args:
        searchindex_name = Name of the searchindex in Timesketch.
        #es_index_name = Name to use for ElasticSearch Index
        #public = Boolean indicating if the searchindex should be public.
        """
        if not searchindex_name:
            self.format_arg_error("SearchIndex Name")
            return
        # test to handle duplicate:
        #es_index_name="987ed0294e864596a39b03ebd7fd25f0"
        es_index_name = None
        created_searchindex, was_created = self._api_client.get_or_create_searchindex(
            searchindex_name=searchindex_name,
            es_index_name=es_index_name,
        )
        if was_created:
            new_searchindex_dict = OrderedDict()
            new_searchindex_dict['SearchIndex ID'] = created_searchindex.id
            new_searchindex_dict['SearchIndex Name'] = created_searchindex.name
            new_searchindex_dict['ElasticSearch Index Name'] = created_searchindex.index_name
            self.format_output(new_searchindex_dict)
        else:
            # es_index_name is always None here, so prefer the index name of
            # the object that was returned when it exposes one.
            existing_name = getattr(created_searchindex, 'index_name', es_index_name)
            self.format_error("ES Index Name '{0}' already exists".format(existing_name))

    #def do_delete_timeline(self, ):
    #u'/sketches/<int:sketch_id>/timelines/<int:timeline_id>/')
    #https://github.com/google/timesketch/blob/master/timesketch/api/v1/resources.py#L1147

    def do_list_search_templates(self, line):
        """
        Lists all Search Templates.
        ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L130
        """
        result = []
        search_templates = self._api_client.fetch_resource_data(u'searchtemplate')
        # Guard against an empty response as well as an empty object list.
        if search_templates and search_templates.get(u'objects'):
            for search_template in search_templates[u'objects'][0]:
                search_template_dict = OrderedDict()
                search_template_dict['Search Template ID'] = search_template.get(u'id')
                search_template_dict['Search Template Name'] = search_template.get(u'name')
                result.append(search_template_dict)
        self._print_listing('Total Search Templates', result)

    def do_list_sketches(self, line):
        """ Lists the details for each Sketch said user has permission to."""
        try:
            sketches = self._api_client.list_sketches()
        except IndexError:
            self.format_error("No Sketches found")
            return
        result = []
        for sketch in sketches:
            sketch_dict = OrderedDict()
            sketch_dict['Sketch ID'] = sketch.id
            sketch_dict['Sketch Name'] = sketch.name
            sketch_dict['URL'] = self._url(sketch.resource_uri)
            result.append(sketch_dict)
        self._print_listing('Total Sketches', result)

    def do_list_search_indices(self, line):
        """
        Lists every SearchIndex said user has permission for, regardless of if they're included in a Sketch
        Note: Depending on how many Sketches/Timelines you have,
        this may take a moment.
        """
        result = []
        for search_index in self.get_all_searchindices():
            search_index_dict = OrderedDict()
            search_index_dict['SearchIndex ID'] = search_index.id
            search_index_dict['SearchIndex Name'] = search_index.name
            search_index_dict['ES Index Name'] = search_index.index_name
            search_index_dict['Timeline Search Index Name'] = search_index._searchindex_name
            # doesn't make sense to include this ATM since you can't directly browse to it
            search_index_dict['URL'] = self._url(search_index.resource_uri)
            result.append(search_index_dict)
        self._print_listing('Total Search Indices', result)

    def do_list_sketch_timelines(self, line):
        """
        Lists every Timeline said user has permission for. Currently, this only retrieves Timelines which are associated with a Sketch.
        Note: Depending on how many Sketches/Timelines you have,
        this may take a moment. Additionally, if a Timeline
        isn't included in a Sketch, it won't be displayed here.
        """
        # a timeline can be in more than one Sketch so need to unique them by index?
        result = []
        try:
            for sketch_dict in self.get_all_sketch_timelines():
                timeline = sketch_dict['timeline']
                timeline_dict = OrderedDict()
                timeline_dict['Timeline ID'] = timeline.id
                timeline_dict['Sketch Name'] = sketch_dict.get('sketch_name')
                timeline_dict['Timeline Name'] = timeline.name
                timeline_dict['Timeline Index Name'] = timeline.index
                result.append(timeline_dict)
            self._print_listing('Total Timelines', result)
        except IndexError:
            self.format_error("No Sketches found")
            return

    # Disabled command kept for reference:
    # def do_make_timeline_public(self, index_name):
    #     """
    #     Change a timeline from private to public
    #     Args:
    #     index_name = Name to use for ElasticSearch Index
    #     """
    #     if not index_name:
    #         self.format_arg_error("Timeline Index Name")
    #         return

    def do_show_sketch_views(self, sketch_id):
        """
        Lists basic details for each View within a Timesketch Sketch.
        Args:
        sketch_id = ID for a Sketch
        """
        if not sketch_id or not self.validate_arg(sketch_id):
            self.format_arg_error("Sketch ID")
            return
        sketch_data = self.get_sketch_data(sketch_id)
        if not sketch_data:
            return
        result = []
        #if you create a timeline, but don't provide any data the error isn't currently caught
        try:
            for view in sketch_data.list_views():
                view_dict = OrderedDict()
                view_dict['Sketch Name'] = sketch_data.name
                view_dict['View ID'] = view.id
                view_dict['View Name'] = view.name
                view_dict['View URL'] = self._url(view.resource_uri)
                result.append(view_dict)
            self._print_listing('Total Views', result)
        except KeyError:
            self.format_error("No data for the requested Sketch was found")

    def do_show_sketch_timelines(self, sketch_id):
        """
        Lists all Timelines for a given Sketch.
        Args:
        sketch_id = ID for a Sketch
        """
        if not sketch_id or not self.validate_arg(sketch_id):
            self.format_arg_error("Sketch ID")
            return
        sketch_data = self.get_sketch_data(sketch_id)
        if not sketch_data:
            return
        result = []
        #if you create a timeline, but don't provide any data the error isn't currently caught
        try:
            for timeline in sketch_data.list_timelines():
                timeline_dict = OrderedDict()
                timeline_dict['Sketch ID'] = sketch_data.id
                timeline_dict['Sketch Name'] = sketch_data.name
                timeline_dict['Timeline ID'] = timeline.id
                timeline_dict['Timeline Name'] = timeline.name
                timeline_dict['Timeline Index'] = timeline.index
                timeline_dict['URL'] = self._url(timeline.resource_uri)
                result.append(timeline_dict)
            self._print_listing('Total Timelines', result)
        except KeyError:
            self.format_error("No data for the requested Sketch was found")

    def do_show_sketch_stories(self, sketch_id):
        """
        Lists all Stories for a given Sketch.
        Args:
        sketch_id = ID for a Sketch
        ref: https://github.com/google/timesketch/blob/79019cacb39ad6fb93bd98ad5fbf590eee429f51/timesketch/__init__.py#L136
        """
        if not sketch_id or not self.validate_arg(sketch_id):
            self.format_arg_error("Sketch ID")
            return
        resource_url = u'sketches/{0}/stories/'.format(int(sketch_id))
        sketch_stories = self._api_client.fetch_resource_data(resource_url)
        if not sketch_stories:
            # Nothing came back at all; same message as the lookup failures.
            self.format_error("No data for the requested Sketch was found")
            return
        result = []
        #if you create a timeline, but don't provide any data the error isn't currently caught
        try:
            if sketch_stories.get(u'objects'):
                for sketch_story in sketch_stories[u'objects'][0]:
                    # NOTE(review): the "Search Template" labels below look
                    # copied from the template commands; kept as-is.
                    sketch_story_dict = OrderedDict()
                    sketch_story_dict['Sketch ID'] = sketch_story[u'sketch'][u'id']
                    sketch_story_dict['Sketch Name'] = sketch_story[u'sketch'][u'name']
                    sketch_story_dict['Search Template ID'] = sketch_story.get(u'id')
                    sketch_story_dict['Created'] = self.format_timestamp(sketch_story.get(u'created_at'))
                    sketch_story_dict['Updated'] = self.format_timestamp(sketch_story.get(u'updated_at'))
                    sketch_story_dict['User'] = sketch_story.get(u'user')[u'username']
                    sketch_story_dict['Search Template Title'] = sketch_story.get(u'title')
                    result.append(sketch_story_dict)
            self._print_listing('Total Stories', result)
        except (KeyError, TypeError):
            self.format_error("No data for the requested Sketch was found")

    def do_get_search_template_details(self, search_template_id):
        """
        Displays details for a single Search Template said user has permission to.
        Args:
        search_template_id = ID for a Search Template
        """
        # Suppose we could search by its name too - but,
        # - have to iterate all search templates initially vs. directly going to `search_template_id`
        # - may not be unique so display all matches or break on first?
        if not search_template_id or not self.validate_arg(search_template_id):
            self.format_arg_error("Search Template ID")
            return
        resource_url = u'searchtemplate/{0}'.format(int(search_template_id))
        search_template_data = self._api_client.fetch_resource_data(resource_url)
        if not search_template_data:
            self.format_arg_error("Search Template ID")
            return
        search_template = search_template_data[u'objects'][0]
        search_template_dict = OrderedDict()
        search_template_dict['Search Template ID'] = search_template.get(u'id')
        search_template_dict['Search Template Name'] = search_template.get(u'name')
        search_template_dict['Created'] = self.format_timestamp(search_template.get(u'created_at'))
        search_template_dict['Updated'] = self.format_timestamp(search_template.get(u'updated_at'))
        search_template_dict['User'] = search_template.get(u'user')['username']
        search_template_dict['Query DSL'] = search_template.get(u'query_dsl')
        search_template_dict['Query String'] = search_template.get(u'query_string')
        search_template_dict['Query Filter'] = search_template.get(u'query_filter')
        self.format_output(search_template_dict, write_vertical=True)

    def do_get_sketch_details(self, sketch_id):
        """
        Displays details for a single Sketch said user has permission to.
        Args:
        sketch_id = ID for a Sketch
        """
        if not sketch_id or not self.validate_arg(sketch_id):
            self.format_arg_error("Sketch ID")
            return
        sketch_data = self.get_sketch_data(sketch_id)
        if not sketch_data:
            return
        try:
            sketch_dict = OrderedDict()
            sketch_dict['Sketch ID'] = sketch_data.id
            sketch_dict['Sketch Name'] = sketch_data.name
            sketch_dict['Total Events'] = self.get_sketch_events_count(sketch_id)
            sketch_dict['Total Timelines'] = len(sketch_data.list_timelines())
            sketch_dict['Total Views'] = len(sketch_data.list_views())
            sketch_dict['Status'] = sketch_data.status
            sketch_dict['Description'] = sketch_data.description
            sketch_dict['URL'] = self._url(sketch_data.resource_uri)
            self.format_output(sketch_dict)
        except KeyError:
            self.format_error("No data for the requested Sketch was found")

    def do_get_search_index_details(self, searchindex_id):
        """
        Displays details for a single SearchIndex.
        Args:
        searchindex_id = ID for a SearchIndex
        """
        if not searchindex_id or not self.validate_arg(searchindex_id):
            self.format_arg_error("SearchIndex ID")
            return
        # The original referenced self.api_client (no leading underscore),
        # which does not exist on this class.
        # NOTE(review): assumes get_searchindex accepts an integer ID and
        # returns an object with id/name/index_name like the ones yielded by
        # list_searchindices -- confirm against the API client.
        searchindex = self._api_client.get_searchindex(int(searchindex_id))
        searchindex_dict = OrderedDict()
        searchindex_dict['SearchIndex ID'] = searchindex.id
        searchindex_dict['SearchIndex Name'] = searchindex.name
        searchindex_dict['ES Index Name'] = searchindex.index_name
        self.format_output(searchindex_dict, write_vertical=True)

    def do_get_timeline_details(self, timeline_index_name_or_name):
        """
        Displays details for a Timeline. If multiple timelines
        are found based on duplicate `timeline_index_name_or_name`
        then each of the matched timelines are displayed.
        Args:
        timeline_index_name_or_name = Elasticsearch index name for a Timeline or
        the name (case sensitive) of the Timesketch Timeline.
        """
        if not timeline_index_name_or_name:
            self.format_arg_error("Timeline ID/Name")
            return
        wanted = u'{0}'.format(timeline_index_name_or_name)
        results = []
        # The original iterated self.get_all_timelines(), which is not
        # defined; the SearchIndex iterator yields objects exposing the
        # .index_name / .name attributes this loop relies on.
        for timeline in self.get_all_searchindices():
            if wanted not in (timeline.index_name, timeline.name):
                continue
            timeline_data = timeline.data[u'objects'][0]
            timeline_dict = OrderedDict()
            timeline_dict['Timeline ID'] = timeline.id
            timeline_dict['Timeline Name'] = timeline.name
            timeline_dict['Timeline Index Name'] = timeline.index_name
            timeline_dict['Created'] = self.format_timestamp(timeline_data.get(u'created_at'))
            timeline_dict['Updated'] = self.format_timestamp(timeline_data.get(u'updated_at'))
            timeline_dict['Status'] = timeline_data[u'status'][0][u'status']
            timeline_dict['Color'] = timeline_data.get(u'color')
            timeline_dict['Description'] = timeline_data.get(u'description')
            # doesn't make sense to include this ATM since you can't directly browse to it
            #timeline_dict['URL'] = '{0}/{1}'.format(self._ts_uri, timeline.resource_uri)
            results.append(timeline_dict)
        if not results:
            self.format_arg_error('Timeline ID/Name')
        elif len(results) > 1:
            # Several matches: a table is easier to compare than stacked records.
            self.format_output(results)
        else:
            self.format_output(results[0], write_vertical=True)

    def do_get_view_details(self, view_id_or_name):
        """
        Displays details for a View/View Templates said user has permission to.
        If multiple Views are found based on duplicate `view_id_or_name`
        then each of the matched views are displayed.
        Args:
        view_id_or_name = ID of a View or the name
        (case sensitive) of the Timesketch View.
        """
        if not view_id_or_name:
            self.format_arg_error("View ID/Name")
            return
        wanted = u'{0}'.format(view_id_or_name)
        results = []
        for sketch in self._api_client.list_sketches():
            for view in sketch.list_views():
                # IDs are compared as text so one argument can match either
                # the numeric ID or the name.
                if wanted not in (u'{0}'.format(view.id), view.name):
                    continue
                view_dict = OrderedDict()
                view_dict['Sketch Name'] = sketch.name
                view_dict['View ID'] = view.id
                view_dict['View Name'] = view.name
                view_dict['Query String'] = view.query_string
                view_dict['Query Filter'] = view.query_filter
                view_dict['Query DSL'] = view.query_dsl
                view_dict['URL'] = self._url(view.resource_uri)
                results.append(view_dict)
        if not results:
            self.format_arg_error('View ID/Name')
            return
        # Print every match (the original printed results[0] repeatedly).
        for result in results:
            self.format_output(result, write_vertical=True)

    #def do_query_sketch(self, sketch_id, query_string=None, query_dsl=None, query_filter=None):
    # Args:
    # sketch_id
    # query_string
    # query_dsl
    # query_filter
    # https://github.com/google/timesketch/blob/347-python-api-client/api_client/python/timesketch_api_client/client.py#L283
    # args =
if __name__ == "__main__":
    # Script entry point: build the shell and hand control to its
    # interactive command loop until the user exits.
    shell = TimeSketchQuery()
    shell.cmdloop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment