Skip to content

Instantly share code, notes, and snippets.

@jonathangreco
Created November 15, 2019 08:27
Show Gist options
  • Save jonathangreco/6780e6336c5adeed11045dfadd0e6837 to your computer and use it in GitHub Desktop.
Save jonathangreco/6780e6336c5adeed11045dfadd0e6837 to your computer and use it in GitHub Desktop.
# Local development stack: a single RabbitMQ broker (Bitnami image).
version: '3'
services:
  rabbitmq:
    image: 'bitnami/rabbitmq:3.8'
    environment:
      # Development-only credentials; the Python client connects with the same pair.
      - RABBITMQ_PASSWORD=test
      - RABBITMQ_USERNAME=test
    ports:
      - '4369:4369'    # epmd (peer discovery) -- per the RabbitMQ networking guide
      - '5672:5672'    # AMQP, the port the Python client connects to
      - '25672:25672'  # inter-node / CLI tools communication -- per the RabbitMQ networking guide
      - '15672:15672'  # management UI / HTTP API -- per the RabbitMQ networking guide
    volumes:
      # Named volume mounted at /bitnami so broker data survives container recreation.
      - 'rabbitmq_data:/bitnami'
volumes:
  rabbitmq_data:
    driver: local
from src.RabbitMQ.RabbitMQ import RabbitMQ
class Main:
    """Application bootstrap: constructing it launches the RabbitMQ supervisor thread."""

    def __init__(self):
        """Create the RabbitMQ supervisor thread and start it immediately."""
        RabbitMQ().start()
# Script entry point: instantiating Main creates and starts the RabbitMQ thread.
main = Main()
import threading
from threading import Lock
import time
import pika
# A class that consumes messages from a channel
class Consumer:
    """Message handler associated with a channel."""

    def __init__(self, channel):
        """
        Remember the channel this consumer is attached to.

        :param pika.adapters.blocking_connection.BlockingChannel channel: The channel
        """
        self.channel = channel

    def consume(self, method, properties, body):
        """
        Handle one delivered message.

        Prints the raw body followed by a receipt notice, then sleeps for
        40 seconds before returning.

        :param method: delivery method passed in by the caller (unused here)
        :param properties: message properties passed in by the caller (unused here)
        :param body: message payload; printed as-is
        :return: None
        """
        # One print call, two lines of output: the payload, then the notice.
        print(body, '[*] Message received', sep='\n')
        time.sleep(40)
# A plain holder class that stores our connection settings and credentials
class ConnectionParams:
    """Hard-coded connection settings for the local RabbitMQ broker."""

    def __init__(self):
        """Populate the broker address, virtual host and credentials."""
        # Network location of the broker and the virtual host to use.
        self.port, self.host, self.vhost = 5672, "127.0.0.1", "/"
        # Both credentials share the same value.
        self.password = self.username = "test"
# This object configures a connection to the RabbitMQ broker itself
# (nothing here concerns a queue, only the connection to RabbitMQ).
class Connection:
    """Opens a blocking pika connection using the settings in ConnectionParams."""

    def __init__(self):
        """Load the connection settings."""
        self.__get_params()

    def __get_params(self):
        """Store a fresh ConnectionParams instance on ``self.connectionParams``."""
        self.connectionParams = ConnectionParams()

    def connect(self):
        """
        Open a blocking connection to the broker and a channel on it.

        :return: a two-item list ``[connection, channel]``
        :raises Exception: whatever pika raises while connecting or opening
            the channel; if the channel cannot be opened, the connection is
            closed before the error is re-raised so it is not leaked.
        """
        params = self.connectionParams
        credentials = pika.PlainCredentials(params.username, params.password)
        parameters = pika.ConnectionParameters(
            params.host,
            params.port,
            params.vhost,
            credentials=credentials,
            heartbeat=3600,
            blocked_connection_timeout=3600,
        )
        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()
        except Exception:
            # Nobody else holds a reference to this connection yet: release
            # it here, otherwise it stays open with no owner.
            if connection.is_open:
                connection.close()
            raise
        return [connection, channel]
# A service that starts a consumer for one queue (the queue is declared, hence
# created if it does not exist yet, and is durable).
# Each instance holds the connection and the channel it consumes from.
class ThreadedConnection(threading.Thread):
    """Thread owning one connection/channel pair and consuming a single queue.

    The connection is attempted in the constructor; ``run`` (i.e. the thread
    body) then blocks in ``start_consuming``. When connecting fails,
    ``connection`` and ``channel`` are both left as ``None``.
    """

    def __init__(self, queue_name):
        """
        :param queue_name: name of the queue to declare and consume
        """
        threading.Thread.__init__(self)
        self.queueName = queue_name
        self.should_reopen = False
        self.connection = None
        self.channel = None
        self.connect()

    # https://stackoverflow.com/questions/25489292/consuming-rabbitmq-queue-from-inside-python-threads
    def connect(self):
        """
        Connect, declare the queue and register the consumer callback.

        Failures are reported on stdout instead of being raised; in that case
        any partially opened connection is closed and ``connection`` /
        ``channel`` are reset to ``None`` so callers see a clean
        "not connected" state.
        """
        try:
            result = Connection().connect()
            if not result:
                raise Exception('no connection available')
            [self.connection, self.channel] = result
            self.channel.queue_declare(queue=self.queueName, durable=True, arguments={"x-queue-type": "quorum"})
            # NOTE(review): the consumer below uses auto_ack=True; confirm the
            # prefetch limit is still meaningful in that mode.
            self.channel.basic_qos(prefetch_count=1)
            # Consumer.consume is passed unbound on purpose: pika invokes the
            # callback as (channel, method, properties, body), so the channel
            # fills the ``self`` slot of ``consume``.
            self.channel.basic_consume(queue=self.queueName, on_message_callback=Consumer.consume, auto_ack=True)
        except Exception as error:
            # Only Exception is caught (not a bare except) so that
            # KeyboardInterrupt / SystemExit still propagate.
            print('hang on AMQP is closed')
            print(repr(error))
            self._reset()

    def _reset(self):
        """Drop the connection/channel pair, closing the connection if it is still open."""
        connection = self.connection
        self.connection = None
        self.channel = None
        if connection is None:
            return
        try:
            if connection.is_open:
                connection.close()
        except Exception as error:
            print('could not close the connection: ' + repr(error))

    # Very important: start_consuming keeps the thread alive indefinitely,
    # or until an exception is raised.
    def run(self):
        """Thread body: block consuming messages until the channel fails."""
        try:
            print('try consuming')
            if self.channel is None:
                raise RuntimeError('no channel available for queue ' + str(self.queueName))
            self.channel.start_consuming()
        except Exception as error:
            print('something wrong happened... waiting for reopenning')
            print(repr(error))
class RabbitMQ(threading.Thread):
    """Supervisor thread: starts one consumer thread per queue and watches them.

    ``self.channels`` is a list of dictionaries, one per successfully
    connected queue, with the keys ``channel``, ``connection``, ``ctag``,
    ``thread`` and ``queue``.
    """

    def __init__(self):
        # The queues could later come from a file or a configuration entry
        self.queues = ("preview", "common", "urgent")
        threading.Thread.__init__(self)
        self.channels = []

    def reopen_channels(self):
        """
        Reconnect every queue that has no open channel.

        Entries whose channel is still open are kept untouched, so a healthy
        queue does not get a second consumer. Entries with a closed channel
        are dropped (their connection is closed when that is safe) and the
        corresponding queues are connected again.
        """
        healthy = []
        for config in self.channels:
            if config['channel'].is_closed:
                self._close_stale(config)
            else:
                healthy.append(config)
        self.channels = healthy
        for queue in self._missing_queues():
            print("The channel " + queue + " is closed reopen...")
            self.connect(queue=queue)

    def run_queues(self):
        """Connect a consumer for every configured queue."""
        for queue in self.queues:
            self.connect(queue)

    def run(self):
        """Thread body: connect all queues, then monitor them forever."""
        self.run_queues()
        self.liveness()

    def connect(self, queue):
        """
        Create a consumer thread for ``queue`` and register it when usable.

        The thread is started and recorded in ``self.channels`` only if it
        obtained a channel with at least one registered consumer; otherwise
        nothing is registered and the queue is picked up again by the next
        ``liveness`` pass.

        :param queue: name of the queue to consume
        """
        thread = ThreadedConnection(queue)
        channel = thread.channel
        # consumer_tags is the public accessor for the channel's consumer tags.
        tags = channel.consumer_tags if channel else []
        if not tags:
            if channel:
                # A channel without a consumer is useless: release its
                # connection (no thread is using it yet).
                self._close_connection(thread.connection)
            return
        # The tag is read before the thread starts, so the channel is not
        # touched from this thread while the consumer thread is using it.
        thread.start()
        self.channels.append({
            "channel": channel,
            "connection": thread.connection,
            "ctag": tags[0],
            "thread": thread,
            "queue": queue
        })

    def liveness(self):
        """
        Monitor loop: every 20 seconds report the state of each channel and
        thread, and reconnect queues whose channel is closed or missing.
        Never returns.
        """
        while True:
            should_reopen = False
            # Covers both "nothing connected yet" and queues whose earlier
            # connection attempt failed.
            for queue in self._missing_queues():
                self.connect(queue)
            for config in self.channels:
                # Thread.isAlive() was removed in Python 3.9; is_alive() is
                # the supported spelling.
                if config['thread'].is_alive():
                    alive = ". The thread is still alive too."
                else:
                    alive = ". The thread is also closed."
                if not config['channel'].is_closed:
                    channel_alive = " is alive"
                else:
                    should_reopen = True
                    channel_alive = " is closed and should be reopened"
                print("The channel " + config['ctag'] + channel_alive + alive)
            if should_reopen:
                print('re-openning...')
                self.reopen_channels()
            print('---')
            time.sleep(20)

    def _missing_queues(self):
        """Return the configured queues that have no entry in ``self.channels``."""
        served = {config['queue'] for config in self.channels}
        return [queue for queue in self.queues if queue not in served]

    def _close_stale(self, config):
        """Close the connection of a dropped entry once its thread has finished."""
        if config['thread'].is_alive():
            # The consumer thread may still be using the connection; leave it alone.
            return
        self._close_connection(config['connection'])

    def _close_connection(self, connection):
        """Best-effort close of ``connection``; failures are only reported."""
        try:
            if connection is not None and connection.is_open:
                connection.close()
        except Exception as error:
            print('could not close the connection: ' + repr(error))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment