1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
// SPDX-License-Identifier: GPL-3.0-or-later
package k8s_state
import (
"context"
_ "embed"
"errors"
"fmt"
"sync"
"time"
"github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
"k8s.io/client-go/kubernetes"
)
// configSchema holds the contents of config_schema.json, embedded into the
// binary at build time by the go:embed directive below. It is passed to the
// module registry as the job configuration schema in init.
//go:embed "config_schema.json"
var configSchema string
// init registers the k8s_state module with the go.d module registry.
// The module is registered as disabled by default.
func init() {
	creator := module.Creator{
		JobConfigSchema: configSchema,
		Defaults:        module.Defaults{Disabled: true},
		Create:          func() module.Module { return New() },
		Config:          func() any { return &Config{} },
	}

	module.Register("k8s_state", creator)
}
// New returns a KubeState with its defaults populated: a 3 second init delay,
// the package-level Kubernetes client factory, a private copy of the base
// charts, fresh synchronization primitives and an empty internal state.
func New() *KubeState {
	ks := &KubeState{
		initDelay:     3 * time.Second,
		newKubeClient: newKubeClient,
		charts:        baseCharts.Copy(),
		once:          new(sync.Once),
		wg:            new(sync.WaitGroup),
		state:         newKubeState(),
	}

	return ks
}
// Config is the job configuration of the k8s_state module. It is embedded
// inline into KubeState and is also what the registry's Config factory returns.
type Config struct {
// UpdateEvery is read from the "update_every" YAML/JSON key.
// NOTE(review): presumably the data collection interval in seconds — confirm.
UpdateEvery int `yaml:"update_every,omitempty" json:"update_every"`
}
// KubeState is the k8s_state collector module. Use New to obtain an instance
// with all defaults and synchronization primitives set.
type KubeState struct {
	module.Base
	Config `yaml:",inline" json:""`

	// charts is returned by Charts; New initializes it from a copy of baseCharts.
	charts *module.Charts

	// client is the Kubernetes API client; Init assigns it from initClient.
	client kubernetes.Interface
	// newKubeClient is the client factory; New sets it to the package-level
	// newKubeClient function.
	newKubeClient func() (kubernetes.Interface, error)

	startTime time.Time
	// initDelay is set to 3 seconds by New.
	initDelay time.Duration
	once      *sync.Once
	// wg is waited on (with a timeout) by Cleanup after the context is cancelled.
	wg *sync.WaitGroup
	// discoverer is created in Init by initDiscoverer.
	discoverer discoverer
	// ctx and ctxCancel are created in Init; Cleanup calls ctxCancel.
	ctx       context.Context
	ctxCancel context.CancelFunc

	kubeClusterID   string
	kubeClusterName string

	// state is initialized by New via newKubeState.
	state *kubeState
}

// discoverer is the contract of the component stored in KubeState.discoverer.
// NOTE(review): run presumably streams discovered resources into the given
// channel until ctx is cancelled, with ready/stopped reporting its lifecycle —
// confirm against the implementation.
type discoverer interface {
	run(ctx context.Context, in chan<- resource)
	ready() bool
	stopped() bool
}
// Configuration returns the job's current Config. The embedded Config is
// returned by value, so the caller receives a copy wrapped in an any.
func (ks *KubeState) Configuration() any {
return ks.Config
}
// Init prepares the job for use: it creates the Kubernetes client, a
// cancellable context (cancelled later by Cleanup) and the discoverer.
// It logs and returns the error if the client cannot be initialized.
func (ks *KubeState) Init() error {
	kubeClient, err := ks.initClient()
	if err != nil {
		ks.Errorf("client initialization: %v", err)
		return err
	}

	ks.client = kubeClient
	ks.ctx, ks.ctxCancel = context.WithCancel(context.Background())
	ks.discoverer = ks.initDiscoverer(kubeClient)

	return nil
}
// Check verifies that the job is usable. It fails if Init has not set the
// client and discoverer, or if the Kubernetes API server does not answer a
// server-version request. On success the reported version is logged.
//
// The returned connection error wraps the underlying client error, so callers
// can inspect the cause with errors.Is / errors.As.
func (ks *KubeState) Check() error {
	if ks.client == nil || ks.discoverer == nil {
		ks.Error("not initialized job")
		return errors.New("not initialized")
	}

	ver, err := ks.client.Discovery().ServerVersion()
	if err != nil {
		// Wrap with %w (not %v) to preserve the error chain, and use a distinct
		// name instead of shadowing the outer err.
		connErr := fmt.Errorf("failed to connect to K8s API server: %w", err)
		ks.Error(connErr)
		return connErr
	}

	ks.Infof("successfully connected to the Kubernetes API server '%s'", ver)
	return nil
}
// Charts returns the job's chart collection, which New initializes from a
// copy of baseCharts.
func (ks *KubeState) Charts() *module.Charts {
return ks.charts
}
// Collect runs one collection cycle through collect. Any error is logged, not
// returned. The result is the collected metrics, or nil when there are none.
func (ks *KubeState) Collect() map[string]int64 {
	metrics, err := ks.collect()
	if err != nil {
		ks.Error(err)
	}

	if len(metrics) > 0 {
		return metrics
	}
	return nil
}
// Cleanup cancels the job's context and then waits for the goroutines tracked
// by ks.wg to finish, giving up after 5 seconds. It is a no-op when the
// context was never created (ctxCancel is nil).
func (ks *KubeState) Cleanup() {
	if ks.ctxCancel == nil {
		return
	}
	ks.ctxCancel()

	// Signal completion of wg.Wait through a channel so it can be raced
	// against the timeout below.
	done := make(chan struct{})
	go func() {
		defer close(done)
		ks.wg.Wait()
	}()

	timeout := time.NewTimer(5 * time.Second)
	defer timeout.Stop()

	// Either outcome ends the cleanup; there is nothing more to do afterwards.
	select {
	case <-done:
	case <-timeout.C:
	}
}
|