Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save stephenreid/7ce2bdfd10b6ff9d8c6c099c1d9c9923 to your computer and use it in GitHub Desktop.
Save stephenreid/7ce2bdfd10b6ff9d8c6c099c1d9c9923 to your computer and use it in GitHub Desktop.
RefreshIdentifiesService.rb
class Analytics::RefreshIdentifiesService
  # Re-sends identify calls for users whose attributes in Braze differ from
  # the traits stored locally. The flow for this service is as follows:
  #
  # 1. Find all non-deleted users updated within the `within` window
  #    (2 hours by default) that have identify_data.
  # 2. For each batch of users, pull the corresponding user attributes from
  #    Braze through Analytics::Api::BrazeService.
  # 3. Compare the traits in each user's identify_data with what came back
  #    from Braze.
  # 4. If any local trait is missing from Braze or has a different value,
  #    issue an identify call to Segment containing only those traits.

  # Upper bound for batch_size.
  # NOTE(review): presumably mirrors the number of external_ids Braze accepts
  # per export request - confirm against the Braze API docs.
  MAX_BATCH_SIZE = 50

  # Trait keys whose values are normalized to UTC ISO 8601 before comparison.
  TIMESTAMP_TRAITS = %w[
    created_at
    last_login
    current_period_end
    current_period_start
    subscription_ended_at
    subscription_canceled_at
  ].freeze

  # Trait keys whose values are stringified and stripped before comparison.
  NAME_TRAITS = %w[name first_name last_name].freeze

  attr_reader :within, :batch_size

  # @param within [ActiveSupport::Duration] how far back to look for updated users
  # @param batch_size [Integer] users per Braze export request (1..MAX_BATCH_SIZE)
  # @raise [ArgumentError] if batch_size is nil or outside 1..MAX_BATCH_SIZE
  def initialize(within: 2.hours, batch_size: MAX_BATCH_SIZE)
    if batch_size.nil? || !batch_size.between?(1, MAX_BATCH_SIZE)
      raise ArgumentError, 'batch_size must be between 1 and 50'
    end

    @within = within
    @batch_size = batch_size
  end

  # Loads the recently updated users and processes them in slices of
  # batch_size.
  def perform
    recently_updated_users.each_slice(batch_size) { |users| process_users(users) }
  end

  private

  # Returns [id, identify_data] pairs for non-deleted users updated within the
  # window that have identify_data. The query runs against the
  # :reading_replica role.
  def recently_updated_users
    ActiveRecord::Base.connected_to(role: :reading_replica) do
      User
        .not_deleted
        .where(
          User.arel_table[:updated_at].gt(within.ago).and(
            User.arel_table[:identify_data].not_eq(nil)
          )
        )
        .pluck(:id, :identify_data)
    end
  end

  # Compares one batch of [id, identify_data] pairs against Braze and sends an
  # identify call for every user with changed traits.
  def process_users(users)
    user_data = users.to_h
    return if user_data.empty?

    braze_users = fetch_braze_users(user_data.keys)
    return if braze_users.empty?

    braze_users.each do |braze_user|
      user_id = braze_user['external_id'].to_i
      identify_data = user_data[user_id]
      # Ignore Braze profiles that do not map back to a user in this batch.
      next unless identify_data

      current_traits = normalize_user_traits(identify_data['traits'])
      next if current_traits.blank?

      changed_traits = traits_diff(current_traits, normalize_braze_traits(braze_user))
      next if changed_traits.blank?

      send_identify(user_id, changed_traits)
    end

    # Flush the Segment client once the whole batch has been processed.
    segment_client.flush
  end

  # Exports the Braze profiles for the given user ids.
  #
  # Always returns an Array: the exported users on success, or an empty array
  # when the request failed or the response is not the expected shape, so
  # callers can safely call `empty?`/`each` on the result.
  def fetch_braze_users(user_ids)
    success, data = braze_client.export_users(
      external_ids: user_ids,
      fields_to_export: %w[
        external_id
        first_name
        last_name
        email
        custom_attributes
      ]
    )

    unless success && data && data['message'] == 'success'
      Rails.logger.warn "[#{self.class}] Braze user export failed for #{user_ids.size} user(s)"
      return []
    end

    data['users'] || []
  end

  # Flattens a Braze user export into a traits hash comparable with the
  # locally stored traits: custom attributes at the top level plus email,
  # name, first_name and last_name.
  def normalize_braze_traits(braze_user)
    traits = {}.merge(braze_user['custom_attributes'] || {})
    traits['email'] = braze_user['email'] if braze_user['email']

    if braze_user['first_name'] || braze_user['last_name']
      traits['name'] = build_name(braze_user['first_name'], braze_user['last_name'])
      traits['first_name'] = braze_user['first_name'].to_s.strip
      traits['last_name'] = braze_user['last_name'].to_s.strip
    end

    normalize_timestamps(traits)
    traits
  end

  # Returns a normalized copy of the locally stored traits: nil values are
  # dropped, name fields are stripped and timestamps are normalized. The
  # argument is not mutated. Returns an empty hash when traits is nil.
  def normalize_user_traits(traits)
    return {} if traits.nil?

    normalized = traits.compact
    NAME_TRAITS.each do |key|
      normalized[key] = normalized[key].to_s.strip if normalized[key]
    end

    normalize_timestamps(normalized)
    normalized
  end

  # Rewrites, in place, every present timestamp trait as a UTC ISO 8601
  # string so both sides of the comparison use the same representation.
  # Relies on DateTime#utc, which is provided by ActiveSupport.
  def normalize_timestamps(traits)
    TIMESTAMP_TRAITS.each do |attr|
      traits[attr] = DateTime.parse(traits[attr]).utc.iso8601 if traits[attr]
    end
  end

  # Joins first and last name with a space and strips surrounding whitespace.
  def build_name(first_name, last_name)
    "#{first_name} #{last_name}".strip
  end

  # Returns the key/value pairs of `current` that are absent from `braze` or
  # have a different value there. Traits that exist only in Braze are ignored.
  def traits_diff(current, braze)
    (current.to_a - braze.to_a).to_h
  end

  # Logs and issues an identify call carrying only the changed traits.
  def send_identify(user_id, traits)
    # Block form avoids building the message when debug logging is disabled.
    Rails.logger.debug { "[#{self.class}] Identifying user (refresh): #{user_id}, #{traits.inspect}" }
    segment_client.identify(build_identify(user_id, traits))
  end

  # Builds the identify payload passed to the Segment client.
  def build_identify(user_id, traits)
    {
      user_id: user_id,
      traits: traits,
      integrations: Analytics::IntegrationsService.identify_options(user_id)
    }.as_json
  end

  # Memoized Braze API wrapper.
  def braze_client
    @braze_client ||= Analytics::Api::BrazeService.new
  end

  # Memoized Segment client configured with this service's batch_size.
  def segment_client
    @segment_client ||= Segment::Analytics.new(
      write_key: Settings.segment.write_key,
      batch_size: batch_size
    )
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment