# Created July 10, 2011 17:17
# Tool for profiling Cassandra query performance.
"""Tool for profiling Cassandra query performance.

Tests are run by profile() multiple times and the 'Read Latency' is
extracted using nodetool.

The first command line argument names the function in this script to run.

#Create the schema using the cassandra-cli.

create keyspace query
    with strategy_options=[{replication_factor:1}]
    and placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy';

use query;

create column family NoCache
    with comparator = AsciiType
    and default_validation_class = AsciiType
    and key_validation_class = AsciiType
    and keys_cached = 0
    and rows_cached = 0;

#Load data
$ python <this_script>.py insert_rows

#Warm up the database
$ python <this_script>.py warm_up

#Test the name locality for query by column name
$ python <this_script>.py name_locality

#Test the start position for query by range
$ python <this_script>.py start_position
"""
import math
import multiprocessing
import operator
import os
import os.path
import random
import re
import shlex
import subprocess
import sys
import time
import pycassa
# The test rows as (row key, number of columns), smallest to largest.
# The size comments work out to roughly 50 bytes per column.
rows = [
    ("small-row", 100),              # 100 columns, 5K of data
    ("no-col-index", 1200),          # 1200 columns, 60K of data
    ("five-thousand", 5000),         # 5000 columns, 244K of data
    ("ten-thousand", 10000),         # 10000 columns, 488K of data
    ("hundred-thousand", 100000),    # 100000 columns, 4.8M of data
    ("one-million", 1000000),        # 1000000 columns, 48M of data
    ("ten-million", 10000000),       # 10000000 columns, 480M of data
]
# Directory that contains the Cassandra command line tools (nodetool).
# Defaults to the original hard-coded install location; set the
# CASSANDRA_PATH environment variable to point at a different install.
CASSANDRA_PATH = os.environ.get(
    "CASSANDRA_PATH",
    "/Users/aaron/frameworks/cassandra/apache-cassandra-0.8.1/bin/")

# Connection to the "query" keyspace on localhost, made at import time, and
# the NoCache column family that every test reads from.
CASS_POOL = pycassa.connect("query", ["localhost"])
NOCACHE = pycassa.columnfamily.ColumnFamily(CASS_POOL, "NoCache")

# Column names are the column number, right aligned and zero filled to a
# width of 10, so all names have the same length.
col_pattern = "{0:0>#10}"

# One output line per row: the row key followed by four latency figures,
# each right aligned in 10 characters with 4 significant digits.
profile_pattern = "Row {0:>20} latency in ms "\
    "{1:>10.4} {2:>10.4} {3:>10.4} {4:>10.4}"
def make_cols(cols):
    """Build the list of column names for a row with `cols` columns.

    The names are the numbers 0 .. cols - 1, in that order, each formatted
    with `col_pattern`.

    :param cols: Number of column names to generate.

    :returns: list of column names.
    """
    names = []
    for index in xrange(cols):
        names.append(col_pattern.format(index))
    return names
def make_paged_cols(cols, shuffle=True, page_size=1310):
    """Split the cols into pages about the size of a column index page.

    With the default page size: 1310 * 50B = 63.9K

    :param cols: Number of columns in the row.
    :param shuffle: Shuffle the pages. Only the order of the pages changes,
        the columns inside each page stay in name order.
    :param page_size: Number of columns per page. The last page is shorter
        when `cols` is not a multiple of the page size.

    :returns: List of lists.
    """
    all_cols = make_cols(cols)
    pages = [
        all_cols[first:first + page_size]
        for first in range(0, cols, page_size)
    ]
    if shuffle:
        random.shuffle(pages)
    return pages
def col_range(start, cols, count):
    """A range of columns from the start or end of the column list.

    :param start: True to take the first `count` columns, False to take the
        last `count` columns.
    :param cols: Number of columns in the row.
    :param count: Number of column names wanted. Zero or less gives an empty
        list.

    :returns: list of column names.
    """
    if count <= 0:
        # all_cols[-0:] is the whole list, not an empty one.
        return []
    all_cols = make_cols(cols)
    return all_cols[:count] if start else all_cols[-count:]
def range_paged_cols(start, cols, range, shuffle_pages=False,
                     shuffle_columns=False, collapse=True):
    """Select a range from the start or end of each page of columns.

    :param start: True to select from the start of each page, False to
        select from the end.
    :param cols: Number of columns in the row.
    :param range: Number of columns to select from each page, expected to be
        zero or more. The name shadows the builtin inside this function; it
        is kept because it is part of the signature.
    :param shuffle_pages: Visit the pages in random order.
    :param shuffle_columns: Shuffle the columns inside each page before
        selecting, so the selection is a random set of columns from the page.
    :param collapse: Return one flat list rather than one list per page.

    :returns: list of columns if collapse, else list of list of columns.
    """
    result = []
    for page in make_paged_cols(cols, shuffle=shuffle_pages):
        if shuffle_columns:
            random.shuffle(page)
        if start:
            selected = page[:range]
        else:
            # Same as page[-range:] for range >= 1, but empty for range == 0
            # where page[-0:] would be the whole page.
            selected = page[max(len(page) - range, 0):]
        if collapse:
            result.extend(selected)
        else:
            result.append(selected)
    return result
def start_col(cols, offset, page_offset=None):
    """A single column name at the offset from the start of the row.

    :param cols: Number of columns in the row.
    :param offset: Index of the column, counted from the start of the row or,
        when `page_offset` is given, from the start of that page.
    :param page_offset: Optional index of the page to look in. Values past
        the last page are clamped to the last page; -1 also selects the last
        page.

    :returns: str column name
    """
    if page_offset is None:
        all_cols = make_cols(cols)
    else:
        pages = make_paged_cols(cols, shuffle=False)
        all_cols = pages[min(page_offset, len(pages) - 1)]
    return all_cols[offset]
# ========================
# Testing functions
def run(cmd):
    """Execute the command line and wait for it to finish.

    :param cmd: The command line as a single string. It is split with shlex
        and executed directly, no shell is involved.

    :returns: (stdout, stderr) as returned by Popen.communicate(). Both
        streams are captured so callers can check stderr for errors.
    """
    p = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    return p.communicate()
def percentile(N, percent, key=lambda x:x):
    """Find the percentile of a list of values.

    Adapted from an external recipe (r1).

    Values between two elements are linearly interpolated, each neighbour
    weighted by how close the fractional index is to it.

    @parameter N - is a list of values. Note N MUST BE already sorted.
    @parameter percent - a float value from 0.0 to 1.0.
    @parameter key - optional key function to compute value from each element of N.

    @return - the percentile of the values, or None if N is empty.
    """
    if not N:
        return None
    k = (len(N)-1) * percent
    f = math.floor(k)
    c = math.ceil(k)
    if f == c:
        return key(N[int(k)])
    # The lower element is weighted by the distance to the upper index and
    # the upper element by the distance to the lower index.
    d0 = key(N[int(f)]) * (c-k)
    d1 = key(N[int(c)]) * (k-f)
    return d0+d1
def profile(target, repeat=10):
"""Clears the recent latency stats, runs the target and then gets the
latency stats again.
global rows
nodetool_path = os.path.join(CASSANDRA_PATH, "nodetool")
cmd = nodetool_path + " -h localhost cfstats"
print "Latency is min, 80th percentile, 95th percentile and max."
print target.__doc__
for key, cols in rows:
latency = []
low_col_warn = False
for i in range(repeat):
rv = target(key, cols)
std_out, std_err = run(cmd)
if std_err:
raise RuntimeError(std_err)
if len(rv) < 100 and not low_col_warn:
print "WARN: only %s columns returned" % len(rv)
low_col_warn = True
this_latency = _parse_latency(std_out)
if this_latency is not None:
stats = (
percentile(latency, 0.8),
percentile(latency, 0.95),
print profile_pattern.format(key, *stats)
def _parse_latency(data):
"""Parse the nodetool std out to get the recent read latency for the CF.
found_cf = False
for line in data.split("\n"):
line = line.strip()
if not found_cf:
found_cf = line.startswith("Column Family: NoCache")
if found_cf:
token = re.findall("Read Latency: (?P<foo>\S*)", line)
if not token:
return (float(token[0]))
return None
def start_position():
"""Test different start column offsets."""
print "Test start position..."
def test1(key, cols):
"""100 columns from with no start column"""
return NOCACHE.get(key, column_count=100)
def test2(key, cols):
"""100 columns from the start of the row with a start col"""
x = start_col(cols, 0)
return NOCACHE.get(key, column_start=x, column_count=100)
def test3(key, cols):
"""100 columns from the start of the second page"""
x = start_col(cols, 0, page_offset=1)
return NOCACHE.get(key, column_start=x, column_count=100)
def test4(key, cols):
"""100 columns starting half way through the row"""
x = start_col(cols, (cols / 2))
return NOCACHE.get(key, column_start=x, column_count=100)
def test5(key, cols):
"""100 columns starting from the last page """
x = start_col(cols, 0, page_offset=-1)
return NOCACHE.get(key, column_start=x, column_count=100)
def name_locality():
"""Test difference between tightly clustered columns and spread out
print "Test name locality..."
def test1(key, cols):
"""100 columns by name, start of the row."""
x = col_range(True, cols, 100)
return NOCACHE.get(key, columns=x)
def test2(key, cols):
"""100 columns by name, end of the row."""
x = col_range(False, cols, 100)
return NOCACHE.get(key, columns=x)
def test3(key, cols):
"""100 columns by name, middle of row."""
x = range_paged_cols(True, cols, 100, collapse=False)
#Got a list of lists, inner list is columns from a page.
#Pick the middle page
x = x[int(len(x)/2.0)]
return NOCACHE.get(key, columns=x)
def test4(key, cols):
"""100 columns by name, first 2 cols from 50 random pages"""
x = range_paged_cols(True, cols, 2, shuffle_pages=True)
return NOCACHE.get(key, columns=x[:100])
def test5(key, cols):
"""100 columns by name, last 2 cols from 50 random pages"""
x = range_paged_cols(False, cols, 2,shuffle_pages=True)
return NOCACHE.get(key, columns=x[:100])
def test6(key, cols):
"""100 columns by name, random 2 cols from 50 random pages"""
x = range_paged_cols(False, cols, 2,shuffle_pages=True,
return NOCACHE.get(key, columns=x[:100])
def test7(key, cols):
"""100 columns by name, first col from 100 random pages"""
x = range_paged_cols(True, cols, 1,shuffle_pages=True)
return NOCACHE.get(key, columns=x[:100])
def test8(key, cols):
"""100 columns by name, last col from 100 random pages"""
x = range_paged_cols(False, cols, 1,shuffle_pages=True, )
return NOCACHE.get(key, columns=x[:100])
def test9(key, cols):
"""100 columns by name, random col from 100 random pages"""
x = range_paged_cols(False, cols, 1,shuffle_pages=True,
return NOCACHE.get(key, columns=x[:100])
def warm_up():
"""Scan through all of the keys in each row to warm the server"""
global rows
class Walk(multiprocessing.Process):
def __init__(self, row_key):
super(Walk, self).__init__()
self.row_key = row_key
conn = pycassa.connect("query", ["localhost"]) = pycassa.columnfamily.ColumnFamily(conn, "NoCache")
def run(self):
start_col = ""
while start_col is not None:
cols =, column_start=start_col,
start_col, _ = cols.popitem(last=True) if len(cols) > 1 else (
None, None)
threads = [Walk(key) for key, cols in rows]
print "Starting %s processes..." % len(rows)
start = time.time()
map(Walk.start, threads)
alive = lambda: (t for t in threads if t.is_alive())
while (any(alive())):
if int(time.time() - start) % 5 == 0:
print "Alive count ", len(list(alive()))
print "Finish in ", time.time() - start
def insert_rows():
"""Insert rows into the DB for testing"""
global rows
class Insert(multiprocessing.Process):
def __init__(self, row_key, cols):
super(Insert, self).__init__()
self.row_key = row_key
self.cols = cols
self.conn = pycassa.connect("query", ["localhost"]) = pycassa.columnfamily.ColumnFamily(self.conn, "NoCache")
def run(self):
#automatic send every 100 cols
mutator = pycassa.batch.Mutator(self.conn, queue_size=100)
data = ("foo" * 10)[:25]
for i in xrange(self.cols):
mutator.insert(NOCACHE, self.row_key,
{col_pattern.format(i) : data})
threads = [Insert(key, cols) for key, cols in rows]
print "Starting %s processes..." % len(rows)
start = time.time()
map(Insert.start, threads)
alive = lambda: (t for t in threads if t.is_alive())
while (any(alive())):
if int(time.time() - start) % 5 == 0:
print "Alive count ", len(list(alive()))
print "Finish in ", time.time() - start
if __name__ == "__main__":
    # Usage: <script> <action> [arg ...] [name=value ...]
    # The action is the name of a module level function. Arguments without
    # an "=" are passed positionally, "name=value" ones as keywords. All
    # values are passed on as strings.
    action = sys.argv[1] if len(sys.argv) > 1 else None
    args = [
        token
        for token in sys.argv[2:]
        if token.find("=") < 0
    ]
    kwargs = dict(
        # Split on the first "=" only so the value may contain "=".
        token.split("=", 1)
        for token in sys.argv[2:]
        if token.find("=") > 0
    )

    func = globals().get(action)
    if func:
        func(*args, **kwargs)
