Last active: December 13, 2015 17:09
-
-
Save kerinin/4945992 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Default reducer parallelism for operators in this script that do not set PARALLEL themselves.
SET default_parallel 12;

-- SLS watchlist dump; joined later on (hour, mail_campaign_id).
sls_watchlist = LOAD 's3://oib-mapreduce-rmichael/sls_watchlist_dumps'
    AS (domain:chararray, subject_line:chararray, campaign_group:chararray,
        mail_campaign_id:chararray, hour:int);

-- CID watchlist dump; joined later on (hour, matching_id).
cid_watchlist = LOAD 's3://oib-mapreduce-rmichael/cid_watchlist_dumps'
    AS (matching_id:chararray, mail_campaign_id:chararray, campaign_group:chararray,
        hour:int);

-- CID record dump; `count` is the per-row value that is summed per domain.
cid_records = LOAD 's3://oib-mapreduce-rmichael/cid_records_dumps'
    AS (re_id:chararray, matching_id:chararray, service:chararray,
        metric:chararray, count:int, hour:int);
-- Attach a domain to each CID record: match it to the CID watchlist on
-- (hour, matching_id), then to the SLS watchlist on (hour, mail_campaign_id).
cid_x_cid_wl = JOIN cid_records BY (hour, matching_id), cid_watchlist BY (hour, matching_id);
cid_x_both_wl = JOIN cid_x_cid_wl BY (cid_records::hour, mail_campaign_id), sls_watchlist BY (hour, mail_campaign_id);

-- Drop 'received_dt' metric rows, then keep only the columns the totals need.
cid_metric_rows = FILTER cid_x_both_wl BY metric != 'received_dt';
cid_domain_rows = FOREACH cid_metric_rows GENERATE domain AS domain, count AS count;

-- Sum the count column per domain; cid_counts has schema (domain, count).
cid_by_domain = GROUP cid_domain_rows BY domain;
cid_counts = FOREACH cid_by_domain GENERATE group AS domain, SUM(cid_domain_rows.count) AS count;
-- Folder-scan rollup records (the input path is dated 2013-02-12).
-- Fields with no declared type are spelled out as bytearray, which is Pig's
-- default type for untyped schema fields.
read_rate_records = LOAD 's3://oib-mapreduce/output/folder_scan_rollups/2013/06/2013-02-12' AS (
    scanned_at                     : chararray,
    received_at                    : chararray,
    mm_id                          : bytearray,
    source                         : chararray,
    folder                         : chararray,
    read                           : bytearray,
    subject                        : chararray,
    ip                             : chararray,
    header                         : chararray,
    from                           : chararray,
    user_id                        : chararray,
    ham                            : int,
    address_book                   : bytearray,
    important                      : bytearray,
    tags                           : chararray,
    replied                        : bytearray,
    forwarded                      : bytearray,
    spam                           : int,
    has_priority                   : bytearray,
    is_public                      : bytearray,
    rp_tag                         : chararray,
    on_content_redaction_whitelist : int,
    to_domain                      : chararray,
    eea_address                    : chararray,
    to_address                     : chararray,
    message_id                     : chararray,
    real_folder_name               : chararray
);
-- Keep only records with a non-empty user_id, subject and from
-- (null values also fail the comparison and are dropped).
valid_records = FILTER read_rate_records BY (user_id != '') AND (subject != '') AND (from != '');

-- Restrict to one UTC day: 15747 days after the epoch is 2013-02-11 (UTC).
-- Assumes received_at holds integer epoch seconds; values that do not cast
-- to int become null and are filtered out.
-- NOTE(review): the input path is dated 2013-02-12, and it is unclear whether
-- received_at or scanned_at is the intended timestamp -- confirm.
records_for_day = FILTER valid_records BY ((int)received_at / 86400 == 15747);

-- Count records per sender; rr_counts has schema (from, count).
-- NOTE(review): `from` is later joined against watchlist `domain` values;
-- confirm the two hold comparable values.
rr_grouped_by_from = GROUP records_for_day BY from;
-- COUNT_STAR counts every record in the group. COUNT would skip records whose
-- first field (scanned_at) is null, undercounting the group.
rr_counts = FOREACH rr_grouped_by_from GENERATE group AS from, COUNT_STAR(records_for_day) AS count;
-- Pair each domain's CID count with its read-rate record count.
-- This emulates a right outer join: a FULL skewed join followed by dropping
-- rows that have no CID side. Per the original author, memory explodes
-- unless rr_counts is the left input.
joined = JOIN rr_counts BY from FULL, cid_counts BY domain USING 'skewed';
joined = FILTER joined BY cid_counts::count IS NOT NULL;

-- Emit (domain, rr_count, cid_count, rr_cid_fraction).
-- A missing read-rate side defaults to 0 in both rr_count and the fraction,
-- so the two columns stay consistent (previously the fraction was null).
-- The cid_count null check is redundant after the filter above and is kept
-- only as a safeguard.
-- NOTE(review): a cid_count of 0 divides by zero; confirm the desired output.
counts = FOREACH joined GENERATE
    (domain is null ? from : domain) AS domain,
    (rr_counts::count is null ? 0 : rr_counts::count) AS rr_count,
    (cid_counts::count is null ? 0 : cid_counts::count) AS cid_count,
    ((rr_counts::count is null ? 0.0F : (float)rr_counts::count) / (float)cid_counts::count) AS rr_cid_fraction;

STORE counts INTO 's3://oib-mapreduce-rmichael/output/cid-vs-rr6';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.