diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 11:19:16 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-07-24 09:53:24 +0000 |
commit | b5f8ee61a7f7e9bd291dd26b0585d03eb686c941 (patch) | |
tree | d4d31289c39fc00da064a825df13a0b98ce95b10 /src/go/collectors/go.d.plugin/modules/vsphere/scrape | |
parent | Adding upstream version 1.44.3. (diff) | |
download | netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.tar.xz netdata-b5f8ee61a7f7e9bd291dd26b0585d03eb686c941.zip |
Adding upstream version 1.46.3.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
4 files changed, 304 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go new file mode 100644 index 000000000..adda665cc --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go @@ -0,0 +1,159 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package scrape + +import ( + "fmt" + "strconv" + "strings" + "sync" + "time" + + rs "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/resources" + + "github.com/netdata/netdata/go/go.d.plugin/logger" + "github.com/vmware/govmomi/performance" + "github.com/vmware/govmomi/vim25/types" +) + +type Client interface { + Version() string + PerformanceMetrics([]types.PerfQuerySpec) ([]performance.EntityMetric, error) +} + +func New(client Client) *Scraper { + v := &Scraper{Client: client} + v.calcMaxQuery() + return v +} + +type Scraper struct { + *logger.Logger + Client + maxQuery int +} + +// Default settings for vCenter 6.5 and above is 256, prior versions of vCenter have this set to 64. +func (s *Scraper) calcMaxQuery() { + major, minor, err := parseVersion(s.Version()) + if err != nil || major < 6 || minor == 0 { + s.maxQuery = 64 + return + } + s.maxQuery = 256 +} + +func (s *Scraper) ScrapeHosts(hosts rs.Hosts) []performance.EntityMetric { + t := time.Now() + pqs := newHostsPerfQuerySpecs(hosts) + ms := s.scrapeMetrics(pqs) + s.Debugf("scraping : scraped metrics for %d/%d hosts, process took %s", + len(ms), + len(hosts), + time.Since(t), + ) + return ms +} + +func (s *Scraper) ScrapeVMs(vms rs.VMs) []performance.EntityMetric { + t := time.Now() + pqs := newVMsPerfQuerySpecs(vms) + ms := s.scrapeMetrics(pqs) + s.Debugf("scraping : scraped metrics for %d/%d vms, process took %s", + len(ms), + len(vms), + time.Since(t), + ) + return ms +} + +func (s *Scraper) scrapeMetrics(pqs []types.PerfQuerySpec) []performance.EntityMetric { + tc := newThrottledCaller(5) + var ms []performance.EntityMetric + lock := &sync.Mutex{} + + chunks := chunkify(pqs, s.maxQuery) + for _, chunk := range chunks { + pqs := chunk + job := func() { + s.scrape(&ms, lock, pqs) + } + tc.call(job) + } + tc.wait() + + return ms +} + +func (s *Scraper) scrape(metrics *[]performance.EntityMetric, lock *sync.Mutex, pqs []types.PerfQuerySpec) { + m, err := s.PerformanceMetrics(pqs) + if err != nil { + s.Error(err) + return + } + + lock.Lock() + *metrics = append(*metrics, m...) + lock.Unlock() +} + +func chunkify(pqs []types.PerfQuerySpec, chunkSize int) (chunks [][]types.PerfQuerySpec) { + for i := 0; i < len(pqs); i += chunkSize { + end := i + chunkSize + if end > len(pqs) { + end = len(pqs) + } + chunks = append(chunks, pqs[i:end]) + } + return chunks +} + +const ( + pqsMaxSample = 1 + pqsIntervalID = 20 + pqsFormat = "normal" +) + +func newHostsPerfQuerySpecs(hosts rs.Hosts) []types.PerfQuerySpec { + pqs := make([]types.PerfQuerySpec, 0, len(hosts)) + for _, host := range hosts { + pq := types.PerfQuerySpec{ + Entity: host.Ref, + MaxSample: pqsMaxSample, + MetricId: host.MetricList, + IntervalId: pqsIntervalID, + Format: pqsFormat, + } + pqs = append(pqs, pq) + } + return pqs +} + +func newVMsPerfQuerySpecs(vms rs.VMs) []types.PerfQuerySpec { + pqs := make([]types.PerfQuerySpec, 0, len(vms)) + for _, vm := range vms { + pq := types.PerfQuerySpec{ + Entity: vm.Ref, + MaxSample: pqsMaxSample, + MetricId: vm.MetricList, + IntervalId: pqsIntervalID, + Format: pqsFormat, + } + pqs = append(pqs, pq) + } + return pqs +} + +func parseVersion(version string) (major, minor int, err error) { + parts := strings.Split(version, ".") + if len(parts) < 2 { + return 0, 0, fmt.Errorf("unparsable version string : %s", version) + } + if major, err = strconv.Atoi(parts[0]); err != nil { + return 0, 0, err + } + if minor, err = strconv.Atoi(parts[1]); err != nil { + return 0, 0, err + } + return major, minor, nil +} diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go new file mode 100644 index 000000000..0576850f6 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go @@ -0,0 +1,70 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package scrape + +import ( + "crypto/tls" + "net/url" + "testing" + "time" + + "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/client" + "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/discover" + rs "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/resources" + "github.com/netdata/netdata/go/go.d.plugin/pkg/tlscfg" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/vmware/govmomi/simulator" +) + +func TestNew(t *testing.T) { +} + +func TestScraper_ScrapeVMs(t *testing.T) { + s, res, teardown := prepareScraper(t) + defer teardown() + + metrics := s.ScrapeVMs(res.VMs) + assert.Len(t, metrics, len(res.VMs)) +} + +func TestScraper_ScrapeHosts(t *testing.T) { + s, res, teardown := prepareScraper(t) + defer teardown() + + metrics := s.ScrapeHosts(res.Hosts) + assert.Len(t, metrics, len(res.Hosts)) +} + +func prepareScraper(t *testing.T) (s *Scraper, res *rs.Resources, teardown func()) { + model, srv := createSim(t) + teardown = func() { model.Remove(); srv.Close() } + + c := newClient(t, srv.URL) + d := discover.New(c) + res, err := d.Discover() + require.NoError(t, err) + + return New(c), res, teardown +} + +func newClient(t *testing.T, vCenterURL *url.URL) *client.Client { + c, err := client.New(client.Config{ + URL: vCenterURL.String(), + User: "admin", + Password: "password", + Timeout: time.Second * 3, + TLSConfig: tlscfg.TLSConfig{InsecureSkipVerify: true}, + }) + require.NoError(t, err) + return c +} + +func createSim(t *testing.T) (*simulator.Model, *simulator.Server) { + model := simulator.VPX() + err := model.Create() + require.NoError(t, err) + model.Service.TLS = new(tls.Config) + return model, model.Service.NewServer() +} diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go new file mode 100644 index 000000000..5127c28c1 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go @@ -0,0 +1,33 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package scrape + +import "sync" + +type throttledCaller struct { + limit chan struct{} + wg sync.WaitGroup +} + +func newThrottledCaller(limit int) *throttledCaller { + if limit <= 0 { + panic("limit must be > 0") + } + return &throttledCaller{limit: make(chan struct{}, limit)} +} + +func (t *throttledCaller) call(job func()) { + t.wg.Add(1) + go func() { + defer t.wg.Done() + t.limit <- struct{}{} + defer func() { + <-t.limit + }() + job() + }() +} + +func (t *throttledCaller) wait() { + t.wg.Wait() +} diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go new file mode 100644 index 000000000..545ed1603 --- /dev/null +++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go @@ -0,0 +1,42 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package scrape + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_throttledCaller(t *testing.T) { + var current int64 + var max int64 + var total int64 + var mux sync.Mutex + limit := 5 + n := 10000 + tc := newThrottledCaller(limit) + + for i := 0; i < n; i++ { + job := func() { + atomic.AddInt64(&total, 1) + atomic.AddInt64(¤t, 1) + time.Sleep(100 * time.Microsecond) + + mux.Lock() + defer mux.Unlock() + if atomic.LoadInt64(¤t) > max { + max = atomic.LoadInt64(¤t) + } + atomic.AddInt64(¤t, -1) + } + tc.call(job) + } + tc.wait() + + assert.Equal(t, int64(n), total) + assert.Equal(t, max, int64(limit)) +} |