summaryrefslogtreecommitdiffstats
path: root/src/libutil/rrd.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/libutil/rrd.c1502
1 files changed, 1502 insertions, 0 deletions
diff --git a/src/libutil/rrd.c b/src/libutil/rrd.c
new file mode 100644
index 0000000..451e222
--- /dev/null
+++ b/src/libutil/rrd.c
@@ -0,0 +1,1502 @@
+/*-
+ * Copyright 2016 Vsevolod Stakhov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include "rrd.h"
+#include "util.h"
+#include "cfg_file.h"
+#include "logger.h"
+#include "unix-std.h"
+#include "cryptobox.h"
+#include <math.h>
+
+#define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX
+#define RSPAMD_RRD_OLD_DS_COUNT 4
+#define RSPAMD_RRD_RRA_COUNT 4
+
+#define msg_err_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_CRITICAL, \
+ "rrd", file->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_warn_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_WARNING, \
+ "rrd", file->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_info_rrd(...) rspamd_default_log_function(G_LOG_LEVEL_INFO, \
+ "rrd", file->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+#define msg_debug_rrd(...) rspamd_conditional_debug_fast(NULL, NULL, \
+ rspamd_rrd_log_id, "rrd", file->id, \
+ G_STRFUNC, \
+ __VA_ARGS__)
+
+INIT_LOG_MODULE(rrd)
+
+static GQuark
+rrd_error_quark(void)
+{
+ return g_quark_from_static_string("rrd-error");
+}
+
+/**
+ * Convert rrd dst type from string to numeric value
+ */
+enum rrd_dst_type
+rrd_dst_from_string(const gchar *str)
+{
+ if (g_ascii_strcasecmp(str, "counter") == 0) {
+ return RRD_DST_COUNTER;
+ }
+ else if (g_ascii_strcasecmp(str, "absolute") == 0) {
+ return RRD_DST_ABSOLUTE;
+ }
+ else if (g_ascii_strcasecmp(str, "gauge") == 0) {
+ return RRD_DST_GAUGE;
+ }
+ else if (g_ascii_strcasecmp(str, "cdef") == 0) {
+ return RRD_DST_CDEF;
+ }
+ else if (g_ascii_strcasecmp(str, "derive") == 0) {
+ return RRD_DST_DERIVE;
+ }
+
+ return RRD_DST_INVALID;
+}
+
+/**
+ * Convert numeric presentation of dst to string
+ */
+const gchar *
+rrd_dst_to_string(enum rrd_dst_type type)
+{
+ switch (type) {
+ case RRD_DST_COUNTER:
+ return "COUNTER";
+ case RRD_DST_ABSOLUTE:
+ return "ABSOLUTE";
+ case RRD_DST_GAUGE:
+ return "GAUGE";
+ case RRD_DST_CDEF:
+ return "CDEF";
+ case RRD_DST_DERIVE:
+ return "DERIVE";
+ default:
+ return "U";
+ }
+
+ return "U";
+}
+
+/**
+ * Convert rrd consolidation function type from string to numeric value
+ */
+enum rrd_cf_type
+rrd_cf_from_string(const gchar *str)
+{
+ if (g_ascii_strcasecmp(str, "average") == 0) {
+ return RRD_CF_AVERAGE;
+ }
+ else if (g_ascii_strcasecmp(str, "minimum") == 0) {
+ return RRD_CF_MINIMUM;
+ }
+ else if (g_ascii_strcasecmp(str, "maximum") == 0) {
+ return RRD_CF_MAXIMUM;
+ }
+ else if (g_ascii_strcasecmp(str, "last") == 0) {
+ return RRD_CF_LAST;
+ }
+ /* XXX: add other CF functions supported by rrd */
+
+ return RRD_CF_INVALID;
+}
+
+/**
+ * Convert numeric presentation of cf to string
+ */
+const gchar *
+rrd_cf_to_string(enum rrd_cf_type type)
+{
+ switch (type) {
+ case RRD_CF_AVERAGE:
+ return "AVERAGE";
+ case RRD_CF_MINIMUM:
+ return "MINIMUM";
+ case RRD_CF_MAXIMUM:
+ return "MAXIMUM";
+ case RRD_CF_LAST:
+ return "LAST";
+ default:
+ return "U";
+ }
+
+ /* XXX: add other CF functions supported by rrd */
+
+ return "U";
+}
+
+void rrd_make_default_rra(const gchar *cf_name,
+ gulong pdp_cnt,
+ gulong rows,
+ struct rrd_rra_def *rra)
+{
+ g_assert(cf_name != NULL);
+ g_assert(rrd_cf_from_string(cf_name) != RRD_CF_INVALID);
+
+ rra->pdp_cnt = pdp_cnt;
+ rra->row_cnt = rows;
+ rspamd_strlcpy(rra->cf_nam, cf_name, sizeof(rra->cf_nam));
+ memset(rra->par, 0, sizeof(rra->par));
+ rra->par[RRA_cdp_xff_val].dv = 0.5;
+}
+
+void rrd_make_default_ds(const gchar *name,
+ const gchar *type,
+ gulong pdp_step,
+ struct rrd_ds_def *ds)
+{
+ g_assert(name != NULL);
+ g_assert(type != NULL);
+ g_assert(rrd_dst_from_string(type) != RRD_DST_INVALID);
+
+ rspamd_strlcpy(ds->ds_nam, name, sizeof(ds->ds_nam));
+ rspamd_strlcpy(ds->dst, type, sizeof(ds->dst));
+ memset(ds->par, 0, sizeof(ds->par));
+ ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
+ ds->par[RRD_DS_min_val].dv = NAN;
+ ds->par[RRD_DS_max_val].dv = NAN;
+}
+
+/**
+ * Check rrd file for correctness (size, cookies, etc)
+ */
+static gboolean
+rspamd_rrd_check_file(const gchar *filename, gboolean need_data, GError **err)
+{
+ gint fd, i;
+ struct stat st;
+ struct rrd_file_head head;
+ struct rrd_rra_def rra;
+ gint head_size;
+
+ fd = open(filename, O_RDWR);
+ if (fd == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
+ return FALSE;
+ }
+
+ if (fstat(fd, &st) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+ if (st.st_size < (goffset) sizeof(struct rrd_file_head)) {
+ /* We have trimmed file */
+ g_set_error(err, rrd_error_quark(), EINVAL, "rrd size is bad: %ud",
+ (guint) st.st_size);
+ close(fd);
+ return FALSE;
+ }
+
+ /* Try to read header */
+ if (read(fd, &head, sizeof(head)) != sizeof(head)) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd read head error: %s",
+ strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+ /* Check magic */
+ if (memcmp(head.version, RRD_VERSION, sizeof(head.version)) != 0) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd head error: bad cookie");
+ close(fd);
+ return FALSE;
+ }
+ if (head.float_cookie != RRD_FLOAT_COOKIE) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd head error: another architecture "
+ "(file cookie %g != our cookie %g)",
+ head.float_cookie, RRD_FLOAT_COOKIE);
+ close(fd);
+ return FALSE;
+ }
+ /* Check for other params */
+ if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd head cookies error: bad rra or ds count");
+ close(fd);
+ return FALSE;
+ }
+ /* Now we can calculate the overall size of rrd */
+ head_size = sizeof(struct rrd_file_head) +
+ sizeof(struct rrd_ds_def) * head.ds_cnt +
+ sizeof(struct rrd_rra_def) * head.rra_cnt +
+ sizeof(struct rrd_live_head) +
+ sizeof(struct rrd_pdp_prep) * head.ds_cnt +
+ sizeof(struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
+ sizeof(struct rrd_rra_ptr) * head.rra_cnt;
+ if (st.st_size < (goffset) head_size) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd file seems to have stripped header: %d",
+ head_size);
+ close(fd);
+ return FALSE;
+ }
+
+ if (need_data) {
+ /* Now check rra */
+ if (lseek(fd, sizeof(struct rrd_ds_def) * head.ds_cnt,
+ SEEK_CUR) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd head lseek error: %s",
+ strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+ for (i = 0; i < (gint) head.rra_cnt; i++) {
+ if (read(fd, &rra, sizeof(rra)) != sizeof(rra)) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd read rra error: %s",
+ strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+ head_size += rra.row_cnt * head.ds_cnt * sizeof(gdouble);
+ }
+
+ if (st.st_size != head_size) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd file seems to have incorrect size: %d, must be %d",
+ (gint) st.st_size, head_size);
+ close(fd);
+ return FALSE;
+ }
+ }
+
+ close(fd);
+ return TRUE;
+}
+
+/**
+ * Adjust pointers in mmapped rrd file
+ * @param file
+ */
+static void
+rspamd_rrd_adjust_pointers(struct rspamd_rrd_file *file, gboolean completed)
+{
+ guint8 *ptr;
+
+ ptr = file->map;
+ file->stat_head = (struct rrd_file_head *) ptr;
+ ptr += sizeof(struct rrd_file_head);
+ file->ds_def = (struct rrd_ds_def *) ptr;
+ ptr += sizeof(struct rrd_ds_def) * file->stat_head->ds_cnt;
+ file->rra_def = (struct rrd_rra_def *) ptr;
+ ptr += sizeof(struct rrd_rra_def) * file->stat_head->rra_cnt;
+ file->live_head = (struct rrd_live_head *) ptr;
+ ptr += sizeof(struct rrd_live_head);
+ file->pdp_prep = (struct rrd_pdp_prep *) ptr;
+ ptr += sizeof(struct rrd_pdp_prep) * file->stat_head->ds_cnt;
+ file->cdp_prep = (struct rrd_cdp_prep *) ptr;
+ ptr += sizeof(struct rrd_cdp_prep) * file->stat_head->rra_cnt *
+ file->stat_head->ds_cnt;
+ file->rra_ptr = (struct rrd_rra_ptr *) ptr;
+ if (completed) {
+ ptr += sizeof(struct rrd_rra_ptr) * file->stat_head->rra_cnt;
+ file->rrd_value = (gdouble *) ptr;
+ }
+ else {
+ file->rrd_value = NULL;
+ }
+}
+
+static void
+rspamd_rrd_calculate_checksum(struct rspamd_rrd_file *file)
+{
+ guchar sigbuf[rspamd_cryptobox_HASHBYTES];
+ struct rrd_ds_def *ds;
+ guint i;
+ rspamd_cryptobox_hash_state_t st;
+
+ if (file->finalized) {
+ rspamd_cryptobox_hash_init(&st, NULL, 0);
+ rspamd_cryptobox_hash_update(&st, file->filename, strlen(file->filename));
+
+ for (i = 0; i < file->stat_head->ds_cnt; i++) {
+ ds = &file->ds_def[i];
+ rspamd_cryptobox_hash_update(&st, ds->ds_nam, sizeof(ds->ds_nam));
+ }
+
+ rspamd_cryptobox_hash_final(&st, sigbuf);
+
+ file->id = rspamd_encode_base32(sigbuf, sizeof(sigbuf), RSPAMD_BASE32_DEFAULT);
+ }
+}
+
+static int
+rspamd_rrd_open_exclusive(const gchar *filename)
+{
+ struct timespec sleep_ts = {
+ .tv_sec = 0,
+ .tv_nsec = 1000000};
+ gint fd;
+
+ fd = open(filename, O_RDWR);
+
+ if (fd == -1) {
+ return -1;
+ }
+
+ for (;;) {
+ if (rspamd_file_lock(fd, TRUE) == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ nanosleep(&sleep_ts, NULL);
+ continue;
+ }
+ else {
+ close(fd);
+ return -1;
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ return fd;
+};
+
+/**
+ * Open completed or incompleted rrd file
+ * @param filename
+ * @param completed
+ * @param err
+ * @return
+ */
+static struct rspamd_rrd_file *
+rspamd_rrd_open_common(const gchar *filename, gboolean completed, GError **err)
+{
+ struct rspamd_rrd_file *file;
+ gint fd;
+ struct stat st;
+
+ if (!rspamd_rrd_check_file(filename, completed, err)) {
+ return NULL;
+ }
+
+ file = g_malloc0(sizeof(struct rspamd_rrd_file));
+
+ /* Open file */
+ fd = rspamd_rrd_open_exclusive(filename);
+ if (fd == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd open error: %s", strerror(errno));
+ g_free(file);
+ return FALSE;
+ }
+
+ if (fstat(fd, &st) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
+ rspamd_file_unlock(fd, FALSE);
+ g_free(file);
+ close(fd);
+ return FALSE;
+ }
+ /* Mmap file */
+ file->size = st.st_size;
+ if ((file->map =
+ mmap(NULL, st.st_size, PROT_READ | PROT_WRITE,
+ MAP_SHARED, fd, 0)) == MAP_FAILED) {
+
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
+ g_free(file);
+ return NULL;
+ }
+
+ file->fd = fd;
+
+ /* Adjust pointers */
+ rspamd_rrd_adjust_pointers(file, completed);
+
+ /* Mark it as finalized */
+ file->finalized = completed;
+
+ file->filename = g_strdup(filename);
+ rspamd_rrd_calculate_checksum(file);
+
+ return file;
+}
+
+/**
+ * Open (and mmap) existing RRD file
+ * @param filename path
+ * @param err error pointer
+ * @return rrd file structure
+ */
+struct rspamd_rrd_file *
+rspamd_rrd_open(const gchar *filename, GError **err)
+{
+ struct rspamd_rrd_file *file;
+
+ if ((file = rspamd_rrd_open_common(filename, TRUE, err))) {
+ msg_info_rrd("rrd file opened: %s", filename);
+ }
+
+ return file;
+}
+
+/**
+ * Create basic header for rrd file
+ * @param filename file path
+ * @param ds_count number of data sources
+ * @param rra_count number of round robin archives
+ * @param pdp_step step of primary data points
+ * @param err error pointer
+ * @return TRUE if file has been created
+ */
+struct rspamd_rrd_file *
+rspamd_rrd_create(const gchar *filename,
+ gulong ds_count,
+ gulong rra_count,
+ gulong pdp_step,
+ gdouble initial_ticks,
+ GError **err)
+{
+ struct rspamd_rrd_file *new;
+ struct rrd_file_head head;
+ struct rrd_ds_def ds;
+ struct rrd_rra_def rra;
+ struct rrd_live_head lh;
+ struct rrd_pdp_prep pdp;
+ struct rrd_cdp_prep cdp;
+ struct rrd_rra_ptr rra_ptr;
+ gint fd;
+ guint i, j;
+
+ /* Open file */
+ fd = open(filename, O_RDWR | O_CREAT | O_EXCL, 0644);
+ if (fd == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd create error: %s",
+ strerror(errno));
+ return NULL;
+ }
+
+ rspamd_file_lock(fd, FALSE);
+
+ /* Fill header */
+ memset(&head, 0, sizeof(head));
+ head.rra_cnt = rra_count;
+ head.ds_cnt = ds_count;
+ head.pdp_step = pdp_step;
+ memcpy(head.cookie, RRD_COOKIE, sizeof(head.cookie));
+ memcpy(head.version, RRD_VERSION, sizeof(head.version));
+ head.float_cookie = RRD_FLOAT_COOKIE;
+
+ if (write(fd, &head, sizeof(head)) != sizeof(head)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
+ return NULL;
+ }
+
+ /* Fill DS section */
+ memset(&ds, 0, sizeof(ds));
+ memset(&ds.ds_nam, 0, sizeof(ds.ds_nam));
+ memcpy(&ds.dst, "COUNTER", sizeof("COUNTER"));
+ memset(&ds.par, 0, sizeof(ds.par));
+ for (i = 0; i < ds_count; i++) {
+ if (write(fd, &ds, sizeof(ds)) != sizeof(ds)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Fill RRA section */
+ memset(&rra, 0, sizeof(rra));
+ memcpy(&rra.cf_nam, "AVERAGE", sizeof("AVERAGE"));
+ rra.pdp_cnt = 1;
+ memset(&rra.par, 0, sizeof(rra.par));
+ for (i = 0; i < rra_count; i++) {
+ if (write(fd, &rra, sizeof(rra)) != sizeof(rra)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Fill live header */
+ memset(&lh, 0, sizeof(lh));
+ lh.last_up = (glong) initial_ticks;
+ lh.last_up_usec = (glong) ((initial_ticks - lh.last_up) * 1e6f);
+
+ if (write(fd, &lh, sizeof(lh)) != sizeof(lh)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s", strerror(errno));
+ return NULL;
+ }
+
+ /* Fill pdp prep */
+ memset(&pdp, 0, sizeof(pdp));
+ memcpy(&pdp.last_ds, "U", sizeof("U"));
+ memset(&pdp.scratch, 0, sizeof(pdp.scratch));
+ pdp.scratch[PDP_val].dv = NAN;
+ pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
+
+ for (i = 0; i < ds_count; i++) {
+ if (write(fd, &pdp, sizeof(pdp)) != sizeof(pdp)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ return NULL;
+ }
+ }
+
+ /* Fill cdp prep */
+ memset(&cdp, 0, sizeof(cdp));
+ memset(&cdp.scratch, 0, sizeof(cdp.scratch));
+ cdp.scratch[CDP_val].dv = NAN;
+ cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
+
+ for (i = 0; i < rra_count; i++) {
+ for (j = 0; j < ds_count; j++) {
+ if (write(fd, &cdp, sizeof(cdp)) != sizeof(cdp)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ return NULL;
+ }
+ }
+ }
+
+ /* Set row pointers */
+ memset(&rra_ptr, 0, sizeof(rra_ptr));
+ for (i = 0; i < rra_count; i++) {
+ if (write(fd, &rra_ptr, sizeof(rra_ptr)) != sizeof(rra_ptr)) {
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ return NULL;
+ }
+ }
+
+ rspamd_file_unlock(fd, FALSE);
+ close(fd);
+
+ new = rspamd_rrd_open_common(filename, FALSE, err);
+
+ return new;
+}
+
+/**
+ * Add data sources to rrd file
+ * @param filename path to file
+ * @param ds array of struct rrd_ds_def
+ * @param err error pointer
+ * @return TRUE if data sources were added
+ */
+gboolean
+rspamd_rrd_add_ds(struct rspamd_rrd_file *file, GArray *ds, GError **err)
+{
+
+ if (file == NULL || file->stat_head->ds_cnt * sizeof(struct rrd_ds_def) !=
+ ds->len) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd add ds failed: wrong arguments");
+ return FALSE;
+ }
+
+ /* Straightforward memcpy */
+ memcpy(file->ds_def, ds->data, ds->len);
+
+ return TRUE;
+}
+
+/**
+ * Add round robin archives to rrd file
+ * @param filename path to file
+ * @param ds array of struct rrd_rra_def
+ * @param err error pointer
+ * @return TRUE if archives were added
+ */
+gboolean
+rspamd_rrd_add_rra(struct rspamd_rrd_file *file, GArray *rra, GError **err)
+{
+ if (file == NULL || file->stat_head->rra_cnt *
+ sizeof(struct rrd_rra_def) !=
+ rra->len) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
+ return FALSE;
+ }
+
+ /* Straightforward memcpy */
+ memcpy(file->rra_def, rra->data, rra->len);
+
+ return TRUE;
+}
+
+/**
+ * Finalize rrd file header and initialize all RRA in the file
+ * @param filename file path
+ * @param err error pointer
+ * @return TRUE if rrd file is ready for use
+ */
+gboolean
+rspamd_rrd_finalize(struct rspamd_rrd_file *file, GError **err)
+{
+ gint fd;
+ guint i;
+ gint count = 0;
+ gdouble vbuf[1024];
+ struct stat st;
+
+ if (file == NULL || file->filename == NULL || file->fd == -1) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL, "rrd add rra failed: wrong arguments");
+ return FALSE;
+ }
+
+ fd = file->fd;
+
+ if (lseek(fd, 0, SEEK_END) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd seek error: %s", strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+
+ /* Adjust CDP */
+ for (i = 0; i < file->stat_head->rra_cnt; i++) {
+ file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
+ /* Randomize row pointer (disabled) */
+ /* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */
+ file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1;
+ /* Calculate values count */
+ count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
+ }
+
+ munmap(file->map, file->size);
+ /* Write values */
+ for (i = 0; i < G_N_ELEMENTS(vbuf); i++) {
+ vbuf[i] = NAN;
+ }
+
+ while (count > 0) {
+ /* Write values in buffered matter */
+ if (write(fd, vbuf,
+ MIN((gint) G_N_ELEMENTS(vbuf), count) * sizeof(gdouble)) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd write error: %s",
+ strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+ count -= G_N_ELEMENTS(vbuf);
+ }
+
+ if (fstat(fd, &st) == -1) {
+ g_set_error(err,
+ rrd_error_quark(), errno, "rrd stat error: %s", strerror(errno));
+ close(fd);
+ return FALSE;
+ }
+
+ /* Mmap again */
+ file->size = st.st_size;
+ if ((file->map =
+ mmap(NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
+ 0)) == MAP_FAILED) {
+ close(fd);
+ g_set_error(err,
+ rrd_error_quark(), ENOMEM, "mmap failed: %s", strerror(errno));
+
+ return FALSE;
+ }
+
+ /* Adjust pointers */
+ rspamd_rrd_adjust_pointers(file, TRUE);
+
+ file->finalized = TRUE;
+ rspamd_rrd_calculate_checksum(file);
+ msg_info_rrd("rrd file created: %s", file->filename);
+
+ return TRUE;
+}
+
+/**
+ * Update pdp_prep data
+ * @param file rrd file
+ * @param vals new values
+ * @param pdp_new new pdp array
+ * @param interval time elapsed from the last update
+ * @return
+ */
+static gboolean
+rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file *file,
+ gdouble *vals,
+ gdouble *pdp_new,
+ gdouble interval)
+{
+ guint i;
+ enum rrd_dst_type type;
+
+ for (i = 0; i < file->stat_head->ds_cnt; i++) {
+ type = rrd_dst_from_string(file->ds_def[i].dst);
+
+ if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
+ rspamd_strlcpy(file->pdp_prep[i].last_ds, "U",
+ sizeof(file->pdp_prep[i].last_ds));
+ pdp_new[i] = NAN;
+ msg_debug_rrd("adding unknown point interval %.3f is less than heartbeat %l",
+ interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv);
+ }
+ else {
+ switch (type) {
+ case RRD_DST_COUNTER:
+ case RRD_DST_DERIVE:
+ if (file->pdp_prep[i].last_ds[0] == 'U') {
+ pdp_new[i] = NAN;
+ msg_debug_rrd("last point is NaN for point %ud", i);
+ }
+ else {
+ pdp_new[i] = vals[i] - strtod(file->pdp_prep[i].last_ds,
+ NULL);
+ msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
+ }
+ break;
+ case RRD_DST_GAUGE:
+ pdp_new[i] = vals[i] * interval;
+ msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
+ break;
+ case RRD_DST_ABSOLUTE:
+ pdp_new[i] = vals[i];
+ msg_debug_rrd("new PDP %ud, %.3f", i, pdp_new[i]);
+ break;
+ default:
+ return FALSE;
+ }
+ }
+
+ /* Copy value to the last_ds */
+ if (!isnan(vals[i])) {
+ rspamd_snprintf(file->pdp_prep[i].last_ds,
+ sizeof(file->pdp_prep[i].last_ds), "%.4f", vals[i]);
+ }
+ else {
+ file->pdp_prep[i].last_ds[0] = 'U';
+ file->pdp_prep[i].last_ds[1] = '\0';
+ }
+ }
+
+
+ return TRUE;
+}
+
+/**
+ * Update step for this pdp
+ * @param file
+ * @param pdp_new new pdp array
+ * @param pdp_temp temp pdp array
+ * @param interval time till last update
+ * @param pre_int pre interval
+ * @param post_int post intervall
+ * @param pdp_diff time till last pdp update
+ */
+static void
+rspamd_rrd_update_pdp_step(struct rspamd_rrd_file *file,
+ gdouble *pdp_new,
+ gdouble *pdp_temp,
+ gdouble interval,
+ gulong pdp_diff)
+{
+ guint i;
+ rrd_value_t *scratch;
+ gulong heartbeat;
+
+
+ for (i = 0; i < file->stat_head->ds_cnt; i++) {
+ scratch = file->pdp_prep[i].scratch;
+ heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
+
+ if (!isnan(pdp_new[i])) {
+ if (isnan(scratch[PDP_val].dv)) {
+ scratch[PDP_val].dv = 0;
+ }
+ }
+
+ /* Check interval value for heartbeat for this DS */
+ if ((interval > heartbeat) ||
+ (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
+ pdp_temp[i] = NAN;
+ }
+ else {
+ pdp_temp[i] = scratch[PDP_val].dv /
+ ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv));
+ }
+
+ if (isnan(pdp_new[i])) {
+ scratch[PDP_unkn_sec_cnt].lv = interval;
+ scratch[PDP_val].dv = NAN;
+ }
+ else {
+ scratch[PDP_unkn_sec_cnt].lv = 0;
+ scratch[PDP_val].dv = pdp_new[i] / interval;
+ }
+
+ msg_debug_rrd("new temp PDP %ud, %.3f -> %.3f, scratch: %3f",
+ i, pdp_new[i], pdp_temp[i],
+ scratch[PDP_val].dv);
+ }
+}
+
+/**
+ * Update CDP for this rra
+ * @param file rrd file
+ * @param pdp_steps how much pdp steps elapsed from the last update
+ * @param pdp_offset offset from pdp
+ * @param rra_steps how much steps must be updated for this rra
+ * @param rra_index index of desired rra
+ * @param pdp_temp temporary pdp points
+ */
+static void
+rspamd_rrd_update_cdp(struct rspamd_rrd_file *file,
+ gdouble pdp_steps,
+ gdouble pdp_offset,
+ gulong *rra_steps,
+ gulong rra_index,
+ gdouble *pdp_temp)
+{
+ guint i;
+ struct rrd_rra_def *rra;
+ rrd_value_t *scratch;
+ enum rrd_cf_type cf;
+ gdouble last_cdp = INFINITY, cur_cdp = INFINITY;
+ gulong pdp_in_cdp;
+
+ rra = &file->rra_def[rra_index];
+ cf = rrd_cf_from_string(rra->cf_nam);
+
+ /* Iterate over all DS for this RRA */
+ for (i = 0; i < file->stat_head->ds_cnt; i++) {
+ /* Get CDP for this RRA and DS */
+ scratch =
+ file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
+ if (rra->pdp_cnt > 1) {
+ /* Do we have any CDP to update for this rra ? */
+ if (rra_steps[rra_index] > 0) {
+
+ if (isnan(pdp_temp[i])) {
+ /* New pdp is nan */
+ /* Increment unknown points count */
+ scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
+ /* Reset secondary value */
+ scratch[CDP_secondary_val].dv = NAN;
+ }
+ else {
+ scratch[CDP_secondary_val].dv = pdp_temp[i];
+ }
+
+ /* Check XFF for this rra */
+ if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt *
+ rra->par[RRA_cdp_xff_val].lv) {
+ /* XFF is reached */
+ scratch[CDP_primary_val].dv = NAN;
+ }
+ else {
+ /* Need to initialize CDP using specified consolidation */
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ last_cdp =
+ isnan(scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val].dv;
+ cur_cdp = isnan(pdp_temp[i]) ? 0.0 : pdp_temp[i];
+ scratch[CDP_primary_val].dv =
+ (last_cdp + cur_cdp *
+ pdp_offset) /
+ (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
+ break;
+ case RRD_CF_MAXIMUM:
+ last_cdp =
+ isnan(scratch[CDP_val].dv) ? -INFINITY : scratch[CDP_val].dv;
+ cur_cdp = isnan(pdp_temp[i]) ? -INFINITY : pdp_temp[i];
+ scratch[CDP_primary_val].dv = MAX(last_cdp, cur_cdp);
+ break;
+ case RRD_CF_MINIMUM:
+ last_cdp =
+ isnan(scratch[CDP_val].dv) ? INFINITY : scratch[CDP_val].dv;
+ cur_cdp = isnan(pdp_temp[i]) ? INFINITY : pdp_temp[i];
+ scratch[CDP_primary_val].dv = MIN(last_cdp, cur_cdp);
+ break;
+ case RRD_CF_LAST:
+ default:
+ scratch[CDP_primary_val].dv = pdp_temp[i];
+ last_cdp = INFINITY;
+ break;
+ }
+ }
+
+ /* Init carry of this CDP */
+ pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
+ if (pdp_in_cdp == 0 || isnan(pdp_temp[i])) {
+ /* Set overflow */
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ scratch[CDP_val].dv = 0;
+ break;
+ case RRD_CF_MAXIMUM:
+ scratch[CDP_val].dv = -INFINITY;
+ break;
+ case RRD_CF_MINIMUM:
+ scratch[CDP_val].dv = INFINITY;
+ break;
+ default:
+ scratch[CDP_val].dv = NAN;
+ break;
+ }
+ }
+ else {
+ /* Special carry for average */
+ if (cf == RRD_CF_AVERAGE) {
+ scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
+ }
+ else {
+ scratch[CDP_val].dv = pdp_temp[i];
+ }
+ }
+
+ scratch[CDP_unkn_pdp_cnt].lv = 0;
+
+ msg_debug_rrd("update cdp for DS %d with value %.3f, "
+ "stored value: %.3f, carry: %.3f",
+ i, last_cdp,
+ scratch[CDP_primary_val].dv, scratch[CDP_val].dv);
+ }
+ /* In this case we just need to update cdp_prep for this RRA */
+ else {
+ if (isnan(pdp_temp[i])) {
+ /* Just increase undefined zone */
+ scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
+ }
+ else {
+ /* Calculate cdp value */
+ last_cdp = scratch[CDP_val].dv;
+ switch (cf) {
+ case RRD_CF_AVERAGE:
+ if (isnan(last_cdp)) {
+ scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
+ }
+ else {
+ scratch[CDP_val].dv = last_cdp + pdp_temp[i] *
+ pdp_steps;
+ }
+ break;
+ case RRD_CF_MAXIMUM:
+ scratch[CDP_val].dv = MAX(last_cdp, pdp_temp[i]);
+ break;
+ case RRD_CF_MINIMUM:
+ scratch[CDP_val].dv = MIN(last_cdp, pdp_temp[i]);
+ break;
+ case RRD_CF_LAST:
+ scratch[CDP_val].dv = pdp_temp[i];
+ break;
+ default:
+ scratch[CDP_val].dv = NAN;
+ break;
+ }
+ }
+
+ msg_debug_rrd("aggregate cdp %d with pdp %.3f, "
+ "stored value: %.3f",
+ i, pdp_temp[i], scratch[CDP_val].dv);
+ }
+ }
+ else {
+ /* We have nothing to consolidate, but we may miss some pdp */
+ if (pdp_steps > 2) {
+ /* Just write PDP value */
+ scratch[CDP_primary_val].dv = pdp_temp[i];
+ scratch[CDP_secondary_val].dv = pdp_temp[i];
+ }
+ }
+ }
+}
+
+/**
+ * Update RRA in a file
+ * @param file rrd file
+ * @param rra_steps steps for each rra
+ * @param now current time
+ */
+void rspamd_rrd_write_rra(struct rspamd_rrd_file *file, gulong *rra_steps)
+{
+ guint i, j, ds_cnt;
+ struct rrd_rra_def *rra;
+ struct rrd_cdp_prep *cdp;
+ gdouble *rra_row = file->rrd_value, *cur_row;
+
+
+ ds_cnt = file->stat_head->ds_cnt;
+ /* Iterate over all RRA */
+ for (i = 0; i < file->stat_head->rra_cnt; i++) {
+ rra = &file->rra_def[i];
+
+ if (rra_steps[i] > 0) {
+
+ /* Move row ptr */
+ if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
+ file->rra_ptr[i].cur_row = 0;
+ }
+ /* Calculate seek */
+ cdp = &file->cdp_prep[ds_cnt * i];
+ cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row;
+ /* Iterate over DS */
+ for (j = 0; j < ds_cnt; j++) {
+ cur_row[j] = cdp[j].scratch[CDP_primary_val].dv;
+ msg_debug_rrd("write cdp %d: %.3f", j, cur_row[j]);
+ }
+ }
+
+ rra_row += rra->row_cnt * ds_cnt;
+ }
+}
+
+/**
+ * Add record to rrd file
+ * @param file rrd file object
+ * @param points points (must be row suitable for this RRA, depending on ds count)
+ * @param err error pointer
+ * @return TRUE if a row has been added
+ */
+gboolean
+rspamd_rrd_add_record(struct rspamd_rrd_file *file,
+ GArray *points,
+ gdouble ticks,
+ GError **err)
+{
+ gdouble interval, *pdp_new, *pdp_temp;
+ guint i;
+ glong seconds, microseconds;
+ gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
+ prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
+
+ if (file == NULL || file->stat_head->ds_cnt * sizeof(gdouble) !=
+ points->len) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL,
+ "rrd add points failed: wrong arguments");
+ return FALSE;
+ }
+
+ /* Get interval */
+ seconds = (glong) ticks;
+ microseconds = (glong) ((ticks - seconds) * 1000000.);
+ interval = ticks - ((gdouble) file->live_head->last_up +
+ file->live_head->last_up_usec / 1000000.);
+
+ msg_debug_rrd("update rrd record after %.3f seconds", interval);
+
+ /* Update PDP preparation values */
+ pdp_new = g_malloc0(sizeof(gdouble) * file->stat_head->ds_cnt);
+ pdp_temp = g_malloc0(sizeof(gdouble) * file->stat_head->ds_cnt);
+ /* How much steps need to be updated in each RRA */
+ rra_steps = g_malloc0(sizeof(gulong) * file->stat_head->rra_cnt);
+
+ if (!rspamd_rrd_update_pdp_prep(file, (gdouble *) points->data, pdp_new,
+ interval)) {
+ g_set_error(err,
+ rrd_error_quark(), EINVAL,
+ "rrd update pdp failed: wrong arguments");
+ g_free(pdp_new);
+ g_free(pdp_temp);
+ g_free(rra_steps);
+ return FALSE;
+ }
+
+ /* Calculate elapsed steps */
+ /* Age in seconds for previous pdp store */
+ prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
+ /* Time in seconds for last pdp update */
+ prev_pdp_step = file->live_head->last_up - prev_pdp_age;
+ /* Age in seconds from current time to required pdp time */
+ cur_pdp_age = seconds % file->stat_head->pdp_step;
+ /* Time of desired pdp step */
+ cur_pdp_step = seconds - cur_pdp_age;
+ cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
+ pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
+
+
+ if (pdp_steps == 0) {
+ /* Simple update of pdp prep */
+ for (i = 0; i < file->stat_head->ds_cnt; i++) {
+ if (isnan(pdp_new[i])) {
+ /* Increment unknown period */
+ file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor(
+ interval);
+ }
+ else {
+ if (isnan(file->pdp_prep[i].scratch[PDP_val].dv)) {
+ /* Reset pdp to the current value */
+ file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
+ }
+ else {
+ /* Increment pdp value */
+ file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
+ }
+ }
+ }
+ }
+ else {
+ /* Complex update of PDP, CDP and RRA */
+
+ /* Update PDP for this step */
+ rspamd_rrd_update_pdp_step(file,
+ pdp_new,
+ pdp_temp,
+ interval,
+ pdp_steps * file->stat_head->pdp_step);
+
+
+ /* Update CDP points for each RRA*/
+ for (i = 0; i < file->stat_head->rra_cnt; i++) {
+ /* Calculate pdp offset for this RRA */
+ pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count %
+ file->rra_def[i].pdp_cnt;
+ /* How much steps we got for this RRA */
+ if (pdp_offset <= pdp_steps) {
+ rra_steps[i] =
+ (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
+ }
+ else {
+ /* This rra have not passed enough pdp steps */
+ rra_steps[i] = 0;
+ }
+
+ msg_debug_rrd("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul",
+ i, rra_steps[i], pdp_offset, pdp_steps);
+
+ /* Update this specific CDP */
+ rspamd_rrd_update_cdp(file,
+ pdp_steps,
+ pdp_offset,
+ rra_steps,
+ i,
+ pdp_temp);
+ }
+
+ /* Write RRA */
+ rspamd_rrd_write_rra(file, rra_steps);
+ }
+ file->live_head->last_up = seconds;
+ file->live_head->last_up_usec = microseconds;
+
+ /* Sync and invalidate */
+ msync(file->map, file->size, MS_ASYNC | MS_INVALIDATE);
+
+ g_free(pdp_new);
+ g_free(pdp_temp);
+ g_free(rra_steps);
+
+ return TRUE;
+}
+
+/**
+ * Close rrd file
+ * @param file
+ * @return
+ */
+gint rspamd_rrd_close(struct rspamd_rrd_file *file)
+{
+ if (file == NULL) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ munmap(file->map, file->size);
+ close(file->fd);
+ g_free(file->filename);
+ g_free(file->id);
+
+ g_free(file);
+
+ return 0;
+}
+
+static struct rspamd_rrd_file *
+rspamd_rrd_create_file(const gchar *path, gboolean finalize, GError **err)
+{
+ struct rspamd_rrd_file *file;
+ struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT];
+ struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT];
+ gint i;
+ GArray ar;
+
+ /* Try to create new rrd file */
+
+ file = rspamd_rrd_create(path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT,
+ 1, rspamd_get_calendar_ticks(), err);
+
+ if (file == NULL) {
+ return NULL;
+ }
+
+ /* Create DS and RRA */
+
+ for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i++) {
+ rrd_make_default_ds(rspamd_action_to_str(i),
+ rrd_dst_to_string(RRD_DST_COUNTER), 1, &ds[i]);
+ }
+
+ ar.data = (gchar *) ds;
+ ar.len = sizeof(ds);
+
+ if (!rspamd_rrd_add_ds(file, &ar, err)) {
+ rspamd_rrd_close(file);
+ return NULL;
+ }
+
+ /* Once per minute for 1 day */
+ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
+ 60, 24 * 60, &rra[0]);
+ /* Once per 5 minutes for 1 week */
+ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
+ 5 * 60, 7 * 24 * 60 / 5, &rra[1]);
+ /* Once per 10 mins for 1 month */
+ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
+ 60 * 10, 30 * 24 * 6, &rra[2]);
+ /* Once per hour for 1 year */
+ rrd_make_default_rra(rrd_cf_to_string(RRD_CF_AVERAGE),
+ 60 * 60, 365 * 24, &rra[3]);
+ ar.data = (gchar *) rra;
+ ar.len = sizeof(rra);
+
+ if (!rspamd_rrd_add_rra(file, &ar, err)) {
+ rspamd_rrd_close(file);
+ return NULL;
+ }
+
+ if (finalize && !rspamd_rrd_finalize(file, err)) {
+ rspamd_rrd_close(file);
+ return NULL;
+ }
+
+ return file;
+}
+
+static void
+rspamd_rrd_convert_ds(struct rspamd_rrd_file *old,
+ struct rspamd_rrd_file *cur, gint idx_old, gint idx_new)
+{
+ struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new;
+ struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new;
+ gdouble *val_old, *val_new;
+ gulong rra_cnt, i, j, points_cnt, old_ds, new_ds;
+
+ rra_cnt = old->stat_head->rra_cnt;
+ pdp_prep_old = &old->pdp_prep[idx_old];
+ pdp_prep_new = &cur->pdp_prep[idx_new];
+ memcpy(pdp_prep_new, pdp_prep_old, sizeof(*pdp_prep_new));
+ val_old = old->rrd_value;
+ val_new = cur->rrd_value;
+ old_ds = old->stat_head->ds_cnt;
+ new_ds = cur->stat_head->ds_cnt;
+
+ for (i = 0; i < rra_cnt; i++) {
+ cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old;
+ cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new;
+ memcpy(cdp_prep_new, cdp_prep_old, sizeof(*cdp_prep_new));
+ points_cnt = old->rra_def[i].row_cnt;
+
+ for (j = 0; j < points_cnt; j++) {
+ val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old];
+ }
+
+ val_new += points_cnt * new_ds;
+ val_old += points_cnt * old_ds;
+ }
+}
+
+static struct rspamd_rrd_file *
+rspamd_rrd_convert(const gchar *path, struct rspamd_rrd_file *old,
+ GError **err)
+{
+ struct rspamd_rrd_file *rrd;
+ gchar tpath[PATH_MAX];
+
+ g_assert(old != NULL);
+
+ rspamd_snprintf(tpath, sizeof(tpath), "%s.new", path);
+ rrd = rspamd_rrd_create_file(tpath, TRUE, err);
+
+ if (rrd) {
+ /* Copy old data */
+ memcpy(rrd->live_head, old->live_head, sizeof(*rrd->live_head));
+ memcpy(rrd->rra_ptr, old->rra_ptr,
+ sizeof(*old->rra_ptr) * rrd->stat_head->rra_cnt);
+
+ /*
+ * Old DSes:
+ * 0 - spam -> reject
+ * 1 - probable spam -> add header
+ * 2 - greylist -> greylist
+ * 3 - ham -> ham
+ */
+ rspamd_rrd_convert_ds(old, rrd, 0, METRIC_ACTION_REJECT);
+ rspamd_rrd_convert_ds(old, rrd, 1, METRIC_ACTION_ADD_HEADER);
+ rspamd_rrd_convert_ds(old, rrd, 2, METRIC_ACTION_GREYLIST);
+ rspamd_rrd_convert_ds(old, rrd, 3, METRIC_ACTION_NOACTION);
+
+ if (unlink(path) == -1) {
+ g_set_error(err, rrd_error_quark(), errno, "cannot unlink old rrd file %s: %s",
+ path, strerror(errno));
+ unlink(tpath);
+ rspamd_rrd_close(rrd);
+
+ return NULL;
+ }
+
+ if (rename(tpath, path) == -1) {
+ g_set_error(err, rrd_error_quark(), errno, "cannot rename old rrd file %s: %s",
+ path, strerror(errno));
+ unlink(tpath);
+ rspamd_rrd_close(rrd);
+
+ return NULL;
+ }
+ }
+
+ return rrd;
+}
+
+struct rspamd_rrd_file *
+rspamd_rrd_file_default(const gchar *path,
+ GError **err)
+{
+ struct rspamd_rrd_file *file, *nf;
+
+ g_assert(path != NULL);
+
+ if (access(path, R_OK) != -1) {
+ /* We can open rrd file */
+ file = rspamd_rrd_open(path, err);
+
+ if (file == NULL) {
+ return NULL;
+ }
+
+
+ if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) {
+ msg_err_rrd("rrd file is not suitable for rspamd: it has "
+ "%ul ds and %ul rra",
+ file->stat_head->ds_cnt,
+ file->stat_head->rra_cnt);
+ g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
+ rspamd_rrd_close(file);
+
+ return NULL;
+ }
+ else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) {
+ /* Old rrd, need to convert */
+ msg_info_rrd("rrd file %s is not suitable for rspamd, convert it",
+ path);
+
+ nf = rspamd_rrd_convert(path, file, err);
+ rspamd_rrd_close(file);
+
+ return nf;
+ }
+ else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) {
+ return file;
+ }
+ else {
+ msg_err_rrd("rrd file is not suitable for rspamd: it has "
+ "%ul ds and %ul rra",
+ file->stat_head->ds_cnt,
+ file->stat_head->rra_cnt);
+ g_set_error(err, rrd_error_quark(), EINVAL, "bad rrd file");
+ rspamd_rrd_close(file);
+
+ return NULL;
+ }
+ }
+
+ file = rspamd_rrd_create_file(path, TRUE, err);
+
+ return file;
+}
+
+struct rspamd_rrd_query_result *
+rspamd_rrd_query(struct rspamd_rrd_file *file,
+ gulong rra_num)
+{
+ struct rspamd_rrd_query_result *res;
+ struct rrd_rra_def *rra;
+ const gdouble *rra_offset = NULL;
+ guint i;
+
+ g_assert(file != NULL);
+
+
+ if (rra_num > file->stat_head->rra_cnt) {
+ msg_err_rrd("requested unexisting rra: %l", rra_num);
+
+ return NULL;
+ }
+
+ res = g_malloc0(sizeof(*res));
+ res->ds_count = file->stat_head->ds_cnt;
+ res->last_update = (gdouble) file->live_head->last_up +
+ ((gdouble) file->live_head->last_up_usec / 1e6f);
+ res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt;
+ res->rra_rows = file->rra_def[rra_num].row_cnt;
+ rra_offset = file->rrd_value;
+
+ for (i = 0; i < file->stat_head->rra_cnt; i++) {
+ rra = &file->rra_def[i];
+
+ if (i == rra_num) {
+ res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt;
+ break;
+ }
+
+ rra_offset += rra->row_cnt * res->ds_count;
+ }
+
+ res->data = rra_offset;
+
+ return res;
+}