
@ingenthr
Last active November 24, 2015 23:44
Blog: Bulk Transformations of Couchbase Data Using Apache Spark with an External Source
```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject

/** Parses a (document ID, JSON string) pair and returns the document keyed by its email address */
def CreateMappableRdd(s: (String, String)): (String, JsonDocument) = {
  val return_doc = JsonDocument.create(s._1, JsonObject.fromJson(s._2))
  (return_doc.content().getString("email"), return_doc)
}

/** Returns the joined JsonDocument, enriched in place with the entitlement token */
def mergeIntoDoc(t: (String, (JsonDocument, Integer))): JsonDocument = {
  val jsonToEnrich = t._2._1.content()
  val entitlementFromJoin = t._2._2
  jsonToEnrich.put("entitlementtoken", entitlementFromJoin)
  t._2._1
}
```
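Stripped of the Couchbase types, the two helpers do little per record: one keys a parsed document by its `email` field, the other adds the joined token under `entitlementtoken`. A minimal plain-Scala sketch of that logic, assuming a hypothetical `Doc` case class backed by an immutable `Map` in place of `JsonDocument`/`JsonObject`. Unlike the originals, it returns a new document instead of mutating one, and it drops documents without an email where `CreateMappableRdd` would key them by `null`.

```scala
// Hypothetical stand-in for JsonDocument: an ID plus an immutable, JSON-like map.
final case class Doc(id: String, content: Map[String, Any])

object EnrichSketch {
  /** Like CreateMappableRdd: key a document by its "email" field (None if there is no string email). */
  def keyByEmail(doc: Doc): Option[(String, Doc)] =
    doc.content.get("email").collect { case email: String => (email, doc) }

  /** Like mergeIntoDoc: add the joined token, returning a new Doc rather than mutating the input. */
  def merge(t: (String, (Doc, Int))): Doc = {
    val (_, (doc, token)) = t
    doc.copy(content = doc.content + ("entitlementtoken" -> token))
  }
}
```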
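```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._
// First step: stream the bucket in 5-second micro-batches, decoding each mutation into (document ID, JSON).
// No output operation is registered yet, so this fragment is not meant to be started on its own.
val ssc = new StreamingContext(sc, Seconds(5))
ssc.couchbaseStream("transformative")
  .filter(_.isInstanceOf[Mutation])
  .map(m => (new String(m.asInstanceOf[Mutation].key), new String(m.asInstanceOf[Mutation].content)))
```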
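The `filter`/`asInstanceOf` pair above selects mutations and then casts each one twice. The same step could be written as a single pattern match (on the DStream, for example through `flatMap` or `transform(_.collect { ... })`). The sketch below shows the idea on a plain `Seq`, using hypothetical `StreamMessage`/`Mutation`/`Deletion` stand-ins that carry only the fields used here; they are not the connector's classes. It also passes an explicit charset, since `new String(bytes)` decodes with the JVM's default charset while JSON documents are normally UTF-8.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-ins for the connector's stream messages (only the fields used here).
sealed trait StreamMessage
final case class Mutation(key: Array[Byte], content: Array[Byte]) extends StreamMessage
final case class Deletion(key: Array[Byte]) extends StreamMessage

object DecodeSketch {
  private def utf8(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  /** Keeps only mutations and decodes each into a (document ID, JSON string) pair. */
  def decode(messages: Seq[StreamMessage]): Seq[(String, String)] =
    messages.collect { case Mutation(key, content) => (utf8(key), utf8(content)) }
}
```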
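```scala
// Load the DataFrame of all users from MySQL. mysqlReader is not defined in this gist;
// it is assumed to be a DataFrameReader configured for the MySQL users table.
// Without .cache(), each micro-batch's join recomputes this data, re-reading the table over JDBC.
// Appending .cache() avoids that at the cost of executor memory (and of not seeing later
// changes to the table), so whether it makes sense depends on the amount of data.
val entitlements = mysqlReader.load()
/* loading this:
+---------+-----------+-----------------+----------------+
|givenname|    surname|            email|entitlementtoken|
+---------+-----------+-----------------+----------------+
|     Matt| Ingenthron|   matt@email.com|           11211|
|  Michael|Nitschinger|michael@email.com|           11210|
+---------+-----------+-----------------+----------------+
*/
// Drop the schema, keeping (email, entitlementtoken) pairs so they can be joined
// with the document stream, which is keyed by email as well.
val entitlementsSansSchema = entitlements.rdd.map[(String, Integer)](f =>
  (f.getAs[String]("email"), f.getAs[Integer]("entitlementtoken")))
```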
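Each micro-batch is combined with these `(email, entitlementtoken)` pairs through `rdd.join(entitlementsSansSchema)`. Spark's pair-RDD `join` is an inner join: a document whose email has no row in MySQL produces no output (so it is not written back), and an email that appears in several rows produces one output per row. A small plain-Scala sketch of those semantics on local `Seq`s; it only models the result, not Spark's distributed execution.

```scala
object JoinSketch {
  /**
   * Local model of pair-RDD join semantics: one (key, (left, right)) output for every
   * combination of left and right values that share a key; unmatched keys are dropped.
   */
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] = {
    val rightByKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, a) <- left
      b <- rightByKey.getOrElse(k, Seq.empty[B])
    } yield (k, (a, b))
  }
}
```

If unmatched documents should still reach the next step, `leftOuterJoin` keeps them with `None` for the token; that would be a different design from the one used in this gist.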
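```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark._
import com.couchbase.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(5))
ssc.couchbaseStream("transformative")
  .filter(_.isInstanceOf[Mutation])
  .map(m => (new String(m.asInstanceOf[Mutation].key), new String(m.asInstanceOf[Mutation].content)))
  // key each document by its email address
  .map(s => CreateMappableRdd(s))
  // skip documents that already have a token; this also keeps the documents this job
  // writes back to the same bucket from being enriched again
  .filter(_._2.content().get("entitlementtoken") == null)
  .foreachRDD(rdd => {
    rdd
      .join(entitlementsSansSchema) // inner join: documents with no matching email are not written
      .map(mergeIntoDoc)
      // .foreach(println) // use this instead of the save below to inspect the enriched documents
      .saveToCouchbase("transformative")
  })
ssc.start()
ssc.awaitTermination()
```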
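Because the job saves the enriched documents back into the same `transformative` bucket it streams from, every save shows up as a new mutation in a later batch. The `entitlementtoken` filter is what ends that feedback loop: the rewritten document already carries a token and is dropped. The hypothetical simulation below models the bucket as a queue in which every write is echoed back as a new event; with the guard on, each matching document is written once, and with it off the loop only stops at the event cap.

```scala
import scala.collection.mutable

object FeedbackLoopSketch {
  // Hypothetical document model: a JSON-like map.
  type Doc = Map[String, Any]

  /**
   * Simulates a bucket whose every write is echoed back as a new mutation event.
   * Returns how many enriched documents were written before the events ran out
   * (or maxEvents was reached).
   */
  def run(initial: Seq[(String, Doc)], tokens: Map[String, Int],
          guard: Boolean = true, maxEvents: Int = 100): Int = {
    val pending = mutable.Queue.empty[(String, Doc)]
    pending ++= initial
    var writes = 0
    var events = 0
    while (pending.nonEmpty && events < maxEvents) {
      val (id, doc) = pending.dequeue()
      events += 1
      // Same check as the pipeline's filter: skip documents that already have a token.
      if (!guard || !doc.contains("entitlementtoken")) {
        for {
          email <- doc.get("email").collect { case e: String => e }
          token <- tokens.get(email) // inner-join behaviour: no match, no write
        } {
          writes += 1
          // "Saving" to the same bucket produces another mutation for this document.
          pending.enqueue(id -> (doc + ("entitlementtoken" -> token)))
        }
      }
    }
    writes
  }
}
```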