Last active
February 4, 2019 16:47
-
-
Save afa/d9e6863305c96269a753162c78785607 to your computer and use it in GitHub Desktop.
Small aggregation builder. `ohm_` files are Redis (Ohm) models; `elastic_` files are Elasticsearch index models.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Folds one org result into the OrgAggregatedResult rows it contributes to.
#
# Values read from the interactor context:
#   object           - the result record being aggregated
#   answer_value     - value this result contributed earlier (nil when none)
#   answer_max_value - max value this result contributed earlier (nil when none)
#   depths           - index, or list of indexes, into the hierarchy levels
#                      (0 subsystem, 1 block, 2 element, 3 feature, 4 parameter);
#                      empty means all five levels
class Aggregated::ProcessOrgResult
  include Interactor

  delegate :object, :answer_value, :answer_max_value, :depths, to: :context

  # Entry point: loads the context, builds the key parts and updates aggregates.
  def call
    load_params
    param = setup_param(@obj)
    sub = setup_sub(@obj)
    generate(@obj, param, sub)
  end

  # Copies context values into instance state and resolves the levels to process.
  def load_params
    @obj = object
    @old_answer = answer_value
    @old_max_answer = answer_max_value
    @depths = [depths].flatten.compact
    @depths = (0..4).to_a if @depths.empty?
    @whats = %i(subsystem_id block_id element_id feature_id parameter_id)
    @whats_f = @depths.map { |i| @whats[i] }
    @defs = { subsystem_id: nil, element_id: nil, feature_id: nil, block_id: nil, parameter_id: nil }
  end

  # Builds the part of the aggregate key shared by every hierarchy level.
  #
  # @param res the result record (responds to poll_id, survey_kind, ...)
  # @return [Hash] common key attributes, including the id of the
  #   RdsResultEmploymentDate record for the employment date
  def setup_param(res)
    timestamp = res.employment_date.to_time.to_i
    # Get-or-create: creation raises when the date is already stored.
    date_record = begin
      RdsResultEmploymentDate.create(employment_date: timestamp)
    rescue Ohm::UniqueIndexViolation
      RdsResultEmploymentDate.find(employment_date: timestamp).first
    end

    {
      poll_id: res.poll_id,
      survey_kind: res.survey_kind,
      poll_employee_id: res.poll_employee_id,
      company_section_id: res.company_section_id,
      employee_survey_section_id: res.employee_survey_section_id,
      is_head: res.is_head.to_s[0],
      employment_date_id: date_record.id,
      employment_date: res.employment_date
    }
  end

  # Builds the per-level part of the key plus the zero-answer flag.
  #
  # @param res the result record
  # @return [Hash] :flag_zero ("t" when the answer is zero, else "f") and the
  #   five hierarchy ids
  def setup_sub(res)
    {
      flag_zero: res.answer_value.zero? ? "t" : "f",
      subsystem_id: res.subsystem_id,
      block_id: res.block_id,
      element_id: res.element_id,
      feature_id: res.feature_id,
      parameter_id: res.parameter_id
    }
  end

  # Updates one aggregate row per section flag ("f", "t") and selected level.
  # Each update runs inside an advisory lock named after the aggregate key.
  def generate(result, param, subparam)
    zero_flags = [subparam[:flag_zero] == "t" ? "t" : "f"]
    %w(f t).each do |sect_flag|
      zero_flags.each do |zero_flag|
        @whats_f.each do |what|
          key_hash = prepare_key_hash(param, sect_flag, zero_flag, what, subparam)
          lock_name = "process_aggregate_#{OrgAggregatedResult.search_key(key_hash)}"
          OrgAggregatedResult.with_advisory_lock(lock_name) do
            clean(result, key_hash)
            make_aggregate(result, key_hash, sect_flag, param)
          end
        end
      end
    end
  end

  # Assembles the attribute hash identifying one aggregate row.
  #
  # Only the hierarchy id named by +what+ is filled in; the other four are nil.
  # With sect_flag "t" the company section is blanked, with "f" the employee
  # survey section is blanked.
  def prepare_key_hash(param, sect_flag, zero_flag, what, subparam)
    key_hash = param
               .slice(:poll_id, :survey_kind, :employment_date,
                      :employment_date_id, :employee_survey_section_id,
                      :company_section_id, :poll_employee_id, :is_head)
               .merge(flag_section: sect_flag, flag_zero: zero_flag, answer_value: 0.0,
                      answer_max_value: 0.0, what => subparam[what])
               .merge(@defs.slice(*(@whats - [what])))
    key_hash[:company_section_id] = nil if sect_flag == "t"
    key_hash[:employee_survey_section_id] = nil if sect_flag == "f"
    key_hash
  end

  # Removes the previous contribution of this result from its aggregate row.
  #
  # The previous contribution lives in the bucket matching the OLD answer's
  # zero flag, which may differ from the flag in +key_hash+ (built from the
  # new answer), so the flag is recomputed from the old answer when one exists.
  def clean(_res, key_hash)
    old_key_hash =
      if @old_answer.nil?
        key_hash
      else
        key_hash.merge(flag_zero: @old_answer.zero? ? "t" : "f")
      end
    aggr = OrgAggregatedResult.find_by(aggr_key: OrgAggregatedResult.search_key(old_key_hash))
    return unless aggr

    aggr.update(
      answer_value: aggr.answer_value - @old_answer.to_f,
      # Uses the ivar assigned in load_params; a nil old max subtracts 0.0.
      answer_max_value: aggr.answer_max_value - @old_max_answer.to_f
    )
  end

  # Finds or initialises the aggregate row for +key_hash+ and adds the
  # result's current values to it.
  def make_aggregate(result, key_hash, sect_flag, param)
    key = OrgAggregatedResult.search_key(key_hash)
    aggr = OrgAggregatedResult.find_by(aggr_key: key) || OrgAggregatedResult.new(key_hash)
    aggr.aggr_key = key
    aggr.company_section_id = nil if sect_flag == "t"
    aggr.employee_survey_section_id = nil if sect_flag == "f"
    aggr.survey_kind = param[:survey_kind]
    aggr.employment_date = param[:employment_date].to_time.to_i
    aggr.employment_date_id = param[:employment_date_id]
    aggr.answer_value += result.answer_value
    aggr.answer_max_value += result.answer_max_value
    aggr.save
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Pushes SaatPoll poll results into the "elastic-results" index in bulk.
#
# Values read from the interactor context:
#   poll_id  - when set, only results of this poll are imported
#   new_only - when truthy, only unconverted results of polls whose state is
#              neither "new" nor "parsed" are imported
class Elastic::ImportPollResults
  include Interactor

  delegate :poll_id, :new_only, to: :context

  # Prepares a document per poll result, sends the documents in batches of
  # 500 and finally marks every selected result as converted.
  def call
    employee_ids = results.pluck(:poll_employee_id)
    ess_cache = EmployeeSurveySection
                .where(poll_employee_id: employee_ids)
                .group_by { |section| [section.poll_employee_id, section.survey_id] }
    question_ids = results.pluck("distinct poll_results.question_id")
    # question_id => highest answer value available for that question
    q_cache = Hash[Answer.where(question_id: question_ids)
                         .group(:question_id).pluck("question_id, max(value)")]

    batch = []
    results.each do |poll_result|
      document = Elastic::Result.prepare_for_bulk(poll_result, ess_cache, q_cache)
      batch << document if document
      if batch.count >= 500
        update_elastic(batch)
        batch = []
      end
    end
    update_elastic(batch) unless batch.empty?

    results.update_all(converted: true)
  end

  # Replaces the given documents in the index: one bulk request deleting
  # them by id, followed by one bulk request indexing them again.
  #
  # @param data [Array<Hash>] documents; each carries its id under :id
  def update_elastic(data)
    client = Elastic::Result.__elasticsearch__.client

    removals = data.map { |item| { delete: { _id: item[:id] } } }
    client.bulk(index: "elastic-results", type: "result", body: removals)

    insertions = data.map { |item| { index: { _id: item[:id], data: item } } }
    client.bulk(index: "elastic-results", type: "result", body: insertions)
  end

  # Memoised relation of the poll results to import, with the associations
  # used while preparing documents eager-loaded.
  def results
    @_results ||= begin
      scope = PollResult.joins(:poll)
                        .where(polls: { type: "SaatPoll" })
                        .includes(:answer, :employee_survey_section,
                                  poll_employee: [:employee, { company_section: { company: :company_section } }],
                                  question: [:survey, {
                                    saat_parameter: :saat_subsystem,
                                    parent: { saat_parameter: :saat_subsystem }
                                  }])
      scope = scope.where(poll_id: poll_id) if poll_id
      scope = scope.where.not(polls: { aasm_state: %w(new parsed) }).where(converted: false) if new_only
      scope
    end
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Rebuilds OrgAggregatedResult rows from OrgResult rows.
#
# Values read from the interactor context (each may be blank):
#   depths       - hierarchy level indexes to aggregate; blank means 0..4
#   survey_kinds - survey kinds to process; blank means every listed kind
#   poll_ids     - SAAT polls to process; blank means all of them
#   employees    - poll employee ids to process; blank means every employee
#                  that has results in the selected polls
class Aggregated::Build
  include Interactor

  delegate :depths, :survey_kinds, :poll_ids, :employees, to: :context

  # For every selected poll: zeroes the matching aggregates, re-processes
  # each matching org result, then stores the highest OrgResult id of the
  # poll in the poll's processed_result.
  def call
    load_params
    @polls.all.each do |poll|
      OrgAggregatedResult
        .where(poll_id: poll.id, poll_employee_id: @employee_ids, survey_kind: @kinds)
        .update_all(answer_value: 0.0, answer_max_value: 0.0)
      OrgResult.where(poll_id: poll.id, poll_employee_id: @employee_ids, survey_kind: @kinds).find_each do |res|
        Aggregated::ProcessOrgResult.new(object: res, depths: @depths, survey_kinds: @kinds).call
      end
      poll.update(processed_result: OrgResult.where(poll_id: poll.id).maximum(:id))
    end
  end

  # Normalises the context values and applies the defaults.
  def load_params
    @depths = [depths].flatten.compact
    @depths = (0..4).to_a if @depths.blank?
    @kinds = [survey_kinds].flatten.compact
    @kinds = [11, 12, 13, 21, 22, 23, 31, 32, 33] if @kinds.blank?
    @polls = poll_ids.blank? ? SaatPoll : SaatPoll.where(id: poll_ids)
    @employee_ids = [employees].flatten.compact
    return unless @employee_ids.blank?

    # Without poll ids every poll is processed, so the default employee list
    # must not be restricted by poll either (where(poll_id: nil) would match
    # only results without a poll).
    org_results = poll_ids.blank? ? OrgResult.all : OrgResult.where(poll_id: poll_ids)
    @employee_ids = org_results.pluck("distinct poll_employee_id")
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Job on the :calculate queue that runs Aggregated::Build for a single poll.
class BackgroundBuildOrgAggregatedResultJob < ApplicationJob
  queue_as :calculate

  # @param poll_id [Integer] id of the SAAT poll to process
  # @param kinds [Object, nil] forwarded to the builder as +survey_kinds+
  # @param employees [Object, nil] forwarded to the builder as +employees+
  def perform(poll_id, kinds = nil, employees = nil)
    build_context = { poll_ids: [poll_id], survey_kinds: kinds, employees: employees }
    Aggregated::Build.new(build_context).call
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Elasticsearch document for a single weighted poll result, plus builders
# for the bulk-import payload and for the search/aggregation queries.
class Elastic::Result
  include Elasticsearch::Model
  include Elasticsearch::Persistence::Model

  attribute :id, Integer, mapping: { type: "integer" }
  attribute :poll_id, Integer, mapping: { type: "integer" }
  attribute :subsystem_id, Integer, mapping: { type: "integer", fielddata: true }
  attribute :block_id, Integer, mapping: { type: "integer" }
  attribute :element_id, Integer, mapping: { type: "integer" }
  attribute :feature_id, Integer, mapping: { type: "integer" }
  attribute :parameter_id, Integer, mapping: { type: "integer" }
  attribute :answer_value, Decimal, mapping: { type: "double" }
  attribute :answer_max_value, Decimal, mapping: { type: "double" }
  attribute :company_section_id, Integer, mapping: { type: "integer" }
  attribute :employee_survey_section_id, Integer, mapping: { type: "integer" }
  attribute :survey_kind, Integer, mapping: { type: "integer" }
  attribute :is_head, Boolean, mapping: { type: "boolean" }
  attribute :employment_date, Date, mapping: { type: "date" }

  # Resolves the section id to store for a poll result.
  #
  # Takes the company_section_id of the first cached employee survey section
  # for [poll_employee_id, survey_id]. When that equals the id of the
  # company's own company section, the employee's company section is used.
  #
  # @param ess_cache [Hash] sections grouped by [poll_employee_id, survey_id]
  # @return [Integer, nil]
  def self.ess(poll_result, ess_cache)
    ess_id = ess_cache[[poll_result.poll_employee_id, poll_result.question.survey_id]]&.first&.company_section_id
    # if root company section - set to employee company section
    if ess_id == poll_result.poll_employee.company_section.company.company_section.id
      ess_id = poll_result.poll_employee.company_section_id
    end
    ess_id
  end

  # Builds the document hash for one poll result.
  #
  # @param q_cache [Hash] question_id => maximum answer value
  # @return [Hash, nil] nil when the question has no saat_item or the answer
  #   value is negative
  def self.prepare_for_bulk(poll_result, ess_cache, q_cache)
    ess_id = ess(poll_result, ess_cache)
    ss = poll_result.question.saat_item
    return nil if ss.nil? || poll_result.answer.value&.negative?

    # A missing or zero criterion weight counts as 1.0.
    weight = ss.criterion_weight || 1.0
    weight = 1.0 if weight.zero?
    max_val = q_cache[poll_result.question_id]
    # The ancestry path followed by the item's own id gives the hierarchy ids.
    subsystem_id, block_id, element_id, feature_id = ss.ancestry.split("/").append(ss.id)

    res = from_poll_result(poll_result, weight, max_val)
    res[:feature_id] = feature_id.to_i
    res[:block_id] = block_id.to_i
    res[:element_id] = element_id.to_i
    res[:subsystem_id] = subsystem_id.to_i
    res[:company_section_id] = ess_id
    res[:employee_survey_section_id] = ess_id
    res
  end

  # Builds the fields taken directly from the poll result; answer values are
  # multiplied by +weight+ and nil values count as 0.
  #
  # @return [Hash]
  def self.from_poll_result(poll_result, weight, max_val)
    employee = poll_result.poll_employee.employee
    {
      id: poll_result.id,
      parameter_id: poll_result.question.saat_parameter_item_id,
      poll_id: poll_result.poll_id,
      poll_employee_id: poll_result.poll_employee_id,
      employment_date: employee.employment_date,
      is_head: employee.head ? true : false,
      survey_kind: poll_result.question.survey.kind_number.to_i,
      answer_value: (poll_result.answer.value || 0) * weight,
      answer_max_value: (max_val || 0) * weight
    }
  end

  # Terms aggregation over "<kind>_id" summing answer_value and
  # answer_max_value per bucket.
  #
  # @param kind [String, Symbol] level name, e.g. "feature"
  # @return [Hash] aggregation definition keyed by +kind+ as a symbol
  def self.agg_for(kind)
    {
      kind.to_sym => {
        terms: { shard_size: 200_000, size: 1000, field: "#{kind}_id" },
        aggs: {
          value: { sum: { field: "answer_value" } },
          max_value: { sum: { field: "answer_max_value" } }
        }
      }
    }
  end

  # Adds an is_head filter (unless +head+ is nil) and, when +answer+ is
  # truthy, excludes zero answers. Modifies and returns +query+.
  def self.answers(query, answer = false, head = [true, false])
    selectors = []
    selectors << { terms: { is_head: head } } unless head.nil?
    query[:query][:bool][:must] += selectors
    query[:query][:bool][:must_not] = { term: { answer_value: 0.0 } } if answer
    query
  end

  # Query counting documents per poll employee for one poll, optionally
  # restricted to zero answers, survey kinds and poll employees.
  #
  # @return [Hash] search body with a zero_answer_counts terms aggregation
  def self.query_count_zero_answer(poll_id, zero_only = false, kinds = nil, poll_employees = nil)
    sel = [{ term: { poll_id: poll_id } }]
    sel << { term: { answer_value: 0.0 } } if zero_only
    sel << { terms: { survey_kind: kinds } } if kinds.present?
    sel << { terms: { poll_employee_id: poll_employees } } if poll_employees.present?
    {
      query: { bool: { must: sel } },
      size: 0,
      aggs: {
        zero_answer_counts: {
          terms: { field: :poll_employee_id, shard_size: 1_000_000, size: 200_000 }
        }
      }
    }
  end

  # Poll employee ids whose share of zero answers is below their threshold:
  # 0.5 when employed after (poll stop date, or today, minus 3 months),
  # 0.3 otherwise.
  #
  # NOTE(review): assumes every employee has an employment_date and every
  # returned bucket key belongs to the poll's poll_employees - confirm.
  #
  # @return [Array] poll employee ids
  def self.count_zero_answer(poll_id, kinds = nil, poll_employees = nil)
    poll = SaatPoll.find(poll_id)
    stop_date = (poll.stop_date || Time.zone.today) - 3.months
    coefs = poll
            .poll_employees
            .joins(:employee)
            .pluck("poll_employees.id, employees.employment_date")
            .map { |sel| [sel.first, sel.last > stop_date ? 0.5 : 0.3] }
            .to_h
    z_count = count_by_query(poll_id, true, kinds, poll_employees)
    count = count_by_query(poll_id, false, kinds, poll_employees)
    count.keys.select { |key| (z_count.fetch(key, 0.0).to_f / count[key].to_f) < coefs[key] }
  end

  # Runs the counting query and maps each bucket key to its doc_count.
  #
  # @return [Hash] poll_employee_id => document count
  def self.count_by_query(poll_id, zero_only, kinds, poll_employees)
    search(
      query_count_zero_answer(
        poll_id,
        zero_only,
        kinds,
        poll_employees
      )
    ).aggregations.zero_answer_counts.buckets.each_with_object({}) do |el, res|
      res[el["key"]] = el["doc_count"]
    end
  end

  # Returns a copy of +query+ whose poll_employee_id filter is replaced.
  #
  # An existing filter is intersected with +date_employees+; when
  # +apply_unknown+ is truthy the list is then replaced by the result of
  # count_zero_answer. The query is deep-copied first because the nested
  # :must array is modified, and a shallow copy would leak those changes
  # into the caller's hash.
  def self.post_selectors(query, poll_id, date_employees, kinds, apply_unknown)
    q = query.deep_dup
    must = q.fetch(:query, {}).fetch(:bool, {}).fetch(:must)
    pei = must.index { |clause| clause.fetch(:terms, {})[:poll_employee_id] }
    pe = pei ? must[pei][:terms][:poll_employee_id].sort & date_employees.sort : nil
    must.delete_at(pei) if pei
    pe = count_zero_answer(poll_id, kinds, pe) if apply_unknown
    q[:query][:bool][:must] = must + [{ terms: { poll_employee_id: pe } }]
    q
  end

  # Builds the base query: one filter per non-blank argument and an empty
  # aggregation section.
  #
  # @return [Hash] search body
  def self.selectors(poll, company_sections, employee_survey_sections, poll_employees, survey_kinds)
    selectors = []
    selectors << { term: { poll_id: poll } } if poll
    selectors << { terms: { survey_kind: survey_kinds } } unless survey_kinds.blank?
    selectors << { terms: { company_section_id: company_sections } } unless company_sections.blank?
    unless employee_survey_sections.blank?
      selectors << { terms: { employee_survey_section_id: employee_survey_sections } }
    end
    selectors << { terms: { poll_employee_id: poll_employees } } unless poll_employees.blank?
    {
      query: { bool: { must: selectors } },
      aggs: {}
    }
  end

  # Adds the aggregation of every hierarchy level to +query+ and returns it.
  def self.aggregators(query)
    %w(feature subsystem block element parameter).each do |kind|
      query[:aggs].merge!(agg_for(kind))
    end
    query
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true | |
# Ohm model of an aggregated result, uniquely identified by aggr_key.
class RdsAggregatedResult < Ohm::Model
  include Ohm::DataTypes

  %i[
    poll_id poll_employee_id subsystem_id block_id
    element_id feature_id survey_kind company_section_id
  ].each { |name| attribute name, Type::Integer }
  attribute :is_head
  attribute :employment_date, Type::Date
  attribute :employment_date_id, Type::Integer
  attribute :answer_value, Type::Decimal
  attribute :answer_max_value, Type::Decimal
  attribute :employee_survey_section_id, Type::Integer
  attribute :parameter_id, Type::Integer
  attribute :flag_zero
  attribute :flag_section
  attribute :aggr_key

  index :aggr_key
  unique :aggr_key
  %i[
    poll_id poll_employee_id subsystem_id block_id element_id feature_id
    company_section_id survey_kind is_head employee_survey_section_id
    parameter_id employment_date_id flag_zero flag_section
  ].each { |name| index name }

  # Attribute names, in order, that make up the aggregate key.
  #
  # @return [Array<Symbol>]
  def self.key_params
    %i[
      poll_id company_section_id employee_survey_section_id poll_employee_id
      survey_kind is_head flag_zero flag_section
      subsystem_id block_id element_id feature_id parameter_id
    ]
  end

  # Key of this record: its key attribute values joined with ":".
  #
  # @return [String]
  def key_self
    parts = self.class.key_params.map { |name| send(name).to_s }
    parts.join(":")
  end

  # Key for an attribute hash: the values under the key attribute names
  # joined with ":"; missing values become empty strings.
  #
  # @param param [Hash] values indexed by attribute name
  # @return [String]
  def self.search_key(param)
    parts = key_params.map { |name| param[name].to_s }
    parts.join(":")
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment