Skip to content

Instantly share code, notes, and snippets.

@DrSkippy
Created October 22, 2012 21:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DrSkippy/3934734 to your computer and use it in GitHub Desktop.
Save DrSkippy/3934734 to your computer and use it in GitHub Desktop.
Connecting to Gnip Streams within Python Threads
#!/usr/bin/env python
import time
import threading
from threading import Lock
import base64
import zlib
import urllib2
# Lock held around print calls in this script so that each message is
# written as a unit.
printloc = Lock()
# Number of bytes requested from the HTTP response per read() call.
CHUNKSIZE = 2000
# Delimiter on which the decompressed stream is split into records.
NEWLINE = "\r\n"
class des(object):
    """Describe one stream endpoint: URL, credentials and a display name.

    Derives from ``object`` so the class is new-style under Python 2 as
    well (a bare ``class des():`` creates an old-style class there).
    """

    def __init__(self, URL, UN, PWD, NAME):
        """Store the connection details.

        Args:
            URL: address of the stream to connect to.
            UN: user name sent in the HTTP basic-auth header.
            PWD: password sent in the HTTP basic-auth header.
            NAME: human-readable label for the stream.
        """
        self.url = URL
        self.un = UN
        self.pwd = PWD
        self.name = NAME

    def __repr__(self):
        """Return the four fields, one per line, followed by a blank line.

        NOTE: the password is included in clear text, so printing a
        descriptor exposes it.
        """
        return "%s\n%s\n%s\n%s\n\n" % (self.url,
                                       self.un,
                                       self.pwd,
                                       self.name)
class do(threading.Thread):
    """Thread that connects to one gzip-compressed HTTP stream and reads it.

    The stream is described by a descriptor object exposing ``url``, ``un``,
    ``pwd`` and ``name`` attributes.
    """

    def __init__(self, d):
        """Remember the stream descriptor *d* and initialise the thread."""
        self.desc = d
        threading.Thread.__init__(self)

    @staticmethod
    def _basic_auth(un, pwd):
        """Return the ``Authorization`` header value for HTTP basic auth.

        ``base64.b64encode`` is used instead of ``base64.encodestring``
        because the latter appends a trailing newline and wraps long output
        every 76 characters, either of which corrupts an HTTP header.
        """
        credentials = '%s:%s' % (un, pwd)
        # b64encode needs a byte string; text (unicode) input is encoded first.
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        token = base64.b64encode(credentials)
        # Normalise to the native string type so '%s' formatting stays clean.
        if not isinstance(token, str):
            token = token.decode('ascii')
        return 'Basic %s' % token

    def run(self):
        """Open the stream and consume it until the server closes it.

        Reads ``CHUNKSIZE`` bytes at a time, gunzips them incrementally and
        splits the text on ``NEWLINE``; any trailing partial record is
        carried over to the next iteration. The connection is always closed
        on exit.
        """
        h = {'Accept': 'application/json',
             'Connection': 'Keep-Alive',
             'Accept-Encoding': 'gzip',
             'Authorization': self._basic_auth(self.desc.un, self.desc.pwd)}
        req = urllib2.Request(self.desc.url, headers=h)
        response = urllib2.urlopen(req, timeout=31)
        try:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
            remainder = ''
            while True:
                raw = response.read(CHUNKSIZE)
                if not raw:
                    # An empty raw read is the real end-of-stream signal.
                    return
                tmp = decompressor.decompress(raw)
                if not tmp:
                    # The decompressor may buffer input without emitting
                    # output yet; that does not mean the stream has ended.
                    continue
                # rpartition yields ('', '', buffer) when no NEWLINE is
                # present, so an incomplete record is simply carried over
                # instead of raising ValueError on unpacking.
                records, _, remainder = ''.join(
                    [remainder, tmp]).rpartition(NEWLINE)
                # `records` now holds zero or more complete records; this
                # demo discards them. Print under `printloc` here to debug.
        finally:
            response.close()
# Placeholder credentials; replace with real account values before running.
username = 'your-user-name'
password = 'password'

# Streams to connect to. des() takes (URL, UN, PWD, NAME) in that order, so
# the user name must come before the password for every entry.
ds = [
    des('https://stream.gnip.com:443/accounts/shendrickson/publishers/twitter/streams/mention/usermention.json',
        username,
        password,
        'user mentions stream'),
    des('https://stream.gnip.com:443/accounts/shendrickson/publishers/twitter/streams/compliance/prod.json',
        username,
        password,
        'compliance stream'),
]

threads = []
for d in ds:
    with printloc:
        print(str(d))
    threads.append(do(d))
    # Set daemon thread flag so main and threads live and die together
    threads[-1].daemon = True
    threads[-1].start()

countdown = 20
print("CTRL-C at any time, then ps aux | grep streamingToThreads to verify process dead")
# Report each thread's liveness every 3 seconds, counting up from -countdown.
for x in range(countdown):
    time.sleep(3)
    for t in threads:
        with printloc:
            # is_alive() replaces isAlive(), which newer Pythons removed;
            # the single formatted argument prints identically on 2 and 3.
            print("%s %s" % (t, t.is_alive()))
    with printloc:
        print(x - countdown)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment