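// ---------------------------------------------------------------------------
// Hacking controller: sizes hack/grow/weaken thread counts for each target and
// schedules them through the Scheduler from scheduler.js (the second file,
// below). Both files use lodash's global `_`.
// ---------------------------------------------------------------------------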
import { Scheduler } from "scheduler.js";
function computeCounts(ns, target, hackThreads) {
let hackTime = ns.getHackTime(target);
let growTime = ns.getGrowTime(target);
let weakenTime = ns.getWeakenTime(target);
// Grow is slower than hack, so scale the per-grow thread count by growTime/hackTime
// to keep pace with the hack cycle. Growing by 1.06 restores slightly more than the
// ~5% we hack per cycle (1 / 0.95 ~= 1.053).
let growThreads = Math.ceil(ns.growthAnalyze(target, 1.06) * growTime / hackTime);
// Note: weaken weakens the security level by 0.05, so we have to multiply the rate of
// security increases by 20 to get the desired number of threads.
let weakenThreads = Math.ceil(1.1 * 20 * (0.002 * hackThreads * (weakenTime / hackTime) +
0.004 * growThreads * (weakenTime / growTime)));
return {hack: hackThreads, grow: growThreads, weaken: weakenThreads};
}
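// Worked example, assuming the game's usual timing ratios (weakenTime ~= 4x
// hackTime, growTime ~= 3.2x hackTime) and hypothetical thread counts: with
// hackThreads = 200 and growThreads = 32,
//   weakenThreads = ceil(1.1 * 20 * (0.002 * 200 * 4 + 0.004 * 32 * 1.25))
//                 = ceil(1.1 * 20 * (1.6 + 0.16)) = 39.
// The 1.1 factor deliberately overshoots so security trends downward over time.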
function ramUsage(ns, threadDict) {
return ns.getScriptRam("hack.js") * threadDict.hack +
ns.getScriptRam("grow.js") * threadDict.grow +
ns.getScriptRam("weaken.js") * threadDict.weaken;
}
function threadsForScript(scheduler, filename) {
return _(scheduler.jobs).filter(j => j.filename == filename).map(j => j.threads).sum()
}
// Calculates ram available, counting ram currently dedicated to hacking as available.
// Depends on the scheduler being synced, so checkIntegrity(true) must be called before this.
function calculateRamAvailable(ns, scheduler) {
let ramAvailable = _(scheduler.servers).map(s => s.ramAvailable()).sum();
let activeHackingRam = ramUsage(ns, {
hack: threadsForScript(scheduler, "hack.js"),
grow: threadsForScript(scheduler, "grow.js"),
weaken: threadsForScript(scheduler, "weaken.js"),
});
return ramAvailable + activeHackingRam;
}
function decideHackThreads(ns, target, ramAvailable) {
let oneHackFraction = ns.hackAnalyzePercent(target) / 100;
// We want to hack up to 5% of the server's money per hack.
let desiredHackThreads = oneHackFraction === 0 ? 1 : Math.max(1, Math.floor(0.05 / oneHackFraction));
if (ns.getServerSecurityLevel(target) > 5 + ns.getServerMinSecurityLevel(target)) {
// If the server's security level is very high, hackAnalyzePercent returns a low
// fraction, which would call for a huge number of hack threads and cause
// instability. Instead, cap the count at a semi-reasonable value. Since we
// schedule a slight excess of grow and weaken threads, the security level will
// fall over time and we'll get a more accurate estimate.
desiredHackThreads = Math.min(200, desiredHackThreads);
}
console.log(oneHackFraction, desiredHackThreads);
let threadDict = computeCounts(ns, target, desiredHackThreads);
let desiredRamUsage = ramUsage(ns, threadDict);
if (ramAvailable < desiredRamUsage) {
let scaleFactor = 0.9 /* fudge factor */ * ramAvailable / desiredRamUsage;
threadDict = {
hack: Math.floor(threadDict.hack * scaleFactor),
grow: Math.ceil(threadDict.grow * scaleFactor),
weaken: Math.ceil(threadDict.weaken * scaleFactor),
};
ns.tprint(target + ": scaling down desired hack threads " + desiredHackThreads + " -> " + threadDict.hack);
while (ramUsage(ns, threadDict) > ramAvailable) {
if (threadDict.hack > 0) --threadDict.hack;
else if (threadDict.grow > 0) --threadDict.grow;
else if (threadDict.weaken > 0) --threadDict.weaken;
else throw new Error("zeroed threadDict still using more ram than is available");
}
}
return threadDict;
}
function adjustThreadsForUnreadyServers(ns, target, threadDict) {
// If we're far from the minimum security level, don't start the grow threads yet.
let startGrow = ns.getServerSecurityLevel(target) < 10 + ns.getServerMinSecurityLevel(target);
// If we're far from the maximum money, don't start the hack threads yet.
let startHack = ns.getServerMoneyAvailable(target) > 0.2 * ns.getServerMaxMoney(target);
let hackMem = ns.getScriptRam("hack.js");
let growMem = ns.getScriptRam("grow.js");
let weakenMem = ns.getScriptRam("weaken.js");
if (!startGrow) {
threadDict.weaken = Math.floor(
threadDict.weaken +
threadDict.hack * hackMem / weakenMem +
threadDict.grow * growMem / weakenMem);
threadDict.grow = 0;
threadDict.hack = 0;
ns.tprint(target + ": not starting grow or hack (security level too high) -- reassigning threads to weaken");
} else if (!startHack) {
threadDict.grow = Math.floor(threadDict.grow + threadDict.hack * hackMem / growMem);
threadDict.hack = 0;
ns.tprint(target + ": not starting hack (money too low) -- reassigning threads to grow");
}
}
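// Illustration with hypothetical numbers: given threadDict = {hack: 10, grow: 30,
// weaken: 5} and all three scripts costing equal ram, a too-high security level
// turns this into {hack: 0, grow: 0, weaken: 45} -- the same ram footprint, all
// of it pointed at weaken.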
// Schedules the weaken, grow, and hack jobs for a single target.
async function scheduleOneTarget(ns, scheduler, target, threadDict) {
await scheduler.start({filename: "weaken.js", args: [target], threads: threadDict.weaken});
await scheduler.start({filename: "grow.js", args: [target], threads: threadDict.grow});
await scheduler.start({filename: "hack.js", args: [target], threads: threadDict.hack});
}
async function startExtraWeakens(scheduler, weakens) {
await scheduler.start({filename: "weaken.js", args: ["foodnstuff", "extra_weakens"], threads: weakens});
}
async function stopExtraWeakens(scheduler) {
await scheduler.stop({filename: "weaken.js", args: ["foodnstuff", "extra_weakens"]});
}
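// The extra "extra_weakens" argument is presumably ignored by weaken.js itself;
// it exists so this job is distinct in the scheduler's database, which keys jobs
// by filename + args (see the Job notes in scheduler.js below). "foodnstuff" is
// an early-game server that is almost always rooted.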
async function scheduleAllTargets(ns, scheduler) {
// Reduce ramAvailable by 2GB for each server in the scheduler's list, since ram
// on each server may be fragmented. The per-server upper bound on fragmentation
// is the size of the largest of hack.js, grow.js, and weaken.js; 2GB is used as
// a round upper bound for that.
let ramAvailable = calculateRamAvailable(ns, scheduler) - 2 * scheduler.servers.length;
// Order the servers from easiest to hardest to hack. Even though this seems inefficient,
// it helps get hacking experience up fast, which will in turn lead to faster hacks, more
// hacking of not-yet-owned servers, and therefore faster mid and long-term money growth.
let hostAndThreads = _(scheduler.servers).
filter(s => {
let condition = s.hostname != "home" && ns.getServerMaxMoney(s.hostname) > 1000000;
if (!condition) ns.tprint(s.hostname + ": not a good source of funds");
return condition;
}).
orderBy([s => ns.getServerBaseSecurityLevel(s.hostname)], ["asc"]).
map(s => {
let threadDict = decideHackThreads(ns, s.hostname, ramAvailable);
ramAvailable -= ramUsage(ns, threadDict);
return {target: s.hostname, threadDict: threadDict};
}).
value();
// While there is ram available, scale up the grow and weaken values for each
// server, again in priority order. If there is not enough ram for a server AND
// it isn't ready, focus on weakening or growing it as needed.
for (let ht of hostAndThreads) {
let used = ramUsage(ns, ht.threadDict);
if (used > ramAvailable) {
// We are in a memory constrained scenario. In certain circumstances, we will rededicate
// hack threads to grow, or hack and grow threads to weaken.
adjustThreadsForUnreadyServers(ns, ht.target, ht.threadDict);
continue;
}
// We are in a relatively unconstrained situation. We can increase our grow and weaken amounts
// to ensure we don't fall behind on those tasks. We don't want to
// do this with hack because we risk getting into a scenario where we totally drain out a server.
// This will also help to grow servers which are not currently at capacity, while not requiring
// complicated management of dedicated jobs for this purpose.
let growWeakenMultiplier = Math.floor(Math.min(4, ramAvailable/used));
ht.threadDict.grow *= growWeakenMultiplier;
ht.threadDict.weaken *= growWeakenMultiplier;
ramAvailable += used - ramUsage(ns, ht.threadDict);
}
// With any extra ram, we will start a large number of weaken threads.
let extraWeakens = Math.floor(0.90 * ramAvailable / ns.getScriptRam("weaken.js"));
await startExtraWeakens(scheduler, extraWeakens);
// We actually do the scheduling in reverse order, since there is a chance jobs at the tail
// will shrink, freeing up space for jobs at the start.
_.reverse(hostAndThreads);
for (let i = 0; i < hostAndThreads.length; ++i) {
await scheduleOneTarget(ns, scheduler, hostAndThreads[i].target, hostAndThreads[i].threadDict);
}
}
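// Example of the scale-up above, with hypothetical numbers: if a target's jobs
// use 100GB and 350GB is free, growWeakenMultiplier = floor(min(4, 350/100)) = 3,
// so grow and weaken are tripled and ramAvailable is charged only for the added
// grow and weaken threads.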
export async function main(ns) {
while (true) {
let scheduler = new Scheduler(ns);
await scheduler.beginTransaction();
try {
await scheduleAllTargets(ns, scheduler);
} finally {
scheduler.maybeCommitTransaction();
}
// Decide how long to sleep: 20s longer than the slowest task on the hardest
// server where we have root access, with a floor of about five minutes.
let sleepSeconds = Math.max(
60 * 5,
20 + _(scheduler.servers).map(s => {
let h = s.hostname;
return Math.max(ns.getGrowTime(h), ns.getWeakenTime(h), ns.getHackTime(h));
}).max());
await ns.sleep(1000 * sleepSeconds);
}
}
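// ---------------------------------------------------------------------------
// scheduler.js -- the module imported at the top of the first file.
// ---------------------------------------------------------------------------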
// Job:
// {
// filename: "a-program.js", // Always stored on "home".
// args: [array, of, arguments],
// threads: the_desired_number_of_threads,
// }
//
// Note that jobs are unique in the database by filename and args. makeItSo
// will kill and restart tasks if the same job is requested with a different
// number of threads.
//
// Server:
// {
// hostname: // The hostname.
// ram: 12, // Amount of ram on server.
// }
// Breadth-first search from "home" over every server we have root access to.
function findUnownedServers(ns) {
let queue = ["home"];
let seen = new Set(queue);
let out = [];
while (queue.length > 0) {
let host = queue.shift();
if (!ns.hasRootAccess(host)) continue;
out.push(host);
_(ns.scan(host)).forEach(scanned => {
// Track discovered hosts in a set so a server reachable from two neighbors
// isn't enqueued (and pushed to out) twice.
if (seen.has(scanned)) return;
seen.add(scanned);
queue.push(scanned);
});
}
return out;
}
// Returns whether a task or job is the same as another task or job.
function isSameJob(a, b) {
return a.filename == b.filename && _.isEqual(a.args, b.args);
}
function listOwnedServers(ns) {
throw new Error("todo");
}
class Server {
constructor(ns, hostname) {
this.ns = ns;
this.hostname = hostname;
}
ram() {
let ram = this.ns.getServerRam(this.hostname)[0];
if (this.hostname == "home") {
// Keep the greater of 32GB or 20% of ram free on home.
ram -= Math.max(Math.ceil(ram * .2), 32);
}
return Math.max(0, ram);
}
ramAvailable() {
return Math.max(0, this.ram() - this.ns.getServerRam(this.hostname)[1]);
}
tasks() {
return this.ns.ps(this.hostname);
}
isRunning(filename, args) {
let j = {
filename: filename,
args: args
};
return _(this.tasks()).some(t => isSameJob(j, t));
}
}
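// Example with a hypothetical size: on a 512GB home, ram() reserves
// max(ceil(512 * 0.2), 32) = 103GB and reports 409GB; every other host reports
// its full capacity.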
class Job {
constructor(ns, json) {
this.ns = ns;
this.filename = json.filename;
this.args = json.args;
this.threads = json.threads;
}
// Returns a version of this serialized as JSON.
json() {
return JSON.stringify({ filename: this.filename, args: this.args, threads: this.threads });
}
// Returns per thread ram usage, assuming the script is the version that lives on the server
// running the scheduler.
ram() {
return this.ns.getScriptRam(this.filename);
}
}
// Returns the number of threads job is using on server.
function jobThreadsOn(ns, job, server) {
let task = _(server.tasks()).filter(t => isSameJob(job, t)).head();
if (task === undefined) return 0;
return task.threads;
}
async function jobIsGone(ns, job, hostname) {
let start = Date.now();
while (ns.isRunning(job.filename, hostname, ...job.args)) {
await ns.sleep(500);
if (Date.now() - start > 20000) {
ns.tprint("a while passed since we killed " +
"a job and it's still running: " + job.json());
start = Date.now();
}
}
}
function loadServers(ns) {
return _.map(findUnownedServers(ns), h => new Server(ns, h));
}
let lockFile = "scheduler_lock.txt";
let dbFile = "scheduler_db.txt";
function loadJobs(ns) {
let data = ns.read(dbFile);
if (data.length === 0) return [];
return _.map(JSON.parse(data), j => new Job(ns, j));
}
function storeJobs(ns, jobs) {
ns.write(dbFile,
JSON.stringify(_(jobs).
filter(j => j.threads > 0).
map(j => _.pick(j, ['filename', 'threads', 'args']))),
"w");
}
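// Illustrative contents of scheduler_db.txt after storeJobs (hypothetical job):
// [{"filename":"weaken.js","threads":12,"args":["foodnstuff"]}]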
async function lock(ns) {
let start = Date.now();
while (ns.read(lockFile) != "") {
await ns.sleep(100 + Math.random() * 25);
if (Date.now() > start + 10000) {
ns.tprint("scheduler lock is still locked after 10s, current holder: " + ns.read(lockFile));
start = Date.now();
}
}
ns.write(lockFile, ns.sprintf("%s %s", ns.getScriptName(), JSON.stringify(ns.args)));
}
function unlock(ns) {
ns.rm(lockFile);
}
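// Caveat: the read-then-write in lock() is not atomic, so two scripts polling at
// the same instant could both believe they hold the lock. The randomized sleep
// makes collisions unlikely, but this is advisory locking, not a true mutex.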
function doList(ns, jobs) {
_(jobs).forEach(j => {
ns.tprint(ns.sprintf("%s %s (%d thread(s))", j.filename, _(j.args).join(" "), j.threads));
});
}
function doListUnowned(ns, jobs, servers) {
_(servers).forEach(s => {
_(s.tasks()).forEach(t => {
if (_(jobs).some(j => isSameJob(j, t))) return;
// We found a task that doesn't match any of the jobs we have listed in our
// db. Print it to the terminal.
ns.tprint(ns.sprintf(
"%s: %s %s%s",
s.hostname,
t.filename,
_.join(t.args, " "),
t.threads > 1 ? ns.sprintf(" (%d threads)", t.threads) : ""));
});
});
}
function parseSpec(ns, args) {
if (args.length < 1) return null;
let filename = args.shift();
let threads = 1;
if (args[0] == "-t") {
args.shift();
threads = parseInt(args.shift());
}
return new Job(ns, { filename: filename, args: [...args], threads: threads });
}
function clamp(value, min, max) {
return Math.max(min, Math.min(value, max));
}
async function makeItSo(ns, jobs, servers, job) {
if (job.ns != ns) { job = new Job(ns, job); }
let confirmedThreads = 0;
// Get the potential hosts to run this. First we consider any host already
// running this job. Then we'll consider high ram hosts not running this job.
// This will tend to have the effect of compacting the job onto a small number
// of servers over time.
let potentialServers = _.orderBy(servers, [
s => _(s.tasks()).some(t => isSameJob(job, t)),
s => s.ram()
], ["desc", "desc"]);
let hostnamesToDrop = [];
let tasksToCreate = [];
_.forEach(potentialServers, server => {
// Now is when we need to start copying scripts to servers, since we're going to depend
// on the ram calculation.
ns.scp(job.filename, server.hostname);
let activeOnServer = jobThreadsOn(ns, job, server);
let maxOnServer = Math.floor(server.ramAvailable() / job.ram()) + activeOnServer;
let desiredThreads = job.threads - confirmedThreads;
let target = Math.min(maxOnServer, desiredThreads);
confirmedThreads += target;
if (activeOnServer == target) return;
if (activeOnServer > 0) {
hostnamesToDrop.push(server.hostname);
}
if (target > 0) {
tasksToCreate.push({hostname: server.hostname, threads: target});
}
});
// Kill all of the jobs first in a batch, then come back and wait for them to go away.
for (const h of hostnamesToDrop) {
ns.kill(job.filename, h, ...job.args);
}
for (const h of hostnamesToDrop) {
await jobIsGone(ns, job, h);
}
for (const t of tasksToCreate) {
await ns.exec(job.filename, t.hostname, t.threads, ...job.args);
}
// Upsert this job into the job list.
_.remove(jobs, j => isSameJob(j, job));
if (job.threads > confirmedThreads) {
ns.tprint(ns.sprintf("could not start all threads for job, started %d: %s", confirmedThreads, job.json()));
}
job.threads = confirmedThreads;
jobs.push(job);
storeJobs(ns, jobs);
}
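// Usage sketch with a hypothetical target -- scaling a job up or down is the
// same call, since makeItSo diffs the desired thread count against what is
// running and only restarts tasks on servers whose counts changed:
//
//   await makeItSo(ns, jobs, servers,
//       new Job(ns, {filename: "weaken.js", args: ["joesguns"], threads: 50}));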
async function doStart(ns, jobs, servers) {
let job = parseSpec(ns, ns.args);
await makeItSo(ns, jobs, servers, job);
}
async function doStop(ns, jobs, servers) {
let job = parseSpec(ns, ns.args);
job.threads = 0;
await makeItSo(ns, jobs, servers, job);
}
async function checkIntegrity(ns, jobs, servers, removeUnknownJobs) {
let promises = [];
jobs.forEach(j => { j.threads = 0; });
_.forEach(servers, s => {
_.forEach(s.tasks(), t => {
let job = _(jobs).find(j => isSameJob(j, t));
if (job === undefined) {
let privileged = t.filename == "scheduler.js" || s.hostname == "home";
if (!privileged && removeUnknownJobs) {
// Kill the stray task directly and wait for it to exit.
ns.kill(t.filename, s.hostname, ...t.args);
promises.push(jobIsGone(ns, new Job(ns, t), s.hostname));
}
} else {
job.threads += t.threads;
}
});
});
_.remove(jobs, j => j.threads === 0);
await Promise.all(promises);
}
export class Scheduler {
constructor(ns) {
this.ns = ns;
this.haveLock = false;
}
async beginTransaction() {
await lock(this.ns);
this.haveLock = true;
this.load();
}
maybeCommitTransaction() {
if (!this.haveLock) return;
storeJobs(this.ns, this.jobs);
this.haveLock = false;
unlock(this.ns);
}
// You can use load directly when you're not planning on modifying the scheduler
// state.
load() {
this.jobs = loadJobs(this.ns);
this.servers = loadServers(this.ns);
}
async start(job) {
if (!this.haveLock) {
throw new Error("tried to call Scheduler.start() without an active transaction.");
}
await makeItSo(this.ns, this.jobs, this.servers, job);
}
async stop(job) {
if (!this.haveLock) {
throw new Error("tried to call Scheduler.stop() without an active transaction.");
}
job.threads = 0;
await makeItSo(this.ns, this.jobs, this.servers, job);
}
async checkIntegrity(killUnknownJobs) {
if (!this.haveLock && killUnknownJobs) {
throw new Error("tried to call Scheduler.checkIntegrity(true) without an active transaction.");
}
await checkIntegrity(this.ns, this.jobs, this.servers, killUnknownJobs);
}
ram() {
return _(this.servers).map(s => s.ram()).sum();
}
ramAvailable() {
return _(this.servers).map(s => s.ramAvailable()).sum();
}
// Reschedules all jobs in the jobs database with their current thread count. This is to cover a situation
// where some tasks were killed outside the scheduler and we want to restore the scheduled state of the
// world.
async reschedule() {
if (!this.haveLock) {
throw new Error("tried to call Scheduler.reschedule() without an active transaction.");
}
for (const j of this.jobs) {
await this.start(j);
}
}
}
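// Typical client usage (this mirrors main() in the first file; the job spec is
// a hypothetical example):
//
//   let scheduler = new Scheduler(ns);
//   await scheduler.beginTransaction();
//   try {
//     await scheduler.start({filename: "grow.js", args: ["some-target"], threads: 10});
//   } finally {
//     scheduler.maybeCommitTransaction();
//   }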
export async function main(ns) {
// jobspec: filename [args]
// commands:
// start -t NUMTHREADS jobspec # Start one owned job.
// stop jobspec # Stop one owned job.
// list # List all owned jobs.
// listunowned # List all running tasks that aren't tracked in the job db.
// fsck # Kill all untracked tasks, sparing scheduler.js and anything on home.
if (ns.args.length < 1) return;
let scheduler = new Scheduler(ns);
try {
let command = ns.args.shift();
if (command == "list") {
scheduler.load();
doList(ns, scheduler.jobs);
} else if (command == "listunowned") {
scheduler.load();
doListUnowned(ns, scheduler.jobs, scheduler.servers);
} else if (command == "available") {
scheduler.load();
ns.tprint("ram available to scheduler: " + scheduler.ramAvailable());
} else if (command == "start") {
await scheduler.beginTransaction();
await scheduler.start(parseSpec(ns, ns.args));
} else if (command == "stop") {
await scheduler.beginTransaction();
await scheduler.stop(parseSpec(ns, ns.args));
} else if (command == "fsck") {
await scheduler.beginTransaction();
await scheduler.checkIntegrity(true);
} else {
ns.tprint("unknown command: " + command);
}
} finally {
scheduler.maybeCommitTransaction();
}
}