diff --git a/ch6-cloud/ch6-01-cloud.md b/ch6-cloud/ch6-01-cloud.md index 68c5865..1e269b5 100644 --- a/ch6-cloud/ch6-01-cloud.md +++ b/ch6-cloud/ch6-01-cloud.md @@ -4,8 +4,10 @@ 从语言本身的特点来讲,Go 编译后不依赖外部运行环境,部署方便,无论公司内的部署系统是古老的 ansible,还是现在的 docker 镜像部署。都能够方便地与 Go 进行集成。所以 Go 天生适合分布式场景。而在没有 docker 的旧时代,python、java 类的语言都会遇到部署时依赖环境冲突的问题,为此还出现了不少专门解决这些环境问题的工具。 -TODO pipenv java 咋解决的? +比如 python 为了解决这个问题推出了 pipenv 方案。java 系统中要为无法共享 jvm 环境的应用分别设置自己的环境变量。 -TODO,这里有图 +## 锁 -本章将带大家 Go 语言如何与一些常见的分布式系统打交道,重点在工程与应用。 +## 数据一致性 + +## 定时器 diff --git a/ch6-cloud/ch6-02-dist-search-engine.md b/ch6-cloud/ch6-02-dist-search-engine.md index 010844c..89e1d70 100644 --- a/ch6-cloud/ch6-02-dist-search-engine.md +++ b/ch6-cloud/ch6-02-dist-search-engine.md @@ -187,7 +187,97 @@ es 的 Bool Query 方案,实际上就是用 json 来表达了这种程序语 ### 基于 client sdk 做开发 -TODO +初始化: + +```go +// 选用 elastic 版本时 +// 注意与自己使用的 elasticsearch 要对应 +import ( + elastic "gopkg.in/olivere/elastic.v3" +) + +var esClient *elastic.Client + +func initElasticsearchClient(host string, port string) { + var err error + esClient, err = elastic.NewClient( + elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)), + elastic.SetMaxRetries(3), + ) + + if err != nil { + // log error + } +} +``` + +插入: + +```go +func insertDocument(db string, table string, obj map[string]interface{}) { + + id := obj["id"] + + var indexName, typeName string + // 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type + // 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段 + // 所以单个 index 内容过多会导致性能问题 + // 在新版本中 type 已经废弃 + // 为了让不同表的数据落入不同的 index,这里我们用 table+name 作为 index 的名字 + indexName = fmt.Sprintf("%v_%v", db, table) + typeName = table + + //正常情况 + res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do() + if err != nil { + // handle error + } else { + // insert success + } +} + +``` + +获取: + +```go +func query(indexName string, typeName string) (*elastic.SearchResult, error) { + // 通过 bool must 和 bool shoud 添加 bool 查询条件 + q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1), + elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m"))) + + q = q.Should(elastic.NewMatchPhraseQuery("name", "alex"), + elastic.NewMatchPhraseQuery("name", "xargin")) + + searchService := esClient.Search(indexName).Type(typeName) + res, err := searchService.Query(q).Do() + if err != nil { + // log error + return nil, err + } + + return res, nil +} +``` + +删除: + +```go +func deleteDocument(indexName string, typeName string, obj map[string]interface{}) { + id := obj["id"] + + res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do() + if err != nil { + // handle error + } else { + // delete success + } +} +``` + +因为 lucene 的性质,本质上搜索引擎内的数据是不可变的,所以如果要对 document 进行更新,实际上是按照 id 进行完全覆盖的操作,所以与插入的情况是一样的。 + +使用 es 作为数据库使用时,需要注意,因为 es 有索引合并的操作,所以数据插入到 es 中到可以查询得到有一段时间(由 es 的 refresh_interval 决定)。所以千万不要把 es 当成强一致的关系型数据库来使用。 ### 将 sql 转换为 DSL @@ -264,7 +354,6 @@ SQL 的 where 部分就是 boolean expression。我们之前提到过,这种 b 当然可以,我们把 SQL 的 where 被 Parse 之后的结构和 es 的 DSL 的结构做个对比: - ![ast](../images/ch6-ast-dsl.png) 既然结构上完全一致,逻辑上我们就可以相互转换。我们以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,限于篇幅,本文中就不给出示例了,读者朋友可以查看: @@ -281,7 +370,25 @@ SQL 的 where 部分就是 boolean expression。我们之前提到过,这种 b ### 通过时间戳进行增量数据同步 -TODO 这里有图 +``` + ┌────────────────────────┐ ┌────────────────────────┐ + │ move 10 min data to es │ │ move 10 min data to es │ + └────────────────────────┘ └────────────────────────┘ + + │ │ ┌───────────────┐ +───────────────┼────────────────┬──────────────┴─────────────┬──────────────▶ │ time passes │ + │ ┌───────┐ │ │ └───────────────┘ + │◀──┤ 10min ├───▶│ ┌────────────────────────┐ + │ └───────┘ │ │ move 10 min data to es │ + │ └────────────────────────┘ + │ + │ + │ + │ + ┌────────────────────────┐ + │ move 10 min data to es │ + └────────────────────────┘ +``` 这种同步方式与业务强绑定,例如 wms 系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入 es 中,具体的逻辑实际上就是一条 SQL: @@ -299,8 +406,56 @@ select * from wms_orders where update_time >= date_sub(now(), interval 11 minute ### 通过 binlog 进行数据同步 -TODO 这里有图 +``` + ┌────────────────────────┐ + │ MySQL master │ + └────────────────────────┘ + │ + │ + │ + │ + │ + │ + ▼ + ┌───────────────────┐ + │ row format binlog │ + └───────────────────┘ + │ + │ + │ + ┌───────────────┴──────────────┐ + │ │ + │ │ + ▼ ▼ +┌────────────────────────┐ ┌─────────────────┐ +│ MySQL slave │ │ canal │ +└────────────────────────┘ └─────────────────┘ + │ + ┌─────────┴──────────┐ + │ parsed binlog │ + └─────────┬──────────┘ + │ + ▼ + ┌────────────────┐ + │ kafka │─────┐ + └────────────────┘ │ + │ + │ + │ + │ + ┌───────────┴──────┐ + │ kafka consumer │ + └───────────┬──────┘ + │ + │ + │ + │ ┌────────────────┐ + └─────▶│ elasticsearch │ + └────────────────┘ +``` -业界使用较多的是阿里开源的 canal,来进行 binlog 解析与同步。把上游数据表的自增主键作为 es 的 document 的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。 +业界使用较多的是阿里开源的 canal,来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog,再以更容易解析的格式(例如 json) 发送到消息队列。 -这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。 +由下游的 kafka 消费者负责把上游数据表的自增主键作为 es 的 document 的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。 + +这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去订制消费者的逻辑,这就不是通用系统讨论的范畴了。