Skip to content

Instantly share code, notes, and snippets.

Created December 22, 2011 01:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1508511 to your computer and use it in GitHub Desktop.
Save anonymous/1508511 to your computer and use it in GitHub Desktop.
-- Script parameters. Every value below is only a default and can be
-- overridden when the script is launched.
-- Base directory for the support jar and the schema folder. Defaults to the
-- output of the pwd shell command.
%default WORK_DIR `pwd`;
-- Input locations, one per category level. Each one is read by a LOAD
-- statement further down.
%default INPUTfirstLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_firstLevel';
%default INPUTsecondLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_secondLevel';
%default INPUTthirdLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_thirdLevel';
-- Destination of the final STORE statement.
%default OUTPUT '/wide/rollup/CategoryEnthusiastsByTopVisitors_final';
-- Fraction of the visitors of each category that is kept, per level.
%default FIRST_LEVELS_PERCENTAGE 0.05
%default SECOND_LEVELS_PERCENTAGE 0.05
%default THIRD_LEVELS_PERCENTAGE 0.1
-- Extra fraction that is added to the level percentage for the first TOP pass
-- and cut away again by the second TOP pass.
-- NOTE(review): presumably this removes the heaviest visitors as bots - confirm.
%default BOTS_PERCENTAGE 0.01
-- Registers the support jar found under WORK_DIR.
-- NOTE(review): presumably the home of the com.contextweb classes used in this script - confirm.
REGISTER '$WORK_DIR/pig-support.jar';
-- Short name for the StringUnsplit UDF, constructed with a comma argument.
-- NOTE(review): presumably joins the values of a bag into one comma separated string - confirm against the UDF source.
DEFINE stringUnsplit com.contextweb.pig.udf.StringUnsplit(',');
/*CONTEXTWEB:
start.date = -7D
end.date = -1D
cw.leave.tmp.output=true
cw.memcache.upload=true
cw.extends.properties=${hadoop.conf}/RawLog/memcache-connection.cfg
cw.memcache.recordconverter.classname=com.contextweb.memcache.convert.CategoryEnthusiastRecordConverter
cw.memcache.key=VisitorGUID
cw.memcache.values=ContextCategoryIds
# expiration timeout in seconds
cw.memcache.expires=86400
cw.memcache.schema.location=schema/CategoryEnthusiastsMemcacheUploadSchema.xml
*/
-- Load the three per-level inputs. All of them go through the same loader and
-- the same schema file under WORK_DIR.
-- NOTE(review): the columns used later (ChannelID, ParentChannelID,
-- TopChannelId, VisitorGUID, Impressions) are presumably declared in that
-- schema file - confirm.
firstLevels = LOAD '$INPUTfirstLevel' USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
secondLevels = LOAD '$INPUTsecondLevel' USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
thirdLevels = LOAD '$INPUTthirdLevel' USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
-- Third level categories: select the top visitors of every ChannelID.
-- Input relation: thirdLevels. Output relation: thirdLevelsTopVisitorsCleaned
-- with the columns ChannelID, VisitorGUID and Impressions.

-- Keep only the columns this pipeline works with.
thirdLevelsSlim = FOREACH thirdLevels GENERATE ChannelID, VisitorGUID, Impressions;

-- Total impressions for every (ChannelID, VisitorGUID) pair.
thirdLevelsPairs = GROUP thirdLevelsSlim BY (ChannelID, VisitorGUID);
thirdLevelsTotals = FOREACH thirdLevelsPairs GENERATE
    FLATTEN(group),
    SUM(thirdLevelsSlim.Impressions) AS Impressions;

-- First pass: per category, keep the pairs with the highest totals. The cut
-- size is the pair count times (level percentage + bots percentage),
-- truncated to an int. Field 2 of each tuple is Impressions.
thirdLevelsPerCategory = GROUP thirdLevelsTotals BY (ChannelID) PARALLEL 30;
thirdLevelsCutSizes = FOREACH thirdLevelsPerCategory GENERATE
    group,
    (int)(COUNT(thirdLevelsTotals) * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;
thirdLevelsWithCutSize = JOIN thirdLevelsPerCategory BY group, thirdLevelsCutSizes BY group USING 'replicated';
thirdLevelsHeavy = FOREACH thirdLevelsWithCutSize GENERATE
    FLATTEN(TOP(TopNumber, 2, thirdLevelsTotals));

-- Flip the sign of the totals so that the next TOP call, which keeps the
-- largest values, ends up keeping the smallest real totals of the first pass.
thirdLevelsFlipped = FOREACH thirdLevelsHeavy GENERATE
    group::ChannelID AS ChannelID,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second pass: per category, keep the share level / (level + bots) of the
-- first pass survivors, which drops the very highest totals.
-- NOTE(review): the int cast truncates a floating point product, so the cut
-- size may come out one lower than the exact ratio suggests - confirm this is acceptable.
thirdLevelsFlippedPerCategory = GROUP thirdLevelsFlipped BY (ChannelID) PARALLEL 30;
thirdLevelsKeepSizes = FOREACH thirdLevelsFlippedPerCategory GENERATE
    group,
    (int)(COUNT(thirdLevelsFlipped) * (double) ($THIRD_LEVELS_PERCENTAGE / ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;
thirdLevelsWithKeepSize = JOIN thirdLevelsFlippedPerCategory BY group, thirdLevelsKeepSizes BY group USING 'replicated';
thirdLevelsKept = FOREACH thirdLevelsWithKeepSize GENERATE
    FLATTEN(TOP(TopNumber, 2, thirdLevelsFlipped));

-- Flip the sign back so Impressions holds the real total again.
thirdLevelsTopVisitorsCleaned = FOREACH thirdLevelsKept GENERATE
    ChannelID,
    VisitorGUID,
    Impressions * -1 AS Impressions;
-- Second plus third level categories: select the top visitors of every
-- ParentChannelID.
-- Input relation: secondLevels. Output relation: secondLevelsTopVisitorsCleaned
-- with the columns ParentChannelID, VisitorGUID and Impressions.

-- Keep only the columns this pipeline works with.
secondLevelsSlim = FOREACH secondLevels GENERATE ParentChannelID, VisitorGUID, Impressions;

-- Total impressions for every (ParentChannelID, VisitorGUID) pair.
secondLevelsPairs = GROUP secondLevelsSlim BY (ParentChannelID, VisitorGUID);
secondLevelsTotals = FOREACH secondLevelsPairs GENERATE
    FLATTEN(group),
    SUM(secondLevelsSlim.Impressions) AS Impressions;

-- First pass: per category, keep the pairs with the highest totals. The cut
-- size is the pair count times (level percentage + bots percentage),
-- truncated to an int. Field 2 of each tuple is Impressions.
secondLevelsPerCategory = GROUP secondLevelsTotals BY (ParentChannelID) PARALLEL 30;
secondLevelsCutSizes = FOREACH secondLevelsPerCategory GENERATE
    group,
    (int)(COUNT(secondLevelsTotals) * (double) ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;
secondLevelsWithCutSize = JOIN secondLevelsPerCategory BY group, secondLevelsCutSizes BY group USING 'replicated';
secondLevelsHeavy = FOREACH secondLevelsWithCutSize GENERATE
    FLATTEN(TOP(TopNumber, 2, secondLevelsTotals));

-- Flip the sign of the totals so that the next TOP call, which keeps the
-- largest values, ends up keeping the smallest real totals of the first pass.
secondLevelsFlipped = FOREACH secondLevelsHeavy GENERATE
    group::ParentChannelID AS ParentChannelID,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second pass: per category, keep the share level / (level + bots) of the
-- first pass survivors, which drops the very highest totals.
-- NOTE(review): the int cast truncates a floating point product, so the cut
-- size may come out one lower than the exact ratio suggests - confirm this is acceptable.
secondLevelsFlippedPerCategory = GROUP secondLevelsFlipped BY (ParentChannelID) PARALLEL 30;
secondLevelsKeepSizes = FOREACH secondLevelsFlippedPerCategory GENERATE
    group,
    (int)(COUNT(secondLevelsFlipped) * (double) ($SECOND_LEVELS_PERCENTAGE / ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;
secondLevelsWithKeepSize = JOIN secondLevelsFlippedPerCategory BY group, secondLevelsKeepSizes BY group USING 'replicated';
secondLevelsKept = FOREACH secondLevelsWithKeepSize GENERATE
    FLATTEN(TOP(TopNumber, 2, secondLevelsFlipped));

-- Flip the sign back so Impressions holds the real total again.
secondLevelsTopVisitorsCleaned = FOREACH secondLevelsKept GENERATE
    ParentChannelID,
    VisitorGUID,
    Impressions * -1 AS Impressions;
-- First plus second plus third level categories: select the top visitors of
-- every TopChannelId.
-- Input relation: firstLevels. Output relation: firstLevelsTopVisitorsCleaned
-- with the columns TopChannelId, VisitorGUID and Impressions.

-- Keep only the columns this pipeline works with.
firstLevelsSlim = FOREACH firstLevels GENERATE TopChannelId, VisitorGUID, Impressions;

-- Total impressions for every (TopChannelId, VisitorGUID) pair.
firstLevelsPairs = GROUP firstLevelsSlim BY (TopChannelId, VisitorGUID);
firstLevelsTotals = FOREACH firstLevelsPairs GENERATE
    FLATTEN(group),
    SUM(firstLevelsSlim.Impressions) AS Impressions;

-- First pass: per category, keep the pairs with the highest totals. The cut
-- size is the pair count times (level percentage + bots percentage),
-- truncated to an int. Field 2 of each tuple is Impressions.
firstLevelsPerCategory = GROUP firstLevelsTotals BY (TopChannelId) PARALLEL 30;
firstLevelsCutSizes = FOREACH firstLevelsPerCategory GENERATE
    group,
    (int)(COUNT(firstLevelsTotals) * (double) ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;
firstLevelsWithCutSize = JOIN firstLevelsPerCategory BY group, firstLevelsCutSizes BY group USING 'replicated';
firstLevelsHeavy = FOREACH firstLevelsWithCutSize GENERATE
    FLATTEN(TOP(TopNumber, 2, firstLevelsTotals));

-- Flip the sign of the totals so that the next TOP call, which keeps the
-- largest values, ends up keeping the smallest real totals of the first pass.
firstLevelsFlipped = FOREACH firstLevelsHeavy GENERATE
    group::TopChannelId AS TopChannelId,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second pass: per category, keep the share level / (level + bots) of the
-- first pass survivors, which drops the very highest totals.
-- NOTE(review): the int cast truncates a floating point product, so the cut
-- size may come out one lower than the exact ratio suggests - confirm this is acceptable.
firstLevelsFlippedPerCategory = GROUP firstLevelsFlipped BY (TopChannelId) PARALLEL 30;
firstLevelsKeepSizes = FOREACH firstLevelsFlippedPerCategory GENERATE
    group,
    (int)(COUNT(firstLevelsFlipped) * (double) ($FIRST_LEVELS_PERCENTAGE / ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;
firstLevelsWithKeepSize = JOIN firstLevelsFlippedPerCategory BY group, firstLevelsKeepSizes BY group USING 'replicated';
firstLevelsKept = FOREACH firstLevelsWithKeepSize GENERATE
    FLATTEN(TOP(TopNumber, 2, firstLevelsFlipped));

-- Flip the sign back so Impressions holds the real total again.
firstLevelsTopVisitorsCleaned = FOREACH firstLevelsKept GENERATE
    TopChannelId,
    VisitorGUID,
    Impressions * -1 AS Impressions;
---- Making result data
-- Combine the top visitors of all three levels and emit one row per visitor
-- carrying every category id that visitor qualified for.
--
-- The three inputs name their category column differently (ChannelID,
-- ParentChannelID, TopChannelId). Each one is first projected onto the same
-- explicit (ChannelID, VisitorGUID) shape, so the ChannelID reference after
-- the UNION does not depend on which operand the UNION takes its field names
-- from. Impressions is not used past this point and is dropped before the
-- shuffle.
thirdLevelsTops = FOREACH thirdLevelsTopVisitorsCleaned GENERATE
    ChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
secondLevelsTops = FOREACH secondLevelsTopVisitorsCleaned GENERATE
    ParentChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
firstLevelsTops = FOREACH firstLevelsTopVisitorsCleaned GENERATE
    TopChannelId AS ChannelID,
    VisitorGUID AS VisitorGUID;
allTops = UNION thirdLevelsTops, secondLevelsTops, firstLevelsTops;
-- One group per visitor. The bag holds that visitor's category ids across all levels.
allTopsByVisitor = GROUP allTops BY (VisitorGUID);
-- NOTE(review): stringUnsplit presumably joins the ids into one comma separated string - confirm against the UDF source.
result = FOREACH allTopsByVisitor GENERATE
    group AS VisitorGUID,
    stringUnsplit(allTops.ChannelID) AS ContextCategoryIds;
STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore('');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment