Skip to content

Instantly share code, notes, and snippets.

@JustinAzoff
Created April 17, 2013 22:27
Show Gist options
  • Save JustinAzoff/5408286 to your computer and use it in GitHub Desktop.
Save JustinAzoff/5408286 to your computer and use it in GitHub Desktop.
A sort of functional NSQ python client
import time
import requests
import random
import json
class nsqd:
def __init__(self, servers):
self.last_fetch = 0
self.servers= ["http://%s/put" % h for h in servers]
def send(self, topic, msg):
while True:
for h in self.servers:
try :
res = requests.post(h, params={"topic": topic}, data=msg)
if not res.error:
return res
except:
#h is dead
self.servers.remove(h)
self.servers.append(h)
print "all servers dead, sleeping"
time.sleep(1)
class nsqlookupd:
def __init__(self, lookupds):
self.last_fetch = 0
self.lookupds = lookupds
self._servers = []
self.get_servers()
def _get_servers(self):
for h in self.lookupds:
try :
nodes = requests.get("http://%s/nodes" % h).json
return ['http://%(address)s:%(http_port)d/put' % n for n in nodes['data']['producers']]
except Exception, e:
pass
raise e
def get_servers(self):
if time.time() - self.last_fetch > 5:
self.last_fetch = time.time()
try:
self._servers = self._get_servers()
except Exception,e:
print "error updating servers", e
random.shuffle(self._servers)
return self._servers
def send(self, topic, msg):
while True:
for h in self.get_servers():
try :
res = requests.post(h, params={"topic": topic}, data=msg)
if not res.error:
return res
except:
pass
#h is dead
print h, "is dead"
self._servers.remove(h)
if not self._servers:
print "all servers dead, sleeping"
time.sleep(1)
if __name__ == '__main__':
#w = nsqlookupd(["localhost:4161"])
w = nsqd(["localhost:4151"])
for x in xrange(10000):
w.send("test", json.dumps("Hello %d" % x))
print 'Sent', x
#time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment