Skip to content

Instantly share code, notes, and snippets.

Created November 21, 2011 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1383266 to your computer and use it in GitHub Desktop.
Save anonymous/1383266 to your computer and use it in GitHub Desktop.
-- Job parameters. A %default value is used only when the parameter is not
-- supplied when the script is launched.
-- WORK_DIR defaults to the output of the pwd shell command. It must contain
-- pig-support.jar and the schema directory read by the loaders below.
%default WORK_DIR `pwd`;
-- Per-level input rollups and the final output location.
%default INPUTfirstLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_firstLevel';
%default INPUTsecondLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_secondLevel';
%default INPUTthirdLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_thirdLevel';
%default OUTPUT '/wide/rollup/CategoryEnthusiastsByTopVisitors_final';
-- Fraction of each category's visitors, ranked by summed Impressions, that is
-- finally kept as top visitors, one value per category level.
-- Despite the names these are fractions (0.05 means 5 percent): they are
-- multiplied directly with per-category visitor counts.
%default FIRST_LEVELS_PERCENTAGE 0.05
%default SECOND_LEVELS_PERCENTAGE 0.05
%default THIRD_LEVELS_PERCENTAGE 0.1
-- Fraction of each category's heaviest visitors (by Impressions) that is cut
-- off as bots: the first TOP pass keeps level + bots, the second pass keeps
-- only the lighter level share of those survivors.
%default BOTS_PERCENTAGE 0.01
-- Jar that presumably provides the com.contextweb classes used below.
REGISTER '$WORK_DIR/pig-support.jar';
-- roundUp is applied to CheckPointStart before grouping; presumably it rounds
-- timestamps up to the group.by period of the CONTEXTWEB block (confirm in UDF).
DEFINE roundUp com.contextweb.pig.groupby.RoundDateUp();
-- stringUnsplit turns the bag of selected category ids into the
-- ContextCategoryIds column; the comma argument is presumably the separator.
DEFINE stringUnsplit com.contextweb.pig.udf.StringUnsplit(',');
/*CONTEXTWEB:
start.date = -7D
end.date = -1D
group.by = 7D
mapred.child.java.opts=-Xmx1024m
cw.leave.tmp.output=true
cw.memcache.upload=true
cw.extends.properties=${hadoop.conf}/RawLog/memcache-connection.cfg
cw.memcache.recordconverter.classname=com.contextweb.memcache.convert.CategoryEnthusiastRecordConverter
cw.memcache.key=VisitorGUID
cw.memcache.values=ContextCategoryIds
# expiration timeout in seconds
cw.memcache.expires=86400
cw.memcache.schema.location=schema/CategoryEnthusiastsMemcacheUploadSchema.xml
*/
-- The three rollups share one header schema, so the loader and its schema
-- argument are declared once and reused for every level.
-- Downstream, the category is read from TopChannelId for firstLevels,
-- ParentChannelID for secondLevels and ChannelID for thirdLevels.
DEFINE categoryEnthusiastsLoader com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
firstLevels = LOAD '$INPUTfirstLevel' USING categoryEnthusiastsLoader();
secondLevels = LOAD '$INPUTsecondLevel' USING categoryEnthusiastsLoader();
thirdLevels = LOAD '$INPUTthirdLevel' USING categoryEnthusiastsLoader();
--Processing third level categories
-- For each (rounded CheckPointStart, ChannelID) category, select the visitors
-- whose summed Impressions fall in the top THIRD_LEVELS_PERCENTAGE fraction
-- after the heaviest BOTS_PERCENTAGE fraction (treated as bots) is cut off.
-- TOP only keeps the largest values, so this takes two passes:
--   1. keep the top (THIRD + BOTS) fraction of visitors by Impressions
--   2. negate Impressions and keep the THIRD over (THIRD + BOTS) fraction of
--      those survivors with the smallest Impressions, dropping the bots
-- Both sizes are truncated by the int cast, so small categories can select
-- zero visitors. A tiny epsilon is added before truncating because a product
-- that should be a whole number can come out just below it in double
-- arithmetic (with 0.05 and 0.01, 18 survivors times 0.05 over 0.06 gives
-- 14.999999999999998) and would otherwise lose one visitor. The epsilon is far
-- below any real fractional part for two-decimal fractions; with many-decimal
-- fractions it could round up a value within 0.000001 of the next integer.
thirdLevelsRounded = FOREACH thirdLevels GENERATE
    roundUp(CheckPointStart) as CheckPointStart,
    ChannelID,
    VisitorGUID,
    Impressions;
-- Total impressions per (period, category, visitor).
thirdLevelsGrouped = GROUP thirdLevelsRounded BY (CheckPointStart, ChannelID, VisitorGUID);
thirdLevelsSummed = FOREACH thirdLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(thirdLevelsRounded.Impressions) AS Impressions;
thirdLevelsByCategory = GROUP thirdLevelsSummed BY (CheckPointStart, ChannelID) PARALLEL 30;
-- Pass 1: column 3 is Impressions (after the three flattened group keys).
thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
    count = COUNT(thirdLevelsSummed);
    result = TOP( (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) + 0.000001), 3, thirdLevelsSummed);
    GENERATE FLATTEN(result);
}
thirdLevelsCleaned = FOREACH thirdLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::ChannelID AS ChannelID,
    result::group::VisitorGUID AS VisitorGUID,
    --we negate the sum to change the order in TOP
    (result::Impressions * -1) AS Impressions;
thirdLevelsCleanedByCategory = GROUP thirdLevelsCleaned BY (CheckPointStart, ChannelID) PARALLEL 30;
-- Pass 2: column 3 is the negated Impressions, so TOP keeps the lightest
-- survivors and the heaviest (bot) share is discarded.
thirdLevelsTopVisitors = FOREACH thirdLevelsCleanedByCategory {
    count = COUNT(thirdLevelsCleaned);
    result = TOP((int)(count * (double) ($THIRD_LEVELS_PERCENTAGE / ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) + 0.000001), 3, thirdLevelsCleaned);
    GENERATE FLATTEN(result);
}
thirdLevelsTopVisitorsCleaned = FOREACH thirdLevelsTopVisitors GENERATE
    CheckPointStart,
    ChannelID,
    VisitorGUID,
    --we negate to revert to the correct number
    Impressions * -1 AS Impressions;
------Processing second plus third level categories
-- For each (rounded CheckPointStart, ParentChannelID) category, select the
-- visitors whose summed Impressions fall in the top SECOND_LEVELS_PERCENTAGE
-- fraction after the heaviest BOTS_PERCENTAGE fraction (treated as bots) is
-- cut off. TOP only keeps the largest values, so this takes two passes:
--   1. keep the top (SECOND + BOTS) fraction of visitors by Impressions
--   2. negate Impressions and keep the SECOND over (SECOND + BOTS) fraction of
--      those survivors with the smallest Impressions, dropping the bots
-- Both sizes are truncated by the int cast, so small categories can select
-- zero visitors. A tiny epsilon is added before truncating because a product
-- that should be a whole number can come out just below it in double
-- arithmetic. With the default 0.05 and 0.01, a category of 300 visitors has
-- 18 survivors after pass 1, and 18 times 0.05 over 0.06 evaluates to
-- 14.999999999999998, which would keep 14 visitors instead of 15. The epsilon
-- is far below any real fractional part for two-decimal fractions; with
-- many-decimal fractions it could round up a value within 0.000001 of the
-- next integer.
secondLevelsRounded = FOREACH secondLevels GENERATE
    roundUp(CheckPointStart) as CheckPointStart,
    ParentChannelID,
    VisitorGUID,
    Impressions;
-- Total impressions per (period, category, visitor).
secondLevelsGrouped = GROUP secondLevelsRounded BY (CheckPointStart, ParentChannelID, VisitorGUID);
secondLevelsSummed = FOREACH secondLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(secondLevelsRounded.Impressions) AS Impressions;
secondLevelsByCategory = GROUP secondLevelsSummed BY (CheckPointStart, ParentChannelID) PARALLEL 30;
-- Pass 1: column 3 is Impressions (after the three flattened group keys).
secondLevelsTopVisitorsWithBots = FOREACH secondLevelsByCategory {
    count = COUNT(secondLevelsSummed);
    result = TOP( (int)(count * (double) ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) + 0.000001), 3, secondLevelsSummed);
    GENERATE FLATTEN(result);
}
secondLevelsCleaned = FOREACH secondLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::ParentChannelID AS ParentChannelID,
    result::group::VisitorGUID AS VisitorGUID,
    --we negate the sum to change the order in TOP
    (result::Impressions * -1) AS Impressions;
secondLevelsCleanedByCategory = GROUP secondLevelsCleaned BY (CheckPointStart, ParentChannelID) PARALLEL 30;
-- Pass 2: column 3 is the negated Impressions, so TOP keeps the lightest
-- survivors and the heaviest (bot) share is discarded.
secondLevelsTopVisitors = FOREACH secondLevelsCleanedByCategory {
    count = COUNT(secondLevelsCleaned);
    result = TOP((int)(count * (double) ($SECOND_LEVELS_PERCENTAGE / ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) + 0.000001), 3, secondLevelsCleaned);
    GENERATE FLATTEN(result);
}
secondLevelsTopVisitorsCleaned = FOREACH secondLevelsTopVisitors GENERATE
    CheckPointStart,
    ParentChannelID,
    VisitorGUID,
    --we negate to revert to the correct number
    Impressions * -1 AS Impressions;
----Processing first plus second plus third level categories
-- For each (rounded CheckPointStart, TopChannelId) category, select the
-- visitors whose summed Impressions fall in the top FIRST_LEVELS_PERCENTAGE
-- fraction after the heaviest BOTS_PERCENTAGE fraction (treated as bots) is
-- cut off. TOP only keeps the largest values, so this takes two passes:
--   1. keep the top (FIRST + BOTS) fraction of visitors by Impressions
--   2. negate Impressions and keep the FIRST over (FIRST + BOTS) fraction of
--      those survivors with the smallest Impressions, dropping the bots
-- Both sizes are truncated by the int cast, so small categories can select
-- zero visitors. A tiny epsilon is added before truncating because a product
-- that should be a whole number can come out just below it in double
-- arithmetic. With the default 0.05 and 0.01, a category of 300 visitors has
-- 18 survivors after pass 1, and 18 times 0.05 over 0.06 evaluates to
-- 14.999999999999998, which would keep 14 visitors instead of 15. The epsilon
-- is far below any real fractional part for two-decimal fractions; with
-- many-decimal fractions it could round up a value within 0.000001 of the
-- next integer.
firstLevelsRounded = FOREACH firstLevels GENERATE
    roundUp(CheckPointStart) as CheckPointStart,
    TopChannelId,
    VisitorGUID,
    Impressions;
-- Total impressions per (period, category, visitor).
firstLevelsGrouped = GROUP firstLevelsRounded BY (CheckPointStart, TopChannelId, VisitorGUID);
firstLevelsSummed = FOREACH firstLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(firstLevelsRounded.Impressions) AS Impressions;
firstLevelsByCategory = GROUP firstLevelsSummed BY (CheckPointStart, TopChannelId) PARALLEL 30;
-- Pass 1: column 3 is Impressions (after the three flattened group keys).
firstLevelsTopVisitorsWithBots = FOREACH firstLevelsByCategory {
    count = COUNT(firstLevelsSummed);
    result = TOP( (int)(count * (double) ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) + 0.000001), 3, firstLevelsSummed);
    GENERATE FLATTEN(result);
}
firstLevelsCleaned = FOREACH firstLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::TopChannelId AS TopChannelId,
    result::group::VisitorGUID AS VisitorGUID,
    --we negate the sum to change the order in TOP
    (result::Impressions * -1) AS Impressions;
firstLevelsCleanedByCategory = GROUP firstLevelsCleaned BY (CheckPointStart, TopChannelId) PARALLEL 30;
-- Pass 2: column 3 is the negated Impressions, so TOP keeps the lightest
-- survivors and the heaviest (bot) share is discarded.
firstLevelsTopVisitors = FOREACH firstLevelsCleanedByCategory {
    count = COUNT(firstLevelsCleaned);
    result = TOP((int)(count * (double) ($FIRST_LEVELS_PERCENTAGE / ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) + 0.000001), 3, firstLevelsCleaned);
    GENERATE FLATTEN(result);
}
firstLevelsTopVisitorsCleaned = FOREACH firstLevelsTopVisitors GENERATE
    CheckPointStart,
    TopChannelId,
    VisitorGUID,
    --we negate to revert to the correct number
    Impressions * -1 AS Impressions;
-- Making result data
-- Output one row per (CheckPointStart, VisitorGUID) listing every category id,
-- across all three levels, for which the visitor was selected as a top visitor.
-- The level relations name their category column differently (ChannelID,
-- ParentChannelID, TopChannelId), so each is first projected onto a common
-- ChannelID alias. UNION without ONSCHEMA is positional, and with mismatched
-- field names the merged schema may not expose a ChannelID field at all,
-- which would break the allTops.ChannelID projection below. Impressions is
-- not part of the output, so it is dropped here.
thirdLevelsForUnion = FOREACH thirdLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    ChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
secondLevelsForUnion = FOREACH secondLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    ParentChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
firstLevelsForUnion = FOREACH firstLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    TopChannelId AS ChannelID,
    VisitorGUID AS VisitorGUID;
allTops = UNION thirdLevelsForUnion, secondLevelsForUnion, firstLevelsForUnion;
allTopsByVisitor = GROUP allTops BY (CheckPointStart, VisitorGUID);
result = FOREACH allTopsByVisitor GENERATE
    group.CheckPointStart AS CheckPointStart,
    group.VisitorGUID AS VisitorGUID,
    stringUnsplit(allTops.ChannelID) AS ContextCategoryIds;
STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore('CheckPointStart', 'true');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment