Skip to content

Instantly share code, notes, and snippets.

@kerinin
Last active December 13, 2015 17:09
Show Gist options
  • Save kerinin/4945992 to your computer and use it in GitHub Desktop.
Save kerinin/4945992 to your computer and use it in GitHub Desktop.
-- Reducer count used by every MapReduce job in this script that does not set
-- its own PARALLEL clause.
set default_parallel 12;

-- Subject-line (SLS) watchlist dump. Its `domain` field is the key that the CID
-- counts are aggregated on; it is joined to CID data on (hour, mail_campaign_id).
sls_watchlist = LOAD 's3://oib-mapreduce-rmichael/sls_watchlist_dumps' AS (
    domain:           chararray,
    subject_line:     chararray,
    campaign_group:   chararray,
    mail_campaign_id: chararray,
    hour:             int
);
-- CID watchlist dump. It maps a matching_id (per hour) to the mail_campaign_id
-- that links CID records to the SLS watchlist.
cid_watchlist = LOAD 's3://oib-mapreduce-rmichael/cid_watchlist_dumps' AS (
    matching_id:      chararray,
    mail_campaign_id: chararray,
    campaign_group:   chararray,
    hour:             int
);
-- Raw CID metric records: one (metric, count) value per record, keyed by hour
-- and matching_id.
cid_records = LOAD 's3://oib-mapreduce-rmichael/cid_records_dumps' AS (
    re_id:       chararray,
    matching_id: chararray,
    service:     chararray,
    metric:      chararray,
    count:       int,
    hour:        int
);
-- Total CID count per watchlist domain.
-- Step 1: attach each CID record to its CID watchlist entry for the same hour.
cid_with_cid_watch = JOIN cid_records BY (hour, matching_id), cid_watchlist BY (hour, matching_id);
-- Step 2: attach the SLS watchlist entry for the same hour and campaign. The
-- SLS entry supplies `domain`.
-- NOTE(review): if either watchlist holds duplicate rows for a join key, these
-- joins multiply CID rows and the sums below double-count. Confirm that the
-- watchlist keys are unique.
cid_with_both_watch = JOIN cid_with_cid_watch BY (cid_records::hour, mail_campaign_id), sls_watchlist BY (hour, mail_campaign_id);
-- Step 3: sum every metric except 'received_dt'. A null metric makes the
-- comparison null, so such rows are dropped as well.
cid_count_rows = FILTER cid_with_both_watch BY (metric != 'received_dt');
cid_count_rows = FOREACH cid_count_rows GENERATE domain AS domain, count AS count;
-- Step 4: roll up to one (domain, count) row per domain. SUM yields a long.
cid_by_domain = GROUP cid_count_rows BY domain;
cid_counts = FOREACH cid_by_domain GENERATE
    group                      AS domain,
    SUM(cid_count_rows.count)  AS count;
-- Folder-scan rollup records. No USING clause is given, so Pig's default
-- loader (PigStorage) is used. Fields declared without a type are bytearray.
-- The field order must match the on-disk column order.
-- NOTE(review): the path mixes folder `2013/06` with the date `2013-02-12`,
-- while the day filter applied later selects 2013-02-11 UTC. Confirm that this
-- is the intended input.
read_rate_records = LOAD 's3://oib-mapreduce/output/folder_scan_rollups/2013/06/2013-02-12' AS (
    scanned_at:                     chararray,
    received_at:                    chararray,
    mm_id,
    source:                         chararray,
    folder:                         chararray,
    read,
    subject:                        chararray,
    ip:                             chararray,
    header:                         chararray,
    from:                           chararray,
    user_id:                        chararray,
    ham:                            int,
    address_book,
    important,
    tags:                           chararray,
    replied,
    forwarded,
    spam:                           int,
    has_priority,
    is_public,
    rp_tag:                         chararray,
    on_content_redaction_whitelist: int,
    to_domain:                      chararray,
    eea_address:                    chararray,
    to_address:                     chararray,
    message_id:                     chararray,
    real_folder_name:               chararray
);
-- Read-rate record count per sender (`from`) for a single day.
-- Keep only records that have a user, a subject and a sender. Null fields make
-- the != comparison null, so those rows are dropped too.
valid_records = FILTER read_rate_records BY (user_id != '') AND (subject != '') AND (from != '');
-- NOTE(review): should the day be chosen by received_at or by scanned_at? This is still open.
-- 15747 is a day number counted from the Unix epoch: 15747 * 86400 = 1360540800,
-- which is 2013-02-11 00:00:00 UTC. received_at is presumably integer epoch
-- seconds stored as text (TODO confirm). Values that fail the int cast should
-- become null and be filtered out.
records_for_day = FILTER valid_records BY ((int)received_at / 86400 == 15747);
-- NOTE(review): is `from` equivalent to the watchlist `domain`? These counts are
-- later joined on from == domain.
rr_grouped_by_from = GROUP records_for_day BY from;
-- COUNT_STAR counts every record. COUNT would skip tuples whose first field
-- (scanned_at) is null and silently undercount.
rr_counts = FOREACH rr_grouped_by_from GENERATE
    group                       AS from,
    COUNT_STAR(records_for_day) AS count;
-- Compare read-rate counts with CID counts for every watchlist domain and
-- store (domain, rr_count, cid_count, rr_cid_fraction).
-- The intended result is a right join (all cid_counts rows). rr_counts must
-- stay on the left side of the skewed join because memory blows up otherwise,
-- so the script runs a FULL join and then drops rows with no cid_counts match.
joined = JOIN rr_counts BY from FULL, cid_counts BY domain USING 'skewed';
joined = FILTER joined BY cid_counts::count IS NOT NULL;
-- Default the missing counts to 0 *before* taking the ratio. A watchlist domain
-- with no read-rate records then gets rr_cid_fraction 0.0 that matches its
-- rr_count of 0 (previously the fraction was null).
with_defaults = FOREACH joined GENERATE
    (cid_counts::domain IS NULL ? rr_counts::from : cid_counts::domain) AS domain,
    (rr_counts::count IS NULL ? 0L : rr_counts::count) AS rr_count,
    (cid_counts::count IS NULL ? 0L : cid_counts::count) AS cid_count;
-- NOTE(review): a domain whose CID counts sum to 0 still divides by zero here.
-- Decide what the fraction should be in that case.
counts = FOREACH with_defaults GENERATE
    domain                              AS domain,
    rr_count                            AS rr_count,
    cid_count                           AS cid_count,
    ((float)rr_count / (float)cid_count) AS rr_cid_fraction;
STORE counts INTO 's3://oib-mapreduce-rmichael/output/cid-vs-rr6';
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment