Skip to content

Instantly share code, notes, and snippets.

@abvgedeika
Last active August 1, 2017 08:54
Show Gist options
  • Save abvgedeika/e8d4882bf4b404de6007331a205ce04a to your computer and use it in GitHub Desktop.
Save abvgedeika/e8d4882bf4b404de6007331a205ce04a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
#
# Usage:
#
# 1) ./schema_upgrade_tests.py setup
#
# Populates the cluster with tables and data. Idempotent.
#
# 2) ./schema_upgrade_tests.py validate
#
# Verifies the state by doing reads and updates. Idempotent.
#
# Separating setup from validation allows reusing the test cases for arbitrary
# combination of cluster upgrades or schema/data migration.
#
import argparse
import struct
import sys
from unittest import TestCase
from thrift.transport import TSocket
from thrift_bindings.v22.Cassandra import *
from thrift_bindings.v22.ttypes import NotFoundException
from cassandra.cluster import Cluster
tests = list()
thrift_client = None
cql_client = None
ks_name = 'test_upgrade_schema_ks' #'Keyspace1'
IP = "34.226.155.107"
def get_thrift_client(host=IP, port=9160): # 9160
socket = TSocket.TSocket(host, port)
transport = TTransport.TFramedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
c = Client(protocol)
c.transport = transport
transport.open()
return c
def describe_table(ks, table):
for cf in thrift_client.describe_keyspace(ks).cf_defs:
if cf.name == table:
return cf
def _i32(n):
return struct.pack('>i', n) # big endian = network order
def _i64(n):
return struct.pack('>q', n) # big endian = network order
def test(test):
tests.append(test())
class Test(TestCase):
def setUp(self):
pass
# Can be called multiple times after setUp so must be idempotent.
def runTest(self):
pass
@test
class TestNonCompoundThriftDynamicTable(Test):
keys = ['key%d' % i for i in range(10)]
def setUp(self):
cf = CfDef(ks_name, 'test_table0',
key_validation_class='UTF8Type',
comparator_type='LongType',
default_validation_class='Int32Type')
thrift_client.system_add_column_family(cf)
for key in self.keys:
thrift_client.insert(key, ColumnParent('test_table0'), Column(_i64(1), _i32(1), 1), ConsistencyLevel.ONE)
def runTest(self):
for key in self.keys:
self.assertEqual(thrift_client.get(key, ColumnPath('test_table0', column=_i64(1)), ConsistencyLevel.ONE).column, Column(_i64(1), _i32(1), 1))
@test
class TestCqlTableMigration(Test):
def _insertData(self):
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (0, 0, 1, 2)")
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (1, 0, 1, 2)")
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (2, 0, 1, 2)")
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (0, 0, 1)")
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (1, 0, 1)")
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (2, 0, 1)")
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (0, 0, 1)")
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (1, 0, 1)")
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (2, 0, 1)")
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (0, 0, 1, 2)")
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (1, 0, 1, 2)")
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (2, 0, 1, 2)")
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (0, 0, 1)")
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (1, 0, 1)")
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (2, 0, 1)")
cql_client.execute("INSERT INTO test6 (k) VALUES (0)")
cql_client.execute("INSERT INTO test6 (k) VALUES (1)")
cql_client.execute("INSERT INTO test6 (k) VALUES (2)")
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (0, 0)")
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (1, 0)")
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (2, 0)")
def _updateCounters(self, delta = 1):
cql_client.execute("UPDATE test8 set v = v + %d where k = 0 and ck = 0" % delta)
cql_client.execute("UPDATE test8 set v = v + %d where k = 1 and ck = 0" % delta)
cql_client.execute("UPDATE test8 set v = v + %d where k = 2 and ck = 0" % delta)
cql_client.execute("UPDATE test9 set v = v + %d where k = 0 and ck = 0" % delta)
cql_client.execute("UPDATE test9 set v = v + %d where k = 1 and ck = 0" % delta)
cql_client.execute("UPDATE test9 set v = v + %d where k = 2 and ck = 0" % delta)
cql_client.execute("UPDATE test10 set v = v + %d where k = 0" % delta)
cql_client.execute("UPDATE test10 set v = v + %d where k = 1" % delta)
cql_client.execute("UPDATE test10 set v = v + %d where k = 2" % delta)
cql_client.execute("UPDATE test11 set v = v + %d where k = 0" % delta)
cql_client.execute("UPDATE test11 set v = v + %d where k = 1" % delta)
cql_client.execute("UPDATE test11 set v = v + %d where k = 2" % delta)
def setUp(self):
cql_client.execute("create table test1 (k int, ck int, value int, value2 int, primary key (k, ck))")
cql_client.execute("create table test2 (k int, v1 int, v2 int, primary key (k))")
cql_client.execute("create table test3 (k int, v1 int, v2 int, primary key (k)) with compact storage")
cql_client.execute("create table test4 (k int, ck int, ck1 int, v int, primary key (k, ck, ck1)) with compact storage")
cql_client.execute("create table test5 (k int, ck int, value int, primary key (k, ck)) with compact storage")
cql_client.execute("create table test6 (k int primary key)")
cql_client.execute("create table test7 (k int, ck int, primary key (k, ck)) with compact storage")
cql_client.execute("create table test8 (k int, ck int, v counter, primary key (k, ck))")
cql_client.execute("create table test9 (k int, ck int, v counter, primary key (k, ck)) with compact storage")
cql_client.execute("create table test10 (k int, v counter, primary key (k))")
cql_client.execute("create table test11 (k int, v counter, primary key (k)) with compact storage")
self._insertData()
self._updateCounters()
def runTest(self):
self.assertEqual(set(cql_client.execute("SELECT * FROM test1")), {(1, 0, 1, 2), (0, 0, 1, 2), (2, 0, 1, 2)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test2")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test3")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test4")), {(1, 0, 1, 2), (0, 0, 1, 2), (2, 0, 1, 2)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test5")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test6")), {(1, ), (0, ), (2, )})
self.assertEqual(set(cql_client.execute("SELECT * FROM test7")), {(1, 0), (0, 0), (2, 0)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test8")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test9")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test10")), {(1, 1), (0, 1), (2, 1)})
self.assertEqual(set(cql_client.execute("SELECT * FROM test11")), {(1, 1), (0, 1), (2, 1)})
self._insertData()
self._updateCounters()
self._updateCounters(-1)
# Fails for Scylla 1.7 -> 2.0 migration due to https://github.com/scylladb/scylla/issues/2573
@test
class TestNonStandardComparator(Test):
keys = ['key%d' % i for i in range(10)]
def _insertData(self):
for key in self.keys:
thrift_client.insert(key, ColumnParent('test_table'), Column(_i64(4), _i32(1), 1), ConsistencyLevel.ONE)
thrift_client.insert(key, ColumnParent('test_table'), Column(_i64(5), _i32(2), 2), ConsistencyLevel.ONE)
def setUp(self):
cf = CfDef(ks_name, 'test_table',
column_metadata=[
ColumnDef(_i64(4), 'Int32Type'),
ColumnDef(_i64(5), 'Int32Type'),
# Fails due to https://github.com/scylladb/scylla/issues/2573
# ColumnDef(_i64(-2), 'Int32Type')
],
key_validation_class='UTF8Type',
comparator_type='LongType',
default_validation_class='UTF8Type')
thrift_client.system_add_column_family(cf)
self._insertData()
def runTest(self):
for key in self.keys:
assert thrift_client.get(key, ColumnPath('test_table', column=_i64(4)),
ConsistencyLevel.ONE).column == Column(_i64(4), _i32(1), 1)
assert thrift_client.get(key, ColumnPath('test_table', column=_i64(5)),
ConsistencyLevel.ONE).column == Column(_i64(5), _i32(2), 2)
self._insertData()
cdef = describe_table(ks_name, 'test_table')
# Fails due to https://github.com/scylladb/scylla/issues/2573
# assert cdef.comparator_type == "org.apache.cassandra.db.marshal.LongType"
# Fails due to https://github.com/scylladb/scylla/issues/1474
#assert cdef.default_validation_class == "org.apache.cassandra.db.marshal.UTF8Type"
assert cdef.key_validation_class == "org.apache.cassandra.db.marshal.UTF8Type"
# Fails due to https://github.com/scylladb/scylla/issues/2588 only in 2.0 fixed
# @test
class TestThriftTableConvertedFromDenseToSparse(Test):
keys = ['key%d' % i for i in range(10)]
def setUp(self):
cf = CfDef(ks_name, 'test_table2',
key_validation_class='UTF8Type',
comparator_type='LongType',
default_validation_class='UTF8Type')
thrift_client.system_add_column_family(cf)
for key in self.keys:
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(1), _i32(1), 1), ConsistencyLevel.ONE)
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(2), _i32(1), 1), ConsistencyLevel.ONE)
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(3), _i32(1), 1), ConsistencyLevel.ONE)
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(4), _i32(1), 1), ConsistencyLevel.ONE)
cf.column_metadata = [
ColumnDef(_i64(4), 'Int32Type')
]
thrift_client.system_update_column_family(cf)
def runTest(self):
for key in self.keys:
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(1)), ConsistencyLevel.ONE).column, Column(_i64(1), _i32(1), 1))
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(2)), ConsistencyLevel.ONE).column, Column(_i64(2), _i32(1), 1))
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(3)), ConsistencyLevel.ONE).column, Column(_i64(3), _i32(1), 1))
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(4)), ConsistencyLevel.ONE).column, Column(_i64(4), _i32(1), 1))
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Tests upgrade and migration scenarios')
parser.add_argument('mode', choices=['setup', 'validate', 'print-thrift-schema'])
args = parser.parse_args()
thrift_client = get_thrift_client()
cluster = Cluster(contact_points=[IP])
cql_client = cluster.connect()
if args.mode == 'setup':
try:
thrift_client.describe_keyspace(ks_name)
cql_client.execute("drop keyspace \"%s\"" % ks_name)
except NotFoundException:
pass
thrift_client.system_add_keyspace(
KsDef(ks_name, 'org.apache.cassandra.locator.SimpleStrategy', {'replication_factor': '1'}, cf_defs=[]))
elif args.mode == 'print-thrift-schema':
ks = thrift_client.describe_keyspace(ks_name)
ks.cf_defs = sorted(ks.cf_defs, key=lambda cf: cf.name)
print(ks)
sys.exit(0)
thrift_client.set_keyspace(ks_name)
cql_client.set_keyspace(ks_name)
for test in tests:
if args.mode == 'setup':
test.setUp()
else:
test.runTest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment