TheRiver | blog

You have reached the world's edge, none but devils play past here

0%

golang RWmutex

读和读不隔离,写和写通过mutex隔离,写和读通过sema互相配合。

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"fmt"
"sync"
"time"
)

// main demonstrates how a held write lock holds back readers of a
// sync.RWMutex. It takes the write lock, starts ten goroutines that each
// request the read lock and print their index, and releases the write lock
// only after a two-second pause. The indices are therefore printed after
// "print:", in no particular order.
func main() {
	var (
		rw sync.RWMutex
		wg sync.WaitGroup
	)

	// Take the write lock first so every reader started below blocks in RLock.
	rw.Lock()

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			// Deferred calls run last-in, first-out, so the read lock is
			// released before the WaitGroup is signalled and wg.Wait cannot
			// return while a reader still holds the lock.
			defer wg.Done()

			rw.RLock()
			defer rw.RUnlock()

			fmt.Println(i)
		}(i)
	}

	// Give the readers time to queue up behind the write lock.
	time.Sleep(2 * time.Second)
	fmt.Println("print:")
	rw.Unlock()
	wg.Wait()
}


struct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
// readerCount is incremented by RLock and decremented by RUnlock. Lock
// subtracts rwmutexMaxReaders from it, so a negative value tells readers
// that a writer is pending; Unlock adds the constant back.
readerCount int32 // number of pending readers
// readerWait receives, in Lock, the number of readers that were active
// when the writer arrived. Each of those readers decrements it in
// rUnlockSlow, and the one that brings it to zero wakes the writer.
readerWait int32 // number of departing readers
}

// rwmutexMaxReaders is the offset Lock subtracts from readerCount to mark a
// pending writer (making the counter negative) and Unlock adds back.
const rwmutexMaxReaders = 1 << 30

RLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
//
// RLock atomically increments readerCount and, if the result is negative,
// blocks on readerSem.
func (rw *RWMutex) RLock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// Register this reader. Lock subtracts rwmutexMaxReaders from readerCount,
// so a negative result means a writer has announced itself.
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
  • RLock给readerCount加1
  • readerCount为负数,表示有写在进行,这里就阻塞了

RUnlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
//
// RUnlock atomically decrements readerCount and hands a negative result to
// rUnlockSlow.
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// Deregister this reader. A negative result means either that a writer is
// pending or that the lock was not read-locked; rUnlockSlow tells the two
// cases apart.
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}

// rUnlockSlow is the slow path of RUnlock. r is the value of readerCount
// after RUnlock's decrement, which RUnlock only passes when it is negative.
// It throws if the lock was not read-locked; otherwise it decrements
// readerWait and releases writerSem when that counter reaches zero.
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1 is readerCount before the decrement: 0 means there were no readers
// and no pending writer, -rwmutexMaxReaders means there were no readers
// while a writer was pending. Either way there was no read lock to release.
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
// Readers run in parallel with one another, so only the blocked writer
// needs to be woken here.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
  • RUnlock给readerCount减1
  • r < 0表示已经有写在Lock中把readerCount减去了rwmutexMaxReaders(写正在等待或已持有锁)。此时调用RUnlock的读并不是被阻塞的读,而是写到来之前就已经持有读锁的读,写正在等它们结束,所以readerWait要减1,等减到0的时候就可以唤醒写了(阻塞在Lock下的写)
  • readerWait=0的时候写才会被唤醒

Lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
//
// Lock first acquires rw.w, then makes readerCount negative, and finally
// blocks on writerSem if readers were active at that moment.
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// First, resolve competition with other writers.
rw.w.Lock()
// Announce to readers there is a pending writer.
// Subtracting rwmutexMaxReaders makes readerCount negative, which blocks
// new RLock calls. Adding the constant back gives r, the value readerCount
// had before this call.
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
// r is the number of readers registered when the writer arrived. It is
// added to readerWait; each of those readers decrements readerWait in
// rUnlockSlow, and the one that brings it to zero releases writerSem.
// A non-zero sum means some of them are still running, so the writer
// blocks until the last one wakes it.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
  • 写和写通过rw.w.Lock()隔离,写和读通过sema隔离,读和读不隔离
  • 所有在运行的读都RUnlock了写才被唤醒

Unlock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
//
// Unlock restores readerCount, releases readerSem once per reader counted
// there, and then unlocks rw.w.
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}

// Announce to readers there is no active writer.
// Adding rwmutexMaxReaders undoes the subtraction made by Lock. A result
// of rwmutexMaxReaders or more means readerCount was not negative, i.e.
// no writer had locked rw.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
// Wake every blocked reader: r is the restored readerCount, and readerSem
// is released once for each of them.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
// Hand the lock over to any other writer waiting on rw.w.
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
  • 还原readerCount的值
  • 唤醒写加锁后新来的被阻塞的读
----------- ending -----------