Prepared Inserts
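Two small benchmarks that measure prepared INSERT throughput against a local Cassandra node: a C program built on the 2014-era beta of the DataStax C/C++ driver plus libuv, and a Java equivalent built on the DataStax Java driver. Both write rows into a songs table in the examples keyspace and print a running average of inserts per second. The schema itself is not part of the gist; judging from the bind calls, a table along the lines of the standard DataStax examples is assumed, i.e. songs (id uuid PRIMARY KEY, title text, album text, artist text, tags set<text>).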
/*
Copyright (c) 2014 DataStax
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
#include "cassandra.h"
#define NUM_THREADS 1
#define NUM_CONCURRENT_REQUESTS 10000
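/* Prints the error message attached to a failed future. */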
void print_error(CassFuture* future) {
  CassString message = cass_future_error_message(future);
  fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
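/* Builds a cluster object pointed at 127.0.0.1 and tuned for throughput:
   a larger I/O queue, two I/O threads, and up to 10 connections per host. */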
CassCluster* create_cluster() {
  CassCluster* cluster = cass_cluster_new();
  cass_cluster_set_contact_points(cluster, "127.0.0.1");
  cass_cluster_set_credentials(cluster, "cassandra", "cassandra");
  cass_cluster_set_log_level(cluster, CASS_LOG_WARN);
  cass_cluster_set_queue_size_io(cluster, 8*16384);
  cass_cluster_set_num_threads_io(cluster, 2);
  cass_cluster_set_max_pending_requests(cluster, 100000);
  cass_cluster_set_core_connections_per_host(cluster, 4);
  cass_cluster_set_max_connections_per_host(cluster, 10);
  return cluster;
}
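/* Connects a session to the "examples" keyspace and returns it through the
   output parameter. */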
CassError connect_session(CassCluster* cluster, CassSession** output) {
  CassError rc = 0;
  CassFuture* future = cass_cluster_connect_keyspace(cluster, "examples");

  *output = NULL;

  cass_future_wait(future);
  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    print_error(future);
  } else {
    *output = cass_future_get_session(future);
  }
  cass_future_free(future);

  return rc;
}
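/* Prepares the given query on the cluster and returns the prepared statement
   through the output parameter. */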
CassError prepare_query(CassSession* session, CassString query, const CassPrepared** prepared) {
  CassError rc = 0;
  CassFuture* future = NULL;

  future = cass_session_prepare(session, query);
  cass_future_wait(future);
  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    print_error(future);
  } else {
    *prepared = cass_future_get_prepared(future);
  }
  cass_future_free(future);

  return rc;
}
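/* Issues NUM_CONCURRENT_REQUESTS inserts asynchronously, waits for all of
   them to complete, and prints a running average of inserts per second.
   When 'prepared' is NULL the statement is built from the raw query string. */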
void insert_into_perf(CassSession* session, CassString query, const CassPrepared* prepared) {
  size_t i;
  uint64_t start, elapsed;
  CassFuture* futures[NUM_CONCURRENT_REQUESTS];

  static double total = 0.0;
  static long count = 0;

  CassCollection* collection = cass_collection_new(CASS_COLLECTION_TYPE_SET, 2);
  cass_collection_append_string(collection, cass_string_init("jazz"));
  cass_collection_append_string(collection, cass_string_init("2013"));

  start = uv_hrtime();
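  /* Bind and execute all of the requests asynchronously; the driver's I/O
     threads handle them in the background. */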
  for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
    CassUuid id;
    CassStatement* statement;

    if (prepared != NULL) {
      statement = cass_prepared_bind(prepared);
    } else {
      statement = cass_statement_new(query, 5);
    }

    cass_uuid_generate_time(id);
    cass_statement_bind_uuid(statement, 0, id);
    cass_statement_bind_string(statement, 1, cass_string_init("La Petite Tonkinoise"));
    cass_statement_bind_string(statement, 2, cass_string_init("Bye Bye Blackbird"));
    cass_statement_bind_string(statement, 3, cass_string_init("Joséphine Baker"));
    cass_statement_bind_collection(statement, 4, collection);

    futures[i] = cass_session_execute(session, statement);
    cass_statement_free(statement);
  }
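  /* Wait for every request in the batch to finish before stopping the clock. */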
  for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
    CassFuture* future = futures[i];
    CassError rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
      print_error(future);
    }
    cass_future_free(future);
  }

  cass_collection_free(collection); /* the collection is owned by the caller and must be freed */

  elapsed = uv_hrtime() - start;
  total += (double)NUM_CONCURRENT_REQUESTS / ((double)elapsed / 1000000000.0);
  count++;
  printf("average %lf inserts/sec\n", total / count);
}
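/* Thread entry point: prepares the INSERT statement and runs the benchmark
   100 times. Remove the USE_PREPARED define below to measure unprepared
   (simple) statements with a single run instead. */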
void run_insert_queries(void* data) {
  int i;
  CassSession* session = (CassSession*)data;
  const CassPrepared* insert_prepared = NULL;
  CassString insert_query = cass_string_init("INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);");

#define USE_PREPARED

#ifdef USE_PREPARED
  if (prepare_query(session, insert_query, &insert_prepared) == CASS_OK) {
    for (i = 0; i < 100; ++i) {
      insert_into_perf(session, insert_query, insert_prepared);
    }
    cass_prepared_free(insert_prepared);
  }
#else
  insert_into_perf(session, insert_query, insert_prepared);
#endif
}
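/* Connects to the cluster, spawns NUM_THREADS benchmark threads, waits for
   them to finish, then closes the session and frees the cluster. */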
int main() {
  int i;
  uv_thread_t threads[NUM_THREADS];
  CassError rc = 0;
  CassCluster* cluster = create_cluster();
  CassSession* session = NULL;
  CassFuture* close_future = NULL;

  rc = connect_session(cluster, &session);
  if (rc != CASS_OK) {
    cass_cluster_free(cluster); /* clean up the cluster if the connection fails */
    return -1;
  }

  for (i = 0; i < NUM_THREADS; ++i) {
    uv_thread_create(&threads[i], run_insert_queries, (void*)session);
  }

  for (i = 0; i < NUM_THREADS; ++i) {
    uv_thread_join(&threads[i]);
  }

  close_future = cass_session_close(session);
  cass_future_wait(close_future);
  cass_future_free(close_future);
  cass_cluster_free(cluster);

  return 0;
}
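The Java equivalent below does the same work with the DataStax Java driver's asynchronous API: it binds a prepared statement 10,000 times per batch, collects the ResultSetFutures, and waits on the whole batch with Guava's Futures.allAsList before updating the running average. It expects the Java driver, Guava, and Dropwizard (Codahale) Metrics on the classpath; the Metrics Timer is created but not used below.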
package mikep.perf;
import static com.codahale.metrics.MetricRegistry.name;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;
import com.google.common.util.concurrent.Futures;
/**
 * Benchmarks prepared INSERT throughput against a local Cassandra node using
 * the DataStax Java driver's asynchronous execution API.
 */
public class App
{
    final static MetricRegistry metrics = new MetricRegistry();
    final static Timer queries = metrics.timer(name(App.class, "queries"));
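    // Connects to the "examples" keyspace, prepares the INSERT once, then runs
    // 1,000 batches of 10,000 asynchronous inserts, printing a running average.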
    public static void main(String[] args) throws ExecutionException, InterruptedException
    {
        Cluster cluster = Cluster.builder()
                .addContactPoint("127.0.0.1")
                .withCredentials("cassandra", "cassandra")
                .build();
        Session session = cluster.connect("examples");

        PreparedStatement prepared = session.prepare("INSERT INTO songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);");
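        // Holds the futures for one batch of asynchronous inserts; the tag set is reused for every row.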
        List<ResultSetFuture> futures = new ArrayList<ResultSetFuture>(10000);

        Set<String> collection = new HashSet<String>();
        collection.add("jazz");
        collection.add("2013");

        double total = 0.0;
        long count = 0;
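        // Each outer iteration fires 10,000 inserts without blocking, waits for the
        // whole batch to complete, and updates the running throughput average.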
        for (int j = 0; j < 1000; ++j) {
            long start = System.currentTimeMillis();
            try {
                for (int i = 0; i < 10000; ++i) {
                    BoundStatement statement = prepared.bind();
                    statement.setUUID(0, UUIDs.timeBased());
                    statement.setString(1, "La Petite Tonkinoise");
                    statement.setString(2, "Bye Bye Blackbird");
                    statement.setString(3, "Joséphine Baker");
                    statement.setSet(4, collection);
                    futures.add(session.executeAsync(statement));
                }
                Futures.allAsList(futures).get();
            } catch (Exception e) {
                System.out.printf("Error: %s\n", e.getMessage());
            }

            long elapsed = System.currentTimeMillis() - start;
            total += 10000.0 / (elapsed / 1000.0);
            count++;
            System.out.printf("average %f inserts/sec\n", total / count);
            //System.out.printf("%f inserts/sec\n", 10000.0 / (elapsed / 1000.0));

            futures.clear();
        }

        session.close();
        cluster.close();
    }
}