Created
November 15, 2019 08:27
-
-
Save jonathangreco/6780e6336c5adeed11045dfadd0e6837 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Local RabbitMQ broker for development.
version: '3'

services:
  rabbitmq:
    image: 'bitnami/rabbitmq:3.8'
    environment:
      # Development-only credentials; they match the username/password the
      # Python client in this gist logs in with. Do not reuse outside local dev.
      - RABBITMQ_PASSWORD=test
      - RABBITMQ_USERNAME=test
    ports:
      # Each entry publishes a container port on the same host port.
      # 5672 is the port the Python client connects to.
      - '4369:4369'
      - '5672:5672'
      - '25672:25672'
      - '15672:15672'
    volumes:
      # Named volume mounted at /bitnami inside the container.
      - 'rabbitmq_data:/bitnami'

volumes:
  rabbitmq_data:
    driver: local
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from src.RabbitMQ.RabbitMQ import RabbitMQ | |
class Main:
    """Entry object of the script: constructing it starts the RabbitMQ worker."""

    def __init__(self):
        # Build the RabbitMQ object and immediately call start() on it; the
        # instance is not kept on self.
        RabbitMQ().start()


# Instantiated at import time, so running this module starts the worker.
main = Main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from threading import Lock | |
import time | |
import pika | |
# A class that consumes a channel
class Consumer:
    """Message handler bound to one channel."""

    def __init__(self, channel):
        """
        Remember the channel this consumer works with.

        :param pika.adapters.blocking_connection.BlockingChannel channel: The channel
        """
        self.channel = channel

    def consume(self, method, properties, body):
        """
        Handle one message: print its body and a receipt line, then sleep.

        :param method: not used by this implementation
        :param properties: not used by this implementation
        :param body: the message payload; printed as-is
        :return: None
        """
        # One print call, two lines of output: the body, then the receipt.
        print(body, '[*] Message received', sep='\n')
        # Block for 40 seconds before returning to the caller.
        time.sleep(40)
# A simple class that stores our connection settings
class ConnectionParams:
    """Holder for the RabbitMQ connection settings.

    Calling it without arguments yields the local development defaults;
    every value can be overridden by keyword.
    """

    def __init__(self, *, host="127.0.0.1", port=5672, vhost="/",
                 username="test", password="test"):
        """
        :param str host: broker host name or address
        :param int port: broker port
        :param str vhost: virtual host to use
        :param str username: login name
        :param str password: login password
        """
        self.port = port
        self.host = host
        self.vhost = vhost
        self.password = password
        self.username = username
# This object configures a connection
# (nothing about attaching to a queue here, only the connection to RabbitMQ)
class Connection:
    """Opens a blocking pika connection and one channel on it."""

    def __init__(self):
        self.__get_params()

    def __get_params(self):
        """Load the connection settings into ``self.connectionParams``."""
        self.connectionParams = ConnectionParams()

    def connect(self):
        """
        Open a connection to the broker and a channel on that connection.

        :return: a two-item list ``[connection, channel]``
        :raises Exception: whatever pika raises while connecting or opening
            the channel; the connection is closed first if the channel could
            not be opened
        """
        settings = self.connectionParams
        credentials = pika.PlainCredentials(settings.username, settings.password)
        parameters = pika.ConnectionParameters(
            host=settings.host,
            port=settings.port,
            virtual_host=settings.vhost,
            credentials=credentials,
            heartbeat=3600,
            blocked_connection_timeout=3600,
        )
        connection = pika.BlockingConnection(parameters)
        try:
            channel = connection.channel()
        except Exception:
            # Do not leak the freshly opened connection when the channel
            # cannot be created; let the caller see the original error.
            if connection.is_open:
                connection.close()
            raise
        return [connection, channel]
# One thread per queue: it owns a connection/channel pair and consumes the queue.
class ThreadedConnection(threading.Thread):
    """Thread that declares a queue, registers a consumer and consumes it.

    The connection and channel are created in the constructor, so
    ``self.connection`` and ``self.channel`` are available (or ``None`` when
    the setup failed) as soon as the object exists.

    NOTE(review): the connection is therefore opened on the thread that
    builds this object but consumed in ``run()`` on the new thread; pika
    describes BlockingConnection as not thread-safe -- confirm this is
    acceptable.
    """

    def __init__(self, queue_name):
        """
        :param str queue_name: name of the queue to declare and consume
        """
        super().__init__()
        self.queueName = queue_name
        self.should_reopen = False
        self.connection = None
        self.channel = None
        self.connect()

    # https://stackoverflow.com/questions/25489292/consuming-rabbitmq-queue-from-inside-python-threads
    def connect(self):
        """
        Open the connection, declare the durable quorum queue and register
        the consumer callback.

        Failures are reported on stdout and never raised; afterwards both
        ``self.connection`` and ``self.channel`` are ``None``.
        """
        try:
            result = Connection().connect()
            if not result:
                raise Exception('no connection available')
            [self.connection, self.channel] = result
            self.channel.queue_declare(
                queue=self.queueName,
                durable=True,
                arguments={"x-queue-type": "quorum"},
            )
            self.channel.basic_qos(prefetch_count=1)
            # Consumer.consume is passed as a plain function (no Consumer
            # instance is created), so the first positional argument supplied
            # by pika is received as ``self``.
            self.channel.basic_consume(
                queue=self.queueName,
                on_message_callback=Consumer.consume,
                auto_ack=True,
            )
        except Exception as exc:
            print('hang on AMQP is closed')
            print(repr(exc))
            self._discard_connection()

    def _discard_connection(self):
        """Drop a partially configured connection so nothing half-open is kept."""
        connection = self.connection
        self.connection = None
        self.channel = None
        if connection is None or not connection.is_open:
            return
        try:
            connection.close()
        except Exception as exc:
            # Best effort only: the setup already failed and was reported.
            print(repr(exc))

    # Very important: start_consuming keeps the thread alive indefinitely,
    # or until an exception occurs
    def run(self):
        """Consume the queue until the channel stops or an error occurs."""
        print('try consuming')
        if self.channel is None:
            # The setup in the constructor failed; there is nothing to consume.
            print('something wrong happened... waiting for reopenning')
            return
        try:
            self.channel.start_consuming()
        except Exception as exc:
            print('something wrong happened... waiting for reopenning')
            print(repr(exc))
class RabbitMQ(threading.Thread):
    """Supervisor thread: starts one consumer thread per queue and watches them.

    ``self.channels`` is a list of dicts, one per tracked consumer, with the
    keys ``channel``, ``connection``, ``ctag``, ``thread`` and ``queue``.
    """

    def __init__(self):
        super().__init__()
        # The queue names could eventually come from a file or a configuration entry
        self.queues = ("preview", "common", "urgent")
        self.channels = []

    def reopen_channels(self):
        """
        Reconnect every queue that has no open channel.

        Configs whose channel is still open are kept untouched, so healthy
        queues do not get a second consumer. Configs with a closed channel are
        dropped and their queue is connected again, as is any configured queue
        that is not tracked at all.
        """
        still_open = []
        for config in self.channels:
            if config["channel"].is_closed:
                self._close_orphaned_connection(config)
            else:
                still_open.append(config)
        self.channels = still_open

        served = {config["queue"] for config in still_open}
        for queue in self.queues:
            if queue in served:
                continue
            print("The channel " + queue + " is closed reopen...")
            self.connect(queue=queue)

    @staticmethod
    def _close_orphaned_connection(config):
        """Best-effort close of a connection whose consumer thread has exited."""
        connection = config["connection"]
        # A connection still driven by a live thread is left alone.
        if config["thread"].is_alive() or not connection.is_open:
            return
        try:
            connection.close()
        except Exception as exc:
            print("could not close stale connection: " + repr(exc))

    def run_queues(self):
        """Start a consumer for every configured queue."""
        for queue in self.queues:
            self.connect(queue)

    def run(self):
        """Thread body: start all consumers, then monitor them forever."""
        self.run_queues()
        self.liveness()

    def connect(self, queue):
        """
        Start a consumer thread for ``queue`` and track it when it is usable.

        :param str queue: name of the queue to consume
        """
        thread = ThreadedConnection(queue)
        channel = thread.channel
        # Read the consumer tags before the thread starts working on the channel.
        consumer_tags = channel.consumer_tags if channel else []
        thread.start()
        if not consumer_tags:
            # No channel, or no consumer registered on it: nothing to track.
            return
        self.channels.append({
            "channel": channel,
            "connection": thread.connection,
            "ctag": consumer_tags[0],
            "thread": thread,
            "queue": queue,
        })

    def liveness(self):
        """
        Report the state of every tracked consumer every 20 seconds, forever.

        Starts all queues when nothing is tracked, and calls
        ``reopen_channels`` when at least one tracked channel is closed.
        """
        while True:
            should_reopen = False
            if not self.channels:
                self.run_queues()
            for config in self.channels:
                # Thread.isAlive() was removed in Python 3.9; is_alive() is
                # the supported spelling.
                if config['thread'].is_alive():
                    alive = ". The thread is still alive too."
                else:
                    alive = ". The thread is also closed."
                if not config['channel'].is_closed:
                    channel_alive = " is alive"
                else:
                    should_reopen = True
                    channel_alive = " is closed and should be reopened"
                print("The channel " + config['ctag'] + channel_alive + alive)
            if should_reopen:
                print('re-openning...')
                self.reopen_channels()
            print('---')
            time.sleep(20)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment