Write data to Splunk from StreamSets Data Collector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
# Set to wherever the requests package lives on your machine | |
sys.path.append('/Library/Python/2.7/site-packages') | |
import requests | |
import json | |
# Endpoint for Splunk HTTP Event Collector | |
url = 'http://localhost:8088/services/collector' | |
# Splunk metadata fields | |
metadata = ['time', 'host', 'source', 'sourcetype', 'index'] | |
# Read Splunk token from file and cache in state | |
if state.get('headers') is None: | |
state['headers'] = {'Authorization': 'Splunk ${runtime:loadResource('splunkToken', false)}'} | |
buffer = '' | |
# Loop through batch, building request payload | |
for record in records: | |
try: | |
# Metadata fields are passed as top level properties | |
payload = dict((key, record.value[key]) for key in record.value if key in metadata) | |
# Everything else is passed in the 'event' property | |
payload['event'] = dict((key, record.value[key]) for key in record.value if key not in metadata) | |
buffer += json.dumps(payload) + '\n' | |
# Write record to processor output | |
output.write(record) | |
except Exception as e: | |
# Send record to error | |
error.write(record, str(e)) | |
if len(buffer) > 0: | |
# Now submit a single request for the entire batch | |
r = requests.post(url, | |
headers=state['headers'], | |
data=buffer).json() | |
# Check for errors from Splunk | |
if r['code'] != 0: | |
log.error('Splunk error: {}: {}', r['code'], r['text']) | |
raise Exception('Splunk API error {0}: {1}'.format(r['code'], r['text'])) | |
# All is good | |
log.info('Splunk API response: {}', r['text']) |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.