diff --git a/vendor/README.md b/vendor/README.md index 8681d14..951f649 100644 --- a/vendor/README.md +++ b/vendor/README.md @@ -9,5 +9,5 @@ TODO - [gRPC](https://github.com/grpc/grpc-go): [v1.13.0](https://github.com/grpc/grpc-go/releases/tag/v1.13.0) - [grpc-gateway](https://github.com/grpc-ecosystem/grpc-gateway): [v1.4.1](https://github.com/grpc-ecosystem/grpc-gateway/releases/tag/v1.4.1) - [grpc-middleware](https://github.com/grpc-ecosystem/go-grpc-middleware): [v1.0.0](https://github.com/grpc-ecosystem/go-grpc-middleware/releases/tag/v1.0.0) - +- [pubsub](https://godoc.org/github.com/docker/docker/pkg/pubsub): [v17.03.2-ce](https://github.com/moby/moby/releases/tag/v17.03.2-ce) diff --git a/vendor/github.com/docker/docker/pkg/pubsub/publisher.go b/vendor/github.com/docker/docker/pkg/pubsub/publisher.go new file mode 100644 index 0000000..76033ed --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/pubsub/publisher.go @@ -0,0 +1,121 @@ +package pubsub // import "github.com/docker/docker/pkg/pubsub" + +import ( + "sync" + "time" +) + +var wgPool = sync.Pool{New: func() interface{} { return new(sync.WaitGroup) }} + +// NewPublisher creates a new pub/sub publisher to broadcast messages. +// The duration is used as the send timeout as to not block the publisher publishing +// messages to other clients if one client is slow or unresponsive. +// The buffer is used when creating new channels for subscribers. +func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { + return &Publisher{ + buffer: buffer, + timeout: publishTimeout, + subscribers: make(map[subscriber]topicFunc), + } +} + +type subscriber chan interface{} +type topicFunc func(v interface{}) bool + +// Publisher is basic pub/sub structure. Allows to send events and subscribe +// to them. Can be safely used from multiple goroutines. +type Publisher struct { + m sync.RWMutex + buffer int + timeout time.Duration + subscribers map[subscriber]topicFunc +} + +// Len returns the number of subscribers for the publisher +func (p *Publisher) Len() int { + p.m.RLock() + i := len(p.subscribers) + p.m.RUnlock() + return i +} + +// Subscribe adds a new subscriber to the publisher returning the channel. +func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { + ch := make(chan interface{}, p.buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + +// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic. +// The returned channel has a buffer of the specified size. +func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} { + ch := make(chan interface{}, buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + +// Evict removes the specified subscriber from receiving any more messages. +func (p *Publisher) Evict(sub chan interface{}) { + p.m.Lock() + delete(p.subscribers, sub) + close(sub) + p.m.Unlock() +} + +// Publish sends the data in v to all subscribers currently registered with the publisher. +func (p *Publisher) Publish(v interface{}) { + p.m.RLock() + if len(p.subscribers) == 0 { + p.m.RUnlock() + return + } + + wg := wgPool.Get().(*sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + go p.sendTopic(sub, topic, v, wg) + } + wg.Wait() + wgPool.Put(wg) + p.m.RUnlock() +} + +// Close closes the channels to all subscribers registered with the publisher. +func (p *Publisher) Close() { + p.m.Lock() + for sub := range p.subscribers { + delete(p.subscribers, sub) + close(sub) + } + p.m.Unlock() +} + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +} diff --git a/vendor/github.com/docker/docker/pkg/pubsub/publisher_test.go b/vendor/github.com/docker/docker/pkg/pubsub/publisher_test.go new file mode 100644 index 0000000..98e1582 --- /dev/null +++ b/vendor/github.com/docker/docker/pkg/pubsub/publisher_test.go @@ -0,0 +1,142 @@ +package pubsub // import "github.com/docker/docker/pkg/pubsub" + +import ( + "fmt" + "testing" + "time" +) + +func TestSendToOneSub(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + c := p.Subscribe() + + p.Publish("hi") + + msg := <-c + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } +} + +func TestSendToMultipleSubs(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + var subs []chan interface{} + subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) + + p.Publish("hi") + + for _, c := range subs { + msg := <-c + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } + } +} + +func TestEvictOneSub(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + s1 := p.Subscribe() + s2 := p.Subscribe() + + p.Evict(s1) + p.Publish("hi") + if _, ok := <-s1; ok { + t.Fatal("expected s1 to not receive the published message") + } + + msg := <-s2 + if msg.(string) != "hi" { + t.Fatalf("expected message hi but received %v", msg) + } +} + +func TestClosePublisher(t *testing.T) { + p := NewPublisher(100*time.Millisecond, 10) + var subs []chan interface{} + subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) + p.Close() + + for _, c := range subs { + if _, ok := <-c; ok { + t.Fatal("expected all subscriber channels to be closed") + } + } +} + +const sampleText = "test" + +type testSubscriber struct { + dataCh chan interface{} + ch chan error +} + +func (s *testSubscriber) Wait() error { + return <-s.ch +} + +func newTestSubscriber(p *Publisher) *testSubscriber { + ts := &testSubscriber{ + dataCh: p.Subscribe(), + ch: make(chan error), + } + go func() { + for data := range ts.dataCh { + s, ok := data.(string) + if !ok { + ts.ch <- fmt.Errorf("Unexpected type %T", data) + break + } + if s != sampleText { + ts.ch <- fmt.Errorf("Unexpected text %s", s) + break + } + } + close(ts.ch) + }() + return ts +} + +// for testing with -race +func TestPubSubRace(t *testing.T) { + p := NewPublisher(0, 1024) + var subs []*testSubscriber + for j := 0; j < 50; j++ { + subs = append(subs, newTestSubscriber(p)) + } + for j := 0; j < 1000; j++ { + p.Publish(sampleText) + } + time.AfterFunc(1*time.Second, func() { + for _, s := range subs { + p.Evict(s.dataCh) + } + }) + for _, s := range subs { + s.Wait() + } +} + +func BenchmarkPubSub(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + p := NewPublisher(0, 1024) + var subs []*testSubscriber + for j := 0; j < 50; j++ { + subs = append(subs, newTestSubscriber(p)) + } + b.StartTimer() + for j := 0; j < 1000; j++ { + p.Publish(sampleText) + } + time.AfterFunc(1*time.Second, func() { + for _, s := range subs { + p.Evict(s.dataCh) + } + }) + for _, s := range subs { + if err := s.Wait(); err != nil { + b.Fatal(err) + } + } + } +}