1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-23 20:02:22 +00:00

修改排版 ch1-06

This commit is contained in:
iGmainC 2022-01-23 16:41:45 +08:00
parent 2da13f2e69
commit d29be037ff

View File

@ -1,22 +1,22 @@
# 1.6 常见的并发模式
Go语言最吸引人的地方是它内建的并发支持。Go语言并发体系的理论是C.A.R Hoare在1978年提出的CSPCommunicating Sequential Process通讯顺序进程。CSP有着精确的数学模型并实际应用在了Hoare参与设计的T9000通用计算机上。从NewSqueak、Alef、Limbo到现在的Go语言对于对CSP有着20多年实战经验的Rob Pike来说他更关注的是将CSP应用在通用编程语言上产生的潜力。作为Go并发编程核心的CSP理论的核心概念只有一个同步通信。关于同步通信的话题我们在前面一节已经讲过本节我们将简单介绍下Go语言中常见的并发模式。
Go 语言最吸引人的地方是它内建的并发支持。Go 语言并发体系的理论是 _C.A.R Hoare_ 1978 年提出的 CSPCommunicating Sequential Process通讯顺序进程。CSP 有着精确的数学模型,并实际应用在了 Hoare 参与设计的 T9000 通用计算机上。从 NewSqueak、Alef、Limbo 到现在的 Go 语言,对于对 CSP 有着 20 多年实战经验的 _Rob Pike_ 来说,他更关注的是将 CSP 应用在通用编程语言上产生的潜力。作为 Go 并发编程核心的 CSP 理论的核心概念只有一个:同步通信。关于同步通信的话题我们在前面一节已经讲过,本节我们将简单介绍下 Go 语言中常见的并发模式。
首先要明确一个概念并发不是并行。并发更关注的是程序的设计层面并发的程序完全是可以顺序执行的只有在真正的多核CPU上才可能真正地同时运行。并行更关注的是程序的运行层面并行一般是简单的大量重复例如GPU中对图像处理都会有大量的并行运算。为更好的编写并发程序从设计之初Go语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型让程序员专注于分解问题和组合方案而且不用被线程管理和信号互斥这些繁琐的操作分散精力。
首先要明确一个概念:并发不是并行。并发更关注的是程序的设计层面,并发的程序完全是可以顺序执行的,只有在真正的多核 CPU 上才可能真正地同时运行。并行更关注的是程序的运行层面,并行一般是简单的大量重复,例如 GPU 中对图像处理都会有大量的并行运算。为更好的编写并发程序,从设计之初 Go 语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些繁琐的操作分散精力。
在并发编程中对共享资源的正确访问需要精确的控制在目前的绝大多数语言中都是通过加锁等线程同步方案来解决这一困难问题而Go语言却另辟蹊径它将共享的值通过Channel传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻最好只有一个Goroutine能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式Go语言将其并发编程哲学化为一句口号
在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而 Go 语言却另辟蹊径,它将共享的值通过 Channel 传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个 Goroutine 能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式Go 语言将其并发编程哲学化为一句口号:
> Do not communicate by sharing memory; instead, share memory by communicating.
> 不要通过共享内存来通信,而应通过通信来共享内存。
这是更高层次的并发编程哲学(通过管道来传值是Go语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现但是通过Channel来控制访问能够让你写出更简洁正确的程序。
这是更高层次的并发编程哲学(通过管道来传值是 Go 语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现,但是通过 Channel 来控制访问能够让你写出更简洁正确的程序。
## 1.6.1 并发版本的Hello world
## 1.6.1 并发版本的 Hello world
我们先以在一个新的Goroutine中输出“Hello world”`main`等待后台线程输出工作完成之后退出,这样一个简单的并发程序作为热身。
我们先以在一个新的 Goroutine 中输出“Hello world”`main` 等待后台线程输出工作完成之后退出,这样一个简单的并发程序作为热身。
并发编程的核心概念是同步通信,但是同步的方式却有多种。我们先以大家熟悉的互斥量`sync.Mutex`来实现同步通信。根据文档,我们不能直接对一个未加锁状态的`sync.Mutex`进行解锁,这会导致运行时异常。下面这种方式并不能保证正常工作:
并发编程的核心概念是同步通信,但是同步的方式却有多种。我们先以大家熟悉的互斥量 `sync.Mutex` 来实现同步通信。根据文档,我们不能直接对一个未加锁状态的 `sync.Mutex` 进行解锁,这会导致运行时异常。下面这种方式并不能保证正常工作:
```go
func main() {
@ -31,7 +31,7 @@ func main() {
}
```
因为`mu.Lock()``mu.Unlock()`并不在同一个Goroutine中所以也就不满足顺序一致性内存模型。同时它们也没有其它的同步事件可以参考这两个事件不可排序也就是可以并发的。因为可能是并发的事件所以`main`函数中的`mu.Unlock()`很有可能先发生,而这个时刻`mu`互斥对象还处于未加锁的状态,从而会导致运行时异常。
因为 `mu.Lock()` `mu.Unlock()` 并不在同一个 Goroutine 中,所以也就不满足顺序一致性内存模型。同时它们也没有其它的同步事件可以参考,这两个事件不可排序也就是可以并发的。因为可能是并发的事件,所以 `main` 函数中的 `mu.Unlock()` 很有可能先发生,而这个时刻 `mu` 互斥对象还处于未加锁的状态,从而会导致运行时异常。
下面是修复后的代码:
@ -49,9 +49,9 @@ func main() {
}
```
修复的方式是在`main`函数所在线程中执行两次`mu.Lock()`,当第二次加锁时会因为锁已经被占用(不是递归锁)而阻塞,`main`函数的阻塞状态驱动后台线程继续向前执行。当后台线程执行到`mu.Unlock()`时解锁,此时打印工作已经完成了,解锁会导致`main`函数中的第二个`mu.Lock()`阻塞状态取消,此时后台线程和主线程再没有其它的同步事件参考,它们退出的事件将是并发的:在`main`函数退出导致程序退出时,后台线程可能已经退出了,也可能没有退出。虽然无法确定两个线程退出的时间,但是打印工作是可以正确完成的。
修复的方式是在 `main` 函数所在线程中执行两次 `mu.Lock()`,当第二次加锁时会因为锁已经被占用(不是递归锁)而阻塞,`main` 函数的阻塞状态驱动后台线程继续向前执行。当后台线程执行到 `mu.Unlock()` 时解锁,此时打印工作已经完成了,解锁会导致 `main` 函数中的第二个 `mu.Lock()` 阻塞状态取消,此时后台线程和主线程再没有其它的同步事件参考,它们退出的事件将是并发的:在 `main` 函数退出导致程序退出时,后台线程可能已经退出了,也可能没有退出。虽然无法确定两个线程退出的时间,但是打印工作是可以正确完成的。
使用`sync.Mutex`互斥锁同步是比较低级的做法。我们现在改用无缓存的管道来实现同步:
使用 `sync.Mutex` 互斥锁同步是比较低级的做法。我们现在改用无缓存的管道来实现同步:
```go
func main() {
@ -66,9 +66,9 @@ func main() {
}
```
根据Go语言内存模型规范对于从无缓冲Channel进行的接收发生在对该Channel进行的发送完成之前。因此后台线程`<-done`接收操作完成之后,`main`线程的`done <- 1`发送操作才可能完成从而退出main、退出程序而此时打印工作已经完成了。
根据 Go 语言内存模型规范,对于从无缓冲 Channel 进行的接收,发生在对该 Channel 进行的发送完成之前。因此,后台线程 `<-done` 接收操作完成之后,`main` 线程的 `done <- 1` 发送操作才可能完成(从而退出 main、退出程序而此时打印工作已经完成了。
上面的代码虽然可以正确同步但是对管道的缓存大小太敏感如果管道有缓存的话就无法保证main退出之前后台线程能正常打印了。更好的做法是将管道的发送和接收方向调换一下这样可以避免同步事件受管道缓存大小的影响
上面的代码虽然可以正确同步,但是对管道的缓存大小太敏感:如果管道有缓存的话,就无法保证 main 退出之前后台线程能正常打印了。更好的做法是将管道的发送和接收方向调换一下,这样可以避免同步事件受管道缓存大小的影响:
```go
func main() {
@ -83,15 +83,15 @@ func main() {
}
```
对于带缓冲的Channel对于Channel的第K个接收完成操作发生在第K+C个发送操作完成之前其中C是Channel的缓存大小。虽然管道是带缓存的`main`线程接收完成是在后台线程发送开始但还未完成的时刻,此时打印工作也是已经完成的。
对于带缓冲的 Channel对于 Channel 的第 K 个接收完成操作发生在第 K+C 个发送操作完成之前,其中 C Channel 的缓存大小。虽然管道是带缓存的,`main` 线程接收完成是在后台线程发送开始但还未完成的时刻,此时打印工作也是已经完成的。
基于带缓存的管道我们可以很容易将打印线程扩展到N个。下面的例子是开启10个后台线程分别打印
基于带缓存的管道,我们可以很容易将打印线程扩展到 N 个。下面的例子是开启 10 个后台线程分别打印:
```go
func main() {
done := make(chan int, 10) // 带 10 个缓存
// 开N个后台打印线程
// 开 N 个后台打印线程
for i := 0; i < cap(done); i++ {
go func(){
fmt.Println("你好, 世界")
@ -99,20 +99,20 @@ func main() {
}()
}
// 等待N个后台线程完成
// 等待 N 个后台线程完成
for i := 0; i < cap(done); i++ {
<-done
}
}
```
对于这种要等待N个线程完成后再进行下一步的同步操作有一个简单的做法就是使用`sync.WaitGroup`来等待一组事件:
对于这种要等待 N 个线程完成后再进行下一步的同步操作有一个简单的做法,就是使用 `sync.WaitGroup` 来等待一组事件:
```go
func main() {
var wg sync.WaitGroup
// 开N个后台打印线程
// 开 N 个后台打印线程
for i := 0; i < 10; i++ {
wg.Add(1)
@ -122,18 +122,18 @@ func main() {
}()
}
// 等待N个后台线程完成
// 等待 N 个后台线程完成
wg.Wait()
}
```
其中`wg.Add(1)`用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到)。当后台线程完成打印工作之后,调用`wg.Done()`表示完成一个事件。`main`函数的`wg.Wait()`是等待全部的事件完成。
其中 `wg.Add(1)` 用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到)。当后台线程完成打印工作之后,调用 `wg.Done()` 表示完成一个事件。`main` 函数的 `wg.Wait()` 是等待全部的事件完成。
## 1.6.2 生产者消费者模型
并发编程中最常见的例子就是生产者消费者模式该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说就是生产者生产一些数据然后放到成果队列中同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时消费者就进入饥饿的等待中而当成果队列中数据已满时生产者则面临因产品挤压导致CPU被剥夺的下岗问题。
并发编程中最常见的例子就是生产者消费者模式,该模式主要通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单地说,就是生产者生产一些数据,然后放到成果队列中,同时消费者从成果队列中来取这些数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致 CPU 被剥夺的下岗问题。
Go语言实现生产者消费者并发很简单
Go 语言实现生产者消费者并发很简单:
```go
// 生产者: 生成 factor 整数倍的序列
@ -154,16 +154,16 @@ func main() {
go Producer(3, ch) // 生成 3 的倍数的序列
go Producer(5, ch) // 生成 5 的倍数的序列
go Consumer(ch) // 消费 生成的队列
go Consumer(ch) // 消费生成的队列
// 运行一定时间后退出
time.Sleep(5 * time.Second)
}
```
我们开启了2个`Producer`生产流水线分别用于生成3和5的倍数的序列。然后开启1个`Consumer`消费者线程,打印获取的结果。我们通过在`main`函数休眠一定的时间来让生产者和消费者工作一定时间。正如前面一节说的,这种靠休眠方式是无法保证稳定的输出结果的。
我们开启了 2 `Producer` 生产流水线,分别用于生成 3 5 的倍数的序列。然后开启 1 `Consumer` 消费者线程,打印获取的结果。我们通过在 `main` 函数休眠一定的时间来让生产者和消费者工作一定时间。正如前面一节说的,这种靠休眠方式是无法保证稳定的输出结果的。
我们可以让`main`函数保存阻塞状态不退出,只有当用户输入`Ctrl-C`时才真正退出程序:
我们可以让 `main` 函数保存阻塞状态不退出,只有当用户输入 `Ctrl-C` 时才真正退出程序:
```go
func main() {
@ -180,13 +180,13 @@ func main() {
}
```
我们这个例子中有2个生产者并且2个生产者之间并无同步事件可参考它们是并发的。因此消费者输出的结果序列的顺序是不确定的这并没有问题生产者和消费者依然可以相互配合工作。
我们这个例子中有 2 个生产者,并且 2 个生产者之间并无同步事件可参考,它们是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。
## 1.6.3 发布订阅模型
发布订阅publish-and-subscribe模型通常被简写为pub/sub模型。在这个模型中消息生产者成为发布者publisher而消息消费者则成为订阅者subscriber生产者和消费者是M:N的关系。在传统生产者和消费者模型中是将消息发送到一个队列中而发布订阅模型则是将消息发布给一个主题。
发布订阅publish-and-subscribe模型通常被简写为 pub/sub 模型。在这个模型中消息生产者成为发布者publisher而消息消费者则成为订阅者subscriber生产者和消费者是 M:N 的关系。在传统生产者和消费者模型中,是将消息发送到一个队列中,而发布订阅模型则是将消息发布给一个主题。
为此,我们构建了一个名为`pubsub`的发布订阅模型支持包:
为此,我们构建了一个名为 `pubsub` 的发布订阅模型支持包:
```go
// Package pubsub implements a simple multi-topic pub-sub library.
@ -323,9 +323,9 @@ func main() {
## 1.6.4 控制并发数
很多用户在适应了Go语言强大的并发特性之后都倾向于编写最大并发的程序因为这样似乎可以提供最大的性能。在现实中我们行色匆匆但有时却需要我们放慢脚步享受生活并发的程序也是一样有时候我们需要适当地控制并发的程度因为这样不仅仅可给其它的应用/任务让出/预留一定的CPU资源也可以适当降低功耗缓解电池的压力。
很多用户在适应了 Go 语言强大的并发特性之后,都倾向于编写最大并发的程序,因为这样似乎可以提供最大的性能。在现实中我们行色匆匆,但有时却需要我们放慢脚步享受生活,并发的程序也是一样:有时候我们需要适当地控制并发的程度,因为这样不仅仅可给其它的应用/任务让出/预留一定的 CPU 资源,也可以适当降低功耗缓解电池的压力。
在Go语言自带的godoc程序实现中有一个`vfs`的包对应虚拟的文件系统,在`vfs`包下面有一个`gatefs`的子包,`gatefs`子包的目的就是为了控制访问该虚拟文件系统的最大并发数。`gatefs`包的应用很简单:
Go 语言自带的 godoc 程序实现中有一个 `vfs` 的包对应虚拟的文件系统,在 `vfs` 包下面有一个 `gatefs` 的子包,`gatefs` 子包的目的就是为了控制访问该虚拟文件系统的最大并发数。`gatefs` 包的应用很简单:
```go
import (
@ -339,7 +339,7 @@ func main() {
}
```
其中`vfs.OS("/path")`基于本地文件系统构造一个虚拟的文件系统,然后`gatefs.New`基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统。并发数控制的原理在前面一节已经讲过,就是通过带缓存管道的发送和接收规则来实现最大并发阻塞:
其中 `vfs.OS("/path")` 基于本地文件系统构造一个虚拟的文件系统,然后 `gatefs.New` 基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统。并发数控制的原理在前面一节已经讲过,就是通过带缓存管道的发送和接收规则来实现最大并发阻塞:
```go
var limit = make(chan int, 3)
@ -356,8 +356,7 @@ func main() {
}
```
不过`gatefs`对此做一个抽象类型`gate`,增加了`enter``leave`方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,`enter`方法会阻塞直到并发数降下来为止。
不过 `gatefs` 对此做一个抽象类型 `gate`,增加了 `enter``leave` 方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,`enter` 方法会阻塞直到并发数降下来为止。
```go
type gate chan bool
@ -366,8 +365,7 @@ func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }
```
`gatefs`包装的新的虚拟文件系统就是将需要控制并发的方法增加了`enter``leave`调用而已:
`gatefs` 包装的新的虚拟文件系统就是将需要控制并发的方法增加了 `enter``leave` 调用而已:
```go
type gatefs struct {
@ -382,14 +380,13 @@ func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
}
```
我们不仅可以控制最大的并发数目而且可以通过带缓存Channel的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态当管道满了时任务是繁忙状态这对于后台一些低级任务的运行是有参考价值的。
我们不仅可以控制最大的并发数目,而且可以通过带缓存 Channel 的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态,当管道满了时任务是繁忙状态,这对于后台一些低级任务的运行是有参考价值的。
## 1.6.5 赢者为王
采用并发编程的动机有很多并发编程可以简化问题比如一类问题对应一个处理线程会更简单并发编程还可以提升性能在一个多核CPU上开2个线程一般会比开1个线程快一些。其实对于提升性能而言程序并不是简单地运行速度快就表示用户体验好的很多时候程序能快速响应用户请求才是最重要的当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务。
采用并发编程的动机有很多:并发编程可以简化问题,比如一类问题对应一个处理线程会更简单;并发编程还可以提升性能,在一个多核 CPU 上开 2 个线程一般会比开 1 个线程快一些。其实对于提升性能而言,程序并不是简单地运行速度快就表示用户体验好的;很多时候程序能快速响应用户请求才是最重要的,当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务。
假设我们想快速地搜索“golang”相关的主题我们可能会同时打开Bing、Google或百度等多个检索引擎。当某个搜索最先返回结果后就可以关闭其它搜索页面了。因为受网络环境和搜索引擎算法的影响某些搜索引擎可能很快返回搜索结果某些搜索引擎也可能等到他们公司倒闭也没有完成搜索。我们可以采用类似的策略来编写这个程序
假设我们想快速地搜索“golang”相关的主题我们可能会同时打开 Bing、Google 或百度等多个检索引擎。当某个搜索最先返回结果后,就可以关闭其它搜索页面了。因为受网络环境和搜索引擎算法的影响,某些搜索引擎可能很快返回搜索结果,某些搜索引擎也可能等到他们公司倒闭也没有完成搜索。我们可以采用类似的策略来编写这个程序:
```go
func main() {
@ -413,17 +410,15 @@ func main() {
通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能。
## 1.6.6 素数筛
在“Hello world 的革命”一节中我们为了演示Newsqueak的并发特性文中给出了并发版本素数筛的实现。并发版本的素数筛是一个经典的并发例子通过它我们可以更深刻地理解Go语言的并发特性。“素数筛”的原理如图
在“Hello world 的革命”一节中,我们为了演示 Newsqueak 的并发特性,文中给出了并发版本素数筛的实现。并发版本的素数筛是一个经典的并发例子,通过它我们可以更深刻地理解 Go 语言的并发特性。“素数筛”的原理如图:
![](../images/ch1-13-prime-sieve.png)
*图 1-13 素数筛*
_图 1-13 素数筛_
我们需要先生成最初的`2, 3, 4, ...`自然数序列不包含开头的0、1
我们需要先生成最初的 `2, 3, 4, ...` 自然数序列(不包含开头的 0、1
```go
// 返回生成自然数序列的管道: 2, 3, 4, ...
@ -438,7 +433,7 @@ func GenerateNatural() chan int {
}
```
`GenerateNatural`函数内部启动一个Goroutine生产序列返回对应的管道。
`GenerateNatural` 函数内部启动一个 Goroutine 生产序列,返回对应的管道。
然后是为每个素数构造一个筛子:将输入序列中是素数倍数的数提出,并返回新的序列,是一个新的管道。
@ -457,9 +452,9 @@ func PrimeFilter(in <-chan int, prime int) chan int {
}
```
`PrimeFilter`函数也是内部启动一个Goroutine生产序列返回过滤后序列对应的管道。
`PrimeFilter` 函数也是内部启动一个 Goroutine 生产序列,返回过滤后序列对应的管道。
现在我们可以在`main`函数中驱动这个并发的素数筛了:
现在我们可以在 `main` 函数中驱动这个并发的素数筛了:
```go
func main() {
@ -472,17 +467,17 @@ func main() {
}
```
我们先是调用`GenerateNatural()`生成最原始的从2开始的自然数序列。然后开始一个100次迭代的循环希望生成100个素数。在每次循环迭代开始的时候管道中的第一个数必定是素数我们先读取并打印这个素数。然后基于管道中剩余的数列并以当前取出的素数为筛子过滤后面的素数。不同的素数筛子对应的管道是串联在一起的。
我们先是调用 `GenerateNatural()` 生成最原始的从 2 开始的自然数序列。然后开始一个 100 次迭代的循环,希望生成 100 个素数。在每次循环迭代开始的时候,管道中的第一个数必定是素数,我们先读取并打印这个素数。然后基于管道中剩余的数列,并以当前取出的素数为筛子过滤后面的素数。不同的素数筛子对应的管道是串联在一起的。
素数筛展示了一种优雅的并发程序结构。但是因为每个并发体处理的任务粒度太细微程序整体的性能并不理想。对于细粒度的并发程序CSP模型中固有的消息传递的代价太高了多线程并发模型同样要面临线程启动的代价
素数筛展示了一种优雅的并发程序结构。但是因为每个并发体处理的任务粒度太细微程序整体的性能并不理想。对于细粒度的并发程序CSP 模型中固有的消息传递的代价太高了(多线程并发模型同样要面临线程启动的代价)。
## 1.6.7 并发的安全退出
有时候我们需要通知goroutine停止它正在干的事情特别是当它工作在错误的方向上的时候。Go语言并没有提供在一个直接终止Goroutine的方法由于这样会导致goroutine之间的共享变量处在未定义的状态上。但是如果我们想要退出两个或者任意多个Goroutine怎么办呢
有时候我们需要通知 Goroutine 停止它正在干的事情特别是当它工作在错误的方向上的时候。Go 语言并没有提供在一个直接终止 Goroutine 的方法,由于这样会导致 Goroutine 之间的共享变量处在未定义的状态上。但是如果我们想要退出两个或者任意多个 Goroutine 怎么办呢?
Go语言中不同Goroutine之间主要依靠管道进行通信和同步。要同时处理多个管道的发送或接收操作我们需要使用`select`关键字(这个关键字和网络编程中的`select`函数的行为类似)。当`select`有多个分支时,会随机选择一个可用的管道分支,如果没有可用的管道分支则选择`default`分支,否则会一直保存阻塞状态。
Go 语言中不同 Goroutine 之间主要依靠管道进行通信和同步。要同时处理多个管道的发送或接收操作,我们需要使用 `select` 关键字(这个关键字和网络编程中的 `select` 函数的行为类似)。当 `select` 有多个分支时,会随机选择一个可用的管道分支,如果没有可用的管道分支则选择 `default` 分支,否则会一直保存阻塞状态。
基于`select`实现的管道的超时判断:
基于 `select` 实现的管道的超时判断:
```go
select {
@ -493,7 +488,7 @@ case <-time.After(time.Second):
}
```
通过`select``default`分支实现非阻塞的管道发送或接收操作:
通过 `select` `default` 分支实现非阻塞的管道发送或接收操作:
```go
select {
@ -504,7 +499,7 @@ default:
}
```
通过`select`来阻止`main`函数退出:
通过 `select` 来阻止 `main` 函数退出:
```go
func main() {
@ -513,7 +508,7 @@ func main() {
}
```
当有多个管道均可操作时,`select`会随机选择一个管道。基于该特性我们可以用`select`实现一个生成随机数序列的程序:
当有多个管道均可操作时,`select` 会随机选择一个管道。基于该特性我们可以用 `select` 实现一个生成随机数序列的程序:
```go
func main() {
@ -533,7 +528,7 @@ func main() {
}
```
我们通过`select``default`分支可以很容易实现一个Goroutine的退出控制:
我们通过 `select` `default` 分支可以很容易实现一个 Goroutine 的退出控制:
```go
func worker(cancel chan bool) {
@ -557,7 +552,7 @@ func main() {
}
```
但是管道的发送操作和接收操作是一一对应的如果要停止多个Goroutine那么可能需要创建同样数量的管道这个代价太大了。其实我们可以通过`close`关闭一个管道来实现广播的效果,所有从关闭管道接收的操作均会收到一个零值和一个可选的失败标志。
但是管道的发送操作和接收操作是一一对应的,如果要停止多个 Goroutine 那么可能需要创建同样数量的管道,这个代价太大了。其实我们可以通过 `close` 关闭一个管道来实现广播的效果,所有从关闭管道接收的操作均会收到一个零值和一个可选的失败标志。
```go
func worker(cancel chan bool) {
@ -584,7 +579,7 @@ func main() {
}
```
我们通过`close`来关闭`cancel`管道向多个Goroutine广播退出的指令。不过这个程序依然不够稳健当每个Goroutine收到退出指令退出时一般会进行一定的清理工作但是退出的清理工作并不能保证被完成因为`main`线程并没有等待各个工作Goroutine退出工作完成的机制。我们可以结合`sync.WaitGroup`来改进:
我们通过 `close` 来关闭 `cancel` 管道向多个 Goroutine 广播退出的指令。不过这个程序依然不够稳健:当每个 Goroutine 收到退出指令退出时一般会进行一定的清理工作,但是退出的清理工作并不能保证被完成,因为 `main` 线程并没有等待各个工作 Goroutine 退出工作完成的机制。我们可以结合 `sync.WaitGroup` 来改进:
```go
func worker(wg *sync.WaitGroup, cancel chan bool) {
@ -615,12 +610,11 @@ func main() {
}
```
现在每个工作者并发体的创建、运行、暂停和退出都是在`main`函数的安全控制之下了。
现在每个工作者并发体的创建、运行、暂停和退出都是在 `main` 函数的安全控制之下了。
## 1.6.8 context 包
## 1.6.8 context包
在Go1.7发布时,标准库增加了一个`context`用来简化对于处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作官方有博文对此做了专门介绍。我们可以用`context`包来重新实现前面的线程安全退出或超时的控制:
在 Go1.7 发布时,标准库增加了一个 `context` 包,用来简化对于处理单个请求的多个 Goroutine 之间与请求域的数据、超时和退出等操作,官方有博文对此做了专门介绍。我们可以用 `context` 包来重新实现前面的线程安全退出或超时的控制:
```go
func worker(ctx context.Context, wg *sync.WaitGroup) error {
@ -652,9 +646,9 @@ func main() {
}
```
当并发体超时或`main`主动停止工作者Goroutine时每个工作者都可以安全退出。
当并发体超时或 `main` 主动停止工作者 Goroutine 时,每个工作者都可以安全退出。
Go语言是带内存自动回收特性的因此内存一般不会泄漏。在前面素数筛的例子中`GenerateNatural``PrimeFilter`函数内部都启动了新的Goroutine`main`函数不再使用管道时后台Goroutine有泄漏的风险。我们可以通过`context`包来避免这个问题,下面是改进的素数筛实现:
Go 语言是带内存自动回收特性的,因此内存一般不会泄漏。在前面素数筛的例子中,`GenerateNatural` `PrimeFilter` 函数内部都启动了新的 Goroutine `main` 函数不再使用管道时后台 Goroutine 有泄漏的风险。我们可以通过 `context` 包来避免这个问题,下面是改进的素数筛实现:
```go
// 返回生成自然数序列的管道: 2, 3, 4, ...
@ -690,7 +684,7 @@ func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
}
func main() {
// 通过 Context 控制后台Goroutine状态
// 通过 Context 控制后台 Goroutine 状态
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
@ -704,10 +698,10 @@ func main() {
}
```
当main函数完成工作前通过调用`cancel()`来通知后台Goroutine退出这样就避免了Goroutine的泄漏。
main 函数完成工作前,通过调用 `cancel()` 来通知后台 Goroutine 退出,这样就避免了 Goroutine 的泄漏。
然而,上面这个例子只是展示了`cancel()`的基础用法实际上这个例子会导致Goroutine死锁不能正常退出。
我们可以给上面这个例子添加`sync.WaitGroup`来复现这个问题。
然而,上面这个例子只是展示了 `cancel()` 的基础用法,实际上这个例子会导致 Goroutine 死锁,不能正常退出。
我们可以给上面这个例子添加 `sync.WaitGroup` 来复现这个问题。
```go
package main
@ -754,7 +748,7 @@ func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGro
func main() {
wg := sync.WaitGroup{}
// 通过 Context 控制后台Goroutine状态
// 通过 Context 控制后台 Goroutine 状态
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然数序列: 2, 3, 4, ...
@ -770,8 +764,8 @@ func main() {
}
```
执行上面这个例子很容易就复现了死锁的问题,原因是素数筛中的`ctx.Done()`位于`if i := <-in; i%prime != 0`判断之内,
而这个判断可能会一直阻塞,导致goroutine无法正常退出。让我们来解决这个问题。
执行上面这个例子很容易就复现了死锁的问题,原因是素数筛中的 `ctx.Done()` 位于 `if i := <-in; i%prime != 0` 判断之内,
而这个判断可能会一直阻塞,导致 Goroutine 无法正常退出。让我们来解决这个问题。
```go
package main
@ -824,7 +818,7 @@ func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGro
func main() {
wg := sync.WaitGroup{}
// 通过 Context 控制后台Goroutine状态
// 通过 Context 控制后台 Goroutine 状态
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然数序列: 2, 3, 4, ...
@ -840,7 +834,7 @@ func main() {
}
```
如上所示,我们可以通过将`i := <-in`放入select在这个select内也执行`<-ctx.Done()`来解决阻塞导致的死锁。
如上所示,我们可以通过将 `i := <-in` 放入 select在这个 select 内也执行 `<-ctx.Done()` 来解决阻塞导致的死锁。
不过上面这个例子并不优美,让我们换一种方式。
```go
@ -890,7 +884,7 @@ func PrimeFilter(ctx context.Context, in <-chan int, prime int, wg *sync.WaitGro
func main() {
wg := sync.WaitGroup{}
// 通过 Context 控制后台Goroutine状态
// 通过 Context 控制后台 Goroutine 状态
ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
ch := GenerateNatural(ctx, &wg) // 自然数序列: 2, 3, 4, ...
@ -907,9 +901,10 @@ func main() {
```
在上面这个例子中主要有以下几点需要关注:
1. 通过`for range`循环保证了输入管道被关闭时,循环能退出,不会出现死循环;
2. 通过`defer close`保证了无论是输入管道被关闭还是ctx被取消只要素数筛退出都会关闭输出管道。
1. 通过 `for range` 循环保证了输入管道被关闭时,循环能退出,不会出现死循环;
2. 通过 `defer close` 保证了无论是输入管道被关闭,还是 ctx 被取消,只要素数筛退出,都会关闭输出管道。
至此,我们终于足够优美地解决了这个死锁问题。
并发是一个非常大的主题我们这里只是展示几个非常基础的并发编程的例子。官方文档也有很多关于并发编程的讨论国内也有专门讨论Go语言并发编程的书籍。读者可以根据自己的需求查阅相关的文献。
并发是一个非常大的主题,我们这里只是展示几个非常基础的并发编程的例子。官方文档也有很多关于并发编程的讨论,国内也有专门讨论 Go 语言并发编程的书籍。读者可以根据自己的需求查阅相关的文献。