
@helena
Last active December 20, 2015 21:08
TopAByBJob is a daily job that pulls .pailfile data from S3, where the paths are partitioned by date/time and type. The records are grouped by (B, A) and counted; then, for each B, only the top n (keep) A values are written, ordered by count, descending. I wrote all of the base jobs, such as DailyJobWithKeep (for daily jobs that need a 'keepN'), which extends DailyJob, which extends …
The job writes a tab-separated part-file with one (count, b, a) line per kept tuple, for example:

```
2034 cid1 a
1025 cid1 g
2034 cid3 g
1025 cid3 a
2034 cid6 f
1025 cid6 b
```
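```scala
import com.twitter.scalding._
import com.twitter.scalding.commons.source.PailSource

// rootpath, structure, directories, keep, outputdir and calculateA are not defined here;
// they are presumably inherited from the author's base jobs (DailyJobWithKeep, TypeAFilters).
class TopAByBJob(args: Args) extends DailyJobWithKeep(args, classOf[ProtobufTypeForS3PathPartition]) with TypeAFilters {
  PailSource.source[FooProtobuf](rootpath, structure, directories).read
    .mapTo('pailItem -> ('b, 'a)) { e: FooProtobuf ⇒ e.b -> calculateA(e) }
    .filter('a) { n: String ⇒ n.nonEmpty }   // drop records with an empty A
    .groupBy(('b, 'a)) { _.size('count) }     // count occurrences of each (b, a) pair
    // per b, keep the `keep` largest (count, b, a) tuples; count comes first, so it drives the order
    .groupBy('b) { _.sortedReverseTake[(Long, String, String)](('count, 'b, 'a) -> 'tcount, keep) }
    .flatMapTo('tcount -> ('count, 'b, 'a)) { t: List[(Long, String, String)] ⇒ t } // one row per kept tuple
    .write(Tsv(outputdir))
}
```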
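To check the grouping and top-n logic locally, the pipeline can be modeled on plain Scala collections. This is a minimal sketch, not part of the original job: it assumes calculateA has already been applied, so each record is a (b, a) pair of Strings, and the object name TopAByBModel is hypothetical. Each step is commented with the Scalding operation it stands in for.

```scala
// Hypothetical in-memory model of the TopAByBJob pipeline (plain Scala, no Scalding).
// Assumes calculateA has already run, so each record is a (b, a) pair.
object TopAByBModel {
  // Tuples compare element by element, so count (first) dominates; .reverse makes it descending.
  private val desc: Ordering[(Long, String, String)] = Ordering[(Long, String, String)].reverse

  def topAByB(records: Seq[(String, String)], keep: Int): Seq[(Long, String, String)] =
    records
      .filter { case (_, a) => a.nonEmpty }                       // .filter('a) { n => n.nonEmpty }
      .groupBy(identity)                                         // .groupBy(('b, 'a))
      .toSeq
      .map { case ((b, a), rows) => (rows.size.toLong, b, a) }   // _.size('count)
      .groupBy { case (_, b, _) => b }                           // .groupBy('b)
      .toSeq
      .sortBy { case (b, _) => b }                               // deterministic output order only
      .flatMap { case (_, rows) => rows.sorted(desc).take(keep) } // sortedReverseTake + flatMapTo
}
```

For example, `TopAByBModel.topAByB(Seq(("cid1", "a"), ("cid1", "a"), ("cid1", "g"), ("cid3", "")), 1)` returns `Seq((2L, "cid1", "a"))`: the empty A is filtered out and only the highest count for cid1 is kept. Because the whole tuple is compared, equal counts within one b fall back to comparing a, so ties go to the reverse-alphabetical A; assuming sortedReverseTake simply reverses the implicit tuple Ordering, the job should break ties the same way. The sort by b exists only to make the model's output stable; the real job's output order is not guaranteed.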
@johnynek

johnynek commented Aug 9, 2013

The sortBy on line 6 of the earlier revision is not needed, but sadly, it may not be a no-op.
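The point about sortBy can be seen from what a top-k needs: sortedReverseTake already returns the k largest tuples in descending order, and selecting the top k does not require fully sorting the group first. The following is a generic bounded min-heap top-k in plain Scala, offered as an illustration of that idea; the object name BoundedTopK is hypothetical, and this is not a claim about how Scalding implements sortedReverseTake internally.

```scala
import scala.collection.mutable

// Hypothetical helper: the k largest elements of xs in descending order, in O(n log k),
// without sorting all of xs first.
object BoundedTopK {
  def topK[T](xs: Iterable[T], k: Int)(implicit ord: Ordering[T]): List[T] =
    if (k <= 0) Nil
    else {
      // PriorityQueue dequeues its maximum; with ord.reverse, the head is the smallest kept element.
      val heap = mutable.PriorityQueue.empty[T](ord.reverse)
      xs.foreach { x =>
        if (heap.size < k) heap.enqueue(x)
        else if (ord.gt(x, heap.head)) { heap.dequeue(); heap.enqueue(x) } // evict current minimum
      }
      // Dequeuing yields smallest first; prepending leaves the largest at the head.
      var out = List.empty[T]
      while (heap.nonEmpty) out = heap.dequeue() :: out
      out
    }
}
```

With the (count, b, a) tuples from the job, `BoundedTopK.topK(rows, keep)` gives the same result as `rows.sorted(Ordering[(Long, String, String)].reverse).take(keep)`, so any sort applied beforehand only adds work.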

@helena
Author

helena commented Aug 11, 2013

Thanks for the simplification, Oscar :)
