TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

golang runtime sema

go实现的sleep和wakeup,底层是gopark和goready,需要结合调度来看。其他同步手段基本都与这里有关。版本go/1.15.2/libexec/src/runtime/sema.go

struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}

// Prime to not correlate(相关) with any user patterns.
const semTabSize = 251

var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait uint32 // Number of waiters. Read w/o the lock.
}

type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.

g *g

next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)

// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.

acquiretime int64
releasetime int64
ticket uint32

// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool

parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}

semtable是大小251的数组,数组元素存放了semaRoot,每个semaRoot都是一个树堆,key是锁的地址elem,优先级是生成的一个随机数ticket.并且相同地址的elem不是存在树堆里面的,而是通过和树堆里相同的key组成单向链表串起来的(waitlink指向链表下一个元素,waittail指向链表最后的一个元素)。sudog.g保存了阻塞在锁上面的g,即goroutine的信息,在唤醒的时候会用到。


cansemacquire

1
2
3
4
5
6
7
8
9
10
11
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}

treap是存放等待队列的,所以如果能直接取到锁的值可以修改的话,就不用阻塞也就不用入队了,这里是easy case.


semacquire1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
gp := getg() //记录当前的G信息
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}

// Easy case.
//尝试原子操作-1
if cansemacquire(addr) {
return
}

// Harder case:
// increment waiter count
// try cansemacquire one more time, return if succeeded
// enqueue itself as a waiter
// sleep
// (waiter descriptor is dequeued by signaler)
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lockWithRank(&root.lock, lockRankRoot)
// Add ourselves to nwait to disable "easy case" in semrelease.
//nwaiter+1,让其他cas失效
atomic.Xadd(&root.nwait, 1)
// Check cansemacquire to avoid missed wakeup.
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// Any semrelease after the cansemacquire knows we're waiting
// (we set nwait above), so go to sleep.
//s加到treap里面,然后gopark睡眠
root.queue(addr, s, lifo)
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
releaseSudog(s)
}

semrelease1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
//+1
atomic.Xadd(addr, 1)

// Easy case: no waiters?
// This check must happen after the xadd, to avoid a missed wakeup
// (see loop in semacquire).
//再次检查,刚add的看样子已经被其他g给acquire了
if atomic.Load(&root.nwait) == 0 {
return
}

// Harder case: search for a waiter and wake it.
lockWithRank(&root.lock, lockRankRoot)
if atomic.Load(&root.nwait) == 0 {
// The count is already consumed by another goroutine,
// so no need to wake up another goroutine.
unlock(&root.lock)
return
}
//出队,从treap/sudog链表删除
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // May be slow or even yield, so unlock first
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
//goready唤醒
readyWithTime(s, 5+skipframes)
if s.ticket == 1 && getg().m.locks == 0 {
// Direct G handoff
// readyWithTime has added the waiter G as runnext in the
// current P; we now call the scheduler so that we start running
// the waiter G immediately.
// Note that waiter inherits our time slice: this is desirable
// to avoid having a highly contended semaphore hog the P
// indefinitely. goyield is like Gosched, but it emits a
// "preempted" trace event instead and, more importantly, puts
// the current G on the local runq instead of the global one.
// We only do this in the starving regime (handoff=true), as in
// the non-starving case it is possible for a different waiter
// to acquire the semaphore while we are yielding/scheduling,
// and this would be wasteful. We wait instead to enter starving
// regime, and then we start to do direct handoffs of ticket and
// P.
// See issue 33747 for discussion.
//将当前g放到P的可运行队列,而不是全局的可运行队列
goyield()
}
}
}

reference

[1]https://github.com/cch123/golang-notes/blob/master/semaphore.md

[2]https://github.com/thinkboy/go-notes/blob/master/%E4%BB%8E%E6%BA%90%E7%A0%81%E8%A7%92%E5%BA%A6%E7%9C%8BGolang%E7%9A%84%E8%B0%83%E5%BA%A6.md

[3]数据结构与算法之美

----------- ending -----------