Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created May 24, 2022 20:01
Show Gist options
  • Save wallyqs/544c6f4fd87c3e880c83b86acb081fe8 to your computer and use it in GitHub Desktop.
Save wallyqs/544c6f4fd87c3e880c83b86acb081fe8 to your computer and use it in GitHub Desktop.
js pub bench
import argparse
import asyncio
import sys
import time
from random import randint
import nats
# uvloop is an optional accelerator: use it when it is installed, otherwise
# fall back silently to the default asyncio event loop.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    # Only a missing uvloop is expected here; anything else (including
    # KeyboardInterrupt/SystemExit) should propagate instead of being hidden.
    pass
# Timeout handed to nc.flush() once all messages have been published
# (presumably seconds -- confirm against the nats client documentation).
DEFAULT_FLUSH_TIMEOUT = 30
# Default number of messages to publish (-n/--count).
DEFAULT_NUM_MSGS = 100000
# Default payload size (-s/--size); the payload is built from this many
# single-byte hex digits.
DEFAULT_MSG_SIZE = 16
# Default number of publishes per batch before the short pause (-b/--batch).
DEFAULT_BATCH_SIZE = 100
# A '#' progress mark is written whenever the number of messages still to be
# sent is a multiple of this value.
HASH_MODULO = 1000
def show_usage():
    """Print the command-line usage summary to stdout.

    The listed options and defaults mirror the argparse setup in ``main``.
    """
    message = """
Usage: pub_perf [options]
options:
    -n COUNT        Messages to send (default: 100000)
    -s SIZE         Message size (default: 16)
    -S SUBJECT      Send subject (default: test)
    -b BATCH        Batch size (default: 100)
    --servers URL   Server to connect to, may be repeated (default: nats://127.0.0.1:4222)
"""
    print(message)
def show_usage_and_die():
    """Print the usage text, then terminate the process with exit status 1."""
    show_usage()
    # Equivalent to sys.exit(1), which raises SystemExit with this code.
    raise SystemExit(1)
async def main():
    """Run the JetStream publish benchmark and print the measured throughput.

    Parses the command-line options, connects to the NATS server(s), adds a
    stream named after the subject, publishes ``--count`` messages of
    ``--size`` bytes in batches of ``--batch`` with a tiny pause between
    batches, flushes the connection, and reports msgs/sec and MB/sec.

    Exits with status 1 (after printing the usage text) when the connection
    cannot be established, and with argparse's usage error when ``--batch``
    is not a positive integer. The connection is always closed on the way
    out, even if publishing fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
    parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
    parser.add_argument('-S', '--subject', default='test')
    parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
    parser.add_argument('--servers', default=[], action='append')
    args = parser.parse_args()

    if args.batch < 1:
        # A non-positive batch size would send nothing per pass, so the
        # publish loop below would never make progress.
        parser.error("--batch must be a positive integer")

    # Payload: args.size random lowercase hex digits, one byte each.
    payload = "".join(f"{randint(0, 15):x}" for _ in range(args.size)).encode()

    servers = args.servers or ["nats://127.0.0.1:4222"]

    # Make sure we're connected to a server first..
    try:
        nc = await nats.connect(servers, pending_size=1024 * 1024)
    except Exception as e:
        sys.stderr.write(f"ERROR: {e}\n")
        show_usage_and_die()

    try:
        js = nc.jetstream()
        # The stream is named after the subject; no explicit subject list is
        # given (presumably the server then binds the stream to a subject
        # equal to its name -- confirm against the JetStream docs).
        await js.add_stream(name=args.subject)

        # Start the benchmark. perf_counter is monotonic, so the measured
        # interval is immune to wall-clock adjustments.
        start = time.perf_counter()
        to_send = args.count
        print("Sending {} messages of size {} bytes on [{}]".format(
            args.count, args.size, args.subject))

        while to_send > 0:
            # Send at most one batch, never more than what is left.
            for _ in range(min(args.batch, to_send)):
                to_send -= 1
                await js.publish(args.subject, payload)
                if to_send % HASH_MODULO == 0:
                    sys.stdout.write("#")
                    sys.stdout.flush()

            # Minimal pause in between batches sent to server
            await asyncio.sleep(0.00001)

        # Additional roundtrip with server to try to ensure everything has
        # been sent already.
        try:
            await nc.flush(DEFAULT_FLUSH_TIMEOUT)
        except asyncio.TimeoutError:
            # NOTE(review): the nats client's timeout errors are expected to
            # derive from asyncio.TimeoutError; catching the base class avoids
            # depending on the legacy nats.aio.errors module -- confirm for
            # the installed client version.
            print(f"Server flush timeout after {DEFAULT_FLUSH_TIMEOUT}")

        elapsed = time.perf_counter() - start
        mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
        print("\nTest completed : {} msgs/sec ({}) MB/sec".format(
            args.count / elapsed,
            mbytes))
    finally:
        # Release the connection even when publishing or flushing raised.
        await nc.close()
if __name__ == "__main__":
    # Script entry point: run the async benchmark to completion.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment