diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:24 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-08-26 08:15:35 +0000 |
commit | f09848204fa5283d21ea43e262ee41aa578e1808 (patch) | |
tree | c62385d7adf209fa6a798635954d887f718fb3fb /src/go/plugin/go.d/modules/activemq | |
parent | Releasing debian version 1.46.3-2. (diff) | |
download | netdata-f09848204fa5283d21ea43e262ee41aa578e1808.tar.xz netdata-f09848204fa5283d21ea43e262ee41aa578e1808.zip |
Merging upstream version 1.47.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/go/plugin/go.d/modules/activemq')
l--------- | src/go/plugin/go.d/modules/activemq/README.md | 1 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/activemq.go | 138 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/activemq_test.go | 340 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/apiclient.go | 137 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/charts.go | 46 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/collect.go | 185 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/config_schema.json | 234 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/init.go | 32 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/integrations/activemq.md | 268 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/metadata.yaml | 230 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/testdata/config.json | 25 | ||||
-rw-r--r-- | src/go/plugin/go.d/modules/activemq/testdata/config.yaml | 22 |
12 files changed, 1658 insertions, 0 deletions
diff --git a/src/go/plugin/go.d/modules/activemq/README.md b/src/go/plugin/go.d/modules/activemq/README.md new file mode 120000 index 000000000..de893d1d0 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/README.md @@ -0,0 +1 @@ +integrations/activemq.md
\ No newline at end of file diff --git a/src/go/plugin/go.d/modules/activemq/activemq.go b/src/go/plugin/go.d/modules/activemq/activemq.go new file mode 100644 index 000000000..bf47be72a --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/activemq.go @@ -0,0 +1,138 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import ( + _ "embed" + "errors" + "time" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/matcher" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" +) + +//go:embed "config_schema.json" +var configSchema string + +func init() { + module.Register("activemq", module.Creator{ + JobConfigSchema: configSchema, + Create: func() module.Module { return New() }, + Config: func() any { return &Config{} }, + }) +} + +func New() *ActiveMQ { + return &ActiveMQ{ + Config: Config{ + HTTP: web.HTTP{ + Request: web.Request{ + URL: "http://127.0.0.1:8161", + }, + Client: web.Client{ + Timeout: web.Duration(time.Second), + }, + }, + Webadmin: "admin", + MaxQueues: 50, + MaxTopics: 50, + }, + charts: &Charts{}, + activeQueues: make(map[string]bool), + activeTopics: make(map[string]bool), + } +} + +type Config struct { + UpdateEvery int `yaml:"update_every,omitempty" json:"update_every"` + web.HTTP `yaml:",inline" json:""` + Webadmin string `yaml:"webadmin,omitempty" json:"webadmin"` + MaxQueues int `yaml:"max_queues" json:"max_queues"` + MaxTopics int `yaml:"max_topics" json:"max_topics"` + QueuesFilter string `yaml:"queues_filter,omitempty" json:"queues_filter"` + TopicsFilter string `yaml:"topics_filter,omitempty" json:"topics_filter"` +} + +type ActiveMQ struct { + module.Base + Config `yaml:",inline" json:""` + + charts *Charts + + apiClient *apiClient + + activeQueues map[string]bool + activeTopics map[string]bool + queuesFilter matcher.Matcher + topicsFilter matcher.Matcher +} + +func (a *ActiveMQ) Configuration() any { + return a.Config +} + +func (a *ActiveMQ) Init() error { + if err := a.validateConfig(); err != nil { + a.Errorf("config validation: %v", err) + return err + } + + qf, err := a.initQueuesFiler() + if err != nil { + a.Error(err) + return err + } + a.queuesFilter = qf + + tf, err := a.initTopicsFilter() + if err != nil { + a.Error(err) + return err + } + a.topicsFilter = tf + + client, err := web.NewHTTPClient(a.Client) + if err != nil { + a.Error(err) + return err + } + + a.apiClient = newAPIClient(client, a.Request, a.Webadmin) + + return nil +} + +func (a *ActiveMQ) Check() error { + mx, err := a.collect() + if err != nil { + a.Error(err) + return err + } + if len(mx) == 0 { + return errors.New("no metrics collected") + + } + return nil +} + +func (a *ActiveMQ) Charts() *Charts { + return a.charts +} + +func (a *ActiveMQ) Cleanup() { + if a.apiClient != nil && a.apiClient.httpClient != nil { + a.apiClient.httpClient.CloseIdleConnections() + } +} + +func (a *ActiveMQ) Collect() map[string]int64 { + mx, err := a.collect() + + if err != nil { + a.Error(err) + return nil + } + + return mx +} diff --git a/src/go/plugin/go.d/modules/activemq/activemq_test.go b/src/go/plugin/go.d/modules/activemq/activemq_test.go new file mode 100644 index 000000000..e2640f440 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/activemq_test.go @@ -0,0 +1,340 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import ( + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + dataConfigJSON, _ = os.ReadFile("testdata/config.json") + dataConfigYAML, _ = os.ReadFile("testdata/config.yaml") +) + +func Test_testDataIsValid(t *testing.T) { + for name, data := range map[string][]byte{ + "dataConfigJSON": dataConfigJSON, + "dataConfigYAML": dataConfigYAML, + } { + require.NotNil(t, data, name) + + } +} + +func TestActiveMQ_ConfigurationSerialize(t *testing.T) { + module.TestConfigurationSerialize(t, &ActiveMQ{}, dataConfigJSON, dataConfigYAML) +} + +var ( + queuesData = []string{ + `<queues> +<queue name="sandra"> +<stats size="1" consumerCount="1" enqueueCount="2" dequeueCount="1"/> +<feed> +<atom>queueBrowse/sandra?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/sandra?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +<queue name="Test"> +<stats size="1" consumerCount="1" enqueueCount="2" dequeueCount="1"/> +<feed> +<atom>queueBrowse/Test?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/Test?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +</queues>`, + `<queues> +<queue name="sandra"> +<stats size="2" consumerCount="2" enqueueCount="3" dequeueCount="2"/> +<feed> +<atom>queueBrowse/sandra?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/sandra?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +<queue name="Test"> +<stats size="2" consumerCount="2" enqueueCount="3" dequeueCount="2"/> +<feed> +<atom>queueBrowse/Test?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/Test?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +<queue name="Test2"> +<stats size="0" consumerCount="0" enqueueCount="0" dequeueCount="0"/> +<feed> +<atom>queueBrowse/Test?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/Test?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +</queues>`, + `<queues> +<queue name="sandra"> +<stats size="3" consumerCount="3" enqueueCount="4" dequeueCount="3"/> +<feed> +<atom>queueBrowse/sandra?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/sandra?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +<queue name="Test"> +<stats size="3" consumerCount="3" enqueueCount="4" dequeueCount="3"/> +<feed> +<atom>queueBrowse/Test?view=rss&feedType=atom_1.0</atom> +<rss>queueBrowse/Test?view=rss&feedType=rss_2.0</rss> +</feed> +</queue> +</queues>`, + } + + topicsData = []string{ + `<topics> +<topic name="ActiveMQ.Advisory.MasterBroker "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="AAA "> +<stats size="1" consumerCount="1" enqueueCount="2" dequeueCount="1"/> +</topic> +<topic name="ActiveMQ.Advisory.Topic "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="ActiveMQ.Advisory.Queue "> +<stats size="0" consumerCount="0" enqueueCount="2" dequeueCount="0"/> +</topic> +<topic name="AAAA "> +<stats size="1" consumerCount="1" enqueueCount="2" dequeueCount="1"/> +</topic> +</topics>`, + `<topics> +<topic name="ActiveMQ.Advisory.MasterBroker "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="AAA "> +<stats size="2" consumerCount="2" enqueueCount="3" dequeueCount="2"/> +</topic> +<topic name="ActiveMQ.Advisory.Topic "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="ActiveMQ.Advisory.Queue "> +<stats size="0" consumerCount="0" enqueueCount="2" dequeueCount="0"/> +</topic> +<topic name="AAAA "> +<stats size="2" consumerCount="2" enqueueCount="3" dequeueCount="2"/> +</topic> +<topic name="BBB "> +<stats size="1" consumerCount="1" enqueueCount="2" dequeueCount="1"/> +</topic> +</topics>`, + `<topics> +<topic name="ActiveMQ.Advisory.MasterBroker "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="AAA "> +<stats size="3" consumerCount="3" enqueueCount="4" dequeueCount="3"/> +</topic> +<topic name="ActiveMQ.Advisory.Topic "> +<stats size="0" consumerCount="0" enqueueCount="1" dequeueCount="0"/> +</topic> +<topic name="ActiveMQ.Advisory.Queue "> +<stats size="0" consumerCount="0" enqueueCount="2" dequeueCount="0"/> +</topic> +<topic name="AAAA "> +<stats size="3" consumerCount="3" enqueueCount="4" dequeueCount="3"/> +</topic> +</topics>`, + } +) + +func TestActiveMQ_Init(t *testing.T) { + job := New() + + // NG case + job.Webadmin = "" + assert.Error(t, job.Init()) + + // OK case + job.Webadmin = "webadmin" + assert.NoError(t, job.Init()) + assert.NotNil(t, job.apiClient) +} + +func TestActiveMQ_Check(t *testing.T) { + ts := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/webadmin/xml/queues.jsp": + _, _ = w.Write([]byte(queuesData[0])) + case "/webadmin/xml/topics.jsp": + _, _ = w.Write([]byte(topicsData[0])) + } + })) + defer ts.Close() + + job := New() + job.HTTP.Request = web.Request{URL: ts.URL} + job.Webadmin = "webadmin" + + require.NoError(t, job.Init()) + require.NoError(t, job.Check()) +} + +func TestActiveMQ_Charts(t *testing.T) { + assert.NotNil(t, New().Charts()) +} + +func TestActiveMQ_Cleanup(t *testing.T) { + New().Cleanup() +} + +func TestActiveMQ_Collect(t *testing.T) { + var collectNum int + getQueues := func() string { return queuesData[collectNum] } + getTopics := func() string { return topicsData[collectNum] } + + ts := httptest.NewServer( + http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/webadmin/xml/queues.jsp": + _, _ = w.Write([]byte(getQueues())) + case "/webadmin/xml/topics.jsp": + _, _ = w.Write([]byte(getTopics())) + } + })) + defer ts.Close() + + job := New() + job.HTTP.Request = web.Request{URL: ts.URL} + job.Webadmin = "webadmin" + + require.NoError(t, job.Init()) + require.NoError(t, job.Check()) + + cases := []struct { + expected map[string]int64 + numQueues int + numTopics int + numCharts int + }{ + { + expected: map[string]int64{ + "queues_sandra_consumers": 1, + "queues_sandra_dequeued": 1, + "queues_Test_enqueued": 2, + "queues_Test_unprocessed": 1, + "topics_AAA_dequeued": 1, + "topics_AAAA_unprocessed": 1, + "queues_Test_dequeued": 1, + "topics_AAA_enqueued": 2, + "topics_AAA_unprocessed": 1, + "topics_AAAA_consumers": 1, + "topics_AAAA_dequeued": 1, + "queues_Test_consumers": 1, + "queues_sandra_enqueued": 2, + "queues_sandra_unprocessed": 1, + "topics_AAA_consumers": 1, + "topics_AAAA_enqueued": 2, + }, + numQueues: 2, + numTopics: 2, + numCharts: 12, + }, + { + expected: map[string]int64{ + "queues_sandra_enqueued": 3, + "queues_Test_enqueued": 3, + "queues_Test_unprocessed": 1, + "queues_Test2_dequeued": 0, + "topics_BBB_enqueued": 2, + "queues_sandra_dequeued": 2, + "queues_sandra_unprocessed": 1, + "queues_Test2_enqueued": 0, + "topics_AAAA_enqueued": 3, + "topics_AAAA_dequeued": 2, + "topics_BBB_unprocessed": 1, + "topics_AAA_dequeued": 2, + "topics_AAAA_unprocessed": 1, + "queues_Test_consumers": 2, + "queues_Test_dequeued": 2, + "queues_Test2_consumers": 0, + "queues_Test2_unprocessed": 0, + "topics_AAA_consumers": 2, + "topics_AAA_enqueued": 3, + "topics_BBB_dequeued": 1, + "queues_sandra_consumers": 2, + "topics_AAA_unprocessed": 1, + "topics_AAAA_consumers": 2, + "topics_BBB_consumers": 1, + }, + numQueues: 3, + numTopics: 3, + numCharts: 18, + }, + { + expected: map[string]int64{ + "queues_sandra_unprocessed": 1, + "queues_Test_unprocessed": 1, + "queues_sandra_consumers": 3, + "topics_AAAA_enqueued": 4, + "queues_sandra_dequeued": 3, + "queues_Test_consumers": 3, + "queues_Test_enqueued": 4, + "queues_Test_dequeued": 3, + "topics_AAA_consumers": 3, + "topics_AAA_unprocessed": 1, + "topics_AAAA_consumers": 3, + "topics_AAAA_unprocessed": 1, + "queues_sandra_enqueued": 4, + "topics_AAA_enqueued": 4, + "topics_AAA_dequeued": 3, + "topics_AAAA_dequeued": 3, + }, + numQueues: 2, + numTopics: 2, + numCharts: 18, + }, + } + + for _, c := range cases { + require.Equal(t, c.expected, job.Collect()) + assert.Len(t, job.activeQueues, c.numQueues) + assert.Len(t, job.activeTopics, c.numTopics) + assert.Len(t, *job.charts, c.numCharts) + collectNum++ + } +} + +func TestActiveMQ_404(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(404) + })) + defer ts.Close() + + job := New() + job.Webadmin = "webadmin" + job.HTTP.Request = web.Request{URL: ts.URL} + + require.NoError(t, job.Init()) + assert.Error(t, job.Check()) +} + +func TestActiveMQ_InvalidData(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("hello and goodbye!")) + })) + defer ts.Close() + + mod := New() + mod.Webadmin = "webadmin" + mod.HTTP.Request = web.Request{URL: ts.URL} + + require.NoError(t, mod.Init()) + assert.Error(t, mod.Check()) +} diff --git a/src/go/plugin/go.d/modules/activemq/apiclient.go b/src/go/plugin/go.d/modules/activemq/apiclient.go new file mode 100644 index 000000000..7f99c9bad --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/apiclient.go @@ -0,0 +1,137 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import ( + "encoding/xml" + "fmt" + "io" + "net/http" + "net/url" + "path" + + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/web" +) + +type topics struct { + XMLName xml.Name `xml:"topics"` + Items []topic `xml:"topic"` +} + +type topic struct { + XMLName xml.Name `xml:"topic"` + Name string `xml:"name,attr"` + Stats stats `xml:"stats"` +} + +type queues struct { + XMLName xml.Name `xml:"queues"` + Items []queue `xml:"queue"` +} + +type queue struct { + XMLName xml.Name `xml:"queue"` + Name string `xml:"name,attr"` + Stats stats `xml:"stats"` +} + +type stats struct { + XMLName xml.Name `xml:"stats"` + Size int64 `xml:"size,attr"` + ConsumerCount int64 `xml:"consumerCount,attr"` + EnqueueCount int64 `xml:"enqueueCount,attr"` + DequeueCount int64 `xml:"dequeueCount,attr"` +} + +const pathStats = "/%s/xml/%s.jsp" + +func newAPIClient(client *http.Client, request web.Request, webadmin string) *apiClient { + return &apiClient{ + httpClient: client, + request: request, + webadmin: webadmin, + } +} + +type apiClient struct { + httpClient *http.Client + request web.Request + webadmin string +} + +func (a *apiClient) getQueues() (*queues, error) { + req, err := a.createRequest(fmt.Sprintf(pathStats, a.webadmin, keyQueues)) + if err != nil { + return nil, fmt.Errorf("error on creating request '%s' : %v", a.request.URL, err) + } + + resp, err := a.doRequestOK(req) + + defer closeBody(resp) + + if err != nil { + return nil, err + } + + var queues queues + + if err := xml.NewDecoder(resp.Body).Decode(&queues); err != nil { + return nil, fmt.Errorf("error on decoding resp from %s : %s", req.URL, err) + } + + return &queues, nil +} + +func (a *apiClient) getTopics() (*topics, error) { + req, err := a.createRequest(fmt.Sprintf(pathStats, a.webadmin, keyTopics)) + if err != nil { + return nil, fmt.Errorf("error on creating request '%s' : %v", a.request.URL, err) + } + + resp, err := a.doRequestOK(req) + + defer closeBody(resp) + + if err != nil { + return nil, err + } + + var topics topics + + if err := xml.NewDecoder(resp.Body).Decode(&topics); err != nil { + return nil, fmt.Errorf("error on decoding resp from %s : %s", req.URL, err) + } + + return &topics, nil +} + +func (a *apiClient) doRequestOK(req *http.Request) (*http.Response, error) { + resp, err := a.httpClient.Do(req) + if err != nil { + return resp, fmt.Errorf("error on request to %s : %v", req.URL, err) + } + + if resp.StatusCode != http.StatusOK { + return resp, fmt.Errorf("%s returned HTTP status %d", req.URL, resp.StatusCode) + } + + return resp, err +} + +func (a *apiClient) createRequest(urlPath string) (*http.Request, error) { + req := a.request.Copy() + u, err := url.Parse(req.URL) + if err != nil { + return nil, err + } + u.Path = path.Join(u.Path, urlPath) + req.URL = u.String() + return web.NewHTTPRequest(req) +} + +func closeBody(resp *http.Response) { + if resp != nil && resp.Body != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } +} diff --git a/src/go/plugin/go.d/modules/activemq/charts.go b/src/go/plugin/go.d/modules/activemq/charts.go new file mode 100644 index 000000000..a169da01a --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/charts.go @@ -0,0 +1,46 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import "github.com/netdata/netdata/go/plugins/plugin/go.d/agent/module" + +type ( + // Charts is an alias for module.Charts + Charts = module.Charts + // Dims is an alias for module.Dims + Dims = module.Dims +) + +var charts = Charts{ + { + ID: "%s_%s_messages", + Title: "%s Messages", + Units: "messages/s", + Fam: "", + Ctx: "activemq.messages", + Dims: Dims{ + {ID: "%s_%s_enqueued", Name: "enqueued", Algo: module.Incremental}, + {ID: "%s_%s_dequeued", Name: "dequeued", Algo: module.Incremental}, + }, + }, + { + ID: "%s_%s_unprocessed_messages", + Title: "%s Unprocessed Messages", + Units: "messages", + Fam: "", + Ctx: "activemq.unprocessed_messages", + Dims: Dims{ + {ID: "%s_%s_unprocessed", Name: "unprocessed"}, + }, + }, + { + ID: "%s_%s_consumers", + Title: "%s Consumers", + Units: "consumers", + Fam: "", + Ctx: "activemq.consumers", + Dims: Dims{ + {ID: "%s_%s_consumers", Name: "consumers"}, + }, + }, +} diff --git a/src/go/plugin/go.d/modules/activemq/collect.go b/src/go/plugin/go.d/modules/activemq/collect.go new file mode 100644 index 000000000..0dbaf5544 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/collect.go @@ -0,0 +1,185 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import ( + "fmt" + "strings" +) + +const ( + keyQueues = "queues" + keyTopics = "topics" + keyAdvisory = "Advisory" +) + +var nameReplacer = strings.NewReplacer(".", "_", " ", "") + +func (a *ActiveMQ) collect() (map[string]int64, error) { + metrics := make(map[string]int64) + + var ( + queues *queues + topics *topics + err error + ) + + if queues, err = a.apiClient.getQueues(); err != nil { + return nil, err + } + + if topics, err = a.apiClient.getTopics(); err != nil { + return nil, err + } + + a.processQueues(queues, metrics) + a.processTopics(topics, metrics) + + return metrics, nil +} + +func (a *ActiveMQ) processQueues(queues *queues, metrics map[string]int64) { + var ( + count = len(a.activeQueues) + updated = make(map[string]bool) + unp int + ) + + for _, q := range queues.Items { + if strings.Contains(q.Name, keyAdvisory) { + continue + } + + if !a.activeQueues[q.Name] { + if a.MaxQueues != 0 && count > a.MaxQueues { + unp++ + continue + } + + if !a.filterQueues(q.Name) { + continue + } + + a.activeQueues[q.Name] = true + a.addQueueTopicCharts(q.Name, keyQueues) + } + + rname := nameReplacer.Replace(q.Name) + + metrics["queues_"+rname+"_consumers"] = q.Stats.ConsumerCount + metrics["queues_"+rname+"_enqueued"] = q.Stats.EnqueueCount + metrics["queues_"+rname+"_dequeued"] = q.Stats.DequeueCount + metrics["queues_"+rname+"_unprocessed"] = q.Stats.EnqueueCount - q.Stats.DequeueCount + + updated[q.Name] = true + } + + for name := range a.activeQueues { + if !updated[name] { + delete(a.activeQueues, name) + a.removeQueueTopicCharts(name, keyQueues) + } + } + + if unp > 0 { + a.Debugf("%d queues were unprocessed due to max_queues limit (%d)", unp, a.MaxQueues) + } +} + +func (a *ActiveMQ) processTopics(topics *topics, metrics map[string]int64) { + var ( + count = len(a.activeTopics) + updated = make(map[string]bool) + unp int + ) + + for _, t := range topics.Items { + if strings.Contains(t.Name, keyAdvisory) { + continue + } + + if !a.activeTopics[t.Name] { + if a.MaxTopics != 0 && count > a.MaxTopics { + unp++ + continue + } + + if !a.filterTopics(t.Name) { + continue + } + + a.activeTopics[t.Name] = true + a.addQueueTopicCharts(t.Name, keyTopics) + } + + rname := nameReplacer.Replace(t.Name) + + metrics["topics_"+rname+"_consumers"] = t.Stats.ConsumerCount + metrics["topics_"+rname+"_enqueued"] = t.Stats.EnqueueCount + metrics["topics_"+rname+"_dequeued"] = t.Stats.DequeueCount + metrics["topics_"+rname+"_unprocessed"] = t.Stats.EnqueueCount - t.Stats.DequeueCount + + updated[t.Name] = true + } + + for name := range a.activeTopics { + if !updated[name] { + // TODO: delete after timeout? + delete(a.activeTopics, name) + a.removeQueueTopicCharts(name, keyTopics) + } + } + + if unp > 0 { + a.Debugf("%d topics were unprocessed due to max_topics limit (%d)", unp, a.MaxTopics) + } +} + +func (a *ActiveMQ) filterQueues(line string) bool { + if a.queuesFilter == nil { + return true + } + return a.queuesFilter.MatchString(line) +} + +func (a *ActiveMQ) filterTopics(line string) bool { + if a.topicsFilter == nil { + return true + } + return a.topicsFilter.MatchString(line) +} + +func (a *ActiveMQ) addQueueTopicCharts(name, typ string) { + rname := nameReplacer.Replace(name) + + charts := charts.Copy() + + for _, chart := range *charts { + chart.ID = fmt.Sprintf(chart.ID, typ, rname) + chart.Title = fmt.Sprintf(chart.Title, name) + chart.Fam = typ + + for _, dim := range chart.Dims { + dim.ID = fmt.Sprintf(dim.ID, typ, rname) + } + } + + _ = a.charts.Add(*charts...) + +} + +func (a *ActiveMQ) removeQueueTopicCharts(name, typ string) { + rname := nameReplacer.Replace(name) + + chart := a.charts.Get(fmt.Sprintf("%s_%s_messages", typ, rname)) + chart.MarkRemove() + chart.MarkNotCreated() + + chart = a.charts.Get(fmt.Sprintf("%s_%s_unprocessed_messages", typ, rname)) + chart.MarkRemove() + chart.MarkNotCreated() + + chart = a.charts.Get(fmt.Sprintf("%s_%s_consumers", typ, rname)) + chart.MarkRemove() + chart.MarkNotCreated() +} diff --git a/src/go/plugin/go.d/modules/activemq/config_schema.json b/src/go/plugin/go.d/modules/activemq/config_schema.json new file mode 100644 index 000000000..df71bcadf --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/config_schema.json @@ -0,0 +1,234 @@ +{ + "jsonSchema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ActiveMQ collector configuration.", + "type": "object", + "properties": { + "update_every": { + "title": "Update every", + "description": "Data collection interval, measured in seconds.", + "type": "integer", + "minimum": 1, + "default": 1 + }, + "url": { + "title": "URL", + "description": "The base URL of the ActiveMQ [Web Console](https://activemq.apache.org/components/classic/documentation/web-console).", + "type": "string", + "default": "http://127.0.0.1:8161", + "format": "uri" + }, + "webadmin": { + "title": "Webadmin path", + "description": "Webadmin root path.", + "type": "string", + "default": "admin" + }, + "timeout": { + "title": "Timeout", + "description": "The timeout in seconds for the HTTP request.", + "type": "number", + "minimum": 0.5, + "default": 1 + }, + "not_follow_redirects": { + "title": "Not follow redirects", + "description": "If set, the client will not follow HTTP redirects automatically.", + "type": "boolean" + }, + "max_queues": { + "title": "Queue limit", + "description": "The maximum number of concurrently collected queues. Set to 0 for no limit.", + "type": "integer", + "minimum": 0, + "default": 50 + }, + "queues_filter": { + "title": "Queue selector", + "description": "Collect queues whose names match the specified [pattern](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#readme).", + "type": "string", + "minimum": 1, + "default": "*" + }, + "max_topics": { + "title": "Topic limit", + "description": "The maximum number of concurrently collected topics. Set to 0 for no limit.", + "type": "integer", + "minimum": 0, + "default": 50 + }, + "topics_filter": { + "title": "Topic selector", + "description": "Collect topics whose names match the specified [pattern](https://github.com/netdata/netdata/tree/master/src/libnetdata/simple_pattern#readme).", + "type": "string", + "minimum": 1, + "default": "*" + }, + "username": { + "title": "Username", + "description": "The username for basic authentication.", + "type": "string", + "sensitive": true + }, + "password": { + "title": "Password", + "description": "The password for basic authentication.", + "type": "string", + "sensitive": true + }, + "proxy_url": { + "title": "Proxy URL", + "description": "The URL of the proxy server.", + "type": "string" + }, + "proxy_username": { + "title": "Proxy username", + "description": "The username for proxy authentication.", + "type": "string", + "sensitive": true + }, + "proxy_password": { + "title": "Proxy password", + "description": "The password for proxy authentication.", + "type": "string", + "sensitive": true + }, + "headers": { + "title": "Headers", + "description": "Additional HTTP headers to include in the request.", + "type": [ + "object", + "null" + ], + "additionalProperties": { + "type": "string" + } + }, + "tls_skip_verify": { + "title": "Skip TLS verification", + "description": "If set, TLS certificate verification will be skipped.", + "type": "boolean" + }, + "tls_ca": { + "title": "TLS CA", + "description": "The path to the CA certificate file for TLS verification.", + "type": "string", + "pattern": "^$|^/" + }, + "tls_cert": { + "title": "TLS certificate", + "description": "The path to the client certificate file for TLS authentication.", + "type": "string", + "pattern": "^$|^/" + }, + "tls_key": { + "title": "TLS key", + "description": "The path to the client key file for TLS authentication.", + "type": "string", + "pattern": "^$|^/" + }, + "body": { + "title": "Body", + "type": "string" + }, + "method": { + "title": "Method", + "type": "string" + } + }, + "required": [ + "url", + "webadmin" + ], + "additionalProperties": false, + "patternProperties": { + "^name$": {} + } + }, + "uiSchema": { + "ui:flavour": "tabs", + "ui:options": { + "tabs": [ + { + "title": "Base", + "fields": [ + "update_every", + "url", + "webadmin", + "timeout", + "not_follow_redirects" + ] + }, + { + "title": "Filtering", + "fields": [ + "max_queues", + "queues_filter", + "max_topics", + "topics_filter" + ] + }, + { + "title": "Auth", + "fields": [ + "username", + "password" + ] + }, + { + "title": "TLS", + "fields": [ + "tls_skip_verify", + "tls_ca", + "tls_cert", + "tls_key" + ] + }, + { + "title": "Proxy", + "fields": [ + "proxy_url", + "proxy_username", + "proxy_password" + ] + }, + { + "title": "Headers", + "fields": [ + "headers" + ] + } + ] + }, + "uiOptions": { + "fullPage": true + }, + "body": { + "ui:widget": "hidden" + }, + "method": { + "ui:widget": "hidden" + }, + "timeout": { + "ui:help": "Accepts decimals for precise control (e.g., type 1.5 for 1.5 seconds)." + }, + "queues_filter": { + "ui:help": "Use `*` to collect all queues. To exclude all queues from collection, use `!*`." + }, + "topics_filter": { + "ui:help": "Use `*` to collect all topics. To exclude all topics from collection, use `!*`." + }, + "username": { + "ui:widget": "password" + }, + "proxy_username": { + "ui:widget": "password" + }, + "password": { + "ui:widget": "password" + }, + "proxy_password": { + "ui:widget": "password" + } + } +} diff --git a/src/go/plugin/go.d/modules/activemq/init.go b/src/go/plugin/go.d/modules/activemq/init.go new file mode 100644 index 000000000..e48dacad5 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/init.go @@ -0,0 +1,32 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +package activemq + +import ( + "errors" + "github.com/netdata/netdata/go/plugins/plugin/go.d/pkg/matcher" +) + +func (a *ActiveMQ) validateConfig() error { + if a.URL == "" { + return errors.New("url not set") + } + if a.Webadmin == "" { + return errors.New("webadmin root path set") + } + return nil +} + +func (a *ActiveMQ) initQueuesFiler() (matcher.Matcher, error) { + if a.QueuesFilter == "" { + return matcher.TRUE(), nil + } + return matcher.NewSimplePatternsMatcher(a.QueuesFilter) +} + +func (a *ActiveMQ) initTopicsFilter() (matcher.Matcher, error) { + if a.TopicsFilter == "" { + return matcher.TRUE(), nil + } + return matcher.NewSimplePatternsMatcher(a.TopicsFilter) +} diff --git a/src/go/plugin/go.d/modules/activemq/integrations/activemq.md b/src/go/plugin/go.d/modules/activemq/integrations/activemq.md new file mode 100644 index 000000000..fc215bfb9 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/integrations/activemq.md @@ -0,0 +1,268 @@ +<!--startmeta +custom_edit_url: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/activemq/README.md" +meta_yaml: "https://github.com/netdata/netdata/edit/master/src/go/plugin/go.d/modules/activemq/metadata.yaml" +sidebar_label: "ActiveMQ" +learn_status: "Published" +learn_rel_path: "Collecting Metrics/Message Brokers" +most_popular: False +message: "DO NOT EDIT THIS FILE DIRECTLY, IT IS GENERATED BY THE COLLECTOR'S metadata.yaml FILE" +endmeta--> + +# ActiveMQ + + +<img src="https://netdata.cloud/img/activemq.png" width="150"/> + + +Plugin: go.d.plugin +Module: activemq + +<img src="https://img.shields.io/badge/maintained%20by-Netdata-%2300ab44" /> + +## Overview + +This collector monitors ActiveMQ queues and topics. + +It collects metrics by sending HTTP requests to the Web Console API. + +This collector is supported on all platforms. + +This collector supports collecting metrics from multiple instances of this integration, including remote instances. + + +### Default Behavior + +#### Auto-Detection + +This collector discovers instances running on the local host that provide metrics on port 8161. +On startup, it tries to collect metrics from: + +- http://localhost:8161 + + +#### Limits + +The default configuration for this integration does not impose any limits on data collection. + +#### Performance Impact + +The default configuration for this integration is not expected to impose a significant performance impact on the system. + + +## Metrics + +Metrics grouped by *scope*. + +The scope defines the instance that the metric belongs to. An instance is uniquely identified by a set of labels. + + + +### Per ActiveMQ instance + +These metrics refer to the entire monitored application. + +This scope has no labels. + +Metrics: + +| Metric | Dimensions | Unit | +|:------|:----------|:----| +| activemq.messages | enqueued, dequeued | messages/s | +| activemq.unprocessed_messages | unprocessed | messages | +| activemq.consumers | consumers | consumers | + + + +## Alerts + +There are no alerts configured by default for this integration. + + +## Setup + +### Prerequisites + +No action required. + +### Configuration + +#### File + +The configuration file name for this integration is `go.d/activemq.conf`. + + +You can edit the configuration file using the `edit-config` script from the +Netdata [config directory](/docs/netdata-agent/configuration/README.md#the-netdata-config-directory). + +```bash +cd /etc/netdata 2>/dev/null || cd /opt/netdata/etc/netdata +sudo ./edit-config go.d/activemq.conf +``` +#### Options + +The following options can be defined globally: update_every, autodetection_retry. + + +<details open><summary>Config options</summary> + +| Name | Description | Default | Required | +|:----|:-----------|:-------|:--------:| +| update_every | Data collection frequency. | 1 | no | +| autodetection_retry | Recheck interval in seconds. Zero means no recheck will be scheduled. | 0 | no | +| url | Server URL. | http://localhost:8161 | yes | +| webadmin | Webadmin root path. | admin | yes | +| max_queues | Maximum number of concurrently collected queues. | 50 | no | +| max_topics | Maximum number of concurrently collected topics. | 50 | no | +| queues_filter | Queues filter. Syntax is [simple patterns](/src/libnetdata/simple_pattern/README.md#simple-patterns). | | no | +| topics_filter | Topics filter. Syntax is [simple patterns](/src/libnetdata/simple_pattern/README.md#simple-patterns). | | no | +| username | Username for basic HTTP authentication. | | no | +| password | Password for basic HTTP authentication. | | no | +| proxy_username | Username for proxy basic HTTP authentication. | | no | +| proxy_password | Password for proxy basic HTTP authentication. | | no | +| method | HTTP request method. | GET | no | +| timeout | HTTP request timeout. | 1 | no | +| body | HTTP request body. | | no | +| headers | HTTP request headers. | | no | +| not_follow_redirects | Redirect handling policy. Controls whether the client follows redirects. | no | no | +| tls_skip_verify | Server certificate chain and hostname validation policy. Controls whether the client performs this check. | no | no | +| tls_ca | Certification authority that the client uses when verifying the server's certificates. | | no | +| tls_cert | Client TLS certificate. | | no | +| tls_key | Client TLS key. | | no | + +</details> + +#### Examples + +##### Basic + +A basic example configuration. + +```yaml +jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + +``` +##### HTTP authentication + +Basic HTTP authentication. + +<details open><summary>Config</summary> + +```yaml +jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + username: foo + password: bar + +``` +</details> + +##### Filters and limits + +Using filters and limits for queues and topics. + +<details open><summary>Config</summary> + +```yaml +jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + max_queues: 100 + max_topics: 100 + queues_filter: '!sandr* *' + topics_filter: '!sandr* *' + +``` +</details> + +##### Multi-instance + +> **Note**: When you define multiple jobs, their names must be unique. + +Collecting metrics from local and remote instances. + + +<details open><summary>Config</summary> + +```yaml +jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + + - name: remote + url: http://192.0.2.1:8161 + webadmin: admin + +``` +</details> + + + +## Troubleshooting + +### Debug Mode + +**Important**: Debug mode is not supported for data collection jobs created via the UI using the Dyncfg feature. + +To troubleshoot issues with the `activemq` collector, run the `go.d.plugin` with the debug option enabled. The output +should give you clues as to why the collector isn't working. + +- Navigate to the `plugins.d` directory, usually at `/usr/libexec/netdata/plugins.d/`. If that's not the case on + your system, open `netdata.conf` and look for the `plugins` setting under `[directories]`. + + ```bash + cd /usr/libexec/netdata/plugins.d/ + ``` + +- Switch to the `netdata` user. + + ```bash + sudo -u netdata -s + ``` + +- Run the `go.d.plugin` to debug the collector: + + ```bash + ./go.d.plugin -d -m activemq + ``` + +### Getting Logs + +If you're encountering problems with the `activemq` collector, follow these steps to retrieve logs and identify potential issues: + +- **Run the command** specific to your system (systemd, non-systemd, or Docker container). +- **Examine the output** for any warnings or error messages that might indicate issues. These messages should provide clues about the root cause of the problem. + +#### System with systemd + +Use the following command to view logs generated since the last Netdata service restart: + +```bash +journalctl _SYSTEMD_INVOCATION_ID="$(systemctl show --value --property=InvocationID netdata)" --namespace=netdata --grep activemq +``` + +#### System without systemd + +Locate the collector log file, typically at `/var/log/netdata/collector.log`, and use `grep` to filter for collector's name: + +```bash +grep activemq /var/log/netdata/collector.log +``` + +**Note**: This method shows logs from all restarts. Focus on the **latest entries** for troubleshooting current issues. + +#### Docker Container + +If your Netdata runs in a Docker container named "netdata" (replace if different), use this command: + +```bash +docker logs netdata 2>&1 | grep activemq +``` + + diff --git a/src/go/plugin/go.d/modules/activemq/metadata.yaml b/src/go/plugin/go.d/modules/activemq/metadata.yaml new file mode 100644 index 000000000..5bbb0e5a2 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/metadata.yaml @@ -0,0 +1,230 @@ +plugin_name: go.d.plugin +modules: + - meta: + id: collector-go.d.plugin-activemq + module_name: activemq + plugin_name: go.d.plugin + monitored_instance: + categories: + - data-collection.message-brokers + icon_filename: activemq.png + name: ActiveMQ + link: https://activemq.apache.org/ + alternative_monitored_instances: [] + keywords: + - message broker + most_popular: false + info_provided_to_referring_integrations: + description: "" + related_resources: + integrations: + list: + - plugin_name: go.d.plugin + module_name: httpcheck + - plugin_name: apps.plugin + module_name: apps + overview: + data_collection: + metrics_description: This collector monitors ActiveMQ queues and topics. + method_description: It collects metrics by sending HTTP requests to the Web Console API. + additional_permissions: + description: "" + default_behavior: + auto_detection: + description: | + This collector discovers instances running on the local host that provide metrics on port 8161. + On startup, it tries to collect metrics from: + + - http://localhost:8161 + limits: + description: "" + performance_impact: + description: "" + multi_instance: true + supported_platforms: + include: [] + exclude: [] + setup: + prerequisites: + list: [] + configuration: + file: + name: go.d/activemq.conf + options: + description: | + The following options can be defined globally: update_every, autodetection_retry. + folding: + title: Config options + enabled: true + list: + - name: update_every + description: Data collection frequency. + default_value: 1 + required: false + - name: autodetection_retry + description: Recheck interval in seconds. Zero means no recheck will be scheduled. + default_value: 0 + required: false + - name: url + description: Server URL. + default_value: http://localhost:8161 + required: true + - name: webadmin + description: Webadmin root path. + default_value: admin + required: true + - name: max_queues + description: Maximum number of concurrently collected queues. + default_value: 50 + required: false + - name: max_topics + description: Maximum number of concurrently collected topics. + default_value: 50 + required: false + - name: queues_filter + description: | + Queues filter. Syntax is [simple patterns](/src/libnetdata/simple_pattern/README.md#simple-patterns). + default_value: "" + required: false + - name: topics_filter + description: | + Topics filter. Syntax is [simple patterns](/src/libnetdata/simple_pattern/README.md#simple-patterns). + default_value: "" + required: false + - name: username + description: Username for basic HTTP authentication. + default_value: "" + required: false + - name: password + description: Password for basic HTTP authentication. + default_value: "" + required: false + - name: proxy_username + description: Username for proxy basic HTTP authentication. + default_value: "" + required: false + - name: proxy_password + description: Password for proxy basic HTTP authentication. + default_value: "" + required: false + - name: method + description: HTTP request method. + default_value: GET + required: false + - name: timeout + description: HTTP request timeout. + default_value: 1 + required: false + - name: body + description: HTTP request body. + default_value: "" + required: false + - name: headers + description: HTTP request headers. + default_value: "" + required: false + - name: not_follow_redirects + description: Redirect handling policy. Controls whether the client follows redirects. + default_value: false + required: false + - name: tls_skip_verify + description: Server certificate chain and hostname validation policy. Controls whether the client performs this check. + default_value: false + required: false + - name: tls_ca + description: Certification authority that the client uses when verifying the server's certificates. + default_value: "" + required: false + - name: tls_cert + description: Client TLS certificate. + default_value: "" + required: false + - name: tls_key + description: Client TLS key. + default_value: "" + required: false + examples: + folding: + title: Config + enabled: true + list: + - name: Basic + folding: + enabled: false + description: A basic example configuration. + config: | + jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + - name: HTTP authentication + description: Basic HTTP authentication. + config: | + jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + username: foo + password: bar + - name: Filters and limits + description: Using filters and limits for queues and topics. + config: | + jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + max_queues: 100 + max_topics: 100 + queues_filter: '!sandr* *' + topics_filter: '!sandr* *' + - name: Multi-instance + description: | + > **Note**: When you define multiple jobs, their names must be unique. + + Collecting metrics from local and remote instances. + config: | + jobs: + - name: local + url: http://127.0.0.1:8161 + webadmin: admin + + - name: remote + url: http://192.0.2.1:8161 + webadmin: admin + troubleshooting: + problems: + list: [] + alerts: [] + metrics: + folding: + title: Metrics + enabled: false + description: "" + availability: [] + scopes: + - name: global + description: These metrics refer to the entire monitored application. + labels: [] + metrics: + - name: activemq.messages + availability: [] + description: Messaged + unit: messages/s + chart_type: line + dimensions: + - name: enqueued + - name: dequeued + - name: activemq.unprocessed_messages + availability: [] + description: Unprocessed Messages + unit: messages + chart_type: line + dimensions: + - name: unprocessed + - name: activemq.consumers + availability: [] + description: Consumers + unit: consumers + chart_type: line + dimensions: + - name: consumers diff --git a/src/go/plugin/go.d/modules/activemq/testdata/config.json b/src/go/plugin/go.d/modules/activemq/testdata/config.json new file mode 100644 index 000000000..13327dd3f --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/testdata/config.json @@ -0,0 +1,25 @@ +{ + "update_every": 123, + "webadmin": "ok", + "max_queues": 123, + "max_topics": 123, + "queues_filter": "ok", + "topics_filter": "ok", + "url": "ok", + "body": "ok", + "method": "ok", + "headers": { + "ok": "ok" + }, + "username": "ok", + "password": "ok", + "proxy_url": "ok", + "proxy_username": "ok", + "proxy_password": "ok", + "timeout": 123.123, + "not_follow_redirects": true, + "tls_ca": "ok", + "tls_cert": "ok", + "tls_key": "ok", + "tls_skip_verify": true +} diff --git a/src/go/plugin/go.d/modules/activemq/testdata/config.yaml b/src/go/plugin/go.d/modules/activemq/testdata/config.yaml new file mode 100644 index 000000000..dbb4232e9 --- /dev/null +++ b/src/go/plugin/go.d/modules/activemq/testdata/config.yaml @@ -0,0 +1,22 @@ +update_every: 123 +webadmin: "ok" +max_queues: 123 +max_topics: 123 +queues_filter: "ok" +topics_filter: "ok" +url: "ok" +body: "ok" +method: "ok" +headers: + ok: "ok" +username: "ok" +password: "ok" +proxy_url: "ok" +proxy_username: "ok" +proxy_password: "ok" +timeout: 123.123 +not_follow_redirects: yes +tls_ca: "ok" +tls_cert: "ok" +tls_key: "ok" +tls_skip_verify: yes |