Skip to content

Instantly share code, notes, and snippets.

@linuskmr
Created October 10, 2022 14:27
Show Gist options
  • Save linuskmr/e109ea483e0377cd3ddff130ca40afc8 to your computer and use it in GitHub Desktop.
Save linuskmr/e109ea483e0377cd3ddff130ca40afc8 to your computer and use it in GitHub Desktop.
Comparing the leaky bucket and token bucket algorithms
from datetime import datetime, timedelta
from typing import List, Tuple
from enum import Enum
# Capacity shared by both bucket simulations, counted in packets
# (every incoming packet occupies exactly one unit).
BUCKET_SIZE = 512
# Interval, in milliseconds, between two consecutive "process" steps.
PROCESS_EVERY_MS = 30

# Read dump produced by `tcpdump -q`. Lines look like this:
# 10:45:08.221000 ARP, Request who-has ESP-B1E2A4.fritz.box tell 0.0.0.0, length 46
#
# The encoding is pinned so the result does not depend on the platform's
# locale. Undecodable bytes are replaced instead of raising, because only
# the leading timestamp of each line is evaluated later on.
with open('dump.txt', encoding='utf-8', errors='replace') as file:
    # Strip surrounding whitespace (this removes the trailing newline) and
    # keep only the lines that are non-empty afterwards.
    dump = [stripped for stripped in (line.strip() for line in file) if stripped]
def get_timestamp(line) -> datetime:
    """Parse the leading time-of-day field of a dump line.

    Args:
        line: One line of ``tcpdump -q`` output whose first space-separated
            field is the time of day, e.g. ``10:45:08.221000``.

    Returns:
        A naive ``datetime`` that combines the parsed time of day with
        today's date, since the dump line itself carries no date.

    Raises:
        ValueError: If the first field does not match ``%H:%M:%S.%f``.
    """
    # The first element in the line is always the timestamp, e.g. 10:45:08.221000
    time_str = line.partition(' ')[0]
    parsed = datetime.strptime(time_str, '%H:%M:%S.%f')
    # Read the clock exactly once: querying it separately for year, month
    # and day could mix two different dates when called around midnight.
    today = datetime.now().date()
    return datetime.combine(today, parsed.time())
class PacketAction(Enum):
    """Kind of event that can appear on the simulation timeline."""

    # A packet taken from the dump arrives at the bucket.
    INCOMING = 'incoming'
    # A periodic processing step inserted by the simulation.
    PROCESS = 'process'
# Map each line to its timestamp
timestamps: List[datetime] = [get_timestamp(line) for line in dump]
if not timestamps:
    # Without a single packet there is no time span to simulate; stop with
    # a clear message instead of an IndexError on the lookups below.
    raise SystemExit('dump.txt contains no packet lines')

# NOTE: the first and last line are taken as the bounds of the capture, so
# the dump is expected to be in chronological order. All timestamps share
# one date, hence a capture that runs past midnight is not handled.
start_time: datetime = timestamps[0]
end_time: datetime = timestamps[-1]

# Annotate each timestamp that it's an incoming packet
dump: List[Tuple[datetime, PacketAction]] = [
    (timestamp, PacketAction.INCOMING) for timestamp in timestamps
]
print(f'{len(dump)=}')

# Add a process step every PROCESS_EVERY_MS milliseconds, from the first
# packet up to (but not including) the time of the last packet.
step = timedelta(milliseconds=PROCESS_EVERY_MS)
current_time = start_time
while current_time < end_time:
    dump.append((current_time, PacketAction.PROCESS))
    current_time += step

# Sort by timestamp. The sort is stable and the process steps were appended
# after the packets, so at equal timestamps the incoming packet comes first.
dump.sort(key=lambda event: event[0])
def leaky_bucket(events=None, cap=None):
    """Simulate a leaky bucket over a timeline of events and count drops.

    The bucket holds queued packets. An incoming packet is queued while the
    bucket has room and dropped otherwise; every process step removes one
    queued packet, if there is one. A one-line summary is printed at the end.

    Args:
        events: Iterable of ``(timestamp, PacketAction)`` pairs in the order
            they should be handled. Defaults to the module-level ``dump``.
        cap: Maximum number of packets the bucket can hold. Defaults to
            ``BUCKET_SIZE``.

    Returns:
        The number of incoming packets that were dropped.
    """
    if events is None:
        events = dump
    if cap is None:
        cap = BUCKET_SIZE

    dropped = 0
    size = 0
    # Only the kind of event matters here; ordering is given by `events`.
    for _timestamp, action in events:
        if action is PacketAction.INCOMING:
            if size >= cap:
                dropped += 1
            else:
                size += 1
        elif action is PacketAction.PROCESS:
            if size > 0:
                size -= 1

    print(f'leaky_bucket {dropped=} {cap=} {size=}')
    return dropped
def token_bucket(events=None, cap=None):
    """Simulate a token bucket over a timeline of events and count drops.

    The bucket starts full of tokens. An incoming packet consumes one token
    and is dropped when none is left; every process step adds one token back
    unless the bucket is already full. A one-line summary is printed at the
    end.

    Args:
        events: Iterable of ``(timestamp, PacketAction)`` pairs in the order
            they should be handled. Defaults to the module-level ``dump``.
        cap: Maximum (and initial) number of tokens. Defaults to
            ``BUCKET_SIZE``.

    Returns:
        The number of incoming packets that were dropped.
    """
    if events is None:
        events = dump
    if cap is None:
        cap = BUCKET_SIZE

    dropped = 0
    tokens = cap
    # Only the kind of event matters here; ordering is given by `events`.
    for _timestamp, action in events:
        if action is PacketAction.INCOMING:
            if tokens > 0:
                tokens -= 1
            else:
                dropped += 1
        elif action is PacketAction.PROCESS:
            if tokens < cap:
                tokens += 1

    print(f'token_bucket {dropped=} {cap=} {tokens=}')
    return dropped
# Run both simulations one after the other, leaky bucket first; each one
# prints its own summary line.
for simulate in (leaky_bucket, token_bucket):
    simulate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment