Warning
本文最后更新于 March 21, 2021,文中内容可能已过时,请谨慎使用。
Go 是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收。和其它没有自动垃圾回收的编程语言不同,使用 Go 语言创建对象的时候,我们没有回收 / 释放的心理负担,想用就用,想创建就创建。但是,如果你想使用 Go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响
Pool
Pool 的出现,可以避免反复的创建一些对象,比如 TCP链接、数据库链接等等,这些对象创建都比较耗时,如果将创建好的对象放入到池子中,需要的时候取,不需要的时候归还池子,将是一个非常不错的实现方式。
通过创建一个 Worker Pool 来减少 goroutine 的使用。比如,我们实现一个 TCP 服务器,如果每一个连接都要由一个独立的 goroutine 去处理的话,在大量连接的情况下,就会创建大量的 goroutine,这个时候,我们就可以创建一个固定数量的 goroutine(Worker),由这一组 Worker 去处理连接,比如 fasthttp 中的Worker Pool。
一句话总结:保存和复用临时对象,减少内存分配,降低 GC 压力。
举例:
1
2
3
4
5
6
7
8
9
10
11
12
| type Student struct {
Name string
Age int32
Remark [1024]byte
}
var buf, _ = json.Marshal(Student{Name: "Geektutu", Age: 25})
func unmarsh() {
stu := &Student{}
json.Unmarshal(buf, stu)
}
|
json 的反序列化在文本解析和网络通信过程中非常常见,当程序并发度非常高的情况下,短时间内需要创建大量的临时对象。而这些对象是都是分配在堆上的,会给 GC 造成很大压力,严重影响程序的性能。
声明对象池
只需要实现 New 函数即可。对象池中没有对象时,将会调用 New 函数创建。
1
2
3
4
5
| var studentPool = sync.Pool{
New: func() interface{} {
return new(Student)
},
}
|
**Get && Put **
1
2
3
| stu := studentPool.Get().(*Student)
json.Unmarshal(buf, stu)
studentPool.Put(stu)
|
- Get() 用于从对象池中获取对象,因为返回值是 interface{},因此需要类型转换。
- Put() 则是在对象使用完毕后,返回对象池。
gammazero/workerpool
gammazero/workerpool gammazero/workerpool 可以无限制地提交任务,提供了更便利的 Submit 和 SubmitWait 方法提交任务,还可以提供当前的 worker 数和任务数以及关闭 Pool 的功能。
下面做一些介绍
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| package main
import (
"fmt"
"github.com/gammazero/workerpool"
)
func main() {
wp := workerpool.New(2)
requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}
for _, r := range requests {
r := r
wp.Submit(func() {
fmt.Println("Handling request:", r)
})
}
wp.StopWait()
}
|
排队的任务数没有上限,只有系统资源的限制。如果入站任务的数量太多,以至于无法排队等待处理那么解决方案就超出了workerpool的处理范围,应该通过在多个系统上分配负载来解决
使用介绍
Submit
用户通过 Submit(task func())
方法提交一个 task 到 task 队列中。
task 函数默认没有返回值,如果想要有返回值,可以用管道将task 的返回值传到管道中。
提交的 task 会立即开启一个可用的worker或者新创建一个worker。如果没有可用的worker或者worker数已经达到最大,task会被放入到 task 等待队列中。当worker空闲时会从task 等待队列中取出task。
一个Worker长时间闲置时可以删除并释放资源。
这个函数非常简单,就是将收到的待执行任务放入到 task等待队列中。
1
2
3
4
5
| func (p *WorkerPool) Submit(task func()) {
if task != nil {
p.taskQueue <- task
}
}
|
还有一个变种, 支持同步等待结果
1
2
3
4
5
6
7
8
9
10
11
| func (p *WorkerPool) SubmitWait(task func()) {
if task == nil {
return
}
doneChan := make(chan struct{})
p.taskQueue <- func() {
task()
close(doneChan)
}
<-doneChan
}
|
New
New()
函数创建了一个 worker goroutines pool 。 Max 指定了最大的worker数量,也就是最大并发的执行数,当没有新到来的 task 时,worker会逐渐减少至0.这里注意 taskQueue 是一个只有1个buffer的缓冲,task等待队列是 waitingQueue()
。dispatch 里实现派发任务的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| func New(maxWorkers int) *WorkerPool {
// There must be at least one worker.
if maxWorkers < 1 {
maxWorkers = 1
}
pool := &WorkerPool{
maxWorkers: maxWorkers,
taskQueue: make(chan func(), 1),
workerQueue: make(chan func()),
stopSignal: make(chan struct{}),
stoppedChan: make(chan struct{}),
}
// Start the task dispatcher.
go pool.dispatch()
return pool
}
|
dispatch 任务派发 通过设置 idleTimer ,超过这个时间还没有task,就会杀掉一个worker。这个函数刚开始判断等待队列是否为 0,如果不为0,说明任务已经积压,需要将新的task传到等待队列中 ,processWaitingQueue
函数内将 task 传给 waitQueue,并在workerQueue有buffer时将任务传给 worker Queue。
如果 waitQueue 不存在(长度为0),说明还不存在任务排队情况,会将task传给 workerQueue(如果workerQueue能把task塞进去的话),如果塞不进去
就创建一个新worker,要么worker刚好又不够了(达到最大worker数量),任务扔进 waitQueue。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
| // dispatch sends the next queued task to an available worker.
func (p *WorkerPool) dispatch() {
defer close(p.stoppedChan)
timeout := time.NewTimer(idleTimeout)
var workerCount int
var idle bool
Loop:
for {
if p.waitingQueue.Len() != 0 {
if !p.processWaitingQueue() {
break Loop
}
continue
}
select {
case task, ok := <-p.taskQueue:
if !ok {
break Loop
}
// Got a task to do.
select {
case p.workerQueue <- task:
default:
// Create a new worker, if not at max.
if workerCount < p.maxWorkers {
go startWorker(task, p.workerQueue)
workerCount++
} else {
// Enqueue task to be executed by next available worker.
p.waitingQueue.PushBack(task)
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
}
}
idle = false
case <-timeout.C:
// Timed out waiting for work to arrive. Kill a ready worker if
// pool has been idle for a whole timeout.
if idle && workerCount > 0 {
if p.killIdleWorker() {
workerCount--
}
}
idle = true
timeout.Reset(idleTimeout)
}
}
// If instructed to wait, then run tasks that are already queued.
if p.wait {
p.runQueuedTasks()
}
// Stop all remaining workers as they become ready.
for workerCount > 0 {
p.workerQueue <- nil
workerCount--
}
timeout.Stop()
}
|
waitingQueue
的目的是讲一个新的task放入到等待 task 队列。或者等待工人队列中有可用的工人时将 task 等待队列中取出 task 交给 worker 队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
| func (p *WorkerPool) processWaitingQueue() bool {
select {
case task, ok := <-p.taskQueue:
if !ok {
return false
}
p.waitingQueue.PushBack(task)
case p.workerQueue <- p.waitingQueue.Front().(func()):
// A worker was ready, so gave task to worker.
p.waitingQueue.PopFront()
}
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
return true
}
|