overview

go 里面的 rwlock 是 write preferred 的,可以避免写锁饥饿。

读锁和写锁按照先来后到的规则持有锁,一旦有协程持有了写锁,后面的协程只能在写锁被释放后才能得到读锁。

同样,一旦有 >= 1 个协程写到了读锁,只有等这些读锁全部释放后,后面的协程才能拿到写锁。

下面了解一下 go 的 rwmutex 是如何实现的吧,下面的代码取自 go1.17.2/src/sync/rwmutex.go,并删减了 race 相关的代码。

ps: rwmutex 的代码挺短的,其实读源码也没那么可怕…

rwmutex 的结构

rwmutex 总体上是通过: 普通锁和条件变量来实现的

type rwmutex struct {
	w           mutex  // held if there are pending writers
	writersem   uint32 // semaphore for writers to wait for completing readers
	readersem   uint32 // semaphore for readers to wait for completing writers
	readercount int32  // number of pending readers
	readerwait  int32  // number of departing readers
}

lock

func (rw *rwmutex) lock() {
	// first, resolve competition with other writers.
	rw.w.lock()
	// announce to readers there is a pending writer.
	r := atomic.addint32(&rw.readercount, -rwmutexmaxreaders) + rwmutexmaxreaders
	// wait for active readers.
	if r != 0 && atomic.addint32(&rw.readerwait, r) != 0 {
		runtime_semacquiremutex(&rw.writersem, false, 0)
	}
}

unlock

const rwmutexmaxreaders = 1 << 30

func (rw *rwmutex) unlock() {
	// announce to readers there is no active writer.
	r := atomic.addint32(&rw.readercount, rwmutexmaxreaders)
	// unblock blocked readers, if any.
	for i := 0; i < int(r); i++ {
		runtime_semrelease(&rw.readersem, false, 0)
	}
	// allow other writers to proceed.
	rw.w.unlock()
}

rlock

func (rw *rwmutex) rlock() {
	if atomic.addint32(&rw.readercount, 1) < 0 {
		// a writer is pending, wait for it.
		runtime_semacquiremutex(&rw.readersem, false, 0)
	}
}

runlock

func (rw *rwmutex) runlock() {
	if r := atomic.addint32(&rw.readercount, -1); r < 0 {
		// outlined slow-path to allow the fast-path to be inlined
		rw.runlockslow(r)
	}
}
func (rw *rwmutex) runlockslow(r int32) {
	// a writer is pending.
	if atomic.addint32(&rw.readerwait, -1) == 0 {
		// the last reader unblocks the writer.
		runtime_semrelease(&rw.writersem, false, 1)
	}
}

q1: 多个协程并发拿读锁,如何保证这些读锁协程都不会被阻塞?

func (rw *rwmutex) rlock() {
	if atomic.addint32(&rw.readercount, 1) < 0 {
		// a writer is pending, wait for it.
		runtime_semacquiremutex(&rw.readersem, false, 0)
	}
}

拿读锁时,仅仅会增加 readercount,因此读锁之间是可以正常并发的

q2: 多个协程并发拿写锁,如何保证只会有一个协程拿到写锁?

func (rw *rwmutex) lock() {
	// first, resolve competition with other writers.
	rw.w.lock()
	// announce to readers there is a pending writer.
	r := atomic.addint32(&rw.readercount, -rwmutexmaxreaders) + rwmutexmaxreaders
	// wait for active readers.
	if r != 0 && atomic.addint32(&rw.readerwait, r) != 0 {
		runtime_semacquiremutex(&rw.writersem, false, 0)
	}
}

拿写锁时,会获取 w.lock,自然能保证同一时间只会有一把写锁

q3: 在读锁被拿到的情况下,新协程拿写锁,如果保证写锁现成会被阻塞?

func (rw *rwmutex) lock() {
	// first, resolve competition with other writers.
	rw.w.lock()
	// announce to readers there is a pending writer.
	r := atomic.addint32(&rw.readercount, -rwmutexmaxreaders) + rwmutexmaxreaders
	// wait for active readers.
	if r != 0 && atomic.addint32(&rw.readerwait, r) != 0 {
		runtime_semacquiremutex(&rw.writersem, false, 0)
	}
}

假设此时有 5 个协程拿到读锁,则 readercount = 5,假设 rwmutexmaxreaders = 100。

此时有一个新的协程 w1 想要拿写锁。

在执行

r := atomic.addint32(&rw.readercount, -rwmutexmaxreaders) + rwmutexmaxreaders

后, rw.readercount = -95,r = 5。

在执行

atomic.addint32(&rw.readerwait, r)

后,rw.readerwait = 5。

readerwait 记录了在获取写锁的这一瞬间有多少个协程持有读锁。这一瞬间之后,就算有新的协程尝试获取读锁,也只会增加 readercount ,而不会动到 readerwait。

之后执行 runtime_semacquiremutex() 睡在了 writersem 这个信号量上面。

q4: 在读锁被拿到的情况下,新协程拿写锁被阻塞,当旧有的读锁协程全部释放,如何唤醒等待的写锁协程

func (rw *rwmutex) runlock() {
	if r := atomic.addint32(&rw.readercount, -1); r < 0 {
		// outlined slow-path to allow the fast-path to be inlined
		rw.runlockslow(r)
	}
}
func (rw *rwmutex) runlockslow(r int32) {
	// a writer is pending.
	if atomic.addint32(&rw.readerwait, -1) == 0 {
		// the last reader unblocks the writer.
		runtime_semrelease(&rw.writersem, false, 1)
	}
}

继续上一步的场景,每当执行 runlock 时,readercount 都会减去1。当 readercount 为负数时,意味着有协程正在持有或者正在等待持有写锁。

之前的五个读协程中的四个,每次 runlock() 之后,readercount = -95 – 4 = -99,readerwait = 5 – 4 = 1。

当最后一个读协程调用 runlock() 之后,readercount 变成了 -100,readerwait 变成 0,此时会唤醒在 writersem 上沉睡的协程 w1。

q5: 在写锁被拿到的情况下,新协程拿读锁,如何让新协程被阻塞?

func (rw *rwmutex) rlock() {
	if atomic.addint32(&rw.readercount, 1) < 0 {
		// a writer is pending, wait for it.
		runtime_semacquiremutex(&rw.readersem, false, 0)
	}
}

继续上面的场景,readercount = -100 + 1 = -99 < 0。

新的读协程 r1 被沉睡在 readersem 下面。

假设此时再来一个读协程 r2,则 readercount = -98,依旧沉睡。

q6: 在写锁被拿到的情况下,新协程拿读锁,写锁协程释放,如何唤醒等待的读锁协程?

继续上面的场景,此时协程 w1 释放写锁

func (rw *rwmutex) unlock() {
	// announce to readers there is no active writer.
	r := atomic.addint32(&amp;rw.readercount, rwmutexmaxreaders)
	// unblock blocked readers, if any.
	for i := 0; i &lt; int(r); i++ {
		runtime_semrelease(&amp;rw.readersem, false, 0)
	}
	// allow other writers to proceed.
	rw.w.unlock()
}

在执行

atomic.addint32(&amp;rw.readercount, rwmutexmaxreaders)

后,r = readercount = -98 + 100 = 2,代表此时有两个读协程 r1 和 r2 在等待

ps: 如果此时有一些新的协程想要拿读锁,他会因为 readercount = 2 + 1 = 3 > 0 而顺利执行下去,不会被阻塞

之后 for 循环执行两次,将协程 r1 和 协程 r2 都唤醒了。

q7: 在写锁被拿到的情况下,有两个协程分别去抢读锁和写锁,当写锁被释放时,这两个协程谁会胜利?

func (rw *rwmutex) unlock() {
	// announce to readers there is no active writer.
	r := atomic.addint32(&amp;rw.readercount, rwmutexmaxreaders)
	// unblock blocked readers, if any.
	for i := 0; i &lt; int(r); i++ {
		runtime_semrelease(&amp;rw.readersem, false, 0)
	}
	// allow other writers to proceed.
	rw.w.unlock()
}

由于是先唤醒读锁,再调用 w.unlock() ,因此肯定是读协程先胜利!

认为写的比较巧妙的两个点

  • readercount 与 rwmutexmaxreaders 的纠缠

    通过 readercount + rwmutexmaxreaders 以及 readercount - rwmutexmaxreaders 这两个操作可以得知当前是否有协程等待/持有写锁以及当前等待/持有读锁的协程数量

  • readercount 与 readerwait 的纠缠

    在 lock() 时直接将 readercount 的值赋给 readerwait,在 readerwait = 0 而非 readercount = 0 是唤醒写协程,可以避免在 lock() 后来达到的读协程先于写协程被执行。

 到此这篇关于go rwmutex的实现示例的文章就介绍到这了,更多相关go rwmutex内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!