diff options
Diffstat (limited to 'storage/innobase/buf/buf0buf.cc')
-rw-r--r-- | storage/innobase/buf/buf0buf.cc | 360 |
1 files changed, 333 insertions, 27 deletions
diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 8ef18ee0..23b5b776 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -404,7 +404,7 @@ static bool buf_page_decrypt_after_read(buf_page_t *bpage, if (id.space() == SRV_TMP_SPACE_ID && innodb_encrypt_temporary_tables) { - slot = buf_pool.io_buf_reserve(); + slot = buf_pool.io_buf_reserve(false); slot->allocate(); bool ok = buf_tmp_page_decrypt(slot->crypt_buf, dst_frame); slot->release(); @@ -426,7 +426,7 @@ decompress: return false; } - slot = buf_pool.io_buf_reserve(); + slot = buf_pool.io_buf_reserve(false); slot->allocate(); decompress_with_slot: @@ -449,7 +449,7 @@ decrypt_failed: return false; } - slot = buf_pool.io_buf_reserve(); + slot = buf_pool.io_buf_reserve(false); slot->allocate(); /* decrypt using crypt_buf to dst_frame */ @@ -742,6 +742,205 @@ bool buf_page_is_corrupted(bool check_lsn, const byte *read_buf, #ifndef UNIV_INNOCHECKSUM +#ifdef __linux__ +#include <poll.h> +#include <sys/eventfd.h> +#include <fstream> + +/** Memory Pressure + +based off https://www.kernel.org/doc/html/latest/accounting/psi.html#pressure-interface +and https://www.kernel.org/doc/html/latest/admin-guide/cgroup-v2.html#memory */ +class mem_pressure +{ + /* triggers + eventfd */ + struct pollfd m_fds[3]; + nfds_t m_num_fds; + int m_event_fd= -1; + Atomic_relaxed<bool> m_abort= false; + + std::thread m_thd; + /* mem pressure garbage collection restricted to interval */ + static constexpr ulonglong max_interval_us= 60*1000000; + +public: + mem_pressure() : m_num_fds(0) {} + + bool setup() + { + static_assert(array_elements(m_fds) == (array_elements(m_triggers) + 1), + "insufficient fds"); + std::string memcgroup{"/sys/fs/cgroup"}; + std::string cgroup; + { + std::ifstream selfcgroup("/proc/self/cgroup"); + std::getline(selfcgroup, cgroup, '\n'); + } + + cgroup.erase(0, 3); // Remove "0::" + memcgroup+= cgroup + "/memory.pressure"; + + m_num_fds= 0; + for (auto trig= std::begin(m_triggers); trig!= std::end(m_triggers); ++trig) + { + if ((m_fds[m_num_fds].fd= + open(memcgroup.c_str(), O_RDWR | O_NONBLOCK | O_CLOEXEC)) < 0) + { + /* User can't do anything about it, no point giving warning */ + shutdown(); + return false; + } + my_register_filename(m_fds[m_num_fds].fd, memcgroup.c_str(), FILE_BY_OPEN, 0, MYF(0)); + ssize_t slen= strlen(*trig); + if (write(m_fds[m_num_fds].fd, *trig, slen) < slen) + { + /* we may fail this one, but continue to the next */ + my_close(m_fds[m_num_fds].fd, MYF(MY_WME)); + continue; + } + m_fds[m_num_fds].events= POLLPRI; + m_num_fds++; + } + if (m_num_fds < 1) + return false; + + if ((m_event_fd= eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK)) == -1) + { + /* User can't do anything about it, no point giving warning */ + shutdown(); + return false; + } + my_register_filename(m_event_fd, "mem_pressure_eventfd", FILE_BY_DUP, 0, MYF(0)); + m_fds[m_num_fds].fd= m_event_fd; + m_fds[m_num_fds].events= POLLIN; + m_num_fds++; + m_thd= std::thread(pressure_routine, this); + sql_print_information("InnoDB: Initialized memory pressure event listener"); + return true; + } + + void shutdown() + { + /* m_event_fd is in this list */ + while (m_num_fds) + { + m_num_fds--; + my_close(m_fds[m_num_fds].fd, MYF(MY_WME)); + m_fds[m_num_fds].fd= -1; + } + } + + static void pressure_routine(mem_pressure *m); + +#ifdef UNIV_DEBUG + void trigger_collection() + { + uint64_t u= 1; + if (m_event_fd >=0 && write(m_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) + sql_print_information("InnoDB: (Debug) Failed to trigger memory pressure"); + else /* assumed failed to meet intialization criteria, so trigger directy */ + buf_pool.garbage_collect(); + } +#endif + + void quit() + { + uint64_t u= 1; + m_abort= true; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-result" + /* return result ignored, cannot do anything with it */ + write(m_event_fd, &u, sizeof(uint64_t)); +#pragma GCC diagnostic pop + } + + void join() + { + if (m_thd.joinable()) + { + quit(); + m_thd.join(); + } + } + + static const char* const m_triggers[2]; +}; + + +/* + ref: https://docs.kernel.org/accounting/psi.html + maximum window size (second number) 10 seconds. + window size in multiples of 2 second interval required (for Unprivileged) + Time is in usec. +*/ +const char* const mem_pressure::m_triggers[]= + {"some 5000000 10000000", /* 5s out of 10s */ + "full 10000 2000000"}; /* 10ms out of 2s */ + +static mem_pressure mem_pressure_obj; + +void mem_pressure::pressure_routine(mem_pressure *m) +{ + DBUG_ASSERT(m == &mem_pressure_obj); + if (my_thread_init()) + { + m->shutdown(); + return; + } + + ulonglong last= microsecond_interval_timer() - max_interval_us; + while (!m->m_abort) + { + if (poll(&m->m_fds[0], m->m_num_fds, -1) < 0) + { + if (errno == EINTR) + continue; + else + break; + } + if (!m->m_abort) + break; + + for (pollfd &p : st_::span<pollfd>(m->m_fds, m->m_num_fds)) + { + if (p.revents & POLLPRI) + { + ulonglong now= microsecond_interval_timer(); + if ((now - last) > max_interval_us) + { + last= now; + buf_pool.garbage_collect(); + } + } + +#ifdef UNIV_DEBUG + if (p.revents & POLLIN) + { + uint64_t u; + /* we haven't aborted, so this must be a debug trigger */ + if (read(p.fd, &u, sizeof(u)) >=0) + buf_pool.garbage_collect(); + } +#endif + } + } + m->shutdown(); + + my_thread_end(); +} + +/** Initialize mem pressure. */ +ATTRIBUTE_COLD void buf_mem_pressure_detect_init() +{ + mem_pressure_obj.setup(); +} + +ATTRIBUTE_COLD void buf_mem_pressure_shutdown() +{ + mem_pressure_obj.join(); +} +#endif /* __linux__ */ + #if defined(DBUG_OFF) && defined(HAVE_MADVISE) && defined(MADV_DODUMP) /** Enable buffers to be dumped to core files @@ -1099,6 +1298,11 @@ bool buf_pool_t::create() chunk_t::map_ref= chunk_t::map_reg; buf_LRU_old_ratio_update(100 * 3 / 8, false); btr_search_sys_create(); + +#ifdef __linux__ + if (srv_operation == SRV_OPERATION_NORMAL) + buf_mem_pressure_detect_init(); +#endif ut_ad(is_initialised()); return false; } @@ -1300,14 +1504,17 @@ void buf_pool_t::io_buf_t::close() n_slots= 0; } -buf_tmp_buffer_t *buf_pool_t::io_buf_t::reserve() +buf_tmp_buffer_t *buf_pool_t::io_buf_t::reserve(bool wait_for_reads) { for (;;) { for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++) if (s->acquire()) return s; + buf_dblwr.flush_buffered_writes(); os_aio_wait_until_no_pending_writes(true); + if (!wait_for_reads) + continue; for (buf_tmp_buffer_t *s= slots, *e= slots + n_slots; s != e; s++) if (s->acquire()) return s; @@ -1536,6 +1743,7 @@ struct find_interesting_trx inline void buf_pool_t::resize() { ut_ad(this == &buf_pool); + ut_ad(srv_shutdown_state < SRV_SHUTDOWN_CLEANUP); bool warning = false; @@ -1878,6 +2086,100 @@ calc_buf_pool_size: return; } +#ifdef __linux__ +inline void buf_pool_t::garbage_collect() +{ + mysql_mutex_lock(&mutex); + size_t freed= 0; + +#ifdef BTR_CUR_HASH_ADAPT + /* buf_LRU_free_page() will temporarily release and reacquire + buf_pool.mutex for invoking btr_search_drop_page_hash_index(). Thus, + we must protect ourselves with the hazard pointer. */ +rescan: +#else + lru_hp.set(nullptr); +#endif + for (buf_page_t *bpage= UT_LIST_GET_LAST(LRU), *prev; bpage; bpage= prev) + { + prev= UT_LIST_GET_PREV(LRU, bpage); +#ifdef BTR_CUR_HASH_ADAPT + lru_hp.set(prev); +#endif + auto state= bpage->state(); + ut_ad(state >= buf_page_t::FREED); + ut_ad(bpage->in_LRU_list); + + /* We try to free any pages that can be freed without writing out + anything. */ + switch (bpage->oldest_modification()) { + case 0: + try_to_evict: + if (buf_LRU_free_page(bpage, true)) + { + evicted: + freed++; +#ifdef BTR_CUR_HASH_ADAPT + bpage= prev; + prev= lru_hp.get(); + if (!prev && bpage) + goto rescan; +#endif + } + continue; + case 1: + break; + default: + if (state >= buf_page_t::UNFIXED) + continue; + } + + if (state < buf_page_t::READ_FIX && bpage->lock.u_lock_try(true)) + { + ut_ad(!bpage->is_io_fixed()); + lsn_t oldest_modification= bpage->oldest_modification(); + switch (oldest_modification) { + case 1: + mysql_mutex_lock(&flush_list_mutex); + oldest_modification= bpage->oldest_modification(); + if (oldest_modification) + { + ut_ad(oldest_modification == 1); + delete_from_flush_list(bpage); + } + mysql_mutex_unlock(&flush_list_mutex); + /* fall through */ + case 0: + bpage->lock.u_unlock(true); + goto try_to_evict; + default: + if (bpage->state() < buf_page_t::UNFIXED && + oldest_modification <= log_sys.get_flushed_lsn()) + { + release_freed_page(bpage); + goto evicted; + } + else + bpage->lock.u_unlock(true); + } + } + } + +#if defined MADV_FREE + /* FIXME: Issue fewer calls for larger contiguous blocks of + memory. For now, we assume that this is acceptable, because this + code should be executed rarely. */ + for (buf_page_t *bpage= UT_LIST_GET_FIRST(free); bpage; + bpage= UT_LIST_GET_NEXT(list, bpage)) + madvise(bpage->frame, srv_page_size, MADV_FREE); +#endif + mysql_mutex_unlock(&mutex); + sql_print_information("InnoDB: Memory pressure event freed %zu pages", + freed); + return; +} +#endif /* __linux__ */ + /** Thread pool task invoked by innodb_buffer_pool_size changes. */ static void buf_resize_callback(void *) { @@ -1906,12 +2208,23 @@ static tpool::waitable_task buf_resize_task(buf_resize_callback, void buf_resize_start() { - srv_thread_pool->submit_task(&buf_resize_task); +#if !defined(DBUG_OFF) && defined(__linux__) + DBUG_EXECUTE_IF("trigger_garbage_collection", + { + mem_pressure_obj.trigger_collection(); + } + ); +#endif + + srv_thread_pool->submit_task(&buf_resize_task); } void buf_resize_shutdown() { - buf_resize_task.wait(); +#ifdef __linux__ + buf_mem_pressure_shutdown(); +#endif + buf_resize_task.wait(); } @@ -2220,14 +2533,21 @@ lookup: if (discard_attempted || !bpage->frame) { - /* Even when we are holding a hash_lock, it should be - acceptable to wait for a page S-latch here, because - buf_page_t::read_complete() will not wait for buf_pool.mutex, - and because S-latch would not conflict with a U-latch - that would be protecting buf_page_t::write_complete(). */ - bpage->lock.s_lock(); + const bool got_s_latch= bpage->lock.s_lock_try(); hash_lock.unlock_shared(); - break; + if (UNIV_LIKELY(got_s_latch)) + break; + /* We may fail to acquire bpage->lock because + buf_page_t::read_complete() may be invoking + buf_pool_t::corrupted_evict() on this block, which it would + hold an exclusive latch on. + + Let us aqcuire and release buf_pool.mutex to ensure that any + buf_pool_t::corrupted_evict() will proceed before we reacquire + the hash_lock that it could be waiting for. */ + mysql_mutex_lock(&buf_pool.mutex); + mysql_mutex_unlock(&buf_pool.mutex); + goto lookup; } hash_lock.unlock_shared(); @@ -2246,7 +2566,6 @@ lookup: ut_ad(s < buf_page_t::READ_FIX || s >= buf_page_t::WRITE_FIX); } - bpage->set_accessed(); buf_page_make_young_if_needed(bpage); #ifdef UNIV_DEBUG @@ -2873,18 +3192,6 @@ get_latch_valid: ut_ad(page_id_t(page_get_space_id(block->page.frame), page_get_page_no(block->page.frame)) == page_id); - - if (mode == BUF_GET_POSSIBLY_FREED - || mode == BUF_PEEK_IF_IN_POOL) { - return block; - } - - const bool not_first_access{block->page.set_accessed()}; - buf_page_make_young_if_needed(&block->page); - if (!not_first_access) { - buf_read_ahead_linear(page_id, block->zip_size(), - ibuf_inside(mtr)); - } } return block; @@ -3057,7 +3364,6 @@ bool buf_page_optimistic_get(ulint rw_latch, buf_block_t *block, block->page.fix(); ut_ad(!block->page.is_read_fixed()); - block->page.set_accessed(); buf_page_make_young_if_needed(&block->page); mtr->memo_push(block, mtr_memo_type_t(rw_latch)); } |