Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r--  sql/rpl_parallel.cc  286
1 file changed, 220 insertions(+), 66 deletions(-)
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 333a3960..bbfc0211 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -131,12 +131,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
asynchronously, we need to be sure they will be completed before starting a
new transaction. Otherwise the new transaction might suffer a spurious kill.
*/
-static void
+void
wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
{
PSI_stage_info old_stage;
mysql_mutex_lock(&thd->LOCK_wakeup_ready);
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
&stage_waiting_for_deadlock_kill, &old_stage);
while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
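A pattern repeated throughout this patch: THD::set_time_for_next_stage() is now called before ENTER_COND() instead of after it, so the stage start time is already recorded when the new wait stage becomes visible. A minimal standalone sketch of the ordering (std::mutex/std::condition_variable stand-ins; Waiter, enter_stage() and kill_pending are illustrative, not the real THD API):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>

struct Waiter {
  std::mutex lock;                  // plays the role of LOCK_wakeup_ready
  std::condition_variable cond;     // plays the role of COND_wakeup_ready
  bool kill_pending= true;
  std::string stage;
  std::chrono::steady_clock::time_point stage_start;

  void enter_stage(const char *name) {
    // Record the start time first (set_time_for_next_stage()), then
    // publish the stage (what ENTER_COND does when switching stages).
    stage_start= std::chrono::steady_clock::now();
    stage= name;
  }

  void wait_for_deadlock_kill() {
    std::unique_lock<std::mutex> guard(lock);
    enter_stage("Waiting for deadlock kill to complete");
    cond.wait(guard, [this] { return !kill_pending; });
  }
};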
@@ -214,6 +215,13 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
signal_error_to_sql_driver_thread(thd, rgi, err);
thd->wait_for_commit_ptr= NULL;
+ /*
+    Calls to check_duplicate_gtid() must match up with
+    record_and_update_gtid() (or release_domain_owner() in the error case).
+    This assertion tries to catch any missing release of the domain.
+  */
+ DBUG_ASSERT(rgi->gtid_ignore_duplicate_state != rpl_group_info::GTID_DUPLICATE_OWNER);
+
mysql_mutex_lock(&entry->LOCK_parallel_entry);
/*
We need to mark that this event group started its commit phase, in case we
@@ -399,12 +407,12 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
if (wait_count > entry->count_committing_event_groups)
{
DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&gco->COND_group_commit_orderer,
&entry->LOCK_parallel_entry,
&stage_waiting_for_prior_transaction_to_start_commit,
old_stage);
*did_enter_cond= true;
- thd->set_time_for_next_stage();
do
{
if (!rgi->worker_error && unlikely(thd->check_killed(1)))
@@ -492,10 +500,10 @@ do_ftwrl_wait(rpl_group_info *rgi,
*/
if (unlikely(sub_id > entry->pause_sub_id))
{
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
&stage_waiting_for_ftwrl, old_stage);
*did_enter_cond= true;
- thd->set_time_for_next_stage();
do
{
if (entry->force_abort || rgi->worker_error)
@@ -558,9 +566,9 @@ pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
if (thd)
{
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
&stage_waiting_for_rpl_thread_pool, &old_stage);
- thd->set_time_for_next_stage();
}
while (pool->busy)
{
@@ -700,9 +708,9 @@ rpl_pause_for_ftwrl(THD *thd)
mysql_mutex_lock(&e->LOCK_parallel_entry);
});
}
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
- thd->set_time_for_next_stage();
while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
e->last_committed_sub_id < e->pause_sub_id &&
!err)
@@ -907,7 +915,13 @@ do_retry:
});
#endif
- rgi->cleanup_context(thd, 1);
+ /*
+    We are still applying the event group, even though we will roll it back
+    and retry it. So for --gtid-ignore-duplicates, keep ownership of the
+    domain during the retry so another master connection will not try to
+    take over and apply the same event group a second time (MDEV-33475).
+ */
+ rgi->cleanup_context(thd, 1, 1 /* keep_domain_owner */);
wait_for_pending_deadlock_kill(thd, rgi);
thd->reset_killed();
thd->clear_error();
@@ -2404,13 +2418,17 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
false Worker not allocated (choose_thread_internal not called)
*/
static bool handle_split_alter(rpl_parallel_entry *e,
- Gtid_log_event *gtid_ev, uint32 *idx,
+ Gtid_log_event *gtid_ev,
+ //uint32 *idx,
+ rpl_parallel_entry::sched_bucket **ptr_cur_thr,
//choose_thread_internal specific
bool *did_enter_cond, rpl_group_info* rgi,
PSI_stage_info *old_stage)
{
uint16 flags_extra= gtid_ev->flags_extra;
bool thread_allocated= false;
+ uint32 i= 0, *idx= &i;
+
//Step 1
if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 ||
//This will arrange finding threads for CA/RA as well
@@ -2421,11 +2439,12 @@ static bool handle_split_alter(rpl_parallel_entry *e,
    j is needed for round-robin scheduling: we start at rpl_thread_idx, go up
    to rpl_thread_max - 1, and then wrap around from 0 back to rpl_thread_idx
*/
- int j= e->rpl_thread_idx;
+ auto j= static_cast<uint32>(e->thread_sched_fifo->head() - e->rpl_threads); // formerly e->rpl_thread_idx;
for(uint i= 0; i < e->rpl_thread_max; i++)
{
- if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
- != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+ if (!e->rpl_threads[j].thr ||
+ e->rpl_threads[j].thr->current_owner != &e->rpl_threads[j].thr ||
+ !e->rpl_threads[j].thr->current_start_alter_id)
{
//This condition will hit at least one time no matter what happens
*idx= j;
@@ -2436,17 +2455,26 @@ static bool handle_split_alter(rpl_parallel_entry *e,
j= j % e->rpl_thread_max;
}
//We did not find an idx
- DBUG_ASSERT(0);
- return false;
+ DBUG_ASSERT(0);
+
+ return false;
+
idx_found:
- e->rpl_thread_idx= *idx;
- e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+ //e->rpl_thread_idx= *idx;
+  /* Move the bucket found at *idx to the head of the FIFO. */
+ *ptr_cur_thr= &e->rpl_threads[*idx];
+ (*ptr_cur_thr)->unlink();
+ e->thread_sched_fifo->append(*ptr_cur_thr);
+ *ptr_cur_thr= e->thread_sched_fifo->head();
+
+ e->choose_thread_internal(*ptr_cur_thr, did_enter_cond, rgi,
+ old_stage);
thread_allocated= true;
if (flags_extra & Gtid_log_event::FL_START_ALTER_E1)
{
- mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
- e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
- e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+ mysql_mutex_assert_owner(&e->rpl_threads[*idx].thr->LOCK_rpl_thread);
+ e->rpl_threads[*idx].thr->current_start_alter_id= gtid_ev->seq_no;
+ e->rpl_threads[*idx].thr->current_start_alter_domain_id=
gtid_ev->domain_id;
/*
We are locking LOCK_rpl_thread_pool because we are going to update
@@ -2462,9 +2490,9 @@ idx_found:
}
else
{
- e->rpl_threads[*idx]->reserved_start_alter_thread= true;
- e->rpl_threads[*idx]->current_start_alter_id= 0;
- e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+ e->rpl_threads[*idx].thr->reserved_start_alter_thread= true;
+ e->rpl_threads[*idx].thr->current_start_alter_id= 0;
+ e->rpl_threads[*idx].thr->current_start_alter_domain_id= 0;
}
mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
}
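The scan in the two hunks above probes every worker bucket starting from the current head position and wraps around via the modulo step (the increment itself falls between the hunks). Reduced to a standalone sketch, with bucket_free() as a hypothetical stand-in for the three-part test on e->rpl_threads[j].thr:

#include <cstdint>

bool bucket_free(uint32_t j);   // stand-in predicate, not in the source

// Probe all max workers starting at start, wrapping around; on success
// store the index and report it, mirroring the goto idx_found path.
bool find_free_worker(uint32_t start, uint32_t max, uint32_t *idx) {
  uint32_t j= start;
  for (uint32_t n= 0; n < max; ++n) {
    if (bucket_free(j)) {
      *idx= j;
      return true;
    }
    j= (j + 1) % max;
  }
  return false;                 // corresponds to the DBUG_ASSERT(0) path
}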
@@ -2475,13 +2503,13 @@ idx_found:
//Free the corresponding rpt current_start_alter_id
for(uint i= 0; i < e->rpl_thread_max; i++)
{
- if(e->rpl_threads[i] &&
- e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
- e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+ if(e->rpl_threads[i].thr &&
+ e->rpl_threads[i].thr->current_start_alter_id == gtid_ev->sa_seq_no &&
+ e->rpl_threads[i].thr->current_start_alter_domain_id == gtid_ev->domain_id)
{
mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
- e->rpl_threads[i]->current_start_alter_id= 0;
- e->rpl_threads[i]->current_start_alter_domain_id= 0;
+ e->rpl_threads[i].thr->current_start_alter_id= 0;
+ e->rpl_threads[i].thr->current_start_alter_domain_id= 0;
global_rpl_thread_pool.current_start_alters--;
e->pending_start_alters--;
DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
@@ -2497,6 +2525,79 @@ idx_found:
/*
+ Check when we have done a complete round of scheduling for workers
+ 0, 1, ..., (rpl_thread_max-1), in this order.
+  This normally happens once every rpl_thread_max event groups, but XA XID
+  dependency restrictions can cause extra out-of-order worker scheduling
+  to be inserted in between the normal round-robin scheduling.
+*/
+void
+rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur)
+{
+ uint32 idx= static_cast<uint32>(cur - rpl_threads);
+ DBUG_ASSERT(cur >= rpl_threads);
+ DBUG_ASSERT(cur < rpl_threads + rpl_thread_max);
+ if (idx == current_generation_idx)
+ {
+ ++idx;
+ if (idx >= rpl_thread_max)
+ {
+ /* A new generation; all workers have been scheduled at least once. */
+ idx= 0;
+ ++current_generation;
+ }
+ current_generation_idx= idx;
+ }
+}
+
+
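A worked example of the generation logic: with rpl_thread_max= 4, scheduling buckets 0, 1, 2, 3 in that order advances the generation by one, while an out-of-order bucket (such as an XID-pinned worker) leaves the counters untouched. A minimal model under those assumptions (GenCounter is illustrative, not the MariaDB struct):

#include <cassert>
#include <cstdint>
#include <initializer_list>

struct GenCounter {
  uint32_t max_threads;
  uint64_t generation= 0;
  uint32_t generation_idx= 0;  // next worker expected in round-robin order

  void note_scheduled(uint32_t idx) {
    if (idx != generation_idx)
      return;                  // out-of-order scheduling: no progress
    if (++generation_idx >= max_threads) {
      generation_idx= 0;
      ++generation;            // every worker scheduled at least once
    }
  }
};

int main() {
  GenCounter g{4};
  for (uint32_t idx : {0u, 1u, 2u, 2u /* out of order */, 3u})
    g.note_scheduled(idx);
  assert(g.generation == 1 && g.generation_idx == 0);
  return 0;
}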
+rpl_parallel_entry::sched_bucket *
+rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
+{
+ uint64 cur_gen= current_generation;
+ my_off_t i= 0;
+ while (i < maybe_active_xid.elements)
+ {
+ /*
+      Purge XIDs that are no longer active from the list:
+
+ - In generation N, XID might have been scheduled for worker W.
+ - Events in generation (N+1) might run freely in parallel with W.
+ - Events in generation (N+2) will have done wait_for_prior_commit for
+ the event group with XID (or a later one), but the XID might still be
+ active for a bit longer after wakeup_prior_commit().
+ - Events in generation (N+3) will have done wait_for_prior_commit() for
+ an event in W _after_ the XID, so are sure not to see the XID active.
+
+ Therefore, XID can be safely scheduled to a different worker in
+ generation (N+3) when last prior use was in generation N (or earlier).
+ */
+ xid_active_generation *a=
+ dynamic_element(&maybe_active_xid, i, xid_active_generation *);
+ if (a->generation + 3 <= cur_gen)
+ {
+ *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
+ continue;
+ }
+ if (xid->eq(&a->xid))
+ {
+ /* Update the last used generation and return the match. */
+ a->generation= cur_gen;
+ return a->thr;
+ }
+ ++i;
+ }
+ /* try to keep allocated memory in the range of [2,10] * initial_chunk_size */
+ if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() &&
+ maybe_active_xid.max_element > 10 * active_xid_init_alloc())
+ freeze_size(&maybe_active_xid);
+
+ /* No matching XID conflicts. */
+ return nullptr;
+}
+
+
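The purge-and-lookup loop above can be modeled standalone (std::vector in place of DYNAMIC_ARRAY; Xid and Bucket are stand-ins). Two details worth noting: an entry is discarded once three generations have passed since its last use, and removal copies the last element into the freed slot (as pop_dynamic does), so the loop index is deliberately not advanced after a removal:

#include <cstdint>
#include <vector>

struct Xid { uint64_t v; bool eq(const Xid *o) const { return v == o->v; } };
struct Bucket;                      // stand-in for sched_bucket

struct ActiveXid { Xid xid; uint64_t generation; Bucket *thr; };

Bucket *check_xid_dependency(std::vector<ActiveXid> &active,
                             uint64_t cur_gen, const Xid *xid) {
  size_t i= 0;
  while (i < active.size()) {
    ActiveXid *a= &active[i];
    if (a->generation + 3 <= cur_gen) {
      *a= active.back();            // like *a= *pop_dynamic(...)
      active.pop_back();
      continue;                     // re-examine the element moved here
    }
    if (xid->eq(&a->xid)) {
      a->generation= cur_gen;       // refresh the last-used generation
      return a->thr;                // pin the group to the same worker
    }
    ++i;
  }
  return nullptr;                   // no conflict; caller records the XID
}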
+/*
Obtain a worker thread that we can queue an event to.
Each invocation allocates a new worker thread, to maximise
@@ -2528,40 +2629,70 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
PSI_stage_info *old_stage,
Gtid_log_event *gtid_ev)
{
- uint32 idx;
+ sched_bucket *cur_thr;
- idx= rpl_thread_idx;
if (gtid_ev)
{
- if (++idx >= rpl_thread_max)
- idx= 0;
+ /* New event group; cycle the thread scheduling buckets round-robin. */
+ thread_sched_fifo->push_back(thread_sched_fifo->get());
+
//rpl_thread_idx will be updated in handle_split_alter
- if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
- return rpl_threads[idx];
+ if (handle_split_alter(this, gtid_ev, &cur_thr, did_enter_cond, rgi,
+ old_stage))
+ return cur_thr->thr;
+
if (gtid_ev->flags2 &
(Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
- {
- idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
- gtid_ev->xid.key_length()) % rpl_thread_max;
+ {
+ if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
+ {
+ /*
+ A previously scheduled event group with the same XID might still be
+ active in a worker, so schedule this event group in the same worker
+ to avoid a conflict.
+ */
+ cur_thr->unlink();
+ thread_sched_fifo->append(cur_thr);
+ }
+ else
+ {
+ /* Record this XID now active. */
+ xid_active_generation *a=
+ (xid_active_generation *)alloc_dynamic(&maybe_active_xid);
+ if (!a)
+ return NULL;
+ a->thr= cur_thr= thread_sched_fifo->head();
+ a->generation= current_generation;
+ a->xid.set(&gtid_ev->xid);
+ }
}
- rpl_thread_idx= idx;
+ else
+ cur_thr= thread_sched_fifo->head();
+
+ check_scheduling_generation(cur_thr);
}
- return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+ else
+ cur_thr= thread_sched_fifo->head();
+
+ return choose_thread_internal(cur_thr /*idx*/, did_enter_cond, rgi, old_stage);
}
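The FIFO discipline here: the head of thread_sched_fifo is always the bucket currently being scheduled. A new event group rotates the old head to the back and takes the new head; an XID conflict instead pulls the conflicting bucket to the head. A sketch with std::list, assuming base_ilist semantics for I_List (get() pops the head, push_back() appends at the tail, append() inserts at the head):

#include <list>

struct Bucket { int id; };

// thread_sched_fifo->push_back(thread_sched_fifo->get()) followed by
// head(): rotate the previous worker to the back, schedule the new head.
Bucket *rotate_and_schedule(std::list<Bucket> &fifo) {
  fifo.splice(fifo.end(), fifo, fifo.begin());
  return &fifo.front();
}

// cur_thr->unlink(); thread_sched_fifo->append(cur_thr): schedule a
// specific bucket out of order by making it the new head.
Bucket *schedule_out_of_order(std::list<Bucket> &fifo,
                              std::list<Bucket>::iterator it) {
  fifo.splice(fifo.begin(), fifo, it);
  return &fifo.front();
}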
-rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
- bool *did_enter_cond, rpl_group_info *rgi,
- PSI_stage_info *old_stage)
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread_internal(sched_bucket *cur_thr,
+ bool *did_enter_cond,
+ rpl_group_info *rgi,
+ PSI_stage_info *old_stage)
{
- rpl_parallel_thread* thr= rpl_threads[idx];
Relay_log_info *rli= rgi->rli;
+ rpl_parallel_thread *thr= cur_thr->thr;
+
if (thr)
{
*did_enter_cond= false;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
for (;;)
{
- if (thr->current_owner != &rpl_threads[idx])
+ if (thr->current_owner != &cur_thr->thr)
{
/*
The worker thread became idle, and returned to the free list and
@@ -2593,16 +2724,16 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
and this can cause THD::awake to use the wrong mutex.
*/
#ifdef ENABLED_DEBUG_SYNC
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
- };);
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+ debug_sync_set_action(
+ rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+ };);
#endif
- rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
- &thr->LOCK_rpl_thread,
- &stage_waiting_for_room_in_worker_thread,
- old_stage);
+ rli->sql_driver_thd->set_time_for_next_stage();
+ rli->sql_driver_thd->ENTER_COND(
+ &thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+ &stage_waiting_for_room_in_worker_thread, old_stage);
*did_enter_cond= true;
}
@@ -2612,11 +2743,11 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
did_enter_cond, old_stage);
my_error(ER_CONNECTION_KILLED, MYF(0));
#ifdef ENABLED_DEBUG_SYNC
- DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
- {
- debug_sync_set_action(rli->sql_driver_thd,
- STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
- };);
+ DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+ debug_sync_set_action(
+ rli->sql_driver_thd,
+ STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+ };);
#endif
slave_output_error_info(rgi, rli->sql_driver_thd);
return NULL;
@@ -2626,9 +2757,10 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
}
}
}
+
if (!thr)
- rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
- this);
+ cur_thr->thr= thr=
+ global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
return thr;
}
@@ -2643,6 +2775,7 @@ free_rpl_parallel_entry(void *element)
dealloc_gco(e->current_gco);
e->current_gco= prev_gco;
}
+ delete_dynamic(&e->maybe_active_xid);
mysql_cond_destroy(&e->COND_parallel_entry);
mysql_mutex_destroy(&e->LOCK_parallel_entry);
my_free(e);
@@ -2686,17 +2819,37 @@ rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
ulong count= opt_slave_domain_parallel_threads;
if (count == 0 || count > opt_slave_parallel_threads)
count= opt_slave_parallel_threads;
- rpl_parallel_thread **p;
+ rpl_parallel_entry::sched_bucket *p;
+ I_List<rpl_parallel_entry::sched_bucket> *fifo;
if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
&e, sizeof(*e),
&p, count*sizeof(*p),
+ &fifo, sizeof(*fifo),
NULL))
{
my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
return NULL;
}
+ /* Initialize a FIFO of scheduled worker threads. */
+  e->thread_sched_fifo= new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
+ /*
+    (We cycle the FIFO _before_ allocating the next entry in
+ rpl_parallel_entry::choose_thread(). So initialize the FIFO with the
+ highest element at the front, just so that the first event group gets
+ scheduled on entry 0).
+ */
+ e->thread_sched_fifo->
+ push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket);
+ for (ulong i= 0; i < count-1; ++i)
+ e->thread_sched_fifo->
+ push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
e->rpl_threads= p;
e->rpl_thread_max= count;
+  e->current_generation= 0;
+  e->current_generation_idx= 0;
+ init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
+ sizeof(rpl_parallel_entry::xid_active_generation),
+ 0, e->active_xid_init_alloc(), 0, MYF(0));
e->domain_id= domain_id;
e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
e->pause_sub_id= (uint64)ULONGLONG_MAX;
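The initialization order (highest bucket pushed first, then 0 through count-2) exists because choose_thread() rotates before scheduling, so the very first event group lands on entry 0. The same effect, reduced to ints in a std::list rather than placement-new'd sched_buckets:

#include <cassert>
#include <list>

std::list<int> init_sched_fifo(int count) {
  std::list<int> fifo;
  fifo.push_back(count - 1);                    // head: highest entry
  for (int i= 0; i < count - 1; ++i)
    fifo.push_back(i);                          // then 0, 1, ..., count-2
  return fifo;
}

int main() {
  std::list<int> fifo= init_sched_fifo(4);      // 3, 0, 1, 2
  fifo.splice(fifo.end(), fifo, fifo.begin());  // first rotation
  assert(fifo.front() == 0);                    // first group gets entry 0
  return 0;
}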
@@ -2766,10 +2919,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
mysql_mutex_unlock(&e->LOCK_parallel_entry);
for (j= 0; j < e->rpl_thread_max; ++j)
{
- if ((rpt= e->rpl_threads[j]))
+ if ((rpt= e->rpl_threads[j].thr))
{
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- if (rpt->current_owner == &e->rpl_threads[j])
+ if (rpt->current_owner == &e->rpl_threads[j].thr)
mysql_cond_signal(&rpt->COND_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
@@ -2828,10 +2981,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
for (j= 0; j < e->rpl_thread_max; ++j)
{
- if ((rpt= e->rpl_threads[j]))
+ if ((rpt= e->rpl_threads[j].thr))
{
mysql_mutex_lock(&rpt->LOCK_rpl_thread);
- while (rpt->current_owner == &e->rpl_threads[j])
+ while (rpt->current_owner == &e->rpl_threads[j].thr)
mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
}
@@ -2889,7 +3042,7 @@ int
rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Format_description_log_event *fdev)
{
- uint32 idx;
+ sched_bucket *cur_thr;
rpl_parallel_thread *thr;
rpl_parallel_thread::queued_event *qev;
Relay_log_info *rli= rgi->rli;
@@ -2904,12 +3057,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
Thus there is no need for the full complexity of choose_thread(). We only
need to check if we have a current worker thread, and queue for it if so.
*/
- idx= rpl_thread_idx;
- thr= rpl_threads[idx];
+ cur_thr= thread_sched_fifo->head();
+ thr= cur_thr->thr;
if (!thr)
return 0;
mysql_mutex_lock(&thr->LOCK_rpl_thread);
- if (thr->current_owner != &rpl_threads[idx])
+ if (thr->current_owner != &cur_thr->thr)
{
/* No active worker thread, so no need to queue the master restart. */
mysql_mutex_unlock(&thr->LOCK_rpl_thread);
@@ -2953,6 +3106,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
mysql_mutex_lock(&e->LOCK_parallel_entry);
++e->need_sub_id_signal;
+ thd->set_time_for_next_stage();
thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
&stage_waiting_for_workers_idle, &old_stage);
while (e->current_sub_id > e->last_committed_sub_id)