diff options
Diffstat (limited to 'streaming/replication.c')
-rw-r--r-- | streaming/replication.c | 46 |
1 files changed, 23 insertions, 23 deletions
diff --git a/streaming/replication.c b/streaming/replication.c index c6fafc35..0e5a0b40 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -40,9 +40,9 @@ static struct replication_query_statistics replication_queries = { }; struct replication_query_statistics replication_get_query_statistics(void) { - netdata_spinlock_lock(&replication_queries.spinlock); + spinlock_lock(&replication_queries.spinlock); struct replication_query_statistics ret = replication_queries; - netdata_spinlock_unlock(&replication_queries.spinlock); + spinlock_unlock(&replication_queries.spinlock); return ret; } @@ -144,7 +144,7 @@ static struct replication_query *replication_query_prepare( } if(q->query.enable_streaming) { - netdata_spinlock_lock(&st->data_collection_lock); + spinlock_lock(&st->data_collection_lock); q->query.locked_data_collection = true; if (st->last_updated.tv_sec > q->query.before) { @@ -168,7 +168,7 @@ static struct replication_query *replication_query_prepare( size_t count = 0; RRDDIM *rd; rrddim_foreach_read(rd, st) { - if (unlikely(!rd || !rd_dfe.item || !rd->exposed)) + if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd))) continue; if (unlikely(rd_dfe.counter >= q->dimensions)) { @@ -198,7 +198,7 @@ static struct replication_query *replication_query_prepare( q->query.execute = false; if(q->query.locked_data_collection) { - netdata_spinlock_unlock(&st->data_collection_lock); + spinlock_unlock(&st->data_collection_lock); q->query.locked_data_collection = false; } @@ -216,20 +216,20 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL; RRDDIM *rd; rrddim_foreach_read(rd, st){ - if (!rd->exposed) continue; + if (!rrddim_check_exposed(rd)) continue; buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2); buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id)); buffer_fast_strcat(wb, "' ", 2); - buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC + - (usec_t) rd->last_collected_time.tv_usec); + buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + + (usec_t) rd->collector.last_collected_time.tv_usec); buffer_fast_strcat(wb, " ", 1); - buffer_print_int64_encoded(wb, encoding, rd->last_collected_value); + buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value); + buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value); buffer_fast_strcat(wb, " ", 1); - buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value); + buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value); buffer_fast_strcat(wb, "\n", 1); } rrddim_foreach_done(rd); @@ -248,7 +248,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, replication_send_chart_collection_state(wb, q->st, q->query.capabilities); if(q->query.locked_data_collection) { - netdata_spinlock_unlock(&q->st->data_collection_lock); + spinlock_unlock(&q->st->data_collection_lock); q->query.locked_data_collection = false; } @@ -269,7 +269,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, } if(executed) { - netdata_spinlock_lock(&replication_queries.spinlock); + spinlock_lock(&replication_queries.spinlock); replication_queries.queries_started += queries; replication_queries.queries_finished += queries; replication_queries.points_read += q->points_read; @@ -280,7 +280,7 @@ static void replication_query_finalize(BUFFER *wb, struct replication_query *q, s->replication.latest_completed_before_t = q->query.before; } - netdata_spinlock_unlock(&replication_queries.spinlock); + spinlock_unlock(&replication_queries.spinlock); } __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED); @@ -678,7 +678,7 @@ bool replication_response_execute_and_finalize(struct replication_query *q, size } if(locked_data_collection) - netdata_spinlock_unlock(&st->data_collection_lock); + spinlock_unlock(&st->data_collection_lock); return enable_streaming; } @@ -797,9 +797,9 @@ static bool send_replay_chart_cmd(struct replication_request_details *r, const c rrdset_id(st), r->wanted.start_streaming ? "true" : "false", (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before); - int ret = r->caller.callback(buffer, r->caller.data); + ssize_t ret = r->caller.callback(buffer, r->caller.data); if (ret < 0) { - error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)", + netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)", rrdhost_hostname(r->host), rrdset_id(r->st), ret); return false; } @@ -1056,11 +1056,11 @@ static inline bool replication_recursive_lock_mode(char mode) { if(mode == 'L') { // (L)ock if(++recursions == 1) - netdata_spinlock_lock(&replication_globals.spinlock); + spinlock_lock(&replication_globals.spinlock); } else if(mode == 'U') { // (U)nlock if(--recursions == 0) - netdata_spinlock_unlock(&replication_globals.spinlock); + spinlock_unlock(&replication_globals.spinlock); } else if(mode == 'C') { // (C)heck if(recursions > 0) @@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) { // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { +static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); @@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { } static void replication_sort_entry_add(struct replication_request *rq) { - if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { + if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; rq->not_indexed_preprocessing = false; @@ -1606,7 +1606,7 @@ static void verify_all_hosts_charts_are_streaming_now(void) { dfe_done(host); size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED); - info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", + netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.main_thread.last_executed, errors); replication_globals.main_thread.last_executed = executed; } @@ -1860,7 +1860,7 @@ void *replication_thread_main(void *ptr __maybe_unused) { int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1); if(threads < 1 || threads > MAX_REPLICATION_THREADS) { - error("replication threads given %d is invalid, resetting to 1", threads); + netdata_log_error("replication threads given %d is invalid, resetting to 1", threads); threads = 1; } |