G - goroutine
M - worker thread, or machine (native thread)
P - processor, a resource that is required to execute Go code
M must have an associated P to execute Go code,
however it can be blocked or in a syscall without an associated P
Worker thread parking/unparking
A worker thread is considered spinning if it is out of local work and did not find work in the global run queue/netpoller
Spinning threads do some spinning looking for work in per-P run queues before parking
If a spinning thread finds work it takes itself out of the spinning state and proceeds to execution
If it does not find work it takes itself out of the spinning state and then parks
If there is at least one spinning thread we don't unpark new threads when readying a goroutine
if the last spinning thread finds work and stops spinning, it must unpark a new spinning thread
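For a concrete feel of the model, a minimal sketch using only the standard library (the numbers printed are illustrative):

package main

import (
	"fmt"
	"runtime"
)

func main() {
	// P bounds how many goroutines can execute Go code simultaneously;
	// M (os threads) and G (goroutines) are created by the runtime on demand.
	fmt.Println("P (GOMAXPROCS):", runtime.GOMAXPROCS(0)) // 0 = query without changing
	fmt.Println("G (goroutines):", runtime.NumGoroutine())
}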
==============================================================================================
# program startup
asm_amd64.s runtime.rt0_go
setup g0's stack
g0.stack.lo = rsp - (64 * 1024) + 104
g0.stack.hi = rsp
g0.stackguard0 = g0.stack.lo
g0.stackguard1 = g0.stack.lo
setup flags by CPUID
call _cgo_init if present
CALL runtime·settls(SB)
set fs = m0.tls + 8 (access tls by -8(fs))
set tls.g = g0
set m0->g0 = g0
set g0->m = m0
CALL runtime.args(SB)
CALL runtime.osinit(SB)
CALL runtime.schedinit(SB)
procresize(procs)
for nprocs:
allp[index] = new p { id = index }
set p.mcache
_g_ = getg()
acquire(allp[0])
set _g_.m.p = p
set p.m = m
set p.status = _Prunning
set _g_.m.mcache = p.mcache
CALL runtime.newproc(SB)
newg := gfget(_p_) // grab a new g from free list
if newg == nil
newg = malg(_StackMin) // allocate a new g
casgstatus(newg, _Gidle, _Gdead) // set status to dead so gc won't scan its stack
allgadd(newg) // add to all g list
totalSize := how much space is needed before sp
sp := newg.stack.hi - totalSize
copy arguments onto the new stack above sp
execute write barriers for arguments that are pointers (bulkBarrierBitmap)
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
gostartcall(gobuf, fn.fn, unsafe.Pointer(fn))
newg.sched.sp -= sys.PtrSize
*newg.sched.sp = newg.sched.pc // return to goexit
newg.sched.pc = uintptr(fn) // function address
newg.sched.ctxt = fn // pointer of funcval
newg.gopc = callerpc // pc of the go statement
newg.startpc = fn.fn // function address here
newg.gcscanvalid = false
casgstatus(newg, _Gdead, _Grunnable)
newg.goid = _p_.goidcache++
runqput(_p_, newg, true)
if next (randomizeScheduler may randomly disable it with rand % 2 == 0)
try to replace _p_.runnext with gp; on success, kick the old runnext to the queue
put the g on the local queue (_p_.runq), no lock required
if the local queue is full, runqputslow(_p_, gp, h, t)
batch := half of the local queue plus gp
link the goroutines in batch (schedlink)
put batch on the global queue (globrunqputbatch), lock required
if there is an idle p, no spinning m, and mainStarted
wakep()
if !atomic.Cas(&sched.nmspinning, 0, 1)
return // change spinning to 1; if the old value isn't 0 then quit
startm(nil, true)
get an idle p
if no idle p and spinning (caller incremented nmspinning)
decrement nmspinning
mp := mget() // get a m from sched.midle
if mp == nil // no idle m
fn := spinning ? mspinning : nil
newm(fn, _p_) // create a new m
return
mp.spinning = spinning
mp.nextp.set(_p_)
notewakeup(&mp.park) // wake up idle m (thread)
CALL runtime.mstart(SB)
_g_.stackguard0 = _g_.stack.lo + _StackGuard // _StackGuard is 1760
_g_.stackguard1 = _g_.stackguard0
mstart1
gosave(&_g_.m.g0.sched)
sched.sp = caller's sp
sched.pc = caller's pc (return address)
sched.ret = 0
sched.bp = rbp
assert sched.ctxt == nil
sched.g = getg()
_g_.m.g0.sched.pc = ^uintptr(0) // make sure it is never used
asminit() // do nothing
minit()
call sigaltstack to set the signal stack
set signal mask to getg().m.sigmask (initSigmask)
install signal handlers
schedule() // find a runnable goroutine and execute it, never returns
if sched.gcwaiting != 0
gcstopm() // returns when the world is restarted
_p_ := releasep()
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0
notewakeup(&sched.stopnote)
stopm() // stops execution of the current m until new work is available
mput(_g_.m)
mp.schedlink = sched.midle
sched.midle.set(mp)
sched.nmidle++
notesleep(&_g_.m.park)
noteclear(&_g_.m.park)
acquirep(_g_.m.nextp.ptr()) // nextp will be set before wakeup
acquirep1(_p_)
_g_.m.p.set(_p_)
_p_.m.set(_g_.m)
_p_.status = _Prunning
_g_.m.mcache = _p_.mcache
_g_.m.nextp = 0
continue
if _g_.m.p.ptr().runSafePointFn != 0
runSafePointFn()
// run _p_.gcBgMarkWorker if it exists
if gp == nil && gcBlackenEnabled != 0
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
// check the global runnable queue once in a while to ensure fairness
if gp == nil
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0
gp = globrunqget(_g_.m.p.ptr(), 1)
// get g from local queue
if gp == nil
gp, inheritTime = runqget(_g_.m.p.ptr())
// finds a runnable goroutine to execute
// tries to steal from other P's, get g from global queue, poll network
if gp == nil
gp, inheritTime = findrunnable() // blocks until work is available
// get g from local queue
// get g from global queue
// get g from poll network
// if all other p's are idle then stop
// if 2*nmspinning >= number of busy p's then stop (throttle spinning)
// set m to spinning if it isn't
// steal half the gs from another p (see runqsteal)
// if a g was found, return it
// stop:
// release p
// put p on sched.pidle (see pidleput)
// set m to non-spinning
// decrement nmspinning first
// check all runqueues once again
// check for idle-priority GC work again
// check poll network again
// call stopm()
// reset spinning
if _g_.m.spinning
resetspinning()
_g_.m.spinning = false
sched.nmspinning--
if nmspinning == 0 && sched.npidle > 0
wakep()
if gp.lockedm != nil
startlockedm(gp) // hands off own p to the locked m (the g must run on that m)
_p_ := releasep()
mp.nextp.set(_p_)
notewakeup(&mp.park)
stopm()
continue
execute(gp, inheritTime)
_g_ := getg()
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime
_g_.m.p.ptr().schedtick++
_g_.m.curg = gp
gp.m = _g_.m
gogo(&gp.sched)
get ctxt from gp.sched.ctxt
if ctxt exists
call writebarrierptr_prewrite(ctxt, 0) // execute the write barrier without assigning dst
set tls.g = sched.g
set rsp = sched.rsp
set rax = sched.ret
set rdx = ctxt
set rbp = sched.rbp
clear sched.rsp
clear sched.ret
clear sched.ctxt
clear sched.rbp
jmp to sched.pc
// return to goexit
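Stepping back from the walkthrough: because newproc copies the go statement's arguments onto the new g's stack right away, they are evaluated at creation time. A minimal sketch of the visible effect:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		// the argument is evaluated here and copied onto the new g's stack
		// by newproc, so each goroutine sees its own value of n
		go func(n int) {
			defer wg.Done()
			fmt.Println(n)
		}(i)
	}
	wg.Wait()
}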
runtime.newm
mp := allocm(_p_, fn)
_g_ := getg()
_g_.m.locks++ // disable GC because it can be called from sysmon
if _g_.m.p == 0
acquirep(_p_) // temporarily borrow p for mallocs in this function
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp)
mp.g0 = malg(8192 * sys.StackGuardMultiplier)
newg := new(g)
systemstack(func() {
newg.stack = stackalloc(uint32(stacksize))
s = mheap_.allocManual(npage, &memstats.stacks_inuse)
s.state = _MSpanManual
})
newg.stackguard0 = newg.stack.lo + _StackGuard
newg.stackguard1 = ^uintptr(0)
mp.g0.m = mp
if _p_ == _g_.m.p.ptr()
releasep()
_g_.m.locks--
mp.nextp.set(_p_)
mp.sigmask = initSigmask
execLock.rlock()
newosproc(mp, unsafe.Pointer(mp.g0.stack.hi))
// disable signals during clone, so that the new thread starts
// with signals disabled. it will enable them in minit.
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
execLock.runlock()
runtime.main
// allow newproc to start new Ms.
mainStarted = true
systemstack(func() { newm(sysmon, nil) })
// some init functions need main os thread
lockOSThread()
runtime_init() // go forcegchelper() (triggers gc on a time interval)
gcenable() // go bgsweep() then set memstats.enablegc = true
main_init() // runtime.throwinit, fmt.init, time.init
unlockOSThread()
main_main()
exit(0)
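Since runtime.main calls exit(0) as soon as main_main returns, goroutines that are still running get cut off; a small sketch:

package main

import (
	"fmt"
	"time"
)

func main() {
	go func() {
		time.Sleep(time.Second)
		fmt.Println("never printed") // the process exits before this runs
	}()
	fmt.Println("main done")
	// main_main returns here; runtime.main calls exit(0) without
	// waiting for any other goroutine
}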
runtime.goexit
goexit1()
mcall(goexit0) // switch g0's stack then call fn(g)
casgstatus(gp, _Grunning, _Gdead)
gp.m = nil
gp.lockedm = nil
_g_.m.lockedg = nil
gp.paniconfault = false
gp._defer = nil
gp._panic = nil
gp.writebuf = nil
gp.waitreason = ""
gp.param = nil
gp.labels = nil
gp.timer = nil
gp.gcscanvalid = true
dropg()
_g_.m.curg.m = nil
_g_.m.curg = nil
_g_.m.locked = 0
gfput(_g_.m.p.ptr(), gp) // put on gfree list
stksize := gp.stack.hi - gp.stack.lo
if stksize != _FixedStack
// non-standard stack size - free it.
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
gp.schedlink.set(_p_.gfree)
_p_.gfree = gp
_p_.gfreecnt++
if _p_.gfreecnt >= 64
while _p_.gfreecnt >= 32
// move g to global sched.gfreeNoStack or sched.gfreeStack
schedule() // find a runnable goroutine and execute it, never returns
runtime.mcall
// save state in g.sched, make it resumable
g.sched.pc = return address
g.sched.sp = rsp
g.sched.g = g
g.sched.bp = rbp
// switch to m.g0
tls.g = m.g0
rsp = tls.g.sched.sp // i.e. m.g0.sched.sp
push g // argument
rdx = fn // context
// call function, it must never return
call fn.fn
==============================================================================================
# questions
how is g0 initialized?
it's all zeros at program start; then see runtime.rt0_go
how is m0 initialized?
it's all zeros at program start; then see runtime.rt0_go
what does systemstack do?
g = getg()
if g is m.g0, call fn directly
if g isn't m.g0
set g.sched.pc = runtime.systemstack_switch
set g.sched.sp = rsp
set g.sched.g = g
set g.sched.bp = rbp
set g(tls) to m.g0
set rsp = g.sched.sp
sub rsp, 8
mov runtime.mstart, (rsp) // stop traceback
call fn
set g(tls) to g.m.curg
set rsp = g.sched.sp
set g.sched.sp = 0
what's park?
park means saving the g's state, then letting the m do other things
the g will resume later when some condition is met
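A parked g holds no m, so a large number of blocked goroutines costs memory but almost no threads. A rough demonstration, using the threadcreate profile as an approximate thread count:

package main

import (
	"fmt"
	"runtime"
	"runtime/pprof"
)

func main() {
	block := make(chan struct{})
	for i := 0; i < 10000; i++ {
		go func() { <-block }() // each g parks (gopark) inside chanrecv
	}
	// ten thousand parked gs, but only a handful of os threads
	fmt.Println("goroutines:", runtime.NumGoroutine())
	fmt.Println("threads created:", pprof.Lookup("threadcreate").Count())
	close(block)
}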
==============================================================================================
# rules
### m0 and g0:
m0 is the first thread, started when the program begins executing.
g0 is a dummy g that holds the system stack used for scheduling; each m has its own g0, and the global g0 is m0.g0.
rt0_go will create a new g for runtime.main (see newproc).
m0 will take the g of runtime.main and execute it (see mstart).
runtime.main executes main.main and exit(0).
### workflow of m
the creation
m0 is main os thread started at first
other m will start from runtime.newm
the entrypoint
the entrypoint of m is mstart
mstart will do the initialization and call schedule
the schedule
wait for gc if sched.gcwaiting
call _g_.m.p.ptr().runSafePointFn
[find g] run _p_.gcBgMarkWorker (a goroutine) if it exists
[find g] check the global runnable queue once in a while to ensure fairness
[find g] get g from local queue
[find g] finds a runnable goroutine to execute, blocks until work is available
[find g] get g from local queue
[find g] get g from global queue
[find g] get g from poll network
[spinning+] set m is spinning if it isn't
[find g] steal half the gs from other p's (see runqsteal)
[find g] check _p_.gcBgMarkWorker again
[find g] check global queue again
release p and put p to sched.pidle
[spinning-] set m is not spinning
[find g] check all runqueues once again
[find g] check for idle-priority GC work again
[find g] check poll network again
stop m
put m to sched.midle
wait for signal _g_.m.park
wakeup
acquire the p from the waker
goto top
[resetspinning] set m is not spinning if it is, and
if no other m is spinning but idle p's exist, call wakep
wakep will start an idle m, or create a new m, in the spinning state
if the g requires a specific m (lockedm), hand off our p to that m and find the next g
[execute] execute g
[gogo] set registers from g.sched
[gogo] jump to g.sched.pc
[gogo] return to goexit
the goexit
when a goroutine finished, it will return to runtime.goexit
goexit will switch back to m.g0's stack
then reset g's state and put g to free list
and then call schedule again
### sleep and wakeup of m
when an m finds no runnable g, it sleeps.
when an m is about to execute a g,
if no other m is spinning but an idle p exists,
it wakes an existing m, or creates a new one, to spin.
when a new g is created,
if no m is spinning but an idle p exists,
it wakes an existing m, or creates a new one, to spin.
delicate dance:
[avoid]
spinning m looks for a g     | ready a g: check sched.nmspinning (sees one spinning)
decrement nmspinning         |
check queues, find no g      |
                             | submit g to local work queue (skip wakep)
m sleeps                     | no m will execute this g
[so do]
decrement nmspinning         | submit g to local work queue
memory barrier               | memory barrier
check all per-P work queues  | check sched.nmspinning
m sleeps                     | if none spinning but idle p exists, start an m
the worst case is that both sides start an m, which is not so bad.
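The same pattern outside the runtime, as a sketch (the names submitWork/aboutToPark are mine; Go's sequentially consistent atomics stand in for the memory barriers above):

package main

import (
	"fmt"
	"sync/atomic"
)

var (
	nmspinning int32 // how many m are spinning
	work       int32 // stand-in for the per-P work queues
)

// the last spinning m, finding nothing and about to park
func aboutToPark() bool {
	atomic.AddInt32(&nmspinning, -1)  // decrement nmspinning first
	if atomic.LoadInt32(&work) != 0 { // then check all queues once again
		atomic.AddInt32(&nmspinning, 1) // found work: keep spinning instead
		return false
	}
	return true // safe to park
}

// a thread readying a new g
func submitWork() (mustStartM bool) {
	atomic.StoreInt32(&work, 1)               // submit g to the work queue first
	return atomic.LoadInt32(&nmspinning) == 0 // then check nmspinning
}

func main() {
	fmt.Println(submitWork(), aboutToPark())
}

Whatever the interleaving, at least one side observes the other's write, so the g is never stranded.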
### channel
runtime.chansend1
chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c)))
lock(&c.lock)
// found a waiting receiver, channel must be empty, send to the receiver directly
if sg := c.recvq.dequeue(); sg != nil
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
if sg.elem != nil // pointer used to store the received object
sendDirect(c.elemtype, sg, ep) // *sg.elem = *ep
dst := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1) // resume g
systemstack(func() { ready(gp, traceskip, true) })
_g_ := getg()
_g_.m.locks++ // disable preemption because it can be holding p in a local var
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0
wakep()
_g_.m.locks--
return true
// space is available in the channel buffer. enqueue the element to send.
if c.qcount < c.dataqsiz
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz
c.sendx = 0
c.qcount++
return true
// block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.g = gp
mysg.c = c
gp.waiting = mysg
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
// puts the current goroutine into a waiting state and unlocks the lock
gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
mp := acquirem()
gp := mp.curg
mp.waitlock = lock
mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
mcall(park_m) // it will save state to g.sched then switch to g0
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if _g_.m.waitunlockf != nil
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // schedule it back, never returns
schedule()
// someone woke us up
gp.waiting = nil
mysg.c = nil
releaseSudog(mysg)
return true
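The three send paths above map onto visible behavior; a sketch:

package main

import "fmt"

func main() {
	// waiting receiver: if the receiver parked first, the send takes
	// the sendDirect path, copying straight into the receiver's slot
	ready := make(chan int)
	done := make(chan struct{})
	go func() { fmt.Println("got", <-ready); close(done) }()
	ready <- 1
	<-done

	// free buffer slot: qcount < dataqsiz, enqueue at sendx and return
	buf := make(chan int, 1)
	buf <- 2

	// no receiver and the buffer is full: the sending g parks on c.sendq
	go func() { buf <- 3 }() // blocks until a receive below frees a slot
	fmt.Println(<-buf, <-buf) // 2 3
}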
runtime.chanrecv1
chanrecv(c, elem, true)
lock(&c.lock)
// found a waiting sender: either the buffer is full or the channel is unbuffered
if sg := c.sendq.dequeue(); sg != nil
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
if c.dataqsiz == 0
// unbuffered channel, copy data directly from the sender
if ep != nil
recvDirect(c.elemtype, sg, ep) // *ep = *sg.elem
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
else
// queue is full. Take the item at the head of the queue
// make the sender enqueue its item at the tail of the queue
// since the queue is full, those are both the same slot
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil
typedmemmove(c.elemtype, ep, qp)
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz
c.recvx = 0
c.sendx = c.recvx
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
goready(gp, skip+1) // resume g, same as previous
return true, true
// receive directly from queue
if c.qcount > 0
qp := chanbuf(c, c.recvx)
if ep != nil
typedmemmove(c.elemtype, ep, qp)
c.recvx++
if c.recvx == c.dataqsiz
c.recvx = 0
c.qcount--
unlock(&c.lock)
return true, true
// no sender available: block on this channel
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
// puts the current goroutine into a waiting state and unlocks the lock
// same as previous
// someone woke us up
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
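The closed-channel path above is exactly what the comma-ok receive form reports; a sketch:

package main

import "fmt"

func main() {
	// direct receive from the buffer: the qcount > 0 branch
	buf := make(chan int, 1)
	buf <- 10
	v, ok := <-buf
	fmt.Println("from buffer:", v, ok) // 10 true

	// receiver released by close: gp.param == nil, so ok is false
	c := make(chan int)
	done := make(chan struct{})
	go func() {
		v, ok := <-c // blocks until close(c)
		fmt.Println("after close:", v, ok) // 0 false
		close(done)
	}()
	close(c)
	<-done
}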
runtime.closechan
lock(&c.lock)
c.closed = 1
var glist *g
// release all readers
while true
sg := c.recvq.dequeue()
if sg == nil
break
if sg.elem != nil
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
gp := sg.g
gp.param = nil // nil param means the channel was closed
gp.schedlink.set(glist)
glist = gp
// release all writers (they will panic)
while true
sg := c.sendq.dequeue()
if sg == nil
break
sg.elem = nil
gp := sg.g
gp.param = nil // nil param means the channel was closed
gp.schedlink.set(glist)
glist = gp
unlock(&c.lock)
// ready all Gs now that we've dropped the channel lock
while glist != nil
gp := glist
glist = glist.schedlink.ptr()
gp.schedlink = 0
goready(gp, 3)
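Close also dooms the released writers: once resumed they see c.closed and panic. A sketch:

package main

import "fmt"

func main() {
	c := make(chan int)
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer func() { fmt.Println("send failed:", recover()) }()
		c <- 1 // parks on c.sendq; released by close, then panics
	}()
	close(c) // releases the parked sender via the glist above
	<-done   // prints: send failed: send on closed channel
}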
### blocking syscall
entersyscallblock(0)
ok := notetsleep_internal(n, ns, nil, 0)
exitsyscall(0)
entersyscallblock
_g_ := getg()
_g_.m.locks++ // see comment in entersyscall
_g_.throwsplit = true
_g_.stackguard0 = stackPreempt // see comment in entersyscall
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
_g_.m.p.ptr().syscalltick++
pc := getcallerpc(unsafe.Pointer(&dummy))
sp := getcallersp(unsafe.Pointer(&dummy))
save(pc, sp) // save to g.sched
_g_.syscallsp = _g_.sched.sp
_g_.syscallpc = _g_.sched.pc
casgstatus(_g_, _Grunning, _Gsyscall)
systemstack(entersyscallblock_handoff)
handoffp(releasep()) // hands off P from syscall or locked M
// handoffp must start an M in any situation where
// findrunnable would return a G to run on _p_
// if it has local work, start it straight away
if !runqempty(_p_) || sched.runqsize != 0
startm(_p_, false)
return
// if it has GC work, start it straight away
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_)
startm(_p_, false)
return
// no local work, check that there are no spinning/idle M's,
// otherwise our help is not required
if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 &&
atomic.Cas(&sched.nmspinning, 0, 1) // exchange success
// no spinning m and no idle p, so start a new spinning m for this p
startm(_p_, true)
lock(&sched.lock)
// handle stw
if sched.gcwaiting != 0
_p_.status = _Pgcstop
sched.stopwait--
if sched.stopwait == 0
notewakeup(&sched.stopnote)
unlock(&sched.lock)
return
// handle safepoint
if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0)
sched.safePointFn(_p_)
sched.safePointWait--
if sched.safePointWait == 0
notewakeup(&sched.safePointNote)
// check global queue again
if sched.runqsize != 0
unlock(&sched.lock)
startm(_p_, false)
return
// if this is the last running P and nobody is polling network,
// need to wakeup another M to poll network
if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0
unlock(&sched.lock)
startm(_p_, false)
return
// put p to sched.pidle
pidleput(_p_)
unlock(&sched.lock)
// resave for traceback during blocked call.
save(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
_g_.m.locks--
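The net effect: a g blocked in a syscall pins only its m, never a p. A sketch using a raw blocking pipe read (Unix only; for plain syscalls the handoff happens when sysmon retakes the p, via the same handoffp):

package main

import (
	"fmt"
	"syscall"
	"time"
)

func main() {
	var p [2]int
	if err := syscall.Pipe(p[:]); err != nil {
		panic(err)
	}
	go func() {
		buf := make([]byte, 1)
		syscall.Read(p[0], buf) // the m blocks in read(2); its p is released
		fmt.Println("read:", buf[0])
	}()
	// other goroutines keep running on the released p in the meantime
	for i := 0; i < 3; i++ {
		fmt.Println("tick", i)
		time.Sleep(10 * time.Millisecond)
	}
	syscall.Write(p[1], []byte{42})
	time.Sleep(10 * time.Millisecond) // give the reader a moment to print
}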