This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void testHistoryBotRemover() { | |
Fields pageViewFields = new Fields("id", "pageViewId", "path", "campaignTag", "sessionId", "userId", "remoteHost", | |
"host"); | |
Fields sessionFields = new Fields("sessionId", "campaignTag"); | |
TupleInputTap campaignPageViewsTap = new TupleInputTap("campaignPageViewsTap", | |
pageViewFields, | |
new Tuple(0L, "0", "/foo", "bar", 0L, 0L, "1.1.1.1", "foo.bar.com"), | |
new Tuple(3L, "3", "/foo", "bar", 1L, 0L, "1.1.1.1", "foo.bar.com"), | |
AvroInMemoryInputTap<UserEvent> userEventsTap = new AvroInMemoryInputTap<UserEvent>(UserEvent.getClassSchema(), |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
static class HistoryBotRemover extends SubAssembly { | |
private HistoryBotRemover() { | |
this(new Pipe("campaignPageViews"), new Pipe("userEvents")); | |
} | |
public HistoryBotRemover(Pipe campaignPageViews, Pipe userEvents) { | |
Pipe pageLoads = new Each(userEvents, new Fields(CATEGORY), new EqualsValue(PAGELOAD)); | |
pageLoads = new Retain(pageLoads, new Fields(PAGE_VIEW_ID)); | |
pageLoads = new Rename(pageLoads, new Fields(PAGE_VIEW_ID), new Fields(JOIN_FIELD)); | |
Pipe loadedSessions = new CoGroup("loadedSessions", campaignPageViews, new Fields(PAGE_VIEW_ID), pageLoads, |