/* Copyright (C) 2010 Sergei Golubchik and Monty Program Ab This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ #include "feedback.h" #include #include #include #include namespace feedback { static THD *thd= 0; ///< background thread thd static my_thread_id thd_thread_id; ///< its thread_id static size_t needed_size= 20480; ulong startup_interval= 60*5; ///< in seconds (5 minutes) ulong first_interval= 60*60*24; ///< in seconds (one day) ulong interval= 60*60*24*7; ///< in seconds (one week) /** reads the rows from a table and puts them, concatenated, in a String @note 1. only supports two column tables - no less, no more. 2. it emulates mysql -e "select * from..." and thus it separates columns with \t and starts the output with column names. */ static int table_to_string(TABLE *table, String *result) { int res; char buff1[MAX_FIELD_WIDTH], buff2[MAX_FIELD_WIDTH]; String str1(buff1, sizeof(buff1), system_charset_info); String str2(buff2, sizeof(buff2), system_charset_info); res= table->file->ha_rnd_init(1); dbug_tmp_use_all_columns(table, &table->read_set); while(!res && !table->file->ha_rnd_next(table->record[0])) { table->field[0]->val_str(&str1); table->field[1]->val_str(&str2); if (result->reserve(str1.length() + str2.length() + 3)) res= 1; else { result->qs_append(str1.ptr(), str1.length()); result->qs_append('\t'); result->qs_append(str2.ptr(), str2.length()); result->qs_append('\n'); } } res = res || (int)result->append('\n'); /* Note, "|=" and not "||" - because we want to call ha_rnd_end() even if res is already 1. */ res |= table->file->ha_rnd_end(); return res; } /** Initialize the THD and TABLE_LIST The structures must be sufficiently initialized for create_tmp_table() and fill_feedback() to work. */ static int prepare_for_fill(TABLE_LIST *tables) { /* Add our thd to the list, for it to be visible in SHOW PROCESSLIST. But don't generate thread_id every time - use the saved value (every increment of global thread_id counts as a new connection in SHOW STATUS and we want to avoid skewing the statistics) */ thd->variables.pseudo_thread_id= thd->thread_id; server_threads.insert(thd); thd->thread_stack= (char*) &tables; thd->store_globals(); thd->mysys_var->current_cond= &sleep_condition; thd->mysys_var->current_mutex= &sleep_mutex; thd->mark_connection_idle(); thd->proc_info="feedback"; thd->system_thread= SYSTEM_THREAD_EVENT_WORKER; // whatever thd->set_time(); thd->init_for_queries(); thd->real_id= pthread_self(); thd->db= null_clex_str; thd->security_ctx->host_or_ip= ""; thd->security_ctx->db_access= DB_ACLS; thd->security_ctx->master_access= ALL_KNOWN_ACL; bzero((char*) &thd->net, sizeof(thd->net)); lex_start(thd); thd->lex->init_select(); LEX_CSTRING tbl_name= {i_s_feedback->table_name, strlen(i_s_feedback->table_name) }; tables->init_one_table(&INFORMATION_SCHEMA_NAME, &tbl_name, 0, TL_READ); tables->schema_table= i_s_feedback; tables->schema_table_reformed= 1; tables->select_lex= thd->lex->first_select_lex(); DBUG_ASSERT(tables->select_lex); tables->table= create_schema_table(thd, tables); if (!tables->table) return 1; tables->table->pos_in_table_list= tables; return 0; } /** Try to detect if this thread is going down which can happen for different reasons: * plugin is being unloaded * mysqld server is being shut down * the thread is being killed */ static bool going_down() { return shutdown_plugin || abort_loop || (thd && thd->killed); } /** just like sleep, but waits on a condition and checks "plugin shutdown" status */ static int slept_ok(time_t sec) { struct timespec abstime; int ret= 0; set_timespec(abstime, sec); mysql_mutex_lock(&sleep_mutex); while (!going_down() && ret != ETIMEDOUT) ret= mysql_cond_timedwait(&sleep_condition, &sleep_mutex, &abstime); mysql_mutex_unlock(&sleep_mutex); return !going_down(); } /** create a feedback report and send it to all specified urls If "when" argument is not null, only it and the server uid are sent. Otherwise a full report is generated. */ static void send_report(const char *when) { TABLE_LIST tables; String str; int i, last_todo; Url **todo= (Url**)alloca(url_count*sizeof(Url*)); str.alloc(needed_size); // preallocate it to avoid many small mallocs /* on startup and shutdown the server may not be completely initialized, and full report won't work. We send a short status notice only. */ if (when) { str.length(0); str.append(STRING_WITH_LEN("FEEDBACK_SERVER_UID")); str.append('\t'); str.append(server_uid_buf, sizeof(server_uid_buf)-1); str.append('\n'); str.append(STRING_WITH_LEN("FEEDBACK_WHEN")); str.append('\t'); str.append(when, strlen(when)); str.append('\n'); str.append(STRING_WITH_LEN("FEEDBACK_USER_INFO")); str.append('\t'); str.append(user_info, strlen(user_info)); str.append('\n'); str.append('\n'); } else { /* otherwise, prepare the THD and TABLE_LIST, create and fill the temporary table with data just like SELECT * FROM INFORMATION_SCHEMA.FEEDBACK is doing, read and concatenate table data into a String. */ if (!(thd= new THD(thd_thread_id))) return; if (prepare_for_fill(&tables)) goto ret; if (fill_feedback(thd, &tables, NULL)) goto ret; if (table_to_string(tables.table, &str)) goto ret; needed_size= (size_t)(str.length() * 1.1); free_tmp_table(thd, tables.table); tables.table= 0; } /* Try to send the report on every url from the list, remove url on success, keep failed in the list. Repeat until the list is empty. */ memcpy(todo, urls, url_count*sizeof(Url*)); last_todo= url_count - 1; do { for (i= 0; i <= last_todo;) { Url *url= todo[i]; if (thd) // for nicer SHOW PROCESSLIST thd->set_query(const_cast(url->url()), (uint) url->url_length()); if (url->send(str.ptr(), str.length())) i++; else todo[i]= todo[last_todo--]; } if (last_todo < 0) break; } while (slept_ok(send_retry_wait)); // wait a little bit before retrying ret: if (thd) { if (tables.table) free_tmp_table(thd, tables.table); thd->cleanup_after_query(); /* clean up, free the thd. reset all thread local status variables to minimize the effect of the background thread on SHOW STATUS. */ server_threads.erase(thd); thd->set_status_var_init(); thd->killed= KILL_CONNECTION; delete thd; thd= 0; } } /** background sending thread */ pthread_handler_t background_thread(void *arg __attribute__((unused))) { if (my_thread_init()) return 0; thd_thread_id= next_thread_id(); if (slept_ok(startup_interval)) { send_report("startup"); if (slept_ok(first_interval)) { send_report(NULL); while(slept_ok(interval)) send_report(NULL); } send_report("shutdown"); } my_thread_end(); pthread_exit(0); return 0; } } // namespace feedback