Skip to content

Instantly share code, notes, and snippets.

@rodorgas
Last active June 1, 2016 02:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rodorgas/71d556d69b18b018d35c1278c8d999c7 to your computer and use it in GitHub Desktop.
Save rodorgas/71d556d69b18b018d35c1278c8d999c7 to your computer and use it in GitHub Desktop.
# http://stackoverflow.com/q/37531811
# Coments are in Portuguese, let me know if they are relevant – so I'll translate them promptly.
## File: proj/app/views.py (truncated)
from app import tasks
def indicar_cliente(request, cli_id):
tasks.indicar_cliente.delay(cli_id)
return HttpResponse(status=200)
def receber_indicacao(request):
try:
result = tasks.indicar_cliente.AsyncResult(request.POST['task_id'])
if request.POST['acao'] in ['aceitar', 'recusar']:
acao = request.POST['acao']
else:
raise ValueError
except (MultiValueDictKeyError, ObjectDoesNotExist, ValueError):
return HttpResponse(status=400)
# Verifica se o rodízio da tarefa está na vez do usuário que reivindicou a indicação
if not (result.state == 'PROGRESS' and result.result['current_user'] == request.user.username):
return HttpResponse('Essa indicação expirou', status=400)
if acao == 'recusar':
# Continua o rodízio
result.backend.store_result(result.id, result=None, status='CONTINUE', traceback=None)
elif acao == 'aceitar':
# Interrompe o rodízio abortando a tarefa
result.abort()
return HttpResponse(status=200)
## File: proj/app/tasks.py
from __future__ import absolute_import
from dm import celery_app
from celery.contrib.abortable import AbortableTask
import redis
import time
import cent.core
from django.conf import settings
from django.db import connection
from django.db.utils import OperationalError
from django.contrib.auth.models import User
from painel.models import Indicacao, IndicacaoRodizio
from painel.lib import indicacao_helper
from painel.lib.indicacao_helper import SemUsuariosAptosError
from lagarto import horario_permitido
@celery_app.task(bind=True, base=AbortableTask)
def indicar_cliente(self, indicacao_id):
"""
Determina a qual usuário o cliente será indicado, fazendo um rodízio nos usuários aptos a
receber indicações.
"""
indicacao = Indicacao.objects.get(pk=indicacao_id)
client = cent.core.Client(settings.CENTRIFUGE_ADDRESS, settings.CENTRIFUGE_SECRET)
r = redis.StrictRedis(decode_responses=True, host='dataminer')
indicacao_aceita = False
while not indicacao_aceita:
# Pega usuário da fila apto a receber indicação
try:
username = indicacao_helper.eleger_usuario_indicacao(client, r)
except OperationalError:
connection.close()
username = indicacao_helper.eleger_usuario_indicacao(client, r)
except SemUsuariosAptosError:
if horario_permitido.expediente_extendido():
print(str(indicacao_id) + ' - sem usuários aptos')
time.sleep(45)
else:
print(str(indicacao_id) + ' - sem usuários aptos, dormindo por 5 minutos')
time.sleep(300) # 5 * 60s
continue
print(str(indicacao_id) + ' - usuario eleito: ' + username)
self.update_state(state='PROGRESS', meta={'current_user': username})
# Registra no DB que o usuário recebeu a indicação
usuario = User.objects.get(username=username)
IndicacaoRodizio.objects.create(indicacao=indicacao, receptor=usuario)
# Envia indicação para o usuário
client.publish(
'indicacao-' + username,
{'data':
{'cpf': indicacao.cli.cpf,
'indicacao_id': indicacao.id,
'task_id': self.request.id}},
)
# Polling verificando se a indicação foi recebida
i = 1
while True:
# Continua o rodízio após aprox. 25s
# São 50 iterações que duram cada uma pelo menos 0.5s, além do processamento cuja
# duração é variável
if i == 50:
break
# A indicação foi aceita (a tarefa foi, portanto, abortada)
if self.is_aborted():
print(str(indicacao_id) + ' - indicação aceita por: ' + username)
indicacao_aceita = True
return
# A indicação foi recusada, então continua o rodízio imediatamente
if self.AsyncResult(self.request.id).state == 'CONTINUE':
break
time.sleep(0.5)
i += 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment