Skip to content

Instantly share code, notes, and snippets.

@bill-transue
Last active February 21, 2018 19:07
Show Gist options
  • Save bill-transue/9ff26fff985461264746f044d99a8b8a to your computer and use it in GitHub Desktop.
Save bill-transue/9ff26fff985461264746f044d99a8b8a to your computer and use it in GitHub Desktop.
package main.scala
import org.mongodb.scala.bson._
import org.mongodb.scala.bson.collection.mutable.Document
/** Helpers for reshaping Spark SQL result rows into a MongoDB document. */
object RecordTransform {

  /** Status keys seeded with a zero count in every newly created month
    * sub-document, so each month lists all of these statuses even when no row
    * was seen for some of them. Seeded as Int64 to match the type of the
    * counts written from rows (`BsonInt64(row.getLong(2))`).
    */
  private val ZeroCountStatuses: Seq[String] =
    Seq("resubmitted", "submitted", "canceled", "terminated")

  /** Folds one result row into `results`.
    *
    * Reads `row.getInt(0)` as the outer key (stringified), `row.getString(1)`
    * as the inner key, and `row.getLong(2)` as the value.
    * NOTE(review): this presumes a (month, status, count) row; confirm against
    * the producing query.
    *
    * If `results` has no sub-document for the outer key yet, one is created
    * with every status in `ZeroCountStatuses` set to 0 (Int64). The row's value
    * is then stored as a `BsonInt64` under the inner key, and the sub-document
    * is written back into `results`.
    *
    * `results` is mutated in place and also returned, so this works as the step
    * function of a sequential left fold, e.g.
    * `join.collect().foldLeft(Document())(RecordTransform.foldToDocument)`.
    * It cannot be used as the `op` of `RDD.fold`, whose zero value and operands
    * must all have the RDD's element type (`Row`).
    *
    * @param results accumulator document; mutated and returned
    * @param row     one result row (see column getters above)
    * @return `results`, after the update
    */
  def foldToDocument(results: Document, row: org.apache.spark.sql.Row): Document = {
    val monthKey = row.getInt(0).toString
    val monthDoc: BsonDocument =
      results.get[BsonDocument](monthKey).getOrElse(newMonthDocument())
    monthDoc.put(row.getString(1), BsonInt64(row.getLong(2)))
    results.put(monthKey, monthDoc)
    results
  }

  /** Creates a sub-document with every status in `ZeroCountStatuses` set to an
    * Int64 zero, appended in list order.
    */
  private def newMonthDocument(): BsonDocument =
    ZeroCountStatuses.foldLeft(new BsonDocument()) { (doc, status) =>
      doc.append(status, BsonInt64(0L))
    }
}
spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0,org.mongodb.scala:mongo-scala-driver_2.11:2.2.0 --jars target/scala-2.11/spark-mllib-test.jar
scala> import org.mongodb.scala.bson.collection.mutable.Document
import org.mongodb.scala.bson.collection.mutable.Document
scala> val join = spark.sql("SELECT...")
scala> join.show
18/02/21 14:02:44 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
+-----------------------------+-----------+-----+
|coverage_selected_start_month| status| obs|
+-----------------------------+-----------+-----+
| 1| canceled| 4534|
| 1|resubmitted| 274|
| 1| submitted|16770|
| 1| terminated| 2750|
| 2| canceled| 427|
| 2|resubmitted| 8|
| 2| submitted| 1651|
| 2| terminated| 189|
| 3| canceled| 401|
| 3|resubmitted| 9|
| 3| submitted| 2197|
| 3| terminated| 159|
| 4| canceled| 39|
| 4| submitted| 419|
| 4| terminated| 33|
| 5| canceled| 2|
| 5| submitted| 19|
| 5| terminated| 4|
| 6| submitted| 1|
+-----------------------------+-----------+-----+
scala> var results = org.mongodb.scala.bson.collection.mutable.Document
scala> join.toJavaRDD.fold(results) ((acc,row) => main.scala.RecordTransform.foldToDocument(acc,row))
<console>:29: error: type mismatch;
found : org.mongodb.scala.bson.collection.mutable.Document.type
required: org.apache.spark.sql.Row
join.toJavaRDD.fold(results) ((acc,row) => main.scala.RecordTransform.foldToDocument(acc,row))
^
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment