public
Last active

Pre-RUBY-556-fix Ruby driver pool simulation, measuring socket wastage.

  • Download Gist
rb_pool.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
import collections
import heapq
import time
from random import seed, randint
 
# Configuration
seed(time.time())
 
 
def query_time():
return randint(0, 20)
 
 
def intra_query_time():
# For now, threads want to query again as soon as they've checked in their
# socket after the previous query
return 1
# return randint(0, 10)
 
 
def simulate(nthreads, max_size, max_acquires, broadcast=False):
"""Returns average socket utilization: sockets in use per unit time.
 
:Parameters:
- `nthreads`: Number of threads contending for sockets
- `max_size`: Max number of sockets to create
- `max_acquires`: How many acquire operations before simulation stops
- `broadcast`: If True, include the RUBY-556 fix
"""
# Simulation state
thread_to_sock = {}
threads_with_sockets = set()
sockets_in_use = set()
q = collections.deque()
nacquires = 0
 
# Threads in use per unit time
total_utilization = 0
 
# print (
# 'nthreads', nthreads, 'max_size', max_size, 'max_tries', max_acquires,
# 'rseed', rseed)
 
# List of (wake_time, thread_id)
runlist = [(0, thread_id) for thread_id in range(nthreads)]
current_time = start_time = 0
 
min_sock_id = 0
 
while nacquires < max_acquires:
current_time, thread_id = heapq.heappop(runlist)
 
if thread_id not in threads_with_sockets:
nacquires += 1
 
# We want to query - get a socket
if thread_id in thread_to_sock:
# This thread has a socket assigned
thread_sock = thread_to_sock[thread_id]
else:
# This thread has no socket
if min_sock_id < max_size:
# Create a new socket
thread_sock = min_sock_id
min_sock_id += 1
else:
# Assign existing socket at random
thread_sock = randint(0, max_size - 1)
 
thread_to_sock[thread_id] = thread_sock
 
if thread_sock in sockets_in_use:
# print thread_id, 'waits'
q.append(thread_id) # wait
else:
# Query
# print thread_id, 'queries'
threads_with_sockets.add(thread_id)
sockets_in_use.add(thread_sock) # acquire
wake_time = current_time + query_time()
heapq.heappush(runlist, (wake_time, thread_id))
else:
# We finished a query - return the socket
threads_with_sockets.discard(thread_id)
thread_sock = thread_to_sock[thread_id]
sockets_in_use.discard(thread_sock)
 
# Signal the queue
if broadcast:
while q:
next_thread = q.pop()
heapq.heappush(runlist, (current_time, next_thread))
elif q:
# signal one waiter
next_thread = q.popleft()
heapq.heappush(runlist, (current_time, next_thread))
 
# Wait until the next time we want to query
wake_time = current_time + intra_query_time()
heapq.heappush(runlist, (wake_time, thread_id)) # query
 
# print thread_id, 'finishes query'
 
if not runlist:
print 'DEADLOCKED after', nacquires, 'acquires'
return
 
# Time until next tick, times number of sockets in use until
# next tick starts
tick_duration = runlist[0][0] - current_time
total_utilization += len(sockets_in_use) * tick_duration
 
end_time = runlist[0][0]
return total_utilization / float(end_time - start_time)
 
 
# A single socket is 100% utilized
print 'simulate(1, 1, 100)', simulate(1, 1, 100)
 
 
def main():
# Header
print '%20s %20s' % ('nthreads', 'avg_extra_waiting_pct')
 
for max_size in [10]:
for nthreads in range(10, 251, 10):
avg_utilization = []
for trial in range(200):
nacquires = 1000
avg_utilization.append(
simulate(nthreads, max_size, nacquires, True))
 
avg = sum(avg_utilization) / float(len(avg_utilization))
avg_wasted = 1 - avg / float(max_size)
 
print '%20d %20.3f' % (nthreads, avg_wasted)
 
 
main()

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.