Created
March 9, 2018 16:16
-
-
Save rosehgal/981987912d8196c0b582a4687d7be36d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import sys

import splunklib.client as client
import splunklib.results as results
from splunklib.binding import AuthenticationError
# Connection settings and target index used by SplunkStore.
# The "splunk" sub-dict is passed verbatim as keyword arguments to
# splunklib.client.connect(); "index" names the index events are written to.
# NOTE(review): the credentials below look like placeholder defaults -
# replace them (or load them from a secure location) before real use.
splunk_config = {
    "splunk": {
        "host": "localhost",
        "port": 8089,
        "username": "admin",
        "password": "changeme",
    },
    "index": "index_name",
}
class SplunkStore:
    """Wrapper around a Splunk service bound to one configured index.

    On construction the store logs in with the settings from
    ``splunk_config``, caches the names of the indexes visible to that user
    and creates the configured index when it is not among them.
    """

    def __init__(self):
        self._indexes = list()
        self._splunk_conf = splunk_config
        self._index = self._splunk_conf["index"]
        self._connect()
        self.create()

    def _connect(self):
        """Log in to Splunk and cache the names of all accessible indexes.

        Raises:
            AuthenticationError: when Splunk rejects the credentials. The
                error is re-raised after being reported, because without a
                service object every other method of this class would fail.
        """
        try:
            self._service = client.connect(**self._splunk_conf["splunk"])
            self._indexes.extend(
                index.name for index in self._service.indexes
            )
        except AuthenticationError:
            print("Unable to authenticate user on Splunk")
            raise
        else:
            # Only report success when the login really succeeded.
            print("User successfully Logged in...")

    def create(self):
        """Create the configured index if it is missing.

        Returns:
            The name of the configured index.
        """
        if self._index not in self._indexes:
            self._service.indexes.create(self._index)
            # Keep the cached list in sync so a repeated call is a no-op.
            self._indexes.append(self._index)
            print("Index created")
        else:
            print("Index already exists not creating...")
        return self._index

    def submit(self, data):
        """Submit a single event to the configured index."""
        print("Submitting single event to the splunk server")
        self._service.indexes[self._index].submit(data)

    def attach(self, source, sourcetype):
        """Open a streaming socket for pushing a series of events.

        Args:
            source: value recorded as the ``source`` of the streamed events.
            sourcetype: value recorded as their ``sourcetype``.

        Returns:
            Whatever the index's ``attach`` call returns (the stream socket).
        """
        # Honour the caller's arguments instead of hard-coded values.
        return self._service.indexes[self._index].attach(
            source=source,
            sourcetype=sourcetype,
        )

    def search(self, search_query):
        """Run a blocking search and yield each result row as a JSON string.

        Args:
            search_query: the search string handed to the job collection.

        Yields:
            One JSON-encoded string per result row.
        """
        # "blocking" makes jobs.create() return only once the job is done.
        search_kwargs_params = {
            "exec_mode": "blocking",
        }
        job = self._service.jobs.create(search_query, **search_kwargs_params)
        for result in results.ResultsReader(job.results()):
            # Skip anything that is not a result row (e.g. diagnostic
            # messages), which json.dumps could not serialise.
            if isinstance(result, dict):
                yield json.dumps(result)
def main():
    """Stream the sample events into Splunk, then print matching results.

    Loads a JSON array from ``./data/dummy.dat``, writes each element to the
    configured index through a streaming socket, and finally prints every
    row returned by a search over that index and source.
    """
    # One source value shared by ingest and search, so the query can match
    # the events written below.
    source = "nsg.dat"
    ss = SplunkStore()
    with open("./data/dummy.dat") as f:
        content = json.load(f)
    splunk_socket = ss.attach(
        source=source,
        sourcetype="_json",
    )
    try:
        for json_object in content:
            # NOTE(review): events are written back to back with no
            # delimiter, as in the original - confirm Splunk splits them
            # as intended for this sourcetype.
            splunk_socket.write(json.dumps(json_object))
    finally:
        # Always release the stream, even if a write fails part-way.
        splunk_socket.close()
    # Query the configured index rather than a hard-coded index name.
    query = 'search index="%s" source="%s"' % (splunk_config["index"], source)
    for res in ss.search(query):
        print(res)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment