Stealing goroutines from other run queues #
The stealing path works as follows: the M rechecks its own local run queue and the global run queue, then tries to steal runnable g's from other P's; only if no g can be found in any run queue does the M go to sleep.
The findrunnable function #
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	//...

	// local runq
	// Check our own local run queue once more for a runnable goroutine.
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
	// Then check the global run queue.
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

	//...

	// Steal work from other P's.
	// Compare the number of idle P's against the total: if every P except
	// ours is idle, the whole system has no work left to steal.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop // give up and prepare to block
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	// If this M is not yet spinning while twice the number of spinning M's
	// already reaches the number of busy P's, plenty of threads are already
	// looking for work, so this one blocks instead of joining them.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		// Mark this M as spinning.
		_g_.m.spinning = true
		// Increment the count of spinning M's.
		atomic.Xadd(&sched.nmspinning, 1)
	}
	// Steal goroutines from the local run queues of other P's.
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:
	//...
	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	//...
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	// Detach the current worker thread from its P in preparation for sleeping.
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	// Put the P on the idle list.
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		// This M is about to sleep, so it is no longer spinning.
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
	// One last look for work before going to sleep.
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	//...
	// Sleep until another thread wakes us up.
	stopm()
	goto top
}
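Stripped of the locking and spinning bookkeeping, the control flow is a loop over a fixed search order. The following is a hypothetical, self-contained condensation, not runtime code; the helper names are invented for illustration (the real runtime uses runqget, globrunqget, runqsteal and stopm):

package main

import "fmt"

type g struct{ id int }

// Stand-ins invented for this sketch only.
func localGet() *g         { return nil }        // like runqget
func globalGet() *g        { return nil }        // like globrunqget
func stealFromOtherPs() *g { return &g{id: 1} }  // like runqsteal
func sleepUntilWoken()     {}                    // like stopm

// findRunnableSketch condenses findrunnable's search order:
// local queue -> global queue -> steal -> sleep -> retry.
func findRunnableSketch() *g {
	for {
		if gp := localGet(); gp != nil {
			return gp
		}
		if gp := globalGet(); gp != nil {
			return gp
		}
		if gp := stealFromOtherPs(); gp != nil {
			return gp
		}
		sleepUntilWoken() // parks the thread; woken when new work appears
	}
}

func main() {
	fmt.Println(findRunnableSketch().id) // prints 1: "stolen" from a peer
}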
Stealing G's from other P's in a random order #
- To keep the time an M spends in the spinning state as short as possible, the spinning flag is set to true only while stealing, and reset to false as soon as the steal attempt ends:
if !_g_.m.spinning {
	_g_.m.spinning = true
	atomic.Xadd(&sched.nmspinning, 1)
}
- It walks over all the P's in allp in a random order; whenever a victim P's local queue holds goroutines, it steals half of them to run (see the traversal sketch below).
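The random traversal relies on a classic trick: starting at a random index and stepping through 0..n-1 with a stride that is coprime to n visits every index exactly once. The runtime's stealOrder is built on this idea; the standalone sketch below only illustrates the principle and is not the runtime's code:

package main

import (
	"fmt"
	"math/rand"
)

// gcd returns the greatest common divisor of a and b.
func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// visitAll enumerates 0..count-1 exactly once in a scrambled order:
// starting at a random position and stepping by a stride coprime to
// count is guaranteed to cover every index without repeats.
func visitAll(count uint32, visit func(uint32)) {
	pos := uint32(rand.Intn(int(count)))
	inc := uint32(rand.Intn(int(count))) + 1
	for gcd(inc, count) != 1 { // retry until the stride is coprime
		inc = uint32(rand.Intn(int(count))) + 1
	}
	for i := uint32(0); i < count; i++ {
		visit(pos)
		pos = (pos + inc) % count
	}
}

func main() {
	// Visit 8 "P's" in a random order, as a stealing M would.
	visitAll(8, func(p uint32) { fmt.Print(p, " ") })
	fmt.Println()
}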
The worker thread goes to sleep #
src/runtime/proc.go:1910
stopm stops execution of the current m until new work becomes available, and returns holding an acquired P.
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}

	lock(&sched.lock)
	mput(_g_.m) // put m on the global sched.midle idle list
	unlock(&sched.lock)
	notesleep(&_g_.m.park) // sleep here until another thread wakes us
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr()) // bind to the P the waker handed us
	_g_.m.nextp = 0
}
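The counterpart to stopm is the wake-up path: when new work appears, startm (reached via wakep) takes an idle P, takes a sleeping M off sched.midle, hands the P over through m.nextp, and flips the M's park note. Abridged from startm in proc.go (details vary across Go versions; elisions ours):

// Abridged: the waker hands the sleeping M a P via nextp
// before flipping its park note.
func startm(_p_ *p, spinning bool) {
	lock(&sched.lock)
	if _p_ == nil {
		_p_ = pidleget() // take a P off the idle list
		//...
	}
	mp := mget() // take a sleeping M off the sched.midle list
	unlock(&sched.lock)
	//...
	mp.spinning = spinning
	mp.nextp.set(_p_)    // the P that stopm will acquirep on wakeup
	notewakeup(&mp.park) // wake the thread blocked in notesleep
}

This is why stopm can blindly call acquirep(_g_.m.nextp.ptr()) after waking: the waker always stores a P in nextp before touching the note.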
A note is a one-shot sleep-and-wakeup mechanism implemented by the Go runtime.
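Its semantics are easy to model in ordinary Go: sleep blocks until the note is signaled, wakeup may fire at most once per cycle, and clear rearms the note for reuse. A rough userland analogue built on a channel (purely illustrative; at this layer the runtime cannot use channels and relies on futexes or semaphores instead):

package main

import "fmt"

// note models the runtime's one-shot sleep/wakeup primitive.
type note struct{ ch chan struct{} }

func newNote() *note { return &note{ch: make(chan struct{})} }

func (n *note) sleep()  { <-n.ch }      // block until woken; returns at once if already woken
func (n *note) wakeup() { close(n.ch) } // at most once per cycle; a second call would panic, like notewakeup's double-wakeup throw
func (n *note) clear()  { n.ch = make(chan struct{}) } // rearm, like noteclear

func main() {
	n := newNote()
	done := make(chan struct{})
	go func() {
		n.sleep()
		fmt.Println("woken")
		close(done)
	}()
	n.wakeup()
	<-done
	n.clear() // ready for the next sleep/wakeup cycle
}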
notesleep(*note) here is an abstraction with a different implementation per platform and operating system:
- dragonfly, freebsd, linux [src/runtime/lock_futex.go]
  - notesleep calls the futexsleep function, which in turn issues the futex system call
- aix, darwin, nacl, netbsd, openbsd, plan9, solaris, windows [src/runtime/lock_sema.go]
  - built on the OS's semaphore primitives
- js/wasm [src/runtime/lock_js.go]
  - notesleep is not supported there (it throws)

As the list shows, supporting a new platform only requires a platform-specific implementation at the note layer; all the scheduler code above it is reused unchanged.
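On the futex platforms, futexsleep ultimately boils down to the futex(2) system call: sleep only while a word in memory still holds an expected value, and be woken when another thread changes the word and issues a wake. A minimal Linux-only demonstration of the same wait/wake pattern using raw syscalls (illustrative only, not runtime code; FUTEX_WAIT=0 and FUTEX_WAKE=1 are from the Linux ABI):

//go:build linux

package main

import (
	"fmt"
	"sync/atomic"
	"syscall"
	"time"
	"unsafe"
)

const (
	futexWaitOp = 0 // FUTEX_WAIT
	futexWakeOp = 1 // FUTEX_WAKE
)

// futex issues a raw futex(2) call on the given word.
func futex(addr *uint32, op, val uintptr) {
	syscall.Syscall6(syscall.SYS_FUTEX,
		uintptr(unsafe.Pointer(addr)), op, val, 0, 0, 0)
}

func main() {
	var key uint32

	go func() {
		time.Sleep(50 * time.Millisecond)
		atomic.StoreUint32(&key, 1) // flip the key first, like notewakeup...
		futex(&key, futexWakeOp, 1) // ...then wake one sleeping waiter
	}()

	// Like notesleep: sleep only while key is still 0 and retry on
	// spurious wakeups, since FUTEX_WAIT may return early.
	for atomic.LoadUint32(&key) == 0 {
		futex(&key, futexWaitOp, 0) // wait while *addr == 0
	}
	fmt.Println("woken")
}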
notesleep #
func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
	for atomic.Load(key32(&n.key)) == 0 { // --------here
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) // sleep in the kernel
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}
The marked loop condition is the key point: if futexsleep() returns but n.key is still 0, the thread simply loops and goes back to sleep,

for atomic.Load(key32(&n.key)) == 0 { // --------here

because returning from futexsleep does not by itself mean that another worker thread performed a wakeup (futex waits can return spuriously). It follows that to wake this thread, the waker must first set the key of the park note in the sleeping thread's m structure to a nonzero value, as the notewakeup sketch after the struct shows:
type m struct {
	//...
	park note // the one-shot note this thread sleeps on in stopm
	//...
}
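For completeness, the futex-based wake side does exactly what the analysis above calls for: it flips n.key to 1 first, and only then issues the futex wakeup, so that even a sleeper that wakes spuriously will see the nonzero key and exit its loop. Abridged from notewakeup in src/runtime/lock_futex.go (details may differ slightly across Go versions):

func notewakeup(n *note) {
	// Set the key to nonzero first, so the notesleep loop terminates
	// no matter how the sleeper wakes up.
	old := atomic.Xchg(key32(&n.key), 1)
	if old != 0 {
		print("notewakeup - double wakeup (", old, ")\n")
		throw("notewakeup - double wakeup")
	}
	// Then wake one thread blocked in futexsleep on this key.
	futexwakeup(key32(&n.key), 1)
}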