mirror of
https://github.com/chai2010/advanced-go-programming-book.git
synced 2025-05-24 04:22:22 +00:00
ch4-04: 完善
This commit is contained in:
parent
504b58f904
commit
9c90459cc9
@ -206,33 +206,176 @@ for {
|
||||
|
||||
这样就完成了完整的流接收和发送支持。
|
||||
|
||||
## 发布和订阅模式
|
||||
|
||||
<!--
|
||||
Publish
|
||||
Watch
|
||||
在前一节中,我们基于Go内置的RPC库实现了一个简化版的Watch方法。基于Watch的思路虽然也可以构造发布和订阅系统,但是因为RPC缺乏流机制导致每次只能返回一个结果。在发布和订阅模式中,由调用者主动发起的发布行为类似一个普通函数调用,而被动的订阅者则类似GRPC客户端单向流中的接收者。现在我们可以尝试基于GRPC的流特性构造一个发布和订阅系统。
|
||||
|
||||
TODO
|
||||
发布订阅是一个常见的设计模式,开源社区中已经存在很多该模式的实现。其中docker项目中提供了一个pubsub的极简实现,下面是基于pubsub包实现的本地发布订阅代码:
|
||||
|
||||
## 认证
|
||||
```go
|
||||
import (
|
||||
"github.com/docker/docker/pkg/pubsub"
|
||||
)
|
||||
|
||||
TODO
|
||||
func main() {
|
||||
p := pubsub.NewPublisher(100*time.Millisecond, 10)
|
||||
|
||||
golang := p.SubscribeTopic(func(v interface{}) bool {
|
||||
if key, ok := v.(string); ok {
|
||||
if strings.Hasprefix("golang:") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
docker := p.SubscribeTopic(func(v interface{}) bool {
|
||||
if key, ok := v.(string); ok {
|
||||
if strings.Hasprefix("docker:") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
go p.Publish("hi")
|
||||
go p.Publish("golang: https://golang.org")
|
||||
go p.Publish("docker: https://www.docker.com/")
|
||||
time.Sleep(1)
|
||||
|
||||
入门/流/认证
|
||||
go func () {
|
||||
fmt.Println("golang topic:", <-golang)
|
||||
} ()
|
||||
go func () {
|
||||
fmt.Println("docker topic:", <-docker)
|
||||
} ()
|
||||
|
||||
--
|
||||
<-make(chan bool)
|
||||
}
|
||||
```
|
||||
|
||||
简单介绍
|
||||
其中`pubsub.NewPublisher`构造一个发布对象,`p.SubscribeTopic()`可以通过函数筛选感兴趣的主题进行订阅。
|
||||
|
||||
同步/异步
|
||||
流
|
||||
现在尝试基于gRPC和pubsub包,提供一个跨网络的发布和订阅系统。首先通过Protobuf定义一个发布订阅服务接口:
|
||||
|
||||
验证/密码
|
||||
```protobuf
|
||||
service PubsubService {
|
||||
rpc Publish (String) returns (String);
|
||||
rpc SubscribeTopic (String) returns (stream String);
|
||||
}
|
||||
```
|
||||
|
||||
日志截取器,panic 捕获
|
||||
其中Publish是普通的RPC方法,SubscribeTopic则是一个单向的流服务。然后grpc插件会为服务端和客户端生成对应的接口:
|
||||
|
||||
gtpc到rest扩展
|
||||
```go
|
||||
type PubsubServiceServer interface {
|
||||
Publish(context.Context, *String) (*String, error)
|
||||
Subscribe(*String, PubsubService_SubscribeServer) error
|
||||
}
|
||||
type PubsubServiceClient interface {
|
||||
Publish(ctx context.Context, in *String, opts ...grpc.CallOption) (*String, error)
|
||||
Subscribe(ctx context.Context, in *String, opts ...grpc.CallOption) (PubsubService_SubscribeClient, error)
|
||||
}
|
||||
|
||||
type HelloService_SubscribeServer interface {
|
||||
Send(*String) error
|
||||
grpc.ServerStream
|
||||
}
|
||||
```
|
||||
|
||||
然后就可以实现发布和订阅服务了:
|
||||
|
||||
```go
|
||||
type PubsubService struct {
|
||||
pub *pubsub.Publisher
|
||||
}
|
||||
|
||||
func NewPubsubService() *PubsubService {
|
||||
return &PubsubService{
|
||||
pub: pubsub.NewPublisher(100*time.Millisecond, 10),
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
然后是实现发布方法和订阅方法:
|
||||
|
||||
```go
|
||||
func (p *PubsubService) Publish(ctx context.Context, arg *String) (*String, error) {
|
||||
p.pub.Publish(arg.GetValue())
|
||||
return &String{}, nil
|
||||
}
|
||||
|
||||
func (p *PubsubService) Subscribe(arg *String, stream PubsubService_SubscribeServer) error {
|
||||
ch := p.SubscribeTopic(func(v interface{}) bool {
|
||||
if key, ok := v.(string); ok {
|
||||
if strings.Hasprefix(arg.GetValue()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
for v := range ch {
|
||||
if err := stream.Send(&String{Value: v.(string)}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
这样就可以从客户端向服务器发布信息了:
|
||||
|
||||
```go
|
||||
func main() {
|
||||
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := NewPubsubServiceClient(conn)
|
||||
|
||||
reply, err := client.Publish(context.Background(), &String{Value: "golang: hello Go"})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
reply, err := client.Publish(context.Background(), &String{Value: "docker: hello Docker"})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
然后就可以在新的客户端进行订阅信息了:
|
||||
|
||||
```go
|
||||
func main() {
|
||||
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
client := NewPubsubServiceClient(conn)
|
||||
stream, err := client.Channel(context.Background(), &String{Value: "golang:"})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
for {
|
||||
reply, err := stream.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
fmt.Println(reply.GetValue())
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
到此我们就基于GRPC简单实现了一个跨网络的发布和订阅服务。
|
||||
|
||||
参数的自动验证,在截取器进行
|
||||
-->
|
||||
|
Loading…
x
Reference in New Issue
Block a user