Skip to content

Instantly share code, notes, and snippets.

@cyberw
Last active July 6, 2021 23:02
Show Gist options
  • Save cyberw/f19da44c796255b5946dc0ba14c3ea1a to your computer and use it in GitHub Desktop.
Save cyberw/f19da44c796255b5946dc0ba14c3ea1a to your computer and use it in GitHub Desktop.
poc user distribution
from collections import defaultdict
from itertools import cycle
from locust import User
def batch_iter(batch_size, iter_):
    """Yield a single list holding the next ``batch_size`` items of ``iter_``.

    ``iter_`` must be an iterator (it is advanced with ``next()``), and it is
    consumed as a side effect. The generator produces exactly one batch and
    then finishes.
    """
    batch = []
    for _ in range(batch_size):
        batch.append(next(iter_))
    yield batch
def user_gen(user_classes, worker_count):
    """Yield endless batches of user class names, weighted by ``weight``.

    The rotation starts with one entry per class that has a positive weight
    (so each of them is picked before any class repeats), followed by the
    remaining ``weight - 1`` entries of every class. That rotation is repeated
    forever and handed out ``worker_count`` names at a time.

    Args:
        user_classes: iterable of classes exposing an integer ``weight``;
            the class ``__name__`` is what gets yielded.
        worker_count: number of names in each yielded batch.

    Yields:
        A list of ``worker_count`` user class names.

    Raises:
        ValueError: on the first ``next()`` call if ``worker_count`` is below 1
            (every batch would be empty, so a consumer waiting for users would
            never finish) or if no class has a positive weight (there would be
            nothing to rotate through).
    """
    if worker_count < 1:
        raise ValueError(f"worker_count must be at least 1, got {worker_count!r}")

    first_picks = []
    remaining_picks = []
    for user_class in user_classes:
        if user_class.weight > 0:
            first_picks.append(user_class.__name__)
            remaining_picks.extend([user_class.__name__] * (user_class.weight - 1))

    # TODO: remaining_picks should be shuffled/interleaved better. Should be
    # fairly easy, but there was no obvious solution at hand and it was not
    # worth getting stuck in the weeds for a proof of concept.
    rotation = first_picks + remaining_picks
    if not rotation:
        raise ValueError("at least one user class must have a positive weight")

    names = cycle(rotation)
    while True:
        # `names` is a non-empty cycle, so next() can never raise StopIteration here.
        yield [next(names) for _ in range(worker_count)]
# Module-level bookkeeping shared by ramp_to() and worker_lost().
# Stack of [workername, user class name] pairs that were stopped during a ramp
# down; ramp_to() pops from it to respawn them before creating new users.
stopped_users = []
# [workername, user class name] pair for every user that is currently running.
active_users = []
# Number of running users (kept in step with active_users by ramp_to()).
current_user_count = 0
def _least_loaded_worker(user, users_on_workers):
    """Return the name of the worker running the fewest users of type ``user``.

    Every worker that has no entry for ``user`` yet gets one initialised to 0.
    Ties go to the worker that comes first in the dict's iteration order.
    """
    for counts in users_on_workers.values():
        counts.setdefault(user, 0)
    return min(users_on_workers, key=lambda name: users_on_workers[name][user])


def ramp_to(target_user_count, users_on_workers, user_list):
    """Start or stop users until ``target_user_count`` users are running.

    Ramping down pops the most recently started users off ``active_users`` and
    pushes them onto ``stopped_users``. Ramping up first respawns stopped users
    (most recently stopped first) and then spawns new ones in the order given
    by ``user_gen``, each on the worker that currently runs the fewest users of
    that type.

    Args:
        target_user_count: desired total number of running users.
        users_on_workers: dict mapping worker name to a dict of
            ``{user class name: running count}``; updated in place.
        user_list: user classes handed to ``user_gen`` when new users are needed.

    Raises:
        ValueError: if users have to be started but ``users_on_workers`` is empty.

    NOTE: a fresh ``user_gen`` generator is created on every ramp up, so the
    weighted rotation restarts from its beginning on each call.
    """
    global current_user_count

    if current_user_count >= target_user_count:  # ramp down (or nothing to do)
        while current_user_count > target_user_count:
            workername, user = active_users.pop()
            stopped_users.append([workername, user])
            users_on_workers[workername][user] -= 1
            current_user_count -= 1
        return

    # ramp up
    if not users_on_workers:
        # Without this guard the spawn loop below would never make progress.
        raise ValueError("cannot ramp up: no workers available")

    # Respawn previously stopped users before creating new ones.
    while stopped_users and current_user_count < target_user_count:
        workername, user = stopped_users.pop()
        if workername not in users_on_workers:
            # The worker this user was stopped on has since been removed
            # (see worker_lost), so place the user on a surviving worker.
            workername = _least_loaded_worker(user, users_on_workers)
        active_users.append([workername, user])
        users_on_workers[workername][user] += 1
        current_user_count += 1
    if current_user_count >= target_user_count:
        return

    # user_gen never runs dry; the loop ends by returning once the target is hit.
    for users in user_gen(user_list, len(users_on_workers)):
        for user in users:
            workername = _least_loaded_worker(user, users_on_workers)
            active_users.append([workername, user])
            users_on_workers[workername][user] += 1
            current_user_count += 1
            if current_user_count >= target_user_count:
                return
def worker_lost(lost_worker, users_on_workers):
    """Move every active user of ``lost_worker`` to another worker and drop it.

    Each affected user is placed on the surviving worker that currently runs
    the fewest users of the same type (ties go to the worker that comes first
    in the dict's iteration order). The matching ``active_users`` entries are
    updated in place and ``lost_worker`` is removed from ``users_on_workers``.
    Entries in ``stopped_users`` are not touched.

    Args:
        lost_worker: name of the worker that disappeared.
        users_on_workers: dict mapping worker name to a dict of
            ``{user class name: running count}``; updated in place.

    Raises:
        KeyError: if ``lost_worker`` is not a key of ``users_on_workers``.
        RuntimeError: if ``lost_worker`` has active users but no other worker
            exists to take them over. Nothing is modified in either case.
    """
    if lost_worker not in users_on_workers:
        raise KeyError(lost_worker)

    # The values are the same dict objects as in users_on_workers, so updating
    # them through `survivors` updates the caller's structure as well.
    survivors = {
        name: counts
        for name, counts in users_on_workers.items()
        if name != lost_worker
    }
    orphans = [entry for entry in active_users if entry[0] == lost_worker]
    if orphans and not survivors:
        raise RuntimeError(
            f"cannot relocate users of {lost_worker!r}: no other workers remain"
        )

    for entry in orphans:
        user = entry[1]
        for counts in survivors.values():
            counts.setdefault(user, 0)
        new_home = min(survivors, key=lambda name: survivors[name][user])
        survivors[new_home][user] += 1
        # Rewrite the entry in place so active_users keeps its original order.
        entry[0] = new_home

    del users_on_workers[lost_worker]
class User1(User):
    """Demo user type with weight 1."""

    # user_gen() puts a class's name into its rotation `weight` times.
    weight = 1
class User2(User):
    """Demo user type with weight 2."""

    # user_gen() puts a class's name into its rotation `weight` times.
    weight = 2
class User3(User):
    """Demo user type with weight 3."""

    # user_gen() puts a class's name into its rotation `weight` times.
    weight = 3
# Per-worker tally for the demo run: worker name -> {user class name: count}.
users_on_worker = {name: {} for name in ("worker1", "worker2")}
# User classes taking part in the demo run.
userlist = [User1, User2, User3]
def printstuff():
    """Print the demo state held in the module-level ``users_on_worker``.

    Three lines are printed: the raw per-worker mapping, the count of each
    user type summed over all workers, and the grand total of those counts.
    """
    print(users_on_worker)
    totals = defaultdict(int)
    for per_worker in users_on_worker.values():
        for name, count in per_worker.items():
            totals[name] += count
    print(dict(totals))
    print(sum(totals.values()))
# Demo run: ramp up, ramp up further, ramp almost all the way down, ramp back
# up, printing the distribution after every step.
for _target in (4, 12, 1, 12):
    ramp_to(_target, users_on_worker, userlist)
    printstuff()
# Finally drop one worker and show where its users ended up.
worker_lost("worker1", users_on_worker)
printstuff()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment