@metadaddy
Last active January 18, 2017 02:06
Write data to Splunk from StreamSets Data Collector
import sys
# Set to wherever the requests package lives on your machine
sys.path.append('/Library/Python/2.7/site-packages')
import requests
import json

# Endpoint for Splunk HTTP Event Collector
url = 'http://localhost:8088/services/collector'

# Splunk metadata fields
metadata = ['time', 'host', 'source', 'sourcetype', 'index']

# Read Splunk token from file and cache in state
if state.get('headers') is None:
    state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'}

buffer = ''

# Loop through batch, building request payload
for record in records:
    try:
        # Metadata fields are passed as top level properties
        payload = dict((key, record.value[key]) for key in record.value if key in metadata)
        # Everything else is passed in the 'event' property
        payload['event'] = dict((key, record.value[key]) for key in record.value if key not in metadata)
        buffer += json.dumps(payload) + '\n'

        # Write record to processor output
        output.write(record)
    except Exception as e:
        # Send record to error
        error.write(record, str(e))

if len(buffer) > 0:
    # Now submit a single request for the entire batch
    r = requests.post(url,
                      headers=state['headers'],
                      data=buffer).json()

    # Check for errors from Splunk
    if r['code'] != 0:
        log.error('Splunk error: {}: {}', r['code'], r['text'])
        raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))

    # All is good
    log.info('Splunk API response: {}', r['text'])
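The payload-building and response-checking logic in the script can be exercised outside Data Collector. What follows is a minimal standalone sketch, assuming plain Python 3 dicts stand in for StreamSets `record.value` objects and that no HTTP request is sent. The function names `to_hec_event`, `build_hec_batch`, and `check_hec_response` are hypothetical and are not part of StreamSets or Splunk. The sketch mirrors the script: metadata keys stay at the top level, every other field goes under `event`, each event is serialized as one JSON object followed by a newline, and a non-zero `code` in the parsed response is treated as an error.

```python
import json

# Metadata keys, copied from the script's `metadata` list
METADATA_FIELDS = ('time', 'host', 'source', 'sourcetype', 'index')


def to_hec_event(value, metadata_fields=METADATA_FIELDS):
    """Split one record's fields into top-level metadata and the 'event' body.

    Hypothetical helper: `value` is a plain dict standing in for record.value.
    """
    payload = {k: v for k, v in value.items() if k in metadata_fields}
    payload['event'] = {k: v for k, v in value.items() if k not in metadata_fields}
    return payload


def build_hec_batch(values, metadata_fields=METADATA_FIELDS):
    """Serialize a batch as JSON objects, each followed by a newline, like `buffer`."""
    return ''.join(json.dumps(to_hec_event(v, metadata_fields)) + '\n' for v in values)


def check_hec_response(response):
    """Raise if the parsed response has a non-zero 'code'; otherwise return its 'text'."""
    if response.get('code') != 0:
        raise Exception('Splunk API error {0}: {1}'.format(
            response.get('code'), response.get('text')))
    return response.get('text')


if __name__ == '__main__':
    # Hypothetical sample records; real ones come from the pipeline origin
    batch = build_hec_batch([
        {'time': 1484704000, 'host': 'web01', 'status': 200, 'path': '/index.html'},
        {'sourcetype': 'access', 'status': 404},
    ])
    print(batch)
    print(check_hec_response({'text': 'Success', 'code': 0}))
```

Batching all events into one newline-separated body, as the script does, means one POST per pipeline batch rather than one per record.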