Skip to content

Instantly share code, notes, and snippets.

@metadaddy metadaddy/
Last active Jan 18, 2017

What would you like to do?
Write data to Splunk from StreamSets Data Collector
import sys
# Set to wherever the requests package lives on your machine
import requests
import json
# Endpoint for Splunk HTTP Event Collector
url = 'http://localhost:8088/services/collector'
# Splunk metadata fields
metadata = ['time', 'host', 'source', 'sourcetype', 'index']
# Read Splunk token from file and cache in state
if state.get('headers') is None:
state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'}
buffer = ''
# Loop through batch, building request payload
for record in records:
# Metadata fields are passed as top level properties
payload = dict((key, record.value[key]) for key in record.value if key in metadata)
# Everything else is passed in the 'event' property
payload['event'] = dict((key, record.value[key]) for key in record.value if key not in metadata)
buffer += json.dumps(payload) + '\n'
# Write record to processor output
except Exception as e:
# Send record to error
error.write(record, str(e))
if len(buffer) > 0:
# Now submit a single request for the entire batch
r =,
# Check for errors from Splunk
if r['code'] != 0:
log.error('Splunk error: {}: {}', r['code'], r['text'])
raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text']))
# All is good'Splunk API response: {}', r['text'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.