Last active
August 29, 2015 14:10
-
-
Save antonioalcocer/89d133e929302da0f4b9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sinfonier.drains; | |
import java.util.LinkedList; | |
import java.util.List; | |
import java.util.Map; | |
import com.datastax.driver.core.Cluster; | |
import com.datastax.driver.core.ConsistencyLevel; | |
import com.datastax.driver.core.Host; | |
import com.datastax.driver.core.Metadata; | |
import com.datastax.driver.core.Session; | |
/** | |
* Created by aalcocer on 5/12/14. | |
*/ | |
public class InsertCassandra extends BaseSinfonierDrain { | |
private String cas_Table; | |
private String cas_keyspace; | |
private String serversHost; | |
private String serversPort; | |
private String[] server; | |
private static Cluster cluster; | |
private Session session; // Cassandra session | |
private ConsistencyLevel consistency; | |
/** | |
* Constructor. | |
* | |
* @param xmlFile Path to xml file. | |
*/ | |
public InsertCassandra(String xmlFile) { | |
super(xmlFile); | |
} | |
@Override public void userprepare() { | |
this.cas_keyspace=getParam("keyspace"); | |
this.cas_Table=getParam("table"); | |
this.serversHost=getParam("server_host"); | |
this.serversPort=getParam("server_port"); | |
connect(serversHost, serversPort); | |
session=cluster.connect(); | |
} | |
@Override public void userexecute() { | |
insertData(cas_keyspace,cas_Table,getJson()); | |
} | |
@Override public void usercleanup() { | |
close(); | |
} | |
public void connect(String node, String Port) { | |
cluster = Cluster.builder() | |
.addContactPoint(node) | |
.withPort(Integer.parseInt(serversPort)) | |
.build(); | |
Metadata metadata = cluster.getMetadata(); | |
System.out.printf("Connected to cluster: %s\n", | |
metadata.getClusterName()); | |
for ( Host host : metadata.getAllHosts() ) { | |
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", | |
host.getDatacenter(), host.getAddress(), host.getRack()); | |
} | |
} | |
public void close() { | |
cluster.close(); | |
} | |
public void insertData(String keyspace, String table, Map<String, Object> json) { | |
List<String> campos= new LinkedList<String>(); | |
List<String> values= new LinkedList<String>(); | |
for (String key: json.keySet()){ | |
Object value= json.get(key); | |
if (value instanceof String ){ | |
values.add("'"+value.toString()+"'"); | |
} | |
else { | |
values.add(value.toString()); | |
} | |
campos.add(key); | |
} | |
String queryInsert="INSERT INTO "+keyspace+"."+table+" ("+ campos.toString().replace("[","" | |
).replace("]","") +")" + " VALUES (" + | |
values.toString().replace("[","").replace("]","") + ");"; | |
session.execute(queryInsert); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment