mirror of
https://github.com/chai2010/advanced-go-programming-book.git
synced 2025-05-24 04:16:01 +00:00
fix ch6
This commit is contained in:
parent
16a588e572
commit
3bd2d7ac2d
@ -47,7 +47,7 @@ mysql> select last_insert_id();
|
||||
|
||||
### 6.1.2.1 标准 snowflake 实现
|
||||
|
||||
`github.com/bwmarrin/snowflake` 是一个相当轻量化的snowflake的Go实现。其文档指出:
|
||||
`github.com/bwmarrin/snowflake` 是一个相当轻量化的snowflake的Go实现。其文档对各位使用的定义见*图 6-2*所示。
|
||||
|
||||

|
||||
|
||||
@ -85,7 +85,7 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
当然,这个库也给我们留好了定制的后路:
|
||||
当然,这个库也给我们留好了定制的后路,其中预留了一些可定制字段:
|
||||
|
||||
```go
|
||||
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
|
||||
@ -105,7 +105,7 @@ func main() {
|
||||
|
||||
### 6.1.2.2 sonyflake
|
||||
|
||||
sonyflake是Sony公司的一个开源项目,基本思路和snowflake差不多,不过位分配上稍有不同,见*图 6-2*:
|
||||
sonyflake是Sony公司的一个开源项目,基本思路和snowflake差不多,不过位分配上稍有不同,见*图 6-3*:
|
||||
|
||||

|
||||
|
||||
|
@ -1,6 +1,6 @@
|
||||
# 6.2 分布式锁
|
||||
|
||||
在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?可以看看这段代码:
|
||||
在单机程序并发或并行修改全局变量时,需要对修改行为加锁以创造临界区。为什么需要加锁呢?我们看看在不加锁的情况下并发计数会发生什么情况:
|
||||
|
||||
```go
|
||||
package main
|
||||
@ -70,6 +70,10 @@ println(counter)
|
||||
|
||||
## 6.2.2 trylock
|
||||
|
||||
在某些场景,我们只是希望一个任务有单一的执行者。而不像计数器场景一样,所有goroutine都执行成功。后来的goroutine在抢锁失败后,需要放弃其流程。这时候就需要trylock了。
|
||||
|
||||
trylock顾名思义,尝试加锁,加锁成功执行后续流程,如果加锁失败的话也不会阻塞,而会直接返回加锁的结果。在Go语言中我们可以用大小为1的Channel来模拟trylock:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
@ -129,13 +133,15 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
因为我们的逻辑限定每个goroutine只有成功执行了`Lock`才会继续执行后续逻辑,因此在`Unlock`时可以保证Lock结构体中的channel一定是空,从而不会阻塞,也不会失败。
|
||||
因为我们的逻辑限定每个goroutine只有成功执行了`Lock`才会继续执行后续逻辑,因此在`Unlock`时可以保证Lock结构体中的channel一定是空,从而不会阻塞,也不会失败。上面的代码使用了大小为1的channel来模拟trylock,理论上还可以使用标准库中的CAS来实现相同的功能且成本更低,读者可以自行尝试。
|
||||
|
||||
在单机系统中,trylock并不是一个好选择。因为大量的goroutine抢锁可能会导致CPU无意义的资源浪费。有一个专有名词用来描述这种抢锁的场景:活锁。
|
||||
|
||||
活锁指的是程序看起来在正常执行,但实际上CPU周期被浪费在抢锁,而非执行任务上,从而程序整体的执行效率低下。活锁的问题定位起来要麻烦很多。所以在单机场景下,不建议使用这种锁。
|
||||
|
||||
## 6.2.3 基于 Redis 的 setnx
|
||||
## 6.2.3 基于Redis的setnx
|
||||
|
||||
在分布式场景下,我们也需要这种“抢占”的逻辑,这时候怎么办呢?我们可以使用Redis提供的`setnx`命令:
|
||||
|
||||
```go
|
||||
package main
|
||||
@ -226,7 +232,7 @@ unlock success!
|
||||
|
||||
所以,我们需要依赖于这些请求到达Redis节点的顺序来做正确的抢锁操作。如果用户的网络环境比较差,那也只能自求多福了。
|
||||
|
||||
## 6.2.4 基于 ZooKeeper
|
||||
## 6.2.4 基于ZooKeeper
|
||||
|
||||
```go
|
||||
package main
|
||||
@ -259,12 +265,14 @@ func main() {
|
||||
|
||||
基于ZooKeeper的锁与基于Redis的锁的不同之处在于Lock成功之前会一直阻塞,这与我们单机场景中的`mutex.Lock`很相似。
|
||||
|
||||
其原理也是基于临时sequence节点和watch API,例如我们这里使用的是`/lock`节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。
|
||||
其原理也是基于临时Sequence节点和watch API,例如我们这里使用的是`/lock`节点。Lock会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有watch该节点的程序。这时候程序会检查当前节点下最小的子节点的id是否与自己的一致。如果一致,说明加锁成功了。
|
||||
|
||||
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照Google的Chubby论文里的阐述,基于强一致协议的锁适用于`粗粒度`的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。
|
||||
|
||||
## 6.2.5 基于 etcd
|
||||
|
||||
etcd是分布式系统中,功能上与ZooKeeper类似的组件,这两年越来越火了。上面基于ZooKeeper我们实现了分布式阻塞锁,基于etcd,也可以实现类似的功能:
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
@ -298,76 +306,14 @@ func main() {
|
||||
}
|
||||
```
|
||||
|
||||
etcd中没有像ZooKeeper那样的sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:
|
||||
etcd中没有像ZooKeeper那样的Sequence节点。所以其锁实现和基于ZooKeeper实现的有所不同。在上述示例代码中使用的etcdsync的Lock流程是:
|
||||
|
||||
1. 先检查`/lock`路径下是否有值,如果有值,说明锁已经被别人抢了
|
||||
2. 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3
|
||||
3. watch `/lock`下的事件,此时陷入阻塞
|
||||
4. 当`/lock`路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。
|
||||
|
||||
## 6.2.6 Redlock
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"gopkg.in/redsync.v1"
|
||||
)
|
||||
|
||||
func newPool(server string) *redis.Pool {
|
||||
return &redis.Pool{
|
||||
MaxIdle: 3,
|
||||
IdleTimeout: 240 * time.Second,
|
||||
|
||||
Dial: func() (redis.Conn, error) {
|
||||
c, err := redis.Dial("tcp", server)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return c, err
|
||||
},
|
||||
|
||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newPools(servers []string) []redsync.Pool {
|
||||
pools := []redsync.Pool{}
|
||||
for _, server := range servers {
|
||||
pool := newPool(server)
|
||||
pools = append(pools, pool)
|
||||
}
|
||||
|
||||
return pools
|
||||
}
|
||||
|
||||
func main() {
|
||||
pools := newPools([]string{
|
||||
"127.0.0.1:6379", "127.0.0.1:6378", "127.0.0.1:6377",
|
||||
})
|
||||
rs := redsync.New(pools)
|
||||
m := rs.NewMutex("/lock")
|
||||
|
||||
err := m.Lock()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println("lock success")
|
||||
unlockRes := m.Unlock()
|
||||
fmt.Println("unlock result: ", unlockRes)
|
||||
}
|
||||
```
|
||||
|
||||
Redlock也是一种阻塞锁,单个节点操作对应的是`set nx px`命令,超过半数节点返回成功时,就认为加锁成功。
|
||||
|
||||
关于Redlock设计曾经在社区引起一场口水战,分布式专家各抒己见。不过这个不是我们要讨论的内容,相关链接在参考资料中给出。
|
||||
值得一提的是,在etcdv3的API中官方已经提供了可以直接使用的锁API,读者可以查阅etcd的文档做进一步的学习。
|
||||
|
||||
## 6.2.7 如何选择
|
||||
|
||||
@ -375,9 +321,7 @@ Redlock也是一种阻塞锁,单个节点操作对应的是`set nx px`命令
|
||||
|
||||
如果发展到了分布式服务阶段,但业务规模不大,qps很小的情况下,使用哪种锁方案都差不多。如果公司内已有可以使用的ZooKeeper、etcd或者Redis集群,那么就尽量在不引入新的技术栈的情况下满足业务需求。
|
||||
|
||||
业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的setnx的简单锁。
|
||||
|
||||
如果要使用Redlock,那么要考虑你们公司Redis的集群方案,是否可以直接把对应的Redis的实例的ip+port暴露给开发人员。如果不可以,那也没法用。
|
||||
业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用Redis的`setnx`的简单锁。
|
||||
|
||||
对锁数据的可靠性要求极高的话,那只能使用etcd或者ZooKeeper这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的etcd或ZooKeeper集群可以承受得住实际的业务请求压力。需要注意的是,etcd和Zookeeper集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入proxy,没有proxy那就需要业务去根据某个业务id来做分片。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。
|
||||
|
||||
|
@ -13,7 +13,7 @@
|
||||
|
||||
timer的实现在工业界已经是有解的问题了。常见的就是时间堆和时间轮。
|
||||
|
||||
## 6.3.1 timer 实现
|
||||
## 6.3.1 timer实现
|
||||
|
||||
### 6.3.1.1 时间堆
|
||||
|
||||
@ -74,7 +74,7 @@ Go自身的timer就是用时间堆来实现的,不过并没有使用二叉堆
|
||||
|
||||
下面给出一种思路:
|
||||
|
||||
我们可以参考Elasticsearch的设计,每份任务数据都有多个副本,这里假设两副本:
|
||||
我们可以参考Elasticsearch的设计,每份任务数据都有多个副本,这里假设两副本,如*图 6-8*所示:
|
||||
|
||||

|
||||
|
||||
@ -84,7 +84,7 @@ Go自身的timer就是用时间堆来实现的,不过并没有使用二叉堆
|
||||
|
||||
一个任务只会在持有主副本的节点上被执行。
|
||||
|
||||
当有机器故障时,任务数据需要进行数据再平衡的工作,比如节点1挂了:
|
||||
当有机器故障时,任务数据需要进行数据再平衡的工作,比如节点1挂了,见*图 6-9*。
|
||||
|
||||

|
||||
|
||||
|
@ -6,9 +6,9 @@
|
||||
|
||||
> 在线交易处理(OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以线上交易的方式处理一般即时性的作业数据,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP通常被运用于自动化的数据处理工作,如订单输入、金融业务…等反复性的日常性交易活动。和其相对的是属于决策分析层次的联机分析处理(OLAP)。
|
||||
|
||||
在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。举个例子,在电商的wms系统中,或者在大多数业务场景丰富的crm或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的wms中对一件货物的描述,可能有下面这些字段:
|
||||
在互联网的业务场景中,也有一些实时性要求不高(可以接受多秒的延迟),但是查询复杂性却很高的场景。举个例子,在电商的WMS系统中,或者在大多数业务场景丰富的CRM或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的WMS中对一件货物的描述,可能有下面这些字段:
|
||||
|
||||
> 仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,sku类型,产品品牌,产品分类,内件数量
|
||||
> 仓库id,入库时间,库位分区id,储存货架id,入库操作员id,出库操作员id,库存数量,过期时间,SKU类型,产品品牌,产品分类,内件数量
|
||||
|
||||
除了上述信息,如果商品在仓库内有流转。可能还有有关联的流程 id,当前的流转状态等等。
|
||||
|
||||
@ -369,7 +369,7 @@ SQL的where部分就是boolean expression。我们之前提到过,这种bool
|
||||
|
||||
*图 6-13 基于时间戳的数据同步*
|
||||
|
||||
这种同步方式与业务强绑定,例如wms系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从MySQL的出库单表中,把最近十分钟创建的所有出库单取出,批量存入es中,具体的逻辑实际上就是一条SQL:
|
||||
这种同步方式与业务强绑定,例如WMS系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从MySQL的出库单表中,把最近十分钟创建的所有出库单取出,批量存入es中,具体的逻辑实际上就是一条SQL:
|
||||
|
||||
```sql
|
||||
select * from wms_orders where update_time >= date_sub(now(), interval 10 minute);
|
||||
@ -393,6 +393,6 @@ select * from wms_orders where update_time >= date_sub(
|
||||
|
||||
业界使用较多的是阿里开源的Canal,来进行binlog解析与同步。canal会伪装成MySQL的从库,然后解析好行格式的binlog,再以更容易解析的格式(例如json)发送到消息队列。
|
||||
|
||||
由下游的Kafka消费者负责把上游数据表的自增主键作为es的document的id进行写入,这样可以保证每次接收到binlog时,对应id的数据都被覆盖更新为最新。MySQL的row格式的binlog会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按id进行覆盖即可。
|
||||
由下游的Kafka消费者负责把上游数据表的自增主键作为es的文档的id进行写入,这样可以保证每次接收到binlog时,对应id的数据都被覆盖更新为最新。MySQL的Row格式的binlog会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按id进行覆盖即可。
|
||||
|
||||
这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键id来保证我们进入es的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去定制消费者的逻辑,这就不是通用系统讨论的范畴了。
|
||||
|
@ -4,13 +4,13 @@
|
||||
|
||||
## 6.5.1 常见的负载均衡思路
|
||||
|
||||
如果我们不考虑均衡的话,现在有n个endpoint,我们完成业务流程实际上只需要从这n个中挑出其中的一个。有几种思路:
|
||||
如果我们不考虑均衡的话,现在有n个服务节点,我们完成业务流程实际上只需要从这n个中挑出其中的一个。有几种思路:
|
||||
|
||||
1. 按顺序挑: 例如上次选了第一台,那么这次就选第二台,下次第三台,如果已经到了最后一台,那么下一次从第一台开始。这种情况下我们可以把endpoint都存储在数组中,每次请求完成下游之后,将一个索引后移即可。在移到尽头时再移回数组开头处。
|
||||
1. 按顺序挑: 例如上次选了第一台,那么这次就选第二台,下次第三台,如果已经到了最后一台,那么下一次从第一台开始。这种情况下我们可以把服务节点信息都存储在数组中,每次请求完成下游之后,将一个索引后移即可。在移到尽头时再移回数组开头处。
|
||||
|
||||
2. 随机挑一个: 每次都随机挑,真随机伪随机均可。假设选择第 x 台机器,那么x可描述为`rand.Intn()%n`。
|
||||
|
||||
3. 根据某种权重,对下游endpoints进行排序,选择权重最大/小的那一个。
|
||||
3. 根据某种权重,对下游节点进行排序,选择权重最大/小的那一个。
|
||||
|
||||
当然了,实际场景我们不可能无脑轮询或者无脑随机,如果对下游请求失败了,我们还需要某种机制来进行重试,如果纯粹的随机算法,存在一定的可能性使你在下一次仍然随机到这次的问题节点。
|
||||
|
||||
@ -18,7 +18,7 @@
|
||||
|
||||
## 6.5.2 基于洗牌算法的负载均衡
|
||||
|
||||
考虑到我们需要随机选取每次发送请求的endpoint,同时在遇到下游返回错误时换其它节点重试。所以我们设计一个大小和endpoints数组大小一致的索引数组,每次来新的请求,我们对索引数组做洗牌,然后取第一个元素作为选中的服务节点,如果请求失败,那么选择下一个节点重试,以此类推:
|
||||
考虑到我们需要随机选取每次发送请求的节点,同时在遇到下游返回错误时换其它节点重试。所以我们设计一个大小和节点数组大小一致的索引数组,每次来新的请求,我们对索引数组做洗牌,然后取第一个元素作为选中的服务节点,如果请求失败,那么选择下一个节点重试,以此类推:
|
||||
|
||||
```go
|
||||
var endpoints = []string {
|
||||
@ -75,9 +75,9 @@ func request(params map[string]interface{}) error {
|
||||
|
||||
2. 洗牌不均匀,会导致整个数组第一个节点有大概率被选中,并且多个节点的负载分布不均衡。
|
||||
|
||||
第一点比较简单,应该不用在这里给出证明了。关于第二点,我们可以用概率知识来简单证明一下。假设每次挑选都是真随机,我们假设第一个位置的endpoint在`len(slice)`次交换中都不被选中的概率是`((6/7)*(6/7))^7 ≈ 0.34`。而分布均匀的情况下,我们肯定希望被第一个元素在任意位置上分布的概率均等,所以其被随机选到的概率应该约等于`1/7≈0.14`。
|
||||
第一点比较简单,应该不用在这里给出证明了。关于第二点,我们可以用概率知识来简单证明一下。假设每次挑选都是真随机,我们假设第一个位置的节点在`len(slice)`次交换中都不被选中的概率是`((6/7)*(6/7))^7 ≈ 0.34`。而分布均匀的情况下,我们肯定希望被第一个元素在任意位置上分布的概率均等,所以其被随机选到的概率应该约等于`1/7≈0.14`。
|
||||
|
||||
显然,这里给出的洗牌算法对于任意位置的元素来说,有30%的概率不对其进行交换操作。所以所有元素都倾向于留在原来的位置。因为我们每次对`shuffle`数组输入的都是同一个序列,所以第一个元素有更大的概率会被选中。在负载均衡的场景下,也就意味着endpoints数组中的第一台机器负载会比其它机器高不少(这里至少是3倍以上)。
|
||||
显然,这里给出的洗牌算法对于任意位置的元素来说,有30%的概率不对其进行交换操作。所以所有元素都倾向于留在原来的位置。因为我们每次对`shuffle`数组输入的都是同一个序列,所以第一个元素有更大的概率会被选中。在负载均衡的场景下,也就意味着节点数组中的第一台机器负载会比其它机器高不少(这里至少是3倍以上)。
|
||||
|
||||
### 6.5.2.2 修正洗牌算法
|
||||
|
||||
@ -108,7 +108,7 @@ func shuffle(n int) []int {
|
||||
|
||||
本节中的场景是从N个节点中选择一个节点发送请求,初始请求结束之后,后续的请求会重新对数组洗牌,所以每两个请求之间没有什么关联关系。因此我们上面的洗牌算法,理论上不初始化随机库的种子也是不会出什么问题的。
|
||||
|
||||
但在一些特殊的场景下,例如使用ZooKeeper时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。并且之后如果有请求,也都会发送到该节点去。直到该节点不可用,才会在endpoints列表中挑选下一个节点。在这种场景下,我们的初始连接节点选择就要求必须是“真”随机了。否则,所有客户端起动时,都会去连接同一个ZooKeeper的实例,根本无法起到负载均衡的目的。如果在日常开发中,你的业务也是类似的场景,也务必考虑一下是否会发生类似的情况。为rand库设置种子的方法:
|
||||
但在一些特殊的场景下,例如使用ZooKeeper时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。之后客户端请求都会发往该节点去。直到该节点不可用,才会在节点列表中挑选下一个节点。在这种场景下,我们的初始连接节点选择就要求必须是“真”随机了。否则,所有客户端起动时,都会去连接同一个ZooKeeper的实例,根本无法起到负载均衡的目的。如果在日常开发中,你的业务也是类似的场景,也务必考虑一下是否会发生类似的情况。为rand库设置种子的方法:
|
||||
|
||||
```go
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
@ -8,9 +8,7 @@
|
||||
|
||||
## 基于colly的单机爬虫
|
||||
|
||||
有很多程序员比较喜欢在v2ex上讨论问题,发表观点,有时候可能懒癌发作,我们希望能直接命令行爬到v2ex在Go tag下的新贴,只要简单写一个爬虫即可。
|
||||
|
||||
《Go 语言编程》一书给出了简单的爬虫示例,经过了多年的发展,现在使用Go语言写一个网站的爬虫要更加方便,比如用colly来实现爬取v2ex前十页内容:
|
||||
《Go 语言编程》一书给出了简单的爬虫示例,经过了多年的发展,现在使用Go语言写一个网站的爬虫要更加方便,比如用colly来实现爬取某网站(虚拟站点,这里用abcdefg作为占位符)在Go语言标签下的前十页内容:
|
||||
|
||||
```go
|
||||
package main
|
||||
@ -28,14 +26,16 @@ var visited = map[string]bool{}
|
||||
func main() {
|
||||
// Instantiate default collector
|
||||
c := colly.NewCollector(
|
||||
colly.AllowedDomains("www.v2ex.com"),
|
||||
colly.AllowedDomains("www.abcdefg.com"),
|
||||
colly.MaxDepth(1),
|
||||
)
|
||||
|
||||
// 我们认为匹配该模式的是该网站的详情页
|
||||
detailRegex, _ := regexp.Compile(`/go/go\?p=\d+$`)
|
||||
// 匹配下面模式的是该网站的列表页
|
||||
listRegex, _ := regexp.Compile(`/t/\d+#\w+`)
|
||||
|
||||
// On every a element which has href attribute call callback
|
||||
// 所有a标签,上设置回调函数
|
||||
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
|
||||
link := e.Attr("href")
|
||||
|
||||
@ -44,13 +44,15 @@ func main() {
|
||||
return
|
||||
}
|
||||
|
||||
// 匹配下列两种 url 模式的,才去 visit
|
||||
// https://www.v2ex.com/go/go?p=2
|
||||
// https://www.v2ex.com/t/472945#reply3
|
||||
// 既不是列表页,也不是详情页
|
||||
// 那么不是我们关心的内容,要跳过
|
||||
if !detailRegex.Match([]byte(link)) && !listRegex.Match([]byte(link)) {
|
||||
println("not match", link)
|
||||
return
|
||||
}
|
||||
|
||||
// 因为大多数网站有反爬虫策略
|
||||
// 所以爬虫逻辑中应该有 sleep 逻辑以避免被封杀
|
||||
time.Sleep(time.Second)
|
||||
println("match", link)
|
||||
|
||||
@ -60,10 +62,8 @@ func main() {
|
||||
c.Visit(e.Request.AbsoluteURL(link))
|
||||
})
|
||||
|
||||
err := c.Visit("https://www.v2ex.com/go/go")
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
}
|
||||
err := c.Visit("https://www.abcdefg.com/go/go")
|
||||
if err != nil {fmt.Println(err)}
|
||||
}
|
||||
```
|
||||
|
||||
@ -112,10 +112,7 @@ nats的服务端项目是gnatsd,客户端与gnatsd的通信方式为基于tcp
|
||||
|
||||
```go
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
// log error
|
||||
return
|
||||
}
|
||||
if err != nil {return}
|
||||
|
||||
// 指定 subject 为 tasks,消息内容随意
|
||||
err = nc.Publish("tasks", []byte("your task content"))
|
||||
@ -131,27 +128,18 @@ nc.Flush()
|
||||
|
||||
```go
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
// log error
|
||||
return
|
||||
}
|
||||
if err != nil {return}
|
||||
|
||||
// queue subscribe 相当于在消费者之间进行任务分发的分支均衡
|
||||
// 前提是所有消费者都使用 workers 这个 queue
|
||||
// nats 中的 queue 概念上类似于 Kafka 中的 consumer group
|
||||
sub, err := nc.QueueSubscribeSync("tasks", "workers")
|
||||
if err != nil {
|
||||
// log error
|
||||
return
|
||||
}
|
||||
if err != nil {return}
|
||||
|
||||
var msg *nats.Msg
|
||||
for {
|
||||
msg, err = sub.NextMsg(time.Hour * 10000)
|
||||
if err != nil {
|
||||
// log error
|
||||
break
|
||||
}
|
||||
if err != nil {break}
|
||||
// 正确地消费到了消息
|
||||
// 可用 nats.Msg 对象处理任务
|
||||
}
|
||||
@ -159,7 +147,7 @@ for {
|
||||
|
||||
#### 结合colly的消息生产
|
||||
|
||||
我们为每一个网站定制一个对应的collector,并设置相应的规则,比如v2ex,v2fx(虚构的),再用简单的工厂方法来将该collector和其host对应起来:
|
||||
我们为每一个网站定制一个对应的collector,并设置相应的规则,比如abcdefg,hijklmn(虚构的),再用简单的工厂方法来将该collector和其host对应起来,每个站点爬到列表页之后,需要在当前程序中把所有链接解析出来,并把落地页的URL发往消息队列。
|
||||
|
||||
```go
|
||||
package main
|
||||
@ -181,9 +169,9 @@ func factory(urlStr string) *colly.Collector {
|
||||
return domain2Collector[u.Host]
|
||||
}
|
||||
|
||||
func initV2exCollector() *colly.Collector {
|
||||
func initABCDECollector() *colly.Collector {
|
||||
c := colly.NewCollector(
|
||||
colly.AllowedDomains("www.v2ex.com"),
|
||||
colly.AllowedDomains("www.abcdefg.com"),
|
||||
colly.MaxDepth(maxDepth),
|
||||
)
|
||||
|
||||
@ -194,18 +182,25 @@ func initV2exCollector() *colly.Collector {
|
||||
|
||||
c.OnHTML("a[href]", func(e *colly.HTMLElement) {
|
||||
// 基本的反爬虫策略
|
||||
link := e.Attr("href")
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
// TODO, 正则 match 列表页的话,就 visit
|
||||
// TODO, 正则 match 落地页的话,就发消息队列
|
||||
c.Visit(e.Request.AbsoluteURL(link))
|
||||
// 正则 match 列表页的话,就 visit
|
||||
if listRegex.Match([]byte(link)) {
|
||||
c.Visit(e.Request.AbsoluteURL(link))
|
||||
}
|
||||
// 正则 match 落地页的话,就发消息队列
|
||||
if detailRegex.Match([]byte(link)) {
|
||||
err = nc.Publish("tasks", []byte(link))
|
||||
nc.Flush()
|
||||
}
|
||||
})
|
||||
return c
|
||||
}
|
||||
|
||||
func initV2fxCollector() *colly.Collector {
|
||||
func initHIJKLCollector() *colly.Collector {
|
||||
c := colly.NewCollector(
|
||||
colly.AllowedDomains("www.v2fx.com"),
|
||||
colly.AllowedDomains("www.hijklmn.com"),
|
||||
colly.MaxDepth(maxDepth),
|
||||
)
|
||||
|
||||
@ -216,19 +211,16 @@ func initV2fxCollector() *colly.Collector {
|
||||
}
|
||||
|
||||
func init() {
|
||||
domain2Collector["www.v2ex.com"] = initV2exCollector()
|
||||
domain2Collector["www.v2fx.com"] = initV2fxCollector()
|
||||
domain2Collector["www.abcdefg.com"] = initV2exCollector()
|
||||
domain2Collector["www.hijklmn.com"] = initV2fxCollector()
|
||||
|
||||
var err error
|
||||
nc, err = nats.Connect(natsURL)
|
||||
if err != nil {
|
||||
// log fatal
|
||||
os.Exit(1)
|
||||
}
|
||||
if err != nil {os.Exit(1)}
|
||||
}
|
||||
|
||||
func main() {
|
||||
urls := []string{"https://www.v2ex.com", "https://www.v2fx.com"}
|
||||
urls := []string{"https://www.abcdefg.com", "https://www.hijklmn.com"}
|
||||
for _, url := range urls {
|
||||
instance := factory(url)
|
||||
instance.Visit(url)
|
||||
@ -239,6 +231,8 @@ func main() {
|
||||
|
||||
#### 结合 colly 的消息消费
|
||||
|
||||
消费端就简单一些了,我们只需要订阅对应的主题,并直接访问网站的详情页(落地页)即可。
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
@ -261,60 +255,48 @@ func factory(urlStr string) *colly.Collector {
|
||||
|
||||
func initV2exCollector() *colly.Collector {
|
||||
c := colly.NewCollector(
|
||||
colly.AllowedDomains("www.v2ex.com"),
|
||||
colly.AllowedDomains("www.abcdefg.com"),
|
||||
colly.MaxDepth(maxDepth),
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func initV2fxCollector() *colly.Collector {
|
||||
c := colly.NewCollector(
|
||||
colly.AllowedDomains("www.v2fx.com"),
|
||||
colly.AllowedDomains("www.hijklmn.com"),
|
||||
colly.MaxDepth(maxDepth),
|
||||
)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func init() {
|
||||
domain2Collector["www.v2ex.com"] = initV2exCollector()
|
||||
domain2Collector["www.v2fx.com"] = initV2fxCollector()
|
||||
domain2Collector["www.abcdefg.com"] = initABCDECollector()
|
||||
domain2Collector["www.hijklmn.com"] = initHIJKLCollector()
|
||||
|
||||
var err error
|
||||
nc, err = nats.Connect(natsURL)
|
||||
if err != nil {
|
||||
// log fatal
|
||||
os.Exit(1)
|
||||
}
|
||||
if err != nil {os.Exit(1)}
|
||||
}
|
||||
|
||||
func startConsumer() {
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
// log error
|
||||
return
|
||||
}
|
||||
if err != nil {return}
|
||||
|
||||
sub, err := nc.QueueSubscribeSync("tasks", "workers")
|
||||
if err != nil {
|
||||
// log error
|
||||
return
|
||||
}
|
||||
if err != nil {return}
|
||||
|
||||
var msg *nats.Msg
|
||||
for {
|
||||
msg, err = sub.NextMsg(time.Hour * 10000)
|
||||
if err != nil {
|
||||
// log error
|
||||
break
|
||||
}
|
||||
if err != nil {break}
|
||||
|
||||
urlStr := string(msg.Data)
|
||||
ins := factory(urlStr)
|
||||
// 因为最下游拿到的一定是对应网站的落地页
|
||||
// 所以不用进行多余的判断了,直接爬内容即可
|
||||
ins.Visit(urlStr)
|
||||
// 防止被封杀
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user