Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Elasticsearch: calculating user sessions with Map/Reduce (Ruby)
# The same algorithm which is used in Google Analytics (https://support.google.com/analytics/answer/2731565?hl=en):
# Time-based expiry (including end of day):
# - After 30 minutes of inactivity
# - At midnight
# Enable dynamic scripting for Groovy (https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting.html#_enabling_dynamic_scripting)
# ! WARNING: please read about security first
# Usage:
#
# > Elastic::CalcUserSessionsService.new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1]).execute
#
# {
# 1 => [
# {
# day: Thu, 28 May 2015,
# sum: 125,
# average: 25,
# median: 20
# }
# ]
# }
# Computes per-user, per-day session statistics — sum, average and median of
# session lengths in minutes — from read events stored in Elasticsearch,
# using a scripted_metric (Groovy) map/reduce aggregation.
#
# Sessionization follows the Google Analytics rules
# (https://support.google.com/analytics/answer/2731565?hl=en):
#   - a session expires after 30 minutes of inactivity;
#   - a session expires at midnight (enforced here by the daily
#     date_histogram bucketing — events never cross a day boundary).
#
# Dynamic Groovy scripting must be enabled on the cluster; read the
# Elasticsearch security warnings before enabling it.
#
# Usage:
#   Elastic::CalcUserSessionsService
#     .new(from_date: 1.day.ago, to_date: Time.current, user_ids: [1])
#     .execute
#   # => { 1 => [{ day: Thu, 28 May 2015, sum: 125, average: 25, median: 20 }] }
class Elastic::CalcUserSessionsService
  INDEX_NAME = "your_index".freeze
  TIME_FIELD = "read_at".freeze
  MILLISECONDS_IN_SEC = 1_000
  MILLISECONDS_IN_MIN = 60 * MILLISECONDS_IN_SEC
  INACTIVITY_DURATION_IN_MILLISECONDS = 30 * MILLISECONDS_IN_MIN

  attr_reader :params

  # @param params [Hash] expects :from_date and :to_date (Time-like) and
  #   :user_ids [Array<Integer>] — see #must_filters / #should_filters.
  def initialize(params)
    @params = params
  end

  # Runs the search and reshapes the aggregation response into:
  #   { user_id => [{ day:, sum:, average:, median: }, ...], ... }
  # Days in which no complete session was detected are omitted (the reduce
  # script returns an empty map for them).
  def execute
    result = {}
    response = elastic_adapter.search(index: INDEX_NAME, body: query)
    response["aggregations"]["user_ids"]["buckets"].each do |user_bucket|
      result[user_bucket["key"]] = []
      user_bucket["by_days"]["buckets"].each do |session_bucket|
        next if session_bucket["sessions"]["value"].blank?
        result[user_bucket["key"]] << {
          # Bucket keys are UTC epoch milliseconds (UTC midnight). Convert
          # in UTC explicitly: a bare Time.at(...).to_date uses the server's
          # local zone and reports the previous day on zones behind UTC.
          day: Time.at(session_bucket["key"] / MILLISECONDS_IN_SEC).utc.to_date,
          sum: session_bucket["sessions"]["value"]["sum"].to_i,
          average: session_bucket["sessions"]["value"]["average"].to_i,
          median: session_bucket["sessions"]["value"]["median"].to_i
        }
      end
    end
    result
  end

  # Full ES request body: filter to the requested date range and users, then
  # bucket by user, then by day, then sessionize each day's events with a
  # Groovy scripted_metric:
  #   - init:    per-shard list of event timestamps
  #   - map:     collect each document's read_at (epoch millis)
  #   - combine: sort timestamps and cut them into sessions at gaps longer
  #              than INACTIVITY_DURATION_IN_MILLISECONDS; emit each
  #              non-zero session length in minutes
  #   - reduce:  merge per-shard session lists and compute sum / average /
  #              median (empty map when there are no sessions)
  # NOTE(review): the user_ids terms aggregation uses the default bucket
  # size (10 in legacy ES) — verify this covers all requested users.
  def query
    {
      size: 0,
      query: {
        filtered: {
          filter: {
            bool: {
              must: must_filters,
              should: should_filters
            }
          }
        }
      },
      aggs: {
        user_ids: {
          terms: {
            field: "user_id"
          },
          aggs: {
            by_days: {
              date_histogram: {
                field: TIME_FIELD,
                interval: "1d"
              },
              aggs: {
                sessions: {
                  scripted_metric: {
                    init_script: "_agg['read_ats'] = []",
                    map_script: "_agg.read_ats.add(doc['#{ TIME_FIELD }'].value)",
                    combine_script: oneliner(%Q{
                      sessions = []
                      if (_agg.read_ats.size() < 2) {
                        return sessions
                      }
                      _agg.read_ats.sort()
                      session_started_at = _agg.read_ats[0]
                      previous_read_at = session_started_at
                      last_read_at = _agg.read_ats[-1]
                      for (read_at in _agg.read_ats[1..-1]) {
                        if (read_at - previous_read_at > #{ INACTIVITY_DURATION_IN_MILLISECONDS }) {
                          if (previous_read_at - session_started_at != 0) {
                            sessions << (previous_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                          }
                          session_started_at = read_at
                        } else if (read_at == last_read_at && read_at - session_started_at != 0) {
                          sessions << (last_read_at - session_started_at) / #{ MILLISECONDS_IN_MIN }
                        }
                        previous_read_at = read_at
                      }
                      return sessions
                    }),
                    reduce_script: oneliner(%Q{
                      sessions = []
                      stats = [:]
                      for (shard_sessions in _aggs) { sessions.addAll(shard_sessions) }
                      session_count = sessions.size()
                      if (session_count == 0) { return stats }
                      sessions.sort()
                      median_session_position1 = (int)((session_count - 1) / 2)
                      median_session_position2 = (int)(session_count / 2)
                      stats.median = (sessions[median_session_position1] + sessions[median_session_position2]) / 2
                      stats.sum = sessions.sum()
                      stats.average = stats.sum / session_count
                      return stats
                    })
                  }
                }
              }
            }
          }
        }
      }
    }
  end

  private

  # Collapses a multi-line Groovy snippet into the single line the script
  # API requires: strips surrounding whitespace, turns each newline (plus
  # any following indentation) into "; ", then removes the spurious
  # separators this leaves right after "{" and right before "}".
  def oneliner(code)
    code.gsub(/\A\s*|\s*\z/, "").gsub(/\n[ \n]*/, "; ").squish.gsub("{;", "{").gsub("; }", " }")
  end

  # Mandatory filter: restrict events to the requested date range.
  def must_filters
    [
      { range: { read_at: { gte: params[:from_date], lte: params[:to_date] } } }
    ]
  end

  # One term filter per requested user id.
  # NOTE(review): this relies on the legacy bool *filter* requiring at least
  # one should clause to match even when must clauses are present — confirm
  # on your ES version (a single `terms` filter would be unambiguous).
  def should_filters
    params[:user_ids].map { |user_id| { term: { user_id: user_id } } }
  end

  def elastic_adapter
    Elasticsearch::Client.new(hosts: ElasticSettings.hosts, log: ElasticSettings.log)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.