Created
December 22, 2011 01:35
-
-
Save anonymous/1508511 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Job parameters. Each value below is only a default for its parameter.

-- Base directory used for the support jar and the schema files (output of pwd unless supplied).
%default WORK_DIR `pwd`;

-- Per-level input roll-ups and the final output location.
%default INPUTfirstLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_firstLevel';
%default INPUTsecondLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_secondLevel';
%default INPUTthirdLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_thirdLevel';
%default OUTPUT '/wide/rollup/CategoryEnthusiastsByTopVisitors_final';

-- Fraction of visitors per category that is kept at each level.
%default FIRST_LEVELS_PERCENTAGE 0.05
%default SECOND_LEVELS_PERCENTAGE 0.05
%default THIRD_LEVELS_PERCENTAGE 0.1

-- Extra fraction taken on top of the level fraction and then discarded again
-- as the highest-impression visitors (presumably bots, judging by the name).
%default BOTS_PERCENTAGE 0.01

-- Support jar with the custom loader, storer and UDFs used below.
REGISTER '$WORK_DIR/pig-support.jar';

-- UDF instance configured with a comma argument (presumably the separator used when joining values).
DEFINE stringUnsplit com.contextweb.pig.udf.StringUnsplit(',');
/*CONTEXTWEB: | |
start.date = -7D | |
end.date = -1D | |
cw.leave.tmp.output=true | |
cw.memcache.upload=true | |
cw.extends.properties=${hadoop.conf}/RawLog/memcache-connection.cfg | |
cw.memcache.recordconverter.classname=com.contextweb.memcache.convert.CategoryEnthusiastRecordConverter | |
cw.memcache.key=VisitorGUID | |
cw.memcache.values=ContextCategoryIds | |
# expiration timeout in seconds | |
cw.memcache.expires=86400 | |
cw.memcache.schema.location=schema/CategoryEnthusiastsMemcacheUploadSchema.xml | |
*/ | |
-- Read the three per-level roll-ups. All of them use the same schema file.
firstLevels = LOAD '$INPUTfirstLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');

secondLevels = LOAD '$INPUTsecondLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');

thirdLevels = LOAD '$INPUTthirdLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
-- Third-level categories: select the top visitors per ChannelID, then drop the heaviest of them.

-- Keep only the columns this stage uses.
thirdSlim = FOREACH thirdLevels GENERATE ChannelID, VisitorGUID, Impressions;

-- Total impressions per (ChannelID, VisitorGUID) pair.
thirdPerPair = GROUP thirdSlim BY (ChannelID, VisitorGUID);
thirdTotals = FOREACH thirdPerPair GENERATE
    FLATTEN(group),
    SUM(thirdSlim.Impressions) AS Impressions;

-- First cut size per channel: visitor count times (THIRD_LEVELS_PERCENTAGE + BOTS_PERCENTAGE), truncated to int.
thirdPerChannel = GROUP thirdTotals BY ChannelID PARALLEL 30;
thirdQuota = FOREACH thirdPerChannel GENERATE
    group,
    (int)(COUNT(thirdTotals) * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;

-- Attach the cut size to each channel bag, then take that many rows ranked on field 2 (Impressions).
thirdWithQuota = JOIN thirdPerChannel BY group, thirdQuota BY group USING 'replicated';
thirdHeaviest = FOREACH thirdWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, thirdTotals));

-- Flip the sign of Impressions so that the next TOP call ranks in the opposite order.
thirdFlipped = FOREACH thirdHeaviest GENERATE
    group::ChannelID AS ChannelID,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second cut size per channel: the THIRD_LEVELS_PERCENTAGE share of the first cut, truncated to int.
thirdFlippedPerChannel = GROUP thirdFlipped BY ChannelID PARALLEL 30;
thirdKeepQuota = FOREACH thirdFlippedPerChannel GENERATE
    group,
    (int)(COUNT(thirdFlipped) * (double) ($THIRD_LEVELS_PERCENTAGE / ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;

-- With the sign flipped, TOP keeps the lowest real impression counts of the first cut,
-- which leaves out the rows with the very highest counts.
thirdFlippedWithQuota = JOIN thirdFlippedPerChannel BY group, thirdKeepQuota BY group USING 'replicated';
thirdKept = FOREACH thirdFlippedWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, thirdFlipped));

-- Flip the sign back so Impressions holds the real total again.
thirdLevelsTopVisitorsCleaned = FOREACH thirdKept GENERATE
    ChannelID,
    VisitorGUID,
    (Impressions * -1) AS Impressions;
-- Second-level roll-up: select the top visitors per ParentChannelID, then drop the heaviest of them.

-- Keep only the columns this stage uses.
secondSlim = FOREACH secondLevels GENERATE ParentChannelID, VisitorGUID, Impressions;

-- Total impressions per (ParentChannelID, VisitorGUID) pair.
secondPerPair = GROUP secondSlim BY (ParentChannelID, VisitorGUID);
secondTotals = FOREACH secondPerPair GENERATE
    FLATTEN(group),
    SUM(secondSlim.Impressions) AS Impressions;

-- First cut size per parent channel: visitor count times (SECOND_LEVELS_PERCENTAGE + BOTS_PERCENTAGE), truncated to int.
secondPerChannel = GROUP secondTotals BY ParentChannelID PARALLEL 30;
secondQuota = FOREACH secondPerChannel GENERATE
    group,
    (int)(COUNT(secondTotals) * (double) ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;

-- Attach the cut size to each channel bag, then take that many rows ranked on field 2 (Impressions).
secondWithQuota = JOIN secondPerChannel BY group, secondQuota BY group USING 'replicated';
secondHeaviest = FOREACH secondWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, secondTotals));

-- Flip the sign of Impressions so that the next TOP call ranks in the opposite order.
secondFlipped = FOREACH secondHeaviest GENERATE
    group::ParentChannelID AS ParentChannelID,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second cut size per parent channel: the SECOND_LEVELS_PERCENTAGE share of the first cut, truncated to int.
secondFlippedPerChannel = GROUP secondFlipped BY ParentChannelID PARALLEL 30;
secondKeepQuota = FOREACH secondFlippedPerChannel GENERATE
    group,
    (int)(COUNT(secondFlipped) * (double) ($SECOND_LEVELS_PERCENTAGE / ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;

-- With the sign flipped, TOP keeps the lowest real impression counts of the first cut,
-- which leaves out the rows with the very highest counts.
secondFlippedWithQuota = JOIN secondFlippedPerChannel BY group, secondKeepQuota BY group USING 'replicated';
secondKept = FOREACH secondFlippedWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, secondFlipped));

-- Flip the sign back so Impressions holds the real total again.
secondLevelsTopVisitorsCleaned = FOREACH secondKept GENERATE
    ParentChannelID,
    VisitorGUID,
    (Impressions * -1) AS Impressions;
-- First-level roll-up: select the top visitors per TopChannelId, then drop the heaviest of them.

-- Keep only the columns this stage uses.
firstSlim = FOREACH firstLevels GENERATE TopChannelId, VisitorGUID, Impressions;

-- Total impressions per (TopChannelId, VisitorGUID) pair.
firstPerPair = GROUP firstSlim BY (TopChannelId, VisitorGUID);
firstTotals = FOREACH firstPerPair GENERATE
    FLATTEN(group),
    SUM(firstSlim.Impressions) AS Impressions;

-- First cut size per top channel: visitor count times (FIRST_LEVELS_PERCENTAGE + BOTS_PERCENTAGE), truncated to int.
firstPerChannel = GROUP firstTotals BY TopChannelId PARALLEL 30;
firstQuota = FOREACH firstPerChannel GENERATE
    group,
    (int)(COUNT(firstTotals) * (double) ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) AS TopNumber;

-- Attach the cut size to each channel bag, then take that many rows ranked on field 2 (Impressions).
firstWithQuota = JOIN firstPerChannel BY group, firstQuota BY group USING 'replicated';
firstHeaviest = FOREACH firstWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, firstTotals));

-- Flip the sign of Impressions so that the next TOP call ranks in the opposite order.
firstFlipped = FOREACH firstHeaviest GENERATE
    group::TopChannelId AS TopChannelId,
    group::VisitorGUID AS VisitorGUID,
    (Impressions * -1) AS Impressions;

-- Second cut size per top channel: the FIRST_LEVELS_PERCENTAGE share of the first cut, truncated to int.
firstFlippedPerChannel = GROUP firstFlipped BY TopChannelId PARALLEL 30;
firstKeepQuota = FOREACH firstFlippedPerChannel GENERATE
    group,
    (int)(COUNT(firstFlipped) * (double) ($FIRST_LEVELS_PERCENTAGE / ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ) AS TopNumber;

-- With the sign flipped, TOP keeps the lowest real impression counts of the first cut,
-- which leaves out the rows with the very highest counts.
firstFlippedWithQuota = JOIN firstFlippedPerChannel BY group, firstKeepQuota BY group USING 'replicated';
firstKept = FOREACH firstFlippedWithQuota GENERATE FLATTEN(TOP(TopNumber, 2, firstFlipped));

-- Flip the sign back so Impressions holds the real total again.
firstLevelsTopVisitorsCleaned = FOREACH firstKept GENERATE
    TopChannelId,
    VisitorGUID,
    (Impressions * -1) AS Impressions;
-- Final output: one row per visitor with the category ids collected from all three levels.

-- NOTE(review): the first column is named differently in each input (ChannelID,
-- ParentChannelID, TopChannelId). The ChannelID projection below depends on the
-- union exposing the field names of its first input, so keep the third-level relation first.
everyLevel = UNION
    thirdLevelsTopVisitorsCleaned,
    secondLevelsTopVisitorsCleaned,
    firstLevelsTopVisitorsCleaned;

perVisitor = GROUP everyLevel BY VisitorGUID;

-- stringUnsplit is the UDF defined with a comma argument (presumably it joins the ids with commas).
result = FOREACH perVisitor GENERATE
    group AS VisitorGUID,
    stringUnsplit(everyLevel.ChannelID) AS ContextCategoryIds;

STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore('');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment