From 4f1868fbd9a154c9ee3747d5c6106c624f6ae35d Mon Sep 17 00:00:00 2001 From: Xargin Date: Sat, 1 Sep 2018 16:49:40 +0800 Subject: [PATCH] finish cr --- ch6-cloud/ch6-09-crawler.md | 98 ++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/ch6-cloud/ch6-09-crawler.md b/ch6-cloud/ch6-09-crawler.md index 5f7ccee..6b0a72a 100644 --- a/ch6-cloud/ch6-09-crawler.md +++ b/ch6-cloud/ch6-09-crawler.md @@ -154,6 +154,8 @@ for { #### 结合 colly 的消息生产 +我们为每一个网站定制一个对应的 collector,并设置相应的规则,比如 v2ex,v2fx(虚构的),再用简单的工厂方法来将该 collector 和其 host 对应起来: + ```go package main @@ -187,8 +189,11 @@ func initV2exCollector() *colly.Collector { c.OnHTML("a[href]", func(e *colly.HTMLElement) { // 基本的反爬虫策略 - time.Sleep(time.Second * 2) - c.Visit(e.Request.AbsoluteURL(link)) + time.Sleep(time.Second * 2) + + // TODO, 正则 match 列表页的话,就 visit + // TODO, 正则 match 落地页的话,就发消息队列 + c.Visit(e.Request.AbsoluteURL(link)) }) return c } @@ -228,3 +233,92 @@ func main() { ``` #### 结合 colly 的消息消费 + +```go +package main + +import ( + "fmt" + "net/url" + + "github.com/gocolly/colly" +) + +var domain2Collector = map[string]*colly.Collector{} +var nc *nats.Conn +var maxDepth = 10 +var natsURL = "nats://localhost:4222" + +func factory(urlStr string) *colly.Collector { + u, _ := url.Parse(urlStr) + return domain2Collector[u.Host] +} + +func initV2exCollector() *colly.Collector { + c := colly.NewCollector( + colly.AllowedDomains("www.v2ex.com"), + colly.MaxDepth(maxDepth), + ) + + return c +} + +func initV2fxCollector() *colly.Collector { + c := colly.NewCollector( + colly.AllowedDomains("www.v2fx.com"), + colly.MaxDepth(maxDepth), + ) + + return c +} + +func init() { + domain2Collector["www.v2ex.com"] = initV2exCollector() + domain2Collector["www.v2fx.com"] = initV2fxCollector() + + var err error + nc, err = nats.Connect(natsURL) + if err != nil { + // log fatal + os.Exit(1) + } +} + +func startConsumer() { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + // log error + return + } + + sub, err := nc.QueueSubscribeSync("tasks", "workers") + if err != nil { + // log error + return + } + + var msg *nats.Msg + for { + msg, err = sub.NextMsg(time.Hour * 10000) + if err != nil { + // log error + break + } + + urlStr := string(msg.Data) + ins := factory(urlStr) + // 因为最下游拿到的一定是对应网站的落地页 + // 所以不用进行多余的判断了,直接爬内容即可 + ins.Visit(urlStr) + } +} + +func main() { + startConsumer() +} + +``` + +从代码层面上来讲,这里的生产者和消费者其实本质上差不多。如果日后我们要灵活地支持增加、减少各种网站的爬取的话,应该思考如何将这些爬虫的策略、参数尽量地配置化。 + +在本章的分布式配置一节中已经讲了一些配置系统的使用,读者可以自行进行尝试,这里就不再赘述了。