Skip to content

Instantly share code, notes, and snippets.

@lukebakken
Last active September 29, 2020 02:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukebakken/f95c348f950eb70d958ebefdff25e6d9 to your computer and use it in GitHub Desktop.
Save lukebakken/f95c348f950eb70d958ebefdff25e6d9 to your computer and use it in GitHub Desktop.
import logging
from pika import SelectConnection, URLParameters
from threading import Thread
from time import sleep
from argparse import ArgumentParser
class AsyncConnection:
    """Small wrapper that opens a pika ``SelectConnection`` and runs its ioloop.

    ``run()`` blocks inside the connection's ioloop until the loop is stopped;
    ``stop()`` flags the instance as closing and asks the connection to close,
    after which the close callback stops the ioloop.
    """

    def __init__(self, host):
        """Prepare connection parameters for *host*; no connection is opened yet.

        Args:
            host: Hostname or address substituted into the AMQP URL
                (guest/guest credentials, port 5672, default vhost ``/``).
        """
        self.connection_params = URLParameters(
            'amqp://guest:guest@{}:5672/%2F'.format(host))
        self.log = logging.getLogger(self.__class__.__name__)
        # Assigned by run(); remains None until then.
        self._connection = None
        # Set by stop() so the close callback knows the close was requested.
        self._closing = False

    def on_connection_open(self, connection):
        """Open callback: sanity-check that the opened connection is ours."""
        assert self._connection is connection

    def on_connection_close(self, connection, reply_code, reply_text):
        """Close callback: stop the ioloop when the close was requested by stop().

        The callback arguments are not used. If ``stop()`` has not been called
        this callback does nothing.
        """
        if self._closing:
            self.log.info('Connection closing')
            self._connection.ioloop.stop()
            self.log.info('Connection closed')

    def run(self):
        """Create the connection and block in its ioloop until it is stopped."""
        self._connection = SelectConnection(
            parameters=self.connection_params,
            on_open_callback=self.on_connection_open,
            on_close_callback=self.on_connection_close)
        self._connection.ioloop.start()

    def stop(self):
        """Mark the instance as closing and close the connection if one exists.

        Safe to call before ``run()`` has created the connection: in that case
        only the closing flag is set and nothing is closed.

        NOTE(review): if this is called from a different thread than the one
        executing ``run()``, pika's documentation recommends scheduling the
        close on the ioloop thread (``ioloop.add_callback_threadsafe``) --
        confirm against the installed pika version.
        """
        self.log.info('Stopping')
        self._closing = True
        # run() may not have assigned the connection yet; calling close() on
        # None would raise AttributeError.
        if self._connection is not None:
            self._connection.close()
        self.log.info('Stopped')
if __name__ == '__main__':
    # Script entry point: connect to the broker given by --host on a
    # background thread and keep the main thread idle until Ctrl-C.
    logging.basicConfig(level=logging.INFO)

    parser = ArgumentParser()
    parser.add_argument('--host', required=True)
    args = parser.parse_args()

    connection = AsyncConnection(host=args.host)

    # The ioloop blocks, so it runs on its own thread. The thread is a daemon
    # so the interpreter can still exit if the join below times out.
    worker = Thread(target=connection.run)
    worker.daemon = True
    worker.start()

    try:
        print("Press Ctrl-C to stop")
        # Idle in short sleeps until the user interrupts.
        while True:
            sleep(1)
    except KeyboardInterrupt:
        connection.stop()
        # Give the ioloop thread up to ten seconds to finish.
        worker.join(timeout=10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment