脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服务器之家 - 脚本之家 - Golang - golang中sync.Mutex的实现方法

golang中sync.Mutex的实现方法

2022-09-13 10:59机智的小小帅 Golang

本文主要介绍了golang中sync.Mutex的实现方法,mutex 主要有两个 method: Lock() 和 Unlock(),文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

mutex 的实现思想

mutex 主要有两个 method: Lock() 和 Unlock()

Lock() 可以通过一个 CAS 操作来实现

?
1
2
3
4
5
6
7
8
func (m *Mutex) Lock() {
    for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) {
    }
}
 
func (m *Mutex) Unlock() {
    atomic.StoreUint32(&m.locked, 0)
}

Lock() 一直进行 CAS 操作,比较耗 CPU。因此带来了一个优化:如果协程在一段时间内抢不到锁,可以把该协程挂到一个等待队列上,Unlock() 的一方除了更新锁的状态,还需要从等待队列中唤醒一个协程。

但是这个优化会存在一个问题,如果一个协程从等待队列中唤醒后再次抢锁时,锁已经被一个新来的协程抢走了,它就只能再次被挂到等待队列中,接着再被唤醒,但又可能抢锁失败...... 这个悲催的协程可能会一直抢不到锁,由此产生饥饿 (starvation) 现象。

饥饿现象会导致尾部延迟 (Tail Latency) 特别高。什么是尾部延迟?用一句话来说就是:最慢的特别慢!

如果共有 1000 个协程,假设 999 个协程可以在 1ms 内抢到锁,虽然平均时间才 2ms,但是最慢的那个协程却需要 1s 才抢到锁,这就是尾部延迟。

golang 中 mutex 的实现思想

?
1
2
➜  go version
go version go1.16.5 darwin/arm64

本次阅读的 go 源码版本为 go1.16.5。

golang 标准库里的 mutex 避免了饥饿现象的发生,先大致介绍一下 golang 的加锁和解锁流程,对后面的源码阅读有帮助。

锁有两种 mode,分别是 normal mode 和 starvation mode。初始为 normal mode,当一个协程来抢锁时,依旧是做 CAS 操作,如果成功了,就直接返回,如果没有抢到锁,它会做一定次数的自旋操作,等待锁被释放,在自旋操作结束后,如果锁依旧没有被释放,那么这个协程就会被放到等待队列中。如果一个处于等待队列中的协程一直都没有抢到锁,mutex 就会从 normal mode 变成 starvation mode,在 starvation mode 下,当有协程释放锁时,这个锁会被直接交给等待队列中的协程,从而避免产生饥饿线程。

除此之外,golang 还有一点小优化,当有协程正在自旋抢锁时,Unlock() 的一方不会从等待队列中唤醒协程,因为即使唤醒了,被唤醒的协程也抢不过正在自旋的协程。

下面正式开始阅读源码。

mutex 的结构以及一些 const 常量值

?
1
2
3
4
type Mutex struct {
    state int32
    sema  uint32
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken             
    mutexStarving
    mutexWaiterShift = iota // 3
 
    // Mutex fairness.
    //
    // Mutex can be in 2 modes of operations: normal and starvation.
    // In normal mode waiters are queued in FIFO order, but a woken up waiter
    // does not own the mutex and competes with new arriving goroutines over
    // the ownership. New arriving goroutines have an advantage -- they are
    // already running on CPU and there can be lots of them, so a woken up
    // waiter has good chances of losing. In such case it is queued at front
    // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
    // it switches mutex to the starvation mode.
    //
    // In starvation mode ownership of the mutex is directly handed off from
    // the unlocking goroutine to the waiter at the front of the queue.
    // New arriving goroutines don't try to acquire the mutex even if it appears
    // to be unlocked, and don't try to spin. Instead they queue themselves at
    // the tail of the wait queue.
    //
    // If a waiter receives ownership of the mutex and sees that either
    // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
    // it switches mutex back to normal operation mode.
    //
    // Normal mode has considerably better performance as a goroutine can acquire
    // a mutex several times in a row even if there are blocked waiters.
    // Starvation mode is important to prevent pathological cases of tail latency.
    starvationThresholdNs = 1e6
)

mutex 的状态是通过 state 来维护的,state 有 32 个 bit。

前面 29 个 bit 用来记录当前等待队列中有多少个协程在等待,将等待队列的协程数量记录为 waiterCount。

?
1
state >> mutexWaiterShift // mutexWaiterShift 的值为 3

第 30 个 bit 表示当前 mutex 是否处于 starvation mode,将这个 bit 记为 starvationFlag。

?
1
state & mutexStarving

第 31 个 bit 表示当前是否有协程正在 (第一次) 自旋,将这个 bit 记为 wokenFlag,woken 的意思也就是醒着,代表它不在等待队列上睡眠。

?
1
state & mutexWoken

第 32 个 bit 表示当前锁是否被锁了 (感觉有点绕口哈哈) ,将这个 bit 记为 lockFlag。

?
1
state & mutexLocked

用一个图来表示这些 bit

?
1
2
3
0 0 0 0 0 0 0 0 ... 0 0             0           0
                      |             |           |
waiterCount     starvationFlag  wokenFlag   lockFlag

sema 是一个信号量,它会被用来关联一个等待队列。

分别讨论几种 case 下,代码的执行情况。

Mutex 没有被锁住,第一个协程来拿锁

?
1
2
3
4
5
6
7
8
9
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // ...
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

在 Mutex 没有被锁住时,state 的值为 0,此时第一个协程来拿锁时,由于 state 的值为 0,因此 CAS 操作会成功,CAS 操作之后的 state 的值变成 1 (lockFlag = 1) ,然后 return 掉,不会进入到 m.lockSlow() 里面。

Mutex 仅被协程 A 锁住,没有其他协程抢锁,协程 A 释放锁

?
1
2
3
4
5
6
7
8
9
10
11
func (m *Mutex) Unlock() {
    // ...
 
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

紧接上面,state 的值为 1,AddInt32(m.state,-1) 之后,state 的值变成了 0 (lockFlag = 0) ,new 的值为 0,然后就返回了。

Mutex 已经被协程 A 锁住,协程 B 来拿锁

?
1
2
3
4
5
6
7
8
9
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        // ...
        return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

因为 state 的值不为 0,CompareAndSwapInt32 会返回 false,所以会进入到 lockSlow() 里面

lockSlow()

首先看一下 lockSlow() 这个方法的全貌

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexwokenFlag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            new &^= mutexWoken
        }
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
 
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}

第一步: doSpin (空转)

进入 for 循环后,会执行一个判断

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for {
    // Don't spin in starvation mode, ownership is handed off to waiters
    // so we won't be able to acquire the mutex anyway.
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // Active spinning makes sense.
        // Try to set mutexwokenFlag to inform Unlock
        // to not wake other blocked goroutines.
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    // ...
}

runtime_canSpin(iter) 的作用是根据 iter 的值判断自否应该自旋下去。 (这个方法的实现可以在后面看到)

最初的几次判断,由于 iter 的值为 0,runtime_canSpin(iter) 会返回 true。因此

?
1
if old&amp;(mutexLocked|mutexStarving) == mutexLocked &amp;&amp; runtime_canSpin(iter)

这个判断会一直通过,由于 old>>mutexWaiterShift = 0 (waiterCount = 0) ,不满足第二个判断的条件,因此不会执行 CAS 操作和 awoke = true

接着就是执行 runtime_doSpin() 了,runtime_doSpin() 会进行一些空循环,消耗了一下 CPU 时间,然后就通过 continue 进入到下一次循环了。 (runtime_doSpin具体实现也可以在后面看到)

看到看到,这段代码不是用来抢锁的,而是用来等锁变成 unlock 状态的,它会空转一定的次数,期待在空转的过程中,锁被其他的协程释放。

runtime_doSpin()

?
1
2
3
4
5
6
7
// src/runtime/lock_sema.go
const active_spin_cnt = 30
//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
    procyield(active_spin_cnt)
}
?
1
2
3
4
5
6
7
8
# /src/runtime/asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET

procyield() 会循环执行 PAUSE 指令。

runtime_canSpin()

runtime_canSpin() 的实现在 src/runtime/proc.go 里面,里面的判断比较多,但是我们只需要关注 i >= active_spin 这一个判断就行。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const active_spin     = 4
// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
    // sync.Mutex is cooperative, so we are conservative with spinning.
    // Spin only few times and only if running on a multicore machine and
    // GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    // As opposed to runtime mutex we don't do passive spinning here,
    // because there can be work on global runq or on other Ps.
    if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

一个小插曲

在利用断点来 debug 时,发现没办法 watch sync_runtime_canSpin() 内引用的一些全局变量,例如 active_spin,ncpu,sched.npidle 这些,所以我就大力出奇迹,强行修改源码在里面声明了几个局部变量,这下可以通过 watch 局部变量来得知全局变量的值了 (机智如我哈哈) 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func sync_runtime_canSpin(i int) bool {
    local_active_spin := active_spin
    local_ncpu := ncpu
    local_gomaxprocs := gomaxprocs
    npidle := sched.npidle
    nmspinning := sched.nmspinning
    if i >= local_active_spin || local_ncpu <= 1 ||local_gomaxprocs <= int32(npidle+nmspinning)+1 {
        return false
    }
    if p := getg().m.p.ptr(); !runqempty(p) {
        return false
    }
    return true
}

第二步: 根据旧状态来计算新状态

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
    new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
    new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}
if awoke {
    // The goroutine has been woken from sleep,
    // so we need to reset the flag in either case.
    // ...
    new &^= mutexWoken
}

这一段代码,是根据 old state 来计算 new state,有 4 个操作

  • set lockFlag: new |= mutexLocked
  • 增加 waiterCount: new += 1 << mutexWaiterShift
  • set starvationFlag: new |= mutexStarving
  • clear wokenFlag: new &^= mutexWoken

由于在这里我们只讨论 ”Mutex 已经被协程 A 锁住,协程 B 来拿锁“ 这种情况,可以分为两种 case

  • case1: 在第一步自旋的过程中,锁已经被释放了,此时 old state = 000000...000 (所有 bit 都为 0) ,经过这四个操作的洗礼后,lockFlag 被设置成了 1。
  • case2: 在第一步自旋结束后,锁还没有被释放,即 old state 此时为 00000000...001 (仅 lockFlag 为 1),经过这四个操作的洗礼后,waiterCounter = 1,lockFlag 也为 1。

第三步: 更新 state (抢锁)

?
1
2
3
4
5
6
7
8
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    if old&(mutexLocked|mutexStarving) == 0 {
        break // locked the mutex with CAS
    }
    // ...
} else {
    old = m.state
}

这一步会通过 CAS 操作将 mutex.state 更新为我们刚刚计算得到的 new state。如果 CAS 成功,且 old 处于未上锁的状态时,就直接利用 break 退出循环返回了 (也就是上面的 case1) 。如果 CAS 失败,将会更新 old state 的值,进行下一次循环,再重复一二三步;

如果是 case2 的话,情况会稍微复杂一点

?
1
2
3
4
5
6
7
8
9
10
11
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...
    // If we were already waiting before, queue at the front of the queue.
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
        waitStartTime = runtime_nanotime()
    }
    
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    // ...
}

主要就是通过 runtime_SemacquireMutex() 把自己放进了等待队列里面,之后 runtime 不会再调度该协程,直到协程被唤醒。

关于 runtime_SemacquireMutex() 的实现,我暂时就不追究下去了,再追究下去就没完没了啦。

Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁

?
1
2
3
4
5
6
7
8
9
func (m *Mutex) Unlock() {
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}

紧接上回,最初 state 的值为 00000000000...0001001 (waiterCount = 1, lockFlag = 1)。执行完 AddInt32(&m.state, -mutexLocked) 后,变成了 0000...001000 (waiterCount = 1) ,new 的值也为 0000...001000,接着就进入到 unlockSlow 里面了。

unlockSlow()

看看 unlockSlow() 的全貌

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true, 1)
    }
}

此时 old >> mutexWaiterShift = 0000...0001 ≠ 0, 所以不会直接返回

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
old := new
for {
    // If there are no waiters or a goroutine has already
    // been woken or grabbed the lock, no need to wake anyone.
    // In starvation mode ownership is directly handed off from unlocking
    // goroutine to the next waiter. We are not part of this chain,
    // since we did not observe mutexStarving when we unlocked the mutex above.
    // So get off the way.
    if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
        return
    }
    // Grab the right to wake someone.
    new = (old - 1<<mutexWaiterShift) | mutexWoken
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        runtime_Semrelease(&m.sema, false, 1)
        return
    }
    old = m.state
}

接着计算 new = 0000...1000 - 0000...1000 = 0000...0000,waiterCount 由 1 变成了 0。之后进行 CAS 操作,如果 CAS 成功,则从等待队列中唤醒一个 goroutine。

Mutex 被协程 A 锁住,协程 B 来抢锁但失败被放入等待队列,此时协程 A 释放锁,协程 B 被唤醒

让我们会视线切到 lockSlow 的后半截。

?
1
2
3
4
5
6
7
8
9
const starvationThresholdNs = 1e6
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // ...
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    // ...
    iter = 0
}

当协程 B 从 runtime_SemacquireMutex 处醒来后,会根据该协程的等待的时间来判断是否饥饿了。这里我们先假设此时还没有饥饿,后面会详细讨论饥饿时的情况。之后会将 iter 重置为 0,接着就进行下一次的循环了,由于 iter 已经被重置为 0 了,所以在下一次循环时,sync_runtime_doSpin(iter) 会返回 true

由于此时 state 已经变成了 0 了,所以在下一次循环里可以畅通无阻的拿到锁。

饥饿情况下的解锁行为: starvationFlag 的作用

设想这样一种情况:goroutine A 拿到锁,goroutine B 抢锁失败,被放入等待队列。goroutine A 释放锁,goroutine B 被唤醒,但是正当它抢锁时,锁被新来的 goroutine C 抢走了... 连续好几次,每当 goroutine B 要抢锁时,锁都被其他协程抢先一步拿走。直到某一次,goroutine B 再次被唤醒后执行

?
1
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

它就进入饥饿模式 (starvation mode) 啦!

?
1
2
3
4
5
6
7
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
    new |= mutexStarving
}

之后通过 CAS 操作将饥饿标志设置到了 mutex.state 里面,然后它就又被放到等待队列中了。

?
1
atomic.CompareAndSwapInt32(&m.state, old, new)

Unlock()

视角切换到 Unlock() 这一边

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (m *Mutex) Unlock() {
    // ...
    // Fast path: drop lock bit.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}
 
func (m *Mutex) unlockSlow(new int32) {
    // ...
    if new&mutexStarving == 0 {
        // ...
        for {
            // ...
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            // ...
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        runtime_Semrelease(&m.sema, true, 1)
    }
}

在 unlockSlow() 中,此时 new&mutexStarving != 0,所以会直接进入到 else 分支内,调用 runtime_Semrelease() 方法,但要注意 else 分支内runtime_Semrelease() 的参数和 if 分支的参数不一样,在这里 runtime_Semrelease(&m.sema, true, 1) 起到的作用是唤醒了等待队列中的第一个协程并立马调度该协程 (runtime_Semrelease() 方法的详解在后面 )。

同时正如注释所说,在 Unlock() 中由于进行了 atomic.AddInt32(&m.state, -mutexLocked) 操作,所以 mutex.state 的 lockFlag 是为 0 的,但是没关系,starvationFlag 是为 1 的,所以会依旧被认为是锁住的状态。

Lock()

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (m *Mutex) Lock() {
    // ...
    m.lockSlow()
}
 
func (m *Mutex) lockSlow() {
    // ...
    for {
        // ...
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // ...
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // ...
            old = m.state
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                // ...
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
    // ...
}

视角再次切换到 Lock() 这边,饥饿的 goroutine 被唤醒并调度后,首先执行 old = m.state, 此时 old 的 starvationFlag = 1。

之后就正如注释所说,它会尝试修复 mutex.state 的"不一致" (inconsistent) 状态。

修复工作主要做了三件事情:

  • 在 starvation mode 下的 Unlock() 没有将 waitterCount - 1, 所以这里需要给 mutexWaiter 减 1

  • 将 state 的 locked flag 置为 1

  • 如果该 goroutine 没有饥饿或者是等待队列中的最后一个 goroutine 的话,清理 starvationFlag

这三件事情通过 atomic.AddInt32(&m.state, delta) 一步到位。

runtime_Semrelease()

?
1
2
3
4
5
6
7
8
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

handoff 就是传球的意思,handoff 为 false 时,仅仅唤醒等待队列中第一个协程,但是不会立马调度该协程;当 handoff 为 true 时,会立马调度被唤醒的协程,此外,当 handoff = true 时,被唤醒的协程会继承当前协程的时间片。具体例子,假设每个 goroutine 的时间片为 2ms,gorounte A 已经执行了 1ms,假设它通过 runtime_Semrelease(handoff = true) 唤醒了 goroutine B,则 goroutine B 剩余的时间片为 2 - 1 = 1ms。

饥饿模式下新来的 goroutine 的加锁行为: starvationFlag 的作用

如果在饥饿模式下,有新的 goroutine 来请求锁,它会执行下面这些步骤

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (m *Mutex) lockSlow() {
    // ...
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // ...
            runtime_doSpin()
        }
        new := old
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        // ...
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // ..
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // ...
        } else {
            // ...
        }
    }
    // ...
}

由于 old&(mutexLocked|mutexStarving) != mutexLocked ,所以它不会自旋。

由于 old&mutexStarving != 0,所以它不会 set lockFlag。

由于 old&(mutexLocked|mutexStarving) != 0,所以它 增加 waiterCount。

可以看到,它实际上就做了增加 waiterCount 这一个操作,之后通过 CAS 更新 state 的状态,更新完成之后就跑去等待队列睡觉去了。

因此在饥饿状态下,新的来争抢锁的 goroutine 是不会去抢锁 (set lockFlag) 的,它们只会登记一下 (waiterCount + 1) ,然后乖乖加入到等待队列里面。

当有协程正在自旋时的解锁行为: wokenFlag 的作用

wokenFlag 是在 lockSlow() 里面被设置的,wokenFlag 为 1 时,表示此时有协程正在进行自旋。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (m *Mutex) lockSlow() {
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexwokenFlag to inform Unlock
            // to not wake other blocked goroutines.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        // ...
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // ...
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
            // ...
            awoke = true
            iter = 0
        }
        // ...
    }
    // ...
}

当一个新来的协程 (从未被放到等待队列中) 在第一次自旋时,wokenFlag 的设置逻辑为:

?
1
2
3
4
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
}

但是当协程从等待队列中被唤醒后自旋时,却 lockSlow()找不到设置 wokenFlag 的逻辑,为何?因为这段逻辑被放到了 unlockSlow 里面了。

视线切换到 unlockSlow() 那一边

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func (m *Mutex) unlockSlow(new int32) {
    // ...
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                // 当 mutexwokenFlag 被设置时,会直接 return
                // 不会去等待队列唤醒 goroutine
                return
            }
            // Grab the right to wake someone.
            // 这个地方会设置 wokenFlag 哦
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            old = m.state
        }
    } else {
        // ...
    }
}

可以看到,当有协程正在自旋时 (wokenFlag = 1) ,不会从等待队列唤醒协程,这样就避免了等待队列上的协程加入竞争,当然,正在自旋中的协程之间彼此之间还是会竞争的;如果 wokenFlag = 0,则会从等待队列中唤醒一个协程,在唤醒之前会将 wokenFlag 设置为 1,这样协程被唤醒后就不用再去设置 wokenFlag 了,妙呀!

为什么当有协程在自旋时,不要去等待队列中唤醒协程呢?协程从被唤醒到被调度 (在 CPU 上面执行) 是要花时间的,等真正自旋时 mutex 早就被抢走了。

协程从等待队列被唤醒后如果还是没有抢到锁,会被放到队列首部还是尾部?

但是是头部,代码如下:

?
1
2
3
4
5
6
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&amp;m.sema, queueLifo, 1)

复杂情景分析

基于上面的逻辑来分析一下复杂的逻辑吧!

假设有协程 g1,g2,g3,g4,g5,g6, 共同争抢一把锁 m

一开始 g1 拿到锁

owner: g1 waitqueue: null

g2 开始抢锁,没有抢到,被放到等待队列

owner: g1 waitqueue: g2

g1 释放锁,g2 从等待队列中被唤醒

owner: null waitqueue: null

此时 g3 也开始抢锁,g2 没有抢过,又被放回等待队列

owner: g3 waitqueue: g2

g4 开始抢锁,没有抢到,被放到等待队列

owner: g3 waitqueue: g2, g4

g3 释放锁,g2 被唤醒

owner: null waitqueue: g4

此时 g5 开始抢锁,g2 没有抢过,又被放回等待队列首部

owner: g5 waitqueue: g2, g4

g6 开始抢锁,正在自旋中

owner: g5 waitqueue: g2, g4 wokenFlag: 1 spinning: g6

g5 释放锁,由于此时有协程正在自旋,因此不会去等待队列中唤醒协程,锁被 g6 轻松抢到

owner: g6 waitqueue: g2, g4 wokenFlag: 0 spinning: null

g6 释放锁,g2 被唤醒,此时 g7 开始抢锁,g2 没有抢过,又被放回等待队列首部,但是 g2 由于太久没有抢到锁,进入饥饿模式了

owner: g7 waitqueue: g2(饥饿), g4 starvationFlag: 1

g8 来抢锁,由于处于饥饿状态,g8 会被直接放在等待队列尾部

owner: g7 waitqueue: g2(饥饿), g4, g8 starvationFlag: 1

g7 释放锁,由于处于饥饿状态,会直接唤醒 g2 并调度它

owner: g2 waitqueue: g4, g8 starvationFlag: 1

g2 执行完毕,释放锁,注意此刻依旧是饥饿状态,直接调度 g4,g4 苏醒后,发现它自己没有饥饿,于是 clear starvationFlag

owner: g4 waitqueue: g8 starvationFlag: 0

此时新来的 g8 可以正常加入到对锁的争抢中了,之后就是正常的加锁解锁逻辑了。

一点小瑕疵: 一种很边缘的 starvation case

由于等待队列中的协程只有当被唤醒之后才会根据等待时间来判断是否进入 starvation mode,因此会存在一个协程在等待队列中等待了很久,它实际上已经饥饿了,但是一直没被唤醒过,就没机会 set starvationFlag,这就会导致饥饿现象的发生。

那么会存在等待队列里的协程一直不被唤醒的情况么?

有的!在 unlockSlow() 时如果 wokenFlag = 1,那就不会去唤醒等待队列中的线程。就会存在这样一种情况,假设每次 Unlock() 时恰好有一个新来的协程在自旋,那等待队列中的协程就会永远饥饿下去!

reference

Tail Latency Might Matter More Than You Think

Golang 互斥锁内部实现

到此这篇关于golang中sync.Mutex的实现方法的文章就介绍到这了,更多相关golang sync.Mutex  内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/XiaoXiaoShuai-/p/16104689.html

延伸 · 阅读

精彩推荐
  • GolangGo语言实现AzDG可逆加密算法实例

    Go语言实现AzDG可逆加密算法实例

    这篇文章主要介绍了Go语言实现AzDG可逆加密算法,实例分析了AzDG可逆加密算法的实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    脚本之家1882020-04-14
  • Golanggolang内存对齐的概念及案例详解

    golang内存对齐的概念及案例详解

    为保证程序顺利高效的运行,编译器会把各种类型的数据安排到合适的地址,并占用合适的长度,这就是内存对齐。本文重点给大家介绍golang内存对齐的概...

    Dawnlight-_-5022022-09-01
  • GolangGo语言流程控制之goto语句与无限循环

    Go语言流程控制之goto语句与无限循环

    这篇文章主要介绍了Go语言流程控制之goto语句与无限循环,是golang入门学习中的基础知识,需要的朋友可以参考下 ...

    脚本之家3012020-04-28
  • Golanggolang中snappy的使用场合实例详解

    golang中snappy的使用场合实例详解

    在java 和go语言 大字符传达的时候, 采用snappy 压缩 解压缩是最好的方案。下面这篇文章主要给大家介绍了关于golang中snappy使用场合的相关资料,文中通过...

    daisy2052020-05-12
  • GolangGolang极简入门教程(三):并发支持

    Golang极简入门教程(三):并发支持

    这篇文章主要介绍了Golang极简入门教程(三):并发支持,本文讲解了goroutine线程、channel 操作符等内容,需要的朋友可以参考下 ...

    junjie3262019-11-17
  • Golanggo语言定时器Timer及Ticker的功能使用示例详解

    go语言定时器Timer及Ticker的功能使用示例详解

    这篇文章主要为大家介绍了go语言定时器的功能使用示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪...

    Jeff的技术栈11982022-04-14
  • GolangVSCode Golang dlv调试数据截断问题及处理方法

    VSCode Golang dlv调试数据截断问题及处理方法

    这篇文章主要介绍了VSCode Golang dlv调试数据截断问题,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友...

    FanZheng''s Debug Blog3962020-07-23
  • Golanggolang配制高性能sql.DB的使用

    golang配制高性能sql.DB的使用

    本文主要讲述SetMaxOpenConns(), SetMaxIdleConns() 和 SetConnMaxLifetime()方法, 您可以使用它们来配置sql.DB的行为并改变其性能,感兴趣的可以了解一下...

    Go语言由浅入深12262022-01-22