summaryrefslogtreecommitdiffstats
path: root/plugin/feedback/sender_thread.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
commit3f619478f796eddbba6e39502fe941b285dd97b1 (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /plugin/feedback/sender_thread.cc
parentInitial commit. (diff)
downloadmariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz
mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'plugin/feedback/sender_thread.cc')
-rw-r--r--plugin/feedback/sender_thread.cc298
1 files changed, 298 insertions, 0 deletions
diff --git a/plugin/feedback/sender_thread.cc b/plugin/feedback/sender_thread.cc
new file mode 100644
index 00000000..6b5be475
--- /dev/null
+++ b/plugin/feedback/sender_thread.cc
@@ -0,0 +1,298 @@
+/* Copyright (C) 2010 Sergei Golubchik and Monty Program Ab
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+
+#include "feedback.h"
+#include <sql_acl.h>
+#include <sql_parse.h>
+#include <sql_show.h>
+#include <time.h>
+
+namespace feedback {
+
+static THD *thd= 0; ///< background thread thd
+static my_thread_id thd_thread_id; ///< its thread_id
+
+static size_t needed_size= 20480;
+
+ulong startup_interval= 60*5; ///< in seconds (5 minutes)
+ulong first_interval= 60*60*24; ///< in seconds (one day)
+ulong interval= 60*60*24*7; ///< in seconds (one week)
+
+/**
+ reads the rows from a table and puts them, concatenated, in a String
+
+ @note
+ 1. only supports two column tables - no less, no more.
+ 2. it emulates mysql -e "select * from..." and thus it separates
+ columns with \t and starts the output with column names.
+*/
+static int table_to_string(TABLE *table, String *result)
+{
+ int res;
+ char buff1[MAX_FIELD_WIDTH], buff2[MAX_FIELD_WIDTH];
+ String str1(buff1, sizeof(buff1), system_charset_info);
+ String str2(buff2, sizeof(buff2), system_charset_info);
+
+ res= table->file->ha_rnd_init(1);
+
+ dbug_tmp_use_all_columns(table, &table->read_set);
+
+ while(!res && !table->file->ha_rnd_next(table->record[0]))
+ {
+ table->field[0]->val_str(&str1);
+ table->field[1]->val_str(&str2);
+ if (result->reserve(str1.length() + str2.length() + 3))
+ res= 1;
+ else
+ {
+ result->qs_append(str1.ptr(), str1.length());
+ result->qs_append('\t');
+ result->qs_append(str2.ptr(), str2.length());
+ result->qs_append('\n');
+ }
+ }
+
+ res = res || (int)result->append('\n');
+
+ /*
+ Note, "|=" and not "||" - because we want to call ha_rnd_end()
+ even if res is already 1.
+ */
+ res |= table->file->ha_rnd_end();
+
+ return res;
+}
+
+/**
+ Initialize the THD and TABLE_LIST
+
+ The structures must be sufficiently initialized for create_tmp_table()
+ and fill_feedback() to work.
+*/
+static int prepare_for_fill(TABLE_LIST *tables)
+{
+ /*
+ Add our thd to the list, for it to be visible in SHOW PROCESSLIST.
+ But don't generate thread_id every time - use the saved value
+ (every increment of global thread_id counts as a new connection
+ in SHOW STATUS and we want to avoid skewing the statistics)
+ */
+ thd->variables.pseudo_thread_id= thd->thread_id;
+ server_threads.insert(thd);
+ thd->thread_stack= (char*) &tables;
+ thd->store_globals();
+
+ thd->mysys_var->current_cond= &sleep_condition;
+ thd->mysys_var->current_mutex= &sleep_mutex;
+ thd->proc_info="feedback";
+ thd->set_command(COM_SLEEP);
+ thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; // whatever
+ thd->set_time();
+ thd->init_for_queries();
+ thd->real_id= pthread_self();
+ thd->db= null_clex_str;
+ thd->security_ctx->host_or_ip= "";
+ thd->security_ctx->db_access= DB_ACLS;
+ thd->security_ctx->master_access= ALL_KNOWN_ACL;
+ bzero((char*) &thd->net, sizeof(thd->net));
+ lex_start(thd);
+ thd->lex->init_select();
+
+ LEX_CSTRING tbl_name= {i_s_feedback->table_name, strlen(i_s_feedback->table_name) };
+
+ tables->init_one_table(&INFORMATION_SCHEMA_NAME, &tbl_name, 0, TL_READ);
+ tables->schema_table= i_s_feedback;
+ tables->schema_table_reformed= 1;
+ tables->select_lex= thd->lex->first_select_lex();
+ DBUG_ASSERT(tables->select_lex);
+ tables->table= create_schema_table(thd, tables);
+ if (!tables->table)
+ return 1;
+
+ tables->table->pos_in_table_list= tables;
+
+ return 0;
+}
+
+/**
+ Try to detect if this thread is going down
+
+ which can happen for different reasons:
+ * plugin is being unloaded
+ * mysqld server is being shut down
+ * the thread is being killed
+
+*/
+static bool going_down()
+{
+ return shutdown_plugin || abort_loop || (thd && thd->killed);
+}
+
+/**
+ just like sleep, but waits on a condition and checks "plugin shutdown" status
+*/
+static int slept_ok(time_t sec)
+{
+ struct timespec abstime;
+ int ret= 0;
+
+ set_timespec(abstime, sec);
+
+ mysql_mutex_lock(&sleep_mutex);
+ while (!going_down() && ret != ETIMEDOUT)
+ ret= mysql_cond_timedwait(&sleep_condition, &sleep_mutex, &abstime);
+ mysql_mutex_unlock(&sleep_mutex);
+
+ return !going_down();
+}
+
+/**
+ create a feedback report and send it to all specified urls
+
+ If "when" argument is not null, only it and the server uid are sent.
+ Otherwise a full report is generated.
+*/
+static void send_report(const char *when)
+{
+ TABLE_LIST tables;
+ String str;
+ int i, last_todo;
+ Url **todo= (Url**)alloca(url_count*sizeof(Url*));
+
+ str.alloc(needed_size); // preallocate it to avoid many small mallocs
+
+ /*
+ on startup and shutdown the server may not be completely
+ initialized, and full report won't work.
+ We send a short status notice only.
+ */
+ if (when)
+ {
+ str.length(0);
+ str.append(STRING_WITH_LEN("FEEDBACK_SERVER_UID"));
+ str.append('\t');
+ str.append(server_uid_buf, sizeof(server_uid_buf)-1);
+ str.append('\n');
+ str.append(STRING_WITH_LEN("FEEDBACK_WHEN"));
+ str.append('\t');
+ str.append(when, strlen(when));
+ str.append('\n');
+ str.append(STRING_WITH_LEN("FEEDBACK_USER_INFO"));
+ str.append('\t');
+ str.append(user_info, strlen(user_info));
+ str.append('\n');
+ str.append('\n');
+ }
+ else
+ {
+ /*
+ otherwise, prepare the THD and TABLE_LIST,
+ create and fill the temporary table with data just like
+ SELECT * FROM INFORMATION_SCHEMA.FEEDBACK is doing,
+ read and concatenate table data into a String.
+ */
+ if (!(thd= new THD(thd_thread_id)))
+ return;
+
+ if (prepare_for_fill(&tables))
+ goto ret;
+
+ if (fill_feedback(thd, &tables, NULL))
+ goto ret;
+
+ if (table_to_string(tables.table, &str))
+ goto ret;
+
+ needed_size= (size_t)(str.length() * 1.1);
+
+ free_tmp_table(thd, tables.table);
+ tables.table= 0;
+ }
+
+ /*
+ Try to send the report on every url from the list, remove url on success,
+ keep failed in the list. Repeat until the list is empty.
+ */
+ memcpy(todo, urls, url_count*sizeof(Url*));
+ last_todo= url_count - 1;
+ do
+ {
+ for (i= 0; i <= last_todo;)
+ {
+ Url *url= todo[i];
+
+ if (thd) // for nicer SHOW PROCESSLIST
+ thd->set_query(const_cast<char*>(url->url()), (uint) url->url_length());
+
+ if (url->send(str.ptr(), str.length()))
+ i++;
+ else
+ todo[i]= todo[last_todo--];
+ }
+ if (last_todo < 0)
+ break;
+ } while (slept_ok(send_retry_wait)); // wait a little bit before retrying
+
+ret:
+ if (thd)
+ {
+ if (tables.table)
+ free_tmp_table(thd, tables.table);
+ thd->cleanup_after_query();
+ /*
+ clean up, free the thd.
+ reset all thread local status variables to minimize
+ the effect of the background thread on SHOW STATUS.
+ */
+ server_threads.erase(thd);
+ thd->set_status_var_init();
+ thd->killed= KILL_CONNECTION;
+ delete thd;
+ thd= 0;
+ }
+}
+
+/**
+ background sending thread
+*/
+pthread_handler_t background_thread(void *arg __attribute__((unused)))
+{
+ if (my_thread_init())
+ return 0;
+
+ thd_thread_id= next_thread_id();
+
+ if (slept_ok(startup_interval))
+ {
+ send_report("startup");
+
+ if (slept_ok(first_interval))
+ {
+ send_report(NULL);
+
+ while(slept_ok(interval))
+ send_report(NULL);
+ }
+
+ send_report("shutdown");
+ }
+
+ my_thread_end();
+ pthread_exit(0);
+ return 0;
+}
+
+} // namespace feedback
+