Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Java API for Spark Cassandra Connector - tutorial for blog post
package com.datastax.spark.demo;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class JavaDemo implements Serializable {
private transient SparkConf conf;
/**
 * Creates the demo application.
 *
 * @param conf Spark configuration kept for later use when {@code run()} builds the Spark context
 */
private JavaDemo(SparkConf conf) {
this.conf = conf;
}
/**
 * Executes the demo end to end: generates the sample data, computes the
 * per-product summaries and prints the results.
 *
 * <p>The Spark context is created from the stored configuration and is always
 * stopped, even when one of the stages throws.
 */
private void run() {
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        generateData(sc);
        compute(sc);
        showResults(sc);
    } finally {
        // Stop the context on failure too, so a failed stage does not leave it running.
        sc.stop();
    }
}
/**
 * Recreates the {@code java_api} keyspace and populates it with demo data.
 *
 * <p>The keyspace is dropped and rebuilt with the {@code products},
 * {@code sales} and {@code summaries} tables. A fixed product hierarchy is
 * then saved to {@code products}, and 1000 randomly priced sales are generated
 * and saved to {@code sales} for every product that has exactly two parents.
 *
 * @param sc Spark context used to parallelize the data and reach Cassandra
 */
private void generateData(JavaSparkContext sc) {
    // Schema: start from a clean keyspace on every run.
    CassandraConnector cassandra = CassandraConnector.apply(sc.getConf());
    try (Session cql = cassandra.openSession()) {
        cql.execute("DROP KEYSPACE IF EXISTS java_api");
        cql.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        cql.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        cql.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        cql.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    // Product hierarchy: each product lists the ids of all of its ancestors.
    List<Product> catalog = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );
    JavaRDD<Product> productsRDD = sc.parallelize(catalog);
    javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

    // Only products with exactly two parents receive generated sales.
    JavaRDD<Product> leafProductsRDD = productsRDD.filter(new Function<Product, Boolean>() {
        @Override
        public Boolean call(Product candidate) throws Exception {
            List<Integer> ancestors = candidate.getParents();
            return ancestors.size() == 2;
        }
    });

    // 1000 randomly priced sales per selected product.
    JavaRDD<Sale> salesRDD = leafProductsRDD.flatMap(new FlatMapFunction<Product, Sale>() {
        @Override
        public Iterable<Sale> call(Product leaf) throws Exception {
            Random rng = new Random();
            List<Sale> generated = new ArrayList<>(1000);
            while (generated.size() < 1000) {
                UUID saleId = UUID.randomUUID();
                Integer productId = leaf.getId();
                BigDecimal price = BigDecimal.valueOf(rng.nextDouble());
                generated.add(new Sale(saleId, productId, price));
            }
            return generated;
        }
    });
    javaFunctions(salesRDD, Sale.class).saveToCassandra("java_api", "sales");
}
/**
 * Computes the total sales amount per product and stores it in
 * {@code java_api.summaries}.
 *
 * <p>Every sale is joined with its product and then emitted once for the
 * product itself and once for each of the product's parents, so a sale's
 * price is counted for the sold product and for each listed parent. The
 * emitted prices are summed per product id and saved as {@link Summary} rows.
 *
 * @param sc Spark context used to read from and write to Cassandra
 */
private void compute(JavaSparkContext sc) {
    // Products keyed by their own id.
    JavaRDD<Product> productRows = javaFunctions(sc).cassandraTable("java_api", "products", Product.class);
    JavaPairRDD<Integer, Product> productsById = productRows.keyBy(new Function<Product, Integer>() {
        @Override
        public Integer call(Product row) throws Exception {
            return row.getId();
        }
    });

    // Sales keyed by the id of the product that was sold.
    JavaRDD<Sale> saleRows = javaFunctions(sc).cassandraTable("java_api", "sales", Sale.class);
    JavaPairRDD<Integer, Sale> salesByProduct = saleRows.keyBy(new Function<Sale, Integer>() {
        @Override
        public Integer call(Sale row) throws Exception {
            return row.getProduct();
        }
    });

    JavaPairRDD<Integer, Tuple2<Sale, Product>> salesWithProducts = salesByProduct.join(productsById);

    // Fan each sale out to the sold product and to every parent of that product.
    JavaPairRDD<Integer, BigDecimal> pricesByProduct = salesWithProducts.flatMap(
            new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
                @Override
                public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> joined) throws Exception {
                    Sale sale = joined._2()._1();
                    Product soldProduct = joined._2()._2();
                    List<Integer> ancestors = soldProduct.getParents();

                    List<Tuple2<Integer, BigDecimal>> emitted = new ArrayList<>(ancestors.size() + 1);
                    emitted.add(new Tuple2<>(sale.getProduct(), sale.getPrice()));
                    for (Integer ancestorId : ancestors) {
                        emitted.add(new Tuple2<>(ancestorId, sale.getPrice()));
                    }
                    return emitted;
                }
            });

    // Sum the prices per product id.
    JavaPairRDD<Integer, BigDecimal> totalsByProduct = pricesByProduct.reduceByKey(
            new Function2<BigDecimal, BigDecimal, BigDecimal>() {
                @Override
                public BigDecimal call(BigDecimal left, BigDecimal right) throws Exception {
                    return left.add(right);
                }
            });

    JavaRDD<Summary> summariesRDD = totalsByProduct.map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
        @Override
        public Summary call(Tuple2<Integer, BigDecimal> total) throws Exception {
            return new Summary(total._1(), total._2());
        }
    });
    javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
}
/**
 * Prints every product together with its computed summary, if one exists.
 *
 * <p>Products and summaries are both read from Cassandra, keyed by product
 * id and left-outer-joined, so a product without a summary row is still
 * printed (with an absent {@code Optional}).
 *
 * @param sc Spark context used to read from Cassandra
 */
private void showResults(JavaSparkContext sc) {
    JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
            .cassandraTable("java_api", "summaries", Summary.class)
            .keyBy(new Function<Summary, Integer>() {
                @Override
                public Integer call(Summary summary) throws Exception {
                    return summary.getProduct();
                }
            });
    JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
            .cassandraTable("java_api", "products", Product.class)
            .keyBy(new Function<Product, Integer>() {
                @Override
                public Integer call(Product product) throws Exception {
                    return product.getId();
                }
            });
    // collect() replaces toArray(), which is a deprecated alias that newer
    // Spark releases no longer provide; both return the RDD contents as a List.
    List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().collect();
    for (Tuple2<Product, Optional<Summary>> result : results) {
        System.out.println(result);
    }
}
/**
 * Command-line entry point.
 *
 * <p>Expects exactly two arguments: the Spark master URL and the Cassandra
 * contact point. With any other argument count a usage message is written to
 * standard error and the JVM exits with status 1.
 *
 * @param args {@code <Spark Master URL> <Cassandra contact point>}
 */
public static void main(String[] args) {
    boolean argumentsValid = args.length == 2;
    if (!argumentsValid) {
        System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
        System.exit(1);
    }
    String sparkMaster = args[0];
    String cassandraHost = args[1];
    SparkConf conf = new SparkConf()
            .setAppName("Java API demo")
            .setMaster(sparkMaster)
            .set("spark.cassandra.connection.host", cassandraHost);
    new JavaDemo(conf).run();
}
/**
 * A product in the demo hierarchy: an id, a display name and the ids of its
 * parent products.
 *
 * <p>Mutable JavaBean with a no-argument constructor and getter/setter pairs
 * for {@code id}, {@code name} and {@code parents}.
 */
public static class Product implements Serializable {
    private Integer id;
    private String name;
    private List<Integer> parents;

    /** Creates an empty product; all fields are {@code null} until set. */
    public Product() { }

    /**
     * @param id      product identifier
     * @param name    display name
     * @param parents ids of the parent products
     */
    public Product(Integer id, String name, List<Integer> parents) {
        this.id = id;
        this.name = name;
        this.parents = parents;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public List<Integer> getParents() { return parents; }
    public void setParents(List<Integer> parents) { this.parents = parents; }

    /**
     * Returns a debug representation such as
     * {@code Product{id=4, name='Product A1', parents=[0, 1]}}.
     */
    @Override
    public String toString() {
        // Plain concatenation rather than MessageFormat: MessageFormat renders numeric
        // arguments through a locale NumberFormat, so an id of 1234 would print as "1,234".
        return "Product{id=" + id + ", name='" + name + "', parents=" + parents + "}";
    }
}
/**
 * A single sale: a unique id, the id of the product sold and the price paid.
 *
 * <p>Mutable JavaBean with a no-argument constructor and getter/setter pairs
 * for {@code id}, {@code product} and {@code price}.
 */
public static class Sale implements Serializable {
    private UUID id;
    private Integer product;
    private BigDecimal price;

    /** Creates an empty sale; all fields are {@code null} until set. */
    public Sale() { }

    /**
     * @param id      unique identifier of the sale
     * @param product id of the product that was sold
     * @param price   price of the sale
     */
    public Sale(UUID id, Integer product, BigDecimal price) {
        this.id = id;
        this.product = product;
        this.price = price;
    }

    public UUID getId() { return id; }
    public void setId(UUID id) { this.id = id; }
    public Integer getProduct() { return product; }
    public void setProduct(Integer product) { this.product = product; }
    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }

    /**
     * Returns a debug representation of the form
     * {@code Sale{id=<uuid>, product=<id>, price=<price>}}.
     */
    @Override
    public String toString() {
        // Plain concatenation rather than MessageFormat: MessageFormat renders numeric
        // arguments through a locale NumberFormat, which rounds the BigDecimal price to
        // three fraction digits and inserts grouping separators.
        return "Sale{id=" + id + ", product=" + product + ", price=" + price + "}";
    }
}
/**
 * The aggregated sales total for one product.
 *
 * <p>Mutable JavaBean with a no-argument constructor and getter/setter pairs
 * for {@code product} and {@code summary}.
 */
public static class Summary implements Serializable {
    private Integer product;
    private BigDecimal summary;

    /** Creates an empty summary; all fields are {@code null} until set. */
    public Summary() { }

    /**
     * @param product id of the product the total belongs to
     * @param summary total sales amount for that product
     */
    public Summary(Integer product, BigDecimal summary) {
        this.product = product;
        this.summary = summary;
    }

    public Integer getProduct() { return product; }
    public void setProduct(Integer product) { this.product = product; }
    public BigDecimal getSummary() { return summary; }
    public void setSummary(BigDecimal summary) { this.summary = summary; }

    /**
     * Returns a debug representation of the form
     * {@code Summary{product=<id>, summary=<total>}}.
     */
    @Override
    public String toString() {
        // Plain concatenation rather than MessageFormat: MessageFormat renders numeric
        // arguments through a locale NumberFormat, which rounds the BigDecimal total to
        // three fraction digits and inserts grouping separators (e.g. "1,499.877").
        return "Summary{product=" + product + ", summary=" + summary + "}";
    }
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spark Cassandra Connector Java API demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.datastax.spark.demo</groupId>
<artifactId>java-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- NOTE(review): every artifact below carries the _2.10 suffix (presumably the Scala
     binary version); keep the suffix and the Spark/connector versions aligned when upgrading. -->
<dependencies>
<!--Spark Cassandra Connector-->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.0.0-rc4</version>
</dependency>
<!-- Java API module of the connector; kept at the same version as the connector above. -->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.0.0-rc4</version>
</dependency>
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>0.9.2</version>
</dependency>
<!-- NOTE(review): the JavaDemo class does not import any streaming classes directly;
     confirm whether the connector requires this artifact before removing it. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>0.9.2</version>
</dependency>
</dependencies>
</project>

Thank you for the complete description. When I ran the program, I got an IllegalArgumentException in JavaRDD

summariesRDD = rdd.map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
@override
public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
return new Summary(input._1(), input._2());
}

pjc23 commented Oct 19, 2014

I am getting the following error when I run the example:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/api/java/function/Function
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
at java.lang.Class.getMethod0(Class.java:2774)
at java.lang.Class.getMethod(Class.java:1663)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.api.java.function.Function
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 6 more

Any idea? I've tried to recreate the demo app via maven command line (linux) and maven-in-eclipse (windows) but no luck. I have the 4 dependencies in my pom.xml

include this dependency as well:

com.datastax.cassandra
cassandra-driver-core
2.1.0

and make sure the version of spark and Cassandra and Connector are compatible

I was able to execute this in my cluster. I then decided to add

Map countByKey = productsRDD.countByKey();

in the compute method. Now I am consistently getting:

Exception in thread “main” java.lang.ClassCastException: it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap cannot be cast to it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap
at org.apache.spark.rdd.RDD.countByValue(RDD.scala:780)
at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:203)
at org.apache.spark.api.java.JavaPairRDD.countByKey(JavaPairRDD.scala:195)
at com.maarcus.mitur.sparktutorial.JavaDemo.compute(JavaDemo.java:98)
at com.maarcus.mitur.sparktutorial.JavaDemo.run(JavaDemo.java:34)
at com.maarcus.mitur.sparktutorial.JavaDemo.main(JavaDemo.java:181)

Any ideas of what went wrong?

Thanks.

BTW, the Map is defined as a Map of Integer to Object. Copy and paste took that part out.

Could someone please point out what I could be doing wrong? I have a standalone DSE 4.6 one node cluster and am able to launch a spark scala shell and try out sample programs. I can see the Master UI and look at various stages being processed.

However when I try to run the above Java code from Eclipse, I am getting the following errors indicating that the cluster is not being discovered

14/12/17 00:34:10 INFO server.Server: jetty-7.6.8.v20121106
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/12/17 00:34:10 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/12/17 00:34:10 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041
14/12/17 00:34:10 INFO ui.SparkUI: Started Spark Web UI at http://172.16.0.3:4041
14/12/17 00:34:10 INFO client.AppClient$ClientActor: Connecting to master spark://127.0.0.1:7077...
14/12/17 00:34:12 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/12/17 00:34:12 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/17 00:34:12 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/17 00:34:13 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
14/12/17 00:34:13 INFO spark.SparkContext: Starting job: runJob at RDDFunctions.scala:48
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Got job 0 (runJob at RDDFunctions.scala:48) with 2 output partitions (allowLocal=false)
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Final stage: Stage 0 (runJob at RDDFunctions.scala:48)
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Parents of final stage: List()
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Missing parents: List()
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at JavaDemo.java:64), which has no missing parents
14/12/17 00:34:13 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (ParallelCollectionRDD[0] at parallelize at JavaDemo.java:64)
14/12/17 00:34:13 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/12/17 00:34:28 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/17 00:34:30 INFO client.AppClient$ClientActor: Connecting to master spark://127.0.0.1:7077...
14/12/17 00:34:43 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/17 00:34:50 INFO client.AppClient$ClientActor: Connecting to master spark://127.0.0.1:7077...
14/12/17 00:34:58 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
14/12/17 00:35:10 ERROR client.AppClient$ClientActor: All masters are unresponsive! Giving up.
14/12/17 00:35:10 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster looks dead, giving up.
14/12/17 00:35:10 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/12/17 00:35:10 INFO scheduler.DAGScheduler: Failed to run runJob at RDDFunctions.scala:48
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Spark cluster looks down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Here are the dependencies from my pom.xml for the above code:



com.datastax.spark
spark-cassandra-connector_2.10
1.0.0-rc4


com.datastax.spark
spark-cassandra-connector-java_2.10
1.0.0-rc4

    <!--Spark-->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>0.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>0.9.2</version>
    </dependency>

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>2.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libthrift</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.1</version>
    </dependency>

</dependencies>

gabhi commented Jan 13, 2015

I am trying to run above example at https://github.com/gabhi/spark-cassandra-java-example

I am getting the following error:
15/01/13 00:05:08 WARN scheduler.TaskSetManager: Loss was due to java.lang.NoSuchMethodError
java.lang.NoSuchMethodError: com.datastax.driver.core.DataType.serialize(Ljava/lang/Object;)Ljava/nio/ByteBuffer;
at com.datastax.spark.connector.writer.DefaultRowWriter$$anonfun$bind$1.apply(DefaultRowWriter.scala:89)

I am trying to run this sample and am getting the following error; it looks like the prepared statement is not being executed correctly.

Caused by: org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Failed to prepare statement INSERT INTO "java_api_test"."products" ("parents", "name", "id") VALUES (:parents, :name, :id): line 1:90 no viable alternative at input ':')

Any ideas?

I am using following versions:

com.datastax.spark spark-cassandra-connector-java_2.10 1.0.0-rc4 org.apache.spark spark-core_2.10 0.9.2 org.apache.thrift libthrift 0.9.1

com.datastax.cassandra
cassandra-driver-core
2.1.0

ssoto commented Jul 29, 2015

I ran into a problem compiling this with Maven: the static import and other Java features require a specific maven-compiler-plugin configuration. I used this:

<build>
    <plugins>
      <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.0</version>
          <configuration>
              <source>1.7</source>
              <target>1.7</target>
          </configuration>
      </plugin>
    </plugins>
  </build>

guang384 commented Oct 7, 2015

Some of the APIs have changed.

I forked this and updated it to Spark 1.5 and spark-cassandra-connector 1.5:

https://gist.github.com/guang384/53b6293499f9b0674998

Everything seems to work fine for me until I save data to Cassandra via saveToCassandra(). I am getting the error below for my Product class. Any idea what the reason could be?

15/10/15 01:35:01 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 192.168.1.222): java.io.IOException: java.lang.ClassNotFoundException: com.sylvestor.testcasspark.JavaDemo$Product
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: com.sylvestor.testcasspark.JavaDemo$Product
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 20 more

15/10/15 01:35:01 INFO TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) on executor 192.168.1.222: java.io.IOException (java.lang.ClassNotFoundException: com.sylvestor.testcasspark.JavaDemo$Product) [duplicate 1]
15/10/15 01:35:01 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on executor 192.168.1.222: java.io.IOException (java.lang.ClassNotFoundException: com.sylvestor.testcasspark.JavaDemo$Product) [duplicate 2]
15/10/15 01:35:01 INFO TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3) on executor 192.168.1.222: java.io.IOException (java.lang.ClassNotFoundException: com.sylvestor.testcasspark.JavaDemo$Product) [duplicate 3]
15/10/15 01:35:01 INFO TaskSetManager: Starting task 3.1 in stage 0.0 (TID 4, 192.168.1.222, PROCESS_LOCAL, 2490 bytes)
15/10/15 01:35:01 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 5, 192.168.1.222, PROCESS_LOCAL, 2495 bytes)
15/10/15 01:35:01 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID 6, 192.168.1.222, PROCESS_LOCAL, 2446 bytes)
15/10/15 01:35:01 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 7, 192.168.1.222, PROCESS_LOCAL, 2454 bytes)

PNewbie commented Nov 13, 2015

I am getting following error when running this example. Any help will be greatly appreciated.

15/11/13 10:57:54 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" java.lang.IllegalArgumentException: Failed to get converter for field "getId" of type java.lang.Integer in com.datastax.spark.demo.JavaDemo$Product mapped to column "id" of "java_api.products"
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1$$anonfun$5.apply(MappedToGettableDataConverter.scala:155)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1$$anonfun$5.apply(MappedToGettableDataConverter.scala:148)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.(MappedToGettableDataConverter.scala:148)
at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:18)
at com.datastax.spark.connector.writer.DefaultRowWriter.(DefaultRowWriter.scala:17)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:269)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61)
at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:443)
at com.datastax.spark.demo.JavaDemo.generateData(JavaDemo.java:76)
at com.datastax.spark.demo.JavaDemo.run(JavaDemo.java:37)
at com.datastax.spark.demo.JavaDemo.main(JavaDemo.java:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Hii,
I am developing a web application that connects to Cassandra from Java using Spark (without Maven) in Eclipse. I included all the jars in the lib folder, but I am getting the following error:

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.augmentString(Ljava/lang/String;)Lscala/collection/immutable/StringOps;
at akka.util.Duration$.(Duration.scala:76)
at akka.util.Duration$.(Duration.scala)
at akka.actor.ActorSystem$Settings.(ActorSystem.scala:120)
at akka.actor.ActorSystemImpl.(ActorSystem.scala:426)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:103)
at akka.actor.ActorSystem$.apply(ActorSystem.scala:98)
at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:122)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:55)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1837)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1828)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:57)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:223)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:163)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:269)
at org.apache.spark.SparkContext.(SparkContext.scala:272)
at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at SparkCassandraSave.run(SparkCassandraSave.java:27)
at SparkCassandraSave.main(SparkCassandraSave.java:115)

Any idea regarding this?

Thanks.

What are the run parameters to connect to an actual Spark cluster instead of using local[4]?

Did it work for anybody?
What are the exact configurations, code and pom.xml that DataStax claims work perfectly?

I have tried several combinations, no success so far.

QuyetNT commented Jun 2, 2016

I am trying to run this with spark-submit but still get an error:
Exception in thread "main" java.lang.NoClassDefFoundError: com/datastax/spark/connector/japi/CassandraJavaUtil
at trongquyet.bk.TestSchema.App.main(App.java:61)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.japi.CassandraJavaUtil
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 10 more
I added it to the path spark/lib and also added it via sparkConf().addjars, but the error still appears.

Hi, I'm trying to run this example with Spark 2.0 and Cassandra 3.7, but no luck.

Is there an updated version of the example? What does my POM file have to look like?

Any kind of help would be highly appreciated!

Sam-SFO commented Dec 4, 2016 edited

This example no longer works, as com.datastax.spark.connector.CassandraJavaUtil no longer exists in its old form. The new one is in a separate package (com.datastax.spark.connector.japi.CassandraJavaUtil) but does not have the overloaded javaFunctions(). Some functions exist in other forms, but there is no 1:1 match; I couldn't find any equivalent to the one that takes a .class as input (at least I have not found one yet).

So this needs to be totally refactored. Has anyone done that?

I have added updated pom.xml and updated class at https://gist.github.com/baghelamit/f2963d9e37acc55474559104f5f16cf1. This is working for spark - 2.0.2, spark-cassandra-connector - 2.0.0-M3 and cassandra-driver-core - 3.1.2

I have this problem using CassandraConnectorConf class. Any ideas?


 "main" java.lang.NoSuchMethodError: org.apache.commons.codec.binary.Base64.encodeBase64String([B)Ljava/lang/String;
	at com.datastax.spark.connector.cql.CassandraConnectorConf.serializedConfString$lzycompute(CassandraConnectorConf.scala:40)
	at com.datastax.spark.connector.cql.CassandraConnectorConf.serializedConfString(CassandraConnectorConf.scala:34)
	at com.datastax.spark.connector.cql.CassandraConnectorConf.hashCode(CassandraConnectorConf.scala:43)
	at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:206)
	at scala.collection.concurrent.TrieMap$MangledHashing.hash(TrieMap.scala:955)
	at scala.collection.concurrent.TrieMap.computeHash(TrieMap.scala:827)
	at scala.collection.concurrent.TrieMap.get(TrieMap.scala:842)
	at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:50)
	at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:79)
	at customCassandraSpark.JavaDemo.generateData(JavaDemo.java:53)
	at customCassandraSpark.JavaDemo.run(JavaDemo.java:43)
	at customCassandraSpark.JavaDemo.main(JavaDemo.java:188)

Here is a working sample based on the original sample:


4.0.0

<groupId>com.datastax.spark.demo</groupId>
<artifactId>java-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
	<!--Spark Cassandra Connector -->
	<dependency>
		<groupId>com.datastax.spark</groupId>
		<artifactId>spark-cassandra-connector_2.11</artifactId>
		<version>2.0.5</version>
	</dependency>
	<dependency>
		<groupId>com.datastax.spark</groupId>
		<artifactId>spark-cassandra-connector-java_2.11</artifactId>
		<version>1.5.2</version>
	</dependency>

	<!--Spark -->
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-core_2.11</artifactId>
		<version>2.2.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-streaming_2.11</artifactId>
		<version>2.2.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.spark</groupId>
		<artifactId>spark-sql_2.11</artifactId>
		<version>2.2.0</version>
	</dependency>
</dependencies>

package com.datastax.spark.demo;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import scala.Tuple2;

public class Test {

/**
 * Entry point of the demo: builds a local Spark context pointed at a Cassandra node on
 * 127.0.0.1, then generates the sample data, computes the summaries and prints the results.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    SparkConf conf = new SparkConf()
            .setAppName("Java API demo")
            .setMaster("local[*]")
            .set("spark.cassandra.connection.host", "127.0.0.1");

    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        generateData(sc);
        compute(sc);
        showResults(sc);
    } finally {
        // Stop the context even when one of the stages throws, so a failed run
        // does not leave the Spark context running.
        sc.stop();
    }
}


/**
 * Reads the {@code java_api.products} and {@code java_api.summaries} tables, left-joins
 * every product with its summary (which may be absent) and prints each joined pair to
 * standard output.
 *
 * @param sc Spark context used to read both tables
 */
private static void showResults(JavaSparkContext sc) {
    JavaRDD<Summary> storedSummaries = javaFunctions(sc)
            .cassandraTable("java_api", "summaries", mapRowTo(Summary.class));
    JavaRDD<Product> storedProducts = javaFunctions(sc)
            .cassandraTable("java_api", "products", mapRowTo(Product.class));

    // Key both sides by product id so they can be joined.
    JavaPairRDD<Integer, Summary> summariesById = storedSummaries.keyBy(Summary::getProduct);
    JavaPairRDD<Integer, Product> productsById = storedProducts.keyBy(Product::getId);

    // Left outer join: products without a summary row are kept.
    JavaPairRDD<Integer, Tuple2<Product, Optional<Summary>>> joined =
            productsById.leftOuterJoin(summariesById);

    List<Tuple2<Product, Optional<Summary>>> rows = joined.values().collect();
    rows.forEach(System.out::println);
}


/**
 * Computes the total sale price per product and stores it in {@code java_api.summaries}.
 * Each sale is counted for the product it was sold under and for every id listed in that
 * product's {@code parents}.
 *
 * @param sc Spark context used to read the source tables and write the summaries
 */
private static void compute(JavaSparkContext sc) {
    JavaRDD<Product> productRows = javaFunctions(sc)
            .cassandraTable("java_api", "products", mapRowTo(Product.class));
    JavaRDD<Sale> saleRows = javaFunctions(sc)
            .cassandraTable("java_api", "sales", mapRowTo(Sale.class));

    JavaPairRDD<Integer, Product> productsById = productRows.keyBy(Product::getId);
    JavaPairRDD<Integer, Sale> salesByProduct = saleRows.keyBy(Sale::getProduct);

    JavaPairRDD<Integer, Tuple2<Sale, Product>> salesWithProducts = salesByProduct.join(productsById);

    // Emit one (productId, price) pair for the sold product and one for each of its parents.
    JavaPairRDD<Integer, BigDecimal> priceByProduct = salesWithProducts.flatMapToPair(entry -> {
        Sale sale = entry._2()._1();
        List<Integer> ancestors = entry._2()._2().getParents();

        List<Tuple2<Integer, BigDecimal>> contributions = new ArrayList<>(ancestors.size() + 1);
        contributions.add(new Tuple2<>(sale.getProduct(), sale.getPrice()));
        for (Integer ancestor : ancestors) {
            contributions.add(new Tuple2<>(ancestor, sale.getPrice()));
        }
        return contributions.iterator();
    });

    // Sum the prices per product id and wrap each total in a Summary row.
    JavaRDD<Summary> summaries = priceByProduct
            .reduceByKey((left, right) -> left.add(right))
            .map(total -> new Summary(total._1(), total._2()));

    javaFunctions(summaries)
            .writerBuilder("java_api", "summaries", mapToRow(Summary.class))
            .saveToCassandra();
}


/**
 * Recreates the {@code java_api} keyspace with its three tables, then writes the fixed
 * product hierarchy and 1000 randomly priced sales for every product that has exactly
 * two parents.
 *
 * @param sc Spark context whose configuration is used to reach Cassandra and which
 *           parallelizes the generated rows
 */
private static void generateData(JavaSparkContext sc) {
    CassandraConnector connector = CassandraConnector.apply(sc.getConf());

    // Schema statements, executed in this order on a single session.
    String[] schemaStatements = {
            "DROP KEYSPACE IF EXISTS java_api",
            "CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}",
            "CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)",
            "CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)",
            "CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)"
    };
    try (Session session = connector.openSession()) {
        for (String cql : schemaStatements) {
            session.execute(cql);
        }
    }

    // Product hierarchy: a root, three categories and two products under each category.
    List<Product> hierarchy = Arrays.asList(
            new Product(0, "All products", Collections.<Integer>emptyList()),
            new Product(1, "Product A", Arrays.asList(0)),
            new Product(4, "Product A1", Arrays.asList(0, 1)),
            new Product(5, "Product A2", Arrays.asList(0, 1)),
            new Product(2, "Product B", Arrays.asList(0)),
            new Product(6, "Product B1", Arrays.asList(0, 2)),
            new Product(7, "Product B2", Arrays.asList(0, 2)),
            new Product(3, "Product C", Arrays.asList(0)),
            new Product(8, "Product C1", Arrays.asList(0, 3)),
            new Product(9, "Product C2", Arrays.asList(0, 3))
    );

    JavaRDD<Product> productRows = sc.parallelize(hierarchy);
    javaFunctions(productRows)
            .writerBuilder("java_api", "products", mapToRow(Product.class))
            .saveToCassandra();

    // Only products with exactly two parents receive sales.
    JavaRDD<Product> sellableProducts = productRows.filter(candidate -> candidate.getParents().size() == 2);

    JavaRDD<Sale> saleRows = sellableProducts.flatMap(sold -> {
        Random rng = new Random();
        List<Sale> generated = new ArrayList<Sale>(1000);
        for (int n = 0; n < 1000; n++) {
            generated.add(new Sale(UUID.randomUUID(), sold.getId(), BigDecimal.valueOf(rng.nextDouble())));
        }
        return generated.iterator();
    });

    javaFunctions(saleRows)
            .writerBuilder("java_api", "sales", mapToRow(Sale.class))
            .saveToCassandra();
}


public static class Product implements Serializable {
    private Integer id;
    private String name;
    private List<Integer> parents;

    public Product() { }

    public Product(Integer id, String name, List<Integer> parents) {
        this.id = id;
        this.name = name;
        this.parents = parents;
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public List<Integer> getParents() { return parents; }
    public void setParents(List<Integer> parents) { this.parents = parents; }

    @Override
    public String toString() {
        return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
    }
}

public static class Sale implements Serializable {
    private UUID id;
    private Integer product;
    private BigDecimal price;

    public Sale() { }

    public Sale(UUID id, Integer product, BigDecimal price) {
        this.id = id;
        this.product = product;
        this.price = price;
    }

    public UUID getId() { return id; }
    public void setId(UUID id) { this.id = id; }

    public Integer getProduct() { return product; }
    public void setProduct(Integer product) { this.product = product; }

    public BigDecimal getPrice() { return price; }
    public void setPrice(BigDecimal price) { this.price = price; }

    @Override
    public String toString() {
        return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
    }
}

public static class Summary implements Serializable {
    private Integer product;
    private BigDecimal summary;

    public Summary() { }

    public Summary(Integer product, BigDecimal summary) {
        this.product = product;
        this.summary = summary;
    }

    public Integer getProduct() { return product; }
    public void setProduct(Integer product) { this.product = product; }

    public BigDecimal getSummary() { return summary; }
    public void setSummary(BigDecimal summary) { this.summary = summary; }

    @Override
    public String toString() {
        return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
    }
}

}

sunone5 commented Oct 13, 2017

Completely rewritten for Java 8 (java version "1.8.0_131", javac 1.8.0_131). For newcomers, the complete application can be found here:
https://github.com/sunone5/BigData/tree/master/spark-cassandra-streaming

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment