1. The schedule function #
One round of scheduling: find a runnable goroutine and execute it. In other words, g0 -> g: while g0 is running on an M, find a new goroutine and have that M run it.
- findrunnable: find a runnable goroutine.
- execute: run the goroutine that was found.
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
	_g_ := getg()
	//...
top:
	//...
	var gp *g
	var inheritTime bool
	//...
	if gp == nil {
		// schedtick counts scheduling rounds: every 61st round, if the
		// global run queue is non-empty, try it first so goroutines in
		// the global queue are not starved.
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			gp = globrunqget(_g_.m.p.ptr(), 1) // get a goroutine from the global run queue
			unlock(&sched.lock)
		}
	}
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr()) // get a goroutine from the local run queue of the current P
		if gp != nil && _g_.m.spinning {
			throw("schedule: spinning with local work")
		}
	}
	if gp == nil {
		// Fall back to findrunnable, which steals from the run queues of
		// other worker threads. If nothing can be stolen, the current
		// worker thread goes to sleep; findrunnable only returns once it
		// has a runnable goroutine.
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	if _g_.m.spinning {
		resetspinning()
	}
	//...
	execute(gp, inheritTime)
}
Walking through the schedule function:
- schedule tries three sources in order to obtain a runnable goroutine: the global G queue, the local G queue, and the G queues of other Ps (a toy model of this order follows the list).
- Global run queue: taking goroutines from the global queue requires sched.lock, so the global queue should be touched as rarely as possible; it is only polled when the P's schedtick is a multiple of 61 and the global queue is non-empty.
  - This exists purely so that goroutines sitting in the global queue eventually get a chance to be scheduled.
- Local run queue of the worker thread: that is, the G queue of the P bound to the current thread.
  - Since a P is associated with at most one M at a time, no lock is needed here.
- Stealing from other Ps: findrunnable steals goroutines from the run queues of other worker threads.
  - Before stealing, findrunnable checks the global run queue and the current P's run queue once more for a runnable g.
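To make that priority order concrete, here is a toy user-space model (all names here, such as globalQueue and findRunnable, are inventions of this sketch; the real runtime uses lock-free ring buffers and a locked linked list, as the following sections show):
package main

import "fmt"

// Toy model of schedule's three-tier lookup. Ints stand in for goroutines.
type globalQueue struct{ q []int }

type proc struct {
	schedtick int
	local     []int
}

// findRunnable mimics the order used by schedule/findrunnable: an occasional
// anti-starvation poll of the global queue, then the local queue, then the
// global queue again, then stealing half of a victim P's queue.
func findRunnable(g *globalQueue, p *proc, victims []*proc) (int, bool) {
	p.schedtick++
	if p.schedtick%61 == 0 && len(g.q) > 0 { // anti-starvation poll
		id := g.q[0]
		g.q = g.q[1:]
		return id, true
	}
	if len(p.local) > 0 { // local queue (lock-free in the real runtime)
		id := p.local[0]
		p.local = p.local[1:]
		return id, true
	}
	if len(g.q) > 0 { // global queue (needs sched.lock in the runtime)
		id := g.q[0]
		g.q = g.q[1:]
		return id, true
	}
	for _, v := range victims { // steal half of another P's queue
		if n := len(v.local); n > 0 {
			stolen := v.local[:(n+1)/2]
			v.local = v.local[(n+1)/2:]
			p.local = append(p.local, stolen[1:]...)
			return stolen[0], true
		}
	}
	return 0, false // nothing anywhere: the M would go to sleep here
}

func main() {
	g := &globalQueue{q: []int{100, 101}}
	p := &proc{local: []int{1, 2}}
	victim := &proc{local: []int{3, 4, 5, 6}}
	for {
		id, ok := findRunnable(g, p, []*proc{victim})
		if !ok {
			break
		}
		fmt.Println("run g", id)
	}
}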
2. Getting a goroutine from the global run queue #
For the structure of the global G queue, see the article <前置知识.底层重要结构> (prerequisites: core runtime structures).
// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
	if sched.runqsize == 0 { // the global G queue is empty, return nil right away
		return nil
	}
	/*
	   Example: with gomaxprocs == 1 and sched.runqsize == 1,
	   n == 2 would exceed sched.runqsize (the global queue only
	   holds 1), so n must be capped at the smaller of the two.
	*/
	n := sched.runqsize/gomaxprocs + 1 // divide the global queue evenly among the Ps
	if n > sched.runqsize {
		n = sched.runqsize // take the smaller of the two
	}
	if max > 0 && n > max {
		n = max // take the smaller of the two
	}
	if n > int32(len(_p_.runq))/2 { // at most half the capacity of the local queue
		n = int32(len(_p_.runq)) / 2
	}
	sched.runqsize -= n
	gp := sched.runq.pop() // return the first one; the rest go into the local queue
	n--
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(_p_, gp1, false)
		/*
		   If putting a G fills the local queue up, runqput in turn
		   pushes a batch back onto the global queue:
		   "If the run queue is full, runnext puts g on the global queue."
		*/
	}
	return gp
}
The number of goroutines taken from the global G queue is the minimum of the following four values:
- the max argument (only considered when it is greater than 0; otherwise it is ignored)
- the number of runnable g's in the global queue divided by gomaxprocs (the maximum number of Ps), plus one
- the number of runnable g's in the global queue
- half the capacity of the current P's local G queue
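As a quick sanity check of this arithmetic, here is a hypothetical standalone re-implementation of just the sizing logic (batchSize is a made-up name; the steps mirror the code above):
package main

import "fmt"

// batchSize re-implements globrunqget's sizing logic in isolation,
// purely to check the arithmetic; nothing here is runtime code.
func batchSize(runqsize, gomaxprocs, max, localCap int32) int32 {
	n := runqsize/gomaxprocs + 1
	if n > runqsize {
		n = runqsize
	}
	if max > 0 && n > max {
		n = max
	}
	if n > localCap/2 {
		n = localCap / 2
	}
	return n
}

func main() {
	fmt.Println(batchSize(20, 4, 1, 256)) // 1: capped by max (schedule passes max=1)
	fmt.Println(batchSize(20, 4, 0, 256)) // 6: 20/4+1, max<=0 is ignored
	fmt.Println(batchSize(1, 1, 0, 256))  // 1: n=2 gets capped by runqsize
}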
The comments in globrunqget make the logic easy to follow; only one point needs attention:
//...
for ; n > 0; n-- {
	gp1 := sched.runq.pop()
	runqput(_p_, gp1, false) // ...the point of interest...
	/*
	   If putting a G fills the local queue up, runqput in turn
	   pushes a batch back onto the global queue.
	*/
}
//...
With the algorithm above, a batch of g's has been taken from the global queue; while they are being placed into the local queue, the local queue may fill up, in which case runqput moves g's back out again:
runqput #
runqput tries to place its argument g (a runnable goroutine) into one of the following three places:
- the P's runnext slot: if runnext already holds some gX, that gX has to be moved to the local/global G queue; otherwise runqput returns immediately
- the P's local G queue: if the local queue is not full, the g is appended and runqput returns
- the global G queue: if the local queue is full, runqputslow pushes gp onto the global run queue
/*
   runqput tries to put g on the local runnable queue.
   1. If next is false, runqput adds g to the tail of the runnable queue.
   2. If next is true, runqput puts g in the _p_.runnext slot.
   3. If the run queue is full, runnext puts g on the global queue.
*/
func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	}
	if next { // case 2 above
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}
retry:
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}
randomizeScheduler controls whether the scheduler randomizes ordering; it is enabled in race-detector builds.
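To watch the runnext kick-out behavior in isolation, here is a hypothetical single-threaded toy model (toyP and its slice-backed queue are inventions of this sketch; the real runq is a fixed-size lock-free ring and runnext a guintptr updated with CAS):
package main

import "fmt"

// toyP models just the parts of p that runqput touches.
type toyP struct {
	runnext int   // 0 means empty, like the runtime's guintptr
	runq    []int // bounded local queue
	cap     int
	global  *[]int // stand-in for the global queue
}

// put mirrors runqput's three destinations: the runnext slot,
// the local queue, or (via the slow path) the global queue.
func (p *toyP) put(g int, next bool) {
	if next {
		old := p.runnext
		p.runnext = g
		if old == 0 {
			return
		}
		g = old // the kicked-out runnext falls through to the queues
	}
	if len(p.runq) < p.cap {
		p.runq = append(p.runq, g)
		return
	}
	// slow path: move half of the local queue plus g to the global queue
	half := len(p.runq) / 2
	*p.global = append(*p.global, p.runq[:half]...)
	*p.global = append(*p.global, g)
	p.runq = append([]int(nil), p.runq[half:]...)
}

func main() {
	global := []int{}
	p := &toyP{cap: 4, global: &global}
	for g := 1; g <= 6; g++ {
		p.put(g, false) // g 5 overflows: half the queue plus 5 go global
	}
	p.put(7, true) // lands in runnext
	p.put(8, true) // kicks 7 out into the local queue
	fmt.Println("runnext:", p.runnext) // 8
	fmt.Println("local:  ", p.runq)    // [3 4 6 7]
	fmt.Println("global: ", global)    // [1 2 5]
}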
runqputslow #
runqputslow moves half of the local run queue (which runqput has just found to be full) plus one more (the gp argument) to the global queue; so with a local queue of length 256, at most 128+1 g's are moved at once.
// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
	var batch [len(_p_.runq)/2 + 1]*g // 256/2+1 = 129 goroutines
	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(_p_.runq)/2) { // must be exactly half, i.e. the queue really was full
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		// h and t are not addresses but free-running counters;
		// the slot index is (h+i) modulo len(runq).
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
	}
	if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp
	if randomizeScheduler { // shuffle the batch that is about to go global
		for i := uint32(1); i <= n; i++ {
			j := fastrandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}
	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		// The global run queue is a linked list; chain the batch
		// together through each g's schedlink field.
		batch[i].schedlink.set(batch[i+1])
	}
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])
	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true
}
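The global run queue being an intrusive singly linked list (gQueue with head and tail pointers) is what makes this cheap: the batch is chained together before taking sched.lock, then spliced in with O(1) work under the lock. A minimal sketch of the same intrusive-list idea (node and list are hypothetical types, not the runtime's):
package main

import "fmt"

// node plays the role of g, with an intrusive link like g.schedlink.
type node struct {
	id   int
	link *node
}

// list plays the role of gQueue: head and tail of a singly linked list.
type list struct {
	head, tail *node
}

// pushBatch splices a pre-linked chain [h..t] onto the tail in O(1),
// which is why runqputslow links the batch before taking the lock.
func (l *list) pushBatch(h, t *node) {
	t.link = nil
	if l.tail != nil {
		l.tail.link = h
	} else {
		l.head = h
	}
	l.tail = t
}

func main() {
	// Link a batch first (outside the lock, in the runtime's case)...
	batch := []*node{{id: 1}, {id: 2}, {id: 3}}
	for i := 0; i+1 < len(batch); i++ {
		batch[i].link = batch[i+1]
	}
	var q list
	// ...then splice it in with a single O(1) operation.
	q.pushBatch(batch[0], batch[len(batch)-1])
	for n := q.head; n != nil; n = n.link {
		fmt.Println("g", n.id)
	}
}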
3. Getting a goroutine from the local run queue #
For the structure of the local G queue, see the article <前置知识.底层重要结构> (prerequisites: core runtime structures).
3.1. runqget #
The work is done by the runqget function:
- First check whether the runnext member is non-empty; if it is, return the Goroutine it points to and clear runnext.
- If runnext is empty, fall back to searching the local circular queue for a Goroutine.
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// If there's a runnext, it's the next G to run.
	for {
		next := _p_.runnext
		if next == 0 { // runnext is empty; break out and take from the queue instead
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}
	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := _p_.runqtail
		if t == h { // head == tail means the queue is empty
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}
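The LoadAcq/CasRel pairing above is the classic single-producer multi-consumer ring buffer: only the owner P appends at runqtail, but any thread (the owner in runqget, or a thief in runqsteal) may consume at runqhead via CAS. A user-space sketch of the same design, assuming sync/atomic (whose operations are sequentially consistent, stronger than the runtime's acquire/release primitives):
package main

import (
	"fmt"
	"sync/atomic"
)

const ringSize = 8

// ring is a toy single-producer multi-consumer queue in the style of
// p.runq: head and tail are free-running counters, and slot indices
// are taken modulo the ring size.
type ring struct {
	head, tail uint32
	buf        [ringSize]int
}

// push is called only by the owner (single producer), like runqput.
func (r *ring) push(v int) bool {
	h := atomic.LoadUint32(&r.head)
	t := r.tail // only the producer writes tail, so a plain read is fine here
	if t-h >= ringSize {
		return false // full; the runtime takes the slow path here
	}
	r.buf[t%ringSize] = v
	atomic.StoreUint32(&r.tail, t+1) // publish the slot to consumers
	return true
}

// pop may be called by any thread (multi-consumer), like runqget or a
// stealing P: claim the head slot with a CAS, retrying on contention.
func (r *ring) pop() (int, bool) {
	for {
		h := atomic.LoadUint32(&r.head)
		t := atomic.LoadUint32(&r.tail)
		if t == h {
			return 0, false // empty
		}
		v := r.buf[h%ringSize]
		if atomic.CompareAndSwapUint32(&r.head, h, h+1) {
			return v, true // we own this slot
		}
		// lost the race to another consumer; retry
	}
}

func main() {
	var r ring
	for i := 1; i <= 5; i++ {
		r.push(i)
	}
	for {
		v, ok := r.pop()
		if !ok {
			break
		}
		fmt.Println(v)
	}
}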
4. Stealing goroutines #
The stealing path works like this: findrunnable tries every run queue in turn, and if no runnable g can be found anywhere, the M goes to sleep.
4.1. The findrunnable function #
// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.
top:
	_p_ := _g_.m.p.ptr()
	//...
	// local runq
	// check the local run queue once more for a runnable goroutine
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}
	// global runq
	// then check the global run queue once more
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}
	//...
	// Steal work from other P's.
	// Compare the number of idle Ps against the total: if every P other
	// than ours is idle, the whole system has no work to do.
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop // bail out directly
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
	// If this M is not spinning and twice the number of spinning Ms already
	// reaches the number of busy Ps, plenty of threads are out looking for
	// work; block instead of adding another spinner.
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
		// mark this M as spinning
		_g_.m.spinning = true
		// and bump the count of spinning Ms
		atomic.Xadd(&sched.nmspinning, 1)
	}
	// steal goroutines from the local run queues of other Ps
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}
stop:
	//...
	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp
	// return P and block
	lock(&sched.lock)
	//...
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	// unbind the current worker thread from its P, in preparation for sleeping
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	// put the P on the idle list
	pidleput(_p_)
	unlock(&sched.lock)
	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		// the M is about to sleep; it is no longer spinning
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}
	// check all runqueues once again
	// one last look for work before going to sleep
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}
	//...
	// go to sleep
	stopm()
	goto top
}
Random traversal #
- To keep the time an M spends spinning as short as possible, the spinning flag is only set to true for the duration of the steal attempt, and reset to false once stealing is over:
if !_g_.m.spinning {
	_g_.m.spinning = true
	atomic.Xadd(&sched.nmspinning, 1)
}
- Iterate over all Ps in random order; whenever one has goroutines, steal half of them to run. A sketch of the random-order enumeration follows.
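stealOrder visits every P exactly once in pseudo-random order: pick a random start position plus a random stride that is coprime with the number of Ps; repeatedly adding the stride modulo the count then enumerates each position exactly once. A standalone sketch of that idea (the function names are made up, modeled on the runtime's randomOrder/randomEnum):
package main

import (
	"fmt"
	"math/rand"
)

func gcd(a, b uint32) uint32 {
	for b != 0 {
		a, b = b, a%b
	}
	return a
}

// coprimes returns the strides coprime with n: any of them, added
// repeatedly mod n, visits all n positions before repeating.
func coprimes(n uint32) []uint32 {
	var cs []uint32
	for i := uint32(1); i <= n; i++ {
		if gcd(i, n) == 1 {
			cs = append(cs, i)
		}
	}
	return cs
}

// enumerate yields a random walk over [0, n) that hits each position
// exactly once, in the spirit of stealOrder.start/next.
func enumerate(n uint32) []uint32 {
	pos := rand.Uint32() % n
	cs := coprimes(n)
	inc := cs[rand.Intn(len(cs))]
	out := make([]uint32, 0, n)
	for i := uint32(0); i < n; i++ {
		out = append(out, pos)
		pos = (pos + inc) % n
	}
	return out
}

func main() {
	fmt.Println(enumerate(8)) // e.g. [3 6 1 4 7 2 5 0]
}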
Going to sleep #
src/runtime/proc.go:1910
Stop execution of the current M until new work is available; return with an acquired P.
// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()
	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}
	lock(&sched.lock)
	mput(_g_.m) // put the M on the global sched.midle idle list
	unlock(&sched.lock)
	notesleep(&_g_.m.park) // go to sleep
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}
A note is the Go runtime's one-shot sleep-and-wakeup mechanism.
notesleep(*note) here is an abstract function with a different implementation per platform or OS:
- dragonfly, freebsd, linux [src/runtime/lock_futex.go]
  - the futex system call (the futexsleep function)
- aix, darwin, nacl, netbsd, openbsd, plan9, solaris, windows [src/runtime/lock_sema.go]
- js, wasm [src/runtime/lock_js.go]
  - not supported
As the list shows, by adding platform-specific support at the note layer, all the code above it can be reused. A sketch of this build-tag dispatch pattern follows.
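The per-platform selection is done with Go build constraints: each file declares the platforms it compiles for, and every variant exports the same functions. A hypothetical two-file sketch of the pattern (package note and its sleep function are inventions of this sketch, not the runtime's layout):
// note_futex.go — compiled only on futex platforms:
//go:build linux || freebsd || dragonfly

package note

// sleep would park the calling thread via the futex syscall (stub).
func sleep() { /* futexsleep-based implementation */ }

// note_sema.go — compiled on semaphore platforms:
//go:build darwin || netbsd || openbsd || plan9 || solaris || windows

package note

// sleep would park the calling thread on an OS semaphore (stub).
func sleep() { /* semaphore-based implementation */ }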
notesleep #
func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
	for atomic.Load(key32(&n.key)) == 0 { // --------here
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) // go to sleep
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}
This shows that if futexsleep returns but note.key is still 0 at the loop condition (the line marked "here"), the thread simply goes back to sleep: it was not woken by another worker thread. So for another thread to wake this one, it must first set the key of the note stored in this thread's m structure (the park field):
type m struct {
	//...
	park note
	//...
}
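For reference, the wake side flips key to 1 before issuing the futex wake, which is exactly the condition the sleep loop above keeps re-checking (from src/runtime/lock_futex.go on futex platforms; lightly abridged):
func notewakeup(n *note) {
	old := atomic.Xchg(key32(&n.key), 1) // set key so the sleep loop can exit
	if old != 0 {
		throw("notewakeup - double wakeup") // a note is strictly one-shot
	}
	futexwakeup(key32(&n.key), 1) // wake one thread sleeping on the futex
}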