Skip to content

Instantly share code, notes, and snippets.

@Test
public void testHistoryBotRemover() {
Fields pageViewFields = new Fields("id", "pageViewId", "path", "campaignTag", "sessionId", "userId", "remoteHost",
"host");
Fields sessionFields = new Fields("sessionId", "campaignTag");
TupleInputTap campaignPageViewsTap = new TupleInputTap("campaignPageViewsTap",
pageViewFields,
new Tuple(0L, "0", "/foo", "bar", 0L, 0L, "1.1.1.1", "foo.bar.com"),
new Tuple(3L, "3", "/foo", "bar", 1L, 0L, "1.1.1.1", "foo.bar.com"),
AvroInMemoryInputTap<UserEvent> userEventsTap = new AvroInMemoryInputTap<UserEvent>(UserEvent.getClassSchema(),
static class HistoryBotRemover extends SubAssembly {
private HistoryBotRemover() {
this(new Pipe("campaignPageViews"), new Pipe("userEvents"));
}
public HistoryBotRemover(Pipe campaignPageViews, Pipe userEvents) {
Pipe pageLoads = new Each(userEvents, new Fields(CATEGORY), new EqualsValue(PAGELOAD));
pageLoads = new Retain(pageLoads, new Fields(PAGE_VIEW_ID));
pageLoads = new Rename(pageLoads, new Fields(PAGE_VIEW_ID), new Fields(JOIN_FIELD));
Pipe loadedSessions = new CoGroup("loadedSessions", campaignPageViews, new Fields(PAGE_VIEW_ID), pageLoads,