src/pkg/runtime proc.c asm_*.s
proc.c:
// Goroutine scheduler
// The scheduler's job is to distribute ready-to-run goroutines over worker threads.
//
// The main concepts are:
// G - goroutine.
// M - worker thread, or machine.
// P - processor, a resource that is required to execute Go code.
// M must have an associated P to execute Go code, however it can be
// blocked or in a syscall w/o an associated P.
//
// Design doc at http://golang.org/s/go11sched.
P の数の上限 = MAXGOPROCS
go したときに遊んでる M がない and P が余ってると M を作る
システムコール実行時に P を開放する. 実行可能な G があって、でも M が無い場合、 M を作ってやる
- package represented by · (middle dot)
- scheduler is in runtime package
- proc.c
- asm_*.s
newproc1 で G を一個作り、 PC に goexit を、 fnstart にその goroutine のトップの関数を設定する. その後 G を runqput でランキューに突っ込む
その後どっかの M が新しい Goroutine を実行しようと schedule() を呼び出し、 runqget() で取り出した G を execute() で実行する
execute() は fnstart が goexit なら fnstart の呼び出しから開始し、 そうでなければ gogo で PC から実行を再開する
proc.c:
// Create a new g running fn with narg bytes of arguments starting
// at argp and returning nret bytes of results. callerpc is the
// address of the go statement that created this. The new g is put
// on the queue of g's waiting to run.
G*
runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
byte *sp;
G *newg;
int32 siz;
//printf("newproc1 %p %p narg=%d nret=%d\n", fn, argp, narg, nret);
siz = narg + nret;
siz = (siz+7) & ~7;
// We could instead create a secondary stack frame
// and make it look like goexit was on the original but
// the call to the actual goroutine function was split.
// Not worth it: this is almost always an error.
if(siz > StackMin - 1024)
runtime·throw("runtime.newproc: function arguments too large for new goroutine");
if((newg = gfget(m->p)) != nil) {
if(newg->stackguard - StackGuard != newg->stack0)
runtime·throw("invalid stack in newg");
} else {
newg = runtime·malg(StackMin);
runtime·lock(&runtime·sched);
if(runtime·lastg == nil)
runtime·allg = newg;
else
runtime·lastg->alllink = newg;
runtime·lastg = newg;
runtime·unlock(&runtime·sched);
}
sp = (byte*)newg->stackbase;
sp -= siz;
runtime·memmove(sp, argp, narg);
if(thechar == '5') {
// caller's LR
sp -= sizeof(void*);
*(void**)sp = nil;
}
newg->sched.sp = (uintptr)sp;
newg->sched.pc = (byte*)runtime·goexit;
newg->sched.g = newg;
newg->fnstart = fn;
newg->gopc = (uintptr)callerpc;
newg->status = Grunnable;
newg->goid = runtime·xadd64(&runtime·sched.goidgen, 1);
if(raceenabled)
newg->racectx = runtime·racegostart(callerpc);
runqput(m->p, newg);
if(runtime·atomicload(&runtime·sched.npidle) != 0 && runtime·atomicload(&runtime·sched.nmspinning) == 0 && fn->fn != runtime·main) // TODO: fast atomic
wakep();
return newg;
}
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
schedule(void)
{
G *gp;
if(m->locks)
runtime·throw("schedule: holding locks");
top:
if(runtime·gcwaiting) {
gcstopm();
goto top;
}
gp = runqget(m->p);
if(gp == nil)
gp = findrunnable();
if(m->spinning) {
m->spinning = false;
runtime·xadd(&runtime·sched.nmspinning, -1);
}
// M wakeup policy is deliberately somewhat conservative (see nmspinning handling),
// so see if we need to wakeup another M here.
if (m->p->runqhead != m->p->runqtail &&
runtime·atomicload(&runtime·sched.nmspinning) == 0 &&
runtime·atomicload(&runtime·sched.npidle) > 0) // TODO: fast atomic
wakep();
if(gp->lockedm) {
startlockedm(gp);
goto top;
}
execute(gp);
}
// Schedules gp to run on the current M.
// Never returns.
static void
execute(G *gp)
{
int32 hz;
if(gp->status != Grunnable) {
runtime·printf("execute: bad g status %d\n", gp->status);
runtime·throw("execute: bad g status");
}
gp->status = Grunning;
m->p->tick++;
m->curg = gp;
gp->m = m;
// Check whether the profiler needs to be turned on or off.
hz = runtime·sched.profilehz;
if(m->profilehz != hz)
runtime·resetcpuprofiler(hz);
if(gp->sched.pc == (byte*)runtime·goexit) // kickoff
runtime·gogocallfn(&gp->sched, gp->fnstart);
runtime·gogo(&gp->sched, 0);
}
asm_386.s
// void gogo(Gobuf*, uintptr)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), 7, $0
MOVL 8(SP), AX // return 2nd arg
MOVL 4(SP), BX // gobuf
MOVL gobuf_g(BX), DX
MOVL 0(DX), CX // make sure g != nil
get_tls(CX)
MOVL DX, g(CX)
MOVL gobuf_sp(BX), SP // restore SP
MOVL gobuf_pc(BX), BX
JMP BX
最初に goexit を PC に入れてたので、 goroutine のトップの関数から 戻ると goexit に飛ぶ goexit は goexit0 を呼び、 goexit0 は schedule() を呼び、次の goroutine が実行される
proc.c:
// Finishes execution of the current goroutine.
void
runtime·goexit(void)
{
if(raceenabled)
runtime·racegoend();
runtime·mcall(goexit0);
}
// runtime·goexit continuation on g0.
static void
goexit0(G *gp)
{
gp->status = Gdead;
gp->m = nil;
gp->lockedm = nil;
m->curg = nil;
m->lockedg = nil;
if(m->locked & ~LockExternal) {
runtime·printf("invalid m->locked = %d", m->locked);
runtime·throw("internal lockOSThread error");
}
m->locked = 0;
runtime·unwindstack(gp, nil);
gfput(m->p, gp);
schedule();
}
schedule() の中で呼ばれてたよね。
proc.c:
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
static G*
findrunnable(void)
{
G *gp;
P *p;
int32 i;
top:
if(runtime·gcwaiting) {
gcstopm();
goto top;
}
// local runq
gp = runqget(m->p);
if(gp)
return gp;
// global runq
if(runtime·sched.runqsize) {
runtime·lock(&runtime·sched);
gp = globrunqget(m->p);
runtime·unlock(&runtime·sched);
if(gp)
return gp;
}
// poll network
gp = runtime·netpoll(false); // non-blocking
if(gp) {
injectglist(gp->schedlink);
gp->status = Grunnable;
return gp;
}
// If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low.
if(!m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle)) // TODO: fast atomic
goto stop;
if(!m->spinning) {
m->spinning = true;
runtime·xadd(&runtime·sched.nmspinning, 1);
}
// random steal from other P's
for(i = 0; i < 2*runtime·gomaxprocs; i++) {
if(runtime·gcwaiting)
goto top;
p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
if(p == m->p)
gp = runqget(p);
else
gp = runqsteal(m->p, p);
if(gp)
return gp;
}
stop:
// return P and block
runtime·lock(&runtime·sched);
if(runtime·gcwaiting) {
runtime·unlock(&runtime·sched);
goto top;
}
if(runtime·sched.runqsize) {
gp = globrunqget(m->p);
runtime·unlock(&runtime·sched);
return gp;
}
p = releasep();
pidleput(p);
runtime·unlock(&runtime·sched);
if(m->spinning) {
m->spinning = false;
runtime·xadd(&runtime·sched.nmspinning, -1);
}
// check all runqueues once again
for(i = 0; i < runtime·gomaxprocs; i++) {
p = runtime·allp[i];
if(p && p->runqhead != p->runqtail) {
runtime·lock(&runtime·sched);
p = pidleget();
runtime·unlock(&runtime·sched);
if(p) {
acquirep(p);
goto top;
}
break;
}
}
// poll network
if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
if(m->p)
runtime·throw("findrunnable: netpoll with p");
if(m->spinning)
runtime·throw("findrunnable: netpoll with spinning");
gp = runtime·netpoll(true); // block until new work is available
runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
if(gp) {
runtime·lock(&runtime·sched);
p = pidleget();
runtime·unlock(&runtime·sched);
if(p) {
acquirep(p);
injectglist(gp->schedlink);
gp->status = Grunnable;
return gp;
}
injectglist(gp);
}
}
stopm();
goto top;
}
システムコール実行前に呼ばれる. entersyscallblock() するとすぐに handoffp() する。 entersyscall() すると Psyscall 状態になり、 sysmon スレッドに監視され、 一定時間経つと handoffp() される。 handoffp() は startm() して P に新しい M を割り当てる。
proc.c:
// The same as runtime·entersyscall(), but with a hint that the syscall is blocking.
#pragma textflag 7
void
·entersyscallblock(int32 dummy)
{
P *p;
if(m->profilehz > 0)
runtime·setprof(false);
// Leave SP around for gc and traceback.
g->sched.sp = (uintptr)runtime·getcallersp(&dummy);
g->sched.pc = runtime·getcallerpc(&dummy);
g->sched.g = g;
g->gcsp = g->sched.sp;
g->gcpc = g->sched.pc;
g->gcstack = g->stackbase;
g->gcguard = g->stackguard;
g->status = Gsyscall;
if(g->gcsp < g->gcguard-StackGuard || g->gcstack < g->gcsp) {
// runtime·printf("entersyscallblock inconsistent %p [%p,%p]\n",
// g->gcsp, g->gcguard-StackGuard, g->gcstack);
runtime·throw("entersyscallblock");
}
p = releasep();
handoffp(p);
if(g->isbackground) // do not consider blocked scavenger for deadlock detection
inclocked(1);
runtime·gosave(&g->sched); // re-save for traceback
}
// Hands off P from syscall or locked M.
static void
handoffp(P *p)
{
// if it has local work, start it straight away
if(p->runqhead != p->runqtail || runtime·sched.runqsize) {
startm(p, false);
return;
}
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if(runtime·atomicload(&runtime·sched.nmspinning) + runtime·atomicload(&runtime·sched.npidle) == 0 && // TODO: fast atomic
runtime·cas(&runtime·sched.nmspinning, 0, 1)) {
startm(p, true);
return;
}
runtime·lock(&runtime·sched);
if(runtime·gcwaiting) {
p->status = Pgcstop;
if(--runtime·sched.stopwait == 0)
runtime·notewakeup(&runtime·sched.stopnote);
runtime·unlock(&runtime·sched);
return;
}
if(runtime·sched.runqsize) {
runtime·unlock(&runtime·sched);
startm(p, false);
return;
}
// If this is the last running P and nobody is polling network,
// need to wakeup another M to poll network.
if(runtime·sched.npidle == runtime·gomaxprocs-1 && runtime·atomicload64(&runtime·sched.lastpoll) != 0) {
runtime·unlock(&runtime·sched);
startm(p, false);
return;
}
pidleput(p);
runtime·unlock(&runtime·sched);
}
// Schedules some M to run the p (creates an M if necessary).
// If p==nil, tries to get an idle P, if no idle P's returns false.
static void
startm(P *p, bool spinning)
{
M *mp;
void (*fn)(void);
runtime·lock(&runtime·sched);
if(p == nil) {
p = pidleget();
if(p == nil) {
runtime·unlock(&runtime·sched);
if(spinning)
runtime·xadd(&runtime·sched.nmspinning, -1);
return;
}
}
mp = mget();
runtime·unlock(&runtime·sched);
if(mp == nil) {
fn = nil;
if(spinning)
fn = mspinning;
newm(fn, p);
return;
}
if(mp->spinning)
runtime·throw("startm: m is spinning");
if(mp->nextp)
runtime·throw("startm: m has p");
mp->spinning = spinning;
mp->nextp = p;
runtime·notewakeup(&mp->park);
}
P がそのままだったらそのまま実行 P が消え去っていたら、待機中の P を探してそっちで実行 待機中のPが無かったら globrunqput() でグローバルの runq に登録し、 M が必要になるまで待つ.
// The goroutine g exited its system call.
// Arrange for it to run on a cpu again.
// This is called only from the go syscall library, not
// from the low-level system calls used by the runtime.
void
runtime·exitsyscall(void)
{
P *p;
// Check whether the profiler needs to be turned on.
if(m->profilehz > 0)
runtime·setprof(true);
// Try to re-acquire the last P.
if(m->p && m->p->status == Psyscall && runtime·cas(&m->p->status, Psyscall, Prunning)) {
// There's a cpu for us, so we can run.
m->mcache = m->p->mcache;
m->p->m = m;
m->p->tick++;
g->status = Grunning;
// Garbage collector isn't running (since we are),
// so okay to clear gcstack and gcsp.
g->gcstack = (uintptr)nil;
g->gcsp = (uintptr)nil;
return;
}
if(g->isbackground) // do not consider blocked scavenger for deadlock detection
inclocked(-1);
// Try to get any other idle P.
m->p = nil;
if(runtime·sched.pidle) {
runtime·lock(&runtime·sched);
p = pidleget();
runtime·unlock(&runtime·sched);
if(p) {
acquirep(p);
g->gcstack = (uintptr)nil;
g->gcsp = (uintptr)nil;
return;
}
}
// Call the scheduler.
runtime·mcall(exitsyscall0);
// Scheduler returned, so we're allowed to run now.
// Delete the gcstack information that we left for
// the garbage collector during the system call.
// Must wait until now because until gosched returns
// we don't know for sure that the garbage collector
// is not running.
g->gcstack = (uintptr)nil;
g->gcsp = (uintptr)nil;
}
// runtime·exitsyscall slow path on g0.
// Failed to acquire P, enqueue gp as runnable.
static void
exitsyscall0(G *gp)
{
P *p;
gp->status = Grunnable;
gp->m = nil;
m->curg = nil;
runtime·lock(&runtime·sched);
p = pidleget();
if(p == nil)
globrunqput(gp);
runtime·unlock(&runtime·sched);
if(p) {
acquirep(p);
execute(gp); // Never returns.
}
if(m->lockedg) {
// Wait until another thread schedules gp and so m again.
stoplockedm();
execute(gp); // Never returns.
}
stopm();
schedule(); // Never returns.
}