极客时间已完结课程限时免费阅读

33|并发:小channel中蕴含大智慧

33|并发:小channel中蕴含大智慧-极客时间

33|并发:小channel中蕴含大智慧

讲述:Tony Bai

时长25:26大小23.23M

你好,我是 Tony Bai。
通过上两节课的学习,我们知道了 Go 语言实现了基于 CSP(Communicating Sequential Processes)理论的并发方案。
Go 语言的 CSP 模型的实现包含两个主要组成部分:一个是 Goroutine,它是 Go 应用并发设计的基本构建与执行单元;另一个就是 channel,它在并发模型中扮演着重要的角色。channel 既可以用来实现 Goroutine 间的通信,还可以实现 Goroutine 间的同步。它就好比 Go 并发设计这门“武功”的秘籍口诀,可以说,学会在 Go 并发设计时灵活运用 channel,才能说真正掌握了 Go 并发设计的真谛。
所以,在这一讲中,我们就来系统学习 channel 这一并发原语的基础语法与常见使用方法。

作为一等公民的 channel

Go 对并发的原生支持可不是仅仅停留在口号上的,Go 在语法层面将并发原语 channel 作为一等公民对待。在前面的第 21 讲中我们已经学过“一等公民”这个概念了,如果你记不太清了可以回去复习一下。
那 channel 作为一等公民意味着什么呢?
这意味着我们可以像使用普通变量那样使用 channel,比如,定义 channel 类型变量、给 channel 变量赋值、将 channel 作为参数传递给函数 / 方法、将 channel 作为返回值从函数 / 方法中返回,甚至将 channel 发送到其他 channel 中。这就大大简化了 channel 原语的使用,提升了我们开发者在做并发设计和实现时的体验。

创建 channel

和切片、结构体、map 等一样,channel 也是一种复合数据类型。也就是说,我们在声明一个 channel 类型变量时,必须给出其具体的元素类型,比如下面的代码这样:
var ch chan int
这句代码里,我们声明了一个元素为 int 类型的 channel 类型变量 ch。
如果 channel 类型变量在声明时没有被赋予初值,那么它的默认值为 nil。并且,和其他复合数据类型支持使用复合类型字面值作为变量初始值不同,为 channel 类型变量赋初值的唯一方法就是使用 make 这个 Go 预定义的函数,比如下面代码:
ch1 := make(chan int)
ch2 := make(chan int, 5)
这里,我们声明了两个元素类型为 int 的 channel 类型变量 ch1 和 ch2,并给这两个变量赋了初值。但我们看到,两个变量的赋初值操作使用的 make 调用的形式有所不同。
第一行我们通过make(chan T)创建的、元素类型为 T 的 channel 类型,是无缓冲 channel,而第二行中通过带有 capacity 参数的make(chan T, capacity)创建的元素类型为 T、缓冲区长度为 capacity 的 channel 类型,是带缓冲 channel
这两种类型的变量关于发送(send)与接收(receive)的特性是不同的,我们接下来就基于这两种类型的 channel,看看 channel 类型变量如何进行发送和接收数据元素。

发送与接收

Go 提供了<-操作符用于对 channel 类型变量进行发送与接收操作:
ch1 <- 13 // 将整型字面值13发送到无缓冲channel类型变量ch1中
n := <- ch1 // 从无缓冲channel类型变量ch1中接收一个整型值存储到整型变量n中
ch2 <- 17 // 将整型字面值17发送到带缓冲channel类型变量ch2中
m := <- ch2 // 从带缓冲channel类型变量ch2中接收一个整型值存储到整型变量m中
这里我要提醒你一句,在理解 channel 的发送与接收操作时,你一定要始终牢记:channel 是用于 Goroutine 间通信的,所以绝大多数对 channel 的读写都被分别放在了不同的 Goroutine 中。
现在,我们先来看看无缓冲 channel 类型变量(如 ch1)的发送与接收。
由于无缓冲 channel 的运行时层实现不带有缓冲区,所以 Goroutine 对无缓冲 channel 的接收和发送操作是同步的。也就是说,对同一个无缓冲 channel,只有对它进行接收操作的 Goroutine 和对它进行发送操作的 Goroutine 都存在的情况下,通信才能得以进行,否则单方面的操作会让对应的 Goroutine 陷入挂起状态,比如下面示例代码:
func main() {
ch1 := make(chan int)
ch1 <- 13 // fatal error: all goroutines are asleep - deadlock!
n := <-ch1
println(n)
}
在这个示例中,我们创建了一个无缓冲的 channel 类型变量 ch1,对 ch1 的读写都放在了一个 Goroutine 中。
运行这个示例,我们就会得到 fatal error,提示我们所有 Goroutine 都处于休眠状态,程序处于死锁状态。要想解除这种错误状态,我们只需要将接收操作,或者发送操作放到另外一个 Goroutine 中就可以了,比如下面代码:
func main() {
ch1 := make(chan int)
go func() {
ch1 <- 13 // 将发送操作放入一个新goroutine中执行
}()
n := <-ch1
println(n)
}
由此,我们可以得出结论:对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock
接下来,我们再来看看带缓冲 channel 的发送与接收操作。
和无缓冲 channel 相反,带缓冲 channel 的运行时层实现带有缓冲区,因此,对带缓冲 channel 的发送操作在缓冲区未满、接收操作在缓冲区非空的情况下是异步的(发送或接收不需要阻塞等待)。
也就是说,对一个带缓冲 channel 来说,在缓冲区未满的情况下,对它进行发送操作的 Goroutine 并不会阻塞挂起;在缓冲区有数据的情况下,对它进行接收操作的 Goroutine 也不会阻塞挂起。
但当缓冲区满了的情况下,对它进行发送操作的 Goroutine 就会阻塞挂起;当缓冲区为空的情况下,对它进行接收操作的 Goroutine 也会阻塞挂起。
如果光看文字还不是很好理解,你可以再看看下面几个关于带缓冲 channel 的操作的例子:
ch2 := make(chan int, 1)
n := <-ch2 // 由于此时ch2的缓冲区中无数据,因此对其进行接收操作将导致goroutine挂起
ch3 := make(chan int, 1)
ch3 <- 17 // 向ch3发送一个整型数17
ch3 <- 27 // 由于此时ch3中缓冲区已满,再向ch3发送数据也将导致goroutine挂起
也正是因为带缓冲 channel 与无缓冲 channel 在发送与接收行为上的差异,在具体使用上,它们有各自的“用武之地”,这个我们等会再细说,现在我们先继续把 channel 的基本语法讲完。
使用操作符<-,我们还可以声明只发送 channel 类型(send-only)和只接收 channel 类型(recv-only),我们接着看下面这个例子:
ch1 := make(chan<- int, 1) // 只发送channel类型
ch2 := make(<-chan int, 1) // 只接收channel类型
<-ch1 // invalid operation: <-ch1 (receive from send-only type chan<- int)
ch2 <- 13 // invalid operation: ch2 <- 13 (send to receive-only type <-chan int)
你可以从这个例子中看到,试图从一个只发送 channel 类型变量中接收数据,或者向一个只接收 channel 类型发送数据,都会导致编译错误。通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型,比如下面这个例子:
func produce(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i + 1
time.Sleep(time.Second)
}
close(ch)
}
func consume(ch <-chan int) {
for n := range ch {
println(n)
}
}
func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup
wg.Add(2)
go func() {
produce(ch)
wg.Done()
}()
go func() {
consume(ch)
wg.Done()
}()
wg.Wait()
}
在这个例子中,我们启动了两个 Goroutine,分别代表生产者(produce)与消费者(consume)。生产者只能向 channel 中发送数据,我们使用chan<- int作为 produce 函数的参数类型;消费者只能从 channel 中接收数据,我们使用<-chan int作为 consume 函数的参数类型。
在消费者函数 consume 中,我们使用了 for range 循环语句来从 channel 中接收数据,for range 会阻塞在对 channel 的接收操作上,直到 channel 中有数据可接收或 channel 被关闭循环,才会继续向下执行。channel 被关闭后,for range 循环也就结束了。

关闭 channel

在上面的例子中,produce 函数在发送完数据后,调用 Go 内置的 close 函数关闭了 channel。channel 关闭后,所有等待从这个 channel 接收数据的操作都将返回。
这里我们继续看一下采用不同接收语法形式的语句,在 channel 被关闭后的返回值的情况:
n := <- ch // 当ch被关闭后,n将被赋值为ch元素类型的零值
m, ok := <-ch // 当ch被关闭后,m将被赋值为ch元素类型的零值, ok值为false
for v := range ch { // 当ch被关闭后,for range循环结束
... ...
}
我们看到,通过“comma, ok”惯用法或 for range 语句,我们可以准确地判定 channel 是否被关闭。而单纯采用n := <-ch形式的语句,我们就无法判定从 ch 返回的元素类型零值,究竟是不是因为 channel 被关闭后才返回的。
另外,从前面 produce 的示例程序中,我们也可以看到,channel 是在 produce 函数中被关闭的,这也是 channel 的一个使用惯例,那就是发送端负责关闭 channel
这里为什么要在发送端关闭 channel 呢?
这是因为发送端没有像接受端那样的、可以安全判断 channel 是否被关闭了的方法。同时,一旦向一个已经关闭的 channel 执行发送操作,这个操作就会引发 panic,比如下面这个示例:
ch := make(chan int, 5)
close(ch)
ch <- 13 // panic: send on closed channel

select

当涉及同时对多个 channel 进行操作时,我们会结合 Go 为 CSP 并发模型提供的另外一个原语 select,一起使用。
通过 select,我们可以同时在多个 channel 上进行发送 / 接收操作:
select {
case x := <-ch1: // 从channel ch1接收数据
... ...
case y, ok := <-ch2: // 从channel ch2接收数据,并根据ok值判断ch2是否已经关闭
... ...
case ch3 <- z: // 将z值发送到channel ch3中:
... ...
default: // 当上面case中的channel通信均无法实施时,执行该默认分支
}
当 select 语句中没有 default 分支,而且所有 case 中的 channel 操作都阻塞了的时候,整个 select 语句都将被阻塞,直到某一个 case 上的 channel 变成可发送,或者某个 case 上的 channel 变成可接收,select 语句才可以继续进行下去。关于 select 语句的妙用,我们在后面还会细讲,这里我们先简单了解它的基本语法。
看到这里你应该能感受到,channel 和 select 两种原语的操作都十分简单,它们都遵循了 Go 语言“追求简单”的设计哲学,但它们却为 Go 并发程序带来了强大的表达能力。学习了这些基础用法后,接下来我们再深一层,看看 Go 并发原语 channel 的一些惯用法。同样地,这里我们也分成无缓冲 channel 和带缓冲 channel 两种情况来分析。

无缓冲 channel 的惯用法

无缓冲 channel 兼具通信和同步特性,在并发程序中应用颇为广泛。现在我们来看看几个无缓冲 channel 的典型应用:

第一种用法:用作信号传递

无缓冲 channel 用作信号传递的时候,有两种情况,分别是 1 对 1 通知信号和 1 对 n 通知信号。我们先来分析下 1 对 1 通知信号这种情况。
我们直接来看具体的例子:
type signal struct{}
func worker() {
println("worker is working...")
time.Sleep(1 * time.Second)
}
func spawn(f func()) <-chan signal {
c := make(chan signal)
go func() {
println("worker start to work...")
f()
c <- signal{}
}()
return c
}
func main() {
println("start a worker...")
c := spawn(worker)
<-c
fmt.Println("worker work done!")
}
在这个例子中,spawn 函数返回的 channel,被用于承载新 Goroutine 退出的“通知信号”,这个信号专门用作通知 main goroutine。main goroutine 在调用 spawn 函数后一直阻塞在对这个“通知信号”的接收动作上。
我们来运行一下这个例子:
start a worker...
worker start to work...
worker is working...
worker work done!
有些时候,无缓冲 channel 还被用来实现 1 对 n 的信号通知机制。这样的信号通知机制,常被用于协调多个 Goroutine 一起工作,比如下面的例子:
func worker(i int) {
fmt.Printf("worker %d: is working...\n", i)
time.Sleep(1 * time.Second)
fmt.Printf("worker %d: works done\n", i)
}
type signal struct{}
func spawnGroup(f func(i int), num int, groupSignal <-chan signal) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup
for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
<-groupSignal
fmt.Printf("worker %d: start to work...\n", i)
f(i)
wg.Done()
}(i + 1)
}
go func() {
wg.Wait()
c <- signal{}
}()
return c
}
func main() {
fmt.Println("start a group of workers...")
groupSignal := make(chan signal)
c := spawnGroup(worker, 5, groupSignal)
time.Sleep(5 * time.Second)
fmt.Println("the group of workers start to work...")
close(groupSignal)
<-c
fmt.Println("the group of workers work done!")
}
这个例子中,main goroutine 创建了一组 5 个 worker goroutine,这些 Goroutine 启动后会阻塞在名为 groupSignal 的无缓冲 channel 上。main goroutine 通过close(groupSignal)向所有 worker goroutine 广播“开始工作”的信号,收到 groupSignal 后,所有 worker goroutine 会“同时”开始工作,就像起跑线上的运动员听到了裁判员发出的起跑信号枪声。
这个例子的运行结果如下:
start a group of workers...
the group of workers start to work...
worker 3: start to work...
worker 3: is working...
worker 4: start to work...
worker 4: is working...
worker 1: start to work...
worker 1: is working...
worker 5: start to work...
worker 5: is working...
worker 2: start to work...
worker 2: is working...
worker 3: works done
worker 4: works done
worker 5: works done
worker 1: works done
worker 2: works done
the group of workers work done!
我们可以看到,关闭一个无缓冲 channel 会让所有阻塞在这个 channel 上的接收操作返回,从而实现了一种 1 对 n 的“广播”机制。

第二种用法:用于替代锁机制

无缓冲 channel 具有同步特性,这让它在某些场合可以替代锁,让我们的程序更加清晰,可读性也更好。我们可以对比下两个方案,直观地感受一下。
首先我们看一个传统的、基于“共享内存”+“互斥锁”的 Goroutine 安全的计数器的实现:
type counter struct {
sync.Mutex
i int
}
var cter counter
func Increase() int {
cter.Lock()
defer cter.Unlock()
cter.i++
return cter.i
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
}
wg.Wait()
}
在这个示例中,我们使用了一个带有互斥锁保护的全局变量作为计数器,所有要操作计数器的 Goroutine 共享这个全局变量,并在互斥锁的同步下对计数器进行自增操作。
接下来我们再看更符合 Go 设计惯例的实现,也就是使用无缓冲 channel 替代锁后的实现:
type counter struct {
c chan int
i int
}
func NewCounter() *counter {
cter := &counter{
c: make(chan int),
}
go func() {
for {
cter.i++
cter.c <- cter.i
}
}()
return cter
}
func (cter *counter) Increase() int {
return <-cter.c
}
func main() {
cter := NewCounter()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
v := cter.Increase()
fmt.Printf("goroutine-%d: current counter value is %d\n", i, v)
wg.Done()
}(i)
}
wg.Wait()
}
在这个实现中,我们将计数器操作全部交给一个独立的 Goroutine 去处理,并通过无缓冲 channel 的同步阻塞特性,实现了计数器的控制。这样其他 Goroutine 通过 Increase 函数试图增加计数器值的动作,实质上就转化为了一次无缓冲 channel 的接收动作。
这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而是通过通信来共享内存”的原则。
运行这个示例,我们可以得出与互斥锁方案相同的结果:
goroutine-9: current counter value is 10
goroutine-0: current counter value is 1
goroutine-6: current counter value is 7
goroutine-2: current counter value is 3
goroutine-8: current counter value is 9
goroutine-4: current counter value is 5
goroutine-5: current counter value is 6
goroutine-1: current counter value is 2
goroutine-7: current counter value is 8
goroutine-3: current counter value is 4

带缓冲 channel 的惯用法

带缓冲的 channel 与无缓冲的 channel 的最大不同之处,就在于它的异步性。也就是说,对一个带缓冲 channel,在缓冲区未满的情况下,对它进行发送操作的 Goroutine 不会阻塞挂起;在缓冲区有数据的情况下,对它进行接收操作的 Goroutine 也不会阻塞挂起。
这种特性让带缓冲的 channel 有着与无缓冲 channel 不同的应用场合。接下来我们一个个来分析。

第一种用法:用作消息队列

channel 经常被 Go 初学者视为在多个 Goroutine 之间通信的消息队列,这是因为,channel 的原生特性与我们认知中的消息队列十分相似,包括 Goroutine 安全、有 FIFO(first-in, first out)保证等。
其实,和无缓冲 channel 更多用于信号 / 事件管道相比,可自行设置容量、异步收发的带缓冲 channel 更适合被用作为消息队列,并且,带缓冲 channel 在数据收发的性能上要明显好于无缓冲 channel。
我们可以通过对 channel 读写的基本测试来印证这一点。下面是一些关于无缓冲 channel 和带缓冲 channel 收发性能测试的结果(Go 1.17, MacBook Pro 8 核)。基准测试的代码比较多,我就不全部贴出来了,你可以到这里下载。
单接收单发送性能的基准测试
我们先来看看针对一个 channel 只有一个发送 Goroutine 和一个接收 Goroutine 的情况,两种 channel 的收发性能比对数据:
// 无缓冲channel
// go-channel-operation-benchmark/unbuffered-chan
$go test -bench . one_to_one_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkUnbufferedChan1To1Send-8 6037778 199.7 ns/op
BenchmarkUnbufferedChan1To1Recv-8 6286850 194.5 ns/op
PASS
ok command-line-arguments 2.833s
// 带缓冲channel
// go-channel-operation-benchmark/buffered-chan
$go test -bench . one_to_one_cap_10_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChan1To1SendCap10-8 17089879 66.16 ns/op
BenchmarkBufferedChan1To1RecvCap10-8 18043450 65.57 ns/op
PASS
ok command-line-arguments 2.460s
然后我们将 channel 的缓存由 10 改为 100,再看看带缓冲 channel 的 1 对 1 基准测试结果:
$go test -bench . one_to_one_cap_100_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChan1To1SendCap100-8 23089318 53.06 ns/op
BenchmarkBufferedChan1To1RecvCap100-8 23474095 51.33 ns/op
PASS
ok command-line-arguments 2.542s
多接收多发送性能基准测试
我们再来看看,针对一个 channel 有多个发送 Goroutine 和多个接收 Goroutine 的情况,两种 channel 的收发性能比对数据(这里建立 10 个发送 Goroutine 和 10 个接收 Goroutine):
// 无缓冲channel
// go-channel-operation-benchmark/unbuffered-chan
$go test -bench . multi_to_multi_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkUnbufferedChanNToNSend-8 293930 3779 ns/op
BenchmarkUnbufferedChanNToNRecv-8 280904 4190 ns/op
PASS
ok command-line-arguments 2.387s
// 带缓冲channel
// go-channel-operation-benchmark/buffered-chan
$go test -bench . multi_to_multi_cap_10_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChanNToNSendCap10-8 736540 1609 ns/op
BenchmarkBufferedChanNToNRecvCap10-8 795416 1616 ns/op
PASS
ok command-line-arguments 2.514s
这里我们也将 channel 的缓存由 10 改为 100 后,看看带缓冲 channel 的多对多基准测试结果:
$go test -bench . multi_to_multi_cap_100_test.go
goos: darwin
goarch: amd64
cpu: Intel(R) Core(TM) i5-8257U CPU @ 1.40GHz
BenchmarkBufferedChanNToNSendCap100-8 1236453 966.4 ns/op
BenchmarkBufferedChanNToNRecvCap100-8 1279766 969.4 ns/op
PASS
ok command-line-arguments 4.309s
综合前面这些结果数据,我们可以得出几个初步结论:
无论是 1 收 1 发还是多收多发,带缓冲 channel 的收发性能都要好于无缓冲 channel;
对于带缓冲 channel 而言,发送与接收的 Goroutine 数量越多,收发性能会有所下降;
对于带缓冲 channel 而言,选择适当容量会在一定程度上提升收发性能。
不过你要注意的是,Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。

第二种用法:用作计数信号量(counting semaphore)

Go 并发设计的一个惯用法,就是将带缓冲 channel 用作计数信号量(counting semaphore)。带缓冲 channel 中的当前数据个数代表的是,当前同时处于活动状态(处理业务)的 Goroutine 的数量,而带缓冲 channel 的容量(capacity),就代表了允许同时处于活动状态的 Goroutine 的最大数量。向带缓冲 channel 的一个发送操作表示获取一个信号量,而从 channel 的一个接收操作则表示释放一个信号量。
这里我们来看一个将带缓冲 channel 用作计数信号量的例子:
var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)
func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- (i + 1)
}
close(jobs)
}()
var wg sync.WaitGroup
for j := range jobs {
wg.Add(1)
go func(j int) {
active <- struct{}{}
log.Printf("handle job: %d\n", j)
time.Sleep(2 * time.Second)
<-active
wg.Done()
}(j)
}
wg.Wait()
}
我们看到,这个示例创建了一组 Goroutine 来处理 job,同一时间允许最多 3 个 Goroutine 处于活动状态。
为了达成这一目标,我们看到这个示例使用了一个容量(capacity)为 3 的带缓冲 channel: active 作为计数信号量,这意味着允许同时处于活动状态的最大 Goroutine 数量为 3。
我们运行一下这个示例:
2022/01/02 10:08:55 handle job: 1
2022/01/02 10:08:55 handle job: 4
2022/01/02 10:08:55 handle job: 8
2022/01/02 10:08:57 handle job: 5
2022/01/02 10:08:57 handle job: 7
2022/01/02 10:08:57 handle job: 6
2022/01/02 10:08:59 handle job: 3
2022/01/02 10:08:59 handle job: 2
从示例运行结果中的时间戳中,我们可以看到,虽然我们创建了很多 Goroutine,但由于计数信号量的存在,同一时间内处于活动状态(正在处理 job)的 Goroutine 的数量最多为 3 个。

len(channel) 的应用

len 是 Go 语言的一个内置函数,它支持接收数组、切片、map、字符串和 channel 类型的参数,并返回对应类型的“长度”,也就是一个整型值。
针对 channel ch 的类型不同,len(ch) 有如下两种语义:
当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;
当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。
这样一来,针对带缓冲 channel 的 len 调用似乎才是有意义的。那我们是否可以使用 len 函数来实现带缓冲 channel 的“判满”、“判有”和“判空”逻辑呢?就像下面示例中伪代码这样:
var ch chan T = make(chan T, capacity)
// 判空
if len(ch) == 0 {
// 此时channel ch空了?
}
// 判有
if len(ch) > 0 {
// 此时channel ch中有数据?
}
// 判满
if len(ch) == cap(ch) {
// 此时channel ch满了?
}
你可以看到,我在上面代码注释的“空了”、“有数据”和“满了”的后面都打上了问号这是为什么呢?
这是因为,channel 原语用于多个 Goroutine 间的通信,一旦多个 Goroutine 共同对 channel 进行收发操作,len(channel) 就会在多个 Goroutine 间形成“竞态”。单纯地依靠 len(channel) 来判断 channel 中元素状态,是不能保证在后续对 channel 的收发时 channel 状态是不变的。
我们以判空为例看看:
从上图可以看到,Goroutine1 使用 len(channel) 判空后,就会尝试从 channel 中接收数据。但在它真正从 channel 读数据之前,另外一个 Goroutine2 已经将数据读了出去,所以,Goroutine1 后面的读取就会阻塞在 channel 上,导致后面逻辑的失效。
因此,为了不阻塞在 channel 上,常见的方法是将“判空与读取”放在一个“事务”中,将“判满与写入”放在一个“事务”中,而这类“事务”我们可以通过 select 实现。我们来看下面示例:
func producer(c chan<- int) {
var i int = 1
for {
time.Sleep(2 * time.Second)
ok := trySend(c, i)
if ok {
fmt.Printf("[producer]: send [%d] to channel\n", i)
i++
continue
}
fmt.Printf("[producer]: try send [%d], but channel is full\n", i)
}
}
func tryRecv(c <-chan int) (int, bool) {
select {
case i := <-c:
return i, true
default:
return 0, false
}
}
func trySend(c chan<- int, i int) bool {
select {
case c <- i:
return true
default:
return false
}
}
func consumer(c <-chan int) {
for {
i, ok := tryRecv(c)
if !ok {
fmt.Println("[consumer]: try to recv from channel, but the channel is empty")
time.Sleep(1 * time.Second)
continue
}
fmt.Printf("[consumer]: recv [%d] from channel\n", i)
if i >= 3 {
fmt.Println("[consumer]: exit")
return
}
}
}
func main() {
var wg sync.WaitGroup
c := make(chan int, 3)
wg.Add(2)
go func() {
producer(c)
wg.Done()
}()
go func() {
consumer(c)
wg.Done()
}()
wg.Wait()
}
我们看到,由于用到了 select 原语的 default 分支语义,当 channel 空的时候,tryRecv 不会阻塞;当 channel 满的时候,trySend 也不会阻塞。
这个示例的运行结果也证明了这一点,无论是使用 tryRecv 的 consumer 还是使用 trySend 的 producer 都不会阻塞:
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [1] to channel
[consumer]: recv [1] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [2] to channel
[consumer]: recv [2] from channel
[consumer]: try to recv from channel, but the channel is empty
[consumer]: try to recv from channel, but the channel is empty
[producer]: send [3] to channel
[consumer]: recv [3] from channel
[consumer]: exit
[producer]: send [4] to channel
[producer]: send [5] to channel
[producer]: send [6] to channel
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
[producer]: try send [7], but channel is full
... ...
这种方法适用于大多数场合,但是这种方法有一个“问题”,那就是它改变了 channel 的状态,会让 channel 接收了一个元素或发送一个元素到 channel。
有些时候我们不想这么做,我们想在不改变 channel 状态的前提下,单纯地侦测 channel 的状态,而又不会因 channel 满或空阻塞在 channel 上。但很遗憾,目前没有一种方法可以在实现这样的功能的同时,适用于所有场合。
但是在特定的场景下,我们可以用 len(channel) 来实现。比如下面这两种场景:
上图中的情景 (a) 是一个“多发送单接收”的场景,也就是有多个发送者,但有且只有一个接收者。在这样的场景下,我们可以在接收 goroutine 中使用len(channel)是否大于0来判断是否 channel 中有数据需要接收。
而情景 (b) 呢,是一个“多接收单发送”的场景,也就是有多个接收者,但有且只有一个发送者。在这样的场景下,我们可以在发送 Goroutine 中使用len(channel)是否小于cap(channel)来判断是否可以执行向 channel 的发送操作。

nil channel 的妙用

如果一个 channel 类型变量的值为 nil,我们称它为 nil channel。nil channel 有一个特性,那就是对 nil channel 的读写都会发生阻塞。比如下面示例代码:
func main() {
var c chan int
<-c //阻塞
}
或者:
func main() {
var c chan int
c<-1 //阻塞
}
你会看到,无论上面的哪段代码被执行,main goroutine 都会阻塞在对 nil channel 的操作上。
不过,nil channel 的这个特性可不是一无是处,有些时候应用 nil channel 的这个特性可以得到事半功倍的效果。我们来看一个例子:
func main() {
ch1, ch2 := make(chan int), make(chan int)
go func() {
time.Sleep(time.Second * 5)
ch1 <- 5
close(ch1)
}()
go func() {
time.Sleep(time.Second * 7)
ch2 <- 7
close(ch2)
}()
var ok1, ok2 bool
for {
select {
case x := <-ch1:
ok1 = true
fmt.Println(x)
case x := <-ch2:
ok2 = true
fmt.Println(x)
}
if ok1 && ok2 {
break
}
}
fmt.Println("program end")
}
在这个示例中,我们期望程序在接收完 ch1 和 ch2 两个 channel 上的数据后就退出。但实际的运行情况却是这样的:
5
0
0
0
... ... //循环输出0
7
program end
我们原本期望上面这个在依次输出 5 和 7 两个数字后退出,但实际运行的输出结果却是在输出 5 之后,程序输出了许多的 0 值,之后才输出 7 并退出。
这是怎么回事呢?我们简单分析一下这段代码的运行过程:
前 5s,select 一直处于阻塞状态;
第 5s,ch1 返回一个 5 后被 close,select 语句的case x := <-ch1这个分支被选出执行,程序输出 5,并回到 for 循环并重新 select;
由于 ch1 被关闭,从一个已关闭的 channel 接收数据将永远不会被阻塞,于是新一轮 select 又把case x := <-ch1这个分支选出并执行。由于 ch1 处于关闭状态,从这个 channel 获取数据,我们会得到这个 channel 对应类型的零值,这里就是 0。于是程序再次输出 0;程序按这个逻辑循环执行,一直输出 0 值;
2s 后,ch2 被写入了一个数值 7。这样在某一轮 select 的过程中,分支case x := <-ch2被选中得以执行,程序输出 7 之后满足退出条件,于是程序终止。
那我们可以怎么改进一下这个程序,让它能按照我们的预期输出呢?
是时候让 nil channel 登场了!用 nil channel 改进后的示例代码是这样的:
func main() {
ch1, ch2 := make(chan int), make(chan int)
go func() {
time.Sleep(time.Second * 5)
ch1 <- 5
close(ch1)
}()
go func() {
time.Sleep(time.Second * 7)
ch2 <- 7
close(ch2)
}()
for {
select {
case x, ok := <-ch1:
if !ok {
ch1 = nil
} else {
fmt.Println(x)
}
case x, ok := <-ch2:
if !ok {
ch2 = nil
} else {
fmt.Println(x)
}
}
if ch1 == nil && ch2 == nil {
break
}
}
fmt.Println("program end")
}
这里,改进后的示例程序的最关键的一个变化,就是在判断 ch1 或 ch2 被关闭后,显式地将 ch1 或 ch2 置为 nil。
而我们前面已经知道了,对一个 nil channel 执行获取操作,这个操作将阻塞。于是,这里已经被置为 nil 的 c1 或 c2 的分支,将再也不会被 select 选中执行。
改进后的示例的运行结果如下,与我们预期相符:
5
7
program end

与 select 结合使用的一些惯用法

channel 和 select 的结合使用能形成强大的表达能力,我们在前面的例子中已经或多或少见识过了。这里我再强调几种 channel 与 select 结合的惯用法。

第一种用法:利用 default 分支避免阻塞

select 语句的 default 分支的语义,就是在其他非 default 分支因通信未就绪,而无法被选择的时候执行的,这就给 default 分支赋予了一种“避免阻塞”的特性。
其实在前面的“len(channel) 的应用”小节的例子中,我们就已经用到了“利用 default 分支”实现的trySendtryRecv两个函数:
func tryRecv(c <-chan int) (int, bool) {
select {
case i := <-c:
return i, true
default: // channel为空
return 0, false
}
}
func trySend(c chan<- int, i int) bool {
select {
case c <- i:
return true
default: // channel满了
return false
}
}
而且,无论是无缓冲 channel 还是带缓冲 channel,这两个函数都能适用,并且不会阻塞在空 channel 或元素个数已经达到容量的 channel 上。
在 Go 标准库中,这个惯用法也有应用,比如:
// $GOROOT/src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
// 无阻塞的向c发送当前时间
select {
case c.(chan Time) <- Now():
default:
}
}

第二种用法:实现超时机制

带超时机制的 select,是 Go 中常见的一种 select 和 channel 的组合用法。通过超时事件,我们既可以避免长期陷入某种操作的等待中,也可以做一些异常处理工作。
比如,下面示例代码实现了一次具有 30s 超时的 select:
func worker() {
select {
case <-c:
// ... do some stuff
case <-time.After(30 *time.Second):
return
}
}
不过,在应用带有超时机制的 select 时,我们要特别注意 timer 使用后的释放,尤其在大量创建 timer 的时候。
Go 语言标准库提供的 timer 实际上是由 Go 运行时自行维护的,而不是操作系统级的定时器资源,它的使用代价要比操作系统级的低许多。但即便如此,作为 time.Timer 的使用者,我们也要尽量减少在使用 Timer 时给 Go 运行时和 Go 垃圾回收带来的压力,要及时调用 timer 的 Stop 方法回收 Timer 资源。

第三种用法:实现心跳机制

结合 time 包的 Ticker,我们可以实现带有心跳机制的 select。这种机制让我们可以在监听 channel 的同时,执行一些周期性的任务,比如下面这段代码:
func worker() {
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-c:
// ... do some stuff
case <- heartbeat.C:
//... do heartbeat stuff
}
}
}
这里我们使用 time.NewTicker,创建了一个 Ticker 类型实例 heartbeat。这个实例包含一个 channel 类型的字段 C,这个字段会按一定时间间隔持续产生事件,就像“心跳”一样。这样 for 循环在 channel c 无数据接收时,会每隔特定时间完成一次迭代,然后回到 for 循环进行下一次迭代。
和 timer 一样,我们在使用完 ticker 之后,也不要忘记调用它的 Stop 方法,避免心跳事件在 ticker 的 channel(上面示例中的 heartbeat.C)中持续产生。

小结

好了,今天的课讲到这里就结束了,现在我们一起来回顾一下吧。
在这一讲中,我们系统学习了 Go CSP 并发方案中除 Goroutine 之外的另一个重要组成部分:channel。Go 为了原生支持并发,把 channel 视作一等公民身份,这就大幅提升了开发人员使用 channel 进行并发设计和实现的体验。
通过预定义函数 make,我们可以创建两类 channel:无缓冲 channel 与带缓冲的 channel。这两类 channel 具有不同的收发特性,可以适用于不同的应用场合:无缓冲 channel 兼具通信与同步特性,常用于作为信号通知或替代同步锁;而带缓冲 channel 的异步性,让它更适合用来实现基于内存的消息队列、计数信号量等。
此外,你也要牢记值为 nil 的 channel 的阻塞特性,有些时候它也能帮上大忙。而面对已关闭的 channel 你也一定要小心,尤其要避免向已关闭的 channel 发送数据,那会导致 panic。
最后,select 是 Go 为了支持同时操作多个 channel,而引入的另外一个并发原语,select 与 channel 有几种常用的固定搭配,你也要好好掌握和理解。

思考题

channel 作为 Go 并发设计的重要组成部分,需要你掌握的细节非常多。而且,channel 的应用模式也非常多,我们这一讲仅挑了几个常见的模式做了讲解。在日常开发中你还见过哪些实用的 channel 使用模式呢?欢迎在留言区分享。
如果你觉得有收获,也欢迎你把这节课分享给更多对 Go 并发感兴趣的朋友。我是 Tony Bai,我们下节课见。
分享给需要的人,Ta购买本课程,你将得18
生成海报并分享

赞 39

提建议

上一篇
32|并发:聊聊Goroutine调度器的原理
下一篇
34|并发:如何使用共享变量?
unpreview
 写留言

精选留言(36)

  • peison
    2022-04-15
    请问计数信号量的例子中,因为jobs的容量是10,这里执行的循环不会导致阻塞,close(jobs) 应该会被执行到,那么下面的for range为什么不会终止,而可以继续运行? go func() { for i := 0; i < 8; i++ { jobs <- (i + 1) } close(jobs) }()
    展开

    作者回复: 好问题! channel内部数据是排队的,即便被close,依然可以从closed channel中读取到尚未被消费的元素,直到没有可读的元素为止,才真正会变成closed状态。没数据后,如果再读就会得到元素类型的零值了, 对于没数据且closed状态的channel,for range会终止。

    共 3 条评论
    14
  • 张申傲
    2022-02-20
    这节课信息量有点大,需要多看几遍好好消化。请问老师一个问题:如果程序中没有手动 close channel,那么 channel 会在什么时候关闭呢?是否需要借助 defer 去释放 channel 资源呢?

    作者回复: channel一旦没有人引用了,就会被gc掉,不关闭也ok。但是如果有goroutine一直在读channel,那么channel一直存在,不会关闭。直到程序退出。

    7
  • 木木
    2022-03-18
    go的并发原语选择真的是非常精炼:简洁又强大,一个ch就负责了线程通信、同步的多种功能;一个select又实现了对阻塞、非阻塞的控制以及事件循环模式。

    作者回复: 👍

    5
  • ibin
    2022-01-12
    白老师,你好,下面这段可以模拟close(groupSignal) for i := 0;i < 5; i++ { groupSignal<-signal(struct{}{}) } 为什么close(groupSignal) 可以给每个groupSignal都发送了{}

    作者回复: close一个channel后,所有阻塞在这个channel接收操作的goroutine都会收到通知,这是Go语言的channel语义就这么定义的。

    共 4 条评论
    5
  • 罗杰
    2022-01-12
    这节课比较绕,要静下心好好学习

    作者回复: 👍

    5
  • airmy丶
    2022-05-18
    请问下老师: 为什么 "1 对 n 的信号通知机制" 这个例子中,wg.Wait() 一定需要新起一个协程执行呢?而且在本地测试确实只能在新的协程中执行才不会报错,否则会报出: goroutine x [chan receive] 这样的错误。

    作者回复: Wait方法的语义就是等待例子中for循环创建的所有子goroutine,直到每个子goroutine都调用完wg.Done才返回。如果不再一个新goroutine执行,wg.Wait就会阻塞住main goroutine,这也将导致后续所有goroutine都阻塞住,然后go运行时检测到所有goroutine都阻塞住了,于是报错退出。

    共 2 条评论
    3
  • 瓜牛
    2022-04-20
    为啥有时需要手动调用close关闭channel,有时又不需要?

    作者回复: 首先明确一点:channel如果不close,也不会存在资源泄露的问题。 是否需要close channel完全看需要。 至于如何知道何时需要,看文中对close channel的语义的描述,以及如何基于这种语义的一些妙用。

    共 2 条评论
    3
  • 0mfg
    2022-01-12
    白老师好,请教个问题。对于无缓冲通道的结论,“对无缓冲 channel 类型的发送与接收操作,一定要放在两个不同的 Goroutine 中进行,否则会导致 deadlock。” 尝试了如下写法,发送在主goroutine,接收在新goroutine发现还是deadlock,请问具体原因是啥?谢谢 package main import "fmt" func main() { ch := make(chan int) ch <- 13 go func() { n := <- ch fmt.Println(n) }() }
    展开

    作者回复: main goroutine在执行ch <-13时就阻塞住了。还没执行到创建下面那个goroutine呢。

    共 4 条评论
    2
  • knightjdq
    2022-11-25 来自辽宁
    白老师好:请教下用于替代锁机制中的代码,十分感谢! func NewCounter() *counter { cter := &counter{ c: make(chan int), } go func() { for { cter.i++ cter.c <- cter.i } }() return cter } 这里的死循环,i++写入channel后阻塞,Increase函数来读取,for循环到9后,不再读取,channel阻塞,那死循环的groutine呢?在counter对象销毁后就停止执行了是么?
    展开

    作者回复: 这里仅是示例,NewCounter中新创建的goroutine(也就是你所说的“死循环”goroutine)的生命周期是在Newcounter中创建,随进程结束而结束的。

    1
  • 每天晒白牙
    2022-07-20
    感觉只发送channel和只接收channel类型定义符号,交换一下更好理解,也更形象吧 make(chan<- int, 1) 这个代表只接收 make(<-chan int, 1) 这个代表只发送

    作者回复: 由于箭头都是向左的,即<-,所以我区分只发送和只接收型的channel的tip是以 chan这个关键字为中心,将chan关键字看成一个“管子”。当<-在chan的右边,即chan <- 好似向管子里写,这样就是只发送型。当<-在chan的左边,即<- chan ,好似从管子里取,这样就是只接收型。

    共 2 条评论
    1
  • 小虎
    2022-07-13
    “无缓冲 channel 替代锁”这个示例说到“这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而是通过通信来共享内存”的原则。”,没太想明白为什么符合这个原则,goroutine 中调用了cter.Increase(),这不是共享了cter这个变量吗?

    作者回复: 这个示例要共享的内存是“counter结构中的i”,而不是i的外围结构counter。cter.Increase是改变共享内存的方法,里面可以采用共享内存的方式,比如通过lock进行同步,也可以通过通信的方式,比如基于channel.

    1
  • 郑泽洲
    2022-02-19
    这节课需要反复读,举的例子和示例代码都非常好,值得深入思考,只要思考到位,对channel认识就能加深一步👍🏻

    作者回复: 👍

    1
  • return
    2022-01-20
    太干了,看了几天才看完, 面面俱到了, 各种坑也指点出来了。 收获非常大, 从瞎用channel,到现在 心里有底,知道该怎么用channel了。 非常感谢老师,老师讲的太赞了。

    作者回复: 👍

    1
  • lesserror
    2022-01-16
    channel是Go并发设计的核心之一,要反复阅读理解。大白老师这里总结的很不错。有以下疑问,麻烦解答。 1. 无缓冲 channel 的惯用法 -> 第一种用法:用作信号传递 。下面的示例代码: c <- signal(struct{}{}) 改成 c <- signal{} 会不会更好一些呢? 2. 无缓冲channel替代锁那里。计数器操作全部交给一个独立的 Goroutine 去处理。我的理解是这个Goroutine 其实最后还是处于阻塞的状态下。最后主 Goroutine 结束运行了。这个Goroutine 不得不退出了,这么理解对吗?
    展开

    作者回复: 1. 👍 2. 是的。main goroutine更关注"worker" goroutine的状态,所以只等待了worker goroutine的退出。

    1
  • Calvin
    2022-01-13
    请教下,文章中说到“select 这种判空或判满的方法适用于大多数场合,但是这种方法有一个“问题”,那就是它改变了 channel 的状态,会让 channel 接收了一个元素或发送一个元素到 channel。”,怎样理解这句话?为什么“会让 channel 接收了一个元素或发送一个元素到 channel”呢?

    作者回复: trySend或tryRecv有两个分支,当返回False时,说明channel满或channel为空。但另外一个执行分支可能会修改channel,即向channel中成功发送一个元素或从channel中成功读取一个元素。文中接着也提到“有些时候我们不想这么做,我们想在不改变 channel 状态的前提下,单纯地侦测 channel 的状态,而又不会因 channel 满或空阻塞在 channel 上”。但trySend和tryRecv不能完全满足这种需求。

    共 5 条评论
    1
  • momo
    2022-12-13 来自辽宁
    狂赞

    作者回复: 👍

  • Unknown element
    2022-11-20 来自北京
    老师我看makechan的源码发现分配内存的时候分了3种情况: 1. 缓冲区大小=0 2. 元素类型不是指针 3. 元素类型包含指针 我想问下为什么2和3要分成两种情况呢?我看区别好像是调用 mallocgc 时第二个参数不一样,但是mallocgc 的源码我就看不懂了。。。希望老师可以简单解释下 谢谢老师~
    展开

    作者回复: 如果channel中的元素大小为0,那就不需要额外分配缓存(buf); 如果元素类型中不含有指针,那么buf就和hchan一起分配(将来也和hchan一起释放),减少一次heap mem分配。GC只扫描hchan就ok了。 如果元素类型包含指针,那么hchan和hchan.buf单独分配内存,GC分别扫描hchan和buf中的元素。

  • fengruichao
    2022-09-22 来自辽宁
    func test1() { ch1 := make(chan int) ch1 <- 1 go func() { fmt.Println(<-ch1) }() } 为什么把接收操作放在Goroutine会报fatal error: all goroutines are asleep - deadlock!,而把发送操作放到Goroutine中就不会报错
    展开

    作者回复: ch1 <- 1这一行将整个程序阻塞住了,下面的goroutine根本就没有得到创建的机会。go runtime检测到没有可以运行的用户goroutine了,所以报错!

  • Jay
    2022-09-17 来自北京
    func worker() { heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() for { select { case <-c: // ... do some stuff case <- heartbeat.C: //... do heartbeat stuff } } } 老师,最后这段代码我没太明白,select如果没有default的话,不可发送和不可接受都会处于阻塞,状态,select本身不是会一直等待吗(select本身看起来像是个循环等待啊),外面的for循环有什么意义呢
    展开

    作者回复: select会等待啊。但如果没有外面的for循环,只会等待一次啊。然后就继续向下执行,就退出worker函数了。这里代码的目的就是循环定时等待。

  • Jack Xin
    2022-09-13 来自北京
    白老师,想问您一个问题,关于无缓冲chan 的第二种用法:替换锁机制。我复制里面的例子在我的Mac上运行得到的是下面的结果,跟您的不一样,不是v=i+1的关系,想了半天也不知道为什么,我用的1.18版本的go, goroutine-0: current counter value is 1 goroutine-1: current counter value is 9 goroutine-9: current counter value is 3 goroutine-5: current counter value is 4 goroutine-6: current counter value is 5 goroutine-7: current counter value is 6 goroutine-8: current counter value is 7 goroutine-2: current counter value is 8 goroutine-4: current counter value is 2 goroutine-3: current counter value is 10
    展开

    作者回复: 你是说 输出的顺序和文章中不一致。 这个没问题。这个例子演示的是如何基于channel实现计数器,保证每个goroutine拿到的计数值都是不同的。输出顺序与goroutine调度顺序有关。