diff --git a/ch6-cloud/ch6-09-crawler.md b/ch6-cloud/ch6-09-crawler.md index 47eeb5c..023dc0e 100644 --- a/ch6-cloud/ch6-09-crawler.md +++ b/ch6-cloud/ch6-09-crawler.md @@ -60,16 +60,6 @@ func main() { c.Visit(e.Request.AbsoluteURL(link)) }) - // Before making a request - c.OnRequest(func(r *colly.Request) { - /* - r.Headers.Set("Cookie", "") - r.Headers.Set("DNT", "1") - r.Headers.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36") - r.Headers.Set("Host", "www.v2ex.com") - */ - }) - err := c.Visit("https://www.v2ex.com/go/go") if err != nil { fmt.Println(err) @@ -101,97 +91,67 @@ nats 是 Go 实现的一个高性能分布式消息队列,适用于高并发 nats 的服务端项目是 gnatsd,客户端与 gnatsd 的通信方式为基于 tcp 的文本协议,非常简单: -TODO,用图画协议 +向 subject 为 task 发消息: -```shell -~ ❯❯❯ telnet localhost 4222 -Trying 127.0.0.1... -Connected to localhost. -Escape character is '^]'. -INFO {"server_id":"MNbZZUS4Ed5tvSaSRHyZS1","version":"1.3.0","proto":1,"go":"go1.10.3","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":31} -sub foo 127.0.0.1 -+OK -pub foo 2 -hi -+OK -MSG foo 127.0.0.1 2 -hi -pub foo 11 -hello world -+OK -MSG foo 127.0.0.1 11 -hello world -``` +![nats-protocol-pub](../images/ch6-09-nats-protocol-pub.png) -#### 消息生产 +以 workers 的 queue 从 tasks subject 订阅消息: + +![nats-protocol-sub](../images/ch6-09-nats-protocol-sub.png) + +其中的 queue 参数是可选的,如果希望在分布式的消费端进行任务的负载均衡,而不是所有人都收到同样的消息,那么就要给消费端指定相同的 queue 名字。 + +#### 基本消息生产 生产消息只要指定 subject 即可: ```go -package main - -import ( - "fmt" - - nats "github.com/nats-io/go-nats" -) - -func main() { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - fmt.Println(err) - return - } - - // use colly to extract the url on the page - err = nc.Publish("tasks", []byte("start from here")) - if err != nil { - // log error, retry - } - // 要加了 flush 对面才能收到,看来内部也有缓冲区 - nc.Flush() +nc, err := nats.Connect(nats.DefaultURL) +if err != nil { + // log error + return } +// 指定 subject 为 tasks,消息内容随意 +err = nc.Publish("tasks", []byte("your task content")) + +nc.Flush() ``` -#### 消息消费 +#### 基本消息消费 直接使用 nats 的 subscribe api 并不能达到任务分发的目的,因为 pub sub 本身是广播性质的。所有消费者都会收到完全一样的所有消息。 除了普通的 subscribe 之外,nats 还提供了 queue subscribe 的功能。只要提供一个 queue group 名字(类似 kafka 中的 consumer group),即可均衡地将任务分发给消费者。 ```go -package main +nc, err := nats.Connect(nats.DefaultURL) +if err != nil { + // log error + return +} -import ( - "fmt" - "time" +// queue subscribe 相当于在消费者之间进行任务分发的分支均衡 +// 前提是所有消费者都使用 workers 这个 queue +// nats 中的 queue 概念上类似于 kafka 中的 consumer group +sub, err := nc.QueueSubscribeSync("tasks", "workers") +if err != nil { + // log error + return +} - nats "github.com/nats-io/go-nats" -) - -func main() { - nc, err := nats.Connect(nats.DefaultURL) +var msg *nats.Msg +for { + msg, err = sub.NextMsg(time.Hour * 10000) if err != nil { - fmt.Println(err) - return + // log error + break } - - sub, err := nc.QueueSubscribeSync("tasks", "workers") - if err != nil { - fmt.Println(err) - return - } - - var msg *nats.Msg - for { - msg, err = sub.NextMsg(time.Hour * 10000) - if err != nil { - break - } - fmt.Println(string(msg.Data), msg.Reply, msg.Sub, msg.Subject, err) - } - nc.Flush() - sub.Unsubscribe() + // 正确地消费到了消息 + // 可用 nats.Msg 对象处理任务 } ``` + +#### 结合 colly 的消息生产 + +#### 结合 colly 的消息消费 diff --git a/images/ch6-09-nats-protocol-pub.png b/images/ch6-09-nats-protocol-pub.png new file mode 100644 index 0000000..14d931e Binary files /dev/null and b/images/ch6-09-nats-protocol-pub.png differ diff --git a/images/ch6-09-nats-protocol-sub.png b/images/ch6-09-nats-protocol-sub.png new file mode 100644 index 0000000..2721c29 Binary files /dev/null and b/images/ch6-09-nats-protocol-sub.png differ