Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?

go 1.1 scheduler

where

src/pkg/runtime proc.c asm_*.s

design

proc.c:

// Goroutine scheduler
// The scheduler's job is to distribute ready-to-run goroutines over worker threads.
//
// The main concepts are:
// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
//     M must have an associated P to execute Go code, however it can be
//     blocked or in a syscall w/o an associated P.
//
// Design doc at http://golang.org/s/go11sched.

P の数の上限 = MAXGOPROCS

go したときに遊んでる M がない and P が余ってると M を作る

システムコール実行時に P を開放する. 実行可能な G があって、でも M が無い場合、 M を作ってやる

reading

  • package represented by · (middle dot)
  • scheduler is in runtime package
    • proc.c
    • asm_*.s

start goroutine

newproc1 で G を一個作り、 PC に goexit を、 fnstart にその goroutine のトップの関数を設定する. その後 G を runqput でランキューに突っ込む

その後どっかの M が新しい Goroutine を実行しようと schedule() を呼び出し、 runqget() で取り出した G を execute() で実行する

execute() は fnstart が goexit なら fnstart の呼び出しから開始し、 そうでなければ gogo で PC から実行を再開する

proc.c:

// Create a new g running fn with narg bytes of arguments starting
// at argp and returning nret bytes of results.  callerpc is the
// address of the go statement that created this.  The new g is put
// on the queue of g's waiting to run.
G*
runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
	byte *sp;
	G *newg;
	int32 siz;

//printf("newproc1 %p %p narg=%d nret=%d\n", fn, argp, narg, nret);
	siz = narg + nret;
	siz = (siz+7) & ~7;

	// We could instead create a secondary stack frame
	// and make it look like goexit was on the original but
	// the call to the actual goroutine function was split.
	// Not worth it: this is almost always an error.
	if(siz > StackMin - 1024)
		runtime·throw("runtime.newproc: function arguments too large for new goroutine");

	if((newg = gfget(m->p)) != nil) {
		if(newg->stackguard - StackGuard != newg->stack0)
			runtime·throw("invalid stack in newg");
	} else {
		newg = runtime·malg(StackMin);
		runtime·lock(&runtime·sched);
		if(runtime·lastg == nil)
			runtime·allg = newg;
		else
			runtime·lastg->alllink = newg;
		runtime·lastg = newg;
		runtime·unlock(&runtime·sched);
	}

	sp = (byte*)newg->stackbase;
	sp -= siz;
	runtime·memmove(sp, argp, narg);
	if(thechar == '5') {
		// caller's LR
		sp -= sizeof(void*);
		*(void**)sp = nil;
	}

	newg->sched.sp = (uintptr)sp;
	newg->sched.pc = (byte*)runtime·goexit;
	newg->sched.g = newg;
	newg->fnstart = fn;
	newg->gopc = (uintptr)callerpc;
	newg->status = Grunnable;
	newg->goid = runtime·xadd64(&runtime·sched.goidgen, 1);
	if(raceenabled)
		newg->racectx = runtime·racegostart(callerpc);
	runqput(m->p, newg);

	if(runtime·atomicload(&runtime·sched.npidle) != 0 && runtime·atomicload(&runtime·sched.nmspinning) == 0 && fn->fn != runtime·main)  // TODO: fast atomic
		wakep();
	return newg;
}

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
schedule(void)
{
	G *gp;

	if(m->locks)
		runtime·throw("schedule: holding locks");

top:
	if(runtime·gcwaiting) {
		gcstopm();
		goto top;
	}

	gp = runqget(m->p);
	if(gp == nil)
		gp = findrunnable();

	if(m->spinning) {
		m->spinning = false;
		runtime·xadd(&runtime·sched.nmspinning, -1);
	}

	// M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
	// so see if we need to wakeup another M here.
	if (m->p->runqhead != m->p->runqtail &&
		runtime·atomicload(&runtime·sched.nmspinning) == 0 &&
		runtime·atomicload(&runtime·sched.npidle) > 0)  // TODO: fast atomic
		wakep();

	if(gp->lockedm) {
		startlockedm(gp);
		goto top;
	}

	execute(gp);
}

// Schedules gp to run on the current M.
// Never returns.
static void
execute(G *gp)
{
	int32 hz;

	if(gp->status != Grunnable) {
		runtime·printf("execute: bad g status %d\n", gp->status);
		runtime·throw("execute: bad g status");
	}
	gp->status = Grunning;
	m->p->tick++;
	m->curg = gp;
	gp->m = m;

	// Check whether the profiler needs to be turned on or off.
	hz = runtime·sched.profilehz;
	if(m->profilehz != hz)
		runtime·resetcpuprofiler(hz);

	if(gp->sched.pc == (byte*)runtime·goexit)  // kickoff
		runtime·gogocallfn(&gp->sched, gp->fnstart);
	runtime·gogo(&gp->sched, 0);
}

asm_386.s

// void gogo(Gobuf*, uintptr)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), 7, $0
	MOVL	8(SP), AX		// return 2nd arg
	MOVL	4(SP), BX		// gobuf
	MOVL	gobuf_g(BX), DX
	MOVL	0(DX), CX		// make sure g != nil
	get_tls(CX)
	MOVL	DX, g(CX)
	MOVL	gobuf_sp(BX), SP	// restore SP
	MOVL	gobuf_pc(BX), BX
	JMP	BX

exit goroutine

最初に goexit を PC に入れてたので、 goroutine のトップの関数から 戻ると goexit に飛ぶ goexit は goexit0 を呼び、 goexit0 は schedule() を呼び、次の goroutine が実行される

proc.c:

// Finishes execution of the current goroutine.
void
runtime·goexit(void)
{
	if(raceenabled)
		runtime·racegoend();
	runtime·mcall(goexit0);
}

// runtime·goexit continuation on g0.
static void
goexit0(G *gp)
{
	gp->status = Gdead;
	gp->m = nil;
	gp->lockedm = nil;
	m->curg = nil;
	m->lockedg = nil;
	if(m->locked & ~LockExternal) {
		runtime·printf("invalid m->locked = %d", m->locked);
		runtime·throw("internal lockOSThread error");
	}	
	m->locked = 0;
	runtime·unwindstack(gp, nil);
	gfput(m->p, gp);
	schedule();
}

stealing

schedule() の中で呼ばれてたよね。

proc.c:

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
static G*
findrunnable(void)
{
	G *gp;
	P *p;
	int32 i;

top:
	if(runtime·gcwaiting) {
		gcstopm();
		goto top;
	}
	// local runq
	gp = runqget(m->p);
	if(gp)
		return gp;
	// global runq
	if(runtime·sched.runqsize) {
		runtime·lock(&runtime·sched);
		gp = globrunqget(m->p);
		runtime·unlock(&runtime·sched);
		if(gp)
			return gp;
	}
	// poll network
	gp = runtime·netpoll(false);  // non-blocking
	if(gp) {
		injectglist(gp->schedlink);
		gp->status = Grunnable;
		return gp;
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	if(!m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle))  // TODO: fast atomic
		goto stop;
	if(!m->spinning) {
		m->spinning = true;
		runtime·xadd(&runtime·sched.nmspinning, 1);
	}
	// random steal from other P's
	for(i = 0; i < 2*runtime·gomaxprocs; i++) {
		if(runtime·gcwaiting)
			goto top;
		p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
		if(p == m->p)
			gp = runqget(p);
		else
			gp = runqsteal(m->p, p);
		if(gp)
			return gp;
	}
stop:
	// return P and block
	runtime·lock(&runtime·sched);
	if(runtime·gcwaiting) {
		runtime·unlock(&runtime·sched);
		goto top;
	}
	if(runtime·sched.runqsize) {
		gp = globrunqget(m->p);
		runtime·unlock(&runtime·sched);
		return gp;
	}
	p = releasep();
	pidleput(p);
	runtime·unlock(&runtime·sched);
	if(m->spinning) {
		m->spinning = false;
		runtime·xadd(&runtime·sched.nmspinning, -1);
	}
	// check all runqueues once again
	for(i = 0; i < runtime·gomaxprocs; i++) {
		p = runtime·allp[i];
		if(p && p->runqhead != p->runqtail) {
			runtime·lock(&runtime·sched);
			p = pidleget();
			runtime·unlock(&runtime·sched);
			if(p) {
				acquirep(p);
				goto top;
			}
			break;
		}
	}
	// poll network
	if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
		if(m->p)
			runtime·throw("findrunnable: netpoll with p");
		if(m->spinning)
			runtime·throw("findrunnable: netpoll with spinning");
		gp = runtime·netpoll(true);  // block until new work is available
		runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
		if(gp) {
			runtime·lock(&runtime·sched);
			p = pidleget();
			runtime·unlock(&runtime·sched);
			if(p) {
				acquirep(p);
				injectglist(gp->schedlink);
				gp->status = Grunnable;
				return gp;
			}
			injectglist(gp);
		}
	}
	stopm();
	goto top;
}

entering syscall

システムコール実行前に呼ばれる. entersyscallblock() するとすぐに handoffp() する。 entersyscall() すると Psyscall 状態になり、 sysmon スレッドに監視され、 一定時間経つと handoffp() される。 handoffp() は startm() して P に新しい M を割り当てる。

proc.c:

// The same as runtime·entersyscall(), but with a hint that the syscall is blocking.
#pragma textflag 7
void
·entersyscallblock(int32 dummy)
{
	P *p;

	if(m->profilehz > 0)
		runtime·setprof(false);

	// Leave SP around for gc and traceback.
	g->sched.sp = (uintptr)runtime·getcallersp(&dummy);
	g->sched.pc = runtime·getcallerpc(&dummy);
	g->sched.g = g;
	g->gcsp = g->sched.sp;
	g->gcpc = g->sched.pc;
	g->gcstack = g->stackbase;
	g->gcguard = g->stackguard;
	g->status = Gsyscall;
	if(g->gcsp < g->gcguard-StackGuard || g->gcstack < g->gcsp) {
		// runtime·printf("entersyscallblock inconsistent %p [%p,%p]\n",
		//	g->gcsp, g->gcguard-StackGuard, g->gcstack);
		runtime·throw("entersyscallblock");
	}

	p = releasep();
	handoffp(p);
	if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
		inclocked(1);
	runtime·gosave(&g->sched);  // re-save for traceback
}

// Hands off P from syscall or locked M.
static void
handoffp(P *p)
{
	// if it has local work, start it straight away
	if(p->runqhead != p->runqtail || runtime·sched.runqsize) {
		startm(p, false);
		return;
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if(runtime·atomicload(&runtime·sched.nmspinning) + runtime·atomicload(&runtime·sched.npidle) == 0 &&  // TODO: fast atomic
		runtime·cas(&runtime·sched.nmspinning, 0, 1)) {
		startm(p, true);
		return;
	}
	runtime·lock(&runtime·sched);
	if(runtime·gcwaiting) {
		p->status = Pgcstop;
		if(--runtime·sched.stopwait == 0)
			runtime·notewakeup(&runtime·sched.stopnote);
		runtime·unlock(&runtime·sched);
		return;
	}
	if(runtime·sched.runqsize) {
		runtime·unlock(&runtime·sched);
		startm(p, false);
		return;
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if(runtime·sched.npidle == runtime·gomaxprocs-1 && runtime·atomicload64(&runtime·sched.lastpoll) != 0) {
		runtime·unlock(&runtime·sched);
		startm(p, false);
		return;
	}
	pidleput(p);
	runtime·unlock(&runtime·sched);
}

// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's returns false.
static void
startm(P *p, bool spinning)
{
	M *mp;
	void (*fn)(void);

	runtime·lock(&runtime·sched);
	if(p == nil) {
		p = pidleget();
		if(p == nil) {
			runtime·unlock(&runtime·sched);
			if(spinning)
				runtime·xadd(&runtime·sched.nmspinning, -1);
			return;
		}
	}
	mp = mget();
	runtime·unlock(&runtime·sched);
	if(mp == nil) {
		fn = nil;
		if(spinning)
			fn = mspinning;
		newm(fn, p);
		return;
	}
	if(mp->spinning)
		runtime·throw("startm: m is spinning");
	if(mp->nextp)
		runtime·throw("startm: m has p");
	mp->spinning = spinning;
	mp->nextp = p;
	runtime·notewakeup(&mp->park);
}

return from syscall

P がそのままだったらそのまま実行 P が消え去っていたら、待機中の P を探してそっちで実行 待機中のPが無かったら globrunqput() でグローバルの runq に登録し、 M が必要になるまで待つ.

// The goroutine g exited its system call.
// Arrange for it to run on a cpu again.
// This is called only from the go syscall library, not
// from the low-level system calls used by the runtime.
void
runtime·exitsyscall(void)
{
	P *p;

	// Check whether the profiler needs to be turned on.
	if(m->profilehz > 0)
		runtime·setprof(true);

	// Try to re-acquire the last P.
	if(m->p && m->p->status == Psyscall && runtime·cas(&m->p->status, Psyscall, Prunning)) {
		// There's a cpu for us, so we can run.
		m->mcache = m->p->mcache;
		m->p->m = m;
		m->p->tick++;
		g->status = Grunning;
		// Garbage collector isn't running (since we are),
		// so okay to clear gcstack and gcsp.
		g->gcstack = (uintptr)nil;
		g->gcsp = (uintptr)nil;
		return;
	}

	if(g->isbackground)  // do not consider blocked scavenger for deadlock detection
		inclocked(-1);
	// Try to get any other idle P.
	m->p = nil;
	if(runtime·sched.pidle) {
		runtime·lock(&runtime·sched);
		p = pidleget();
		runtime·unlock(&runtime·sched);
		if(p) {
			acquirep(p);
			g->gcstack = (uintptr)nil;
			g->gcsp = (uintptr)nil;
			return;
		}
	}

	// Call the scheduler.
	runtime·mcall(exitsyscall0);

	// Scheduler returned, so we're allowed to run now.
	// Delete the gcstack information that we left for
	// the garbage collector during the system call.
	// Must wait until now because until gosched returns
	// we don't know for sure that the garbage collector
	// is not running.
	g->gcstack = (uintptr)nil;
	g->gcsp = (uintptr)nil;
}

// runtime·exitsyscall slow path on g0.
// Failed to acquire P, enqueue gp as runnable.
static void
exitsyscall0(G *gp)
{
	P *p;

	gp->status = Grunnable;
	gp->m = nil;
	m->curg = nil;
	runtime·lock(&runtime·sched);
	p = pidleget();
	if(p == nil)
		globrunqput(gp);
	runtime·unlock(&runtime·sched);
	if(p) {
		acquirep(p);
		execute(gp);  // Never returns.
	}
	if(m->lockedg) {
		// Wait until another thread schedules gp and so m again.
		stoplockedm();
		execute(gp);  // Never returns.
	}
	stopm();
	schedule();  // Never returns.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment