v0_2 添加功能选项机制
# v0.2 添加功能选项机制
功能选项机制,可以让某个包的用户可以根据自己的需求,通过设置不同功能选项来定制包的行为。
Go 语言中实现功能选项机制有多种方法,但 Go 社区目前使用最为广泛的一个方案,是 Go 语言之父 Rob Pike 在 2014 年在博文《自引用函数与选项设计》 (opens new window)中论述的一种,这种方案也被后人称为“功能选项(functional option)”方案。
首先,将 workerpool1 目录拷贝一份形成 workerpool2 目录,将在这个目录下为 workerpool 包添加功能选项机制。
然后,在 workerpool2 目录下创建 option.go 文件,在这个文件中,定义用于代表功能选项的类型 Option:
type Option func(*Pool)
这个 Option 实质是一个接受 *Pool 类型参数的函数类型。
为 workerpool 添加两个功能选项:Schedule 调用是否阻塞,以及是否预创建所有的 worker。
为了支持这两个功能选项,需要在 Pool 类型中增加两个 bool 类型的字段,字段的具体含义见代码中注释:
type Pool struct {
... ...
preAlloc bool // 是否在创建pool的时候就预创建workers,默认值为:false
// 当pool满的情况下,新的Schedule调用是否阻塞当前goroutine。默认值:true
// 如果block = false,则Schedule返回ErrNoWorkerAvailInPool
block bool
... ...
}
2
3
4
5
6
7
8
9
针对这两个字段,在 option.go 中添加两个功能选项,WithBlock 与 WithPreAllocWorkers:
func WithBlock(block bool) Option {
return func(p *Pool) {
p.block = block
}
}
func WithPreAllocWorkers(preAlloc bool) Option {
return func(p *Pool) {
p.preAlloc = preAlloc
}
}
2
3
4
5
6
7
8
9
10
11
这两个功能选项实质上是两个返回闭包函数的函数。
为了支持将这两个 Option 传给 workerpool,还需要改造一下 workerpool 包的 New 函数,改造后的 New 函数代码如下:
func New(capacity int, opts ...Option) *Pool {
... ...
for _, opt := range opts {
opt(p)
}
fmt.Printf("workerpool start(preAlloc=%t)\n", p.preAlloc)
if p.preAlloc {
// create all goroutines and send into works channel
for i := 0; i < p.capacity; i++ {
p.newWorker(i + 1)
p.active <- struct{}{}
}
}
go p.run()
return p
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
新版 New 函数除了接受 capacity 参数之外,还在它的参数列表中增加了一个类型为 Option 的可变长参数 opts。
在 New 函数体中,通过一个 for 循环,将传入的 Option 运用到 Pool 类型的实例上。新版 New 函数还会根据 preAlloc 的值来判断是否预创建所有的 worker,如果需要,就调用 newWorker 方法把所有 worker 都创建出来。
由于 preAlloc 选项的加入,Pool 的 run 方法的实现有了变化:
func (p *Pool) run() {
idx := len(p.active)
if !p.preAlloc {
loop:
for t := range p.tasks {
p.returnTask(t)
select {
case <-p.quit:
return
case p.active <- struct{}{}:
idx++
p.newWorker(idx)
default:
break loop
}
}
}
for {
select {
case <-p.quit:
return
case p.active <- struct{}{}:
// create a new worker
idx++
p.newWorker(idx)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
新版 run 方法在 preAlloc=false 时,会根据 tasks channel 的情况在适合的时候创建 worker(第 4 行~ 第 18 行),直到 active channel 写满,才会进入到和第一版代码一样的调度逻辑中(第 20 行~ 第 29 行)。
而且,提供给用户的 Schedule 函数也因 WithBlock 选项,有了一些变化:
func (p *Pool) Schedule(t Task) error {
select {
case <-p.quit:
return ErrWorkerPoolFreed
case p.tasks <- t:
return nil
default:
if p.block {
p.tasks <- t
return nil
}
return ErrNoIdleWorkerInPool
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
Schedule 在 tasks chanel 无法写入的情况下,进入 default 分支。在 default 分支中,Schedule 根据 block 字段的值,决定究竟是继续阻塞在 tasks channel 上,还是返回 ErrNoIdleWorkerInPool 错误。
验证:
建立一个使用新版 workerpool 的项目 demo2,demo2 的 go.mod 与 demo1 的 go.mod 相似:
module demo2
go 1.17
require github.com/bigwhite/workerpool v1.0.0
replace github.com/bigwhite/workerpool v1.0.0 => ../workerpool2
2
3
4
5
6
7
demo2 的 main.go 文件如下:
package main
import (
"fmt"
"time"
"github.com/bigwhite/workerpool"
)
func main() {
p := workerpool.New(5, workerpool.WithPreAllocWorkers(false), workerpool.WithBlock(false))
time.Sleep(time.Second * 2)
for i := 0; i < 10; i++ {
err := p.Schedule(func() {
time.Sleep(time.Second * 3)
})
if err != nil {
fmt.Printf("task[%d]: error: %s\n", i, err.Error())
}
}
p.Free()
}
/*
workerpool start(preAlloc=false)
task[1]: error: no idle worker in pool
worker[001]: start
task[2]: error: no idle worker in pool
task[4]: error: no idle worker in pool
task[5]: error: no idle worker in pool
task[6]: error: no idle worker in pool
task[7]: error: no idle worker in pool
task[8]: error: no idle worker in pool
task[9]: error: no idle worker in pool
worker[001]: receive a task
worker[002]: start
worker[002]: exit
worker[001]: receive a task
worker[001]: exit
workerpool freed(preAlloc=false)
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
在 demo2 中,使用 workerpool 包提供的功能选项,设置了我们期望的 workerpool 的运作行为,包括不要预创建 worker,以及不要阻塞 Schedule 调用。
考虑到 Goroutine 调度的次序的不确定性,这里在创建 workerpool 与真正开始调用 Schedule 方法之间,做了一个 Sleep,尽量减少 Schedule 都返回失败的频率(但这仍然无法保证这种情况不会发生)。
运行结果,由于 Goroutine 调度的不确定性,这个结果仅仅是很多种结果的一种。仅仅 001 这个 worker 收到了 task,其余的 worker 都因为 worker 尚未创建完毕,而返回了错误,而不是像 demo1 那样阻塞在 Schedule 调用上。
项目源码地址:https://github.com/bigwhite/publication/tree/master/column/timegeek/go-first-course/35