Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Created August 31, 2022 23:38
Show Gist options
  • Save kmuthukk/bd5ebf6dcbecc61b2647c47718ddcdfb to your computer and use it in GitHub Desktop.
Save kmuthukk/bd5ebf6dcbecc61b2647c47718ddcdfb to your computer and use it in GitHub Desktop.
Test for SCAN with LIMIT clause in presence of deleted, but perhaps not yet compacted, rows
# pip install yb-cassandra-driver
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
import time
import random
from multiprocessing.dummy import Pool as ThreadPool
# Load Phase params
num_offsets=10000  # number of rows inserted into the single test partition ("partition-0")
# Connect to the local YCQL/Cassandra-compatible endpoint; `session` is shared
# by every function below (module-level side effect at import time).
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
def create_table():
    """Drop and re-create the k.msg_q table, printing how long each step took.

    Uses the module-level `session`. The table keys rows by (id) with
    clustering columns (from_offset ASC, ts ASC) so a reverse scan can
    fetch the highest from_offset for a given id.
    """
    t0 = time.time()
    session.execute("""CREATE KEYSPACE IF NOT EXISTS k""")
    session.execute("""USE k""")
    session.execute("""DROP TABLE IF EXISTS msg_q""")
    t1 = time.time()
    print("Dropped (if exists): msg_q table")
    print("Time: %s ms" % ((t1 - t0) * 1000))
    print("====================")
    t0 = time.time()
    session.execute("""
CREATE TABLE IF NOT EXISTS msg_q(
id text,
from_offset int,
ts int,
until_offset int,
PRIMARY KEY((id), from_offset, ts)
) WITH CLUSTERING ORDER BY (from_offset ASC, ts ASC)
""")
    t1 = time.time()
    print("Created msg_q table")
    print("Time: %s ms" % ((t1 - t0) * 1000))
    print("====================")
def load_data():
    """Insert `num_offsets` rows into msg_q (all in "partition-0") and print the
    average per-insert latency in milliseconds.

    Each row models an offset range: (from_offset=idx, ts=1000, until_offset=idx+1).
    """
    insert_stmt = session.prepare("""INSERT INTO msg_q (id, from_offset, ts, until_offset) """
                                  + """ VALUES (?, ?, ?, ?)""")
    t0 = time.time()
    for offset in range(num_offsets):
        session.execute(insert_stmt, ["partition-0", offset, 1000, offset + 1])
    elapsed = time.time() - t0
    print("Avg Insert Time: %s ms" % (elapsed * 1000 / (num_offsets)))
def query_data():
    """Time a reverse-scan LIMIT 1 query over "partition-0", repeated 100 times.

    Prints the average read latency and, for debugging, either the single row
    found (from the last iteration) or the row count when it is not exactly 1.
    """
    print("Querying data")
    select_stmt = session.prepare("""SELECT from_offset, until_offset FROM msg_q WHERE id = ? ORDER BY from_offset DESC LIMIT 1""")
    num_reads = 100
    t0 = time.time()
    rows = None
    for _ in range(num_reads):
        rows = session.execute(select_stmt, ["partition-0"])
    elapsed = time.time() - t0
    # Materialize only the final iteration's result as a debug sample.
    sample = list(rows)
    if len(sample) == 1:
        print(sample[0])
    else:
        print("Found %s rows " % len(sample))
    print("Avg Read Time: %s ms" % (elapsed * 1000 / (num_reads)))
def delete_all_but_latest():
    """Delete every from_offset row in "partition-0" except the highest one.

    Leaves tombstones behind (until compaction), which is exactly what the
    subsequent query_data() call is meant to measure.
    """
    print("Deleting all but the latest offset")
    delete_stmt = session.prepare("""DELETE FROM msg_q WHERE id = ? AND from_offset = ?""")
    for offset in range(num_offsets - 1):
        session.execute(delete_stmt, ["partition-0", offset])
def delete_latest_as_well():
    """Delete the last remaining (highest) from_offset row in "partition-0",
    so the partition holds nothing but tombstones."""
    print("Deleting the latest offset")
    delete_stmt = session.prepare("""DELETE FROM msg_q WHERE id = ? AND from_offset = ?""")
    session.execute(delete_stmt, ["partition-0", num_offsets - 1])
# Main: benchmark the reverse-scan LIMIT 1 query at three stages to observe
# the cost of scanning over not-yet-compacted deletes (tombstones):
#   1. all rows present            -> fast read expected
#   2. all but the latest deleted  -> row still found early in the reverse scan
#   3. every row deleted           -> scan must pass over all tombstones
create_table()
load_data()
query_data()
delete_all_but_latest()
query_data()
delete_latest_as_well()
query_data()
@kmuthukk
Copy link
Author

kmuthukk commented Aug 31, 2022

$ python ~/notes/ycql_scan_and_delete.py
Dropped (if exists): msg_q table
Time: 37.2800827026 ms
====================
Created msg_q table
Time: 64.8801326752 ms
====================
Avg Insert Time: 0.809414792061 ms
Querying data
Row(from_offset=9999, until_offset=10000)
Avg Read Time: 1.14317893982 ms                    --> EXPECTED

Deleting all but the latest offset
Querying data
Row(from_offset=9999, until_offset=10000)
Avg Read Time: 61.0519099236 ms                 --> NOT EXPECTED; the row we want is present and is found early in the reverse scan — is it because the scanner doesn't know that all columns of interest have already been found?

Deleting the latest offset
Querying data
Found 0 rows
Avg Read Time: 62.8676486015 ms                --> EXPECTED; because all the rows for this "id" have been deleted, but compactions haven't happened; so we end up scanning everything that matches "id" only to find that everything is marked deleted.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment