@beltran
Created March 9, 2017 00:19
Memory grows when using prepared queries and eventlet; this is what is happening in the endurance tests, because memory ends up entirely consumed by the driver.
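The repro script below prints the full resource.getrusage struct on every tenth batch; the field to watch there is ru_maxrss, the peak resident set size of the process. A minimal sketch of extracting just that number (the helper name is only for illustration; it assumes Linux, where ru_maxrss is reported in kilobytes, while macOS reports bytes):

import resource

def peak_rss_mb():
    # ru_maxrss is the peak resident set size: kilobytes on Linux, bytes on macOS.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.0

print("peak RSS: {0:.1f} MB".format(peak_rss_mb()))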
# -*- coding: utf-8 -*-
import os
import uuid
import time
import dse
from dse import cluster as cassandra_cluster
#from dse.io.geventreactor import GeventConnection
from dse.io.eventletreactor import EventletConnection
from dse.io.libevreactor import LibevConnection
from dse.io.asyncorereactor import AsyncoreConnection
from dse.concurrent import execute_concurrent
import concurrent.futures
import gc
import resource
from itertools import cycle
import traceback
from dse import ConsistencyLevel
from dse.query import SimpleStatement
import sys
connection_class = EventletConnection
import code
import signal
if connection_class == EventletConnection:
    from eventlet import monkey_patch
    monkey_patch()
signal.signal(signal.SIGUSR2, lambda sig, frame: code.interact())
local_pid = os.getpid()
cluster = cassandra_cluster.Cluster(("127.0.0.1", "127.0.0.2", "127.0.0.3"), connection_class=connection_class)
session = cluster.connect()
session.execute('''
CREATE KEYSPACE IF NOT EXISTS test3rf
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}''')
session.execute('''
CREATE TABLE IF NOT EXISTS test3rf.test (
k int PRIMARY KEY,
v int )''')
def run_query(query, futures=None):
"""
Runs a query aggregates the futures into if needed and handles all exception logging and marking
"""
if futures is None:
futures=[]
try:
futures.append(session.execute_async(query, timeout=1))
return futures
except Exception as e:
traceback.print_exc()
def wait_for_futures(futures):
    for future in futures:
        # Block until the future returns either data or an exception is encountered.
        # If an exception is encountered, report that at least one query failed.
        try:
            future.result()
        except Exception:
            return True
    return False
def one_iteration():
    futures = []
    timestamp = int(time.time() * 1000)
    insert = session.prepare("INSERT INTO {0} (k, v) VALUES (?, ?) USING TIMESTAMP ?".format("test3rf.test"))
    insert.consistency_level = ConsistencyLevel.QUORUM
    for i in range(copies):
        query = insert.bind([i, i, timestamp])
        futures = run_query(query, futures)
    wait_for_futures(futures)
num_threads = 50
pool = concurrent.futures.ThreadPoolExecutor(max_workers=num_threads)
copies = 100
# Each iteration writes `copies` rows, using i as both the partition key and the value
count = 0
while 1:
    for i in range(num_threads):
        pool.submit(one_iteration)
        time.sleep(0.001)
    if not count % 10:
        gc.collect()
        print(resource.getrusage(resource.RUSAGE_SELF))
        print(gc.garbage)
    count += 1
print("The end")
print(gc.garbage)
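# Companion script: start a single-DC test cluster and periodically stop/start
# its nodes while the repro script above runs against it.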
from tests.integration import use_cluster, get_node, use_singledc
import time
from random import randint
import sys
import os
use_singledc()
print("\n\nLaunch poc_hangs now")
count = 2
while 1:
    time.sleep(5)
    this_time_node = (count % 3) + 1
    print("Stopping node {0}".format(this_time_node))
    node = get_node(this_time_node)
    node.stop()
    print("Starting node {0}".format(this_time_node))
    time.sleep(5)
    node.start()
    print("Node {0} active now\n".format(this_time_node))
    count += 1