1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-25 05:02:23 +00:00
This commit is contained in:
chai2010 2018-07-21 21:43:38 +08:00
commit 87f40d7217
2 changed files with 166 additions and 9 deletions

View File

@ -4,8 +4,10 @@
从语言本身的特点来讲Go 编译后不依赖外部运行环境,部署方便,无论公司内的部署系统是古老的 ansible还是现在的 docker 镜像部署。都能够方便地与 Go 进行集成。所以 Go 天生适合分布式场景。而在没有 docker 的旧时代python、java 类的语言都会遇到部署时依赖环境冲突的问题,为此还出现了不少专门解决这些环境问题的工具。 从语言本身的特点来讲Go 编译后不依赖外部运行环境,部署方便,无论公司内的部署系统是古老的 ansible还是现在的 docker 镜像部署。都能够方便地与 Go 进行集成。所以 Go 天生适合分布式场景。而在没有 docker 的旧时代python、java 类的语言都会遇到部署时依赖环境冲突的问题,为此还出现了不少专门解决这些环境问题的工具。
TODO pipenv java 咋解决的? 比如 python 为了解决这个问题推出了 pipenv 方案。java 系统中要为无法共享 jvm 环境的应用分别设置自己的环境变量。
TODO这里有图 ## 锁
本章将带大家 Go 语言如何与一些常见的分布式系统打交道,重点在工程与应用。 ## 数据一致性
## 定时器

View File

@ -187,7 +187,97 @@ es 的 Bool Query 方案,实际上就是用 json 来表达了这种程序语
### 基于 client sdk 做开发 ### 基于 client sdk 做开发
TODO 初始化:
```go
// 选用 elastic 版本时
// 注意与自己使用的 elasticsearch 要对应
import (
elastic "gopkg.in/olivere/elastic.v3"
)
var esClient *elastic.Client
func initElasticsearchClient(host string, port string) {
var err error
esClient, err = elastic.NewClient(
elastic.SetURL(fmt.Sprintf("http://%s:%s", host, port)),
elastic.SetMaxRetries(3),
)
if err != nil {
// log error
}
}
```
插入:
```go
func insertDocument(db string, table string, obj map[string]interface{}) {
id := obj["id"]
var indexName, typeName string
// 数据库中的 database/table 概念,可以简单映射到 es 的 index 和 type
// 不过需要注意,因为 es 中的 _type 本质上只是 document 的一个字段
// 所以单个 index 内容过多会导致性能问题
// 在新版本中 type 已经废弃
// 为了让不同表的数据落入不同的 index这里我们用 table+name 作为 index 的名字
indexName = fmt.Sprintf("%v_%v", db, table)
typeName = table
//正常情况
res, err := esClient.Index().Index(indexName).Type(typeName).Id(id).BodyJson(obj).Do()
if err != nil {
// handle error
} else {
// insert success
}
}
```
获取:
```go
func query(indexName string, typeName string) (*elastic.SearchResult, error) {
// 通过 bool must 和 bool shoud 添加 bool 查询条件
q := elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("id", 1),
elastic.NewBoolQuery().Must(elastic.NewMatchPhraseQuery("male", "m")))
q = q.Should(elastic.NewMatchPhraseQuery("name", "alex"),
elastic.NewMatchPhraseQuery("name", "xargin"))
searchService := esClient.Search(indexName).Type(typeName)
res, err := searchService.Query(q).Do()
if err != nil {
// log error
return nil, err
}
return res, nil
}
```
删除:
```go
func deleteDocument(indexName string, typeName string, obj map[string]interface{}) {
id := obj["id"]
res, err := esClient.Delete().Index(indexName).Type(typeName).Id(id).Do()
if err != nil {
// handle error
} else {
// delete success
}
}
```
因为 lucene 的性质,本质上搜索引擎内的数据是不可变的,所以如果要对 document 进行更新,实际上是按照 id 进行完全覆盖的操作,所以与插入的情况是一样的。
使用 es 作为数据库使用时,需要注意,因为 es 有索引合并的操作,所以数据插入到 es 中到可以查询得到有一段时间(由 es 的 refresh_interval 决定)。所以千万不要把 es 当成强一致的关系型数据库来使用。
### 将 sql 转换为 DSL ### 将 sql 转换为 DSL
@ -264,7 +354,6 @@ SQL 的 where 部分就是 boolean expression。我们之前提到过这种 b
当然可以,我们把 SQL 的 where 被 Parse 之后的结构和 es 的 DSL 的结构做个对比: 当然可以,我们把 SQL 的 where 被 Parse 之后的结构和 es 的 DSL 的结构做个对比:
![ast](../images/ch6-ast-dsl.png) ![ast](../images/ch6-ast-dsl.png)
既然结构上完全一致,逻辑上我们就可以相互转换。我们以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,限于篇幅,本文中就不给出示例了,读者朋友可以查看: 既然结构上完全一致,逻辑上我们就可以相互转换。我们以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,限于篇幅,本文中就不给出示例了,读者朋友可以查看:
@ -281,7 +370,25 @@ SQL 的 where 部分就是 boolean expression。我们之前提到过这种 b
### 通过时间戳进行增量数据同步 ### 通过时间戳进行增量数据同步
TODO 这里有图 ```
┌────────────────────────┐ ┌────────────────────────┐
│ move 10 min data to es │ │ move 10 min data to es │
└────────────────────────┘ └────────────────────────┘
│ │ ┌───────────────┐
───────────────┼────────────────┬──────────────┴─────────────┬──────────────▶ │ time passes │
│ ┌───────┐ │ │ └───────────────┘
│◀──┤ 10min ├───▶│ ┌────────────────────────┐
│ └───────┘ │ │ move 10 min data to es │
│ └────────────────────────┘
┌────────────────────────┐
│ move 10 min data to es │
└────────────────────────┘
```
这种同步方式与业务强绑定,例如 wms 系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入 es 中,具体的逻辑实际上就是一条 SQL 这种同步方式与业务强绑定,例如 wms 系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入 es 中,具体的逻辑实际上就是一条 SQL
@ -299,8 +406,56 @@ select * from wms_orders where update_time >= date_sub(now(), interval 11 minute
### 通过 binlog 进行数据同步 ### 通过 binlog 进行数据同步
TODO 这里有图 ```
┌────────────────────────┐
│ MySQL master │
└────────────────────────┘
┌───────────────────┐
│ row format binlog │
└───────────────────┘
┌───────────────┴──────────────┐
│ │
│ │
▼ ▼
┌────────────────────────┐ ┌─────────────────┐
│ MySQL slave │ │ canal │
└────────────────────────┘ └─────────────────┘
┌─────────┴──────────┐
│ parsed binlog │
└─────────┬──────────┘
┌────────────────┐
│ kafka │─────┐
└────────────────┘ │
┌───────────┴──────┐
│ kafka consumer │
└───────────┬──────┘
│ ┌────────────────┐
└─────▶│ elasticsearch │
└────────────────┘
```
业界使用较多的是阿里开源的 canal来进行 binlog 解析与同步。把上游数据表的自增主键作为 es 的 document 的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。 业界使用较多的是阿里开源的 canal来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog再以更容易解析的格式(例如 json) 发送到消息队列
这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。 由下游的 kafka 消费者负责把上游数据表的自增主键作为 es 的 document 的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。
这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去订制消费者的逻辑,这就不是通用系统讨论的范畴了。