Created
April 17, 2013 22:27
-
-
Save JustinAzoff/5408286 to your computer and use it in GitHub Desktop.
A sort-of-functional NSQ Python client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import requests | |
import random | |
import json | |
class nsqd:
    """Publish messages to a fixed list of nsqd HTTP endpoints.

    Every entry of ``servers`` is formatted into ``http://<entry>/put`` and
    messages are POSTed to those URLs, trying each in turn.
    """

    def __init__(self, servers):
        """Build the ``/put`` URL for each entry in ``servers``.

        Args:
            servers: iterable of ``host:port`` strings.
        """
        # Not read anywhere in this class; kept because it is an existing
        # instance attribute that outside code may look at.
        self.last_fetch = 0
        self.servers = ["http://%s/put" % h for h in servers]

    def send(self, topic, msg):
        """POST ``msg`` to ``topic`` on the first server that accepts it.

        Servers are tried in their current order. A server whose request
        raises is moved to the back of ``self.servers`` so later calls try
        the others first. When a full pass yields no successful response the
        method sleeps one second and starts over, so it blocks until a
        publish succeeds.

        Args:
            topic: topic name, sent as the ``topic`` query parameter.
            msg: request body.

        Returns:
            The response object of the first successful POST.
        """
        while True:
            # Walk a snapshot: the loop body reorders self.servers, and
            # mutating a list while iterating it makes the iterator skip the
            # element that slides into the vacated slot.
            for h in list(self.servers):
                try:
                    res = requests.post(h, params={"topic": topic}, data=msg)
                except Exception:
                    # h is dead: rotate it to the end of the list.
                    # `except Exception` (not a bare except) lets Ctrl-C and
                    # SystemExit break out of the retry loop.
                    self.servers.remove(h)
                    self.servers.append(h)
                    continue
                # `ok` instead of `error`: newer requests releases have no
                # `Response.error`, and the resulting AttributeError used to
                # be swallowed, turning every successful publish into a retry.
                if res.ok:
                    return res
            print("all servers dead, sleeping")
            time.sleep(1)
class nsqlookupd:
    """Publish messages to nsqd producers discovered through nsqlookupd.

    The producer list is fetched from the ``/nodes`` endpoint of the
    configured lookupd hosts and cached; ``get_servers`` refreshes it when
    the previous attempt is more than five seconds old.
    """

    def __init__(self, lookupds):
        """Remember the lookupd hosts and attempt a first producer fetch.

        Args:
            lookupds: iterable of lookupd ``host:port`` strings.
        """
        self.last_fetch = 0
        self.lookupds = lookupds
        self._servers = []
        self.get_servers()

    def _get_servers(self):
        """Return the ``/put`` URLs reported by the first lookupd that answers.

        Each lookupd is asked for ``/nodes``; the reply is expected to hold a
        list under ``data.producers`` whose items carry ``address`` and
        ``http_port``.

        Raises:
            RuntimeError: if ``self.lookupds`` is empty.
            Exception: the error from the last lookupd tried, when all fail.
        """
        last_error = None
        for h in self.lookupds:
            try:
                nodes = requests.get("http://%s/nodes" % h).json
                # Older requests exposes `json` as a property, newer releases
                # as a method; accept both.
                if callable(nodes):
                    nodes = nodes()
                return ['http://%(address)s:%(http_port)d/put' % n
                        for n in nodes['data']['producers']]
            except Exception as err:
                # Remember the failure and fall through to the next lookupd.
                last_error = err
        if last_error is None:
            # The loop never ran; there is no underlying error to re-raise.
            raise RuntimeError("no lookupd hosts configured")
        raise last_error

    def get_servers(self):
        """Return the cached producer URLs, refreshing them when stale.

        A refresh is attempted when more than five seconds have passed since
        the previous attempt. A failed refresh is reported on stdout and the
        previous list is kept. The list is shuffled in place on every call
        and the same list object is returned.
        """
        if time.time() - self.last_fetch > 5:
            self.last_fetch = time.time()
            try:
                self._servers = self._get_servers()
            except Exception as e:
                print("error updating servers %s" % (e,))
        random.shuffle(self._servers)
        return self._servers

    def send(self, topic, msg):
        """POST ``msg`` to ``topic`` on the first producer that accepts it.

        A producer whose request raises, or whose response is not successful,
        is reported as dead and dropped from the cached list until the next
        refresh. When the list runs empty the method sleeps one second before
        retrying, so it blocks until a publish succeeds.

        Args:
            topic: topic name, sent as the ``topic`` query parameter.
            msg: request body.

        Returns:
            The response object of the first successful POST.
        """
        while True:
            # get_servers() hands back self._servers itself; iterate a copy
            # so removing dead entries does not make the loop skip servers.
            for h in list(self.get_servers()):
                try:
                    res = requests.post(h, params={"topic": topic}, data=msg)
                except Exception:
                    # Not a bare except, so Ctrl-C still interrupts the loop.
                    res = None
                # `ok` instead of `error`: newer requests releases have no
                # `Response.error` attribute.
                if res is not None and res.ok:
                    return res
                # h is dead
                print("%s is dead" % (h,))
                if h in self._servers:
                    self._servers.remove(h)
            if not self._servers:
                print("all servers dead, sleeping")
                time.sleep(1)
if __name__ == '__main__':
    # Demo: publish 10000 JSON-encoded greetings to the "test" topic on a
    # local nsqd. To discover producers through a lookupd instead, build the
    # writer with nsqlookupd(["localhost:4161"]).
    writer = nsqd(["localhost:4151"])
    for count in range(10000):
        payload = json.dumps("Hello %d" % count)
        writer.send("test", payload)
        print("Sent %d" % count)
        # Uncomment to throttle the demo to one message per second:
        # time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment