@samklr
Created May 28, 2013 13:07
import com.twitter.scalding._

class StocksDividendsJoin4(args: Args) extends Job(args) {
  val stockSchema =
    ('symd, 'price_open, 'price_high, 'price_low, 'price_close, 'volume, 'price_adj_close)
  val dividendsSchema = ('dymd, 'dividend)

  // Read both CSV inputs; only the date and closing price are needed from the stocks.
  // The schema is passed by name so it cannot be taken for another positional parameter.
  val stocksPipe = new Csv(args("stocks"), fields = stockSchema)
    .read
    .project('symd, 'price_close)
  val dividendsPipe = new Csv(args("dividends"), fields = dividendsSchema).read

  /*
   * Inner join on the date fields. joinWithTiny is the variant that attempts to
   * replicate the small right-hand pipe (the dividends) to all nodes for faster joining.
   * The projection then drops the redundant 'dymd and puts the remaining fields
   * in the desired order.
   */
  stocksPipe
    .joinWithTiny('symd -> 'dymd, dividendsPipe)
    .project('symd, 'price_close, 'dividend)
    .write(Tsv(args("output")))
}
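
For readers new to this kind of join, the idea behind replicating the small table can be sketched with plain Scala collections, without Scalding or Hadoop. This is only an illustration of the semantics, not how Scalding implements `joinWithTiny`. The object name `TinyJoinSketch`, the helper `innerJoin`, and the sample rows are all hypothetical, and the sketch assumes at most one dividend per date.

```scala
object TinyJoinSketch {
  // Hypothetical (symd, price_close) rows standing in for the projected stocks pipe.
  val stocks: Seq[(String, Double)] = Seq(
    ("2010-01-04", 30.0),
    ("2010-02-11", 31.5),
    ("2010-03-01", 29.0)
  )

  // Hypothetical (dymd, dividend) rows standing in for the small dividends pipe.
  val dividends: Seq[(String, Double)] = Seq(
    ("2010-02-11", 0.25),
    ("2010-05-13", 0.25)
  )

  // Inner join: load the small side into an in-memory lookup table, then
  // stream the large side through it and keep only rows whose key matches.
  // Assumption: keys on the small side are unique (toMap keeps the last duplicate).
  def innerJoin(
      big: Seq[(String, Double)],
      tiny: Seq[(String, Double)]
  ): Seq[(String, Double, Double)] = {
    val lookup: Map[String, Double] = tiny.toMap
    big.flatMap { case (ymd, close) =>
      lookup.get(ymd).map(dividend => (ymd, close, dividend))
    }
  }

  def main(args: Array[String]): Unit =
    innerJoin(stocks, dividends).foreach(println)
}
```

With the sample rows, only 2010-02-11 appears on both sides, so the join keeps that single row; stock dates without a dividend and dividend dates without a stock row are dropped, which is what makes it an inner join.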