Diffstat (limited to 'dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool')
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/bench_test.go   |  97
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/conn.go        | 121
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/export_test.go |   9
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/main_test.go   |  36
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool.go        | 557
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_single.go |  58
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_sticky.go | 201
-rw-r--r--  dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_test.go   | 458
8 files changed, 1537 insertions, 0 deletions
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/bench_test.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/bench_test.go
new file mode 100644
index 0000000..dec5d3f
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/bench_test.go
@@ -0,0 +1,97 @@
+package pool_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/go-redis/redis/v8/internal/pool"
+)
+
+type poolGetPutBenchmark struct {
+ poolSize int
+}
+
+func (bm poolGetPutBenchmark) String() string {
+ return fmt.Sprintf("pool=%d", bm.poolSize)
+}
+
+func BenchmarkPoolGetPut(b *testing.B) {
+ ctx := context.Background()
+ benchmarks := []poolGetPutBenchmark{
+ {1},
+ {2},
+ {8},
+ {32},
+ {64},
+ {128},
+ }
+ for _, bm := range benchmarks {
+ b.Run(bm.String(), func(b *testing.B) {
+ connPool := pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: bm.poolSize,
+ PoolTimeout: time.Second,
+ IdleTimeout: time.Hour,
+ IdleCheckFrequency: time.Hour,
+ })
+
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cn, err := connPool.Get(ctx)
+ if err != nil {
+ b.Fatal(err)
+ }
+ connPool.Put(ctx, cn)
+ }
+ })
+ })
+ }
+}
+
+type poolGetRemoveBenchmark struct {
+ poolSize int
+}
+
+func (bm poolGetRemoveBenchmark) String() string {
+ return fmt.Sprintf("pool=%d", bm.poolSize)
+}
+
+func BenchmarkPoolGetRemove(b *testing.B) {
+ ctx := context.Background()
+ benchmarks := []poolGetRemoveBenchmark{
+ {1},
+ {2},
+ {8},
+ {32},
+ {64},
+ {128},
+ }
+
+ for _, bm := range benchmarks {
+ b.Run(bm.String(), func(b *testing.B) {
+ connPool := pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: bm.poolSize,
+ PoolTimeout: time.Second,
+ IdleTimeout: time.Hour,
+ IdleCheckFrequency: time.Hour,
+ })
+
+ b.ResetTimer()
+
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cn, err := connPool.Get(ctx)
+ if err != nil {
+ b.Fatal(err)
+ }
+ connPool.Remove(ctx, cn, nil)
+ }
+ })
+ })
+ }
+}
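
These benchmarks drive the pool's two hot paths (Get/Put and Get/Remove) across pool sizes from 1 to 128, using the stub dummyDialer defined in main_test.go below. They run under the standard Go tooling, for example:

go test -run NONE -bench 'BenchmarkPoolGet(Put|Remove)' -benchmem github.com/go-redis/redis/v8/internal/pool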
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/conn.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/conn.go
new file mode 100644
index 0000000..5661659
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/conn.go
@@ -0,0 +1,121 @@
+package pool
+
+import (
+ "bufio"
+ "context"
+ "net"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/v8/internal/proto"
+)
+
+var noDeadline = time.Time{}
+
+type Conn struct {
+ usedAt int64 // atomic
+ netConn net.Conn
+
+ rd *proto.Reader
+ bw *bufio.Writer
+ wr *proto.Writer
+
+ Inited bool
+ pooled bool
+ createdAt time.Time
+}
+
+func NewConn(netConn net.Conn) *Conn {
+ cn := &Conn{
+ netConn: netConn,
+ createdAt: time.Now(),
+ }
+ cn.rd = proto.NewReader(netConn)
+ cn.bw = bufio.NewWriter(netConn)
+ cn.wr = proto.NewWriter(cn.bw)
+ cn.SetUsedAt(time.Now())
+ return cn
+}
+
+func (cn *Conn) UsedAt() time.Time {
+ unix := atomic.LoadInt64(&cn.usedAt)
+ return time.Unix(unix, 0)
+}
+
+func (cn *Conn) SetUsedAt(tm time.Time) {
+ atomic.StoreInt64(&cn.usedAt, tm.Unix())
+}
+
+func (cn *Conn) SetNetConn(netConn net.Conn) {
+ cn.netConn = netConn
+ cn.rd.Reset(netConn)
+ cn.bw.Reset(netConn)
+}
+
+func (cn *Conn) Write(b []byte) (int, error) {
+ return cn.netConn.Write(b)
+}
+
+func (cn *Conn) RemoteAddr() net.Addr {
+ if cn.netConn != nil {
+ return cn.netConn.RemoteAddr()
+ }
+ return nil
+}
+
+func (cn *Conn) WithReader(ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error) error {
+ if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
+ return err
+ }
+ return fn(cn.rd)
+}
+
+func (cn *Conn) WithWriter(
+ ctx context.Context, timeout time.Duration, fn func(wr *proto.Writer) error,
+) error {
+ if err := cn.netConn.SetWriteDeadline(cn.deadline(ctx, timeout)); err != nil {
+ return err
+ }
+
+ if cn.bw.Buffered() > 0 {
+ cn.bw.Reset(cn.netConn)
+ }
+
+ if err := fn(cn.wr); err != nil {
+ return err
+ }
+
+ return cn.bw.Flush()
+}
+
+func (cn *Conn) Close() error {
+ return cn.netConn.Close()
+}
+
+func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time {
+ tm := time.Now()
+ cn.SetUsedAt(tm)
+
+ if timeout > 0 {
+ tm = tm.Add(timeout)
+ }
+
+ if ctx != nil {
+ deadline, ok := ctx.Deadline()
+ if ok {
+ if timeout == 0 {
+ return deadline
+ }
+ if deadline.Before(tm) {
+ return deadline
+ }
+ return tm
+ }
+ }
+
+ if timeout > 0 {
+ return tm
+ }
+
+ return noDeadline
+}
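
The deadline helper above merges the per-call timeout with any context deadline, always choosing whichever expires first; returning the zero time clears the deadline entirely. A minimal standalone sketch of the same rule (effectiveDeadline is an illustrative name, not part of the package):

package main

import (
	"context"
	"fmt"
	"time"
)

// effectiveDeadline mirrors Conn.deadline: the earlier of now+timeout and
// the context deadline wins, and the zero time means "no deadline".
func effectiveDeadline(ctx context.Context, timeout time.Duration) time.Time {
	tm := time.Now()
	if timeout > 0 {
		tm = tm.Add(timeout)
	}
	if deadline, ok := ctx.Deadline(); ok {
		if timeout == 0 || deadline.Before(tm) {
			return deadline
		}
		return tm
	}
	if timeout > 0 {
		return tm
	}
	return time.Time{} // net.Conn interprets the zero time as "no deadline"
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	// The one-second timeout loses to the 50ms context deadline.
	fmt.Println(effectiveDeadline(ctx, time.Second))
}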
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/export_test.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/export_test.go
new file mode 100644
index 0000000..75dd4ad
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/export_test.go
@@ -0,0 +1,9 @@
+package pool
+
+import (
+ "time"
+)
+
+func (cn *Conn) SetCreatedAt(tm time.Time) {
+ cn.createdAt = tm
+}
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/main_test.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/main_test.go
new file mode 100644
index 0000000..2365dbc
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/main_test.go
@@ -0,0 +1,36 @@
+package pool_test
+
+import (
+ "context"
+ "net"
+ "sync"
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestGinkgoSuite(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "pool")
+}
+
+func perform(n int, cbs ...func(int)) {
+ var wg sync.WaitGroup
+ for _, cb := range cbs {
+ for i := 0; i < n; i++ {
+ wg.Add(1)
+ go func(cb func(int), i int) {
+ defer GinkgoRecover()
+ defer wg.Done()
+
+ cb(i)
+ }(cb, i)
+ }
+ }
+ wg.Wait()
+}
+
+func dummyDialer(context.Context) (net.Conn, error) {
+ return &net.TCPConn{}, nil
+}
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool.go
new file mode 100644
index 0000000..44a4e77
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool.go
@@ -0,0 +1,557 @@
+package pool
+
+import (
+ "context"
+ "errors"
+ "net"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/go-redis/redis/v8/internal"
+)
+
+var (
+ // ErrClosed is returned when any operation is performed on a closed client.
+ ErrClosed = errors.New("redis: client is closed")
+
+ // ErrPoolTimeout is returned when the pool times out waiting for a free connection.
+ ErrPoolTimeout = errors.New("redis: connection pool timeout")
+)
+
+var timers = sync.Pool{
+ New: func() interface{} {
+ t := time.NewTimer(time.Hour)
+ t.Stop()
+ return t
+ },
+}
+
+// Stats contains pool state information and accumulated stats.
+type Stats struct {
+ Hits uint32 // number of times a free connection was found in the pool
+ Misses uint32 // number of times a free connection was NOT found in the pool
+ Timeouts uint32 // number of times a wait timeout occurred
+
+ TotalConns uint32 // total number of connections in the pool
+ IdleConns uint32 // number of idle connections in the pool
+ StaleConns uint32 // number of stale connections removed from the pool
+}
+
+type Pooler interface {
+ NewConn(context.Context) (*Conn, error)
+ CloseConn(*Conn) error
+
+ Get(context.Context) (*Conn, error)
+ Put(context.Context, *Conn)
+ Remove(context.Context, *Conn, error)
+
+ Len() int
+ IdleLen() int
+ Stats() *Stats
+
+ Close() error
+}
+
+type Options struct {
+ Dialer func(context.Context) (net.Conn, error)
+ OnClose func(*Conn) error
+
+ PoolFIFO bool
+ PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
+ PoolTimeout time.Duration
+ IdleTimeout time.Duration
+ IdleCheckFrequency time.Duration
+}
+
+type lastDialErrorWrap struct {
+ err error
+}
+
+type ConnPool struct {
+ opt *Options
+
+ dialErrorsNum uint32 // atomic
+
+ lastDialError atomic.Value
+
+ queue chan struct{}
+
+ connsMu sync.Mutex
+ conns []*Conn
+ idleConns []*Conn
+ poolSize int
+ idleConnsLen int
+
+ stats Stats
+
+ _closed uint32 // atomic
+ closedCh chan struct{}
+}
+
+var _ Pooler = (*ConnPool)(nil)
+
+func NewConnPool(opt *Options) *ConnPool {
+ p := &ConnPool{
+ opt: opt,
+
+ queue: make(chan struct{}, opt.PoolSize),
+ conns: make([]*Conn, 0, opt.PoolSize),
+ idleConns: make([]*Conn, 0, opt.PoolSize),
+ closedCh: make(chan struct{}),
+ }
+
+ p.connsMu.Lock()
+ p.checkMinIdleConns()
+ p.connsMu.Unlock()
+
+ if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
+ go p.reaper(opt.IdleCheckFrequency)
+ }
+
+ return p
+}
+
+func (p *ConnPool) checkMinIdleConns() {
+ if p.opt.MinIdleConns == 0 {
+ return
+ }
+ for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
+ p.poolSize++
+ p.idleConnsLen++
+
+ go func() {
+ err := p.addIdleConn()
+ if err != nil && err != ErrClosed {
+ p.connsMu.Lock()
+ p.poolSize--
+ p.idleConnsLen--
+ p.connsMu.Unlock()
+ }
+ }()
+ }
+}
+
+func (p *ConnPool) addIdleConn() error {
+ cn, err := p.dialConn(context.TODO(), true)
+ if err != nil {
+ return err
+ }
+
+ p.connsMu.Lock()
+ defer p.connsMu.Unlock()
+
+ // Adding new connections to a closed pool is not allowed.
+ if p.closed() {
+ _ = cn.Close()
+ return ErrClosed
+ }
+
+ p.conns = append(p.conns, cn)
+ p.idleConns = append(p.idleConns, cn)
+ return nil
+}
+
+func (p *ConnPool) NewConn(ctx context.Context) (*Conn, error) {
+ return p.newConn(ctx, false)
+}
+
+func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
+ cn, err := p.dialConn(ctx, pooled)
+ if err != nil {
+ return nil, err
+ }
+
+ p.connsMu.Lock()
+ defer p.connsMu.Unlock()
+
+ // Adding new connections to a closed pool is not allowed.
+ if p.closed() {
+ _ = cn.Close()
+ return nil, ErrClosed
+ }
+
+ p.conns = append(p.conns, cn)
+ if pooled {
+ // If the pool is full, remove this connection on the next Put.
+ if p.poolSize >= p.opt.PoolSize {
+ cn.pooled = false
+ } else {
+ p.poolSize++
+ }
+ }
+
+ return cn, nil
+}
+
+func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
+ return nil, p.getLastDialError()
+ }
+
+ netConn, err := p.opt.Dialer(ctx)
+ if err != nil {
+ p.setLastDialError(err)
+ if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
+ go p.tryDial()
+ }
+ return nil, err
+ }
+
+ cn := NewConn(netConn)
+ cn.pooled = pooled
+ return cn, nil
+}
+
+func (p *ConnPool) tryDial() {
+ for {
+ if p.closed() {
+ return
+ }
+
+ conn, err := p.opt.Dialer(context.Background())
+ if err != nil {
+ p.setLastDialError(err)
+ time.Sleep(time.Second)
+ continue
+ }
+
+ atomic.StoreUint32(&p.dialErrorsNum, 0)
+ _ = conn.Close()
+ return
+ }
+}
+
+func (p *ConnPool) setLastDialError(err error) {
+ p.lastDialError.Store(&lastDialErrorWrap{err: err})
+}
+
+func (p *ConnPool) getLastDialError() error {
+ err, _ := p.lastDialError.Load().(*lastDialErrorWrap)
+ if err != nil {
+ return err.err
+ }
+ return nil
+}
+
+// Get returns an existing connection from the pool or creates a new one.
+func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+
+ if err := p.waitTurn(ctx); err != nil {
+ return nil, err
+ }
+
+ for {
+ p.connsMu.Lock()
+ cn, err := p.popIdle()
+ p.connsMu.Unlock()
+
+ if err != nil {
+ return nil, err
+ }
+
+ if cn == nil {
+ break
+ }
+
+ if p.isStaleConn(cn) {
+ _ = p.CloseConn(cn)
+ continue
+ }
+
+ atomic.AddUint32(&p.stats.Hits, 1)
+ return cn, nil
+ }
+
+ atomic.AddUint32(&p.stats.Misses, 1)
+
+ newcn, err := p.newConn(ctx, true)
+ if err != nil {
+ p.freeTurn()
+ return nil, err
+ }
+
+ return newcn, nil
+}
+
+func (p *ConnPool) getTurn() {
+ p.queue <- struct{}{}
+}
+
+func (p *ConnPool) waitTurn(ctx context.Context) error {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ select {
+ case p.queue <- struct{}{}:
+ return nil
+ default:
+ }
+
+ timer := timers.Get().(*time.Timer)
+ timer.Reset(p.opt.PoolTimeout)
+
+ select {
+ case <-ctx.Done():
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return ctx.Err()
+ case p.queue <- struct{}{}:
+ if !timer.Stop() {
+ <-timer.C
+ }
+ timers.Put(timer)
+ return nil
+ case <-timer.C:
+ timers.Put(timer)
+ atomic.AddUint32(&p.stats.Timeouts, 1)
+ return ErrPoolTimeout
+ }
+}
+
+func (p *ConnPool) freeTurn() {
+ <-p.queue
+}
+
+func (p *ConnPool) popIdle() (*Conn, error) {
+ if p.closed() {
+ return nil, ErrClosed
+ }
+ n := len(p.idleConns)
+ if n == 0 {
+ return nil, nil
+ }
+
+ var cn *Conn
+ if p.opt.PoolFIFO {
+ cn = p.idleConns[0]
+ copy(p.idleConns, p.idleConns[1:])
+ p.idleConns = p.idleConns[:n-1]
+ } else {
+ idx := n - 1
+ cn = p.idleConns[idx]
+ p.idleConns = p.idleConns[:idx]
+ }
+ p.idleConnsLen--
+ p.checkMinIdleConns()
+ return cn, nil
+}
+
+func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
+ if cn.rd.Buffered() > 0 {
+ internal.Logger.Printf(ctx, "Conn has unread data")
+ p.Remove(ctx, cn, BadConnError{})
+ return
+ }
+
+ if !cn.pooled {
+ p.Remove(ctx, cn, nil)
+ return
+ }
+
+ p.connsMu.Lock()
+ p.idleConns = append(p.idleConns, cn)
+ p.idleConnsLen++
+ p.connsMu.Unlock()
+ p.freeTurn()
+}
+
+func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
+ p.removeConnWithLock(cn)
+ p.freeTurn()
+ _ = p.closeConn(cn)
+}
+
+func (p *ConnPool) CloseConn(cn *Conn) error {
+ p.removeConnWithLock(cn)
+ return p.closeConn(cn)
+}
+
+func (p *ConnPool) removeConnWithLock(cn *Conn) {
+ p.connsMu.Lock()
+ p.removeConn(cn)
+ p.connsMu.Unlock()
+}
+
+func (p *ConnPool) removeConn(cn *Conn) {
+ for i, c := range p.conns {
+ if c == cn {
+ p.conns = append(p.conns[:i], p.conns[i+1:]...)
+ if cn.pooled {
+ p.poolSize--
+ p.checkMinIdleConns()
+ }
+ return
+ }
+ }
+}
+
+func (p *ConnPool) closeConn(cn *Conn) error {
+ if p.opt.OnClose != nil {
+ _ = p.opt.OnClose(cn)
+ }
+ return cn.Close()
+}
+
+// Len returns the total number of connections.
+func (p *ConnPool) Len() int {
+ p.connsMu.Lock()
+ n := len(p.conns)
+ p.connsMu.Unlock()
+ return n
+}
+
+// IdleLen returns the number of idle connections.
+func (p *ConnPool) IdleLen() int {
+ p.connsMu.Lock()
+ n := p.idleConnsLen
+ p.connsMu.Unlock()
+ return n
+}
+
+func (p *ConnPool) Stats() *Stats {
+ idleLen := p.IdleLen()
+ return &Stats{
+ Hits: atomic.LoadUint32(&p.stats.Hits),
+ Misses: atomic.LoadUint32(&p.stats.Misses),
+ Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
+
+ TotalConns: uint32(p.Len()),
+ IdleConns: uint32(idleLen),
+ StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
+ }
+}
+
+func (p *ConnPool) closed() bool {
+ return atomic.LoadUint32(&p._closed) == 1
+}
+
+func (p *ConnPool) Filter(fn func(*Conn) bool) error {
+ p.connsMu.Lock()
+ defer p.connsMu.Unlock()
+
+ var firstErr error
+ for _, cn := range p.conns {
+ if fn(cn) {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ }
+ return firstErr
+}
+
+func (p *ConnPool) Close() error {
+ if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
+ return ErrClosed
+ }
+ close(p.closedCh)
+
+ var firstErr error
+ p.connsMu.Lock()
+ for _, cn := range p.conns {
+ if err := p.closeConn(cn); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ p.conns = nil
+ p.poolSize = 0
+ p.idleConns = nil
+ p.idleConnsLen = 0
+ p.connsMu.Unlock()
+
+ return firstErr
+}
+
+func (p *ConnPool) reaper(frequency time.Duration) {
+ ticker := time.NewTicker(frequency)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ // The ticker and closedCh can become ready at the same time, and
+ // select picks among ready cases pseudo-randomly, so double-check
+ // that the pool is still open before reaping.
+ if p.closed() {
+ return
+ }
+ _, err := p.ReapStaleConns()
+ if err != nil {
+ internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
+ continue
+ }
+ case <-p.closedCh:
+ return
+ }
+ }
+}
+
+func (p *ConnPool) ReapStaleConns() (int, error) {
+ var n int
+ for {
+ p.getTurn()
+
+ p.connsMu.Lock()
+ cn := p.reapStaleConn()
+ p.connsMu.Unlock()
+
+ p.freeTurn()
+
+ if cn != nil {
+ _ = p.closeConn(cn)
+ n++
+ } else {
+ break
+ }
+ }
+ atomic.AddUint32(&p.stats.StaleConns, uint32(n))
+ return n, nil
+}
+
+func (p *ConnPool) reapStaleConn() *Conn {
+ if len(p.idleConns) == 0 {
+ return nil
+ }
+
+ cn := p.idleConns[0]
+ if !p.isStaleConn(cn) {
+ return nil
+ }
+
+ p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
+ p.idleConnsLen--
+ p.removeConn(cn)
+
+ return cn
+}
+
+func (p *ConnPool) isStaleConn(cn *Conn) bool {
+ if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
+ return false
+ }
+
+ now := time.Now()
+ if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
+ return true
+ }
+ if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
+ return true
+ }
+
+ return false
+}
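
A note on the design: queue is a buffered channel used as a counting semaphore, so at most PoolSize callers hold a connection at once; Get acquires a slot in waitTurn before popping an idle connection, and Put/Remove release it in freeTurn. A minimal usage sketch, reusing the stub dialer from the package's own tests (internal packages are importable only from inside the go-redis module, so this mirrors how the library itself drives the pool):

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/go-redis/redis/v8/internal/pool"
)

func main() {
	ctx := context.Background()
	p := pool.NewConnPool(&pool.Options{
		// Stub dialer, as in the package tests; a real caller would
		// net.Dial a Redis server here.
		Dialer: func(ctx context.Context) (net.Conn, error) {
			return &net.TCPConn{}, nil
		},
		PoolSize:           4,           // also the semaphore capacity
		PoolTimeout:        time.Second, // max wait for a free slot
		IdleTimeout:        time.Minute, // conns idle longer than this are stale
		IdleCheckFrequency: time.Minute, // how often the reaper runs
	})
	defer p.Close()

	cn, err := p.Get(ctx) // waitTurn, then popIdle or a fresh dial
	if err != nil {
		panic(err)
	}
	p.Put(ctx, cn) // healthy conn: back on the idle list, slot freed

	fmt.Printf("total=%d idle=%d\n", p.Len(), p.IdleLen())
}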
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_single.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_single.go
new file mode 100644
index 0000000..5a3fde1
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_single.go
@@ -0,0 +1,58 @@
+package pool
+
+import "context"
+
+type SingleConnPool struct {
+ pool Pooler
+ cn *Conn
+ stickyErr error
+}
+
+var _ Pooler = (*SingleConnPool)(nil)
+
+func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool {
+ return &SingleConnPool{
+ pool: pool,
+ cn: cn,
+ }
+}
+
+func (p *SingleConnPool) NewConn(ctx context.Context) (*Conn, error) {
+ return p.pool.NewConn(ctx)
+}
+
+func (p *SingleConnPool) CloseConn(cn *Conn) error {
+ return p.pool.CloseConn(cn)
+}
+
+func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) {
+ if p.stickyErr != nil {
+ return nil, p.stickyErr
+ }
+ return p.cn, nil
+}
+
+func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {}
+
+func (p *SingleConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
+ p.cn = nil
+ p.stickyErr = reason
+}
+
+func (p *SingleConnPool) Close() error {
+ p.cn = nil
+ p.stickyErr = ErrClosed
+ return nil
+}
+
+func (p *SingleConnPool) Len() int {
+ return 0
+}
+
+func (p *SingleConnPool) IdleLen() int {
+ return 0
+}
+
+func (p *SingleConnPool) Stats() *Stats {
+ return &Stats{}
+}
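
SingleConnPool pins one already-dialed connection: Get always hands back that same connection, Put is a no-op, and Remove poisons the pool with a sticky error that every later Get reports. A short sketch of that contract, under the same stub-dialer assumption as above:

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/go-redis/redis/v8/internal/pool"
)

func main() {
	ctx := context.Background()
	parent := pool.NewConnPool(&pool.Options{
		Dialer: func(ctx context.Context) (net.Conn, error) {
			return &net.TCPConn{}, nil
		},
		PoolSize:    1,
		PoolTimeout: time.Second,
	})
	defer parent.Close()

	cn, _ := parent.Get(ctx)
	sp := pool.NewSingleConnPool(parent, cn)

	got, _ := sp.Get(ctx)
	fmt.Println(got == cn) // true: always the pinned connection

	sp.Remove(ctx, cn, errors.New("conn went bad"))
	if _, err := sp.Get(ctx); err != nil {
		fmt.Println(err) // the sticky error recorded by Remove
	}
}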
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_sticky.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_sticky.go
new file mode 100644
index 0000000..3adb99b
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_sticky.go
@@ -0,0 +1,201 @@
+package pool
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync/atomic"
+)
+
+const (
+ stateDefault = 0
+ stateInited = 1
+ stateClosed = 2
+)
+
+type BadConnError struct {
+ wrapped error
+}
+
+var _ error = (*BadConnError)(nil)
+
+func (e BadConnError) Error() string {
+ s := "redis: Conn is in a bad state"
+ if e.wrapped != nil {
+ s += ": " + e.wrapped.Error()
+ }
+ return s
+}
+
+func (e BadConnError) Unwrap() error {
+ return e.wrapped
+}
+
+//------------------------------------------------------------------------------
+
+type StickyConnPool struct {
+ pool Pooler
+ shared int32 // atomic
+
+ state uint32 // atomic
+ ch chan *Conn
+
+ _badConnError atomic.Value
+}
+
+var _ Pooler = (*StickyConnPool)(nil)
+
+func NewStickyConnPool(pool Pooler) *StickyConnPool {
+ p, ok := pool.(*StickyConnPool)
+ if !ok {
+ p = &StickyConnPool{
+ pool: pool,
+ ch: make(chan *Conn, 1),
+ }
+ }
+ atomic.AddInt32(&p.shared, 1)
+ return p
+}
+
+func (p *StickyConnPool) NewConn(ctx context.Context) (*Conn, error) {
+ return p.pool.NewConn(ctx)
+}
+
+func (p *StickyConnPool) CloseConn(cn *Conn) error {
+ return p.pool.CloseConn(cn)
+}
+
+func (p *StickyConnPool) Get(ctx context.Context) (*Conn, error) {
+ // In the worst case this races with Close, which is not a very common operation.
+ for i := 0; i < 1000; i++ {
+ switch atomic.LoadUint32(&p.state) {
+ case stateDefault:
+ cn, err := p.pool.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
+ return cn, nil
+ }
+ p.pool.Remove(ctx, cn, ErrClosed)
+ case stateInited:
+ if err := p.badConnError(); err != nil {
+ return nil, err
+ }
+ cn, ok := <-p.ch
+ if !ok {
+ return nil, ErrClosed
+ }
+ return cn, nil
+ case stateClosed:
+ return nil, ErrClosed
+ default:
+ panic("not reached")
+ }
+ }
+ return nil, fmt.Errorf("redis: StickyConnPool.Get: infinite loop")
+}
+
+func (p *StickyConnPool) Put(ctx context.Context, cn *Conn) {
+ defer func() {
+ if recover() != nil {
+ p.freeConn(ctx, cn)
+ }
+ }()
+ p.ch <- cn
+}
+
+func (p *StickyConnPool) freeConn(ctx context.Context, cn *Conn) {
+ if err := p.badConnError(); err != nil {
+ p.pool.Remove(ctx, cn, err)
+ } else {
+ p.pool.Put(ctx, cn)
+ }
+}
+
+func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
+ defer func() {
+ if recover() != nil {
+ p.pool.Remove(ctx, cn, ErrClosed)
+ }
+ }()
+ p._badConnError.Store(BadConnError{wrapped: reason})
+ p.ch <- cn
+}
+
+func (p *StickyConnPool) Close() error {
+ if shared := atomic.AddInt32(&p.shared, -1); shared > 0 {
+ return nil
+ }
+
+ for i := 0; i < 1000; i++ {
+ state := atomic.LoadUint32(&p.state)
+ if state == stateClosed {
+ return ErrClosed
+ }
+ if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
+ close(p.ch)
+ cn, ok := <-p.ch
+ if ok {
+ p.freeConn(context.TODO(), cn)
+ }
+ return nil
+ }
+ }
+
+ return errors.New("redis: StickyConnPool.Close: infinite loop")
+}
+
+func (p *StickyConnPool) Reset(ctx context.Context) error {
+ if p.badConnError() == nil {
+ return nil
+ }
+
+ select {
+ case cn, ok := <-p.ch:
+ if !ok {
+ return ErrClosed
+ }
+ p.pool.Remove(ctx, cn, ErrClosed)
+ p._badConnError.Store(BadConnError{wrapped: nil})
+ default:
+ return errors.New("redis: StickyConnPool does not have a Conn")
+ }
+
+ if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
+ state := atomic.LoadUint32(&p.state)
+ return fmt.Errorf("redis: invalid StickyConnPool state: %d", state)
+ }
+
+ return nil
+}
+
+func (p *StickyConnPool) badConnError() error {
+ if v := p._badConnError.Load(); v != nil {
+ if err := v.(BadConnError); err.wrapped != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (p *StickyConnPool) Len() int {
+ switch atomic.LoadUint32(&p.state) {
+ case stateDefault:
+ return 0
+ case stateInited:
+ return 1
+ case stateClosed:
+ return 0
+ default:
+ panic("not reached")
+ }
+}
+
+func (p *StickyConnPool) IdleLen() int {
+ return len(p.ch)
+}
+
+func (p *StickyConnPool) Stats() *Stats {
+ return &Stats{}
+}
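
StickyConnPool is reference counted: each NewStickyConnPool call on the same instance increments shared, and Close only tears the pool down once the count reaches zero. The single connection shuttles through the one-slot channel ch, and Remove records a BadConnError that later Gets report. A happy-path sketch, again with the stub dialer:

package main

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/go-redis/redis/v8/internal/pool"
)

func main() {
	ctx := context.Background()
	parent := pool.NewConnPool(&pool.Options{
		Dialer: func(ctx context.Context) (net.Conn, error) {
			return &net.TCPConn{}, nil
		},
		PoolSize:    1,
		PoolTimeout: time.Second,
	})
	defer parent.Close()

	sp := pool.NewStickyConnPool(parent)

	cn, err := sp.Get(ctx) // stateDefault -> stateInited; conn comes from parent
	if err != nil {
		panic(err)
	}
	sp.Put(ctx, cn) // parks the conn in the one-slot channel

	cn2, _ := sp.Get(ctx)  // the same conn comes back out
	fmt.Println(cn == cn2) // true

	sp.Put(ctx, cn2)
	_ = sp.Close() // refcount hits zero; the parked conn returns to parent
}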
diff --git a/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_test.go b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_test.go
new file mode 100644
index 0000000..423a783
--- /dev/null
+++ b/dependencies/pkg/mod/github.com/go-redis/redis/v8@v8.11.5/internal/pool/pool_test.go
@@ -0,0 +1,458 @@
+package pool_test
+
+import (
+ "context"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/go-redis/redis/v8/internal/pool"
+)
+
+var _ = Describe("ConnPool", func() {
+ ctx := context.Background()
+ var connPool *pool.ConnPool
+
+ BeforeEach(func() {
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ PoolTimeout: time.Hour,
+ IdleTimeout: time.Millisecond,
+ IdleCheckFrequency: time.Millisecond,
+ })
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ It("should safe close", func() {
+ const minIdleConns = 10
+
+ var (
+ wg sync.WaitGroup
+ closedChan = make(chan struct{})
+ )
+ wg.Add(minIdleConns)
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: func(ctx context.Context) (net.Conn, error) {
+ wg.Done()
+ <-closedChan
+ return &net.TCPConn{}, nil
+ },
+ PoolSize: 10,
+ PoolTimeout: time.Hour,
+ IdleTimeout: time.Millisecond,
+ IdleCheckFrequency: time.Millisecond,
+ MinIdleConns: minIdleConns,
+ })
+ wg.Wait()
+ Expect(connPool.Close()).NotTo(HaveOccurred())
+ close(closedChan)
+
+ // Wait one second so the goroutines spawned by checkMinIdleConns have run.
+ time.Sleep(time.Second)
+
+ Expect(connPool.Stats()).To(Equal(&pool.Stats{
+ Hits: 0,
+ Misses: 0,
+ Timeouts: 0,
+ TotalConns: 0,
+ IdleConns: 0,
+ StaleConns: 0,
+ }))
+ })
+
+ It("should unblock client when conn is removed", func() {
+ // Reserve one connection.
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+
+ // Reserve all other connections.
+ var cns []*pool.Conn
+ for i := 0; i < 9; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ cns = append(cns, cn)
+ }
+
+ started := make(chan bool, 1)
+ done := make(chan bool, 1)
+ go func() {
+ defer GinkgoRecover()
+
+ started <- true
+ _, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ done <- true
+
+ connPool.Put(ctx, cn)
+ }()
+ <-started
+
+ // Check that Get is blocked.
+ select {
+ case <-done:
+ Fail("Get is not blocked")
+ case <-time.After(time.Millisecond):
+ // ok
+ }
+
+ connPool.Remove(ctx, cn, nil)
+
+ // Check that Get is unblocked.
+ select {
+ case <-done:
+ // ok
+ case <-time.After(time.Second):
+ Fail("Get is not unblocked")
+ }
+
+ for _, cn := range cns {
+ connPool.Put(ctx, cn)
+ }
+ })
+})
+
+var _ = Describe("MinIdleConns", func() {
+ const poolSize = 100
+ ctx := context.Background()
+ var minIdleConns int
+ var connPool *pool.ConnPool
+
+ newConnPool := func() *pool.ConnPool {
+ connPool := pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: poolSize,
+ MinIdleConns: minIdleConns,
+ PoolTimeout: 100 * time.Millisecond,
+ IdleTimeout: -1,
+ IdleCheckFrequency: -1,
+ })
+ Eventually(func() int {
+ return connPool.Len()
+ }).Should(Equal(minIdleConns))
+ return connPool
+ }
+
+ assert := func() {
+ It("has idle connections when created", func() {
+ Expect(connPool.Len()).To(Equal(minIdleConns))
+ Expect(connPool.IdleLen()).To(Equal(minIdleConns))
+ })
+
+ Context("after Get", func() {
+ var cn *pool.Conn
+
+ BeforeEach(func() {
+ var err error
+ cn, err = connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+
+ Eventually(func() int {
+ return connPool.Len()
+ }).Should(Equal(minIdleConns + 1))
+ })
+
+ It("has idle connections", func() {
+ Expect(connPool.Len()).To(Equal(minIdleConns + 1))
+ Expect(connPool.IdleLen()).To(Equal(minIdleConns))
+ })
+
+ Context("after Remove", func() {
+ BeforeEach(func() {
+ connPool.Remove(ctx, cn, nil)
+ })
+
+ It("has idle connections", func() {
+ Expect(connPool.Len()).To(Equal(minIdleConns))
+ Expect(connPool.IdleLen()).To(Equal(minIdleConns))
+ })
+ })
+ })
+
+ Describe("Get does not exceed pool size", func() {
+ var mu sync.RWMutex
+ var cns []*pool.Conn
+
+ BeforeEach(func() {
+ cns = make([]*pool.Conn, 0)
+
+ perform(poolSize, func(_ int) {
+ defer GinkgoRecover()
+
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ mu.Lock()
+ cns = append(cns, cn)
+ mu.Unlock()
+ })
+
+ Eventually(func() int {
+ return connPool.Len()
+ }).Should(BeNumerically(">=", poolSize))
+ })
+
+ It("Get is blocked", func() {
+ done := make(chan struct{})
+ go func() {
+ connPool.Get(ctx)
+ close(done)
+ }()
+
+ select {
+ case <-done:
+ Fail("Get is not blocked")
+ case <-time.After(time.Millisecond):
+ // ok
+ }
+
+ select {
+ case <-done:
+ // ok
+ case <-time.After(time.Second):
+ Fail("Get is not unblocked")
+ }
+ })
+
+ Context("after Put", func() {
+ BeforeEach(func() {
+ perform(len(cns), func(i int) {
+ mu.RLock()
+ connPool.Put(ctx, cns[i])
+ mu.RUnlock()
+ })
+
+ Eventually(func() int {
+ return connPool.Len()
+ }).Should(Equal(poolSize))
+ })
+
+ It("pool.Len is back to normal", func() {
+ Expect(connPool.Len()).To(Equal(poolSize))
+ Expect(connPool.IdleLen()).To(Equal(poolSize))
+ })
+ })
+
+ Context("after Remove", func() {
+ BeforeEach(func() {
+ perform(len(cns), func(i int) {
+ mu.RLock()
+ connPool.Remove(ctx, cns[i], nil)
+ mu.RUnlock()
+ })
+
+ Eventually(func() int {
+ return connPool.Len()
+ }).Should(Equal(minIdleConns))
+ })
+
+ It("has idle connections", func() {
+ Expect(connPool.Len()).To(Equal(minIdleConns))
+ Expect(connPool.IdleLen()).To(Equal(minIdleConns))
+ })
+ })
+ })
+ }
+
+ Context("minIdleConns = 1", func() {
+ BeforeEach(func() {
+ minIdleConns = 1
+ connPool = newConnPool()
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ assert()
+ })
+
+ Context("minIdleConns = 32", func() {
+ BeforeEach(func() {
+ minIdleConns = 32
+ connPool = newConnPool()
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ assert()
+ })
+})
+
+var _ = Describe("conns reaper", func() {
+ const idleTimeout = time.Minute
+ const maxAge = time.Hour
+
+ ctx := context.Background()
+ var connPool *pool.ConnPool
+ var conns, staleConns, closedConns []*pool.Conn
+
+ assert := func(typ string) {
+ BeforeEach(func() {
+ closedConns = nil
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ IdleTimeout: idleTimeout,
+ MaxConnAge: maxAge,
+ PoolTimeout: time.Second,
+ IdleCheckFrequency: time.Hour,
+ OnClose: func(cn *pool.Conn) error {
+ closedConns = append(closedConns, cn)
+ return nil
+ },
+ })
+
+ conns = nil
+
+ // add stale connections
+ staleConns = nil
+ for i := 0; i < 3; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ switch typ {
+ case "idle":
+ cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
+ case "aged":
+ cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
+ }
+ conns = append(conns, cn)
+ staleConns = append(staleConns, cn)
+ }
+
+ // add fresh connections
+ for i := 0; i < 3; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ conns = append(conns, cn)
+ }
+
+ for _, cn := range conns {
+ connPool.Put(ctx, cn)
+ }
+
+ Expect(connPool.Len()).To(Equal(6))
+ Expect(connPool.IdleLen()).To(Equal(6))
+
+ n, err := connPool.ReapStaleConns()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(3))
+ })
+
+ AfterEach(func() {
+ _ = connPool.Close()
+ Expect(connPool.Len()).To(Equal(0))
+ Expect(connPool.IdleLen()).To(Equal(0))
+ Expect(len(closedConns)).To(Equal(len(conns)))
+ Expect(closedConns).To(ConsistOf(conns))
+ })
+
+ It("reaps stale connections", func() {
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.IdleLen()).To(Equal(3))
+ })
+
+ It("does not reap fresh connections", func() {
+ n, err := connPool.ReapStaleConns()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(n).To(Equal(0))
+ })
+
+ It("stale connections are closed", func() {
+ Expect(len(closedConns)).To(Equal(len(staleConns)))
+ Expect(closedConns).To(ConsistOf(staleConns))
+ })
+
+ It("pool is functional", func() {
+ for j := 0; j < 3; j++ {
+ var freeCns []*pool.Conn
+ for i := 0; i < 3; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(cn).NotTo(BeNil())
+ freeCns = append(freeCns, cn)
+ }
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.IdleLen()).To(Equal(0))
+
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(cn).NotTo(BeNil())
+ conns = append(conns, cn)
+
+ Expect(connPool.Len()).To(Equal(4))
+ Expect(connPool.IdleLen()).To(Equal(0))
+
+ connPool.Remove(ctx, cn, nil)
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.IdleLen()).To(Equal(0))
+
+ for _, cn := range freeCns {
+ connPool.Put(ctx, cn)
+ }
+
+ Expect(connPool.Len()).To(Equal(3))
+ Expect(connPool.IdleLen()).To(Equal(3))
+ }
+ })
+ }
+
+ assert("idle")
+ assert("aged")
+})
+
+var _ = Describe("race", func() {
+ ctx := context.Background()
+ var connPool *pool.ConnPool
+ var C, N int
+
+ BeforeEach(func() {
+ C, N = 10, 1000
+ if testing.Short() {
+ C = 4
+ N = 100
+ }
+ })
+
+ AfterEach(func() {
+ connPool.Close()
+ })
+
+ It("does not happen on Get, Put, and Remove", func() {
+ connPool = pool.NewConnPool(&pool.Options{
+ Dialer: dummyDialer,
+ PoolSize: 10,
+ PoolTimeout: time.Minute,
+ IdleTimeout: time.Millisecond,
+ IdleCheckFrequency: time.Millisecond,
+ })
+
+ perform(C, func(id int) {
+ for i := 0; i < N; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ if err == nil {
+ connPool.Put(ctx, cn)
+ }
+ }
+ }, func(id int) {
+ for i := 0; i < N; i++ {
+ cn, err := connPool.Get(ctx)
+ Expect(err).NotTo(HaveOccurred())
+ if err == nil {
+ connPool.Remove(ctx, cn, nil)
+ }
+ }
+ })
+ })
+})
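
The suite is driven by Ginkgo/Gomega through the TestGinkgoSuite entry point in main_test.go, so it runs under plain go test; the race specs above are most useful with the race detector enabled:

go test -race github.com/go-redis/redis/v8/internal/pool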