Skip to content

Instantly share code, notes, and snippets.

@samklr
Created May 28, 2013 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samklr/5662644 to your computer and use it in GitHub Desktop.
Save samklr/5662644 to your computer and use it in GitHub Desktop.
/**
 * Scalding job that joins per-symbol daily stock prices with per-symbol
 * dividend payments and writes the matching rows as TSV.
 *
 * Arguments read from `args`:
 *  - "symbols": comma-separated ticker symbols (surrounding whitespace and
 *    empty entries are ignored).
 *  - "stocks-root-path": directory holding one `<symbol>.csv` price file per symbol.
 *  - "dividends-root-path": directory holding one `<symbol>.csv` dividend file per symbol.
 *  - "output": destination of the TSV result.
 *
 * Output columns, in order: date ('symd), symbol ('ssymbol), closing price
 * ('price_close), dividend ('dividend).
 *
 * NOTE(review): the CSVs are opened without any header-skipping option, so this
 * presumably assumes header-less files — confirm against the input data.
 */
class StocksDividendsRevisited8(args : Args) extends Job(args) {
  // Column layout of each per-symbol price CSV.
  val stocksSchema =
    ('symd, 'price_open, 'price_high, 'price_low, 'price_close, 'volume, 'price_adj_close)
  // Column layout of each per-symbol dividend CSV.
  val dividendsSchema = ('dymd, 'dividend)

  /**
   * NOTE(review): experimental helper that this job never calls; kept only so
   * the class's public surface is unchanged — consider deleting it. Concerns:
   *  - it writes every raw row to the hard-coded path "output/dump1";
   *  - the `map` lambda's parameter is typed `Fields` (a field-name
   *    descriptor), presumably not the row values it was meant to receive;
   *  - `Symbol(symbol)` names a field after the ticker itself, which is not
   *    one of `beforeSchema`'s fields.
   */
  def startPipe(rootPath: String, symbol: String, beforeSchema: Fields, afterSchema: Fields) =
    new Csv(rootPath+"/"+symbol+".csv", beforeSchema)
      .read
      .write(Tsv("output/dump1"))
      .map(beforeSchema -> afterSchema) { before: Fields => (symbol, before)}
      .flatten[String]((Symbol(symbol), beforeSchema) -> afterSchema)

  // Trim entries and drop blanks so "AAPL, IBM," resolves to AAPL and IBM
  // instead of producing paths like " IBM.csv" or ".csv".
  val symbols = args("symbols").split(",").map(_.trim).filter(_.nonEmpty).toList
  // Fail fast with a clear message; reduceLeft below would otherwise throw an
  // opaque UnsupportedOperationException on an empty list.
  require(symbols.nonEmpty, "symbols must list at least one ticker symbol")

  val stocksRoot = args("stocks-root-path")
  val dividendsRoot = args("dividends-root-path")

  /** Path of the per-symbol CSV file `<root>/<symbol>.csv`. */
  private def csvPath(root: String, symbol: String): String =
    root + "/" + symbol + ".csv"

  // One pipe per symbol reduced to (date, symbol, close), all merged together.
  // Values are emitted in the same order as the output field names so that
  // 'symd holds the date and 'ssymbol holds the ticker.
  val stocksPipes = symbols.map { symbol =>
    new Csv(csvPath(stocksRoot, symbol), stocksSchema)
      .read
      .mapTo(stocksSchema -> ('symd, 'ssymbol, 'price_close)) {
        record: (String, String, String, String, String, String, String) =>
          (record._1, symbol, record._5)
      }
  }
    .reduceLeft(_ ++ _)

  // One pipe per symbol reduced to (date, symbol, dividend), all merged
  // together; value order matches the field names ('dymd = date, 'dsymbol = ticker).
  val dividendsPipe = symbols.map { symbol =>
    new Csv(csvPath(dividendsRoot, symbol), dividendsSchema)
      .read
      .mapTo(dividendsSchema -> ('dymd, 'dsymbol, 'dividend)) {
        record: (String, String) => (record._1, symbol, record._2)
      }
  }
    .reduceLeft(_ ++ _)

  // joinWithTiny is Scalding's map-side inner join against the smaller
  // right-hand pipe, so only (symbol, date) pairs present in both inputs —
  // i.e. trading days on which a dividend was recorded — are written.
  stocksPipes
    .joinWithTiny(('ssymbol, 'symd) -> ('dsymbol, 'dymd), dividendsPipe)
    .project('symd, 'ssymbol, 'price_close, 'dividend)
    .write(Tsv(args("output")))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment