Skip to content

Instantly share code, notes, and snippets.

@benwiz
Last active May 5, 2023 06:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benwiz/021ddbbd854651bffed1cc35245a7dd3 to your computer and use it in GitHub Desktop.
Save benwiz/021ddbbd854651bffed1cc35245a7dd3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import pika
import time
import threading
import random
import json
import sys
# NOTE pika is not threadsafe, so any newly created threads should create a new connection within their own scopes
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
chan = conn.channel()
q = 'test.topic'
chan.queue_declare(queue=q)
def get_type():
return random.choice(["fast", "blocking", "nonblocking"])
def publish():
print('start publish thread')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
chan = conn.channel()
i = 0
while True:
b = json.dumps({'type': get_type(),
'message': "hello world " + str(i)})
chan.basic_publish(exchange='',
routing_key=q,
body=b)
print('published %r' % b)
if i < sys.maxsize:
i += 1
else:
i = 0
time.sleep(1)
def fast_task(b):
return {'response': "bye world " + b['message'].split().pop()}
def blocking_task(b):
time.sleep(4)
return {'response': "see ya later alligator " + b['message'].split().pop()}
def message_handler(ch, method, props, body):
b = json.loads(body)
print('received %r' % b)
match b['type']:
case "fast":
fast_task(b)
case "blocking":
blocking_task(b)
case "nonblocking":
threading.Thread(target=blocking_task, args=[b]).start()
def subscribe():
print('start subscribe thread')
conn = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
chan = conn.channel()
chan.basic_consume(queue=q,
auto_ack=True,
on_message_callback=message_handler)
chan.start_consuming()
sub = threading.Thread(target=subscribe)
pub = threading.Thread(target=publish)
if False:
sub.start()
pub.start()
apturl==0.5.2
bcrypt==3.2.0
blinker==1.4
Brlapi==0.8.3
certifi==2020.6.20
chardet==4.0.0
chrome-gnome-shell==0.0.0
click==8.0.3
colorama==0.4.4
command-not-found==0.3
cryptography==3.4.8
cupshelpers==1.0
dbus-python==1.2.18
defer==1.0.6
distro==1.7.0
distro-info===1.1build1
docstring-to-markdown==0.12
duplicity==0.8.21
fasteners==0.14.1
future==0.18.2
httplib2==0.20.2
idna==3.3
importlib-metadata==4.6.4
jedi==0.18.2
jeepney==0.7.1
keyring==23.5.0
language-selector==0.1
launchpadlib==1.10.16
lazr.restfulclient==0.14.4
lazr.uri==1.0.6
lockfile==0.12.2
louis==3.20.0
macaroonbakery==1.3.1
monotonic==1.6
more-itertools==8.10.0
netifaces==0.11.0
oauthlib==3.2.0
olefile==0.46
packaging==23.1
paramiko==2.9.3
parso==0.8.3
pexpect==4.8.0
pika==1.3.1
Pillow==9.0.1
platformdirs==3.5.0
pluggy==1.0.0
protobuf==3.12.4
ptyprocess==0.7.0
pycairo==1.20.1
pycups==2.0.1
PyGObject==3.42.1
PyJWT==2.3.0
pylsp-rope==0.1.11
pymacaroons==0.13.0
PyNaCl==1.5.0
pyparsing==2.4.7
pyRFC3339==1.1
python-apt==2.4.0+ubuntu1
python-dateutil==2.8.1
python-debian===0.1.43ubuntu1
python-lsp-jsonrpc==1.0.0
python-lsp-server==1.7.2
pytoolconfig==1.2.5
pytz==2022.1
pyxdg==0.27
PyYAML==5.4.1
reportlab==3.6.8
requests==2.25.1
rope==1.7.0
SecretStorage==3.3.1
six==1.16.0
systemd-python==234
tomli==2.0.1
ubuntu-advantage-tools==8001
ubuntu-drivers-common==0.0.0
ufw==0.36.1
ujson==5.7.0
unattended-upgrades==0.1
urllib3==1.26.5
usb-creator==0.3.7
wadllib==1.3.6
xdg==5
xkit==0.0.0
zipp==1.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment