func(httpPkg)Get(url string, i int) { fmt.Println(i) }
var http httpPkg
funcmain() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } var i = 0 for _, url := range urls { // Increment the WaitGroup counter. wg.Add(1) // Launch a goroutine to fetch the URL. gofunc(url string, i int) { // Decrement the counter when the goroutine completes. defer wg.Done() // Fetch the URL. http.Get(url, i) }(url, i) i += 1 } // Wait for all HTTP fetches to complete. wg.Wait() }
output:
2
0
1
source code
struct
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// A WaitGroup waits for a collection of goroutines to finish. // The main goroutine calls Add to set the number of // goroutines to wait for. Then each of the goroutines // runs and calls Done when finished. At the same time, // Wait can be used to block until all goroutines have finished. // // A WaitGroup must not be copied after first use. type WaitGroup struct { noCopy noCopy
// 64-bit value: high 32 bits are counter, low 32 bits are waiter count. // 64-bit atomic operations require 64-bit alignment, but 32-bit // compilers do not ensure it. So we allocate 12 bytes and then use // the aligned 8 bytes in them as state, and the other 4 as storage // for the sema. state1 [3]uint32 }
// Add adds delta, which may be negative, to the WaitGroup counter. // If the counter becomes zero, all goroutines blocked on Wait are released. // If the counter goes negative, Add panics. // // Note that calls with a positive delta that occur when the counter is zero // must happen before a Wait. Calls with a negative delta, or calls with a // positive delta that start when the counter is greater than zero, may happen // at any time. // Typically this means the calls to Add should execute before the statement // creating the goroutine or other event to be waited for. // If a WaitGroup is reused to wait for several independent sets of events, // new Add calls must happen after all previous Wait calls have returned. // See the WaitGroup example. func(wg *WaitGroup)Add(delta int) { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } //计数器是4字节的,这里用uint64存的数也只能是4字节大小,然后左移32位加给计数器 state := atomic.AddUint64(statep, uint64(delta)<<32) //算出计数器的实际值 v := int32(state >> 32) //截断成32位,表示waiter w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } //计数器小于0报错 if v < 0 { panic("sync: negative WaitGroup counter") } //已经有goroutine在wait,且计数器等于0,就不能再add一个正数 //应该是计数器=0,正要唤醒wait的g,这时候就不要再add了,并发安全 if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } //说明 v =0 && w > 0,需要唤醒所有waiter // This goroutine has set counter to 0 when waiters > 0. // Now there can't be concurrent mutations(变动) of state: // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity(合理) check to detect WaitGroup misuse(滥用). if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { //逐个唤醒 runtime_Semrelease(semap, false, 0) } }
Done
1 2 3 4
// Done decrements the WaitGroup counter by one. func(wg *WaitGroup)Done() { wg.Add(-1) }
// Wait blocks until the WaitGroup counter is zero. func(wg *WaitGroup)Wait() { statep, semap := wg.state() if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } for { //同上 state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } //=0就不用阻塞了 return } // Increment waiters count. if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } //阻塞 runtime_Semacquire(semap) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } }
funcmain() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.google.com/", "http://www.somestupidname.com/", } var i = 0 for _, url := range urls { // Increment the WaitGroup counter. wg.Add(1) // Launch a goroutine to fetch the URL. // go func(url string, i int) { // // Decrement the counter when the goroutine completes. // defer wg.Done() // // Fetch the URL. // http.Get(url, i) // }(url, i) go test(wg, url, i) i += 1 } // Wait for all HTTP fetches to complete. wg.Wait() }
output:
1 2 3 4 5 6 7 8 9 10 11 12 13
2 0 1 fatal error: all goroutines are asleep - deadlock!