diff options
Diffstat (limited to 'fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c')
-rw-r--r-- | fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c | 552 |
1 files changed, 552 insertions, 0 deletions
diff --git a/fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c b/fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c new file mode 100644 index 00000000..b31a78e3 --- /dev/null +++ b/fluent-bit/lib/cmetrics/src/cmt_encode_msgpack.c @@ -0,0 +1,552 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* CMetrics + * ======== + * Copyright 2021-2022 The CMetrics Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <cmetrics/cmetrics.h> +#include <cmetrics/cmt_metric.h> +#include <cmetrics/cmt_map.h> +#include <cmetrics/cmt_histogram.h> +#include <cmetrics/cmt_summary.h> +#include <cmetrics/cmt_counter.h> +#include <cmetrics/cmt_gauge.h> +#include <cmetrics/cmt_untyped.h> +#include <cmetrics/cmt_compat.h> +#include <cmetrics/cmt_encode_msgpack.h> +#include <cmetrics/cmt_variant_utils.h> + +struct cmt_map_label *create_label(char *label_text) +{ + struct cmt_map_label *new_label; + + new_label = calloc(1, sizeof(struct cmt_map_label)); + + if (NULL != new_label) { + new_label->name = cfl_sds_create(label_text); + + if (NULL == new_label->name) { + free(new_label); + + new_label = NULL; + } + } + + return new_label; +} + +static void pack_header(mpack_writer_t *writer, struct cmt *cmt, struct cmt_map *map) +{ + struct cmt_opts *opts; + struct cfl_list *head; + struct cmt_map_label *label; + size_t index; + struct cmt_summary *summary = NULL; + struct cmt_histogram *histogram = NULL; + struct cmt_counter *counter = NULL; + size_t meta_field_count; + + opts = map->opts; + meta_field_count = 4; + + if (map->type == CMT_HISTOGRAM) { + histogram = (struct cmt_histogram *) map->parent; + + meta_field_count++; + } + else if (map->type == CMT_SUMMARY) { + summary = (struct cmt_summary *) map->parent; + + meta_field_count++; + } + else if (map->type == CMT_COUNTER){ + counter = (struct cmt_counter *) map->parent; + + meta_field_count++; + } + + /* 'meta' */ + mpack_write_cstr(writer, "meta"); + mpack_start_map(writer, meta_field_count); + + /* 'ver' */ + mpack_write_cstr(writer, "ver"); + mpack_write_uint(writer, MSGPACK_ENCODER_VERSION); + + /* 'type' */ + mpack_write_cstr(writer, "type"); + mpack_write_uint(writer, map->type); + + /* 'opts' */ + mpack_write_cstr(writer, "opts"); + mpack_start_map(writer, 4); + + /* opts['ns'] */ + mpack_write_cstr(writer, "ns"); + mpack_write_cstr(writer, opts->ns); + + /* opts['subsystem'] */ + mpack_write_cstr(writer, "ss"); + mpack_write_cstr(writer, opts->subsystem); + + /* opts['name'] */ + mpack_write_cstr(writer, "name"); + mpack_write_cstr(writer, opts->name); + + /* opts['description'] */ + mpack_write_cstr(writer, "desc"); + mpack_write_cstr(writer, opts->description); + + mpack_finish_map(writer); /* 'opts' */ + + /* 'labels' (label keys) */ + mpack_write_cstr(writer, "labels"); + mpack_start_array(writer, map->label_count); + cfl_list_foreach(head, &map->label_keys) { + label = cfl_list_entry(head, struct cmt_map_label, _head); + + mpack_write_cstr(writer, label->name); + } + mpack_finish_array(writer); + + if (map->type == CMT_HISTOGRAM) { + /* 'buckets' (histogram buckets) */ + mpack_write_cstr(writer, "buckets"); + + if (histogram->buckets != NULL) { + mpack_start_array(writer, histogram->buckets->count); + + for (index = 0 ; + index < histogram->buckets->count ; + index++) { + mpack_write_double(writer, histogram->buckets->upper_bounds[index]); + } + } + else { + mpack_start_array(writer, 0); + } + + mpack_finish_array(writer); + } + else if (map->type == CMT_SUMMARY) { + /* 'quantiles' (summary quantiles) */ + mpack_write_cstr(writer, "quantiles"); + + mpack_start_array(writer, summary->quantiles_count); + + for (index = 0 ; + index < summary->quantiles_count ; + index++) { + mpack_write_double(writer, summary->quantiles[index]); + } + + mpack_finish_array(writer); + } + else if (map->type == CMT_COUNTER){ + /* aggregation_type */ + mpack_write_cstr(writer, "aggregation_type"); + mpack_write_int(writer, counter->aggregation_type); + } + + mpack_finish_map(writer); /* 'meta' */ +} + +static int pack_metric(mpack_writer_t *writer, struct cmt_map *map, struct cmt_metric *metric) +{ + int c_labels; + int s; + double val; + size_t index; + struct cfl_list *head; + struct cmt_map_label *label; + struct cmt_summary *summary; + struct cmt_histogram *histogram; + + c_labels = cfl_list_size(&metric->labels); + + s = 3; + + if (c_labels > 0) { + s++; + } + + mpack_start_map(writer, s); + + mpack_write_cstr(writer, "ts"); + mpack_write_uint(writer, metric->timestamp); + + if (map->type == CMT_HISTOGRAM) { + histogram = (struct cmt_histogram *) map->parent; + + mpack_write_cstr(writer, "histogram"); + mpack_start_map(writer, 3); + + mpack_write_cstr(writer, "buckets"); + mpack_start_array(writer, histogram->buckets->count + 1); + for (index = 0 ; + index <= histogram->buckets->count ; + index++) { + mpack_write_uint(writer, cmt_metric_hist_get_value(metric, index)); + } + + mpack_finish_array(writer); + + mpack_write_cstr(writer, "sum"); + mpack_write_double(writer, cmt_metric_hist_get_sum_value(metric)); + + mpack_write_cstr(writer, "count"); + mpack_write_uint(writer, cmt_metric_hist_get_count_value(metric)); + + mpack_finish_map(writer); /* 'histogram' */ + } + else if (map->type == CMT_SUMMARY) { + summary = (struct cmt_summary *) map->parent; + + mpack_write_cstr(writer, "summary"); + mpack_start_map(writer, 4); + + mpack_write_cstr(writer, "quantiles_set"); + mpack_write_uint(writer, metric->sum_quantiles_set); + + mpack_write_cstr(writer, "quantiles"); + mpack_start_array(writer, summary->quantiles_count); + + for (index = 0 ; index < summary->quantiles_count ; index++) { + mpack_write_uint(writer, metric->sum_quantiles[index]); + } + + mpack_finish_array(writer); + + mpack_write_cstr(writer, "count"); + mpack_write_uint(writer, cmt_summary_get_count_value(metric)); + + mpack_write_cstr(writer, "sum"); + mpack_write_uint(writer, metric->sum_sum); + + mpack_finish_map(writer); /* 'summary' */ + } + else { + mpack_write_cstr(writer, "value"); + val = cmt_metric_get_value(metric); + mpack_write_double(writer, val); + } + + s = cfl_list_size(&metric->labels); + if (s > 0) { + mpack_write_cstr(writer, "labels"); + mpack_start_array(writer, c_labels); + + cfl_list_foreach(head, &metric->labels) { + label = cfl_list_entry(head, struct cmt_map_label, _head); + + if (label->name != NULL) { + mpack_write_cstr(writer, label->name); + } + else { + mpack_write_nil(writer); + } + } + + mpack_finish_array(writer); + } + + mpack_write_cstr(writer, "hash"); + mpack_write_uint(writer, metric->hash); + + mpack_finish_map(writer); + + return 0; +} + +static int pack_basic_type(mpack_writer_t *writer, struct cmt *cmt, struct cmt_map *map) +{ + int values_size = 0; + struct cfl_list *head; + struct cmt_metric *metric; + + /* metric scope dictionary that holds meta and values*/ + mpack_start_map(writer, 2); + + pack_header(writer, cmt, map); + + if (map->metric_static_set) { + values_size++; + } + values_size += cfl_list_size(&map->metrics); + + mpack_write_cstr(writer, "values"); + mpack_start_array(writer, values_size); + + if (map->metric_static_set) { + pack_metric(writer, map, &map->metric); + } + + cfl_list_foreach(head, &map->metrics) { + metric = cfl_list_entry(head, struct cmt_metric, _head); + pack_metric(writer, map, metric); + } + mpack_finish_array(writer); + + mpack_finish_map(writer); + + return 0; +} + +static void pack_static_labels(mpack_writer_t *writer, struct cmt *cmt) +{ + struct cmt_label *static_label; + struct cfl_list *head; + + /* 'static_labels' (static labels) */ + mpack_write_cstr(writer, "static_labels"); + + mpack_start_array(writer, cfl_list_size(&cmt->static_labels->list)); + + cfl_list_foreach(head, &cmt->static_labels->list) { + static_label = cfl_list_entry(head, struct cmt_label, _head); + + mpack_start_array(writer, 2); + + mpack_write_cstr(writer, static_label->key); + mpack_write_cstr(writer, static_label->val); + + mpack_finish_array(writer); + } + + mpack_finish_array(writer); +} + +static int pack_static_processing_section(mpack_writer_t *writer, struct cmt *cmt) +{ + mpack_write_cstr(writer, "processing"); + + mpack_start_map(writer, 1); + + pack_static_labels(writer, cmt); + + mpack_finish_map(writer); /* 'processing' */ + + return 0; +} + +static int pack_context_header(mpack_writer_t *writer, struct cmt *cmt) +{ + int result; + + mpack_write_cstr(writer, "meta"); + mpack_start_map(writer, 3); + + mpack_write_cstr(writer, "cmetrics"); + result = pack_cfl_variant_kvlist(writer, cmt->internal_metadata); + + if (result != 0) { + return -1; + } + + mpack_write_cstr(writer, "external"); + result = pack_cfl_variant_kvlist(writer, cmt->external_metadata); + + if (result != 0) { + return -2; + } + + pack_static_processing_section(writer, cmt); + + mpack_finish_map(writer); /* 'context_header' */ + + return 0; +} + +static int pack_context_metrics(mpack_writer_t *writer, struct cmt *cmt) +{ + size_t metric_count; + struct cmt_histogram *histogram; + struct cmt_summary *summary; + struct cmt_untyped *untyped; + struct cmt_counter *counter; + struct cmt_gauge *gauge; + struct cfl_list *head; + + metric_count = 0; + metric_count += cfl_list_size(&cmt->counters); + metric_count += cfl_list_size(&cmt->gauges); + metric_count += cfl_list_size(&cmt->untypeds); + metric_count += cfl_list_size(&cmt->summaries); + metric_count += cfl_list_size(&cmt->histograms); + + mpack_write_cstr(writer, "metrics"); + mpack_start_array(writer, metric_count); + + /* Counters */ + cfl_list_foreach(head, &cmt->counters) { + counter = cfl_list_entry(head, struct cmt_counter, _head); + pack_basic_type(writer, cmt, counter->map); + } + + /* Gauges */ + cfl_list_foreach(head, &cmt->gauges) { + gauge = cfl_list_entry(head, struct cmt_gauge, _head); + pack_basic_type(writer, cmt, gauge->map); + } + + /* Untyped */ + cfl_list_foreach(head, &cmt->untypeds) { + untyped = cfl_list_entry(head, struct cmt_untyped, _head); + pack_basic_type(writer, cmt, untyped->map); + } + + /* Summary */ + cfl_list_foreach(head, &cmt->summaries) { + summary = cfl_list_entry(head, struct cmt_summary, _head); + pack_basic_type(writer, cmt, summary->map); + } + + /* Histogram */ + cfl_list_foreach(head, &cmt->histograms) { + histogram = cfl_list_entry(head, struct cmt_histogram, _head); + pack_basic_type(writer, cmt, histogram->map); + } + + mpack_finish_array(writer); + + return 0; +} + +static int pack_context(mpack_writer_t *writer, struct cmt *cmt) +{ + int result; + + mpack_start_map(writer, 2); + + result = pack_context_header(writer, cmt); + + if (result != 0) { + return -1; + } + + result = pack_context_metrics(writer, cmt); + + if (result != 0) { + return -2; + } + + mpack_finish_map(writer); /* outermost context scope */ + + return 0; +} + +/* Takes a cmetrics context and serialize it using msgpack */ +int cmt_encode_msgpack_create(struct cmt *cmt, char **out_buf, size_t *out_size) +{ + char *data; + size_t size; + mpack_writer_t writer; + int result; + + /* + * CMetrics data schema + + { + 'meta' => { + 'cmetrics' => { + 'producer': STRING + }, + 'external' => { ... }, + 'processing' => { + 'static_labels' => [ + [STRING, STRING], ... + ] + } + }, + 'metrics' => [ + { + 'meta' => { + 'ver' => INTEGER + 'type' => INTEGER + '0' = counter + '1' = gauge + '2' = histogram (WIP) + 'opts' => { + 'ns' => ns + 'subsystem' => subsystem + 'name' => name + 'description' => description + }, + 'label_keys' => [STRING, ...], + 'buckets' => [n, ...] + }, + 'values' => [ + { + 'ts' : nanosec timestamp, + 'value': float64 value, + 'label_values': [STRING, ...], + 'histogram':{ + 'sum': float64, + 'count': uint64, + 'buckets': [n, ...] + }, + 'summary': { + 'sum': float64, + 'count': uint64, + 'quantiles': [n, ...], + 'quantiles_set': uint64 + }, + 'hash': uint64 value + } + ] + }, ... + ] + } + * + * + * The following fields are metric type specific and are only + * included for histograms : + * meta->buckets + * values[n]->buckets + * values[n]->count + * values[n]->sum + */ + + if (cmt == NULL) { + return -1; + } + + mpack_writer_init_growable(&writer, &data, &size); + + result = pack_context(&writer, cmt); + + if (mpack_writer_destroy(&writer) != mpack_ok) { + fprintf(stderr, "An error occurred encoding the data!\n"); + + return -1; + } + + if (result != 0) { + return result; + } + + *out_buf = data; + *out_size = size; + + return 0; +} + +void cmt_encode_msgpack_destroy(char *out_buf) +{ + if (NULL != out_buf) { + MPACK_FREE(out_buf); + } +} |