From 3bd2d7ac2d28797cc38aecbb0502284e3a1a2c5f Mon Sep 17 00:00:00 2001 From: Xargin Date: Tue, 1 Jan 2019 19:48:10 +0800 Subject: [PATCH] fix ch6 --- ch6-cloud/ch6-01-dist-id.md | 6 +- ch6-cloud/ch6-02-lock.md | 88 +++++------------------ ch6-cloud/ch6-03-delay-job.md | 6 +- ch6-cloud/ch6-04-search-engine.md | 8 +-- ch6-cloud/ch6-05-load-balance.md | 14 ++-- ch6-cloud/ch6-07-crawler.md | 112 +++++++++++++----------------- 6 files changed, 80 insertions(+), 154 deletions(-) diff --git a/ch6-cloud/ch6-01-dist-id.md b/ch6-cloud/ch6-01-dist-id.md index 74761e3..87e30b3 100644 --- a/ch6-cloud/ch6-01-dist-id.md +++ b/ch6-cloud/ch6-01-dist-id.md @@ -47,7 +47,7 @@ mysql> select last_insert_id(); ### 6.1.2.1 标准 snowflake 实现 -`github.com/bwmarrin/snowflake` 是一个相当轻量化的snowflake的Go实现。其文档指出: +`github.com/bwmarrin/snowflake` 是一个相当轻量化的snowflake的Go实现。其文档对各位使用的定义见*图 6-2*所示。 ![ch6-snowflake-easy](../images/ch6-snowflake-easy.png) @@ -85,7 +85,7 @@ func main() { } ``` -当然,这个库也给我们留好了定制的后路: +当然,这个库也给我们留好了定制的后路,其中预留了一些可定制字段: ```go // Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC @@ -105,7 +105,7 @@ func main() { ### 6.1.2.2 sonyflake -sonyflake是Sony公司的一个开源项目,基本思路和snowflake差不多,不过位分配上稍有不同,见*图 6-2*: +sonyflake是Sony公司的一个开源项目,基本思路和snowflake差不多,不过位分配上稍有不同,见*图 6-3*: ![sonyflake](../images/ch6-snoyflake.png) diff --git a/ch6-cloud/ch6-02-lock.md b/ch6-cloud/ch6-02-lock.md index 68ec94a..079334f 100644 --- a/ch6-cloud/ch6-02-lock.md +++ b/ch6-cloud/ch6-02-lock.md @@ -1,6 +1,6 @@ # 6.2 分布式锁 -在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?可以看看这段代码: +在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?我们看看在不加锁的情况下并发计数会发生什么情况: ```go package main @@ -70,6 +70,10 @@ println(counter) ## 6.2.2 trylock +在某些场景,我们只是希望一个任务有单一的执行者。而不像计数器场景一样,所有goroutine都执行成功。后来的goroutine在抢锁失败后,需要放弃其流程。这时候就需要trylock了。 + +trylock顾名思义,尝试加锁,加锁成功执行后续流程,如果加锁失败的话也不会阻塞,而会直接返回加锁的结果。在Go语言中我们可以用大小为1的Channel来模拟trylock: + ```go package main @@ -129,13 +133,15 @@ func main() { } ``` -因为我们的逻辑限定每个goroutine只有成功执行了`Lock`才会继续执行后续逻辑,因此在`Unlock`时可以保证Lock结构体中的channel一定是空,从而不会阻塞,也不会失败。 +因为我们的逻辑限定每个goroutine只有成功执行了`Lock`才会继续执行后续逻辑,因此在`Unlock`时可以保证Lock结构体中的channel一定是空,从而不会阻塞,也不会失败。上面的代码使用了大小为1的channel来模拟trylock,理论上还可以使用标准库中的CAS来实现相同的功能且成本更低,读者可以自行尝试。 在单机系统中,trylock并不是一个好选择。因为大量的goroutine抢锁可能会导致CPU无意义的资源浪费。有一个专有名词用来描述这种抢锁的场景:活锁。 活锁指的是程序看起来在正常执行,但实际上CPU周期被浪费在抢锁,而非执行任务上,从而程序整体的执行效率低下。活锁的问题定位起来要麻烦很多。所以在单机场景下,不建议使用这种锁。 -## 6.2.3 基于 Redis 的 setnx +## 6.2.3 基于Redis的setnx + +在分布式场景下,我们也需要这种“抢占”的逻辑,这时候怎么办呢?我们可以使用Redis提供的`setnx`命令: ```go package main @@ -226,7 +232,7 @@ unlock success! 所以,我们需要依赖于这些请求到达Redis节点的顺序来做正确的抢锁操作。如果用户的网络环境比较差,那也只能自求多福了。 -## 6.2.4 基于 ZooKeeper +## 6.2.4 基于ZooKeeper ```go package main @@ -259,12 +265,14 @@ func main() { 基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的`mutex.Lock`很相似。 -其原理也是基于临时sequence节点和watch API,例如我们这里使用的是`/lock`节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。 +其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是`/lock`节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。 这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于`粗粒度`的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。 ## 6.2.5 基于 etcd +etcd是分布式系统中,功能上与ZooKeeper类似的组件,这两年越来越火了。上面基于ZooKeeper我们实现了分布式阻塞锁,基于etcd,也可以实现类似的功能: + ```go package main @@ -298,76 +306,14 @@ func main() { } ``` -etcd中没有像ZooKeeper那样的sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是: +etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是: 1. 先检查`/lock`路径下是否有值,如果有值,说明锁已经被别人抢了 2. 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3 3. watch `/lock`下的事件,此时陷入阻塞 4. 当`/lock`路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。 -## 6.2.6 Redlock - -```go -package main - -import ( - "fmt" - "time" - - "github.com/garyburd/redigo/redis" - "gopkg.in/redsync.v1" -) - -func newPool(server string) *redis.Pool { - return &redis.Pool{ - MaxIdle: 3, - IdleTimeout: 240 * time.Second, - - Dial: func() (redis.Conn, error) { - c, err := redis.Dial("tcp", server) - if err != nil { - return nil, err - } - return c, err - }, - - TestOnBorrow: func(c redis.Conn, t time.Time) error { - _, err := c.Do("PING") - return err - }, - } -} - -func newPools(servers []string) []redsync.Pool { - pools := []redsync.Pool{} - for _, server := range servers { - pool := newPool(server) - pools = append(pools, pool) - } - - return pools -} - -func main() { - pools := newPools([]string{ - "127.0.0.1:6379", "127.0.0.1:6378", "127.0.0.1:6377", - }) - rs := redsync.New(pools) - m := rs.NewMutex("/lock") - - err := m.Lock() - if err != nil { - panic(err) - } - fmt.Println("lock success") - unlockRes := m.Unlock() - fmt.Println("unlock result: ", unlockRes) -} -``` - -Redlock也是一种阻塞锁,单个节点操作对应的是`set nx px`命令,超过半数节点返回成功时,就认为加锁成功。 - -关于Redlock设计曾经在社区引起一场口水战,分布式专家各抒己见。不过这个不是我们要讨论的内容,相关链接在参考资料中给出。 +值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API,读者可以查阅etcd的文档做进一步的学习。 ## 6.2.7 如何选择 @@ -375,9 +321,7 @@ Redlock也是一种阻塞锁,单个节点操作对应的是`set nx px`命令 如果发展到了分布式服务阶段,但业务规模不大,qps很小的情况下,使用哪种锁方案都差不多。如果公司内已有可以使用的ZooKeeper、etcd或者Redis集群,那么就尽量在不引入新的技术栈的情况下满足业务需求。 -业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的setnx的简单锁。 - -如果要使用Redlock,那么要考虑你们公司Redis的集群方案,是否可以直接把对应的Redis的实例的ip+port暴露给开发人员。如果不可以,那也没法用。 +业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的`setnx`的简单锁。 对锁数据的可靠性要求极高的话,那只能使用etcd或者ZooKeeper这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的etcd或ZooKeeper集群可以承受得住实际的业务请求压力。需要注意的是,etcd和Zookeeper集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入proxy,没有proxy那就需要业务去根据某个业务id来做分片。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。 diff --git a/ch6-cloud/ch6-03-delay-job.md b/ch6-cloud/ch6-03-delay-job.md index 4dc7f38..ba3a3cc 100644 --- a/ch6-cloud/ch6-03-delay-job.md +++ b/ch6-cloud/ch6-03-delay-job.md @@ -13,7 +13,7 @@ timer的实现在工业界已经是有解的问题了。常见的就是时间堆和时间轮。 -## 6.3.1 timer 实现 +## 6.3.1 timer实现 ### 6.3.1.1 时间堆 @@ -74,7 +74,7 @@ Go自身的timer就是用时间堆来实现的,不过并没有使用二叉堆 下面给出一种思路: -我们可以参考Elasticsearch的设计,每份任务数据都有多个副本,这里假设两副本: +我们可以参考Elasticsearch的设计,每份任务数据都有多个副本,这里假设两副本,如*图 6-8*所示: ![数据分布](../images/ch6-data-dist1.png) @@ -84,7 +84,7 @@ Go自身的timer就是用时间堆来实现的,不过并没有使用二叉堆 一个任务只会在持有主副本的节点上被执行。 -当有机器故障时,任务数据需要进行数据再平衡的工作,比如节点1挂了: +当有机器故障时,任务数据需要进行数据再平衡的工作,比如节点1挂了,见*图 6-9*。 ![数据分布2](../images/ch6-data-dist2.png) diff --git a/ch6-cloud/ch6-04-search-engine.md b/ch6-cloud/ch6-04-search-engine.md index 72f46ef..1e28011 100644 --- a/ch6-cloud/ch6-04-search-engine.md +++ b/ch6-cloud/ch6-04-search-engine.md @@ -6,9 +6,9 @@ > 在线交易处理(OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以线上交易的方式处理一般即时性的作业数据,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP通常被运用于自动化的数据处理工作,如订单输入、金融业务…等反复性的日常性交易活动。和其相对的是属于决策分析层次的联机分析处理(OLAP)。 -在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。举个例子,在电商的wms系统中,或者在大多数业务场景丰富的crm或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的wms中对一件货物的描述,可能有下面这些字段: +在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。举个例子,在电商的WMS系统中,或者在大多数业务场景丰富的CRM或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的WMS中对一件货物的描述,可能有下面这些字段: -> 仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,sku类型,产品品牌,产品分类,内件数量 +> 仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,SKU类型,产品品牌,产品分类,内件数量 除了上述信息,如果商品在仓库内有流转。可能还有有关联的流程 id,当前的流转状态等等。 @@ -369,7 +369,7 @@ SQL的where部分就是boolean expression。我们之前提到过,这种bool *图 6-13 基于时间戳的数据同步* -这种同步方式与业务强绑定,例如wms系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从MySQL的出库单表中,把最近十分钟创建的所有出库单取出,批量存入es中,具体的逻辑实际上就是一条SQL: +这种同步方式与业务强绑定,例如WMS系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从MySQL的出库单表中,把最近十分钟创建的所有出库单取出,批量存入es中,具体的逻辑实际上就是一条SQL: ```sql select * from wms_orders where update_time >= date_sub(now(), interval 10 minute); @@ -393,6 +393,6 @@ select * from wms_orders where update_time >= date_sub( 业界使用较多的是阿里开源的Canal,来进行binlog解析与同步。canal会伪装成MySQL的从库,然后解析好行格式的binlog,再以更容易解析的格式(例如json)发送到消息队列。 -由下游的Kafka消费者负责把上游数据表的自增主键作为es的document的id进行写入,这样可以保证每次接收到binlog时,对应id的数据都被覆盖更新为最新。MySQL的row格式的binlog会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按id进行覆盖即可。 +由下游的Kafka消费者负责把上游数据表的自增主键作为es的文档的id进行写入,这样可以保证每次接收到binlog时,对应id的数据都被覆盖更新为最新。MySQL的Row格式的binlog会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按id进行覆盖即可。 这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键id来保证我们进入es的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去定制消费者的逻辑,这就不是通用系统讨论的范畴了。 diff --git a/ch6-cloud/ch6-05-load-balance.md b/ch6-cloud/ch6-05-load-balance.md index 90de9f1..de5802b 100644 --- a/ch6-cloud/ch6-05-load-balance.md +++ b/ch6-cloud/ch6-05-load-balance.md @@ -4,13 +4,13 @@ ## 6.5.1 常见的负载均衡思路 -如果我们不考虑均衡的话,现在有n个endpoint,我们完成业务流程实际上只需要从这n个中挑出其中的一个。有几种思路: +如果我们不考虑均衡的话,现在有n个服务节点,我们完成业务流程实际上只需要从这n个中挑出其中的一个。有几种思路: -1. 按顺序挑: 例如上次选了第一台,那么这次就选第二台,下次第三台,如果已经到了最后一台,那么下一次从第一台开始。这种情况下我们可以把endpoint都存储在数组中,每次请求完成下游之后,将一个索引后移即可。在移到尽头时再移回数组开头处。 +1. 按顺序挑: 例如上次选了第一台,那么这次就选第二台,下次第三台,如果已经到了最后一台,那么下一次从第一台开始。这种情况下我们可以把服务节点信息都存储在数组中,每次请求完成下游之后,将一个索引后移即可。在移到尽头时再移回数组开头处。 2. 随机挑一个: 每次都随机挑,真随机伪随机均可。假设选择第 x 台机器,那么x可描述为`rand.Intn()%n`。 -3. 根据某种权重,对下游endpoints进行排序,选择权重最大/小的那一个。 +3. 根据某种权重,对下游节点进行排序,选择权重最大/小的那一个。 当然了,实际场景我们不可能无脑轮询或者无脑随机,如果对下游请求失败了,我们还需要某种机制来进行重试,如果纯粹的随机算法,存在一定的可能性使你在下一次仍然随机到这次的问题节点。 @@ -18,7 +18,7 @@ ## 6.5.2 基于洗牌算法的负载均衡 -考虑到我们需要随机选取每次发送请求的endpoint,同时在遇到下游返回错误时换其它节点重试。所以我们设计一个大小和endpoints数组大小一致的索引数组,每次来新的请求,我们对索引数组做洗牌,然后取第一个元素作为选中的服务节点,如果请求失败,那么选择下一个节点重试,以此类推: +考虑到我们需要随机选取每次发送请求的节点,同时在遇到下游返回错误时换其它节点重试。所以我们设计一个大小和节点数组大小一致的索引数组,每次来新的请求,我们对索引数组做洗牌,然后取第一个元素作为选中的服务节点,如果请求失败,那么选择下一个节点重试,以此类推: ```go var endpoints = []string { @@ -75,9 +75,9 @@ func request(params map[string]interface{}) error { 2. 洗牌不均匀,会导致整个数组第一个节点有大概率被选中,并且多个节点的负载分布不均衡。 -第一点比较简单,应该不用在这里给出证明了。关于第二点,我们可以用概率知识来简单证明一下。假设每次挑选都是真随机,我们假设第一个位置的endpoint在`len(slice)`次交换中都不被选中的概率是`((6/7)*(6/7))^7 ≈ 0.34`。而分布均匀的情况下,我们肯定希望被第一个元素在任意位置上分布的概率均等,所以其被随机选到的概率应该约等于`1/7≈0.14`。 +第一点比较简单,应该不用在这里给出证明了。关于第二点,我们可以用概率知识来简单证明一下。假设每次挑选都是真随机,我们假设第一个位置的节点在`len(slice)`次交换中都不被选中的概率是`((6/7)*(6/7))^7 ≈ 0.34`。而分布均匀的情况下,我们肯定希望被第一个元素在任意位置上分布的概率均等,所以其被随机选到的概率应该约等于`1/7≈0.14`。 -显然,这里给出的洗牌算法对于任意位置的元素来说,有30%的概率不对其进行交换操作。所以所有元素都倾向于留在原来的位置。因为我们每次对`shuffle`数组输入的都是同一个序列,所以第一个元素有更大的概率会被选中。在负载均衡的场景下,也就意味着endpoints数组中的第一台机器负载会比其它机器高不少(这里至少是3倍以上)。 +显然,这里给出的洗牌算法对于任意位置的元素来说,有30%的概率不对其进行交换操作。所以所有元素都倾向于留在原来的位置。因为我们每次对`shuffle`数组输入的都是同一个序列,所以第一个元素有更大的概率会被选中。在负载均衡的场景下,也就意味着节点数组中的第一台机器负载会比其它机器高不少(这里至少是3倍以上)。 ### 6.5.2.2 修正洗牌算法 @@ -108,7 +108,7 @@ func shuffle(n int) []int { 本节中的场景是从N个节点中选择一个节点发送请求,初始请求结束之后,后续的请求会重新对数组洗牌,所以每两个请求之间没有什么关联关系。因此我们上面的洗牌算法,理论上不初始化随机库的种子也是不会出什么问题的。 -但在一些特殊的场景下,例如使用ZooKeeper时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。并且之后如果有请求,也都会发送到该节点去。直到该节点不可用,才会在endpoints列表中挑选下一个节点。在这种场景下,我们的初始连接节点选择就要求必须是“真”随机了。否则,所有客户端起动时,都会去连接同一个ZooKeeper的实例,根本无法起到负载均衡的目的。如果在日常开发中,你的业务也是类似的场景,也务必考虑一下是否会发生类似的情况。为rand库设置种子的方法: +但在一些特殊的场景下,例如使用ZooKeeper时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。之后客户端请求都会发往该节点去。直到该节点不可用,才会在节点列表中挑选下一个节点。在这种场景下,我们的初始连接节点选择就要求必须是“真”随机了。否则,所有客户端起动时,都会去连接同一个ZooKeeper的实例,根本无法起到负载均衡的目的。如果在日常开发中,你的业务也是类似的场景,也务必考虑一下是否会发生类似的情况。为rand库设置种子的方法: ```go rand.Seed(time.Now().UnixNano()) diff --git a/ch6-cloud/ch6-07-crawler.md b/ch6-cloud/ch6-07-crawler.md index 7bf5b80..4edd192 100644 --- a/ch6-cloud/ch6-07-crawler.md +++ b/ch6-cloud/ch6-07-crawler.md @@ -8,9 +8,7 @@ ## 基于colly的单机爬虫 -有很多程序员比较喜欢在v2ex上讨论问题,发表观点,有时候可能懒癌发作,我们希望能直接命令行爬到v2ex在Go tag下的新贴,只要简单写一个爬虫即可。 - -《Go 语言编程》一书给出了简单的爬虫示例,经过了多年的发展,现在使用Go语言写一个网站的爬虫要更加方便,比如用colly来实现爬取v2ex前十页内容: +《Go 语言编程》一书给出了简单的爬虫示例,经过了多年的发展,现在使用Go语言写一个网站的爬虫要更加方便,比如用colly来实现爬取某网站(虚拟站点,这里用abcdefg作为占位符)在Go语言标签下的前十页内容: ```go package main @@ -28,14 +26,16 @@ var visited = map[string]bool{} func main() { // Instantiate default collector c := colly.NewCollector( - colly.AllowedDomains("www.v2ex.com"), + colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(1), ) + // 我们认为匹配该模式的是该网站的详情页 detailRegex, _ := regexp.Compile(`/go/go\?p=\d+$`) + // 匹配下面模式的是该网站的列表页 listRegex, _ := regexp.Compile(`/t/\d+#\w+`) - // On every a element which has href attribute call callback + // 所有a标签,上设置回调函数 c.OnHTML("a[href]", func(e *colly.HTMLElement) { link := e.Attr("href") @@ -44,13 +44,15 @@ func main() { return } - // 匹配下列两种 url 模式的,才去 visit - // https://www.v2ex.com/go/go?p=2 - // https://www.v2ex.com/t/472945#reply3 + // 既不是列表页,也不是详情页 + // 那么不是我们关心的内容,要跳过 if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) { println("not match", link) return } + + // 因为大多数网站有反爬虫策略 + // 所以爬虫逻辑中应该有 sleep 逻辑以避免被封杀 time.Sleep(time.Second) println("match", link) @@ -60,10 +62,8 @@ func main() { c.Visit(e.Request.AbsoluteURL(link)) }) - err := c.Visit("https://www.v2ex.com/go/go") - if err != nil { - fmt.Println(err) - } + err := c.Visit("https://www.abcdefg.com/go/go") + if err != nil {fmt.Println(err)} } ``` @@ -112,10 +112,7 @@ nats的服务端项目是gnatsd,客户端与gnatsd的通信方式为基于tcp ```go nc, err := nats.Connect(nats.DefaultURL) -if err != nil { - // log error - return -} +if err != nil {return} // 指定 subject 为 tasks,消息内容随意 err = nc.Publish("tasks", []byte("your task content")) @@ -131,27 +128,18 @@ nc.Flush() ```go nc, err := nats.Connect(nats.DefaultURL) -if err != nil { - // log error - return -} +if err != nil {return} // queue subscribe 相当于在消费者之间进行任务分发的分支均衡 // 前提是所有消费者都使用 workers 这个 queue // nats 中的 queue 概念上类似于 Kafka 中的 consumer group sub, err := nc.QueueSubscribeSync("tasks", "workers") -if err != nil { - // log error - return -} +if err != nil {return} var msg *nats.Msg for { msg, err = sub.NextMsg(time.Hour * 10000) - if err != nil { - // log error - break - } + if err != nil {break} // 正确地消费到了消息 // 可用 nats.Msg 对象处理任务 } @@ -159,7 +147,7 @@ for { #### 结合colly的消息生产 -我们为每一个网站定制一个对应的collector,并设置相应的规则,比如v2ex,v2fx(虚构的),再用简单的工厂方法来将该collector和其host对应起来: +我们为每一个网站定制一个对应的collector,并设置相应的规则,比如abcdefg,hijklmn(虚构的),再用简单的工厂方法来将该collector和其host对应起来,每个站点爬到列表页之后,需要在当前程序中把所有链接解析出来,并把落地页的URL发往消息队列。 ```go package main @@ -181,9 +169,9 @@ func factory(urlStr string) *colly.Collector { return domain2Collector[u.Host] } -func initV2exCollector() *colly.Collector { +func initABCDECollector() *colly.Collector { c := colly.NewCollector( - colly.AllowedDomains("www.v2ex.com"), + colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(maxDepth), ) @@ -194,18 +182,25 @@ func initV2exCollector() *colly.Collector { c.OnHTML("a[href]", func(e *colly.HTMLElement) { // 基本的反爬虫策略 + link := e.Attr("href") time.Sleep(time.Second * 2) - // TODO, 正则 match 列表页的话,就 visit - // TODO, 正则 match 落地页的话,就发消息队列 - c.Visit(e.Request.AbsoluteURL(link)) + // 正则 match 列表页的话,就 visit + if listRegex.Match([]byte(link)) { + c.Visit(e.Request.AbsoluteURL(link)) + } + // 正则 match 落地页的话,就发消息队列 + if detailRegex.Match([]byte(link)) { + err = nc.Publish("tasks", []byte(link)) + nc.Flush() + } }) return c } -func initV2fxCollector() *colly.Collector { +func initHIJKLCollector() *colly.Collector { c := colly.NewCollector( - colly.AllowedDomains("www.v2fx.com"), + colly.AllowedDomains("www.hijklmn.com"), colly.MaxDepth(maxDepth), ) @@ -216,19 +211,16 @@ func initV2fxCollector() *colly.Collector { } func init() { - domain2Collector["www.v2ex.com"] = initV2exCollector() - domain2Collector["www.v2fx.com"] = initV2fxCollector() + domain2Collector["www.abcdefg.com"] = initV2exCollector() + domain2Collector["www.hijklmn.com"] = initV2fxCollector() var err error nc, err = nats.Connect(natsURL) - if err != nil { - // log fatal - os.Exit(1) - } + if err != nil {os.Exit(1)} } func main() { - urls := []string{"https://www.v2ex.com", "https://www.v2fx.com"} + urls := []string{"https://www.abcdefg.com", "https://www.hijklmn.com"} for _, url := range urls { instance := factory(url) instance.Visit(url) @@ -239,6 +231,8 @@ func main() { #### 结合 colly 的消息消费 +消费端就简单一些了,我们只需要订阅对应的主题,并直接访问网站的详情页(落地页)即可。 + ```go package main @@ -261,60 +255,48 @@ func factory(urlStr string) *colly.Collector { func initV2exCollector() *colly.Collector { c := colly.NewCollector( - colly.AllowedDomains("www.v2ex.com"), + colly.AllowedDomains("www.abcdefg.com"), colly.MaxDepth(maxDepth), ) - return c } func initV2fxCollector() *colly.Collector { c := colly.NewCollector( - colly.AllowedDomains("www.v2fx.com"), + colly.AllowedDomains("www.hijklmn.com"), colly.MaxDepth(maxDepth), ) - return c } func init() { - domain2Collector["www.v2ex.com"] = initV2exCollector() - domain2Collector["www.v2fx.com"] = initV2fxCollector() + domain2Collector["www.abcdefg.com"] = initABCDECollector() + domain2Collector["www.hijklmn.com"] = initHIJKLCollector() var err error nc, err = nats.Connect(natsURL) - if err != nil { - // log fatal - os.Exit(1) - } + if err != nil {os.Exit(1)} } func startConsumer() { nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - // log error - return - } + if err != nil {return} sub, err := nc.QueueSubscribeSync("tasks", "workers") - if err != nil { - // log error - return - } + if err != nil {return} var msg *nats.Msg for { msg, err = sub.NextMsg(time.Hour * 10000) - if err != nil { - // log error - break - } + if err != nil {break} urlStr := string(msg.Data) ins := factory(urlStr) // 因为最下游拿到的一定是对应网站的落地页 // 所以不用进行多余的判断了,直接爬内容即可 ins.Visit(urlStr) + // 防止被封杀 + time.Sleep(time.Second) } }