summaryrefslogtreecommitdiffstats
path: root/src/go/plugin/go.d/modules/gearman
diff options
context:
space:
mode:
Diffstat (limited to 'src/go/plugin/go.d/modules/gearman')
l---------src/go/plugin/go.d/modules/gearman/README.md1
-rw-r--r--src/go/plugin/go.d/modules/gearman/charts.go158
-rw-r--r--src/go/plugin/go.d/modules/gearman/client.go80
-rw-r--r--src/go/plugin/go.d/modules/gearman/collect.go221
-rw-r--r--src/go/plugin/go.d/modules/gearman/config_schema.json44
-rw-r--r--src/go/plugin/go.d/modules/gearman/gearman.go106
-rw-r--r--src/go/plugin/go.d/modules/gearman/gearman_test.go326
-rw-r--r--src/go/plugin/go.d/modules/gearman/integrations/gearman.md235
-rw-r--r--src/go/plugin/go.d/modules/gearman/metadata.yaml152
-rw-r--r--src/go/plugin/go.d/modules/gearman/testdata/config.json5
-rw-r--r--src/go/plugin/go.d/modules/gearman/testdata/config.yaml3
-rw-r--r--src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt5
-rw-r--r--src/go/plugin/go.d/modules/gearman/testdata/status.txt5
13 files changed, 1341 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/modules/gearman/README.md b/src/go/plugin/go.d/modules/gearman/README.md
new file mode 120000
index 000000000..70189d698
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/README.md
@@ -0,0 +1 @@
+integrations/gearman.md \ No newline at end of file
diff --git a/src/go/plugin/go.d/modules/gearman/charts.go b/src/go/plugin/go.d/modules/gearman/charts.go
new file mode 100644
index 000000000..425c00fd4
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/charts.go
@@ -0,0 +1,158 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package gearman
+
+import (
+ "fmt"
+ "strings"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+)
+
+const (
+ prioQueuedJobsByActivity = module.Priority + iota
+ prioQueuedJobsByPriority
+
+ prioFunctionQueuedJobsByActivity
+ prioFunctionQueuedJobsByPriority
+ prioFunctionAvailableWorkers
+)
+
+var summaryCharts = module.Charts{
+ chartQueuedJobsActivity.Copy(),
+ chartQueuedJobsPriority.Copy(),
+}
+
+var (
+ chartQueuedJobsActivity = module.Chart{
+ ID: "queued_jobs_by_activity",
+ Title: "Jobs Activity",
+ Units: "jobs",
+ Fam: "jobs",
+ Ctx: "gearman.queued_jobs_activity",
+ Priority: prioQueuedJobsByActivity,
+ Type: module.Stacked,
+ Dims: module.Dims{
+ {ID: "total_jobs_running", Name: "running"},
+ {ID: "total_jobs_waiting", Name: "waiting"},
+ },
+ }
+ chartQueuedJobsPriority = module.Chart{
+ ID: "queued_jobs_by_priority",
+ Title: "Jobs Priority",
+ Units: "jobs",
+ Fam: "jobs",
+ Ctx: "gearman.queued_jobs_priority",
+ Priority: prioQueuedJobsByPriority,
+ Type: module.Stacked,
+ Dims: module.Dims{
+ {ID: "total_high_priority_jobs", Name: "high"},
+ {ID: "total_normal_priority_jobs", Name: "normal"},
+ {ID: "total_low_priority_jobs", Name: "low"},
+ },
+ }
+)
+
+var functionStatusChartsTmpl = module.Charts{
+ functionQueuedJobsActivityChartTmpl.Copy(),
+ functionWorkersChartTmpl.Copy(),
+}
+
+var (
+ functionQueuedJobsActivityChartTmpl = module.Chart{
+ ID: "function_%s_queued_jobs_by_activity",
+ Title: "Function Jobs Activity",
+ Units: "jobs",
+ Fam: "fn jobs",
+ Ctx: "gearman.function_queued_jobs_activity",
+ Priority: prioFunctionQueuedJobsByActivity,
+ Type: module.Stacked,
+ Dims: module.Dims{
+ {ID: "function_%s_jobs_running", Name: "running"},
+ {ID: "function_%s_jobs_waiting", Name: "waiting"},
+ },
+ }
+ functionWorkersChartTmpl = module.Chart{
+ ID: "function_%s_workers",
+ Title: "Function Workers",
+ Units: "workers",
+ Fam: "fn workers",
+ Ctx: "gearman.function_workers",
+ Priority: prioFunctionAvailableWorkers,
+ Type: module.Line,
+ Dims: module.Dims{
+ {ID: "function_%s_workers_available", Name: "available"},
+ },
+ }
+)
+
+var functionPriorityStatusChartsTmpl = module.Charts{
+ functionQueuedJobsByPriorityChartTmpl.Copy(),
+}
+
+var (
+ functionQueuedJobsByPriorityChartTmpl = module.Chart{
+ ID: "prio_function_%s_queued_jobs_by_priority",
+ Title: "Function Jobs Priority",
+ Units: "jobs",
+ Fam: "fn jobs",
+ Ctx: "gearman.function_queued_jobs_priority",
+ Priority: prioFunctionQueuedJobsByPriority,
+ Type: module.Stacked,
+ Dims: module.Dims{
+ {ID: "function_%s_high_priority_jobs", Name: "high"},
+ {ID: "function_%s_normal_priority_jobs", Name: "normal"},
+ {ID: "function_%s_low_priority_jobs", Name: "low"},
+ },
+ }
+)
+
+func (g *Gearman) addFunctionStatusCharts(name string) {
+ g.addFunctionCharts(name, functionStatusChartsTmpl.Copy())
+}
+
+func (g *Gearman) removeFunctionStatusCharts(name string) {
+ px := fmt.Sprintf("function_%s_", cleanFunctionName(name))
+ g.removeCharts(px)
+}
+
+func (g *Gearman) addFunctionPriorityStatusCharts(name string) {
+ g.addFunctionCharts(name, functionPriorityStatusChartsTmpl.Copy())
+}
+
+func (g *Gearman) removeFunctionPriorityStatusCharts(name string) {
+ px := fmt.Sprintf("prio_function_%s_", cleanFunctionName(name))
+ g.removeCharts(px)
+}
+
+func (g *Gearman) addFunctionCharts(name string, charts *module.Charts) {
+ charts = charts.Copy()
+
+ for _, chart := range *charts {
+ chart.ID = fmt.Sprintf(chart.ID, cleanFunctionName(name))
+ chart.Labels = []module.Label{
+ {Key: "function_name", Value: name},
+ }
+ for _, dim := range chart.Dims {
+ dim.ID = fmt.Sprintf(dim.ID, name)
+ }
+ }
+
+ if err := g.Charts().Add(*charts...); err != nil {
+ g.Warning(err)
+ }
+}
+
+func (g *Gearman) removeCharts(px string) {
+ for _, chart := range *g.Charts() {
+ if strings.HasPrefix(chart.ID, px) {
+ chart.MarkRemove()
+ chart.MarkNotCreated()
+ }
+ }
+}
+
+func cleanFunctionName(name string) string {
+ r := strings.NewReplacer(".", "_", ",", "_", " ", "_")
+ return r.Replace(name)
+}
diff --git a/src/go/plugin/go.d/modules/gearman/client.go b/src/go/plugin/go.d/modules/gearman/client.go
new file mode 100644
index 000000000..dff9a1be4
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/client.go
@@ -0,0 +1,80 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package gearman
+
+import (
+ "bytes"
+ "fmt"
+ "strings"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/socket"
+)
+
+type gearmanConn interface {
+ connect() error
+ disconnect()
+ queryStatus() ([]byte, error)
+ queryPriorityStatus() ([]byte, error)
+}
+
+func newGearmanConn(conf Config) gearmanConn {
+ return &gearmanClient{conn: socket.New(socket.Config{
+ Address: conf.Address,
+ ConnectTimeout: conf.Timeout.Duration(),
+ ReadTimeout: conf.Timeout.Duration(),
+ WriteTimeout: conf.Timeout.Duration(),
+ })}
+}
+
+type gearmanClient struct {
+ conn socket.Client
+}
+
+func (c *gearmanClient) connect() error {
+ return c.conn.Connect()
+}
+
+func (c *gearmanClient) disconnect() {
+ _ = c.conn.Disconnect()
+}
+
+func (c *gearmanClient) queryStatus() ([]byte, error) {
+ return c.query("status")
+}
+
+func (c *gearmanClient) queryPriorityStatus() ([]byte, error) {
+ return c.query("prioritystatus")
+}
+
+func (c *gearmanClient) query(cmd string) ([]byte, error) {
+ const limitReadLines = 10000
+ var num int
+ var err error
+ var b bytes.Buffer
+
+ clientErr := c.conn.Command(cmd+"\n", func(bs []byte) bool {
+ s := string(bs)
+
+ if strings.HasPrefix(s, "ERR") {
+ err = fmt.Errorf("command '%s': %s", cmd, s)
+ return false
+ }
+
+ b.WriteString(s)
+ b.WriteByte('\n')
+
+ if num++; num >= limitReadLines {
+ err = fmt.Errorf("command '%s': read line limit exceeded (%d)", cmd, limitReadLines)
+ return false
+ }
+ return !strings.HasPrefix(s, ".")
+ })
+ if clientErr != nil {
+ return nil, fmt.Errorf("command '%s' client error: %v", cmd, clientErr)
+ }
+ if err != nil {
+ return nil, err
+ }
+
+ return b.Bytes(), nil
+}
diff --git a/src/go/plugin/go.d/modules/gearman/collect.go b/src/go/plugin/go.d/modules/gearman/collect.go
new file mode 100644
index 000000000..ddfd8c96b
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/collect.go
@@ -0,0 +1,221 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package gearman
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+)
+
+func (g *Gearman) collect() (map[string]int64, error) {
+ if g.conn == nil {
+ conn, err := g.establishConn()
+ if err != nil {
+ return nil, err
+ }
+ g.conn = conn
+ }
+
+ status, err := g.conn.queryStatus()
+ if err != nil {
+ g.Cleanup()
+ return nil, fmt.Errorf("couldn't query status: %v", err)
+ }
+
+ prioStatus, err := g.conn.queryPriorityStatus()
+ if err != nil {
+ g.Cleanup()
+ return nil, fmt.Errorf("couldn't query priority status: %v", err)
+ }
+
+ mx := make(map[string]int64)
+
+ if err := g.collectStatus(mx, status); err != nil {
+ return nil, fmt.Errorf("couldn't collect status: %v", err)
+ }
+ if err := g.collectPriorityStatus(mx, prioStatus); err != nil {
+ return nil, fmt.Errorf("couldn't collect priority status: %v", err)
+ }
+
+ return mx, nil
+
+}
+
+func (g *Gearman) collectStatus(mx map[string]int64, statusData []byte) error {
+ /*
+ Same output as the "gearadmin --status" command:
+
+ FUNCTION\tTOTAL\tRUNNING\tAVAILABLE_WORKERS
+
+ E.g.:
+
+ prefix generic_worker4 78 78 500
+ generic_worker2 78 78 500
+ generic_worker3 0 0 760
+ generic_worker1 0 0 500
+ */
+
+ seen := make(map[string]bool)
+ var foundEnd bool
+ sc := bufio.NewScanner(bytes.NewReader(statusData))
+
+ mx["total_jobs_queued"] = 0
+ mx["total_jobs_running"] = 0
+ mx["total_jobs_waiting"] = 0
+ mx["total_workers_avail"] = 0
+
+ for sc.Scan() {
+ line := strings.TrimSpace(sc.Text())
+
+ if foundEnd = line == "."; foundEnd {
+ break
+ }
+
+ parts := strings.Fields(line)
+
+ // Gearman does not remove old tasks. We are only interested in tasks that have stats.
+ if len(parts) < 4 {
+ continue
+ }
+
+ name := strings.Join(parts[:len(parts)-3], "_")
+ metrics := parts[len(parts)-3:]
+
+ var queued, running, availWorkers int64
+ var err error
+
+ if queued, err = strconv.ParseInt(metrics[0], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse queued count: %v", err)
+ }
+ if running, err = strconv.ParseInt(metrics[1], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse running count: %v", err)
+ }
+ if availWorkers, err = strconv.ParseInt(metrics[2], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse available count: %v", err)
+ }
+
+ px := fmt.Sprintf("function_%s_", name)
+
+ waiting := queued - running
+
+ mx[px+"jobs_queued"] = queued
+ mx[px+"jobs_running"] = running
+ mx[px+"jobs_waiting"] = waiting
+ mx[px+"workers_available"] = availWorkers
+
+ mx["total_jobs_queued"] += queued
+ mx["total_jobs_running"] += running
+ mx["total_jobs_waiting"] += waiting
+ mx["total_workers_available"] += availWorkers
+
+ seen[name] = true
+ }
+
+ if !foundEnd {
+ return errors.New("unexpected status response")
+ }
+
+ for name := range seen {
+ if !g.seenTasks[name] {
+ g.seenTasks[name] = true
+ g.addFunctionStatusCharts(name)
+ }
+ }
+ for name := range g.seenTasks {
+ if !seen[name] {
+ delete(g.seenTasks, name)
+ g.removeFunctionStatusCharts(name)
+ }
+ }
+
+ return nil
+}
+
+func (g *Gearman) collectPriorityStatus(mx map[string]int64, prioStatusData []byte) error {
+ /*
+ Same output as the "gearadmin --priority-status" command:
+
+ FUNCTION\tHIGH\tNORMAL\tLOW\tAVAILABLE_WORKERS
+ */
+
+ seen := make(map[string]bool)
+ var foundEnd bool
+ sc := bufio.NewScanner(bytes.NewReader(prioStatusData))
+
+ mx["total_high_priority_jobs"] = 0
+ mx["total_normal_priority_jobs"] = 0
+ mx["total_low_priority_jobs"] = 0
+
+ for sc.Scan() {
+ line := strings.TrimSpace(sc.Text())
+
+ if foundEnd = line == "."; foundEnd {
+ break
+ }
+
+ parts := strings.Fields(line)
+ if len(parts) < 5 {
+ continue
+ }
+
+ name := strings.Join(parts[:len(parts)-4], "_")
+ metrics := parts[len(parts)-4:]
+
+ var high, normal, low int64
+ var err error
+
+ if high, err = strconv.ParseInt(metrics[0], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse high count: %v", err)
+ }
+ if normal, err = strconv.ParseInt(metrics[1], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse normal count: %v", err)
+ }
+ if low, err = strconv.ParseInt(metrics[2], 10, 64); err != nil {
+ return fmt.Errorf("couldn't parse low count: %v", err)
+ }
+
+ px := fmt.Sprintf("function_%s_", name)
+
+ mx[px+"high_priority_jobs"] = high
+ mx[px+"normal_priority_jobs"] = normal
+ mx[px+"low_priority_jobs"] = low
+ mx["total_high_priority_jobs"] += high
+ mx["total_normal_priority_jobs"] += normal
+ mx["total_low_priority_jobs"] += low
+
+ seen[name] = true
+ }
+
+ if !foundEnd {
+ return errors.New("unexpected priority status response")
+ }
+
+ for name := range seen {
+ if !g.seenPriorityTasks[name] {
+ g.seenPriorityTasks[name] = true
+ g.addFunctionPriorityStatusCharts(name)
+ }
+ }
+ for name := range g.seenPriorityTasks {
+ if !seen[name] {
+ delete(g.seenPriorityTasks, name)
+ g.removeFunctionPriorityStatusCharts(name)
+ }
+ }
+
+ return nil
+}
+
+func (g *Gearman) establishConn() (gearmanConn, error) {
+ conn := g.newConn(g.Config)
+
+ if err := conn.connect(); err != nil {
+ return nil, err
+ }
+
+ return conn, nil
+}
diff --git a/src/go/plugin/go.d/modules/gearman/config_schema.json b/src/go/plugin/go.d/modules/gearman/config_schema.json
new file mode 100644
index 000000000..dd5d3a0b8
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/config_schema.json
@@ -0,0 +1,44 @@
+{
+ "jsonSchema": {
+ "$schema": "http://json-schema.org/draft-07/schema#",
+ "title": "Gearman collector configuration.",
+ "type": "object",
+ "properties": {
+ "update_every": {
+ "title": "Update every",
+ "description": "Data collection interval, measured in seconds.",
+ "type": "integer",
+ "minimum": 1,
+ "default": 1
+ },
+ "address": {
+ "title": "Address",
+ "description": "The IP address and port where the Gearman service listens for connections.",
+ "type": "string",
+ "default": "127.0.0.1:4730"
+ },
+ "timeout": {
+ "title": "Timeout",
+ "description": "Timeout for establishing a connection and communication (reading and writing) in seconds.",
+ "type": "number",
+ "minimum": 0.5,
+ "default": 1
+ }
+ },
+ "required": [
+ "address"
+ ],
+ "additionalProperties": false,
+ "patternProperties": {
+ "^name$": {}
+ }
+ },
+ "uiSchema": {
+ "uiOptions": {
+ "fullPage": true
+ },
+ "timeout": {
+ "ui:help": "Accepts decimals for precise control (e.g., type 1.5 for 1.5 seconds)."
+ }
+ }
+}
diff --git a/src/go/plugin/go.d/modules/gearman/gearman.go b/src/go/plugin/go.d/modules/gearman/gearman.go
new file mode 100644
index 000000000..e1780a95c
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/gearman.go
@@ -0,0 +1,106 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package gearman
+
+import (
+ _ "embed"
+ "errors"
+ "time"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web"
+)
+
+//go:embed "config_schema.json"
+var configSchema string
+
+func init() {
+ module.Register("gearman", module.Creator{
+ JobConfigSchema: configSchema,
+ Create: func() module.Module { return New() },
+ Config: func() any { return &Config{} },
+ })
+}
+
+func New() *Gearman {
+ return &Gearman{
+ Config: Config{
+ Address: "127.0.0.1:4730",
+ Timeout: web.Duration(time.Second * 1),
+ },
+ newConn: newGearmanConn,
+ charts: summaryCharts.Copy(),
+ seenTasks: make(map[string]bool),
+ seenPriorityTasks: make(map[string]bool),
+ }
+}
+
+type Config struct {
+ UpdateEvery int `yaml:"update_every,omitempty" json:"update_every"`
+ Address string `yaml:"address" json:"address"`
+ Timeout web.Duration `yaml:"timeout" json:"timeout"`
+}
+
+type Gearman struct {
+ module.Base
+ Config `yaml:",inline" json:""`
+
+ charts *module.Charts
+
+ newConn func(Config) gearmanConn
+ conn gearmanConn
+
+ seenTasks map[string]bool
+ seenPriorityTasks map[string]bool
+}
+
+func (g *Gearman) Configuration() any {
+ return g.Config
+}
+
+func (g *Gearman) Init() error {
+ if g.Address == "" {
+ g.Error("config: 'address' not set")
+ return errors.New("address not set")
+ }
+
+ return nil
+}
+
+func (g *Gearman) Check() error {
+ mx, err := g.collect()
+ if err != nil {
+ g.Error(err)
+ return err
+ }
+
+ if len(mx) == 0 {
+ return errors.New("no metrics collected")
+ }
+
+ return nil
+}
+
+func (g *Gearman) Charts() *module.Charts {
+ return g.charts
+}
+
+func (g *Gearman) Collect() map[string]int64 {
+ mx, err := g.collect()
+ if err != nil {
+ g.Error(err)
+ }
+
+ if len(mx) == 0 {
+ return nil
+ }
+
+ return mx
+}
+
+func (g *Gearman) Cleanup() {
+ if g.conn != nil {
+ g.conn.disconnect()
+ g.conn = nil
+ }
+}
diff --git a/src/go/plugin/go.d/modules/gearman/gearman_test.go b/src/go/plugin/go.d/modules/gearman/gearman_test.go
new file mode 100644
index 000000000..43069abce
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/gearman_test.go
@@ -0,0 +1,326 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+package gearman
+
+import (
+ "errors"
+ "os"
+ "testing"
+
+ "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var (
+ dataConfigJSON, _ = os.ReadFile("testdata/config.json")
+ dataConfigYAML, _ = os.ReadFile("testdata/config.yaml")
+
+ dataStatus, _ = os.ReadFile("testdata/status.txt")
+ dataPriorityStatus, _ = os.ReadFile("testdata/priority-status.txt")
+)
+
+func Test_testDataIsValid(t *testing.T) {
+ for name, data := range map[string][]byte{
+ "dataConfigJSON": dataConfigJSON,
+ "dataConfigYAML": dataConfigYAML,
+
+ "dataStatus": dataStatus,
+ "dataPriorityStatus": dataPriorityStatus,
+ } {
+ require.NotNil(t, data, name)
+ }
+}
+
+func TestGearman_ConfigurationSerialize(t *testing.T) {
+ module.TestConfigurationSerialize(t, &Gearman{}, dataConfigJSON, dataConfigYAML)
+}
+
+func TestGearman_Init(t *testing.T) {
+ tests := map[string]struct {
+ config Config
+ wantFail bool
+ }{
+ "success with default config": {
+ wantFail: false,
+ config: New().Config,
+ },
+ "fails if address not set": {
+ wantFail: true,
+ config: func() Config {
+ conf := New().Config
+ conf.Address = ""
+ return conf
+ }(),
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ gear := New()
+ gear.Config = test.config
+
+ if test.wantFail {
+ assert.Error(t, gear.Init())
+ } else {
+ assert.NoError(t, gear.Init())
+ }
+ })
+ }
+}
+
+func TestGearman_Cleanup(t *testing.T) {
+ tests := map[string]struct {
+ prepare func() *Gearman
+ }{
+ "not initialized": {
+ prepare: func() *Gearman {
+ return New()
+ },
+ },
+ "after check": {
+ prepare: func() *Gearman {
+ gear := New()
+ gear.newConn = func(config Config) gearmanConn { return prepareMockOk() }
+ _ = gear.Check()
+ return gear
+ },
+ },
+ "after collect": {
+ prepare: func() *Gearman {
+ gear := New()
+ gear.newConn = func(config Config) gearmanConn { return prepareMockOk() }
+ _ = gear.Collect()
+ return gear
+ },
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ gear := test.prepare()
+
+ assert.NotPanics(t, gear.Cleanup)
+ })
+ }
+}
+
+func TestGearman_Charts(t *testing.T) {
+ assert.NotNil(t, New().Charts())
+}
+
+func TestGearman_Check(t *testing.T) {
+ tests := map[string]struct {
+ prepareMock func() *mockGearmanConn
+ wantFail bool
+ }{
+ "success case": {
+ wantFail: false,
+ prepareMock: prepareMockOk,
+ },
+ "err on connect": {
+ wantFail: true,
+ prepareMock: prepareMockErrOnConnect,
+ },
+ "unexpected response": {
+ wantFail: true,
+ prepareMock: prepareMockUnexpectedResponse,
+ },
+ "empty response": {
+ wantFail: false,
+ prepareMock: prepareMockEmptyResponse,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ gear := New()
+ mock := test.prepareMock()
+ gear.newConn = func(config Config) gearmanConn { return mock }
+
+ if test.wantFail {
+ assert.Error(t, gear.Check())
+ } else {
+ assert.NoError(t, gear.Check())
+ }
+ })
+ }
+}
+
+func TestGearman_Collect(t *testing.T) {
+ tests := map[string]struct {
+ prepareMock func() *mockGearmanConn
+ wantMetrics map[string]int64
+ wantCharts int
+ disconnectBeforeCleanup bool
+ disconnectAfterCleanup bool
+ }{
+ "success case": {
+ prepareMock: prepareMockOk,
+ disconnectBeforeCleanup: false,
+ disconnectAfterCleanup: true,
+ wantCharts: len(summaryCharts) + len(functionStatusChartsTmpl)*4 + len(functionPriorityStatusChartsTmpl)*4,
+ wantMetrics: map[string]int64{
+ "function_generic_worker1_high_priority_jobs": 10,
+ "function_generic_worker1_jobs_queued": 4,
+ "function_generic_worker1_jobs_running": 3,
+ "function_generic_worker1_jobs_waiting": 1,
+ "function_generic_worker1_low_priority_jobs": 12,
+ "function_generic_worker1_normal_priority_jobs": 11,
+ "function_generic_worker1_workers_available": 500,
+ "function_generic_worker2_high_priority_jobs": 4,
+ "function_generic_worker2_jobs_queued": 78,
+ "function_generic_worker2_jobs_running": 78,
+ "function_generic_worker2_jobs_waiting": 0,
+ "function_generic_worker2_low_priority_jobs": 6,
+ "function_generic_worker2_normal_priority_jobs": 5,
+ "function_generic_worker2_workers_available": 500,
+ "function_generic_worker3_high_priority_jobs": 7,
+ "function_generic_worker3_jobs_queued": 2,
+ "function_generic_worker3_jobs_running": 1,
+ "function_generic_worker3_jobs_waiting": 1,
+ "function_generic_worker3_low_priority_jobs": 9,
+ "function_generic_worker3_normal_priority_jobs": 8,
+ "function_generic_worker3_workers_available": 760,
+ "function_prefix_generic_worker4_high_priority_jobs": 1,
+ "function_prefix_generic_worker4_jobs_queued": 78,
+ "function_prefix_generic_worker4_jobs_running": 78,
+ "function_prefix_generic_worker4_jobs_waiting": 0,
+ "function_prefix_generic_worker4_low_priority_jobs": 3,
+ "function_prefix_generic_worker4_normal_priority_jobs": 2,
+ "function_prefix_generic_worker4_workers_available": 500,
+ "total_high_priority_jobs": 22,
+ "total_jobs_queued": 162,
+ "total_jobs_running": 160,
+ "total_jobs_waiting": 2,
+ "total_low_priority_jobs": 30,
+ "total_normal_priority_jobs": 26,
+ "total_workers_avail": 0,
+ "total_workers_available": 2260,
+ },
+ },
+ "unexpected response": {
+ prepareMock: prepareMockUnexpectedResponse,
+ disconnectBeforeCleanup: false,
+ disconnectAfterCleanup: true,
+ },
+ "empty response": {
+ prepareMock: prepareMockEmptyResponse,
+ disconnectBeforeCleanup: false,
+ disconnectAfterCleanup: true,
+ wantCharts: len(summaryCharts),
+ wantMetrics: map[string]int64{
+ "total_high_priority_jobs": 0,
+ "total_jobs_queued": 0,
+ "total_jobs_running": 0,
+ "total_jobs_waiting": 0,
+ "total_low_priority_jobs": 0,
+ "total_normal_priority_jobs": 0,
+ "total_workers_avail": 0,
+ },
+ },
+ "err on connect": {
+ prepareMock: prepareMockErrOnConnect,
+ disconnectBeforeCleanup: false,
+ disconnectAfterCleanup: false,
+ },
+ "err on query status": {
+ prepareMock: prepareMockErrOnQueryStatus,
+ disconnectBeforeCleanup: true,
+ disconnectAfterCleanup: true,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ gear := New()
+ mock := test.prepareMock()
+ gear.newConn = func(config Config) gearmanConn { return mock }
+
+ mx := gear.Collect()
+
+ require.Equal(t, test.wantMetrics, mx, "want metrics")
+
+ if len(test.wantMetrics) > 0 {
+ module.TestMetricsHasAllChartsDims(t, gear.Charts(), mx)
+ assert.Equal(t, test.wantCharts, len(*gear.Charts()), "want charts")
+ }
+
+ assert.Equal(t, test.disconnectBeforeCleanup, mock.disconnectCalled, "disconnect before cleanup")
+ gear.Cleanup()
+ assert.Equal(t, test.disconnectAfterCleanup, mock.disconnectCalled, "disconnect after cleanup")
+ })
+ }
+}
+
+func prepareMockOk() *mockGearmanConn {
+ return &mockGearmanConn{
+ responseStatus: dataStatus,
+ responsePriorityStatus: dataPriorityStatus,
+ }
+}
+
+func prepareMockErrOnConnect() *mockGearmanConn {
+ return &mockGearmanConn{
+ errOnConnect: true,
+ }
+}
+
+func prepareMockErrOnQueryStatus() *mockGearmanConn {
+ return &mockGearmanConn{
+ errOnQueryStatus: true,
+ }
+}
+
+func prepareMockUnexpectedResponse() *mockGearmanConn {
+ resp := []byte("Lorem ipsum dolor sit amet, consectetur adipiscing elit.")
+ return &mockGearmanConn{
+ responseStatus: resp,
+ responsePriorityStatus: resp,
+ }
+}
+
+func prepareMockEmptyResponse() *mockGearmanConn {
+ return &mockGearmanConn{
+ responseStatus: []byte("."),
+ responsePriorityStatus: []byte("."),
+ }
+}
+
+type mockGearmanConn struct {
+ errOnConnect bool
+
+ responseStatus []byte
+ errOnQueryStatus bool
+
+ responsePriorityStatus []byte
+ errOnQueryPriorityStatus bool
+
+ disconnectCalled bool
+}
+
+func (m *mockGearmanConn) connect() error {
+ if m.errOnConnect {
+ return errors.New("mock.connect() error")
+ }
+ return nil
+}
+
+func (m *mockGearmanConn) disconnect() {
+ m.disconnectCalled = true
+}
+
+func (m *mockGearmanConn) queryStatus() ([]byte, error) {
+ if m.errOnQueryStatus {
+ return nil, errors.New("mock.queryStatus() error")
+ }
+ return m.responseStatus, nil
+}
+
+func (m *mockGearmanConn) queryPriorityStatus() ([]byte, error) {
+ if m.errOnQueryPriorityStatus {
+ return nil, errors.New("mock.queryPriorityStatus() error")
+ }
+ return m.responsePriorityStatus, nil
+}
diff --git a/src/go/plugin/go.d/modules/gearman/integrations/gearman.md b/src/go/plugin/go.d/modules/gearman/integrations/gearman.md
new file mode 100644
index 000000000..0a97a4cd4
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/integrations/gearman.md
@@ -0,0 +1,235 @@
+<!--startmeta
+custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/gearman/README.md"
+meta_yaml: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/gearman/metadata.yaml"
+sidebar_label: "Gearman"
+learn_status: "Published"
+learn_rel_path: "Collecting Metrics/Distributed Computing Systems"
+most_popular: False
+message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE COLLECTOR'S metadata.yaml FILE"
+endmeta-->
+
+# Gearman
+
+
+<img src="https://netdata.cloud/img/gearman.png" width="150"/>
+
+
+Plugin: go.d.plugin
+Module: gearman
+
+<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" />
+
+## Overview
+
+Monitors jobs activity, priority and available workers. It collects summary and function-specific statistics.
+
+
+This collector connects to a Gearman instance via TCP socket and executes the following commands:
+
+- status
+- priority-status
+
+
+This collector is supported on all platforms.
+
+This collector supports collecting metrics from multiple instances of this integration, including remote instances.
+
+
+### Default Behavior
+
+#### Auto-Detection
+
+By default, it detects Gearman instances running on localhost that are listening on port 4730.
+
+
+#### Limits
+
+The default configuration for this integration does not impose any limits on data collection.
+
+#### Performance Impact
+
+The default configuration for this integration is not expected to impose a significant performance impact on the system.
+
+
+## Metrics
+
+Metrics grouped by *scope*.
+
+The scope defines the instance that the metric belongs to. An instance is uniquely identified by a set of labels.
+
+
+
+### Per Gearman instance
+
+These metrics refer to the entire monitored application.
+
+This scope has no labels.
+
+Metrics:
+
+| Metric | Dimensions | Unit |
+|:------|:----------|:----|
+| gearman.queued_jobs_activity | running, waiting | jobs |
+| gearman.queued_jobs_priority | high, normal, low | jobs |
+
+### Per Gearman instance
+
+These metrics refer to the Function (task).
+
+Labels:
+
+| Label | Description |
+|:-----------|:----------------|
+| function_name | Function name. |
+
+Metrics:
+
+| Metric | Dimensions | Unit |
+|:------|:----------|:----|
+| gearman.function_queued_jobs_activity | running, waiting | jobs |
+| gearman.function_queued_jobs_priority | high, normal, low | jobs |
+| gearman.function_workers | available | workers |
+
+
+
+## Alerts
+
+There are no alerts configured by default for this integration.
+
+
+## Setup
+
+### Prerequisites
+
+No action required.
+
+### Configuration
+
+#### File
+
+The configuration file name for this integration is `go.d/gearman.conf`.
+
+
+You can edit the configuration file using the `edit-config` script from the
+Netdata [config directory](/docs/netdata-agent/configuration/README.md#the-netdata-config-directory).
+
+```bash
+cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata
+sudo ./edit-config go.d/gearman.conf
+```
+#### Options
+
+The following options can be defined globally: update_every, autodetection_retry.
+
+
+<details open><summary>Config options</summary>
+
+| Name | Description | Default | Required |
+|:----|:-----------|:-------|:--------:|
+| update_every | Data collection frequency. | 1 | no |
+| autodetection_retry | Recheck interval in seconds. Zero means no recheck will be scheduled. | 0 | no |
+| address | The IP address and port where the Gearman service listens for connections. | 127.0.0.1:11211 | yes |
+| timeout | Connection, read, and write timeout duration in seconds. The timeout includes name resolution. | 1 | no |
+
+</details>
+
+#### Examples
+
+##### Basic
+
+A basic example configuration.
+
+<details open><summary>Config</summary>
+
+```yaml
+jobs:
+ - name: local
+ address: 127.0.0.1:4730
+
+```
+</details>
+
+##### Multi-instance
+
+> **Note**: When you define multiple jobs, their names must be unique.
+
+Collecting metrics from local and remote instances.
+
+
+<details open><summary>Config</summary>
+
+```yaml
+jobs:
+ - name: local
+ address: 127.0.0.1:4730
+
+ - name: remote
+ address: 203.0.113.0:4730
+
+```
+</details>
+
+
+
+## Troubleshooting
+
+### Debug Mode
+
+**Important**: Debug mode is not supported for data collection jobs created via the UI using the Dyncfg feature.
+
+To troubleshoot issues with the `gearman` collector, run the `go.d.plugin` with the debug option enabled. The output
+should give you clues as to why the collector isn't working.
+
+- Navigate to the `plugins.d` directory, usually at `/usr/libexec/netdata/plugins.d/`. If that's not the case on
+ your system, open `netdata.conf` and look for the `plugins` setting under `[directories]`.
+
+ ```bash
+ cd /usr/libexec/netdata/plugins.d/
+ ```
+
+- Switch to the `netdata` user.
+
+ ```bash
+ sudo -u netdata -s
+ ```
+
+- Run the `go.d.plugin` to debug the collector:
+
+ ```bash
+ ./go.d.plugin -d -m gearman
+ ```
+
+### Getting Logs
+
+If you're encountering problems with the `gearman` collector, follow these steps to retrieve logs and identify potential issues:
+
+- **Run the command** specific to your system (systemd, non-systemd, or Docker container).
+- **Examine the output** for any warnings or error messages that might indicate issues. These messages should provide clues about the root cause of the problem.
+
+#### System with systemd
+
+Use the following command to view logs generated since the last Netdata service restart:
+
+```bash
+journalctl _SYSTEMD_INVOCATION_ID="$(systemctl show --value --property=InvocationID netdata)" --namespace=netdata --grep gearman
+```
+
+#### System without systemd
+
+Locate the collector log file, typically at `/var/log/netdata/collector.log`, and use `grep` to filter for collector's name:
+
+```bash
+grep gearman /var/log/netdata/collector.log
+```
+
+**Note**: This method shows logs from all restarts. Focus on the **latest entries** for troubleshooting current issues.
+
+#### Docker Container
+
+If your Netdata runs in a Docker container named "netdata" (replace if different), use this command:
+
+```bash
+docker logs netdata 2>&1 | grep gearman
+```
+
+
diff --git a/src/go/plugin/go.d/modules/gearman/metadata.yaml b/src/go/plugin/go.d/modules/gearman/metadata.yaml
new file mode 100644
index 000000000..2312c9a53
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/metadata.yaml
@@ -0,0 +1,152 @@
+plugin_name: go.d.plugin
+modules:
+ - meta:
+ id: collector-go.d.plugin-gearman
+ plugin_name: go.d.plugin
+ module_name: gearman
+ monitored_instance:
+ name: Gearman
+ link: https://gearman.org/
+ categories:
+ - data-collection.distributed-computing-systems
+ icon_filename: "gearman.png"
+ related_resources:
+ integrations:
+ list: []
+ info_provided_to_referring_integrations:
+ description: ""
+ keywords:
+ - gearman
+ most_popular: false
+ overview:
+ data_collection:
+ metrics_description: |
+ Monitors jobs activity, priority and available workers. It collects summary and function-specific statistics.
+ method_description: |
+ This collector connects to a Gearman instance via TCP socket and executes the following commands:
+
+ - status
+ - priority-status
+ supported_platforms:
+ include: []
+ exclude: []
+ multi_instance: true
+ additional_permissions:
+ description: ""
+ default_behavior:
+ auto_detection:
+ description: |
+ By default, it detects Gearman instances running on localhost that are listening on port 4730.
+ limits:
+ description: ""
+ performance_impact:
+ description: ""
+ setup:
+ prerequisites:
+ list: []
+ configuration:
+ file:
+ name: go.d/gearman.conf
+ options:
+ description: |
+ The following options can be defined globally: update_every, autodetection_retry.
+ folding:
+ title: Config options
+ enabled: true
+ list:
+ - name: update_every
+ description: Data collection frequency.
+ default_value: 1
+ required: false
+ - name: autodetection_retry
+ description: Recheck interval in seconds. Zero means no recheck will be scheduled.
+ default_value: 0
+ required: false
+ - name: address
+ description: The IP address and port where the Gearman service listens for connections.
+ default_value: 127.0.0.1:11211
+ required: true
+ - name: timeout
+ description: Connection, read, and write timeout duration in seconds. The timeout includes name resolution.
+ default_value: 1
+ required: false
+ examples:
+ folding:
+ title: Config
+ enabled: true
+ list:
+ - name: Basic
+ description: A basic example configuration.
+ config: |
+ jobs:
+ - name: local
+ address: 127.0.0.1:4730
+ - name: Multi-instance
+ description: |
+ > **Note**: When you define multiple jobs, their names must be unique.
+
+ Collecting metrics from local and remote instances.
+ config: |
+ jobs:
+ - name: local
+ address: 127.0.0.1:4730
+
+ - name: remote
+ address: 203.0.113.0:4730
+ troubleshooting:
+ problems:
+ list: []
+ alerts: []
+ metrics:
+ folding:
+ title: Metrics
+ enabled: false
+ description: ""
+ availability: []
+ scopes:
+ - name: global
+ description: "These metrics refer to the entire monitored application."
+ labels: []
+ metrics:
+ - name: gearman.queued_jobs_activity
+ description: Jobs Activity
+ unit: "jobs"
+ chart_type: stacked
+ dimensions:
+ - name: running
+ - name: waiting
+ - name: gearman.queued_jobs_priority
+ description: Jobs Priority
+ unit: "jobs"
+ chart_type: stacked
+ dimensions:
+ - name: high
+ - name: normal
+ - name: low
+ - name: global
+ description: "These metrics refer to the Function (task)."
+ labels:
+ - name: function_name
+ description: Function name.
+ metrics:
+ - name: gearman.function_queued_jobs_activity
+ description: Function Jobs Activity
+ unit: "jobs"
+ chart_type: stacked
+ dimensions:
+ - name: running
+ - name: waiting
+ - name: gearman.function_queued_jobs_priority
+ description: Function Jobs Priority
+ unit: "jobs"
+ chart_type: stacked
+ dimensions:
+ - name: high
+ - name: normal
+ - name: low
+ - name: gearman.function_workers
+ description: Function Workers
+ unit: "workers"
+ chart_type: line
+ dimensions:
+ - name: available
diff --git a/src/go/plugin/go.d/modules/gearman/testdata/config.json b/src/go/plugin/go.d/modules/gearman/testdata/config.json
new file mode 100644
index 000000000..e86834720
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/testdata/config.json
@@ -0,0 +1,5 @@
+{
+ "update_every": 123,
+ "address": "ok",
+ "timeout": 123.123
+}
diff --git a/src/go/plugin/go.d/modules/gearman/testdata/config.yaml b/src/go/plugin/go.d/modules/gearman/testdata/config.yaml
new file mode 100644
index 000000000..1b81d09eb
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/testdata/config.yaml
@@ -0,0 +1,3 @@
+update_every: 123
+address: "ok"
+timeout: 123.123
diff --git a/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt b/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt
new file mode 100644
index 000000000..3cb669d10
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/testdata/priority-status.txt
@@ -0,0 +1,5 @@
+prefix generic_worker4 1 2 3 500
+generic_worker2 4 5 6 500
+generic_worker3 7 8 9 760
+generic_worker1 10 11 12 500
+.
diff --git a/src/go/plugin/go.d/modules/gearman/testdata/status.txt b/src/go/plugin/go.d/modules/gearman/testdata/status.txt
new file mode 100644
index 000000000..33d77ab83
--- /dev/null
+++ b/src/go/plugin/go.d/modules/gearman/testdata/status.txt
@@ -0,0 +1,5 @@
+prefix generic_worker4 78 78 500
+generic_worker2 78 78 500
+generic_worker3 2 1 760
+generic_worker1 4 3 500
+.