@afa
Last active February 4, 2019 16:47
Small aggregation builder. ohm_* is the Redis (Ohm) model; elastic_* is the Elasticsearch index model.
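The expected flow, as a rough sketch of my own (not part of the gist; the poll id is a placeholder): import the SaatPoll results into the Elasticsearch index, then rebuild the aggregates either inline or through the background job.

# Hedged usage sketch: class names come from this gist, the id is a placeholder.
poll_id = 42

# Push the poll's results into the "elastic-results" index.
Elastic::ImportPollResults.new(poll_id: poll_id, new_only: true).call

# Rebuild the organisational aggregates, inline...
Aggregated::Build.new(poll_ids: [poll_id]).call
# ...or on the :calculate queue via ActiveJob.
BackgroundBuildOrgAggregatedResultJob.perform_later(poll_id)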
# frozen_string_literal: true

# Recomputes the OrgAggregatedResult rows affected by a single OrgResult:
# the previous contribution is subtracted, the new one added, for every
# requested aggregation depth and for both section-flag variants.
class Aggregated::ProcessOrgResult
  include Interactor

  delegate :object, :answer_value, :answer_max_value, :depths, to: :context

  def call
    load_params
    param = setup_param(@obj)
    sub = setup_sub(@obj)
    generate(@obj, param, sub)
  end

  def load_params
    @obj = object
    @old_answer = answer_value
    @old_max_answer = answer_max_value
    @depths = [depths].flatten.compact
    @depths = (0..4).to_a if @depths.empty?
    # depths 0..4 map onto the subsystem -> parameter hierarchy
    @whats = %i(subsystem_id block_id element_id feature_id parameter_id)
    @whats_f = @depths.map { |i| @whats[i] }
    @defs = { subsystem_id: nil, element_id: nil, feature_id: nil, block_id: nil, parameter_id: nil }
  end
  def setup_param(res)
    param = {}
    date_id = begin
      RdsResultEmploymentDate.create(employment_date: @obj.employment_date.to_time.to_i)
    rescue Ohm::UniqueIndexViolation
      RdsResultEmploymentDate.find(employment_date: @obj.employment_date.to_time.to_i).first
    end
    param[:poll_id] = res.poll_id
    param[:survey_kind] = res.survey_kind
    param[:poll_employee_id] = res.poll_employee_id
    param[:company_section_id] = res.company_section_id
    param[:employee_survey_section_id] = res.employee_survey_section_id
    param[:is_head] = res.is_head.to_s[0]
    param[:employment_date_id] = date_id.id
    param[:employment_date] = res.employment_date
    param
  end

  def setup_sub(res)
    sub = {}
    sub[:flag_zero] = "f"
    sub[:flag_zero] = "t" if res.answer_value.zero?
    sub[:subsystem_id] = res.subsystem_id
    sub[:block_id] = res.block_id
    sub[:element_id] = res.element_id
    sub[:feature_id] = res.feature_id
    sub[:parameter_id] = res.parameter_id
    sub
  end
  # One aggregate row per (section flag, zero flag, hierarchy level) combination.
  def generate(result, param, subparam)
    zero = %w(f)
    zero = %w(t) if subparam[:flag_zero] == "t"
    sect = %w(f t)
    sect.each do |sect_flag|
      zero.each do |zero_flag|
        @whats_f.each do |what|
          key_hash = prepare_key_hash(param, sect_flag, zero_flag, what, subparam)
          OrgAggregatedResult.with_advisory_lock("process_aggregate_" + OrgAggregatedResult.search_key(key_hash)) do
            clean(result, key_hash)
            make_aggregate(result, key_hash, sect_flag, param)
          end
        end
      end
    end
  end

  def prepare_key_hash(param, sect_flag, zero_flag, what, subparam)
    key_hash = param
      .slice(:poll_id, :survey_kind, :employment_date,
             :employment_date_id, :employee_survey_section_id,
             :company_section_id, :poll_employee_id, :is_head)
      .merge(flag_section: sect_flag, flag_zero: zero_flag, answer_value: 0.0,
             answer_max_value: 0.0, what => subparam[what])
      .merge(@defs.slice(*(@whats - [what])))
    key_hash[:company_section_id] = nil if sect_flag == "t"
    key_hash[:employee_survey_section_id] = nil if sect_flag == "f"
    key_hash
  end

  # Subtract the previous contribution of this result from the matching
  # aggregate (if the old answer was zero, it lives under the flag_zero key).
  def clean(_res, key_hash)
    oldkey = OrgAggregatedResult.search_key(key_hash.merge(flag_zero: "t")) if @old_answer&.zero?
    oldkey ||= OrgAggregatedResult.search_key(key_hash)
    aggr = OrgAggregatedResult.find_by(aggr_key: oldkey)
    return unless aggr
    aggr.update(
      answer_value: aggr.answer_value - @old_answer.to_f,
      answer_max_value: aggr.answer_max_value - @old_max_answer.to_f
    )
  end
  # Find or build the aggregate row for this key and add the new values.
  def make_aggregate(result, key_hash, sect_flag, param)
    key = OrgAggregatedResult.search_key(key_hash)
    aggr = OrgAggregatedResult.find_by(aggr_key: key)
    aggr ||= OrgAggregatedResult.new(key_hash)
    aggr.aggr_key = key
    aggr.company_section_id = nil if sect_flag == "t"
    aggr.employee_survey_section_id = nil if sect_flag == "f"
    aggr.survey_kind = param[:survey_kind]
    aggr.employment_date = param[:employment_date].to_time.to_i
    aggr.employment_date_id = param[:employment_date_id]
    aggr.answer_value += result.answer_value
    aggr.answer_max_value += result.answer_max_value
    aggr.save
  end
end
# frozen_string_literal: true

# Converts SaatPoll results into documents for the "elastic-results" index,
# pushing them to Elasticsearch in batches of 500.
class Elastic::ImportPollResults
  include Interactor

  delegate :poll_id, :new_only, to: :context

  def call
    data = []
    ess_cache = EmployeeSurveySection.where(poll_employee_id: results.pluck(:poll_employee_id))
                                     .group_by { |ess| [ess.poll_employee_id, ess.survey_id] }
    q_cache = Hash[Answer.where(question_id: results.pluck("distinct poll_results.question_id"))
                         .group(:question_id).pluck("question_id, max(value)")]
    results.each do |poll_result|
      res = Elastic::Result.prepare_for_bulk(poll_result, ess_cache, q_cache)
      data.push(res) if res
      next if data.count < 500
      update_elastic(data)
      data = []
    end
    update_elastic(data) unless data.count.zero?
    results.update_all(converted: true)
  end
  # Delete any existing documents with the same ids, then index the new batch.
  def update_elastic(data)
    Elastic::Result.__elasticsearch__.client.bulk(
      index: "elastic-results",
      type: "result",
      body: data.map { |item| { delete: { _id: item[:id] } } }
    )
    Elastic::Result.__elasticsearch__.client.bulk(
      index: "elastic-results",
      type: "result",
      body: data.map { |item| { index: { _id: item[:id], data: item } } }
    )
  end
  def results
    unless @_results
      @_results = PollResult.joins(:poll)
                            .where(polls: { type: "SaatPoll" })
                            .includes(:answer, :employee_survey_section,
                                      poll_employee: [:employee, { company_section: { company: :company_section } }],
                                      question: [:survey, {
                                        saat_parameter: :saat_subsystem,
                                        parent: { saat_parameter: :saat_subsystem }
                                      }])
      @_results = @_results.where(poll_id: poll_id) if poll_id
      @_results = @_results.where.not(polls: { aasm_state: %w(new parsed) }).where(converted: false) if new_only
    end
    @_results
  end
end
# frozen_string_literal: true

# Rebuilds OrgAggregatedResult rows for the selected polls, survey kinds and
# employees: the affected aggregates are zeroed, then every matching OrgResult
# is re-applied through Aggregated::ProcessOrgResult.
class Aggregated::Build
  include Interactor

  delegate :depths, :survey_kinds, :poll_ids, :employees, to: :context

  def call
    load_params
    @polls.all.each do |poll|
      OrgAggregatedResult
        .where(poll_id: poll.id, poll_employee_id: @employee_ids, survey_kind: @kinds)
        .update_all(answer_value: 0.0, answer_max_value: 0.0)
      OrgResult.where(poll_id: poll.id, poll_employee_id: @employee_ids, survey_kind: @kinds).find_each do |res|
        Aggregated::ProcessOrgResult.new(object: res, depths: @depths, survey_kinds: @kinds).call
      end
      poll.update(processed_result: OrgResult.where(poll_id: poll.id).pluck("max(id)").first)
    end
  end

  def load_params
    @depths = [depths].flatten.compact
    @depths = (0..4).to_a if @depths.blank?
    @kinds = [survey_kinds].flatten.compact
    @kinds = [11, 12, 13, 21, 22, 23, 31, 32, 33] if @kinds.blank?
    @polls = SaatPoll
    @polls = SaatPoll.where(id: poll_ids) unless poll_ids.blank?
    @employee_ids = [employees].flatten.compact
    @employee_ids = OrgResult.where(poll_id: poll_ids).pluck("distinct poll_employee_id") if @employee_ids.blank?
  end
end
# frozen_string_literal: true

class BackgroundBuildOrgAggregatedResultJob < ApplicationJob
  queue_as :calculate

  # @param poll_id [Integer] id of the SaatPoll to process
  def perform(poll_id, kinds = nil, employees = nil)
    Aggregated::Build.new(poll_ids: [poll_id], survey_kinds: kinds, employees: employees).call
  end
end
# frozen_string_literal: true

# Elasticsearch document for a single poll result, plus the query helpers used
# to aggregate answers per subsystem/block/element/feature/parameter.
class Elastic::Result
  include Elasticsearch::Model
  include Elasticsearch::Persistence::Model

  attribute :id, Integer, mapping: { type: "integer" }
  attribute :poll_id, Integer, mapping: { type: "integer" }
  attribute :subsystem_id, Integer, mapping: { type: "integer", fielddata: true }
  attribute :block_id, Integer, mapping: { type: "integer" }
  attribute :element_id, Integer, mapping: { type: "integer" }
  attribute :feature_id, Integer, mapping: { type: "integer" }
  attribute :parameter_id, Integer, mapping: { type: "integer" }
  attribute :answer_value, Decimal, mapping: { type: "double" }
  attribute :answer_max_value, Decimal, mapping: { type: "double" }
  attribute :company_section_id, Integer, mapping: { type: "integer" }
  attribute :employee_survey_section_id, Integer, mapping: { type: "integer" }
  attribute :survey_kind, Integer, mapping: { type: "integer" }
  attribute :is_head, Boolean, mapping: { type: "boolean" }
  attribute :employment_date, Date, mapping: { type: "date" }

  def self.ess(poll_result, ess_cache)
    ess_id = ess_cache[[poll_result.poll_employee_id, poll_result.question.survey_id]]&.first&.company_section_id
    # if this is the root company section, fall back to the employee's own company section
    if ess_id == poll_result.poll_employee.company_section.company.company_section.id
      ess_id = poll_result.poll_employee.company_section_id
    end
    ess_id
  end
  # Builds the bulk-index payload for one poll result; returns nil for results
  # without a SAAT item or with a negative answer.
  def self.prepare_for_bulk(poll_result, ess_cache, q_cache)
    ess_id = ess(poll_result, ess_cache)
    ss = poll_result.question.saat_item
    return nil if ss.nil? || poll_result.answer.value&.negative?
    weight = ss&.criterion_weight || 1.0
    weight = weight.zero? ? 1.0 : weight
    max_val = q_cache[poll_result.question_id]
    # the ancestry chain plus the item itself yields the subsystem/block/element/feature ids
    subsystem_id, block_id, element_id, feature_id = ss.ancestry.split("/").append(ss.id)
    res = from_poll_result(poll_result, weight, max_val)
    res[:feature_id] = feature_id.to_i
    res[:block_id] = block_id.to_i
    res[:element_id] = element_id.to_i
    res[:subsystem_id] = subsystem_id.to_i
    res[:company_section_id] = ess_id
    res[:employee_survey_section_id] = ess_id
    res
  end

  def self.from_poll_result(poll_result, weight, max_val)
    res = {}
    res[:id] = poll_result.id
    res[:parameter_id] = poll_result.question.saat_parameter_item_id
    res[:poll_id] = poll_result.poll_id
    res[:poll_employee_id] = poll_result.poll_employee_id
    res[:employment_date] = poll_result.poll_employee.employee.employment_date
    res[:is_head] = poll_result.poll_employee.employee.head ? true : false
    res[:survey_kind] = poll_result.question.survey.kind_number.to_i
    res[:answer_value] = (poll_result.answer.value || 0) * weight
    res[:answer_max_value] = (max_val || 0) * weight
    res
  end
  def self.agg_for(kind)
    {
      kind.to_sym => {
        terms: { shard_size: 200_000, size: 1000, field: "#{kind}_id" },
        aggs: {
          value: { sum: { field: "answer_value" } },
          max_value: { sum: { field: "answer_max_value" } }
        }
      }
    }
  end

  def self.answers(query, answer = false, head = [true, false])
    selectors = []
    selectors << { terms: { is_head: head } } unless head.nil?
    query[:query][:bool][:must] += selectors
    query[:query][:bool][:must_not] = { term: { answer_value: 0.0 } } if answer
    query
  end
  def self.query_count_zero_answer(poll_id, zero_only = false, kinds = nil, poll_employees = nil)
    sel = [{ term: { poll_id: poll_id } }]
    sel << { term: { answer_value: 0.0 } } if zero_only
    sel << { terms: { survey_kind: kinds } } if kinds.present?
    sel << { terms: { poll_employee_id: poll_employees } } if poll_employees.present?
    {
      query: { bool: { must: sel } },
      size: 0,
      aggs: {
        zero_answer_counts: {
          terms: { field: :poll_employee_id, shard_size: 1_000_000, size: 200_000 }
        }
      }
    }
  end

  # Returns the poll_employee_ids whose share of zero answers stays below the
  # threshold: 0.5 for employees hired less than three months before the poll's
  # stop date, 0.3 otherwise.
  def self.count_zero_answer(poll_id, kinds = nil, poll_employees = nil)
    poll = SaatPoll.find(poll_id)
    stop_date = (poll.stop_date || Time.zone.today) - 3.months
    coefs = Hash[poll
      .poll_employees
      .joins(:employee)
      .pluck("poll_employees.id, employees.employment_date")
      .map { |sel| [sel.first, sel.last > stop_date ? 0.5 : 0.3] }]
    z_count = count_by_query(poll_id, true, kinds, poll_employees)
    count = count_by_query(poll_id, false, kinds, poll_employees)
    count.keys.select { |key| (z_count.fetch(key, 0.0).to_f / count[key].to_f) < coefs[key] }
  end

  # Runs the zero-answer query and maps poll_employee_id => document count.
  def self.count_by_query(poll_id, zero_only, kinds, poll_employees)
    search(
      query_count_zero_answer(poll_id, zero_only, kinds, poll_employees)
    ).aggregations.zero_answer_counts.buckets.each_with_object({}) do |el, res|
      res[el["key"]] = el["doc_count"]
    end
  end
  # Rewrites the poll_employee_id filter of an existing query: intersects it
  # with date_employees and, when apply_unknown is set, restricts it further to
  # the employees that pass the zero-answer threshold.
  def self.post_selectors(query, poll_id, date_employees, kinds, apply_unknown)
    q = query.dup
    selectors = []
    pei = q.fetch(:query, {}).fetch(:bool, {}).fetch(:must)
           .index { |it| it.fetch(:terms, {})[:poll_employee_id] }
    pe = pei ? q[:query][:bool][:must][pei][:terms][:poll_employee_id].sort & date_employees.sort : nil
    q[:query][:bool][:must].delete_at(pei) if pei
    pe = count_zero_answer(poll_id, kinds, pe) if apply_unknown
    selectors << { terms: { poll_employee_id: pe } }
    q[:query][:bool][:must] += selectors
    q
  end

  def self.selectors(poll, company_sections, employee_survey_sections, poll_employees, survey_kinds)
    selectors = []
    selectors << { term: { poll_id: poll } } if poll
    selectors << { terms: { survey_kind: survey_kinds } } unless survey_kinds.blank?
    selectors << { terms: { company_section_id: company_sections } } unless company_sections.blank?
    unless employee_survey_sections.blank?
      selectors << { terms: { employee_survey_section_id: employee_survey_sections } }
    end
    selectors << { terms: { poll_employee_id: poll_employees } } unless poll_employees.blank?
    {
      query: { bool: { must: selectors } },
      aggs: {}
    }
  end
  def self.aggregators(query)
    query[:aggs].merge!(agg_for("feature"))
    query[:aggs].merge!(agg_for("subsystem"))
    query[:aggs].merge!(agg_for("block"))
    query[:aggs].merge!(agg_for("element"))
    query[:aggs].merge!(agg_for("parameter"))
    query
  end
end
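# Hedged sketch (not part of the gist): one plausible way to chain the query
# helpers above, using only methods defined in Elastic::Result; the poll id,
# employee ids and survey kinds are placeholders.
poll_id = 42
poll_employee_ids = [1, 2, 3]

query = Elastic::Result.selectors(poll_id, nil, nil, poll_employee_ids, [11, 12, 13])
query = Elastic::Result.aggregators(query)     # adds the per-level sum aggregations
query = Elastic::Result.answers(query, true)   # excludes answer_value == 0.0 documents

Elastic::Result.search(query).aggregations.subsystem.buckets.each do |bucket|
  # bucket["key"] is the subsystem_id; "value" / "max_value" hold the sums
  p [bucket["key"], bucket["value"]["value"], bucket["max_value"]["value"]]
end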
# frozen_string_literal: true

class RdsAggregatedResult < Ohm::Model
  include Ohm::DataTypes

  attribute :poll_id, Type::Integer
  attribute :poll_employee_id, Type::Integer
  attribute :subsystem_id, Type::Integer
  attribute :block_id, Type::Integer
  attribute :element_id, Type::Integer
  attribute :feature_id, Type::Integer
  attribute :survey_kind, Type::Integer
  attribute :company_section_id, Type::Integer
  attribute :is_head
  attribute :employment_date, Type::Date
  attribute :employment_date_id, Type::Integer
  attribute :answer_value, Type::Decimal
  attribute :answer_max_value, Type::Decimal
  attribute :employee_survey_section_id, Type::Integer
  attribute :parameter_id, Type::Integer
  attribute :flag_zero
  attribute :flag_section
  attribute :aggr_key

  index :aggr_key
  unique :aggr_key
  index :poll_id
  index :poll_employee_id
  index :subsystem_id
  index :block_id
  index :element_id
  index :feature_id
  index :company_section_id
  index :survey_kind
  index :is_head
  index :employee_survey_section_id
  index :parameter_id
  index :employment_date_id
  index :flag_zero
  index :flag_section
  # Attributes that make up the composite aggr_key, in join order.
  def self.key_params
    [
      :poll_id,
      :company_section_id,
      :employee_survey_section_id,
      :poll_employee_id,
      :survey_kind,
      :is_head,
      :flag_zero,
      :flag_section,
      :subsystem_id,
      :block_id,
      :element_id,
      :feature_id,
      :parameter_id
    ]
  end

  # Key built from this record's own attributes.
  def key_self
    self.class.key_params.map { |attr| send(attr).to_s }.join(":")
  end

  # Key built from a params hash; missing params contribute an empty segment.
  def self.search_key(param)
    key_params.map { |attr| param[attr].to_s }.join(":")
  end
end
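# Hedged illustration (my example, placeholder values): search_key simply joins
# the key_params values with ":" in declaration order, and key_self produces
# the same string from a saved record, so lookups by aggr_key can be built from
# the plain params hash; absent params leave their segment empty, hence the
# runs of "::" below.
RdsAggregatedResult.search_key(
  poll_id: 1, poll_employee_id: 7, survey_kind: 11,
  is_head: "t", flag_zero: "f", flag_section: "f", subsystem_id: 3
)
# => "1:::7:11:t:f:f:3::::"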