summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state.go
blob: eca8ed7fd478006897886bf87b6a90aaa9dd79c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// SPDX-License-Identifier: GPL-3.0-or-later

package k8s_state

import (
	"context"
	_ "embed"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/netdata/netdata/go/go.d.plugin/agent/module"

	"k8s.io/client-go/kubernetes"
)

// configSchema holds the contents of config_schema.json, embedded into the
// binary at build time. It is handed to the module registry as the job
// configuration schema in init.
//
//go:embed "config_schema.json"
var configSchema string

// init registers the "k8s_state" module with the module registry.
// The module is disabled by default and each job instance is built by New.
func init() {
	defaults := module.Defaults{Disabled: true}
	newModule := func() module.Module { return New() }

	module.Register("k8s_state", module.Creator{
		JobConfigSchema: configSchema,
		Defaults:        defaults,
		Create:          newModule,
	})
}

// New returns a KubeState with its defaults in place: a three second init
// delay, the package-level Kubernetes client factory, a private copy of the
// base charts, fresh synchronization primitives and an empty state store.
func New() *KubeState {
	ks := new(KubeState)

	ks.initDelay = 3 * time.Second
	ks.newKubeClient = newKubeClient
	ks.charts = baseCharts.Copy()
	ks.once = new(sync.Once)
	ks.wg = new(sync.WaitGroup)
	ks.state = newKubeState()

	return ks
}

// Config is the user-facing job configuration of the k8s_state module.
// It is (un)marshalled from/to YAML and JSON using the keys in the struct tags.
type Config struct {
	// UpdateEvery is read from the "update_every" key.
	// NOTE(review): presumably the data collection interval in seconds — confirm
	// against the module framework.
	UpdateEvery int `yaml:"update_every" json:"update_every"`
}

type (
	// KubeState is the k8s_state collector module. It embeds module.Base and
	// the job Config, and carries everything a job needs at runtime:
	//
	//   - charts: the chart set returned by Charts (New seeds it with a copy
	//     of baseCharts).
	//   - client / newKubeClient: the Kubernetes API client assigned in Init
	//     and the factory set by New (presumably invoked by initClient and
	//     replaceable in tests — confirm).
	//   - ctx / ctxCancel: context created in Init and cancelled in Cleanup.
	//   - wg: wait group that Cleanup waits on (for up to five seconds) after
	//     cancelling the context.
	//   - discoverer: resource discoverer created in Init; Check refuses to
	//     run while it is nil.
	//   - state: store created by newKubeState in New.
	//
	// startTime, initDelay, once, kubeClusterID and kubeClusterName are not
	// used in this file; NOTE(review): they appear to be consumed by the
	// collection code elsewhere in the package — confirm there.
	KubeState struct {
		module.Base
		Config `yaml:",inline" json:""`

		charts *module.Charts

		client        kubernetes.Interface
		newKubeClient func() (kubernetes.Interface, error)

		startTime       time.Time
		initDelay       time.Duration
		once            *sync.Once
		wg              *sync.WaitGroup
		discoverer      discoverer
		ctx             context.Context
		ctxCancel       context.CancelFunc
		kubeClusterID   string
		kubeClusterName string

		state *kubeState
	}
	// discoverer is the internal abstraction over the Kubernetes resource
	// discovery component stored in KubeState.
	//
	// run receives a context and a send-only channel of resource values;
	// ready and stopped report boolean status.
	// NOTE(review): presumably run blocks until ctx is cancelled, pushing
	// discovered resources into the channel, while ready/stopped reflect its
	// lifecycle — confirm against the implementation.
	discoverer interface {
		run(ctx context.Context, in chan<- resource)
		ready() bool
		stopped() bool
	}
)

// Configuration returns a copy of the job's current Config.
func (ks *KubeState) Configuration() any {
	cfg := ks.Config
	return cfg
}

// Init prepares the job for use: it creates the Kubernetes client, sets up a
// cancellable context (released by Cleanup) and builds the discoverer.
// A client initialization failure is logged and returned unchanged.
func (ks *KubeState) Init() error {
	kubeClient, err := ks.initClient()
	if err != nil {
		ks.Errorf("client initialization: %v", err)
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())

	// Store the client and context before building the discoverer so the
	// receiver is fully populated when initDiscoverer runs.
	ks.client = kubeClient
	ks.ctx = ctx
	ks.ctxCancel = cancel
	ks.discoverer = ks.initDiscoverer(kubeClient)

	return nil
}

// Check verifies that the job is usable: Init must have populated the client
// and the discoverer, and the Kubernetes API server must answer a version
// request.
//
// It returns an error when the job is not initialized or when the API server
// cannot be reached. In the latter case the underlying client error is
// wrapped, so callers can still inspect it with errors.Is / errors.As.
func (ks *KubeState) Check() error {
	if ks.client == nil || ks.discoverer == nil {
		ks.Error("not initialized job")
		return errors.New("not initialized")
	}

	ver, err := ks.client.Discovery().ServerVersion()
	if err != nil {
		// %w (rather than %v) keeps the original error in the chain; the
		// rendered message is identical.
		err = fmt.Errorf("failed to connect to K8s API server: %w", err)
		ks.Error(err)
		return err
	}

	ks.Infof("successfully connected to the Kubernetes API server '%s'", ver)

	return nil
}

// Charts returns the chart set owned by this job.
func (ks *KubeState) Charts() *module.Charts {
	charts := ks.charts
	return charts
}

// Collect runs one collection cycle and returns the gathered metrics.
// A collection error is logged, not returned; nil is returned whenever no
// metrics were produced.
func (ks *KubeState) Collect() map[string]int64 {
	metrics, err := ks.collect()
	if err != nil {
		ks.Error(err)
	}

	if len(metrics) > 0 {
		return metrics
	}
	return nil
}

// Cleanup stops the job: it cancels the context created by Init and then
// waits for the goroutines tracked by the wait group to finish, giving up
// after five seconds. It is a no-op when Init never set up the context.
func (ks *KubeState) Cleanup() {
	cancel := ks.ctxCancel
	if cancel == nil {
		return
	}
	cancel()

	// Bridge wg.Wait into a channel so it can be raced against the timeout.
	done := make(chan struct{})
	go func() {
		defer close(done)
		ks.wg.Wait()
	}()

	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	select {
	case <-done:
	case <-timeout.C:
	}
}