Created
September 1, 2025 18:51
-
-
Save Staphylococcus/4be3e5c79e95ff85189bdb8e6200b738 to your computer and use it in GitHub Desktop.
Domesticated rocks doing tricks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import os, time, math, json, random, sys, resource | |
def spin_cpu(seconds):
    """Busy-wait for roughly ``seconds`` of wall-clock time.

    Negative durations are treated as zero.  The loop increments a float
    counter once per pass and returns it, so the result is the number of
    iterations performed (as a float).
    """
    stop_at = time.perf_counter() + max(0.0, seconds)
    iterations = 0.0
    while True:
        if time.perf_counter() >= stop_at:
            return iterations
        iterations += 1.0
def main():
    """Run one gated work epoch and print a one-line JSON report on stdout.

    Configuration is read from environment variables:

    POLICY_MODE     "ON" gates with PHI_ENV, "SCRAMBLED" gates with
                    PHI_SCRAMBLE, any other value disables gating.
    EPOCH_SEC       wall-clock length of the epoch, in seconds.
    TOKENS          token target, used when CPU_BUDGET_SEC <= 0.
    BASE_COST       CPU seconds spent per token when the cost multiplier is 1.
    AMP, FREQ, T0   amplitude, frequency (Hz) and time origin of the
                    sinusoidal cost multiplier ``1 + AMP * sin(...)``.
    PHI_ENV         phase of the true cost oscillation.
    PHI_SCRAMBLE    phase used by the SCRAMBLED policy (random if unset).
    GATE_FRAC       work is done only while the estimated multiplier is at
                    or below this threshold.
    CPU_BUDGET_SEC  if > 0, stop once the predicted CPU spend reaches it;
                    otherwise the worker runs in token-budget mode.

    The report contains ``tokens_done``, ``epoch_sec``, ``policy`` and the
    user+system CPU time of this process and of its waited-for children.
    """
    policy = os.getenv("POLICY_MODE", "OFF").upper()
    epoch_sec = float(os.getenv("EPOCH_SEC", "20"))
    tokens = int(os.getenv("TOKENS", "1000"))
    base_cost = float(os.getenv("BASE_COST", "0.002"))  # seconds CPU per token at norm=1
    amp = float(os.getenv("AMP", "0.8"))  # 0..1
    freq = float(os.getenv("FREQ", "0.2"))  # Hz
    t0 = float(os.getenv("T0", str(time.time())))
    phi_env = float(os.getenv("PHI_ENV", "0.0"))
    phi_scramble = float(os.getenv("PHI_SCRAMBLE", str(random.random()*2*math.pi)))
    gate_thresh = float(os.getenv("GATE_FRAC", "0.7"))
    cpu_budget = float(os.getenv("CPU_BUDGET_SEC", "0"))  # 0 => disabled (token-budget mode)
    start = time.time()
    done = 0

    # Select phase to use for gating
    if policy == "ON":
        phi_used = phi_env
    elif policy == "SCRAMBLED":
        phi_used = phi_scramble
    else:
        phi_used = None

    # Work loop
    predicted_cpu_used = 0.0
    while True:
        now = time.time()
        if (now - start) >= epoch_sec:
            break
        if cpu_budget > 0 and predicted_cpu_used >= cpu_budget:
            break
        if cpu_budget <= 0 and done >= tokens:
            # Token target reached: idle until the epoch ends instead of
            # processing further tokens, so tokens_done never exceeds TOKENS.
            time.sleep(0.002)
            continue

        phase_true = 2*math.pi*freq*(now - t0) + phi_env
        norm = 1.0 + amp * math.sin(phase_true)
        # Clamp so AMP > 1 cannot yield a negative cost that would make the
        # predicted-CPU accumulator run backwards.
        cost = max(0.0, base_cost * norm)

        gate_open = True
        if phi_used is not None:
            est = 1.0 + amp * math.sin(2*math.pi*freq*(now - t0) + phi_used)
            gate_open = est <= gate_thresh

        if gate_open:
            spin_cpu(cost)
            done += 1
            predicted_cpu_used += cost
        else:
            # Gate closed: yield briefly rather than busy-polling the clock.
            time.sleep(0.001)

    u = resource.getrusage(resource.RUSAGE_SELF)
    c = resource.getrusage(resource.RUSAGE_CHILDREN)
    cpu_self = (u.ru_utime + u.ru_stime)
    cpu_children = (c.ru_utime + c.ru_stime)
    print(json.dumps({
        "tokens_done": int(done),
        "epoch_sec": float(epoch_sec),
        "policy": policy,
        "cpu_time_sec_self": float(cpu_self),
        "cpu_time_sec_children": float(cpu_children)
    }))


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Usage (example): | |
| python triage_harness.py \ | |
| --epochs 24 --epoch-sec 5 --tokens 10000 --cpus 1 \ | |
| --freq 0.4 --amp 0.7 --base-cost 0.002 --gate-frac 0.7 \ | |
| --seed 7 --taskset-core 0 --cpu-budget-sec 2.5 --worker /path/to/gate_worker.py | |
| """ | |
| import argparse, os, time, json, math, random, subprocess, signal, csv, statistics, sys, platform | |
# Clock ticks per second, used to convert the tick counts in /proc/<pid>/stat
# into seconds.  Falls back to 100 where os.sysconf is unavailable.
CLK_TCK = os.sysconf(os.sysconf_names['SC_CLK_TCK']) if hasattr(os, 'sysconf') else 100


def read_proc_cpu(pid, proc_root="/proc"):
    """Return user+system CPU seconds consumed so far by process ``pid``.

    The value is read from ``<proc_root>/<pid>/stat`` (fields 14 ``utime``
    and 15 ``stime``) and divided by ``CLK_TCK``.

    Args:
        pid: process id to inspect.
        proc_root: directory where procfs is mounted (default ``/proc``).

    Returns:
        CPU time in seconds as a float, or ``None`` if the stat file cannot
        be read or parsed (e.g. the process has exited or procfs is absent).
    """
    try:
        with open(f"{proc_root}/{pid}/stat", "r") as f:
            raw = f.read()
    except (OSError, ValueError):
        # ValueError covers undecodable bytes in the command name.
        return None

    # Field 2 (comm) is wrapped in parentheses and may itself contain spaces
    # or ')' characters, so a plain whitespace split would misalign every
    # later field.  Everything after the LAST ')' is safely space-separated.
    _, sep, tail = raw.rpartition(")")
    if not sep:
        return None
    fields = tail.split()  # fields[0] is field 3 (state)
    try:
        utime = float(fields[11])  # field 14
        stime = float(fields[12])  # field 15
    except (IndexError, ValueError):
        return None
    return (utime + stime) / CLK_TCK
def percentile(values, p):
    """Return the ``p``-th percentile (0..100) of ``values``.

    Uses linear interpolation between the two closest ranks of the sorted
    data.  Returns ``None`` when ``values`` is empty.
    """
    ordered = sorted(values)
    if len(ordered) == 0:
        return None

    rank = (len(ordered) - 1) * (p / 100.0)
    lo = int(math.floor(rank))
    hi = int(math.ceil(rank))

    if lo != hi:
        # Weight each neighbour by its distance to the opposite rank.
        below = ordered[lo] * (hi - rank)
        above = ordered[hi] * (rank - lo)
        return below + above
    return ordered[lo]
def _children_cpu_seconds():
    """Return cumulative user+system CPU seconds of waited-for children, or None."""
    try:
        import resource
    except ImportError:
        return None
    usage = resource.getrusage(resource.RUSAGE_CHILDREN)
    return usage.ru_utime + usage.ru_stime


def _signal_group(pgid, sig):
    """Send ``sig`` to process group ``pgid``, ignoring a group that already exited."""
    try:
        os.killpg(pgid, sig)
    except ProcessLookupError:
        pass


def _parse_worker_report(out):
    """Parse the worker's last stdout line into (tokens_done, cpu_self, cpu_children)."""
    tokens_done = 0
    cpu_self = None
    cpu_children = None
    lines = out.strip().splitlines() if out else []
    if not lines:
        return tokens_done, cpu_self, cpu_children
    try:
        obj = json.loads(lines[-1])
        if not isinstance(obj, dict):
            raise ValueError("report is not a JSON object")
        tokens_done = int(obj.get("tokens_done", 0))
        if "cpu_time_sec_self" in obj:
            cpu_self = float(obj["cpu_time_sec_self"])
        if "cpu_time_sec_children" in obj:
            cpu_children = float(obj["cpu_time_sec_children"])
    except (ValueError, TypeError) as exc:
        # Best effort: keep whatever was parsed so far, but say why.
        print(f"[warn] could not parse worker report: {exc}", file=sys.stderr)
    return tokens_done, cpu_self, cpu_children


def run_epoch(worker_path, policy, args, phi_env, phi_scramble):
    """Run the worker script for one epoch and measure its output and CPU use.

    Args:
        worker_path: path of the worker script, run with ``sys.executable``.
        policy: value passed to the worker as ``POLICY_MODE``.
        args: parsed harness arguments; reads ``epoch_sec``, ``tokens``,
            ``base_cost``, ``amp``, ``freq``, ``t0``, ``gate_frac``,
            ``cpu_budget_sec``, ``taskset_core`` and ``cpus``.
        phi_env: phase passed as ``PHI_ENV``.
        phi_scramble: phase passed as ``PHI_SCRAMBLE``.

    Returns:
        ``(tokens_done, cpu_sec, out, err)`` where ``out``/``err`` are the
        worker's captured stdout/stderr.  ``cpu_sec`` is the worker's
        self-reported CPU time when it is plausible (positive and at most
        1.2 * cpus * epoch_sec); otherwise it is the CPU time the parent
        accounted to the child, capped at that same upper bound.

    Raises:
        subprocess.TimeoutExpired: if the worker survives SIGINT and SIGKILL.
    """
    env = os.environ.copy()
    env.update({
        "POLICY_MODE": policy,
        "EPOCH_SEC": str(args.epoch_sec),
        "TOKENS": str(args.tokens),
        "BASE_COST": str(args.base_cost),
        "AMP": str(args.amp),
        "FREQ": str(args.freq),
        "T0": str(args.t0),
        "PHI_ENV": str(phi_env),
        "PHI_SCRAMBLE": str(phi_scramble),
        "GATE_FRAC": str(args.gate_frac),
        "CPU_BUDGET_SEC": str(args.cpu_budget_sec),
    })
    cmd = [sys.executable, worker_path]
    if args.taskset_core is not None:
        if platform.system() == "Linux":
            cmd = ["taskset", "-c", str(args.taskset_core)] + cmd
        else:
            print(f"[warn] taskset not available on {platform.system()}, ignoring --taskset-core", file=sys.stderr)

    # Sample the parent's "waited-for children" CPU counter around the run.
    # Reading /proc/<pid>/stat after communicate() cannot work: the child has
    # already been reaped by then and its /proc entry no longer exists.
    cpu_before = _children_cpu_seconds()
    # start_new_session=True makes the child a session/group leader (setsid),
    # so killpg(proc.pid, ...) below targets the worker and its descendants.
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            text=True, env=env, start_new_session=True)
    try:
        out, err = proc.communicate(timeout=args.epoch_sec + 5)
    except subprocess.TimeoutExpired:
        # Escalate: polite interrupt first, then a hard kill.
        _signal_group(proc.pid, signal.SIGINT)
        try:
            out, err = proc.communicate(timeout=3)
        except subprocess.TimeoutExpired:
            _signal_group(proc.pid, signal.SIGKILL)
            out, err = proc.communicate(timeout=2)
    cpu_after = _children_cpu_seconds()

    cpu_proc = 0.0
    if cpu_before is not None and cpu_after is not None:
        cpu_proc = max(0.0, cpu_after - cpu_before)

    tokens_done, cpu_self, cpu_children = _parse_worker_report(out)

    x_local = float(args.cpus) * float(args.epoch_sec)
    upper = x_local * 1.2  # 20% slack over the nominal CPU allotment
    cpu_worker = None
    if cpu_self is not None:
        cpu_worker = cpu_self + (cpu_children or 0.0)

    cpu_sec = cpu_proc
    if cpu_worker is not None and 0.0 < cpu_worker <= upper:
        cpu_sec = cpu_worker
    elif cpu_sec > upper:
        cpu_sec = upper

    if tokens_done > 0 and cpu_sec == 0.0:
        print(f"[warn] cpu_sec=0 with tokens_done={tokens_done}; using epsilon", file=sys.stderr)
        cpu_sec = 1e-6
    return tokens_done, cpu_sec, out, err
def bootstrap_diff_of_means(A, B, iters=5000, seed=123):
    """Bootstrap the difference ``mean(A) - mean(B)``.

    Each of the ``iters`` rounds resamples A and then B with replacement
    (same sizes as the originals) using a ``random.Random(seed)`` generator.

    Returns:
        ``(mean_diff, lower, upper)``: the mean of the bootstrap differences
        and the values at positions ``int(0.025 * iters)`` and
        ``int(0.975 * iters)`` of the sorted differences.  Returns
        ``(0.0, 0.0, 0.0)`` if either sample is empty.
    """
    rng = random.Random(seed)
    if not (A and B):
        return (0.0, 0.0, 0.0)

    def resampled_mean(sample):
        # One with-replacement resample of the same size, reduced to its mean.
        size = len(sample)
        return sum(sample[rng.randrange(size)] for _ in range(size)) / size

    # Left operand is evaluated first, so A is always resampled before B.
    diffs = sorted(resampled_mean(A) - resampled_mean(B) for _ in range(iters))

    centre = sum(diffs) / len(diffs)
    lo_idx = int(0.025 * iters)
    hi_idx = int(0.975 * iters)
    return centre, diffs[lo_idx], diffs[hi_idx]
def main():
    """Run the three-arm (ON / SCRAMBLED / OFF) gating experiment.

    Epochs are scheduled in shuffled blocks of the three arms, each epoch is
    executed through ``run_epoch``, and per-epoch rows plus a summary are
    written to ``<out-prefix>results.csv`` and ``<out-prefix>summary.json``.
    The summary holds bootstrap estimates of the ON-minus-OFF and
    ON-minus-SCRAMBLED differences in token fraction and in tokens per
    CPU-second, together with the eta/A-index figures derived from them.

    Exits with status 2 if the worker script is missing or (via argparse) if
    ``--tokens`` is not positive or ``--epochs`` is negative.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--epochs", type=int, default=12)
    ap.add_argument("--epoch-sec", type=float, default=20.0)
    ap.add_argument("--tokens", type=int, default=2000)
    ap.add_argument("--cpus", type=float, default=1.0)
    ap.add_argument("--freq", type=float, default=0.3)
    ap.add_argument("--amp", type=float, default=0.7)
    ap.add_argument("--base-cost", type=float, default=0.001)
    ap.add_argument("--gate-frac", type=float, default=0.7)
    ap.add_argument("--seed", type=int, default=7)
    ap.add_argument("--taskset-core", type=int, default=None)
    ap.add_argument("--out-prefix", default="triage_")
    ap.add_argument("--worker", default=None)
    ap.add_argument("--cpu-budget-sec", type=float, default=0.0)
    args = ap.parse_args()
    # Reject inputs that would otherwise fail later: tokens is a divisor, and
    # a negative epoch count yields a bogus partial block through `%`.
    if args.tokens <= 0:
        ap.error("--tokens must be > 0")
    if args.epochs < 0:
        ap.error("--epochs must be >= 0")

    random.seed(args.seed)
    args.t0 = time.time() + 1.0
    phi_env = random.random() * 2*math.pi
    worker_path = args.worker or os.path.join(os.path.dirname(__file__), "gate_worker.py")
    if not os.path.exists(worker_path):
        print("gate_worker.py not found; pass --worker", file=sys.stderr)
        sys.exit(2)

    # Block-randomised schedule: every full block contains each arm once.
    arms = ["ON", "SCRAMBLED", "OFF"]
    blocks, remainder = divmod(args.epochs, 3)
    schedule = []
    for _ in range(blocks):
        b = arms[:]
        random.shuffle(b)
        schedule.extend(b)
    if remainder:
        b = arms[:]
        random.shuffle(b)
        schedule.extend(b[:remainder])

    results = []
    print(f"# Three-arm RCT v2 | epochs={args.epochs}, sec={args.epoch_sec}, tokens={args.tokens}, cpus={args.cpus}")
    for i, arm in enumerate(schedule, 1):
        phi_scramble = random.random() * 2*math.pi
        tokens_done, cpu_sec, out, err = run_epoch(worker_path, arm, args, phi_env, phi_scramble)
        X_local = args.cpus * args.epoch_sec
        Y = tokens_done / args.tokens  # fraction of the token target completed
        eta_capture = min(cpu_sec, X_local) / X_local if X_local > 0 else 0.0
        results.append({
            "epoch": i, "arm": arm, "tokens_done": tokens_done, "Y": Y,
            "cpu_sec": cpu_sec, "X_local": X_local, "eta_capture": eta_capture
        })
        tpc = (tokens_done / cpu_sec) if cpu_sec > 1e-9 else float('inf')
        print(f"epoch {i:02d} arm={arm:<10} tokens={tokens_done:5d}/{args.tokens} Y={Y:.3f} cpu={cpu_sec:.3f}s tok/CPU={tpc if tpc!=float('inf') else 0.0:.1f}")

    # Compute tokens per CPU-sec
    tokens_per_cpu = [(r["tokens_done"] / r["cpu_sec"]) if r["cpu_sec"] > 1e-9 else None for r in results]

    # Robust normalization base: 95th percentile, excluding pathological values
    valid_for_norm = [v for v in tokens_per_cpu if (v is not None and v > 0 and v < 1e7)]
    robust_base = percentile(valid_for_norm, 95) if valid_for_norm else 1.0

    # Eta components
    for r, yps in zip(results, tokens_per_cpu):
        eta_convert = 0.0
        if yps is not None and robust_base > 0:
            eta_convert = min(1.0, yps / robust_base)
        r["eta_convert"] = eta_convert
        r["eta_chain"] = r["eta_capture"] * eta_convert

    # Group by arm
    by_arm = {"ON": [], "SCRAMBLED": [], "OFF": []}
    for r in results:
        by_arm[r["arm"]].append(r)

    # DI on Y (tokens fraction)
    DI_on_off = bootstrap_diff_of_means([r["Y"] for r in by_arm["ON"]],
                                        [r["Y"] for r in by_arm["OFF"]])
    DI_on_scr = bootstrap_diff_of_means([r["Y"] for r in by_arm["ON"]],
                                        [r["Y"] for r in by_arm["SCRAMBLED"]])

    # DI on tokens per CPU-sec (efficiency outcome)
    eff_ON = [(r["tokens_done"]/r["cpu_sec"]) for r in by_arm["ON"] if r["cpu_sec"] > 1e-3]
    eff_OFF = [(r["tokens_done"]/r["cpu_sec"]) for r in by_arm["OFF"] if r["cpu_sec"] > 1e-3]
    eff_SCR = [(r["tokens_done"]/r["cpu_sec"]) for r in by_arm["SCRAMBLED"] if r["cpu_sec"] > 1e-3]
    DIeff_on_off = bootstrap_diff_of_means(eff_ON, eff_OFF)
    DIeff_on_scr = bootstrap_diff_of_means(eff_ON, eff_SCR)

    eta_chain_mean = statistics.mean([r["eta_chain"] for r in results]) if results else 0.0
    # Only a positive ON advantage contributes to the A index.
    Aidx_on_off = (DI_on_off[0] if DI_on_off[0] > 0 else 0.0) * eta_chain_mean
    Aidx_on_scr = (DI_on_scr[0] if DI_on_scr[0] > 0 else 0.0) * eta_chain_mean

    # Write CSV.  Columns are listed explicitly (in the order the row dicts
    # are built) so a header is still written when no epochs were run.
    csv_fields = ["epoch", "arm", "tokens_done", "Y", "cpu_sec", "X_local",
                  "eta_capture", "eta_convert", "eta_chain"]
    csv_path = f"{args.out_prefix}results.csv"
    with open(csv_path, "w", newline="") as f:
        w = csv.DictWriter(f, fieldnames=csv_fields)
        w.writeheader()
        for r in results:
            w.writerow(r)

    summary = {
        "epochs": args.epochs,
        "epoch_sec": args.epoch_sec,
        "tokens": args.tokens,
        "cpus": args.cpus,
        "freq": args.freq,
        "amp": args.amp,
        "base_cost": args.base_cost,
        "gate_frac": args.gate_frac,
        "cpu_budget_sec": args.cpu_budget_sec,
        "robust_norm_base_tokens_per_cpu_p95": robust_base,
        "dropped_for_norm": len(tokens_per_cpu) - len(valid_for_norm),
        "DI_Y_ON_minus_OFF": {"mean": DI_on_off[0], "CI95": [DI_on_off[1], DI_on_off[2]]},
        "DI_Y_ON_minus_SCR": {"mean": DI_on_scr[0], "CI95": [DI_on_scr[1], DI_on_scr[2]]},
        "DI_eff_ON_minus_OFF": {"mean": DIeff_on_off[0], "CI95": [DIeff_on_off[1], DIeff_on_off[2]]},
        "DI_eff_ON_minus_SCR": {"mean": DIeff_on_scr[0], "CI95": [DIeff_on_scr[1], DIeff_on_scr[2]]},
        "eta_chain_mean": eta_chain_mean,
        "A_index_ON_vs_OFF": Aidx_on_off,
        "A_index_ON_vs_SCR": Aidx_on_scr,
    }
    json_path = f"{args.out_prefix}summary.json"
    with open(json_path, "w") as f:
        json.dump(summary, f, indent=2)
    print("# Summary:", json.dumps(summary, indent=2))
    print(f"# Wrote {csv_path} and {json_path}")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment