-
-
Save efekarakus/77ba91221a1cbdf4a8db2ba146b91f3d to your computer and use it in GitHub Desktop.
Simulate the multi-call sequential and parallel workflows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt | |
from dataclasses import dataclass | |
from typing import List, Tuple | |
@dataclass
class SimulationParams:
    """Configuration for the analytic goodput model.

    Fields (in positional order): dependency_rate_limit,
    processing_time_per_call, num_calls_per_msg, simulation_duration.
    """

    # Requests per second the dependency will accept.
    dependency_rate_limit: int = 60
    # Seconds per dependency call. Irrelevant to this model because the
    # service is assumed to scale infinitely.
    processing_time_per_call: float = 0.5
    # Dependency calls that must all succeed before one message counts as done.
    num_calls_per_msg: int = 1
    # Not read by the analytic `simulate` model in this script.
    simulation_duration: int = 10000
def simulate(params: SimulationParams, arrival_rate: float) -> float:
    """Return the analytic goodput of the service at ``arrival_rate``.

    Each message issues ``params.num_calls_per_msg`` dependency calls. While the
    resulting call rate stays at or below ``params.dependency_rate_limit``,
    every call succeeds. Above it, each call succeeds with probability
    ``limit / call_rate``. A message counts toward goodput only if all of its
    calls succeed, so that probability is raised to the power n.
    """
    limit = params.dependency_rate_limit
    n = params.num_calls_per_msg
    # Dependency calls per second, which is not the same as messages per second.
    demand = arrival_rate * n
    # Once over the limit, the accepted capacity is spread evenly across all calls.
    per_call = float(limit) / float(demand) if demand > limit else 1.0
    return arrival_rate * per_call**n
def plot(
    goodputs: List[Tuple[int, List[Tuple[float, float]]]], params: SimulationParams
):
    """Plot goodput against arrival rate, one line per calls-per-message setting.

    Args:
        goodputs: ``(num_calls, results)`` pairs. ``results`` is the list of
            ``(arrival_rate, goodput)`` points for that setting. Empty series
            are skipped.
        params: Only ``dependency_rate_limit`` is read; it is used to draw a
            horizontal reference line.

    Side effects: mutates the global ``plt.rcParams`` and displays the figure
    via ``plt.show()``.
    """
    plt.rcParams["font.family"] = "JetBrains Mono"
    plt.rcParams["axes.spines.top"] = False
    plt.rcParams["axes.spines.right"] = False
    plt.figure(figsize=(10, 6))
    colors = ["#1b9e77", "#d95f02", "#7570b3", "#e7298a"]
    for i, (num_calls, results) in enumerate(goodputs):
        if not results:
            # zip(*[]) yields nothing, so unpacking into two names would fail.
            continue
        # Use distinct names so the `goodputs` parameter is not shadowed.
        rates, series_goodput = zip(*results)
        plt.plot(
            rates,
            series_goodput,
            linewidth=2.0,
            label=f"Goodput n={num_calls}",
            alpha=0.8,
            # Cycle the palette instead of raising IndexError past 4 series.
            color=colors[i % len(colors)],
        )
    # Add reference line for rate limit
    plt.axhline(
        y=params.dependency_rate_limit,
        color="#666666",
        linestyle=":",
        linewidth=0.8,
        alpha=0.5,
        label=f"Dependency Rate Limit ({params.dependency_rate_limit}/s)",
    )
    plt.xlabel("Arrival rate (requests/second)")
    plt.ylabel("Goodput (successfully processed messages/second)")
    plt.grid(True, color="#E6E6E6", linestyle="-", linewidth=0.3, alpha=0.5)
    plt.legend(frameon=False)
    # Ensure axes start at 0
    plt.ylim(bottom=0)
    plt.xlim(left=0)
    plt.tight_layout()
    plt.show()
def main():
    """Compute analytic goodput curves for 1-4 calls per message and plot them."""
    # Requests/second on the x-axis, shared by every curve.
    arrival_rates = range(0, 121)
    curves = []
    for n_calls in range(1, 5):
        params = SimulationParams(num_calls_per_msg=n_calls)
        curves.append(
            (n_calls, [(rate, simulate(params, rate)) for rate in arrival_rates])
        )
    # plot() only reads dependency_rate_limit, which is the same default for
    # every curve. Pass an explicit default instance rather than relying on the
    # loop variable leaking out of the for-loop.
    plot(curves, SimulationParams())


if __name__ == "__main__":
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import random | |
import matplotlib.pyplot as plt | |
from dataclasses import dataclass | |
from typing import List | |
@dataclass
class SimulationParams:
    """Configuration shared by the parallel and sequential simulations.

    Fields (in positional order): dependency_rate_limit,
    processing_time_per_call, num_calls_per_msg, simulation_duration.
    """

    # Requests per second the dependency will accept.
    dependency_rate_limit: int = 60
    # Seconds per dependency call. The sequential Service derives from this how
    # many back-to-back calls fit in a one-second tick.
    processing_time_per_call: float = 0.5
    # Sequential dependency calls needed before one message counts as done.
    num_calls_per_msg: int = 1
    # Number of one-second ticks run by simulate_sequential.
    simulation_duration: int = 10000
def simulate_parallel(params: SimulationParams, arrival_rate: float) -> float:
    """Return the analytic goodput when a message's calls are issued in parallel.

    The offered call rate is ``arrival_rate * num_calls_per_msg``. If it exceeds
    the dependency's rate limit, each call is admitted with probability
    ``limit / offered``; otherwise every call is admitted. A message succeeds
    only when all of its calls are admitted.
    """
    calls_per_msg = params.num_calls_per_msg
    offered_calls = arrival_rate * calls_per_msg  # calls/second, not messages/second
    share = 1.0
    if offered_calls > params.dependency_rate_limit:
        share = float(params.dependency_rate_limit) / float(offered_calls)
    # Every one of the message's calls must be admitted.
    return arrival_rate * (share**calls_per_msg)
@dataclass
class World:
    """Snapshot of dependency contention for one round of calls within a tick."""

    concurrency: int  # The number of concurrent requests going to hit the dependency
    remaining_rate: int  # The remaining capacity from the dependency

    def p_success(self) -> float:
        """Return the probability that a single request in this round succeeds.

        The remaining capacity is shared evenly across the concurrent requests,
        capped at 1.0. With no capacity left (zero or negative), a request
        always fails. With capacity left but no concurrent requests, it always
        succeeds; this avoids dividing by zero.
        """
        if self.remaining_rate <= 0:
            return 0.0
        if self.concurrency <= 0:
            return 1.0
        return min(float(self.remaining_rate) / float(self.concurrency), 1.0)

    def __repr__(self) -> str:
        return f"World(concurrency={self.concurrency}, remaining_rate={self.remaining_rate})"
class Work:
    """One message that needs a fixed number of sequential dependency calls.

    The first failed call stops the work permanently.
    """

    def __init__(self, n: int = 3):
        """Initialize a Work object that tracks a sequential sequence of calls.

        Args:
            n: Number of sequential calls that must all succeed.
        """
        self._remaining = n  # Calls still to be made.
        self._stopped = False  # Set once a call fails; never cleared.

    def tick(self, world: World) -> bool:
        """Attempt the next dependency call.

        A call succeeds with probability ``world.p_success()``.

        Returns:
            True if the call succeeded and more calls remain. False if the
            call failed, if it was the last call, or if the work had already
            stopped or completed before this tick (in that case no call is made).
        """
        if self._stopped or self._remaining <= 0:
            return False
        # random.random() is uniform on [0.0, 1.0), so `random() < p` holds with
        # probability exactly p. Using `>=` for failure also makes p == 0.0
        # fail deterministically.
        if random.random() >= world.p_success():
            self._stopped = True
            return False
        self._remaining -= 1
        return self._remaining > 0

    def is_success(self) -> bool:
        """Returns True when the Work was completed successfully."""
        return self._remaining == 0 and not self._stopped
class Service:
    """Consumes queued Work items, simulating one second of processing per tick."""

    def __init__(self, params: SimulationParams, arrival_rate: int):
        """Initialize a Service object that will consume events from the queue.

        Args:
            params: Simulation configuration. ``processing_time_per_call``
                determines how many sequential calls fit in one tick.
            arrival_rate: Number of new Work items enqueued at every tick.

        Raises:
            ValueError: if ``params.processing_time_per_call`` is not positive,
                or is greater than 1.0 so that no call fits in a one-second
                tick. In the latter case the queue would otherwise grow
                forever while nothing is processed.
        """
        if params.processing_time_per_call <= 0:
            raise ValueError(
                f"processing_time_per_call must be positive, got {params.processing_time_per_call}"
            )
        self.queue: List[Work] = []
        self.r = arrival_rate
        self.capacity = params.dependency_rate_limit  # Dependency calls allowed per tick.
        self.n = params.num_calls_per_msg
        # Back-to-back calls a single Work item can make within one tick.
        self.calls_per_tick = int(1.0 / params.processing_time_per_call)
        if self.calls_per_tick < 1:
            raise ValueError(
                "processing_time_per_call must be at most 1.0 second so that at "
                f"least one call fits in a tick, got {params.processing_time_per_call}"
            )

    def tick(self) -> int:
        """Simulate one second of processing.

        Returns:
            The number of Work items that completed successfully during this tick.
        """
        self.queue.extend(Work(self.n) for _ in range(self.r))  # New arrivals.
        goodput = 0
        calls_made = 0  # Dependency calls already spent in this second.
        # Every queued item has at most n calls left, so after n rounds the
        # queue is empty and further rounds would do nothing.
        for _ in range(min(self.calls_per_tick, self.n)):
            state = World(
                concurrency=len(self.queue),
                remaining_rate=max(0, self.capacity - calls_made),
            )
            still_running = []
            for work in self.queue:
                if work.tick(state):
                    # There are more calls to be done for this work.
                    still_running.append(work)
                elif work.is_success():
                    goodput += 1
            # Each queued item made exactly one call this round, successful or
            # not, and every call counts against the rate limit.
            calls_made += len(self.queue)
            # Rebuild the queue in one pass instead of popping indices one by one.
            self.queue[:] = still_running
        return goodput
def simulate_sequential(params: SimulationParams, arrival_rate: int) -> float:
    """Simulate the service at a specific arrival rate and return the amortized goodput.

    Runs ``params.simulation_duration`` one-second ticks of a single Service and
    returns the mean number of successfully completed messages per tick.

    Raises:
        ValueError: if ``params.simulation_duration`` is not positive.
    """
    if params.simulation_duration <= 0:
        raise ValueError(
            f"simulation_duration must be positive, got {params.simulation_duration}"
        )
    svc = Service(params, arrival_rate)
    success = sum(svc.tick() for _ in range(params.simulation_duration))
    return float(success) / params.simulation_duration
def _plot_goodputs(plots: List[tuple], capacity: int, n: int) -> None:
    """Draw each (title, color, points) series plus a rate-limit reference line, then show the figure."""
    plt.rcParams["font.family"] = "JetBrains Mono"
    plt.rcParams["axes.spines.top"] = False
    plt.rcParams["axes.spines.right"] = False
    plt.figure(figsize=(10, 6))
    for title, color, dat in plots:
        arrival_rates, goodputs = zip(*dat)
        plt.plot(
            arrival_rates,
            goodputs,
            linewidth=2.0,
            label=title,
            alpha=0.8,
            color=color,
        )
    # Add reference line for rate limit
    plt.axhline(
        y=capacity,
        color="#666666",
        linestyle=":",
        linewidth=0.8,
        alpha=0.5,
        label=f"Dependency Rate Limit ({capacity}/s)",
    )
    plt.xlabel("Arrival rate (requests/second)")
    plt.ylabel("Amortized goodput (successfully processed messages/second)")
    plt.grid(True, color="#E6E6E6", linestyle="-", linewidth=0.3, alpha=0.5)
    plt.legend(frameon=False)
    # Derive the annotation from n so that it cannot drift from the simulated value.
    plt.text(
        0.735,
        0.62,
        f"n = {n}",
        transform=plt.gca().transAxes,
        fontsize=10,
        verticalalignment="top",
    )
    plt.ylim(bottom=0)
    plt.xlim(left=0)
    plt.tight_layout()
    plt.show()


def main():
    """Compare parallel vs. sequential goodput across arrival rates and plot the curves."""
    n = 3
    capacity = 60
    fast_processing_time = 0.1
    slow_processing_time = 0.5

    # These parameters do not depend on the arrival rate, so build them once.
    parallel_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=fast_processing_time,
    )
    fast_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=fast_processing_time,
        simulation_duration=1000,
    )
    slow_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=slow_processing_time,
        simulation_duration=10000,
    )

    goodputs_parallel = []
    goodputs_sequential_fast = []
    goodputs_sequential_slow = []
    for arrival_rate in range(0, 121):
        goodputs_parallel.append(
            (arrival_rate, simulate_parallel(parallel_params, arrival_rate))
        )
        goodputs_sequential_fast.append(
            (arrival_rate, simulate_sequential(fast_params, arrival_rate))
        )
        goodputs_sequential_slow.append(
            (arrival_rate, simulate_sequential(slow_params, arrival_rate))
        )

    # Build the labels from the processing times (seconds -> ms) so they stay accurate.
    fast_ms = f"{fast_processing_time * 1000:.0f}ms"
    slow_ms = f"{slow_processing_time * 1000:.0f}ms"
    plots = [
        (f"Parallel (W = {fast_ms})", "#7570b3", goodputs_parallel),
        (f"Sequential (W = {fast_ms})", "#3E5129", goodputs_sequential_fast),
        (f"Sequential (W = {slow_ms})", "#66a61e", goodputs_sequential_slow),
    ]
    _plot_goodputs(plots, capacity, n)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.