Created
November 21, 2011 17:09
-
-
Save anonymous/1383266 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Default parameter values; each can be overridden when the script is launched.
-- WORK_DIR defaults to the output of the pwd shell command.
%default WORK_DIR `pwd`;

-- Input roll-ups, one per category depth, and the final output location.
%default INPUTfirstLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_firstLevel';
%default INPUTsecondLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_secondLevel';
%default INPUTthirdLevel '/wide/rollup/CategoryEnthusiastsByTopVisitors_thirdLevel';
%default OUTPUT '/wide/rollup/CategoryEnthusiastsByTopVisitors_final';

-- Fraction of visitors kept per category at each depth (0.05 = 5 percent).
%default FIRST_LEVELS_PERCENTAGE 0.05
%default SECOND_LEVELS_PERCENTAGE 0.05
%default THIRD_LEVELS_PERCENTAGE 0.1
-- Extra fraction of the heaviest visitors per category that is selected first
-- and then discarded again (presumably automated traffic, going by the name).
%default BOTS_PERCENTAGE 0.01

-- Project UDFs: the jar is expected next to the script in WORK_DIR.
REGISTER '$WORK_DIR/pig-support.jar';
-- roundUp: applied to CheckPointStart before grouping.
DEFINE roundUp com.contextweb.pig.groupby.RoundDateUp();
-- stringUnsplit: constructed with a comma argument and applied to a bag of
-- category ids (presumably joins them into one comma-separated string).
DEFINE stringUnsplit com.contextweb.pig.udf.StringUnsplit(',');
-- Job properties for the surrounding CONTEXTWEB tooling, kept inside a block
-- comment so that Pig itself skips them. The key = value lines are reproduced
-- unchanged; only stray trailing characters were removed from each line.
-- NOTE(review): start.date / end.date / group.by look like a trailing 7-day
-- window ending yesterday, rolled into one 7-day bucket; confirm with the launcher.
/*CONTEXTWEB:
start.date = -7D
end.date = -1D
group.by = 7D
mapred.child.java.opts=-Xmx1024m
cw.leave.tmp.output=true
cw.memcache.upload=true
cw.extends.properties=${hadoop.conf}/RawLog/memcache-connection.cfg
cw.memcache.recordconverter.classname=com.contextweb.memcache.convert.CategoryEnthusiastRecordConverter
cw.memcache.key=VisitorGUID
cw.memcache.values=ContextCategoryIds
# expiration timeout in seconds
cw.memcache.expires=86400
cw.memcache.schema.location=schema/CategoryEnthusiastsMemcacheUploadSchema.xml
*/
-- Load the three per-depth roll-ups. All of them go through the project loader
-- CWHeaderLoader, configured with the same schema XML under WORK_DIR.
firstLevels = LOAD '$INPUTfirstLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
secondLevels = LOAD '$INPUTsecondLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
thirdLevels = LOAD '$INPUTthirdLevel'
    USING com.contextweb.pig.CWHeaderLoader('$WORK_DIR/schema/CategoryEnthusiastsSchema.xml');
-- Processing third level categories (keyed by ChannelID).
-- Steps: round CheckPointStart, total Impressions per
-- (CheckPointStart, ChannelID, VisitorGUID), then select visitors per category
-- in two TOP passes so that the very heaviest visitors are left out.
thirdLevelsRounded = FOREACH thirdLevels GENERATE
    roundUp(CheckPointStart) AS CheckPointStart,
    ChannelID,
    VisitorGUID,
    Impressions;

thirdLevelsGrouped = GROUP thirdLevelsRounded BY (CheckPointStart, ChannelID, VisitorGUID);
thirdLevelsSummed = FOREACH thirdLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(thirdLevelsRounded.Impressions) AS Impressions;

-- Pass 1: per category keep the (THIRD_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- share of visitors with the highest Impressions. TOP compares on field
-- index 3, which is Impressions; the (int) cast truncates the row count.
thirdLevelsByCategory = GROUP thirdLevelsSummed BY (CheckPointStart, ChannelID) PARALLEL 30;
thirdLevelsTopVisitorsWithBots = FOREACH thirdLevelsByCategory {
    count = COUNT(thirdLevelsSummed);
    result = TOP( (int)(count * (double) ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, thirdLevelsSummed);
    GENERATE FLATTEN(result);
}

-- Restore plain field names and negate Impressions: TOP keeps the largest
-- values, so on negated totals it keeps the visitors with the fewest impressions.
thirdLevelsCleaned = FOREACH thirdLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::ChannelID AS ChannelID,
    result::group::VisitorGUID AS VisitorGUID,
    (result::Impressions * -1) AS Impressions;

-- Pass 2: of the pass-1 rows keep the share
-- THIRD_LEVELS_PERCENTAGE / (THIRD_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- with the lowest totals, which drops the heaviest visitors.
-- NOTE(review): the ratio is floating point and the product is truncated by
-- (int), so a group may end up one row short of the intended size; confirm
-- whether that matters.
thirdLevelsCleanedByCategory = GROUP thirdLevelsCleaned BY (CheckPointStart, ChannelID) PARALLEL 30;
thirdLevelsTopVisitors = FOREACH thirdLevelsCleanedByCategory {
    count = COUNT(thirdLevelsCleaned);
    result = TOP((int)(count * (double) ($THIRD_LEVELS_PERCENTAGE / ($THIRD_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ), 3, thirdLevelsCleaned);
    GENERATE FLATTEN(result);
}

-- Negate once more so Impressions is back to its real, positive value.
thirdLevelsTopVisitorsCleaned = FOREACH thirdLevelsTopVisitors GENERATE
    CheckPointStart,
    ChannelID,
    VisitorGUID,
    Impressions * -1 AS Impressions;
-- Processing second plus third level categories (keyed by ParentChannelID).
-- Steps: round CheckPointStart, total Impressions per
-- (CheckPointStart, ParentChannelID, VisitorGUID), then select visitors per
-- category in two TOP passes so that the very heaviest visitors are left out.
secondLevelsRounded = FOREACH secondLevels GENERATE
    roundUp(CheckPointStart) AS CheckPointStart,
    ParentChannelID,
    VisitorGUID,
    Impressions;

secondLevelsGrouped = GROUP secondLevelsRounded BY (CheckPointStart, ParentChannelID, VisitorGUID);
secondLevelsSummed = FOREACH secondLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(secondLevelsRounded.Impressions) AS Impressions;

-- Pass 1: per category keep the (SECOND_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- share of visitors with the highest Impressions. TOP compares on field
-- index 3, which is Impressions; the (int) cast truncates the row count.
secondLevelsByCategory = GROUP secondLevelsSummed BY (CheckPointStart, ParentChannelID) PARALLEL 30;
secondLevelsTopVisitorsWithBots = FOREACH secondLevelsByCategory {
    count = COUNT(secondLevelsSummed);
    result = TOP( (int)(count * (double) ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, secondLevelsSummed);
    GENERATE FLATTEN(result);
}

-- Restore plain field names and negate Impressions: TOP keeps the largest
-- values, so on negated totals it keeps the visitors with the fewest impressions.
secondLevelsCleaned = FOREACH secondLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::ParentChannelID AS ParentChannelID,
    result::group::VisitorGUID AS VisitorGUID,
    (result::Impressions * -1) AS Impressions;

-- Pass 2: of the pass-1 rows keep the share
-- SECOND_LEVELS_PERCENTAGE / (SECOND_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- with the lowest totals, which drops the heaviest visitors.
-- NOTE(review): the ratio is floating point and the product is truncated by
-- (int), so a group may end up one row short of the intended size; confirm
-- whether that matters.
secondLevelsCleanedByCategory = GROUP secondLevelsCleaned BY (CheckPointStart, ParentChannelID) PARALLEL 30;
secondLevelsTopVisitors = FOREACH secondLevelsCleanedByCategory {
    count = COUNT(secondLevelsCleaned);
    result = TOP((int)(count * (double) ($SECOND_LEVELS_PERCENTAGE / ($SECOND_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ), 3, secondLevelsCleaned);
    GENERATE FLATTEN(result);
}

-- Negate once more so Impressions is back to its real, positive value.
secondLevelsTopVisitorsCleaned = FOREACH secondLevelsTopVisitors GENERATE
    CheckPointStart,
    ParentChannelID,
    VisitorGUID,
    Impressions * -1 AS Impressions;
-- Processing first plus second plus third level categories (keyed by TopChannelId).
-- Steps: round CheckPointStart, total Impressions per
-- (CheckPointStart, TopChannelId, VisitorGUID), then select visitors per
-- category in two TOP passes so that the very heaviest visitors are left out.
firstLevelsRounded = FOREACH firstLevels GENERATE
    roundUp(CheckPointStart) AS CheckPointStart,
    TopChannelId,
    VisitorGUID,
    Impressions;

firstLevelsGrouped = GROUP firstLevelsRounded BY (CheckPointStart, TopChannelId, VisitorGUID);
firstLevelsSummed = FOREACH firstLevelsGrouped GENERATE
    FLATTEN(group),
    SUM(firstLevelsRounded.Impressions) AS Impressions;

-- Pass 1: per category keep the (FIRST_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- share of visitors with the highest Impressions. TOP compares on field
-- index 3, which is Impressions; the (int) cast truncates the row count.
firstLevelsByCategory = GROUP firstLevelsSummed BY (CheckPointStart, TopChannelId) PARALLEL 30;
firstLevelsTopVisitorsWithBots = FOREACH firstLevelsByCategory {
    count = COUNT(firstLevelsSummed);
    result = TOP( (int)(count * (double) ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE) ), 3, firstLevelsSummed);
    GENERATE FLATTEN(result);
}

-- Restore plain field names and negate Impressions: TOP keeps the largest
-- values, so on negated totals it keeps the visitors with the fewest impressions.
firstLevelsCleaned = FOREACH firstLevelsTopVisitorsWithBots GENERATE
    result::group::CheckPointStart AS CheckPointStart,
    result::group::TopChannelId AS TopChannelId,
    result::group::VisitorGUID AS VisitorGUID,
    (result::Impressions * -1) AS Impressions;

-- Pass 2: of the pass-1 rows keep the share
-- FIRST_LEVELS_PERCENTAGE / (FIRST_LEVELS_PERCENTAGE + BOTS_PERCENTAGE)
-- with the lowest totals, which drops the heaviest visitors.
-- NOTE(review): the ratio is floating point and the product is truncated by
-- (int), so a group may end up one row short of the intended size; confirm
-- whether that matters.
firstLevelsCleanedByCategory = GROUP firstLevelsCleaned BY (CheckPointStart, TopChannelId) PARALLEL 30;
firstLevelsTopVisitors = FOREACH firstLevelsCleanedByCategory {
    count = COUNT(firstLevelsCleaned);
    result = TOP((int)(count * (double) ($FIRST_LEVELS_PERCENTAGE / ($FIRST_LEVELS_PERCENTAGE + $BOTS_PERCENTAGE)) ), 3, firstLevelsCleaned);
    GENERATE FLATTEN(result);
}

-- Negate once more so Impressions is back to its real, positive value.
firstLevelsTopVisitorsCleaned = FOREACH firstLevelsTopVisitors GENERATE
    CheckPointStart,
    TopChannelId,
    VisitorGUID,
    Impressions * -1 AS Impressions;
-- Making result data: one row per (CheckPointStart, VisitorGUID) listing every
-- category, at any depth, for which the visitor was selected.
--
-- UNION combines its inputs by position, and the category column has a
-- different name in each input (ChannelID, ParentChannelID, TopChannelId).
-- Each input is therefore projected onto one explicit schema first, so the
-- ChannelID reference below does not depend on how UNION picks field names.
-- Impressions is not used past this point and is left out of the projection.
thirdLevelsForUnion = FOREACH thirdLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    ChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
secondLevelsForUnion = FOREACH secondLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    ParentChannelID AS ChannelID,
    VisitorGUID AS VisitorGUID;
firstLevelsForUnion = FOREACH firstLevelsTopVisitorsCleaned GENERATE
    CheckPointStart AS CheckPointStart,
    TopChannelId AS ChannelID,
    VisitorGUID AS VisitorGUID;

allTops = UNION thirdLevelsForUnion, secondLevelsForUnion, firstLevelsForUnion;
allTopsByVisitor = GROUP allTops BY (CheckPointStart, VisitorGUID);

-- stringUnsplit receives the bag of category ids collected for the visitor.
result = FOREACH allTopsByVisitor GENERATE
    group.CheckPointStart AS CheckPointStart,
    group.VisitorGUID AS VisitorGUID,
    stringUnsplit(allTops.ChannelID) AS ContextCategoryIds;

-- Written with the project storer; its two string arguments are passed through unchanged.
STORE result INTO '$OUTPUT' USING com.contextweb.pig.CWHeaderStore('CheckPointStart', 'true');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment