Skip to content

@ajdavis /rb_pool.py
Last active

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Pre-RUBY-556-fix Ruby driver pool simulation, measuring socket wastage.
import collections
import heapq
import time
from random import seed, randint
# Configuration
# Seed the random number generator from the wall clock, so every run of the
# script draws a different sequence of random values.
seed(time.time())
def query_time():
    """Return how long one simulated query holds its socket.

    The duration is a random whole number of ticks, drawn uniformly from
    0 to 20 inclusive.
    """
    shortest, longest = 0, 20
    return randint(shortest, longest)
def intra_query_time():
    """Return how many ticks a thread idles between two queries.

    For now this is a constant single tick: a thread wants to query again
    as soon as it has checked its socket back in after the previous query.
    A randomized delay, randint(0, 10), is the disabled alternative.
    """
    idle_ticks = 1
    return idle_ticks
def simulate(nthreads, max_size, max_acquires, broadcast=False):
    """Returns average socket utilization: sockets in use per unit time.

    Runs a discrete-event simulation. Each thread is pinned to one socket
    the first time it asks for one: a freshly created socket while fewer
    than `max_size` exist, otherwise an existing socket picked at random.
    A thread whose socket is busy joins a wait queue and is only
    rescheduled when some other thread returns a socket.

    :Parameters:
      - `nthreads`: Number of threads contending for sockets
      - `max_size`: Max number of sockets to create
      - `max_acquires`: How many acquire operations before simulation stops
      - `broadcast`: If True, include the RUBY-556 fix: a thread returning
        its socket wakes every waiting thread instead of only the one at
        the head of the wait queue

    Returns the time-weighted mean number of sockets in use, as a float.
    Returns 0.0 when there are no threads or when no simulated time has
    elapsed (the average is undefined there, and no socket-time was
    accumulated). Returns None, after printing a message, if no runnable
    thread remains.
    """
    if nthreads <= 0:
        # Nobody ever asks for a socket, so none is ever in use.
        return 0.0

    # Simulation state
    thread_to_sock = {}             # thread id -> socket id it is pinned to
    threads_with_sockets = set()    # threads currently running a query
    sockets_in_use = set()
    q = collections.deque()         # threads waiting for a signal
    nacquires = 0

    # Sockets in use, summed over every unit of simulated time
    total_utilization = 0

    # List of (wake_time, thread_id). All entries start equal in time and
    # ascending in thread id, so the list is already a valid heap.
    runlist = [(0, thread_id) for thread_id in range(nthreads)]
    current_time = start_time = 0
    min_sock_id = 0

    while nacquires < max_acquires:
        current_time, thread_id = heapq.heappop(runlist)
        if thread_id not in threads_with_sockets:
            nacquires += 1

            # We want to query - get a socket
            if thread_id not in thread_to_sock:
                # This thread has no socket assigned yet
                if min_sock_id < max_size:
                    # Create a new socket
                    thread_to_sock[thread_id] = min_sock_id
                    min_sock_id += 1
                else:
                    # Assign existing socket at random
                    thread_to_sock[thread_id] = randint(0, max_size - 1)
            thread_sock = thread_to_sock[thread_id]

            if thread_sock in sockets_in_use:
                # Wait: the thread stays off the runlist until signalled
                q.append(thread_id)
            else:
                # Query: hold the socket until the query completes
                threads_with_sockets.add(thread_id)
                sockets_in_use.add(thread_sock)
                wake_time = current_time + query_time()
                heapq.heappush(runlist, (wake_time, thread_id))
        else:
            # We finished a query - return the socket
            threads_with_sockets.discard(thread_id)
            thread_sock = thread_to_sock[thread_id]
            sockets_in_use.discard(thread_sock)

            # Signal the queue
            if broadcast:
                # Wake every waiter; each retries at the current time
                while q:
                    next_thread = q.pop()
                    heapq.heappush(runlist, (current_time, next_thread))
            elif q:
                # Signal one waiter. It may be pinned to a different socket
                # than the one just returned.
                next_thread = q.popleft()
                heapq.heappush(runlist, (current_time, next_thread))

            # Wait until the next time we want to query
            wake_time = current_time + intra_query_time()
            heapq.heappush(runlist, (wake_time, thread_id))

        if not runlist:
            # Single-argument form prints the same text on Python 2 and 3.
            print('DEADLOCKED after %d acquires' % nacquires)
            return

        # Time until next tick, times number of sockets in use until
        # next tick starts
        tick_duration = runlist[0][0] - current_time
        total_utilization += len(sockets_in_use) * tick_duration

    elapsed = runlist[0][0] - start_time
    if elapsed == 0:
        # No simulated time passed (e.g. max_acquires is 0, or the run
        # stopped while the next event is still at the start time).
        return 0.0
    return total_utilization / float(elapsed)
# Sanity check: one thread with one socket never has to wait for it, so
# utilization should be high. It stays below 1.0 because the socket sits idle
# for intra_query_time() between consecutive queries.
print 'simulate(1, 1, 100)', simulate(1, 1, 100)
def main():
    """Print a table of wasted socket capacity against thread count.

    For a pool of 10 sockets and thread counts 10, 20, ..., 250, runs 200
    simulations of 1000 acquires each with broadcast enabled, averages the
    utilization, and prints one row per thread count. The second column is
    the unused share of the pool as a fraction (1 - utilization / pool
    size), although the header labels it a percentage.
    """
    trials = 200
    acquires_per_trial = 1000

    # Header
    print('%20s %20s' % ('nthreads', 'avg_extra_waiting_pct'))

    for max_size in [10]:
        for nthreads in range(10, 251, 10):
            utilizations = [
                simulate(nthreads, max_size, acquires_per_trial, True)
                for _ in range(trials)]
            mean_utilization = sum(utilizations) / float(len(utilizations))
            wasted_fraction = 1 - mean_utilization / float(max_size)
            print('%20d %20.3f' % (nthreads, wasted_fraction))
# Run the parameter sweep unconditionally when this file is executed.
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.