Last active
November 24, 2015 23:44
-
-
Save ingenthr/b7063054e67c53e7ce20 to your computer and use it in GitHub Desktop.
Blog: Bulk Transformations of Couchbase Data Using Apache Spark with an External Source
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Parses a raw (documentId, jsonBody) pair and keys the resulting document by its
 * "email" field so it can take part in a pair-RDD join.
 *
 * Despite its name, this builds a single key/value pair, not an RDD.
 * Any exception thrown by JsonObject.fromJson (e.g. on malformed JSON) propagates.
 *
 * @param s the document id and its JSON body as a string
 * @return the document's "email" value (presumably null when the field is absent —
 *         confirm against JsonObject.getString) paired with the parsed JsonDocument
 */
def CreateMappableRdd(s: (String, String)): (String, JsonDocument) = {
  val (docId, rawJson) = s
  val document = JsonDocument.create(docId, JsonObject.fromJson(rawJson))
  val emailKey = document.content().getString("email")
  emailKey -> document
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Writes the joined entitlement token into a document and hands that document back.
 *
 * The document's content is modified in place; the returned value is the very same
 * JsonDocument instance taken from the join result, not a copy.
 *
 * @param t a join result of the form (joinKey, (document, entitlementToken))
 * @return the input document with its "entitlementtoken" field set to the token
 */
def mergeIntoDoc(t: (String, (JsonDocument, Integer))): JsonDocument = {
  val (_, (doc, token)) = t
  doc.content().put("entitlementtoken", token)
  doc
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets.UTF_8

// Micro-batch the "transformative" bucket's change stream every 5 seconds.
val ssc = new StreamingContext(sc, Seconds(5))
ssc.couchbaseStream("transformative")
  // Only Mutation messages are used here; they expose the key and content decoded below.
  .filter(_.isInstanceOf[Mutation])
  // Couchbase keys and JSON bodies are UTF-8: decode them explicitly rather than relying
  // on the JVM's platform-default charset, which can garble non-ASCII data.
  .map { case m: Mutation => (new String(m.key, UTF_8), new String(m.content, UTF_8)) }
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.StandardCharsets.UTF_8

// Load the DataFrame of all of the users from MySQL.
// Note: appending .cache() may make sense here (or not) depending on the amount of data.
// Without it, each micro-batch join below recomputes the data from MySQL; with it,
// later MySQL changes are not picked up.
val entitlements = mysqlReader.load()
/* loading this:
+---------+-----------+-----------------+----------------+
|givenname|    surname|            email|entitlementtoken|
+---------+-----------+-----------------+----------------+
|     Matt| Ingenthron|   matt@email.com|           11211|
|  Michael|Nitschinger|michael@email.com|           11210|
+---------+-----------+-----------------+----------------+
*/

// Reduce each row to an (email, entitlementtoken) pair so it can be joined by key against
// the Couchbase stream. Rows missing either value are dropped:
//  - a null email would act as a join key and pair up with streamed documents whose
//    "email" lookup also yielded null;
//  - a document "enriched" with a null token still fails the entitlementtoken null-check
//    below, so saving it would re-trigger the same update on every batch.
val entitlementsSansSchema = entitlements.rdd
  .map[(String, Integer)](row => (row.getAs[String]("email"), row.getAs[Integer]("entitlementtoken")))
  .filter { case (email, token) => email != null && token != null }

val ssc = new StreamingContext(sc, Seconds(5))
ssc.couchbaseStream("transformative")
  // Only Mutation messages are processed; every other stream message type is dropped.
  .filter(_.isInstanceOf[Mutation])
  // Couchbase keys and JSON bodies are UTF-8: decode them explicitly rather than relying
  // on the JVM's platform-default charset.
  .map { case m: Mutation => (new String(m.key, UTF_8), new String(m.content, UTF_8)) }
  .map(s => CreateMappableRdd(s))
  // Keep documents that have an email to join on and have not been enriched yet. The
  // token check is also what stops the documents saved below from being processed again
  // when their own mutations come back through this stream.
  .filter { case (email, doc) => email != null && doc.content().get("entitlementtoken") == null }
  .foreachRDD(rdd => {
    rdd
      .join(entitlementsSansSchema)
      .map(mergeIntoDoc)
      //.foreach(println) // a good place to see the effect
      .saveToCouchbase("transformative")
  })
ssc.start()
ssc.awaitTermination()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment