v0_1 最小可行实现
# v0.1 最小可行实现
建立 workerpool 目录作为实战项目的源码根目录,然后为这个项目创建 go module:
$mkdir workerpool1
$cd workerpool1
$go mod init github.com/bigwhite/workerpool
2
3
创建 pool.go 作为 workpool 包的主要源码文件。在这个源码文件中,定义 Pool 结构体类型,这个类型的实例代表一个 workerpool:
type Pool struct {
capacity int // workerpool大小
active chan struct{} // 对应上图中的active channel
tasks chan Task // 对应上图中的task channel
wg sync.WaitGroup // 用于在pool销毁时等待所有worker退出
quit chan struct{} // 用于通知各个worker退出的信号channel
}
2
3
4
5
6
7
8
9
workerpool 包对外主要提供三个 API,分别是:
- workerpool.New:用于创建一个 pool 类型实例,并将 pool 池的 worker 管理机制运行起来;
- workerpool.Free:用于销毁一个 pool 池,停掉所有 pool 池中的 worker;
- Pool.Schedule:这是 Pool 类型的一个导出方法,workerpool 包的用户通过该方法向 pool 池提交待执行的任务(Task)。
API workerpool.New 的实现:
func New(capacity int) *Pool {
if capacity <= 0 {
capacity = defaultCapacity
}
if capacity > maxCapacity {
capacity = maxCapacity
}
p := &Pool{
capacity: capacity,
tasks: make(chan Task),
quit: make(chan struct{}),
active: make(chan struct{}, capacity),
}
fmt.Printf("workerpool start\n")
go p.run()
return p
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
New 函数接受一个参数 capacity 用于指定 workerpool 池的容量,这个参数用于控制 workerpool 最多只能有 capacity 个 worker,共同处理用户提交的任务请求。
函数开始处有一个对 capacity 参数的“防御性”校验,当用户传入不合理的值时,函数 New 会将它纠正为合理的值。
Pool 类型实例变量 p 完成初始化后,创建了一个新的 Goroutine,用于对 workerpool 进行管理,这个 Goroutine 执行的是 Pool 类型的 run 方法:
func (p *Pool) run() {
idx := 0
for {
select {
case <-p.quit:
return
case p.active <- struct{}{}:
// create a new worker
idx++
p.newWorker(idx)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
run 方法内是一个无限循环,循环体中使用 select 监视 Pool 类型实例的两个 channel:quit 和 active。这种在 for 中使用 select 监视多个 channel 的实现,在 Go 代码中十分常见,是一种惯用法。
当接收到来自 quit channel 的退出“信号”时,这个 Goroutine 就会结束运行。
当 active channel 可写时,run 方法就会创建一个新的 worker Goroutine。
此外,为了方便在程序中区分各个 worker 输出的日志,将一个从 1 开始的变量 idx 作为 worker 的编号,并把它以参数的形式传给创建 worker 的方法。
将创建新的 worker goroutine 的职责,封装到一个名为 newWorker 的方法中:
func (p *Pool) newWorker(i int) {
p.wg.Add(1)
go func() {
defer func() {
if err := recover(); err != nil {
fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err)
<-p.active
}
p.wg.Done()
}()
fmt.Printf("worker[%03d]: start\n", i)
for {
select {
case <-p.quit:
fmt.Printf("worker[%03d]: exit\n", i)
<-p.active
return
case t := <-p.tasks:
fmt.Printf("worker[%03d]: receive a task\n", i)
t()
}
}
}()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
在创建一个新的 worker goroutine 之前,newWorker 方法会先调用 p.wg.Add 方法将 WaitGroup 的等待计数加一。由于每个 worker 运行于一个独立的 Goroutine 中,newWorker 方法通过 go 关键字创建了一个新的 Goroutine 作为 worker。
新 worker 的核心,依然是一个基于 for-select 模式的循环语句,在循环体中,新 worker 通过 select 监视 quit 和 tasks 两个 channel。
和前面的 run 方法一样,当接收到来自 quit channel 的退出“信号”时,这个 worker 就会结束运行。tasks channel 中放置的是用户通过 Schedule 方法提交的请求,新 worker 会从这个 channel 中获取最新的 Task 并运行这个 Task。
API Pool.Schedule 的实现:
var ErrWorkerPoolFreed = errors.New("workerpool freed") //workerpool已终止运行
func (p *Pool) Schedule(t Task) error {
select {
case <-p.quit:
return ErrWorkerPoolFreed
case p.tasks <- t:
return nil
}
}
2
3
4
5
6
7
8
9
10
Schedule 方法的核心逻辑,是将传入的 Task 实例发送到 workerpool 的 tasks channel 中。
但考虑到现在 workerpool 已经被销毁的状态,我们这里通过一个 select,检视 quit channel 是否有“信号”可读,如果有,就返回一个哨兵错误 ErrWorkerPoolFreed。如果没有,一旦 p.tasks 可写,提交的 Task 就会被写入 tasks channel,以供 pool 中的 worker 处理。
要注意的是,这里的 Pool 结构体中的 tasks 是一个无缓冲的 channel,如果 pool 中 worker 数量已达上限,而且 worker 都在处理 task 的状态,那么 Schedule 方法就会阻塞,直到有 worker 变为 idle 状态来读取 tasks channel,schedule 的调用阻塞才会解除。
至此,workerpool 的最小可行实现的主要逻辑都已实现。
验证:
建立一个使用 workerpool 的项目 demo1:
$mkdir demo1
$cd demo1
$go mod init demo1
2
3
引用本地的 module,需要手工修改一下 demo1 的 go.mod 文件,并利用 replace 指示符将 demo1 对 workerpool 的引用指向本地 workerpool1 路径:
module demo1
go 1.17
require github.com/bigwhite/workerpool v1.0.0
replace github.com/bigwhite/workerpool v1.0.0 => ../workerpool1
2
3
4
5
6
7
然后创建 demo1 的 main.go 文件,源码如下:
package main
import (
"time"
"github.com/bigwhite/workerpool"
)
func main() {
p := workerpool.New(5)
for i := 0; i < 10; i++ {
err := p.Schedule(func() {
time.Sleep(time.Second * 3)
})
if err != nil {
println("task: ", i, "err:", err)
}
}
p.Free()
}
/*
workerpool start
worker[005]: start
worker[005]: receive a task
worker[003]: start
worker[003]: receive a task
worker[004]: start
worker[004]: receive a task
worker[001]: start
worker[002]: start
worker[001]: receive a task
worker[002]: receive a task
worker[004]: receive a task
worker[005]: receive a task
worker[003]: receive a task
worker[002]: receive a task
worker[001]: receive a task
worker[001]: exit
worker[005]: exit
worker[002]: exit
worker[003]: exit
worker[004]: exit
workerpool freed
*/
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
创建一个 capacity 为 5 的 workerpool 实例,并连续向这个 workerpool 提交了 10 个 task,每个 task 的逻辑很简单,只是 Sleep 3 秒后就退出。main 函数在提交完任务后,调用 workerpool 的 Free 方法销毁 pool,pool 会等待所有 worker 执行完 task 后再退出。
不足之处:
虽然可以通过 capacity 参数可以指定 workerpool 容量,但无法对 workerpool 的行为进行定制。比如当 workerpool 中的 worker 数量已达上限,而且 worker 都在处理 task 时,用户调用 Schedule 方法将阻塞,如果用户不想阻塞在这里,以目前的实现是做不到的。
解决方案:
为 workerpool 添加功能选项(functional option)机制。