@mambocab · Created August 11, 2016 17:10
#!/usr/bin/env python
from __future__ import print_function

import errno
import logging
import os
import shutil
import time
import uuid
from collections import namedtuple

import cassandra.cluster
import ccmlib.cluster
import ccmlib.cluster_factory
from cassandra.concurrent import execute_concurrent
from nose.tools import assert_equal, assert_not_equal

LOG = logging.getLogger(__name__)
logging.getLogger('cassandra').setLevel('INFO')

CLUSTERS_DIR = './clusters'

_16_uuid_column_spec = (
    'a uuid PRIMARY KEY, b uuid, c uuid, d uuid, e uuid, f uuid, g uuid, '
    'h uuid, i uuid, j uuid, k uuid, l uuid, m uuid, n uuid, o uuid, '
    'p uuid'
)
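# Sixteen uuid columns make each inserted row fairly wide; presumably the
# point is to fill the 1 MB commitlog segments configured below quickly.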


class log_filter(object):
    def __init__(self, logger, object_with_filter):
        self.logger, self.object_with_filter = logger, object_with_filter

    def __enter__(self):
        self.logger.addFilter(self.object_with_filter)

    def __exit__(self, exc_type, exc_value, traceback):
        # Remove the filter on exit. (The original called addFilter here
        # again, which would leave the filter permanently installed.)
        self.logger.removeFilter(self.object_with_filter)


class expect_control_connection_failures(object):
    """
    We're just using a class here as a one-off object with a filter method,
    for use as a filter object on the driver logger. It's frustrating that
    we can't just pass in a function, but the logging module needs an object
    with a .filter method. Oh well, that's what old stdlib libraries are like.
    """
    @staticmethod
    def filter(record):
        expected_strings = [
            'Control connection failed to connect, shutting down Cluster:',
            '[control connection] Error connecting to '
        ]
        # Suppress records matching any of the expected failure messages;
        # let everything else through.
        for s in expected_strings:
            if s in record.msg or s in record.name:
                return False
        return True
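# (Aside: on Python 3.2+, addFilter also accepts any callable, so a plain
# function would do, e.g. something like:
#
#     def expect_control_connection_failures(record):
#         return not any(s in record.msg or s in record.name
#                        for s in EXPECTED_STRINGS)
#
# where EXPECTED_STRINGS is hypothetical shorthand for the list above. On
# Python 2, logging requires an object with a .filter method, hence the class.)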


def _get_16_uuid_insert_stmt(ks_name, table_name):
    return (
        'INSERT INTO {ks_name}.{table_name} '
        '(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p) '
        'VALUES (uuid(), uuid(), uuid(), uuid(), uuid(), '
        'uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), '
        'uuid(), uuid(), uuid(), uuid(), uuid())'
    ).format(ks_name=ks_name, table_name=table_name)


ClusterInfo = namedtuple('ClusterInfo', ['path', 'name'])

TableInfoNamedtuple = namedtuple('TableInfoNamedtuple', [
    # required
    'ks_name', 'table_name', 'column_spec',
    # optional
    'options', 'insert_stmt',
    # derived
    'name', 'create_stmt'
])


class TableInfo(TableInfoNamedtuple):
    __slots__ = ()

    def __new__(cls, ks_name, table_name, column_spec, options=None, insert_stmt=None):
        # Derive the qualified name and CREATE TABLE statement once, at
        # construction time.
        name = ks_name + '.' + table_name
        create_stmt = get_create_table_statement(ks_name, table_name, column_spec, options)
        self = super(TableInfo, cls).__new__(
            cls,
            # required
            ks_name=ks_name, table_name=table_name, column_spec=column_spec,
            # optional
            options=options, insert_stmt=insert_stmt,
            # derived
            name=name, create_stmt=create_stmt
        )
        return self
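# Example: TableInfo('ks', 't', 'a int PRIMARY KEY') derives name == 'ks.t'
# and create_stmt == 'CREATE TABLE ks.t (a int PRIMARY KEY) ' (note the
# trailing space).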


def _move_contents(source_dir, dest_dir):
    source_filenames = os.listdir(source_dir)
    LOG.info('about to move the following files: {}'.format(source_filenames))
    for source_filename in source_filenames:
        source_path, dest_path = (os.path.join(source_dir, source_filename),
                                  os.path.join(dest_dir, source_filename))
        LOG.info('moving {} to {}'.format(source_path, dest_path))
        shutil.move(source_path, dest_path)


def get_create_table_statement(ks_name, table_name, column_spec, options=None):
    if options:
        options_pairs = ('{k}={v}'.format(k=k, v=v) for (k, v) in options.iteritems())
        options_string = 'WITH ' + ' AND '.join(options_pairs)
    else:
        options_string = ''
    return (
        'CREATE TABLE ' + ks_name + '.' + table_name + ' '
        '(' + column_spec + ') ' + options_string
    )
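# Example: get_create_table_statement('ks', 't', 'a int PRIMARY KEY',
# options={'cdc': 'true'}) returns
# 'CREATE TABLE ks.t (a int PRIMARY KEY) WITH cdc=true'.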


def remove_cluster_if_exists(path, name):
    LOG.info('removing ' + name + ' from ' + path + '...')
    cluster_to_delete = None
    try:
        cluster_to_delete = ccmlib.cluster_factory.ClusterFactory.load(
            path,
            name
        )
    except IOError as e:
        # ENOENT just means there was no cluster to remove.
        if e.errno != errno.ENOENT:
            raise
        LOG.info("cluster didn't exist")
    if cluster_to_delete is not None:
        cluster_to_delete.remove()
        LOG.info('removed')


def fresh_cluster(path, name):
    remove_cluster_if_exists(path, name)
    cluster = ccmlib.cluster.Cluster(
        path,
        name,
        version='github:mambocab/mambocab_debug-8844_review',
        verbose=True,
        create_dir=True,
    )
    cluster.populate(1)
    # Small (1 MB) commitlog segments so that modest write volumes roll over
    # into fresh segments.
    cluster.set_configuration_options(
        {'cdc_enabled': 'true', 'commitlog_segment_size_in_mb': 1}
    ).set_batch_commitlog(True)
    cluster.set_log_level('TRACE')
    with log_filter(logging.getLogger('cassandra.cluster'), expect_control_connection_failures):
        cluster.start(wait_for_binary_proto=True)
    LOG.info('successfully started new cluster')
    return cluster


def init_ks(ks, session=None):
    LOG.info('creating ks ' + ks)
    if not session:
        LOG.info('getting a session')
        session = cassandra.cluster.Cluster().connect()
        LOG.info('got it')
    return session.execute(
        "CREATE KEYSPACE " + ks + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"
    )


if __name__ == '__main__':
    ks = 'ks'
    try:
        os.mkdir(CLUSTERS_DIR)
    except OSError as e:
        assert e.errno == errno.EEXIST

    data_loading_cluster_info = ClusterInfo(CLUSTERS_DIR, 'data-loading-cluster')
    data_generation_cluster_info = ClusterInfo(CLUSTERS_DIR, 'data-generation-cluster')
    remove_cluster_if_exists(path=data_loading_cluster_info.path,
                             name=data_loading_cluster_info.name)
    remove_cluster_if_exists(path=data_generation_cluster_info.path,
                             name=data_generation_cluster_info.name)

    data_generation_cluster = fresh_cluster(data_generation_cluster_info.path,
                                            data_generation_cluster_info.name)
    session = cassandra.cluster.Cluster().connect()
    init_ks(ks, session)

    table_info = TableInfo(
        ks_name=ks, table_name='cdc_tab',
        column_spec=_16_uuid_column_spec,
        insert_stmt=_get_16_uuid_insert_stmt(ks, 'cdc_tab'),
        options=dict(cdc='true', id=uuid.uuid4()),
    )
    LOG.info('about to execute `' + table_info.create_stmt + '`')
    session.execute(table_info.create_stmt)
    prepared_insert = session.prepare(table_info.insert_stmt)
    execute_concurrent(
        session,
        ((prepared_insert, ()) for _ in range(10000)),
        concurrency=500,
        raise_on_first_error=True
    )
    data_in_generation_cluster = list(session.execute('SELECT * FROM ' + table_info.name))
    assert 10000 == len(data_in_generation_cluster)

    # block on finding commitlogs
    wait_for_cl_start, cl_found = time.time(), False
    while not cl_found:
        assert time.time() - wait_for_cl_start < 120
        LOG.info('checking for commitlog segments')
        if os.listdir(os.path.join(data_generation_cluster.nodelist()[0].get_path(), 'commitlogs')):
            cl_found = True
        else:
            time.sleep(4)

    # write to a non-cdc table. we want to trigger the creation of more
    # commitlogs so we can be certain the entire cdc table is actually
    # present in commitlogs
    pre_filler_cls = set(os.listdir(
        os.path.join(data_generation_cluster.nodelist()[0].get_path(), 'commitlogs')
    ))
    filler_table_info = TableInfo(
        ks_name=ks, table_name='non_cdc_tab',
        column_spec=_16_uuid_column_spec,
        insert_stmt=_get_16_uuid_insert_stmt(ks, 'non_cdc_tab'),
        options=dict(id=uuid.uuid4()),
    )
    LOG.info('about to execute `' + filler_table_info.create_stmt + '`')
    session.execute(filler_table_info.create_stmt)
    filler_insert = session.prepare(filler_table_info.insert_stmt)
    execute_concurrent(
        session,
        ((filler_insert, ()) for _ in range(30000)),
        concurrency=500,
        raise_on_first_error=True
    )
    data_generation_cluster.drain()

    # make sure we actually got some new commitlogs from putting stuff in the filler table.
    assert_not_equal(
        set(os.listdir(os.path.join(data_generation_cluster.nodelist()[0].get_path(), 'commitlogs'))),
        pre_filler_cls
    )
    # ... and that the cdc table's segments landed in cdc_raw.
    assert_not_equal(
        [],
        os.listdir(
            os.path.join(data_generation_cluster.nodelist()[0].get_path(), 'cdc_raw')
        )
    )

    LOG.info('stopping ' + data_generation_cluster_info.name)
    data_generation_cluster.stop()
    session.cluster.shutdown()

    LOG.info('creating new cluster')
    data_loading_cluster = fresh_cluster(data_loading_cluster_info.path,
                                         data_loading_cluster_info.name)
    LOG.info('successfully created ' + data_loading_cluster_info.name + '. connecting')
    session = cassandra.cluster.Cluster().connect()
    init_ks(ks, session)
    LOG.info('ks inited. creating table')
    session.execute(table_info.create_stmt)
    LOG.info('table created. stopping')
    data_loading_cluster.stop()
    session.cluster.shutdown()

    # replay the generation cluster's cdc_raw segments as the loading
    # cluster's commitlogs
    _move_contents(
        os.path.join(data_generation_cluster.nodelist()[0].get_path(), 'cdc_raw'),
        os.path.join(data_loading_cluster.nodelist()[0].get_path(), 'commitlogs')
    )
    LOG.info('starting cluster')
    with log_filter(logging.getLogger('cassandra.cluster'), expect_control_connection_failures):
        data_loading_cluster.start(wait_for_binary_proto=True)
    LOG.info('cluster successfully started')
    data_loading_cluster.nodelist()[0].watch_log_for("Log replay complete")
    LOG.info('log replayed')
    LOG.info(data_loading_cluster.nodelist()[0].grep_log('replayed'))

    session = cassandra.cluster.Cluster().connect()
    data_in_loading_cluster = list(session.execute('SELECT * FROM ' + table_info.name))
    assert_equal(len(data_in_generation_cluster), len(data_in_loading_cluster))
    assert_equal(data_in_generation_cluster, data_in_loading_cluster)