Skip to content

Instantly share code, notes, and snippets.

@antonioalcocer
Last active August 29, 2015 14:10
Show Gist options
  • Save antonioalcocer/89d133e929302da0f4b9 to your computer and use it in GitHub Desktop.
Save antonioalcocer/89d133e929302da0f4b9 to your computer and use it in GitHub Desktop.
package com.sinfonier.drains;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.Session;
/**
* Created by aalcocer on 5/12/14.
*/
public class InsertCassandra extends BaseSinfonierDrain {
private String cas_Table;
private String cas_keyspace;
private String serversHost;
private String serversPort;
private String[] server;
private static Cluster cluster;
private Session session; // Cassandra session
private ConsistencyLevel consistency;
/**
* Constructor.
*
* @param xmlFile Path to xml file.
*/
public InsertCassandra(String xmlFile) {
super(xmlFile);
}
@Override public void userprepare() {
this.cas_keyspace=getParam("keyspace");
this.cas_Table=getParam("table");
this.serversHost=getParam("server_host");
this.serversPort=getParam("server_port");
connect(serversHost, serversPort);
session=cluster.connect();
}
@Override public void userexecute() {
insertData(cas_keyspace,cas_Table,getJson());
}
@Override public void usercleanup() {
close();
}
public void connect(String node, String Port) {
cluster = Cluster.builder()
.addContactPoint(node)
.withPort(Integer.parseInt(serversPort))
.build();
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to cluster: %s\n",
metadata.getClusterName());
for ( Host host : metadata.getAllHosts() ) {
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
host.getDatacenter(), host.getAddress(), host.getRack());
}
}
public void close() {
cluster.close();
}
public void insertData(String keyspace, String table, Map<String, Object> json) {
List<String> campos= new LinkedList<String>();
List<String> values= new LinkedList<String>();
for (String key: json.keySet()){
Object value= json.get(key);
if (value instanceof String ){
values.add("'"+value.toString()+"'");
}
else {
values.add(value.toString());
}
campos.add(key);
}
String queryInsert="INSERT INTO "+keyspace+"."+table+" ("+ campos.toString().replace("[",""
).replace("]","") +")" + " VALUES (" +
values.toString().replace("[","").replace("]","") + ");";
session.execute(queryInsert);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment