Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
POC memory leak, memory backend Riak < 2.0
import riak
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import deque
import time
import signal
def main_function(max_loops=100, keys_per_loop=50000):
    """Repeatedly store and delete keys in a Riak bucket on the memory backend.

    Proof of concept for exercising the Riak memory backend: each iteration
    generates ``keys_per_loop`` keys, stores them through a pool of worker
    threads and, after a two second head start, deletes them through a
    second pool.  Pressing Ctrl+C asks the run to stop; the current
    iteration is wound down and no further iteration is started.

    Args:
        max_loops: Maximum number of store/delete iterations to run.
        keys_per_loop: Number of keys generated (and stored) per iteration.
    """
    client = riak.RiakClient(protocol='pbc', host='127.0.0.1', pbc_port=8097)
    bucket = client.bucket('memory')
    bucket.set_properties({"backend": "memory"})

    # One-element list so the nested functions can flip/read the flag
    # without ``nonlocal`` (keeps the script runnable on Python 2).
    stop_requested = [False]

    def signal_handler(*args):
        """Record that the user asked to stop; the loops poll this flag."""
        print('You pressed Ctrl+C!')
        stop_requested[0] = True

    def store(key):
        """Write a single float value under ``key`` in the bucket."""
        obj = riak.RiakObject(client, bucket, key=key)
        obj.data = 1234.5
        obj.store(return_body=False, if_none_match=False)

    def save(keys, keys_to_delete):
        """Drain ``keys``, queueing each for deletion and storing it."""
        # Leaving the ``with`` block waits for all submitted stores.
        with ThreadPoolExecutor(max_workers=10) as executor:
            while keys and not stop_requested[0]:
                k = keys.popleft()
                keys_to_delete.append(k)
                executor.submit(store, k)

    def delete(keys_to_delete):
        """Drain ``keys_to_delete``, deleting each key from the bucket."""
        # NOTE: this stops as soon as the queue is empty, even if ``save``
        # is still running and would append more keys afterwards.
        with ThreadPoolExecutor(max_workers=10) as executor:
            while keys_to_delete:
                k = keys_to_delete.popleft()
                executor.submit(bucket.delete, k)

    # Registering once is enough; the handler only touches the shared flag.
    signal.signal(signal.SIGINT, signal_handler)

    for x in range(0, max_loops):
        if stop_requested[0]:
            print('Bye!')
            break

        # Fresh queues per iteration, so leftovers never carry over.
        keys = deque('XXXXXXXXXX%d' % n for n in range(0, keys_per_loop))
        keys_to_delete = deque()

        # The context manager shuts the coordinating pool down every
        # iteration instead of leaving its worker threads behind.
        with ThreadPoolExecutor(max_workers=2) as pool:
            m_pool = [pool.submit(save, keys, keys_to_delete)]
            time.sleep(2)  # give the writer a head start over the deleter
            m_pool.append(pool.submit(delete, keys_to_delete))
            for future in as_completed(m_pool):
                try:
                    future.result()
                except Exception as e:
                    # Report the failure instead of silently dropping it,
                    # but keep going: this is a best-effort stress loop.
                    print('Worker failed: %r' % (e,))

        print("Loop %d of %d" % (x, max_loops))
# Run the proof of concept only when executed as a script, not on import.
if __name__ == "__main__":
    main_function()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.