Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Consuming the GnipStream with Python urllib2
#!/usr/bin/env python
import urllib2
import base64
import zlib
import threading
from threading import Lock
import json
import sys
import ssl
# Tune CHUNKSIZE as needed. CHUNKSIZE is the size of compressed data read per
# call. For high volume streams, use large chunk sizes; for low volume
# streams, decrease CHUNKSIZE. Minimum practical is about 1K.
CHUNKSIZE = 4*1024
GNIPKEEPALIVE = 30 # seconds -- Gnip sends a keep-alive newline at this interval
NEWLINE = '\r\n'   # record delimiter on the stream
URL = ''           # Gnip streaming endpoint -- fill in before running
UN = 'UN'          # account user name
# BUG FIX: PWD was referenced below but never defined, which raised
# NameError at import time. Fill in the real password before running.
PWD = 'PWD'
HEADERS = { 'Accept': 'application/json',
            'Connection': 'Keep-Alive',
            'Accept-Encoding' : 'gzip',
            'Authorization' : 'Basic %s' % base64.encodestring('%s:%s' % (UN, PWD)) }
# Serialize writes to stdout / stderr across worker threads.
print_lock = Lock()
err_lock = Lock()
class procEntry(threading.Thread):
def __init__(self, buf):
self.buf = buf
def run(self):
for rec in [x.strip() for x in self.buf.split(NEWLINE) if x.strip() <> '']:
jrec = json.loads(rec.strip())
tmp = json.dumps(jrec)
with print_lock:
except ValueError, e:
with err_lock:
sys.stderr.write("Error processing JSON: %s (%s)\n"%(str(e), rec))
def getStream():
    """Connect to the Gnip endpoint and stream forever.

    Reads gzip-compressed chunks, decompresses them incrementally, and hands
    each run of complete records to a procEntry worker thread. Any partial
    trailing record is carried over to the next chunk. Returns only if the
    loop is broken by an exception (URLError, SSLError, socket timeout),
    which propagates to the caller.
    """
    req = urllib2.Request(URL, headers=HEADERS)
    # Timeout slightly longer than the keep-alive interval so a healthy but
    # quiet stream is not mistaken for a dead connection.
    response = urllib2.urlopen(req, timeout=(1+GNIPKEEPALIVE))
    # 16+MAX_WBITS tells zlib to expect a gzip (not raw zlib) wrapper.
    decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
    remainder = ''
    while True:
        # BUG FIX: this statement was truncated in the paste -- read one
        # compressed chunk from the response and decompress it.
        tmp = decompressor.decompress(response.read(CHUNKSIZE))
        if tmp == '':
            # Keep-alive bytes or an incomplete gzip block: nothing decoded yet.
            continue
        # Split complete records from a possible trailing partial record.
        [records, remainder] = ''.join([remainder, tmp]).rsplit(NEWLINE,1)
        # BUG FIX: dispatch of the completed records was lost in the paste;
        # hand them to a worker thread so reading is never blocked by parsing.
        procEntry(records).start()
if __name__ == "__main__":
while True:
with err_lock:
sys.stderr.write("Forced disconnect: %s\n"%(str(e)))
except ssl.SSLError, e:
with err_lock:
sys.stderr.write("Connection failed: %s\n"%(str(e)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment