diff options
Diffstat (limited to 'src/go/plugin/go.d/modules/gearman')
l--------- | src/go/plugin/go.d/modules/gearman/README.md | 1 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/charts.go | 158 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/client.go | 80 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/collect.go | 221 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/config_schema.json | 44 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/gearman.go | 106 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/gearman_test.go | 326 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/integrations/gearman.md | 235 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/metadata.yaml | 152 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/testdata/config.json | 5 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/testdata/config.yaml | 3 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt | 5 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/gearman/testdata/status.txt | 5 |
13 files changed, 1341 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/modules/gearman/README.md b/src/go/plugin/go.d/modules/gearman/README.md new file mode 120000 index 000000000..70189d698 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/README.md @@ -0,0 +1 @@ +integrations/gearman.md
\ No newline at end of file diff --git a/src/go/plugin/go.d/modules/gearman/charts.go b/src/go/plugin/go.d/modules/gearman/charts.go new file mode 100644 index 000000000..425c00fd4 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/charts.go @@ -0,0 +1,158 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package gearman + +import ( + "fmt" + "strings" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" +) + +const ( + prioQueuedJobsByActivity = module.Priority + iota + prioQueuedJobsByPriority + + prioFunctionQueuedJobsByActivity + prioFunctionQueuedJobsByPriority + prioFunctionAvailableWorkers +) + +var summaryCharts = module.Charts{ + chartQueuedJobsActivity.Copy(), + chartQueuedJobsPriority.Copy(), +} + +var ( + chartQueuedJobsActivity = module.Chart{ + ID: "queued_jobs_by_activity", + Title: "Jobs Activity", + Units: "jobs", + Fam: "jobs", + Ctx: "gearman.queued_jobs_activity", + Priority: prioQueuedJobsByActivity, + Type: module.Stacked, + Dims: module.Dims{ + {ID: "total_jobs_running", Name: "running"}, + {ID: "total_jobs_waiting", Name: "waiting"}, + }, + } + chartQueuedJobsPriority = module.Chart{ + ID: "queued_jobs_by_priority", + Title: "Jobs Priority", + Units: "jobs", + Fam: "jobs", + Ctx: "gearman.queued_jobs_priority", + Priority: prioQueuedJobsByPriority, + Type: module.Stacked, + Dims: module.Dims{ + {ID: "total_high_priority_jobs", Name: "high"}, + {ID: "total_normal_priority_jobs", Name: "normal"}, + {ID: "total_low_priority_jobs", Name: "low"}, + }, + } +) + +var functionStatusChartsTmpl = module.Charts{ + functionQueuedJobsActivityChartTmpl.Copy(), + functionWorkersChartTmpl.Copy(), +} + +var ( + functionQueuedJobsActivityChartTmpl = module.Chart{ + ID: "function_%s_queued_jobs_by_activity", + Title: "Function Jobs Activity", + Units: "jobs", + Fam: "fn jobs", + Ctx: "gearman.function_queued_jobs_activity", + Priority: prioFunctionQueuedJobsByActivity, + Type: module.Stacked, + Dims: module.Dims{ + {ID: 
"function_%s_jobs_running", Name: "running"}, + {ID: "function_%s_jobs_waiting", Name: "waiting"}, + }, + } + functionWorkersChartTmpl = module.Chart{ + ID: "function_%s_workers", + Title: "Function Workers", + Units: "workers", + Fam: "fn workers", + Ctx: "gearman.function_workers", + Priority: prioFunctionAvailableWorkers, + Type: module.Line, + Dims: module.Dims{ + {ID: "function_%s_workers_available", Name: "available"}, + }, + } +) + +var functionPriorityStatusChartsTmpl = module.Charts{ + functionQueuedJobsByPriorityChartTmpl.Copy(), +} + +var ( + functionQueuedJobsByPriorityChartTmpl = module.Chart{ + ID: "prio_function_%s_queued_jobs_by_priority", + Title: "Function Jobs Priority", + Units: "jobs", + Fam: "fn jobs", + Ctx: "gearman.function_queued_jobs_priority", + Priority: prioFunctionQueuedJobsByPriority, + Type: module.Stacked, + Dims: module.Dims{ + {ID: "function_%s_high_priority_jobs", Name: "high"}, + {ID: "function_%s_normal_priority_jobs", Name: "normal"}, + {ID: "function_%s_low_priority_jobs", Name: "low"}, + }, + } +) + +func (g *Gearman) addFunctionStatusCharts(name string) { + g.addFunctionCharts(name, functionStatusChartsTmpl.Copy()) +} + +func (g *Gearman) removeFunctionStatusCharts(name string) { + px := fmt.Sprintf("function_%s_", cleanFunctionName(name)) + g.removeCharts(px) +} + +func (g *Gearman) addFunctionPriorityStatusCharts(name string) { + g.addFunctionCharts(name, functionPriorityStatusChartsTmpl.Copy()) +} + +func (g *Gearman) removeFunctionPriorityStatusCharts(name string) { + px := fmt.Sprintf("prio_function_%s_", cleanFunctionName(name)) + g.removeCharts(px) +} + +func (g *Gearman) addFunctionCharts(name string, charts *module.Charts) { + charts = charts.Copy() + + for _, chart := range *charts { + chart.ID = fmt.Sprintf(chart.ID, cleanFunctionName(name)) + chart.Labels = []module.Label{ + {Key: "function_name", Value: name}, + } + for _, dim := range chart.Dims { + dim.ID = fmt.Sprintf(dim.ID, name) + } + } + + if err := 
// cleanFunctionName returns name with every '.', ',' and ' ' byte replaced by
// '_'. All other bytes are copied through unchanged. The result is used as the
// function-specific part of chart IDs.
func cleanFunctionName(name string) string {
	var sb strings.Builder
	sb.Grow(len(name))

	// Work byte by byte: the three characters being replaced are single-byte,
	// and everything else must pass through untouched.
	for i := 0; i < len(name); i++ {
		switch c := name[i]; c {
		case '.', ',', ' ':
			sb.WriteByte('_')
		default:
			sb.WriteByte(c)
		}
	}

	return sb.String()
}
+ err = fmt.Errorf("command '%s': read line limit exceeded (%d)", cmd, limitReadLines) + return false + } + return !strings.HasPrefix(s, ".") + }) + if clientErr != nil { + return nil, fmt.Errorf("command '%s' client error: %v", cmd, clientErr) + } + if err != nil { + return nil, err + } + + return b.Bytes(), nil +} diff --git a/src/go/plugin/go.d/modules/gearman/collect.go b/src/go/plugin/go.d/modules/gearman/collect.go new file mode 100644 index 000000000..ddfd8c96b --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/collect.go @@ -0,0 +1,221 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package gearman + +import ( + "bufio" + "bytes" + "errors" + "fmt" + "strconv" + "strings" +) + +func (g *Gearman) collect() (map[string]int64, error) { + if g.conn == nil { + conn, err := g.establishConn() + if err != nil { + return nil, err + } + g.conn = conn + } + + status, err := g.conn.queryStatus() + if err != nil { + g.Cleanup() + return nil, fmt.Errorf("couldn't query status: %v", err) + } + + prioStatus, err := g.conn.queryPriorityStatus() + if err != nil { + g.Cleanup() + return nil, fmt.Errorf("couldn't query priority status: %v", err) + } + + mx := make(map[string]int64) + + if err := g.collectStatus(mx, status); err != nil { + return nil, fmt.Errorf("couldn't collect status: %v", err) + } + if err := g.collectPriorityStatus(mx, prioStatus); err != nil { + return nil, fmt.Errorf("couldn't collect priority status: %v", err) + } + + return mx, nil + +} + +func (g *Gearman) collectStatus(mx map[string]int64, statusData []byte) error { + /* + Same output as the "gearadmin --status" command: + + FUNCTION\tTOTAL\tRUNNING\tAVAILABLE_WORKERS + + E.g.: + + prefix generic_worker4 78 78 500 + generic_worker2 78 78 500 + generic_worker3 0 0 760 + generic_worker1 0 0 500 + */ + + seen := make(map[string]bool) + var foundEnd bool + sc := bufio.NewScanner(bytes.NewReader(statusData)) + + mx["total_jobs_queued"] = 0 + mx["total_jobs_running"] = 0 + mx["total_jobs_waiting"] 
= 0 + mx["total_workers_avail"] = 0 + + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + + if foundEnd = line == "."; foundEnd { + break + } + + parts := strings.Fields(line) + + // Gearman does not remove old tasks. We are only interested in tasks that have stats. + if len(parts) < 4 { + continue + } + + name := strings.Join(parts[:len(parts)-3], "_") + metrics := parts[len(parts)-3:] + + var queued, running, availWorkers int64 + var err error + + if queued, err = strconv.ParseInt(metrics[0], 10, 64); err != nil { + return fmt.Errorf("couldn't parse queued count: %v", err) + } + if running, err = strconv.ParseInt(metrics[1], 10, 64); err != nil { + return fmt.Errorf("couldn't parse running count: %v", err) + } + if availWorkers, err = strconv.ParseInt(metrics[2], 10, 64); err != nil { + return fmt.Errorf("couldn't parse available count: %v", err) + } + + px := fmt.Sprintf("function_%s_", name) + + waiting := queued - running + + mx[px+"jobs_queued"] = queued + mx[px+"jobs_running"] = running + mx[px+"jobs_waiting"] = waiting + mx[px+"workers_available"] = availWorkers + + mx["total_jobs_queued"] += queued + mx["total_jobs_running"] += running + mx["total_jobs_waiting"] += waiting + mx["total_workers_available"] += availWorkers + + seen[name] = true + } + + if !foundEnd { + return errors.New("unexpected status response") + } + + for name := range seen { + if !g.seenTasks[name] { + g.seenTasks[name] = true + g.addFunctionStatusCharts(name) + } + } + for name := range g.seenTasks { + if !seen[name] { + delete(g.seenTasks, name) + g.removeFunctionStatusCharts(name) + } + } + + return nil +} + +func (g *Gearman) collectPriorityStatus(mx map[string]int64, prioStatusData []byte) error { + /* + Same output as the "gearadmin --priority-status" command: + + FUNCTION\tHIGH\tNORMAL\tLOW\tAVAILABLE_WORKERS + */ + + seen := make(map[string]bool) + var foundEnd bool + sc := bufio.NewScanner(bytes.NewReader(prioStatusData)) + + mx["total_high_priority_jobs"] = 0 + 
mx["total_normal_priority_jobs"] = 0 + mx["total_low_priority_jobs"] = 0 + + for sc.Scan() { + line := strings.TrimSpace(sc.Text()) + + if foundEnd = line == "."; foundEnd { + break + } + + parts := strings.Fields(line) + if len(parts) < 5 { + continue + } + + name := strings.Join(parts[:len(parts)-4], "_") + metrics := parts[len(parts)-4:] + + var high, normal, low int64 + var err error + + if high, err = strconv.ParseInt(metrics[0], 10, 64); err != nil { + return fmt.Errorf("couldn't parse high count: %v", err) + } + if normal, err = strconv.ParseInt(metrics[1], 10, 64); err != nil { + return fmt.Errorf("couldn't parse normal count: %v", err) + } + if low, err = strconv.ParseInt(metrics[2], 10, 64); err != nil { + return fmt.Errorf("couldn't parse low count: %v", err) + } + + px := fmt.Sprintf("function_%s_", name) + + mx[px+"high_priority_jobs"] = high + mx[px+"normal_priority_jobs"] = normal + mx[px+"low_priority_jobs"] = low + mx["total_high_priority_jobs"] += high + mx["total_normal_priority_jobs"] += normal + mx["total_low_priority_jobs"] += low + + seen[name] = true + } + + if !foundEnd { + return errors.New("unexpected priority status response") + } + + for name := range seen { + if !g.seenPriorityTasks[name] { + g.seenPriorityTasks[name] = true + g.addFunctionPriorityStatusCharts(name) + } + } + for name := range g.seenPriorityTasks { + if !seen[name] { + delete(g.seenPriorityTasks, name) + g.removeFunctionPriorityStatusCharts(name) + } + } + + return nil +} + +func (g *Gearman) establishConn() (gearmanConn, error) { + conn := g.newConn(g.Config) + + if err := conn.connect(); err != nil { + return nil, err + } + + return conn, nil +} diff --git a/src/go/plugin/go.d/modules/gearman/config_schema.json b/src/go/plugin/go.d/modules/gearman/config_schema.json new file mode 100644 index 000000000..dd5d3a0b8 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/config_schema.json @@ -0,0 +1,44 @@ +{ + "jsonSchema": { + "$schema": 
"http://json-schema.org/draft-07/schema#", + "title": "Gearman collector configuration.", + "type": "object", + "properties": { + "update_every": { + "title": "Update every", + "description": "Data collection interval, measured in seconds.", + "type": "integer", + "minimum": 1, + "default": 1 + }, + "address": { + "title": "Address", + "description": "The IP address and port where the Gearman service listens for connections.", + "type": "string", + "default": "127.0.0.1:4730" + }, + "timeout": { + "title": "Timeout", + "description": "Timeout for establishing a connection and communication (reading and writing) in seconds.", + "type": "number", + "minimum": 0.5, + "default": 1 + } + }, + "required": [ + "address" + ], + "additionalProperties": false, + "patternProperties": { + "^name$": {} + } + }, + "uiSchema": { + "uiOptions": { + "fullPage": true + }, + "timeout": { + "ui:help": "Accepts decimals for precise control (e.g., type 1.5 for 1.5 seconds)." + } + } +} diff --git a/src/go/plugin/go.d/modules/gearman/gearman.go b/src/go/plugin/go.d/modules/gearman/gearman.go new file mode 100644 index 000000000..e1780a95c --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/gearman.go @@ -0,0 +1,106 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package gearman + +import ( + _ "embed" + "errors" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" +) + +//go:embed "config_schema.json" +var configSchema string + +func init() { + module.Register("gearman", module.Creator{ + JobConfigSchema: configSchema, + Create: func() module.Module { return New() }, + Config: func() any { return &Config{} }, + }) +} + +func New() *Gearman { + return &Gearman{ + Config: Config{ + Address: "127.0.0.1:4730", + Timeout: web.Duration(time.Second * 1), + }, + newConn: newGearmanConn, + charts: summaryCharts.Copy(), + seenTasks: make(map[string]bool), + seenPriorityTasks: make(map[string]bool), + } +} 
+ +type Config struct { + UpdateEvery int `yaml:"update_every,omitempty" json:"update_every"` + Address string `yaml:"address" json:"address"` + Timeout web.Duration `yaml:"timeout" json:"timeout"` +} + +type Gearman struct { + module.Base + Config `yaml:",inline" json:""` + + charts *module.Charts + + newConn func(Config) gearmanConn + conn gearmanConn + + seenTasks map[string]bool + seenPriorityTasks map[string]bool +} + +func (g *Gearman) Configuration() any { + return g.Config +} + +func (g *Gearman) Init() error { + if g.Address == "" { + g.Error("config: 'address' not set") + return errors.New("address not set") + } + + return nil +} + +func (g *Gearman) Check() error { + mx, err := g.collect() + if err != nil { + g.Error(err) + return err + } + + if len(mx) == 0 { + return errors.New("no metrics collected") + } + + return nil +} + +func (g *Gearman) Charts() *module.Charts { + return g.charts +} + +func (g *Gearman) Collect() map[string]int64 { + mx, err := g.collect() + if err != nil { + g.Error(err) + } + + if len(mx) == 0 { + return nil + } + + return mx +} + +func (g *Gearman) Cleanup() { + if g.conn != nil { + g.conn.disconnect() + g.conn = nil + } +} diff --git a/src/go/plugin/go.d/modules/gearman/gearman_test.go b/src/go/plugin/go.d/modules/gearman/gearman_test.go new file mode 100644 index 000000000..43069abce --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/gearman_test.go @@ -0,0 +1,326 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package gearman + +import ( + "errors" + "os" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + dataConfigJSON, _ = os.ReadFile("testdata/config.json") + dataConfigYAML, _ = os.ReadFile("testdata/config.yaml") + + dataStatus, _ = os.ReadFile("testdata/status.txt") + dataPriorityStatus, _ = os.ReadFile("testdata/priority-status.txt") +) + +func Test_testDataIsValid(t *testing.T) { + for 
name, data := range map[string][]byte{ + "dataConfigJSON": dataConfigJSON, + "dataConfigYAML": dataConfigYAML, + + "dataStatus": dataStatus, + "dataPriorityStatus": dataPriorityStatus, + } { + require.NotNil(t, data, name) + } +} + +func TestGearman_ConfigurationSerialize(t *testing.T) { + module.TestConfigurationSerialize(t, &Gearman{}, dataConfigJSON, dataConfigYAML) +} + +func TestGearman_Init(t *testing.T) { + tests := map[string]struct { + config Config + wantFail bool + }{ + "success with default config": { + wantFail: false, + config: New().Config, + }, + "fails if address not set": { + wantFail: true, + config: func() Config { + conf := New().Config + conf.Address = "" + return conf + }(), + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + gear := New() + gear.Config = test.config + + if test.wantFail { + assert.Error(t, gear.Init()) + } else { + assert.NoError(t, gear.Init()) + } + }) + } +} + +func TestGearman_Cleanup(t *testing.T) { + tests := map[string]struct { + prepare func() *Gearman + }{ + "not initialized": { + prepare: func() *Gearman { + return New() + }, + }, + "after check": { + prepare: func() *Gearman { + gear := New() + gear.newConn = func(config Config) gearmanConn { return prepareMockOk() } + _ = gear.Check() + return gear + }, + }, + "after collect": { + prepare: func() *Gearman { + gear := New() + gear.newConn = func(config Config) gearmanConn { return prepareMockOk() } + _ = gear.Collect() + return gear + }, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + gear := test.prepare() + + assert.NotPanics(t, gear.Cleanup) + }) + } +} + +func TestGearman_Charts(t *testing.T) { + assert.NotNil(t, New().Charts()) +} + +func TestGearman_Check(t *testing.T) { + tests := map[string]struct { + prepareMock func() *mockGearmanConn + wantFail bool + }{ + "success case": { + wantFail: false, + prepareMock: prepareMockOk, + }, + "err on connect": { + wantFail: true, + prepareMock: 
prepareMockErrOnConnect, + }, + "unexpected response": { + wantFail: true, + prepareMock: prepareMockUnexpectedResponse, + }, + "empty response": { + wantFail: false, + prepareMock: prepareMockEmptyResponse, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + gear := New() + mock := test.prepareMock() + gear.newConn = func(config Config) gearmanConn { return mock } + + if test.wantFail { + assert.Error(t, gear.Check()) + } else { + assert.NoError(t, gear.Check()) + } + }) + } +} + +func TestGearman_Collect(t *testing.T) { + tests := map[string]struct { + prepareMock func() *mockGearmanConn + wantMetrics map[string]int64 + wantCharts int + disconnectBeforeCleanup bool + disconnectAfterCleanup bool + }{ + "success case": { + prepareMock: prepareMockOk, + disconnectBeforeCleanup: false, + disconnectAfterCleanup: true, + wantCharts: len(summaryCharts) + len(functionStatusChartsTmpl)*4 + len(functionPriorityStatusChartsTmpl)*4, + wantMetrics: map[string]int64{ + "function_generic_worker1_high_priority_jobs": 10, + "function_generic_worker1_jobs_queued": 4, + "function_generic_worker1_jobs_running": 3, + "function_generic_worker1_jobs_waiting": 1, + "function_generic_worker1_low_priority_jobs": 12, + "function_generic_worker1_normal_priority_jobs": 11, + "function_generic_worker1_workers_available": 500, + "function_generic_worker2_high_priority_jobs": 4, + "function_generic_worker2_jobs_queued": 78, + "function_generic_worker2_jobs_running": 78, + "function_generic_worker2_jobs_waiting": 0, + "function_generic_worker2_low_priority_jobs": 6, + "function_generic_worker2_normal_priority_jobs": 5, + "function_generic_worker2_workers_available": 500, + "function_generic_worker3_high_priority_jobs": 7, + "function_generic_worker3_jobs_queued": 2, + "function_generic_worker3_jobs_running": 1, + "function_generic_worker3_jobs_waiting": 1, + "function_generic_worker3_low_priority_jobs": 9, + "function_generic_worker3_normal_priority_jobs": 8, + 
"function_generic_worker3_workers_available": 760, + "function_prefix_generic_worker4_high_priority_jobs": 1, + "function_prefix_generic_worker4_jobs_queued": 78, + "function_prefix_generic_worker4_jobs_running": 78, + "function_prefix_generic_worker4_jobs_waiting": 0, + "function_prefix_generic_worker4_low_priority_jobs": 3, + "function_prefix_generic_worker4_normal_priority_jobs": 2, + "function_prefix_generic_worker4_workers_available": 500, + "total_high_priority_jobs": 22, + "total_jobs_queued": 162, + "total_jobs_running": 160, + "total_jobs_waiting": 2, + "total_low_priority_jobs": 30, + "total_normal_priority_jobs": 26, + "total_workers_avail": 0, + "total_workers_available": 2260, + }, + }, + "unexpected response": { + prepareMock: prepareMockUnexpectedResponse, + disconnectBeforeCleanup: false, + disconnectAfterCleanup: true, + }, + "empty response": { + prepareMock: prepareMockEmptyResponse, + disconnectBeforeCleanup: false, + disconnectAfterCleanup: true, + wantCharts: len(summaryCharts), + wantMetrics: map[string]int64{ + "total_high_priority_jobs": 0, + "total_jobs_queued": 0, + "total_jobs_running": 0, + "total_jobs_waiting": 0, + "total_low_priority_jobs": 0, + "total_normal_priority_jobs": 0, + "total_workers_avail": 0, + }, + }, + "err on connect": { + prepareMock: prepareMockErrOnConnect, + disconnectBeforeCleanup: false, + disconnectAfterCleanup: false, + }, + "err on query status": { + prepareMock: prepareMockErrOnQueryStatus, + disconnectBeforeCleanup: true, + disconnectAfterCleanup: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + gear := New() + mock := test.prepareMock() + gear.newConn = func(config Config) gearmanConn { return mock } + + mx := gear.Collect() + + require.Equal(t, test.wantMetrics, mx, "want metrics") + + if len(test.wantMetrics) > 0 { + module.TestMetricsHasAllChartsDims(t, gear.Charts(), mx) + assert.Equal(t, test.wantCharts, len(*gear.Charts()), "want charts") + } + + assert.Equal(t, 
// mockGearmanConn is a test double with the same method set as gearmanConn.
// Each errOn* flag makes the matching call fail; the response* fields hold the
// canned payloads returned by the query methods; disconnectCalled records
// whether disconnect has been invoked.
type mockGearmanConn struct {
	errOnConnect bool

	responseStatus   []byte
	errOnQueryStatus bool

	responsePriorityStatus   []byte
	errOnQueryPriorityStatus bool

	disconnectCalled bool
}

// connect succeeds unless errOnConnect is set.
func (m *mockGearmanConn) connect() error {
	if !m.errOnConnect {
		return nil
	}
	return errors.New("mock.connect() error")
}

// disconnect only records that it was called.
func (m *mockGearmanConn) disconnect() {
	m.disconnectCalled = true
}

// queryStatus returns responseStatus, or an error when errOnQueryStatus is set.
func (m *mockGearmanConn) queryStatus() ([]byte, error) {
	if !m.errOnQueryStatus {
		return m.responseStatus, nil
	}
	return nil, errors.New("mock.queryStatus() error")
}

// queryPriorityStatus returns responsePriorityStatus, or an error when
// errOnQueryPriorityStatus is set.
func (m *mockGearmanConn) queryPriorityStatus() ([]byte, error) {
	if !m.errOnQueryPriorityStatus {
		return m.responsePriorityStatus, nil
	}
	return nil, errors.New("mock.queryPriorityStatus() error")
}
b/src/go/plugin/go.d/modules/gearman/integrations/gearman.md @@ -0,0 +1,235 @@ +<!--startmeta +custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/gearman/README.md" +meta_yaml: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/gearman/metadata.yaml" +sidebar_label: "Gearman" +learn_status: "Published" +learn_rel_path: "Collecting Metrics/Distributed Computing Systems" +most_popular: False +message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE COLLECTOR'S metadata.yaml FILE" +endmeta--> + +# Gearman + + +<img src="https://netdata.cloud/img/gearman.png" width="150"/> + + +Plugin: go.d.plugin +Module: gearman + +<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" /> + +## Overview + +Monitors jobs activity, priority and available workers. It collects summary and function-specific statistics. + + +This collector connects to a Gearman instance via TCP socket and executes the following commands: + +- status +- priority-status + + +This collector is supported on all platforms. + +This collector supports collecting metrics from multiple instances of this integration, including remote instances. + + +### Default Behavior + +#### Auto-Detection + +By default, it detects Gearman instances running on localhost that are listening on port 4730. + + +#### Limits + +The default configuration for this integration does not impose any limits on data collection. + +#### Performance Impact + +The default configuration for this integration is not expected to impose a significant performance impact on the system. + + +## Metrics + +Metrics grouped by *scope*. + +The scope defines the instance that the metric belongs to. An instance is uniquely identified by a set of labels. + + + +### Per Gearman instance + +These metrics refer to the entire monitored application. + +This scope has no labels. 
+ +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| gearman.queued_jobs_activity | running, waiting | jobs | +| gearman.queued_jobs_priority | high, normal, low | jobs | + +### Per function + +These metrics refer to the Function (task). + +Labels: + +| Label | Description | +|:-----------|:----------------| +| function_name | Function name. | + +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| gearman.function_queued_jobs_activity | running, waiting | jobs | +| gearman.function_queued_jobs_priority | high, normal, low | jobs | +| gearman.function_workers | available | workers | + + + +## Alerts + +There are no alerts configured by default for this integration. + + +## Setup + +### Prerequisites + +No action required. + +### Configuration + +#### File + +The configuration file name for this integration is `go.d/gearman.conf`. + + +You can edit the configuration file using the `edit-config` script from the +Netdata [config directory](/docs/netdata-agent/configuration/README.md#the-netdata-config-directory). + +```bash +cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata +sudo ./edit-config go.d/gearman.conf +``` +#### Options + +The following options can be defined globally: update_every, autodetection_retry. + + +<details open><summary>Config options</summary> + +| Name | Description | Default | Required | +|:----|:-----------|:-------|:--------:| +| update_every | Data collection frequency. | 1 | no | +| autodetection_retry | Recheck interval in seconds. Zero means no recheck will be scheduled. | 0 | no | +| address | The IP address and port where the Gearman service listens for connections. | 127.0.0.1:4730 | yes | +| timeout | Connection, read, and write timeout duration in seconds. The timeout includes name resolution. | 1 | no | + +</details> + +#### Examples + +##### Basic + +A basic example configuration. 
+ +<details open><summary>Config</summary> + +```yaml +jobs: + - name: local + address: 127.0.0.1:4730 + +``` +</details> + +##### Multi-instance + +> **Note**: When you define multiple jobs, their names must be unique. + +Collecting metrics from local and remote instances. + + +<details open><summary>Config</summary> + +```yaml +jobs: + - name: local + address: 127.0.0.1:4730 + + - name: remote + address: 203.0.113.0:4730 + +``` +</details> + + + +## Troubleshooting + +### Debug Mode + +**Important**: Debug mode is not supported for data collection jobs created via the UI using the Dyncfg feature. + +To troubleshoot issues with the `gearman` collector, run the `go.d.plugin` with the debug option enabled. The output +should give you clues as to why the collector isn't working. + +- Navigate to the `plugins.d` directory, usually at `/usr/libexec/netdata/plugins.d/`. If that's not the case on + your system, open `netdata.conf` and look for the `plugins` setting under `[directories]`. + + ```bash + cd /usr/libexec/netdata/plugins.d/ + ``` + +- Switch to the `netdata` user. + + ```bash + sudo -u netdata -s + ``` + +- Run the `go.d.plugin` to debug the collector: + + ```bash + ./go.d.plugin -d -m gearman + ``` + +### Getting Logs + +If you're encountering problems with the `gearman` collector, follow these steps to retrieve logs and identify potential issues: + +- **Run the command** specific to your system (systemd, non-systemd, or Docker container). +- **Examine the output** for any warnings or error messages that might indicate issues. These messages should provide clues about the root cause of the problem. 
+ +#### System with systemd + +Use the following command to view logs generated since the last Netdata service restart: + +```bash +journalctl _SYSTEMD_INVOCATION_ID="$(systemctl show --value --property=InvocationID netdata)" --namespace=netdata --grep gearman +``` + +#### System without systemd + +Locate the collector log file, typically at `/var/log/netdata/collector.log`, and use `grep` to filter for collector's name: + +```bash +grep gearman /var/log/netdata/collector.log +``` + +**Note**: This method shows logs from all restarts. Focus on the **latest entries** for troubleshooting current issues. + +#### Docker Container + +If your Netdata runs in a Docker container named "netdata" (replace if different), use this command: + +```bash +docker logs netdata 2>&1 | grep gearman +``` + + diff --git a/src/go/plugin/go.d/modules/gearman/metadata.yaml b/src/go/plugin/go.d/modules/gearman/metadata.yaml new file mode 100644 index 000000000..2312c9a53 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/metadata.yaml @@ -0,0 +1,152 @@ +plugin_name: go.d.plugin +modules: + - meta: + id: collector-go.d.plugin-gearman + plugin_name: go.d.plugin + module_name: gearman + monitored_instance: + name: Gearman + link: https://gearman.org/ + categories: + - data-collection.distributed-computing-systems + icon_filename: "gearman.png" + related_resources: + integrations: + list: [] + info_provided_to_referring_integrations: + description: "" + keywords: + - gearman + most_popular: false + overview: + data_collection: + metrics_description: | + Monitors jobs activity, priority and available workers. It collects summary and function-specific statistics. 
+ method_description: | + This collector connects to a Gearman instance via TCP socket and executes the following commands: + + - status + - priority-status + supported_platforms: + include: [] + exclude: [] + multi_instance: true + additional_permissions: + description: "" + default_behavior: + auto_detection: + description: | + By default, it detects Gearman instances running on localhost that are listening on port 4730. + limits: + description: "" + performance_impact: + description: "" + setup: + prerequisites: + list: [] + configuration: + file: + name: go.d/gearman.conf + options: + description: | + The following options can be defined globally: update_every, autodetection_retry. + folding: + title: Config options + enabled: true + list: + - name: update_every + description: Data collection frequency. + default_value: 1 + required: false + - name: autodetection_retry + description: Recheck interval in seconds. Zero means no recheck will be scheduled. + default_value: 0 + required: false + - name: address + description: The IP address and port where the Gearman service listens for connections. + default_value: 127.0.0.1:4730 + required: true + - name: timeout + description: Connection, read, and write timeout duration in seconds. The timeout includes name resolution. + default_value: 1 + required: false + examples: + folding: + title: Config + enabled: true + list: + - name: Basic + description: A basic example configuration. + config: | + jobs: + - name: local + address: 127.0.0.1:4730 + - name: Multi-instance + description: | + > **Note**: When you define multiple jobs, their names must be unique. + + Collecting metrics from local and remote instances.
+ config: | + jobs: + - name: local + address: 127.0.0.1:4730 + + - name: remote + address: 203.0.113.0:4730 + troubleshooting: + problems: + list: [] + alerts: [] + metrics: + folding: + title: Metrics + enabled: false + description: "" + availability: [] + scopes: + - name: global + description: "These metrics refer to the entire monitored application." + labels: [] + metrics: + - name: gearman.queued_jobs_activity + description: Jobs Activity + unit: "jobs" + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: gearman.queued_jobs_priority + description: Jobs Priority + unit: "jobs" + chart_type: stacked + dimensions: + - name: high + - name: normal + - name: low + - name: function + description: "These metrics refer to the Function (task)." + labels: + - name: function_name + description: Function name. + metrics: + - name: gearman.function_queued_jobs_activity + description: Function Jobs Activity + unit: "jobs" + chart_type: stacked + dimensions: + - name: running + - name: waiting + - name: gearman.function_queued_jobs_priority + description: Function Jobs Priority + unit: "jobs" + chart_type: stacked + dimensions: + - name: high + - name: normal + - name: low + - name: gearman.function_workers + description: Function Workers + unit: "workers" + chart_type: line + dimensions: + - name: available diff --git a/src/go/plugin/go.d/modules/gearman/testdata/config.json b/src/go/plugin/go.d/modules/gearman/testdata/config.json new file mode 100644 index 000000000..e86834720 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/testdata/config.json @@ -0,0 +1,5 @@ +{ + "update_every": 123, + "address": "ok", + "timeout": 123.123 +} diff --git a/src/go/plugin/go.d/modules/gearman/testdata/config.yaml b/src/go/plugin/go.d/modules/gearman/testdata/config.yaml new file mode 100644 index 000000000..1b81d09eb --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/testdata/config.yaml @@ -0,0 +1,3 @@ +update_every: 123 +address: "ok" +timeout:
123.123 diff --git a/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt b/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt new file mode 100644 index 000000000..3cb669d10 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt @@ -0,0 +1,5 @@ +prefix generic_worker4 1 2 3 500 +generic_worker2 4 5 6 500 +generic_worker3 7 8 9 760 +generic_worker1 10 11 12 500 +. diff --git a/src/go/plugin/go.d/modules/gearman/testdata/status.txt b/src/go/plugin/go.d/modules/gearman/testdata/status.txt new file mode 100644 index 000000000..33d77ab83 --- /dev/null +++ b/src/go/plugin/go.d/modules/gearman/testdata/status.txt @@ -0,0 +1,5 @@ +prefix generic_worker4 78 78 500 +generic_worker2 78 78 500 +generic_worker3 2 1 760 +generic_worker1 4 3 500 +. |