Skip to content

Instantly share code, notes, and snippets.

@chmduquesne
Last active December 16, 2015 12:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chmduquesne/5d730dcf297d064abde5 to your computer and use it in GitHub Desktop.
Save chmduquesne/5d730dcf297d064abde5 to your computer and use it in GitHub Desktop.
Testing the read-ahead feature of MQ in Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pymqi
import pymqi.CMQC as CMQC
import time
import sys
import os
from os import getenv
import signal
import logging
import logging.handlers
import math
# Module-wide logging: a logger named after this script that emits INFO and
# above through a stream handler (StreamHandler() defaults to stderr).
handler_stderr = logging.StreamHandler()
logger = logging.getLogger(os.path.basename(__file__))
logger.addHandler(handler_stderr)
logger.setLevel(logging.INFO)
# Shorthand used by the rest of the script for informational messages.
log = logger.info
class Stats:
    """
    Helper class for showing per-topic throughput statistics.

    update() accumulates a message count and a byte count per topic. Once at
    least ``show_every`` seconds have passed since the last report, the
    counters are written through the module-level ``log`` callable and reset.

    Usage:
    >>> s = Stats()
    >>> s.update("queue", 2048)
    """

    def __init__(self):
        # Wall-clock time (truncated to whole seconds) of the last report.
        self.last_shown = int(time.time())
        # Minimum number of seconds between two reports.
        self.show_every = 5
        # topic -> {"messages": count, "size": bytes} since the last report.
        self.infos = {}
        # Running total of bytes recorded across all topics; never reset.
        self.total_size = 0.0

    def size_fmt(self, nbytes, suffix='B'):
        """Return `nbytes` as a human-readable string with binary prefixes.

        The value is divided by 1024 until it drops below 1024, e.g.
        ``size_fmt(2048)`` gives ``"2.0 KiB"``.
        """
        for unit in ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi'):
            if abs(nbytes) < 1024.0:
                return "%3.1f %s%s" % (nbytes, unit, suffix)
            nbytes /= 1024.0
        # Same "value unit" layout as the other prefixes (space included).
        return "%.1f %s%s" % (nbytes, 'Yi', suffix)

    def update(self, topic, size):
        """Record one message of `size` bytes for `topic`.

        Triggers show() when at least ``show_every`` seconds have elapsed
        since the previous report.
        """
        entry = self.infos.setdefault(topic, {"messages": 0, "size": 0})
        entry["messages"] += 1
        entry["size"] += size
        self.total_size += size

        now = int(time.time())
        elapsed = now - self.last_shown
        if elapsed >= self.show_every:
            self.show(elapsed)
            self.last_shown = now

    def summary(self, duration, messages, size):
        """Return a "<tps>, <bytes>/s, <bytes>/msg" summary string.

        `duration` is in seconds. A non-positive duration or a zero message
        count yields 0 for the affected figures instead of raising
        ZeroDivisionError.
        """
        if duration > 0:
            tps = float(messages) / duration
            bytes_per_second = float(size) / duration
        else:
            tps = 0.0
            bytes_per_second = 0.0
        bytes_per_message = float(size) / messages if messages else 0.0
        return "%.1f tps, %s/s, %s/msg" % (
            tps,
            self.size_fmt(bytes_per_second),
            self.size_fmt(bytes_per_message),
        )

    def show(self, duration):
        """Log one line per topic covering the last `duration` seconds.

        The per-topic counters are cleared afterwards; ``total_size`` is kept.
        """
        for topic, infos in self.infos.items():
            log("%s: Got %d messages (%s)" % (
                topic,
                infos["messages"],
                self.summary(duration, infos["messages"], infos["size"]),
            ))
        self.infos = {}
def _env_flag(name):
    """Return True when environment variable `name` holds an affirmative value.

    Unset, empty, "0", "false", "no" and "off" (case-insensitive) count as
    disabled; anything else enables the flag.
    """
    value = getenv(name, "")
    return value.strip().lower() not in ("", "0", "false", "no", "off")


def main():
    """Drain a queue and log throughput statistics.

    Configuration is read from the environment:

    - QMGR_HOST, QMGR_NAME, QMGR_CHANNEL: connection details.
    - SSL_CIPHER_SPEC, SSL_KEY_REPO: TLS settings for the channel.
    - QUEUE_NAME: queue to read from.
    - READ_AHEAD: when set to an affirmative value, the queue is opened
      with MQOO_READ_AHEAD.

    Messages are fetched until the queue manager reports that none is
    available (MQRC_NO_MSG_AVAILABLE, reason 2033). Any other MQ error is
    re-raised. The queue manager connection is always released.
    """
    log("Starting.")

    qmgr_host = getenv("QMGR_HOST")
    qmgr_name = getenv("QMGR_NAME")
    qmgr_channel = getenv("QMGR_CHANNEL")
    ssl_cipher_spec = getenv("SSL_CIPHER_SPEC")
    ssl_key_repo = getenv("SSL_KEY_REPO")
    queue_name = getenv("QUEUE_NAME")
    # bool(getenv(...)) would turn "0" or "false" into True; parse the text.
    read_ahead = _env_flag("READ_AHEAD")

    cd = pymqi.CD()
    cd.ChannelName = qmgr_channel
    cd.ConnectionName = qmgr_host
    cd.ChannelType = CMQC.MQCHT_CLNTCONN
    cd.TransportType = CMQC.MQXPT_TCP
    cd.SSLCipherSpec = ssl_cipher_spec

    sco = pymqi.SCO()
    sco.KeyRepository = ssl_key_repo

    qmgr = pymqi.QueueManager(None)
    qmgr.connect_with_options(qmgr_name, cd, sco)

    stats = Stats()
    n = 0
    try:
        od = pymqi.OD()
        od.ObjectName = queue_name

        open_options = CMQC.MQOO_INPUT_AS_Q_DEF
        if read_ahead:
            log("Using read_ahead")
            open_options |= CMQC.MQOO_READ_AHEAD
        else:
            log("Not using read_ahead")
        queue = pymqi.Queue(qmgr, od, open_options)

        # The get options never change, so build them once for all calls.
        gmo = pymqi.GMO()
        gmo.Options = CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING

        while True:
            # Fresh descriptor per message, as in the original loop; a reused
            # MD would presumably carry the previous message's identifiers
            # into the next get (see the pymqi examples).
            md = pymqi.MD()
            message = queue.get(None, md, gmo)
            stats.update(queue_name, len(message))
            n += 1
    except pymqi.MQMIError as e:
        if e.reason != CMQC.MQRC_NO_MSG_AVAILABLE:  # Empty queue
            raise
    finally:
        try:
            # Report whatever accumulated since the last periodic report;
            # otherwise runs shorter than the reporting interval show nothing.
            if stats.infos:
                elapsed = int(time.time()) - stats.last_shown
                # Clamp to 1 s so a sub-second tail never divides by zero.
                stats.show(max(elapsed, 1))
            log("Got a total of %d messages" % n)
        finally:
            qmgr.disconnect()
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@chmduquesne
Copy link
Author

I launch this with no arguments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment