从其他P队列

从其他队列盗取goroutine #

盗取Goroutine的过程就是:如果在所有运行队列找g都找不到,M就进入睡眠状态

findrunnable函数 #

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

    //...

    // local runq
    //再次看一下本地运行队列是否有需要运行的goroutine
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq
    //再看看全局运行队列是否有需要运行的goroutine
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    //...

	// Steal work from other P's.
	// 使用所有P与空闲的P进行比较,如果除了自己,其他的P都是休眠状态。那么整个系统都没有工作需要做了。
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop   //直接退出
	}
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
 	// 当这个M不是自旋状态,并且此时的二倍的自旋M大于当前正在工作的P; 说明此时有许多现在在寻找工作做.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        //设置m的状态为spinning
        _g_.m.spinning = true
        //处于spinning状态的m数量加一
        atomic.Xadd(&sched.nmspinning, 1)
    }

    //从其它p的本地运行队列盗取goroutine
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:

    //...

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
 
    //...
 
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
   
    // 当前工作线程解除与p之间的绑定,准备去休眠
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    //把p放入空闲队列
    pidleput(_p_)
    unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        //m即将睡眠,状态不再是spinning
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    // 休眠之前再看一下是否有工作要做
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    ......
    //休眠
    stopm()
    goto top
}

随机遍历从其他的工作线程对应P上盗取G #

  1. 尽量减少M的自旋状态时间,只有在盗取的时候才把spinning标志位设为true,盗取退出后把spinning标志位重新设置为false,
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
    }
  1. 随机遍历全局P中的P,如果有goroutines,就偷盗一半来运行.

工作线程进入睡眠 #

src/runtime/proc.go:1910

停止执行当前的m,直到有新的工作。带着获得的P返回。

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}

	lock(&sched.lock)
	mput(_g_.m)  //全局的sched.midle空闲队列
	unlock(&sched.lock)
	notesleep(&_g_.m.park) //进入睡眠
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}

note是go runtime实现的一次性睡眠和唤醒机制.

这里的notesleep( *note)是一个抽象函数,根据不同的平台或系统有不同的实现.

  • dragonfly freebsd linux[src/runtime/lock_futex.go]
    • futex系统调用
      • futexsleep函数
  • aix darwin nacl netbsd openbsd plan9 solaris window[src/runtime/lock_sema.go]
  • js,was[src/runtime/lock_js.go]
    • 不支持 从上面可以看出,note层增加对特定平台的支持,就可以复用上层代码.

notesleep #

func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
	for atomic.Load(key32(&n.key)) == 0 { // --------here
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) //进入睡眠
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}

从这里可以看出来:如果futexsleep()退出,但是检查note.key还是为0,那么又会进入睡眠,for atomic.Load(key32(&n.key)) == 0 { // --------here,

并不是其它工作线程唤醒了这个线程,所以我们知道当其他线程唤醒这个线程,需要改下这个线程对应的m结构体中note.key字段

type m struct {
    //...
	park          note
    //...
}