1
0
mirror of https://github.com/chai2010/advanced-go-programming-book.git synced 2025-05-27 14:52:20 +00:00

add fixed ch4-04-grpc/grpc-pubsub example

This commit is contained in:
sfw 2018-08-10 08:36:27 +08:00
parent e323f6c40a
commit 1a842affa5
6 changed files with 54 additions and 51 deletions

View File

@ -1,10 +1,12 @@
package main
import (
"google.golang.org/grpc"
"log"
."gobook.examples/ch4-04-grpc/grpc-pubsub/helloservice"
"context"
"log"
"google.golang.org/grpc"
pb "gobook.examples/ch4-04-grpc/grpc-pubsub/pubsubservice"
)
func main() {
@ -14,13 +16,13 @@ func main() {
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
client := pb.NewPubsubServiceClient(conn)
_, err = client.Publish(context.Background(), &String{Value: "golang: hello Go"})
_, err = client.Publish(context.Background(), &pb.String{Value: "golang: hello Go"})
if err != nil {
log.Fatal(err)
}
_, err = client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
_, err = client.Publish(context.Background(), &pb.String{Value: "docker: hello Docker"})
if err != nil {
log.Fatal(err)
}

View File

@ -1,12 +1,14 @@
package main
import (
"google.golang.org/grpc"
"log"
."gobook.examples/ch4-04-grpc/grpc-pubsub/helloservice"
"context"
"io"
"fmt"
"io"
"log"
"google.golang.org/grpc"
pb "gobook.examples/ch4-04-grpc/grpc-pubsub/pubsubservice"
)
func main() {
@ -16,8 +18,8 @@ func main() {
}
defer conn.Close()
client := NewPubsubServiceClient(conn)
stream, err := client.Subscribe(context.Background(), &String{Value: "golang:"})
client := pb.NewPubsubServiceClient(conn)
stream, err := client.Subscribe(context.Background(), &pb.String{Value: "golang:"})
if err != nil {
log.Fatal(err)
}

View File

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: hello.proto
// source: pubsubservice.proto
package helloservice
package pubsubservice
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
@ -34,7 +34,7 @@ func (m *String) Reset() { *m = String{} }
func (m *String) String() string { return proto.CompactTextString(m) }
func (*String) ProtoMessage() {}
func (*String) Descriptor() ([]byte, []int) {
return fileDescriptor_hello_0af1fa4e228d172c, []int{0}
return fileDescriptor_pubsubservice_f5055945d20b225a, []int{0}
}
func (m *String) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_String.Unmarshal(m, b)
@ -62,7 +62,7 @@ func (m *String) GetValue() string {
}
func init() {
proto.RegisterType((*String)(nil), "helloservice.String")
proto.RegisterType((*String)(nil), "pubsubservice.String")
}
// Reference imports to suppress errors if they are not otherwise used.
@ -91,7 +91,7 @@ func NewPubsubServiceClient(cc *grpc.ClientConn) PubsubServiceClient {
func (c *pubsubServiceClient) Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error) {
out := new(String)
err := c.cc.Invoke(ctx, "/helloservice.PubsubService/Publish", in, out, opts...)
err := c.cc.Invoke(ctx, "/pubsubservice.PubsubService/Publish", in, out, opts...)
if err != nil {
return nil, err
}
@ -99,7 +99,7 @@ func (c *pubsubServiceClient) Publish(ctx context.Context, in *String, opts ...g
}
func (c *pubsubServiceClient) Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubsubService_SubscribeClient, error) {
stream, err := c.cc.NewStream(ctx, &_PubsubService_serviceDesc.Streams[0], "/helloservice.PubsubService/Subscribe", opts...)
stream, err := c.cc.NewStream(ctx, &_PubsubService_serviceDesc.Streams[0], "/pubsubservice.PubsubService/Subscribe", opts...)
if err != nil {
return nil, err
}
@ -150,7 +150,7 @@ func _PubsubService_Publish_Handler(srv interface{}, ctx context.Context, dec fu
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/helloservice.PubsubService/Publish",
FullMethod: "/pubsubservice.PubsubService/Publish",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(PubsubServiceServer).Publish(ctx, req.(*String))
@ -180,7 +180,7 @@ func (x *pubsubServiceSubscribeServer) Send(m *String) error {
}
var _PubsubService_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloservice.PubsubService",
ServiceName: "pubsubservice.PubsubService",
HandlerType: (*PubsubServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
@ -195,20 +195,20 @@ var _PubsubService_serviceDesc = grpc.ServiceDesc{
ServerStreams: true,
},
},
Metadata: "hello.proto",
Metadata: "pubsubservice.proto",
}
func init() { proto.RegisterFile("hello.proto", fileDescriptor_hello_0af1fa4e228d172c) }
func init() { proto.RegisterFile("pubsubservice.proto", fileDescriptor_pubsubservice_f5055945d20b225a) }
var fileDescriptor_hello_0af1fa4e228d172c = []byte{
// 135 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x48, 0xcd, 0xc9,
0xc9, 0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x01, 0x73, 0x8a, 0x53, 0x8b, 0xca, 0x32,
0x93, 0x53, 0x95, 0xe4, 0xb8, 0xd8, 0x82, 0x4b, 0x8a, 0x32, 0xf3, 0xd2, 0x85, 0x44, 0xb8, 0x58,
0xcb, 0x12, 0x73, 0x4a, 0x53, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83, 0x20, 0x1c, 0xa3, 0x46,
0x46, 0x2e, 0xde, 0x80, 0xd2, 0xa4, 0xe2, 0xd2, 0xa4, 0x60, 0x88, 0x0e, 0x21, 0x53, 0x2e, 0xf6,
0x80, 0xd2, 0xa4, 0x9c, 0xcc, 0xe2, 0x0c, 0x21, 0x11, 0x3d, 0x64, 0xb3, 0xf4, 0x20, 0x06, 0x49,
0x61, 0x15, 0x15, 0xb2, 0xe4, 0xe2, 0x0c, 0x2e, 0x4d, 0x2a, 0x4e, 0x2e, 0xca, 0x4c, 0x4a, 0x25,
0x45, 0xa3, 0x01, 0x63, 0x12, 0x1b, 0xd8, 0xe1, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6e,
0xf2, 0xde, 0x40, 0xc7, 0x00, 0x00, 0x00,
var fileDescriptor_pubsubservice_f5055945d20b225a = []byte{
// 132 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2e, 0x28, 0x4d, 0x2a,
0x2e, 0x4d, 0x2a, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17,
0xe2, 0x45, 0x11, 0x54, 0x92, 0xe3, 0x62, 0x0b, 0x2e, 0x29, 0xca, 0xcc, 0x4b, 0x17, 0x12, 0xe1,
0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x70, 0x8c,
0x5a, 0x19, 0xb9, 0x78, 0x03, 0xc0, 0x3a, 0x82, 0x21, 0x3a, 0x84, 0xcc, 0xb9, 0xd8, 0x03, 0x4a,
0x93, 0x72, 0x32, 0x8b, 0x33, 0x84, 0x44, 0xf5, 0x50, 0x6d, 0x80, 0x98, 0x24, 0x85, 0x5d, 0x58,
0xc8, 0x9a, 0x8b, 0x33, 0xb8, 0x34, 0xa9, 0x38, 0xb9, 0x28, 0x33, 0x29, 0x95, 0x34, 0xad, 0x06,
0x8c, 0x49, 0x6c, 0x60, 0xd7, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0x25, 0x3b, 0x39, 0x24,
0xd4, 0x00, 0x00, 0x00,
}

View File

@ -1,6 +1,6 @@
syntax = "proto3";
package helloservice;
package pubsubservice;
message String {
string value = 1;
@ -11,4 +11,4 @@ service PubsubService {
rpc Subscribe (String) returns (stream String);
}
//protoc --go_out=plugins=grpc:. hello.proto
//protoc --go_out=plugins=grpc:. pubsubservice.proto

View File

@ -1,16 +1,17 @@
package main
import (
"time"
"google.golang.org/grpc"
"net"
"log"
."gobook.examples/ch4-04-grpc/grpc-pubsub/helloservice"
"github.com/docker/docker/pkg/pubsub"
"context"
"log"
"net"
"strings"
)
"time"
"github.com/docker/docker/pkg/pubsub"
"google.golang.org/grpc"
pb "gobook.examples/ch4-04-grpc/grpc-pubsub/pubsubservice"
)
type PubsubService struct {
pub *pubsub.Publisher
@ -23,25 +24,24 @@ func NewPubsubService() *PubsubService {
}
func (p *PubsubService) Publish(
ctx context.Context, arg *String,
) (*String, error) {
ctx context.Context, arg *pb.String,
) (*pb.String, error) {
p.pub.Publish(arg.GetValue())
//debug
//reply := &String{Value: "<Publish> " + arg.GetValue()}
//fmt.Println(reply.GetValue())
return &String{}, nil
return &pb.String{}, nil
}
func (p *PubsubService) Subscribe(
arg *String, stream PubsubService_SubscribeServer,
arg *pb.String, stream pb.PubsubService_SubscribeServer,
) error {
ch := p.pub.SubscribeTopic(func(v interface{}) bool {
if key, ok := v.(string); ok {
//debug
//fmt.Printf("<debug> %t %s %s %t\n",
// ok,arg.GetValue(),key,strings.HasPrefix(key,arg.GetValue()))
if strings.HasPrefix(key,arg.GetValue()) {
if strings.HasPrefix(key, arg.GetValue()) {
return true
}
}
@ -49,7 +49,7 @@ func (p *PubsubService) Subscribe(
})
for v := range ch {
if err := stream.Send(&String{Value: v.(string)}); err != nil {
if err := stream.Send(&pb.String{Value: v.(string)}); err != nil {
return err
}
}
@ -59,7 +59,7 @@ func (p *PubsubService) Subscribe(
func main() {
grpcServer := grpc.NewServer()
RegisterPubsubServiceServer(grpcServer,NewPubsubService())
pb.RegisterPubsubServiceServer(grpcServer, NewPubsubService())
lis, err := net.Listen("tcp", ":1234")
if err != nil {
@ -67,5 +67,4 @@ func main() {
}
grpcServer.Serve(lis)
}