summaryrefslogtreecommitdiffstats
path: root/database/ram/rrddim_mem.c
blob: 3226d3c0de8f1f9c5846449acd8327a06fadcde6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
// SPDX-License-Identifier: GPL-3.0-or-later

#include "rrddim_mem.h"

// ----------------------------------------------------------------------------
// RRDDIM legacy data collection functions

STORAGE_METRIC_HANDLE *rrddim_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) {
    return (STORAGE_METRIC_HANDLE *)rd;
}

void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused) {
    ;
}

STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
    RRDDIM *rd = (RRDDIM *)db_metric_handle;
    rd->db[rd->rrdset->current_entry] = pack_storage_number(NAN, SN_FLAG_NONE);
    struct mem_collect_handle *ch = calloc(1, sizeof(struct mem_collect_handle));
    ch->rd = rd;
    return (STORAGE_COLLECT_HANDLE *)ch;
}

void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number,
        NETDATA_DOUBLE min_value,
        NETDATA_DOUBLE max_value,
        uint16_t count,
        uint16_t anomaly_count,
        SN_FLAGS flags)
{
    UNUSED(point_in_time);
    UNUSED(min_value);
    UNUSED(max_value);
    UNUSED(count);
    UNUSED(anomaly_count);

    struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
    RRDDIM *rd = ch->rd;
    rd->db[rd->rrdset->current_entry] = pack_storage_number(number, flags);
}

void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
    struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
    RRDDIM *rd = ch->rd;
    memset(rd->db, 0, rd->entries * sizeof(storage_number));
}

int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
    free(collection_handle);
    return 0;
}

// ----------------------------------------------------------------------------

// get the total duration in seconds of the round robin database
#define rrddim_duration(st) (( (time_t)(rd)->rrdset->counter >= (time_t)(rd)->rrdset->entries ? (time_t)(rd)->rrdset->entries : (time_t)(rd)->rrdset->counter ) * (time_t)(rd)->rrdset->update_every)

// get the last slot updated in the round robin database
#define rrddim_last_slot(rd) ((size_t)(((rd)->rrdset->current_entry == 0) ? (rd)->rrdset->entries - 1 : (rd)->rrdset->current_entry - 1))

// return the slot that has the oldest value
#define rrddim_first_slot(rd) ((size_t)((rd)->rrdset->counter >= (size_t)(rd)->rrdset->entries ? (rd)->rrdset->current_entry : 0))

// get the slot of the round robin database, for the given timestamp (t)
// it always returns a valid slot, although may not be for the time requested if the time is outside the round robin database
// only valid when not using dbengine
static inline size_t rrddim_time2slot(RRDDIM *rd, time_t t) {
    size_t ret = 0;
    time_t last_entry_t  = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
    time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
    size_t entries       = rd->rrdset->entries;
    size_t first_slot    = rrddim_first_slot(rd);
    size_t last_slot     = rrddim_last_slot(rd);
    size_t update_every  = rd->rrdset->update_every;

    if(t >= last_entry_t) {
        // the requested time is after the last entry we have
        ret = last_slot;
    }
    else {
        if(t <= first_entry_t) {
            // the requested time is before the first entry we have
            ret = first_slot;
        }
        else {
            if(last_slot >= (size_t)((last_entry_t - t) / update_every))
                ret = last_slot - ((last_entry_t - t) / update_every);
            else
                ret = last_slot - ((last_entry_t - t) / update_every) + entries;
        }
    }

    if(unlikely(ret >= entries)) {
        error("INTERNAL ERROR: rrddim_time2slot() on %s returns values outside entries", rd->name);
        ret = entries - 1;
    }

    return ret;
}

// get the timestamp of a specific slot in the round robin database
// only valid when not using dbengine
static inline time_t rrddim_slot2time(RRDDIM *rd, size_t slot) {
    time_t ret;
    time_t last_entry_t  = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
    time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
    size_t entries       = rd->rrdset->entries;
    size_t last_slot     = rrddim_last_slot(rd);
    size_t update_every  = rd->rrdset->update_every;

    if(slot >= entries) {
        error("INTERNAL ERROR: caller of rrddim_slot2time() gives invalid slot %zu", slot);
        slot = entries - 1;
    }

    if(slot > last_slot)
        ret = last_entry_t - (time_t)(update_every * (last_slot - slot + entries));
    else
        ret = last_entry_t - (time_t)(update_every * (last_slot - slot));

    if(unlikely(ret < first_entry_t)) {
        error("INTERNAL ERROR: rrddim_slot2time() on %s returns time too far in the past", rd->name);
        ret = first_entry_t;
    }

    if(unlikely(ret > last_entry_t)) {
        error("INTERNAL ERROR: rrddim_slot2time() on %s returns time into the future", rd->name);
        ret = last_entry_t;
    }

    return ret;
}

// ----------------------------------------------------------------------------
// RRDDIM legacy database query functions

void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) {
    UNUSED(tier_query_fetch_type);

    RRDDIM *rd = (RRDDIM *)db_metric_handle;

    handle->rd = rd;
    handle->start_time = start_time;
    handle->end_time = end_time;
    struct mem_query_handle* h = calloc(1, sizeof(struct mem_query_handle));
    h->slot           = rrddim_time2slot(rd, start_time);
    h->last_slot      = rrddim_time2slot(rd, end_time);
    h->dt = rd->rrdset->update_every;

    h->next_timestamp = start_time;
    h->slot_timestamp = rrddim_slot2time(rd, h->slot);
    h->last_timestamp = rrddim_slot2time(rd, h->last_slot);

    // info("RRDDIM QUERY INIT: start %ld, end %ld, next %ld, first %ld, last %ld, dt %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp, h->dt);

    handle->handle = (STORAGE_QUERY_HANDLE *)h;
}

// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle) {
    RRDDIM *rd = handle->rd;
    struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
    size_t entries = rd->rrdset->entries;
    size_t slot = h->slot;

    STORAGE_POINT sp;
    sp.count = 1;

    time_t this_timestamp = h->next_timestamp;
    h->next_timestamp += h->dt;

    // set this timestamp for our caller
    sp.start_time = this_timestamp - h->dt;
    sp.end_time = this_timestamp;

    if(unlikely(this_timestamp < h->slot_timestamp)) {
        storage_point_empty(sp, sp.start_time, sp.end_time);
        return sp;
    }

    if(unlikely(this_timestamp > h->last_timestamp)) {
        storage_point_empty(sp, sp.start_time, sp.end_time);
        return sp;
    }

    storage_number n = rd->db[slot++];
    if(unlikely(slot >= entries)) slot = 0;

    h->slot = slot;
    h->slot_timestamp += h->dt;

    sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
    sp.flags = (n & SN_USER_FLAGS);
    sp.min = sp.max = sp.sum = unpack_storage_number(n);

    return sp;
}

int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
    struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
    return (h->next_timestamp > handle->end_time);
}

void rrddim_query_finalize(struct rrddim_query_handle *handle) {
#ifdef NETDATA_INTERNAL_CHECKS
    if(!rrddim_query_is_finished(handle))
        error("QUERY: query for chart '%s' dimension '%s' has been stopped unfinished", handle->rd->rrdset->id, handle->rd->name);
#endif
    freez(handle->handle);
}

time_t rrddim_query_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
    RRDDIM *rd = (RRDDIM *)db_metric_handle;
    return rd->rrdset->last_updated.tv_sec;
}

time_t rrddim_query_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
    RRDDIM *rd = (RRDDIM *)db_metric_handle;
    return (time_t)(rd->rrdset->last_updated.tv_sec - rrddim_duration(rd));
}