@ldacosta
Last active February 12, 2016 21:58
I want to write a data writer that is generic for the Warehouse API's data
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import scala.util.{Success, Try}

/**
 * I want to write a data writer that is generic for the Warehouse API's data.
 * However, this won't compile, as d.toDF needs some implicits that can only be found if
 * (1) T is a case class (that is why T <: Product) and
 * (2) T is defined in the proper place (see https://issues.scala-lang.org/browse/SI-6649 or http://stackoverflow.com/questions/33704831/value-todf-is-not-a-member-of-org-apache-spark-rdd-rdd)
 *
 * I think step (1) is OK, but the typetags (step (2)) are not working.
 */
case class GenericDataWriter[T <: Product](name: String, sqlC: SQLContext, stage: Stage, fmt: Format) extends Serializable with Logging with DataWriter[T] {
  import sqlC.implicits._

  def write(d: RDD[T]): Try[RDD[T]] = {
    val fName = s"${name}_${stage.toString}.parquet"
    val allLocations = locations.getOrElse((Program, stage, fmt), Set.empty[WarehousePath])
    if (allLocations.isEmpty) {
      fail(s"No locations specified to save '$name'")
    } else {
      fmt match {
        case Parquet =>
          val allResults =
            // toSeq keeps one result per location; mapping the Set directly
            // would collapse equal Booleans into a single element
            allLocations.toSeq.map { physicalLocation =>
              physicalLocation match {
                case l @ Local(dir) =>
                  d.toDF().write.parquet(l.getFullFileName(fName))
                  true
                case s @ S3(bucketName) =>
                  // to write on S3 we took the convoluted path to
                  // (1) write a local file
                  val fileName = s"/tmp/$fName"
                  d.toDF().write.parquet(fileName)
                  // (2) copy it into S3's bucket
                  val dstFileName = s.getFullFileName(fName)
                  sparkutils.S3.writeFileToS3(bucketName, localFileName = fileName, dstFileName) match {
                    case None =>
                      logger.info(s"Writing to S3's bucket '${bucketName}', file '${dstFileName}' worked like a charm")
                      true
                    case Some(errMsg) =>
                      logger.error(s"Writing to S3's bucket '${bucketName}' failed miserably ($errMsg)")
                      false
                  }
              }
            }
          // let's report the final results
          allResults.count(identity) match {
            case 0 =>
              fail(s"No saving of '$name' was possible")
            case n if n < allResults.size =>
              logger.warn(s"'$name' was saved for some configurations, but not for others")
              Success(d)
            case _ =>
              Success(d)
          }
        case f => fail(s"Save '$name' data on format $f has not been implemented")
      }
    }
  }
}
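
One possible reading of the compile error: in Spark 1.x the implicit conversion that provides `toDF` on an RDD is, as far as I recall, declared with the bounds `A <: Product : TypeTag`, so a generic `T <: Product` without a `TypeTag` in scope cannot satisfy it. The sketch below reproduces that situation without Spark. `FakeImplicits`, `FrameOps`, `describe`, `Writer` and `Sale` are hypothetical names invented for the illustration; only the context bound on `T` is the point.

```scala
import scala.reflect.runtime.universe.{TypeTag, typeOf}

// Hypothetical stand-in for sqlContext.implicits: the extension method is
// only available when a TypeTag for the element type can be found.
object FakeImplicits {
  implicit class FrameOps[A <: Product : TypeTag](val data: Seq[A]) {
    def describe(): String = s"frame of ${typeOf[A]} with ${data.size} rows"
  }
}

// The `: TypeTag` context bound asks the caller to supply the tag.
// With only `T <: Product`, the call to describe() below does not compile.
class Writer[T <: Product : TypeTag] {
  import FakeImplicits._
  def write(d: Seq[T]): String = d.describe()
}

// Hypothetical record type, defined at the top level
case class Sale(id: Int, amount: Double)

object TypeTagDemo extends App {
  println(new Writer[Sale].write(Seq(Sale(1, 9.5), Sale(2, 3.0))))
}
```

If the same holds for the real `toDF`, declaring the writer as `GenericDataWriter[T <: Product : TypeTag]` (with `scala.reflect.runtime.universe.TypeTag` imported) may be enough, but that is an assumption and has not been checked against this code base.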
@yawaramin

Luis, I'll take a better look later, but one thing is that case classes automatically get Serializable, so you don't need to specify it.
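
A minimal sketch of that point, using a hypothetical `Sample` case class and a hypothetical `roundTrip` helper that are not part of the gist: the class never mentions `Serializable`, yet it survives a round trip through Java serialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical case class: note there is no `extends Serializable`
case class Sample(id: Int, label: String)

object SerializableCheck {
  // Serializes a value to bytes and reads it back
  def roundTrip[A <: java.io.Serializable](a: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(a)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = Sample(1, "a")
    // The type bound on roundTrip is satisfied without any explicit declaration
    println(roundTrip(original) == original)
  }
}
```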
