Skip to content

Instantly share code, notes, and snippets.

@akellehe
Last active April 25, 2017 22:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akellehe/39fba6eeb112df66c0f9375d19d200c4 to your computer and use it in GitHub Desktop.
Save akellehe/39fba6eeb112df66c0f9375d19d200c4 to your computer and use it in GitHub Desktop.
A queuereader with a little problem.
import logging
import time
import random
import json
from nsq import Reader, run
from tornado.autoreload import start as autoreload
settings = {
'nsq_channel': 'andrew-kelleher',
'topic': 'memory_leak',
'nsq_lookupd_http_addresses': '',
'max_in_flight': 10
}
logging.basicConfig(
level=logging.DEBUG if settings.get('debug') else logging.INFO)
class MyClass:
def __init__(self, task):
cardinality = task['cardinality']
self.state = [random.random() for i in range(int(cardinality))]
def work_a_while(self):
time.sleep(random.random() * 10)
return sum(self.state)
def message_handler(msg):
"""
Takes message of the form
.. code-block:: json
{
"cardinality": 1000
}
"""
msg.enable_async()
task = json.loads(msg.body)
myclass = MyClass(task)
myclass.work_a_while()
msg.finish()
if __name__ == '__main__':
if settings.get('debug'):
autoreload()
Reader(
channel=settings.get('nsq_channel'),
lookupd_http_addresses=settings.get('nsq_lookupd_http_addresses'),
max_in_flight=settings.get('max_in_flight'),
message_handler=message_handler,
topic='memory_leak',
max_tries=5
)
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment