author     Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 12:08:03 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>  2024-05-05 12:08:18 +0000
commit     5da14042f70711ea5cf66e034699730335462f66 (patch)
tree       0f6354ccac934ed87a2d555f45be4c831cf92f4a /src/go/collectors/go.d.plugin/modules/k8s_state
parent     Releasing debian version 1.44.3-2. (diff)
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/collectors/go.d.plugin/modules/k8s_state')
21 files changed, 3639 insertions, 0 deletions
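Before the file-by-file diff, it is worth noting the pattern that charts.go (below) builds on: every chart is declared once as a `module.Chart` template whose ID and dimension IDs contain a `%s` placeholder, and `newNodeCharts`/`newPodCharts` stamp out a copy per discovered object with `fmt.Sprintf`. The chart ID gets the object ID with dots replaced by dashes (`replaceDots`), while dimension IDs keep the raw object ID so they match the keys that collect.go writes into the metrics map. A minimal, self-contained sketch of that pattern, using simplified stand-in types in place of the agent's `module.Chart`/`module.Dim` and a hypothetical node name:

```go
package main

import (
	"fmt"
	"regexp"
)

// Simplified stand-ins for the agent's module.Dim and module.Chart
// (the real types live in github.com/netdata/netdata/go/go.d.plugin/agent/module).
type dim struct{ ID, Name string }

type chart struct {
	ID   string
	Dims []dim
}

var reDots = regexp.MustCompile(`\.`)

// replaceDots mirrors charts.go: "." is the chart ID separator, so dots
// inside object names become dashes in the chart ID.
func replaceDots(v string) string { return reDots.ReplaceAllString(v, "-") }

// instantiate plays the role of newNodeCharts: deep-copy the template,
// then fill the "%s" placeholders for one concrete node.
func instantiate(tmpl chart, nodeID string) chart {
	c := chart{
		ID:   fmt.Sprintf(tmpl.ID, replaceDots(nodeID)),
		Dims: make([]dim, len(tmpl.Dims)),
	}
	for i, d := range tmpl.Dims {
		// Dimension IDs keep the raw node ID: they must match the keys
		// collect.go writes into the mx map (e.g. "node_<id>_age").
		c.Dims[i] = dim{ID: fmt.Sprintf(d.ID, nodeID), Name: d.Name}
	}
	return c
}

func main() {
	// One of the templates declared in charts.go.
	nodeAgeTmpl := chart{
		ID:   "node_%s.age",
		Dims: []dim{{ID: "node_%s_age", Name: "age"}},
	}

	// Hypothetical node name, chosen to show the dot handling.
	c := instantiate(nodeAgeTmpl, "ip-10-0-0-1.ec2.internal")

	fmt.Println(c.ID)         // node_ip-10-0-0-1-ec2-internal.age
	fmt.Println(c.Dims[0].ID) // node_ip-10-0-0-1.ec2.internal_age
}
```

The same asymmetry shows up in `removeNodeCharts`/`removePodCharts` below, which match charts to remove by the dash-form prefix, while the metric map in collect.go stays keyed by the raw ID.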
diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/README.md b/src/go/collectors/go.d.plugin/modules/k8s_state/README.md new file mode 120000 index 000000000..72c4e5cab --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/README.md @@ -0,0 +1 @@ +integrations/kubernetes_cluster_state.md
\ No newline at end of file diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/charts.go b/src/go/collectors/go.d.plugin/modules/k8s_state/charts.go new file mode 100644 index 000000000..0cec12512 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/charts.go @@ -0,0 +1,785 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "fmt" + "regexp" + "strings" + + "github.com/netdata/netdata/go/go.d.plugin/agent/module" +) + +// NETDATA_CHART_PRIO_CGROUPS_CONTAINERS 40000 +const prioDiscoveryDiscovererState = 50999 + +const ( + prioNodeAllocatableCPURequestsUtil = 50100 + iota + prioNodeAllocatableCPURequestsUsed + prioNodeAllocatableCPULimitsUtil + prioNodeAllocatableCPULimitsUsed + prioNodeAllocatableMemRequestsUtil + prioNodeAllocatableMemRequestsUsed + prioNodeAllocatableMemLimitsUtil + prioNodeAllocatableMemLimitsUsed + prioNodeAllocatablePodsUtil + prioNodeAllocatablePodsUsage + prioNodeConditions + prioNodeSchedulability + prioNodePodsReadiness + prioNodePodsReadinessState + prioNodePodsCondition + prioNodePodsPhase + prioNodeContainersCount + prioNodeContainersState + prioNodeInitContainersState + prioNodeAge +) + +const ( + prioPodCPURequestsUsed = 50300 + iota + prioPodCPULimitsUsed + prioPodMemRequestsUsed + prioPodMemLimitsUsed + prioPodCondition + prioPodPhase + prioPodAge + prioPodContainersCount + prioPodContainersState + prioPodInitContainersState + prioPodContainerReadinessState + prioPodContainerRestarts + prioPodContainerState + prioPodContainerWaitingStateReason + prioPodContainerTerminatedStateReason +) + +const ( + labelKeyPrefix = "k8s_" + //labelKeyLabelPrefix = labelKeyPrefix + "label_" + //labelKeyAnnotationPrefix = labelKeyPrefix + "annotation_" + labelKeyClusterID = labelKeyPrefix + "cluster_id" + labelKeyClusterName = labelKeyPrefix + "cluster_name" + labelKeyNamespace = labelKeyPrefix + "namespace" + labelKeyKind = labelKeyPrefix + "kind" + labelKeyPodName = labelKeyPrefix + "pod_name" + labelKeyNodeName = labelKeyPrefix + "node_name" + labelKeyPodUID = labelKeyPrefix + "pod_uid" + labelKeyControllerKind = labelKeyPrefix + "controller_kind" + labelKeyControllerName = labelKeyPrefix + "controller_name" + labelKeyContainerName = labelKeyPrefix + "container_name" + labelKeyContainerID = labelKeyPrefix + "container_id" + labelKeyQoSClass = labelKeyPrefix + "qos_class" +) + +var baseCharts = module.Charts{ + discoveryStatusChart.Copy(), +} + +var nodeChartsTmpl = module.Charts{ + nodeAllocatableCPURequestsUtilChartTmpl.Copy(), + nodeAllocatableCPURequestsUsedChartTmpl.Copy(), + nodeAllocatableCPULimitsUtilChartTmpl.Copy(), + nodeAllocatableCPULimitsUsedChartTmpl.Copy(), + nodeAllocatableMemRequestsUtilChartTmpl.Copy(), + nodeAllocatableMemRequestsUsedChartTmpl.Copy(), + nodeAllocatableMemLimitsUtilChartTmpl.Copy(), + nodeAllocatableMemLimitsUsedChartTmpl.Copy(), + nodeAllocatablePodsUtilizationChartTmpl.Copy(), + nodeAllocatablePodsUsageChartTmpl.Copy(), + nodeConditionsChartTmpl.Copy(), + nodeSchedulabilityChartTmpl.Copy(), + nodePodsReadinessChartTmpl.Copy(), + nodePodsReadinessStateChartTmpl.Copy(), + nodePodsConditionChartTmpl.Copy(), + nodePodsPhaseChartTmpl.Copy(), + nodeContainersChartTmpl.Copy(), + nodeContainersStateChartTmpl.Copy(), + nodeInitContainersStateChartTmpl.Copy(), + nodeAgeChartTmpl.Copy(), +} + +var podChartsTmpl = module.Charts{ + podCPURequestsUsedChartTmpl.Copy(), + podCPULimitsUsedChartTmpl.Copy(), + podMemRequestsUsedChartTmpl.Copy(), + podMemLimitsUsedChartTmpl.Copy(), + 
podConditionChartTmpl.Copy(), + podPhaseChartTmpl.Copy(), + podAgeChartTmpl.Copy(), + podContainersCountChartTmpl.Copy(), + podContainersStateChartTmpl.Copy(), + podInitContainersStateChartTmpl.Copy(), +} + +var containerChartsTmpl = module.Charts{ + containerReadinessStateChartTmpl.Copy(), + containerRestartsChartTmpl.Copy(), + containersStateChartTmpl.Copy(), + containersStateWaitingChartTmpl.Copy(), + containersStateTerminatedChartTmpl.Copy(), +} + +var ( + // CPU resource + nodeAllocatableCPURequestsUtilChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_cpu_requests_utilization", + Title: "CPU requests utilization", + Units: "%", + Fam: "node cpu resource", + Ctx: "k8s_state.node_allocatable_cpu_requests_utilization", + Priority: prioNodeAllocatableCPURequestsUtil, + Dims: module.Dims{ + {ID: "node_%s_alloc_cpu_requests_util", Name: "requests", Div: precision}, + }, + } + nodeAllocatableCPURequestsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_cpu_requests_used", + Title: "CPU requests used", + Units: "millicpu", + Fam: "node cpu resource", + Ctx: "k8s_state.node_allocatable_cpu_requests_used", + Priority: prioNodeAllocatableCPURequestsUsed, + Dims: module.Dims{ + {ID: "node_%s_alloc_cpu_requests_used", Name: "requests"}, + }, + } + nodeAllocatableCPULimitsUtilChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_cpu_limits_utilization", + Title: "CPU limits utilization", + Units: "%", + Fam: "node cpu resource", + Ctx: "k8s_state.node_allocatable_cpu_limits_utilization", + Priority: prioNodeAllocatableCPULimitsUtil, + Dims: module.Dims{ + {ID: "node_%s_alloc_cpu_limits_util", Name: "limits", Div: precision}, + }, + } + nodeAllocatableCPULimitsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_cpu_limits_used", + Title: "CPU limits used", + Units: "millicpu", + Fam: "node cpu resource", + Ctx: "k8s_state.node_allocatable_cpu_limits_used", + Priority: prioNodeAllocatableCPULimitsUsed, + Dims: module.Dims{ + {ID: "node_%s_alloc_cpu_limits_used", Name: "limits"}, + }, + } + // memory resource + nodeAllocatableMemRequestsUtilChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_mem_requests_utilization", + Title: "Memory requests utilization", + Units: "%", + Fam: "node mem resource", + Ctx: "k8s_state.node_allocatable_mem_requests_utilization", + Priority: prioNodeAllocatableMemRequestsUtil, + Dims: module.Dims{ + {ID: "node_%s_alloc_mem_requests_util", Name: "requests", Div: precision}, + }, + } + nodeAllocatableMemRequestsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_mem_requests_used", + Title: "Memory requests used", + Units: "bytes", + Fam: "node mem resource", + Ctx: "k8s_state.node_allocatable_mem_requests_used", + Priority: prioNodeAllocatableMemRequestsUsed, + Dims: module.Dims{ + {ID: "node_%s_alloc_mem_requests_used", Name: "requests"}, + }, + } + nodeAllocatableMemLimitsUtilChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_mem_limits_utilization", + Title: "Memory limits utilization", + Units: "%", + Fam: "node mem resource", + Ctx: "k8s_state.node_allocatable_mem_limits_utilization", + Priority: prioNodeAllocatableMemLimitsUtil, + Dims: module.Dims{ + {ID: "node_%s_alloc_mem_limits_util", Name: "limits", Div: precision}, + }, + } + nodeAllocatableMemLimitsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_mem_limits_used", + Title: "Memory limits used", + Units: "bytes", + Fam: "node mem resource", + Ctx: 
"k8s_state.node_allocatable_mem_limits_used", + Priority: prioNodeAllocatableMemLimitsUsed, + Dims: module.Dims{ + {ID: "node_%s_alloc_mem_limits_used", Name: "limits"}, + }, + } + // pods resource + nodeAllocatablePodsUtilizationChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocatable_pods_utilization", + Title: "Pods resource utilization", + Units: "%", + Fam: "node pods resource", + Ctx: "k8s_state.node_allocatable_pods_utilization", + Priority: prioNodeAllocatablePodsUtil, + Dims: module.Dims{ + {ID: "node_%s_alloc_pods_util", Name: "allocated", Div: precision}, + }, + } + nodeAllocatablePodsUsageChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.allocated_pods_usage", + Title: "Pods resource usage", + Units: "pods", + Fam: "node pods resource", + Ctx: "k8s_state.node_allocatable_pods_usage", + Type: module.Stacked, + Priority: prioNodeAllocatablePodsUsage, + Dims: module.Dims{ + {ID: "node_%s_alloc_pods_available", Name: "available"}, + {ID: "node_%s_alloc_pods_allocated", Name: "allocated"}, + }, + } + // condition + nodeConditionsChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.condition_status", + Title: "Condition status", + Units: "status", + Fam: "node condition", + Ctx: "k8s_state.node_condition", + Priority: prioNodeConditions, + } + nodeSchedulabilityChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.schedulability", + Title: "Schedulability", + Units: "state", + Fam: "node schedulability", + Ctx: "k8s_state.node_schedulability", + Priority: prioNodeSchedulability, + Dims: module.Dims{ + {ID: "node_%s_schedulability_schedulable", Name: "schedulable"}, + {ID: "node_%s_schedulability_unschedulable", Name: "unschedulable"}, + }, + } + // pods readiness + nodePodsReadinessChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.pods_readiness", + Title: "Pods readiness", + Units: "%", + Fam: "node pods readiness", + Ctx: "k8s_state.node_pods_readiness", + Priority: prioNodePodsReadiness, + Dims: module.Dims{ + {ID: "node_%s_pods_readiness", Name: "ready", Div: precision}, + }, + } + nodePodsReadinessStateChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.pods_readiness_state", + Title: "Pods readiness state", + Units: "pods", + Fam: "node pods readiness", + Ctx: "k8s_state.node_pods_readiness_state", + Type: module.Stacked, + Priority: prioNodePodsReadinessState, + Dims: module.Dims{ + {ID: "node_%s_pods_readiness_ready", Name: "ready"}, + {ID: "node_%s_pods_readiness_unready", Name: "unready"}, + }, + } + // pods condition + nodePodsConditionChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.pods_condition", + Title: "Pods condition", + Units: "pods", + Fam: "node pods condition", + Ctx: "k8s_state.node_pods_condition", + Priority: prioNodePodsCondition, + Dims: module.Dims{ + {ID: "node_%s_pods_cond_podready", Name: "pod_ready"}, + {ID: "node_%s_pods_cond_podscheduled", Name: "pod_scheduled"}, + {ID: "node_%s_pods_cond_podinitialized", Name: "pod_initialized"}, + {ID: "node_%s_pods_cond_containersready", Name: "containers_ready"}, + }, + } + // pods phase + nodePodsPhaseChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.pods_phase", + Title: "Pods phase", + Units: "pods", + Fam: "node pods phase", + Ctx: "k8s_state.node_pods_phase", + Type: module.Stacked, + Priority: prioNodePodsPhase, + Dims: module.Dims{ + {ID: "node_%s_pods_phase_running", Name: "running"}, + {ID: "node_%s_pods_phase_failed", Name: "failed"}, + {ID: "node_%s_pods_phase_succeeded", Name: "succeeded"}, + {ID: "node_%s_pods_phase_pending", Name: "pending"}, + }, + } + 
// containers + nodeContainersChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.containers", + Title: "Containers", + Units: "containers", + Fam: "node containers", + Ctx: "k8s_state.node_containers", + Priority: prioNodeContainersCount, + Dims: module.Dims{ + {ID: "node_%s_containers", Name: "containers"}, + {ID: "node_%s_init_containers", Name: "init_containers"}, + }, + } + nodeContainersStateChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.containers_state", + Title: "Containers state", + Units: "containers", + Fam: "node containers", + Ctx: "k8s_state.node_containers_state", + Type: module.Stacked, + Priority: prioNodeContainersState, + Dims: module.Dims{ + {ID: "node_%s_containers_state_running", Name: "running"}, + {ID: "node_%s_containers_state_waiting", Name: "waiting"}, + {ID: "node_%s_containers_state_terminated", Name: "terminated"}, + }, + } + nodeInitContainersStateChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.init_containers_state", + Title: "Init containers state", + Units: "containers", + Fam: "node containers", + Ctx: "k8s_state.node_init_containers_state", + Type: module.Stacked, + Priority: prioNodeInitContainersState, + Dims: module.Dims{ + {ID: "node_%s_init_containers_state_running", Name: "running"}, + {ID: "node_%s_init_containers_state_waiting", Name: "waiting"}, + {ID: "node_%s_init_containers_state_terminated", Name: "terminated"}, + }, + } + // age + nodeAgeChartTmpl = module.Chart{ + IDSep: true, + ID: "node_%s.age", + Title: "Age", + Units: "seconds", + Fam: "node age", + Ctx: "k8s_state.node_age", + Priority: prioNodeAge, + Dims: module.Dims{ + {ID: "node_%s_age", Name: "age"}, + }, + } +) + +func (ks *KubeState) newNodeCharts(ns *nodeState) *module.Charts { + cs := nodeChartsTmpl.Copy() + for _, c := range *cs { + c.ID = fmt.Sprintf(c.ID, replaceDots(ns.id())) + c.Labels = ks.newNodeChartLabels(ns) + for _, d := range c.Dims { + d.ID = fmt.Sprintf(d.ID, ns.id()) + } + } + return cs +} + +func (ks *KubeState) newNodeChartLabels(ns *nodeState) []module.Label { + labels := []module.Label{ + {Key: labelKeyNodeName, Value: ns.name, Source: module.LabelSourceK8s}, + {Key: labelKeyClusterID, Value: ks.kubeClusterID, Source: module.LabelSourceK8s}, + {Key: labelKeyClusterName, Value: ks.kubeClusterName, Source: module.LabelSourceK8s}, + } + return labels +} + +func (ks *KubeState) addNodeCharts(ns *nodeState) { + cs := ks.newNodeCharts(ns) + if err := ks.Charts().Add(*cs...); err != nil { + ks.Warning(err) + } +} + +func (ks *KubeState) removeNodeCharts(ns *nodeState) { + prefix := fmt.Sprintf("node_%s", replaceDots(ns.id())) + for _, c := range *ks.Charts() { + if strings.HasPrefix(c.ID, prefix) { + c.MarkRemove() + c.MarkNotCreated() + } + } +} + +func (ks *KubeState) addNodeConditionToCharts(ns *nodeState, cond string) { + id := fmt.Sprintf(nodeConditionsChartTmpl.ID, replaceDots(ns.id())) + c := ks.Charts().Get(id) + if c == nil { + ks.Warningf("chart '%s' does not exist", id) + return + } + dim := &module.Dim{ + ID: fmt.Sprintf("node_%s_cond_%s", ns.id(), strings.ToLower(cond)), + Name: cond, + } + if err := c.AddDim(dim); err != nil { + ks.Warning(err) + return + } + c.MarkNotCreated() +} + +var ( + podCPURequestsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.cpu_requests_used", + Title: "CPU requests used", + Units: "millicpu", + Fam: "pod allocated cpu", + Ctx: "k8s_state.pod_cpu_requests_used", + Priority: prioPodCPURequestsUsed, + Dims: module.Dims{ + {ID: "pod_%s_cpu_requests_used", Name: "requests"}, + }, + } + 
podCPULimitsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.cpu_limits_used", + Title: "CPU limits used", + Units: "millicpu", + Fam: "pod allocated cpu", + Ctx: "k8s_state.pod_cpu_limits_used", + Priority: prioPodCPULimitsUsed, + Dims: module.Dims{ + {ID: "pod_%s_cpu_limits_used", Name: "limits"}, + }, + } + podMemRequestsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.mem_requests_used", + Title: "Memory requests used", + Units: "bytes", + Fam: "pod allocated mem", + Ctx: "k8s_state.pod_mem_requests_used", + Priority: prioPodMemRequestsUsed, + Dims: module.Dims{ + {ID: "pod_%s_mem_requests_used", Name: "requests"}, + }, + } + podMemLimitsUsedChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.mem_limits_used", + Title: "Memory limits used", + Units: "bytes", + Fam: "pod allocated mem", + Ctx: "k8s_state.pod_mem_limits_used", + Priority: prioPodMemLimitsUsed, + Dims: module.Dims{ + {ID: "pod_%s_mem_limits_used", Name: "limits"}, + }, + } + podConditionChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.condition", + Title: "Condition", + Units: "state", + Fam: "pod condition", + Ctx: "k8s_state.pod_condition", + Priority: prioPodCondition, + Dims: module.Dims{ + {ID: "pod_%s_cond_podready", Name: "pod_ready"}, + {ID: "pod_%s_cond_podscheduled", Name: "pod_scheduled"}, + {ID: "pod_%s_cond_podinitialized", Name: "pod_initialized"}, + {ID: "pod_%s_cond_containersready", Name: "containers_ready"}, + }, + } + podPhaseChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.phase", + Title: "Phase", + Units: "state", + Fam: "pod phase", + Ctx: "k8s_state.pod_phase", + Priority: prioPodPhase, + Dims: module.Dims{ + {ID: "pod_%s_phase_running", Name: "running"}, + {ID: "pod_%s_phase_failed", Name: "failed"}, + {ID: "pod_%s_phase_succeeded", Name: "succeeded"}, + {ID: "pod_%s_phase_pending", Name: "pending"}, + }, + } + podAgeChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.age", + Title: "Age", + Units: "seconds", + Fam: "pod age", + Ctx: "k8s_state.pod_age", + Priority: prioPodAge, + Dims: module.Dims{ + {ID: "pod_%s_age", Name: "age"}, + }, + } + podContainersCountChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.containers_count", + Title: "Containers", + Units: "containers", + Fam: "pod containers", + Ctx: "k8s_state.pod_containers", + Priority: prioPodContainersCount, + Dims: module.Dims{ + {ID: "pod_%s_containers", Name: "containers"}, + {ID: "pod_%s_init_containers", Name: "init_containers"}, + }, + } + podContainersStateChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.containers_state", + Title: "Containers state", + Units: "containers", + Fam: "pod containers", + Ctx: "k8s_state.pod_containers_state", + Type: module.Stacked, + Priority: prioPodContainersState, + Dims: module.Dims{ + {ID: "pod_%s_containers_state_running", Name: "running"}, + {ID: "pod_%s_containers_state_waiting", Name: "waiting"}, + {ID: "pod_%s_containers_state_terminated", Name: "terminated"}, + }, + } + podInitContainersStateChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s.init_containers_state", + Title: "Init containers state", + Units: "containers", + Fam: "pod containers", + Ctx: "k8s_state.pod_init_containers_state", + Type: module.Stacked, + Priority: prioPodInitContainersState, + Dims: module.Dims{ + {ID: "pod_%s_init_containers_state_running", Name: "running"}, + {ID: "pod_%s_init_containers_state_waiting", Name: "waiting"}, + {ID: "pod_%s_init_containers_state_terminated", Name: "terminated"}, + }, + } +) + +func (ks *KubeState) newPodCharts(ps *podState) 
*module.Charts { + charts := podChartsTmpl.Copy() + for _, c := range *charts { + c.ID = fmt.Sprintf(c.ID, replaceDots(ps.id())) + c.Labels = ks.newPodChartLabels(ps) + for _, d := range c.Dims { + d.ID = fmt.Sprintf(d.ID, ps.id()) + } + } + return charts +} + +func (ks *KubeState) newPodChartLabels(ps *podState) []module.Label { + labels := []module.Label{ + {Key: labelKeyNamespace, Value: ps.namespace, Source: module.LabelSourceK8s}, + {Key: labelKeyPodName, Value: ps.name, Source: module.LabelSourceK8s}, + {Key: labelKeyNodeName, Value: ps.nodeName, Source: module.LabelSourceK8s}, + {Key: labelKeyQoSClass, Value: ps.qosClass, Source: module.LabelSourceK8s}, + {Key: labelKeyControllerKind, Value: ps.controllerKind, Source: module.LabelSourceK8s}, + {Key: labelKeyControllerName, Value: ps.controllerName, Source: module.LabelSourceK8s}, + {Key: labelKeyClusterID, Value: ks.kubeClusterID, Source: module.LabelSourceK8s}, + {Key: labelKeyClusterName, Value: ks.kubeClusterName, Source: module.LabelSourceK8s}, + } + return labels +} + +func (ks *KubeState) addPodCharts(ps *podState) { + charts := ks.newPodCharts(ps) + if err := ks.Charts().Add(*charts...); err != nil { + ks.Warning(err) + } +} + +func (ks *KubeState) updatePodChartsNodeLabel(ps *podState) { + prefix := fmt.Sprintf("pod_%s", replaceDots(ps.id())) + for _, c := range *ks.Charts() { + if strings.HasPrefix(c.ID, prefix) { + updateNodeLabel(c, ps.nodeName) + c.MarkNotCreated() + } + } +} + +func updateNodeLabel(c *module.Chart, nodeName string) { + for i, l := range c.Labels { + if l.Key == labelKeyNodeName { + c.Labels[i].Value = nodeName + break + } + } +} + +func (ks *KubeState) removePodCharts(ps *podState) { + prefix := fmt.Sprintf("pod_%s", replaceDots(ps.id())) + for _, c := range *ks.Charts() { + if strings.HasPrefix(c.ID, prefix) { + c.MarkRemove() + c.MarkNotCreated() + } + } +} + +var ( + containerReadinessStateChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s_container_%s.readiness_state", + Title: "Readiness state", + Units: "state", + Fam: "container readiness", + Ctx: "k8s_state.pod_container_readiness_state", + Priority: prioPodContainerReadinessState, + Dims: module.Dims{ + {ID: "pod_%s_container_%s_readiness", Name: "ready"}, + }, + } + containerRestartsChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s_container_%s.restarts", + Title: "Restarts", + Units: "restarts", + Fam: "container restarts", + Ctx: "k8s_state.pod_container_restarts", + Priority: prioPodContainerRestarts, + Dims: module.Dims{ + {ID: "pod_%s_container_%s_restarts", Name: "restarts"}, + }, + } + containersStateChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s_container_%s.state", + Title: "Container state", + Units: "state", + Fam: "container state", + Ctx: "k8s_state.pod_container_state", + Priority: prioPodContainerState, + Dims: module.Dims{ + {ID: "pod_%s_container_%s_state_running", Name: "running"}, + {ID: "pod_%s_container_%s_state_waiting", Name: "waiting"}, + {ID: "pod_%s_container_%s_state_terminated", Name: "terminated"}, + }, + } + containersStateWaitingChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s_container_%s.state_waiting_reason", + Title: "Container waiting state reason", + Units: "state", + Fam: "container waiting reason", + Ctx: "k8s_state.pod_container_waiting_state_reason", + Priority: prioPodContainerWaitingStateReason, + } + containersStateTerminatedChartTmpl = module.Chart{ + IDSep: true, + ID: "pod_%s_container_%s.state_terminated_reason", + Title: "Container terminated state reason", + Units: 
"state", + Fam: "container terminated reason", + Ctx: "k8s_state.pod_container_terminated_state_reason", + Priority: prioPodContainerTerminatedStateReason, + } +) + +func (ks *KubeState) newContainerCharts(ps *podState, cs *containerState) *module.Charts { + charts := containerChartsTmpl.Copy() + for _, c := range *charts { + c.ID = fmt.Sprintf(c.ID, replaceDots(ps.id()), cs.name) + c.Labels = ks.newContainerChartLabels(ps, cs) + for _, d := range c.Dims { + d.ID = fmt.Sprintf(d.ID, ps.id(), cs.name) + } + } + return charts +} + +func (ks *KubeState) newContainerChartLabels(ps *podState, cs *containerState) []module.Label { + labels := ks.newPodChartLabels(ps) + labels = append( + labels, module.Label{Key: labelKeyContainerName, Value: cs.name, Source: module.LabelSourceK8s}, + ) + return labels +} + +func (ks *KubeState) addContainerCharts(ps *podState, cs *containerState) { + charts := ks.newContainerCharts(ps, cs) + if err := ks.Charts().Add(*charts...); err != nil { + ks.Warning(err) + } +} + +func (ks *KubeState) addContainerWaitingStateReasonToChart(ps *podState, cs *containerState, reason string) { + id := fmt.Sprintf(containersStateWaitingChartTmpl.ID, replaceDots(ps.id()), cs.name) + c := ks.Charts().Get(id) + if c == nil { + ks.Warningf("chart '%s' does not exist", id) + return + } + dim := &module.Dim{ + ID: fmt.Sprintf("pod_%s_container_%s_state_waiting_reason_%s", ps.id(), cs.name, reason), + Name: reason, + } + if err := c.AddDim(dim); err != nil { + ks.Warning(err) + return + } + c.MarkNotCreated() +} + +func (ks *KubeState) addContainerTerminatedStateReasonToChart(ps *podState, cs *containerState, reason string) { + id := fmt.Sprintf(containersStateTerminatedChartTmpl.ID, replaceDots(ps.id()), cs.name) + c := ks.Charts().Get(id) + if c == nil { + ks.Warningf("chart '%s' does not exist", id) + return + } + dim := &module.Dim{ + ID: fmt.Sprintf("pod_%s_container_%s_state_terminated_reason_%s", ps.id(), cs.name, reason), + Name: reason, + } + if err := c.AddDim(dim); err != nil { + ks.Warning(err) + return + } + c.MarkNotCreated() +} + +var discoveryStatusChart = module.Chart{ + ID: "discovery_discoverers_state", + Title: "Running discoverers state", + Units: "state", + Fam: "discovery", + Ctx: "k8s_state.discovery_discoverers_state", + Priority: prioDiscoveryDiscovererState, + Opts: module.Opts{Hidden: true}, + Dims: module.Dims{ + {ID: "discovery_node_discoverer_state", Name: "node"}, + {ID: "discovery_pod_discoverer_state", Name: "pod"}, + }, +} + +var reDots = regexp.MustCompile(`\.`) + +func replaceDots(v string) string { + return reDots.ReplaceAllString(v, "-") +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/client.go b/src/go/collectors/go.d.plugin/modules/k8s_state/client.go new file mode 100644 index 000000000..315e823fe --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/client.go @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "errors" + "os" + "path/filepath" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "github.com/mattn/go-isatty" +) + +const ( + envKubeServiceHost = "KUBERNETES_SERVICE_HOST" + envKubeServicePort = "KUBERNETES_SERVICE_PORT" +) + +func newKubeClient() (kubernetes.Interface, error) { + if os.Getenv(envKubeServiceHost) != "" && os.Getenv(envKubeServicePort) != "" { + return newKubeClientInCluster() + } + if isatty.IsTerminal(os.Stdout.Fd()) { + return 
newKubeClientOutOfCluster() + } + return nil, errors.New("can not create Kubernetes client: not inside a cluster") +} + +func newKubeClientInCluster() (*kubernetes.Clientset, error) { + config, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + config.UserAgent = "Netdata/kube-state" + return kubernetes.NewForConfig(config) +} + +func newKubeClientOutOfCluster() (*kubernetes.Clientset, error) { + home := homeDir() + if home == "" { + return nil, errors.New("couldn't find home directory") + } + + configPath := filepath.Join(home, ".kube", "config") + config, err := clientcmd.BuildConfigFromFlags("", configPath) + if err != nil { + return nil, err + } + + config.UserAgent = "Netdata/kube-state" + return kubernetes.NewForConfig(config) +} + +func homeDir() string { + if h := os.Getenv("HOME"); h != "" { + return h + } + return os.Getenv("USERPROFILE") // windows +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/cluster_meta.go b/src/go/collectors/go.d.plugin/modules/k8s_state/cluster_meta.go new file mode 100644 index 000000000..e7eb809cc --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/cluster_meta.go @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "fmt" + "io" + "net/http" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (ks *KubeState) getKubeClusterID() string { + ns, err := ks.client.CoreV1().Namespaces().Get(ks.ctx, "kube-system", metav1.GetOptions{}) + if err != nil { + ks.Warningf("error on getting 'kube-system' namespace UID: %v", err) + return "" + } + return string(ns.UID) +} + +func (ks *KubeState) getKubeClusterName() string { + client := http.Client{Timeout: time.Second} + n, err := getGKEKubeClusterName(client) + if err != nil { + ks.Debugf("error on getting GKE cluster name: %v", err) + } + return n +} + +func getGKEKubeClusterName(client http.Client) (string, error) { + id, err := doMetaGKEHTTPReq(client, "http://metadata/computeMetadata/v1/project/project-id") + if err != nil { + return "", err + } + loc, err := doMetaGKEHTTPReq(client, "http://metadata/computeMetadata/v1/instance/attributes/cluster-location") + if err != nil { + return "", err + } + name, err := doMetaGKEHTTPReq(client, "http://metadata/computeMetadata/v1/instance/attributes/cluster-name") + if err != nil { + return "", err + } + + return fmt.Sprintf("gke_%s_%s_%s", id, loc, name), nil +} + +func doMetaGKEHTTPReq(client http.Client, url string) (string, error) { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return "", err + } + + req.Header.Add("Metadata-Flavor", "Google") + resp, err := client.Do(req) + if err != nil { + return "", err + } + defer closeHTTPRespBody(resp) + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("'%s' returned HTTP status code %d", url, resp.StatusCode) + } + + bs, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + + s := string(bs) + if s == "" { + return "", fmt.Errorf("an empty response from '%s'", url) + } + + return s, nil +} + +func closeHTTPRespBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/collect.go b/src/go/collectors/go.d.plugin/modules/k8s_state/collect.go new file mode 100644 index 000000000..033d330ce --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/collect.go @@ -0,0 +1,264 @@ +// SPDX-License-Identifier: 
GPL-3.0-or-later + +package k8s_state + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + corev1 "k8s.io/api/core/v1" +) + +const precision = 1000 + +func (ks *KubeState) collect() (map[string]int64, error) { + if ks.discoverer == nil { + return nil, errors.New("nil discoverer") + } + + ks.once.Do(func() { + ks.startTime = time.Now() + in := make(chan resource) + + ks.wg.Add(1) + go func() { defer ks.wg.Done(); ks.runUpdateState(in) }() + + ks.wg.Add(1) + go func() { defer ks.wg.Done(); ks.discoverer.run(ks.ctx, in) }() + + ks.kubeClusterID = ks.getKubeClusterID() + ks.kubeClusterName = ks.getKubeClusterName() + if chart := ks.Charts().Get(discoveryStatusChart.ID); chart != nil { + chart.Labels = []module.Label{ + {Key: labelKeyClusterID, Value: ks.kubeClusterID, Source: module.LabelSourceK8s}, + {Key: labelKeyClusterName, Value: ks.kubeClusterName, Source: module.LabelSourceK8s}, + } + } + }) + + mx := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + } + + if !ks.discoverer.ready() || time.Since(ks.startTime) < ks.initDelay { + return mx, nil + } + + ks.state.Lock() + defer ks.state.Unlock() + + ks.collectKubeState(mx) + + return mx, nil +} + +func (ks *KubeState) collectKubeState(mx map[string]int64) { + for _, ns := range ks.state.nodes { + ns.resetStats() + } + ks.collectPodsState(mx) + ks.collectNodesState(mx) +} + +func (ks *KubeState) collectPodsState(mx map[string]int64) { + now := time.Now() + for _, ps := range ks.state.pods { + if ps.deleted { + delete(ks.state.pods, podSource(ps.namespace, ps.name)) + ks.removePodCharts(ps) + continue + } + if ps.new { + ps.new = false + ks.addPodCharts(ps) + ps.unscheduled = ps.nodeName == "" + } else if ps.unscheduled && ps.nodeName != "" { + ps.unscheduled = false + ks.updatePodChartsNodeLabel(ps) + } + + ns := ks.state.nodes[nodeSource(ps.nodeName)] + if ns != nil { + ns.stats.pods++ + ns.stats.reqCPU += ps.reqCPU + ns.stats.limitCPU += ps.limitCPU + ns.stats.reqMem += ps.reqMem + ns.stats.limitMem += ps.limitMem + ns.stats.podsCondPodReady += condStatusToInt(ps.condPodReady) + ns.stats.podsCondPodScheduled += condStatusToInt(ps.condPodScheduled) + ns.stats.podsCondPodInitialized += condStatusToInt(ps.condPodInitialized) + ns.stats.podsCondContainersReady += condStatusToInt(ps.condContainersReady) + ns.stats.podsReadinessReady += boolToInt(ps.condPodReady == corev1.ConditionTrue) + ns.stats.podsReadinessUnready += boolToInt(ps.condPodReady != corev1.ConditionTrue) + ns.stats.podsPhasePending += boolToInt(ps.phase == corev1.PodPending) + ns.stats.podsPhaseRunning += boolToInt(ps.phase == corev1.PodRunning) + ns.stats.podsPhaseSucceeded += boolToInt(ps.phase == corev1.PodSucceeded) + ns.stats.podsPhaseFailed += boolToInt(ps.phase == corev1.PodFailed) + for _, cs := range ps.initContainers { + ns.stats.initContainers++ + ns.stats.initContStateRunning += boolToInt(cs.stateRunning) + ns.stats.initContStateWaiting += boolToInt(cs.stateWaiting) + ns.stats.initContStateTerminated += boolToInt(cs.stateTerminated) + } + for _, cs := range ps.containers { + ns.stats.containers++ + ns.stats.contStateRunning += boolToInt(cs.stateRunning) + ns.stats.contStateWaiting += boolToInt(cs.stateWaiting) + ns.stats.contStateTerminated += boolToInt(cs.stateTerminated) + } + } + + px := fmt.Sprintf("pod_%s_", ps.id()) + + mx[px+"cond_podready"] = condStatusToInt(ps.condPodReady) + mx[px+"cond_podscheduled"] = condStatusToInt(ps.condPodScheduled) + 
mx[px+"cond_podinitialized"] = condStatusToInt(ps.condPodInitialized) + mx[px+"cond_containersready"] = condStatusToInt(ps.condContainersReady) + mx[px+"phase_running"] = boolToInt(ps.phase == corev1.PodRunning) + mx[px+"phase_failed"] = boolToInt(ps.phase == corev1.PodFailed) + mx[px+"phase_succeeded"] = boolToInt(ps.phase == corev1.PodSucceeded) + mx[px+"phase_pending"] = boolToInt(ps.phase == corev1.PodPending) + mx[px+"age"] = int64(now.Sub(ps.creationTime).Seconds()) + mx[px+"cpu_requests_used"] = ps.reqCPU + mx[px+"cpu_limits_used"] = ps.limitCPU + mx[px+"mem_requests_used"] = ps.reqMem + mx[px+"mem_limits_used"] = ps.limitMem + + mx[px+"init_containers"] = int64(len(ps.initContainers)) + mx[px+"containers"] = int64(len(ps.containers)) + + mx[px+"init_containers_state_running"] = 0 + mx[px+"init_containers_state_waiting"] = 0 + mx[px+"init_containers_state_terminated"] = 0 + for _, cs := range ps.initContainers { + mx[px+"init_containers_state_running"] += boolToInt(cs.stateRunning) + mx[px+"init_containers_state_waiting"] += boolToInt(cs.stateWaiting) + mx[px+"init_containers_state_terminated"] += boolToInt(cs.stateTerminated) + } + mx[px+"containers_state_running"] = 0 + mx[px+"containers_state_waiting"] = 0 + mx[px+"containers_state_terminated"] = 0 + for _, cs := range ps.containers { + if cs.new { + cs.new = false + ks.addContainerCharts(ps, cs) + } + mx[px+"containers_state_running"] += boolToInt(cs.stateRunning) + mx[px+"containers_state_waiting"] += boolToInt(cs.stateWaiting) + mx[px+"containers_state_terminated"] += boolToInt(cs.stateTerminated) + + ppx := fmt.Sprintf("%scontainer_%s_", px, cs.name) + mx[ppx+"state_running"] = boolToInt(cs.stateRunning) + mx[ppx+"state_waiting"] = boolToInt(cs.stateWaiting) + mx[ppx+"state_terminated"] = boolToInt(cs.stateTerminated) + mx[ppx+"readiness"] = boolToInt(cs.ready) + mx[ppx+"restarts"] = cs.restarts + for _, r := range cs.stateWaitingReasons { + if r.new { + r.new = false + ks.addContainerWaitingStateReasonToChart(ps, cs, r.reason) + } + mx[ppx+"state_waiting_reason_"+r.reason] = boolToInt(r.active) + } + for _, r := range cs.stateTerminatedReasons { + if r.new { + r.new = false + ks.addContainerTerminatedStateReasonToChart(ps, cs, r.reason) + } + mx[ppx+"state_terminated_reason_"+r.reason] = boolToInt(r.active) + } + } + } +} + +func (ks *KubeState) collectNodesState(mx map[string]int64) { + now := time.Now() + for _, ns := range ks.state.nodes { + if ns.deleted { + delete(ks.state.nodes, nodeSource(ns.name)) + ks.removeNodeCharts(ns) + continue + } + if ns.new { + ns.new = false + ks.addNodeCharts(ns) + } + + px := fmt.Sprintf("node_%s_", ns.id()) + + for typ, cond := range ns.conditions { + if cond.new { + cond.new = false + ks.addNodeConditionToCharts(ns, typ) + } + mx[px+"cond_"+strings.ToLower(typ)] = condStatusToInt(cond.status) + } + + mx[px+"age"] = int64(now.Sub(ns.creationTime).Seconds()) + mx[px+"alloc_pods_util"] = calcPercentage(ns.stats.pods, ns.allocatablePods) + mx[px+"pods_readiness_ready"] = ns.stats.podsReadinessReady + mx[px+"pods_readiness_unready"] = ns.stats.podsReadinessUnready + mx[px+"pods_readiness"] = calcPercentage(ns.stats.podsReadinessReady, ns.stats.pods) + mx[px+"pods_phase_running"] = ns.stats.podsPhaseRunning + mx[px+"pods_phase_failed"] = ns.stats.podsPhaseFailed + mx[px+"pods_phase_succeeded"] = ns.stats.podsPhaseSucceeded + mx[px+"pods_phase_pending"] = ns.stats.podsPhasePending + mx[px+"pods_cond_podready"] = ns.stats.podsCondPodReady + mx[px+"pods_cond_podscheduled"] = 
ns.stats.podsCondPodScheduled + mx[px+"pods_cond_podinitialized"] = ns.stats.podsCondPodInitialized + mx[px+"pods_cond_containersready"] = ns.stats.podsCondContainersReady + mx[px+"schedulability_schedulable"] = boolToInt(!ns.unSchedulable) + mx[px+"schedulability_unschedulable"] = boolToInt(ns.unSchedulable) + mx[px+"alloc_pods_available"] = ns.allocatablePods - ns.stats.pods + mx[px+"alloc_pods_allocated"] = ns.stats.pods + mx[px+"alloc_cpu_requests_util"] = calcPercentage(ns.stats.reqCPU, ns.allocatableCPU) + mx[px+"alloc_cpu_limits_util"] = calcPercentage(ns.stats.limitCPU, ns.allocatableCPU) + mx[px+"alloc_mem_requests_util"] = calcPercentage(ns.stats.reqMem, ns.allocatableMem) + mx[px+"alloc_mem_limits_util"] = calcPercentage(ns.stats.limitMem, ns.allocatableMem) + mx[px+"alloc_cpu_requests_used"] = ns.stats.reqCPU + mx[px+"alloc_cpu_limits_used"] = ns.stats.limitCPU + mx[px+"alloc_mem_requests_used"] = ns.stats.reqMem + mx[px+"alloc_mem_limits_used"] = ns.stats.limitMem + mx[px+"init_containers"] = ns.stats.initContainers + mx[px+"containers"] = ns.stats.containers + mx[px+"containers_state_running"] = ns.stats.contStateRunning + mx[px+"containers_state_waiting"] = ns.stats.contStateWaiting + mx[px+"containers_state_terminated"] = ns.stats.contStateTerminated + mx[px+"init_containers_state_running"] = ns.stats.initContStateRunning + mx[px+"init_containers_state_waiting"] = ns.stats.initContStateWaiting + mx[px+"init_containers_state_terminated"] = ns.stats.initContStateTerminated + } +} + +func boolToInt(v bool) int64 { + if v { + return 1 + } + return 0 +} + +func condStatusToInt(cs corev1.ConditionStatus) int64 { + switch cs { + case corev1.ConditionFalse: + return 0 + case corev1.ConditionTrue: + return 1 + case corev1.ConditionUnknown: + return 0 + default: + return 0 + } +} + +func calcPercentage(value, total int64) int64 { + if total == 0 { + return 0 + } + return int64(float64(value) / float64(total) * 100 * precision) +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/config_schema.json b/src/go/collectors/go.d.plugin/modules/k8s_state/config_schema.json new file mode 100644 index 000000000..ae66d7cb5 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/config_schema.json @@ -0,0 +1,25 @@ +{ + "jsonSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Kubernetes Cluster State collector configuration.", + "type": "object", + "properties": { + "update_every": { + "title": "Update every", + "description": "Data collection interval, measured in seconds.", + "type": "integer", + "minimum": 1, + "default": 1 + } + }, + "additionalProperties": false, + "patternProperties": { + "^name$": {} + } + }, + "uiSchema": { + "uiOptions": { + "fullPage": true + } + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/discover_kubernetes.go b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_kubernetes.go new file mode 100644 index 000000000..a4aeee974 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_kubernetes.go @@ -0,0 +1,160 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "context" + "os" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/logger" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + 
"k8s.io/client-go/util/workqueue" +) + +func newKubeDiscovery(client kubernetes.Interface, l *logger.Logger) *kubeDiscovery { + return &kubeDiscovery{ + client: client, + Logger: l, + readyCh: make(chan struct{}), + stopCh: make(chan struct{}), + } +} + +type kubeDiscovery struct { + *logger.Logger + client kubernetes.Interface + discoverers []discoverer + readyCh chan struct{} + stopCh chan struct{} +} + +func (d *kubeDiscovery) run(ctx context.Context, in chan<- resource) { + d.Info("kube_discoverer is started") + defer func() { close(d.stopCh); d.Info("kube_discoverer is stopped") }() + + d.discoverers = d.setupDiscoverers(ctx) + + var wg sync.WaitGroup + updates := make(chan resource) + + for _, dd := range d.discoverers { + wg.Add(1) + go func(dd discoverer) { defer wg.Done(); dd.run(ctx, updates) }(dd) + } + + wg.Add(1) + go func() { defer wg.Done(); d.runDiscover(ctx, updates, in) }() + + close(d.readyCh) + wg.Wait() + <-ctx.Done() +} + +func (d *kubeDiscovery) ready() bool { + if !isChanClosed(d.readyCh) { + return false + } + for _, dd := range d.discoverers { + if !dd.ready() { + return false + } + } + return true +} + +func (d *kubeDiscovery) stopped() bool { + if !isChanClosed(d.stopCh) { + return false + } + for _, dd := range d.discoverers { + if !dd.stopped() { + return false + } + } + return true +} + +func (d *kubeDiscovery) runDiscover(ctx context.Context, updates chan resource, in chan<- resource) { + for { + select { + case <-ctx.Done(): + return + case r := <-updates: + select { + case <-ctx.Done(): + return + case in <- r: + } + } + } +} + +const resyncPeriod = 10 * time.Minute + +var ( + myNodeName = os.Getenv("MY_NODE_NAME") +) + +func (d *kubeDiscovery) setupDiscoverers(ctx context.Context) []discoverer { + node := d.client.CoreV1().Nodes() + nodeWatcher := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { return node.List(ctx, options) }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return node.Watch(ctx, options) }, + } + + pod := d.client.CoreV1().Pods(corev1.NamespaceAll) + podWatcher := &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if myNodeName != "" { + options.FieldSelector = "spec.nodeName=" + myNodeName + } + return pod.List(ctx, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if myNodeName != "" { + options.FieldSelector = "spec.nodeName=" + myNodeName + } + return pod.Watch(ctx, options) + }, + } + + return []discoverer{ + newNodeDiscoverer(cache.NewSharedInformer(nodeWatcher, &corev1.Node{}, resyncPeriod), d.Logger), + newPodDiscoverer(cache.NewSharedInformer(podWatcher, &corev1.Pod{}, resyncPeriod), d.Logger), + } +} + +func enqueue(queue *workqueue.Type, obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + return + } + queue.Add(key) +} + +func send(ctx context.Context, in chan<- resource, r resource) { + if r == nil { + return + } + select { + case <-ctx.Done(): + case in <- r: + } +} + +func isChanClosed(ch chan struct{}) bool { + select { + case <-ch: + return true + default: + return false + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/discover_node.go b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_node.go new file mode 100644 index 000000000..29761b204 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_node.go @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package 
k8s_state + +import ( + "context" + + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +func newNodeDiscoverer(si cache.SharedInformer, l *logger.Logger) *nodeDiscoverer { + if si == nil { + panic("nil node shared informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "node"}) + _, _ = si.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj interface{}) { enqueue(queue, obj) }, + DeleteFunc: func(obj interface{}) { enqueue(queue, obj) }, + }) + + return &nodeDiscoverer{ + Logger: l, + informer: si, + queue: queue, + readyCh: make(chan struct{}), + stopCh: make(chan struct{}), + } +} + +type nodeResource struct { + src string + val interface{} +} + +func (r nodeResource) source() string { return r.src } +func (r nodeResource) kind() kubeResourceKind { return kubeResourceNode } +func (r nodeResource) value() interface{} { return r.val } + +type nodeDiscoverer struct { + *logger.Logger + informer cache.SharedInformer + queue *workqueue.Type + readyCh chan struct{} + stopCh chan struct{} +} + +func (d *nodeDiscoverer) run(ctx context.Context, in chan<- resource) { + d.Info("node_discoverer is started") + defer func() { close(d.stopCh); d.Info("node_discoverer is stopped") }() + + defer d.queue.ShutDown() + + go d.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), d.informer.HasSynced) { + return + } + + go d.runDiscover(ctx, in) + close(d.readyCh) + + <-ctx.Done() +} + +func (d *nodeDiscoverer) ready() bool { return isChanClosed(d.readyCh) } +func (d *nodeDiscoverer) stopped() bool { return isChanClosed(d.stopCh) } + +func (d *nodeDiscoverer) runDiscover(ctx context.Context, in chan<- resource) { + for { + item, shutdown := d.queue.Get() + if shutdown { + return + } + + func() { + defer d.queue.Done(item) + + key := item.(string) + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + item, exists, err := d.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + r := &nodeResource{src: nodeSource(name)} + if exists { + r.val = item + } + send(ctx, in, r) + }() + } +} + +func nodeSource(name string) string { + return "k8s/node/" + name +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/discover_pod.go b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_pod.go new file mode 100644 index 000000000..2def7ad50 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/discover_pod.go @@ -0,0 +1,105 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "context" + + "github.com/netdata/netdata/go/go.d.plugin/logger" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +func newPodDiscoverer(si cache.SharedInformer, l *logger.Logger) *podDiscoverer { + if si == nil { + panic("nil pod shared informer") + } + + queue := workqueue.NewWithConfig(workqueue.QueueConfig{Name: "pod"}) + _, _ = si.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { enqueue(queue, obj) }, + UpdateFunc: func(_, obj interface{}) { enqueue(queue, obj) }, + DeleteFunc: func(obj interface{}) { enqueue(queue, obj) }, + }) + + return &podDiscoverer{ + Logger: l, + informer: si, + queue: queue, + readyCh: make(chan struct{}), + stopCh: make(chan struct{}), + } +} + +type podResource struct { + src string + val interface{} +} + +func (r podResource) source() string { return r.src } 
+func (r podResource) kind() kubeResourceKind { return kubeResourcePod } +func (r podResource) value() interface{} { return r.val } + +type podDiscoverer struct { + *logger.Logger + informer cache.SharedInformer + queue *workqueue.Type + readyCh chan struct{} + stopCh chan struct{} +} + +func (d *podDiscoverer) run(ctx context.Context, in chan<- resource) { + d.Info("pod_discoverer is started") + defer func() { close(d.stopCh); d.Info("pod_discoverer is stopped") }() + + defer d.queue.ShutDown() + + go d.informer.Run(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), d.informer.HasSynced) { + return + } + + go d.runDiscover(ctx, in) + close(d.readyCh) + + <-ctx.Done() +} + +func (d *podDiscoverer) ready() bool { return isChanClosed(d.readyCh) } +func (d *podDiscoverer) stopped() bool { return isChanClosed(d.stopCh) } + +func (d *podDiscoverer) runDiscover(ctx context.Context, in chan<- resource) { + for { + item, shutdown := d.queue.Get() + if shutdown { + return + } + + func() { + defer d.queue.Done(item) + + key := item.(string) + ns, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return + } + + item, exists, err := d.informer.GetStore().GetByKey(key) + if err != nil { + return + } + + r := &podResource{src: podSource(ns, name)} + if exists { + r.val = item + } + send(ctx, in, r) + }() + } +} + +func podSource(namespace, name string) string { + return "k8s/pod/" + namespace + "/" + name +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/init.go b/src/go/collectors/go.d.plugin/modules/k8s_state/init.go new file mode 100644 index 000000000..2892da1c6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/init.go @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "k8s.io/client-go/kubernetes" +) + +func (ks KubeState) initClient() (kubernetes.Interface, error) { + return ks.newKubeClient() +} + +func (ks *KubeState) initDiscoverer(client kubernetes.Interface) discoverer { + return newKubeDiscovery(client, ks.Logger) +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/integrations/kubernetes_cluster_state.md b/src/go/collectors/go.d.plugin/modules/k8s_state/integrations/kubernetes_cluster_state.md new file mode 100644 index 000000000..570c5de03 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/integrations/kubernetes_cluster_state.md @@ -0,0 +1,218 @@ +<!--startmeta +custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/go/collectors/go.d.plugin/modules/k8s_state/README.md" +meta_yaml: "https://github.com/netdata/netdata/edit/master/src/go/collectors/go.d.plugin/modules/k8s_state/metadata.yaml" +sidebar_label: "Kubernetes Cluster State" +learn_status: "Published" +learn_rel_path: "Collecting Metrics/Kubernetes" +most_popular: True +message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE COLLECTOR'S metadata.yaml FILE" +endmeta--> + +# Kubernetes Cluster State + + +<img src="https://netdata.cloud/img/kubernetes.svg" width="150"/> + + +Plugin: go.d.plugin +Module: k8s_state + +<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" /> + +## Overview + +This collector monitors Kubernetes Nodes, Pods and Containers. + + + + +This collector is supported on all platforms. + +This collector only supports collecting metrics from a single instance of this integration. + + +### Default Behavior + +#### Auto-Detection + +This integration doesn't support auto-detection. 
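A note on the `%` units that appear in the metric tables below: collect.go stores utilization percentages scaled by a fixed factor (`const precision = 1000`), and the corresponding chart dimensions declare `Div: precision`, so the agent divides the stored integer back down at display time. A small sketch of that round trip, using the `calcPercentage` logic from collect.go and made-up request/allocatable numbers:

```go
package main

import "fmt"

// precision mirrors the constant in collect.go.
const precision = 1000

// calcPercentage mirrors collect.go: the result is a percentage
// multiplied by precision, kept as an integer.
func calcPercentage(value, total int64) int64 {
	if total == 0 {
		return 0
	}
	return int64(float64(value) / float64(total) * 100 * precision)
}

func main() {
	// Hypothetical numbers: 250 millicpu requested on a node with
	// 2000 millicpu allocatable.
	raw := calcPercentage(250, 2000)
	fmt.Println(raw) // 12500, i.e. 12.5% scaled by precision

	// A chart dimension declared with Div: precision displays raw/precision.
	fmt.Printf("%.1f%%\n", float64(raw)/precision) // 12.5%
}
```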
+ +#### Limits + +The default configuration for this integration does not impose any limits on data collection. + +#### Performance Impact + +The default configuration for this integration is not expected to impose a significant performance impact on the system. + + +## Metrics + +Metrics grouped by *scope*. + +The scope defines the instance that the metric belongs to. An instance is uniquely identified by a set of labels. + + + +### Per node + +These metrics refer to the Node. + +Labels: + +| Label | Description | +|:-----------|:----------------| +| k8s_cluster_id | Cluster ID. This is equal to the kube-system namespace UID. | +| k8s_cluster_name | Cluster name. Cluster name discovery only works in GKE. | +| k8s_node_name | Node name. | + +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| k8s_state.node_allocatable_cpu_requests_utilization | requests | % | +| k8s_state.node_allocatable_cpu_requests_used | requests | millicpu | +| k8s_state.node_allocatable_cpu_limits_utilization | limits | % | +| k8s_state.node_allocatable_cpu_limits_used | limits | millicpu | +| k8s_state.node_allocatable_mem_requests_utilization | requests | % | +| k8s_state.node_allocatable_mem_requests_used | requests | bytes | +| k8s_state.node_allocatable_mem_limits_utilization | limits | % | +| k8s_state.node_allocatable_mem_limits_used | limits | bytes | +| k8s_state.node_allocatable_pods_utilization | allocated | % | +| k8s_state.node_allocatable_pods_usage | available, allocated | pods | +| k8s_state.node_condition | a dimension per condition | status | +| k8s_state.node_schedulability | schedulable, unschedulable | state | +| k8s_state.node_pods_readiness | ready | % | +| k8s_state.node_pods_readiness_state | ready, unready | pods | +| k8s_state.node_pods_condition | pod_ready, pod_scheduled, pod_initialized, containers_ready | pods | +| k8s_state.node_pods_phase | running, failed, succeeded, pending | pods | +| k8s_state.node_containers | containers, init_containers | containers | +| k8s_state.node_containers_state | running, waiting, terminated | containers | +| k8s_state.node_init_containers_state | running, waiting, terminated | containers | +| k8s_state.node_age | age | seconds | + +### Per pod + +These metrics refer to the Pod. + +Labels: + +| Label | Description | +|:-----------|:----------------| +| k8s_cluster_id | Cluster ID. This is equal to the kube-system namespace UID. | +| k8s_cluster_name | Cluster name. Cluster name discovery only works in GKE. | +| k8s_node_name | Node name. | +| k8s_namespace | Namespace. | +| k8s_controller_kind | Controller kind (ReplicaSet, DaemonSet, StatefulSet, Job, etc.). | +| k8s_controller_name | Controller name. | +| k8s_pod_name | Pod name. | +| k8s_qos_class | Pod QOS class (burstable, guaranteed, besteffort). 
| + +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| k8s_state.pod_cpu_requests_used | requests | millicpu | +| k8s_state.pod_cpu_limits_used | limits | millicpu | +| k8s_state.pod_mem_requests_used | requests | bytes | +| k8s_state.pod_mem_limits_used | limits | bytes | +| k8s_state.pod_condition | pod_ready, pod_scheduled, pod_initialized, containers_ready | state | +| k8s_state.pod_phase | running, failed, succeeded, pending | state | +| k8s_state.pod_age | age | seconds | +| k8s_state.pod_containers | containers, init_containers | containers | +| k8s_state.pod_containers_state | running, waiting, terminated | containers | +| k8s_state.pod_init_containers_state | running, waiting, terminated | containers | + +### Per container + +These metrics refer to the Pod container. + +Labels: + +| Label | Description | +|:-----------|:----------------| +| k8s_cluster_id | Cluster ID. This is equal to the kube-system namespace UID. | +| k8s_cluster_name | Cluster name. Cluster name discovery only works in GKE. | +| k8s_node_name | Node name. | +| k8s_namespace | Namespace. | +| k8s_controller_kind | Controller kind (ReplicaSet, DaemonSet, StatefulSet, Job, etc.). | +| k8s_controller_name | Controller name. | +| k8s_pod_name | Pod name. | +| k8s_qos_class | Pod QOS class (burstable, guaranteed, besteffort). | +| k8s_container_name | Container name. | + +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| k8s_state.pod_container_readiness_state | ready | state | +| k8s_state.pod_container_restarts | restarts | restarts | +| k8s_state.pod_container_state | running, waiting, terminated | state | +| k8s_state.pod_container_waiting_state_reason | a dimension per reason | state | +| k8s_state.pod_container_terminated_state_reason | a dimension per reason | state | + + + +## Alerts + +There are no alerts configured by default for this integration. + + +## Setup + +### Prerequisites + +No action required. + +### Configuration + +#### File + +The configuration file name for this integration is `go.d/k8s_state.conf`. + + +You can edit the configuration file using the `edit-config` script from the +Netdata [config directory](https://github.com/netdata/netdata/blob/master/docs/netdata-agent/configuration.md#the-netdata-config-directory). + +```bash +cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata +sudo ./edit-config go.d/k8s_state.conf +``` +#### Options + + + +There are no configuration options. + +#### Examples +There are no configuration examples. + + + +## Troubleshooting + +### Debug Mode + +To troubleshoot issues with the `k8s_state` collector, run the `go.d.plugin` with the debug option enabled. The output +should give you clues as to why the collector isn't working. + +- Navigate to the `plugins.d` directory, usually at `/usr/libexec/netdata/plugins.d/`. If that's not the case on + your system, open `netdata.conf` and look for the `plugins` setting under `[directories]`. + + ```bash + cd /usr/libexec/netdata/plugins.d/ + ``` + +- Switch to the `netdata` user. 
+ + ```bash + sudo -u netdata -s + ``` + +- Run the `go.d.plugin` to debug the collector: + + ```bash + ./go.d.plugin -d -m k8s_state + ``` + + diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state.go b/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state.go new file mode 100644 index 000000000..eca8ed7fd --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state.go @@ -0,0 +1,146 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "context" + _ "embed" + "errors" + "fmt" + "sync" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + "k8s.io/client-go/kubernetes" +) + +//go:embed "config_schema.json" +var configSchema string + +func init() { + module.Register("k8s_state", module.Creator{ + JobConfigSchema: configSchema, + Defaults: module.Defaults{ + Disabled: true, + }, + Create: func() module.Module { return New() }, + }) +} + +func New() *KubeState { + return &KubeState{ + initDelay: time.Second * 3, + newKubeClient: newKubeClient, + charts: baseCharts.Copy(), + once: &sync.Once{}, + wg: &sync.WaitGroup{}, + state: newKubeState(), + } +} + +type Config struct { + UpdateEvery int `yaml:"update_every" json:"update_every"` +} + +type ( + KubeState struct { + module.Base + Config `yaml:",inline" json:""` + + charts *module.Charts + + client kubernetes.Interface + newKubeClient func() (kubernetes.Interface, error) + + startTime time.Time + initDelay time.Duration + once *sync.Once + wg *sync.WaitGroup + discoverer discoverer + ctx context.Context + ctxCancel context.CancelFunc + kubeClusterID string + kubeClusterName string + + state *kubeState + } + discoverer interface { + run(ctx context.Context, in chan<- resource) + ready() bool + stopped() bool + } +) + +func (ks *KubeState) Configuration() any { + return ks.Config +} + +func (ks *KubeState) Init() error { + client, err := ks.initClient() + if err != nil { + ks.Errorf("client initialization: %v", err) + return err + } + ks.client = client + + ks.ctx, ks.ctxCancel = context.WithCancel(context.Background()) + + ks.discoverer = ks.initDiscoverer(ks.client) + + return nil +} + +func (ks *KubeState) Check() error { + if ks.client == nil || ks.discoverer == nil { + ks.Error("not initialized job") + return errors.New("not initialized") + } + + ver, err := ks.client.Discovery().ServerVersion() + if err != nil { + err := fmt.Errorf("failed to connect to K8s API server: %v", err) + ks.Error(err) + return err + } + + ks.Infof("successfully connected to the Kubernetes API server '%s'", ver) + + return nil +} + +func (ks *KubeState) Charts() *module.Charts { + return ks.charts +} + +func (ks *KubeState) Collect() map[string]int64 { + ms, err := ks.collect() + if err != nil { + ks.Error(err) + } + + if len(ms) == 0 { + return nil + } + return ms +} + +func (ks *KubeState) Cleanup() { + if ks.ctxCancel == nil { + return + } + ks.ctxCancel() + + c := make(chan struct{}) + go func() { defer close(c); ks.wg.Wait() }() + + t := time.NewTimer(time.Second * 5) + defer t.Stop() + + select { + case <-c: + return + case <-t.C: + return + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state_test.go b/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state_test.go new file mode 100644 index 000000000..99560d6dc --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/kube_state_test.go @@ -0,0 +1,859 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "context" + "errors" + "fmt" + "os" + "strings" 
+ "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/agent/module" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + apiresource "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/discovery" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +var ( + dataConfigJSON, _ = os.ReadFile("testdata/config.json") + dataConfigYAML, _ = os.ReadFile("testdata/config.yaml") +) + +func Test_testDataIsValid(t *testing.T) { + for name, data := range map[string][]byte{ + "dataConfigJSON": dataConfigJSON, + "dataConfigYAML": dataConfigYAML, + } { + require.NotNil(t, data, name) + } +} + +func TestKubeState_ConfigurationSerialize(t *testing.T) { + module.TestConfigurationSerialize(t, &KubeState{}, dataConfigJSON, dataConfigYAML) +} + +func TestKubeState_Init(t *testing.T) { + tests := map[string]struct { + wantFail bool + prepare func() *KubeState + }{ + "success when no error on initializing K8s client": { + wantFail: false, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(), nil } + return ks + }, + }, + "fail when get an error on initializing K8s client": { + wantFail: true, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return nil, errors.New("newKubeClient() error") } + return ks + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ks := test.prepare() + + if test.wantFail { + assert.Error(t, ks.Init()) + } else { + assert.NoError(t, ks.Init()) + } + }) + } +} + +func TestKubeState_Check(t *testing.T) { + tests := map[string]struct { + wantFail bool + prepare func() *KubeState + }{ + "success when connected to the K8s API": { + wantFail: false, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(), nil } + return ks + }, + }, + "fail when not connected to the K8s API": { + wantFail: true, + prepare: func() *KubeState { + ks := New() + client := &brokenInfoKubeClient{fake.NewSimpleClientset()} + ks.newKubeClient = func() (kubernetes.Interface, error) { return client, nil } + return ks + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + ks := test.prepare() + require.NoError(t, ks.Init()) + + if test.wantFail { + assert.Error(t, ks.Check()) + } else { + assert.NoError(t, ks.Check()) + } + }) + } +} + +func TestKubeState_Charts(t *testing.T) { + ks := New() + + assert.NotEmpty(t, *ks.Charts()) +} + +func TestKubeState_Cleanup(t *testing.T) { + tests := map[string]struct { + prepare func() *KubeState + doInit bool + doCollect bool + }{ + "before init": { + doInit: false, + doCollect: false, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(), nil } + return ks + }, + }, + "after init": { + doInit: true, + doCollect: false, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(), nil } + return ks + }, + }, + "after collect": { + doInit: true, + doCollect: true, + prepare: func() *KubeState { + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return fake.NewSimpleClientset(), nil } + return ks + }, + }, + } + + for name, test := range 
tests { + t.Run(name, func(t *testing.T) { + ks := test.prepare() + + if test.doInit { + _ = ks.Init() + } + if test.doCollect { + _ = ks.Collect() + time.Sleep(ks.initDelay) + } + + assert.NotPanics(t, ks.Cleanup) + time.Sleep(time.Second) + if test.doCollect { + assert.True(t, ks.discoverer.stopped()) + } + }) + } +} + +func TestKubeState_Collect(t *testing.T) { + type ( + testCaseStep func(t *testing.T, ks *KubeState) + testCase struct { + client kubernetes.Interface + steps []testCaseStep + } + ) + + tests := map[string]struct { + create func(t *testing.T) testCase + }{ + "Node only": { + create: func(t *testing.T) testCase { + client := fake.NewSimpleClientset( + newNode("node01"), + ) + + step1 := func(t *testing.T, ks *KubeState) { + mx := ks.Collect() + expected := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + "node_node01_age": 3, + "node_node01_alloc_cpu_limits_used": 0, + "node_node01_alloc_cpu_limits_util": 0, + "node_node01_alloc_cpu_requests_used": 0, + "node_node01_alloc_cpu_requests_util": 0, + "node_node01_alloc_mem_limits_used": 0, + "node_node01_alloc_mem_limits_util": 0, + "node_node01_alloc_mem_requests_used": 0, + "node_node01_alloc_mem_requests_util": 0, + "node_node01_alloc_pods_allocated": 0, + "node_node01_alloc_pods_available": 110, + "node_node01_alloc_pods_util": 0, + "node_node01_cond_diskpressure": 0, + "node_node01_cond_memorypressure": 0, + "node_node01_cond_networkunavailable": 0, + "node_node01_cond_pidpressure": 0, + "node_node01_cond_ready": 1, + "node_node01_schedulability_schedulable": 1, + "node_node01_schedulability_unschedulable": 0, + "node_node01_containers": 0, + "node_node01_containers_state_running": 0, + "node_node01_containers_state_terminated": 0, + "node_node01_containers_state_waiting": 0, + "node_node01_init_containers": 0, + "node_node01_init_containers_state_running": 0, + "node_node01_init_containers_state_terminated": 0, + "node_node01_init_containers_state_waiting": 0, + "node_node01_pods_cond_containersready": 0, + "node_node01_pods_cond_podinitialized": 0, + "node_node01_pods_cond_podready": 0, + "node_node01_pods_cond_podscheduled": 0, + "node_node01_pods_phase_failed": 0, + "node_node01_pods_phase_pending": 0, + "node_node01_pods_phase_running": 0, + "node_node01_pods_phase_succeeded": 0, + "node_node01_pods_readiness": 0, + "node_node01_pods_readiness_ready": 0, + "node_node01_pods_readiness_unready": 0, + } + copyAge(expected, mx) + assert.Equal(t, expected, mx) + assert.Equal(t, + len(nodeChartsTmpl)+len(baseCharts), + len(*ks.Charts()), + ) + } + + return testCase{ + client: client, + steps: []testCaseStep{step1}, + } + }, + }, + "Pod only": { + create: func(t *testing.T) testCase { + pod := newPod("node01", "pod01") + client := fake.NewSimpleClientset( + pod, + ) + + step1 := func(t *testing.T, ks *KubeState) { + mx := ks.Collect() + expected := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + "pod_default_pod01_age": 3, + "pod_default_pod01_cpu_limits_used": 400, + "pod_default_pod01_cpu_requests_used": 200, + "pod_default_pod01_mem_limits_used": 419430400, + "pod_default_pod01_mem_requests_used": 209715200, + "pod_default_pod01_cond_containersready": 1, + "pod_default_pod01_cond_podinitialized": 1, + "pod_default_pod01_cond_podready": 1, + "pod_default_pod01_cond_podscheduled": 1, + "pod_default_pod01_container_container1_readiness": 1, + "pod_default_pod01_container_container1_restarts": 0, + 
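+ // Pod requests/limits are max(sum over app containers, sum over init containers):
+ // 2×100m = 200m requests and 2×200m = 400m limits here, with the init
+ // container's 10m/50m below both, so it does not raise the totals.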
"pod_default_pod01_container_container1_state_running": 1, + "pod_default_pod01_container_container1_state_terminated": 0, + "pod_default_pod01_container_container1_state_waiting": 0, + "pod_default_pod01_container_container2_readiness": 1, + "pod_default_pod01_container_container2_restarts": 0, + "pod_default_pod01_container_container2_state_running": 1, + "pod_default_pod01_container_container2_state_terminated": 0, + "pod_default_pod01_container_container2_state_waiting": 0, + "pod_default_pod01_containers": 2, + "pod_default_pod01_containers_state_running": 2, + "pod_default_pod01_containers_state_terminated": 0, + "pod_default_pod01_containers_state_waiting": 0, + "pod_default_pod01_init_containers": 1, + "pod_default_pod01_init_containers_state_running": 0, + "pod_default_pod01_init_containers_state_terminated": 1, + "pod_default_pod01_init_containers_state_waiting": 0, + "pod_default_pod01_phase_failed": 0, + "pod_default_pod01_phase_pending": 0, + "pod_default_pod01_phase_running": 1, + "pod_default_pod01_phase_succeeded": 0, + } + copyAge(expected, mx) + + assert.Equal(t, expected, mx) + assert.Equal(t, + len(podChartsTmpl)+len(containerChartsTmpl)*len(pod.Spec.Containers)+len(baseCharts), + len(*ks.Charts()), + ) + } + + return testCase{ + client: client, + steps: []testCaseStep{step1}, + } + }, + }, + "Nodes and Pods": { + create: func(t *testing.T) testCase { + node := newNode("node01") + pod := newPod(node.Name, "pod01") + client := fake.NewSimpleClientset( + node, + pod, + ) + + step1 := func(t *testing.T, ks *KubeState) { + mx := ks.Collect() + expected := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + "node_node01_age": 3, + "node_node01_alloc_cpu_limits_used": 400, + "node_node01_alloc_cpu_limits_util": 11428, + "node_node01_alloc_cpu_requests_used": 200, + "node_node01_alloc_cpu_requests_util": 5714, + "node_node01_alloc_mem_limits_used": 419430400, + "node_node01_alloc_mem_limits_util": 11428, + "node_node01_alloc_mem_requests_used": 209715200, + "node_node01_alloc_mem_requests_util": 5714, + "node_node01_alloc_pods_allocated": 1, + "node_node01_alloc_pods_available": 109, + "node_node01_alloc_pods_util": 909, + "node_node01_cond_diskpressure": 0, + "node_node01_cond_memorypressure": 0, + "node_node01_cond_networkunavailable": 0, + "node_node01_cond_pidpressure": 0, + "node_node01_cond_ready": 1, + "node_node01_schedulability_schedulable": 1, + "node_node01_schedulability_unschedulable": 0, + "node_node01_containers": 2, + "node_node01_containers_state_running": 2, + "node_node01_containers_state_terminated": 0, + "node_node01_containers_state_waiting": 0, + "node_node01_init_containers": 1, + "node_node01_init_containers_state_running": 0, + "node_node01_init_containers_state_terminated": 1, + "node_node01_init_containers_state_waiting": 0, + "node_node01_pods_cond_containersready": 1, + "node_node01_pods_cond_podinitialized": 1, + "node_node01_pods_cond_podready": 1, + "node_node01_pods_cond_podscheduled": 1, + "node_node01_pods_phase_failed": 0, + "node_node01_pods_phase_pending": 0, + "node_node01_pods_phase_running": 1, + "node_node01_pods_phase_succeeded": 0, + "node_node01_pods_readiness": 100000, + "node_node01_pods_readiness_ready": 1, + "node_node01_pods_readiness_unready": 0, + "pod_default_pod01_age": 3, + "pod_default_pod01_cpu_limits_used": 400, + "pod_default_pod01_cpu_requests_used": 200, + "pod_default_pod01_mem_limits_used": 419430400, + "pod_default_pod01_mem_requests_used": 209715200, + 
"pod_default_pod01_cond_containersready": 1, + "pod_default_pod01_cond_podinitialized": 1, + "pod_default_pod01_cond_podready": 1, + "pod_default_pod01_cond_podscheduled": 1, + "pod_default_pod01_container_container1_readiness": 1, + "pod_default_pod01_container_container1_restarts": 0, + "pod_default_pod01_container_container1_state_running": 1, + "pod_default_pod01_container_container1_state_terminated": 0, + "pod_default_pod01_container_container1_state_waiting": 0, + "pod_default_pod01_container_container2_readiness": 1, + "pod_default_pod01_container_container2_restarts": 0, + "pod_default_pod01_container_container2_state_running": 1, + "pod_default_pod01_container_container2_state_terminated": 0, + "pod_default_pod01_container_container2_state_waiting": 0, + "pod_default_pod01_containers": 2, + "pod_default_pod01_containers_state_running": 2, + "pod_default_pod01_containers_state_terminated": 0, + "pod_default_pod01_containers_state_waiting": 0, + "pod_default_pod01_init_containers": 1, + "pod_default_pod01_init_containers_state_running": 0, + "pod_default_pod01_init_containers_state_terminated": 1, + "pod_default_pod01_init_containers_state_waiting": 0, + "pod_default_pod01_phase_failed": 0, + "pod_default_pod01_phase_pending": 0, + "pod_default_pod01_phase_running": 1, + "pod_default_pod01_phase_succeeded": 0, + } + copyAge(expected, mx) + + assert.Equal(t, expected, mx) + assert.Equal(t, + len(nodeChartsTmpl)+len(podChartsTmpl)+len(containerChartsTmpl)*len(pod.Spec.Containers)+len(baseCharts), + len(*ks.Charts()), + ) + } + + return testCase{ + client: client, + steps: []testCaseStep{step1}, + } + }, + }, + "delete a Pod in runtime": { + create: func(t *testing.T) testCase { + ctx := context.Background() + node := newNode("node01") + pod := newPod(node.Name, "pod01") + client := fake.NewSimpleClientset( + node, + pod, + ) + step1 := func(t *testing.T, ks *KubeState) { + _ = ks.Collect() + _ = client.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) + } + + step2 := func(t *testing.T, ks *KubeState) { + mx := ks.Collect() + expected := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + "node_node01_age": 4, + "node_node01_alloc_cpu_limits_used": 0, + "node_node01_alloc_cpu_limits_util": 0, + "node_node01_alloc_cpu_requests_used": 0, + "node_node01_alloc_cpu_requests_util": 0, + "node_node01_alloc_mem_limits_used": 0, + "node_node01_alloc_mem_limits_util": 0, + "node_node01_alloc_mem_requests_used": 0, + "node_node01_alloc_mem_requests_util": 0, + "node_node01_alloc_pods_allocated": 0, + "node_node01_alloc_pods_available": 110, + "node_node01_alloc_pods_util": 0, + "node_node01_cond_diskpressure": 0, + "node_node01_cond_memorypressure": 0, + "node_node01_cond_networkunavailable": 0, + "node_node01_cond_pidpressure": 0, + "node_node01_cond_ready": 1, + "node_node01_schedulability_schedulable": 1, + "node_node01_schedulability_unschedulable": 0, + "node_node01_containers": 0, + "node_node01_containers_state_running": 0, + "node_node01_containers_state_terminated": 0, + "node_node01_containers_state_waiting": 0, + "node_node01_init_containers": 0, + "node_node01_init_containers_state_running": 0, + "node_node01_init_containers_state_terminated": 0, + "node_node01_init_containers_state_waiting": 0, + "node_node01_pods_cond_containersready": 0, + "node_node01_pods_cond_podinitialized": 0, + "node_node01_pods_cond_podready": 0, + "node_node01_pods_cond_podscheduled": 0, + "node_node01_pods_phase_failed": 0, + 
"node_node01_pods_phase_pending": 0, + "node_node01_pods_phase_running": 0, + "node_node01_pods_phase_succeeded": 0, + "node_node01_pods_readiness": 0, + "node_node01_pods_readiness_ready": 0, + "node_node01_pods_readiness_unready": 0, + } + copyAge(expected, mx) + + assert.Equal(t, expected, mx) + assert.Equal(t, + len(nodeChartsTmpl)+len(podChartsTmpl)+len(containerChartsTmpl)*len(pod.Spec.Containers)+len(baseCharts), + len(*ks.Charts()), + ) + assert.Equal(t, + len(podChartsTmpl)+len(containerChartsTmpl)*len(pod.Spec.Containers), + calcObsoleteCharts(*ks.Charts()), + ) + } + + return testCase{ + client: client, + steps: []testCaseStep{step1, step2}, + } + }, + }, + "slow spec.NodeName set": { + create: func(t *testing.T) testCase { + ctx := context.Background() + node := newNode("node01") + podOrig := newPod(node.Name, "pod01") + podOrig.Spec.NodeName = "" + client := fake.NewSimpleClientset( + node, + podOrig, + ) + podUpdated := newPod(node.Name, "pod01") // with set Spec.NodeName + + step1 := func(t *testing.T, ks *KubeState) { + _ = ks.Collect() + for _, c := range *ks.Charts() { + if strings.HasPrefix(c.ID, "pod_") { + ok := isLabelValueSet(c, labelKeyNodeName) + assert.Falsef(t, ok, "chart '%s' has not empty %s label", c.ID, labelKeyNodeName) + } + } + } + step2 := func(t *testing.T, ks *KubeState) { + _, _ = client.CoreV1().Pods(podOrig.Namespace).Update(ctx, podUpdated, metav1.UpdateOptions{}) + time.Sleep(time.Millisecond * 50) + _ = ks.Collect() + + for _, c := range *ks.Charts() { + if strings.HasPrefix(c.ID, "pod_") { + ok := isLabelValueSet(c, labelKeyNodeName) + assert.Truef(t, ok, "chart '%s' has empty %s label", c.ID, labelKeyNodeName) + } + } + } + + return testCase{ + client: client, + steps: []testCaseStep{step1, step2}, + } + }, + }, + "add a Pod in runtime": { + create: func(t *testing.T) testCase { + ctx := context.Background() + node := newNode("node01") + pod1 := newPod(node.Name, "pod01") + pod2 := newPod(node.Name, "pod02") + client := fake.NewSimpleClientset( + node, + pod1, + ) + step1 := func(t *testing.T, ks *KubeState) { + _ = ks.Collect() + _, _ = client.CoreV1().Pods(pod1.Namespace).Create(ctx, pod2, metav1.CreateOptions{}) + } + + step2 := func(t *testing.T, ks *KubeState) { + mx := ks.Collect() + expected := map[string]int64{ + "discovery_node_discoverer_state": 1, + "discovery_pod_discoverer_state": 1, + "node_node01_age": 4, + "node_node01_alloc_cpu_limits_used": 800, + "node_node01_alloc_cpu_limits_util": 22857, + "node_node01_alloc_cpu_requests_used": 400, + "node_node01_alloc_cpu_requests_util": 11428, + "node_node01_alloc_mem_limits_used": 838860800, + "node_node01_alloc_mem_limits_util": 22857, + "node_node01_alloc_mem_requests_used": 419430400, + "node_node01_alloc_mem_requests_util": 11428, + "node_node01_alloc_pods_allocated": 2, + "node_node01_alloc_pods_available": 108, + "node_node01_alloc_pods_util": 1818, + "node_node01_cond_diskpressure": 0, + "node_node01_cond_memorypressure": 0, + "node_node01_cond_networkunavailable": 0, + "node_node01_cond_pidpressure": 0, + "node_node01_cond_ready": 1, + "node_node01_schedulability_schedulable": 1, + "node_node01_schedulability_unschedulable": 0, + "node_node01_containers": 4, + "node_node01_containers_state_running": 4, + "node_node01_containers_state_terminated": 0, + "node_node01_containers_state_waiting": 0, + "node_node01_init_containers": 2, + "node_node01_init_containers_state_running": 0, + "node_node01_init_containers_state_terminated": 2, + "node_node01_init_containers_state_waiting": 0, 
+ "node_node01_pods_cond_containersready": 2, + "node_node01_pods_cond_podinitialized": 2, + "node_node01_pods_cond_podready": 2, + "node_node01_pods_cond_podscheduled": 2, + "node_node01_pods_phase_failed": 0, + "node_node01_pods_phase_pending": 0, + "node_node01_pods_phase_running": 2, + "node_node01_pods_phase_succeeded": 0, + "node_node01_pods_readiness": 100000, + "node_node01_pods_readiness_ready": 2, + "node_node01_pods_readiness_unready": 0, + "pod_default_pod01_age": 4, + "pod_default_pod01_cpu_limits_used": 400, + "pod_default_pod01_cpu_requests_used": 200, + "pod_default_pod01_mem_limits_used": 419430400, + "pod_default_pod01_mem_requests_used": 209715200, + "pod_default_pod01_cond_containersready": 1, + "pod_default_pod01_cond_podinitialized": 1, + "pod_default_pod01_cond_podready": 1, + "pod_default_pod01_cond_podscheduled": 1, + "pod_default_pod01_container_container1_readiness": 1, + "pod_default_pod01_container_container1_restarts": 0, + "pod_default_pod01_container_container1_state_running": 1, + "pod_default_pod01_container_container1_state_terminated": 0, + "pod_default_pod01_container_container1_state_waiting": 0, + "pod_default_pod01_container_container2_readiness": 1, + "pod_default_pod01_container_container2_restarts": 0, + "pod_default_pod01_container_container2_state_running": 1, + "pod_default_pod01_container_container2_state_terminated": 0, + "pod_default_pod01_container_container2_state_waiting": 0, + "pod_default_pod01_containers": 2, + "pod_default_pod01_containers_state_running": 2, + "pod_default_pod01_containers_state_terminated": 0, + "pod_default_pod01_containers_state_waiting": 0, + "pod_default_pod01_init_containers": 1, + "pod_default_pod01_init_containers_state_running": 0, + "pod_default_pod01_init_containers_state_terminated": 1, + "pod_default_pod01_init_containers_state_waiting": 0, + "pod_default_pod01_phase_failed": 0, + "pod_default_pod01_phase_pending": 0, + "pod_default_pod01_phase_running": 1, + "pod_default_pod01_phase_succeeded": 0, + "pod_default_pod02_age": 4, + "pod_default_pod02_cpu_limits_used": 400, + "pod_default_pod02_cpu_requests_used": 200, + "pod_default_pod02_mem_limits_used": 419430400, + "pod_default_pod02_mem_requests_used": 209715200, + "pod_default_pod02_cond_containersready": 1, + "pod_default_pod02_cond_podinitialized": 1, + "pod_default_pod02_cond_podready": 1, + "pod_default_pod02_cond_podscheduled": 1, + "pod_default_pod02_container_container1_readiness": 1, + "pod_default_pod02_container_container1_restarts": 0, + "pod_default_pod02_container_container1_state_running": 1, + "pod_default_pod02_container_container1_state_terminated": 0, + "pod_default_pod02_container_container1_state_waiting": 0, + "pod_default_pod02_container_container2_readiness": 1, + "pod_default_pod02_container_container2_restarts": 0, + "pod_default_pod02_container_container2_state_running": 1, + "pod_default_pod02_container_container2_state_terminated": 0, + "pod_default_pod02_container_container2_state_waiting": 0, + "pod_default_pod02_containers": 2, + "pod_default_pod02_containers_state_running": 2, + "pod_default_pod02_containers_state_terminated": 0, + "pod_default_pod02_containers_state_waiting": 0, + "pod_default_pod02_init_containers": 1, + "pod_default_pod02_init_containers_state_running": 0, + "pod_default_pod02_init_containers_state_terminated": 1, + "pod_default_pod02_init_containers_state_waiting": 0, + "pod_default_pod02_phase_failed": 0, + "pod_default_pod02_phase_pending": 0, + "pod_default_pod02_phase_running": 1, + 
"pod_default_pod02_phase_succeeded": 0, + } + copyAge(expected, mx) + + assert.Equal(t, expected, mx) + assert.Equal(t, + len(nodeChartsTmpl)+ + len(podChartsTmpl)*2+ + len(containerChartsTmpl)*len(pod1.Spec.Containers)+ + len(containerChartsTmpl)*len(pod2.Spec.Containers)+ + len(baseCharts), + len(*ks.Charts()), + ) + } + + return testCase{ + client: client, + steps: []testCaseStep{step1, step2}, + } + }, + }, + } + + for name, creator := range tests { + t.Run(name, func(t *testing.T) { + test := creator.create(t) + + ks := New() + ks.newKubeClient = func() (kubernetes.Interface, error) { return test.client, nil } + + require.NoError(t, ks.Init()) + require.NoError(t, ks.Check()) + defer ks.Cleanup() + + for i, executeStep := range test.steps { + if i == 0 { + _ = ks.Collect() + time.Sleep(ks.initDelay) + } else { + time.Sleep(time.Second) + } + executeStep(t, ks) + } + }) + } +} + +func newNode(name string) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Status: corev1.NodeStatus{ + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("4000m"), + corev1.ResourceMemory: mustQuantity("4000Mi"), + "pods": mustQuantity("110"), + }, + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("3500m"), + corev1.ResourceMemory: mustQuantity("3500Mi"), + "pods": mustQuantity("110"), + }, + Conditions: []corev1.NodeCondition{ + {Type: corev1.NodeReady, Status: corev1.ConditionTrue}, + {Type: corev1.NodeMemoryPressure, Status: corev1.ConditionFalse}, + {Type: corev1.NodeDiskPressure, Status: corev1.ConditionFalse}, + {Type: corev1.NodePIDPressure, Status: corev1.ConditionFalse}, + {Type: corev1.NodeNetworkUnavailable, Status: corev1.ConditionFalse}, + }, + }, + } +} + +func newPod(nodeName, name string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: corev1.NamespaceDefault, + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + InitContainers: []corev1.Container{ + { + Name: "init-container1", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("50m"), + corev1.ResourceMemory: mustQuantity("50Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("10m"), + corev1.ResourceMemory: mustQuantity("10Mi"), + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: "container1", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("200m"), + corev1.ResourceMemory: mustQuantity("200Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("100m"), + corev1.ResourceMemory: mustQuantity("100Mi"), + }, + }, + }, + { + Name: "container2", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("200m"), + corev1.ResourceMemory: mustQuantity("200Mi")}, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: mustQuantity("100m"), + corev1.ResourceMemory: mustQuantity("100Mi"), + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + {Type: corev1.PodReady, Status: corev1.ConditionTrue}, + {Type: corev1.PodScheduled, Status: corev1.ConditionTrue}, + {Type: corev1.PodInitialized, Status: corev1.ConditionTrue}, + {Type: corev1.ContainersReady, Status: corev1.ConditionTrue}, + }, + InitContainerStatuses: []corev1.ContainerStatus{ 
+ { + Name: "init-container1", + State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}, + }, + }, + ContainerStatuses: []corev1.ContainerStatus{ + { + Name: "container1", + Ready: true, + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, + }, + { + Name: "container2", + Ready: true, + State: corev1.ContainerState{Running: &corev1.ContainerStateRunning{}}, + }, + }, + }, + } +} + +type brokenInfoKubeClient struct { + kubernetes.Interface +} + +func (kc *brokenInfoKubeClient) Discovery() discovery.DiscoveryInterface { + return &brokenInfoDiscovery{kc.Interface.Discovery()} +} + +type brokenInfoDiscovery struct { + discovery.DiscoveryInterface +} + +func (d *brokenInfoDiscovery) ServerVersion() (*version.Info, error) { + return nil, errors.New("brokenInfoDiscovery.ServerVersion() error") +} + +func calcObsoleteCharts(charts module.Charts) (num int) { + for _, c := range charts { + if c.Obsolete { + num++ + } + } + return num +} + +func mustQuantity(s string) apiresource.Quantity { + q, err := apiresource.ParseQuantity(s) + if err != nil { + panic(fmt.Sprintf("fail to create resource quantity: %v", err)) + } + return q +} + +func copyAge(dst, src map[string]int64) { + for k, v := range src { + if !strings.HasSuffix(k, "_age") { + continue + } + if _, ok := dst[k]; ok { + dst[k] = v + } + } +} + +func isLabelValueSet(c *module.Chart, name string) bool { + for _, l := range c.Labels { + if l.Key == name { + return l.Value != "" + } + } + return false +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/metadata.yaml b/src/go/collectors/go.d.plugin/modules/k8s_state/metadata.yaml new file mode 100644 index 000000000..7617b297f --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/metadata.yaml @@ -0,0 +1,356 @@ +plugin_name: go.d.plugin +modules: + - meta: + id: collector-go.d.plugin-k8s_state + plugin_name: go.d.plugin + module_name: k8s_state + monitored_instance: + name: Kubernetes Cluster State + link: https://kubernetes.io/ + icon_filename: kubernetes.svg + categories: + - data-collection.kubernetes + keywords: + - kubernetes + - k8s + related_resources: + integrations: + list: [] + info_provided_to_referring_integrations: + description: "" + most_popular: true + overview: + data_collection: + metrics_description: | + This collector monitors Kubernetes Nodes, Pods and Containers. + method_description: "" + supported_platforms: + include: [] + exclude: [] + multi_instance: false + additional_permissions: + description: "" + default_behavior: + auto_detection: + description: "" + limits: + description: "" + performance_impact: + description: "" + setup: + prerequisites: + list: [] + configuration: + file: + name: go.d/k8s_state.conf + options: + description: "" + folding: + title: Config options + enabled: true + list: [] + examples: + folding: + title: Config + enabled: true + list: [] + troubleshooting: + problems: + list: [] + alerts: [] + metrics: + folding: + title: Metrics + enabled: false + description: "" + availability: [] + scopes: + - name: node + description: These metrics refer to the Node. + labels: + - name: k8s_cluster_id + description: Cluster ID. This is equal to the kube-system namespace UID. + - name: k8s_cluster_name + description: Cluster name. Cluster name discovery only works in GKE. + - name: k8s_node_name + description: Node name. 
+ metrics: + - name: k8s_state.node_allocatable_cpu_requests_utilization + description: CPU requests utilization + unit: '%' + chart_type: line + dimensions: + - name: requests + - name: k8s_state.node_allocatable_cpu_requests_used + description: CPU requests used + unit: millicpu + chart_type: line + dimensions: + - name: requests + - name: k8s_state.node_allocatable_cpu_limits_utilization + description: CPU limits utilization + unit: '%' + chart_type: line + dimensions: + - name: limits + - name: k8s_state.node_allocatable_cpu_limits_used + description: CPU limits used + unit: millicpu + chart_type: line + dimensions: + - name: limits + - name: k8s_state.node_allocatable_mem_requests_utilization + description: Memory requests utilization + unit: '%' + chart_type: line + dimensions: + - name: requests + - name: k8s_state.node_allocatable_mem_requests_used + description: Memory requests used + unit: bytes + chart_type: line + dimensions: + - name: requests + - name: k8s_state.node_allocatable_mem_limits_utilization + description: Memory limits utilization + unit: '%' + chart_type: line + dimensions: + - name: limits + - name: k8s_state.node_allocatable_mem_limits_used + description: Memory limits used + unit: bytes + chart_type: line + dimensions: + - name: limits + - name: k8s_state.node_allocatable_pods_utilization + description: Pods resource utilization + unit: '%' + chart_type: line + dimensions: + - name: allocated + - name: k8s_state.node_allocatable_pods_usage + description: Pods resource usage + unit: pods + chart_type: stacked + dimensions: + - name: available + - name: allocated + - name: k8s_state.node_condition + description: Condition status + unit: status + chart_type: line + dimensions: + - name: a dimension per condition + - name: k8s_state.node_schedulability + description: Schedulability + unit: state + chart_type: line + dimensions: + - name: schedulable + - name: unschedulable + - name: k8s_state.node_pods_readiness + description: Pods readiness + unit: '%' + chart_type: line + dimensions: + - name: ready + - name: k8s_state.node_pods_readiness_state + description: Pods readiness state + unit: pods + chart_type: line + dimensions: + - name: ready + - name: unready + - name: k8s_state.node_pods_condition + description: Pods condition + unit: pods + chart_type: line + dimensions: + - name: pod_ready + - name: pod_scheduled + - name: pod_initialized + - name: containers_ready + - name: k8s_state.node_pods_phase + description: Pods phase + unit: pods + chart_type: stacked + dimensions: + - name: running + - name: failed + - name: succeeded + - name: pending + - name: k8s_state.node_containers + description: Containers + unit: containers + chart_type: line + dimensions: + - name: containers + - name: init_containers + - name: k8s_state.node_containers_state + description: Containers state + unit: containers + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: terminated + - name: k8s_state.node_init_containers_state + description: Init containers state + unit: containers + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: terminated + - name: k8s_state.node_age + description: Age + unit: seconds + chart_type: line + dimensions: + - name: age + - name: pod + description: These metrics refer to the Pod. + labels: + - name: k8s_cluster_id + description: Cluster ID. This is equal to the kube-system namespace UID. + - name: k8s_cluster_name + description: Cluster name. Cluster name discovery only works in GKE. 
+ - name: k8s_node_name + description: Node name. + - name: k8s_namespace + description: Namespace. + - name: k8s_controller_kind + description: Controller kind (ReplicaSet, DaemonSet, StatefulSet, Job, etc.). + - name: k8s_controller_name + description: Controller name. + - name: k8s_pod_name + description: Pod name. + - name: k8s_qos_class + description: Pod QOS class (burstable, guaranteed, besteffort). + metrics: + - name: k8s_state.pod_cpu_requests_used + description: CPU requests used + unit: millicpu + chart_type: line + dimensions: + - name: requests + - name: k8s_state.pod_cpu_limits_used + description: CPU limits used + unit: millicpu + chart_type: line + dimensions: + - name: limits + - name: k8s_state.pod_mem_requests_used + description: Memory requests used + unit: bytes + chart_type: line + dimensions: + - name: requests + - name: k8s_state.pod_mem_limits_used + description: Memory limits used + unit: bytes + chart_type: line + dimensions: + - name: limits + - name: k8s_state.pod_condition + description: Condition + unit: state + chart_type: line + dimensions: + - name: pod_ready + - name: pod_scheduled + - name: pod_initialized + - name: containers_ready + - name: k8s_state.pod_phase + description: Phase + unit: state + chart_type: line + dimensions: + - name: running + - name: failed + - name: succeeded + - name: pending + - name: k8s_state.pod_age + description: Age + unit: seconds + chart_type: line + dimensions: + - name: age + - name: k8s_state.pod_containers + description: Containers + unit: containers + chart_type: line + dimensions: + - name: containers + - name: init_containers + - name: k8s_state.pod_containers_state + description: Containers state + unit: containers + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: terminated + - name: k8s_state.pod_init_containers_state + description: Init containers state + unit: containers + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: terminated + - name: container + description: These metrics refer to the Pod container. + labels: + - name: k8s_cluster_id + description: Cluster ID. This is equal to the kube-system namespace UID. + - name: k8s_cluster_name + description: Cluster name. Cluster name discovery only works in GKE. + - name: k8s_node_name + description: Node name. + - name: k8s_namespace + description: Namespace. + - name: k8s_controller_kind + description: Controller kind (ReplicaSet, DaemonSet, StatefulSet, Job, etc.). + - name: k8s_controller_name + description: Controller name. + - name: k8s_pod_name + description: Pod name. + - name: k8s_qos_class + description: Pod QOS class (burstable, guaranteed, besteffort). + - name: k8s_container_name + description: Container name. 
+ metrics: + - name: k8s_state.pod_container_readiness_state + description: Readiness state + unit: state + chart_type: line + dimensions: + - name: ready + - name: k8s_state.pod_container_restarts + description: Restarts + unit: restarts + chart_type: line + dimensions: + - name: restarts + - name: k8s_state.pod_container_state + description: Container state + unit: state + chart_type: line + dimensions: + - name: running + - name: waiting + - name: terminated + - name: k8s_state.pod_container_waiting_state_reason + description: Container waiting state reason + unit: state + chart_type: line + dimensions: + - name: a dimension per reason + - name: k8s_state.pod_container_terminated_state_reason + description: Container terminated state reason + unit: state + chart_type: line + dimensions: + - name: a dimension per reason diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/resource.go b/src/go/collectors/go.d.plugin/modules/k8s_state/resource.go new file mode 100644 index 000000000..cabd41a67 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/resource.go @@ -0,0 +1,44 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" +) + +type resource interface { + source() string + kind() kubeResourceKind + value() interface{} +} + +type kubeResourceKind uint8 + +const ( + kubeResourceNode kubeResourceKind = iota + 1 + kubeResourcePod +) + +func toNode(i interface{}) (*corev1.Node, error) { + switch v := i.(type) { + case *corev1.Node: + return v, nil + case resource: + return toNode(v.value()) + default: + return nil, fmt.Errorf("unexpected type: %T (expected %T or %T)", v, &corev1.Node{}, resource(nil)) + } +} + +func toPod(i interface{}) (*corev1.Pod, error) { + switch v := i.(type) { + case *corev1.Pod: + return v, nil + case resource: + return toPod(v.value()) + default: + return nil, fmt.Errorf("unexpected type: %T (expected %T or %T)", v, &corev1.Pod{}, resource(nil)) + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/state.go b/src/go/collectors/go.d.plugin/modules/k8s_state/state.go new file mode 100644 index 000000000..1d39df10e --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/state.go @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "sync" + "time" + + corev1 "k8s.io/api/core/v1" +) + +func newKubeState() *kubeState { + return &kubeState{ + Mutex: &sync.Mutex{}, + nodes: make(map[string]*nodeState), + pods: make(map[string]*podState), + } +} + +func newNodeState() *nodeState { + return &nodeState{ + new: true, + labels: make(map[string]string), + conditions: make(map[string]*nodeStateCondition), + } +} + +func newPodState() *podState { + return &podState{ + new: true, + labels: make(map[string]string), + initContainers: make(map[string]*containerState), + containers: make(map[string]*containerState), + } +} + +func newContainerState() *containerState { + return &containerState{ + new: true, + stateWaitingReasons: make(map[string]*containerStateReason), + stateTerminatedReasons: make(map[string]*containerStateReason), + } +} + +type kubeState struct { + *sync.Mutex + nodes map[string]*nodeState + pods map[string]*podState +} + +type ( + nodeState struct { + new bool + deleted bool + + name string + unSchedulable bool + labels map[string]string + creationTime time.Time + allocatableCPU int64 + allocatableMem int64 + allocatablePods int64 + conditions map[string]*nodeStateCondition + + stats nodeStateStats + } + 
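+ // nodeStateCondition records the last observed status of one node
+ // condition; 'new' marks conditions not yet reflected in the conditions chart.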
nodeStateCondition struct { + new bool + // https://kubernetes.io/docs/concepts/architecture/nodes/#condition + //typ corev1.NodeConditionType + status corev1.ConditionStatus + } + nodeStateStats struct { + reqCPU int64 + limitCPU int64 + reqMem int64 + limitMem int64 + pods int64 + + podsCondPodReady int64 + podsCondPodScheduled int64 + podsCondPodInitialized int64 + podsCondContainersReady int64 + + podsReadinessReady int64 + podsReadinessUnready int64 + + podsPhaseRunning int64 + podsPhaseFailed int64 + podsPhaseSucceeded int64 + podsPhasePending int64 + + containers int64 + initContainers int64 + initContStateRunning int64 + initContStateWaiting int64 + initContStateTerminated int64 + contStateRunning int64 + contStateWaiting int64 + contStateTerminated int64 + } +) + +func (ns nodeState) id() string { return ns.name } +func (ns *nodeState) resetStats() { ns.stats = nodeStateStats{} } + +type ( + podState struct { + new bool + deleted bool + unscheduled bool + + name string + nodeName string + namespace string + uid string + labels map[string]string + controllerKind string + controllerName string + qosClass string + creationTime time.Time + reqCPU int64 + reqMem int64 + limitCPU int64 + limitMem int64 + // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-conditions + condPodScheduled corev1.ConditionStatus + condContainersReady corev1.ConditionStatus + condPodInitialized corev1.ConditionStatus + condPodReady corev1.ConditionStatus + // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase + phase corev1.PodPhase + + initContainers map[string]*containerState + containers map[string]*containerState + } +) + +func (ps podState) id() string { return ps.namespace + "_" + ps.name } + +type ( + containerState struct { + new bool + + name string + uid string + + podName string + nodeName string + namespace string + + ready bool + restarts int64 + stateRunning bool + stateWaiting bool + stateTerminated bool + stateWaitingReasons map[string]*containerStateReason + stateTerminatedReasons map[string]*containerStateReason + } + containerStateReason struct { + new bool + reason string + active bool + } +) diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.json b/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.json new file mode 100644 index 000000000..0e3f7c403 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.json @@ -0,0 +1,3 @@ +{ + "update_every": 123 +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.yaml b/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.yaml new file mode 100644 index 000000000..f21a3a7a0 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/testdata/config.yaml @@ -0,0 +1 @@ +update_every: 123 diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/update_node_state.go b/src/go/collectors/go.d.plugin/modules/k8s_state/update_node_state.go new file mode 100644 index 000000000..80f5c26c8 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/update_node_state.go @@ -0,0 +1,47 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +func (ks *KubeState) updateNodeState(r resource) { + if r.value() == nil { + if ns, ok := ks.state.nodes[r.source()]; ok { + ns.deleted = true + } + return + } + + node, err := toNode(r) + if err != nil { + ks.Warning(err) + return + } + + if myNodeName != "" && node.Name != myNodeName { + return + } + + ns, ok := ks.state.nodes[r.source()] + 
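+ // Two-phase update: the first "if !ok" below registers a new nodeState, the
+ // second captures create-once fields (name, creation time, allocatable
+ // resources, labels); schedulability and conditions are refreshed on every event.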
if !ok { + ns = newNodeState() + ks.state.nodes[r.source()] = ns + } + + if !ok { + ns.name = node.Name + ns.creationTime = node.CreationTimestamp.Time + ns.allocatableCPU = int64(node.Status.Allocatable.Cpu().AsApproximateFloat64() * 1000) + ns.allocatableMem = node.Status.Allocatable.Memory().Value() + ns.allocatablePods = node.Status.Allocatable.Pods().Value() + copyLabels(ns.labels, node.Labels) + } + + ns.unSchedulable = node.Spec.Unschedulable + + for _, c := range node.Status.Conditions { + if v, ok := ns.conditions[string(c.Type)]; !ok { + ns.conditions[string(c.Type)] = &nodeStateCondition{new: true, status: c.Status} + } else { + v.status = c.Status + } + } +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/update_pod_state.go b/src/go/collectors/go.d.plugin/modules/k8s_state/update_pod_state.go new file mode 100644 index 000000000..22ef0f7fc --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/update_pod_state.go @@ -0,0 +1,165 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +import ( + "strings" + + corev1 "k8s.io/api/core/v1" +) + +func (ks *KubeState) updatePodState(r resource) { + if r.value() == nil { + if ps, ok := ks.state.pods[r.source()]; ok { + ps.deleted = true + } + return + } + + pod, err := toPod(r) + if err != nil { + ks.Warning(err) + return + } + + ps, ok := ks.state.pods[r.source()] + if !ok { + ps = newPodState() + ks.state.pods[r.source()] = ps + } + + if !ok { + ps.name = pod.Name + ps.nodeName = pod.Spec.NodeName + ps.namespace = pod.Namespace + ps.creationTime = pod.CreationTimestamp.Time + ps.uid = string(pod.UID) + ps.qosClass = strings.ToLower(string(pod.Status.QOSClass)) + copyLabels(ps.labels, pod.Labels) + for _, ref := range pod.OwnerReferences { + if ref.Controller != nil && *ref.Controller { + ps.controllerKind = ref.Kind + ps.controllerName = ref.Name + } + } + var res struct{ rCPU, lCPU, rMem, lMem, irCPU, ilCPU, irMem, ilMem int64 } + for _, cntr := range pod.Spec.Containers { + res.rCPU += int64(cntr.Resources.Requests.Cpu().AsApproximateFloat64() * 1000) + res.lCPU += int64(cntr.Resources.Limits.Cpu().AsApproximateFloat64() * 1000) + res.rMem += cntr.Resources.Requests.Memory().Value() + res.lMem += cntr.Resources.Limits.Memory().Value() + } + for _, cntr := range pod.Spec.InitContainers { + res.irCPU += int64(cntr.Resources.Requests.Cpu().AsApproximateFloat64() * 1000) + res.ilCPU += int64(cntr.Resources.Limits.Cpu().AsApproximateFloat64() * 1000) + res.irMem += cntr.Resources.Requests.Memory().Value() + res.ilMem += cntr.Resources.Limits.Memory().Value() + } + ps.reqCPU = max(res.rCPU, res.irCPU) + ps.limitCPU = max(res.lCPU, res.ilCPU) + ps.reqMem = max(res.rMem, res.irMem) + ps.limitMem = max(res.lMem, res.ilMem) + } + if ps.nodeName == "" { + ps.nodeName = pod.Spec.NodeName + } + + for _, c := range pod.Status.Conditions { + switch c.Type { + case corev1.ContainersReady: + ps.condContainersReady = c.Status + case corev1.PodInitialized: + ps.condPodInitialized = c.Status + case corev1.PodReady: + ps.condPodReady = c.Status + case corev1.PodScheduled: + ps.condPodScheduled = c.Status + } + } + + ps.phase = pod.Status.Phase + + for _, cs := range ps.containers { + for _, r := range cs.stateWaitingReasons { + r.active = false + } + for _, r := range cs.stateTerminatedReasons { + r.active = false + } + } + + for _, cntr := range pod.Status.ContainerStatuses { + cs, ok := ps.containers[cntr.Name] + if !ok { + cs = newContainerState() + ps.containers[cntr.Name] = cs + } + if !ok { + cs.name = 
cntr.Name + cs.podName = pod.Name + cs.namespace = pod.Namespace + cs.nodeName = pod.Spec.NodeName + cs.uid = extractContainerID(cntr.ContainerID) + } + cs.ready = cntr.Ready + cs.restarts = int64(cntr.RestartCount) + cs.stateRunning = cntr.State.Running != nil + cs.stateWaiting = cntr.State.Waiting != nil + cs.stateTerminated = cntr.State.Terminated != nil + + if cntr.State.Waiting != nil { + reason := cntr.State.Waiting.Reason + r, ok := cs.stateWaitingReasons[reason] + if !ok { + r = &containerStateReason{new: true, reason: reason} + cs.stateWaitingReasons[reason] = r + } + r.active = true + } + + if cntr.State.Terminated != nil { + reason := cntr.State.Terminated.Reason + r, ok := cs.stateTerminatedReasons[reason] + if !ok { + r = &containerStateReason{new: true, reason: reason} + cs.stateTerminatedReasons[reason] = r + } + r.active = true + } + } + + for _, cntr := range pod.Status.InitContainerStatuses { + cs, ok := ps.initContainers[cntr.Name] + if !ok { + cs = newContainerState() + ps.initContainers[cntr.Name] = cs + } + if !ok { + cs.name = cntr.Name + cs.podName = pod.Name + cs.namespace = pod.Namespace + cs.nodeName = pod.Spec.NodeName + cs.uid = extractContainerID(cntr.ContainerID) + } + cs.ready = cntr.Ready + cs.restarts = int64(cntr.RestartCount) + cs.stateRunning = cntr.State.Running != nil + cs.stateWaiting = cntr.State.Waiting != nil + cs.stateTerminated = cntr.State.Terminated != nil + } +} + +func max(a, b int64) int64 { + if a < b { + return b + } + return a +} + +func extractContainerID(id string) string { + // docker://d98... + if i := strings.LastIndexByte(id, '/'); i != -1 { + id = id[i+1:] + } + return id +} diff --git a/src/go/collectors/go.d.plugin/modules/k8s_state/update_state.go b/src/go/collectors/go.d.plugin/modules/k8s_state/update_state.go new file mode 100644 index 000000000..88f3272c1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/k8s_state/update_state.go @@ -0,0 +1,27 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package k8s_state + +func (ks *KubeState) runUpdateState(in <-chan resource) { + for { + select { + case <-ks.ctx.Done(): + return + case r := <-in: + ks.state.Lock() + switch r.kind() { + case kubeResourceNode: + ks.updateNodeState(r) + case kubeResourcePod: + ks.updatePodState(r) + } + ks.state.Unlock() + } + } +} + +func copyLabels(dst, src map[string]string) { + for k, v := range src { + dst[k] = v + } +} |
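To make the update loop concrete: `runUpdateState` is the single writer that serializes discoverer events into `kubeState`, and `Collect` reads under the same mutex. Below is a minimal, self-contained sketch of that pattern, assuming nothing beyond the standard library; the `resource` and `state` types here are illustrative stand-ins for the module's own, and unlike the real code, which sets `deleted = true` so the next collection can retire the charts, the sketch deletes entries outright.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// resource is a stand-in for the module's resource interface:
// a keyed event whose nil value signals deletion.
type resource struct {
	key   string
	value any
}

// state mirrors kubeState: a mutex guarding plain maps, written only
// by the update goroutine and read by the collector.
type state struct {
	sync.Mutex
	objects map[string]any
}

// runUpdateState consumes events until the context is cancelled,
// the same select shape as KubeState.runUpdateState.
func runUpdateState(ctx context.Context, st *state, in <-chan resource) {
	for {
		select {
		case <-ctx.Done():
			return
		case r := <-in:
			st.Lock()
			if r.value == nil {
				delete(st.objects, r.key) // real code: mark deleted, retire charts later
			} else {
				st.objects[r.key] = r.value
			}
			st.Unlock()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	st := &state{objects: make(map[string]any)}
	in := make(chan resource)
	go runUpdateState(ctx, st, in)

	in <- resource{key: "node01", value: "Ready"}
	in <- resource{key: "node01", value: nil} // deletion event

	time.Sleep(50 * time.Millisecond) // crude settle; fine for a sketch
	st.Lock()
	fmt.Println("tracked objects:", len(st.objects)) // 0
	st.Unlock()
}
```

The two-step deletion in the real module is what the "delete a Pod in runtime" test exercises: the state keeps the `deleted` entry for one more collection so the corresponding charts can be marked obsolete, which is exactly what `calcObsoleteCharts` counts.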