@andypern
Created May 28, 2014 20:21
// Imports this fragment relies on. ssc, host, port, table, tablename, outputFile and
// outputPath are assumed to be defined elsewhere in the enclosing program, and Files
// is assumed to be Guava's com.google.common.io.Files.
import java.nio.charset.Charset
import com.google.common.io.Files
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.storage.StorageLevel

val records = ssc.socketTextStream(host, port.toInt, StorageLevel.MEMORY_ONLY_SER)
// For each RDD inside the DStream, 'collect' it (which pulls every record back to the
// driver as an array) and loop over the elements. Maybe there's a more 'sparky' way of doing this...
records.foreachRDD(rdd => {
  val rddarray = rdd.collect
  if (rddarray.length > 0) {
    var linecount = 0
    for (line <- rddarray) {
      linecount += 1
      // Split the row into its fields (from scala-cookbook); .trim removes leading/trailing
      // spaces from the values. Note: a row without exactly 9 fields throws a MatchError here.
      val Array(resID, date, time, hz, disp, flo, sedPPM, psi, chlPPM) = line.split(",").map(_.trim)
      // Since tableau is lame about date fields, combine date + time.
      val dateTime = date + " " + time
      // Composite key for our M7 rowkey.
      val compositeKey = resID + "_" + dateTime
      // Now we need some code to shove the row into M7.
      val tblPut = new Put(Bytes.toBytes(compositeKey))
      // Build our tblPut object with multiple columns.
      // TODO: probably better done w/ a loop, but that's for another day.
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("resID"), Bytes.toBytes(resID))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("date"), Bytes.toBytes(dateTime))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("hz"), Bytes.toBytes(hz))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("disp"), Bytes.toBytes(disp))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("flo"), Bytes.toBytes(flo))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("sedPPM"), Bytes.toBytes(sedPPM))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("psi"), Bytes.toBytes(psi))
      tblPut.add(Bytes.toBytes("cf1"), Bytes.toBytes("chlPPM"), Bytes.toBytes(chlPPM))
      table.put(tblPut)
      // Also append the same row, with date and time combined, to the local output file.
      Files.append(resID + "," + dateTime + "," + hz + "," + disp + "," + flo + "," + sedPPM + "," + psi + "," + chlPPM + "\n", outputFile, Charset.defaultCharset())
    }
    println("dumped " + linecount + " rows to table " + tablename + " and wrote them to " + outputPath)
  }
})
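
The TODO above about building the Put with a loop could be approached roughly as follows. This is only a sketch: `SensorRow`, `fields` and `parse` are hypothetical names that do not appear in the gist, and the code uses plain Scala with no Spark or HBase calls. It turns one CSV line into the row key plus the eight (qualifier, value) pairs that the gist writes by hand, and returns `None` for a row with the wrong number of fields rather than failing on the destructuring pattern.

```scala
// Hypothetical helper, not part of the original gist.
object SensorRow {
  // Field names in the order they appear in each CSV line
  // (taken from the destructuring pattern in the gist).
  val fields = Seq("resID", "date", "time", "hz", "disp", "flo", "sedPPM", "psi", "chlPPM")

  // Returns Some((rowKey, columns)) for a well-formed line, None otherwise.
  // Unlike the gist, split(",", -1) is used so trailing empty fields are kept.
  def parse(line: String): Option[(String, Seq[(String, String)])] = {
    val values = line.split(",", -1).map(_.trim)
    if (values.length != fields.length) None
    else {
      val row = fields.zip(values).toMap
      // Same combination the gist uses: date and time joined by a space.
      val dateTime = row("date") + " " + row("time")
      val rowKey = row("resID") + "_" + dateTime
      // Eight columns, as in the gist: "date" holds the combined value, "time" is dropped.
      val columns = fields.filterNot(_ == "time").map {
        case "date" => "date" -> dateTime
        case name   => name -> row(name)
      }
      Some((rowKey, columns))
    }
  }
}
```

Inside the loop, the eight `tblPut.add` calls could then become a single `columns.foreach` that adds each pair under `"cf1"`, and malformed rows could be skipped or counted instead of stopping the batch.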