Skip to content

Instantly share code, notes, and snippets.

@drhenner
Created August 4, 2017 06:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drhenner/44a04f838ba612778db1e4dc093d0e48 to your computer and use it in GitHub Desktop.
Save drhenner/44a04f838ba612778db1e4dc093d0e48 to your computer and use it in GitHub Desktop.
Message Bus
###################################################################
# A Topic is a superclass that all Communications inherit from
#
# A Communication Channel receives communications for the firms_topics it accepts.
# e.g. a mobile device will, by default, receive all communications.
# If the device adds a filter for a firms_topic, it will no longer receive
# communications for that type of message.
#
# If a channel accepts a given firms_topic and there is a message to send, the
# firms_topic's topic ("belongs_to :topic") performs the method `perform`.
# The `perform` method takes the params (channel_id, object_id, message, options)
#
#  --------------                                   --------------          ------------------
#  | Firm       |                                   | FirmsTopic |          | Topic          |
#  |            |----------------------------------<|            |>---------|                |
#  |            |                                   |            |          | medium         |
#  |            |                                   |            |          | message_type   |
#  |            |                                   |            |          | type           |
#  |            |                                   |            |          | name           |
#  |            |                                   |            |          | description    |
#  --------------                                   --------------          ------------------
#        |                                                 |
#        |                                                 |
#       / \                                               / \
#  --------------          ------------------       ------------------
#  | User       |          | Communication  |       | Notification   |
#  |            |---------<| Channel        |------<| Filters        |
#  |            |          |                |       |                |
#  |            |          | type           |       |                |
#  |            |          | name           |       |                |
#  |            |          | meta           |       |                |
#  |            |          | options        |       |                |
#  |            |          | -------------  |       |                |
#  |            |          | medium (email) |       |                |
#  --------------          ------------------       ------------------
#
###################################################################
# Base class for every kind of communication that can be put on the message
# bus. Concrete behaviour lives in subclasses (see the `type` values listed in
# INFO), each of which overrides `perform`.
class Topic < ActiveRecord::Base
  # has_many :firms_topics # shouldn't be called as a method this would return all firm's firms_topics

  validates :description,  presence: true
  validates :name,         presence: true
  validates :medium,       presence: true
  validates :message_type, presence: true, uniqueness: { scope: [:medium, :type] }

  # Message type identifiers. Frozen so that shared constants cannot be
  # mutated in place by a caller.
  MESSAGE_ADDED         = 'message_added'.freeze
  TASK_ASSIGNED         = 'task_assigned'.freeze
  TASK_COMPLETED        = 'task_completed'.freeze
  TASK_DUE_TODAY        = 'task_due_today'.freeze
  TASK_READY            = 'task_ready'.freeze
  INTEGRATION_INSTALLED = 'integration_installed'.freeze

  # Message types accepted by the message bus.
  # NOTE(review): INTEGRATION_INSTALLED is not listed here; the only INFO entry
  # that uses it is commented out below, so this looks deliberate -- confirm
  # before re-enabling the Slack topics.
  MESSAGE_TYPES = [
    MESSAGE_ADDED,
    TASK_ASSIGNED,
    TASK_COMPLETED,
    TASK_DUE_TODAY,
    TASK_READY
  ].freeze

  # Attribute sets describing the known topics (one hash per topic row).
  INFO = [
    { type: 'Topics::EmailMessageAdded',   medium: :email,  message_type: MESSAGE_ADDED,  name: 'Message Added',  description: 'Send notification to email when a message is added to a task that I follow.'},
    { type: 'Topics::EmailTaskCompleted',  medium: :email,  message_type: TASK_COMPLETED, name: 'Task Completed', description: 'Send notification to email when a task is Completed that I follow'},
    { type: 'Topics::MobileMessageAdded',  medium: :mobile, message_type: MESSAGE_ADDED,  name: 'Message Added',  description: 'Send notification to device when a message is added to a task that I follow.'},
    { type: 'Topics::MobileTaskCompleted', medium: :mobile, message_type: TASK_COMPLETED, name: 'Task Completed', description: 'Send notification to device when a task is Completed that I follow'},
    #{ type: 'Topics::SlackTaskAssigned', medium: :slack, message_type: TASK_ASSIGNED, name: 'Task Assigned', description: 'Send personal notification to Slack when a task is assigned to me'},
    #{ type: 'Topics::SlackIntegrationInstalled', medium: :slack, message_type: INTEGRATION_INSTALLED, name: 'Integration Installed', description: 'Send personal notification to Slack when Slack integration is installed'},
    #{ type: 'Topics::SlackMessageAdded', medium: :slack, message_type: MESSAGE_ADDED, name: 'Message Added', description: 'Send notification to Slack when a message is added to a task that I follow.'},
  ].freeze

  # Delivers a communication. Must be overridden by every subclass.
  #
  # @param channel_id [Integer] id of the CommunicationChannel to deliver to
  # @param object_id  [Integer] id of the record the communication is about
  # @param message    [Object, nil] optional message payload
  # @param options    [Hash] optional extra settings
  # @raise [RuntimeError] always, in this base implementation
  def perform(channel_id, object_id, message = nil, options = {})
    raise 'please implement `perform` in sub-class'
  end
end
####################################################################
# Join record between a Firm and a Topic. A firm has at most one row per topic
# (enforced by the uniqueness validation), and the row carries an `active` flag.
class FirmsTopic < ActiveRecord::Base
  belongs_to :firm
  belongs_to :topic

  validates :topic_id, presence: true, uniqueness: { scope: :firm_id }
  validates :firm_id,  presence: true

  # Descriptive attributes are read straight from the associated topic.
  delegate :description, :name, :medium, :message_type, to: :topic

  # Marks this firms_topic as inactive.
  #
  # Uses `update` rather than the deprecated `update_attributes` alias
  # (removed in Rails 6.1); validations still run.
  #
  # @return [Boolean] true when the record was saved, false otherwise
  def deactivate!
    update(active: false)
  end
end
##########################################################
# A way of reaching a user (the names in TYPES list the known kinds).
# Subclasses must implement `medium`; it is used to select which of the
# firm's topics apply to this channel.
class CommunicationChannel < ActiveRecord::Base
  include Bitfields

  TYPES = ['MobileChannel', 'EmailChannel', 'SlackChannel']

  belongs_to :user
  has_many :notification_filters
  has_many :filtered_firms_topics, -> { where(firms_topics: { active: true }) },
           through: :notification_filters,
           class_name: 'FirmsTopic', source: :firms_topic

  scope :active, -> { where(active: true) }

  validates :name,    presence: true, uniqueness: { scope: :user_id }
  validates :user_id, presence: true

  bitfield :options, 1 => :allows_daily_digest

  # meta data about the device (for things like OS/model/???? )
  serialize :meta, Hash

  delegate :firm, to: :user, allow_nil: false

  attr_accessor :email, :device_token, :endpoint_arn

  # Flags the channel inactive. `update_attribute` skips validations.
  def deactivate!
    update_attribute(:active, false)
  end

  # Flags the channel active. `update_attribute` skips validations.
  def activate!
    update_attribute(:active, true)
  end

  # 1) if the device(communication_channel) is marked inactive then it will not get ANY notification firms_topics
  # 2) if the firm shut off the notifications the device(communication_channel) will not get the notification firms_topic
  # 3) otherwise grab all firms_topics that the user has not filtered that are available for the firm
  #
  # An inactive channel yields `FirmsTopic.none` (an empty, still chainable
  # relation), matching what `firms_topics_for` returns, instead of a bare Array.
  #
  # @return [ActiveRecord::Relation] firms_topics this channel accepts
  def allowed_firms_topics
    return FirmsTopic.none unless active

    firm.firms_topics
        .includes(:topic)
        .where(topics: { medium: medium })
        .where.not(firms_topics: { id: notification_filters.pluck(:firms_topic_id) })
  end

  # @param topic_id [Integer] id of a Topic
  # @return [Boolean] whether this channel accepts the given topic
  #   (always false for an inactive channel, whose allowed set is empty)
  def allows_firms_topic?(topic_id)
    allowed_firms_topics.where(topics: { id: topic_id }).exists?
  end

  # Narrows the allowed firms_topics to a single message type.
  #
  # @param message_type [String, Symbol] one of the Topic message types
  # @return [ActiveRecord::Relation] empty when the channel is inactive
  def firms_topics_for(message_type)
    allowed_firms_topics.where(topics: { message_type: message_type })
  end

  # Runs `perform` on the topic of every accepted firms_topic for the message type.
  #
  # @return [Array] the results of each `perform` call
  def add_to_message_bus(message_type, object_id, message = nil, options = {})
    firms_topics_for(message_type).map do |firms_topic|
      firms_topic.topic.perform(id, object_id, message, options)
    end
  end

  # sub-class should implement this method
  #
  # @raise [NotImplementedError] always, in this base implementation
  def medium
    raise NotImplementedError, "Add `medium` method to #{self.class}"
  end

  # No identifier at this level; returns nil. Subclasses may override.
  def identifier
    #
  end

  # Accepts and ignores the value.
  def identifier_name=(val)
    # do nothing
  end
end
###############################################
# Links a communication channel to a firms_topic. CommunicationChannel excludes
# the firms_topics referenced by its filters when computing what it accepts.
class NotificationFilter < ActiveRecord::Base
  belongs_to :communication_channel # null false in DB
  belongs_to :firms_topic           # null false in DB

  # Descriptive attributes are answered by the associated firms_topic.
  delegate :topic_id, :description, :name, :medium, :message_type, to: :firms_topic
end
################################################
# A firm and the topics enabled for it.
class Firm < ActiveRecord::Base
  # Only the firms_topics flagged active.
  has_many :firms_topics, -> { where({ firms_topics: { active: true } }) }
  # Every firms_topic, regardless of the active flag.
  has_many :all_firms_topics, class_name: 'FirmsTopic'

  after_commit :assign_all_firms_topics, on: :create

  private

  # After the firm is first committed, makes sure there is an active
  # firms_topic for every existing Topic.
  def assign_all_firms_topics
    Topic.all.each { |topic| firms_topics.where(topic_id: topic.id).first_or_create(active: true) }
  end
end
################################################
# A user together with the channels through which they can be reached.
class User < ActiveRecord::Base
  has_many :communication_channels
  has_many :email_channels
  has_many :mobile_channels
  has_many :notification_filters, through: :communication_channels
  has_many :slack_channels, source: :communication_channels, class_name: 'SlackChannel'

  # @user.add_to_message_bus
  #
  # Entry point for sending messages/information to a user.
  #
  # The message bus does NOT decide whether the user was supposed to receive
  # the message in the first place: it performs no permission checks. Anything
  # put on the bus is handed to every active communication channel of the
  # user, and each channel then dispatches it to the topics it accepts for the
  # given message_type.
  #
  # @param message_type [String, Symbol] must be one of Topic::MESSAGE_TYPES
  # @param object_id    [Integer] id of the record the message is about
  # @param message      [Object, nil] optional message payload
  # @param options      [Hash] optional settings forwarded to each channel
  # @raise [RuntimeError] when message_type is not a recognized message type
  def add_to_message_bus(message_type, object_id, message = nil, options = {})
    unless Topic::MESSAGE_TYPES.include?(message_type.to_s)
      raise "#{message_type} must be a recognized message type: #{Topic::MESSAGE_TYPES}"
    end

    communication_channels.where(active: true).each do |active_channel|
      active_channel.add_to_message_bus(message_type, object_id, message, options)
    end
  end
end
################################################
# Channel that reaches the user by email. The address is kept in the
# serialized `meta` hash under the 'email' key.
class EmailChannel < CommunicationChannel
  validate :email_present

  # @return [String, nil] the address stored in meta
  def email
    meta['email']
  end

  # Stores the address in meta.
  def email=(value)
    meta['email'] = value
  end

  # Medium used to match this channel against topics.
  def medium
    'email'
  end

  # For an email channel the identifier is the address itself.
  def identifier
    email
  end

  def identifier=(val)
    self.email = val
  end

  def identifier_name
    'email'
  end

  # Validation: adds a base error unless an address is present and matches
  # Devise's email pattern. Always returns true.
  def email_present
    address = meta['email']
    unless address.present? && address.match(Devise.email_regexp)
      errors.add(:base, 'This is not a valid email')
    end
    true
  end
end
############################################
# endpoint_arn
# - an endpoint token for a device or mobile app on one of the supported push notification services, such as GCM and APNS
#
# device_token
# - REFERENCE: http://www.jamesransom.net/?p=27
# - To obtain your device token for testing on chrome refer to the README at
# * https://github.com/scalus/sample_push_notification
# Channel that reaches the user through an SNS platform endpoint. The device
# token, endpoint ARN and sandbox flag are stored in the serialized `meta` hash.
class MobileChannel < CommunicationChannel
  include ActionView::Helpers::NumberHelper

  validates :endpoint_arn, :device_token, presence: true, if: :active?
  before_validation :obtain_endpoint_arn

  # Medium used to match this channel against topics.
  def medium
    'mobile'
  end

  # For a mobile channel the identifier is the device token.
  def identifier=(val)
    self.device_token = val
  end

  def identifier_name
    'mobile'
  end

  # Accepts and ignores the value.
  def identifier_name=(val)
    #
  end

  def endpoint_arn=(endpoint_arn)
    meta[:endpoint_arn] = endpoint_arn
  end

  def endpoint_arn
    meta[:endpoint_arn]
  end

  # Stores the token and remembers whether it differs from the previous one,
  # so that a fresh endpoint is requested before the next validation.
  def device_token=(device_token)
    @device_token_changed = (meta[:device_token] != device_token)
    meta[:device_token] = device_token
  end

  def device_token
    meta[:device_token]
  end

  def is_sandbox
    meta[:is_sandbox]
  end

  # Coerces the flag to a strict boolean before storing it.
  def is_sandbox=(flag)
    meta[:is_sandbox] = !!flag
  end

  # Requests an endpoint ARN if one is needed, then saves the record.
  #
  # @return [Boolean] result of `save`
  def obtain_endpoint_arn!
    obtain_endpoint_arn
    save
  end

  # Memoized SNS client.
  def sns_client
    @sns ||= Aws::SNS::Client.new
  end

  # Publishes a push notification to this channel's endpoint.
  #
  # With `message_structure: 'json'` SNS expects every top-level value of the
  # message to be a String, so the APNS payloads are JSON-encoded strings of
  # the form {"aps": {...}} rather than nested objects.
  #
  # @param hash [Hash] notification fields; `:default` is the fallback text and
  #   all remaining keys become the APNS `aps` dictionary. The caller's hash is
  #   not modified.
  # @return the response of the SNS publish call
  def publish(hash)
    aps = hash.dup # keep the caller's hash intact
    default_text = aps.delete(:default)
    apns_payload = { aps: aps }.to_json

    sns_message = {
      default: default_text,
      APNS_SANDBOX: apns_payload,
      APNS: apns_payload
    }
    sns_client.publish(target_arn: endpoint_arn, message: sns_message.to_json, message_structure: 'json')
  end

  private

  # before_validation hook: creates a platform endpoint when a device token is
  # present and either no endpoint exists yet or the token has changed.
  # Always returns true so the callback chain is never halted.
  def obtain_endpoint_arn
    if device_token.present? && (endpoint_arn.blank? || @device_token_changed)
      endpoint = sns_client.create_platform_endpoint(
        platform_application_arn: platform_application_arn,
        token: device_token
      )
      self.endpoint_arn = endpoint[:endpoint_arn]
      # The endpoint now matches the token; do not re-create it on the next validation.
      @device_token_changed = false
    end
    true
  end

  # Sandbox devices register against the sandbox application, all others
  # against the regular one (the original expression had the two swapped).
  def platform_application_arn
    is_sandbox ? Settings.aws.sns.ios.sandbox_arn : Settings.aws.sns.ios.application_arn
  end
end
#######################################
# Common parent of the email topics; adds no behaviour of its own.
class EmailTopic < Topic; end
#######################################
# Emails the owner of a channel when a comment is added.
class Topics::EmailMessageAdded < EmailTopic
  # Queues the "new comment" email for the user that owns the channel.
  #
  # @param channel_id [Integer] id of the CommunicationChannel
  # @param comment_id [Integer] id of the comment, forwarded to the mailer as-is
  # @param message    [Object, nil] unused
  # @param options    [Hash] optionally carries `recipient_names` (defaults to [])
  # @raise [ActiveRecord::RecordNotFound] when the channel does not exist
  def perform(channel_id, comment_id, message = nil, options = {})
    channel = CommunicationChannel.find(channel_id)

    # `reverse_merge` returns a new hash, so the result has to be kept for the
    # default to take effect.
    options = ActiveSupport::HashWithIndifferentAccess.new(options)
                                                      .reverse_merge(recipient_names: [])

    send_message(channel.user_id, comment_id, options[:recipient_names])
  end

  private

  # Hands the notification to the delayed mailer.
  def send_message(user_id, comment_id, recipient_names)
    TaskMailer.delay.new_comment_notification(user_id, comment_id, recipient_names: recipient_names)
  end
end
#######################################
# Emails the owner of a channel when a task is completed.
class Topics::EmailTaskCompleted < EmailTopic
  # Option defaults applied when the caller leaves a key out.
  OPTION_DEFAULTS = { updater_id: nil, recipient_names: [], new_comment: false }.freeze

  # Queues the "resolved" email for the user that owns the channel.
  #
  # @param channel_id [Integer] id of the CommunicationChannel
  # @param task_id    [Integer] id of the completed Task
  # @param message    [Object, nil] unused
  # @param options    [Hash] `updater_id` is a required option; `recipient_names`
  #   (default []) and `new_comment` (default false) are optional
  # @raise [ActiveRecord::RecordNotFound] when the channel or the task does not exist
  def perform(channel_id, task_id, message = nil, options = {}) # updater_id is a required option
    channel = CommunicationChannel.find(channel_id)
    task = Task.find(task_id)

    # `reverse_merge` returns a new hash, so the result has to be kept for the
    # defaults to take effect.
    options = ActiveSupport::HashWithIndifferentAccess.new(options).reverse_merge(OPTION_DEFAULTS)

    send_message(channel.user_id, task.id, options[:updater_id], options[:new_comment], options[:recipient_names])
  end

  private

  # Hands the notification to the delayed mailer.
  def send_message(user_id, task_id, updater_id, new_comment, recipient_names)
    TaskMailer.delay.resolved_notification(user_id,
                                           task_id,
                                           updater_id,
                                           new_comment: new_comment,
                                           recipient_names: recipient_names)
  end
end
###################################
# Common parent of the mobile (push notification) topics.
class MobileTopic < Topic
  # Provides `truncate`, which the mobile topics use to shorten notification
  # titles.
  include ActionView::Helpers::TextHelper

  # Memoized SNS client, so repeated calls reuse one client instead of
  # building a new one each time.
  def sns
    @sns ||= Aws::SNS::Client.new
  end
end
###################################
# Sends a push notification to a mobile channel when a comment is added.
class Topics::MobileMessageAdded < MobileTopic
  # @param channel_id [Integer] id of the MobileChannel
  # @param comment_id [Integer] id of the Comment that was added
  # @param message    [Object, nil] unused
  # @param options    [Hash] unused
  # @raise [ActiveRecord::RecordNotFound] when the channel or comment does not exist
  def perform(channel_id, comment_id, message = nil, options = {})
    # send device the message with a push notification
    @channel = MobileChannel.find(channel_id)
    @comment = Comment.find(comment_id)
    send_message
  end

  private

  # Builds the notification and lets the channel publish it.
  #
  # The previous implementation handed a Hash straight to the SNS client as
  # `message`, which must be a String; going through MobileChannel#publish
  # (as Topics::MobileTaskCompleted does) serializes the payload. The
  # placeholder `subject: 'test'` is no longer sent.
  def send_message
    # NOTE(review): `truncate` is not defined in this class; it is expected to
    # come from a helper mixed into an ancestor -- confirm.
    title = truncate("New Message: #{@comment.commentable.title}", length: 40, omission: '...')
    @channel.publish(default: title, sound: 'default', badge: 1, alert: title)
  end
end
###################################
# Sends a push notification to a mobile channel when a task is completed.
class Topics::MobileTaskCompleted < MobileTopic
  # `message` and `options` are optional, matching the signature of
  # Topic#perform, so the override can be called the same way as the base.
  #
  # @param channel_id [Integer] id of the MobileChannel
  # @param task_id    [Integer] id of the completed Task
  # @param message    [Object, nil] unused
  # @param options    [Hash] unused
  # @raise [ActiveRecord::RecordNotFound] when the channel or task does not exist
  def perform(channel_id, task_id, message = nil, options = {})
    # send device the message with a push notification
    @channel = MobileChannel.find(channel_id)
    @task = Task.find(task_id)
    send_message
  end

  private

  # Builds the notification and lets the channel publish it.
  def send_message
    # NOTE(review): `truncate` is not defined in this class; it is expected to
    # come from a helper mixed into an ancestor -- confirm.
    title = truncate("Completed: #{@task.title}", length: 40, omission: '...')
    @channel.publish(default: title, sound: 'default', badge: 1, alert: title)
  end
end
###################################
require 'slack'
# Common parent of the Slack topics. Mixes in URL and text helpers and gives
# subclasses access to a configured Slack client.
class SlackTopic < Topic
  include Rails.application.routes.url_helpers
  include ActionView::Helpers::TextHelper
  include ApplicationHelper

  private

  # Sets the Slack token to the firm's integration token, then returns the
  # Slack module so calls can be made on it.
  def slack
    Slack.tap do |client|
      client.configure { |config| config.token = token.token }
    end
  end

  # Last Slack IntegrationToken found for @firm, memoized.
  # Relies on @firm having been set by the subclass beforehand.
  def token
    @token ||= IntegrationToken.where(
      application: Integrations::SlackService::APPLICATION,
      firm_id: @firm.id
    ).last
  end
end
###################################
# Text-building helpers shared by the task-related Slack topics. The methods
# read @task, @firm, @user and @follower, which the subclass sets up.
class Topics::SlackTask < SlackTopic
  # @return [String, nil] a "Tasklist" section, or nil when the task has no project
  def task_list_text
    return unless @task.project

    "\n*Tasklist*\n #{@task.project.title.html_safe}"
  end

  # @return [String] link to the task on the firm's main host
  def task_url
    "#{riveter_tasks_url(host: @firm.main_host)}/#{@task.id}"
  end

  # @return [String] the action links that apply, joined with a bullet
  #   (empty string when none apply)
  def task_actions
    actions = []

    if @user.present? && @task.can_be_email_completed_by?(@user)
      complete_url = complete_task_with_token_url(@task, @user, { host: @firm.main_host })
      actions << "<#{complete_url}|Complete Task>"
    end

    if @follower.present? && !@follower.required_follower?
      unfollow_url = unfollow_task_with_token_url(@follower, { host: @firm.main_host })
      actions << "<#{unfollow_url}|Unfollow Task>"
    end

    actions.join(' • ')
  end

  # @return [String] "for <company name>", or '' without a task or company
  def for_company_text
    if @task && @task.company
      "for #{@task.company.name.html_safe}"
    else
      ''
    end
  end
end
###################################
module Topics
  # Posts a Slack message to the channel's user when a comment is added to a task.
  class SlackMessageAdded < SlackTask
    # Loads everything the message needs, then posts it.
    #
    # @param channel_id [Integer] id of the CommunicationChannel
    # @param comment_id [Integer] id of the Comment that was added
    # @param message    [Object, nil] unused
    # @param options    [Hash] unused
    # @return the 'ok' entry of the Slack response
    def perform(channel_id, comment_id, message = nil, options = {})
      @channel  = CommunicationChannel.includes(:user).find(channel_id)
      @comment  = Comment.find(comment_id)
      @task     = @comment.task
      @firm     = @task.firm
      @author   = User.find_by(id: @comment.created_by)
      @follower = @task.task_followers.where(follower_id: @channel.user_id).first
      send_message
    end

    private

    # Posts the attachment through chat.postMessage and returns the 'ok'
    # entry of the response.
    def send_message
      attachment = {
        pretext: pretext,
        fallback: "#{pretext} #{task_url}",
        color: '#EEE',
        title: @task.title.html_safe,
        text: text,
        title_link: task_url,
        mrkdwn_in: ['text']
      }

      slack.post(
        'chat.postMessage',
        channel: @channel.slack_user_id,
        token: token.token,
        username: 'Scalus Notifications',
        icon_url: Settings.slack.bot_logo_url,
        attachments: [attachment].to_json
      )['ok']
    end

    # Headline of the message; names the author when one was found.
    def pretext
      return 'A new message was added to this task' unless @author

      "#{@author.name_or_email} added a new message to this task"
    end

    # Body: shortened comment, task link, action links and task list, one per line.
    def text
      comment_excerpt = truncate(@comment.try(:body).to_s.html_safe, length: 50)
      [comment_excerpt, task_url, task_actions, task_list_text].join("\n")
    end
  end
end
###################################
###################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment