运维八一 运维八一
首页
运维杂记
编程浅尝
周积跬步
专栏
生活
关于
收藏
  • 分类
  • 标签
  • 归档
Source (opens new window)

运维八一

运维,运维!
首页
运维杂记
编程浅尝
周积跬步
专栏
生活
关于
收藏
  • 分类
  • 标签
  • 归档
Source (opens new window)
  • Go

    • 前言

    • Go基础知识

    • Go基本语法

    • 实战项目:简单web服务

    • 基本数据类型

    • 内置运算符

    • 分支和循环

    • 函数 function

    • 结构体 struct

    • 方法 method

    • 实战项目:跟踪函数调用链

    • 接口 interface

    • 并发 concurrency

    • 指针

    • 实战项目:实现轻量级线程池

      • 项目背景
      • workerpool的实现原理
      • v0_1 最小可行实现
        • v0.1 最小可行实现
      • v0_2 添加功能选项机制
    • 实战项目:实现TCP服务器

    • go常用包

    • Gin框架

    • go随记

  • Python

  • Shell

  • Java

  • Vue

  • 前端

  • 编程浅尝
  • Go
  • 实战项目:实现轻量级线程池
lyndon
2022-06-07
目录

v0_1 最小可行实现

# v0.1 最小可行实现

建立 workerpool 目录作为实战项目的源码根目录,然后为这个项目创建 go module:

$mkdir workerpool1
$cd workerpool1
$go mod init github.com/bigwhite/workerpool
1
2
3

创建 pool.go 作为 workpool 包的主要源码文件。在这个源码文件中,定义 Pool 结构体类型,这个类型的实例代表一个 workerpool:

type Pool struct {
    capacity int         // workerpool大小

    active chan struct{} // 对应上图中的active channel
    tasks  chan Task     // 对应上图中的task channel

    wg   sync.WaitGroup  // 用于在pool销毁时等待所有worker退出
    quit chan struct{}   // 用于通知各个worker退出的信号channel
}
1
2
3
4
5
6
7
8
9

workerpool 包对外主要提供三个 API,分别是:

  • workerpool.New:用于创建一个 pool 类型实例,并将 pool 池的 worker 管理机制运行起来;
  • workerpool.Free:用于销毁一个 pool 池,停掉所有 pool 池中的 worker;
  • Pool.Schedule:这是 Pool 类型的一个导出方法,workerpool 包的用户通过该方法向 pool 池提交待执行的任务(Task)。

API workerpool.New 的实现:

func New(capacity int) *Pool {
    if capacity <= 0 {
        capacity = defaultCapacity
    }
    if capacity > maxCapacity { 
        capacity = maxCapacity
    } 

    p := &Pool{
        capacity: capacity,
        tasks:    make(chan Task),
        quit:     make(chan struct{}),
        active:   make(chan struct{}, capacity),
    }

    fmt.Printf("workerpool start\n")

    go p.run()

    return p
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

New 函数接受一个参数 capacity 用于指定 workerpool 池的容量,这个参数用于控制 workerpool 最多只能有 capacity 个 worker,共同处理用户提交的任务请求。

函数开始处有一个对 capacity 参数的“防御性”校验,当用户传入不合理的值时,函数 New 会将它纠正为合理的值。

Pool 类型实例变量 p 完成初始化后,创建了一个新的 Goroutine,用于对 workerpool 进行管理,这个 Goroutine 执行的是 Pool 类型的 run 方法:

func (p *Pool) run() { 
    idx := 0 

    for { 
        select { 
        case <-p.quit:
            return
        case p.active <- struct{}{}:
            // create a new worker
            idx++
            p.newWorker(idx)
        } 
    } 
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

run 方法内是一个无限循环,循环体中使用 select 监视 Pool 类型实例的两个 channel:quit 和 active。这种在 for 中使用 select 监视多个 channel 的实现,在 Go 代码中十分常见,是一种惯用法。

当接收到来自 quit channel 的退出“信号”时,这个 Goroutine 就会结束运行。

当 active channel 可写时,run 方法就会创建一个新的 worker Goroutine。

此外,为了方便在程序中区分各个 worker 输出的日志,将一个从 1 开始的变量 idx 作为 worker 的编号,并把它以参数的形式传给创建 worker 的方法。

将创建新的 worker goroutine 的职责,封装到一个名为 newWorker 的方法中:

func (p *Pool) newWorker(i int) {
    p.wg.Add(1)
    go func() {
        defer func() {
            if err := recover(); err != nil {
                fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err)
                <-p.active
            }
            p.wg.Done()
        }()

        fmt.Printf("worker[%03d]: start\n", i)

        for {
            select {
            case <-p.quit:
                fmt.Printf("worker[%03d]: exit\n", i)
                <-p.active
                return
            case t := <-p.tasks:
                fmt.Printf("worker[%03d]: receive a task\n", i)
                t()
            }
        }
    }()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

在创建一个新的 worker goroutine 之前,newWorker 方法会先调用 p.wg.Add 方法将 WaitGroup 的等待计数加一。由于每个 worker 运行于一个独立的 Goroutine 中,newWorker 方法通过 go 关键字创建了一个新的 Goroutine 作为 worker。

新 worker 的核心,依然是一个基于 for-select 模式的循环语句,在循环体中,新 worker 通过 select 监视 quit 和 tasks 两个 channel。

和前面的 run 方法一样,当接收到来自 quit channel 的退出“信号”时,这个 worker 就会结束运行。tasks channel 中放置的是用户通过 Schedule 方法提交的请求,新 worker 会从这个 channel 中获取最新的 Task 并运行这个 Task。

API Pool.Schedule 的实现:

var ErrWorkerPoolFreed    = errors.New("workerpool freed") //workerpool已终止运行

func (p *Pool) Schedule(t Task) error {
    select {
    case <-p.quit:
        return ErrWorkerPoolFreed
    case p.tasks <- t:
        return nil
    }
}
1
2
3
4
5
6
7
8
9
10

Schedule 方法的核心逻辑,是将传入的 Task 实例发送到 workerpool 的 tasks channel 中。

但考虑到现在 workerpool 已经被销毁的状态,我们这里通过一个 select,检视 quit channel 是否有“信号”可读,如果有,就返回一个哨兵错误 ErrWorkerPoolFreed。如果没有,一旦 p.tasks 可写,提交的 Task 就会被写入 tasks channel,以供 pool 中的 worker 处理。

要注意的是,这里的 Pool 结构体中的 tasks 是一个无缓冲的 channel,如果 pool 中 worker 数量已达上限,而且 worker 都在处理 task 的状态,那么 Schedule 方法就会阻塞,直到有 worker 变为 idle 状态来读取 tasks channel,schedule 的调用阻塞才会解除。

至此,workerpool 的最小可行实现的主要逻辑都已实现。

验证:

建立一个使用 workerpool 的项目 demo1:

$mkdir demo1
$cd demo1
$go mod init demo1
1
2
3

引用本地的 module,需要手工修改一下 demo1 的 go.mod 文件,并利用 replace 指示符将 demo1 对 workerpool 的引用指向本地 workerpool1 路径:

module demo1

go 1.17

require github.com/bigwhite/workerpool v1.0.0

replace github.com/bigwhite/workerpool v1.0.0 => ../workerpool1
1
2
3
4
5
6
7

然后创建 demo1 的 main.go 文件,源码如下:

package main
  
import (
    "time"
    "github.com/bigwhite/workerpool"
)

func main() {
    p := workerpool.New(5)

    for i := 0; i < 10; i++ {
        err := p.Schedule(func() {
            time.Sleep(time.Second * 3)
        })
        if err != nil {
            println("task: ", i, "err:", err)
        }
    }

    p.Free()
}

/*
workerpool start
worker[005]: start
worker[005]: receive a task
worker[003]: start
worker[003]: receive a task
worker[004]: start
worker[004]: receive a task
worker[001]: start
worker[002]: start
worker[001]: receive a task
worker[002]: receive a task
worker[004]: receive a task
worker[005]: receive a task
worker[003]: receive a task
worker[002]: receive a task
worker[001]: receive a task
worker[001]: exit
worker[005]: exit
worker[002]: exit
worker[003]: exit
worker[004]: exit
workerpool freed
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

创建一个 capacity 为 5 的 workerpool 实例,并连续向这个 workerpool 提交了 10 个 task,每个 task 的逻辑很简单,只是 Sleep 3 秒后就退出。main 函数在提交完任务后,调用 workerpool 的 Free 方法销毁 pool,pool 会等待所有 worker 执行完 task 后再退出。

不足之处:

虽然可以通过 capacity 参数可以指定 workerpool 容量,但无法对 workerpool 的行为进行定制。比如当 workerpool 中的 worker 数量已达上限,而且 worker 都在处理 task 时,用户调用 Schedule 方法将阻塞,如果用户不想阻塞在这里,以目前的实现是做不到的。

解决方案:

为 workerpool 添加功能选项(functional option)机制。

上次更新: 2022/10/06, 00:04:41
workerpool的实现原理
v0_2 添加功能选项机制

← workerpool的实现原理 v0_2 添加功能选项机制→

最近更新
01
ctr和crictl显示镜像不一致
03-13
02
alpine镜像集成常用数据库客户端
03-13
03
create-cluster
02-26
更多文章>
Theme by Vdoing | Copyright © 2015-2024 op81.com
苏ICP备18041258号-2
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式