运维八一 运维八一
首页
运维杂记
编程浅尝
周积跬步
专栏
生活
关于
收藏
  • 分类
  • 标签
  • 归档
Source (opens new window)

运维八一

运维,运维!
首页
运维杂记
编程浅尝
周积跬步
专栏
生活
关于
收藏
  • 分类
  • 标签
  • 归档
Source (opens new window)
  • Go

    • 前言

    • Go基础知识

    • Go基本语法

    • 实战项目:简单web服务

    • 基本数据类型

    • 内置运算符

    • 分支和循环

    • 函数 function

    • 结构体 struct

    • 方法 method

    • 实战项目:跟踪函数调用链

    • 接口 interface

    • 并发 concurrency

      • 什么是并发
      • goroutine
      • select
      • channel
      • 基于共享内存的并发模型
        • 5. 基于共享内存的并发模型
          • 5.1 sync 包低级同步原语使用场景
          • 5.2 sync 包中同步原语使用的注意事项
          • 5.3 sync 包中同步原语的用法
          • 5.3.1 互斥锁 (Mutex) 和读写锁 (RWMutex)
          • 5.3.2 条件变量 (Cond)
      • 原子操作 atomic包
    • 指针

    • 实战项目:实现轻量级线程池

    • 实战项目:实现TCP服务器

    • go常用包

    • Gin框架

    • go随记

  • Python

  • Shell

  • Java

  • Vue

  • 前端

  • 编程浅尝
  • Go
  • 并发 concurrency
lyndon
2022-06-07
目录

基于共享内存的并发模型

# 5. 基于共享内存的并发模型

Go 应用并发设计的主流风格:使用 channel 进行不同 Goroutine 间的通信。

Go 语言之父 Rob Pike 有一句经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)”。

Go 也并没有彻底放弃基于共享内存的并发模型,而是在提供 CSP 并发模型原语的同时,还通过标准库的 sync 包,提供了针对传统的、基于共享内存并发模型的低级同步原语,包括:互斥锁(sync.Mutex)、读写锁(sync.RWMutex)、条件变量(sync.Cond)等,并通过 atomic 包提供了原子操作原语等等。

# 5.1 sync 包低级同步原语使用场景

一般情况下,优先使用 CSP 并发模型进行并发程序设计。在一些特殊场景中,需要 sync 包提供的低级同步原语。

第一种:需要高性能的临界区(critical section)同步机制场景。

在 Go 中,channel 并发原语也可以用于对数据对象访问的同步,可以把 channel 看成是一种高级的同步原语,它自身的实现也是建构在低级同步原语之上的。也正因为如此,channel 自身的性能与低级同步原语相比要略微逊色,开销要更大。

关于 sync.Mutex 和 channel 各自实现的临界区同步机制,一个简单的性能基准测试对比,性能差异:

var cs = 0 // 模拟临界区要保护的数据
var mu sync.Mutex
var c = make(chan struct{}, 1)

func criticalSectionSyncByMutex() {
 mu.Lock()
 cs++
 mu.Unlock()
}

func criticalSectionSyncByChan() {
 c <- struct{}{}
 cs++
 <-c
}

func BenchmarkCriticalSectionSyncByMutex(b *testing.B) {
 for n := 0; n < b.N; n++ {
     criticalSectionSyncByMutex()
 }
}

func BenchmarkCriticalSectionSyncByMutexInParallel(b *testing.B) {
 b.RunParallel(func(pb *testing.PB) {
     for pb.Next() {
         criticalSectionSyncByMutex()
     }
 })
}

func BenchmarkCriticalSectionSyncByChan(b *testing.B) {
 for n := 0; n < b.N; n++ {
     criticalSectionSyncByChan()
 }
}

func BenchmarkCriticalSectionSyncByChanInParallel(b *testing.B) {
 b.RunParallel(func(pb *testing.PB) {
     for pb.Next() {
         criticalSectionSyncByChan()
     }
 })
}

// 运行这个对比测试(Go 1.17),得到:
/*
$go test -bench .
goos: darwin
goarch: amd64
... ...
BenchmarkCriticalSectionSyncByMutex-8               88083549          13.64 ns/op
BenchmarkCriticalSectionSyncByMutexInParallel-8     22337848          55.29 ns/op
BenchmarkCriticalSectionSyncByChan-8                28172056          42.48 ns/op
BenchmarkCriticalSectionSyncByChanInParallel-8       5722972         208.1 ns/op
PASS
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

无论是在单 Goroutine 情况下,还是在并发测试情况下,sync.Mutex实现的同步机制的性能,都要比 channel 实现的高出三倍多。因此,通常在需要高性能的临界区(critical section)同步机制的情况下,sync 包提供的低级同步原语更为适合。

第二种就是在不想转移结构体对象所有权,但又要保证结构体内部状态数据的同步访问的场景。

基于 channel 的并发设计,有一个特点:在 Goroutine 间通过 channel 转移数据对象的所有权。所以,只有拥有数据对象所有权(从 channel 接收到该数据)的 Goroutine 才可以对该数据对象进行状态变更。

如果你的设计中没有转移结构体对象所有权,但又要保证结构体内部状态数据在多个 Goroutine 之间同步访问,那么你可以使用 sync 包提供的低级同步原语来实现,比如最常用的sync.Mutex。

# 5.2 sync 包中同步原语使用的注意事项

首次使用 Mutex 等 sync 包中定义的结构类型后,不应该再对它们进行复制操作。

在 sync 包的注释中(在$GOROOT/src/sync/mutex.go文件的头部注释),有一行说明:

// Values containing the types defined in this package should not be copied.  “不应复制那些包含了此包中类型的值”
1

在 sync 包的其他源文件中,同样看到类似的一些注释:

// $GOROOT/src/sync/mutex.go
// A Mutex must not be copied after first use. (禁止复制首次使用后的Mutex)

// $GOROOT/src/sync/rwmutex.go
// A RWMutex must not be copied after first use.(禁止复制首次使用后的RWMutex)

// $GOROOT/src/sync/cond.go
// A Cond must not be copied after first use.(禁止复制首次使用后的Cond)
... ...
1
2
3
4
5
6
7
8
9

Go 标准库中 sync.Mutex 的定义:

// $GOROOT/src/sync/mutex.go
type Mutex struct {
    state int32
    sema  uint32
}
1
2
3
4
5

Mutex 的定义非常简单,由两个整型字段 state 和 sema 组成:

  • state:表示当前互斥锁的状态;
  • sema:用于控制锁状态的信号量。

初始情况下,Mutex 的实例处于 Unlocked 状态(state 和 sema 均为 0)。对 Mutex 实例的复制也就是两个整型字段的复制。一旦发生复制,原变量与副本就是两个单独的内存块,各自发挥同步作用,互相就没有了关联。

func main() {
     var wg sync.WaitGroup
     i := 0
     var mu sync.Mutex // 负责对i的同步访问
 
     wg.Add(1)
     // g1
     go func(mu1 sync.Mutex) {
         mu1.Lock()
         i = 10
         time.Sleep(10 * time.Second)
         fmt.Printf("g1: i = %d\n", i)
         mu1.Unlock()
         wg.Done()
     }(mu)
 
     time.Sleep(time.Second)
 
     mu.Lock()
     i = 1
     fmt.Printf("g0: i = %d\n", i)
     mu.Unlock()
 
     wg.Wait()
 }
 
 /*
g0: i = 1
g1: i = 1
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

使用一个 sync.Mutex 类型变量 mu 来同步对整型变量 i 的访问。创建一个新 Goroutine:g1,g1 通过函数参数得到 mu 的一份拷贝 mu1,然后 g1 会通过 mu1 来同步对整型变量 i 的访问。

从运行结果来看,这个程序并没有实现对 i 的同步访问,第 9 行 g1 对 mu1 的加锁操作,并没能阻塞第 19 行 g0 对 mu 的加锁。于是,g1 刚刚将 i 赋值为 10 后,g0 就又将 i 赋值为 1 了。出现这种结果的原因就是一旦 Mutex 类型变量被拷贝,原变量与副本就各自发挥作用,互相没有关联了。甚至,如果拷贝的时机不对,比如在一个 mutex 处于 locked 的状态时对它进行了拷贝,就会对副本进行加锁操作,将导致加锁的 Goroutine 永远阻塞下去。

结论:

如果对使用过的、sync 包中的类型的示例进行复制,并使用了复制后得到的副本,将导致不可预期的结果。所以,在使用 sync 包中的类型的时候,推荐通过闭包方式,或者是传递类型实例(或包裹该类型的类型实例)的地址(指针)的方式进行。

# 5.3 sync 包中同步原语的用法

# 5.3.1 互斥锁 (Mutex) 和读写锁 (RWMutex)

sync 包提供了两种用于临界区同步的原语:

  • 互斥锁(Mutex)
  • 读写锁(RWMutex)

它们都是零值可用的数据类型,不需要显式初始化就可以使用,并且使用方法都比较简单。

互斥锁(Mutex)

var mu sync.Mutex
mu.Lock()   // 加锁
doSomething()
mu.Unlock() // 解锁
1
2
3
4

一旦某个 Goroutine 调用的 Mutex 执行 Lock 操作成功,它将成功持有这把互斥锁。这个时候,如果有其他 Goroutine 执行 Lock 操作,就会阻塞在这把互斥锁上,直到持有这把锁的 Goroutine 调用 Unlock 释放掉这把锁后,才会抢到这把锁的持有权并进入临界区。

使用互斥锁的两个原则:

  • 尽量减少在锁中的操作。这可以减少其他因 Goroutine 阻塞而带来的损耗与延迟。
  • 一定要记得调用 Unlock 解锁。忘记解锁会导致程序局部死锁,甚至是整个程序死锁,会导致严重的后果。

互斥锁应用场景:

互斥锁(Mutex)是临时区同步原语的首选,它常被用来对结构体对象的内部状态、缓存等进行保护,是使用最为广泛的临界区同步原语。

读写锁(RWMutex)

var rwmu sync.RWMutex
rwmu.RLock()   //加读锁
readSomething()
rwmu.RUnlock() //解读锁
rwmu.Lock()    //加写锁
changeSomething()
rwmu.Unlock()  //解写锁
1
2
3
4
5
6
7

写锁与 Mutex 的行为十分类似,一旦某 Goroutine 持有写锁,其他 Goroutine 无论是尝试加读锁,还是加写锁,都会被阻塞在写锁上。

但读锁就宽松多了,一旦某个 Goroutine 持有读锁,它不会阻塞其他尝试加读锁的 Goroutine,但加写锁的 Goroutine 依然会被阻塞住。

读写锁应用场景:

读写锁适合应用在具有一定并发量且读多写少的场合。

在大量并发读的情况下,多个 Goroutine 可以同时持有读锁,从而减少在锁竞争中等待的时间。而互斥锁,即便是读请求的场合,同一时刻也只能有一个 Goroutine 持有锁,其他 Goroutine 只能阻塞在加锁操作上等待被调度。

// 基准测试
var cs1 = 0 // 模拟临界区要保护的数据
var mu1 sync.Mutex

var cs2 = 0 // 模拟临界区要保护的数据
var mu2 sync.RWMutex

func BenchmarkWriteSyncByMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu1.Lock()
            cs1++
            mu1.Unlock()
        }
    })
}

func BenchmarkReadSyncByMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu1.Lock()
            _ = cs1
            mu1.Unlock()
        }
    })
}

func BenchmarkReadSyncByRWMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu2.RLock()
            _ = cs2
            mu2.RUnlock()
        }
    })
}

func BenchmarkWriteSyncByRWMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu2.Lock()
            cs2++
            mu2.Unlock()
        }
    })
}
// 这些基准测试都是并发测试,度量的是 Mutex、RWMutex 在并发下的读写性能。分别在 cpu=2、8、16、32 的情况下运行这个并发性能测试,测试结果如下:
/*
goos: darwin
goarch: amd64
... ...
BenchmarkWriteSyncByMutex-2       73423770          16.12 ns/op
BenchmarkReadSyncByMutex-2        84031135          15.08 ns/op
BenchmarkReadSyncByRWMutex-2      37182219          31.87 ns/op
BenchmarkWriteSyncByRWMutex-2     40727782          29.08 ns/op

BenchmarkWriteSyncByMutex-8       22153354          56.39 ns/op
BenchmarkReadSyncByMutex-8        24164278          51.12 ns/op
BenchmarkReadSyncByRWMutex-8      38589122          31.17 ns/op
BenchmarkWriteSyncByRWMutex-8     18482208          65.27 ns/op

BenchmarkWriteSyncByMutex-16        20672842          62.94 ns/op
BenchmarkReadSyncByMutex-16         19247158          62.94 ns/op
BenchmarkReadSyncByRWMutex-16       29978614          39.98 ns/op
BenchmarkWriteSyncByRWMutex-16      16095952          78.19 ns/op

BenchmarkWriteSyncByMutex-32        20539290          60.20 ns/op
BenchmarkReadSyncByMutex-32         18807060          72.61 ns/op
BenchmarkReadSyncByRWMutex-32       29772936          40.45 ns/op
BenchmarkWriteSyncByRWMutex-32      13320544          86.53 ns/op
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

结论:

  • 并发量较小的情况下,Mutex 性能最好;随着并发量增大,Mutex 的竞争激烈,导致加锁和解锁性能下降;
  • RWMutex 的读锁性能并没有随着并发量的增大,而发生较大变化,性能始终恒定在 40ns 左右;
  • 在并发量较大的情况下,RWMutex 的写锁性能和 Mutex、RWMutex 读锁相比,是最差的,并且随着并发量增大,RWMutex 写锁性能有继续下降趋势。

# 5.3.2 条件变量 (Cond)

sync.Cond 是传统的条件变量原语概念在 Go 语言中的实现。

可以把一个条件变量理解为一个容器,这个容器中存放着一个或一组等待着某个条件成立的 Goroutine。当条件成立后,这些处于等待状态的 Goroutine 将得到通知,并被唤醒继续进行后续的工作。

条件变量是同步原语的一种,如果没有条件变量,开发人员可能需要在 Goroutine 中通过连续轮询的方式,检查某条件是否为真,这种连续轮询非常消耗资源,因为 Goroutine 在这个过程中是处于活动状态的,但它的工作又没有进展。

使用sync.Mutex 实现对条件轮询等待:

type signal struct{}

var ready bool

func worker(i int) {
  fmt.Printf("worker %d: is working...\n", i)
  time.Sleep(1 * time.Second)
  fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, mu *sync.Mutex) <-chan signal {
  c := make(chan signal)
  var wg sync.WaitGroup

  for i := 0; i < num; i++ {
    wg.Add(1)
    go func(i int) {
      for {
        mu.Lock()
        if !ready {
          mu.Unlock()
          time.Sleep(100 * time.Millisecond)
          continue
        }
        mu.Unlock()
        fmt.Printf("worker %d: start to work...\n", i)
        f(i)
        wg.Done()
        return
      }
    }(i + 1)
  }

  go func() {
    wg.Wait()
    c <- signal(struct{}{})
  }()
  return c
}

func main() {
  fmt.Println("start a group of workers...")
  mu := &sync.Mutex{}
  c := spawnGroup(worker, 5, mu)

  time.Sleep(5 * time.Second) // 模拟ready前的准备工作
  fmt.Println("the group of workers start to work...")

  mu.Lock()
  ready = true
  mu.Unlock()

  <-c
  fmt.Println("the group of workers work done!")
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

轮询的方式开销大,轮询间隔设置的不同,条件检查的及时性也会受到影响。

使用sync.Cond对上面的例子进行改造:

type signal struct{}

var ready bool

func worker(i int) {
  fmt.Printf("worker %d: is working...\n", i)
  time.Sleep(1 * time.Second)
  fmt.Printf("worker %d: works done\n", i)
}

func spawnGroup(f func(i int), num int, groupSignal *sync.Cond) <-chan signal {
  c := make(chan signal)
  var wg sync.WaitGroup

  for i := 0; i < num; i++ {
    wg.Add(1)
    go func(i int) {
      groupSignal.L.Lock()
      for !ready {
        groupSignal.Wait()
      }
      groupSignal.L.Unlock()
      fmt.Printf("worker %d: start to work...\n", i)
      f(i)
      wg.Done()
    }(i + 1)
  }

  go func() {
    wg.Wait()
    c <- signal(struct{}{})
  }()
  return c
}

func main() {
  fmt.Println("start a group of workers...")
  groupSignal := sync.NewCond(&sync.Mutex{})
  c := spawnGroup(worker, 5, groupSignal)

  time.Sleep(5 * time.Second) // 模拟ready前的准备工作
  fmt.Println("the group of workers start to work...")

  groupSignal.L.Lock()
  ready = true
  groupSignal.Broadcast()
  groupSignal.L.Unlock()

  <-c
  fmt.Println("the group of workers work done!")
}

/*
start a group of workers...
the group of workers start to work...
worker 2: start to work...
worker 2: is working...
worker 3: start to work...
worker 3: is working...
worker 1: start to work...
worker 1: is working...
worker 4: start to work...
worker 5: start to work...
worker 5: is working...
worker 4: is working...
worker 4: works done
worker 2: works done
worker 3: works done
worker 1: works done
worker 5: works done
the group of workers work done!
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

通常使用 sync.Mutex 来初始化 sync.Cond,条件变量需要这个互斥锁来同步临界区,保护用作条件的数据。

加锁后,各个等待条件成立的 Goroutine 判断条件是否成立,如果不成立,则调用sync.Cond的 Wait 方法进入等待状态。Wait 方法在 Goroutine 挂起前会进行 Unlock 操作。

当 main goroutine 将ready置为 true,并调用sync.Cond的 Broadcast 方法后,各个阻塞的 Goroutine 将被唤醒,并从 Wait 方法中返回。Wait 方法返回前,Wait 方法会再次加锁让 Goroutine 进入临界区。接下来 Goroutine 会再次对条件数据进行判定,如果条件成立,就会解锁并进入下一个工作阶段;如果条件依旧不成立,那么会再次进入循环体,并调用 Wait 方法挂起等待。

上次更新: 2022/06/12, 15:48:09
channel
原子操作 atomic包

← channel 原子操作 atomic包→

最近更新
01
ctr和crictl显示镜像不一致
03-13
02
alpine镜像集成常用数据库客户端
03-13
03
create-cluster
02-26
更多文章>
Theme by Vdoing | Copyright © 2015-2024 op81.com
苏ICP备18041258号-2
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式