summaryrefslogtreecommitdiffstats
path: root/sql/my_apc.cc
diff options
context:
space:
mode:
Diffstat (limited to 'sql/my_apc.cc')
-rw-r--r--sql/my_apc.cc239
1 files changed, 239 insertions, 0 deletions
diff --git a/sql/my_apc.cc b/sql/my_apc.cc
new file mode 100644
index 00000000..50e88abf
--- /dev/null
+++ b/sql/my_apc.cc
@@ -0,0 +1,239 @@
+/*
+ Copyright (c) 2011, 2013 Monty Program Ab.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+
+
+#ifndef MY_APC_STANDALONE
+
+#include "mariadb.h"
+#include "sql_class.h"
+
+#endif
+
+/* For standalone testing of APC system, see unittest/sql/my_apc-t.cc */
+
+/*
+ Initialize the target.
+
+ @note
+ Initialization must be done prior to enabling/disabling the target, or making
+ any call requests to it.
+ Initial state after initialization is 'disabled'.
+*/
+void Apc_target::init(mysql_mutex_t *target_mutex)
+{
+ DBUG_ASSERT(!enabled);
+ LOCK_thd_kill_ptr= target_mutex;
+#ifndef DBUG_OFF
+ n_calls_processed= 0;
+#endif
+}
+
+
+/* [internal] Put request qe into the request list */
+
+void Apc_target::enqueue_request(Call_request *qe)
+{
+ mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
+ if (apc_calls)
+ {
+ Call_request *after= apc_calls->prev;
+ qe->next= apc_calls;
+ apc_calls->prev= qe;
+
+ qe->prev= after;
+ after->next= qe;
+ }
+ else
+ {
+ apc_calls= qe;
+ qe->next= qe->prev= qe;
+ }
+}
+
+
+/*
+ [internal] Remove request qe from the request queue.
+
+ The request is not necessarily first in the queue.
+*/
+
+void Apc_target::dequeue_request(Call_request *qe)
+{
+ mysql_mutex_assert_owner(LOCK_thd_kill_ptr);
+ if (apc_calls == qe)
+ {
+ if ((apc_calls= apc_calls->next) == qe)
+ {
+ apc_calls= NULL;
+ }
+ }
+
+ qe->prev->next= qe->next;
+ qe->next->prev= qe->prev;
+}
+
+#ifdef HAVE_PSI_INTERFACE
+
+/* One key for all conds */
+PSI_cond_key key_show_explain_request_COND;
+
+static PSI_cond_info show_explain_psi_conds[]=
+{
+ { &key_show_explain_request_COND, "show_explain", 0 /* not using PSI_FLAG_GLOBAL*/ }
+};
+
+void init_show_explain_psi_keys(void)
+{
+ if (PSI_server == NULL)
+ return;
+
+ PSI_server->register_cond("sql", show_explain_psi_conds,
+ array_elements(show_explain_psi_conds));
+}
+#endif
+
+
+/*
+ Make an APC (Async Procedure Call) to another thread.
+
+ @detail
+ Make an APC call: schedule it for execution and wait until the target
+ thread has executed it.
+
+ - The caller is responsible for making sure he's not posting request
+ to the thread he's calling this function from.
+
+ - The caller must have locked target_mutex. The function will release it.
+
+ @retval FALSE - Ok, the call has been made
+ @retval TRUE - Call wasnt made (either the target is in disabled state or
+ timeout occurred)
+*/
+
+bool Apc_target::make_apc_call(THD *caller_thd, Apc_call *call,
+ int timeout_sec, bool *timed_out)
+{
+ bool res= TRUE;
+ *timed_out= FALSE;
+
+ if (enabled)
+ {
+ /* Create and post the request */
+ Call_request apc_request;
+ apc_request.call= call;
+ apc_request.processed= FALSE;
+ mysql_cond_init(key_show_explain_request_COND, &apc_request.COND_request,
+ NULL);
+ enqueue_request(&apc_request);
+ apc_request.what="enqueued by make_apc_call";
+
+ struct timespec abstime;
+ const int timeout= timeout_sec;
+ set_timespec(abstime, timeout);
+
+ int wait_res= 0;
+ PSI_stage_info old_stage;
+ caller_thd->ENTER_COND(&apc_request.COND_request, LOCK_thd_kill_ptr,
+ &stage_show_explain, &old_stage);
+ /* todo: how about processing other errors here? */
+ while (!apc_request.processed && (wait_res != ETIMEDOUT))
+ {
+ /* We own LOCK_thd_kill_ptr */
+ wait_res= mysql_cond_timedwait(&apc_request.COND_request,
+ LOCK_thd_kill_ptr, &abstime);
+ // &apc_request.LOCK_request, &abstime);
+ if (caller_thd->killed)
+ break;
+ }
+
+ if (!apc_request.processed)
+ {
+ /*
+ The wait has timed out, or this thread was KILLed.
+ Remove the request from the queue (ok to do because we own
+ LOCK_thd_kill_ptr)
+ */
+ apc_request.processed= TRUE;
+ dequeue_request(&apc_request);
+ *timed_out= TRUE;
+ res= TRUE;
+ }
+ else
+ {
+ /* Request was successfully executed and dequeued by the target thread */
+ res= FALSE;
+ }
+ /*
+ exit_cond() will call mysql_mutex_unlock(LOCK_thd_kill_ptr) for us:
+ */
+ caller_thd->EXIT_COND(&old_stage);
+
+ /* Destroy all APC request data */
+ mysql_cond_destroy(&apc_request.COND_request);
+ }
+ else
+ {
+#ifndef DBUG_OFF
+ /* We didn't make the call, because the target is disabled */
+ n_calls_processed++;
+#endif
+ mysql_mutex_unlock(LOCK_thd_kill_ptr);
+ }
+ return res;
+}
+
+
+/*
+ Process all APC requests.
+ This should be called periodically by the APC target thread.
+*/
+
+void Apc_target::process_apc_requests(bool force)
+{
+ while (1)
+ {
+ Call_request *request;
+
+ if (force)
+ mysql_mutex_lock(LOCK_thd_kill_ptr);
+ else if (mysql_mutex_trylock(LOCK_thd_kill_ptr))
+ break; // Mutex is blocked, try again later
+ if (!(request= get_first_in_queue()))
+ {
+ /* No requests in the queue */
+ mysql_mutex_unlock(LOCK_thd_kill_ptr);
+ break;
+ }
+
+ /*
+ Remove the request from the queue (we're holding queue lock so we can be
+ sure that request owner won't try to remove it)
+ */
+ request->what="dequeued by process_apc_requests";
+ dequeue_request(request);
+ request->processed= TRUE;
+
+ request->call->call_in_target_thread();
+ request->what="func called by process_apc_requests";
+
+#ifndef DBUG_OFF
+ n_calls_processed++;
+#endif
+ mysql_cond_signal(&request->COND_request);
+ mysql_mutex_unlock(LOCK_thd_kill_ptr);
+ }
+}
+