diff --git a/ch4-rpc/ch4-03-netrpc-hack.md b/ch4-rpc/ch4-03-netrpc-hack.md index 85c3ca9..4370a02 100644 --- a/ch4-rpc/ch4-03-netrpc-hack.md +++ b/ch4-rpc/ch4-03-netrpc-hack.md @@ -2,6 +2,179 @@ 在不同的场景中RPC有着不同的需求,因此开源的社区就诞生了各种RPC框架。本节我们将尝试Go内置RPC框架在一些比较特殊场景的用法。 +## 客户端RPC的实现原理 + +Go语言的RPC库最简单的方式是通过`Client.Call`方法进行同步阻塞调用,该方法的实现如下: + +```go +func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error { + call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done + return call.Error +} +``` + +首先通过`Client.Go`方法进行一次异步调用,返回一个表示这次调用的`Call`结构体。然后等待`Call`结构体的Done管道返回调用结果。 + +我们也可以通过`Client.Go`方法异步调用前面的HelloService服务: + +```go +func doClientWork(client *rpc.Client) { + helloCall := client.Go("HelloService.Hello", "hello", new(string), nil) + + // do some thing + + helloCall = <-helloCall.Done + if err := helloCall.Error; err != nil { + log.Fatal(err) + } + + args := helloCall.Args.(string) + reply := helloCall.Reply.(string) + fmt.Println(args, reply) +} +``` + +在异步调用命令发出后,一般会执行其他的任务,因此异步调用的输入参数和返回值可以通过返回的Call变量进行获取。 + +执行异步调用的`Client.Go`方法实现如下: + +```go +func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call { + call := new(Call) + call.ServiceMethod = serviceMethod + call.Args = args + call.Reply = reply + call.Done = make(chan *Call, 10) // buffered. + + client.send(call) + return call +} +``` + +首先是构造一个表示当前调用的call变量,然后通过`client.send`将call的完整参数发送到RPC框架。`client.send`方法调用是线程安全的,因此可以从多个Goroutine同时向同一个RPC链接发生调用指令。 + +当调用完成或者发生错误时,将调用`call.done`方法通知完成: + +```go +func (call *Call) done() { + select { + case call.Done <- call: + // ok + default: + // We don't want to block here. It is the caller's responsibility to make + // sure the channel has enough buffer space. See comment in Go(). + } +} +``` + +从`Call.done`方法的实现可以得知`call.Done`管道会将输入的call原样返回。 + +## 基于RPC实现Watch功能 + +在很多系统中都提供了Watch监视功能的接口,当系统满足某种条件时Watch方法返回监控的结果。在这里我们可以尝试通过RPC框架实现一个基本的Watch功能。如前文所描述,因为`client.send`是线程安全的,我们也可以通过在不同的Goroutine中同时并发阻塞调用RPC方法。通过在一个独立的Goroutine中调用Watch函数进行监控。 + +为了便于演示,我们计划通过RPC构造一个简单的内存KV数据库。首先定义服务如下: + +```go +type KVStoreService struct { + m map[string]string + filter map[string]func(key string) + mu sync.Mutex +} + +func NewKVStoreService() *KVStoreService { + return &KVStoreService{ + m: make(map[string]string), + filter: make(map[string]func(key string)), + } +} +``` + +其中`m`成员用于存储KV数据。`filter`成员对应每个Watch调用时定义的过滤器函数列表。而`mu`成员为互斥锁,用于在多个Goroutine访问或修改时对其它成员提供保护。 + +然后就是Get和Set方法: + +```go +func (p *KVStoreService) Get(key string, value *string) error { + p.Lock() + defer p.Unlock() + + if v, ok := p.m[key]; ok { + *value = v + return nil + } + + return fmt.Errorf("not found") +} + +func (p *KVStoreService) Set(kv [2]string, *struct{}) error { + p.Lock() + defer p.Unlock() + + key, value := kv[0], kv[1] + + if oldValue := p.m[key]; oldValue != value { + for _, fn := range p.filter { + fn(key) + } + } + + p.m[key] = value + return nil +} +``` + +在Set方法中,输入参数是key和value组成的数组,用一个匿名的空结构体表示忽略到返回值。当修改某个key对应的值时会调用每一个过滤器函数。 + +而过滤器列表在Watch方法中提供: + +```go +func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error { + id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int()) + ch := make(chan string, 10) // buffered + + p.Lock() + p.filter[id] = func(key string) { ch <- key } + p.Unlock() + + select { + case <-time.After(time.Duration(timeoutSecond) * time.Second): + return fmt.Errorf("timeout") + case key := <-ch: + *keyChanged = key + return nil + } + + return nil +} +``` + +Watch方法的输入参数是超时的秒数。当有key变化时将key作为返回值返回。如果超过时间后依然没有key被修改,则返回超时的错误。Watch的实现中,用唯一的id表示每个Watch调用,然后根据id将自身对应的过滤器函数注册到`p.filter`列表。 + +KVStoreService服务到注册和启动过程我们不再赘述。下面我们看看如何从客户端使用Watch方法: + +```go +func doClientWork(client *rpc.Client) { + go func() { + var keyChanged string + err := client.Call("KVStoreService.Watch", 30, &keyChanged) + if err != nil { + log.Fatal(err) + } + fmt.Println("watch:", keyChanged) + } () + + err := client.Call("KVStoreService.Set", [2]string{"abc", "abc-value"}, new(struct{})) + if err != nil { + log.Fatal(err) + } + + time.Sleep(time.Second*3) +} +``` + +首先启动一个独立的Goroutine监控key的变化。同步的watch调用会阻塞,直到有可以发生变化或者超时。然后在通过Set方法修改KV值时,服务器会将变化的key通过Watch方法返回。这样我们就可以实现对某些状态的监控。 + ## 反向RPC 通常的RPC是基于C/S结构,RPC的服务端对应网络的服务器,RPC的客户端也对应网络客户端。但是对于一些特殊场景,比如在公司内网提供一个RPC服务,但是在外网无法链接到内网的服务器。这种时候我们可以参考类似反向代理的技术,首先从内网主动链接到外网的TCP服务器,然后基于TCP链接向外网提供RPC服务。