diff options
Diffstat (limited to 'collectors/ebpf.plugin/ebpf_functions.c')
-rw-r--r-- | collectors/ebpf.plugin/ebpf_functions.c | 836 |
1 files changed, 755 insertions, 81 deletions
diff --git a/collectors/ebpf.plugin/ebpf_functions.c b/collectors/ebpf.plugin/ebpf_functions.c index 7a43692bc..ebee66395 100644 --- a/collectors/ebpf.plugin/ebpf_functions.c +++ b/collectors/ebpf.plugin/ebpf_functions.c @@ -4,6 +4,40 @@ #include "ebpf_functions.h" /***************************************************************** + * EBPF FUNCTION COMMON + *****************************************************************/ + +/** + * Function Start thread + * + * Start a specific thread after user request. + * + * @param em The structure with thread information + * @param period + * @return + */ +static int ebpf_function_start_thread(ebpf_module_t *em, int period) +{ + struct netdata_static_thread *st = em->thread; + // another request for thread that already ran, cleanup and restart + if (st->thread) + freez(st->thread); + + if (period <= 0) + period = EBPF_DEFAULT_LIFETIME; + + st->thread = mallocz(sizeof(netdata_thread_t)); + em->enabled = NETDATA_THREAD_EBPF_FUNCTION_RUNNING; + em->lifetime = period; + +#ifdef NETDATA_INTERNAL_CHECKS + netdata_log_info("Starting thread %s with lifetime = %d", em->info.thread_name, period); +#endif + + return netdata_thread_create(st->thread, st->name, NETDATA_THREAD_OPTION_DEFAULT, st->start_routine, em); +} + +/***************************************************************** * EBPF SELECT MODULE *****************************************************************/ @@ -13,17 +47,17 @@ * @param thread_name name of the thread we are looking for. * * @return it returns a pointer for the module that has thread_name on success or NULL otherwise. - */ ebpf_module_t *ebpf_functions_select_module(const char *thread_name) { int i; for (i = 0; i < EBPF_MODULE_FUNCTION_IDX; i++) { - if (strcmp(ebpf_modules[i].thread_name, thread_name) == 0) { + if (strcmp(ebpf_modules[i].info.thread_name, thread_name) == 0) { return &ebpf_modules[i]; } } return NULL; } + */ /***************************************************************** * EBPF HELP FUNCTIONS @@ -35,11 +69,9 @@ ebpf_module_t *ebpf_functions_select_module(const char *thread_name) { * Shows help with all options accepted by thread function. * * @param transaction the transaction id that Netdata sent for this function execution -*/ static void ebpf_function_thread_manipulation_help(const char *transaction) { - pthread_mutex_lock(&lock); - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, "%s", + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s", "ebpf.plugin / thread\n" "\n" "Function `thread` allows user to control eBPF threads.\n" @@ -57,13 +89,13 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) { " Disable a sp.\n" "\n" "Filters can be combined. Each filter can be given only one time.\n" - "Process thread is not controlled by functions until we finish the creation of functions per thread..\n" ); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); - pthread_mutex_unlock(&lock); -} + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + + buffer_free(wb); +} +*/ /***************************************************************** * EBPF ERROR FUNCTIONS @@ -79,12 +111,7 @@ static void ebpf_function_thread_manipulation_help(const char *transaction) { * @param msg the error message */ static void ebpf_function_error(const char *transaction, int code, const char *msg) { - char buffer[PLUGINSD_LINE_MAX + 1]; - json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); - - pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); - fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); - pluginsd_function_result_end_to_stdout(); + pluginsd_function_json_error_to_stdout(transaction, code, msg); } /***************************************************************** @@ -92,7 +119,7 @@ static void ebpf_function_error(const char *transaction, int code, const char *m *****************************************************************/ /** - * Function enable + * Function: thread * * Enable a specific thread. * @@ -102,7 +129,6 @@ static void ebpf_function_error(const char *transaction, int code, const char *m * @param line_max Number of arguments given * @param timeout The function timeout * @param em The structure with thread information - */ static void ebpf_function_thread_manipulation(const char *transaction, char *function __maybe_unused, char *line_buffer __maybe_unused, @@ -141,27 +167,15 @@ static void ebpf_function_thread_manipulation(const char *transaction, pthread_mutex_lock(&ebpf_exit_cleanup); if (lem->enabled > NETDATA_THREAD_EBPF_FUNCTION_RUNNING) { - struct netdata_static_thread *st = lem->thread; // Load configuration again ebpf_update_module(lem, default_btf, running_on_kernel, isrh); - // another request for thread that already ran, cleanup and restart - if (st->thread) - freez(st->thread); - - if (period <= 0) - period = EBPF_DEFAULT_LIFETIME; - - st->thread = mallocz(sizeof(netdata_thread_t)); - lem->enabled = NETDATA_THREAD_EBPF_FUNCTION_RUNNING; - lem->lifetime = period; - -#ifdef NETDATA_INTERNAL_CHECKS - netdata_log_info("Starting thread %s with lifetime = %d", thread_name, period); -#endif - - netdata_thread_create(st->thread, st->name, NETDATA_THREAD_OPTION_DEFAULT, - st->start_routine, lem); + if (ebpf_function_start_thread(lem, period)) { + ebpf_function_error(transaction, + HTTP_RESP_INTERNAL_SERVER_ERROR, + "Cannot start thread."); + return; + } } else { lem->running_time = 0; if (period > 0) // user is modifying period to run @@ -226,10 +240,10 @@ static void ebpf_function_thread_manipulation(const char *transaction, // THE ORDER SHOULD BE THE SAME WITH THE FIELDS! // thread name - buffer_json_add_array_item_string(wb, wem->thread_name); + buffer_json_add_array_item_string(wb, wem->info.thread_name); // description - buffer_json_add_array_item_string(wb, wem->thread_description); + buffer_json_add_array_item_string(wb, wem->info.thread_description); // Either it is not running or received a disabled signal and it is stopping. if (wem->enabled > NETDATA_THREAD_EBPF_FUNCTION_RUNNING || (!wem->lifetime && (int)wem->running_time == wem->update_every)) { @@ -267,7 +281,7 @@ static void ebpf_function_thread_manipulation(const char *transaction, RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, RRDF_FIELD_FILTER_MULTISELECT, - RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY | RRDF_FIELD_OPTS_UNIQUE_KEY, NULL); buffer_rrdf_table_add_field(wb, fields_id++, "Description", "Thread Desc", RRDF_FIELD_TYPE_STRING, RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, @@ -349,19 +363,697 @@ static void ebpf_function_thread_manipulation(const char *transaction, buffer_json_finalize(wb); // Lock necessary to avoid race condition - pthread_mutex_lock(&lock); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires, wb); + + buffer_free(wb); +} + */ + +/***************************************************************** + * EBPF SOCKET FUNCTION + *****************************************************************/ + +/** + * Thread Help + * + * Shows help with all options accepted by thread function. + * + * @param transaction the transaction id that Netdata sent for this function execution +*/ +static void ebpf_function_socket_help(const char *transaction) { + pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); + fprintf(stdout, "%s", + "ebpf.plugin / socket\n" + "\n" + "Function `socket` display information for all open sockets during ebpf.plugin runtime.\n" + "During thread runtime the plugin is always collecting data, but when an option is modified, the plugin\n" + "resets completely the previous table and can show a clean data for the first request before to bring the\n" + "modified request.\n" + "\n" + "The following filters are supported:\n" + "\n" + " family:FAMILY\n" + " Shows information for the FAMILY specified. Option accepts IPV4, IPV6 and all, that is the default.\n" + "\n" + " period:PERIOD\n" + " Enable socket to run a specific PERIOD in seconds. When PERIOD is not\n" + " specified plugin will use the default 300 seconds\n" + "\n" + " resolve:BOOL\n" + " Resolve service name, default value is YES.\n" + "\n" + " range:CIDR\n" + " Show sockets that have only a specific destination. Default all addresses.\n" + "\n" + " port:range\n" + " Show sockets that have only a specific destination.\n" + "\n" + " reset\n" + " Send a reset to collector. When a collector receives this command, it uses everything defined in configuration file.\n" + "\n" + " interfaces\n" + " When the collector receives this command, it read all available interfaces on host.\n" + "\n" + "Filters can be combined. Each filter can be given only one time. Default all ports\n" + ); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +/** + * Fill Fake socket + * + * Fill socket with an invalid request. + * + * @param fake_values is the structure where we are storing the value. + */ +static inline void ebpf_socket_fill_fake_socket(netdata_socket_plus_t *fake_values) +{ + snprintfz(fake_values->socket_string.src_ip, INET6_ADDRSTRLEN, "%s", "127.0.0.1"); + snprintfz(fake_values->socket_string.dst_ip, INET6_ADDRSTRLEN, "%s", "127.0.0.1"); + fake_values->pid = getpid(); + //fake_values->socket_string.src_port = 0; + fake_values->socket_string.dst_port[0] = 0; + snprintfz(fake_values->socket_string.dst_ip, NI_MAXSERV, "%s", "none"); + fake_values->data.family = AF_INET; + fake_values->data.protocol = AF_UNSPEC; +} + +/** + * Fill function buffer + * + * Fill buffer with data to be shown on cloud. + * + * @param wb buffer where we store data. + * @param values data read from hash table + * @param name the process name + */ +static void ebpf_fill_function_buffer(BUFFER *wb, netdata_socket_plus_t *values, char *name) +{ + buffer_json_add_array_item_array(wb); + + // IMPORTANT! + // THE ORDER SHOULD BE THE SAME WITH THE FIELDS! + + // PID + buffer_json_add_array_item_uint64(wb, (uint64_t)values->pid); + + // NAME + buffer_json_add_array_item_string(wb, (name) ? name : "not identified"); + + // Origin + buffer_json_add_array_item_string(wb, (values->data.external_origin) ? "incoming" : "outgoing"); + + // Source IP + buffer_json_add_array_item_string(wb, values->socket_string.src_ip); + + // SRC Port + //buffer_json_add_array_item_uint64(wb, (uint64_t) values->socket_string.src_port); + + // Destination IP + buffer_json_add_array_item_string(wb, values->socket_string.dst_ip); + + // DST Port + buffer_json_add_array_item_string(wb, values->socket_string.dst_port); + + uint64_t connections; + if (values->data.protocol == IPPROTO_TCP) { + // Protocol + buffer_json_add_array_item_string(wb, "TCP"); + + // Bytes received + buffer_json_add_array_item_uint64(wb, (uint64_t) values->data.tcp.tcp_bytes_received); + + // Bytes sent + buffer_json_add_array_item_uint64(wb, (uint64_t) values->data.tcp.tcp_bytes_sent); + + // Connections + connections = values->data.tcp.ipv4_connect + values->data.tcp.ipv6_connect; + } else if (values->data.protocol == IPPROTO_UDP) { + // Protocol + buffer_json_add_array_item_string(wb, "UDP"); + + // Bytes received + buffer_json_add_array_item_uint64(wb, (uint64_t) values->data.udp.udp_bytes_received); + + // Bytes sent + buffer_json_add_array_item_uint64(wb, (uint64_t) values->data.udp.udp_bytes_sent); + + // Connections + connections = values->data.udp.call_udp_sent + values->data.udp.call_udp_received; + } else { + // Protocol + buffer_json_add_array_item_string(wb, "UNSPEC"); + + // Bytes received + buffer_json_add_array_item_uint64(wb, 0); + + // Bytes sent + buffer_json_add_array_item_uint64(wb, 0); + + connections = 1; + } + + // Connections + if (values->flags & NETDATA_SOCKET_FLAGS_ALREADY_OPEN) { + connections++; + } else if (!connections) { + // If no connections, this means that we lost when connection was opened + values->flags |= NETDATA_SOCKET_FLAGS_ALREADY_OPEN; + connections++; + } + buffer_json_add_array_item_uint64(wb, connections); + + buffer_json_array_close(wb); +} + +/** + * Clean Judy array unsafe + * + * Clean all Judy Array allocated to show table when a function is called. + * Before to call this function it is necessary to lock `ebpf_judy_pid.index.rw_spinlock`. + **/ +static void ebpf_socket_clean_judy_array_unsafe() +{ + if (!ebpf_judy_pid.index.JudyLArray) + return; + + Pvoid_t *pid_value, *socket_value; + Word_t local_pid = 0, local_socket = 0; + bool first_pid = true, first_socket = true; + while ((pid_value = JudyLFirstThenNext(ebpf_judy_pid.index.JudyLArray, &local_pid, &first_pid))) { + netdata_ebpf_judy_pid_stats_t *pid_ptr = (netdata_ebpf_judy_pid_stats_t *)*pid_value; + rw_spinlock_write_lock(&pid_ptr->socket_stats.rw_spinlock); + if (pid_ptr->socket_stats.JudyLArray) { + while ((socket_value = JudyLFirstThenNext(pid_ptr->socket_stats.JudyLArray, &local_socket, &first_socket))) { + netdata_socket_plus_t *socket_clean = *socket_value; + aral_freez(aral_socket_table, socket_clean); + } + JudyLFreeArray(&pid_ptr->socket_stats.JudyLArray, PJE0); + pid_ptr->socket_stats.JudyLArray = NULL; + } + rw_spinlock_write_unlock(&pid_ptr->socket_stats.rw_spinlock); + } +} + +/** + * Fill function buffer unsafe + * + * Fill the function buffer with socket information. Before to call this function it is necessary to lock + * ebpf_judy_pid.index.rw_spinlock + * + * @param buf buffer used to store data to be shown by function. + * + * @return it returns 0 on success and -1 otherwise. + */ +static void ebpf_socket_fill_function_buffer_unsafe(BUFFER *buf) +{ + int counter = 0; + + Pvoid_t *pid_value, *socket_value; + Word_t local_pid = 0; + bool first_pid = true; + while ((pid_value = JudyLFirstThenNext(ebpf_judy_pid.index.JudyLArray, &local_pid, &first_pid))) { + netdata_ebpf_judy_pid_stats_t *pid_ptr = (netdata_ebpf_judy_pid_stats_t *)*pid_value; + bool first_socket = true; + Word_t local_timestamp = 0; + rw_spinlock_read_lock(&pid_ptr->socket_stats.rw_spinlock); + if (pid_ptr->socket_stats.JudyLArray) { + while ((socket_value = JudyLFirstThenNext(pid_ptr->socket_stats.JudyLArray, &local_timestamp, &first_socket))) { + netdata_socket_plus_t *values = (netdata_socket_plus_t *)*socket_value; + ebpf_fill_function_buffer(buf, values, pid_ptr->cmdline); + } + counter++; + } + rw_spinlock_read_unlock(&pid_ptr->socket_stats.rw_spinlock); + } + + if (!counter) { + netdata_socket_plus_t fake_values = { }; + ebpf_socket_fill_fake_socket(&fake_values); + ebpf_fill_function_buffer(buf, &fake_values, NULL); + } +} + +/** + * Socket read hash + * + * This is the thread callback. + * This thread is necessary, because we cannot freeze the whole plugin to read the data on very busy socket. + * + * @param buf the buffer to store data; + * @param em the module main structure. + * + * @return It always returns NULL. + */ +void ebpf_socket_read_open_connections(BUFFER *buf, struct ebpf_module *em) +{ + // thread was not initialized or Array was reset + rw_spinlock_read_lock(&ebpf_judy_pid.index.rw_spinlock); + if (!em->maps || (em->maps[NETDATA_SOCKET_OPEN_SOCKET].map_fd == ND_EBPF_MAP_FD_NOT_INITIALIZED) || + !ebpf_judy_pid.index.JudyLArray){ + netdata_socket_plus_t fake_values = { }; + + ebpf_socket_fill_fake_socket(&fake_values); + + ebpf_fill_function_buffer(buf, &fake_values, NULL); + rw_spinlock_read_unlock(&ebpf_judy_pid.index.rw_spinlock); + return; + } + + rw_spinlock_read_lock(&network_viewer_opt.rw_spinlock); + ebpf_socket_fill_function_buffer_unsafe(buf); + rw_spinlock_read_unlock(&network_viewer_opt.rw_spinlock); + rw_spinlock_read_unlock(&ebpf_judy_pid.index.rw_spinlock); +} + +/** + * Function: Socket + * + * Show information for sockets stored in hash tables. + * + * @param transaction the transaction id that Netdata sent for this function execution + * @param function function name and arguments given to thread. + * @param timeout The function timeout + * @param cancelled Variable used to store function status. + */ +static void ebpf_function_socket_manipulation(const char *transaction, + char *function __maybe_unused, + int timeout __maybe_unused, + bool *cancelled __maybe_unused) +{ + UNUSED(timeout); + ebpf_module_t *em = &ebpf_modules[EBPF_MODULE_SOCKET_IDX]; + + char *words[PLUGINSD_MAX_WORDS] = {NULL}; + size_t num_words = quoted_strings_splitter_pluginsd(function, words, PLUGINSD_MAX_WORDS); + const char *name; + int period = -1; + rw_spinlock_write_lock(&ebpf_judy_pid.index.rw_spinlock); + network_viewer_opt.enabled = CONFIG_BOOLEAN_YES; + uint32_t previous; + + for (int i = 1; i < PLUGINSD_MAX_WORDS; i++) { + const char *keyword = get_word(words, num_words, i); + if (!keyword) + break; + + if (strncmp(keyword, EBPF_FUNCTION_SOCKET_FAMILY, sizeof(EBPF_FUNCTION_SOCKET_FAMILY) - 1) == 0) { + name = &keyword[sizeof(EBPF_FUNCTION_SOCKET_FAMILY) - 1]; + previous = network_viewer_opt.family; + uint32_t family = AF_UNSPEC; + if (!strcmp(name, "IPV4")) + family = AF_INET; + else if (!strcmp(name, "IPV6")) + family = AF_INET6; + + if (family != previous) { + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + network_viewer_opt.family = family; + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + ebpf_socket_clean_judy_array_unsafe(); + } + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_PERIOD, sizeof(EBPF_FUNCTION_SOCKET_PERIOD) - 1) == 0) { + name = &keyword[sizeof(EBPF_FUNCTION_SOCKET_PERIOD) - 1]; + pthread_mutex_lock(&ebpf_exit_cleanup); + period = str2i(name); + if (period > 0) { + em->lifetime = period; + } else + em->lifetime = EBPF_NON_FUNCTION_LIFE_TIME; + +#ifdef NETDATA_DEV_MODE + collector_info("Lifetime modified for %u", em->lifetime); +#endif + pthread_mutex_unlock(&ebpf_exit_cleanup); + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_RESOLVE, sizeof(EBPF_FUNCTION_SOCKET_RESOLVE) - 1) == 0) { + previous = network_viewer_opt.service_resolution_enabled; + uint32_t resolution; + name = &keyword[sizeof(EBPF_FUNCTION_SOCKET_RESOLVE) - 1]; + resolution = (!strcasecmp(name, "YES")) ? CONFIG_BOOLEAN_YES : CONFIG_BOOLEAN_NO; + + if (previous != resolution) { + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + network_viewer_opt.service_resolution_enabled = resolution; + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + + ebpf_socket_clean_judy_array_unsafe(); + } + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_RANGE, sizeof(EBPF_FUNCTION_SOCKET_RANGE) - 1) == 0) { + name = &keyword[sizeof(EBPF_FUNCTION_SOCKET_RANGE) - 1]; + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + ebpf_clean_ip_structure(&network_viewer_opt.included_ips); + ebpf_clean_ip_structure(&network_viewer_opt.excluded_ips); + ebpf_parse_ips_unsafe((char *)name); + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + + ebpf_socket_clean_judy_array_unsafe(); + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_PORT, sizeof(EBPF_FUNCTION_SOCKET_PORT) - 1) == 0) { + name = &keyword[sizeof(EBPF_FUNCTION_SOCKET_PORT) - 1]; + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + ebpf_clean_port_structure(&network_viewer_opt.included_port); + ebpf_clean_port_structure(&network_viewer_opt.excluded_port); + ebpf_parse_ports((char *)name); + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + + ebpf_socket_clean_judy_array_unsafe(); + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_RESET, sizeof(EBPF_FUNCTION_SOCKET_RESET) - 1) == 0) { + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + ebpf_clean_port_structure(&network_viewer_opt.included_port); + ebpf_clean_port_structure(&network_viewer_opt.excluded_port); + + ebpf_clean_ip_structure(&network_viewer_opt.included_ips); + ebpf_clean_ip_structure(&network_viewer_opt.excluded_ips); + ebpf_clean_ip_structure(&network_viewer_opt.ipv4_local_ip); + ebpf_clean_ip_structure(&network_viewer_opt.ipv6_local_ip); + + parse_network_viewer_section(&socket_config); + ebpf_read_local_addresses_unsafe(); + network_viewer_opt.enabled = CONFIG_BOOLEAN_YES; + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + } else if (strncmp(keyword, EBPF_FUNCTION_SOCKET_INTERFACES, sizeof(EBPF_FUNCTION_SOCKET_INTERFACES) - 1) == 0) { + rw_spinlock_write_lock(&network_viewer_opt.rw_spinlock); + ebpf_read_local_addresses_unsafe(); + rw_spinlock_write_unlock(&network_viewer_opt.rw_spinlock); + } else if (strncmp(keyword, "help", 4) == 0) { + ebpf_function_socket_help(transaction); + rw_spinlock_write_unlock(&ebpf_judy_pid.index.rw_spinlock); + return; + } + } + rw_spinlock_write_unlock(&ebpf_judy_pid.index.rw_spinlock); + + pthread_mutex_lock(&ebpf_exit_cleanup); + if (em->enabled > NETDATA_THREAD_EBPF_FUNCTION_RUNNING) { + // Cleanup when we already had a thread running + rw_spinlock_write_lock(&ebpf_judy_pid.index.rw_spinlock); + ebpf_socket_clean_judy_array_unsafe(); + rw_spinlock_write_unlock(&ebpf_judy_pid.index.rw_spinlock); + + if (ebpf_function_start_thread(em, period)) { + ebpf_function_error(transaction, + HTTP_RESP_INTERNAL_SERVER_ERROR, + "Cannot start thread."); + pthread_mutex_unlock(&ebpf_exit_cleanup); + return; + } + } else { + if (period < 0 && em->lifetime < EBPF_NON_FUNCTION_LIFE_TIME) { + em->lifetime = EBPF_NON_FUNCTION_LIFE_TIME; + } + } + pthread_mutex_unlock(&ebpf_exit_cleanup); + + time_t expires = now_realtime_sec() + em->update_every; + + BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL); + buffer_json_initialize(wb, "\"", "\"", 0, true, false); + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); + buffer_json_member_add_string(wb, "type", "table"); + buffer_json_member_add_time_t(wb, "update_every", em->update_every); + buffer_json_member_add_string(wb, "help", EBPF_PLUGIN_SOCKET_FUNCTION_DESCRIPTION); + + // Collect data + buffer_json_member_add_array(wb, "data"); + ebpf_socket_read_open_connections(wb, em); + buffer_json_array_close(wb); // data + + buffer_json_member_add_object(wb, "columns"); + { + int fields_id = 0; + + // IMPORTANT! + // THE ORDER SHOULD BE THE SAME WITH THE VALUES! + buffer_rrdf_table_add_field(wb, fields_id++, "PID", "Process ID", RRDF_FIELD_TYPE_INTEGER, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NUMBER, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, + NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Process Name", "Process Name", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Origin", "The connection origin.", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Request from", "Request from IP", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + /* + buffer_rrdf_table_add_field(wb, fields_id++, "SRC PORT", "Source Port", RRDF_FIELD_TYPE_INTEGER, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NUMBER, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, + NULL); + */ + + buffer_rrdf_table_add_field(wb, fields_id++, "Destination IP", "Destination IP", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Destination Port", "Destination Port", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Protocol", "Communication protocol", RRDF_FIELD_TYPE_STRING, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NONE, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Incoming Bandwidth", "Bytes received.", RRDF_FIELD_TYPE_INTEGER, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NUMBER, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, + NULL); + + buffer_rrdf_table_add_field(wb, fields_id++, "Outgoing Bandwidth", "Bytes sent.", RRDF_FIELD_TYPE_INTEGER, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NUMBER, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, + NULL); + + buffer_rrdf_table_add_field(wb, fields_id, "Connections", "Number of calls to tcp_vX_connections and udp_sendmsg, where X is the protocol version.", RRDF_FIELD_TYPE_INTEGER, + RRDF_FIELD_VISUAL_VALUE, RRDF_FIELD_TRANSFORM_NUMBER, 0, NULL, NAN, + RRDF_FIELD_SORT_ASCENDING, NULL, RRDF_FIELD_SUMMARY_COUNT, + RRDF_FIELD_FILTER_MULTISELECT, + RRDF_FIELD_OPTS_VISIBLE | RRDF_FIELD_OPTS_STICKY, + NULL); + } + buffer_json_object_close(wb); // columns + + buffer_json_member_add_object(wb, "charts"); + { + // OutBound Connections + buffer_json_member_add_object(wb, "IPInboundConn"); + { + buffer_json_member_add_string(wb, "name", "TCP Inbound Connection"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "connected_tcp"); + buffer_json_add_array_item_string(wb, "connected_udp"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // OutBound Connections + buffer_json_member_add_object(wb, "IPTCPOutboundConn"); + { + buffer_json_member_add_string(wb, "name", "TCP Outbound Connection"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "connected_V4"); + buffer_json_add_array_item_string(wb, "connected_V6"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // TCP Functions + buffer_json_member_add_object(wb, "TCPFunctions"); + { + buffer_json_member_add_string(wb, "name", "TCPFunctions"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "received"); + buffer_json_add_array_item_string(wb, "sent"); + buffer_json_add_array_item_string(wb, "close"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // TCP Bandwidth + buffer_json_member_add_object(wb, "TCPBandwidth"); + { + buffer_json_member_add_string(wb, "name", "TCPBandwidth"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "received"); + buffer_json_add_array_item_string(wb, "sent"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // UDP Functions + buffer_json_member_add_object(wb, "UDPFunctions"); + { + buffer_json_member_add_string(wb, "name", "UDPFunctions"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "received"); + buffer_json_add_array_item_string(wb, "sent"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // UDP Bandwidth + buffer_json_member_add_object(wb, "UDPBandwidth"); + { + buffer_json_member_add_string(wb, "name", "UDPBandwidth"); + buffer_json_member_add_string(wb, "type", "line"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "received"); + buffer_json_add_array_item_string(wb, "sent"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + } + buffer_json_object_close(wb); // charts + + buffer_json_member_add_string(wb, "default_sort_column", "PID"); + + // Do we use only on fields that can be groupped? + buffer_json_member_add_object(wb, "group_by"); + { + // group by PID + buffer_json_member_add_object(wb, "PID"); + { + buffer_json_member_add_string(wb, "name", "Process ID"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "PID"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by Process Name + buffer_json_member_add_object(wb, "Process Name"); + { + buffer_json_member_add_string(wb, "name", "Process Name"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Process Name"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by Process Name + buffer_json_member_add_object(wb, "Origin"); + { + buffer_json_member_add_string(wb, "name", "Origin"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Origin"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by Request From IP + buffer_json_member_add_object(wb, "Request from"); + { + buffer_json_member_add_string(wb, "name", "Request from IP"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Request from"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by Destination IP + buffer_json_member_add_object(wb, "Destination IP"); + { + buffer_json_member_add_string(wb, "name", "Destination IP"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Destination IP"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by DST Port + buffer_json_member_add_object(wb, "Destination Port"); + { + buffer_json_member_add_string(wb, "name", "Destination Port"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Destination Port"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + + // group by Protocol + buffer_json_member_add_object(wb, "Protocol"); + { + buffer_json_member_add_string(wb, "name", "Protocol"); + buffer_json_member_add_array(wb, "columns"); + { + buffer_json_add_array_item_string(wb, "Protocol"); + } + buffer_json_array_close(wb); + } + buffer_json_object_close(wb); + } + buffer_json_object_close(wb); // group_by + + buffer_json_member_add_time_t(wb, "expires", expires); + buffer_json_finalize(wb); + + // Lock necessary to avoid race condition pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires); fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); pluginsd_function_result_end_to_stdout(); fflush(stdout); - pthread_mutex_unlock(&lock); buffer_free(wb); } - /***************************************************************** * EBPF FUNCTION THREAD *****************************************************************/ @@ -375,45 +1067,27 @@ static void ebpf_function_thread_manipulation(const char *transaction, */ void *ebpf_function_thread(void *ptr) { - ebpf_module_t *em = (ebpf_module_t *)ptr; - char buffer[PLUGINSD_LINE_MAX + 1]; - - char *s = NULL; - while(!ebpf_exit_plugin && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if (!strncmp(function, EBPF_FUNCTION_THREAD, sizeof(EBPF_FUNCTION_THREAD) - 1)) - ebpf_function_thread_manipulation(transaction, - function, - buffer, - PLUGINSD_LINE_MAX + 1, - timeout, - em); - else - ebpf_function_error(transaction, - HTTP_RESP_NOT_FOUND, - "No function with this name found in ebpf.plugin."); - } + (void)ptr; + + struct functions_evloop_globals *wg = functions_evloop_init(1, + "EBPF", + &lock, + &ebpf_plugin_exit); + + functions_evloop_add_function(wg, + "ebpf_socket", + ebpf_function_socket_manipulation, + PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT); + + heartbeat_t hb; + heartbeat_init(&hb); + while(!ebpf_plugin_exit) { + (void)heartbeat_next(&hb, USEC_PER_SEC); + + if (ebpf_plugin_exit) { + break; } - else - netdata_log_error("Received unknown command: %s", keyword ? keyword : "(unset)"); } + return NULL; } |