抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

并发安全

1-并发安全

串行、并行、并发

竞态

竞态 是指多个 goroutine 按某些交错顺序执行时,争抢使用同一份临界资源,导致程序无法给出正确的结果。
串行程序中(一个程序只有一个 goroutine),程序中各个步骤的执行顺序由程序逻辑决定,所以单个 goroutine 不会引发竞态问题。
并发程序中(一个程序有多个 goroutine),每个 goroutine 的执行顺序是不一样的,因此可能会带来竞态问题。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package main

import "fmt"

var x int64

func add() {
for i := 0; i < 5_000_000; i++ {
x = x + 1
}
}

func main() {
add()
add()
fmt.Println(x) // 10_000_000
}

这是一个串行程序,所以最终的结果是正确的。

下面开启新的 goroutine 执行 add()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import (
"fmt"
"sync"
)

var (
x int64
wg sync.WaitGroup
)

func add() {
defer wg.Done()
for i := 0; i < 5_000_000; i++ {
x = x + 1
}
}

func main() {
wg.Add(2)

go add()
go add()
fmt.Println(x) // 5_872_265

wg.Wait()
}

从打印结果可以看出结果并不正确,这是因为 3 个 goroutine 争抢着使用 x 这个临界资源,有可能最终的 x 的结果是正确的,但是main() 提前抢到资源就打印出来了,也有可能两个 子goroutine 争抢时导致 x 被写乱了(发生了竞态)。

避免竞态

  • 第一种:不要修改变量。这种可以,但不现实,因为实际业务中必然涉及同时读写同一个变量的场景。只能说尽量避免。
  • 第二种:上锁

上锁可以很好的解决竞态问题,但是会有些许性能上的损失。

发生竞态的主要原因是因为:多个 goroutine 争抢读写同一个临界资源。这个临界资源可以是打印机、全局变量等等。

举个栗子,
一个人就是一个 goroutine,公共卫生间就是一种临界资源,同一时刻只能有一个人使用(一个临界资源,同一时刻只能有一个 goroutine 读写 )。多个人同时争抢一个公共卫生间肯定出问题。

所以卫生间要加个锁,进去使用的人上锁,使用完了开锁。
使用临界资源的 goroutine 给临界资源上锁,使用完了把锁释放。

Golang 中的 sync 包提供了 MutexRWMutex 两种锁。

sync.Mutex 互斥锁

1
2
3
4
5
6
7
type Mutex struct {
state int32
sema uint32
}

func (m *Mutex) Lock() {}
func (m *Mutex) Unlock() {}

Mutex 非常简单,只有Lock()Unlock() 两个方法。

加锁

  • Lock() 用于加锁
  • 加锁规则:
    • 如果互斥锁已经被上锁了,则加锁操作阻塞,直到互斥锁被解锁以后才能上锁。
    • 如果互斥锁没有被上锁,那么上锁成功。
    解锁
  • Unlock() 用于解锁
  • 解锁规则:释放互斥锁

上锁

下面我们对上面的例子中,争抢临界资源的部分上锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"fmt"
"sync"
)

var (
x int64
wg sync.WaitGroup
lock sync.Mutex
)

func add() {
defer wg.Done()
for i := 0; i < 5_000_000; i++ {
lock.Lock() // 加锁
x = x + 1
lock.Unlock() // 解锁
}
}

func main() {
wg.Add(2)

go add()
go add()
fmt.Println(x) // 10_000_000

wg.Wait()
}

如此一来,不管哪个 子goroutine 得到运行的机会,别的 goroutine 都不能访问临界资源,得等到上了锁的 goroutine 释放,别的 goroutine 才能访问。

延迟解锁

如果一个 goroutine 上了锁之后忘记释放锁,别的 goroutine 是永远拿不到锁的,还会导致死锁。
在复杂的代码中,很难确定所有分支中的 Lock()Unlock() 成对出现,所以,最好养成锁成对写、延迟解锁的习惯。

1
2
3
4
5
6
7
8
9
10
var x int
var lock sync.Mutex

func add() {
lock.Lock() // 上锁
x = x + 1

defer lock.Unlock() // 延迟解锁,在函数运行结束之后会自动解锁
return x
}

sync.RWMutex 读写互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}

// 写锁
func (rw *RWMutex) Lock() {}
func (rw *RWMutex) Unlock() {}
// 读锁
func (rw *RWMutex) RLock() {}
func (rw *RWMutex) RUnlock() {}

func (rw *RWMutex) RLocker() Locker {}

读写互斥锁适用于读多写少的场景。因为在使用 Mutex 互斥锁的时候,不管是因为要去读上的锁还是要去写上的锁,只要一上了锁,别的 goroutine 就不能上锁,也不能读写。

于是在 读多写少 的场景下,导致少量写操作时,也不能读。所以这种场景要使用 读写互斥锁

写锁

  • Lock() 函数用于上写锁
  • Unlock() 函数用于解写锁
  • 加锁规则:
    • 如果当前的读写锁被上锁了,则 加写锁 操作阻塞直到读写锁被释放
    • 如果当前的读写锁没有被锁,则 加写锁 成功
  • 释放规则:直接释放

读锁

  • RLock() 函数用于上读锁
  • RUnlock() 函数用于解读锁
  • 加锁规则:
    • 如果当前的读写锁 没有被锁,则 加读锁 操作成功
    • 如果当前的读写锁 被上了读锁,则 加读锁 操作成功
    • 如果当前的读写锁 被上了写锁,则 加读锁 操作阻塞,直到读写锁被释放

eg:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"fmt"
"sync"
"time"
)

var (
x int64
wg sync.WaitGroup
rwlock sync.RWMutex // 读写锁
)

func read() {
defer wg.Done()

rwlock.RLock() // 加读锁
time.Sleep(time.Millisecond)
fmt.Println(x)
defer rwlock.RUnlock() // 解读锁
}

func write() {
defer wg.Done()

rwlock.Lock() // 加写锁
time.Sleep(time.Millisecond * 5)
x = x + 1
defer rwlock.Unlock() // 解写锁
}

func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}

for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}

wg.Wait()
fmt.Println(time.Now().Sub(start))
}
// --------------------------------------
// Output:
...
1
1
1
1
215.9568ms

Mutex 和 RWMutex 的选择

  • 仅在绝大部分goroutine都在获取读锁并且锁竞争比较激烈时(即,goroutine一般都需要等待后才能获到锁),RWMutex才有优势
  • 否则,一般使用Mutex即可

sync.Once

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Once struct {
done uint32
m Mutex
}

func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}

func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
  • Once 是一个结构体,里面有一把 Mutexm,还有一个 done 标志位用来标记 f 是否执行过了。(0未执行,1已执行)
  • Once 只有一个 Do() 方法,其中检查了一下标志位,如果为 0 表示 f 未执行过,调用 doSlow();非 0 表示执行过了,直接返回
    doSlow() 中,先是上锁,然后检查 done 标志位,看看 f 是否执行过,如果没有执行,就执行,然后标志位置为 1。

sync.Once 写一个单例模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"fmt"
"sync"
)

type person struct {
name string
age int
}

var (
p *person
once sync.Once
)

// 构造方法使用 Once 的 Do 方法实现
func NewSinglePerson(name string, age int) *person {
once.Do(func() {
p = &person{name, age}
})
return p
}

func main() {
p1 := NewSinglePerson("Boii", 18)
p2 := NewSinglePerson("Eva", 20)
fmt.Printf("%p\n", p1) // 0xc0000044a0
fmt.Printf("%p\n", p2) // 0xc0000044a0
fmt.Printf("%v\n", p1) // &{Boii 18}
fmt.Printf("%v\n", p2) // &{Boii 18}
}

sync.Map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Map struct {
mu Mutex
read atomic.Value // readOnly
dirty map[interface{}]*entry
misses int
}

// set
func (m *Map) Store(key, value interface{}) {}

// get
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {}

// 删除
func (m *Map) Delete(key interface{}) {}
// 先加载看,有再删除
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {}
// 先加载看,没有再存,有就不存
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {}

// 遍历
func (m *Map) Range(f func(key, value interface{}) bool) {}

Golang 中内置的 map 不是并发安全的,在对 map 并发写的时候会出现问题。

Golang 的 sync 包中提供了一个开箱即用的并发安全版map:sync.Map

开箱即用,即表示不需要 make() 函数初始化就能直接使用,同时内置了诸如 Store、Load、Delete、Range 等操作。

先看对内置 map 并发写会发生什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
var m = make(map[string]int)
var wg sync.WaitGroup

func get(key string) int {
return m[key]
}

func set(key string, value int) {
m[key] = value
}

func call (n int) {
defer wg.Done()
key := strconv.Itoa(n)
set(key, n)
fmt.Printf("k=:%v,v:=%v\n", key, get(key))
}

func main() {
for i := 0; i < 20; i++ {
wg.Add(1)
go call(i)
}
wg.Wait()
}
// -------------------------------
// Output:
fatal error: k=:0,v:=0
concurrent map writes
k=:19,v:=19
fatal error: concurrent map writes
...

采用 并发安全版:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
var wg sync.WaitGroup
var sm = sync.Map{} // 声明 sync.Map 变量

func call(n int) {
defer wg.Done()

key := strconv.Itoa(n)

sm.Store(key, n) // 写
value, _ := sm.Load(key) // 读

fmt.Printf("k=:%v,v:=%v\n", key, value)
}

func main() {
for i := 0; i < 20; i++ {
wg.Add(1)
go call(i)
}
wg.Wait()
}
// -------------------------------
// Output:
k=:19,v:=19
k=:10,v:=10
k=:11,v:=11
k=:2,v:=2
k=:15,v:=15
k=:3,v:=3
k=:12,v:=12
k=:0,v:=0
k=:13,v:=13
k=:18,v:=18
k=:14,v:=14
k=:6,v:=6
k=:4,v:=4
k=:5,v:=5
k=:8,v:=8
k=:7,v:=7
k=:9,v:=9
k=:1,v:=1
k=:17,v:=17
k=:16,v:=16

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。
针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是 Go 语言提供的方法它在用户态就可以完成,
因此性能比加锁操作更好。Go语言中原子操作由内置的标准库 sync/atomic 提供。

atomic 包

读取操作
func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
写入操作
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
修改操作
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
交换操作
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
比较并交换操作
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

var (
lock sync.Mutex
x, y, z int64
)

// 非并发安全版
func add() {
x++
}

// 加锁版
func mutexAdd() {
lock.Lock()
defer lock.Unlock()
y++
}

// 原子版
func atomicAdd() {
atomic.AddInt64(&z, 1)
}

func test(f func()) time.Duration {
var wg sync.WaitGroup

start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
f()
}()
}

wg.Wait()
return time.Now().Sub(start)
}

func main() {
fmt.Println("非并发安全版:", x, test(add))
fmt.Println("加锁版 :", y, test(mutexAdd))
fmt.Println("原子操作版 :", z, test(atomicAdd))
}

// -------------------------------
// Output:
非并发安全版: 9604 4.9936ms
加锁版 : 10000 3.9898ms
原子操作版 : 10000 2.9909ms

从结果可以看出,非并发安全版连结果都不对,加锁版安全,原子版安全且效率更高。

atomic 包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

哔哔