1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
package config
import (
"context"
"crypto/tls"
"fmt"
"github.com/go-redis/redis/v8"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/icingaredis"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/utils"
"github.com/pkg/errors"
"go.uber.org/zap"
"net"
"strings"
"time"
)
// Redis defines Redis client configuration.
// The struct is populated from YAML (see the field tags) and turned into a client by NewClient.
type Redis struct {
// Host is the Redis server to connect to. NewClient treats a value starting with "/"
// as the path of a Unix domain socket and any other value as a TCP host.
Host string `yaml:"host"`
// Port is the TCP port, combined with Host by NewClient for TCP connections.
// It is not used when Host is a Unix domain socket path. The default tag sets it to 6380.
Port int `yaml:"port" default:"6380"`
// Password is handed to the Redis client as its password option.
Password string `yaml:"password"`
// TlsOptions holds the TLS settings; its YAML keys are inlined into this section.
// NewClient derives the TLS configuration from it via MakeConfig.
TlsOptions TLS `yaml:",inline"`
// Options holds the icingaredis client options. Validate checks them and
// NewClient uses Options.Timeout as the read timeout.
Options icingaredis.Options `yaml:"options"`
}
// ctxDialerFunc is the signature of a context-aware dial function,
// as provided by net.Dialer.DialContext and tls.Dialer.DialContext.
type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn, error)
// NewClient prepares Redis client configuration,
// calls redis.NewClient, but returns *icingaredis.Client.
//
// Connections are dialed with a 15 second timeout and wrapped by dialWithLogging,
// so that failed connection attempts are retried and logged. If r.TlsOptions yields
// a non-nil TLS configuration, connections are established through a tls.Dialer.
// A Host starting with "/" is used as the path of a Unix domain socket, any other
// Host is combined with r.Port into a TCP address. The connection pool size is
// raised to at least 32.
//
// An error is returned only if the TLS configuration cannot be created.
func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) {
	tlsConfig, err := r.TlsOptions.MakeConfig(r.Host)
	if err != nil {
		return nil, err
	}

	var dialer ctxDialerFunc
	dl := &net.Dialer{Timeout: 15 * time.Second}

	if tlsConfig == nil {
		dialer = dl.DialContext
	} else {
		dialer = (&tls.Dialer{NetDialer: dl, Config: tlsConfig}).DialContext
	}

	options := &redis.Options{
		Dialer:      dialWithLogging(dialer, logger),
		Password:    r.Password,
		DB:          0, // Use default DB.
		ReadTimeout: r.Options.Timeout,
		TLSConfig:   tlsConfig,
	}

	if strings.HasPrefix(r.Host, "/") {
		options.Network = "unix"
		options.Addr = r.Host
	} else {
		options.Network = "tcp"
		options.Addr = net.JoinHostPort(r.Host, fmt.Sprint(r.Port))
	}

	// A throwaway client is created first so that go-redis fills in its defaults;
	// the effective pool size is then read back from its options.
	defaults := redis.NewClient(options)
	opts := defaults.Options()

	// The throwaway client is not needed any further. Close it so that its connection
	// pool (and any background work go-redis started for it) is released instead of leaked.
	// It never dialed a connection, so a close error carries no useful information
	// and must not prevent creating the real client.
	_ = defaults.Close()

	opts.PoolSize = utils.MaxInt(32, opts.PoolSize)
	opts.MaxRetries = opts.PoolSize + 1 // https://github.com/go-redis/redis/issues/1737

	return icingaredis.NewClient(redis.NewClient(opts), logger, &r.Options), nil
}
// dialWithLogging wraps dialer in a Redis Dialer that retries failed
// connection attempts and reports them through logger.
func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc {
	// The returned function has the signature of net.Dialer#DialContext,
	// but dials again on errors that retry.Retryable considers retryable.
	return func(ctx context.Context, network, addr string) (net.Conn, error) {
		var conn net.Conn

		// dialOnce performs a single connection attempt and keeps its result in conn.
		dialOnce := func(ctx context.Context) error {
			var dialErr error
			conn, dialErr = dialer(ctx, network, addr)

			return dialErr
		}

		delay := backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second)

		settings := retry.Settings{
			Timeout: 5 * time.Minute,
			OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
				// Skip the warning if the previous attempt failed with the very same message.
				if lastErr != nil && err.Error() == lastErr.Error() {
					return
				}

				logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
			},
			OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
				// Success on the first attempt (attempt 0) is not a reconnect and is not logged.
				if attempt == 0 {
					return
				}

				logger.Infow("Reconnected to Redis",
					zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
			},
		}

		err := retry.WithBackoff(ctx, dialOnce, retry.Retryable, delay, settings)

		// errors.Wrap yields nil for a nil error, so a successful dial returns (conn, nil).
		return conn, errors.Wrap(err, "can't connect to Redis")
	}
}
// Validate verifies the supplied Redis configuration: a host must be set,
// and the nested client options must pass their own validation.
// The first violated constraint is returned as an error.
func (r *Redis) Validate() error {
	if r.Host != "" {
		// Host is present; the remaining constraints live in the client options.
		return r.Options.Validate()
	}

	return errors.New("Redis host missing")
}
|