Created
August 31, 2022 23:38
-
-
Save kmuthukk/bd5ebf6dcbecc61b2647c47718ddcdfb to your computer and use it in GitHub Desktop.
Test for a SCAN with a LIMIT clause in the presence of deleted, but possibly not yet compacted, rows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# pip install yb-cassandra-driver | |
from cassandra.cluster import Cluster | |
from cassandra import ConsistencyLevel | |
import time | |
import random | |
from multiprocessing.dummy import Pool as ThreadPool | |
# Load Phase params
# Number of rows written to the single test partition; the delete helpers
# use the same value to work out which offsets to remove.
num_offsets = 10000

# Shared connection to the node at 127.0.0.1, used by every function below.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
def create_table():
    """Drop and recreate the ``msg_q`` table in keyspace ``k``, printing timings.

    Runs on the module-level ``session``. Two elapsed times are printed in
    milliseconds: the first covers keyspace creation, ``USE k`` and the
    ``DROP TABLE``; the second covers only the ``CREATE TABLE``.
    """
    # perf_counter() is monotonic and high resolution, so the reported
    # intervals cannot be skewed by wall-clock adjustments as with time.time().
    start_time = time.perf_counter()
    session.execute("""CREATE KEYSPACE IF NOT EXISTS k""")
    session.execute("""USE k""")
    session.execute("""DROP TABLE IF EXISTS msg_q""")
    now_time = time.perf_counter()
    print("Dropped (if exists): msg_q table")
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")

    start_time = time.perf_counter()
    # One partition per id; rows inside a partition are ordered by
    # (from_offset, ts) ascending.
    session.execute("""
        CREATE TABLE IF NOT EXISTS msg_q(
            id text,
            from_offset int,
            ts int,
            until_offset int,
            PRIMARY KEY((id), from_offset, ts)
        ) WITH CLUSTERING ORDER BY (from_offset ASC, ts ASC)
        """)
    now_time = time.perf_counter()
    print("Created msg_q table")
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")
def load_data():
    """Insert ``num_offsets`` rows into partition ``partition-0`` of ``msg_q``.

    Row ``idx`` is written with ``from_offset = idx``, ``ts = 1000`` and
    ``until_offset = idx + 1``. The inserts are issued one at a time on the
    module-level ``session``, and the average per-insert time is printed in
    milliseconds.
    """
    insert_stmt = session.prepare(
        """INSERT INTO msg_q (id, from_offset, ts, until_offset) """
        + """ VALUES (?, ?, ?, ?)""")

    # perf_counter() is monotonic, so the measured interval is unaffected by
    # wall-clock adjustments that can distort time.time() deltas.
    start_time = time.perf_counter()
    for idx in range(num_offsets):
        session.execute(insert_stmt, ["partition-0", idx, 1000, idx + 1])
    elapsed = time.perf_counter() - start_time

    print("Avg Insert Time: %s ms" % (elapsed * 1000 / (num_offsets)))
def query_data():
    """Time repeated "latest offset" lookups against ``partition-0``.

    Executes the same prepared ``ORDER BY from_offset DESC LIMIT 1`` query
    100 times on the module-level ``session``, then prints either the single
    row returned by the final execution or the number of rows found, followed
    by the average read time in milliseconds.
    """
    print("Querying data")
    select_stmt = session.prepare("""SELECT from_offset, until_offset FROM msg_q WHERE id = ? ORDER BY from_offset DESC LIMIT 1""")
    num_reads = 100

    # perf_counter() is monotonic and high resolution, which matters for the
    # per-read latencies being averaged here.
    start_time = time.perf_counter()
    for _ in range(num_reads):
        results = session.execute(select_stmt, ["partition-0"])
    delta = time.perf_counter() - start_time

    # print one sample row for debug purposes (from the last execution only)
    result_list = list(results)
    if len(result_list) == 1:
        print(result_list[0])
    else:
        print("Found %s rows " % len(result_list))
    print("Avg Read Time: %s ms" % (delta * 1000 / (num_reads)))
def delete_all_but_latest():
    """Delete every offset in ``partition-0`` except the highest one.

    Issues one DELETE per ``from_offset`` from 0 up to and including
    ``num_offsets - 2`` on the module-level ``session``, leaving only
    ``from_offset = num_offsets - 1`` in place.
    """
    print("Deleting all but the latest offset")
    stmt = session.prepare("""DELETE FROM msg_q WHERE id = ? AND from_offset = ?""")
    latest_offset = num_offsets - 1
    for offset in range(latest_offset):
        params = ["partition-0", offset]
        session.execute(stmt, params)
def delete_latest_as_well():
    """Delete the highest offset (``num_offsets - 1``) from ``partition-0``.

    Issues a single DELETE on the module-level ``session``.
    """
    print("Deleting the latest offset")
    stmt = session.prepare("""DELETE FROM msg_q WHERE id = ? AND from_offset = ?""")
    latest_offset = num_offsets - 1
    session.execute(stmt, ["partition-0", latest_offset])
# Main
# Run the scenario in order. The same read benchmark runs three times:
# after the load, after deleting all but the latest offset, and after
# deleting the latest offset too.
steps = (
    create_table,
    load_data,
    query_data,
    delete_all_but_latest,
    query_data,
    delete_latest_as_well,
    query_data,
)
for step in steps:
    step()
Author
kmuthukk
commented
Aug 31, 2022
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment