Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Cassandra C# driver CassandraNoHostException
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Cassandra;
using Cassandra.Data;
using Cassandra.Data.Linq;
using metrics.Core;
using metrics.Reporting;
using System.Diagnostics;
using System.Threading.Tasks;
namespace CassandraLoadTest
{
public class DatastaxDriverTest
{
private readonly Session Session;
private readonly int nThreads;
private HistogramMetric _readHistogram = metrics.Metrics.Histogram(typeof(DatastaxDriverTest), "Reads");
private HistogramMetric _writeHistogram = metrics.Metrics.Histogram(typeof(DatastaxDriverTest), "Writes");
public DatastaxDriverTest(string[] hosts, string datacenter, long constDelayMS, int queryTimeout,
int coreConnectionPerHost, int maxConnectionPerHost, int nThreads)
{
Builder cassandraBuilder = Cluster.Builder().AddContactPoints(hosts)
.WithLoadBalancingPolicy(new DCAwareRoundRobinPolicy(datacenter))
.WithReconnectionPolicy(new ConstantReconnectionPolicy(constDelayMS))
.WithRetryPolicy(DefaultRetryPolicy.Instance)
.WithQueryTimeout(queryTimeout)
.WithCompression(CompressionType.NoCompression);
cassandraBuilder.PoolingOptions.SetCoreConnectionsPerHost(HostDistance.Local, coreConnectionPerHost);
cassandraBuilder.PoolingOptions.SetMaxConnectionsPerHost(HostDistance.Local, maxConnectionPerHost);
Cluster cluster = cassandraBuilder.Build();
this.Session = cluster.Connect();
this.nThreads = nThreads;
ConsoleReporter consoleReport = new ConsoleReporter();
consoleReport.Start(10, metrics.TimeUnit.Seconds);
}
public void parallelInsertTest()
{
Console.WriteLine("Start parallel insert test");
string keyspaceName = "testkeyspace";
Console.WriteLine("Creating keyspace");
Session.Cluster.WaitForSchemaAgreement(
Session.Execute(
string.Format(@"CREATE KEYSPACE {0}
WITH replication = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};"
, keyspaceName)));
Session.ChangeKeyspace(keyspaceName);
string tableName = "testtable";
try
{
Session.Cluster.WaitForSchemaAgreement(
Session.Execute(string.Format(@"CREATE TABLE {0}(
tweet_id int,
author text,
body text,
isok boolean,
PRIMARY KEY(tweet_id))", tableName)));
}
catch (AlreadyExistsException)
{ }
Console.WriteLine("Prepare statement");
PreparedStatement insertPrep = Session.Prepare("INSERT INTO " + tableName + @" (
tweet_id,
author,
isok,
body)
VALUES (?,?,?,?);");
Console.WriteLine("Insert Values");
int RowsNo = 100000;
int stepSize = RowsNo / nThreads;
Task[] tasks = new Task[nThreads];
for (int i = 0; i < nThreads; i++)
{
var startIndex = i * stepSize;
var endIndex = (i + 1) * stepSize;
tasks[i] = Task.Factory.StartNew(() => insertRange(insertPrep, startIndex, endIndex));
}
Task.WaitAll(tasks);
Console.WriteLine("Insertion complete");
Session.Execute(string.Format(@"DROP TABLE {0};", tableName));
Session.Execute(string.Format(@"DROP KEYSPACE {0};", keyspaceName));
}
public void insertRange(PreparedStatement prepStatement, int startIndex, int endIndex)
{
System.Threading.Thread.Sleep(500);
Console.WriteLine("Inserting values from " + startIndex + " to " + endIndex);
Stopwatch t = Stopwatch.StartNew();
long totalElapsedTime = 0;
for (int idx = startIndex; idx < endIndex; idx++)
{
try
{
Session.Execute(
prepStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum)
.Bind(new object[] {
idx,
"author"+idx,
idx % 2 == 0 ? false : true,
"body"+idx
}));
}
catch (Exception ex)
{
Console.WriteLine("Error while inserting " + ex.StackTrace);
}
var elapsedMs = t.ElapsedMilliseconds - totalElapsedTime;
_writeHistogram.Update(elapsedMs);
totalElapsedTime += elapsedMs;
}
}
}
}
namespace CassandraLoadTest
{
public class MainLoadTest
{
public static void Main(string[] args)
{
string[] hosts = new string[]{"localhost"};
string datacenter= "datacenter1";
long constDelayMS= 60000;
int queryTimeout = 5000;
int coreConnectionPerHost= 2;
int maxConnectionPerHost = 8;
int nThreads = 50;
var driver = new DatastaxDriverTest(hosts, datacenter, constDelayMS, queryTimeout, coreConnectionPerHost,
maxConnectionPerHost, nThreads);
driver.parallelInsertTest();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment