Skip to content

Instantly share code, notes, and snippets.

@bibendi
Last active April 2, 2018 16:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bibendi/37e2898c274eb581a4f2260f0716ef93 to your computer and use it in GitHub Desktop.
Save bibendi/37e2898c274eb581a4f2260f0716ef93 to your computer and use it in GitHub Desktop.
# ===== Common side in gem apress-api
Rails.application.routes.draw do
scope module: "apress", constraints: {domain: :current} do
namespace "api/v1" do
post 'callbacks/:service' => 'callbacks#create'
end
end
end
Rails.application.config.api = {
callbacks: ActiveSupport::HashWithIndifferentAccess.new,
events: ActiveSupport::HashWithIndifferentAccess.new([])
}
module Apress
module Api
class CallbacksController < ::Apress::Api::BaseController
def create
service = params.require(:service)
event = params.require(:event)
job = ::Rails.application.config.api[:callbacks].fetch(service).fetch(event).camelize.constantize
event_params = params[:params] || {}
if job.respond_to?(:enqueue)
job.enqueue(event_params)
else
::Resque.enqueue(job, event_params)
end
head 201
end
end
end
end
module Apress
module Api
class DelayedFireCallback
include Interactor
def call
services = Rails.application.config.api[:events].fetch(context.event)
::Resque.redis.multi do
services.each do |service|
::Resque.enqueue(
::Apress::Api::FireCallbackJob,
service,
context.event,
context.params
)
end
end
end
end
end
end
module Apress
module Api
class FireCallbackJob
include Resque::Integration
queue :api_callbacks
retries
def self.perform(serivce, event, params)
callback_class = "#{service}_client/fire_callback".camelize.constantize
callback_class.call!(
event: event,
params: params
)
end
end
end
end
module Apress
module Api
module Callbacks
def notify_service(event:, params: {}, when:)
# Metaprogramming magic
end
end
end
end
# ===== Project side
# apress-companies
module Apress
module Companies
class CompanyUser
extend ::Apress::Api::Callbacks
notify_services event: 'company_user:delete',
params: [:company_id, :user],
when: [:after_commit, on: :destroy]
# !!!! Will be generated as... !!!!
# BEGIN
after_commit(on: :destroy) do
::Apress::Api::DelayedFireCallback.call!(
event: 'company_user:delete',
params: {company_id: company_id, user_id: user}
)
end
# END
end
end
end
# cosmos_client
Rails.application.config.api[:events]['company_user:delete'] << :cosmos
module CosmosClient
class FireCallback
include Interactor
def call
# Если у нас стоит флаг о том, что сервис не готов принимать запросы, то мы не будем этого делать, так как нет гарантий,
# что принятый на той стороне запрос корректно обработается. Мы лучше попробуем его послать позднее,
# выбросив здесь исключение, чтобы отработал механиз retry в FireCallbackJob
context.fail!(message: 'cosmos.updates_locked') if ::Apress::Orders.updates_locked?
callback = ::CosmosClient::Callback.new(event: context.event, params: context.params)
context.callback = callback
context.fail!(message: 'cosmos.fire_callback.failure') unless callback.save
end
end
end
module CosmosClient
class Callback < CosmosClient::Base
self.site = ::CosmosClient.api_url + "/callbacks/cosmos"
end
end
# ===== Cosmos (Apress::Orders at now) side
Rails.application.config.api[:callbacks][:cosmos] = {
'company_user:delete' => 'apress/orders/reset_manager_to_default_job'
}
# https://github.com/abak-press/apress-orders/blob/master/app/jobs/apress/orders/reset_manager_to_default_job.rb#L4
# FIXME: Убрать уникальность, которая к тому же сделана и с ошибкой
module Apress
module Orders
class ResetManagerToDefaultJob
include Resque::Integration
queue :orders_unbind_manager
retries
def self.perform(params)
# ...
end
end
end
end
@Napolskih
Copy link

В целом ок, с некоторыми правками:

  • Этот код должен выполняться в джобе, для которого настроено ретри. Это позволит обеспечить надежность рассылки уведомлений и не будет тормозить вызвавший калбек код.
    Пример, в котором существующий код создаст проблемы: вызван калбек, в списке два сервиса, один джоб поставили, второй упал, калбек упал? (нужно проверить что будет при падении калбека, данные в базу уже записаны, нужно в релике выковыривать ошибку и ставить джоб заново, причем не понятно какой был поставлен, а какой нет).
    Если джобы ставить из общего джоба, то для него будет сконфигурирован ретри, он будет виден в клинере, его можно будет повторить, автоматом и руками, калбек в моделе не упадет.
    Есть проблема с тем, что для одного сервиса может быть поставлен джоб уведомления, а для второго нет и при повторе будет дубль. Нужен механизм не допускающих повторных уведомлений. В идеале в рескью ставить джобы в транзакции:
  Resque.redis.multi do
    services.each do |service|
      Resque.enqueue ...
    end
  end

Или костылить список отправленных уведомлений в редисе, каждому калбеку генерировать уникальный ид, который вместе с сервисом писать в редис с большим экспаиром и проверять при отправке. Лучше бы конечно освоить транзакционность в рескью, уже была задача в которой он был нужен.

Только я не уверен что так просто получится, надо эту тему проработать.
Плюс я допускаю что где-то нужны синхронные уведомления, сделаем их по-мере необходимости.

  • Считаю что тут не нужно и вредно блокировать отправку уведомления о не доступности целевой системы.
    Отправлять нужно, и уже на той стороне система знает когда этот калбек обработать (когда будет готова, за счет ретри).

@bibendi
Copy link
Author

bibendi commented Mar 6, 2018

код должен выполняться в джобе, для которого настроено ретри.

Я с этим категорически не согласен. Наоборот так сделано специально.

Есть проблема с тем, что для одного сервиса может быть поставлен джоб уведомления, а для второго нет и при повторе будет дубль.

Именно в этом. Каждое уведомление сервиса в своем джобе. Постановка джоба в очередь выполняется мгновенно и не будет тормозить. Если делать уведомление сервисов в одном джобе, то при неудачном соединении до одного из сервисов упадет весь джоб. И перепоставить его будет проблематично из-за того, что на сервис, который принял сообщения первый раз без ошибок, опять придет повторное сообщение. Я исхожу из того, что постановка джобов в очередь всегда работает без ошибок.

Или костылить список отправленных уведомлений в редисе, каждому калбеку генерировать уникальный ид

Это конечно решит проблему вызова сервисов из одного джоба. Но что то это все сложно для первой итерации. Может не надо?

Считаю что тут не нужно и вредно блокировать отправку уведомления о не доступности целевой системы.

Подожди, схема то такая и есть:

  • В проекте запускается джоб, который вызывает FireCallback
  • FireCallback видит, что сервис недоступен и падает с ошибкой
  • Джоб видит, что FireCallback выбросил исключение и задействует механизм retry

@Napolskih
Copy link

Я с этим категорически не согласен. Наоборот так сделано специально.

Ты не понял о чем я.

Если делать уведомление сервисов в одном джобе, то при неудачном соединении до одного из сервисов упадет весь джоб.

Я не предлагаю слать сообщения из одного джоба (это не надежно). каждое сообщение в своем джобе, НО джобы нужно ставить надежно.

Я исхожу из того, что постановка джобов в очередь всегда работает без ошибок.

это не так! тому подтверждение недавние ошибки "connection reset by peer", "connection timeout" и тп, которые переодически бывают. Зачем делать ненадежно, если можно надежно?

Итого: какой минус в схеме ставить джобы из джоба? она только лишь увеличивает надежность для необходимого для продакшен-системы уровня. в реализации это 0.5 + защита от повторного запуска джоба.

Другой подход. Можно не делать джоб, можно ставить джобы в рескью в транзакции, но я не уверен что это можно, надо экспериментировать.

В твоей схеме, обязательно в один прекрасный день, при рассылке уведомлений, из 3 джобов, встанет 2, на третьем будет ошибка постановки, клиент ок не получит (или получит? как ошибки будут обрабатываться? транзакция уже закоммичена)
На это конечно же никто не обратит внимание и уйдет часть уведомлений. Или увидят (где?) ошибку и повторят (кому отправлять, а кому нет?) и в результате кто-то получит дубли уведомлений, что может быть критично. Те система не надежна.
В моей схеме (джоб + уникальность или транзакционная постановка), ничего из этого в принципе не возможно.

Если не делать транзакционность и контроль уникальности, то можно не добавлять в главный джоб ретри и просто использовать эту схему для контроля за ошибками, как лог, которой смотрят и для которого есть уведомления в телеграм.
Нужно перехватывать все исключения при постановке каждого джоба и райзить ошибку с списком тех кому не отправлены уведомления.

Кстати, еще один путь, которым можно решить проблему повторной отправки уведомлений при перезапуске главного джоба - при перезапуске в ретри, модифицировать параметры (перезапускать с другими параметрами, перечислять список тех кому уже отправили уведомление). Это простой путь, не нужен контроль уникальности и транзакции.

Те целых четыре варианта сделать надежно:

  • без родительского джоба, транзакционно ставить в ресью;
  • с родительским джобом, без ретри, просто логгировать в клинер и телеграм;
  • с родительским джобом, с ретри, с контролем уникальности, через редис;
  • с родительским джобом, с ретри, при перестановке менять параметры запуска.

FireCallback видит, что сервис недоступен и падает с ошибкой
Джоб видит, что FireCallback выбросил исключение и задействует механизм retry

Когда физически система не доступна, это так, но я кинул ссылку на проверку логический-блокировки. Эту блокировку тут проверять не нужно, нужно слать уведомление, а что там система не готова щас это выполнить это ее проблемы и это обработается ретри на принимающей стороне, который конечно же должен быть в каждом джобе обработки нотификации, что не указано в коде.

@deniskorobicyn
Copy link

deniskorobicyn commented Apr 2, 2018

А тут - https://gist.github.com/bibendi/37e2898c274eb581a4f2260f0716ef93#file-service-callbacks-rb-L19 - случаем DoS не возможен?
Я так понимаю тут надо какие-то ограничения поставить на того, кто может отправлять туда запрос?
Например, что это может сделать только проект в котором задан какой-нибудь secret-token общий с сервисным проектом.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment