Skip to content

Instantly share code, notes, and snippets.

@cevatkerim
Forked from mjallday/README.md
Created March 23, 2016 22:12
Show Gist options
  • Save cevatkerim/af0031020101c6114551 to your computer and use it in GitHub Desktop.
Save cevatkerim/af0031020101c6114551 to your computer and use it in GitHub Desktop.
SQS Latency Measuring

SQS Latency testing

Run this from an EC2 instance in the us-west-1 region.

The scripts create two queues. The producer feeds messages, each containing a timestamp, into the first (request) queue. A consumer then reads each message, computes the difference between the message timestamp and the current time, and pushes that difference into a response queue. Reading these times gives you the latency between publishing to a queue and receiving the message.

import bisect
import boto
import boto.sqs
from boto.sqs.message import RawMessage
import datetime
def median(lst, lstLen=None):
    """Return the median of a sequence that is already in ascending order.

    The function only indexes the middle position(s); it does not sort.

    lst: indexable sequence of numbers, sorted ascending by the caller.
    lstLen: number of items to consider. Optional; defaults to ``len(lst)``
        so callers no longer have to pass a redundant length.

    Returns the middle element when the count is odd, otherwise the mean of
    the two middle elements as a float.

    Raises IndexError when there are no items.
    """
    if lstLen is None:
        lstLen = len(lst)
    if lstLen < 1:
        raise IndexError('median() of an empty sequence')
    index = (lstLen - 1) // 2
    if lstLen % 2:
        return lst[index]
    return (lst[index] + lst[index + 1]) / 2.0
def _latency_seconds(body):
    """Convert a response message body into a latency in seconds (float).

    Accepts the ``str(timedelta)`` forms ``H:MM:SS[.ffffff]`` and
    ``N day[s], H:MM:SS[.ffffff]`` as well as a plain number of seconds.
    Every field is folded in, so latencies of a minute or more (and negative
    ones, which ``str(timedelta)`` renders as ``-1 day, ...``) are kept
    instead of being truncated to their seconds field.
    """
    days = 0
    if ',' in body:
        day_part, body = body.split(',', 1)
        days = int(day_part.split()[0])
    seconds = 0.0
    for field in body.split(':'):
        seconds = seconds * 60 + float(field)
    return days * 86400 + seconds


def main():
    """Poll the response queue forever and print running latency statistics.

    Every message body is parsed into seconds, inserted into a sorted list
    and the message is deleted from the queue. At most once per second (and
    only after at least one sample has arrived) one line is printed with:
    mean, median, sample count, maximum and minimum.
    """
    conn = boto.sqs.connect_to_region('us-west-1')
    response_queue = conn.create_queue('response_queue')
    response_queue.set_message_class(RawMessage)
    response_times = []
    item_count = 0
    last = datetime.datetime.utcnow()
    while True:
        for message in response_queue.get_messages():
            # insort keeps the list ordered, which median() relies on.
            bisect.insort_left(
                response_times, _latency_seconds(message.get_body()))
            item_count += 1
            message.delete()
        now = datetime.datetime.utcnow()
        # Elapsed time is now - last, and total_seconds must be *called*;
        # otherwise the once-per-second throttle never takes effect.
        if item_count and (now - last).total_seconds() >= 1:
            last = now
            print('%s %s %s %s %s' % (
                sum(response_times) / item_count,
                median(response_times, item_count),
                item_count,
                max(response_times),
                min(response_times),
            ))


if __name__ == '__main__':
    main()
import boto
import boto.sqs
from boto.sqs.message import RawMessage
import datetime
def main():
    """Publish 10000 timestamped messages to the request queue.

    Each message body is the current UTC time formatted as
    ``%Y-%m-%dT%H:%M:%S.%f``. The fractional part is always written:
    ``isoformat()`` drops it when the microsecond field is zero, and a
    reader that parses with a ``.%f`` format would reject such a body.
    """
    conn = boto.sqs.connect_to_region('us-west-1')
    request_queue = conn.create_queue('request_queue')
    # Not written to here; created so the response queue exists as well.
    response_queue = conn.create_queue('response_queue')
    request_queue.set_message_class(RawMessage)
    response_queue.set_message_class(RawMessage)
    for _ in xrange(10000):
        message_body = datetime.datetime.utcnow().strftime(
            '%Y-%m-%dT%H:%M:%S.%f')
        request_message = RawMessage()
        request_message.set_body(message_body)
        request_queue.write(request_message)
        print('. ' + message_body)


if __name__ == '__main__':
    main()
import boto
import boto.sqs
import datetime
import multiprocessing
from boto.sqs.message import RawMessage
def _parse_timestamp(text):
    """Parse a ``YYYY-MM-DDTHH:MM:SS[.ffffff]`` timestamp into a datetime.

    ``datetime.isoformat()`` leaves out the fractional part when the
    microsecond field is zero, so the format without ``.%f`` is tried as a
    fallback instead of letting that message raise ValueError.
    """
    try:
        return datetime.datetime.strptime(text, '%Y-%m-%dT%H:%M:%S.%f')
    except ValueError:
        return datetime.datetime.strptime(text, '%Y-%m-%dT%H:%M:%S')


def consume():
    """Answer request messages with their measured queue latency, forever.

    For every message read from the request queue, the difference between
    the current UTC time and the timestamp in the message body is written to
    the response queue as ``str(timedelta)``; the request message is then
    deleted. One status line is printed per poll: a ``responded <latency>``
    entry for each handled message, followed by ``.``.
    """
    conn = boto.sqs.connect_to_region('us-west-1')
    request_queue = conn.create_queue('request_queue')
    response_queue = conn.create_queue('response_queue')
    request_queue.set_message_class(RawMessage)
    response_queue.set_message_class(RawMessage)
    while True:
        messages = request_queue.get_messages()
        status = []
        for message in messages:
            start = _parse_timestamp(message.get_body())
            end = datetime.datetime.utcnow()
            response_body = str(end - start)
            response_message = RawMessage()
            response_message.set_body(response_body)
            response_queue.write(response_message)
            # Delete only after the response has been written.
            message.delete()
            status.extend(['responded', response_body])
        # NOTE(review): assumes the '.' marker is printed once per poll,
        # including polls that returned no messages - confirm.
        status.append('.')
        print(' '.join(status))
def main():
    """Start five processes that each run consume(), then wait on all of them."""
    workers = []
    for _ in xrange(5):
        workers.append(multiprocessing.Process(target=consume))
    # Start every worker before joining any, so they all run concurrently.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment