summaryrefslogtreecommitdiffstats
path: root/collectors/statsd.plugin
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2022-06-09 04:52:39 +0000
commit89f3604407aff8f4cb2ed958252c61e23c767e24 (patch)
tree7fbf408102cab051557d38193524d8c6e991d070 /collectors/statsd.plugin
parentAdding upstream version 1.34.1. (diff)
downloadnetdata-upstream/1.35.0.tar.xz
netdata-upstream/1.35.0.zip
Adding upstream version 1.35.0.upstream/1.35.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r--collectors/statsd.plugin/README.md185
-rw-r--r--collectors/statsd.plugin/statsd.c799
2 files changed, 655 insertions, 329 deletions
diff --git a/collectors/statsd.plugin/README.md b/collectors/statsd.plugin/README.md
index aadd55bd6..7dc5dbb72 100644
--- a/collectors/statsd.plugin/README.md
+++ b/collectors/statsd.plugin/README.md
@@ -11,17 +11,17 @@ If you want to learn more about the StatsD protocol, we have written a [blog pos
Netdata is a fully featured statsd server. It can collect statsd formatted metrics, visualize them on its dashboards and store them in it's database for long-term retention.
-Netdata statsd is inside Netdata (an internal plugin, running inside the Netdata daemon), it is configured via `netdata.conf` and by-default listens on standard statsd port 8125. Netdata supports both tcp and udp packets at the same time.
+Netdata statsd is inside Netdata (an internal plugin, running inside the Netdata daemon), it is configured via `netdata.conf` and by-default listens on standard statsd port 8125. Netdata supports both TCP and UDP packets at the same time.
Since statsd is embedded in Netdata, it means you now have a statsd server embedded on all your servers.
-Netdata statsd is fast. It can collect more than **1.200.000 metrics per second** on modern hardware, more than **200Mbps of sustained statsd traffic**, using 1 CPU core. The implementation uses two threads: one thread collects metrics, another one updates the charts from the collected data.
+Netdata statsd is fast. It can collect several millions of metrics per second on modern hardware, using just 1 CPU core. The implementation uses two threads: one thread collects metrics, another thread updates the charts from the collected data.
-## Available StatsD collectors
+## Available StatsD synthetic application charts
-Netdata ships with collectors implemented using the StatsD collector. They are configuration files (as you will read below), but they function as a collector, in the sense that configuration file organize the metrics of a data source into pre-defined charts.
+Netdata ships with a few synthetic chart definitions to automatically present application metrics into a more uniform way. These synthetic charts are configuration files (you can create your own) that re-arrange statsd metrics into a more meaningful way.
-On these charts, we can have alarms as with any metric and chart.
+On synthetic charts, we can have alarms as with any metric and chart.
- [K6 load testing tool](https://k6.io)
- **Description:** k6 is a developer-centric, free and open-source load testing tool built for making performance testing a productive and enjoyable experience.
@@ -34,34 +34,36 @@ On these charts, we can have alarms as with any metric and chart.
## Metrics supported by Netdata
-Netdata fully supports the StatsD protocol. All StatsD client libraries can be used with Netdata too.
+Netdata fully supports the StatsD protocol and also extends it to support more advanced Netdata specific use cases. All StatsD client libraries can be used with Netdata too.
-- **Gauges**
+- **Gauges**
The application sends `name:value|g`, where `value` is any **decimal/fractional** number, StatsD reports the latest value collected and the number of times it was updated (events).
The application may increment or decrement a previous value, by setting the first character of the value to `+` or `-` (so, the only way to set a gauge to an absolute negative value, is to first set it to zero).
[Sampling rate](#sampling-rates) is supported.
+ [Tags](#tags) are supported for changing chart units, family and dimension name.
When a gauge is not collected and the setting is not to show gaps on the charts (the default), the last value will be shown, until a data collection event changes it.
-- **Counters** and **Meters**
+- **Counters** and **Meters**
The application sends `name:value|c`, `name:value|C` or `name:value|m`, where `value` is a positive or negative **integer** number of events occurred, StatsD reports the **rate** and the number of times it was updated (events).
- `:value` can be omitted and StatsD will assume it is `1`. `|c`, `|C` and `|m` can be omitted an StatsD will assume it is `|m`. So, the application may send just `name` and StatsD will parse it as `name:1|m`.
+ `:value` can be omitted and StatsD will assume it is `1`. `|c`, `|C` and `|m` can be omitted and StatsD will assume it is `|m`. So, the application may send just `name` and StatsD will parse it as `name:1|m`.
- Counters use `|c` (etsy/StatsD compatible) or `|C` (brubeck compatible)
- Meters use `|m`
[Sampling rate](#sampling-rates) is supported.
+ [Tags](#tags) are supported for changing chart units, family and dimension name.
- When a counter or meter is not collected, Netdata **defaults** to showing a zero value, until a data collection event changes the value.
+ When a counter or meter is not collected, StatsD **defaults** to showing a zero value, until a data collection event changes the value.
-- **Timers** and **Histograms**
+- **Timers** and **Histograms**
- The application sends `name:value|ms` or `name:value|h`, where `value` is any **decimal/fractional** number, StatsD reports **min**, **max**, **average**, **sum**, **95th percentile**, **median** and **standard deviation** and the total number of times it was updated (events).
+ The application sends `name:value|ms` or `name:value|h`, where `value` is any **decimal/fractional** number, StatsD reports **min**, **max**, **average**, **95th percentile**, **median** and **standard deviation** and the total number of times it was updated (events). Internally it also calculates the **sum**, which is available for synthetic charts.
- Timers use `|ms`
- Histograms use `|h`
@@ -69,38 +71,74 @@ Netdata fully supports the StatsD protocol. All StatsD client libraries can be u
The only difference between the two, is the `units` of the charts, as timers report *milliseconds*.
[Sampling rate](#sampling-rates) is supported.
+ [Tags](#tags) are supported for changing chart units and family.
- When a counter or meter is not collected, Netdata **defaults** to showing a zero value, until a data collection event changes the value.
+ When a counter or meter is not collected, StatsD **defaults** to showing a zero value, until a data collection event changes the value.
-- **Sets**
+- **Sets**
The application sends `name:value|s`, where `value` is anything (**number or text**, leading and trailing spaces are removed), StatsD reports the number of unique values sent and the number of times it was updated (events).
- Sampling rate is **not** supported for Sets. `value` is always considered text.
+ Sampling rate is **not** supported for Sets. `value` is always considered text (so `01` and `1` are considered different).
- When a counter or meter is not collected, Netdata **defaults** to showing a zero value, until a data collection event changes the value.
+ [Tags](#tags) are supported for changing chart units and family.
+
+ When a set is not collected, Netdata **defaults** to showing a zero value, until a data collection event changes the value.
+
+- **Dictionaries**
+
+ The application sends `name:value|d`, where `value` is anything (**number or text**, leading and trailing spaces are removed), StatsD reports the number of events sent for each `value` and the total times `name` was updated (events).
+
+ Sampling rate is **not** supported for Dictionaries. `value` is always considered text (so `01` and `1` are considered different).
+
+ [Tags](#tags) are supported for changing chart units and family.
+
+ When a set is not collected, Netdata **defaults** to showing a zero value, until a data collection event changes the value.
#### Sampling Rates
The application may append `|@sampling_rate`, where `sampling_rate` is a number from `0.0` to `1.0` in order for StatD to extrapolate the value and predict the total for the entire period. If the application reports to StatsD a value for 1/10th of the time, it can append `|@0.1` to the metrics it sends to statsd.
+#### Tags
+
+The application may append `|#tag1:value1,tag2:value2,tag3:value3` etc, where `tagX` and `valueX` are strings. `:valueX` can be omitted.
+
+Currently, Netdata uses only 2 tags:
+
+ * `units=string` which sets the units of the chart that is automatically generated
+ * `family=string` which sets the family of the chart that is automatically generated (the family is the submenu of the dashboard)
+ * `name=string` which sets the name of the dimension of the chart that is automatically generated (only for counters, meters, gauges)
+
+Other tags are parsed, but currently are ignored.
+
+Charts are not updated to change units or dimension names once they are created. So, either send the tags on every event, or use the special `zinit` value to initiaze the charts at the beginning. `zinit` is a special value that can be used on any chart, to have netdata initialize the charts, without actually setting any values to them. So, instead of sending `my.metric:VALUE|c|#units=bytes,name=size` every time, the application can send at the beginning `my.metric:zinit|c|#units=bytes,name=size` and then `my.metric:VALUE|c`.
+
#### Overlapping metrics
-Netdata's StatsD server maintains different indexes for each of the types supported. This means the same metric `name` may exist under different types concurrently.
+Netdata's StatsD server maintains different indexes for each of the metric types supported. This means the same metric `name` may exist under different types concurrently.
+
+#### How to name your metrics
+
+A good practice is to name your metrics like `application.operation.metric`, where:
+
+- `application` is the application name - Netdata will automatically create a dashboard section based on the first keyword of the metrics, so you can have all your applications in different sections.
+- `operation` is the operation your application is executing, like `dbquery`, `request`, `response`, etc.
+- `metric` is anything you want to name your metric as. Netdata will automatically append the metric type (meter, counter, gauge, set, dictionary, timer, histogram) to the generated chart.
+
+Using [Tags](#tags) you can also change the submenus of the dashboard, the units of the charts and for meters, counters and gauges, the name of dimension. So, you can have a usable default view without using [Synthetic StatsD charts](#synthetic-statsd-charts)
#### Multiple metrics per packet
-Netdata accepts multiple metrics per packet if each is terminated with `\n`.
+Netdata accepts multiple metrics per packet if each is terminated with a newline (`\n`) at the end.
#### TCP packets
Netdata listens for both TCP and UDP packets. For TCP, is it important to always append `\n` on each metric, as Netdata will use the newline character to detect if a metric is split into multiple TCP packets.
-On disconnect, Netdata will process the entire buffer, even if it is not terminated with a `\n`.
#### UDP packets
-When sending multiple packets over UDP, it is important not to exceed the network MTU, which is usually 1500 bytes.
+When sending multiple metrics over a single UDP message, it is important not to exceed the network MTU, which is usually 1500 bytes.
Netdata will accept UDP packets up to 9000 bytes, but the underlying network will not exceed MTU.
@@ -122,7 +160,7 @@ You can find the configuration at `/etc/netdata/netdata.conf`:
# private charts memory mode = save
# private charts history = 3996
# histograms and timers percentile (percentThreshold) = 95.00000
- # add dimension for number of events received = yes
+ # add dimension for number of events received = no
# gaps on gauges (deleteGauges) = no
# gaps on counters (deleteCounters) = no
# gaps on meters (deleteMeters) = no
@@ -150,7 +188,7 @@ You can find the configuration at `/etc/netdata/netdata.conf`:
- `update every (flushInterval) = 1` seconds, controls the frequency StatsD will push the collected metrics to Netdata charts.
-- `decimal detail = 1000` controls the number of fractional digits in gauges and histograms. Netdata collects metrics using signed 64 bit integers and their fractional detail is controlled using multipliers and divisors. This setting is used to multiply all collected values to convert them to integers and is also set as the divisors, so that the final data will be a floating point number with this fractional detail (1000 = X.0 - X.999, 10000 = X.0 - X.9999, etc).
+- `decimal detail = 1000` controls the number of fractional digits in gauges and histograms. Netdata collects metrics using signed 64-bit integers and their fractional detail is controlled using multipliers and divisors. This setting is used to multiply all collected values to convert them to integers and is also set as the divisors, so that the final data will be a floating point number with this fractional detail (1000 = X.0 - X.999, 10000 = X.0 - X.9999, etc).
The rest of the settings are discussed below.
@@ -180,10 +218,9 @@ The default behavior is to use the same settings as the rest of the Netdata Agen
### Optimize private metric charts visualization and storage
-
If you have thousands of metrics, each with its own private chart, you may notice that your web browser becomes slow when you view the Netdata dashboard (this is a web browser issue we need to address at the Netdata UI). So, Netdata has a protection to stop creating charts when `max private charts allowed = 200` (soft limit) is reached.
-The metrics above this soft limit are still processed by Netdata and will be available to be sent to backend time-series databases, up to `max private charts hard limit = 1000`. So, between 200 and 1000 charts, Netdata will still generate charts, but they will automatically be created with `memory mode = none` (Netdata will not maintain a database for them). These metrics will be sent to backend time series databases, if the backend configuration is set to `as collected`.
+The metrics above this soft limit are still processed by Netdata, can be used in synthetic charts and will be available to be sent to backend time-series databases, up to `max private charts hard limit = 1000`. So, between 200 and 1000 charts, Netdata will still generate charts, but they will automatically be created with `memory mode = none` (Netdata will not maintain a database for them). These metrics will be sent to backend time series databases, if the backend configuration is set to `as collected`.
Metrics above the hard limit are still collected, but they can only be used in synthetic charts (once a metric is added to chart, it will be sent to backend servers too).
@@ -240,9 +277,6 @@ This is identical to `counter`.
- Format: `name:FLOAT|ms`
- StatsD maintains a list of all the values supplied and provides statistics on them.
-![image](https://cloud.githubusercontent.com/assets/2662304/26131620/acbea6a4-3aa3-11e7-8bdd-4a8996847767.png)
-
-The same chart with the `sum` unselected:
![image](https://cloud.githubusercontent.com/assets/2662304/26131629/bc34f2d2-3aa3-11e7-8a07-f2fc94ba4352.png)
### Synthetic StatsD charts
@@ -369,7 +403,7 @@ Synthetic chart:
![screenshot from 2017-08-03 23-29-14](https://user-images.githubusercontent.com/2662304/28942317-958a2c68-78a3-11e7-853f-32850141dd36.png)
-#### Renaming StatsD metrics
+#### Renaming StatsD synthetic charts' metrics
You can define a dictionary to rename metrics sent by StatsD clients. This enables you to send response `"200"` and Netdata visualize it as `succesful connection`
@@ -438,7 +472,7 @@ You can rename the dimensions with this:
Note that we added a `NAME` to the dimension line with `get.`. This is prefixed to the wildcarded part of the metric name, to compose the key for looking up the dictionary. So `500` became `get.500` which was looked up to the dictionary to find value `500 cannot connect to db`. This way we can have different dimension names, for each of the API methods (i.e. `get.500 = 500 cannot connect to db` while `post.500 = 500 cannot write to disk`).
-To add all API methods to a chart, you can do this:
+To add all 200s across all API methods to a chart, you can do this:
```
[ok_by_method]
@@ -539,44 +573,79 @@ You can also use StatsD with:
### Shell
-Getting the proper support for a programming language is not always easy, but the Unix shell is available on most Unix systems. You can use shell and `nc` to instrument your systems and send metric data to Netdata's StatsD implementation. Here's how:
+Getting the proper support for a programming language is not always easy, but the Unix shell is available on most Unix systems. You can use shell and `nc` to instrument your systems and send metric data to Netdata's StatsD implementation.
+
+Using the method you can send metrics from any script. You can generate events like: backup.started, backup.ended, backup.time, or even tail logs and convert them to metrics.
+
+> **IMPORTANT**:
+>
+> To send StatsD messages you need from the `netcat` package, the `nc` command.
+> There are multiple versions of this package. Please try to experiment with the `nc` command you have available on your right system, to find the right parameters.
+>
+> In the examples below, we assume the `openbsd-netcat` is installed.
+
+If you plan to send short StatsD events at sporadic occasions, use UDP. The messages should not be too long (remember, most networks support up to 1500 bytes MTU, which is also the limit for StatsD messages over UDP). The good thing is that using UDP will not block your script, even if the StatsD server is not there (UDP messages are "fire-and-forget").
+
-The command you need to run is:
+For UDP use this:
```sh
-echo "NAME:VALUE|TYPE" | nc -u --send-only localhost 8125
+echo "APPLICATION.METRIC:VALUE|TYPE" | nc -u -w 0 localhost 8125
```
-Where:
+`-u` turns on UDP, `-w 0` tells `nc` not to wait for a response from StatsD (idle time to close the connection).
-- `NAME` is the metric name
-- `VALUE` is the value for that metric (**gauges** `|g`, **timers** `|ms` and **histograms** `|h` accept decimal/fractional numbers, **counters** `|c` and **meters** `|m` accept integers, **sets** `|s` accept anything)
-- `TYPE` is one of `g`, `ms`, `h`, `c`, `m`, `s` to select the metric type.
+where:
-So, to set `metric1` as gauge to value `10`, use:
+- `APPLICATION` is any name for your application
+- `METRIC` is the name for the specific metric
+- `VALUE` is the value for that metric (**meters**, **counters**, **gauges**, **timers** and **histograms** accept integer/decimal/fractional numbers, **sets** and **dictionaries** accept strings)
+- `TYPE` is one of `m`, `c`, `g`, `ms`, `h`, `s`, `d` to define the metric type.
+
+For tailing a log and converting it to metrics, do something like this:
```sh
-echo "metric1:10|g" | nc -u --send-only localhost 8125
+tail -f some.log | awk 'awk commands to parse the log and format statsd metrics' | nc -N -w 120 localhost 8125
```
-To increment `metric2` by `10`, as a counter, use:
+`-N` tells `nc` to close the socket once it receives EOF on its input. `-w 120` tells `nc` to stop if the connection is idle for 120 seconds. The timeout is needed to stop the `nc` command if you restart Netdata while `nc` is connected to it. Without it, `nc` will sit idle forever.
+
+When you embed the above commands to a script, you may notice that all the metrics are sent to StatsD with a delay. They are buffered in the pipes `|`. You can turn them to real-time by prepending each command with `stdbuf -i0 -oL -eL command to be run`, like this:
```sh
-echo "metric2:10|c" | nc -u --send-only localhost 8125
+stdbuf -i0 -oL -eL tail -f some.log |\
+ stdbuf -i0 -oL -eL awk 'awk commands to parse the log and format statsd metrics' |\
+ stdbuf -i0 -oL -eL nc -N -w 120 localhost 8125
+```
+
+If you use `mawk` you also need to run awk with `-W interactive`.
+
+Examples:
+
+To set `myapp.used_memory` as gauge to value `123456`, use:
+
+```sh
+echo "myapp.used_memory:123456|g|#units:bytes" | nc -u -w 0 localhost 8125
+```
+
+To increment `myapp.files_sent` by `10`, as a counter, use:
+
+```sh
+echo "myapp.files_sent:10|c|#units:files" | nc -u -w 0 localhost 8125
```
You can send multiple metrics like this:
```sh
# send multiple metrics via UDP
-printf "metric1:10|g\nmetric2:10|c\n" | nc -u --send-only localhost 8125
+printf "myapp.used_memory:123456|g|#units:bytes\nmyapp.files_sent:10|c|#units:files\n" | nc -u -w 0 localhost 8125
```
Remember, for UDP communication each packet should not exceed the MTU. So, if you plan to push too many metrics at once, prefer TCP communication:
```sh
# send multiple metrics via TCP
-printf "metric1:10|g\nmetric2:10|c\n" | nc --send-only localhost 8125
+cat /tmp/statsd.metrics.txt | nc -N -w 120 localhost 8125
```
You can also use this little function to take care of all the details:
@@ -584,22 +653,29 @@ You can also use this little function to take care of all the details:
```sh
#!/usr/bin/env bash
+# we assume nc is from the openbsd-netcat package
+
STATSD_HOST="localhost"
STATSD_PORT="8125"
statsd() {
- local udp="-u" all="${*}"
+ local options="-u -w 0" all="${*}"
+
+ # replace all spaces with newlines
+ all="${all// /\\n}"
# if the string length of all parameters given is above 1000, use TCP
- [ "${#all}" -gt 1000 ] && udp=
+ [ "${#all}" -gt 1000 ] && options="-N -w 0"
- while [ ! -z "${1}" ]
- do
- printf "${1}\n"
- shift
- done | nc ${udp} --send-only ${STATSD_HOST} ${STATSD_PORT} || return 1
+ # send the metrics to statsd
+ printf "${all}\n" | nc ${options} ${STATSD_HOST} ${STATSD_PORT} || return 1
return 0
}
+
+if [ ! -z "${*}" ]
+then
+ statsd "${@}"
+fi
```
You can use it like this:
@@ -609,10 +685,15 @@ You can use it like this:
source statsd.sh
# then, at any point:
-StatsD "metric1:10|g" "metric2:10|c" ...
+statsd "myapp.used_memory:123456|g|#units:bytes" "myapp.files_sent:10|c|#units:files" ...
```
-The function is smart enough to call `nc` just once and pass all the metrics to it. It will also automatically switch to TCP if the metrics to send are above 1000 bytes.
-If you have gotten thus far, make sure to check out our [community forums](https://community.netdata.cloud) to share your experience using Netdata with StatsD.
+or even at a terminal prompt, like this:
+```sh
+./statsd.sh "myapp.used_memory:123456|g|#units:bytes" "myapp.files_sent:10|c|#units:files" ...
+```
+The function is smart enough to call `nc` just once and pass all the metrics to it. It will also automatically switch to TCP if the metrics to send are above 1000 bytes.
+
+If you have gotten thus far, make sure to check out our [community forums](https://community.netdata.cloud) to share your experience using Netdata with StatsD.
diff --git a/collectors/statsd.plugin/statsd.c b/collectors/statsd.plugin/statsd.c
index a630d00d0..63e3316cb 100644
--- a/collectors/statsd.plugin/statsd.c
+++ b/collectors/statsd.plugin/statsd.c
@@ -9,31 +9,24 @@
#define STATSD_LISTEN_PORT 8125
#define STATSD_LISTEN_BACKLOG 4096
+#define WORKER_JOB_TYPE_TCP_CONNECTED 0
+#define WORKER_JOB_TYPE_TCP_DISCONNECTED 1
+#define WORKER_JOB_TYPE_RCV_DATA 2
+#define WORKER_JOB_TYPE_SND_DATA 3
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 4
+#error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least 4
+#endif
+
// --------------------------------------------------------------------------------------
// #define STATSD_MULTITHREADED 1
#ifdef STATSD_MULTITHREADED
// DO NOT ENABLE MULTITHREADING - IT IS NOT WELL TESTED
-#define STATSD_AVL_TREE avl_tree_lock
-#define STATSD_AVL_INSERT avl_insert_lock
-#define STATSD_AVL_SEARCH avl_search_lock
-#define STATSD_AVL_INDEX_INIT { .avl_tree = { NULL, statsd_metric_compare }, .rwlock = AVL_LOCK_INITIALIZER }
-#define STATSD_FIRST_PTR_MUTEX netdata_mutex_t first_mutex
-#define STATSD_FIRST_PTR_MUTEX_INIT .first_mutex = NETDATA_MUTEX_INITIALIZER
-#define STATSD_FIRST_PTR_MUTEX_LOCK(index) netdata_mutex_lock(&((index)->first_mutex))
-#define STATSD_FIRST_PTR_MUTEX_UNLOCK(index) netdata_mutex_unlock(&((index)->first_mutex))
-#define STATSD_DICTIONARY_OPTIONS DICTIONARY_FLAG_DEFAULT
+#define STATSD_DICTIONARY_OPTIONS DICTIONARY_FLAG_DONT_OVERWRITE_VALUE|DICTIONARY_FLAG_ADD_IN_FRONT
#else
-#define STATSD_AVL_TREE avl_tree_type
-#define STATSD_AVL_INSERT avl_insert
-#define STATSD_AVL_SEARCH avl_search
-#define STATSD_AVL_INDEX_INIT { .root = NULL, .compar = statsd_metric_compare }
-#define STATSD_FIRST_PTR_MUTEX
-#define STATSD_FIRST_PTR_MUTEX_INIT
-#define STATSD_FIRST_PTR_MUTEX_LOCK(index)
-#define STATSD_FIRST_PTR_MUTEX_UNLOCK(index)
-#define STATSD_DICTIONARY_OPTIONS DICTIONARY_FLAG_SINGLE_THREADED
+#define STATSD_DICTIONARY_OPTIONS DICTIONARY_FLAG_DONT_OVERWRITE_VALUE|DICTIONARY_FLAG_ADD_IN_FRONT|DICTIONARY_FLAG_SINGLE_THREADED
#endif
#define STATSD_DECIMAL_DETAIL 1000 // floating point values get multiplied by this, with the same divisor
@@ -67,7 +60,7 @@ typedef struct statsd_histogram_extensions {
RRDDIM *rd_percentile;
RRDDIM *rd_median;
RRDDIM *rd_stddev;
- RRDDIM *rd_sum;
+ //RRDDIM *rd_sum;
size_t size;
size_t used;
@@ -83,6 +76,16 @@ typedef struct statsd_metric_set {
size_t unique;
} STATSD_METRIC_SET;
+typedef struct statsd_metric_dictionary_item {
+ size_t count;
+ RRDDIM *rd;
+} STATSD_METRIC_DICTIONARY_ITEM;
+
+typedef struct statsd_metric_dictionary {
+ DICTIONARY *dict;
+ size_t unique;
+} STATSD_METRIC_DICTIONARY;
+
// --------------------------------------------------------------------------------------------------------------------
// this is a metric - for all types of metrics
@@ -97,6 +100,7 @@ typedef enum statsd_metric_options {
STATSD_METRIC_OPTION_USED_IN_APPS = 0x00000020, // set when this metric is used in apps
STATSD_METRIC_OPTION_CHECKED = 0x00000040, // set when the charting thread checks this metric for use in charts (its usefulness)
STATSD_METRIC_OPTION_USEFUL = 0x00000080, // set when the charting thread finds the metric useful (i.e. used in a chart)
+ STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED = 0x00000100, // set when the collection is full for this metric
} STATS_METRIC_OPTIONS;
typedef enum statsd_metric_type {
@@ -105,14 +109,13 @@ typedef enum statsd_metric_type {
STATSD_METRIC_TYPE_METER,
STATSD_METRIC_TYPE_TIMER,
STATSD_METRIC_TYPE_HISTOGRAM,
- STATSD_METRIC_TYPE_SET
+ STATSD_METRIC_TYPE_SET,
+ STATSD_METRIC_TYPE_DICTIONARY
} STATSD_METRIC_TYPE;
typedef struct statsd_metric {
- avl_t avl; // indexing - has to be first
-
- const char *name; // the name of the metric
+ const char *name; // the name of the metric - linked to dictionary name
uint32_t hash; // hash of the name
STATSD_METRIC_TYPE type;
@@ -127,8 +130,13 @@ typedef struct statsd_metric {
STATSD_METRIC_COUNTER counter;
STATSD_METRIC_HISTOGRAM histogram;
STATSD_METRIC_SET set;
+ STATSD_METRIC_DICTIONARY dictionary;
};
+ char *units;
+ char *dimname;
+ char *family;
+
// chart related members
STATS_METRIC_OPTIONS options; // STATSD_METRIC_OPTION_* (bitfield)
char reset; // set to 1 by the charting thread to instruct the collector thread(s) to reset this metric
@@ -138,7 +146,6 @@ typedef struct statsd_metric {
RRDDIM *rd_count; // the dimension for the number of events received
// linking, used for walking through all metrics
- struct statsd_metric *next;
struct statsd_metric *next_useful;
} STATSD_METRIC;
@@ -152,17 +159,14 @@ typedef struct statsd_index {
size_t metrics; // the number of metrics in this index
size_t useful; // the number of useful metrics in this index
- STATSD_AVL_TREE index; // the AVL tree
+ STATSD_METRIC_TYPE type; // the type of index
+ DICTIONARY *dict;
- STATSD_METRIC *first; // the linked list of metrics (new metrics are added in front)
STATSD_METRIC *first_useful; // the linked list of useful metrics (new metrics are added in front)
- STATSD_FIRST_PTR_MUTEX; // when multi-threading is enabled, a lock to protect the linked list
STATS_METRIC_OPTIONS default_options; // default options for all metrics in this index
} STATSD_INDEX;
-static int statsd_metric_compare(void* a, void* b);
-
// --------------------------------------------------------------------------------------------------------------------
// synthetic charts
@@ -237,10 +241,6 @@ struct collection_thread_status {
size_t max_sockets;
netdata_thread_t thread;
- struct rusage rusage;
- RRDSET *st_cpu;
- RRDDIM *rd_user;
- RRDDIM *rd_system;
};
static struct statsd {
@@ -250,6 +250,8 @@ static struct statsd {
STATSD_INDEX histograms;
STATSD_INDEX meters;
STATSD_INDEX sets;
+ STATSD_INDEX dictionaries;
+
size_t unknown_types;
size_t socket_errors;
size_t tcp_socket_connects;
@@ -280,6 +282,7 @@ static struct statsd {
size_t histogram_increase_step;
double histogram_percentile;
char *histogram_percentile_str;
+ size_t dictionary_max_unique;
int threads;
struct collection_thread_status *collection_threads_status;
@@ -297,55 +300,57 @@ static struct statsd {
.name = "gauge",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_GAUGE,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.counters = {
.name = "counter",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_COUNTER,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.timers = {
.name = "timer",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_TIMER,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.histograms = {
.name = "histogram",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_HISTOGRAM,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.meters = {
.name = "meter",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_METER,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.sets = {
.name = "set",
.events = 0,
.metrics = 0,
- .index = STATSD_AVL_INDEX_INIT,
- .default_options = STATSD_METRIC_OPTION_NONE,
- .first = NULL,
- STATSD_FIRST_PTR_MUTEX_INIT
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_SET,
+ .default_options = STATSD_METRIC_OPTION_NONE
+ },
+ .dictionaries = {
+ .name = "dictionary",
+ .events = 0,
+ .metrics = 0,
+ .dict = NULL,
+ .type = STATSD_METRIC_TYPE_DICTIONARY,
+ .default_options = STATSD_METRIC_OPTION_NONE
},
.tcp_idle_timeout = 600,
@@ -353,6 +358,7 @@ static struct statsd {
.apps = NULL,
.histogram_percentile = 95.0,
.histogram_increase_step = 10,
+ .dictionary_max_unique = 200,
.threads = 0,
.collection_threads_status = NULL,
.sockets = {
@@ -368,54 +374,54 @@ static struct statsd {
// --------------------------------------------------------------------------------------------------------------------
// statsd index management - add/find metrics
-static int statsd_metric_compare(void* a, void* b) {
- if(((STATSD_METRIC *)a)->hash < ((STATSD_METRIC *)b)->hash) return -1;
- else if(((STATSD_METRIC *)a)->hash > ((STATSD_METRIC *)b)->hash) return 1;
- else return strcmp(((STATSD_METRIC *)a)->name, ((STATSD_METRIC *)b)->name);
-}
+static void dictionary_metric_insert_callback(const char *name, void *value, void *data) {
+ STATSD_INDEX *index = (STATSD_INDEX *)data;
+ STATSD_METRIC *m = (STATSD_METRIC *)value;
+
+ debug(D_STATSD, "Creating new %s metric '%s'", index->name, name);
-static inline STATSD_METRIC *statsd_metric_index_find(STATSD_INDEX *index, const char *name, uint32_t hash) {
- STATSD_METRIC tmp;
- tmp.name = name;
- tmp.hash = (hash)?hash:simple_hash(tmp.name);
+ m->name = name;
+ m->hash = simple_hash(name);
+ m->type = index->type;
+ m->options = index->default_options;
+
+ if (m->type == STATSD_METRIC_TYPE_HISTOGRAM || m->type == STATSD_METRIC_TYPE_TIMER) {
+ m->histogram.ext = callocz(1,sizeof(STATSD_METRIC_HISTOGRAM_EXTENSIONS));
+ netdata_mutex_init(&m->histogram.ext->mutex);
+ }
- return (STATSD_METRIC *)STATSD_AVL_SEARCH(&index->index, (avl_t *)&tmp);
+ __atomic_fetch_add(&index->metrics, 1, __ATOMIC_SEQ_CST);
}
-static inline STATSD_METRIC *statsd_find_or_add_metric(STATSD_INDEX *index, const char *name, STATSD_METRIC_TYPE type) {
- debug(D_STATSD, "searching for metric '%s' under '%s'", name, index->name);
+static void dictionary_metric_delete_callback(const char *name, void *value, void *data) {
+ (void)data; // STATSD_INDEX *index = (STATSD_INDEX *)data;
+ (void)name;
+ STATSD_METRIC *m = (STATSD_METRIC *)value;
- uint32_t hash = simple_hash(name);
+ if(m->type == STATSD_METRIC_TYPE_HISTOGRAM || m->type == STATSD_METRIC_TYPE_TIMER) {
+ freez(m->histogram.ext);
+ m->histogram.ext = NULL;
+ }
- STATSD_METRIC *m = statsd_metric_index_find(index, name, hash);
- if(unlikely(!m)) {
- debug(D_STATSD, "Creating new %s metric '%s'", index->name, name);
+ freez(m->units);
+ freez(m->family);
+ freez(m->dimname);
+}
- m = (STATSD_METRIC *)callocz(sizeof(STATSD_METRIC), 1);
- m->name = strdupz(name);
- m->hash = hash;
- m->type = type;
- m->options = index->default_options;
+static inline STATSD_METRIC *statsd_find_or_add_metric(STATSD_INDEX *index, const char *name) {
+ debug(D_STATSD, "searching for metric '%s' under '%s'", name, index->name);
- if(type == STATSD_METRIC_TYPE_HISTOGRAM || type == STATSD_METRIC_TYPE_TIMER) {
- m->histogram.ext = callocz(sizeof(STATSD_METRIC_HISTOGRAM_EXTENSIONS), 1);
- netdata_mutex_init(&m->histogram.ext->mutex);
- }
- STATSD_METRIC *n = (STATSD_METRIC *)STATSD_AVL_INSERT(&index->index, (avl_t *)m);
- if(unlikely(n != m)) {
- freez((void *)m->histogram.ext);
- freez((void *)m->name);
- freez((void *)m);
- m = n;
- }
- else {
- STATSD_FIRST_PTR_MUTEX_LOCK(index);
- index->metrics++;
- m->next = index->first;
- index->first = m;
- STATSD_FIRST_PTR_MUTEX_UNLOCK(index);
- }
- }
+#ifdef STATSD_MULTITHREADED
+ // avoid the write lock of dictionary_set() for existing metrics
+ STATSD_METRIC *m = dictionary_get(index->dict, name);
+ if(!m) m = dictionary_set(index->dict, name, NULL, sizeof(STATSD_METRIC));
+#else
+ // no locks here, go faster
+ // this will call the dictionary_metric_insert_callback() if an item
+ // is inserted, otherwise it will return the existing one.
+ // We used the flag DICTIONARY_FLAG_DONT_OVERWRITE_VALUE to support this.
+ STATSD_METRIC *m = dictionary_set(index->dict, name, NULL, sizeof(STATSD_METRIC));
+#endif
index->events++;
return m;
@@ -569,6 +575,13 @@ static inline void statsd_process_histogram_or_timer(STATSD_METRIC *m, const cha
#define statsd_process_timer(m, value, sampling) statsd_process_histogram_or_timer(m, value, sampling, "timer")
#define statsd_process_histogram(m, value, sampling) statsd_process_histogram_or_timer(m, value, sampling, "histogram")
+static void dictionary_metric_set_value_insert_callback(const char *name, void *value, void *data) {
+ (void)name;
+ (void)value;
+ STATSD_METRIC *m = (STATSD_METRIC *)data;
+ m->set.unique++;
+}
+
static inline void statsd_process_set(STATSD_METRIC *m, const char *value) {
if(!is_metric_useful_for_collection(m)) return;
@@ -580,13 +593,14 @@ static inline void statsd_process_set(STATSD_METRIC *m, const char *value) {
if(unlikely(m->reset)) {
if(likely(m->set.dict)) {
dictionary_destroy(m->set.dict);
+ dictionary_register_insert_callback(m->set.dict, dictionary_metric_set_value_insert_callback, m);
m->set.dict = NULL;
}
statsd_reset_metric(m);
}
if (unlikely(!m->set.dict)) {
- m->set.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
+ m->set.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
m->set.unique = 0;
}
@@ -594,12 +608,56 @@ static inline void statsd_process_set(STATSD_METRIC *m, const char *value) {
// magic loading of metric, without affecting anything
}
else {
- void *t = dictionary_get(m->set.dict, value);
+#ifdef STATSD_MULTITHREADED
+ // avoid the write lock to check if something is already there
+ if(!dictionary_get(m->set.dict, value))
+ dictionary_set(m->set.dict, value, NULL, 0);
+#else
+ dictionary_set(m->set.dict, value, NULL, 0);
+#endif
+ m->events++;
+ m->count++;
+ }
+}
+
+static void dictionary_metric_dict_value_insert_callback(const char *name, void *value, void *data) {
+ (void)name;
+ (void)value;
+ STATSD_METRIC *m = (STATSD_METRIC *)data;
+ m->dictionary.unique++;
+}
+
+static inline void statsd_process_dictionary(STATSD_METRIC *m, const char *value) {
+ if(!is_metric_useful_for_collection(m)) return;
+
+ if(unlikely(!value || !*value)) {
+ error("STATSD: metric of type set, with empty value is ignored.");
+ return;
+ }
+
+ if(unlikely(m->reset))
+ statsd_reset_metric(m);
+
+ if (unlikely(!m->dictionary.dict)) {
+ m->dictionary.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ dictionary_register_insert_callback(m->dictionary.dict, dictionary_metric_dict_value_insert_callback, m);
+ m->dictionary.unique = 0;
+ }
+
+ if(unlikely(value_is_zinit(value))) {
+ // magic loading of metric, without affecting anything
+ }
+ else {
+ STATSD_METRIC_DICTIONARY_ITEM *t = (STATSD_METRIC_DICTIONARY_ITEM *)dictionary_get(m->dictionary.dict, value);
+
if (unlikely(!t)) {
- dictionary_set(m->set.dict, value, NULL, 1);
- m->set.unique++;
+ if(!t && m->dictionary.unique >= statsd.dictionary_max_unique)
+ value = "other";
+
+ t = (STATSD_METRIC_DICTIONARY_ITEM *)dictionary_set(m->dictionary.dict, value, NULL, sizeof(STATSD_METRIC_DICTIONARY_ITEM));
}
+ t->count++;
m->events++;
m->count++;
}
@@ -609,85 +667,125 @@ static inline void statsd_process_set(STATSD_METRIC *m, const char *value) {
// --------------------------------------------------------------------------------------------------------------------
// statsd parsing
-static void statsd_process_metric(const char *name, const char *value, const char *type, const char *sampling, const char *tags) {
- (void)tags;
+static inline const char *statsd_parse_skip_up_to(const char *s, char d1, char d2, char d3) {
+ char c;
+
+ for(c = *s; c && c != d1 && c != d2 && c != d3 && c != '\r' && c != '\n'; c = *++s) ;
+ return s;
+}
+
+const char *statsd_parse_skip_spaces(const char *s) {
+ char c;
+
+ for(c = *s; c && ( c == ' ' || c == '\t' || c == '\r' || c == '\n' ); c = *++s) ;
+
+ return s;
+}
+
+static inline const char *statsd_parse_field_trim(const char *start, char *end) {
+ if(unlikely(!start || !*start)) {
+ start = end;
+ return start;
+ }
+
+ while(start <= end && (*start == ' ' || *start == '\t'))
+ start++;
+
+ *end = '\0';
+ end--;
+ while(end >= start && (*end == ' ' || *end == '\t'))
+ *end-- = '\0';
+
+ return start;
+}
+
+static void statsd_process_metric(const char *name, const char *value, const char *type, const char *sampling, const char *tags) {
debug(D_STATSD, "STATSD: raw metric '%s', value '%s', type '%s', sampling '%s', tags '%s'", name?name:"(null)", value?value:"(null)", type?type:"(null)", sampling?sampling:"(null)", tags?tags:"(null)");
if(unlikely(!name || !*name)) return;
if(unlikely(!type || !*type)) type = "m";
- char t0 = type[0], t1 = type[1];
+ STATSD_METRIC *m = NULL;
+ char t0 = type[0], t1 = type[1];
if(unlikely(t0 == 'g' && t1 == '\0')) {
statsd_process_gauge(
- statsd_find_or_add_metric(&statsd.gauges, name, STATSD_METRIC_TYPE_GAUGE),
- value, sampling);
+ m = statsd_find_or_add_metric(&statsd.gauges, name),
+ value, sampling);
}
else if(unlikely((t0 == 'c' || t0 == 'C') && t1 == '\0')) {
// etsy/statsd uses 'c'
// brubeck uses 'C'
statsd_process_counter(
- statsd_find_or_add_metric(&statsd.counters, name, STATSD_METRIC_TYPE_COUNTER),
- value, sampling);
+ m = statsd_find_or_add_metric(&statsd.counters, name),
+ value, sampling);
}
else if(unlikely(t0 == 'm' && t1 == '\0')) {
statsd_process_meter(
- statsd_find_or_add_metric(&statsd.meters, name, STATSD_METRIC_TYPE_METER),
- value, sampling);
+ m = statsd_find_or_add_metric(&statsd.meters, name),
+ value, sampling);
}
else if(unlikely(t0 == 'h' && t1 == '\0')) {
statsd_process_histogram(
- statsd_find_or_add_metric(&statsd.histograms, name, STATSD_METRIC_TYPE_HISTOGRAM),
- value, sampling);
+ m = statsd_find_or_add_metric(&statsd.histograms, name),
+ value, sampling);
}
else if(unlikely(t0 == 's' && t1 == '\0')) {
statsd_process_set(
- statsd_find_or_add_metric(&statsd.sets, name, STATSD_METRIC_TYPE_SET),
- value);
+ m = statsd_find_or_add_metric(&statsd.sets, name),
+ value);
+ }
+ else if(unlikely(t0 == 'd' && t1 == '\0')) {
+ statsd_process_dictionary(
+ m = statsd_find_or_add_metric(&statsd.dictionaries, name),
+ value);
}
else if(unlikely(t0 == 'm' && t1 == 's' && type[2] == '\0')) {
statsd_process_timer(
- statsd_find_or_add_metric(&statsd.timers, name, STATSD_METRIC_TYPE_TIMER),
- value, sampling);
+ m = statsd_find_or_add_metric(&statsd.timers, name),
+ value, sampling);
}
else {
statsd.unknown_types++;
error("STATSD: metric '%s' with value '%s' is sent with unknown metric type '%s'", name, value?value:"", type);
}
-}
-
-static inline const char *statsd_parse_skip_up_to(const char *s, char d1, char d2) {
- char c;
- for(c = *s; c && c != d1 && c != d2 && c != '\r' && c != '\n'; c = *++s) ;
-
- return s;
-}
+ if(m && tags && *tags) {
+ const char *s = tags;
+ while(*s) {
+ const char *tagkey = NULL, *tagvalue = NULL;
+ char *tagkey_end = NULL, *tagvalue_end = NULL;
-const char *statsd_parse_skip_spaces(const char *s) {
- char c;
+ s = tagkey_end = (char *)statsd_parse_skip_up_to(tagkey = s, ':', '=', ',');
+ if(tagkey == tagkey_end) {
+ if (*s) {
+ s++;
+ s = statsd_parse_skip_spaces(s);
+ }
+ continue;
+ }
- for(c = *s; c && ( c == ' ' || c == '\t' || c == '\r' || c == '\n' ); c = *++s) ;
+ if(likely(*s == ':' || *s == '='))
+ s = tagvalue_end = (char *) statsd_parse_skip_up_to(tagvalue = ++s, ',', '\0', '\0');
- return s;
-}
+ if(*s == ',') s++;
-static inline const char *statsd_parse_field_trim(const char *start, char *end) {
- if(unlikely(!start)) {
- start = end;
- return start;
- }
+ statsd_parse_field_trim(tagkey, tagkey_end);
+ statsd_parse_field_trim(tagvalue, tagvalue_end);
- while(start <= end && (*start == ' ' || *start == '\t'))
- start++;
+ if(tagkey && *tagkey && tagvalue && *tagvalue) {
+ if (!m->units && strcmp(tagkey, "units") == 0)
+ m->units = strdupz(tagvalue);
- *end = '\0';
- end--;
- while(end >= start && (*end == ' ' || *end == '\t'))
- *end-- = '\0';
+ if (!m->dimname && strcmp(tagkey, "name") == 0)
+ m->dimname = strdupz(tagvalue);
- return start;
+ if (!m->family && strcmp(tagkey, "family") == 0)
+ m->family = strdupz(tagvalue);
+ }
+ }
+ }
}
static inline size_t statsd_process(char *buffer, size_t size, int require_newlines) {
@@ -699,7 +797,7 @@ static inline size_t statsd_process(char *buffer, size_t size, int require_newli
const char *name = NULL, *value = NULL, *type = NULL, *sampling = NULL, *tags = NULL;
char *name_end = NULL, *value_end = NULL, *type_end = NULL, *sampling_end = NULL, *tags_end = NULL;
- s = name_end = (char *)statsd_parse_skip_up_to(name = s, ':', '|');
+ s = name_end = (char *)statsd_parse_skip_up_to(name = s, ':', '=', '|');
if(name == name_end) {
if (*s) {
s++;
@@ -708,20 +806,27 @@ static inline size_t statsd_process(char *buffer, size_t size, int require_newli
continue;
}
- if(likely(*s == ':'))
- s = value_end = (char *) statsd_parse_skip_up_to(value = ++s, '|', '|');
+ if(likely(*s == ':' || *s == '='))
+ s = value_end = (char *) statsd_parse_skip_up_to(value = ++s, '|', '@', '#');
if(likely(*s == '|'))
- s = type_end = (char *) statsd_parse_skip_up_to(type = ++s, '|', '@');
+ s = type_end = (char *) statsd_parse_skip_up_to(type = ++s, '|', '@', '#');
- if(likely(*s == '|' || *s == '@')) {
- s = sampling_end = (char *) statsd_parse_skip_up_to(sampling = ++s, '|', '#');
- if(*sampling == '@') sampling++;
- }
+ while(*s == '|' || *s == '@' || *s == '#') {
+ // parse all the fields that may be appended
- if(likely(*s == '|' || *s == '#')) {
- s = tags_end = (char *) statsd_parse_skip_up_to(tags = ++s, '|', '|');
- if(*tags == '#') tags++;
+ if ((*s == '|' && s[1] == '@') || *s == '@') {
+ s = sampling_end = (char *)statsd_parse_skip_up_to(sampling = ++s, '|', '@', '#');
+ if (*sampling == '@') sampling++;
+ }
+ else if ((*s == '|' && s[1] == '#') || *s == '#') {
+ s = tags_end = (char *)statsd_parse_skip_up_to(tags = ++s, '|', '@', '#');
+ if (*tags == '#') tags++;
+ }
+ else {
+ // unknown field, skip it
+ s = (char *)statsd_parse_skip_up_to(++s, '|', '@', '#');
+ }
}
// skip everything until the end of the line
@@ -788,6 +893,7 @@ static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) {
(void)pi;
(void)data;
+ worker_is_busy(WORKER_JOB_TYPE_TCP_CONNECTED);
*events = POLLIN;
struct statsd_tcp *t = (struct statsd_tcp *)callocz(sizeof(struct statsd_tcp) + STATSD_TCP_BUFFER_SIZE, 1);
@@ -796,11 +902,14 @@ static void *statsd_add_callback(POLLINFO *pi, short int *events, void *data) {
statsd.tcp_socket_connects++;
statsd.tcp_socket_connected++;
+ worker_is_idle();
return t;
}
// TCP client disconnected
static void statsd_del_callback(POLLINFO *pi) {
+ worker_is_busy(WORKER_JOB_TYPE_TCP_DISCONNECTED);
+
struct statsd_tcp *t = pi->data;
if(likely(t)) {
@@ -818,10 +927,15 @@ static void statsd_del_callback(POLLINFO *pi) {
freez(t);
}
+
+ worker_is_idle();
}
// Receive data
static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
+ int retval = -1;
+ worker_is_busy(WORKER_JOB_TYPE_RCV_DATA);
+
*events = POLLIN;
int fd = pi->fd;
@@ -832,14 +946,16 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if(unlikely(!d)) {
error("STATSD: internal error: expected TCP data pointer is NULL");
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(d->type != STATSD_SOCKET_DATA_TYPE_TCP)) {
error("STATSD: internal error: socket data type should be %d, but it is %d", (int)STATSD_SOCKET_DATA_TYPE_TCP, (int)d->type);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#endif
@@ -872,8 +988,10 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
d->len = statsd_process(d->buffer, d->len, 1);
}
- if(unlikely(ret == -1))
- return -1;
+ if(unlikely(ret == -1)) {
+ retval = -1;
+ goto cleanup;
+ }
} while (rc != -1);
break;
@@ -884,14 +1002,16 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if(unlikely(!d)) {
error("STATSD: internal error: expected UDP data pointer is NULL");
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#ifdef NETDATA_INTERNAL_CHECKS
if(unlikely(d->type != STATSD_SOCKET_DATA_TYPE_UDP)) {
error("STATSD: internal error: socket data should be %d, but it is %d", (int)d->type, (int)STATSD_SOCKET_DATA_TYPE_UDP);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
#endif
@@ -904,7 +1024,8 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) {
error("STATSD: recvmmsg() on UDP socket %d failed.", fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
} else if (rc) {
// data received
@@ -929,7 +1050,8 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) {
error("STATSD: recv() on UDP socket %d failed.", fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
} else if (rc) {
// data received
@@ -947,24 +1069,26 @@ static int statsd_rcv_callback(POLLINFO *pi, short int *events) {
default: {
error("STATSD: internal error: unknown socktype %d on socket %d", pi->socktype, fd);
statsd.socket_errors++;
- return -1;
+ retval = -1;
+ goto cleanup;
}
}
- return 0;
+ retval = 0;
+cleanup:
+ worker_is_idle();
+ return retval;
}
static int statsd_snd_callback(POLLINFO *pi, short int *events) {
(void)pi;
(void)events;
+ worker_is_busy(WORKER_JOB_TYPE_SND_DATA);
error("STATSD: snd_callback() called, but we never requested to send data to statsd clients.");
- return -1;
-}
+ worker_is_idle();
-static void statsd_timer_callback(void *timer_data) {
- struct collection_thread_status *status = timer_data;
- getrusage(RUSAGE_THREAD, &status->rusage);
+ return -1;
}
// --------------------------------------------------------------------------------------------------------------------
@@ -986,12 +1110,19 @@ void statsd_collector_thread_cleanup(void *data) {
#endif
freez(d);
+ worker_unregister();
}
void *statsd_collector_thread(void *ptr) {
struct collection_thread_status *status = ptr;
status->status = 1;
+ worker_register("STATSD");
+ worker_register_job_name(WORKER_JOB_TYPE_TCP_CONNECTED, "tcp connect");
+ worker_register_job_name(WORKER_JOB_TYPE_TCP_DISCONNECTED, "tcp disconnect");
+ worker_register_job_name(WORKER_JOB_TYPE_RCV_DATA, "receive");
+ worker_register_job_name(WORKER_JOB_TYPE_SND_DATA, "send");
+
info("STATSD collector thread started with taskid %d", gettid());
struct statsd_udp *d = callocz(sizeof(struct statsd_udp), 1);
@@ -1019,7 +1150,7 @@ void *statsd_collector_thread(void *ptr) {
, statsd_del_callback
, statsd_rcv_callback
, statsd_snd_callback
- , statsd_timer_callback
+ , NULL
, NULL // No access control pattern
, 0 // No dns lookups for access control pattern
, (void *)d
@@ -1413,23 +1544,50 @@ static inline void statsd_readdir(const char *user_path, const char *stock_path,
// send metrics to netdata - in private charts - called from the main thread
// extract chart type and chart id from metric name
-static inline void statsd_get_metric_type_and_id(STATSD_METRIC *m, char *type, char *id, const char *defid, size_t len) {
- char *s;
+static inline void statsd_get_metric_type_and_id(STATSD_METRIC *m, char *type, char *id, char *context, const char *metrictype, size_t len) {
+
+ // The full chart type.id looks like this:
+ // ${STATSD_CHART_PREFIX} + "_" + ${METRIC_NAME} + "_" + ${METRIC_TYPE}
+ //
+ // where:
+ // STATSD_CHART_PREFIX = "statsd" as defined above
+ // METRIC_NAME = whatever the user gave to statsd
+ // METRIC_TYPE = "gauge", "counter", "meter", "timer", "histogram", "set", "dictionary"
+
+ // for chart type, we want:
+ // ${STATSD_CHART_PREFIX} + "_" + the first word of ${METRIC_NAME}
+
+ // find the first word of ${METRIC_NAME}
+ char firstword[len + 1], *s = "";
+ strncpyz(firstword, m->name, len);
+ for (s = firstword; *s ; s++) {
+ if (unlikely(*s == '.' || *s == '_')) {
+ *s = '\0';
+ s++;
+ break;
+ }
+ }
+ // firstword has the first word of ${METRIC_NAME}
+ // s has the remaining, if any
- snprintfz(type, len, "%s_%s_%s", STATSD_CHART_PREFIX, defid, m->name);
- for(s = type; *s ;s++)
- if(unlikely(*s == '.')) break;
+ // create the chart type:
+ snprintfz(type, len, STATSD_CHART_PREFIX "_%s", firstword);
- if(*s == '.') {
- *s++ = '\0';
- strncpyz(id, s, len);
- }
- else {
- strncpyz(id, defid, len);
- }
+ // for chart id, we want:
+ // the remaining of the words of ${METRIC_NAME} + "_" + ${METRIC_TYPE}
+ // or the ${METRIC_NAME} has no remaining words, the ${METRIC_TYPE} alone
+ if(*s)
+ snprintfz(id, len, "%s_%s", s, metrictype);
+ else
+ snprintfz(id, len, "%s", metrictype);
+
+ // for the context, we want the full of both the above, separated with a dot (type.id):
+ snprintfz(context, RRD_ID_LENGTH_MAX, "%s.%s", type, id);
+ // make sure they don't have illegal characters
netdata_fix_chart_id(type);
netdata_fix_chart_id(id);
+ netdata_fix_chart_id(context);
}
static inline RRDSET *statsd_private_rrdset_create(
@@ -1486,11 +1644,8 @@ static inline void statsd_private_chart_gauge(STATSD_METRIC *m) {
debug(D_STATSD, "updating private chart for gauge metric '%s'", m->name);
if(unlikely(!m->st)) {
- char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1];
- statsd_get_metric_type_and_id(m, type, id, "gauge", RRD_ID_LENGTH_MAX);
-
- char context[RRD_ID_LENGTH_MAX + 1];
- snprintfz(context, RRD_ID_LENGTH_MAX, "statsd_gauge.%s", m->name);
+ char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1], context[RRD_ID_LENGTH_MAX + 1];
+ statsd_get_metric_type_and_id(m, type, id, context, "gauge", RRD_ID_LENGTH_MAX);
char title[RRD_ID_LENGTH_MAX + 1];
snprintfz(title, RRD_ID_LENGTH_MAX, "statsd private chart for gauge %s", m->name);
@@ -1500,16 +1655,16 @@ static inline void statsd_private_chart_gauge(STATSD_METRIC *m) {
, type
, id
, NULL // name
- , "gauges" // family (submenu)
+ , m->family?m->family:"gauges" // family (submenu)
, context // context
, title // title
- , "value" // units
+ , m->units?m->units:"value" // units
, NETDATA_CHART_PRIO_STATSD_PRIVATE
, statsd.update_every
, RRDSET_TYPE_LINE
);
- m->rd_value = rrddim_add(m->st, "gauge", NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
+ m->rd_value = rrddim_add(m->st, "gauge", m->dimname?m->dimname:NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
if(m->options & STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT)
m->rd_count = rrddim_add(m->st, "events", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -1528,11 +1683,8 @@ static inline void statsd_private_chart_counter_or_meter(STATSD_METRIC *m, const
debug(D_STATSD, "updating private chart for %s metric '%s'", dim, m->name);
if(unlikely(!m->st)) {
- char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1];
- statsd_get_metric_type_and_id(m, type, id, dim, RRD_ID_LENGTH_MAX);
-
- char context[RRD_ID_LENGTH_MAX + 1];
- snprintfz(context, RRD_ID_LENGTH_MAX, "statsd_%s.%s", dim, m->name);
+ char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1], context[RRD_ID_LENGTH_MAX + 1];
+ statsd_get_metric_type_and_id(m, type, id, context, dim, RRD_ID_LENGTH_MAX);
char title[RRD_ID_LENGTH_MAX + 1];
snprintfz(title, RRD_ID_LENGTH_MAX, "statsd private chart for %s %s", dim, m->name);
@@ -1542,16 +1694,16 @@ static inline void statsd_private_chart_counter_or_meter(STATSD_METRIC *m, const
, type
, id
, NULL // name
- , family // family (submenu)
+ , m->family?m->family:family // family (submenu)
, context // context
, title // title
- , "events/s" // units
+ , m->units?m->units:"events/s" // units
, NETDATA_CHART_PRIO_STATSD_PRIVATE
, statsd.update_every
, RRDSET_TYPE_AREA
);
- m->rd_value = rrddim_add(m->st, dim, NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ m->rd_value = rrddim_add(m->st, dim, m->dimname?m->dimname:NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
if(m->options & STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT)
m->rd_count = rrddim_add(m->st, "events", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -1570,11 +1722,8 @@ static inline void statsd_private_chart_set(STATSD_METRIC *m) {
debug(D_STATSD, "updating private chart for set metric '%s'", m->name);
if(unlikely(!m->st)) {
- char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1];
- statsd_get_metric_type_and_id(m, type, id, "set", RRD_ID_LENGTH_MAX);
-
- char context[RRD_ID_LENGTH_MAX + 1];
- snprintfz(context, RRD_ID_LENGTH_MAX, "statsd_set.%s", m->name);
+ char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1], context[RRD_ID_LENGTH_MAX + 1];
+ statsd_get_metric_type_and_id(m, type, id, context, "set", RRD_ID_LENGTH_MAX);
char title[RRD_ID_LENGTH_MAX + 1];
snprintfz(title, RRD_ID_LENGTH_MAX, "statsd private chart for set %s", m->name);
@@ -1584,16 +1733,16 @@ static inline void statsd_private_chart_set(STATSD_METRIC *m) {
, type
, id
, NULL // name
- , "sets" // family (submenu)
+ , m->family?m->family:"sets" // family (submenu)
, context // context
, title // title
- , "entries" // units
+ , m->units?m->units:"entries" // units
, NETDATA_CHART_PRIO_STATSD_PRIVATE
, statsd.update_every
, RRDSET_TYPE_LINE
);
- m->rd_value = rrddim_add(m->st, "set", "set size", 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ m->rd_value = rrddim_add(m->st, "set", m->dimname?m->dimname:"unique", 1, 1, RRD_ALGORITHM_ABSOLUTE);
if(m->options & STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT)
m->rd_count = rrddim_add(m->st, "events", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -1608,15 +1757,54 @@ static inline void statsd_private_chart_set(STATSD_METRIC *m) {
rrdset_done(m->st);
}
+static inline void statsd_private_chart_dictionary(STATSD_METRIC *m) {
+ debug(D_STATSD, "updating private chart for dictionary metric '%s'", m->name);
+
+ if(unlikely(!m->st)) {
+ char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1], context[RRD_ID_LENGTH_MAX + 1];
+ statsd_get_metric_type_and_id(m, type, id, context, "dictionary", RRD_ID_LENGTH_MAX);
+
+ char title[RRD_ID_LENGTH_MAX + 1];
+ snprintfz(title, RRD_ID_LENGTH_MAX, "statsd private chart for dictionary %s", m->name);
+
+ m->st = statsd_private_rrdset_create(
+ m
+ , type
+ , id
+ , NULL // name
+ , m->family?m->family:"dictionaries" // family (submenu)
+ , context // context
+ , title // title
+ , m->units?m->units:"events/s" // units
+ , NETDATA_CHART_PRIO_STATSD_PRIVATE
+ , statsd.update_every
+ , RRDSET_TYPE_STACKED
+ );
+
+ if(m->options & STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT)
+ m->rd_count = rrddim_add(m->st, "events", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ }
+ else rrdset_next(m->st);
+
+ STATSD_METRIC_DICTIONARY_ITEM *t;
+ dfe_start_read(m->dictionary.dict, t) {
+ if (!t->rd) t->rd = rrddim_add(m->st, t_name, NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ rrddim_set_by_pointer(m->st, t->rd, (collected_number)t->count);
+ }
+ dfe_done(t);
+
+ if(m->rd_count)
+ rrddim_set_by_pointer(m->st, m->rd_count, m->events);
+
+ rrdset_done(m->st);
+}
+
static inline void statsd_private_chart_timer_or_histogram(STATSD_METRIC *m, const char *dim, const char *family, const char *units) {
debug(D_STATSD, "updating private chart for %s metric '%s'", dim, m->name);
if(unlikely(!m->st)) {
- char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1];
- statsd_get_metric_type_and_id(m, type, id, dim, RRD_ID_LENGTH_MAX);
-
- char context[RRD_ID_LENGTH_MAX + 1];
- snprintfz(context, RRD_ID_LENGTH_MAX, "statsd_%s.%s", dim, m->name);
+ char type[RRD_ID_LENGTH_MAX + 1], id[RRD_ID_LENGTH_MAX + 1], context[RRD_ID_LENGTH_MAX + 1];
+ statsd_get_metric_type_and_id(m, type, id, context, dim, RRD_ID_LENGTH_MAX);
char title[RRD_ID_LENGTH_MAX + 1];
snprintfz(title, RRD_ID_LENGTH_MAX, "statsd private chart for %s %s", dim, m->name);
@@ -1626,10 +1814,10 @@ static inline void statsd_private_chart_timer_or_histogram(STATSD_METRIC *m, con
, type
, id
, NULL // name
- , family // family (submenu)
+ , m->family?m->family:family // family (submenu)
, context // context
, title // title
- , units // units
+ , m->units?m->units:units // units
, NETDATA_CHART_PRIO_STATSD_PRIVATE
, statsd.update_every
, RRDSET_TYPE_AREA
@@ -1641,7 +1829,7 @@ static inline void statsd_private_chart_timer_or_histogram(STATSD_METRIC *m, con
m->histogram.ext->rd_percentile = rrddim_add(m->st, statsd.histogram_percentile_str, NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
m->histogram.ext->rd_median = rrddim_add(m->st, "median", NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
m->histogram.ext->rd_stddev = rrddim_add(m->st, "stddev", NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
- m->histogram.ext->rd_sum = rrddim_add(m->st, "sum", NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
+ //m->histogram.ext->rd_sum = rrddim_add(m->st, "sum", NULL, 1, statsd.decimal_detail, RRD_ALGORITHM_ABSOLUTE);
if(m->options & STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT)
m->rd_count = rrddim_add(m->st, "events", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -1653,7 +1841,7 @@ static inline void statsd_private_chart_timer_or_histogram(STATSD_METRIC *m, con
rrddim_set_by_pointer(m->st, m->histogram.ext->rd_percentile, m->histogram.ext->last_percentile);
rrddim_set_by_pointer(m->st, m->histogram.ext->rd_median, m->histogram.ext->last_median);
rrddim_set_by_pointer(m->st, m->histogram.ext->rd_stddev, m->histogram.ext->last_stddev);
- rrddim_set_by_pointer(m->st, m->histogram.ext->rd_sum, m->histogram.ext->last_sum);
+ //rrddim_set_by_pointer(m->st, m->histogram.ext->rd_sum, m->histogram.ext->last_sum);
rrddim_set_by_pointer(m->st, m->rd_value, m->last);
if(m->rd_count)
@@ -1721,6 +1909,34 @@ static inline void statsd_flush_set(STATSD_METRIC *m) {
statsd_private_chart_set(m);
}
+static inline void statsd_flush_dictionary(STATSD_METRIC *m) {
+ debug(D_STATSD, "flushing dictionary metric '%s'", m->name);
+
+ int updated = 0;
+ if(unlikely(!m->reset && m->count)) {
+ m->last = (collected_number)m->dictionary.unique;
+
+ m->reset = 1;
+ updated = 1;
+ }
+ else {
+ m->last = 0;
+ }
+
+ if(unlikely(m->options & STATSD_METRIC_OPTION_PRIVATE_CHART_ENABLED && (updated || !(m->options & STATSD_METRIC_OPTION_SHOW_GAPS_WHEN_NOT_COLLECTED))))
+ statsd_private_chart_dictionary(m);
+
+ if(m->dictionary.unique >= statsd.dictionary_max_unique) {
+ if(!(m->options & STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED)) {
+ m->options |= STATSD_METRIC_OPTION_COLLECTION_FULL_LOGGED;
+ info(
+ "STATSD dictionary '%s' reach max of %zu items - try increasing 'dictionaries max unique dimensions' in netdata.conf",
+ m->name,
+ m->dictionary.unique);
+ }
+ }
+}
+
static inline void statsd_flush_timer_or_histogram(STATSD_METRIC *m, const char *dim, const char *family, const char *units) {
debug(D_STATSD, "flushing %s metric '%s'", dim, m->name);
@@ -1793,6 +2009,7 @@ static inline RRD_ALGORITHM statsd_algorithm_for_metric(STATSD_METRIC *m) {
case STATSD_METRIC_TYPE_METER:
case STATSD_METRIC_TYPE_COUNTER:
+ case STATSD_METRIC_TYPE_DICTIONARY:
return RRD_ALGORITHM_INCREMENTAL;
}
}
@@ -2059,6 +2276,7 @@ const char *statsd_metric_type_string(STATSD_METRIC_TYPE type) {
case STATSD_METRIC_TYPE_HISTOGRAM: return "histogram";
case STATSD_METRIC_TYPE_METER: return "meter";
case STATSD_METRIC_TYPE_SET: return "set";
+ case STATSD_METRIC_TYPE_DICTIONARY: return "dictionary";
case STATSD_METRIC_TYPE_TIMER: return "timer";
default: return "unknown";
}
@@ -2068,7 +2286,7 @@ static inline void statsd_flush_index_metrics(STATSD_INDEX *index, void (*flush_
STATSD_METRIC *m;
// find the useful metrics (incremental = each time we are called, we check the new metrics only)
- for(m = index->first; m ; m = m->next) {
+ dfe_start_read(index->dict, m) {
// since we add new metrics at the beginning
// check for useful charts, until the point we last checked
if(unlikely(is_metric_checked(m))) break;
@@ -2109,6 +2327,7 @@ static inline void statsd_flush_index_metrics(STATSD_INDEX *index, void (*flush_
index->first_useful = m;
}
}
+ dfe_done(m);
// flush all the useful metrics
for(m = index->first_useful; m ; m = m->next_useful) {
@@ -2145,17 +2364,75 @@ static void statsd_main_cleanup(void *data) {
info("STATSD: closing sockets...");
listen_sockets_close(&statsd.sockets);
+ // destroy the dictionaries
+ dictionary_destroy(statsd.gauges.dict);
+ dictionary_destroy(statsd.meters.dict);
+ dictionary_destroy(statsd.counters.dict);
+ dictionary_destroy(statsd.histograms.dict);
+ dictionary_destroy(statsd.dictionaries.dict);
+ dictionary_destroy(statsd.sets.dict);
+ dictionary_destroy(statsd.timers.dict);
+
info("STATSD: cleanup completed.");
static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
+
+ worker_unregister();
}
+#define WORKER_STATSD_FLUSH_GAUGES 0
+#define WORKER_STATSD_FLUSH_COUNTERS 1
+#define WORKER_STATSD_FLUSH_METERS 2
+#define WORKER_STATSD_FLUSH_TIMERS 3
+#define WORKER_STATSD_FLUSH_HISTOGRAMS 4
+#define WORKER_STATSD_FLUSH_SETS 5
+#define WORKER_STATSD_FLUSH_DICTIONARIES 6
+#define WORKER_STATSD_FLUSH_STATS 7
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 8
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 8
+#endif
+
void *statsd_main(void *ptr) {
+ worker_register("STATSDFLUSH");
+ worker_register_job_name(WORKER_STATSD_FLUSH_GAUGES, "gauges");
+ worker_register_job_name(WORKER_STATSD_FLUSH_COUNTERS, "counters");
+ worker_register_job_name(WORKER_STATSD_FLUSH_METERS, "meters");
+ worker_register_job_name(WORKER_STATSD_FLUSH_TIMERS, "timers");
+ worker_register_job_name(WORKER_STATSD_FLUSH_HISTOGRAMS, "histograms");
+ worker_register_job_name(WORKER_STATSD_FLUSH_SETS, "sets");
+ worker_register_job_name(WORKER_STATSD_FLUSH_DICTIONARIES, "dictionaries");
+ worker_register_job_name(WORKER_STATSD_FLUSH_STATS, "statistics");
+
netdata_thread_cleanup_push(statsd_main_cleanup, ptr);
+ statsd.gauges.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.meters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.counters.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.histograms.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.dictionaries.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.sets.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+ statsd.timers.dict = dictionary_create(STATSD_DICTIONARY_OPTIONS);
+
+ dictionary_register_insert_callback(statsd.gauges.dict, dictionary_metric_insert_callback, &statsd.gauges);
+ dictionary_register_insert_callback(statsd.meters.dict, dictionary_metric_insert_callback, &statsd.meters);
+ dictionary_register_insert_callback(statsd.counters.dict, dictionary_metric_insert_callback, &statsd.counters);
+ dictionary_register_insert_callback(statsd.histograms.dict, dictionary_metric_insert_callback, &statsd.histograms);
+ dictionary_register_insert_callback(statsd.dictionaries.dict, dictionary_metric_insert_callback, &statsd.dictionaries);
+ dictionary_register_insert_callback(statsd.sets.dict, dictionary_metric_insert_callback, &statsd.sets);
+ dictionary_register_insert_callback(statsd.timers.dict, dictionary_metric_insert_callback, &statsd.timers);
+
+ dictionary_register_delete_callback(statsd.gauges.dict, dictionary_metric_delete_callback, &statsd.gauges);
+ dictionary_register_delete_callback(statsd.meters.dict, dictionary_metric_delete_callback, &statsd.meters);
+ dictionary_register_delete_callback(statsd.counters.dict, dictionary_metric_delete_callback, &statsd.counters);
+ dictionary_register_delete_callback(statsd.histograms.dict, dictionary_metric_delete_callback, &statsd.histograms);
+ dictionary_register_delete_callback(statsd.dictionaries.dict, dictionary_metric_delete_callback, &statsd.dictionaries);
+ dictionary_register_delete_callback(statsd.sets.dict, dictionary_metric_delete_callback, &statsd.sets);
+ dictionary_register_delete_callback(statsd.timers.dict, dictionary_metric_delete_callback, &statsd.timers);
+
// ----------------------------------------------------------------------------------------------------------------
// statsd configuration
- statsd.enabled = config_get_boolean(CONFIG_SECTION_STATSD, "enabled", statsd.enabled);
+ statsd.enabled = config_get_boolean(CONFIG_SECTION_PLUGINS, "statsd", statsd.enabled);
statsd.update_every = default_rrd_update_every;
statsd.update_every = (int)config_get_number(CONFIG_SECTION_STATSD, "update every (flushInterval)", statsd.update_every);
@@ -2188,13 +2465,16 @@ void *statsd_main(void *ptr) {
statsd.histogram_percentile_str = strdupz(buffer);
}
- if(config_get_boolean(CONFIG_SECTION_STATSD, "add dimension for number of events received", 1)) {
+ statsd.dictionary_max_unique = config_get_number(CONFIG_SECTION_STATSD, "dictionaries max unique dimensions", statsd.dictionary_max_unique);
+
+ if(config_get_boolean(CONFIG_SECTION_STATSD, "add dimension for number of events received", 0)) {
statsd.gauges.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
statsd.counters.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
statsd.meters.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
statsd.sets.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
statsd.histograms.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
statsd.timers.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
+ statsd.dictionaries.default_options |= STATSD_METRIC_OPTION_CHART_DIMENSION_COUNT;
}
if(config_get_boolean(CONFIG_SECTION_STATSD, "gaps on gauges (deleteGauges)", 0))
@@ -2215,6 +2495,9 @@ void *statsd_main(void *ptr) {
if(config_get_boolean(CONFIG_SECTION_STATSD, "gaps on timers (deleteTimers)", 0))
statsd.timers.default_options |= STATSD_METRIC_OPTION_SHOW_GAPS_WHEN_NOT_COLLECTED;
+ if(config_get_boolean(CONFIG_SECTION_STATSD, "gaps on dictionaries (deleteDictionaries)", 0))
+ statsd.dictionaries.default_options |= STATSD_METRIC_OPTION_SHOW_GAPS_WHEN_NOT_COLLECTED;
+
size_t max_sockets = (size_t)config_get_number(CONFIG_SECTION_STATSD, "statsd server max TCP sockets", (long long int)(rlimit_nofile.rlim_cur / 4));
#ifdef STATSD_MULTITHREADED
@@ -2275,6 +2558,7 @@ void *statsd_main(void *ptr) {
RRDDIM *rd_metrics_meter = rrddim_add(st_metrics, "meters", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDDIM *rd_metrics_histogram = rrddim_add(st_metrics, "histograms", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDDIM *rd_metrics_set = rrddim_add(st_metrics, "sets", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ RRDDIM *rd_metrics_dictionary= rrddim_add(st_metrics, "dictionaries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDSET *st_useful_metrics = rrdset_create_localhost(
"netdata"
@@ -2296,6 +2580,7 @@ void *statsd_main(void *ptr) {
RRDDIM *rd_useful_metrics_meter = rrddim_add(st_useful_metrics, "meters", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDDIM *rd_useful_metrics_histogram = rrddim_add(st_useful_metrics, "histograms", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDDIM *rd_useful_metrics_set = rrddim_add(st_useful_metrics, "sets", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
+ RRDDIM *rd_useful_metrics_dictionary= rrddim_add(st_useful_metrics, "dictionaries", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
RRDSET *st_events = rrdset_create_localhost(
"netdata"
@@ -2317,6 +2602,7 @@ void *statsd_main(void *ptr) {
RRDDIM *rd_events_meter = rrddim_add(st_events, "meters", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
RRDDIM *rd_events_histogram = rrddim_add(st_events, "histograms", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
RRDDIM *rd_events_set = rrddim_add(st_events, "sets", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
+ RRDDIM *rd_events_dictionary= rrddim_add(st_events, "dictionaries", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
RRDDIM *rd_events_unknown = rrddim_add(st_events, "unknown", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
RRDDIM *rd_events_errors = rrddim_add(st_events, "errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
@@ -2420,70 +2706,39 @@ void *statsd_main(void *ptr) {
);
RRDDIM *rd_pcharts = rrddim_add(st_pcharts, "charts", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
- RRDSET *stcpu_thread = rrdset_create_localhost(
- "netdata"
- , "plugin_statsd_charting_cpu"
- , NULL
- , "statsd"
- , "netdata.statsd_cpu"
- , "Netdata statsd charting thread CPU usage"
- , "milliseconds/s"
- , PLUGIN_STATSD_NAME
- , "stats"
- , 132001
- , statsd.update_every
- , RRDSET_TYPE_STACKED
- );
-
- RRDDIM *rd_user = rrddim_add(stcpu_thread, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- RRDDIM *rd_system = rrddim_add(stcpu_thread, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- struct rusage thread;
-
- for(i = 0; i < statsd.threads ;i++) {
- char id[100 + 1];
- char title[100 + 1];
-
- snprintfz(id, 100, "plugin_statsd_collector%d_cpu", i + 1);
- snprintfz(title, 100, "Netdata statsd collector thread No %d CPU usage", i + 1);
-
- statsd.collection_threads_status[i].st_cpu = rrdset_create_localhost(
- "netdata"
- , id
- , NULL
- , "statsd"
- , "netdata.statsd_cpu"
- , title
- , "milliseconds/s"
- , PLUGIN_STATSD_NAME
- , "stats"
- , 132002 + i
- , statsd.update_every
- , RRDSET_TYPE_STACKED
- );
-
- statsd.collection_threads_status[i].rd_user = rrddim_add(statsd.collection_threads_status[i].st_cpu, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- statsd.collection_threads_status[i].rd_system = rrddim_add(statsd.collection_threads_status[i].st_cpu, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
- }
-
- // ----------------------------------------------------------------------------------------------------------------
+ // ----------------------------------------------------------------------------------------------------------------
// statsd thread to turn metrics into charts
usec_t step = statsd.update_every * USEC_PER_SEC;
heartbeat_t hb;
heartbeat_init(&hb);
while(!netdata_exit) {
+ worker_is_idle();
usec_t hb_dt = heartbeat_next(&hb, step);
+ worker_is_busy(WORKER_STATSD_FLUSH_GAUGES);
statsd_flush_index_metrics(&statsd.gauges, statsd_flush_gauge);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_COUNTERS);
statsd_flush_index_metrics(&statsd.counters, statsd_flush_counter);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_METERS);
statsd_flush_index_metrics(&statsd.meters, statsd_flush_meter);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_TIMERS);
statsd_flush_index_metrics(&statsd.timers, statsd_flush_timer);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_HISTOGRAMS);
statsd_flush_index_metrics(&statsd.histograms, statsd_flush_histogram);
+
+ worker_is_busy(WORKER_STATSD_FLUSH_SETS);
statsd_flush_index_metrics(&statsd.sets, statsd_flush_set);
- statsd_update_all_app_charts();
+ worker_is_busy(WORKER_STATSD_FLUSH_DICTIONARIES);
+ statsd_flush_index_metrics(&statsd.dictionaries,statsd_flush_dictionary);
- getrusage(RUSAGE_THREAD, &thread);
+ worker_is_busy(WORKER_STATSD_FLUSH_STATS);
+ statsd_update_all_app_charts();
if(unlikely(netdata_exit))
break;
@@ -2498,9 +2753,6 @@ void *statsd_main(void *ptr) {
rrdset_next(st_tcp_connects);
rrdset_next(st_tcp_connected);
rrdset_next(st_pcharts);
- rrdset_next(stcpu_thread);
- for(i = 0; i < statsd.threads ;i++)
- rrdset_next(statsd.collection_threads_status[i].st_cpu);
}
rrddim_set_by_pointer(st_metrics, rd_metrics_gauge, (collected_number)statsd.gauges.metrics);
@@ -2509,6 +2761,7 @@ void *statsd_main(void *ptr) {
rrddim_set_by_pointer(st_metrics, rd_metrics_meter, (collected_number)statsd.meters.metrics);
rrddim_set_by_pointer(st_metrics, rd_metrics_histogram, (collected_number)statsd.histograms.metrics);
rrddim_set_by_pointer(st_metrics, rd_metrics_set, (collected_number)statsd.sets.metrics);
+ rrddim_set_by_pointer(st_metrics, rd_metrics_dictionary, (collected_number)statsd.dictionaries.metrics);
rrdset_done(st_metrics);
rrddim_set_by_pointer(st_useful_metrics, rd_useful_metrics_gauge, (collected_number)statsd.gauges.useful);
@@ -2517,6 +2770,7 @@ void *statsd_main(void *ptr) {
rrddim_set_by_pointer(st_useful_metrics, rd_useful_metrics_meter, (collected_number)statsd.meters.useful);
rrddim_set_by_pointer(st_useful_metrics, rd_useful_metrics_histogram, (collected_number)statsd.histograms.useful);
rrddim_set_by_pointer(st_useful_metrics, rd_useful_metrics_set, (collected_number)statsd.sets.useful);
+ rrddim_set_by_pointer(st_useful_metrics, rd_useful_metrics_dictionary, (collected_number)statsd.dictionaries.useful);
rrdset_done(st_useful_metrics);
rrddim_set_by_pointer(st_events, rd_events_gauge, (collected_number)statsd.gauges.events);
@@ -2525,6 +2779,7 @@ void *statsd_main(void *ptr) {
rrddim_set_by_pointer(st_events, rd_events_meter, (collected_number)statsd.meters.events);
rrddim_set_by_pointer(st_events, rd_events_histogram, (collected_number)statsd.histograms.events);
rrddim_set_by_pointer(st_events, rd_events_set, (collected_number)statsd.sets.events);
+ rrddim_set_by_pointer(st_events, rd_events_dictionary, (collected_number)statsd.dictionaries.events);
rrddim_set_by_pointer(st_events, rd_events_unknown, (collected_number)statsd.unknown_types);
rrddim_set_by_pointer(st_events, rd_events_errors, (collected_number)statsd.socket_errors);
rrdset_done(st_events);
@@ -2550,16 +2805,6 @@ void *statsd_main(void *ptr) {
rrddim_set_by_pointer(st_pcharts, rd_pcharts, (collected_number)statsd.private_charts);
rrdset_done(st_pcharts);
-
- rrddim_set_by_pointer(stcpu_thread, rd_user, thread.ru_utime.tv_sec * 1000000ULL + thread.ru_utime.tv_usec);
- rrddim_set_by_pointer(stcpu_thread, rd_system, thread.ru_stime.tv_sec * 1000000ULL + thread.ru_stime.tv_usec);
- rrdset_done(stcpu_thread);
-
- for(i = 0; i < statsd.threads ;i++) {
- rrddim_set_by_pointer(statsd.collection_threads_status[i].st_cpu, statsd.collection_threads_status[i].rd_user, statsd.collection_threads_status[i].rusage.ru_utime.tv_sec * 1000000ULL + statsd.collection_threads_status[i].rusage.ru_utime.tv_usec);
- rrddim_set_by_pointer(statsd.collection_threads_status[i].st_cpu, statsd.collection_threads_status[i].rd_system, statsd.collection_threads_status[i].rusage.ru_stime.tv_sec * 1000000ULL + statsd.collection_threads_status[i].rusage.ru_stime.tv_usec);
- rrdset_done(statsd.collection_threads_status[i].st_cpu);
- }
}
cleanup: ; // added semi-colon to prevent older gcc error: label at end of compound statement