Skip to content

Instantly share code, notes, and snippets.

@itay
Created September 13, 2012 00:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save itay/3711055 to your computer and use it in GitHub Desktop.
Save itay/3711055 to your computer and use it in GitHub Desktop.
Twitter Election Streaming
#!/bin/bash
# Wrapper run by Splunk as a scripted input. It starts the streaming client
# in the BACKGROUND, so that "$!" holds its PID and the trap below can
# forward termination to it. It then blocks until the client exits.
# NOTE(review): the password is passed on the command line and is visible in
# the process list; replace the placeholders below before enabling the input.
"$SPLUNK_HOME/etc/apps/social/bin/httpstream.py" \
    --username="<your_twitter_username>" \
    --password="<password>" \
    --https \
    --host="stream.twitter.com" \
    --path="/1.1/statuses/filter.json?track=election,elect,politics,political,politician,Obama,president,Romney,Republican,Democrat,Democratic,Senate,economy,vote,gov,dem,rep,teaparty,tea party,progressive,liberal,conservative,voting,student loans,jobs,Rock the Vote,Congress,economy,healthcare,obamacare,parents insurance,taxes,debt,immigration,education,convention,DNC,RNC,debate,voter ID,voter suppression,voter fraud,abortion,Civil Rights,Foreign Policy,Animal Rights,Same Sex Marriage,Energy,2nd Amendment Rights,Drug Laws,legalization" &
PID="$!"
# Kill the client when this wrapper exits or receives INT/TERM/HUP.
# Errors are silenced because on EXIT the client may already be gone.
trap 'kill "$PID" 2>/dev/null' EXIT INT TERM HUP
wait "$PID"
#!/usr/bin/env python
#
# Copyright 2011 Splunk, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"): you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import functools
from getpass import getpass
import httplib
import json
import optparse
import sys
import time
import traceback
import zlib
# Module-wide verbosity flag.
verbose = 1
# Total attempts retry() makes by default, counting the first call.
MAX_RECONNECTIONS = 10
# Seconds to wait before the first retry.
INITIAL_DELAY = 3
# Factor the wait is multiplied by after each failed attempt.
BACKOFF_POWER = 2


def retry(ExceptionToCheck, tries=MAX_RECONNECTIONS, delay=INITIAL_DELAY, backoff=BACKOFF_POWER):
    """Retry decorator with exponential backoff.

    The decorated function is called up to ``tries`` times in total (at
    least once). Whenever it raises ``ExceptionToCheck`` before the last
    attempt, the wrapper:

    * writes a one-line message to stderr,
    * sleeps for the current delay, and
    * multiplies the delay by ``backoff``.

    The last attempt is made without a guard, so its exception (if any)
    propagates to the caller unchanged.

    original from http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
    """
    def deco_retry(f):
        @functools.wraps(f)
        def f_retry(*args, **kwargs):
            mtries, mdelay = tries, delay
            while mtries > 1:
                try:
                    return f(*args, **kwargs)
                except ExceptionToCheck as e:
                    # End the line and flush *before* sleeping, so the message
                    # is visible during the wait rather than after it.
                    sys.stderr.write("%s, Retrying in %d seconds...\n" % (str(e), mdelay))
                    sys.stderr.flush()
                    time.sleep(mdelay)
                    mtries -= 1
                    mdelay *= backoff
            # Final attempt: let any exception escape to the caller.
            return f(*args, **kwargs)
        return f_retry  # true decorator
    return deco_retry
class StreamingHttp:
    """Open a long-lived HTTP(S) GET request using Basic authentication."""

    def __init__(self, username, password, host, path, use_https):
        self.buffer = ""  # kept for compatibility; not read by connect()
        self.username = username
        self.password = password
        self.host = host
        self.path = path
        self.use_https = use_https

    def connect(self):
        """Send the GET request and return the open httplib response.

        Spaces in ``self.path`` are rewritten to ``%20`` (and stored back on
        the instance) before the request is sent.

        Raises:
            Exception: "HTTP Error <status> (<reason>)" when the server does
                not answer 200. Errors raised by httplib while connecting
                also propagate.
        """
        # Login using basic auth. b64encode (unlike encodestring) never
        # inserts line breaks, so long credentials still produce a
        # single-line Authorization header.
        login = "%s:%s" % (self.username, self.password)
        token = "Basic " + base64.b64encode(login)
        headers = {
            'Content-Length': "0",
            'Authorization': token,
            'Host': self.host,
            'User-Agent': "splunk_streaming_http.py/0.1",
            'Accept': "*/*",
            'Accept-Encoding': '*,gzip'
        }
        # A raw space is not allowed in the request line; encode it.
        self.path = self.path.replace(" ", "%20")
        if self.use_https:
            connection = httplib.HTTPSConnection(self.host)
        else:
            connection = httplib.HTTPConnection(self.host)
        connection.request("GET", self.path, "", headers)
        response = connection.getresponse()
        if response.status != 200:
            # The caller never sees this connection, so release the socket
            # here before reporting the failure.
            connection.close()
            raise Exception("HTTP Error %d (%s)" % (
                response.status, response.reason))
        return response
# Option metadata: flag names, defaults and help text.
# NOTE(review): nothing in this script reads RULES (cmdline() builds its own
# optparse options); confirm whether it is still needed.
RULES = {
    'fusername': {
        'flags': ["--http:username"],
        'help': "HTTP stream username",
    },
    'fpassword': {
        'flags': ["--http:password"],
        'help': "HTTP stream password",
    },
    'verbose': {
        'flags': ["--verbose"],
        'default': 1,
        'type': "int",
        # Help text must match the 'default' value above.
        'help': "Verbosity level (0-3, default 1)",
    }
}
# Default number of bytes requested per read (the --chunk option).
CHUNK_SIZE = 10 * 1024
# Default value of the --terminator option.
DEFAULT_TERMINATOR = "\r\n"
def cmdline():
    """Parse sys.argv, prompting interactively for any missing connection value.

    Returns:
        dict keyed by optparse dest name: use_https, username, password,
        host, path, chunk, terminator.
    """
    parser = optparse.OptionParser()
    parser.add_option("", "--https", action="store_true", dest="use_https")
    parser.add_option("-u", "--username", action="store", type="string", dest="username")
    # NOTE(review): a password given on the command line is visible in the
    # process list; omitting it triggers the getpass() prompt below instead.
    parser.add_option("-p", "--password", action="store", type="string", dest="password")
    parser.add_option("", "--host", action="store", type="string", dest="host")
    parser.add_option("", "--path", action="store", type="string", dest="path")
    parser.add_option("", "--chunk", action="store", type="int", dest="chunk", default=CHUNK_SIZE)
    parser.add_option("", "--terminator", action="store", type="string", dest="terminator", default=DEFAULT_TERMINATOR)
    opts, _ = parser.parse_args(sys.argv[1:])
    kwargs = vars(opts)
    # Turn backslash escapes typed on the command line into real characters.
    kwargs['terminator'] = kwargs['terminator'].decode("string-escape")
    # optparse always sets every dest (None when the flag is absent), so a
    # falsy check is enough. Prompts are asked in this order.
    prompts = (
        ('username', raw_input, "HTTP stream username: "),
        ('password', getpass, "HTTP stream password: "),
        ('host', raw_input, "HTTP stream host: "),
        ('path', raw_input, "HTTP stream path: "),
    )
    for key, ask, prompt in prompts:
        if not kwargs.get(key):
            kwargs[key] = ask(prompt)
    return kwargs
# Consecutive empty reads tolerated before the stream is treated as dead.
MAX_TRIES = 100


def listen(username, password, host, path, use_https, terminator, chunk_size):
    """Connect to the stream and copy its body to stdout.

    A gzip-encoded body is decompressed before it is written. The function
    loops until MAX_TRIES consecutive reads return no bytes. It never
    returns normally: it always ends by raising, and the stream is closed
    on every exit path.

    Args:
        terminator: accepted for interface compatibility; not used here.
        chunk_size: number of bytes requested per read.

    Raises:
        Exception: "Reached maximum read attempts" once the stream stops
            delivering data. Errors from connecting, reading or
            decompressing also propagate.
    """
    streaming_http = StreamingHttp(username, password, host, path, use_https)
    stream = streaming_http.connect()
    try:
        decompressor = None
        if stream.getheader("content-encoding", "") == "gzip":
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
        empty_reads = 0
        while empty_reads < MAX_TRIES:
            data = stream.read(chunk_size)
            # Measure forward progress on bytes received from the wire: a
            # small compressed chunk may legitimately decompress to nothing.
            if not data:
                empty_reads += 1
                continue
            empty_reads = 0
            if decompressor is not None:
                data = decompressor.decompress(data)
            if data:
                sys.stdout.write(data)
                # When stdout is a pipe Python block-buffers it; flush so the
                # consumer receives data as it arrives.
                sys.stdout.flush()
        raise Exception("Reached maximum read attempts")
    finally:
        stream.close()
def process(json):
    """Print the given value to stdout, followed by a newline.

    NOTE(review): not called anywhere in the code shown. The parameter name
    shadows the ``json`` module inside this function.
    """
    print(json)
# We encode all the logic for starting up the HTTP connection,
# together with the actual processing
# logic into one function. This allows us to decorate it with
# the ability to keep retrying if an exception happens, using
# exponential backoff.
@retry(Exception)
def start(username, password, host, path, use_https, terminator, chunk_size):
    """Run listen() under the retry decorator.

    A KeyboardInterrupt ends the run quietly. Any other exception has its
    traceback written to stderr and is then re-raised unchanged, so retry
    can log the real error message before backing off and reconnecting.

    NOTE(review): the retry budget is never reset after a connection that
    worked for a while. Enough disconnects over the life of the process
    will therefore exhaust it, and the last exception escapes to the caller.
    """
    try:
        listen(username, password, host, path, use_https, terminator, chunk_size)
    except KeyboardInterrupt:
        pass
    except Exception:
        traceback.print_exc(file=sys.stderr)
        sys.stderr.flush()
        # Bare raise keeps the original exception. The old
        # `raise Exception()` gave retry an empty message to log.
        raise
def main():
    """Script entry point: collect options (prompting as needed) and stream."""
    options = cmdline()
    call_args = {}
    for name in ('username', 'password', 'host', 'path', 'use_https', 'terminator'):
        call_args[name] = options[name]
    # cmdline() stores the per-read byte count under the optparse dest
    # 'chunk'; start() takes it as chunk_size.
    call_args['chunk_size'] = options['chunk']
    start(**call_args)


if __name__ == "__main__":
    main()
# Storage locations for the "election" index (presumably this stanza belongs
# in indexes.conf; confirm). The scripted input stanza sets index = election.
[election]
coldPath = $SPLUNK_DB/election/colddb
homePath = $SPLUNK_DB/election/db
thawedPath = $SPLUNK_DB/election/thaweddb
# Scripted input that runs election.sh (presumably inputs.conf; confirm).
# It ships disabled (disabled = 1); enable it from the manager once the
# credential placeholders in election.sh are filled in. Output is indexed into
# "election" with source "election_httpstream" and sourcetype "election".
# NOTE(review): interval = 0 is assumed to mean Splunk keeps the script
# running and relaunches it when it exits; confirm against the inputs.conf spec.
[script://$SPLUNK_HOME/etc/apps/social/bin/election.sh]
disabled = 1
index = election
interval = 0
source = election_httpstream
sourcetype = election
# Parsing settings for sourcetype "election" (presumably props.conf; confirm).
# NOTE(review): DATETIME_CONFIG = CURRENT is assumed to stamp events with the
# indexing time rather than a parsed timestamp, and SHOULD_LINEMERGE = false
# to keep each line a separate event; verify against the props.conf spec.
[election]
DATETIME_CONFIG = CURRENT
NO_BINARY_CHECK = 1
SHOULD_LINEMERGE = false
pulldown_type = 1
@itay
Copy link
Author

itay commented Sep 13, 2012

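To install this input, create a Splunk app named `social`. Put the .conf files in the app's `default` directory, and put httpstream.py and election.sh in its `bin` directory. Both scripts must be executable, because election.sh runs httpstream.py directly. Before enabling the input, replace the username and password placeholders in election.sh.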

You can then enable the input from the manager.

This requires at least Splunk 4.3.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment