Cassandra C# driver CassandraNoHostException
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using Cassandra; | |
using Cassandra.Data; | |
using Cassandra.Data.Linq; | |
using metrics.Core; | |
using metrics.Reporting; | |
using System.Diagnostics; | |
using System.Threading.Tasks; | |
namespace CassandraLoadTest
{
    /// <summary>
    /// Load test that creates a throw-away keyspace and table, inserts a fixed
    /// number of rows from several concurrent tasks, records the latency of every
    /// insert in the "Writes" histogram, and finally drops the schema again.
    /// </summary>
    public class DatastaxDriverTest
    {
        // Names of the temporary schema objects created (and dropped) by the test.
        private const string KeyspaceName = "testkeyspace";
        private const string TableName = "testtable";

        // Total number of rows written by one run of parallelInsertTest.
        private const int RowsNo = 100000;

        private readonly Session Session;
        private readonly int nThreads;

        // "Reads" is not updated anywhere in this class; the field is kept so the
        // metric stays registered exactly as before.
        private readonly HistogramMetric _readHistogram = metrics.Metrics.Histogram(typeof(DatastaxDriverTest), "Reads");
        private readonly HistogramMetric _writeHistogram = metrics.Metrics.Histogram(typeof(DatastaxDriverTest), "Writes");

        /// <summary>
        /// Builds the cluster configuration, opens a session and starts a console
        /// metrics reporter that prints every 10 seconds.
        /// </summary>
        /// <param name="hosts">Contact points handed to the cluster builder.</param>
        /// <param name="datacenter">Data center name passed to <c>DCAwareRoundRobinPolicy</c>.</param>
        /// <param name="constDelayMS">Delay passed to <c>ConstantReconnectionPolicy</c>.</param>
        /// <param name="queryTimeout">Value passed to <c>WithQueryTimeout</c>.</param>
        /// <param name="coreConnectionPerHost">Core connections per host for <c>HostDistance.Local</c>.</param>
        /// <param name="maxConnectionPerHost">Maximum connections per host for <c>HostDistance.Local</c>.</param>
        /// <param name="nThreads">Number of concurrent insert tasks; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="nThreads"/> is zero or negative.</exception>
        public DatastaxDriverTest(string[] hosts, string datacenter, long constDelayMS, int queryTimeout,
            int coreConnectionPerHost, int maxConnectionPerHost, int nThreads)
        {
            // Fail before connecting: a non-positive count would otherwise surface
            // later as a divide-by-zero or a negative array size in parallelInsertTest.
            if (nThreads <= 0)
            {
                throw new ArgumentOutOfRangeException("nThreads", nThreads,
                    "At least one insert task is required.");
            }

            Builder cassandraBuilder = Cluster.Builder().AddContactPoints(hosts)
                .WithLoadBalancingPolicy(new DCAwareRoundRobinPolicy(datacenter))
                .WithReconnectionPolicy(new ConstantReconnectionPolicy(constDelayMS))
                .WithRetryPolicy(DefaultRetryPolicy.Instance)
                .WithQueryTimeout(queryTimeout)
                .WithCompression(CompressionType.NoCompression);
            cassandraBuilder.PoolingOptions.SetCoreConnectionsPerHost(HostDistance.Local, coreConnectionPerHost);
            cassandraBuilder.PoolingOptions.SetMaxConnectionsPerHost(HostDistance.Local, maxConnectionPerHost);

            Cluster cluster = cassandraBuilder.Build();
            this.Session = cluster.Connect();
            this.nThreads = nThreads;

            ConsoleReporter consoleReport = new ConsoleReporter();
            consoleReport.Start(10, metrics.TimeUnit.Seconds);
        }

        /// <summary>
        /// Creates the test keyspace and table, inserts <c>RowsNo</c> rows split
        /// across <c>nThreads</c> tasks, waits for all of them and drops the schema.
        /// The schema is dropped even when preparation or insertion fails.
        /// </summary>
        public void parallelInsertTest()
        {
            Console.WriteLine("Start parallel insert test");

            Console.WriteLine("Creating keyspace");
            CreateIfMissing(string.Format(
                @"CREATE KEYSPACE {0}
                  WITH replication = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};",
                KeyspaceName));

            try
            {
                Session.ChangeKeyspace(KeyspaceName);

                CreateIfMissing(string.Format(
                    @"CREATE TABLE {0}(
                      tweet_id int,
                      author text,
                      body text,
                      isok boolean,
                      PRIMARY KEY(tweet_id))",
                    TableName));

                Console.WriteLine("Prepare statement");
                PreparedStatement insertPrep = Session.Prepare(
                    "INSERT INTO " + TableName + @" (
                      tweet_id,
                      author,
                      isok,
                      body)
                      VALUES (?,?,?,?);");

                Console.WriteLine("Insert Values");
                int stepSize = RowsNo / nThreads;
                Task[] tasks = new Task[nThreads];
                for (int i = 0; i < nThreads; i++)
                {
                    // Locals declared inside the loop so each lambda captures its own range.
                    int startIndex = i * stepSize;
                    // The last task also takes the remainder, so no rows are lost
                    // when RowsNo is not a multiple of nThreads.
                    int endIndex = (i == nThreads - 1) ? RowsNo : startIndex + stepSize;

                    // Each worker blocks on synchronous driver calls for its whole
                    // lifetime; LongRunning hints the scheduler to give it a
                    // dedicated thread instead of occupying the shared thread pool.
                    tasks[i] = Task.Factory.StartNew(
                        () => insertRange(insertPrep, startIndex, endIndex),
                        TaskCreationOptions.LongRunning);
                }
                Task.WaitAll(tasks);
                Console.WriteLine("Insertion complete");
            }
            finally
            {
                // Best-effort cleanup: a failure here is logged but must not hide
                // the exception (if any) that is already propagating.
                DropQuietly(string.Format(@"DROP TABLE {0};", TableName));
                DropQuietly(string.Format(@"DROP KEYSPACE {0};", KeyspaceName));
            }
        }

        /// <summary>
        /// Inserts one row per index in the half-open range
        /// [<paramref name="startIndex"/>, <paramref name="endIndex"/>) and records
        /// the elapsed milliseconds of every attempt (successful or not) in the
        /// "Writes" histogram. Insert failures are logged and do not stop the loop.
        /// </summary>
        /// <param name="prepStatement">Prepared four-column insert statement.</param>
        /// <param name="startIndex">First row id to insert (inclusive).</param>
        /// <param name="endIndex">Row id at which to stop (exclusive).</param>
        public void insertRange(PreparedStatement prepStatement, int startIndex, int endIndex)
        {
            // NOTE(review): fixed half-second pause before the first insert; presumably
            // there to let all workers get scheduled before load starts. Confirm intent.
            System.Threading.Thread.Sleep(500);
            Console.WriteLine("Inserting values from " + startIndex + " to " + endIndex);

            // The consistency level is identical for every row, so set it once
            // instead of mutating the shared statement on each iteration.
            var statement = prepStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum);

            Stopwatch timer = Stopwatch.StartNew();
            long previousMs = 0;
            for (int idx = startIndex; idx < endIndex; idx++)
            {
                try
                {
                    Session.Execute(
                        statement.Bind(new object[] {
                            idx,
                            "author" + idx,
                            idx % 2 != 0,
                            "body" + idx
                        }));
                }
                catch (Exception ex)
                {
                    // Log the whole exception: type and message say why the insert
                    // failed, which the stack trace alone does not.
                    Console.WriteLine("Error while inserting " + ex);
                }

                // Latency of this iteration = cumulative time now minus cumulative
                // time at the end of the previous iteration.
                long nowMs = timer.ElapsedMilliseconds;
                _writeHistogram.Update(nowMs - previousMs);
                previousMs = nowMs;
            }
        }

        // Runs a CREATE statement and waits for schema agreement; an object left
        // behind by an earlier run (AlreadyExistsException) is reused, not fatal.
        private void CreateIfMissing(string cql)
        {
            try
            {
                Session.Cluster.WaitForSchemaAgreement(Session.Execute(cql));
            }
            catch (AlreadyExistsException)
            {
                // Already present: nothing to create.
            }
        }

        // Runs a DROP statement, logging instead of throwing when it fails.
        private void DropQuietly(string cql)
        {
            try
            {
                Session.Execute(cql);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Cleanup failed for '" + cql + "': " + ex);
            }
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace CassandraLoadTest
{
    /// <summary>
    /// Console entry point: configures a <c>DatastaxDriverTest</c> with fixed
    /// settings and runs its parallel insert test once.
    /// </summary>
    public class MainLoadTest
    {
        /// <summary>
        /// Builds the load test against a single "localhost" contact point and
        /// executes it.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        public static void Main(string[] args)
        {
            // Named arguments keep each setting next to the parameter it feeds.
            DatastaxDriverTest loadTest = new DatastaxDriverTest(
                hosts: new[] { "localhost" },
                datacenter: "datacenter1",
                constDelayMS: 60000L,
                queryTimeout: 5000,
                coreConnectionPerHost: 2,
                maxConnectionPerHost: 8,
                nThreads: 50);

            loadTest.parallelInsertTest();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment