summaryrefslogtreecommitdiffstats
path: root/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go
diff options
context:
space:
mode:
Diffstat (limited to 'dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go')
-rw-r--r--dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go495
1 files changed, 495 insertions, 0 deletions
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go
new file mode 100644
index 0000000..2dfa66b
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/pubsub_test.go
@@ -0,0 +1,495 @@
+package redis_test
+
+import (
+ "context"
+ "io"
+ "net"
+ "sync"
+ "time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/go-redis/redis/v8"
+)
+
+var _ = Describe("PubSub", func() {
+ var client *redis.Client
+ var clientID int64
+
+ BeforeEach(func() {
+ opt := redisOptions()
+ opt.MinIdleConns = 0
+ opt.MaxConnAge = 0
+ opt.OnConnect = func(ctx context.Context, cn *redis.Conn) (err error) {
+ clientID, err = cn.ClientID(ctx).Result()
+ return err
+ }
+ client = redis.NewClient(opt)
+ Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ Expect(client.Close()).NotTo(HaveOccurred())
+ })
+
+ It("implements Stringer", func() {
+ pubsub := client.PSubscribe(ctx, "mychannel*")
+ defer pubsub.Close()
+
+ Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
+ })
+
+ It("should support pattern matching", func() {
+ pubsub := client.PSubscribe(ctx, "mychannel*")
+ defer pubsub.Close()
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("psubscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel*"))
+ Expect(subscr.Count).To(Equal(1))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err.(net.Error).Timeout()).To(Equal(true))
+ Expect(msgi).To(BeNil())
+ }
+
+ n, err := client.Publish(ctx, "mychannel1", "hello").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(int64(1)))
+
+ Expect(pubsub.PUnsubscribe(ctx, "mychannel*")).NotTo(HaveOccurred())
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Message)
+ Expect(subscr.Channel).To(Equal("mychannel1"))
+ Expect(subscr.Pattern).To(Equal("mychannel*"))
+ Expect(subscr.Payload).To(Equal("hello"))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("punsubscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel*"))
+ Expect(subscr.Count).To(Equal(0))
+ }
+
+ stats := client.PoolStats()
+ Expect(stats.Misses).To(Equal(uint32(1)))
+ })
+
+ It("should pub/sub channels", func() {
+ channels, err := client.PubSubChannels(ctx, "mychannel*").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(channels).To(BeEmpty())
+
+ pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
+ defer pubsub.Close()
+
+ channels, err = client.PubSubChannels(ctx, "mychannel*").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
+
+ channels, err = client.PubSubChannels(ctx, "").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(channels).To(BeEmpty())
+
+ channels, err = client.PubSubChannels(ctx, "*").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(len(channels)).To(BeNumerically(">=", 2))
+ })
+
+ It("should return the numbers of subscribers", func() {
+ pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
+ defer pubsub.Close()
+
+ channels, err := client.PubSubNumSub(ctx, "mychannel", "mychannel2", "mychannel3").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(channels).To(Equal(map[string]int64{
+ "mychannel": 1,
+ "mychannel2": 1,
+ "mychannel3": 0,
+ }))
+ })
+
+ It("should return the numbers of subscribers by pattern", func() {
+ num, err := client.PubSubNumPat(ctx).Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(num).To(Equal(int64(0)))
+
+ pubsub := client.PSubscribe(ctx, "*")
+ defer pubsub.Close()
+
+ num, err = client.PubSubNumPat(ctx).Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(num).To(Equal(int64(1)))
+ })
+
+ It("should pub/sub", func() {
+ pubsub := client.Subscribe(ctx, "mychannel", "mychannel2")
+ defer pubsub.Close()
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("subscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel"))
+ Expect(subscr.Count).To(Equal(1))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("subscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel2"))
+ Expect(subscr.Count).To(Equal(2))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err.(net.Error).Timeout()).To(Equal(true))
+ Expect(msgi).NotTo(HaveOccurred())
+ }
+
+ n, err := client.Publish(ctx, "mychannel", "hello").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(int64(1)))
+
+ n, err = client.Publish(ctx, "mychannel2", "hello2").Result()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(int64(1)))
+
+ Expect(pubsub.Unsubscribe(ctx, "mychannel", "mychannel2")).NotTo(HaveOccurred())
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ msg := msgi.(*redis.Message)
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal("hello"))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ msg := msgi.(*redis.Message)
+ Expect(msg.Channel).To(Equal("mychannel2"))
+ Expect(msg.Payload).To(Equal("hello2"))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("unsubscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel"))
+ Expect(subscr.Count).To(Equal(1))
+ }
+
+ {
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ subscr := msgi.(*redis.Subscription)
+ Expect(subscr.Kind).To(Equal("unsubscribe"))
+ Expect(subscr.Channel).To(Equal("mychannel2"))
+ Expect(subscr.Count).To(Equal(0))
+ }
+
+ stats := client.PoolStats()
+ Expect(stats.Misses).To(Equal(uint32(1)))
+ })
+
+ It("should ping/pong", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ _, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+
+ err = pubsub.Ping(ctx, "")
+ Expect(err).NotTo(HaveOccurred())
+
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ pong := msgi.(*redis.Pong)
+ Expect(pong.Payload).To(Equal(""))
+ })
+
+ It("should ping/pong with payload", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ _, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+
+ err = pubsub.Ping(ctx, "hello")
+ Expect(err).NotTo(HaveOccurred())
+
+ msgi, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ pong := msgi.(*redis.Pong)
+ Expect(pong.Payload).To(Equal("hello"))
+ })
+
+ It("should multi-ReceiveMessage", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(subscr).To(Equal(&redis.Subscription{
+ Kind: "subscribe",
+ Channel: "mychannel",
+ Count: 1,
+ }))
+
+ err = client.Publish(ctx, "mychannel", "hello").Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ err = client.Publish(ctx, "mychannel", "world").Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ msg, err := pubsub.ReceiveMessage(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal("hello"))
+
+ msg, err = pubsub.ReceiveMessage(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal("world"))
+ })
+
+ It("returns an error when subscribe fails", func() {
+ pubsub := client.Subscribe(ctx)
+ defer pubsub.Close()
+
+ pubsub.SetNetConn(&badConn{
+ readErr: io.EOF,
+ writeErr: io.EOF,
+ })
+
+ err := pubsub.Subscribe(ctx, "mychannel")
+ Expect(err).To(MatchError("EOF"))
+
+ err = pubsub.Subscribe(ctx, "mychannel")
+ Expect(err).NotTo(HaveOccurred())
+ })
+
+ expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
+ pubsub.SetNetConn(&badConn{
+ readErr: io.EOF,
+ writeErr: io.EOF,
+ })
+
+ step := make(chan struct{}, 3)
+
+ go func() {
+ defer GinkgoRecover()
+
+ Eventually(step).Should(Receive())
+ err := client.Publish(ctx, "mychannel", "hello").Err()
+ Expect(err).NotTo(HaveOccurred())
+ step <- struct{}{}
+ }()
+
+ _, err := pubsub.ReceiveMessage(ctx)
+ Expect(err).To(Equal(io.EOF))
+ step <- struct{}{}
+
+ msg, err := pubsub.ReceiveMessage(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal("hello"))
+
+ Eventually(step).Should(Receive())
+ }
+
+ It("Subscribe should reconnect on ReceiveMessage error", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(subscr).To(Equal(&redis.Subscription{
+ Kind: "subscribe",
+ Channel: "mychannel",
+ Count: 1,
+ }))
+
+ expectReceiveMessageOnError(pubsub)
+ })
+
+ It("PSubscribe should reconnect on ReceiveMessage error", func() {
+ pubsub := client.PSubscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ subscr, err := pubsub.ReceiveTimeout(ctx, time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(subscr).To(Equal(&redis.Subscription{
+ Kind: "psubscribe",
+ Channel: "mychannel",
+ Count: 1,
+ }))
+
+ expectReceiveMessageOnError(pubsub)
+ })
+
+ It("should return on Close", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer GinkgoRecover()
+
+ wg.Done()
+ defer wg.Done()
+
+ _, err := pubsub.ReceiveMessage(ctx)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(SatisfyAny(
+ Equal("redis: client is closed"),
+ ContainSubstring("use of closed network connection"),
+ ))
+ }()
+
+ wg.Wait()
+ wg.Add(1)
+
+ Expect(pubsub.Close()).NotTo(HaveOccurred())
+
+ wg.Wait()
+ })
+
+ It("should ReceiveMessage without a subscription", func() {
+ timeout := 100 * time.Millisecond
+
+ pubsub := client.Subscribe(ctx)
+ defer pubsub.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer GinkgoRecover()
+ defer wg.Done()
+
+ time.Sleep(timeout)
+
+ err := pubsub.Subscribe(ctx, "mychannel")
+ Expect(err).NotTo(HaveOccurred())
+
+ time.Sleep(timeout)
+
+ err = client.Publish(ctx, "mychannel", "hello").Err()
+ Expect(err).NotTo(HaveOccurred())
+ }()
+
+ msg, err := pubsub.ReceiveMessage(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal("hello"))
+
+ wg.Wait()
+ })
+
+ It("handles big message payload", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ ch := pubsub.Channel()
+
+ bigVal := bigVal()
+ err := client.Publish(ctx, "mychannel", bigVal).Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ var msg *redis.Message
+ Eventually(ch).Should(Receive(&msg))
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal(string(bigVal)))
+ })
+
+ It("handles message payload slice with server-assisted client-size caching", func() {
+ pubsub := client.Subscribe(ctx, "__redis__:invalidate")
+ defer pubsub.Close()
+
+ client2 := redis.NewClient(redisOptions())
+ defer client2.Close()
+
+ err := client2.Do(ctx, "CLIENT", "TRACKING", "on", "REDIRECT", clientID).Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ err = client2.Do(ctx, "GET", "mykey").Err()
+ Expect(err).To(Equal(redis.Nil))
+
+ err = client2.Do(ctx, "SET", "mykey", "myvalue").Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ ch := pubsub.Channel()
+
+ var msg *redis.Message
+ Eventually(ch).Should(Receive(&msg))
+ Expect(msg.Channel).To(Equal("__redis__:invalidate"))
+ Expect(msg.PayloadSlice).To(Equal([]string{"mykey"}))
+ })
+
+ It("supports concurrent Ping and Receive", func() {
+ const N = 100
+
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ done := make(chan struct{})
+ go func() {
+ defer GinkgoRecover()
+
+ for i := 0; i < N; i++ {
+ _, err := pubsub.ReceiveTimeout(ctx, 5*time.Second)
+ Expect(err).NotTo(HaveOccurred())
+ }
+ close(done)
+ }()
+
+ for i := 0; i < N; i++ {
+ err := pubsub.Ping(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ }
+
+ select {
+ case <-done:
+ case <-time.After(30 * time.Second):
+ Fail("timeout")
+ }
+ })
+
+ It("should ChannelMessage", func() {
+ pubsub := client.Subscribe(ctx, "mychannel")
+ defer pubsub.Close()
+
+ ch := pubsub.Channel(
+ redis.WithChannelSize(10),
+ redis.WithChannelHealthCheckInterval(time.Second),
+ )
+
+ text := "test channel message"
+ err := client.Publish(ctx, "mychannel", text).Err()
+ Expect(err).NotTo(HaveOccurred())
+
+ var msg *redis.Message
+ Eventually(ch).Should(Receive(&msg))
+ Expect(msg.Channel).To(Equal("mychannel"))
+ Expect(msg.Payload).To(Equal(text))
+ })
+})