Skip to content

Instantly share code, notes, and snippets.

@amarjitdhillon
Created August 31, 2017 21:31
Show Gist options
  • Save amarjitdhillon/51ab6fba68452b301658786459e7d171 to your computer and use it in GitHub Desktop.
Save amarjitdhillon/51ab6fba68452b301658786459e7d171 to your computer and use it in GitHub Desktop.
public class writeToCassandra {
private static Session session;
private static Cluster cluster;
private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE data WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
private static final String createTable = "CREATE TABLE test.patient(id int, heart_rate int, PRIMARY KEY(id));" ;
public static void main(String[] args) throws Exception {
//setting the env variable to local
StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.createLocalEnvironment(1);
DataStream<cassandraData> hrEventDataStream = envrionment.addSource(new sensorData());
// hrEventDataStream.print();
CassandraSink.addSink(hrEventDataStream)
.setQuery("INSERT INTO test.patient(id,heart_rate) values (?,?);")
.setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("127.0.0.1").build();
}
})
.build();
envrionment.execute();
} //main
} //writeToCassandra
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment