1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-24 12:32:21 +00:00

ch4-03: 完善

This commit is contained in:
chai2010 2018-07-09 11:48:08 +08:00
parent 94fd255f59
commit 9a0d2db439

View File

@ -2,6 +2,179 @@
在不同的场景中RPC有着不同的需求因此开源的社区就诞生了各种RPC框架。本节我们将尝试Go内置RPC框架在一些比较特殊场景的用法。
## 客户端RPC的实现原理
Go语言的RPC库最简单的方式是通过`Client.Call`方法进行同步阻塞调用,该方法的实现如下:
```go
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
```
首先通过`Client.Go`方法进行一次异步调用,返回一个表示这次调用的`Call`结构体。然后等待`Call`结构体的Done管道返回调用结果。
我们也可以通过`Client.Go`方法异步调用前面的HelloService服务
```go
func doClientWork(client *rpc.Client) {
helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)
// do some thing
helloCall = <-helloCall.Done
if err := helloCall.Error; err != nil {
log.Fatal(err)
}
args := helloCall.Args.(string)
reply := helloCall.Reply.(string)
fmt.Println(args, reply)
}
```
在异步调用命令发出后一般会执行其他的任务因此异步调用的输入参数和返回值可以通过返回的Call变量进行获取。
执行异步调用的`Client.Go`方法实现如下:
```go
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
call.Done = make(chan *Call, 10) // buffered.
client.send(call)
return call
}
```
首先是构造一个表示当前调用的call变量然后通过`client.send`将call的完整参数发送到RPC框架。`client.send`方法调用是线程安全的因此可以从多个Goroutine同时向同一个RPC链接发生调用指令。
当调用完成或者发生错误时,将调用`call.done`方法通知完成:
```go
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
}
}
```
`Call.done`方法的实现可以得知`call.Done`管道会将输入的call原样返回。
## 基于RPC实现Watch功能
在很多系统中都提供了Watch监视功能的接口当系统满足某种条件时Watch方法返回监控的结果。在这里我们可以尝试通过RPC框架实现一个基本的Watch功能。如前文所描述因为`client.send`是线程安全的我们也可以通过在不同的Goroutine中同时并发阻塞调用RPC方法。通过在一个独立的Goroutine中调用Watch函数进行监控。
为了便于演示我们计划通过RPC构造一个简单的内存KV数据库。首先定义服务如下
```go
type KVStoreService struct {
m map[string]string
filter map[string]func(key string)
mu sync.Mutex
}
func NewKVStoreService() *KVStoreService {
return &KVStoreService{
m: make(map[string]string),
filter: make(map[string]func(key string)),
}
}
```
其中`m`成员用于存储KV数据。`filter`成员对应每个Watch调用时定义的过滤器函数列表。而`mu`成员为互斥锁用于在多个Goroutine访问或修改时对其它成员提供保护。
然后就是Get和Set方法
```go
func (p *KVStoreService) Get(key string, value *string) error {
p.Lock()
defer p.Unlock()
if v, ok := p.m[key]; ok {
*value = v
return nil
}
return fmt.Errorf("not found")
}
func (p *KVStoreService) Set(kv [2]string, *struct{}) error {
p.Lock()
defer p.Unlock()
key, value := kv[0], kv[1]
if oldValue := p.m[key]; oldValue != value {
for _, fn := range p.filter {
fn(key)
}
}
p.m[key] = value
return nil
}
```
在Set方法中输入参数是key和value组成的数组用一个匿名的空结构体表示忽略到返回值。当修改某个key对应的值时会调用每一个过滤器函数。
而过滤器列表在Watch方法中提供
```go
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
ch := make(chan string, 10) // buffered
p.Lock()
p.filter[id] = func(key string) { ch <- key }
p.Unlock()
select {
case <-time.After(time.Duration(timeoutSecond) * time.Second):
return fmt.Errorf("timeout")
case key := <-ch:
*keyChanged = key
return nil
}
return nil
}
```
Watch方法的输入参数是超时的秒数。当有key变化时将key作为返回值返回。如果超过时间后依然没有key被修改则返回超时的错误。Watch的实现中用唯一的id表示每个Watch调用然后根据id将自身对应的过滤器函数注册到`p.filter`列表。
KVStoreService服务到注册和启动过程我们不再赘述。下面我们看看如何从客户端使用Watch方法
```go
func doClientWork(client *rpc.Client) {
go func() {
var keyChanged string
err := client.Call("KVStoreService.Watch", 30, &keyChanged)
if err != nil {
log.Fatal(err)
}
fmt.Println("watch:", keyChanged)
} ()
err := client.Call("KVStoreService.Set", [2]string{"abc", "abc-value"}, new(struct{}))
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second*3)
}
```
首先启动一个独立的Goroutine监控key的变化。同步的watch调用会阻塞直到有可以发生变化或者超时。然后在通过Set方法修改KV值时服务器会将变化的key通过Watch方法返回。这样我们就可以实现对某些状态的监控。
## 反向RPC
通常的RPC是基于C/S结构RPC的服务端对应网络的服务器RPC的客户端也对应网络客户端。但是对于一些特殊场景比如在公司内网提供一个RPC服务但是在外网无法链接到内网的服务器。这种时候我们可以参考类似反向代理的技术首先从内网主动链接到外网的TCP服务器然后基于TCP链接向外网提供RPC服务。