channel
# 4. channel
# 4.1 channel 基本用法
channel 作为 go语言 的一等公民,可以像使用普通变量那样使用 channel。
比如,定义 channel 类型变量、给 channel 变量赋值、将 channel 作为参数传递给函数 / 方法、将 channel 作为返回值从函数 / 方法中返回,甚至将 channel 发送到其他 channel 中。这就大大简化了 channel 原语的使用,提升了开发者在做并发设计和实现时的体验。
和切片、结构体、map 等一样,channel 也是一种复合数据类型。也就是说,在声明一个 channel 类型变量时,必须给出其具体的元素类型:
var ch chan int
声明一个元素为 int 类型的 channel 类型变量 ch。
如果 channel 类型变量在声明时没有被赋予初值,那么它的默认值为 nil。并且,和其他复合数据类型支持使用复合类型字面值作为变量初始值不同,为 channel 类型变量赋初值的唯一方法就是使用 make 这个 Go 预定义的函数:
ch1 := make(chan int)
ch2 := make(chan int, 5)
2
声明两个元素类型为 int 的 channel 类型变量 ch1 和 ch2,并给这两个变量赋初值。
两个变量的赋初值操作使用的 make 调用的形式有所不同:
- 第一行通过 make(chan T )创建的元素类型为 T 的 channel 类型,是无缓冲 channel;
- 第二行中通过带有 capacity 参数的make(chan T, capacity)创建的元素类型为 T、缓冲区长度为 capacity 的 channel 类型,是带缓冲 channel。
这两种类型的变量关于发送(send)与接收(receive)的特性是不同的。
# 4.1.1 发送与接收
Go 提供了 ==<-== 操作符用于对 channel 类型变量进行发送与接收操作:
ch1 <- 13 // 将整型字面值13发送到无缓冲channel类型变量ch1中
n := <- ch1 // 从无缓冲channel类型变量ch1中接收一个整型值存储到整型变量n中
ch2 <- 17 // 将整型字面值17发送到带缓冲channel类型变量ch2中
m := <- ch2 // 从带缓冲channel类型变量ch2中接收一个整型值存储到整型变量m中
2
3
4
channel 是用于 Goroutine 间通信的,所以绝大多数对 channel 的读写都被分别放在了不同的 Goroutine 中。
- Goroutine 对无缓冲 channel 的接收和发送操作是同步的。
由于无缓冲 channel 的运行时层实现不带有缓冲区,对同一个无缓冲 channel,只有对它进行接收操作的 Goroutine 和对它进行发送操作的 Goroutine 都存在的情况下,通信才能得以进行,否则单方面的操作会让对应的 Goroutine 陷入挂起状态。
func main() { ch1 := make(chan int) ch1 <- 13 // fatal error: all goroutines are asleep - deadlock! n := <-ch1 println(n) }
1
2
3
4
5
6创建一个无缓冲的 channel 类型变量 ch1,对 ch1 的读写都放在了一个 Goroutine 中。运行这个示例,就会得到 fatal error,提示我们所有 Goroutine 都处于休眠状态,程序处于死锁状态。要想解除这种错误状态,只需要将接收操作,或者发送操作放到另外一个 Goroutine 中就可以了:
func main() { ch1 := make(chan int) go func() { ch1 <- 13 // 将发送操作放入一个新goroutine中执行 }() n := <-ch1 println(n) }
1
2
3
4
5
6
7
8结论:对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock。
- Goroutine 对带缓冲 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。
对一个带缓冲 channel 来说,在缓冲区未满的情况下,对它进行发送操作的 Goroutine 并不会阻塞挂起;在缓冲区有数据的情况下,对它进行接收操作的 Goroutine 也不会阻塞挂起。
但当缓冲区满了的情况下,对它进行发送操作的 Goroutine 就会阻塞挂起;当缓冲区为空的情况下,对它进行接收操作的 Goroutine 也会阻塞挂起。
ch2 := make(chan int, 1) n := <-ch2 // 由于此时ch2的缓冲区中无数据,因此对其进行接收操作将导致goroutine挂起 ch3 := make(chan int, 1) ch3 <- 17 // 向ch3发送一个整型数17 ch3 <- 27 // 由于此时ch3中缓冲区已满,再向ch3发送数据也将导致goroutine挂起
1
2
3
4
5
6
使用操作符<-,还可以声明只发送 channel 类型(send-only)和只接收 channel 类型(recv-only)。
ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型
<-ch1 // invalid operation: <-ch1 (receive from send-only type chan<- int)
ch2 <- 13 // invalid operation: ch2 <- 13 (send to receive-only type <-chan int)
2
3
4
5
试图从一个只发送 channel 类型变量中接收数据,或者向一个只接收 channel 类型发送数据,都会导致编译错误。
通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型,比如:
func produce(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i + 1
time.Sleep(time.Second)
}
close(ch)
}
func consume(ch <-chan int) {
for n := range ch {
println(n)
}
}
func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup
wg.Add(2)
go func() {
produce(ch)
wg.Done()
}()
go func() {
consume(ch)
wg.Done()
}()
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
启动两个 Goroutine,分别代表生产者(produce)与消费者(consume)。
生产者只能向 channel 中发送数据,使用chan<- int作为 produce 函数的参数类型;
消费者只能从 channel 中接收数据,使用<-chan int作为 consume 函数的参数类型。
在消费者函数 consume 中,使用了 for range 循环语句来从 channel 中接收数据,for range 会阻塞在对 channel 的接收操作上,直到 channel 中有数据可接收或 channel 被关闭循环,才会继续向下执行。channel 被关闭后,for range 循环也就结束了。
produce 函数在发送完数据后,调用 Go 内置的 close 函数关闭了 channel。channel 关闭后,所有等待从这个 channel 接收数据的操作都将返回。
# 4.1.2 关闭 channel
采用不同接收语法形式的语句,在 channel 被关闭后的返回值的情况:
n := <- ch // 当ch被关闭后,n将被赋值为ch元素类型的零值
m, ok := <-ch // 当ch被关闭后,m将被赋值为ch元素类型的零值, ok值为false
for v := range ch { // 当ch被关闭后,for range循环结束
... ...
}
2
3
4
5
通过“comma, ok”惯用法或 for range 语句,可以准确地判定 channel 是否被关闭。而单纯采用n := <-ch形式的语句,就无法判定从 ch 返回的元素类型零值,究竟是不是因为 channel 被关闭后才返回的。
channel 的使用惯例:发送端负责关闭 channel。
因为发送端没有像接受端那样的、可以安全判断 channel 是否被关闭了的方法。同时,一旦向一个已经关闭的 channel 执行发送操作,这个操作就会引发 panic。
ch := make(chan int, 5)
close(ch)
ch <- 13 // panic: send on closed channel
2
3
# 4.2 channel 的惯用法
# 4.2.1 无缓冲 channel 的惯用法
无缓冲 channel 兼具通信和同步特性,在并发程序中应用颇为广泛。
第一种用法:用作信号传递
无缓冲 channel 用作信号传递的时候,有两种情况:
1 对 1 通知信号
type signal struct{} func worker() { println("worker is working...") time.Sleep(1 * time.Second) } func spawn(f func()) <-chan signal { c := make(chan signal) go func() { println("worker start to work...") f() c <- signal{} }() return c } func main() { println("start a worker...") c := spawn(worker) <-c fmt.Println("worker work done!") } /* start a worker... worker start to work... worker is working... worker work done! */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30spawn 函数返回的 channel,被用于承载新 Goroutine 退出的“通知信号”,这个信号专门用作通知 main goroutine。main goroutine 在调用 spawn 函数后一直阻塞在对这个“通知信号”的接收动作上。
1 对 n 通知信号
1 对 n 的信号通知机制,常被用于协调多个 Goroutine 一起工作。
func worker(i int) { fmt.Printf("worker %d: is working...\n", i) time.Sleep(1 * time.Second) fmt.Printf("worker %d: works done\n", i) } type signal struct{} func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal { c := make(chan signal) var wg sync.WaitGroup for i := 0; i < num; i++ { wg.Add(1) go func(i int) { <-groupSignal fmt.Printf("worker %d: start to work...\n", i) f(i) wg.Done() }(i + 1) } go func() { wg.Wait() c <- signal{} }() return c } func main() { fmt.Println("start a group of workers...") groupSignal := make(chan signal) c := spawnGroup(worker, 5, groupSignal) time.Sleep(5 * time.Second) fmt.Println("the group of workers start to work...") close(groupSignal) <-c fmt.Println("the group of workers work done!") } /* start a group of workers... the group of workers start to work... worker 3: start to work... worker 3: is working... worker 4: start to work... worker 4: is working... worker 1: start to work... worker 1: is working... worker 5: start to work... worker 5: is working... worker 2: start to work... worker 2: is working... worker 3: works done worker 4: works done worker 5: works done worker 1: works done worker 2: works done the group of workers work done! */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59main goroutine 创建了一组 5 个 worker goroutine,这些 Goroutine 启动后会阻塞在名为 groupSignal 的无缓冲 channel 上。main goroutine 通过close(groupSignal)向所有 worker goroutine 广播“开始工作”的信号,收到 groupSignal 后,所有 worker goroutine 会“同时”开始工作,就像起跑线上的运动员听到了裁判员发出的起跑信号枪声。
关闭一个无缓冲 channel 会让所有阻塞在这个 channel 上的接收操作返回,从而实现了一种 1 对 n 的“广播”机制。
第二种用法:用于替代锁机制
无缓冲 channel 具有同步特性,这让它在某些场合可以替代锁,让程序更加清晰,可读性也更好。
传统的、基于“共享内存”+“互斥锁”的 Goroutine 安全的计数器的实现:
type counter struct {
sync.Mutex
i int
}
var cter counter
func Increase() int {
cter.Lock()
defer cter.Unlock()
cter.i++
return cter.i
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
}
wg.Wait()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
使用一个带有互斥锁保护的全局变量作为计数器,所有要操作计数器的 Goroutine 共享这个全局变量,并在互斥锁的同步下对计数器进行自增操作。
使用无缓冲 channel 替代锁后的实现:
type counter struct {
c chan int
i int
}
func NewCounter() *counter {
cter := &counter{
c: make(chan int),
}
go func() {
for {
cter.i++
cter.c <- cter.i
}
}()
return cter
}
func (cter *counter) Increase() int {
return <-cter.c
}
func main() {
cter := NewCounter()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := cter.Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
}
wg.Wait()
}
/*
goroutine-9: current counter value is 10
goroutine-0: current counter value is 1
goroutine-6: current counter value is 7
goroutine-2: current counter value is 3
goroutine-8: current counter value is 9
goroutine-4: current counter value is 5
goroutine-5: current counter value is 6
goroutine-1: current counter value is 2
goroutine-7: current counter value is 8
goroutine-3: current counter value is 4
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
在这个实现中,将计数器操作全部交给一个独立的 Goroutine 去处理,并通过无缓冲 channel 的同步阻塞特性,实现了计数器的控制。这样其他 Goroutine 通过 Increase 函数试图增加计数器值的动作,实质上就转化为了一次无缓冲 channel 的接收动作。
这种并发设计逻辑更符合 Go 语言所倡导的 “不要通过共享内存来通信,而是通过通信来共享内存” 的原则。
# 4.2.2 带缓冲 channel 的惯用法
带缓冲的 channel 与无缓冲的 channel 的最大不同之处,就在于它的异步性。也就是说,对一个带缓冲 channel,在缓冲区未满的情况下,对它进行发送操作的 Goroutine 不会阻塞挂起;在缓冲区有数据的情况下,对它进行接收操作的 Goroutine 也不会阻塞挂起。
第一种用法:用作消息队列
channel 经常被 Go 初学者视为在多个 Goroutine 之间通信的消息队列,这是因为,channel 的原生特性与消息队列十分相似,包括 Goroutine 安全、有 FIFO(first-in, first out)保证等。
其实,和无缓冲 channel 更多用于信号 / 事件管道相比,可自行设置容量、异步收发的带缓冲 channel 更适合被用作为消息队列,并且,带缓冲 channel 在数据收发的性能上要明显好于无缓冲 channel。
可以通过对 channel 读写的基本测试来印证这一点。下面是一些关于无缓冲 channel 和带缓冲 channel 收发性能测试的结果(Go 1.17, MacBook Pro 8 核)。基准测试的代码比较多,可以到这里 (opens new window)下载。
- 单接收单发送性能的基准测试
针对一个 channel 只有一个发送 Goroutine 和一个接收 Goroutine 的情况,两种 channel 的收发性能比对数据:
// 无缓冲channel // go-channel-operation-benchmark/unbuffered-chan $go test -bench . one_to_one_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkUnbufferedChan1To1Send-8 6037778 199.7 ns/op BenchmarkUnbufferedChan1To1Recv-8 6286850 194.5 ns/op PASS ok command-line-arguments 2.833s // 带缓冲channel // go-channel-operation-benchmark/buffered-chan $go test -bench . one_to_one_cap_10_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkBufferedChan1To1SendCap10-8 17089879 66.16 ns/op BenchmarkBufferedChan1To1RecvCap10-8 18043450 65.57 ns/op PASS ok command-line-arguments 2.460s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23然后将 channel 的缓存由 10 改为 100,再看看带缓冲 channel 的 1 对 1 基准测试结果:
$go test -bench . one_to_one_cap_100_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkBufferedChan1To1SendCap100-8 23089318 53.06 ns/op BenchmarkBufferedChan1To1RecvCap100-8 23474095 51.33 ns/op PASS ok command-line-arguments 2.542s
1
2
3
4
5
6
7
8
- 多接收多发送性能基准测试
针对一个 channel 有多个发送 Goroutine 和多个接收 Goroutine 的情况,两种 channel 的收发性能比对数据(这里建立 10 个发送 Goroutine 和 10 个接收 Goroutine):
// 无缓冲channel // go-channel-operation-benchmark/unbuffered-chan $go test -bench . multi_to_multi_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkUnbufferedChanNToNSend-8 293930 3779 ns/op BenchmarkUnbufferedChanNToNRecv-8 280904 4190 ns/op PASS ok command-line-arguments 2.387s // 带缓冲channel // go-channel-operation-benchmark/buffered-chan $go test -bench . multi_to_multi_cap_10_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkBufferedChanNToNSendCap10-8 736540 1609 ns/op BenchmarkBufferedChanNToNRecvCap10-8 795416 1616 ns/op PASS ok command-line-arguments 2.514s
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23也将 channel 的缓存由 10 改为 100 后,看看带缓冲 channel 的多对多基准测试结果:
$go test -bench . multi_to_multi_cap_100_test.go goos: darwin goarch: amd64 cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz BenchmarkBufferedChanNToNSendCap100-8 1236453 966.4 ns/op BenchmarkBufferedChanNToNRecvCap100-8 1279766 969.4 ns/op PASS ok command-line-arguments 4.309s
1
2
3
4
5
6
7
8
测试结论:
- 无论是 1 收 1 发还是多收多发,带缓冲 channel 的收发性能都要好于无缓冲 channel;
- 对于带缓冲 channel 而言,发送与接收的 Goroutine 数量越多,收发性能会有所下降;
- 对于带缓冲 channel 而言,选择适当容量会在一定程度上提升收发性能。
注意:Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。
第二种用法:用作计数信号量(counting semaphore)
Go 并发设计的一个惯用法,就是将带缓冲 channel 用作计数信号量(counting semaphore)。带缓冲 channel 中的当前数据个数代表的是,当前同时处于活动状态(处理业务)的 Goroutine 的数量,而带缓冲 channel 的容量(capacity),就代表了允许同时处于活动状态的 Goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。
将带缓冲 channel 用作计数信号量:
var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)
func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- (i + 1)
}
close(jobs)
}()
var wg sync.WaitGroup
for j := range jobs {
wg.Add(1)
go func(j int) {
active <- struct{}{}
log.Printf("handle job: %d\n", j)
time.Sleep(2 * time.Second)
<-active
wg.Done()
}(j)
}
wg.Wait()
}
/*
2022/01/02 10:08:55 handle job: 1
2022/01/02 10:08:55 handle job: 4
2022/01/02 10:08:55 handle job: 8
2022/01/02 10:08:57 handle job: 5
2022/01/02 10:08:57 handle job: 7
2022/01/02 10:08:57 handle job: 6
2022/01/02 10:08:59 handle job: 3
2022/01/02 10:08:59 handle job: 2
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
创建一组 Goroutine 来处理 job,同一时间允许最多 3 个 Goroutine 处于活动状态。为了达成这一目标,这个示例使用了一个容量(capacity)为 3 的带缓冲 channel: active 作为计数信号量,这意味着允许同时处于活动状态的最大 Goroutine 数量为 3。
从示例运行结果中的时间戳中可以看到,虽然创建了很多 Goroutine,但由于计数信号量的存在,同一时间内处于活动状态(正在处理 job)的 Goroutine 的数量最多为 3 个。
# 4.2.3 len(channel) 的应用
len 是 Go 语言的一个内置函数,它支持接收数组、切片、map、字符串和 channel 类型的参数,并返回对应类型的“长度”,也就是一个整型值。
针对 channel ch 的类型不同,len(ch) 有如下两种语义:
- 当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;
- 当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。
channel 原语用于多个 Goroutine 间的通信,一旦多个 Goroutine 共同对 channel 进行收发操作,len(channel) 就会在多个 Goroutine 间形成“竞态”。不能单纯地依靠 len(channel) 来判断 channel 中元素状态,因为它不能保证在后续对 channel 的收发时 channel 状态是不变的。
以判空为例:
Goroutine1 使用 len(channel) 判空后,就会尝试从 channel 中接收数据。但在它真正从 channel 读数据之前,另外一个 Goroutine2 已经将数据读了出去,所以,Goroutine1 后面的读取就会阻塞在 channel 上,导致后面逻辑的失效。
为了不阻塞在 channel 上,常见的方法是将“判空与读取”放在一个“事务”中,将“判满与写入”放在一个“事务”中,而这类“事务”可以通过 select 实现:
func producer(c chan<- int) {
var i int = 1
for {
time.Sleep(2 * time.Second)
ok := trySend(c, i)
if ok {
fmt.Printf("[producer]: send [%d] to channel\n", i)
i++
continue
}
fmt.Printf("[producer]: try send [%d], but channel is full\n", i)
}
}
func tryRecv(c <-chan int) (int, bool) {
select {
case i := <-c:
return i, true
default:
return 0, false
}
}
func trySend(c chan<- int, i int) bool {
select {
case c <- i:
return true
default:
return false
}
}
func consumer(c <-chan int) {
for {
i, ok := tryRecv(c)
if !ok {
fmt.Println("[consumer]: try to recv from channel, but the channel is empty")
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("[consumer]: recv [%d] from channel\n", i)
if i >= 3 {
fmt.Println("[consumer]: exit")
return
}
}
}
func main() {
var wg sync.WaitGroup
c := make(chan int, 3)
wg.Add(2)
go func() {
producer(c)
wg.Done()
}()
go func() {
consumer(c)
wg.Done()
}()
wg.Wait()
}
/*
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [1] to channel
[consumer]: recv [1] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [2] to channel
[consumer]: recv [2] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [3] to channel
[consumer]: recv [3] from channel
[consumer]: exit
[producer]: send [4] to channel
[producer]: send [5] to channel
[producer]: send [6] to channel
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
... ...
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
由于用到了 select 原语的 default 分支语义,当 channel 空的时候,tryRecv 不会阻塞;当 channel 满的时候,trySend 也不会阻塞。这个示例的运行结果也证明了这一点,无论是使用 tryRecv 的 consumer 还是使用 trySend 的 producer 都不会阻塞。
这种方法适用于大多数场合,但是这种方法有一个“问题”,那就是它改变了 channel 的状态,会让 channel 接收了一个元素或发送一个元素到 channel。
如果想在不改变 channel 状态的前提下,单纯地侦测 channel 的状态,而又不会因 channel 满或空阻塞在 channel 上。但很遗憾,目前没有一种方法可以在实现这样的功能的同时,适用于所有场合。只有在特定的场景下,可以用 len(channel) 来实现。比如下面这两种场景:
- 情景 (a) 是一个“多发送单接收”的场景,也就是有多个发送者,但有且只有一个接收者。在这样的场景下,可以在接收 goroutine 中使用len(channel)是否大于0来判断是否 channel 中有数据需要接收。
- 情景 (b) 是一个“多接收单发送”的场景,也就是有多个接收者,但有且只有一个发送者。在这样的场景下,可以在发送 Goroutine 中使用len(channel)是否小于cap(channel)来判断是否可以执行向 channel 的发送操作。
# 4.2.4 nil channel 的妙用
**nil channel:**一个 channel 类型变量的值为 nil。
nil channel 有一个特性,那就是对 nil channel 的读写都会发生阻塞。
func main() {
var c chan int
<-c //阻塞
}
或者:
func main() {
var c chan int
c<-1 //阻塞
}
2
3
4
5
6
7
8
9
10
11
无论上面的哪段代码被执行,main goroutine 都会阻塞在对 nil channel 的操作上。
func main() {
ch1, ch2 := make(chan int), make(chan int)
go func() {
time.Sleep(time.Second * 5)
ch1 <- 5
close(ch1)
}()
go func() {
time.Sleep(time.Second * 7)
ch2 <- 7
close(ch2)
}()
var ok1, ok2 bool
for {
select {
case x := <-ch1:
ok1 = true
fmt.Println(x)
case x := <-ch2:
ok2 = true
fmt.Println(x)
}
if ok1 && ok2 {
break
}
}
fmt.Println("program end")
}
// 期望程序在接收完 ch1 和 ch2 两个 channel 上的数据后就退出。但实际的运行情况:
/*
5
0
0
0
... ... //循环输出0
7
program end
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
原本期望上面这个在依次输出 5 和 7 两个数字后退出,但实际运行的输出结果却是在输出 5 之后,程序输出了许多的 0 值,之后才输出 7 并退出。
分析这段代码的运行过程:
- 前 5s,select 一直处于阻塞状态;
- 第 5s,ch1 返回一个 5 后被 close,select 语句的case x := <-ch1这个分支被选出执行,程序输出 5,并回到 for 循环并重新 select;
- 由于 ch1 被关闭,从一个已关闭的 channel 接收数据将永远不会被阻塞,于是新一轮 select 又把case x := <-ch1这个分支选出并执行。由于 ch1 处于关闭状态,从这个 channel 获取数据,我们会得到这个 channel 对应类型的零值,这里就是 0。于是程序再次输出 0;程序按这个逻辑循环执行,一直输出 0 值;
- 2s 后,ch2 被写入了一个数值 7。这样在某一轮 select 的过程中,分支case x := <-ch2被选中得以执行,程序输出 7 之后满足退出条件,于是程序终止。
用 nil channel 改进示例代码,按照预期输出:
func main() {
ch1, ch2 := make(chan int), make(chan int)
go func() {
time.Sleep(time.Second * 5)
ch1 <- 5
close(ch1)
}()
go func() {
time.Sleep(time.Second * 7)
ch2 <- 7
close(ch2)
}()
for {
select {
case x, ok := <-ch1:
if !ok {
ch1 = nil
} else {
fmt.Println(x)
}
case x, ok := <-ch2:
if !ok {
ch2 = nil
} else {
fmt.Println(x)
}
}
if ch1 == nil && ch2 == nil {
break
}
}
fmt.Println("program end")
}
/*
5
7
program end
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
改进后的示例程序的最关键的一个变化,就是在判断 ch1 或 ch2 被关闭后,显式地将 ch1 或 ch2 置为 nil。对一个 nil channel 执行获取操作,这个操作将阻塞。于是,这里已经被置为 nil 的 c1 或 c2 的分支,将再也不会被 select 选中执行。
# 4.2.5 与 select 结合使用的一些惯用法
channel 和 select 的结合使用能形成强大的表达能力。
第一种用法:利用 default 分支避免阻塞
elect 语句的 default 分支的语义,就是在其他非 default 分支因通信未就绪,而无法被选择的时候执行的,这就给 default 分支赋予了一种“避免阻塞”的特性。
其实“len(channel) 的应用”中,就已经用到了“利用 default 分支”实现的trySend和tryRecv两个函数:
func tryRecv(c <-chan int) (int, bool) {
select {
case i := <-c:
return i, true
default: // channel为空
return 0, false
}
}
func trySend(c chan<- int, i int) bool {
select {
case c <- i:
return true
default: // channel满了
return false
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
无论是无缓冲 channel 还是带缓冲 channel,这两个函数都能适用,并且不会阻塞在空 channel 或元素个数已经达到容量的 channel 上。
在 Go 标准库中,这个惯用法也有应用,比如:
// $GOROOT/src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
// 无阻塞的向c发送当前时间
select {
case c.(chan Time) <- Now():
default:
}
}
2
3
4
5
6
7
8
第二种用法:实现超时机制
带超时机制的 select,是 Go 中常见的一种 select 和 channel 的组合用法。通过超时事件,既可以避免长期陷入某种操作的等待中,也可以做一些异常处理工作。
具有 30s 超时的 select:
func worker() {
select {
case <-c:
// ... do some stuff
case <-time.After(30 *time.Second):
return
}
}
2
3
4
5
6
7
8
在应用带有超时机制的 select 时,要特别注意 timer 使用后的释放,尤其在大量创建 timer 的时候。
Go 语言标准库提供的 timer 实际上是由 Go 运行时自行维护的,而不是操作系统级的定时器资源,它的使用代价要比操作系统级的低许多。但即便如此,作为 time.Timer 的使用者,也要尽量减少在使用 Timer 时给 Go 运行时和 Go 垃圾回收带来的压力,要及时调用 timer 的 Stop 方法回收 Timer 资源。
第三种用法:实现心跳机制
结合 time 包的 Ticker,可以实现带有心跳机制的 select。
这种机制可以在监听 channel 的同时,执行一些周期性的任务,比如下面这段代码:
func worker() {
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-c:
// ... do some stuff
case <- heartbeat.C:
//... do heartbeat stuff
}
}
}
2
3
4
5
6
7
8
9
10
11
12
使用 time.NewTicker,创建一个 Ticker 类型实例 heartbeat。这个实例包含一个 channel 类型的字段 C,这个字段会按一定时间间隔持续产生事件,就像“心跳”一样。这样 for 循环在 channel c 无数据接收时,会每隔特定时间完成一次迭代,然后回到 for 循环进行下一次迭代。
和 timer 一样,在使用完 ticker 之后,也不要忘记调用它的 Stop 方法,避免心跳事件在 ticker 的 channel(上面示例中的 heartbeat.C)中持续产生。