## 控制并发数 增加的方法如下: ```go func (g gate) Len() int { return len(g) } func (g gate) Cap() int { return cap(g) } func (g gate) Idle() bool { return len(g) == 0 } func (g gate) Busy() bool { return len(g) == cap(g) } func (g gate) Fraction() float64 { return float64(len(g)) / float64(cap(g)) } ``` 然后我们可以在相对空闲的时候处理一些后台低优先级的任务,在并发相对繁忙或超出一定比例的时候提供预警: ```go func New(fs vfs.FileSystem, gate chan bool) *gatefs { p := &gatefs{fs, gate} // 后台监控线程 go func() { for { switch { case p.gate.Idle(): // 处理后台任务 case p.gate.Fraction() >= 0.7: // 并发预警 default: time.Sleep(time.Second) } } }() return p } ``` 这样我们通过后台线程就可以根据程序的状态动态调整自己的工作模式。 ## 消费海量的请求 在前面的生产者、消费者并发模型中,只有当生产者和消费的速度近似相等时才会达到最佳的效果,同时通过引入带缓存的管道可以消除因临时效率波动产生的影响。但是当生产者和消费者的速度严重不匹配时,我们是无法通过带缓存的管道来提高性能的(缓存的管道只能延缓问题发生的时间,无法消除速度差异带来的问题)。当消费者无法及时消费生产者的输出时,时间积累会导致问题越来越严重。 对于生产者、消费者并发模型,我们当然可以通过降低生产者的产能来避免资源的浪费。但在很多场景中,生产者才是核心对象,它们生产出各种问题或任务单据,这时候产出的问题是必须要解决的、任务单据也是必须要完成的。在现实生活中,制造各种生活垃圾的海量人类其实就是垃圾生产者,而清理生活垃圾的少量的清洁工就是垃圾消费者。在网络服务中,提交POST数据的海量用户则变成了生产者,Web后台服务则对应POST数据的消费者。海量生产者的问题也就变成了:如何构造一个能够处理海量请求的Web服务(假设每分钟百万级请求)。 在Web服务中,用户提交的每个POST请求可以看作是一个Job任务,而服务器是通过后台的Worker工作者来消费这些Job任务。当面向海量的Job处理时,我们一般可以通过构造一个Worker工作者池来提高Job的处理效率;通过一个带缓存的Job管道来接收新的任务请求,避免任务请求功能无法响应;Job请求接收管道和Worker工作者池通过分发系统来衔接。 我们可以用管道来模拟工作者池:当需要处理一个任务时,先从工作者池取一个工作者,处理完任务之后将工作者返回给工作者池。`WorkerPool`对应工作者池,`Worker`对应工作者。 ```go type WorkerPool struct { workers []*Worker pool chan *Worker } // 构造工作者池 func NewWorkerPool(maxWorkers int) *WorkerPool { p := &WorkerPool{ workers: make([]*Worker, maxWorkers) pool: make(chan *Worker, maxWorkers) } // 初始化工作者 for i, _ := range p.workers { worker := NewWorker(0) p.workers[i] = worker p.pool <- worker } return p } // 启动工作者 func (p *WorkerPool) Start() { for _, worker := range p.workers { worker.Start() } } // 停止工作者 func (p *WorkerPool) Stop() { for _, worker := range p.workers { worker.Stop() } } // 获取工作者(阻塞) func (p *WorkerPool) Get() *Worker { return <-p.pool } // 返回工作者 func (p *WorkerPool) Put(w *Worker) { p.pool <- w } ``` 工作者池通过一个带缓存的管道来提高工作者的管理。当所有工作者都在处理任务时,工作者的获取会阻塞自动有工作者可用为止。 `Worker`对应工作者实现,具体任务由后台一个固定的Goroutine完成,和外界通过专有的管道通信(工作者的私有管道也可以选择带有一定的缓存)具体实现如下: ```go type Worker struct { job chan interface{} quit chan bool wg sync.WaitGroup } // 构造工作者 func NewWorker(maxJobs int) *Worker { return &Worker{ job: make(chan interface{}, maxJobs), quit: make(chan bool), } } // 启动任务 func (w *Worker) Start() { p.wg.Add(1) go func() { defer p.wg.Done() for { // 接收任务 // 此时工作中已经从工作者池中取出 select { case job := <-p.job: // 处理任务 case <-w.quit: return } } }() } // 关闭任务 func (p *Worker) Stop() { p.quit <- true p.wg.Wait() } // 提交任务 func (p *Worker) AddJob(job interface{}) { p.job <- job } ``` 任务的分发系统在`Service`对象中完成: ```go type Service struct { workers *WorkerPool jobs chan interface{} maxJobs int wg sync.WaitGroup } func NewService(maxWorkers, maxJobs int) *Service { return &Service { workers: NewWorkerPool(maxWorkers), jobs: make(chan interface{}, maxJobs), } } func (p *Service) Start() { p.jobs = make(chan interface{}, maxJobs) p.wg.Add(1) p.workers.Start() go func() { defer p.wg.Done() for job := range p.jobs: go func(job Job) { // 从工作者池取一个工作者 worker := p.workers.Get() // 完成任务后返回给工作者池 defer p.workers.Put(worker) // 提交任务处理(异步) worker.AddJob(job) }(job) } }() } func (p *Service) Stop() { p.workers.Stop() close(p.jobs) p.wg.Wait() } // 提交任务 // 任务管道带较大的缓存, 延缓阻塞的时间 func (p *Service) AddJob(job interface{}) { p.jobs <- job } ``` 主程序可以是一个web服务器: ```go var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) func main() { service := NewService(MaxWorker, MaxQueue) service.Start() defer service.Stop() // 处理海量的任务 http.HandleFunc("/jobs", func(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Job以JSON格式提交 var jobs []Job err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&jobs) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // 处理任务 for _, job := range jobs { service.AddJob(job) } // OK w.WriteHeader(http.StatusOK) }) // 启动web服务 log.Fatal(http.ListenAndServe(":8080", nil)) } ``` 基于Go语言特有的管道和Goroutine特性,我们以非常简单的方式设计了一个针对海量请求的处理系统结构。在实际的系统中,用户可以根据任务的具体类型和特性,将管道定义为具体类型以避免接口等动态特性导致的开销。