Skip to content

Instantly share code, notes, and snippets.

@belyaev-pa
Last active August 10, 2018 08:34
Show Gist options
  • Save belyaev-pa/d7e5fbfad7d466138d031d6df6e3c6e4 to your computer and use it in GitHub Desktop.
Save belyaev-pa/d7e5fbfad7d466138d031d6df6e3c6e4 to your computer and use it in GitHub Desktop.
redis client/generator task

Redis Генератор/Читатель

Task description

Задача написать приложение, работающее с redis, умеющее как генерировать сообщения, так и обрабатывать. Параллельно может быть запущено сколько угодно приложений.

Обмен любой информацией между приложениями осуществлять только через redis.

Все запущенные копии приложения кроме генератора, являются обработчиками сообщений и в любой момент готовы получить сообщение из redis.

Все сообщения должны быть обработаны, причём только один раз, только одним из обработчиков.

Генератором должно быть только одно из запущенных приложений. Т.е. каждое приложение может стать генератором. Но в любой момент времени может работать только один генератор.

Если текущий генератор завершить принудительно (обработчик завершения запрещен, например, выключили из розетки), то одно из приложений должно заменить завершённое (упавшее) и стать генератором. Для определения кто генератор нельзя использовать средства ОС, считается что все приложения запущенны на разных серверах и общаются только через redis.

Сообщения генерируются раз в 500 мс.

Для генерации сообщения использовать любую функцию с рандомным текстовым ответом.

Приложение, получая сообщение, с вероятностью 5% определяет, что сообщение содержит ошибку.

Если сообщение содержит ошибку, то сообщение следует поместить в redis для дальнейшего изучения.

Если приложение запустить с параметром 'getErrors', то оно заберет из redis все сообщения с ошибкой, выведет их на экран и завершится, при этом из базы сообщения удаляются.

Author

from redis import Redis
import string
import random
from time import sleep
def random_message_generate(length=100):
message = ''.join([random.choice(
string.ascii_letters + string.digits) for n in range(length)])
return message
class Queue(object):
def __init__(self, name, namespace='queue'):
self.db = Redis(host='localhost', port=6379, db=0)
self.key = '{}:{}'.format(namespace, name)
def qsize(self):
"""получаем кол-во элементов в очереди"""
return self.db.llen(self.key)
def empty(self):
"""проверяем есть ли элементы в очереди"""
return self.qsize() == 0
def put(self, item):
"""добавим элемент в очередь"""
self.db.rpush(self.key, item)
def get(self, block=True, timeout=None):
"""получаем и удаляем первый элемент очереди c ожиданием или без"""
if block:
item = self.db.blpop(self.key, timeout=timeout)[1]
else:
item = self.db.lpop(self.key)
return item
def get_nowait(self):
"""получаем без ожидания"""
return self.get(False)
def multi_pop(self, ):
"""получаем все сообщения из очереди и очищаем ее"""
pop = self.db.pipeline()
pop.multi()
pop.lrange(self.key, 0, self.qsize() - 1)
pop.ltrim(self.key, self.qsize(), -1)
return pop.execute()
class Application(object):
"""наименование ключа генератора"""
generator_key = 'generator'
"""время жизни ключа генератора"""
expire_time = 10
def __init__(self, getErrors=False):
self.db = Redis(host='localhost', port=6379, db=0)
self.messages_queue = Queue('messages')
self.error_queue = Queue('errors')
if getErrors:
self.get_errors()
else:
if self.try_generator_role():
self.play_generator_role()
else:
self.play_client_role()
def try_generator_role(self):
"""пробуем стать генератором"""
if self.db.get(self.generator_key):
return False
else:
self.db.set(self.generator_key, True, ex=self.expire_time)
return True
def play_generator_role(self):
"""выполняем роль генератора"""
while True:
self.messages_queue.put(random_message_generate(
random.randint(100,300)))
self.db.expire(self.generator_key, self.expire_time)
sleep(0.5)
def play_client_role(self):
"""выполняем роль клиента"""
while True:
if self.messages_queue.empty():
sleep(0.5)
else:
message = self.messages_queue.get_nowait()
#если сообщение с ошибкой записываем его в очередь ошибок
if random.randint(1,100) < 6:
self.error_queue.put(message)
# пробуем стать генератором
if self.try_generator_role():
self.play_generator_role()
def get_errors(self):
"""получаем сообщения с ошибками"""
errors_list = self.error_queue.multi_pop()
for obj in errors_list:
print(obj)
q = Application()
#q = Application(getErrors=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment