summaryrefslogtreecommitdiffstats
path: root/src/go/collectors/go.d.plugin/modules/vsphere/scrape
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go159
-rw-r--r--src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go70
-rw-r--r--src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go33
-rw-r--r--src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go42
4 files changed, 304 insertions, 0 deletions
diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go
new file mode 100644
index 000000000..d803fa414
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape.go
@@ -0,0 +1,159 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package scrape
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ rs "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/resources"
+
+ "github.com/netdata/netdata/go/go.d.plugin/logger"
+ "github.com/vmware/govmomi/performance"
+ "github.com/vmware/govmomi/vim25/types"
+)
+
+type Client interface {
+ Version() string
+ PerformanceMetrics([]types.PerfQuerySpec) ([]performance.EntityMetric, error)
+}
+
+func New(client Client) *Scraper {
+ v := &Scraper{Client: client}
+ v.calcMaxQuery()
+ return v
+}
+
+type Scraper struct {
+ *logger.Logger
+ Client
+ maxQuery int
+}
+
+// Default settings for vCenter 6.5 and above is 256, prior versions of vCenter have this set to 64.
+func (c *Scraper) calcMaxQuery() {
+ major, minor, err := parseVersion(c.Version())
+ if err != nil || major < 6 || minor == 0 {
+ c.maxQuery = 64
+ return
+ }
+ c.maxQuery = 256
+}
+
+func (c Scraper) ScrapeHosts(hosts rs.Hosts) []performance.EntityMetric {
+ t := time.Now()
+ pqs := newHostsPerfQuerySpecs(hosts)
+ ms := c.scrapeMetrics(pqs)
+ c.Debugf("scraping : scraped metrics for %d/%d hosts, process took %s",
+ len(ms),
+ len(hosts),
+ time.Since(t),
+ )
+ return ms
+}
+
+func (c Scraper) ScrapeVMs(vms rs.VMs) []performance.EntityMetric {
+ t := time.Now()
+ pqs := newVMsPerfQuerySpecs(vms)
+ ms := c.scrapeMetrics(pqs)
+ c.Debugf("scraping : scraped metrics for %d/%d vms, process took %s",
+ len(ms),
+ len(vms),
+ time.Since(t),
+ )
+ return ms
+}
+
+func (c Scraper) scrapeMetrics(pqs []types.PerfQuerySpec) []performance.EntityMetric {
+ tc := newThrottledCaller(5)
+ var ms []performance.EntityMetric
+ lock := &sync.Mutex{}
+
+ chunks := chunkify(pqs, c.maxQuery)
+ for _, chunk := range chunks {
+ pqs := chunk
+ job := func() {
+ c.scrape(&ms, lock, pqs)
+ }
+ tc.call(job)
+ }
+ tc.wait()
+
+ return ms
+}
+
+func (c Scraper) scrape(metrics *[]performance.EntityMetric, lock *sync.Mutex, pqs []types.PerfQuerySpec) {
+ m, err := c.PerformanceMetrics(pqs)
+ if err != nil {
+ c.Error(err)
+ return
+ }
+
+ lock.Lock()
+ *metrics = append(*metrics, m...)
+ lock.Unlock()
+}
+
+func chunkify(pqs []types.PerfQuerySpec, chunkSize int) (chunks [][]types.PerfQuerySpec) {
+ for i := 0; i < len(pqs); i += chunkSize {
+ end := i + chunkSize
+ if end > len(pqs) {
+ end = len(pqs)
+ }
+ chunks = append(chunks, pqs[i:end])
+ }
+ return chunks
+}
+
+const (
+ pqsMaxSample = 1
+ pqsIntervalID = 20
+ pqsFormat = "normal"
+)
+
+func newHostsPerfQuerySpecs(hosts rs.Hosts) []types.PerfQuerySpec {
+ pqs := make([]types.PerfQuerySpec, 0, len(hosts))
+ for _, host := range hosts {
+ pq := types.PerfQuerySpec{
+ Entity: host.Ref,
+ MaxSample: pqsMaxSample,
+ MetricId: host.MetricList,
+ IntervalId: pqsIntervalID,
+ Format: pqsFormat,
+ }
+ pqs = append(pqs, pq)
+ }
+ return pqs
+}
+
+func newVMsPerfQuerySpecs(vms rs.VMs) []types.PerfQuerySpec {
+ pqs := make([]types.PerfQuerySpec, 0, len(vms))
+ for _, vm := range vms {
+ pq := types.PerfQuerySpec{
+ Entity: vm.Ref,
+ MaxSample: pqsMaxSample,
+ MetricId: vm.MetricList,
+ IntervalId: pqsIntervalID,
+ Format: pqsFormat,
+ }
+ pqs = append(pqs, pq)
+ }
+ return pqs
+}
+
+func parseVersion(version string) (major, minor int, err error) {
+ parts := strings.Split(version, ".")
+ if len(parts) < 2 {
+ return 0, 0, fmt.Errorf("unparsable version string : %s", version)
+ }
+ if major, err = strconv.Atoi(parts[0]); err != nil {
+ return 0, 0, err
+ }
+ if minor, err = strconv.Atoi(parts[1]); err != nil {
+ return 0, 0, err
+ }
+ return major, minor, nil
+}
diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go
new file mode 100644
index 000000000..0576850f6
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/scrape_test.go
@@ -0,0 +1,70 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package scrape
+
+import (
+ "crypto/tls"
+ "net/url"
+ "testing"
+ "time"
+
+ "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/client"
+ "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/discover"
+ rs "github.com/netdata/netdata/go/go.d.plugin/modules/vsphere/resources"
+ "github.com/netdata/netdata/go/go.d.plugin/pkg/tlscfg"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/vmware/govmomi/simulator"
+)
+
+func TestNew(t *testing.T) {
+}
+
+func TestScraper_ScrapeVMs(t *testing.T) {
+ s, res, teardown := prepareScraper(t)
+ defer teardown()
+
+ metrics := s.ScrapeVMs(res.VMs)
+ assert.Len(t, metrics, len(res.VMs))
+}
+
+func TestScraper_ScrapeHosts(t *testing.T) {
+ s, res, teardown := prepareScraper(t)
+ defer teardown()
+
+ metrics := s.ScrapeHosts(res.Hosts)
+ assert.Len(t, metrics, len(res.Hosts))
+}
+
+func prepareScraper(t *testing.T) (s *Scraper, res *rs.Resources, teardown func()) {
+ model, srv := createSim(t)
+ teardown = func() { model.Remove(); srv.Close() }
+
+ c := newClient(t, srv.URL)
+ d := discover.New(c)
+ res, err := d.Discover()
+ require.NoError(t, err)
+
+ return New(c), res, teardown
+}
+
+func newClient(t *testing.T, vCenterURL *url.URL) *client.Client {
+ c, err := client.New(client.Config{
+ URL: vCenterURL.String(),
+ User: "admin",
+ Password: "password",
+ Timeout: time.Second * 3,
+ TLSConfig: tlscfg.TLSConfig{InsecureSkipVerify: true},
+ })
+ require.NoError(t, err)
+ return c
+}
+
+func createSim(t *testing.T) (*simulator.Model, *simulator.Server) {
+ model := simulator.VPX()
+ err := model.Create()
+ require.NoError(t, err)
+ model.Service.TLS = new(tls.Config)
+ return model, model.Service.NewServer()
+}
diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go
new file mode 100644
index 000000000..5127c28c1
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller.go
@@ -0,0 +1,33 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package scrape
+
+import "sync"
+
+type throttledCaller struct {
+ limit chan struct{}
+ wg sync.WaitGroup
+}
+
+func newThrottledCaller(limit int) *throttledCaller {
+ if limit <= 0 {
+ panic("limit must be > 0")
+ }
+ return &throttledCaller{limit: make(chan struct{}, limit)}
+}
+
+func (t *throttledCaller) call(job func()) {
+ t.wg.Add(1)
+ go func() {
+ defer t.wg.Done()
+ t.limit <- struct{}{}
+ defer func() {
+ <-t.limit
+ }()
+ job()
+ }()
+}
+
+func (t *throttledCaller) wait() {
+ t.wg.Wait()
+}
diff --git a/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go
new file mode 100644
index 000000000..545ed1603
--- /dev/null
+++ b/src/go/collectors/go.d.plugin/modules/vsphere/scrape/throttled_caller_test.go
@@ -0,0 +1,42 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package scrape
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_throttledCaller(t *testing.T) {
+ var current int64
+ var max int64
+ var total int64
+ var mux sync.Mutex
+ limit := 5
+ n := 10000
+ tc := newThrottledCaller(limit)
+
+ for i := 0; i < n; i++ {
+ job := func() {
+ atomic.AddInt64(&total, 1)
+ atomic.AddInt64(&current, 1)
+ time.Sleep(100 * time.Microsecond)
+
+ mux.Lock()
+ defer mux.Unlock()
+ if atomic.LoadInt64(&current) > max {
+ max = atomic.LoadInt64(&current)
+ }
+ atomic.AddInt64(&current, -1)
+ }
+ tc.call(job)
+ }
+ tc.wait()
+
+ assert.Equal(t, int64(n), total)
+ assert.Equal(t, max, int64(limit))
+}