35|即学即练:如何实现一个轻量级线程池?
35|即学即练:如何实现一个轻量级线程池?
讲述:Tony Bai
时长16:25大小15.00M
为什么要用到 Goroutine 池?
workerpool 的实现原理
workerpool 的一个最小可行实现
添加功能选项机制
小结
思考题
今天的项目源码在这里!
赞 13
提建议
精选留言(26)
- ivhong2022-03-17非常感谢老师带着做了一次这样的实现,因为我自己也尝试过这种实现(纯粹是为了学习用)。有几个问题我不是特别明白,不知道是不是和老师理解的一样,望老师闲暇之余给予指正,谢谢! 1. 这个是不是叫“协程池”,为什么叫做“线程池”?两者有什么区别呢?或者是到底什么是“协程”呢? 2. 是不是这节课的实现,也纯粹是为了学习而实现的,个人理解,go实现Goroutine,就是为了解决“线程池”的繁琐,让“并发”实现的不用那么的麻烦,如果是超小“任务”,不用考虑线程频繁切换导致系统资源的浪费。如果再实现“协程池”的话,是不是丢失了这种优点? 3. 常驻内存的Goroutine,反复使用,会导致这个Goroutine的内存越来越大,或者其他隐藏的风险么?展开
作者回复: 好问题。 1. 传统理解的coroutine一般是面向协作式,而非抢占式。像python中通过yield关键字创建的协程,与主routine之间是在一个线程上实现的切换执行,从设计角度是通过coroutine实现了并发(concurrency),但其实它们还是串行执行的,不会真正并行(paralellism),即便在多核处理器上。 基于上面的理解,我们就可以意识到goroutine并非传统意义上的coroutine,是支持抢占的,而且也必须依赖抢占实现runtime对goroutine的调度。它更像thread,可以绑定不同的cpu核并行执行(如果是在多核处理器上的话)。同时基于goroutine的设计也会一种并发的设计。 而goroutine与thread又不同,goroutine是在用户层(相较于os的内核层)调度的,os并不知道其存在,goroutine的切换相对轻量。而thread是os 来调度的,切换代价更高一些。 所以文中将goroutine称为“轻量级线程”,而不是协程。 2. 你理解的没错。这节课是为了演示goroutine、channel之间的调度与通信机制而“设计”出来的。goroutine使用代价很低,通常不用考虑池化。但是在一些大型网络服务程序时,一旦goroutine数量过多,内存占用以及调度goroutine的代价就不能不考虑了。于是有了“池化”的思路。这与传统的线程池的思路的确是一脉相承的 3. go是gc的,内存不会越来越大。
9 - $侯2022-01-17老师您好请教几个问题: 第一个问题,demo1中没有看到p.Free的代码示例,Free方法只是向p.quit <- struct{}{}发送一个空结构体就可以吗,请教下Free方式该如何写 第二个问题,demo1中好像也没看看到p.wg.Wait()
作者回复: 原文中有源码的链接,在最后。 源码在 https://github.com/bigwhite/publication/tree/master/column/timegeek/go-first-course/35 看了后,就可以回答你的问题了。
共 7 条评论3 - Darren2022-01-17老师以下几个问题哈: 1、第一种实现中,这块是不是有点问题: go func() { defer func() { if err := recover(); err != nil { fmt.Printf("worker[%03d]: recover panic[%s] and exit\n", i, err) <-p.active } p.wg.Done() }() <-p.active是不是应该要放到if的外面,如果task执行本身没有出错,正常结束了,active没有减少的地方 2、这块文字描述有点问题,p<-active应该是<-p.active “使用了 defer+recover 对 panic 进行捕捉,捕捉后 worker 也是要退出的,于是我们还通过p<-active更新了 worker 计数器” 3、第二种实现中,当没有提前创建worker,那么当tasks中有任务的时候,p.returnTask方法是干啥的?文章中没有这个方法,且文字也没有说明呀 func (p *Pool) run() { idx := len(p.active) if !p.preAlloc { loop: for t := range p.tasks { p.returnTask(t) select { case <-p.quit: return case p.active <- struct{}{}: idx++ p.newWorker(idx) default: break loop } } }展开
作者回复: 看的真细致!👍 1. worker一旦创建后,除了panic和quit通知退出,worker是不会退出的,也就是没有所谓“正常退出”的情况。所以没在defer中调用<-p.active。 2. 的确是笔误,感谢指出。 3. 在文后有源码链接。这里的task仅是触发了worker创建,这里是调度循环,不处理task,所以要把task扔回tasks channel,等worker启动后再处理。
共 12 条评论3 - Aeins2022-06-051. demo01 的实现,已经是一种预分配的实现,demo02 在 New 里面 去预分配,没有必要 2. demo02 ,task 取出来,再放回去,总感觉不优雅。1
- lesserror2022-01-24大白老师,谢谢你这一章节精彩的内容。有以下困惑,麻烦抽空解忧。 1.. 第二版的Goroutine池的实现中,最后的测试运行输出环节中,运行到 run方法 这个Goroutine中时,!p.preAlloc 为true,for无限循环其实一直在运行 这个 case p.active <- struct{}{}:。最后是因为主main Goroutine退出了,运行这个 无限for循环的 Goroutine也跟着退出了,所以这个无限for循环也退出了,可以这么理解吗? 2. Goroutine的运行不确定性,在并发程序中的理解心智负担很大呀,稍微复杂的程序,会很不容易理解程序的运行原理。老师这个课程,对于 Goroutine 在稍微复杂的编发程序中是如何运行的,好像没有过多的解释,很容易让新手不知所以然。至少,最后这个版本的 Goroutine 池的中,最后的测试运行输出应该解释一下,输出为何是这样的。如果不运行,要能想到程序的输出结果,我觉得这才叫完全理解了。我觉得我现在目前很难做到。 PS:returnTask 那里的逻辑能补充说明就好了,刚看到那里也是有点懵(看了评论才理解)。展开
作者回复: 1. 并不是等到main goroutine退出才退出。当Worker pool满员时,这里会走到default分支,跳出该loop,进入到下面那个for循环,后面的for循环才是真正维护这个pool的主循环。 2. 这篇写的的确有些仓促。
1 - 罗杰2022-01-18实战项目棒棒哒
作者回复: 👍
1 - ゝ骑着小车去兜风。2022-01-17看了这么长时间的专栏,感觉受益匪浅。之前虽然用过go但是一直没有系统的学习过,在开发过程中也遇到了许多坑。这段时间刷了两次函数方法和接口每次都能发掘到新的知识点,感觉对go有了全新的认识。对于今天这个小项目完全可以看的明白,但是如果说现在让我实现一个类似的协程池,就感觉自己没有这种思想头绪不知道从哪下手解决。我想问老师的是:怎么能够将自己所学的知识和实际的问题场景结合起来去解决实际问题,有哪些建议呢,还需要培养哪些技能? 感谢老师展开
作者回复: 不知道 在最后实战篇的三节课 是否能初步回答你的问题。
1 - 在下宝龙、2022-01-17最后一个例子,p := workerpool.New(5, workerpool.WithPreAllocWorkers(false),workerpool.WithBlock(false)) time.Sleep(time.Second * 2) 都给了两秒创建worker 都没创建好worker 吗?,按照代码的意思,即使不允许预先分配worker,作者run里面,继续分配worker 了啊,并且一开始没有task,直接就走default break loop了,感觉是不是多此一举了这里写的,break loop之后,就开始根据active 创建worker,设置是 5,那两秒肯定创建5个worker,所以不懂为甚么说woker不够。 是不是 p.returnTask(t) 这个缘故展开
作者回复: sleep的2s并没有创建worker啊。这里指定的WithPreAllocWorkers(false),即不预创建。run阻塞在对tasks channel的读取上,直到后面main goroutine第一次调用Schedule写入一个task,这时run才会开始按需创建Worker。
共 5 条评论1 - demajiao2023-01-22 来自浙江if !p.preAlloc { loop: for t := range p.tasks { p.returnTask(t) select { case <-p.quit: return case p.active <- struct{}{}: idx++ p.newWorker(idx) default: break loop } } } 这段代码感觉没用呀。展开
作者回复: 当preAlloc=false时,即不预分配时,有用。
- 撕影2023-01-14 来自湖南为何关键变化不写出来?太仓促了吧,一篇最后一节以没看懂收场,对学生打击可不小啊老师
作者回复: “关键变化”,指的是?
- Sunrise2022-11-24 来自辽宁考虑到 Goroutine 调度的次序的不确定性,这里我在创建 workerpool 与真正开始调用 Schedule 方法之间,做了一个 Sleep,尽量减少 Schedule 都返回失败的频率 这块也不太懂,为啥不加 Sleep 会全返回失败呢?
作者回复: 最后一版Schedule加入了default分支,当pool资源不够又设置为non block时,schedule肯定会返回error啊。
- Sunrise2022-11-23 来自北京有几个问题不大理解,望老师抽空解答: 1)自引用函数与选项设计是为了解决 go 函数没有默认参数和可选参数吗? go 函数为什么没有设计默认参数和可选参数呢? 2)为什么下面的 for { select ... } 放到 goroutine 中 才会输出 ch2: 2 ch1: 1 done, 如果直接放到外面只会输出 done? func TestSelect(t *testing.T) { ch1 := make(chan int) ch2 := make(chan int) go func() { ch1 <- 1 }() go func() { ch2 <- 2 }() go func() { for { select { case i := <-ch1: fmt.Println("ch1:", i) case j := <-ch2: fmt.Println("ch2:", j) default: fmt.Println("done") return } } }() // ch2: 2 ch1: 1 done }展开
作者回复: 问题1:主要是为了可选参数吧。为什么go没有原生支持默认参数与可选参数,我猜是因为go设计者压根就不想引入这个复杂性。 问题2: 和goroutine的调度顺序有关。
- 菠萝吹雪—Code2022-09-12 来自辽宁中秋假期打卡
作者回复: 👍
- H2022-08-25 来自辽宁有些没太懂: 1. run里面 p.returnTask(t),相当于又把 task 异步扔p.tasks 里面了,然后创建worker等待从 p.tasks里面取task执行。因为returnTask和newWorker都是异步,所以无法保证是range p.tasks还是worker先执行 2.在我理解,既然是想开5个容量的非预加载的线程池,10个循环中应该前5个应该都创建woker呀展开
作者回复: 1. 是的。 2. 原因就是问题1的描述
- 骚动2022-08-19 来自北京老师,请教一下,这种软件架构图是用什么画的啊?平时自己也想画画自己做的东西的架构图,但是没找到什么合适的工具
作者回复: 在线工具,可以用processon.com,离线的我一般用draw.io。
- MClink2022-07-21功能选项这种封装的方式确实拓展性很好,和传统的参数传入并自动绑定来说,不需要更改New 的代码,只需要在外拓展
作者回复: 👍
- 曾祥金2022-07-16感觉退出可以用context处理
- Geek_a6104e2022-07-10请问tasks通道为什么是不带缓存区的
作者回复: 示例原意就是设计一个同步提交的workerpool池。即如果Schedule方法返回成功,说明用户提交的task肯定被某个worker处理了。 如果要设计为异步提交语义,可以使用带缓冲的tasks channel。那样schedule方法提交后,task可能一直在channel中存着,直到有worker处理。但当缓冲区满了,还是会退化为同步提交。
- Hugh2022-05-17老师您好,请教个问题,之前说过在GMP模型中数据结构G是可以被复用的,那么协程池还有必要吗?因为协程池的一大优势就是复用协程,避免反复创建协程,那么其实go scheduler已经做到了
作者回复: 这是两个层面的复用。GMP对G数据结构的复用是在runtime调度层面,但这种复用 用户层无法控制,G也不是总能被复用,就像sync.Pool中数据项,在一定时间内如果没有goroutine去从池中get,那么还是会被释放的。 当然用户层面实现的goroutine 池也可以增加idle释放的特性(这一讲中的例子不支持),但这种释放是用户层可控的。
共 2 条评论 - Geek_as2022-04-27老师,最后一个例子好像有问题,假如我设置的是当没有活动go worker的时候阻塞,然后当前线程不会预创建,这时候我往池里面添加一个任务,由于没有worker可以接收,导致它走到了default,由于我是设置了获取不到就阻塞,所以我的代码走到了 if p.block{ p.tasks <- t } 这里,这时候run方法检测到task管道里有task,就接收下来,前面的p.tasks <- t阻塞释放了,会继续走下去,然后run的话,在接收了管道的task后,执行的下一步操作是returnTask,然后创建活动worker,去执行returnTask返回的task,但出现了神奇的一幕,run的执行流程卡在returnTask的p.tasks <- task中,导致后面的活动worker没法创建,一直卡在returnTask那里,后面我分析了一下,感觉应该是我的tasks是无缓冲队列,所以returnTask执行p.tasks <- task的时候就卡在这里,等待其他人的接收,但这样就有问题了,因为接收操作要么在run的range中,要么是由活动worker接收,因为卡在returnTask这里,所以没法创建活动worker,也就没worker消费,变成了类似于死锁的样子,假如我配置了阻塞,和不预创建,理论上我提交给池里面的所有任务都不会被执行,我试了一下,当提交的任务超过两个就会爆死锁异常,后面我将returnTask这一步动作移动到了创建worker之后,就解决了这个问题展开
作者回复: demo2使用Workpool2中的returnTask代码如下: func (p *Pool) returnTask(t Task) { go func() { p.tasks <- t }() } returnTask中创建了一个新goroutine来异步send task,returnTask怎么会卡住呢?