Skip to content

Instantly share code, notes, and snippets.

@karlp
Created September 9, 2013 17:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save karlp/6498892 to your computer and use it in GitHub Desktop.
Save karlp/6498892 to your computer and use it in GitHub Desktop.
hrmm, how to make sure that the publish handler isn't called before I've saved the MID? Is my only safe answer here to use the non-threaded api, and call loop only when I'm actually ready?
#!/usr/bin/env python
from __future__ import division
import collections
import logging
import math
import random
import string
import time
import mosquitto
logging.basicConfig(level=logging.INFO)
class MsgStatus():
"""
Allows recording statistics of a published message.
"""
def __init__(self, mid, real_size):
self.mid = mid
self.size = real_size
self.received = False
self.time_created = time.time()
self.time_received = None
def receive(self):
self.received = True
self.time_received = time.time()
def time_flight(self):
return self.time_received - self.time_created
def __repr__(self):
if self.received:
return ("MSG(%d) OK, flight time: %f seconds" % (self.mid, self.time_flight()))
else:
return ("MSG(%d) INCOMPLETE in flight for %f seconds so far"
% (self.mid, time.time() - self.time_created))
class TrackingSender():
msg_statuses = {}
def __init__(self, host, port, cid):
self.cid = cid
self.log = logging.getLogger(__name__ + ":" + cid)
self.mqttc = mosquitto.Mosquitto(cid)
self.mqttc.on_publish = self.publish_handler
# TODO - you _probably_ want to tweak this
self.mqttc.max_inflight_messages_set(200)
rc = self.mqttc.connect(host, port, 60)
if rc:
raise Exception("Couldn't even connect! ouch! rc=%d" % rc)
# umm, how?
self.mqttc.loop_start()
def publish_handler(self, mosq, userdata, mid):
self.log.debug("Received confirmation of mid %d", mid)
handle = self.msg_statuses.get(mid, None)
# THIS ASSERTION FAILS
assert handle, "Got a reply for mid: %d before we had it in our table?!" % mid
handle.receive()
def run(self, msg_count, msg_size, qos=1):
for i in range(msg_count):
result, mid = self.mqttc.publish("blah/wop", "wopwoeprwer", qos)
assert(result == 0)
# SIMULATING A BUSY SYSTEM, happened in real life with multiprocessing
time.sleep(2)
self.msg_statuses[mid] = MsgStatus(mid, msg_size)
self.log.info("Finished publish %d msgs of %d bytes at qos %d", msg_count, msg_size, qos)
while True:
notdone = [x for x in self.msg_statuses.values() if not x.received]
if len(notdone) > 0:
self.log.info("Waiting for %d messages outstanding", len(notdone))
time.sleep(0.5)
else:
break
if __name__ == "__main__":
ts = TrackingSender("localhost", 1883, "ohdear")
ts.run(10000, 100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment