Skip to content

Instantly share code, notes, and snippets.

@bicubic
Created December 30, 2015 03:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bicubic/c0d79bd7b85c52580345 to your computer and use it in GitHub Desktop.
Save bicubic/c0d79bd7b85c52580345 to your computer and use it in GitHub Desktop.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<h1>Splunk Magic</h1>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<table class=\"nb_heading\">\n",
"<tr>\n",
"<td>\n",
" Project: Savvi @ NBN<br/>\n",
" Written: 12/12/2015<br/>\n",
" Author: Serge Rogov<br/>\n",
" Security: <span class=\"nb_sec nb_public\">Public</span><br/>\n",
"</td>\n",
"<td style=\"width: 50%\">\n",
" \n",
" <div class=\"savvi-logo\" style=\"height: 4em; background-position: right 0px\"></div>\n",
"</td>\n",
"</tr>\n",
"</table>"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Populating the interactive namespace from numpy and matplotlib\n"
]
}
],
"source": [
"%pylab inline"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/html": [
"<style>\n",
".nb_heading{\n",
"width: 100%;\n",
"border: 0 !important;\n",
"text-align: left;\n",
"}\n",
"\n",
".nb_heading tr{\n",
"border: none;\n",
"margin: 0;\n",
"padding: 0;\n",
"}\n",
"\n",
".nb_heading td{\n",
"border: none;\n",
"margin: 0;\n",
"padding: 0;\n",
"}\n",
"\n",
".nb_sec{\n",
"border-radius: 0.2em;\n",
"padding: 0.2em;\n",
"}\n",
"\n",
".nb_internal{\n",
"background-color: red;\n",
"color: white;\n",
"}\n",
"\n",
".nb_confidential{\n",
"background-color: red;\n",
"color: white;\n",
"}\n",
"\n",
".nb_public{\n",
"background-color: hsl(111, 87%, 55%);\n",
"}\n",
"\n",
" \n",
".nb_message{\n",
" background-color: rgba(0, 24, 0, 0.05);\n",
" border-left: 0.3em solid gray;\n",
" border-radius: 0.15em;\n",
" padding: 0.2em;\n",
" padding-left: 0.3em;\n",
" margin-bottom: 0.2em;\n",
"}\n",
"\n",
".nb_message.nb_error{\n",
" background-color: rgba(255, 24, 77, 0.1);\n",
" border-left: 0.3em solid red;\n",
"}\n",
"\n",
".nb_message.nb_warning{\n",
" background-color: rgba(255, 184, 24, 0.2);\n",
" border-left: 0.3em solid orange !important;\n",
"}\n",
" \n",
".savvi-logo {\n",
" background: url()\n",
" no-repeat\n",
" left center;\n",
" background-size: contain;\n",
"} \n",
"\n",
"</style>"
],
"text/plain": [
"<IPython.core.display.HTML object>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import pandas as pd\n",
"import time\n",
"import io\n",
"import httplib2\n",
"from IPython.display import display, HTML \n",
"import splunklib.results as results\n",
"import splunklib.client\n",
"import json\n",
"import IPython.display\n",
"from IPython.core.magic import (register_line_magic, \n",
" register_cell_magic)\n",
"import qgrid2 as qgrid\n",
"\n",
"\n",
"# Use the ggplot look for every chart drawn in this notebook.\n",
"plt.style.use('ggplot')\n",
"\n",
"# Inject the shared stylesheet (heading table, message boxes, logo) into the page.\n",
"with open('custom_html.html', 'r') as f:\n",
"    custom_html = f.read()\n",
"display(HTML(custom_html))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from splunk_nb import *\n",
"\n",
"# Imported explicitly (and after the star import) so that `sys` below is the\n",
"# real module rather than whatever a star import happened to leave behind.\n",
"import sys\n",
"\n",
"# Python 2 keeps StringIO in its own module; Python 3 moved it into io.\n",
"if sys.version_info[0] < 3:\n",
"    from StringIO import StringIO\n",
"else:\n",
"    from io import StringIO"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Connection settings are kept in a JSON file; the parsed mapping is later\n",
"# expanded into keyword arguments for splunklib.client.connect.\n",
"#TODO SR: encrypt splunk auth? \n",
"with open('splunk-auth-yong', 'r') as f:\n",
"    splunk_auth = json.load(f)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Shared Splunk session; SplunkQuery reads this module-level `service`.\n",
"service = splunklib.client.connect(\n",
"    autologin=True,\n",
"    **splunk_auth\n",
")  #app=\"apm_snpm\","
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def replace_splunk_time(df):\n",
" '''Converts `nb_epoch` into `_time` as datetime64\n",
" `nb_epoch=_time` must be supplied by the query\n",
"\n",
" '''\n",
" if ('nb_epoch' not in df.columns):\n",
" return\n",
" \n",
" df['_time'] = pd.to_numeric(df['nb_epoch'], errors='coerce').astype('datetime64[s]')\n",
" df.drop('nb_epoch', 1, inplace=True)\n",
"\n",
"class SplunkQuery:\n",
"    '''One Splunk search job: dispatch it, wait for it and read its results.\n",
"\n",
"    Relies on names defined elsewhere in the notebook: `service` (the\n",
"    connected splunklib client), `replace_splunk_time`, `StringIO`,\n",
"    `display` and `HTML`.\n",
"    '''\n",
"    job = None\n",
"    search_string = None\n",
"\n",
"    def __init__(self, search_string):\n",
"        '''Stores the query, decorating bare queries for time handling.\n",
"\n",
"        A query that does not start with the `search ` keyword gets that\n",
"        prefix plus a suffix that copies `_time` into `nb_epoch` and drops\n",
"        `_time`.\n",
"\n",
"        NOTE(review): a query that already starts with `search ` is stored\n",
"        unchanged, so it does not receive the `nb_epoch` suffix - confirm\n",
"        that this is intended.\n",
"        '''\n",
"        if not search_string.startswith('search '):\n",
"            search_string = 'search ' + search_string + ' | eval nb_epoch=_time | fields - _time'\n",
"        self.search_string = search_string\n",
"\n",
"    @staticmethod\n",
"    def _as_text(payload):\n",
"        '''Returns `payload` as text; Python 3 byte strings are decoded as UTF-8.'''\n",
"        # On Python 2 `bytes` is `str`, so the payload passes through untouched.\n",
"        if isinstance(payload, bytes) and not isinstance(payload, str):\n",
"            return payload.decode('utf-8')\n",
"        return payload\n",
"\n",
"    @staticmethod\n",
"    def _append_csv_page(buf, page, first_page):\n",
"        '''Appends one CSV page to `buf`, dropping the header row on later pages.'''\n",
"        if first_page:\n",
"            buf.write(page)\n",
"            return\n",
"        header_end = page.find('\\n')\n",
"        # No newline means the page holds at most a header: nothing to append.\n",
"        if header_end >= 0:\n",
"            buf.write(page[header_end:])\n",
"\n",
"    def _make_http(self):\n",
"        '''Builds an httplib2 client carrying the credentials of `service`.'''\n",
"        #TODO: uses `service` which is in global scope. Refactor\n",
"        # NOTE(review): certificate validation is switched off here; confirm\n",
"        # this is acceptable for the Splunk instance being queried.\n",
"        http = httplib2.Http(disable_ssl_certificate_validation=True)\n",
"        http.add_credentials(service.username, service.password)\n",
"        return http\n",
"\n",
"    def _fetch_results_page(self, http, offset, count):\n",
"        '''Downloads one page of final results as CSV text over REST.'''\n",
"        url = '/services/search/jobs/{0}/results?output_mode=csv&offset={1}&count={2}'.format(\n",
"            self.job['sid'], offset, count)\n",
"        baseurl = str(service.authority)\n",
"        return self._as_text(http.request(baseurl + url, 'GET')[1])\n",
"\n",
"    def _dispatch_query(self, **overrides):\n",
"        '''Creates the search job; `overrides` replace the default job arguments.'''\n",
"        job_args = {'exec_mode': 'normal',\n",
"                    'earliest_time': '-10y',\n",
"                    'latest_time': '-0min',\n",
"                    'output_mode': 'csv',\n",
"                    'preview': True,\n",
"                    'maxEvents': 0}\n",
"        job_args.update(overrides)\n",
"        self.job = service.jobs.create(self.search_string, **job_args)\n",
"\n",
"    def _await(self):\n",
"        '''Blocks, polling once per second, until the job reports `isDone`.'''\n",
"        while True:\n",
"            self.job.refresh()\n",
"            if self.job['isDone'] == '1':\n",
"                break\n",
"            time.sleep(1)\n",
"\n",
"    def _report_progress(self):\n",
"        '''Placeholder for publishing progress (stdout, NB); does nothing yet.'''\n",
"        pass\n",
"\n",
"    def _await_with_progress(self):\n",
"        '''Same wait as `_await`, announcing start and end on stdout.'''\n",
"        print('waiting')\n",
"        self._await()\n",
"        print('done')\n",
"\n",
"    def _df_postprocess(self, df):\n",
"        '''Applies `replace_splunk_time` to `df` and returns the same frame.'''\n",
"        replace_splunk_time(df)\n",
"        return df\n",
"\n",
"    def _get_results_legacy(self):\n",
"        '''Fetches the whole result set through `results.ResultsReader`.\n",
"\n",
"        This is the officially recommended approach. This method is SLOW.\n",
"\n",
"        Returns:\n",
"            DataFrame with one row per result.\n",
"        '''\n",
"        job = self.job\n",
"        total = int(job['resultCount'])  # Number of results this job returned\n",
"        page_size = 100                  # Results requested per round trip\n",
"        offset = 0\n",
"        items = []\n",
"\n",
"        while offset < total:\n",
"            # ResultsReader parses the default (XML) results stream, so no\n",
"            # CSV output mode is requested for this path.\n",
"            page = job.results(count=page_size, offset=offset)\n",
"            for result in results.ResultsReader(page):\n",
"                # The reader can also yield diagnostic messages; keep rows only.\n",
"                if isinstance(result, dict):\n",
"                    items.append(result)\n",
"            offset += page_size\n",
"\n",
"        df = pd.DataFrame(items)\n",
"        return self._df_postprocess(df)\n",
"\n",
"    def get_preview(self):\n",
"        '''Returns the current preview results, or None when none exist yet.\n",
"\n",
"        Returns:\n",
"            DataFrame built from the preview pages, or None while the job is\n",
"            still queued/parsing or has produced no preview rows.\n",
"        '''\n",
"        self.job.refresh()\n",
"        job = self.job\n",
"        buf = StringIO()\n",
"        self.buf = buf\n",
"\n",
"        # Before the search actually runs there is no resultPreviewCount yet.\n",
"        if job['dispatchState'] in ('QUEUED', 'PARSING'):\n",
"            return None\n",
"        resultCount = int(job['resultPreviewCount'])\n",
"        if resultCount == 0:\n",
"            return None  # no preview yet\n",
"\n",
"        offset = 0\n",
"        page_count = 1000\n",
"        while offset < resultCount:\n",
"            page = self._as_text(\n",
"                job.preview(output_mode='csv', count=page_count, offset=offset).read())\n",
"            self._append_csv_page(buf, page, first_page=(offset == 0))\n",
"            offset += page_count\n",
"\n",
"        # read_csv raises on an empty buffer; treat that as no preview yet.\n",
"        if not buf.getvalue().strip():\n",
"            return None\n",
"\n",
"        buf.seek(0)\n",
"        df = pd.read_csv(buf)\n",
"        return self._df_postprocess(df)\n",
"\n",
"    def display_messages(self):\n",
"        '''Renders the first message of each type reported by the job as HTML.'''\n",
"        # Types without an entry here fall back to the neutral style instead\n",
"        # of raising KeyError.\n",
"        # NOTE(review): presumably Splunk labels warnings as 'warn' - verify.\n",
"        classes = {'info': 'nb_info', 'warn': 'nb_warning', 'fatal': 'nb_error', 'error': 'nb_error'}\n",
"        html = ''\n",
"        for k, v in self.job.messages.items():\n",
"            line = '<div class=\"nb_message {classname}\">{message}</div>'.format(\n",
"                classname=classes.get(k, 'nb_info'), message=v[0])\n",
"            html = html + line\n",
"        display(HTML(html))\n",
"\n",
"    def _get_results_page_fast(self, offset, count):\n",
"        '''Fetches one page of results\n",
"\n",
"        Args:\n",
"            offset: start offset\n",
"            count: number of results to return\n",
"\n",
"        Returns:\n",
"            DataFrame parsed from the CSV page.\n",
"        '''\n",
"        buf = StringIO()\n",
"        self.buf = buf\n",
"        buf.write(self._fetch_results_page(self._make_http(), offset, count))\n",
"\n",
"        buf.seek(0)\n",
"        df = pd.read_csv(buf)\n",
"        return self._df_postprocess(df)\n",
"\n",
"    def _get_results_fast_full(self, page_count=50000):\n",
"        '''Fetches entire result set quickly\n",
"\n",
"        Args:\n",
"            page_count: maximum number of results per page (default splunk limit is 50k)\n",
"\n",
"        Returns:\n",
"            DataFrame with every result; an empty DataFrame when the job\n",
"            returned no results.\n",
"\n",
"        Raises:\n",
"            ValueError: if `page_count` is smaller than 1.\n",
"\n",
"        Notes:\n",
"            Not sure which Splunk setting dictates maximum number of\n",
"            results returned (page count). Ideally should identify it\n",
"            and dynamically read via SDK\n",
"        '''\n",
"        if page_count < 1:\n",
"            # A non-positive page size would never advance the offset.\n",
"            raise ValueError('page_count must be a positive integer')\n",
"\n",
"        buf = StringIO()\n",
"        self.buf = buf\n",
"        resultCount = int(self.job['resultCount'])\n",
"        if resultCount == 0:\n",
"            # Nothing to download, and read_csv cannot parse an empty buffer.\n",
"            return pd.DataFrame()\n",
"\n",
"        http = self._make_http()\n",
"        offset = 0\n",
"        while offset < resultCount:\n",
"            page = self._fetch_results_page(http, offset, page_count)\n",
"            self._append_csv_page(buf, page, first_page=(offset == 0))\n",
"            offset += page_count\n",
"\n",
"        buf.seek(0)\n",
"        df = pd.read_csv(buf)\n",
"        return self._df_postprocess(df)\n",
"\n",
"    def execute(self, **kwargs):\n",
"        '''Dispatches the query without waiting for it to finish.\n",
"\n",
"        Args:\n",
"            **kwargs: optional job arguments that override the defaults\n",
"                used by `_dispatch_query`.\n",
"        '''\n",
"        #self._await_with_progress()\n",
"        self._dispatch_query(**kwargs)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def run_blocking(query):\n",
"    '''Runs `query` to completion and returns its full result set.\n",
"\n",
"    The SplunkQuery wrapper is also published as the global `last_job`.\n",
"    '''\n",
"    global last_job\n",
"    splunk_query = SplunkQuery(query)\n",
"    last_job = splunk_query\n",
"\n",
"    splunk_query.execute()\n",
"    splunk_query._await()\n",
"    full_results = splunk_query._get_results_fast_full()\n",
"    return full_results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def preview_kernel(df):\n",
"    '''Redraws the current axes with the `count` column plotted over `_time`.'''\n",
"    series = df.set_index('_time')['count'].astype('float')\n",
"\n",
"    # Wipe the previous preview before drawing the new one on the same axes.\n",
"    axes = plt.gca()\n",
"    axes.cla()\n",
"    series.plot()\n",
"\n",
"    # Swap the displayed output for the refreshed figure.\n",
"    IPython.display.clear_output(wait=True)\n",
"    figure = plt.gcf()\n",
"    IPython.display.display(figure)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"def run_preview(query):\n",
"    '''Runs `query`, re-plotting the preview once a second until the job is done.\n",
"\n",
"    NOTE: a later cell defines `run_preview` again; once that cell has run,\n",
"    it replaces this chart-based variant.\n",
"\n",
"    Args:\n",
"        query: Splunk search text handed to SplunkQuery.\n",
"\n",
"    Returns:\n",
"        The SplunkQuery, which is also stored in the global `last_job`.\n",
"    '''\n",
"    global last_job\n",
"    x = SplunkQuery(query)\n",
"    last_job = x\n",
"    x.execute()\n",
"    while not x.job.is_done():\n",
"        d = x.get_preview()\n",
"        if d is None:\n",
"            IPython.display.clear_output(wait=True)\n",
"            print(\"waiting\")\n",
"            sys.stdout.flush()\n",
"        else:\n",
"            preview_kernel(d)\n",
"\n",
"        # Pause on every pass, including while no preview exists yet, so the\n",
"        # server is not polled in a tight loop.\n",
"        time.sleep(1.0)\n",
"\n",
"    IPython.display.clear_output(wait=True)\n",
"    x.display_messages()\n",
"    print(\"Done!\")\n",
"    return x"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"def run_preview(query):\n",
"    '''Runs `query`, showing the growing preview in a qgrid table until done.\n",
"\n",
"    While no preview is available the job's dispatch state is printed\n",
"    instead. The job is polled once a second.\n",
"\n",
"    Args:\n",
"        query: Splunk search text handed to SplunkQuery.\n",
"\n",
"    Returns:\n",
"        The SplunkQuery, which is also stored in the global `last_job`.\n",
"    '''\n",
"    global last_job\n",
"    x = SplunkQuery(query)\n",
"    last_job = x\n",
"    grid = None  # created lazily, on the first preview that has rows\n",
"    x.execute()\n",
"    while not x.job.is_done():\n",
"        d = x.get_preview()\n",
"        if d is None:\n",
"            IPython.display.clear_output(wait=True)\n",
"            print(x.job['dispatchState'])\n",
"            sys.stdout.flush()\n",
"        else:\n",
"            if grid is None:\n",
"                grid = qgrid.QGridWidget(df=d)\n",
"                display(grid)\n",
"            # Hand the latest preview to the widget that is already on screen.\n",
"            grid.df = d\n",
"\n",
"        time.sleep(1.0)\n",
"\n",
"    IPython.display.clear_output(wait=False)\n",
"    x.display_messages()\n",
"    print(\"Done!\")\n",
"    return x\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"@register_cell_magic\n",
"def splunk(line, cell):\n",
"    '''`%%splunk` cell magic: runs the cell body as a Splunk query.\n",
"\n",
"    With `preview=True` on the magic line the query goes through\n",
"    `run_preview` and nothing is returned; otherwise the result of\n",
"    `run_blocking` is returned.\n",
"    '''\n",
"    wants_preview = 'preview=True' in line\n",
"    if not wants_preview:\n",
"        return run_blocking(cell)\n",
"    run_preview(cell)\n",
" \n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Example: live preview of a 4-hourly average latency chart.\n",
"x = run_preview(\n",
"    'source=\"megadump_60.tgz:*\" earliest=\"11/30/2015:20:00:00\"'\n",
"    ' | timechart span=4h avg(max_latency) as count'\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>\n",
"<br/>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%splunk preview=True\n",
"source=\"megadump_60.tgz:*\" earliest=\"12/04/2015:20:00:00\" | timechart span=4h avg(max_latency) as count"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"# Download the complete results of the most recent query and chart them.\n",
"df = last_job._get_results_fast_full()\n",
"df.set_index('_time')['count'].plot()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.11"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment