TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

golang sync WaitGroup

官方的demo演示如下,做了点小改动

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
)

// httpPkg is a stateless stand-in for a real HTTP client so the demo can
// write http.Get(...) without performing any request.
type httpPkg struct{}

// Get pretends to fetch url. No request is made; it only prints i on its
// own line, which makes the order in which the callers ran visible.
func (httpPkg) Get(url string, i int) {
	fmt.Printf("%d\n", i)
}

// http is the package-level instance the demo calls as http.Get.
var http httpPkg

// main starts one goroutine per URL and blocks until every one of them has
// signalled completion through the shared WaitGroup.
func main() {
	urls := []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}

	var wg sync.WaitGroup
	for i, url := range urls {
		// Register the goroutine before it is started, so the counter
		// is already non-zero by the time Wait is reached.
		wg.Add(1)
		// url and i are passed as arguments so each goroutine works on
		// its own copies of the loop variables.
		go func(url string, i int) {
			// Report completion when this goroutine returns.
			defer wg.Done()
			http.Get(url, i)
		}(url, i)
	}

	// Block until every goroutine has called Done.
	wg.Wait()
}

output:

2
0
1

source code

struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
// noCopy is a marker field whose type is declared elsewhere.
// NOTE(review): presumably it exists so that `go vet` can flag copies
// of a WaitGroup — confirm against the noCopy declaration.
noCopy noCopy

// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
// 64-bit atomic operations require 64-bit alignment, but 32-bit
// compilers do not ensure it. So we allocate 12 bytes and then use
// the aligned 8 bytes in them as state, and the other 4 as storage
// for the sema.
//
// Which 8 bytes are used is decided at run time by the state method,
// based on whether the address of state1 is a multiple of 8.
state1 [3]uint32
}

用 [3]uint32 保证在 32/64 位系统下都占 12 字节:其中 4 字节保存计数器,4 字节保存等待者计数,4 字节保存 sema。64 位原子操作要求地址按 8 字节对齐,64 位编译器能保证这一点,32 位编译器则不能保证。因此 state() 会检查 state1 的地址是否 8 字节对齐,以此决定哪 8 个字节用作 state、剩下的 4 个字节用作 sema。

state

1
2
3
4
5
6
7
8
9
10
// state locates the two logical fields packed into wg.state1 and returns
// pointers to them: statep addresses the 64-bit word holding the counter
// (high 32 bits) and the waiter count (low 32 bits); semap addresses the
// 32-bit semaphore slot.
//
// The 64-bit word must sit on an 8-byte boundary. If the array itself starts
// on one, the word is elements 0-1 and the semaphore is element 2. Otherwise
// the array start is only 4-byte aligned, which makes element 1 8-byte
// aligned: the word is elements 1-2 and the semaphore is element 0.
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
	base := unsafe.Pointer(&wg.state1)
	if uintptr(base)%8 != 0 {
		// Misaligned start: skip the first element for the state word.
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
	}
	// Aligned start: the state word begins at the array itself.
	return (*uint64)(base), &wg.state1[2]
}

返回 state(高 32 位是计数器,低 32 位是等待者计数)和 sema 的地址;state1 是否 8 字节对齐决定了这两者在数组中的具体位置。

Add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// The counter lives in the high 32 bits of the state word, so delta is
// shifted left by 32 before the atomic add; the waiter count in the low
// 32 bits is not affected by the addition.
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v is the updated counter: the high 32 bits, read as a signed value.
v := int32(state >> 32)
// w is the waiter count: the conversion keeps only the low 32 bits.
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
// A negative counter means more Done/negative Add calls than were added.
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// Waiters already exist (w != 0) and this positive Add has just raised
// the counter from zero (v == delta): Add ran concurrently with Wait,
// which the contract above forbids.
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Nothing to wake: either work is still outstanding or nobody is waiting.
if v > 0 || w == 0 {
return
}
// Reaching this point means v == 0 && w > 0, so every waiter must be woken.
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
// Release the semaphore once per waiter, waking them one at a time.
runtime_Semrelease(semap, false, 0)
}
}

Done

1
2
3
4
// Done decrements the WaitGroup counter by one.
// It is exactly Add(-1), so it inherits Add's behavior, including the panic
// when the counter would go negative.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}

Wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

// Wait blocks until the WaitGroup counter is zero.
//
// It returns immediately when the counter is already zero. Otherwise it
// registers itself as a waiter by incrementing the low 32 bits of the state
// word and then blocks on the semaphore. It panics if the state word is
// non-zero after being woken, which indicates the WaitGroup was reused before
// this Wait returned.
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// Retry loop: the compare-and-swap below fails if the state word changed
// between the load and the swap, in which case the state is re-read.
for {
// Split the state word into the counter (v, high 32 bits) and the
// waiter count (w, low 32 bits).
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
// Return without blocking.
return
}
// Increment waiters count.
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// Block on the semaphore; Add releases it once per registered
// waiter after the counter drops to zero.
runtime_Semacquire(semap)
// Add zeroes the state word before releasing waiters, so a non-zero
// value here means someone has started using the WaitGroup again.
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}

总结

  • WaitGroup 主要用来等待一组 goroutine 结束。手动通过 channel 也能实现同样的效果,WaitGroup 是抽象好的、更易用的形式,其阻塞与唤醒是用 Semacquire/Semrelease 实现的(另行总结)。
  • add用来给计数器做增减,done就是add(-1).wait给waiter+1然后阻塞,计数器=0的时候唤醒所有waiter.
  • 实现上有特色的就是位操作。虽然能看懂,但这样做的实际价值和动机还是有点模糊。(一个直接的好处是:把计数器和等待者计数打包进同一个 uint64 后,一次 64 位原子操作就能同时读写两者,不需要额外加锁。)
  • go语言函数参数都是值传递,所以waitgroup作为参数的时候要传递地址,不然就出问题了,来个demo测下

WaitGroup值传递

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"fmt"
"sync"
)

// httpPkg is a stateless stand-in for a real HTTP client so the demo can
// write http.Get(...) without performing any request.
type httpPkg struct{}

// Get pretends to fetch url. No request is made; it only prints i on its
// own line, which makes the order in which the callers ran visible.
func (httpPkg) Get(url string, i int) {
	fmt.Printf("%d\n", i)
}

// http is the package-level instance the demo calls as http.Get.
var http httpPkg

// test prints i through http.Get and then calls Done on wg.
//
// NOTE: wg is received by value, so Done operates on this call's private
// copy of the WaitGroup and never touches the caller's counter. This is the
// deliberate mistake the demo illustrates; a correct version would accept
// *sync.WaitGroup instead.
func test(wg sync.WaitGroup, url string, i int) {
defer wg.Done()
http.Get(url, i)
}

// main repeats the first demo but hands the WaitGroup to test by value.
// Each goroutine therefore decrements its own copy, the counter of wg never
// returns to zero, and Wait blocks forever; the runtime reports a deadlock.
func main() {
	urls := []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}

	var wg sync.WaitGroup
	for i, url := range urls {
		// Register the goroutine before it is started.
		wg.Add(1)
		// Working variant, kept for comparison: the closure shares wg
		// with main instead of receiving a copy.
		// go func(url string, i int) {
		// 	// Decrement the counter when the goroutine completes.
		// 	defer wg.Done()
		// 	// Fetch the URL.
		// 	http.Get(url, i)
		// }(url, i)
		go test(wg, url, i) // wg is copied for every call
	}

	// Never returns: Done was only ever called on copies of wg.
	wg.Wait()
}

output:

1
2
3
4
5
6
7
8
9
10
11
12
13
2
0
1
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc000016088)
/usr/local/Cellar/go/1.15.2/libexec/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc000016080)
/usr/local/Cellar/go/1.15.2/libexec/src/sync/waitgroup.go:130 +0x65
main.main()
/Users/river/go/src/grpc/test/main.go:41 +0x178
exit status 2

值传递使 test 拿到的是 WaitGroup 的一份拷贝,Done 只作用在这份拷贝上,原对象的计数器始终不会归零,于是 main 中的 Wait 永远阻塞,最终被运行时判定为死锁。这里需要注意 noCopy 的实现:它就是一个空结构体,外加一个 Lock 方法。包含这种字段的类型如果被复制,go vet 可以检测出来,VS Code 上也会直观地给出提示。

reference

[1]https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-sync-primitives/#waitgroup

[2]https://golang.design/under-the-hood/zh-cn/part4lib/ch15sync/waitgroup/

----------- ending -----------