Diffstat (limited to 'sql/rpl_parallel.h')
-rw-r--r-- | sql/rpl_parallel.h | 479 |
1 files changed, 479 insertions, 0 deletions
diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h
new file mode 100644
index 00000000..a9cfefcb
--- /dev/null
+++ b/sql/rpl_parallel.h
@@ -0,0 +1,479 @@
+#ifndef RPL_PARALLEL_H
+#define RPL_PARALLEL_H
+
+#include "log_event.h"
+
+
+struct rpl_parallel;
+struct rpl_parallel_entry;
+struct rpl_parallel_thread_pool;
+extern struct rpl_parallel_thread_pool pool_bkp_for_pfs;
+
+class Relay_log_info;
+struct inuse_relaylog;
+
+
+/*
+  Structure used to keep track of the parallel replication of a batch of
+  event-groups that group-committed together on the master.
+
+  It is used to ensure that every event group in one batch has reached the
+  commit stage before the next batch starts executing.
+
+  Note the lifetime of this structure:
+
+   - It is allocated when the first event in a new batch of group commits
+     is queued, from the free list rpl_parallel_entry::gco_free_list.
+
+   - The gco for the batch currently being queued is owned by
+     rpl_parallel_entry::current_gco. The gco for a previous batch that has
+     been fully queued is owned by the gco->prev_gco pointer of the gco for
+     the following batch.
+
+   - The worker thread waits on gco->COND_group_commit_orderer for
+     rpl_parallel_entry::count_committing_event_groups to reach wait_count
+     before starting; the first waiter links the gco into the next_gco
+     pointer of the gco of the previous batch for signalling.
+
+   - When an event group reaches the commit stage, it signals the
+     COND_group_commit_orderer if its gco->next_gco pointer is non-NULL and
+     rpl_parallel_entry::count_committing_event_groups has reached
+     gco->next_gco->wait_count.
+
+   - The gco lives until all its event groups have completed their commit.
+     This is detected by rpl_parallel_entry::last_committed_sub_id being
+     greater than or equal to gco->last_sub_id. Once this happens, the gco
+     is freed. Note that since update of last_committed_sub_id can happen
+     out-of-order, the thread that frees a given gco can be for any later
+     event group, not necessarily an event group from the gco being freed.
+*/
+struct group_commit_orderer {
+  /* Wakeup condition, used with rpl_parallel_entry::LOCK_parallel_entry. */
+  mysql_cond_t COND_group_commit_orderer;
+  uint64 wait_count;
+  group_commit_orderer *prev_gco;
+  group_commit_orderer *next_gco;
+  /*
+    The sub_id of the last event group in the previous GCO.
+    Only valid if prev_gco != NULL.
+  */
+  uint64 prior_sub_id;
+  /*
+    The sub_id of the last event group in this GCO. Only valid when next_gco
+    is non-NULL.
+  */
+  uint64 last_sub_id;
+  /*
+    This flag is set when this GCO has been installed into the next_gco
+    pointer of the previous GCO.
+  */
+  bool installed;
+
+  enum force_switch_bits
+  {
+    /*
+      This flag is set for a GCO in which we have event groups with multiple
+      different commit_id values from the master. This happens when we
+      optimistically try to execute in parallel transactions not known to be
+      conflict-free.
+
+      When this flag is set, in case of DDL we need to start a new GCO
+      regardless of current commit_id, as DDL is not safe to speculatively
+      apply in parallel with prior event groups.
+    */
+    MULTI_BATCH= 1,
+    /*
+      This flag is set for a GCO that contains DDL. If set, it forces a
+      switch to a new GCO upon seeing a new commit_id, as DDL is not safe to
+      speculatively replicate in parallel with subsequent transactions.
+    */
+    FORCE_SWITCH= 2
+  };
+  uint8 flags;
+#ifndef DBUG_OFF
+  /*
+    Flag set when the GCO has been freed and entered the free list, to catch
+    (in debug) errors in the complex lifetime of this object.
+  */
+  bool gc_done;
+#endif
+};
+
+
+struct rpl_parallel_thread {
+  bool delay_start;
+  bool running;
+  bool stop;
+  bool pause_for_ftwrl;
+  /*
+     0 = No start alter assigned
+    >0 = Start alter assigned
+  */
+  uint64 current_start_alter_id;
+  uint32 current_start_alter_domain_id;
+  /*
+    This flag is true when the Start Alter only needs to be binlogged.
+    This happens when there is congestion and we cannot allocate an
+    independent worker for the Start Alter.
+  */
+  bool reserved_start_alter_thread;
+  mysql_mutex_t LOCK_rpl_thread;
+  mysql_cond_t COND_rpl_thread;
+  mysql_cond_t COND_rpl_thread_queue;
+  mysql_cond_t COND_rpl_thread_stop;
+  struct rpl_parallel_thread *next;             /* For free list. */
+  struct rpl_parallel_thread_pool *pool;
+  THD *thd;
+  /*
+    Who owns the thread, if any (it's a pointer into the
+    rpl_parallel_entry::rpl_threads array).
+  */
+  struct rpl_parallel_thread **current_owner;
+  /* The rpl_parallel_entry of the owner. */
+  rpl_parallel_entry *current_entry;
+  struct queued_event {
+    queued_event *next;
+    /*
+      queued_event can hold either an event to be executed, or just a binlog
+      position to be updated without any associated event.
+    */
+    enum queued_event_t {
+      QUEUED_EVENT,
+      QUEUED_POS_UPDATE,
+      QUEUED_MASTER_RESTART
+    } typ;
+    union {
+      Log_event *ev;                            /* QUEUED_EVENT */
+      rpl_parallel_entry *entry_for_queued;     /* QUEUED_POS_UPDATE and
+                                                   QUEUED_MASTER_RESTART */
+    };
+    rpl_group_info *rgi;
+    inuse_relaylog *ir;
+    ulonglong future_event_relay_log_pos;
+    char event_relay_log_name[FN_REFLEN];
+    char future_event_master_log_name[FN_REFLEN];
+    ulonglong event_relay_log_pos;
+    my_off_t future_event_master_log_pos;
+    size_t event_size;
+  } *event_queue, *last_in_queue;
+  uint64 queued_size;
+  /* These free lists are protected by LOCK_rpl_thread. */
+  queued_event *qev_free_list;
+  rpl_group_info *rgi_free_list;
+  group_commit_orderer *gco_free_list;
+  /*
+    These free lists are local to the thread, so need not be protected by any
+    lock. They are moved to the global free lists in batches in the function
+    batch_free(), to reduce LOCK_rpl_thread contention.
+
+    The lists are not NULL-terminated (as we do not need to traverse them).
+    Instead, if they are non-NULL, the loc_XXX_last_ptr_ptr points to the
+    `next' pointer of the last element, which is used to link into the front
+    of the global freelists.
+  */
+  queued_event *loc_qev_list, **loc_qev_last_ptr_ptr;
+  size_t loc_qev_size;
+  uint64 qev_free_pending;
+  rpl_group_info *loc_rgi_list, **loc_rgi_last_ptr_ptr;
+  group_commit_orderer *loc_gco_list, **loc_gco_last_ptr_ptr;
+  /* These keep track of batch update of inuse_relaylog refcounts. */
+  inuse_relaylog *accumulated_ir_last;
+  uint64 accumulated_ir_count;
+
+  char channel_name[MAX_CONNECTION_NAME];
+  uint channel_name_length;
+  rpl_gtid last_seen_gtid;
+  int last_error_number;
+  char last_error_message[MAX_SLAVE_ERRMSG];
+  ulonglong last_error_timestamp;
+  ulonglong worker_idle_time;
+  ulong last_trans_retry_count;
+  ulonglong start_time;
+  void start_time_tracker()
+  {
+    start_time= microsecond_interval_timer();
+  }
+  ulonglong compute_time_lapsed()
+  {
+    return (ulonglong)((microsecond_interval_timer() - start_time) / 1000000.0);
+  }
+  void add_to_worker_idle_time_and_reset()
+  {
+    worker_idle_time+= compute_time_lapsed();
+    start_time=0;
+  }
+  ulonglong get_worker_idle_time()
+  {
+    if (start_time)
+      return (worker_idle_time + compute_time_lapsed());
+    else
+      return worker_idle_time;
+  }
+  void enqueue(queued_event *qev)
+  {
+    if (last_in_queue)
+      last_in_queue->next= qev;
+    else
+      event_queue= qev;
+    last_in_queue= qev;
+    queued_size+= qev->event_size;
+  }
+
+  void dequeue1(queued_event *list)
+  {
+    DBUG_ASSERT(list == event_queue);
+    event_queue= last_in_queue= NULL;
+  }
+
+  void dequeue2(size_t dequeue_size)
+  {
+    queued_size-= dequeue_size;
+  }
+
+  queued_event *get_qev_common(Log_event *ev, ulonglong event_size);
+  queued_event *get_qev(Log_event *ev, ulonglong event_size,
+                        Relay_log_info *rli);
+  queued_event *retry_get_qev(Log_event *ev, queued_event *orig_qev,
+                              const char *relay_log_name,
+                              ulonglong event_pos, ulonglong event_size);
+  /*
+    Put a qev on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_qev(queued_event *qev);
+  /*
+    Release a qev immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
+  void free_qev(queued_event *qev);
+  rpl_group_info *get_rgi(Relay_log_info *rli, Gtid_log_event *gtid_ev,
+                          rpl_parallel_entry *e, ulonglong event_size);
+  /*
+    Put an rgi on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_rgi(rpl_group_info *rgi);
+  /*
+    Release an rgi immediately to the global free list. Requires holding the
+    LOCK_rpl_thread mutex.
+  */
+  void free_rgi(rpl_group_info *rgi);
+  group_commit_orderer *get_gco(uint64 wait_count, group_commit_orderer *prev,
+                                uint64 first_sub_id);
+  /*
+    Put a gco on the local free list, to be later released to the global free
+    list by batch_free().
+  */
+  void loc_free_gco(group_commit_orderer *gco);
+  /*
+    Move all local free lists to the global ones. Requires holding
+    LOCK_rpl_thread.
+  */
+  void batch_free();
+  /* Update inuse_relaylog refcounts with what we have accumulated so far. */
+  void inuse_relaylog_refcount_update();
+  rpl_parallel_thread();
+};
+
+
+struct pool_bkp_for_pfs{
+  uint32 count;
+  bool inited, is_valid;
+  struct rpl_parallel_thread **rpl_thread_arr;
+  void init(uint32 thd_count)
+  {
+    DBUG_ASSERT(thd_count);
+    rpl_thread_arr= (rpl_parallel_thread **)
+                      my_malloc(PSI_INSTRUMENT_ME,
+                                thd_count * sizeof(rpl_parallel_thread*),
+                                MYF(MY_WME | MY_ZEROFILL));
+    for (uint i=0; i<thd_count; i++)
+      rpl_thread_arr[i]= (rpl_parallel_thread *)
+                           my_malloc(PSI_INSTRUMENT_ME,
+                                     sizeof(rpl_parallel_thread),
+                                     MYF(MY_WME | MY_ZEROFILL));
+    count= thd_count;
+    inited= true;
+  }
+
+  void destroy()
+  {
+    if (inited)
+    {
+      for (uint i=0; i<count; i++)
+        my_free(rpl_thread_arr[i]);
+
+      my_free(rpl_thread_arr);
+      rpl_thread_arr= NULL;
+    }
+    inited= false;
+  }
+};
+
+struct rpl_parallel_thread_pool {
+  struct rpl_parallel_thread **threads;
+  struct rpl_parallel_thread *free_list;
+  mysql_mutex_t LOCK_rpl_thread_pool;
+  mysql_cond_t COND_rpl_thread_pool;
+  uint32 count;
+  bool inited;
+
+  /*
+    Lock first LOCK_rpl_thread_pool and then LOCK_rpl_thread to
+    update this variable.
+  */
+  uint32 current_start_alters;
+  /*
+    While FTWRL runs, this flag is set to make the SQL thread or
+    STOP/START SLAVE not try to start new activity while that operation
+    is in progress.
+  */
+  bool busy;
+  struct pool_bkp_for_pfs pfs_bkp;
+
+  rpl_parallel_thread_pool();
+  void copy_pool_for_pfs(Relay_log_info *rli);
+  int init(uint32 size);
+  void destroy();
+  void deactivate();
+  void destroy_cond_mutex();
+  struct rpl_parallel_thread *get_thread(rpl_parallel_thread **owner,
+                                         rpl_parallel_entry *entry);
+  void release_thread(rpl_parallel_thread *rpt);
+};
+
+
+struct rpl_parallel_entry {
+  mysql_mutex_t LOCK_parallel_entry;
+  mysql_cond_t COND_parallel_entry;
+  uint32 domain_id;
+  /*
+    Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show
+    that they are waiting, so that finish_event_group knows to signal them
+    when last_committed_sub_id is increased.
+  */
+  uint32 need_sub_id_signal;
+  uint64 last_commit_id;
+  uint32 pending_start_alters;
+  bool active;
+  /*
+    Set when the SQL thread is shutting down, and no more events can be
+    processed, so worker threads must force abort any current transactions
+    without waiting for event groups to complete.
+  */
+  bool force_abort;
+  /*
+    At STOP SLAVE (force_abort=true), we do not want to process all events in
+    the queue (which could unnecessarily delay stop, if a lot of events happen
+    to be queued). The stop_sub_id provides a safe point at which to stop, so
+    that everything before becomes committed and nothing after does. The value
+    corresponds to rpl_group_info::gtid_sub_id; if that is less than or equal
+    to stop_sub_id, we execute the associated event group, else we skip it
+    (and all following) and stop.
+  */
+  uint64 stop_sub_id;
+
+  /*
+    Cyclic array recording the last rpl_thread_max worker threads that we
+    queued events for. This is used to limit how many workers a single domain
+    can occupy (--slave-domain-parallel-threads).
+
+    Note that workers are never explicitly deleted from the array. Instead,
+    we need to check (under LOCK_rpl_thread) that the thread still belongs
+    to us before re-using it (rpl_thread::current_owner).
+  */
+  rpl_parallel_thread **rpl_threads;
+  uint32 rpl_thread_max;
+  uint32 rpl_thread_idx;
+  /*
+    The sub_id of the last transaction to commit within this domain_id.
+    Must be accessed under LOCK_parallel_entry protection.
+
+    Event groups commit in order, so the rpl_group_info for an event group
+    will be alive (at least) as long as
+    rpl_group_info::gtid_sub_id > last_committed_sub_id. This can be used to
+    safely refer back to previous event groups if they are still executing,
+    and ignore them if they completed, without requiring explicit
+    synchronisation between the threads.
+  */
+  uint64 last_committed_sub_id;
+  /*
+    The sub_id of the last event group in this replication domain that was
+    queued for execution by a worker thread.
+  */
+  uint64 current_sub_id;
+  /*
+    The largest sub_id that has started its transaction. Protected by
+    LOCK_parallel_entry.
+
+    (Transactions can start out-of-order, so this value signifies that no
+    transactions with larger sub_id have started, but not necessarily that
+    all transactions with smaller sub_id have started).
+  */
+  uint64 largest_started_sub_id;
+  rpl_group_info *current_group_info;
+  /*
+    If we get an error in some event group, we set the sub_id of that event
+    group here. Then later event groups (with higher sub_id) can know not to
+    try to start (event groups that already started will be rolled back when
+    wait_for_prior_commit() returns an error).
+    The value is ULONGLONG_MAX when no error occurred.
+  */
+  uint64 stop_on_error_sub_id;
+  /*
+    During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than
+    this value must not start, but wait until the global read lock is
+    released. The value is set to ULONGLONG_MAX when no FTWRL is pending.
+  */
+  uint64 pause_sub_id;
+  /* Total count of event groups queued so far. */
+  uint64 count_queued_event_groups;
+  /*
+    Count of event groups that have started (but not necessarily completed)
+    the commit phase. We use this to know when every event group in a
+    previous batch of master group commits has started committing on the
+    slave, so that it is safe to start executing the events in the following
+    batch.
+  */
+  uint64 count_committing_event_groups;
+  /* The group_commit_orderer object for the events currently being queued. */
+  group_commit_orderer *current_gco;
+  /* Relay log info of the replication source for this entry. */
+  Relay_log_info *rli;
+
+  rpl_parallel_thread * choose_thread(rpl_group_info *rgi, bool *did_enter_cond,
+                                      PSI_stage_info *old_stage,
+                                      Gtid_log_event *gtid_ev);
+  rpl_parallel_thread *
+  choose_thread_internal(uint idx, bool *did_enter_cond, rpl_group_info *rgi,
+                         PSI_stage_info *old_stage);
+  int queue_master_restart(rpl_group_info *rgi,
+                           Format_description_log_event *fdev);
+};
+
+struct rpl_parallel {
+  HASH domain_hash;
+  rpl_parallel_entry *current;
+  bool sql_thread_stopping;
+
+  rpl_parallel();
+  ~rpl_parallel();
+  void reset();
+  rpl_parallel_entry *find(uint32 domain_id, Relay_log_info *rli);
+  void wait_for_done(THD *thd, Relay_log_info *rli);
+  void stop_during_until();
+  bool workers_idle();
+  int wait_for_workers_idle(THD *thd);
+  int do_event(rpl_group_info *serial_rgi, Log_event *ev, ulonglong event_size);
+};
+
+
+extern struct rpl_parallel_thread_pool global_rpl_thread_pool;
+
+
+extern int rpl_parallel_resize_pool_if_no_slaves(void);
+extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool);
+extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool);
+extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid *gtid);
+extern int rpl_pause_for_ftwrl(THD *thd);
+extern void rpl_unpause_after_ftwrl(THD *thd);
+
+#endif /* RPL_PARALLEL_H */
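
The lifetime comment on group_commit_orderer describes a wait/wake handshake: a worker waits on COND_group_commit_orderer (under LOCK_parallel_entry) until count_committing_event_groups reaches the GCO's wait_count, and each committing event group wakes the following batch once its threshold is met. Below is a minimal standalone sketch of that pattern, not part of the patch: std::mutex and std::condition_variable stand in for the mysql_mutex_t/mysql_cond_t pair, and the names gco_model, entry_model, wait_for_prior_batch and mark_group_committing are invented for illustration.

#include <condition_variable>
#include <cstdint>
#include <mutex>

struct gco_model {
  uint64_t wait_count;              // event groups that must reach commit first
  std::condition_variable cond;     // stands in for COND_group_commit_orderer
};

struct entry_model {
  std::mutex lock;                  // stands in for LOCK_parallel_entry
  uint64_t count_committing_event_groups= 0;
};

// Worker side: block until enough earlier event groups reached the commit stage.
void wait_for_prior_batch(entry_model &e, gco_model &gco)
{
  std::unique_lock<std::mutex> guard(e.lock);
  gco.cond.wait(guard, [&] {
    return e.count_committing_event_groups >= gco.wait_count;
  });
}

// Commit side: count this event group and wake the next batch once it may run.
void mark_group_committing(entry_model &e, gco_model *next_gco)
{
  std::lock_guard<std::mutex> guard(e.lock);
  ++e.count_committing_event_groups;
  if (next_gco && e.count_committing_event_groups >= next_gco->wait_count)
    next_gco->cond.notify_all();
}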
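
The comment above loc_qev_list / loc_XXX_last_ptr_ptr describes thread-local free lists that are later spliced onto the global lists in one step by batch_free(), so each freed object does not need its own lock round-trip. A hedged standalone sketch of that splice follows; node, local_free_list and batch_free_sketch are invented names, and std::mutex stands in for LOCK_rpl_thread.

#include <mutex>

struct node { node *next; };

struct local_free_list {
  node *head= nullptr;            // not NULL-terminated; never traversed
  node **last_next_ptr= nullptr;  // points at the last element's `next' field

  void push(node *n)
  {
    if (!head)
      last_next_ptr= &n->next;    // remember where the tail's next pointer lives
    n->next= head;
    head= n;
  }
};

std::mutex global_lock;           // stands in for LOCK_rpl_thread
node *global_free_list= nullptr;

// Splice the whole local list onto the front of the global list with a single
// lock acquisition.
void batch_free_sketch(local_free_list &loc)
{
  if (!loc.head)
    return;
  std::lock_guard<std::mutex> guard(global_lock);
  *loc.last_next_ptr= global_free_list;   // link old global list after our tail
  global_free_list= loc.head;
  loc.head= nullptr;
  loc.last_next_ptr= nullptr;
}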
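
The inline idle-time helpers (start_time_tracker(), add_to_worker_idle_time_and_reset(), get_worker_idle_time()) imply that each idle wait is bracketed by a start call and a fold-in call. The fragment below is a hypothetical outline assuming this header is included; the loop itself is not part of the patch and is not the actual worker loop.

// Hypothetical outline only; worker_loop_sketch is an invented name.
void worker_loop_sketch(rpl_parallel_thread *rpt)
{
  for (;;)
  {
    rpt->start_time_tracker();                 // begin measuring idle time
    /* ... wait on COND_rpl_thread until events are queued ... */
    rpt->add_to_worker_idle_time_and_reset();  // idle period ended
    /* ... execute the queued event groups ... */
  }
}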
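
The rpl_threads comment notes that a slot in the cyclic array may only be re-used after checking, under the worker's LOCK_rpl_thread, that the worker's current_owner still points back at that slot. A standalone model of that check, with invented names (worker_model, still_owned_by) and std::mutex in place of the mysql mutex:

#include <mutex>

struct worker_model {
  std::mutex lock;               // stands in for LOCK_rpl_thread
  worker_model **current_owner;  // slot in some entry's array that owns us, or NULL
};

// True if the worker recorded in *slot still belongs to that slot; otherwise
// the pool has since handed the worker to another owner and the slot is stale.
bool still_owned_by(worker_model *w, worker_model **slot)
{
  std::lock_guard<std::mutex> guard(w->lock);
  return w->current_owner == slot;
}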
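
Several rpl_parallel_entry members are monotonic sub_id thresholds compared against an event group's gtid_sub_id: last_committed_sub_id, stop_on_error_sub_id and pause_sub_id. The checks below simply restate the comments as code; entry_snapshot and the two functions are invented for illustration, and the real decisions are made by the implementation under LOCK_parallel_entry.

#include <cstdint>

struct entry_snapshot {
  uint64_t last_committed_sub_id;  // last sub_id committed in this domain
  uint64_t stop_on_error_sub_id;   // ULONGLONG_MAX when no error occurred
  uint64_t pause_sub_id;           // ULONGLONG_MAX when no FTWRL is pending
};

// Per the comments: groups after a failed group, or past the FTWRL pause
// point, must not start (the latter have to wait for the global read lock).
bool may_start_event_group(const entry_snapshot &e, uint64_t gtid_sub_id)
{
  return gtid_sub_id <= e.stop_on_error_sub_id &&
         gtid_sub_id <= e.pause_sub_id;
}

// Per the last_committed_sub_id comment: a prior event group's rpl_group_info
// is still guaranteed alive while its sub_id has not yet been committed.
bool prior_group_still_executing(const entry_snapshot &e, uint64_t prior_sub_id)
{
  return prior_sub_id > e.last_committed_sub_id;
}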