Skip to content

Instantly share code, notes, and snippets.

@efekarakus
Created January 9, 2025 16:50
Show Gist options
  • Save efekarakus/77ba91221a1cbdf4a8db2ba146b91f3d to your computer and use it in GitHub Desktop.
Save efekarakus/77ba91221a1cbdf4a8db2ba146b91f3d to your computer and use it in GitHub Desktop.
Simulate the multi-call sequential and parallel workflows
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import List, Tuple
@dataclass
class SimulationParams:
    """Inputs that describe one simulated workload against a rate-limited dependency."""

    # Requests per second that the dependency accepts.
    dependency_rate_limit: int = 60

    # Seconds spent on a single call. The processing time does not matter yet
    # because the service scales infinitely.
    processing_time_per_call: float = 0.5

    # The number of sequential calls made to a dependency for a single unit of
    # work to be deemed complete.
    num_calls_per_msg: int = 1

    # Length of a simulation run (presumably in seconds -- confirm with callers).
    simulation_duration: int = 10000
def simulate(params: SimulationParams, arrival_rate: float) -> float:
    """Return the goodput (messages/second) at the given arrival rate.

    Every message issues ``params.num_calls_per_msg`` calls to the dependency.
    When the resulting call rate exceeds ``params.dependency_rate_limit`` each
    call succeeds with probability ``limit / call_rate``; a message counts
    towards goodput only if all of its calls succeed.
    """
    calls_per_msg = params.num_calls_per_msg
    rate_limit = params.dependency_rate_limit

    # Total calls per second offered to the dependency.
    offered_calls = arrival_rate * calls_per_msg

    # Below (or exactly at) the limit nothing is throttled.
    per_call_success = 1.0
    if offered_calls > rate_limit:
        per_call_success = float(rate_limit) / float(offered_calls)

    # Arrival rate scaled by the probability that all calls of a message succeed.
    return arrival_rate * per_call_success**calls_per_msg
def plot(
    goodputs: List[Tuple[int, List[Tuple[float, float]]]], params: SimulationParams
):
    """Plot one goodput curve per series plus the dependency rate-limit line.

    Args:
        goodputs: ``(num_calls, results)`` pairs, where ``results`` is a list of
            ``(arrival_rate, goodput)`` points for that number of calls.
        params: Only ``dependency_rate_limit`` is read, to draw and label the
            horizontal reference line.
    """
    plt.rcParams["font.family"] = "JetBrains Mono"
    plt.rcParams["axes.spines.top"] = False
    plt.rcParams["axes.spines.right"] = False
    plt.figure(figsize=(10, 6))

    colors = ["#1b9e77", "#d95f02", "#7570b3", "#e7298a"]
    for i, (num_calls, results) in enumerate(goodputs):
        if not results:
            # zip(*[]) yields nothing to unpack; an empty series has no curve.
            continue
        # Use a distinct local name so the ``goodputs`` argument is not rebound
        # while it is being iterated.
        arrival_rates, series = zip(*results)
        plt.plot(
            arrival_rates,
            series,
            linewidth=2.0,
            label=f"Goodput n={num_calls}",
            alpha=0.8,
            # Cycle through the palette so more than four series do not raise
            # IndexError.
            color=colors[i % len(colors)],
        )

    # Add reference line for rate limit
    plt.axhline(
        y=params.dependency_rate_limit,
        color="#666666",
        linestyle=":",
        linewidth=0.8,
        alpha=0.5,
        label=f"Dependency Rate Limit ({params.dependency_rate_limit}/s)",
    )
    plt.xlabel("Arrival rate (requests/second)")
    plt.ylabel("Goodput (successfully processed messages/second)")
    plt.grid(True, color="#E6E6E6", linestyle="-", linewidth=0.3, alpha=0.5)
    plt.legend(frameon=False)

    # Ensure axes start at 0
    plt.ylim(bottom=0)
    plt.xlim(left=0)
    plt.tight_layout()
    plt.show()
def main():
    """Sweep arrival rates 0..120 for 1 to 4 calls per message and plot the goodput."""
    arrival_rates = range(0, 121)  # identical for every series, so build it once
    goodputs = []
    for n_calls in range(1, 5):
        params = SimulationParams(num_calls_per_msg=n_calls)
        results = [(rate, simulate(params, rate)) for rate in arrival_rates]
        goodputs.append((n_calls, results))

    # plot() only reads ``dependency_rate_limit``; pass an explicit instance
    # instead of relying on the loop variable leaking out of the ``for`` above.
    plot(goodputs, SimulationParams())


if __name__ == "__main__":
    main()
import random
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import List
@dataclass
class SimulationParams:
    """Inputs that describe one simulated workload against a rate-limited dependency."""

    # Requests per second that the dependency accepts.
    dependency_rate_limit: int = 60

    # Seconds spent on a single call. Service derives the number of calls it
    # can make per one-second tick from this value; simulate_parallel ignores it.
    processing_time_per_call: float = 0.5

    # The number of sequential calls made to a dependency for a single unit of
    # work to be deemed complete.
    num_calls_per_msg: int = 1

    # Number of one-second ticks simulate_sequential runs the service for.
    simulation_duration: int = 10000
def simulate_parallel(params: SimulationParams, arrival_rate: float) -> float:
    """Compute the goodput (messages/second) in closed form for an arrival rate.

    All ``params.num_calls_per_msg`` calls of every arriving message are
    counted against the dependency in the same second. A message contributes
    to goodput only when each of its calls gets through the rate limit.
    """
    demand = arrival_rate * params.num_calls_per_msg  # calls per second

    # Chance that one call is admitted: throttled proportionally once the
    # demand exceeds the limit, certain otherwise.
    admitted = (
        float(params.dependency_rate_limit) / float(demand)
        if demand > params.dependency_rate_limit
        else 1.0
    )

    # Chance that every call of one message is admitted.
    all_admitted = admitted**params.num_calls_per_msg
    return arrival_rate * all_admitted
@dataclass
class World:
    """State of the dependency as seen by one round of concurrent calls.

    The dataclass-generated ``__repr__`` already renders
    ``World(concurrency=..., remaining_rate=...)``, so no custom one is needed.
    """

    concurrency: int  # The number of concurrent requests going to hit the dependency
    remaining_rate: int  # The remaining capacity from the dependency

    def p_success(self) -> float:
        """Return the probability that a single request succeeds.

        The remaining capacity is shared evenly between the concurrent
        requests and capped at 1.0. With no capacity left the result is 0.0;
        with capacity left and nothing competing for it the result is 1.0.
        """
        if self.remaining_rate <= 0:
            # Exhausted (or inconsistent negative) capacity: nothing can succeed.
            return 0.0
        if self.concurrency <= 0:
            # No competing requests; also avoids dividing by zero below.
            return 1.0
        return min(float(self.remaining_rate) / float(self.concurrency), 1.0)
class Work:
    """A unit of work that needs ``n`` successful calls, made one after another."""

    def __init__(self, n: int = 3):
        """Initialize a Work object that tracks a sequential sequence of calls."""
        self._remaining = n  # calls still to be made
        self._stopped = False  # set once any call has failed

    def tick(self, world: World) -> bool:
        """Attempt the next call and report whether more calls remain.

        Returns True if there are remaining calls to be processed.
        Returns False if the current call did not succeed (a call succeeds
        with probability ``world.p_success()``) or if there are no more
        remaining calls to process.
        """
        if self._stopped or self._remaining <= 0:
            return False
        # random.random() is drawn from [0.0, 1.0). Comparing with ``>=`` makes
        # the call succeed with probability exactly p: never when p == 0.0 and
        # always when p == 1.0.
        if random.random() >= world.p_success():
            self._stopped = True
            return False
        self._remaining -= 1
        return self._remaining > 0

    def is_success(self) -> bool:
        """Returns True when the Work was completed successfully."""
        return self._remaining == 0 and not self._stopped
class Service:
    """Queue-backed service that advances its work items once per simulated second."""

    def __init__(self, params: SimulationParams, arrival_rate: int):
        """Initialize a Service object that will consume events from the queue.

        Args:
            params: Supplies the dependency rate limit, the number of calls per
                message and the processing time of a single call.
            arrival_rate: Number of new work items enqueued on every tick.
        """
        self.queue: List[Work] = []
        self.r = arrival_rate
        self.capacity = params.dependency_rate_limit
        self.n = params.num_calls_per_msg
        # Number of sequential calls that fit in one tick, truncated towards
        # zero. We assume processing_time_per_call is less than 1.0; a larger
        # value gives 0 here, in which case queued work never progresses.
        self.calls_per_tick = int(1.0 / params.processing_time_per_call)

    def tick(self) -> int:
        """Simulates the service processing for one second.

        Enqueues ``arrival_rate`` new work items, then runs up to
        ``min(calls_per_tick, n)`` rounds. In each round every queued item
        attempts one call against the capacity left over by earlier rounds of
        this tick. Items that finish in a round, whether they succeeded or
        failed, leave the queue.

        Returns:
            The number of work items that completed successfully in this tick.
        """
        self.queue.extend(Work(self.n) for _ in range(self.r))  # Add new work.

        goodput = 0
        prev_num_calls = 0
        for _ in range(min(self.calls_per_tick, self.n)):
            # We need to progress on work items for as many calls that can
            # happen in this tick.
            state = World(
                concurrency=len(self.queue),
                remaining_rate=max(0, self.capacity - prev_num_calls),
            )
            # Every queued item makes one call in this round.
            prev_num_calls += len(self.queue)

            # Build the surviving queue in one pass instead of popping finished
            # items by index, which costs O(n) per removal.
            in_flight: List[Work] = []
            for work in self.queue:
                if work.tick(state):
                    # There are more calls to be done for this work.
                    in_flight.append(work)
                elif work.is_success():
                    goodput += 1
            self.queue[:] = in_flight
        return goodput
def simulate_sequential(params: SimulationParams, arrival_rate: int) -> float:
    """Simulate the service at a specific arrival rate and return the amortized goodput.

    Runs a ``Service`` for ``params.simulation_duration`` ticks and averages the
    number of successfully completed work items per tick.

    Args:
        params: Simulation inputs; ``simulation_duration`` is the tick count.
        arrival_rate: Work items arriving per tick.

    Returns:
        Successful work items per tick as a float, or 0.0 when the duration is
        not positive (there is nothing to average over).
    """
    duration = params.simulation_duration
    if duration <= 0:
        # Avoid dividing by zero for an empty simulation.
        return 0.0
    svc = Service(params, arrival_rate)
    completed = sum(svc.tick() for _ in range(duration))
    return float(completed) / duration
def _sweep_goodputs(simulate_fn, params, arrival_rates) -> list:
    """Return ``(arrival_rate, goodput)`` pairs from ``simulate_fn`` for each rate."""
    return [(rate, simulate_fn(params, rate)) for rate in arrival_rates]


def _plot_goodputs(plots, capacity, n) -> None:
    """Draw one curve per ``(title, color, points)`` entry plus the rate-limit line."""
    plt.rcParams["font.family"] = "JetBrains Mono"
    plt.rcParams["axes.spines.top"] = False
    plt.rcParams["axes.spines.right"] = False
    plt.figure(figsize=(10, 6))

    for title, color, dat in plots:
        arrival_rates, goodputs = zip(*dat)
        plt.plot(
            arrival_rates,
            goodputs,
            linewidth=2.0,
            label=title,
            alpha=0.8,
            color=color,
        )

    # Add reference line for rate limit
    plt.axhline(
        y=capacity,
        color="#666666",
        linestyle=":",
        linewidth=0.8,
        alpha=0.5,
        label=f"Dependency Rate Limit ({capacity}/s)",
    )
    plt.xlabel("Arrival rate (requests/second)")
    plt.ylabel("Amortized goodput (successfully processed messages/second)")
    plt.grid(True, color="#E6E6E6", linestyle="-", linewidth=0.3, alpha=0.5)
    plt.legend(frameon=False)
    plt.text(
        0.735,
        0.62,
        f"n = {n}",  # derived from n so the annotation cannot drift from it
        transform=plt.gca().transAxes,
        fontsize=10,
        verticalalignment="top",
    )
    plt.ylim(bottom=0)
    plt.xlim(left=0)
    plt.tight_layout()
    plt.show()


def main():
    """Compare parallel and sequential goodput for arrival rates 0..120 and plot them.

    Three series are produced for ``n = 3`` calls per message against a
    dependency limited to 60 requests/second: the closed-form parallel model,
    and the simulated sequential service with a fast (0.1 s) and a slow
    (0.5 s) processing time per call.
    """
    n = 3
    capacity = 60
    fast_processing_time = 0.1
    slow_processing_time = 0.5
    arrival_rates = range(0, 121)

    # The parameters do not depend on the arrival rate, so build them once.
    parallel_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=fast_processing_time,
    )
    sequential_fast_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=fast_processing_time,
        simulation_duration=1000,
    )
    sequential_slow_params = SimulationParams(
        num_calls_per_msg=n,
        dependency_rate_limit=capacity,
        processing_time_per_call=slow_processing_time,
        simulation_duration=10000,
    )

    # Each series is swept on its own; the simulations are independent of one
    # another, so the order in which they run does not matter.
    fast_label = f"{fast_processing_time * 1000:.0f}ms"
    slow_label = f"{slow_processing_time * 1000:.0f}ms"
    plots = [
        (
            f"Parallel (W = {fast_label})",
            "#7570b3",
            _sweep_goodputs(simulate_parallel, parallel_params, arrival_rates),
        ),
        (
            f"Sequential (W = {fast_label})",
            "#3E5129",
            _sweep_goodputs(simulate_sequential, sequential_fast_params, arrival_rates),
        ),
        (
            f"Sequential (W = {slow_label})",
            "#66a61e",
            _sweep_goodputs(simulate_sequential, sequential_slow_params, arrival_rates),
        ),
    ]

    # Plot the goodputs
    _plot_goodputs(plots, capacity, n)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment