@attilapiros
Created August 14, 2018 15:11

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kudu.spark.kudu

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.StreamTest
class StreamingSuite extends StreamTest with KuduTestSuite {

  import testImplicits._

  def transformation(v: Int): (Int, String) =
    (v + 1, v.toString)

  test("testing KuduContext with Streaming DataFrame") {
    withTempDir { checkpointDir =>
      val input = MemoryStream[Int]
      val query = input
        .toDS()
        .map(v => (v + 1, v.toString))
        .toDF("key", "value")
        .writeStream
        .format(classOf[KuduSinkProvider].getCanonicalName)
        .option("kudu.master", miniCluster.getMasterAddresses)
        .option("kudu.table", simpleTableName)
        .option("checkpointLocation", checkpointDir.getCanonicalPath)
        .outputMode(OutputMode.Update)
        .start()

      def verifyOutput(expectedVersion: Int, expectedData: Seq[(Int, String)]): Unit = {
        val df = sqlContext.read
          .options(
            Map("kudu.master" -> miniCluster.getMasterAddresses, "kudu.table" -> simpleTableName))
          .kudu
        val actual = df.rdd
          .map { row =>
            (row.get(0), row.getString(1))
          }
          .collect()
          .toSet
        assert(actual === expectedData.toSet)
      }

      // send data
      input.addData(1, 2, 3, 4)
      query.processAllAvailable()
      verifyOutput(expectedVersion = 0, expectedData = (1 to 4).map(transformation))

      query.stop()
    }
  }
}

// Provider that creates a KuduSink for Structured Streaming queries and
// registers it under the short name "kudu".
class KuduSinkProvider extends StreamSinkProvider with DataSourceRegister {

  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink =
    new KuduSink(sqlContext, parameters)

  override def shortName(): String = "kudu"
}

// Sink that upserts every micro-batch into the configured Kudu table
// through a KuduContext.
class KuduSink(sqlContext: SQLContext, parameters: Map[String, String]) extends Sink {

  private val kuduContext =
    new KuduContext(parameters("kudu.master"), sqlContext.sparkContext)

  private val tablename = parameters("kudu.table")

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    kuduContext.upsertRows(data, tablename)
  }
}
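
Outside the test, the same sink can be attached to any streaming DataFrame. Below is a minimal sketch of such an application; the master address, table name, and checkpoint path are placeholders, and resolving the provider by its short name "kudu" assumes KuduSinkProvider is listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath (the test above sidesteps that by using the fully qualified class name).

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

object KuduSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kudu-sink-example")
      .getOrCreate()

    // Any streaming source works; the built-in rate source is used here only for illustration.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .selectExpr("CAST(value AS INT) AS key", "CAST(timestamp AS STRING) AS value")

    val query = events.writeStream
      .format("kudu") // resolved via KuduSinkProvider.shortName(), assuming service registration
      .option("kudu.master", "kudu-master:7051")     // placeholder master address
      .option("kudu.table", "my_kudu_table")         // placeholder table; schema must match (key, value)
      .option("checkpointLocation", "/tmp/kudu-sink-checkpoint")
      .outputMode(OutputMode.Update)
      .start()

    query.awaitTermination()
  }
}
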
@attilapiros (Author) commented:

To compile this within Kudu, the following Maven dependencies are needed:

          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
          </dependency>
          <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
          </dependency>
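
For builds that use sbt instead of Maven, the equivalent test-jar dependencies would look roughly like this (a sketch; the sparkVersion value is an assumed placeholder and should match the Spark version used by the Kudu build):

// build.sbt sketch; sparkVersion is an assumed placeholder
val sparkVersion = "2.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"     % sparkVersion % Test classifier "tests",
  "org.apache.spark" %% "spark-catalyst" % sparkVersion % Test classifier "tests",
  "org.apache.spark" %% "spark-sql"      % sparkVersion % Test classifier "tests"
)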
