mirror of
https://github.com/chai2010/advanced-go-programming-book.git
synced 2025-05-24 12:32:21 +00:00
finish cr
This commit is contained in:
parent
d3d008ee1c
commit
4f1868fbd9
@ -154,6 +154,8 @@ for {
|
|||||||
|
|
||||||
#### 结合 colly 的消息生产
|
#### 结合 colly 的消息生产
|
||||||
|
|
||||||
|
我们为每一个网站定制一个对应的 collector,并设置相应的规则,比如 v2ex,v2fx(虚构的),再用简单的工厂方法来将该 collector 和其 host 对应起来:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
|
|
||||||
@ -188,6 +190,9 @@ func initV2exCollector() *colly.Collector {
|
|||||||
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
|
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
|
||||||
// 基本的反爬虫策略
|
// 基本的反爬虫策略
|
||||||
time.Sleep(time.Second * 2)
|
time.Sleep(time.Second * 2)
|
||||||
|
|
||||||
|
// TODO, 正则 match 列表页的话,就 visit
|
||||||
|
// TODO, 正则 match 落地页的话,就发消息队列
|
||||||
c.Visit(e.Request.AbsoluteURL(link))
|
c.Visit(e.Request.AbsoluteURL(link))
|
||||||
})
|
})
|
||||||
return c
|
return c
|
||||||
@ -228,3 +233,92 @@ func main() {
|
|||||||
```
|
```
|
||||||
|
|
||||||
#### 结合 colly 的消息消费
|
#### 结合 colly 的消息消费
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"github.com/gocolly/colly"
|
||||||
|
)
|
||||||
|
|
||||||
|
var domain2Collector = map[string]*colly.Collector{}
|
||||||
|
var nc *nats.Conn
|
||||||
|
var maxDepth = 10
|
||||||
|
var natsURL = "nats://localhost:4222"
|
||||||
|
|
||||||
|
func factory(urlStr string) *colly.Collector {
|
||||||
|
u, _ := url.Parse(urlStr)
|
||||||
|
return domain2Collector[u.Host]
|
||||||
|
}
|
||||||
|
|
||||||
|
func initV2exCollector() *colly.Collector {
|
||||||
|
c := colly.NewCollector(
|
||||||
|
colly.AllowedDomains("www.v2ex.com"),
|
||||||
|
colly.MaxDepth(maxDepth),
|
||||||
|
)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func initV2fxCollector() *colly.Collector {
|
||||||
|
c := colly.NewCollector(
|
||||||
|
colly.AllowedDomains("www.v2fx.com"),
|
||||||
|
colly.MaxDepth(maxDepth),
|
||||||
|
)
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
domain2Collector["www.v2ex.com"] = initV2exCollector()
|
||||||
|
domain2Collector["www.v2fx.com"] = initV2fxCollector()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
nc, err = nats.Connect(natsURL)
|
||||||
|
if err != nil {
|
||||||
|
// log fatal
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startConsumer() {
|
||||||
|
nc, err := nats.Connect(nats.DefaultURL)
|
||||||
|
if err != nil {
|
||||||
|
// log error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sub, err := nc.QueueSubscribeSync("tasks", "workers")
|
||||||
|
if err != nil {
|
||||||
|
// log error
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var msg *nats.Msg
|
||||||
|
for {
|
||||||
|
msg, err = sub.NextMsg(time.Hour * 10000)
|
||||||
|
if err != nil {
|
||||||
|
// log error
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
urlStr := string(msg.Data)
|
||||||
|
ins := factory(urlStr)
|
||||||
|
// 因为最下游拿到的一定是对应网站的落地页
|
||||||
|
// 所以不用进行多余的判断了,直接爬内容即可
|
||||||
|
ins.Visit(urlStr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
startConsumer()
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
从代码层面上来讲,这里的生产者和消费者其实本质上差不多。如果日后我们要灵活地支持增加、减少各种网站的爬取的话,应该思考如何将这些爬虫的策略、参数尽量地配置化。
|
||||||
|
|
||||||
|
在本章的分布式配置一节中已经讲了一些配置系统的使用,读者可以自行进行尝试,这里就不再赘述了。
|
||||||
|
Loading…
x
Reference in New Issue
Block a user