Last active
August 1, 2017 08:54
-
-
Save abvgedeika/e8d4882bf4b404de6007331a205ce04a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# | |
# Usage: | |
# | |
# 1) ./schema_upgrade_tests.py setup | |
# | |
# Populates the cluster with tables and data. Idempotent. | |
# | |
# 2) ./schema_upgrade_tests.py validate | |
# | |
# Verifies the state by doing reads and updates. Idempotent. | |
# | |
# Separating setup from validation allows reusing the test cases for arbitrary | |
# combination of cluster upgrades or schema/data migration. | |
# | |
import argparse | |
import struct | |
import sys | |
from unittest import TestCase | |
from thrift.transport import TSocket | |
from thrift_bindings.v22.Cassandra import * | |
from thrift_bindings.v22.ttypes import NotFoundException | |
from cassandra.cluster import Cluster | |
tests = list() | |
thrift_client = None | |
cql_client = None | |
ks_name = 'test_upgrade_schema_ks' #'Keyspace1' | |
IP = "34.226.155.107" | |
def get_thrift_client(host=IP, port=9160): # 9160 | |
socket = TSocket.TSocket(host, port) | |
transport = TTransport.TFramedTransport(socket) | |
protocol = TBinaryProtocol.TBinaryProtocol(transport) | |
c = Client(protocol) | |
c.transport = transport | |
transport.open() | |
return c | |
def describe_table(ks, table): | |
for cf in thrift_client.describe_keyspace(ks).cf_defs: | |
if cf.name == table: | |
return cf | |
def _i32(n): | |
return struct.pack('>i', n) # big endian = network order | |
def _i64(n): | |
return struct.pack('>q', n) # big endian = network order | |
def test(test): | |
tests.append(test()) | |
class Test(TestCase): | |
def setUp(self): | |
pass | |
# Can be called multiple times after setUp so must be idempotent. | |
def runTest(self): | |
pass | |
@test | |
class TestNonCompoundThriftDynamicTable(Test): | |
keys = ['key%d' % i for i in range(10)] | |
def setUp(self): | |
cf = CfDef(ks_name, 'test_table0', | |
key_validation_class='UTF8Type', | |
comparator_type='LongType', | |
default_validation_class='Int32Type') | |
thrift_client.system_add_column_family(cf) | |
for key in self.keys: | |
thrift_client.insert(key, ColumnParent('test_table0'), Column(_i64(1), _i32(1), 1), ConsistencyLevel.ONE) | |
def runTest(self): | |
for key in self.keys: | |
self.assertEqual(thrift_client.get(key, ColumnPath('test_table0', column=_i64(1)), ConsistencyLevel.ONE).column, Column(_i64(1), _i32(1), 1)) | |
@test | |
class TestCqlTableMigration(Test): | |
def _insertData(self): | |
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (0, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (1, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test1 (k, ck, value, value2) VALUES (2, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (0, 0, 1)") | |
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (1, 0, 1)") | |
cql_client.execute("INSERT INTO test2 (k, v1, v2) VALUES (2, 0, 1)") | |
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (0, 0, 1)") | |
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (1, 0, 1)") | |
cql_client.execute("INSERT INTO test3 (k, v1, v2) VALUES (2, 0, 1)") | |
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (0, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (1, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test4 (k, ck, ck1, v) VALUES (2, 0, 1, 2)") | |
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (0, 0, 1)") | |
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (1, 0, 1)") | |
cql_client.execute("INSERT INTO test5 (k, ck, value) VALUES (2, 0, 1)") | |
cql_client.execute("INSERT INTO test6 (k) VALUES (0)") | |
cql_client.execute("INSERT INTO test6 (k) VALUES (1)") | |
cql_client.execute("INSERT INTO test6 (k) VALUES (2)") | |
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (0, 0)") | |
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (1, 0)") | |
cql_client.execute("INSERT INTO test7 (k, ck) VALUES (2, 0)") | |
def _updateCounters(self, delta = 1): | |
cql_client.execute("UPDATE test8 set v = v + %d where k = 0 and ck = 0" % delta) | |
cql_client.execute("UPDATE test8 set v = v + %d where k = 1 and ck = 0" % delta) | |
cql_client.execute("UPDATE test8 set v = v + %d where k = 2 and ck = 0" % delta) | |
cql_client.execute("UPDATE test9 set v = v + %d where k = 0 and ck = 0" % delta) | |
cql_client.execute("UPDATE test9 set v = v + %d where k = 1 and ck = 0" % delta) | |
cql_client.execute("UPDATE test9 set v = v + %d where k = 2 and ck = 0" % delta) | |
cql_client.execute("UPDATE test10 set v = v + %d where k = 0" % delta) | |
cql_client.execute("UPDATE test10 set v = v + %d where k = 1" % delta) | |
cql_client.execute("UPDATE test10 set v = v + %d where k = 2" % delta) | |
cql_client.execute("UPDATE test11 set v = v + %d where k = 0" % delta) | |
cql_client.execute("UPDATE test11 set v = v + %d where k = 1" % delta) | |
cql_client.execute("UPDATE test11 set v = v + %d where k = 2" % delta) | |
def setUp(self): | |
cql_client.execute("create table test1 (k int, ck int, value int, value2 int, primary key (k, ck))") | |
cql_client.execute("create table test2 (k int, v1 int, v2 int, primary key (k))") | |
cql_client.execute("create table test3 (k int, v1 int, v2 int, primary key (k)) with compact storage") | |
cql_client.execute("create table test4 (k int, ck int, ck1 int, v int, primary key (k, ck, ck1)) with compact storage") | |
cql_client.execute("create table test5 (k int, ck int, value int, primary key (k, ck)) with compact storage") | |
cql_client.execute("create table test6 (k int primary key)") | |
cql_client.execute("create table test7 (k int, ck int, primary key (k, ck)) with compact storage") | |
cql_client.execute("create table test8 (k int, ck int, v counter, primary key (k, ck))") | |
cql_client.execute("create table test9 (k int, ck int, v counter, primary key (k, ck)) with compact storage") | |
cql_client.execute("create table test10 (k int, v counter, primary key (k))") | |
cql_client.execute("create table test11 (k int, v counter, primary key (k)) with compact storage") | |
self._insertData() | |
self._updateCounters() | |
def runTest(self): | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test1")), {(1, 0, 1, 2), (0, 0, 1, 2), (2, 0, 1, 2)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test2")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test3")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test4")), {(1, 0, 1, 2), (0, 0, 1, 2), (2, 0, 1, 2)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test5")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test6")), {(1, ), (0, ), (2, )}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test7")), {(1, 0), (0, 0), (2, 0)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test8")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test9")), {(1, 0, 1), (0, 0, 1), (2, 0, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test10")), {(1, 1), (0, 1), (2, 1)}) | |
self.assertEqual(set(cql_client.execute("SELECT * FROM test11")), {(1, 1), (0, 1), (2, 1)}) | |
self._insertData() | |
self._updateCounters() | |
self._updateCounters(-1) | |
# Fails for Scylla 1.7 -> 2.0 migration due to https://github.com/scylladb/scylla/issues/2573 | |
@test | |
class TestNonStandardComparator(Test): | |
keys = ['key%d' % i for i in range(10)] | |
def _insertData(self): | |
for key in self.keys: | |
thrift_client.insert(key, ColumnParent('test_table'), Column(_i64(4), _i32(1), 1), ConsistencyLevel.ONE) | |
thrift_client.insert(key, ColumnParent('test_table'), Column(_i64(5), _i32(2), 2), ConsistencyLevel.ONE) | |
def setUp(self): | |
cf = CfDef(ks_name, 'test_table', | |
column_metadata=[ | |
ColumnDef(_i64(4), 'Int32Type'), | |
ColumnDef(_i64(5), 'Int32Type'), | |
# Fails due to https://github.com/scylladb/scylla/issues/2573 | |
# ColumnDef(_i64(-2), 'Int32Type') | |
], | |
key_validation_class='UTF8Type', | |
comparator_type='LongType', | |
default_validation_class='UTF8Type') | |
thrift_client.system_add_column_family(cf) | |
self._insertData() | |
def runTest(self): | |
for key in self.keys: | |
assert thrift_client.get(key, ColumnPath('test_table', column=_i64(4)), | |
ConsistencyLevel.ONE).column == Column(_i64(4), _i32(1), 1) | |
assert thrift_client.get(key, ColumnPath('test_table', column=_i64(5)), | |
ConsistencyLevel.ONE).column == Column(_i64(5), _i32(2), 2) | |
self._insertData() | |
cdef = describe_table(ks_name, 'test_table') | |
# Fails due to https://github.com/scylladb/scylla/issues/2573 | |
# assert cdef.comparator_type == "org.apache.cassandra.db.marshal.LongType" | |
# Fails due to https://github.com/scylladb/scylla/issues/1474 | |
#assert cdef.default_validation_class == "org.apache.cassandra.db.marshal.UTF8Type" | |
assert cdef.key_validation_class == "org.apache.cassandra.db.marshal.UTF8Type" | |
# Fails due to https://github.com/scylladb/scylla/issues/2588 only in 2.0 fixed | |
# @test | |
class TestThriftTableConvertedFromDenseToSparse(Test): | |
keys = ['key%d' % i for i in range(10)] | |
def setUp(self): | |
cf = CfDef(ks_name, 'test_table2', | |
key_validation_class='UTF8Type', | |
comparator_type='LongType', | |
default_validation_class='UTF8Type') | |
thrift_client.system_add_column_family(cf) | |
for key in self.keys: | |
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(1), _i32(1), 1), ConsistencyLevel.ONE) | |
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(2), _i32(1), 1), ConsistencyLevel.ONE) | |
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(3), _i32(1), 1), ConsistencyLevel.ONE) | |
thrift_client.insert(key, ColumnParent('test_table2'), Column(_i64(4), _i32(1), 1), ConsistencyLevel.ONE) | |
cf.column_metadata = [ | |
ColumnDef(_i64(4), 'Int32Type') | |
] | |
thrift_client.system_update_column_family(cf) | |
def runTest(self): | |
for key in self.keys: | |
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(1)), ConsistencyLevel.ONE).column, Column(_i64(1), _i32(1), 1)) | |
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(2)), ConsistencyLevel.ONE).column, Column(_i64(2), _i32(1), 1)) | |
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(3)), ConsistencyLevel.ONE).column, Column(_i64(3), _i32(1), 1)) | |
self.assertEqual(thrift_client.get(key, ColumnPath('test_table2', column=_i64(4)), ConsistencyLevel.ONE).column, Column(_i64(4), _i32(1), 1)) | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description='Tests upgrade and migration scenarios') | |
parser.add_argument('mode', choices=['setup', 'validate', 'print-thrift-schema']) | |
args = parser.parse_args() | |
thrift_client = get_thrift_client() | |
cluster = Cluster(contact_points=[IP]) | |
cql_client = cluster.connect() | |
if args.mode == 'setup': | |
try: | |
thrift_client.describe_keyspace(ks_name) | |
cql_client.execute("drop keyspace \"%s\"" % ks_name) | |
except NotFoundException: | |
pass | |
thrift_client.system_add_keyspace( | |
KsDef(ks_name, 'org.apache.cassandra.locator.SimpleStrategy', {'replication_factor': '1'}, cf_defs=[])) | |
elif args.mode == 'print-thrift-schema': | |
ks = thrift_client.describe_keyspace(ks_name) | |
ks.cf_defs = sorted(ks.cf_defs, key=lambda cf: cf.name) | |
print(ks) | |
sys.exit(0) | |
thrift_client.set_keyspace(ks_name) | |
cql_client.set_keyspace(ks_name) | |
for test in tests: | |
if args.mode == 'setup': | |
test.setUp() | |
else: | |
test.runTest() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment