From d2780acaef20258d9ec15df822e8525b2f0023e9 Mon Sep 17 00:00:00 2001 From: chai2010 Date: Sun, 22 Jul 2018 17:00:22 +0800 Subject: [PATCH] =?UTF-8?q?ch1-06:=20=E8=A3=81=E5=89=AA=E5=86=85=E5=AE=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ch1-basic/ch1-06-goroutine.md | 295 ++++------------------------------ draft/ch1-06.md | 258 +++++++++++++++++++++++++++++ 2 files changed, 285 insertions(+), 268 deletions(-) create mode 100644 draft/ch1-06.md diff --git a/ch1-basic/ch1-06-goroutine.md b/ch1-basic/ch1-06-goroutine.md index 7f208a1..ef96fa6 100644 --- a/ch1-basic/ch1-06-goroutine.md +++ b/ch1-basic/ch1-06-goroutine.md @@ -318,34 +318,6 @@ func main() { 在发布订阅模型中,每条消息都会传送给多个订阅者。发布者通常不会知道、也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加是一种松散的耦合关心,这使得系统的复杂性可以随时间的推移而增长。在现实生活中,不同城市的象天气预报之类的应用就可以应用这个并发模式。 -## 赢者为王 - -采用并发编程的动机有很多:并发编程可以简化问题,比如一类问题对应一个处理线程会更简单;并发编程还可以提升性能,在一个多核CPU上开2个线程一般会比开1个线程快一些。其实对于提升性能而言,程序并不是简单地运行速度快就表示用户体验好的;很多时候程序能快速响应用户请求才是最重要的,当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务。 - -假设我们想快速地检索“golang”相关的主题,我们可能会同时打开Bing、Google或百度等多个检索引擎。当某个检索最先返回结果后,就可以关闭其它检索页面了。因为受限于网络环境和检索引擎算法的影响,某些检索引擎可能很快返回检索结果,某些检索引擎也可能遇到等到他们公司倒闭也没有完成检索的情况。我们可以采用类似的策略来编写这个程序: - -```go -func main() { - ch := make(chan string, 32) - - go func() { - ch <- searchByBing("golang") - } - go func() { - ch <- searchByGoogle("golang") - } - go func() { - ch <- searchByBaidu("golang") - } - - fmt.Println(<-ch) -} -``` - -首先,我们创建了一个带缓存的管道,管道的缓存数目要足够大,保证不会因为缓存的容量引起不必要的阻塞。然后我们开启了多个后台线程,分别向不同的检索引擎提交检索请求。当任意一个检索引擎最先有结果之后,都会马上将结果发到管道中(因为管道带了足够的缓存,这个过程不会阻塞)。但是最终我们只从管道取第一个结果,也就是最先返回的结果。 - -通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能。 - ## 控制并发数 很多用户在适应了Go语言强大的并发特性之后,都倾向于编写最大并发的程序,因为这样似乎可以提供最大的性能。在现实中我们行色匆匆,但有时却需要我们放慢脚步享受生活,并发的程序也是一样:有时候我们需要适当地控制并发的程度,因为这样不仅仅可给其它的应用/任务让出/预留一定的CPU资源,也可以适当降低功耗缓解电池的压力。 @@ -381,6 +353,7 @@ func main() { } ``` + 不过`gatefs`对此做一个抽象类型`gate`,增加了`enter`和`leave`方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,`enter`方法会阻塞直到并发数降下来为止。 ```go @@ -392,6 +365,7 @@ func (g gate) leave() { <-g } `gatefs`包装的新的虚拟文件系统就是将需要控制并发的方法增加了`enter`和`leave`调用而已: + ```go type gatefs struct { fs vfs.FileSystem @@ -405,45 +379,37 @@ func (fs gatefs) Lstat(p string) (os.FileInfo, error) { } ``` -我们不仅可以控制最大的并发数目,而且可以通过带缓存Channel的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态,当管道满了时任务是繁忙状态,这对于后台一些低级任务的运行是有参考价值的。增加的方法如下: +我们不仅可以控制最大的并发数目,而且可以通过带缓存Channel的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态,当管道满了时任务是繁忙状态,这对于后台一些低级任务的运行是有参考价值的。 + + +## 赢者为王 + +采用并发编程的动机有很多:并发编程可以简化问题,比如一类问题对应一个处理线程会更简单;并发编程还可以提升性能,在一个多核CPU上开2个线程一般会比开1个线程快一些。其实对于提升性能而言,程序并不是简单地运行速度快就表示用户体验好的;很多时候程序能快速响应用户请求才是最重要的,当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务。 + +假设我们想快速地检索“golang”相关的主题,我们可能会同时打开Bing、Google或百度等多个检索引擎。当某个检索最先返回结果后,就可以关闭其它检索页面了。因为受限于网络环境和检索引擎算法的影响,某些检索引擎可能很快返回检索结果,某些检索引擎也可能遇到等到他们公司倒闭也没有完成检索的情况。我们可以采用类似的策略来编写这个程序: ```go -func (g gate) Len() int { return len(g) } -func (g gate) Cap() int { return cap(g) } +func main() { + ch := make(chan string, 32) -func (g gate) Idle() bool { return len(g) == 0 } -func (g gate) Busy() bool { return len(g) == cap(g) } - -func (g gate) Fraction() float64 { - return float64(len(g)) / float64(cap(g)) -} -``` - -然后我们可以在相对空闲的时候处理一些后台低优先级的任务,在并发相对繁忙或超出一定比例的时候提供预警: - -```go -func New(fs vfs.FileSystem, gate chan bool) *gatefs { - p := &gatefs{fs, gate} - - // 后台监控线程 go func() { - for { - switch { - case p.gate.Idle(): - // 处理后台任务 - case p.gate.Fraction() >= 0.7: - // 并发预警 - default: - time.Sleep(time.Second) - } - } - }() + ch <- searchByBing("golang") + } + go func() { + ch <- searchByGoogle("golang") + } + go func() { + ch <- searchByBaidu("golang") + } - return p + fmt.Println(<-ch) } ``` -这样我们通过后台线程就可以根据程序的状态动态调整自己的工作模式。 +首先,我们创建了一个带缓存的管道,管道的缓存数目要足够大,保证不会因为缓存的容量引起不必要的阻塞。然后我们开启了多个后台线程,分别向不同的检索引擎提交检索请求。当任意一个检索引擎最先有结果之后,都会马上将结果发到管道中(因为管道带了足够的缓存,这个过程不会阻塞)。但是最终我们只从管道取第一个结果,也就是最先返回的结果。 + +通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能。 + ## 素数筛 @@ -645,215 +611,8 @@ func main() { 现在每个工作者并发体的创建、运行、暂停和退出都是在`main`函数的安全控制之下了。 -## 消费海量的请求 -在前面的生产者、消费者并发模型中,只有当生产者和消费的速度近似相等时才会达到最佳的效果,同时通过引入带缓存的管道可以消除因临时效率波动产生的影响。但是当生产者和消费者的速度严重不匹配时,我们是无法通过带缓存的管道来提高性能的(缓存的管道只能延缓问题发生的时间,无法消除速度差异带来的问题)。当消费者无法及时消费生产者的输出时,时间积累会导致问题越来越严重。 - -对于生产者、消费者并发模型,我们当然可以通过降低生产者的产能来避免资源的浪费。但在很多场景中,生产者才是核心对象,它们生产出各种问题或任务单据,这时候产出的问题是必须要解决的、任务单据也是必须要完成的。在现实生活中,制造各种生活垃圾的海量人类其实就是垃圾生产者,而清理生活垃圾的少量的清洁工就是垃圾消费者。在网络服务中,提交POST数据的海量用户则变成了生产者,Web后台服务则对应POST数据的消费者。海量生产者的问题也就变成了:如何构造一个能够处理海量请求的Web服务(假设每分钟百万级请求)。 - -在Web服务中,用户提交的每个POST请求可以看作是一个Job任务,而服务器是通过后台的Worker工作者来消费这些Job任务。当面向海量的Job处理时,我们一般可以通过构造一个Worker工作者池来提高Job的处理效率;通过一个带缓存的Job管道来接收新的任务请求,避免任务请求功能无法响应;Job请求接收管道和Worker工作者池通过分发系统来衔接。 - -我们可以用管道来模拟工作者池:当需要处理一个任务时,先从工作者池取一个工作者,处理完任务之后将工作者返回给工作者池。`WorkerPool`对应工作者池,`Worker`对应工作者。 - -```go -type WorkerPool struct { - workers []*Worker - pool chan *Worker -} - -// 构造工作者池 -func NewWorkerPool(maxWorkers int) *WorkerPool { - p := &WorkerPool{ - workers: make([]*Worker, maxWorkers) - pool: make(chan *Worker, maxWorkers) - } - - // 初始化工作者 - for i, _ := range p.workers { - worker := NewWorker(0) - p.workers[i] = worker - p.pool <- worker - } - return p -} - -// 启动工作者 -func (p *WorkerPool) Start() { - for _, worker := range p.workers { - worker.Start() - } -} - -// 停止工作者 -func (p *WorkerPool) Stop() { - for _, worker := range p.workers { - worker.Stop() - } -} - -// 获取工作者(阻塞) -func (p *WorkerPool) Get() *Worker { - return <-p.pool -} - -// 返回工作者 -func (p *WorkerPool) Put(w *Worker) { - p.pool <- w -} -``` - -工作者池通过一个带缓存的管道来提高工作者的管理。当所有工作者都在处理任务时,工作者的获取会阻塞自动有工作者可用为止。 - -`Worker`对应工作者实现,具体任务由后台一个固定的Goroutine完成,和外界通过专有的管道通信(工作者的私有管道也可以选择带有一定的缓存)具体实现如下: - -```go -type Worker struct { - job chan interface{} - quit chan bool - wg sync.WaitGroup -} - -// 构造工作者 -func NewWorker(maxJobs int) *Worker { - return &Worker{ - job: make(chan interface{}, maxJobs), - quit: make(chan bool), - } -} - -// 启动任务 -func (w *Worker) Start() { - p.wg.Add(1) - - go func() { - defer p.wg.Done() - - for { - // 接收任务 - // 此时工作中已经从工作者池中取出 - select { - case job := <-p.job: - // 处理任务 - - case <-w.quit: - return - } - } - }() -} - -// 关闭任务 -func (p *Worker) Stop() { - p.quit <- true - p.wg.Wait() -} - -// 提交任务 -func (p *Worker) AddJob(job interface{}) { - p.job <- job -} -``` - -任务的分发系统在`Service`对象中完成: - -```go -type Service struct { - workers *WorkerPool - jobs chan interface{} - maxJobs int - wg sync.WaitGroup -} - -func NewService(maxWorkers, maxJobs int) *Service { - return &Service { - workers: NewWorkerPool(maxWorkers), - jobs: make(chan interface{}, maxJobs), - } -} - -func (p *Service) Start() { - p.jobs = make(chan interface{}, maxJobs) - - p.wg.Add(1) - p.workers.Start() - - go func() { - defer p.wg.Done() - - for job := range p.jobs: - go func(job Job) { - // 从工作者池取一个工作者 - worker := p.workers.Get() - - // 完成任务后返回给工作者池 - defer p.workers.Put(worker) - - // 提交任务处理(异步) - worker.AddJob(job) - }(job) - } - }() -} -func (p *Service) Stop() { - p.workers.Stop() - close(p.jobs) - p.wg.Wait() -} - -// 提交任务 -// 任务管道带较大的缓存, 延缓阻塞的时间 -func (p *Service) AddJob(job interface{}) { - p.jobs <- job -} -``` - -主程序可以是一个web服务器: - -```go -var ( - MaxWorker = os.Getenv("MAX_WORKERS") - MaxQueue = os.Getenv("MAX_QUEUE") -) - -func main() { - service := NewService(MaxWorker, MaxQueue) - - service.Start() - defer service.Stop() - - // 处理海量的任务 - http.HandleFunc("/jobs", func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - // Job以JSON格式提交 - var jobs []Job - err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&jobs) - if err != nil { - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - w.WriteHeader(http.StatusBadRequest) - return - } - - // 处理任务 - for _, job := range jobs { - service.AddJob(job) - } - - // OK - w.WriteHeader(http.StatusOK) - }) - - // 启动web服务 - log.Fatal(http.ListenAndServe(":8080", nil)) -} -``` - -基于Go语言特有的管道和Goroutine特性,我们以非常简单的方式设计了一个针对海量请求的处理系统结构。在实际的系统中,用户可以根据任务的具体类型和特性,将管道定义为具体类型以避免接口等动态特性导致的开销。 - -## 更多 +## context包 在Go1.7发布时,标准库增加了一个`context`包,用来简化对于处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作,官方有博文对此做了专门介绍。我们可以用`context`包来重新实现前面的线程安全退出或超时的控制: @@ -934,7 +693,7 @@ func main() { fmt.Printf("%v: %v\n", i+1, prime) ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器 } - + cancel() } ``` diff --git a/draft/ch1-06.md b/draft/ch1-06.md new file mode 100644 index 0000000..9962a01 --- /dev/null +++ b/draft/ch1-06.md @@ -0,0 +1,258 @@ +## 控制并发数 + +增加的方法如下: + + + +```go +func (g gate) Len() int { return len(g) } +func (g gate) Cap() int { return cap(g) } + +func (g gate) Idle() bool { return len(g) == 0 } +func (g gate) Busy() bool { return len(g) == cap(g) } + +func (g gate) Fraction() float64 { + return float64(len(g)) / float64(cap(g)) +} +``` + +然后我们可以在相对空闲的时候处理一些后台低优先级的任务,在并发相对繁忙或超出一定比例的时候提供预警: + + +```go +func New(fs vfs.FileSystem, gate chan bool) *gatefs { + p := &gatefs{fs, gate} + + // 后台监控线程 + go func() { + for { + switch { + case p.gate.Idle(): + // 处理后台任务 + case p.gate.Fraction() >= 0.7: + // 并发预警 + default: + time.Sleep(time.Second) + } + } + }() + + return p +} +``` + +这样我们通过后台线程就可以根据程序的状态动态调整自己的工作模式。 + + + +## 消费海量的请求 + +在前面的生产者、消费者并发模型中,只有当生产者和消费的速度近似相等时才会达到最佳的效果,同时通过引入带缓存的管道可以消除因临时效率波动产生的影响。但是当生产者和消费者的速度严重不匹配时,我们是无法通过带缓存的管道来提高性能的(缓存的管道只能延缓问题发生的时间,无法消除速度差异带来的问题)。当消费者无法及时消费生产者的输出时,时间积累会导致问题越来越严重。 + +对于生产者、消费者并发模型,我们当然可以通过降低生产者的产能来避免资源的浪费。但在很多场景中,生产者才是核心对象,它们生产出各种问题或任务单据,这时候产出的问题是必须要解决的、任务单据也是必须要完成的。在现实生活中,制造各种生活垃圾的海量人类其实就是垃圾生产者,而清理生活垃圾的少量的清洁工就是垃圾消费者。在网络服务中,提交POST数据的海量用户则变成了生产者,Web后台服务则对应POST数据的消费者。海量生产者的问题也就变成了:如何构造一个能够处理海量请求的Web服务(假设每分钟百万级请求)。 + +在Web服务中,用户提交的每个POST请求可以看作是一个Job任务,而服务器是通过后台的Worker工作者来消费这些Job任务。当面向海量的Job处理时,我们一般可以通过构造一个Worker工作者池来提高Job的处理效率;通过一个带缓存的Job管道来接收新的任务请求,避免任务请求功能无法响应;Job请求接收管道和Worker工作者池通过分发系统来衔接。 + + +我们可以用管道来模拟工作者池:当需要处理一个任务时,先从工作者池取一个工作者,处理完任务之后将工作者返回给工作者池。`WorkerPool`对应工作者池,`Worker`对应工作者。 + +```go +type WorkerPool struct { + workers []*Worker + pool chan *Worker +} + +// 构造工作者池 +func NewWorkerPool(maxWorkers int) *WorkerPool { + p := &WorkerPool{ + workers: make([]*Worker, maxWorkers) + pool: make(chan *Worker, maxWorkers) + } + + // 初始化工作者 + for i, _ := range p.workers { + worker := NewWorker(0) + p.workers[i] = worker + p.pool <- worker + } + return p +} + +// 启动工作者 +func (p *WorkerPool) Start() { + for _, worker := range p.workers { + worker.Start() + } +} + +// 停止工作者 +func (p *WorkerPool) Stop() { + for _, worker := range p.workers { + worker.Stop() + } +} + +// 获取工作者(阻塞) +func (p *WorkerPool) Get() *Worker { + return <-p.pool +} + +// 返回工作者 +func (p *WorkerPool) Put(w *Worker) { + p.pool <- w +} +``` + +工作者池通过一个带缓存的管道来提高工作者的管理。当所有工作者都在处理任务时,工作者的获取会阻塞自动有工作者可用为止。 + + +`Worker`对应工作者实现,具体任务由后台一个固定的Goroutine完成,和外界通过专有的管道通信(工作者的私有管道也可以选择带有一定的缓存)具体实现如下: + +```go +type Worker struct { + job chan interface{} + quit chan bool + wg sync.WaitGroup +} + +// 构造工作者 +func NewWorker(maxJobs int) *Worker { + return &Worker{ + job: make(chan interface{}, maxJobs), + quit: make(chan bool), + } +} + +// 启动任务 +func (w *Worker) Start() { + p.wg.Add(1) + + go func() { + defer p.wg.Done() + + for { + // 接收任务 + // 此时工作中已经从工作者池中取出 + select { + case job := <-p.job: + // 处理任务 + + case <-w.quit: + return + } + } + }() +} + +// 关闭任务 +func (p *Worker) Stop() { + p.quit <- true + p.wg.Wait() +} + +// 提交任务 +func (p *Worker) AddJob(job interface{}) { + p.job <- job +} +``` + +任务的分发系统在`Service`对象中完成: + + +```go +type Service struct { + workers *WorkerPool + jobs chan interface{} + maxJobs int + wg sync.WaitGroup +} + +func NewService(maxWorkers, maxJobs int) *Service { + return &Service { + workers: NewWorkerPool(maxWorkers), + jobs: make(chan interface{}, maxJobs), + } +} + +func (p *Service) Start() { + p.jobs = make(chan interface{}, maxJobs) + + p.wg.Add(1) + p.workers.Start() + + go func() { + defer p.wg.Done() + + for job := range p.jobs: + go func(job Job) { + // 从工作者池取一个工作者 + worker := p.workers.Get() + + // 完成任务后返回给工作者池 + defer p.workers.Put(worker) + + // 提交任务处理(异步) + worker.AddJob(job) + }(job) + } + }() +} +func (p *Service) Stop() { + p.workers.Stop() + close(p.jobs) + p.wg.Wait() +} + +// 提交任务 +// 任务管道带较大的缓存, 延缓阻塞的时间 +func (p *Service) AddJob(job interface{}) { + p.jobs <- job +} +``` + +主程序可以是一个web服务器: + + +```go +var ( + MaxWorker = os.Getenv("MAX_WORKERS") + MaxQueue = os.Getenv("MAX_QUEUE") +) + +func main() { + service := NewService(MaxWorker, MaxQueue) + + service.Start() + defer service.Stop() + + // 处理海量的任务 + http.HandleFunc("/jobs", func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + // Job以JSON格式提交 + var jobs []Job + err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&jobs) + if err != nil { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + w.WriteHeader(http.StatusBadRequest) + return + } + + // 处理任务 + for _, job := range jobs { + service.AddJob(job) + } + + // OK + w.WriteHeader(http.StatusOK) + }) + + // 启动web服务 + log.Fatal(http.ListenAndServe(":8080", nil)) +} +``` + +基于Go语言特有的管道和Goroutine特性,我们以非常简单的方式设计了一个针对海量请求的处理系统结构。在实际的系统中,用户可以根据任务的具体类型和特性,将管道定义为具体类型以避免接口等动态特性导致的开销。