From 75cafbe4956a8d136284d9083c6b2bb4b5992afb Mon Sep 17 00:00:00 2001 From: Xargin Date: Fri, 31 Aug 2018 15:26:08 +0800 Subject: [PATCH] add nats demo --- ch6-cloud/ch6-09-crawler.md | 69 +++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 2 deletions(-) diff --git a/ch6-cloud/ch6-09-crawler.md b/ch6-cloud/ch6-09-crawler.md index 953753a..661f6b3 100644 --- a/ch6-cloud/ch6-09-crawler.md +++ b/ch6-cloud/ch6-09-crawler.md @@ -95,6 +95,71 @@ func main() { 本节我们来简单实现一个基于消息队列的爬虫,为了演示方便,我们暂时用 redis 的 list 结构来作为消息队列。实际使用时,应该针对自己的业务对消息本身的可靠性要求和公司的基础架构组件情况进行选型。 -### 消息生产 +### nats 简介 -### 消息消费 +#### 消息生产 + +```go +package main + +import ( + "fmt" + + nats "github.com/nats-io/go-nats" +) + +func main() { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + fmt.Println(err) + return + } + + // use colly to extract the url on the page + err = nc.Publish("tasks", []byte("start from here")) + if err != nil { + // log error, retry + } + // 要加了 flush 对面才能收到,看来内部也有缓冲区 + nc.Flush() +} + +``` + +#### 消息消费 + +```go +package main + +import ( + "fmt" + "time" + + nats "github.com/nats-io/go-nats" +) + +func main() { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + fmt.Println(err) + return + } + + sub, err := nc.QueueSubscribeSync("tasks", "workers") + if err != nil { + fmt.Println(err) + return + } + + var msg *nats.Msg + for { + msg, err = sub.NextMsg(time.Hour * 10000) + if err != nil { + break + } + fmt.Println(string(msg.Data), msg.Reply, msg.Sub, msg.Subject, err) + } + nc.Flush() + sub.Unsubscribe() +} +```