goroutine被挑选运行策略

1.schedule函数 #

一轮调度:找到可运行的goroutine并执行它。即由g0->g, 当g0在m上运行时,找到一个新的goroutine,然后m运行新的goroutine的过程。

  1. findRunnable:找到可运行的goroutine。
  2. execute函数:执行找到的goroutine。
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()
    //...
top:
    //...
    var gp *g
    var inheritTime bool
    //...
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { //schedtick就是调度次数,如果能被61整除且全局的Goroutine队列不为空就尝试获取
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)     //全局运行队列中获取goroutine
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())  //本地运行队列(当前线程)中获取goroutine
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        // 则调用findrunnable函数从其它工作线程的运行队列中偷取,如果偷取不到,则当前工作线程进入睡眠,
        // 直到获取到需要运行的goroutine之后findrunnable函数才会返回.
        */
        gp, inheritTime = findrunnable() // blocks until work is available
    }
    if _g_.m.spinning {
        resetspinning()
    }
    //...
    execute(gp, inheritTime)
}

分析schedule函数整个流程:

效果图

效果图

  • schedule函数按顺序分三步来分别来获取可运行的Goroutine: 全局G队列,本地G队列和其他P上面的G队列
    • 从全局运行G队列中寻找Goroutine: 因为全局上面拿Goroutine是需要加锁的,需尽量少操作全局队列,所以当一个P调度次数是61的倍数之后且全局的Goroutine队列不为空,才尝试在全局上寻找

      • 只是为了让全局队列中的g有机会得到调度
    • 从工作线程本地运行队列中寻找Goroutine: 其实就是从当前线程关联的P上的G队列拿

      • 因一个P在同一时刻只能与一个M关联,此时就不需要加锁
    • 其他P的运行队列中偷取Goroutine:调用findrunnable从其他工作线程的运行队列中偷取Goroutine

      • 在偷取之前,findrunnable函数会再尝试从全局运行G队列和当前P运行G队列中查找,看是否有符合的待运行的g

2.全局运行队列中获取goroutine #

关于全局G队列的结构,参看<前置知识.底层重要结构>一文.

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {  //如果全局的G队列为空,直接返回nil
       return nil
    }
    /*
        - 如果gomaxprocs==1;sched.runqsize==1;
        - 导致(n==2)>sched.runqsize[全局的队列长度1];
        - 需要判断下,取两者少的数;
    */
    n := sched.runqsize/gomaxprocs + 1 //按照P的数量平分全局队列
    if n > sched.runqsize { //
        n = sched.runqsize //取两者少的数
    }
    if max > 0 && n > max {
        n = max //取两者少的数
    }
    if n > int32(len(_p_.runq))/2 {  //取本地队列的一半长最多
        n = int32(len(_p_.runq)) / 2
    }
    sched.runqsize -= n
    gp := sched.runq.pop() //返回第一个,其他放入本地队列
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false)
        /*
        这里如果put G到本地满了,它又会put到全局.
        If the run queue is full, runnext puts g on the global queue.
        */
    }
    return gp
    }

从全局G队列获取的数量,为下方四者中的最小值:

  • 传入获取g数量的实参(参数值需大于0,才会参与,否则忽略此值)
  • 当前全局G队列可运行g数量除以gomaxprocs(最大P数量),然后加一
  • 当前全局G队列可运行g数量
  • 当前P本地G队列队列大小的一半

从上面的注释很容易看懂,只有一个需要注意:

//...
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()
        runqput(_p_, gp1, false) //...注意点...
        /*
        这里如果put G到本地满了,它又会put到全局.
        If the run queue is full, runnext puts g on the global queue.
        */
    }
//...

按上面的算法,取到了一定数量的g,放入本地队列时,本地队列满了,此时又会调用runqput函数又会把g移走:

runqput #

runqput尝试将 实参g(可调度运行的goroutine) 放入下面三者之中的某一个:

  • 本地P的runnext成员变量中: 如果当前P的runnext已有值gX,那需要把这个gX放入本地/全局G队列,否则直接返回
  • 本地P的G队列: 如果本地队列没满,放入直接返回
  • 全局G队列: 如果本地队列满了,则通过runqputslow函数把gp放入全局运行队列。
/*
- runqput尝试将g放在本地可运行队列中。
  1. >如果 next 为 false,runqput 将 g 加到可运行队列的尾部。
  2. 如果 next 为真,runqput 将 g 放在 _p_.runnext 槽中。
  3. 如果运行队列满了,runnext就把g放到全局队列中。
*/
func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	}

	if next { //第二步所说的
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

randomizeScheduler为是否执行随机调度

runqputslow #

runqputslow把本地运行G队列的一半(在前面runqput判断本地队列已满)加一(实参 gp)移动到全局队列, 所以如果本地队列长度为256,那么最多可以移动128+1个

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
	var batch [len(_p_.runq)/2 + 1]*g // 256/2+1 = 129个goroutine

	// First, grab a batch from local queue.
	n := t - h
	n = n / 2
	if n != uint32(len(_p_.runq)/2) {   // 得到现有队列中的一半G
		throw("runqputslow: queue is not full")
	}
	for i := uint32(0); i < n; i++ {
		batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
		/*
			 ---------
			|   |    |  
			 ---------
           head[1]   tail[2], TODO 应该head, tail应该不是地址
		*/
	}
	if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume
		return false
	}
	batch[n] = gp

	if randomizeScheduler { // 打乱将要插入全局的G
		for i := uint32(1); i <= n; i++ {
			j := fastrandn(i + 1)
			batch[i], batch[j] = batch[j], batch[i]
		}
	}

	// Link the goroutines.
	for i := uint32(0); i < n; i++ {
		batch[i].schedlink.set(batch[i+1])  // TODO 通过schedlink来进行连接? 全局运行队列是一个链表,把将要放入全局的G,链接起来.
	}
	var q gQueue
	q.head.set(batch[0])
	q.tail.set(batch[n])

	// Now put the batch on global queue.
	lock(&sched.lock)
	globrunqputbatch(&q, int32(n+1))
	unlock(&sched.lock)
	return true
}

3.本地运行队列中获取goroutine #

关于本地G队列的结构,参看<前置知识.底层重要结构>一文.

3.1.runqget #

  • 实现是由runqget函数完成的:
    • 首先查看runnext成员是否为空,如果不为空则返回runnext所指的Goroutine,并把runnext成员清零;
    • 如果runnext为空,则继续从本地循环队列中查找Goroutine.
// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// If there's a runnext, it's the next G to run.
	for {
		next := _p_.runnext
		if next == 0 { // runnext是空的,break for loop,然后从队列里面拿
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}

	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := _p_.runqtail
		if t == h { // 如果头等于尾,证明是队列是空的
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

4.盗取goroutine #

盗取Goroutine的过程就是:如果在所有运行队列找g都找不到,M就进入睡眠状态

4.1.findrunnable函数 #

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

    //...

    // local runq
    //再次看一下本地运行队列是否有需要运行的goroutine
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq
    //再看看全局运行队列是否有需要运行的goroutine
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    //...

	// Steal work from other P's.
	// 使用所有P与空闲的P进行比较,如果除了自己,其他的P都是休眠状态。那么整个系统都没有工作需要做了。
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop   //直接退出
	}
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
 	// 当这个M不是自旋状态,并且此时的二倍的自旋M大于当前正在工作的P; 说明此时有许多现在在寻找工作做.
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        //设置m的状态为spinning
        _g_.m.spinning = true
        //处于spinning状态的m数量加一
        atomic.Xadd(&sched.nmspinning, 1)
    }

    //从其它p的本地运行队列盗取goroutine
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:

    //...

    // Before we drop our P, make a snapshot of the allp slice,
    // which can change underfoot once we no longer block
    // safe-points. We don't need to snapshot the contents because
    // everything up to cap(allp) is immutable.
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)
 
    //...
 
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
   
    // 当前工作线程解除与p之间的绑定,准备去休眠
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    //把p放入空闲队列
    pidleput(_p_)
    unlock(&sched.lock)

// Delicate dance: thread transitions from spinning to non-spinning state,
// potentially concurrently with submission of new goroutines. We must
// drop nmspinning first and then check all per-P queues again (with
// #StoreLoad memory barrier in between). If we do it the other way around,
// another thread can submit a goroutine after we've checked all run queues
// but before we drop nmspinning; as the result nobody will unpark a thread
// to run the goroutine.
// If we discover new work below, we need to restore m.spinning as a signal
// for resetspinning to unpark a new worker thread (because there can be more
// than one starving goroutine). However, if after discovering new work
// we also observe no idle Ps, it is OK to just park the current thread:
// the system is fully loaded so no spinning threads are required.
// Also see "Worker thread parking/unparking" comment at the top of the file.
    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        //m即将睡眠,状态不再是spinning
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    // 休眠之前再看一下是否有工作要做
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }

    ......
    //休眠
    stopm()
    goto top
}

随机遍历 #

  1. 尽量减少M的自旋状态时间,只有在盗取的时候才把spinning标志位设为true,盗取退出后把spinning标志位重新设置为false,
	if !_g_.m.spinning {
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
    }
  1. 随机遍历全局P中的P,如果有goroutines,就偷盗一半来运行.

进入睡眠 #

src/runtime/proc.go:1910

停止执行当前的m,直到有新的工作。带着获得的P返回。

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}

	lock(&sched.lock)
	mput(_g_.m)  //全局的sched.midle空闲队列
	unlock(&sched.lock)
	notesleep(&_g_.m.park) //进入睡眠
	noteclear(&_g_.m.park)
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}

note是go runtime实现的一次性睡眠和唤醒机制.

这里的notesleep( *note)是一个抽象函数,根据不同的平台或系统有不同的实现.

  • dragonfly freebsd linux[src/runtime/lock_futex.go]
    • futex系统调用
      • futexsleep函数
  • aix darwin nacl netbsd openbsd plan9 solaris window[src/runtime/lock_sema.go]
  • js,was[src/runtime/lock_js.go]
    • 不支持 从上面可以看出,note层增加对特定平台的支持,就可以复用上层代码.

notesleep #

func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
	for atomic.Load(key32(&n.key)) == 0 { // --------here
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns) //进入睡眠
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}

从这里可以看出来:如果futexsleep()退出,但是检查note.key还是为0,那么又会进入睡眠,for atomic.Load(key32(&n.key)) == 0 { // --------here,

并不是其它工作线程唤醒了这个线程,所以我们知道当其他线程唤醒这个线程,需要改下这个线程对应的m结构体中note.key字段

type m struct {
    //...
	park          note
    //...
}