-
-
Save bibendi/37e2898c274eb581a4f2260f0716ef93 to your computer and use it in GitHub Desktop.
# ===== Common side in gem apress-api | |
Rails.application.routes.draw do | |
scope module: "apress", constraints: {domain: :current} do | |
namespace "api/v1" do | |
post 'callbacks/:service' => 'callbacks#create' | |
end | |
end | |
end | |
Rails.application.config.api = { | |
callbacks: ActiveSupport::HashWithIndifferentAccess.new, | |
events: ActiveSupport::HashWithIndifferentAccess.new([]) | |
} | |
module Apress | |
module Api | |
class CallbacksController < ::Apress::Api::BaseController | |
def create | |
service = params.require(:service) | |
event = params.require(:event) | |
job = ::Rails.application.config.api[:callbacks].fetch(service).fetch(event).camelize.constantize | |
event_params = params[:params] || {} | |
if job.respond_to?(:enqueue) | |
job.enqueue(event_params) | |
else | |
::Resque.enqueue(job, event_params) | |
end | |
head 201 | |
end | |
end | |
end | |
end | |
module Apress | |
module Api | |
class DelayedFireCallback | |
include Interactor | |
def call | |
services = Rails.application.config.api[:events].fetch(context.event) | |
::Resque.redis.multi do | |
services.each do |service| | |
::Resque.enqueue( | |
::Apress::Api::FireCallbackJob, | |
service, | |
context.event, | |
context.params | |
) | |
end | |
end | |
end | |
end | |
end | |
end | |
module Apress | |
module Api | |
class FireCallbackJob | |
include Resque::Integration | |
queue :api_callbacks | |
retries | |
def self.perform(serivce, event, params) | |
callback_class = "#{service}_client/fire_callback".camelize.constantize | |
callback_class.call!( | |
event: event, | |
params: params | |
) | |
end | |
end | |
end | |
end | |
module Apress | |
module Api | |
module Callbacks | |
def notify_service(event:, params: {}, when:) | |
# Metaprogramming magic | |
end | |
end | |
end | |
end | |
# ===== Project side | |
# apress-companies | |
module Apress | |
module Companies | |
class CompanyUser | |
extend ::Apress::Api::Callbacks | |
notify_services event: 'company_user:delete', | |
params: [:company_id, :user], | |
when: [:after_commit, on: :destroy] | |
# !!!! Will be generated as... !!!! | |
# BEGIN | |
after_commit(on: :destroy) do | |
::Apress::Api::DelayedFireCallback.call!( | |
event: 'company_user:delete', | |
params: {company_id: company_id, user_id: user} | |
) | |
end | |
# END | |
end | |
end | |
end | |
# cosmos_client | |
Rails.application.config.api[:events]['company_user:delete'] << :cosmos | |
module CosmosClient | |
class FireCallback | |
include Interactor | |
def call | |
# Если у нас стоит флаг о том, что сервис не готов принимать запросы, то мы не будем этого делать, так как нет гарантий, | |
# что принятый на той стороне запрос корректно обработается. Мы лучше попробуем его послать позднее, | |
# выбросив здесь исключение, чтобы отработал механиз retry в FireCallbackJob | |
context.fail!(message: 'cosmos.updates_locked') if ::Apress::Orders.updates_locked? | |
callback = ::CosmosClient::Callback.new(event: context.event, params: context.params) | |
context.callback = callback | |
context.fail!(message: 'cosmos.fire_callback.failure') unless callback.save | |
end | |
end | |
end | |
module CosmosClient | |
class Callback < CosmosClient::Base | |
self.site = ::CosmosClient.api_url + "/callbacks/cosmos" | |
end | |
end | |
# ===== Cosmos (Apress::Orders at now) side | |
Rails.application.config.api[:callbacks][:cosmos] = { | |
'company_user:delete' => 'apress/orders/reset_manager_to_default_job' | |
} | |
# https://github.com/abak-press/apress-orders/blob/master/app/jobs/apress/orders/reset_manager_to_default_job.rb#L4 | |
# FIXME: Убрать уникальность, которая к тому же сделана и с ошибкой | |
module Apress | |
module Orders | |
class ResetManagerToDefaultJob | |
include Resque::Integration | |
queue :orders_unbind_manager | |
retries | |
def self.perform(params) | |
# ... | |
end | |
end | |
end | |
end |
код должен выполняться в джобе, для которого настроено ретри.
Я с этим категорически не согласен. Наоборот так сделано специально.
Есть проблема с тем, что для одного сервиса может быть поставлен джоб уведомления, а для второго нет и при повторе будет дубль.
Именно в этом. Каждое уведомление сервиса в своем джобе. Постановка джоба в очередь выполняется мгновенно и не будет тормозить. Если делать уведомление сервисов в одном джобе, то при неудачном соединении до одного из сервисов упадет весь джоб. И перепоставить его будет проблематично из-за того, что на сервис, который принял сообщения первый раз без ошибок, опять придет повторное сообщение. Я исхожу из того, что постановка джобов в очередь всегда работает без ошибок.
Или костылить список отправленных уведомлений в редисе, каждому калбеку генерировать уникальный ид
Это конечно решит проблему вызова сервисов из одного джоба. Но что то это все сложно для первой итерации. Может не надо?
Считаю что тут не нужно и вредно блокировать отправку уведомления о не доступности целевой системы.
Подожди, схема то такая и есть:
- В проекте запускается джоб, который вызывает FireCallback
- FireCallback видит, что сервис недоступен и падает с ошибкой
- Джоб видит, что FireCallback выбросил исключение и задействует механизм retry
Я с этим категорически не согласен. Наоборот так сделано специально.
Ты не понял о чем я.
Если делать уведомление сервисов в одном джобе, то при неудачном соединении до одного из сервисов упадет весь джоб.
Я не предлагаю слать сообщения из одного джоба (это не надежно). каждое сообщение в своем джобе, НО джобы нужно ставить надежно.
Я исхожу из того, что постановка джобов в очередь всегда работает без ошибок.
это не так! тому подтверждение недавние ошибки "connection reset by peer", "connection timeout" и тп, которые переодически бывают. Зачем делать ненадежно, если можно надежно?
Итого: какой минус в схеме ставить джобы из джоба? она только лишь увеличивает надежность для необходимого для продакшен-системы уровня. в реализации это 0.5 + защита от повторного запуска джоба.
Другой подход. Можно не делать джоб, можно ставить джобы в рескью в транзакции, но я не уверен что это можно, надо экспериментировать.
В твоей схеме, обязательно в один прекрасный день, при рассылке уведомлений, из 3 джобов, встанет 2, на третьем будет ошибка постановки, клиент ок не получит (или получит? как ошибки будут обрабатываться? транзакция уже закоммичена)
На это конечно же никто не обратит внимание и уйдет часть уведомлений. Или увидят (где?) ошибку и повторят (кому отправлять, а кому нет?) и в результате кто-то получит дубли уведомлений, что может быть критично. Те система не надежна.
В моей схеме (джоб + уникальность или транзакционная постановка), ничего из этого в принципе не возможно.
Если не делать транзакционность и контроль уникальности, то можно не добавлять в главный джоб ретри и просто использовать эту схему для контроля за ошибками, как лог, которой смотрят и для которого есть уведомления в телеграм.
Нужно перехватывать все исключения при постановке каждого джоба и райзить ошибку с списком тех кому не отправлены уведомления.
Кстати, еще один путь, которым можно решить проблему повторной отправки уведомлений при перезапуске главного джоба - при перезапуске в ретри, модифицировать параметры (перезапускать с другими параметрами, перечислять список тех кому уже отправили уведомление). Это простой путь, не нужен контроль уникальности и транзакции.
Те целых четыре варианта сделать надежно:
- без родительского джоба, транзакционно ставить в ресью;
- с родительским джобом, без ретри, просто логгировать в клинер и телеграм;
- с родительским джобом, с ретри, с контролем уникальности, через редис;
- с родительским джобом, с ретри, при перестановке менять параметры запуска.
FireCallback видит, что сервис недоступен и падает с ошибкой
Джоб видит, что FireCallback выбросил исключение и задействует механизм retry
Когда физически система не доступна, это так, но я кинул ссылку на проверку логический-блокировки. Эту блокировку тут проверять не нужно, нужно слать уведомление, а что там система не готова щас это выполнить это ее проблемы и это обработается ретри на принимающей стороне, который конечно же должен быть в каждом джобе обработки нотификации, что не указано в коде.
А тут - https://gist.github.com/bibendi/37e2898c274eb581a4f2260f0716ef93#file-service-callbacks-rb-L19 - случаем DoS не возможен?
Я так понимаю тут надо какие-то ограничения поставить на того, кто может отправлять туда запрос?
Например, что это может сделать только проект в котором задан какой-нибудь secret-token общий с сервисным проектом.
В целом ок, с некоторыми правками:
Пример, в котором существующий код создаст проблемы: вызван калбек, в списке два сервиса, один джоб поставили, второй упал, калбек упал? (нужно проверить что будет при падении калбека, данные в базу уже записаны, нужно в релике выковыривать ошибку и ставить джоб заново, причем не понятно какой был поставлен, а какой нет).
Если джобы ставить из общего джоба, то для него будет сконфигурирован ретри, он будет виден в клинере, его можно будет повторить, автоматом и руками, калбек в моделе не упадет.
Есть проблема с тем, что для одного сервиса может быть поставлен джоб уведомления, а для второго нет и при повторе будет дубль. Нужен механизм не допускающих повторных уведомлений. В идеале в рескью ставить джобы в транзакции:
Или костылить список отправленных уведомлений в редисе, каждому калбеку генерировать уникальный ид, который вместе с сервисом писать в редис с большим экспаиром и проверять при отправке. Лучше бы конечно освоить транзакционность в рескью, уже была задача в которой он был нужен.
Только я не уверен что так просто получится, надо эту тему проработать.
Плюс я допускаю что где-то нужны синхронные уведомления, сделаем их по-мере необходимости.
Отправлять нужно, и уже на той стороне система знает когда этот калбек обработать (когда будет готова, за счет ретри).