Skip to content

Instantly share code, notes, and snippets.

@erraggy
Created December 3, 2013 00:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erraggy/7762024 to your computer and use it in GitHub Desktop.
Save erraggy/7762024 to your computer and use it in GitHub Desktop.
/**
 * Map/reduce job that rolls per-page search counts up into per-site totals.
 *
 * The map side reads rows from `WebCrawlingSchema.WebTable`, parses each row id
 * as a URL to obtain its authority, and emits one record for every entry in the
 * row's `searchMetrics` family: the key is (authority, date) and the value is the
 * search count for that date.
 *
 * The reduce side adds up all counts received for a (authority, date) key and
 * writes the total into the `searchMetrics` family of the
 * `WebCrawlingSchema.Sites` row identified by that authority.
 */
class WebSearchAggregationJob extends HJob[NoSettings]("Aggregate web searches by site",
  HMapReduceTask(
    HTaskID("Aggregation task"),
    HTaskConfigs(),
    HIO(
      HTableInput(WebCrawlingSchema.WebTable),
      HTableOutput(WebCrawlingSchema.Sites)
    ),
    // Mapper: presumably this body is evaluated once per input row, exposed as `row`
    // (confirm against FromTableBinaryMapperFx).
    new FromTableBinaryMapperFx(WebCrawlingSchema.WebTable) {
      val webPage = row

      // The row id is treated as a URL. `getAuthority` keeps any user-info and port,
      // so "example.com" and "example.com:8080" aggregate under different keys.
      // NOTE(review): a row id that is not a valid URL makes `new URL` throw here.
      val domain = new URL(webPage.rowid).getAuthority

      // NOTE(review): this looks like it creates one counter per distinct domain;
      // confirm that the number of domains stays within the cluster's counter limit.
      ctr("Sites for domain " + domain)

      val dates = webPage.family(_.searchMetrics)
      for ((dateOfSearches, searchCount) <- dates) {
        // Composite key: domain first, then the date, matching the order the reducer reads them.
        val keyOutput = makeWritable { keyWriter =>
          keyWriter.writeUTF(domain)
          keyWriter.writeObj(dateOfSearches)
        }
        val valueOutput = makeWritable { valueWriter =>
          valueWriter.writeLong(searchCount)
        }
        ctr("Dated metrics written for domain " + domain)
        write(keyOutput, valueOutput)
      }
    },
    // Reducer: sums the counts grouped under one (domain, date) key.
    new ToTableBinaryReducerFx(WebCrawlingSchema.Sites) {
      // Fields must be read in the same order the mapper wrote them.
      val (domain, dateOfSearches) = readKey { keyInput =>
        (keyInput.readUTF(), keyInput.readObj[DateMidnight])
      }

      // Running total; mutated from inside the `perValue` callback.
      var totalCounts = 0L
      perValue { valueInput =>
        totalCounts += valueInput.readLong
      }

      write(
        WebCrawlingSchema.Sites.put(domain).valueMap(_.searchMetrics, Map(dateOfSearches -> totalCounts))
      )
    }
  )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment