Diffstat (limited to 'streaming/replication.c')
-rw-r--r--  streaming/replication.c | 46 +++++++++++++++++++++++-----------------------
1 file changed, 23 insertions(+), 23 deletions(-)
diff --git a/streaming/replication.c b/streaming/replication.c
index c6fafc35..0e5a0b40 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -40,9 +40,9 @@ static struct replication_query_statistics replication_queries = {
};
struct replication_query_statistics replication_get_query_statistics(void) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
struct replication_query_statistics ret = replication_queries;
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
return ret;
}
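
Beyond the netdata_spinlock_* to spinlock_* rename, this hunk shows the snapshot-under-lock pattern: the statistics struct is copied by value while the lock is held, so callers read a consistent snapshot without ever touching the shared object afterwards. A minimal self-contained sketch of the idiom, using a toy C11 spinlock and stand-in stats rather than Netdata's own SPINLOCK type:

    /* Toy spinlock built on C11 atomic_flag; Netdata's SPINLOCK differs. */
    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct { atomic_flag locked; } toy_spinlock;

    static void toy_spinlock_lock(toy_spinlock *s) {
        while (atomic_flag_test_and_set_explicit(&s->locked, memory_order_acquire))
            ;  /* busy-wait until the holder clears the flag */
    }

    static void toy_spinlock_unlock(toy_spinlock *s) {
        atomic_flag_clear_explicit(&s->locked, memory_order_release);
    }

    typedef struct {
        size_t queries_started, queries_finished, points_read;
    } toy_counters;

    static struct {
        toy_spinlock spinlock;   /* zero bytes == cleared flag == unlocked
                                    on mainstream ABIs */
        toy_counters counters;
    } stats;

    /* Lock, copy the whole struct, unlock: the caller gets a consistent
     * snapshot and holds the lock only for the duration of one memcpy. */
    toy_counters toy_stats_get(void) {
        toy_spinlock_lock(&stats.spinlock);
        toy_counters ret = stats.counters;
        toy_spinlock_unlock(&stats.spinlock);
        return ret;
    }
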
@@ -144,7 +144,7 @@ static struct replication_query *replication_query_prepare(
}
if(q->query.enable_streaming) {
- netdata_spinlock_lock(&st->data_collection_lock);
+ spinlock_lock(&st->data_collection_lock);
q->query.locked_data_collection = true;
if (st->last_updated.tv_sec > q->query.before) {
@@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare(
size_t count = 0;
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
continue;
if (unlikely(rd_dfe.counter >= q->dimensions)) {
@@ -198,7 +198,7 @@ static struct replication_query *replication_query_prepare(
q->query.execute = false;
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
q->query.locked_data_collection = false;
}
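
Note the locked_data_collection flag appearing in these hunks: the query remembers whether it still holds st->data_collection_lock, so every exit path can unlock exactly once. A reduced sketch of that idiom, reusing the toy_spinlock from the sketch above (the struct here is a stand-in, not Netdata's replication_query):

    #include <stdbool.h>

    struct query_state {
        toy_spinlock *data_collection_lock;  /* stand-in for st->data_collection_lock */
        bool locked_data_collection;
    };

    /* Safe to call from any cleanup path: unlocks at most once. */
    static void query_release_collection_lock(struct query_state *q) {
        if (q->locked_data_collection) {
            toy_spinlock_unlock(q->data_collection_lock);
            q->locked_data_collection = false;
        }
    }
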
@@ -216,20 +216,20 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE
NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
RRDDIM *rd;
rrddim_foreach_read(rd, st){
- if (!rd->exposed) continue;
+ if (!rrddim_check_exposed(rd)) continue;
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
- (usec_t) rd->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->collector.last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
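
Two details of this hunk are worth calling out: the number encoding is chosen once per chart (base64 when the peer advertises STREAM_CAP_IEEE754, decimal otherwise), and string literals are appended with sizeof(LIT) - 1, which the compiler folds to a constant, avoiding a strlen() call per dimension. A simplified stand-in for the append (the real buffer_fast_strcat() lives in libnetdata's BUFFER API):

    #include <string.h>

    #define KEYWORD "REPLAY RRDDIM STATE"  /* stand-in literal */

    struct toy_buf { char data[1024]; size_t len; };

    /* The caller supplies the length, so no strlen() happens here. */
    static void toy_fast_strcat(struct toy_buf *wb, const char *txt, size_t len) {
        memcpy(wb->data + wb->len, txt, len);
        wb->len += len;
        wb->data[wb->len] = '\0';
    }

    void emit_prefix(struct toy_buf *wb) {
        /* sizeof counts the trailing NUL, hence the -1; the "- 1 + 2" in
         * the hunk above additionally counts the appended " '" characters. */
        toy_fast_strcat(wb, KEYWORD " '", sizeof(KEYWORD " '") - 1);
    }
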
@@ -248,7 +248,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
replication_send_chart_collection_state(wb, q->st, q->query.capabilities);
if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&q->st->data_collection_lock);
+ spinlock_unlock(&q->st->data_collection_lock);
q->query.locked_data_collection = false;
}
@@ -269,7 +269,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
}
if(executed) {
- netdata_spinlock_lock(&replication_queries.spinlock);
+ spinlock_lock(&replication_queries.spinlock);
replication_queries.queries_started += queries;
replication_queries.queries_finished += queries;
replication_queries.points_read += q->points_read;
@@ -280,7 +280,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q,
s->replication.latest_completed_before_t = q->query.before;
}
- netdata_spinlock_unlock(&replication_queries.spinlock);
+ spinlock_unlock(&replication_queries.spinlock);
}
__atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
@@ -678,7 +678,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size
}
if(locked_data_collection)
- netdata_spinlock_unlock(&st->data_collection_lock);
+ spinlock_unlock(&st->data_collection_lock);
return enable_streaming;
}
@@ -797,9 +797,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c
rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
(unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
- int ret = r->caller.callback(buffer, r->caller.data);
+ ssize_t ret = r->caller.callback(buffer, r->caller.data);
if (ret < 0) {
- error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
+ netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
rrdhost_hostname(r->host), rrdset_id(r->st), ret);
return false;
}
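
The int to ssize_t change above matters because the callback reports bytes sent, or a negative value on error, and the log format moves from %d to %zd to match the wider type. An illustrative signature, simplified from the real r->caller.callback:

    #include <stdbool.h>
    #include <stdio.h>
    #include <sys/types.h>  /* ssize_t */

    typedef ssize_t (*send_cb)(const char *buffer, void *data);

    static bool send_cmd(send_cb cb, const char *buffer, void *data) {
        ssize_t ret = cb(buffer, data);
        if (ret < 0) {
            /* %zd is the conversion for ssize_t */
            fprintf(stderr, "failed to send replication request (error %zd)\n", ret);
            return false;
        }
        return true;
    }
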
@@ -1056,11 +1056,11 @@ static inline bool replication_recursive_lock_mode(char mode) {
if(mode == 'L') { // (L)ock
if(++recursions == 1)
- netdata_spinlock_lock(&replication_globals.spinlock);
+ spinlock_lock(&replication_globals.spinlock);
}
else if(mode == 'U') { // (U)nlock
if(--recursions == 0)
- netdata_spinlock_unlock(&replication_globals.spinlock);
+ spinlock_unlock(&replication_globals.spinlock);
}
else if(mode == 'C') { // (C)heck
if(recursions > 0)
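
replication_recursive_lock_mode() builds a recursive lock on top of a plain spinlock: a thread-local depth counter ensures only the outermost (L)ock call actually acquires the global spinlock, and only the matching outermost (U)nlock releases it, so nested calls from the same thread cannot deadlock. A sketch of the idiom, again using the toy_spinlock from the first sketch:

    static toy_spinlock global_lock;
    static __thread int recursions = 0;  /* per-thread nesting depth;
                                            __thread is a GCC/Clang extension */

    void recursive_lock_mode(char mode) {
        if (mode == 'L') {            /* (L)ock */
            if (++recursions == 1)    /* outermost call takes the lock */
                toy_spinlock_lock(&global_lock);
        }
        else if (mode == 'U') {       /* (U)nlock */
            if (--recursions == 0)    /* outermost call releases it */
                toy_spinlock_unlock(&global_lock);
        }
    }
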
@@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
@@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
}
static void replication_sort_entry_add(struct replication_request *rq) {
- if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
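
The unlikely() added above is the usual branch-prediction hint: on GCC/Clang it expands to __builtin_expect(), nudging the compiler to lay out the buffer-full path off the hot path. The conventional definition (Netdata's own lives in libnetdata):

    #ifdef __GNUC__
    #define likely(x)   __builtin_expect(!!(x), 1)
    #define unlikely(x) __builtin_expect(!!(x), 0)
    #else
    #define likely(x)   (x)
    #define unlikely(x) (x)
    #endif
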
@@ -1606,7 +1606,7 @@ static void verify_all_hosts_charts_are_streaming_now(void) {
dfe_done(host);
size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
- info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
+ netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
executed - replication_globals.main_thread.last_executed, errors);
replication_globals.main_thread.last_executed = executed;
}
@@ -1860,7 +1860,7 @@ void *replication_thread_main(void *ptr __maybe_unused) {
int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
- error("replication threads given %d is invalid, resetting to 1", threads);
+ netdata_log_error("replication threads given %d is invalid, resetting to 1", threads);
threads = 1;
}
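
The final hunk shows the validate-and-reset pattern for config values: out-of-range input is logged and replaced with a safe default rather than aborting. A standalone sketch, with a simplified logging call and an illustrative bound (not Netdata's actual MAX_REPLICATION_THREADS):

    #include <stdio.h>

    #define MAX_REPLICATION_THREADS 20  /* illustrative bound */

    static int sanitize_thread_count(int threads) {
        if (threads < 1 || threads > MAX_REPLICATION_THREADS) {
            fprintf(stderr, "replication threads given %d is invalid, resetting to 1\n",
                    threads);
            return 1;  /* fall back to the safe default */
        }
        return threads;
    }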