Diffstat (limited to 'sql/rpl_parallel.cc')
-rw-r--r-- | sql/rpl_parallel.cc | 286
1 file changed, 220 insertions, 66 deletions
diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc
index 333a3960..bbfc0211 100644
--- a/sql/rpl_parallel.cc
+++ b/sql/rpl_parallel.cc
@@ -131,12 +131,13 @@ handle_queued_pos_update(THD *thd, rpl_parallel_thread::queued_event *qev)
   asynchronously, we need to be sure they will be completed before starting a
   new transaction. Otherwise the new transaction might suffer a spurious kill.
 */
-static void
+void
 wait_for_pending_deadlock_kill(THD *thd, rpl_group_info *rgi)
 {
   PSI_stage_info old_stage;
 
   mysql_mutex_lock(&thd->LOCK_wakeup_ready);
+  thd->set_time_for_next_stage();
   thd->ENTER_COND(&thd->COND_wakeup_ready, &thd->LOCK_wakeup_ready,
                   &stage_waiting_for_deadlock_kill, &old_stage);
   while (rgi->killed_for_retry == rpl_group_info::RETRY_KILL_PENDING)
@@ -214,6 +215,13 @@ finish_event_group(rpl_parallel_thread *rpt, uint64 sub_id,
       signal_error_to_sql_driver_thread(thd, rgi, err);
   thd->wait_for_commit_ptr= NULL;
 
+  /*
+    Calls to check_duplicate_gtid() must match up with
+    record_and_update_gtid() (or release_domain_owner() in error case). This
+    assertion tries to catch any missing release of the domain.
+  */
+  DBUG_ASSERT(rgi->gtid_ignore_duplicate_state != rpl_group_info::GTID_DUPLICATE_OWNER);
+
   mysql_mutex_lock(&entry->LOCK_parallel_entry);
   /*
     We need to mark that this event group started its commit phase, in case we
@@ -399,12 +407,12 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer *gco,
   if (wait_count > entry->count_committing_event_groups)
   {
     DEBUG_SYNC(thd, "rpl_parallel_start_waiting_for_prior");
+    thd->set_time_for_next_stage();
     thd->ENTER_COND(&gco->COND_group_commit_orderer,
                     &entry->LOCK_parallel_entry,
                     &stage_waiting_for_prior_transaction_to_start_commit,
                     old_stage);
     *did_enter_cond= true;
-    thd->set_time_for_next_stage();
     do
     {
       if (!rgi->worker_error && unlikely(thd->check_killed(1)))
@@ -492,10 +500,10 @@ do_ftwrl_wait(rpl_group_info *rgi,
   */
   if (unlikely(sub_id > entry->pause_sub_id))
   {
+    thd->set_time_for_next_stage();
     thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry,
                     &stage_waiting_for_ftwrl, old_stage);
     *did_enter_cond= true;
-    thd->set_time_for_next_stage();
     do
     {
       if (entry->force_abort || rgi->worker_error)
@@ -558,9 +566,9 @@ pool_mark_busy(rpl_parallel_thread_pool *pool, THD *thd)
   mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
   if (thd)
   {
+    thd->set_time_for_next_stage();
     thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool,
                     &stage_waiting_for_rpl_thread_pool, &old_stage);
-    thd->set_time_for_next_stage();
   }
   while (pool->busy)
   {
@@ -700,9 +708,9 @@ rpl_pause_for_ftwrl(THD *thd)
       mysql_mutex_lock(&e->LOCK_parallel_entry);
     });
   }
+  thd->set_time_for_next_stage();
   thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                   &stage_waiting_for_ftwrl_threads_to_pause, &old_stage);
-  thd->set_time_for_next_stage();
   while (e->pause_sub_id < (uint64)ULONGLONG_MAX &&
          e->last_committed_sub_id < e->pause_sub_id &&
          !err)
@@ -907,7 +915,13 @@ do_retry:
     });
 #endif
 
-    rgi->cleanup_context(thd, 1);
+    /*
+      We are still applying the event group, even though we will roll it back
+      and retry it. So for --gtid-ignore-duplicates, keep ownership of the
+      domain during the retry so another master connection will not try to
+      take over and duplicate apply the same event group (MDEV-33475).
+    */
+    rgi->cleanup_context(thd, 1, 1 /* keep_domain_owner */);
     wait_for_pending_deadlock_kill(thd, rgi);
     thd->reset_killed();
     thd->clear_error();
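The new third argument to cleanup_context() carries the MDEV-33475 fix: an event group that is rolled back only to be retried must keep its --gtid-ignore-duplicates domain ownership between attempts, matching the DBUG_ASSERT added in finish_event_group() above. A minimal standalone sketch of that invariant (all types and names here are illustrative stand-ins, not the server's):

    #include <cassert>

    // Hypothetical stand-in for rpl_group_info's duplicate-GTID state.
    enum dup_state { DUP_NULL, DUP_OWNER };

    struct group_info
    {
      dup_state gtid_ignore_duplicate_state= DUP_NULL;

      // Mirrors the new signature: cleanup_context(thd, error, keep_domain_owner).
      void cleanup_context(bool error, bool keep_domain_owner)
      {
        (void) error;
        if (!keep_domain_owner)
          gtid_ignore_duplicate_state= DUP_NULL;  // old behaviour: ownership dropped
        // With keep_domain_owner=true the retry keeps the domain, so another
        // master connection cannot grab it and apply the same group twice.
      }
    };

    int main()
    {
      group_info rgi;
      rgi.gtid_ignore_duplicate_state= DUP_OWNER;
      rgi.cleanup_context(true, true /* keep_domain_owner */);
      assert(rgi.gtid_ignore_duplicate_state == DUP_OWNER);  // retry still owns it
      return 0;
    }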
@@ -2404,13 +2418,17 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
     false  Worker not allocated (choose_thread_internal not called)
 */
 static bool handle_split_alter(rpl_parallel_entry *e,
-                               Gtid_log_event *gtid_ev, uint32 *idx,
+                               Gtid_log_event *gtid_ev,
+                               //uint32 *idx,
+                               rpl_parallel_entry::sched_bucket **ptr_cur_thr, //choose_thread_internal specific
                                bool *did_enter_cond, rpl_group_info* rgi,
                                PSI_stage_info *old_stage)
 {
   uint16 flags_extra= gtid_ev->flags_extra;
   bool thread_allocated= false;
+  uint32 i= 0, *idx= &i;
+
   //Step 1
   if (flags_extra & Gtid_log_event::FL_START_ALTER_E1 ||
       //This will arrange finding threads for CA/RA as well
@@ -2421,11 +2439,12 @@ static bool handle_split_alter(rpl_parallel_entry *e,
       j is needed for round robin scheduling, we will start with rpl_thread_idx
       go till rpl_thread_max and then start with 0 to rpl_thread_idx
     */
-    int j= e->rpl_thread_idx;
+    auto j= static_cast<uint32>(e->thread_sched_fifo->head() - e->rpl_threads); // formerly e->rpl_thread_idx
     for(uint i= 0; i < e->rpl_thread_max; i++)
     {
-      if (!e->rpl_threads[j] || e->rpl_threads[j]->current_owner
-          != &e->rpl_threads[j] || !e->rpl_threads[j]->current_start_alter_id)
+      if (!e->rpl_threads[j].thr ||
+          e->rpl_threads[j].thr->current_owner != &e->rpl_threads[j].thr ||
+          !e->rpl_threads[j].thr->current_start_alter_id)
       {
         //This condition will hit at least one time no matter what happens
         *idx= j;
@@ -2436,17 +2455,26 @@ static bool handle_split_alter(rpl_parallel_entry *e,
         goto idx_found;
       }
       j++;
      j= j % e->rpl_thread_max;
   }
   //We did not find an idx
-  DBUG_ASSERT(0);
-  return false;
+  DBUG_ASSERT(0);
+
+  return false;
+
 idx_found:
-  e->rpl_thread_idx= *idx;
-  e->choose_thread_internal(*idx, did_enter_cond, rgi, old_stage);
+  //e->rpl_thread_idx= *idx;
+  /* place the found *idx index into the head */
+  *ptr_cur_thr= &e->rpl_threads[*idx];
+  (*ptr_cur_thr)->unlink();
+  e->thread_sched_fifo->append(*ptr_cur_thr);
+  *ptr_cur_thr= e->thread_sched_fifo->head();
+
+  e->choose_thread_internal(*ptr_cur_thr, did_enter_cond, rgi,
+                            old_stage);
   thread_allocated= true;
   if (flags_extra & Gtid_log_event::FL_START_ALTER_E1)
   {
-    mysql_mutex_assert_owner(&e->rpl_threads[*idx]->LOCK_rpl_thread);
-    e->rpl_threads[e->rpl_thread_idx]->current_start_alter_id= gtid_ev->seq_no;
-    e->rpl_threads[e->rpl_thread_idx]->current_start_alter_domain_id=
+    mysql_mutex_assert_owner(&e->rpl_threads[*idx].thr->LOCK_rpl_thread);
+    e->rpl_threads[*idx].thr->current_start_alter_id= gtid_ev->seq_no;
+    e->rpl_threads[*idx].thr->current_start_alter_domain_id=
       gtid_ev->domain_id;
     /*
       We are locking LOCK_rpl_thread_pool because we are going to update
@@ -2462,9 +2490,9 @@ idx_found:
   }
   else
   {
-    e->rpl_threads[*idx]->reserved_start_alter_thread= true;
-    e->rpl_threads[*idx]->current_start_alter_id= 0;
-    e->rpl_threads[*idx]->current_start_alter_domain_id= 0;
+    e->rpl_threads[*idx].thr->reserved_start_alter_thread= true;
+    e->rpl_threads[*idx].thr->current_start_alter_id= 0;
+    e->rpl_threads[*idx].thr->current_start_alter_domain_id= 0;
   }
   mysql_mutex_unlock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
 }
@@ -2475,13 +2503,13 @@ idx_found:
     //Free the corresponding rpt current_start_alter_id
     for(uint i= 0; i < e->rpl_thread_max; i++)
     {
-      if(e->rpl_threads[i] &&
-         e->rpl_threads[i]->current_start_alter_id == gtid_ev->sa_seq_no &&
-         e->rpl_threads[i]->current_start_alter_domain_id == gtid_ev->domain_id)
+      if(e->rpl_threads[i].thr &&
+         e->rpl_threads[i].thr->current_start_alter_id == gtid_ev->sa_seq_no &&
+         e->rpl_threads[i].thr->current_start_alter_domain_id == gtid_ev->domain_id)
       {
         mysql_mutex_lock(&global_rpl_thread_pool.LOCK_rpl_thread_pool);
-        e->rpl_threads[i]->current_start_alter_id= 0;
-        e->rpl_threads[i]->current_start_alter_domain_id= 0;
+        e->rpl_threads[i].thr->current_start_alter_id= 0;
+        e->rpl_threads[i].thr->current_start_alter_domain_id= 0;
         global_rpl_thread_pool.current_start_alters--;
         e->pending_start_alters--;
         DBUG_PRINT("info", ("Commit/Rollback alter id %d", i));
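The start-alter search above is a plain wrap-around scan: begin at the bucket the FIFO head points to and step modulo rpl_thread_max until a worker with no pending start-alter is found; the DBUG_ASSERT(0) marks the case the code treats as unreachable. A compilable model of just that scan (slot is an invented stand-in for a sched_bucket and its worker state):

    #include <cstdio>
    #include <vector>

    struct slot { bool has_start_alter; };  // stand-in for sched_bucket/thr state

    // Returns the first index, scanning j, j+1, ... with wraparound, whose
    // slot can take a new start-alter.
    static unsigned find_start_alter_slot(const std::vector<slot> &slots,
                                          unsigned j)
    {
      for (size_t n= 0; n < slots.size(); n++)
      {
        if (!slots[j].has_start_alter)
          return j;
        j= (j + 1) % (unsigned)slots.size();  // same wraparound as j++; j= j % max
      }
      return 0;  // unreachable if at least one slot is free (the DBUG_ASSERT(0) case)
    }

    int main()
    {
      std::vector<slot> slots{{true}, {true}, {false}, {false}};
      std::printf("START ALTER goes to worker %u\n",
                  find_start_alter_slot(slots, 1));  // prints 2
      return 0;
    }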
@@ -2497,6 +2525,79 @@ idx_found:
 
 
 /*
+  Check when we have done a complete round of scheduling for workers
+  0, 1, ..., (rpl_thread_max-1), in this order.
+  This often occurs every rpl_thread_max event groups, but XA XID dependency
+  restrictions can cause insertion of extra out-of-order worker scheduling
+  in-between the normal round-robin scheduling.
+*/
+void
+rpl_parallel_entry::check_scheduling_generation(sched_bucket *cur)
+{
+  uint32 idx= static_cast<uint32>(cur - rpl_threads);
+  DBUG_ASSERT(cur >= rpl_threads);
+  DBUG_ASSERT(cur < rpl_threads + rpl_thread_max);
+  if (idx == current_generation_idx)
+  {
+    ++idx;
+    if (idx >= rpl_thread_max)
+    {
+      /* A new generation; all workers have been scheduled at least once. */
+      idx= 0;
+      ++current_generation;
+    }
+    current_generation_idx= idx;
+  }
+}
+
+
+rpl_parallel_entry::sched_bucket *
+rpl_parallel_entry::check_xa_xid_dependency(xid_t *xid)
+{
+  uint64 cur_gen= current_generation;
+  my_off_t i= 0;
+  while (i < maybe_active_xid.elements)
+  {
+    /*
+      Purge no longer active XID from the list:
+
+      - In generation N, XID might have been scheduled for worker W.
+      - Events in generation (N+1) might run freely in parallel with W.
+      - Events in generation (N+2) will have done wait_for_prior_commit for
+        the event group with XID (or a later one), but the XID might still be
+        active for a bit longer after wakeup_prior_commit().
+      - Events in generation (N+3) will have done wait_for_prior_commit() for
+        an event in W _after_ the XID, so are sure not to see the XID active.
+
+      Therefore, XID can be safely scheduled to a different worker in
+      generation (N+3) when last prior use was in generation N (or earlier).
+    */
+    xid_active_generation *a=
+      dynamic_element(&maybe_active_xid, i, xid_active_generation *);
+    if (a->generation + 3 <= cur_gen)
+    {
+      *a= *((xid_active_generation *)pop_dynamic(&maybe_active_xid));
+      continue;
+    }
+    if (xid->eq(&a->xid))
+    {
+      /* Update the last used generation and return the match. */
+      a->generation= cur_gen;
+      return a->thr;
+    }
+    ++i;
+  }
+  /* try to keep allocated memory in the range of [2,10] * initial_chunk_size */
+  if (maybe_active_xid.elements <= 2 * active_xid_init_alloc() &&
+      maybe_active_xid.max_element > 10 * active_xid_init_alloc())
+    freeze_size(&maybe_active_xid);
+
+  /* No matching XID conflicts. */
+  return nullptr;
+}
+
+
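The generation arithmetic is the subtle part of check_xa_xid_dependency(): per the comment above, an XID last used in generation N can still be active only through generation N+2, so an entry may be purged, and its XID rescheduled anywhere, once a->generation + 3 <= the current generation. A self-contained model of that purge-or-pin decision (std::vector and std::string stand in for DYNAMIC_ARRAY and xid_t):

    #include <cstdint>
    #include <cstdio>
    #include <string>
    #include <vector>

    struct active_xid
    {
      std::string xid;      // stand-in for xid_t
      uint64_t generation;  // generation of last use
      int worker;           // worker the XID was pinned to
    };

    // Return the worker this XID must stay on, or -1 if it may go anywhere.
    static int xid_dependency(std::vector<active_xid> &v, uint64_t cur_gen,
                              const std::string &xid)
    {
      size_t i= 0;
      while (i < v.size())
      {
        if (v[i].generation + 3 <= cur_gen)
        {
          v[i]= v.back();  // same trick as *a= *pop_dynamic(...): swap-delete
          v.pop_back();
          continue;
        }
        if (v[i].xid == xid)
        {
          v[i].generation= cur_gen;  // refresh the last-used generation
          return v[i].worker;
        }
        ++i;
      }
      return -1;  // no conflict; caller records the XID as newly active
    }

    int main()
    {
      std::vector<active_xid> v{{"xa1", 0, 2}, {"xa2", 4, 0}};
      std::printf("%d\n", xid_dependency(v, 5, "xa1"));  // -1: entry purged
      std::printf("%d\n", xid_dependency(v, 5, "xa2"));  // 0: still pinned
      return 0;
    }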
+/*
   Obtain a worker thread that we can queue an event to.
 
   Each invocation allocates a new worker thread, to maximise
@@ -2528,40 +2629,70 @@ rpl_parallel_entry::choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
                                   PSI_stage_info *old_stage,
                                   Gtid_log_event *gtid_ev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
 
-  idx= rpl_thread_idx;
   if (gtid_ev)
   {
-    if (++idx >= rpl_thread_max)
-      idx= 0;
+    /*
+      New event group; cycle the thread scheduling buckets round-robin.
+    */
+    thread_sched_fifo->push_back(thread_sched_fifo->get());
+
     //rpl_thread_idx will be updated by handle_split_alter
-    if (handle_split_alter(this, gtid_ev, &idx, did_enter_cond, rgi, old_stage))
-      return rpl_threads[idx];
+    if (handle_split_alter(this, gtid_ev, &cur_thr, did_enter_cond, rgi,
+                           old_stage))
+      return cur_thr->thr;
+
     if (gtid_ev->flags2 &
         (Gtid_log_event::FL_COMPLETED_XA | Gtid_log_event::FL_PREPARED_XA))
-    {
-      idx= my_hash_sort(&my_charset_bin, gtid_ev->xid.key(),
-                        gtid_ev->xid.key_length()) % rpl_thread_max;
+    {
+      if ((cur_thr= check_xa_xid_dependency(&gtid_ev->xid)))
+      {
+        /*
+          A previously scheduled event group with the same XID might still be
+          active in a worker, so schedule this event group in the same worker
+          to avoid a conflict.
+        */
+        cur_thr->unlink();
+        thread_sched_fifo->append(cur_thr);
+      }
+      else
+      {
+        /* Record this XID now active. */
+        xid_active_generation *a=
+          (xid_active_generation *)alloc_dynamic(&maybe_active_xid);
+        if (!a)
+          return NULL;
+        a->thr= cur_thr= thread_sched_fifo->head();
+        a->generation= current_generation;
+        a->xid.set(&gtid_ev->xid);
+      }
     }
-    rpl_thread_idx= idx;
+    else
+      cur_thr= thread_sched_fifo->head();
+
+    check_scheduling_generation(cur_thr);
   }
-  return choose_thread_internal(idx, did_enter_cond, rgi, old_stage);
+  else
+    cur_thr= thread_sched_fifo->head();
+
+  return choose_thread_internal(cur_thr /*idx*/, did_enter_cond, rgi,
+                                old_stage);
 }
 
 
-rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
-                 bool *did_enter_cond, rpl_group_info *rgi,
-                 PSI_stage_info *old_stage)
+rpl_parallel_thread *
+rpl_parallel_entry::choose_thread_internal(sched_bucket *cur_thr,
+                                           bool *did_enter_cond,
+                                           rpl_group_info *rgi,
+                                           PSI_stage_info *old_stage)
 {
-  rpl_parallel_thread* thr= rpl_threads[idx];
   Relay_log_info *rli= rgi->rli;
+  rpl_parallel_thread *thr= cur_thr->thr;
+
   if (thr)
   {
     *did_enter_cond= false;
     mysql_mutex_lock(&thr->LOCK_rpl_thread);
     for (;;)
     {
-      if (thr->current_owner != &rpl_threads[idx])
+      if (thr->current_owner != &cur_thr->thr)
       {
         /*
           The worker thread became idle, and returned to the free list and
@@ -2593,16 +2724,16 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
           and this can cause THD::awake to use the wrong mutex.
         */
 #ifdef ENABLED_DEBUG_SYNC
-        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
-          {
-            debug_sync_set_action(rli->sql_driver_thd,
-                STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
-          };);
+        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+          debug_sync_set_action(
+              rli->sql_driver_thd,
+              STRING_WITH_LEN("now SIGNAL wait_queue_ready"));
+        };);
 #endif
-        rli->sql_driver_thd->ENTER_COND(&thr->COND_rpl_thread_queue,
-                                        &thr->LOCK_rpl_thread,
-                                        &stage_waiting_for_room_in_worker_thread,
-                                        old_stage);
+        rli->sql_driver_thd->set_time_for_next_stage();
+        rli->sql_driver_thd->ENTER_COND(
+            &thr->COND_rpl_thread_queue, &thr->LOCK_rpl_thread,
+            &stage_waiting_for_room_in_worker_thread, old_stage);
         *did_enter_cond= true;
       }
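A recurring mechanical change throughout this diff is calling thd->set_time_for_next_stage() before ENTER_COND() rather than after, presumably so the stage's start time is already in place when the wait stage becomes visible. The surrounding logic in choose_thread_internal() is otherwise the classic bounded-queue wait; a generic condition-variable rendering of that idiom (plain C++ threads, no THD or stage machinery):

    #include <condition_variable>
    #include <cstdio>
    #include <mutex>
    #include <queue>
    #include <thread>

    // Bounded queue modelling "wait for room in the worker's event queue".
    struct worker_queue
    {
      std::mutex lock;
      std::condition_variable cond;
      std::queue<int> events;
      const size_t max_size= 4;

      void enqueue(int ev)
      {
        std::unique_lock<std::mutex> guard(lock);
        // Analogue of the reordered calls: stamp the stage start time first,
        // then announce the wait stage, then block.
        // thd->set_time_for_next_stage();
        // thd->ENTER_COND(..., &stage_waiting_for_room_in_worker_thread, ...);
        cond.wait(guard, [this] { return events.size() < max_size; });
        events.push(ev);
        cond.notify_all();                 // wake the consumer
      }

      int dequeue()
      {
        std::unique_lock<std::mutex> guard(lock);
        cond.wait(guard, [this] { return !events.empty(); });
        int ev= events.front();
        events.pop();
        cond.notify_all();                 // room available again
        return ev;
      }
    };

    int main()
    {
      worker_queue q;
      std::thread consumer([&q] {
        for (int i= 0; i < 8; i++)
          std::printf("applied event %d\n", q.dequeue());
      });
      for (int i= 0; i < 8; i++)
        q.enqueue(i);
      consumer.join();
      return 0;
    }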
@@ -2612,11 +2743,11 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
                                   did_enter_cond, old_stage);
         my_error(ER_CONNECTION_KILLED, MYF(0));
 #ifdef ENABLED_DEBUG_SYNC
-        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max",
-          {
-            debug_sync_set_action(rli->sql_driver_thd,
-                STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
-          };);
+        DBUG_EXECUTE_IF("rpl_parallel_wait_queue_max", {
+          debug_sync_set_action(
+              rli->sql_driver_thd,
+              STRING_WITH_LEN("now SIGNAL wait_queue_killed"));
+        };);
 #endif
         slave_output_error_info(rgi, rli->sql_driver_thd);
         return NULL;
@@ -2626,9 +2757,10 @@ rpl_parallel_thread * rpl_parallel_entry::choose_thread_internal(uint idx,
       }
     }
   }
+
   if (!thr)
-    rpl_threads[idx]= thr= global_rpl_thread_pool.get_thread(&rpl_threads[idx],
-                                                             this);
+    cur_thr->thr= thr=
+      global_rpl_thread_pool.get_thread(&cur_thr->thr, this);
   return thr;
 }
@@ -2643,6 +2775,7 @@ free_rpl_parallel_entry(void *element)
     dealloc_gco(e->current_gco);
     e->current_gco= prev_gco;
   }
+  delete_dynamic(&e->maybe_active_xid);
   mysql_cond_destroy(&e->COND_parallel_entry);
   mysql_mutex_destroy(&e->LOCK_parallel_entry);
   my_free(e);
@@ -2686,17 +2819,37 @@ rpl_parallel::find(uint32 domain_id, Relay_log_info *rli)
     ulong count= opt_slave_domain_parallel_threads;
     if (count == 0 || count > opt_slave_parallel_threads)
       count= opt_slave_parallel_threads;
-    rpl_parallel_thread **p;
+    rpl_parallel_entry::sched_bucket *p;
+    I_List<rpl_parallel_entry::sched_bucket> *fifo;
     if (!my_multi_malloc(PSI_INSTRUMENT_ME, MYF(MY_WME|MY_ZEROFILL),
                          &e, sizeof(*e),
                          &p, count*sizeof(*p),
+                         &fifo, sizeof(*fifo),
                          NULL))
     {
       my_error(ER_OUTOFMEMORY, MYF(0), (int)(sizeof(*e)+count*sizeof(*p)));
       return NULL;
     }
+    /* Initialize a FIFO of scheduled worker threads. */
+    e->thread_sched_fifo= new (fifo) I_List<rpl_parallel_entry::sched_bucket>;
+    /*
+      (We cycle the FIFO _before_ allocating the next entry in
+      rpl_parallel_entry::choose_thread(). So initialize the FIFO with the
+      highest element at the front, just so that the first event group gets
+      scheduled on entry 0).
+    */
+    e->thread_sched_fifo->
+      push_back(::new (p+count-1) rpl_parallel_entry::sched_bucket);
+    for (ulong i= 0; i < count-1; ++i)
+      e->thread_sched_fifo->
+        push_back(::new (p+i) rpl_parallel_entry::sched_bucket);
     e->rpl_threads= p;
     e->rpl_thread_max= count;
+    e->current_generation= 0;
+    e->current_generation_idx= 0;
+    init_dynamic_array2(PSI_INSTRUMENT_ME, &e->maybe_active_xid,
                        sizeof(rpl_parallel_entry::xid_active_generation),
                        0, e->active_xid_init_alloc(), 0, MYF(0));
     e->domain_id= domain_id;
     e->stop_on_error_sub_id= (uint64)ULONGLONG_MAX;
     e->pause_sub_id= (uint64)ULONGLONG_MAX;
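The initialization order above is deliberate: choose_thread() cycles the FIFO before taking the head, so rpl_parallel::find() builds the list with the highest bucket first to make the very first event group land on entry 0. A few lines with std::deque make the off-by-one visible (illustrative only):

    #include <cstdio>
    #include <deque>

    int main()
    {
      const unsigned count= 4;
      std::deque<unsigned> fifo;

      // Push the highest bucket first, then 0 .. count-2, as the
      // thread_sched_fifo initialization does.
      fifo.push_back(count - 1);
      for (unsigned i= 0; i < count - 1; i++)
        fifo.push_back(i);                 // FIFO is now 3,0,1,2

      for (unsigned group= 0; group < 6; group++)
      {
        // choose_thread() analogue: cycle first, then take the head.
        fifo.push_back(fifo.front());
        fifo.pop_front();
        std::printf("event group %u -> worker %u\n", group, fifo.front());
      }
      // Output: workers 0,1,2,3,0,1 -- the first group lands on worker 0.
      return 0;
    }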
@@ -2766,10 +2919,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
       mysql_mutex_unlock(&e->LOCK_parallel_entry);
       for (j= 0; j < e->rpl_thread_max; ++j)
       {
-        if ((rpt= e->rpl_threads[j]))
+        if ((rpt= e->rpl_threads[j].thr))
         {
           mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-          if (rpt->current_owner == &e->rpl_threads[j])
+          if (rpt->current_owner == &e->rpl_threads[j].thr)
             mysql_cond_signal(&rpt->COND_rpl_thread);
           mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
         }
@@ -2828,10 +2981,10 @@ rpl_parallel::wait_for_done(THD *thd, Relay_log_info *rli)
     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
     for (j= 0; j < e->rpl_thread_max; ++j)
     {
-      if ((rpt= e->rpl_threads[j]))
+      if ((rpt= e->rpl_threads[j].thr))
       {
         mysql_mutex_lock(&rpt->LOCK_rpl_thread);
-        while (rpt->current_owner == &e->rpl_threads[j])
+        while (rpt->current_owner == &e->rpl_threads[j].thr)
           mysql_cond_wait(&rpt->COND_rpl_thread_stop, &rpt->LOCK_rpl_thread);
         mysql_mutex_unlock(&rpt->LOCK_rpl_thread);
       }
@@ -2889,7 +3042,7 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
                                          Format_description_log_event *fdev)
 {
-  uint32 idx;
+  sched_bucket *cur_thr;
   rpl_parallel_thread *thr;
   rpl_parallel_thread::queued_event *qev;
   Relay_log_info *rli= rgi->rli;
@@ -2904,12 +3057,12 @@ rpl_parallel_entry::queue_master_restart(rpl_group_info *rgi,
     Thus there is no need for the full complexity of choose_thread(). We only
     need to check if we have a current worker thread, and queue for it if so.
   */
-  idx= rpl_thread_idx;
-  thr= rpl_threads[idx];
+  cur_thr= thread_sched_fifo->head();
+  thr= cur_thr->thr;
   if (!thr)
     return 0;
   mysql_mutex_lock(&thr->LOCK_rpl_thread);
-  if (thr->current_owner != &rpl_threads[idx])
+  if (thr->current_owner != &cur_thr->thr)
   {
     /* No active worker thread, so no need to queue the master restart. */
     mysql_mutex_unlock(&thr->LOCK_rpl_thread);
@@ -2953,6 +3106,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd)
     e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i);
     mysql_mutex_lock(&e->LOCK_parallel_entry);
     ++e->need_sub_id_signal;
+    thd->set_time_for_next_stage();
     thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry,
                     &stage_waiting_for_workers_idle, &old_stage);
     while (e->current_sub_id > e->last_committed_sub_id)
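Taken together, the new scheduler cycles the FIFO once per event group for round-robin assignment, except that an XA XID which may still be active hijacks the rotation: its pinned bucket is unlinked and appended so the group runs on the same worker. A compact standalone model of that decision (int workers and string XIDs stand in for sched_bucket and xid_t; the generation-based purge from check_xa_xid_dependency() is omitted here, as shown in the earlier sketch):

    #include <cstdio>
    #include <deque>
    #include <map>
    #include <string>

    struct scheduler
    {
      std::deque<int> fifo{3, 0, 1, 2};       // highest bucket first, as above
      std::map<std::string, int> active_xid;  // XID -> pinned worker

      // Schedule one event group; an empty xid means no XA dependency.
      int schedule(const std::string &xid)
      {
        fifo.push_back(fifo.front());         // round-robin cycle
        fifo.pop_front();
        if (!xid.empty())
        {
          auto it= active_xid.find(xid);
          if (it != active_xid.end())
          {
            // XID may still be active on its old worker: move that bucket to
            // the FIFO tail and schedule on it (unlink + append in the diff).
            for (auto p= fifo.begin(); p != fifo.end(); ++p)
              if (*p == it->second) { fifo.erase(p); break; }
            fifo.push_back(it->second);
            return fifo.back();
          }
          active_xid[xid]= fifo.front();      // record the XID as newly active
        }
        return fifo.front();
      }
    };

    int main()
    {
      scheduler s;
      std::printf("%d\n", s.schedule("xa1"));  // worker 0; xa1 pinned to 0
      std::printf("%d\n", s.schedule(""));     // worker 1
      std::printf("%d\n", s.schedule("xa1"));  // worker 0 again (pinned)
      return 0;
    }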