Skip to content

Instantly share code, notes, and snippets.

@rosehgal
Created March 9, 2018 16:16
Show Gist options
  • Save rosehgal/981987912d8196c0b582a4687d7be36d to your computer and use it in GitHub Desktop.
Save rosehgal/981987912d8196c0b582a4687d7be36d to your computer and use it in GitHub Desktop.
import json
import sys

import splunklib.client as client
import splunklib.results as results
from splunklib.binding import AuthenticationError
# Connection settings for the Splunk management port (8089) plus the name
# of the target index.  NOTE(review): placeholder credentials — replace
# before use and do not commit real ones.
splunk_config = {
    "splunk": {
        "host": "localhost",
        "port": 8089,
        "username": "admin",
        "password": "changeme",
    },
    "index": "index_name",
}
class SplunkStore:
    """Thin wrapper around splunklib: connects to Splunk, ensures the
    configured index exists, and exposes submit / attach / search helpers.
    """

    def __init__(self):
        self._indexes = list()
        self._splunk_conf = splunk_config
        self._index = self._splunk_conf["index"]
        self._connect()
        self.create()

    def _connect(self):
        """Connect to Splunk and cache the names of every index visible
        to the configured user in ``self._indexes``.

        Raises:
            AuthenticationError: if the credentials are rejected.
        """
        try:
            self._service = client.connect(**self._splunk_conf["splunk"])
            for index in self._service.indexes:
                self._indexes.append(index.name)
        except AuthenticationError:
            # Bug fix: the original printed this error, swallowed it, and
            # then printed the success message anyway, leaving
            # self._service unset so create() crashed later.  Fail fast.
            print("Unable to authenticate user on Splunk")
            raise
        else:
            # Only announce success on the success path.
            print("User successfully Logged in...")

    def create(self):
        """Create the configured index if it does not already exist.

        Returns:
            The index name (str).
        """
        if self._index not in self._indexes:
            self._service.indexes.create(self._index)
            # Keep the local cache in sync so a second create() is a no-op.
            self._indexes.append(self._index)
            print("Index created")
        else:
            print("Index already exists not creating...")
        return self._index

    def submit(self, data):
        """Submit a single event to the configured index."""
        print("Submitting single event to the splunk server")
        self._service.indexes[self._index].submit(data)

    def attach(self, source, sourcetype):
        """Return a writable stream socket for pushing a series of events.

        Bug fix: the original ignored both arguments and hard-coded
        source="nsg.dat", sourcetype="_json"; the caller-supplied values
        are now passed through.
        """
        return self._service.indexes[self._index].attach(
            source=source,
            sourcetype=sourcetype,
        )

    def search(self, search_query):
        """Run a blocking search job and yield each result as a JSON string.

        Args:
            search_query: full SPL string, e.g. 'search index="x" ...'.
        """
        jobs = self._service.jobs
        # "blocking" makes jobs.create() return only once results are ready.
        search_kwargs_params = {
            "exec_mode": "blocking",
        }
        job = jobs.create(search_query, **search_kwargs_params)
        # Requires `import splunklib.results as results` at module level;
        # the original referenced `results` without ever importing it.
        for result in results.ResultsReader(job.results()):
            yield json.dumps(result)
def main():
    """Demo driver: stream JSON records from a local file into Splunk,
    then run a blocking search and print each result.
    """
    ss = SplunkStore()
    with open("./data/dummy.dat") as f:
        content = json.load(f)
    splunk_socket = ss.attach(
        source="my_local",
        sourcetype="_json",
    )
    try:
        for json_object in content:
            splunk_socket.write(json.dumps(json_object))
    finally:
        # Bug fix: the original never closed the stream socket; Splunk
        # only reliably indexes the streamed events once it is closed.
        splunk_socket.close()
    # NOTE(review): this query searches index "cloudbit" / source
    # "nsg.dat", which matches neither the configured index name nor the
    # source used in the attach() call above — presumably leftovers from
    # an earlier run; confirm the intended index/source before relying
    # on these results.
    for res in ss.search(
        'search index="cloudbit" source="nsg.dat"'
    ):
        print(res)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment