summaryrefslogtreecommitdiffstats
path: root/pkg/config/redis.go
blob: 38571e3c856403ddc42a06c9731be3c38e5fbd5e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package config

import (
	"context"
	"crypto/tls"
	"fmt"
	"github.com/go-redis/redis/v8"
	"github.com/icinga/icingadb/pkg/backoff"
	"github.com/icinga/icingadb/pkg/icingaredis"
	"github.com/icinga/icingadb/pkg/logging"
	"github.com/icinga/icingadb/pkg/retry"
	"github.com/icinga/icingadb/pkg/utils"
	"github.com/pkg/errors"
	"go.uber.org/zap"
	"net"
	"strings"
	"time"
)

// Redis defines Redis client configuration.
// It is populated from YAML (see the struct tags) and turned into a
// ready-to-use client by NewClient.
type Redis struct {
	// Host is either a host name/IP address, or, if it starts with "/",
	// the path of a Unix domain socket to connect to.
	Host       string              `yaml:"host"`
	// Port is the TCP port to connect to (default 6380 per the struct tag).
	// NewClient ignores it when Host is a Unix socket path.
	Port       int                 `yaml:"port" default:"6380"`
	// Password is handed to the Redis client as-is.
	Password   string              `yaml:"password"`
	// TlsOptions holds the TLS settings; its keys are inlined into the
	// surrounding YAML mapping. NewClient derives the TLS config from it.
	TlsOptions TLS                 `yaml:",inline"`
	// Options carries further client options. NewClient uses its Timeout as
	// the read timeout and passes the whole struct to icingaredis.NewClient.
	Options    icingaredis.Options `yaml:"options"`
}

// ctxDialerFunc is the signature of a context-aware dial function.
// It has the same shape as (*net.Dialer).DialContext and (*tls.Dialer).DialContext,
// both of which NewClient assigns to variables of this type.
type ctxDialerFunc = func(ctx context.Context, network, addr string) (net.Conn, error)

// NewClient prepares Redis client configuration,
// calls redis.NewClient, but returns *icingaredis.Client.
//
// The connection is established via TLS if r.TlsOptions yields a TLS config,
// and in plain text otherwise. If r.Host starts with "/", it is treated as the
// path of a Unix domain socket; otherwise r.Host and r.Port form a TCP address.
// All dial attempts go through dialWithLogging using the supplied logger.
//
// An error is returned only if the TLS configuration cannot be built.
func (r *Redis) NewClient(logger *logging.Logger) (*icingaredis.Client, error) {
	tlsConfig, err := r.TlsOptions.MakeConfig(r.Host)
	if err != nil {
		return nil, err
	}

	var dialer ctxDialerFunc
	dl := &net.Dialer{Timeout: 15 * time.Second}

	if tlsConfig == nil {
		dialer = dl.DialContext
	} else {
		dialer = (&tls.Dialer{NetDialer: dl, Config: tlsConfig}).DialContext
	}

	options := &redis.Options{
		Dialer:      dialWithLogging(dialer, logger),
		Password:    r.Password,
		DB:          0, // Use default DB,
		ReadTimeout: r.Options.Timeout,
		TLSConfig:   tlsConfig,
	}

	if strings.HasPrefix(r.Host, "/") {
		options.Network = "unix"
		options.Addr = r.Host
	} else {
		options.Network = "tcp"
		options.Addr = net.JoinHostPort(r.Host, fmt.Sprint(r.Port))
	}

	// Create a throwaway client solely to let go-redis fill in its defaults,
	// so that the effective pool size can be inspected and adjusted below.
	defaults := redis.NewClient(options)
	opts := defaults.Options()

	// The throwaway client owns its own connection pool. Close it instead of
	// just dropping the reference, otherwise every NewClient call leaks it.
	// The client was never used, so a Close error would not be actionable.
	_ = defaults.Close()

	opts.PoolSize = utils.MaxInt(32, opts.PoolSize)
	opts.MaxRetries = opts.PoolSize + 1 // https://github.com/go-redis/redis/issues/1737

	return icingaredis.NewClient(redis.NewClient(opts), logger, &r.Options), nil
}

// dialWithLogging wraps dialer in a Redis Dialer that retries and logs.
//
// The returned function keeps calling dialer via retry.WithBackoff as long as
// retry.Retryable considers the error retryable, using the backoff built by
// backoff.NewExponentialWithJitter(1ms, 1s) and the retry settings below.
// A warning is logged whenever the error message differs from the previous
// one, and an info message once a connection succeeds after at least one
// failed attempt. A final error is wrapped with "can't connect to Redis".
func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc {
	// warnOnNewError logs a failed attempt, but stays quiet while the
	// same error message keeps repeating.
	warnOnNewError := func(_ time.Duration, _ uint64, err, lastErr error) {
		if lastErr != nil && err.Error() == lastErr.Error() {
			return
		}

		logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
	}

	// reportReconnect logs a success only if it was preceded by failures.
	reportReconnect := func(elapsed time.Duration, attempt uint64, _ error) {
		if attempt == 0 {
			return
		}

		logger.Infow("Reconnected to Redis",
			zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
	}

	return func(ctx context.Context, network, addr string) (net.Conn, error) {
		var conn net.Conn

		// attempt performs a single dial and keeps the resulting connection.
		attempt := func(ctx context.Context) error {
			var dialErr error
			conn, dialErr = dialer(ctx, network, addr)

			return dialErr
		}

		settings := retry.Settings{
			Timeout:   5 * time.Minute,
			OnError:   warnOnNewError,
			OnSuccess: reportReconnect,
		}

		err := retry.WithBackoff(
			ctx,
			attempt,
			retry.Retryable,
			backoff.NewExponentialWithJitter(time.Millisecond, time.Second),
			settings,
		)

		// errors.Wrap returns nil for a nil error, so success passes through untouched.
		return conn, errors.Wrap(err, "can't connect to Redis")
	}
}

// Validate verifies the Redis configuration.
// It returns an error if no host is set; otherwise the result of
// validating the nested Options is returned.
func (r *Redis) Validate() error {
	if r.Host != "" {
		return r.Options.Validate()
	}

	return errors.New("Redis host missing")
}