1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-24 04:22:22 +00:00

ch4-04: done

This commit is contained in:
chai2010 2018-07-03 09:10:22 +08:00
parent f227bb7900
commit c3c6370d29
5 changed files with 246 additions and 10 deletions

View File

@ -35,7 +35,7 @@
* [4.1. RPC入门](ch4-rpc/ch4-01-rpc-intro.md) * [4.1. RPC入门](ch4-rpc/ch4-01-rpc-intro.md)
* [4.2. Protobuf](ch4-rpc/ch4-02-pb-intro.md) * [4.2. Protobuf](ch4-rpc/ch4-02-pb-intro.md)
* [4.3. 玩转RPC](ch4-rpc/ch4-03-netrpc-hack.md) * [4.3. 玩转RPC](ch4-rpc/ch4-03-netrpc-hack.md)
* [4.4. GRPC入门(TODO)](ch4-rpc/ch4-04-grpc.md) * [4.4. GRPC入门](ch4-rpc/ch4-04-grpc.md)
* [4.5. GRPC进阶(TODO)](ch4-rpc/ch4-05-grpc-hack.md) * [4.5. GRPC进阶(TODO)](ch4-rpc/ch4-05-grpc-hack.md)
* [4.6. Protobuf扩展语法和插件(TODO)](ch4-rpc/ch4-06-pb-option.md) * [4.6. Protobuf扩展语法和插件(TODO)](ch4-rpc/ch4-06-pb-option.md)
* [4.7. 其它RPC系统(TODO)](ch4-rpc/ch4-07-other-rpc.md) * [4.7. 其它RPC系统(TODO)](ch4-rpc/ch4-07-other-rpc.md)

View File

@ -95,13 +95,129 @@ GRPC和标准库的RPC框架还有一个区别GRPC生成的接口并不支持
## GRPC流 ## GRPC流
RPC是远程函数调用因此每次调用的函数参数和返回值不能太大负责将严重影响每次调用的性能。因此传统的RPC方法调用对于上传和下载较大数据量场景并不适合。同时传统RPC模式也不适用于对于时间不确定的订阅和发布模式。为此GRPC框架分别提供了服务器端和客户端的流特性。
服务端或客户端的单向流是双向流的特例我们在HelloService增加一个支持双向流的Channel方法
```proto
service HelloService {
rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
}
```
关键字stream指定启用流特性参数部分是接收客户端参数的流返回值是返回给客户端的流。
重新生成代码可以可以看到接口中新增加的Channel方法的定义
```go
type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
}
type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error)
}
```
在服务端的Channel方法参数是一个新的HelloService_ChannelServer类型的参数可以用于和客户端双向通信。客户端的Channel方法返回一个HelloService_ChannelClient类型的返回值可以用于和服务端进行双向通信。
HelloService_ChannelServer和HelloService_ChannelClient均为接口类型
```go
type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
```
可以发现服务端和客户端的流辅助接口均定义了Send和Recv方法用于流数据的双向通信。
现在我们可以实现流服务:
```go
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
reply := &String{Value: "hello:" + args.GetValue()}
err = stream.Send(reply)
if err != nil {
return err
}
}
}
```
服务端在循环中接收客户端发来的数据如果遇到io.EOF表示客户端流被关闭如果函数退出表示服务端流关闭。然后生成返回的数据通过流发送给客户端。需要主要的是发送和接收的操作并不需要一一对应用户可以根据真实场景进行组织代码。
客户端需要先调用Channel方法获取返回的流对象
```go
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
```
在客户端我们将发送和接收操作放到两个独立的Goroutine。首先是向服务端发生数据
```go
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
```
然后在循环中接收服务端返回的数据:
```go
for {
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
```
这样就完成了完整的流接收和发生支持。
<!--
Publish
Watch
TODO TODO
## 认证 ## 认证
TODO TODO
<!--
入门/流/认证 入门/流/认证

View File

@ -34,7 +34,7 @@ func (m *String) Reset() { *m = String{} }
func (m *String) String() string { return proto.CompactTextString(m) } func (m *String) String() string { return proto.CompactTextString(m) }
func (*String) ProtoMessage() {} func (*String) ProtoMessage() {}
func (*String) Descriptor() ([]byte, []int) { func (*String) Descriptor() ([]byte, []int) {
return fileDescriptor_hello_5dd9d59ecabc789f, []int{0} return fileDescriptor_hello_8909b24da4a57d21, []int{0}
} }
func (m *String) XXX_Unmarshal(b []byte) error { func (m *String) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_String.Unmarshal(m, b) return xxx_messageInfo_String.Unmarshal(m, b)
@ -78,6 +78,7 @@ const _ = grpc.SupportPackageIsVersion4
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type HelloServiceClient interface { type HelloServiceClient interface {
Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) Hello(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error)
} }
type helloServiceClient struct { type helloServiceClient struct {
@ -97,9 +98,41 @@ func (c *helloServiceClient) Hello(ctx context.Context, in *String, opts ...grpc
return out, nil return out, nil
} }
func (c *helloServiceClient) Channel(ctx context.Context, opts ...grpc.CallOption) (HelloService_ChannelClient, error) {
stream, err := c.cc.NewStream(ctx, &_HelloService_serviceDesc.Streams[0], "/main.HelloService/Channel", opts...)
if err != nil {
return nil, err
}
x := &helloServiceChannelClient{stream}
return x, nil
}
type HelloService_ChannelClient interface {
Send(*String) error
Recv() (*String, error)
grpc.ClientStream
}
type helloServiceChannelClient struct {
grpc.ClientStream
}
func (x *helloServiceChannelClient) Send(m *String) error {
return x.ClientStream.SendMsg(m)
}
func (x *helloServiceChannelClient) Recv() (*String, error) {
m := new(String)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// HelloServiceServer is the server API for HelloService service. // HelloServiceServer is the server API for HelloService service.
type HelloServiceServer interface { type HelloServiceServer interface {
Hello(context.Context, *String) (*String, error) Hello(context.Context, *String) (*String, error)
Channel(HelloService_ChannelServer) error
} }
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) { func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
@ -124,6 +157,32 @@ func _HelloService_Hello_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _HelloService_Channel_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(HelloServiceServer).Channel(&helloServiceChannelServer{stream})
}
type HelloService_ChannelServer interface {
Send(*String) error
Recv() (*String, error)
grpc.ServerStream
}
type helloServiceChannelServer struct {
grpc.ServerStream
}
func (x *helloServiceChannelServer) Send(m *String) error {
return x.ServerStream.SendMsg(m)
}
func (x *helloServiceChannelServer) Recv() (*String, error) {
m := new(String)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
var _HelloService_serviceDesc = grpc.ServiceDesc{ var _HelloService_serviceDesc = grpc.ServiceDesc{
ServiceName: "main.HelloService", ServiceName: "main.HelloService",
HandlerType: (*HelloServiceServer)(nil), HandlerType: (*HelloServiceServer)(nil),
@ -133,19 +192,27 @@ var _HelloService_serviceDesc = grpc.ServiceDesc{
Handler: _HelloService_Hello_Handler, Handler: _HelloService_Hello_Handler,
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{
{
StreamName: "Channel",
Handler: _HelloService_Channel_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "hello.proto", Metadata: "hello.proto",
} }
func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_5dd9d59ecabc789f) } func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_8909b24da4a57d21) }
var fileDescriptor_hello_5dd9d59ecabc789f = []byte{ var fileDescriptor_hello_8909b24da4a57d21 = []byte{
// 107 bytes of a gzipped FileDescriptorProto // 124 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9,
0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x92, 0xe3, 0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x92, 0xe3,
0x62, 0x0b, 0x2e, 0x29, 0xca, 0xcc, 0x4b, 0x17, 0x12, 0xe1, 0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d, 0x62, 0x0b, 0x2e, 0x29, 0xca, 0xcc, 0x4b, 0x17, 0x12, 0xe1, 0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d,
0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x8c, 0x8c, 0xb9, 0x78, 0x3c, 0x40, 0x9a, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x8c, 0xe2, 0xb8, 0x78, 0x3c, 0x40, 0x9a,
0x82, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x85, 0x94, 0xb9, 0x58, 0xc1, 0x7c, 0x21, 0x1e, 0x3d, 0x82, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x85, 0x94, 0xb9, 0x58, 0xc1, 0x7c, 0x21, 0x1e, 0x3d,
0x90, 0x7e, 0x3d, 0x88, 0x66, 0x29, 0x14, 0x5e, 0x12, 0x1b, 0xd8, 0x06, 0x63, 0x40, 0x00, 0x00, 0x90, 0x7e, 0x3d, 0x88, 0x66, 0x29, 0x14, 0x9e, 0x90, 0x26, 0x17, 0xbb, 0x73, 0x46, 0x62, 0x5e,
0x00, 0xff, 0xff, 0xa1, 0x2a, 0xa2, 0x5a, 0x70, 0x00, 0x00, 0x00, 0x5e, 0x6a, 0x0e, 0x3e, 0x65, 0x1a, 0x8c, 0x06, 0x8c, 0x49, 0x6c, 0x60, 0xc7, 0x18, 0x03, 0x02,
0x00, 0x00, 0xff, 0xff, 0x96, 0x1d, 0x8c, 0x0a, 0x9b, 0x00, 0x00, 0x00,
} }

View File

@ -8,4 +8,6 @@ message String {
service HelloService { service HelloService {
rpc Hello (String) returns (String); rpc Hello (String) returns (String);
rpc Channel (stream String) returns (stream String);
} }

View File

@ -3,8 +3,10 @@ package main
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"net" "net"
"time"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -16,6 +18,26 @@ func (p *HelloServiceImpl) Hello(ctx context.Context, args *String) (*String, er
return reply, nil return reply, nil
} }
func (p *HelloServiceImpl) Channel(stream HelloService_ChannelServer) error {
for {
log.Println(111)
args, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
reply := &String{Value: "hello:" + args.GetValue()}
err = stream.Send(reply)
if err != nil {
return err
}
}
}
func main() { func main() {
go startGrpcServer() go startGrpcServer()
doClientWork() doClientWork()
@ -46,4 +68,33 @@ func doClientWork() {
log.Fatal(err) log.Fatal(err)
} }
fmt.Println(reply.GetValue()) fmt.Println(reply.GetValue())
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
go func() {
for {
if err := stream.Send(&String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
for {
log.Println(222)
reply, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println(reply.GetValue())
}
} }