mirror of
https://github.com/chai2010/advanced-go-programming-book.git
synced 2025-05-24 04:22:22 +00:00
update cr
This commit is contained in:
parent
63090d45ab
commit
cf95d2b714
@ -60,16 +60,6 @@ func main() {
|
|||||||
c.Visit(e.Request.AbsoluteURL(link))
|
c.Visit(e.Request.AbsoluteURL(link))
|
||||||
})
|
})
|
||||||
|
|
||||||
// Before making a request
|
|
||||||
c.OnRequest(func(r *colly.Request) {
|
|
||||||
/*
|
|
||||||
r.Headers.Set("Cookie", "")
|
|
||||||
r.Headers.Set("DNT", "1")
|
|
||||||
r.Headers.Set("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36")
|
|
||||||
r.Headers.Set("Host", "www.v2ex.com")
|
|
||||||
*/
|
|
||||||
})
|
|
||||||
|
|
||||||
err := c.Visit("https://www.v2ex.com/go/go")
|
err := c.Visit("https://www.v2ex.com/go/go")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
@ -101,97 +91,67 @@ nats 是 Go 实现的一个高性能分布式消息队列,适用于高并发
|
|||||||
|
|
||||||
nats 的服务端项目是 gnatsd,客户端与 gnatsd 的通信方式为基于 tcp 的文本协议,非常简单:
|
nats 的服务端项目是 gnatsd,客户端与 gnatsd 的通信方式为基于 tcp 的文本协议,非常简单:
|
||||||
|
|
||||||
TODO,用图画协议
|
向 subject 为 task 发消息:
|
||||||
|
|
||||||
```shell
|

|
||||||
~ ❯❯❯ telnet localhost 4222
|
|
||||||
Trying 127.0.0.1...
|
|
||||||
Connected to localhost.
|
|
||||||
Escape character is '^]'.
|
|
||||||
INFO {"server_id":"MNbZZUS4Ed5tvSaSRHyZS1","version":"1.3.0","proto":1,"go":"go1.10.3","host":"0.0.0.0","port":4222,"max_payload":1048576,"client_id":31}
|
|
||||||
sub foo 127.0.0.1
|
|
||||||
+OK
|
|
||||||
pub foo 2
|
|
||||||
hi
|
|
||||||
+OK
|
|
||||||
MSG foo 127.0.0.1 2
|
|
||||||
hi
|
|
||||||
pub foo 11
|
|
||||||
hello world
|
|
||||||
+OK
|
|
||||||
MSG foo 127.0.0.1 11
|
|
||||||
hello world
|
|
||||||
```
|
|
||||||
|
|
||||||
#### 消息生产
|
以 workers 的 queue 从 tasks subject 订阅消息:
|
||||||
|
|
||||||
|

|
||||||
|
|
||||||
|
其中的 queue 参数是可选的,如果希望在分布式的消费端进行任务的负载均衡,而不是所有人都收到同样的消息,那么就要给消费端指定相同的 queue 名字。
|
||||||
|
|
||||||
|
#### 基本消息生产
|
||||||
|
|
||||||
生产消息只要指定 subject 即可:
|
生产消息只要指定 subject 即可:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
import (
|
// log error
|
||||||
"fmt"
|
|
||||||
|
|
||||||
nats "github.com/nats-io/go-nats"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
nc, err := nats.Connect(nats.DefaultURL)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return
|
return
|
||||||
}
|
|
||||||
|
|
||||||
// use colly to extract the url on the page
|
|
||||||
err = nc.Publish("tasks", []byte("start from here"))
|
|
||||||
if err != nil {
|
|
||||||
// log error, retry
|
|
||||||
}
|
|
||||||
// 要加了 flush 对面才能收到,看来内部也有缓冲区
|
|
||||||
nc.Flush()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 指定 subject 为 tasks,消息内容随意
|
||||||
|
err = nc.Publish("tasks", []byte("your task content"))
|
||||||
|
|
||||||
|
nc.Flush()
|
||||||
```
|
```
|
||||||
|
|
||||||
#### 消息消费
|
#### 基本消息消费
|
||||||
|
|
||||||
直接使用 nats 的 subscribe api 并不能达到任务分发的目的,因为 pub sub 本身是广播性质的。所有消费者都会收到完全一样的所有消息。
|
直接使用 nats 的 subscribe api 并不能达到任务分发的目的,因为 pub sub 本身是广播性质的。所有消费者都会收到完全一样的所有消息。
|
||||||
|
|
||||||
除了普通的 subscribe 之外,nats 还提供了 queue subscribe 的功能。只要提供一个 queue group 名字(类似 kafka 中的 consumer group),即可均衡地将任务分发给消费者。
|
除了普通的 subscribe 之外,nats 还提供了 queue subscribe 的功能。只要提供一个 queue group 名字(类似 kafka 中的 consumer group),即可均衡地将任务分发给消费者。
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
import (
|
// log error
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
nats "github.com/nats-io/go-nats"
|
|
||||||
)
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
nc, err := nats.Connect(nats.DefaultURL)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sub, err := nc.QueueSubscribeSync("tasks", "workers")
|
// queue subscribe 相当于在消费者之间进行任务分发的分支均衡
|
||||||
if err != nil {
|
// 前提是所有消费者都使用 workers 这个 queue
|
||||||
fmt.Println(err)
|
// nats 中的 queue 概念上类似于 kafka 中的 consumer group
|
||||||
|
sub, err := nc.QueueSubscribeSync("tasks", "workers")
|
||||||
|
if err != nil {
|
||||||
|
// log error
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var msg *nats.Msg
|
var msg *nats.Msg
|
||||||
for {
|
for {
|
||||||
msg, err = sub.NextMsg(time.Hour * 10000)
|
msg, err = sub.NextMsg(time.Hour * 10000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// log error
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Println(string(msg.Data), msg.Reply, msg.Sub, msg.Subject, err)
|
// 正确地消费到了消息
|
||||||
}
|
// 可用 nats.Msg 对象处理任务
|
||||||
nc.Flush()
|
|
||||||
sub.Unsubscribe()
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### 结合 colly 的消息生产
|
||||||
|
|
||||||
|
#### 结合 colly 的消息消费
|
||||||
|
BIN
images/ch6-09-nats-protocol-pub.png
Normal file
BIN
images/ch6-09-nats-protocol-pub.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 20 KiB |
BIN
images/ch6-09-nats-protocol-sub.png
Normal file
BIN
images/ch6-09-nats-protocol-sub.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 15 KiB |
Loading…
x
Reference in New Issue
Block a user