Skip to content

Instantly share code, notes, and snippets.

@Staphylococcus
Created September 1, 2025 18:51
Show Gist options
  • Select an option

  • Save Staphylococcus/4be3e5c79e95ff85189bdb8e6200b738 to your computer and use it in GitHub Desktop.

Select an option

Save Staphylococcus/4be3e5c79e95ff85189bdb8e6200b738 to your computer and use it in GitHub Desktop.
Domesticated rocks doing tricks
#!/usr/bin/env python3
import os, time, math, json, random, sys, resource
def spin_cpu(seconds):
deadline = time.perf_counter() + max(0.0, seconds)
x = 0.0
while time.perf_counter() < deadline:
x += 1.0
return x
def main():
policy = os.getenv("POLICY_MODE", "OFF").upper()
epoch_sec = float(os.getenv("EPOCH_SEC", "20"))
tokens = int(os.getenv("TOKENS", "1000"))
base_cost = float(os.getenv("BASE_COST", "0.002")) # seconds CPU per token at norm=1
amp = float(os.getenv("AMP", "0.8")) # 0..1
freq = float(os.getenv("FREQ", "0.2")) # Hz
t0 = float(os.getenv("T0", str(time.time())))
phi_env = float(os.getenv("PHI_ENV", "0.0"))
phi_scramble = float(os.getenv("PHI_SCRAMBLE", str(random.random()*2*math.pi)))
gate_thresh = float(os.getenv("GATE_FRAC", "0.7"))
cpu_budget = float(os.getenv("CPU_BUDGET_SEC", "0")) # 0 => disabled (token-budget mode)
start = time.time()
done = 0
# Select phase to use for gating
if policy == "ON":
phi_used = phi_env
elif policy == "SCRAMBLED":
phi_used = phi_scramble
else:
phi_used = None
# Work loop
predicted_cpu_used = 0.0
while True:
now = time.time()
if (now - start) >= epoch_sec:
break
if cpu_budget > 0 and predicted_cpu_used >= cpu_budget:
break
phase_true = 2*math.pi*freq*(now - t0) + phi_env
norm = 1.0 + amp * math.sin(phase_true)
gate_open = True
if phi_used is not None:
est = 1.0 + amp * math.sin(2*math.pi*freq*(now - t0) + phi_used)
gate_open = est <= gate_thresh
if gate_open:
spin_cpu(base_cost * norm)
done += 1
predicted_cpu_used += base_cost * norm
if cpu_budget <= 0 and done >= tokens:
time.sleep(0.002)
else:
time.sleep(0.001)
u = resource.getrusage(resource.RUSAGE_SELF)
c = resource.getrusage(resource.RUSAGE_CHILDREN)
cpu_self = (u.ru_utime + u.ru_stime)
cpu_children = (c.ru_utime + c.ru_stime)
print(json.dumps({
"tokens_done": int(done),
"epoch_sec": float(epoch_sec),
"policy": policy,
"cpu_time_sec_self": float(cpu_self),
"cpu_time_sec_children": float(cpu_children)
}))
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
Usage (example):
python triage_harness.py \
--epochs 24 --epoch-sec 5 --tokens 10000 --cpus 1 \
--freq 0.4 --amp 0.7 --base-cost 0.002 --gate-frac 0.7 \
--seed 7 --taskset-core 0 --cpu-budget-sec 2.5 --worker /path/to/gate_worker.py
"""
import argparse, os, time, json, math, random, subprocess, signal, csv, statistics, sys, platform
CLK_TCK = os.sysconf(os.sysconf_names['SC_CLK_TCK']) if hasattr(os, 'sysconf') else 100
def read_proc_cpu(pid):
try:
with open(f"/proc/{pid}/stat", "r") as f:
fields = f.read().split()
utime = float(fields[13])
stime = float(fields[14])
return (utime + stime) / CLK_TCK
except Exception:
return None
def percentile(values, p):
vals = sorted(values)
if not vals:
return None
k = (len(vals) - 1) * (p / 100.0)
f = int(math.floor(k))
c = int(math.ceil(k))
if f == c:
return vals[f]
d0 = vals[f] * (c - k)
d1 = vals[c] * (k - f)
return d0 + d1
def run_epoch(worker_path, policy, args, phi_env, phi_scramble):
env = os.environ.copy()
env.update({
"POLICY_MODE": policy,
"EPOCH_SEC": str(args.epoch_sec),
"TOKENS": str(args.tokens),
"BASE_COST": str(args.base_cost),
"AMP": str(args.amp),
"FREQ": str(args.freq),
"T0": str(args.t0),
"PHI_ENV": str(phi_env),
"PHI_SCRAMBLE": str(phi_scramble),
"GATE_FRAC": str(args.gate_frac),
"CPU_BUDGET_SEC": str(args.cpu_budget_sec),
})
cmd = [sys.executable, worker_path]
if args.taskset_core is not None:
if platform.system() == "Linux":
cmd = ["taskset", "-c", str(args.taskset_core)] + cmd
else:
print(f"[warn] taskset not available on {platform.system()}, ignoring --taskset-core", file=sys.stderr)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env, preexec_fn=os.setsid)
start_cpu = read_proc_cpu(proc.pid) or 0.0
try:
try:
out, err = proc.communicate(timeout=args.epoch_sec + 5)
except subprocess.TimeoutExpired:
os.killpg(proc.pid, signal.SIGINT)
try:
out, err = proc.communicate(timeout=3)
except subprocess.TimeoutExpired:
os.killpg(proc.pid, signal.SIGKILL)
out, err = proc.communicate(timeout=2)
finally:
end_cpu = read_proc_cpu(proc.pid) or start_cpu
cpu_proc = max(0.0, end_cpu - start_cpu)
tokens_done = 0
cpu_self = None
cpu_children = None
try:
line = out.strip().splitlines()[-1] if out else ""
obj = json.loads(line) if line else {}
tokens_done = int(obj.get("tokens_done", 0))
if "cpu_time_sec_self" in obj:
cpu_self = float(obj["cpu_time_sec_self"])
if "cpu_time_sec_children" in obj:
cpu_children = float(obj["cpu_time_sec_children"])
except Exception:
pass
x_local = float(args.cpus) * float(args.epoch_sec)
upper = x_local * 1.2
cpu_worker = None
if cpu_self is not None:
cpu_worker = cpu_self + (cpu_children or 0.0)
cpu_sec = cpu_proc
if cpu_worker is not None and 0.0 < cpu_worker <= upper:
cpu_sec = cpu_worker
else:
if cpu_sec > upper:
cpu_sec = upper
if tokens_done > 0 and cpu_sec == 0.0:
print(f"[warn] cpu_sec=0 with tokens_done={tokens_done}; using epsilon", file=sys.stderr)
cpu_sec = 1e-6
return tokens_done, cpu_sec, out, err
def bootstrap_diff_of_means(A, B, iters=5000, seed=123):
rnd = random.Random(seed)
if not A or not B:
return (0.0, 0.0, 0.0)
boots = []
for _ in range(iters):
mA = sum(A[rnd.randrange(len(A))] for _ in range(len(A))) / len(A)
mB = sum(B[rnd.randrange(len(B))] for _ in range(len(B))) / len(B)
boots.append(mA - mB)
boots.sort()
mean_diff = sum(boots) / len(boots)
lower = boots[int(0.025 * iters)]
upper = boots[int(0.975 * iters)]
return mean_diff, lower, upper
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--epochs", type=int, default=12)
ap.add_argument("--epoch-sec", type=float, default=20.0)
ap.add_argument("--tokens", type=int, default=2000)
ap.add_argument("--cpus", type=float, default=1.0)
ap.add_argument("--freq", type=float, default=0.3)
ap.add_argument("--amp", type=float, default=0.7)
ap.add_argument("--base-cost", type=float, default=0.001)
ap.add_argument("--gate-frac", type=float, default=0.7)
ap.add_argument("--seed", type=int, default=7)
ap.add_argument("--taskset-core", type=int, default=None)
ap.add_argument("--out-prefix", default="triage_")
ap.add_argument("--worker", default=None)
ap.add_argument("--cpu-budget-sec", type=float, default=0.0)
args = ap.parse_args()
random.seed(args.seed)
args.t0 = time.time() + 1.0
phi_env = random.random() * 2*math.pi
worker_path = args.worker or os.path.join(os.path.dirname(__file__), "gate_worker.py")
if not os.path.exists(worker_path):
print("gate_worker.py not found; pass --worker", file=sys.stderr)
sys.exit(2)
arms = ["ON", "SCRAMBLED", "OFF"]
blocks = args.epochs // 3
remainder = args.epochs % 3
schedule = []
for _ in range(blocks):
b = arms[:]
random.shuffle(b)
schedule.extend(b)
if remainder:
b = arms[:]
random.shuffle(b)
schedule.extend(b[:remainder])
results = []
print(f"# Three-arm RCT v2 | epochs={args.epochs}, sec={args.epoch_sec}, tokens={args.tokens}, cpus={args.cpus}")
for i, arm in enumerate(schedule, 1):
phi_scramble = random.random() * 2*math.pi
tokens_done, cpu_sec, out, err = run_epoch(worker_path, arm, args, phi_env, phi_scramble)
X_local = args.cpus * args.epoch_sec
Y = tokens_done / args.tokens # bounded [0,1]
eta_capture = min(cpu_sec, X_local) / X_local if X_local > 0 else 0.0
results.append({
"epoch": i, "arm": arm, "tokens_done": tokens_done, "Y": Y,
"cpu_sec": cpu_sec, "X_local": X_local, "eta_capture": eta_capture
})
tpc = (tokens_done / cpu_sec) if cpu_sec > 1e-9 else float('inf')
print(f"epoch {i:02d} arm={arm:<10} tokens={tokens_done:5d}/{args.tokens} Y={Y:.3f} cpu={cpu_sec:.3f}s tok/CPU={tpc if tpc!=float('inf') else 0.0:.1f}")
# Compute tokens per CPU-sec
tokens_per_cpu = [(r["tokens_done"] / r["cpu_sec"]) if r["cpu_sec"] > 1e-9 else None for r in results]
# Robust normalization base: 95th percentile, excluding pathological values
valid_for_norm = [v for v in tokens_per_cpu if (v is not None and v > 0 and v < 1e7)]
robust_base = percentile(valid_for_norm, 95) if valid_for_norm else (max(valid_for_norm) if valid_for_norm else 1.0)
# Eta components
for r, yps in zip(results, tokens_per_cpu):
eta_convert = 0.0
if yps is not None and robust_base > 0:
eta_convert = min(1.0, yps / robust_base)
r["eta_convert"] = eta_convert
r["eta_chain"] = r["eta_capture"] * eta_convert
# Group by arm
by_arm = {"ON": [], "SCRAMBLED": [], "OFF": []}
for r in results:
by_arm[r["arm"]].append(r)
# DI on Y (tokens fraction)
DI_on_off = bootstrap_diff_of_means([r["Y"] for r in by_arm["ON"]],
[r["Y"] for r in by_arm["OFF"]])
DI_on_scr = bootstrap_diff_of_means([r["Y"] for r in by_arm["ON"]],
[r["Y"] for r in by_arm["SCRAMBLED"]])
# DI on tokens per CPU-sec (efficiency outcome)
eff_ON = [ (r["tokens_done"]/r["cpu_sec"]) for r in by_arm["ON"] if r["cpu_sec"] > 1e-3 ]
eff_OFF = [ (r["tokens_done"]/r["cpu_sec"]) for r in by_arm["OFF"] if r["cpu_sec"] > 1e-3 ]
eff_SCR = [ (r["tokens_done"]/r["cpu_sec"]) for r in by_arm["SCRAMBLED"] if r["cpu_sec"] > 1e-3 ]
DIeff_on_off = bootstrap_diff_of_means(eff_ON, eff_OFF)
DIeff_on_scr = bootstrap_diff_of_means(eff_ON, eff_SCR)
eta_chain_mean = statistics.mean([r["eta_chain"] for r in results]) if results else 0.0
Aidx_on_off = (DI_on_off[0] if DI_on_off[0] > 0 else 0.0) * eta_chain_mean
Aidx_on_scr = (DI_on_scr[0] if DI_on_scr[0] > 0 else 0.0) * eta_chain_mean
# Write CSV
csv_path = f"{args.out_prefix}results.csv"
with open(csv_path, "w", newline="") as f:
w = csv.DictWriter(f, fieldnames=list(results[0].keys()))
w.writeheader()
for r in results:
w.writerow(r)
summary = {
"epochs": args.epochs,
"epoch_sec": args.epoch_sec,
"tokens": args.tokens,
"cpus": args.cpus,
"freq": args.freq,
"amp": args.amp,
"base_cost": args.base_cost,
"gate_frac": args.gate_frac,
"cpu_budget_sec": args.cpu_budget_sec,
"robust_norm_base_tokens_per_cpu_p95": robust_base,
"dropped_for_norm": len(tokens_per_cpu) - len(valid_for_norm),
"DI_Y_ON_minus_OFF": {"mean": DI_on_off[0], "CI95": [DI_on_off[1], DI_on_off[2]]},
"DI_Y_ON_minus_SCR": {"mean": DI_on_scr[0], "CI95": [DI_on_scr[1], DI_on_scr[2]]},
"DI_eff_ON_minus_OFF": {"mean": DIeff_on_off[0], "CI95": [DIeff_on_off[1], DIeff_on_off[2]]},
"DI_eff_ON_minus_SCR": {"mean": DIeff_on_scr[0], "CI95": [DIeff_on_scr[1], DIeff_on_scr[2]]},
"eta_chain_mean": eta_chain_mean,
"A_index_ON_vs_OFF": Aidx_on_off,
"A_index_ON_vs_SCR": Aidx_on_scr,
}
json_path = f"{args.out_prefix}summary.json"
with open(json_path, "w") as f:
json.dump(summary, f, indent=2)
print("# Summary:", json.dumps(summary, indent=2))
print(f"# Wrote {csv_path} and {json_path}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment