Skip to content

Instantly share code, notes, and snippets.

@perfecto25
Last active September 20, 2022 03:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save perfecto25/c2689e85238819bf842002ea529b78ce to your computer and use it in GitHub Desktop.
Save perfecto25/c2689e85238819bf842002ea529b78ce to your computer and use it in GitHub Desktop.
iperf3 bandwidth snapshot generator
#!/opt/pyperf/venv/bin/python3
## This runs on each agent, generates bandwidth snapshot
from datetime import datetime
import socket
import json
import os
import sys
import iperf3
from dictor import dictor
from math import trunc
from inspect import getsourcefile
# Directory containing this script; config.json and output/ are resolved
# relative to it. getsourcefile() on a lambda defined here yields this file.
base_dir = os.path.dirname(getsourcefile(lambda: 0))
if not base_dir:
    # dirname() is empty when the script path has no directory component.
    base_dir = "."

# Label for this run: it becomes the top-level key under which this run's
# results are stored in the snapshot file (presumably a timestamp supplied
# by the scheduler -- confirm against the caller).
if len(sys.argv) < 2:
    print(f"usage: {sys.argv[0]} <snapshot-key>")
    sys.exit(1)
dt = sys.argv[1]

# exist_ok avoids the check-then-create race when two runs start together.
os.makedirs(base_dir + "/output", exist_ok=True)

try:
    with open(f"{base_dir}/config.json") as f:
        data = json.load(f)
except FileNotFoundError:
    print(f"config file doesnt exist at {base_dir}/config.json")
    sys.exit(1)

# "targets" and "timespan" are looked up with checknone=True (dictor is
# expected to reject a missing/None value -- see dictor docs); "port" falls
# back to 5201 when absent.
targets = dictor(data, "targets", checknone=True)
timespan = dictor(data, "timespan", checknone=True)
port = dictor(data, "port", 5201)

# Short host name: everything before the first dot of the system hostname.
hostname = socket.gethostname().split('.', 1)[0]
json_path = base_dir + "/output/snapshot.json"
if __name__ == "__main__":
    # Measure each configured target in turn and collect the truncated
    # sent bandwidth (Mbps) per target.
    output = {}
    for target in targets:
        client = iperf3.Client()
        client.duration = int(timespan)
        client.port = int(port)
        client.server_hostname = target
        result = client.run()
        # Drop the client before the next iteration builds a new one. The
        # original only did this on success; doing it here also covers the
        # error path (presumably needed so the library can free the test
        # before another client is created -- confirm against iperf3 docs).
        client = None
        if result.error:
            # A failed target is reported and simply left out of the snapshot.
            print(result.error)
            continue
        output[target] = trunc(result.sent_Mbps)

    # write back to json
    # Load the existing snapshot (if any), replace this run's entry, and
    # rewrite the whole file. Opening with "w" truncates, so a shorter
    # document can never leave stale trailing bytes behind (the previous
    # "r+" + seek(0) rewrite did not truncate).
    try:
        with open(json_path) as f:
            snapshot = json.load(f)
    except FileNotFoundError:
        snapshot = {}
    snapshot[dt] = output

    # Serialize first so a serialization error cannot clobber the old file.
    payload = json.dumps(snapshot)
    with open(json_path, "w") as f:
        f.write(payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment