golang运行时源码解读-Goroutine

前言

Goroutine 是 Go 语言的并发执行单元,是Go语言并发的基础。本文将从源码的角度,解读Goroutine的创建、销毁、状态转移等过程。

协程数据结构

type g struct {
// 栈参数。
// stack 描述了实际的栈内存:[stack.lo, stack.hi)。
// stackguard0 是在 Go 栈增长序言中比较的栈指针。
// 它通常是 stack.lo+StackGuard,但可以是 StackPreempt 以触发抢占。
// stackguard1 是在 //go:systemstack 栈增长序言中比较的栈指针。
// 它在 g0 和 gsignal 栈上是 stack.lo+StackGuard。
// 在其他协程栈上是 ~0,以触发对 morestackc 的调用(并崩溃)。
stack stack // 偏移量已知 runtime/cgo
stackguard0 uintptr // 偏移量已知 liblink
stackguard1 uintptr // 偏移量已知 liblink

_panic *_panic // 最内层的 panic - 偏移量已知 liblink
_defer *_defer // 最内层的 defer
m *m // 当前执行的 m; 偏移量已知 arm liblink
sched gobuf // 调度相关信息
syscallsp uintptr // 如果 status==Gsyscall,syscallsp = sched.sp 用于 gc
syscallpc uintptr // 如果 status==Gsyscall,syscallpc = sched.pc 用于 gc
syscallbp uintptr // 如果 status==Gsyscall,syscallbp = sched.bp 用于 fpTraceback
stktopsp uintptr // 栈顶的预期 sp,用于 traceback 检查
// param 是一个通用指针参数字段,用于在特定上下文中传递值,
// 这些上下文中找到其他存储参数的地方会很困难。它目前有四种用法:
// 1. 当一个通道操作唤醒一个阻塞的协程时,它将 param 设置为指向已完成的阻塞操作的 sudog。
// 2. 由 gcAssistAlloc1 用于向其调用者发信号,表明协程完成了 GC 周期。以其他方式这样做是不安全的,因为协程的栈可能在此期间已移动。
// 3. 由 debugCallWrap 用于将参数传递给一个新的协程,因为在运行时分配闭包是被禁止的。
// 4. 当一个 panic 被恢复并且控制返回到相应的帧时,param 可能指向一个 savedOpenDeferState。
param unsafe.Pointer
atomicstatus atomic.Uint32
stackLock uint32 // sigprof/scang 锁;TODO: 合并到 atomicstatus
goid uint64
schedlink guintptr
waitsince int64 // 大约阻塞的时间
waitreason waitReason // 如果 status==Gwaiting

preempt bool // 抢占信号,重复 stackguard0 = stackpreempt
preemptStop bool // 在抢占时转换为 _Gpreempted;否则,只是取消调度
preemptShrink bool // 在同步安全点缩小栈

// asyncSafePoint 表示 g 是否在异步安全点停止。这意味着栈上有没有精确指针信息的帧。
asyncSafePoint bool

paniconfault bool // 在意外的故障地址上 panic(而不是崩溃)
gcscandone bool // g 已扫描栈;受 status 中的 _Gscan 位保护
throwsplit bool // 不得分割栈
// activeStackChans 表示是否有未锁定的通道指向此协程的栈。如果为 true,栈复制需要获取通道锁以保护这些栈区域。
activeStackChans bool
// parkingOnChan 表示协程即将停放在 chansend 或 chanrecv 上。用于在栈缩小时发出不安全点信号。
parkingOnChan atomic.Bool
// inMarkAssist 表示协程是否在标记辅助中。由执行跟踪器使用。
inMarkAssist bool
coroexit bool // 协程切换时的参数

raceignore int8 // 忽略竞争检测事件
nocgocallback bool // 是否禁用从 C 回调
tracking bool // 是否跟踪此 G 的调度延迟统计
trackingSeq uint8 // 用于决定是否跟踪此 G
trackingStamp int64 // G 最后一次开始被跟踪的时间戳
runnableTime int64 // 可运行时间,在运行时清除,仅在跟踪时使用
lockedm muintptr
sig uint32
writebuf []byte
sigcode0 uintptr
sigcode1 uintptr
sigpc uintptr
parentGoid uint64 // 创建此协程的协程的 goid
gopc uintptr // 创建此协程的 go 语句的 pc
ancestors *[]ancestorInfo // 创建此协程的祖先协程信息(仅在 debug.tracebackancestors 时使用)
startpc uintptr // 协程函数的 pc
racectx uintptr
waiting *sudog // 此 g 正在等待的 sudog 结构(具有有效的 elem 指针);按锁顺序排列
cgoCtxt []uintptr // cgo 回溯上下文
labels unsafe.Pointer // 分析器标签
timer *timer // 缓存的 time.Sleep 定时器
sleepWhen int64 // 何时休眠
selectDone atomic.Uint32 // 我们是否参与了 select 并且有人赢得了比赛

// goroutineProfiled 表示此协程的栈在当前进行中的协程分析中的状态
goroutineProfiled goroutineProfileStateHolder

coroarg *coro // 协程切换期间的参数

// 每个 G 的跟踪器状态。
trace gTraceState

// 每个 G 的 GC 状态

// gcAssistBytes 是此 G 的 GC 辅助信用,以分配的字节数表示。如果这是正数,则 G 有信用可以分配 gcAssistBytes 字节而无需辅助。如果这是负数,则 G 必须通过执行扫描工作来纠正此问题。我们以字节为单位跟踪这个,以便在 malloc 热路径中快速更新和检查债务。辅助比率决定了这与扫描工作债务的对应关系。
gcAssistBytes int64
}

协程的创建

func newproc(fn *funcval) {
// 获取当前执行的协程
gp := getg()

// 获取当前的PC值
pc := getcallerpc()

// 切换到g0栈,执行具体的创建逻辑
systemstack(func() {
newg := newproc1(fn, gp, pc, false, waitReasonZero)

// 获取当前的p
pp := getg().m.p.ptr()
// 添加到p上的运行队列
runqput(pp, newg, true)


// 如果主线程已经启动,则唤醒一个p去执行协程(因为当前p可能没空,或者本地队列已经满了)
if mainStarted {
wakep()
}
})
}
// Create a new g in state _Grunnable (or _Gwaiting if parked is true), starting at fn.
// callerpc is the address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler. If parked is true, waitreason must be non-zero.
func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {
if fn == nil {
fatal("go of nil func value")
}


mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()

// 尝试复用dead状态协程,从空闲列表中获取
newg := gfget(pp)
if newg == nil {
// 如果没有可以复用的,申请一个新的协程,并从堆中申请一块内存,初始化他的栈
newg = malg(stackMin)
// 暂时设置成消亡态去禁止gc扫描
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}

if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}

totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
// 栈对齐
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
}

// 清除内存
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))

// 设置调度相关字段
newg.sched.sp = sp
newg.stktopsp = sp
// 确保协程执行结束后,跳转到goexit
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)

// 设置入口函数
newg.startpc = fn.fn

// 如果是系统Goroutine
if isSystemGoroutine(newg, false) {
sched.ngsys.Add(1)
} else {
// 对用户栈执行profile相关处理
if mp.curg != nil {
newg.labels = mp.curg.labels
}
if goroutineProfile.active {
// A concurrent goroutine profile is running. It should include
// exactly the set of goroutines that were alive when the goroutine
// profiler first stopped the world. That does not include newg, so
// mark it as not needing a profile before transitioning it from
// _Gdead.
newg.goroutineProfiled.Store(goroutineProfileSatisfied)
}
}

// 将一个栈添加到垃圾回收器的扫描任务中
gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))

// 修改协程状态为运行态
var status uint32 = _Grunnable
casgstatus(newg, _Gdead, status)

if pp.goidcache == pp.goidcacheend {
// Sched.goidgen is the last allocated id,
// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
// At startup sched.goidgen=0, so main goroutine receives goid=1.
pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)
pp.goidcache -= _GoidCacheBatch - 1
pp.goidcacheend = pp.goidcache + _GoidCacheBatch
}
newg.goid = pp.goidcache
pp.goidcache++
newg.trace.reset()
...
releasem(mp)

return newg
}

协程的消亡

在协程的函数返回后,会调用到这个函数

// The top-most function running on a goroutine
// returns to goexit+PCQuantum.
TEXT runtime·goexit(SB),NOSPLIT|TOPFRAME|NOFRAME,$0-0
BYTE $0x90 // NOP
CALL runtime·goexit1(SB) // does not return
// traceback from goexit1 must hit code range of goexit
BYTE $0x90 // NOP

// Finishes execution of the current goroutine.
func goexit1() {
if raceenabled {
racegoend()
}
trace := traceAcquire()
if trace.ok() {
trace.GoEnd()
traceRelease(trace)
}
// 切换到g0执行
mcall(goexit0)
}
// goexit continuation on g0.
func goexit0(gp *g) {
// 销毁相关的数据结构
gdestroy(gp)

// 开启一轮新的调度
schedule()
}
func gdestroy(gp *g) {
mp := getg().m
pp := mp.p.ptr()


// 切换协程状态为dead
casgstatus(gp, _Grunning, _Gdead)
gcController.addScannableStack(pp, -int64(gp.stack.hi-gp.stack.lo))
if isSystemGoroutine(gp, false) {
sched.ngsys.Add(-1)
}
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
mp.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = waitReasonZero
gp.param = nil
gp.labels = nil
gp.timer = nil


// gc相关
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// Flush assist credit to the global pool. This gives
// better information to pacing if the application is
// rapidly creating an exiting goroutines.
assistWorkPerByte := gcController.assistWorkPerByte.Load()
scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
gcController.bgScanCredit.Add(scanCredit)
gp.gcAssistBytes = 0
}

// 从M中移除g指针
dropg()

if GOARCH == "wasm" { // no threads yet on wasm
gfput(pp, gp)
return
}

if locked && mp.lockedInt != 0 {
print("runtime: mp.lockedInt = ", mp.lockedInt, "\n")
throw("exited a goroutine internally locked to the OS thread")
}

// 添加到gfree列表,加入后,可以在创建新协程的时候复用
gfput(pp, gp)
if locked {
// The goroutine may have locked this thread because
// it put it in an unusual kernel state. Kill it
// rather than returning it to the thread pool.

// Return to mstart, which will release the P and exit
// the thread.
if GOOS != "plan9" { // See golang.org/issue/22227.
gogo(&mp.g0.sched)
} else {
// Clear lockedExt on plan9 since we may end up re-using
// this thread.
mp.lockedExt = 0
}
}
}

协程的状态及转移


const (
_Gidle = iota // 0
_Grunnable // 1
_Grunning // 2
_Gsyscall // 3
_Gwaiting // 4
_Gmoribund_unused // 5
_Gdead // 6
_Genqueue_unused // 7
_Gcopystack // 8
_Gpreempted // 9
_Gscan = 0x1000
_Gscanrunnable = _Gscan + _Grunnable // 0x1001
_Gscanrunning = _Gscan + _Grunning // 0x1002
_Gscansyscall = _Gscan + _Gsyscall // 0x1003
_Gscanwaiting = _Gscan + _Gwaiting // 0x1004
_Gscanpreempted = _Gscan + _Gpreempted // 0x1009
)
状态常量 描述
_Gidle 0 协程刚被分配,尚未初始化(未加入运行队列)。
_Grunnable 1 协程在运行队列中等待执行,未运行用户代码,栈未被占用。
_Grunning 2 协程正在执行用户代码,栈由其独占,已绑定M(OS线程)和P(逻辑处理器),不在运行队列中。
_Gsyscall 3 协程正在执行系统调用,未运行用户代码,栈由其独占,已绑定M但可能解绑P。
_Gwaiting 4 协程被运行时阻塞(如通道操作、锁、time.Sleep),栈可能被其他协程部分访问(需同步)。
_Gdead 6 协程未使用(刚退出或待回收),可能无栈,由M或空闲列表管理。
_Gcopystack 8 协程的栈正在被移动(扩容或缩容),未运行代码,栈由移动它的协程管理。
_Gpreempted 9 协程被抢占(如调度器或GC触发),等待被重新调度,需通过CAS操作转换为_Grunnable
_Gscanrunnable 0x1001 复合状态。协程处于_Grunnable状态,同时GC正在扫描其栈。
_Gscanrunning 0x1002 复合状态。协程处于_Grunning状态,GC要求其主动扫描自己的栈(短暂阻塞状态转换)。
_Gscansyscall 0x1003 复合状态。协程处于_Gsyscall状态,同时GC正在扫描其栈。
_Gscanwaiting 0x1004 复合状态。协程处于_Gwaiting状态,同时GC正在扫描其栈。
_Gscanpreempted 0x1009 复合状态。协程处于_Gpreempted状态,同时GC正在扫描其栈。

几个主要的状态转移如下图所示

alt text

作者

deepwzh

发布于

2024-12-05

更新于

2025-02-15

许可协议

评论