Diffstat (limited to 'storage/perfschema/pfs_buffer_container.h')
-rw-r--r-- storage/perfschema/pfs_buffer_container.h | 1626
1 file changed, 1626 insertions(+), 0 deletions(-)
diff --git a/storage/perfschema/pfs_buffer_container.h b/storage/perfschema/pfs_buffer_container.h
new file mode 100644
index 00000000..0f856cee
--- /dev/null
+++ b/storage/perfschema/pfs_buffer_container.h
@@ -0,0 +1,1626 @@
+/* Copyright (c) 2014, 2022, Oracle and/or its affiliates.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License, version 2.0,
+ as published by the Free Software Foundation.
+
+ This program is also distributed with certain software (including
+ but not limited to OpenSSL) that is licensed under separate terms,
+ as designated in a particular file or component or in included license
+ documentation. The authors of MySQL hereby grant you an additional
+ permission to link the program and your derivative works with the
+ separately licensed software that they have included with MySQL.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License, version 2.0, for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software Foundation,
+ 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
+
+#ifndef PFS_BUFFER_CONTAINER_H
+#define PFS_BUFFER_CONTAINER_H
+
+#include "my_global.h"
+#include "pfs.h" // PSI_COUNT_VOLATILITY
+#include "pfs_lock.h"
+#include "pfs_instr.h"
+#include "pfs_setup_actor.h"
+#include "pfs_setup_object.h"
+#include "pfs_program.h"
+#include "pfs_prepared_stmt.h"
+#include "pfs_builtin_memory.h"
+
+#define USE_SCALABLE
+
+class PFS_opaque_container_page;
+class PFS_opaque_container;
+
+struct PFS_builtin_memory_class;
+
+template <class T>
+class PFS_buffer_const_iterator;
+
+template <class T>
+class PFS_buffer_processor;
+
+template <class T, class U, class V>
+class PFS_buffer_iterator;
+
+template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
+class PFS_buffer_scalable_iterator;
+
+template <class T>
+class PFS_buffer_default_array;
+
+template <class T>
+class PFS_buffer_default_allocator;
+
+template <class T, class U, class V>
+class PFS_buffer_container;
+
+template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
+class PFS_buffer_scalable_container;
+
+template <class B, int COUNT>
+class PFS_partitioned_buffer_scalable_iterator;
+
+template <class B, int COUNT>
+class PFS_partitioned_buffer_scalable_container;
+
+
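+/**
+  A fixed-size array of records, used as one page of a buffer.
+  allocate() probes the page starting from a shared monotonic counter,
+  so that concurrent allocators begin at different slots and each
+  record is claimed with a lock-free state transition.
+
+  A minimal allocation sketch (hypothetical caller code, assuming the
+  pfs_lock dirty_to_allocated() protocol from pfs_lock.h):
+  @code
+    pfs_dirty_state dirty_state;
+    PFS_mutex *pfs= page.allocate(& dirty_state);
+    if (pfs != NULL)
+    {
+      // ... initialize the record while it is DIRTY ...
+      pfs->m_lock.dirty_to_allocated(& dirty_state);
+    }
+  @endcode
+*/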
+template <class T>
+class PFS_buffer_default_array
+{
+public:
+ typedef T value_type;
+
+ value_type *allocate(pfs_dirty_state *dirty_state)
+ {
+ uint index;
+ uint monotonic;
+ uint monotonic_max;
+ value_type *pfs;
+
+ if (m_full)
+ return NULL;
+
+ monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
+ monotonic_max= monotonic + static_cast<uint>(m_max);
+
+ while (monotonic < monotonic_max)
+ {
+ index= monotonic % m_max;
+ pfs= m_ptr + index;
+
+ if (pfs->m_lock.free_to_dirty(dirty_state))
+ {
+ return pfs;
+ }
+ monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
+ }
+
+ m_full= true;
+ return NULL;
+ }
+
+ void deallocate(value_type *pfs)
+ {
+ pfs->m_lock.allocated_to_free();
+ m_full= false;
+ }
+
+ T* get_first()
+ {
+ return m_ptr;
+ }
+
+ T* get_last()
+ {
+ return m_ptr + m_max;
+ }
+
+ bool m_full;
+ PFS_cacheline_uint32 m_monotonic;
+ T * m_ptr;
+ size_t m_max;
+ /** Container. */
+ PFS_opaque_container *m_container;
+};
+
+template <class T>
+class PFS_buffer_default_allocator
+{
+public:
+ typedef PFS_buffer_default_array<T> array_type;
+
+ PFS_buffer_default_allocator(PFS_builtin_memory_class *klass)
+ : m_builtin_class(klass)
+ {}
+
+ int alloc_array(array_type *array)
+ {
+ array->m_ptr= NULL;
+ array->m_full= true;
+ array->m_monotonic.m_u32= 0;
+
+ if (array->m_max > 0)
+ {
+ array->m_ptr= PFS_MALLOC_ARRAY(m_builtin_class,
+ array->m_max, sizeof(T), T, MYF(MY_ZEROFILL));
+ if (array->m_ptr == NULL)
+ return 1;
+ array->m_full= false;
+ }
+ return 0;
+ }
+
+ void free_array(array_type *array)
+ {
+ assert(array->m_max > 0);
+
+ PFS_FREE_ARRAY(m_builtin_class,
+ array->m_max, sizeof(T), array->m_ptr);
+ array->m_ptr= NULL;
+ }
+
+private:
+ PFS_builtin_memory_class *m_builtin_class;
+};
+
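+/**
+  A buffer of records, sized once at init() time and backed by a
+  single array.
+  Record lookup is a direct index into the array;
+  allocation scans for a free slot starting from a monotonic counter.
+*/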
+template <class T,
+ class U = PFS_buffer_default_array<T>,
+ class V = PFS_buffer_default_allocator<T> >
+class PFS_buffer_container
+{
+public:
+ friend class PFS_buffer_iterator<T, U, V>;
+
+ typedef T value_type;
+ typedef U array_type;
+ typedef V allocator_type;
+ typedef PFS_buffer_const_iterator<T> const_iterator_type;
+ typedef PFS_buffer_iterator<T, U, V> iterator_type;
+ typedef PFS_buffer_processor<T> processor_type;
+ typedef void (*function_type)(value_type *);
+
+ PFS_buffer_container(allocator_type *allocator)
+ {
+ m_array.m_full= true;
+ m_array.m_ptr= NULL;
+ m_array.m_max= 0;
+ m_array.m_monotonic.m_u32= 0;
+ m_lost= 0;
+ m_max= 0;
+ m_allocator= allocator;
+ }
+
+ int init(ulong max_size)
+ {
+ if (max_size > 0)
+ {
+ m_array.m_max= max_size;
+ int rc= m_allocator->alloc_array(& m_array);
+ if (rc != 0)
+ {
+ m_allocator->free_array(& m_array);
+ return 1;
+ }
+ m_max= max_size;
+ m_array.m_full= false;
+ }
+ return 0;
+ }
+
+ void cleanup()
+ {
+ m_allocator->free_array(& m_array);
+ }
+
+ ulong get_row_count() const
+ {
+ return m_max;
+ }
+
+ ulong get_row_size() const
+ {
+ return sizeof(value_type);
+ }
+
+ ulong get_memory() const
+ {
+ return get_row_count() * get_row_size();
+ }
+
+ value_type *allocate(pfs_dirty_state *dirty_state)
+ {
+ value_type *pfs;
+
+    pfs= m_array.allocate(dirty_state);
+ if (pfs == NULL)
+ {
+ m_lost++;
+ }
+
+ return pfs;
+ }
+
+ void deallocate(value_type *pfs)
+ {
+ m_array.deallocate(pfs);
+ }
+
+ iterator_type iterate()
+ {
+ return PFS_buffer_iterator<T, U, V>(this, 0);
+ }
+
+ iterator_type iterate(uint index)
+ {
+ assert(index <= m_max);
+ return PFS_buffer_iterator<T, U, V>(this, index);
+ }
+
+ void apply(function_type fct)
+ {
+ value_type *pfs= m_array.get_first();
+ value_type *pfs_last= m_array.get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ fct(pfs);
+ }
+ pfs++;
+ }
+ }
+
+ void apply_all(function_type fct)
+ {
+ value_type *pfs= m_array.get_first();
+ value_type *pfs_last= m_array.get_last();
+
+ while (pfs < pfs_last)
+ {
+ fct(pfs);
+ pfs++;
+ }
+ }
+
+ void apply(processor_type & proc)
+ {
+ value_type *pfs= m_array.get_first();
+ value_type *pfs_last= m_array.get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ proc(pfs);
+ }
+ pfs++;
+ }
+ }
+
+ void apply_all(processor_type & proc)
+ {
+ value_type *pfs= m_array.get_first();
+ value_type *pfs_last= m_array.get_last();
+
+ while (pfs < pfs_last)
+ {
+ proc(pfs);
+ pfs++;
+ }
+ }
+
+ inline value_type* get(uint index)
+ {
+ assert(index < m_max);
+
+ value_type *pfs= m_array.m_ptr + index;
+ if (pfs->m_lock.is_populated())
+ {
+ return pfs;
+ }
+
+ return NULL;
+ }
+
+ value_type* get(uint index, bool *has_more)
+ {
+ if (index >= m_max)
+ {
+ *has_more= false;
+ return NULL;
+ }
+
+ *has_more= true;
+ return get(index);
+ }
+
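+  /**
+    Validate an unsafe pointer.
+    Returns @c unsafe if it points inside the array and falls exactly
+    on a record boundary, NULL otherwise.
+  */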
+ value_type *sanitize(value_type *unsafe)
+ {
+ intptr offset;
+ value_type *pfs= m_array.get_first();
+ value_type *pfs_last= m_array.get_last();
+
+ if ((pfs <= unsafe) &&
+ (unsafe < pfs_last))
+ {
+ offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
+ if (offset == 0)
+ return unsafe;
+ }
+
+ return NULL;
+ }
+
+ ulong m_lost;
+
+private:
+ value_type* scan_next(uint & index, uint * found_index)
+ {
+ assert(index <= m_max);
+
+ value_type *pfs_first= m_array.get_first();
+ value_type *pfs= pfs_first + index;
+ value_type *pfs_last= m_array.get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ uint found= pfs - pfs_first;
+ *found_index= found;
+ index= found + 1;
+ return pfs;
+ }
+ pfs++;
+ }
+
+ index= m_max;
+ return NULL;
+ }
+
+ ulong m_max;
+ array_type m_array;
+ allocator_type *m_allocator;
+};
+
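+/**
+  A buffer of records, grown on demand page by page.
+  Storage is a two-level structure:
+  an array of PFS_PAGE_COUNT page pointers,
+  where each page holds up to PFS_PAGE_SIZE records,
+  so a record index splits into
+  (index / PFS_PAGE_SIZE, index % PFS_PAGE_SIZE).
+  Pages are created lazily, under a mutex;
+  lookups and allocations within existing pages are lock-free.
+*/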
+template <class T,
+ int PFS_PAGE_SIZE,
+ int PFS_PAGE_COUNT,
+ class U = PFS_buffer_default_array<T>,
+ class V = PFS_buffer_default_allocator<T> >
+class PFS_buffer_scalable_container
+{
+public:
+ friend class PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>;
+
+ /**
+ Type of elements in the buffer.
+ The following attributes are required:
+ - pfs_lock m_lock
+ - PFS_opaque_container_page *m_page
+ */
+ typedef T value_type;
+ /**
+ Type of pages in the buffer.
+ The following attributes are required:
+ - PFS_opaque_container *m_container
+ */
+ typedef U array_type;
+ typedef V allocator_type;
+ /** This container type */
+ typedef PFS_buffer_scalable_container<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> container_type;
+ typedef PFS_buffer_const_iterator<T> const_iterator_type;
+ typedef PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> iterator_type;
+ typedef PFS_buffer_processor<T> processor_type;
+ typedef void (*function_type)(value_type *);
+
+ static const size_t MAX_SIZE= PFS_PAGE_SIZE*PFS_PAGE_COUNT;
+
+ PFS_buffer_scalable_container(allocator_type *allocator)
+ {
+ m_allocator= allocator;
+ m_initialized= false;
+ m_lost= 0;
+ }
+
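+  /**
+    Initialize the buffer.
+    @param max_size fixed buffer size, 0 for no allocation,
+    or -1 for unbounded (on demand) allocation.
+
+    For example, assuming PFS_PAGE_SIZE is 1024, init(2500) gives
+    m_max_page_count= 3 pages, the last of which holds
+    m_last_page_size= 452 records (2500 = 2 * 1024 + 452).
+  */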
+ int init(long max_size)
+ {
+ int i;
+
+ m_initialized= true;
+ m_full= true;
+ m_max= PFS_PAGE_COUNT * PFS_PAGE_SIZE;
+ m_max_page_count= PFS_PAGE_COUNT;
+ m_last_page_size= PFS_PAGE_SIZE;
+ m_lost= 0;
+ m_monotonic.m_u32= 0;
+ m_max_page_index.m_u32= 0;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ m_pages[i]= NULL;
+ }
+
+ if (max_size == 0)
+ {
+ /* No allocation. */
+ m_max_page_count= 0;
+ }
+ else if (max_size > 0)
+ {
+ if (max_size % PFS_PAGE_SIZE == 0)
+ {
+ m_max_page_count= max_size / PFS_PAGE_SIZE;
+ }
+ else
+ {
+ m_max_page_count= max_size / PFS_PAGE_SIZE + 1;
+ m_last_page_size= max_size % PFS_PAGE_SIZE;
+ }
+ /* Bounded allocation. */
+ m_full= false;
+
+ if (m_max_page_count > PFS_PAGE_COUNT)
+ {
+ m_max_page_count= PFS_PAGE_COUNT;
+ m_last_page_size= PFS_PAGE_SIZE;
+ }
+ }
+ else
+ {
+ /* max_size = -1 means unbounded allocation */
+ m_full= false;
+ }
+
+ assert(m_max_page_count <= PFS_PAGE_COUNT);
+ assert(0 < m_last_page_size);
+ assert(m_last_page_size <= PFS_PAGE_SIZE);
+
+ pthread_mutex_init(& m_critical_section, NULL);
+ return 0;
+ }
+
+ void cleanup()
+ {
+ int i;
+ array_type *page;
+
+ if (! m_initialized)
+ return;
+
+ pthread_mutex_lock(& m_critical_section);
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ m_allocator->free_array(page);
+ delete page;
+ m_pages[i]= NULL;
+ }
+ }
+ pthread_mutex_unlock(& m_critical_section);
+
+ pthread_mutex_destroy(& m_critical_section);
+
+ m_initialized= false;
+ }
+
+ ulong get_row_count()
+ {
+ ulong page_count= PFS_atomic::load_u32(& m_max_page_index.m_u32);
+
+ return page_count * PFS_PAGE_SIZE;
+ }
+
+ ulong get_row_size() const
+ {
+ return sizeof(value_type);
+ }
+
+ ulong get_memory()
+ {
+ return get_row_count() * get_row_size();
+ }
+
+ value_type *allocate(pfs_dirty_state *dirty_state)
+ {
+ if (m_full)
+ {
+ m_lost++;
+ return NULL;
+ }
+
+ uint index;
+ uint monotonic;
+ uint monotonic_max;
+ uint current_page_count;
+ value_type *pfs;
+ array_type *array;
+
+ void *addr;
+ void * volatile * typed_addr;
+ void *ptr;
+
+ /*
+ 1: Try to find an available record within the existing pages
+ */
+ current_page_count= PFS_atomic::load_u32(& m_max_page_index.m_u32);
+
+ if (current_page_count != 0)
+ {
+ monotonic= PFS_atomic::load_u32(& m_monotonic.m_u32);
+ monotonic_max= monotonic + current_page_count;
+
+ while (monotonic < monotonic_max)
+ {
+ /*
+ Scan in the [0 .. current_page_count - 1] range,
+ in parallel with m_monotonic (see below)
+ */
+ index= monotonic % current_page_count;
+
+ /* Atomic Load, array= m_pages[index] */
+ addr= & m_pages[index];
+ typed_addr= static_cast<void * volatile *>(addr);
+ ptr= my_atomic_loadptr(typed_addr);
+ array= static_cast<array_type *>(ptr);
+
+ if (array != NULL)
+ {
+ pfs= array->allocate(dirty_state);
+ if (pfs != NULL)
+ {
+ /* Keep a pointer to the parent page, for deallocate(). */
+ pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
+ return pfs;
+ }
+ }
+
+      /*
+        Parallel scans collaborate to increase
+        the common monotonic scan counter.
+
+        Note that when all the existing pages are full,
+        one thread will eventually add a new page,
+        and cause m_max_page_index to increase,
+        which fools all the modulo logic for scans already in progress,
+        because the monotonic counter is not folded to the same place
+        (sometimes modulo N, sometimes modulo N+1).
+
+        This is actually ok: since all the pages are full anyway,
+        there is nothing to miss, so better to increase the monotonic
+        counter faster and then move on to the detection of new pages,
+        in part 2 below.
+      */
+      monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
+      }
+ }
+
+ /*
+ 2: Try to add a new page, beyond the m_max_page_index limit
+ */
+ while (current_page_count < m_max_page_count)
+ {
+ /* Peek for pages added by collaborating threads */
+
+ /* (2-a) Atomic Load, array= m_pages[current_page_count] */
+ addr= & m_pages[current_page_count];
+ typed_addr= static_cast<void * volatile *>(addr);
+ ptr= my_atomic_loadptr(typed_addr);
+ array= static_cast<array_type *>(ptr);
+
+ if (array == NULL)
+ {
+ // ==================================================================
+ // BEGIN CRITICAL SECTION -- buffer expand
+ // ==================================================================
+
+      /*
+        On a freshly started server, buffers are typically empty.
+        When a sudden load spike is seen by the server,
+        multiple threads may want to expand the buffer at the same time.
+
+        Using a compare and swap to allow multiple pages to be added,
+        possibly freeing duplicate pages on collisions,
+        does not work well because the amount of code involved
+        when creating a new page can be significant (PFS_thread),
+        causing MANY collisions between (2-b) and (2-d).
+
+        A huge number of collisions (which can happen when thousands
+        of new connections hit the server after a restart)
+        leads to huge memory consumption, and to OOM.
+
+        To mitigate this, we use a mutex here,
+        to enforce that only ONE page is added at a time,
+        so that scaling the buffer happens in a predictable
+        and controlled manner.
+      */
+ pthread_mutex_lock(& m_critical_section);
+
+ /*
+ Peek again for pages added by collaborating threads,
+ this time as the only thread allowed to expand the buffer
+ */
+
+ /* (2-b) Atomic Load, array= m_pages[current_page_count] */
+
+ ptr= my_atomic_loadptr(typed_addr);
+ array= static_cast<array_type *>(ptr);
+
+ if (array == NULL)
+ {
+ /* (2-c) Found no page, allocate a new one */
+ array= new array_type();
+ builtin_memory_scalable_buffer.count_alloc(sizeof (array_type));
+
+ array->m_max= get_page_logical_size(current_page_count);
+ int rc= m_allocator->alloc_array(array);
+ if (rc != 0)
+ {
+ m_allocator->free_array(array);
+ delete array;
+ builtin_memory_scalable_buffer.count_free(sizeof (array_type));
+ m_lost++;
+ pthread_mutex_unlock(& m_critical_section);
+ return NULL;
+ }
+
+ /* Keep a pointer to this container, for static_deallocate(). */
+ array->m_container= reinterpret_cast<PFS_opaque_container *> (this);
+
+ /* (2-d) Atomic STORE, m_pages[current_page_count] = array */
+ ptr= array;
+ my_atomic_storeptr(typed_addr, ptr);
+
+ /* Advertise the new page */
+ PFS_atomic::add_u32(& m_max_page_index.m_u32, 1);
+ }
+
+ pthread_mutex_unlock(& m_critical_section);
+
+ // ==================================================================
+ // END CRITICAL SECTION -- buffer expand
+ // ==================================================================
+ }
+
+ assert(array != NULL);
+ pfs= array->allocate(dirty_state);
+ if (pfs != NULL)
+ {
+ /* Keep a pointer to the parent page, for deallocate(). */
+ pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
+ return pfs;
+ }
+
+ current_page_count++;
+ }
+
+ m_lost++;
+ m_full= true;
+ return NULL;
+ }
+
+ void deallocate(value_type *safe_pfs)
+ {
+ /* Find the containing page */
+ PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
+ array_type *page= reinterpret_cast<array_type *> (opaque_page);
+
+ /* Mark the object free */
+ safe_pfs->m_lock.allocated_to_free();
+
+ /* Flag the containing page as not full. */
+ page->m_full= false;
+
+ /* Flag the overall container as not full. */
+ m_full= false;
+ }
+
+ static void static_deallocate(value_type *safe_pfs)
+ {
+ /* Find the containing page */
+ PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
+ array_type *page= reinterpret_cast<array_type *> (opaque_page);
+
+ /* Mark the object free */
+ safe_pfs->m_lock.allocated_to_free();
+
+ /* Flag the containing page as not full. */
+ page->m_full= false;
+
+ /* Find the containing buffer */
+ PFS_opaque_container *opaque_container= page->m_container;
+ PFS_buffer_scalable_container *container;
+ container= reinterpret_cast<container_type *> (opaque_container);
+
+ /* Flag the overall container as not full. */
+ container->m_full= false;
+ }
+
+ iterator_type iterate()
+ {
+ return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, 0);
+ }
+
+ iterator_type iterate(uint index)
+ {
+ assert(index <= m_max);
+ return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, index);
+ }
+
+ void apply(function_type fct)
+ {
+ uint i;
+ array_type *page;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ pfs= page->get_first();
+ pfs_last= page->get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ fct(pfs);
+ }
+ pfs++;
+ }
+ }
+ }
+ }
+
+ void apply_all(function_type fct)
+ {
+ uint i;
+ array_type *page;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ pfs= page->get_first();
+ pfs_last= page->get_last();
+
+ while (pfs < pfs_last)
+ {
+ fct(pfs);
+ pfs++;
+ }
+ }
+ }
+ }
+
+ void apply(processor_type & proc)
+ {
+ uint i;
+ array_type *page;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ pfs= page->get_first();
+ pfs_last= page->get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ proc(pfs);
+ }
+ pfs++;
+ }
+ }
+ }
+ }
+
+ void apply_all(processor_type & proc)
+ {
+ uint i;
+ array_type *page;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ pfs= page->get_first();
+ pfs_last= page->get_last();
+
+ while (pfs < pfs_last)
+ {
+ proc(pfs);
+ pfs++;
+ }
+ }
+ }
+ }
+
+ value_type* get(uint index)
+ {
+ assert(index < m_max);
+
+ uint index_1= index / PFS_PAGE_SIZE;
+ array_type *page= m_pages[index_1];
+ if (page != NULL)
+ {
+ uint index_2= index % PFS_PAGE_SIZE;
+
+ if (index_2 >= page->m_max)
+ {
+ return NULL;
+ }
+
+ value_type *pfs= page->m_ptr + index_2;
+
+ if (pfs->m_lock.is_populated())
+ {
+ return pfs;
+ }
+ }
+
+ return NULL;
+ }
+
+ value_type* get(uint index, bool *has_more)
+ {
+ if (index >= m_max)
+ {
+ *has_more= false;
+ return NULL;
+ }
+
+ uint index_1= index / PFS_PAGE_SIZE;
+ array_type *page= m_pages[index_1];
+
+ if (page == NULL)
+ {
+ *has_more= false;
+ return NULL;
+ }
+
+ uint index_2= index % PFS_PAGE_SIZE;
+
+ if (index_2 >= page->m_max)
+ {
+ *has_more= false;
+ return NULL;
+ }
+
+ *has_more= true;
+ value_type *pfs= page->m_ptr + index_2;
+
+ if (pfs->m_lock.is_populated())
+ {
+ return pfs;
+ }
+
+ return NULL;
+ }
+
+ value_type *sanitize(value_type *unsafe)
+ {
+ intptr offset;
+ uint i;
+ array_type *page;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ for (i=0 ; i < PFS_PAGE_COUNT; i++)
+ {
+ page= m_pages[i];
+ if (page != NULL)
+ {
+ pfs= page->get_first();
+ pfs_last= page->get_last();
+
+ if ((pfs <= unsafe) &&
+ (unsafe < pfs_last))
+ {
+ offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
+ if (offset == 0)
+ return unsafe;
+ }
+ }
+ }
+
+ return NULL;
+ }
+
+ ulong m_lost;
+
+private:
+
+ uint get_page_logical_size(uint page_index)
+ {
+ if (page_index + 1 < m_max_page_count)
+ return PFS_PAGE_SIZE;
+ assert(page_index + 1 == m_max_page_count);
+ return m_last_page_size;
+ }
+
+ value_type* scan_next(uint & index, uint * found_index)
+ {
+ assert(index <= m_max);
+
+ uint index_1= index / PFS_PAGE_SIZE;
+ uint index_2= index % PFS_PAGE_SIZE;
+ array_type *page;
+ value_type *pfs_first;
+ value_type *pfs;
+ value_type *pfs_last;
+
+ while (index_1 < PFS_PAGE_COUNT)
+ {
+ page= m_pages[index_1];
+
+ if (page == NULL)
+ {
+ index= static_cast<uint>(m_max);
+ return NULL;
+ }
+
+ pfs_first= page->get_first();
+ pfs= pfs_first + index_2;
+ pfs_last= page->get_last();
+
+ while (pfs < pfs_last)
+ {
+ if (pfs->m_lock.is_populated())
+ {
+ uint found= index_1 * PFS_PAGE_SIZE + static_cast<uint>(pfs - pfs_first);
+ *found_index= found;
+ index= found + 1;
+ return pfs;
+ }
+ pfs++;
+ }
+
+ index_1++;
+ index_2= 0;
+ }
+
+ index= static_cast<uint>(m_max);
+ return NULL;
+ }
+
+ bool m_initialized;
+ bool m_full;
+ size_t m_max;
+ PFS_cacheline_uint32 m_monotonic;
+ PFS_cacheline_uint32 m_max_page_index;
+ ulong m_max_page_count;
+ ulong m_last_page_size;
+ array_type * m_pages[PFS_PAGE_COUNT];
+ allocator_type *m_allocator;
+ pthread_mutex_t m_critical_section;
+};
+
+template <class T, class U, class V>
+class PFS_buffer_iterator
+{
+ friend class PFS_buffer_container<T, U, V>;
+
+ typedef T value_type;
+ typedef PFS_buffer_container<T, U, V> container_type;
+
+public:
+ value_type* scan_next()
+ {
+ uint unused;
+ return m_container->scan_next(m_index, & unused);
+ }
+
+ value_type* scan_next(uint * found_index)
+ {
+ return m_container->scan_next(m_index, found_index);
+ }
+
+private:
+ PFS_buffer_iterator(container_type *container, uint index)
+ : m_container(container),
+ m_index(index)
+ {}
+
+ container_type *m_container;
+ uint m_index;
+};
+
+template <class T, int page_size, int page_count, class U, class V>
+class PFS_buffer_scalable_iterator
+{
+ friend class PFS_buffer_scalable_container<T, page_size, page_count, U, V>;
+
+ typedef T value_type;
+ typedef PFS_buffer_scalable_container<T, page_size, page_count, U, V> container_type;
+
+public:
+ value_type* scan_next()
+ {
+ uint unused;
+ return m_container->scan_next(m_index, & unused);
+ }
+
+ value_type* scan_next(uint * found_index)
+ {
+ return m_container->scan_next(m_index, found_index);
+ }
+
+private:
+ PFS_buffer_scalable_iterator(container_type *container, uint index)
+ : m_container(container),
+ m_index(index)
+ {}
+
+ container_type *m_container;
+ uint m_index;
+};
+
+template <class T>
+class PFS_buffer_processor
+{
+public:
+  virtual ~PFS_buffer_processor()
+  {}
+ virtual void operator()(T *element) = 0;
+};
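+/*
+  A processor is applied to each populated record via apply().
+  Example implementation (a hypothetical sketch, not part of this file):
+
+    class Count_mutex_processor : public PFS_buffer_processor<PFS_mutex>
+    {
+    public:
+      Count_mutex_processor() : m_count(0) {}
+      virtual void operator()(PFS_mutex *) { m_count++; }
+      ulong m_count;
+    };
+
+  Passing an instance to a container's apply() counts the populated
+  records in that container.
+*/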
+
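+/**
+  A buffer partitioned into PFS_PARTITION_COUNT independent
+  scalable buffers, to spread allocation contention.
+  The caller chooses the partition explicitly on allocate();
+  user visible indexes encode both the partition and the index
+  within it (see pack_index() / unpack_index()).
+*/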
+template <class B, int PFS_PARTITION_COUNT>
+class PFS_partitioned_buffer_scalable_container
+{
+public:
+ friend class PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT>;
+
+ typedef typename B::value_type value_type;
+ typedef typename B::allocator_type allocator_type;
+ typedef PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT> iterator_type;
+ typedef typename B::iterator_type sub_iterator_type;
+ typedef typename B::processor_type processor_type;
+ typedef typename B::function_type function_type;
+
+ PFS_partitioned_buffer_scalable_container(allocator_type *allocator)
+ {
+ for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]= new B(allocator);
+ }
+ }
+
+ ~PFS_partitioned_buffer_scalable_container()
+ {
+ for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
+ {
+ delete m_partitions[i];
+ }
+ }
+
+ int init(long max_size)
+ {
+ int rc= 0;
+ // FIXME: we have max_size * PFS_PARTITION_COUNT here
+ for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
+ {
+ rc|= m_partitions[i]->init(max_size);
+ }
+ return rc;
+ }
+
+ void cleanup()
+ {
+ for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]->cleanup();
+ }
+ }
+
+ ulong get_row_count() const
+ {
+ ulong sum= 0;
+
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ sum += m_partitions[i]->get_row_count();
+ }
+
+ return sum;
+ }
+
+ ulong get_row_size() const
+ {
+ return sizeof(value_type);
+ }
+
+ ulong get_memory() const
+ {
+ ulong sum= 0;
+
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ sum += m_partitions[i]->get_memory();
+ }
+
+ return sum;
+ }
+
+ long get_lost_counter()
+ {
+ long sum= 0;
+
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ sum += m_partitions[i]->m_lost;
+ }
+
+ return sum;
+ }
+
+ value_type *allocate(pfs_dirty_state *dirty_state, uint partition)
+ {
+ assert(partition < PFS_PARTITION_COUNT);
+
+ return m_partitions[partition]->allocate(dirty_state);
+ }
+
+ void deallocate(value_type *safe_pfs)
+ {
+    /*
+      One issue here is that we do not know which partition
+      the record belongs to.
+      Each record points to its parent page,
+      and each page points to its parent buffer,
+      so we use static_deallocate() here,
+      which finds the correct partition by itself.
+    */
+ B::static_deallocate(safe_pfs);
+ }
+
+ iterator_type iterate()
+ {
+ return iterator_type(this, 0, 0);
+ }
+
+ iterator_type iterate(uint user_index)
+ {
+ uint partition_index;
+ uint sub_index;
+ unpack_index(user_index, &partition_index, &sub_index);
+ return iterator_type(this, partition_index, sub_index);
+ }
+
+ void apply(function_type fct)
+ {
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]->apply(fct);
+ }
+ }
+
+ void apply_all(function_type fct)
+ {
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]->apply_all(fct);
+ }
+ }
+
+ void apply(processor_type & proc)
+ {
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]->apply(proc);
+ }
+ }
+
+ void apply_all(processor_type & proc)
+ {
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ m_partitions[i]->apply_all(proc);
+ }
+ }
+
+ value_type* get(uint user_index)
+ {
+ uint partition_index;
+ uint sub_index;
+ unpack_index(user_index, &partition_index, &sub_index);
+
+ if (partition_index >= PFS_PARTITION_COUNT)
+ {
+ return NULL;
+ }
+
+ return m_partitions[partition_index]->get(sub_index);
+ }
+
+ value_type* get(uint user_index, bool *has_more)
+ {
+ uint partition_index;
+ uint sub_index;
+ unpack_index(user_index, &partition_index, &sub_index);
+
+ if (partition_index >= PFS_PARTITION_COUNT)
+ {
+ *has_more= false;
+ return NULL;
+ }
+
+ *has_more= true;
+ return m_partitions[partition_index]->get(sub_index);
+ }
+
+ value_type *sanitize(value_type *unsafe)
+ {
+ value_type *safe= NULL;
+
+ for (int i=0; i < PFS_PARTITION_COUNT; i++)
+ {
+ safe= m_partitions[i]->sanitize(unsafe);
+ if (safe != NULL)
+ {
+ return safe;
+ }
+ }
+
+ return safe;
+ }
+
+private:
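+  /**
+    Encode a (partition, sub index) pair into one user visible index.
+    For example, partition 3 with sub index 5 packs to
+    (3 << 24) + 5 = 0x03000005, and unpack_index() reverses this.
+  */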
+ static void pack_index(uint partition_index, uint sub_index, uint *user_index)
+ {
+ /* 2^8 = 256 partitions max */
+ compile_time_assert(PFS_PARTITION_COUNT <= (1 << 8));
+ /* 2^24 = 16777216 max per partitioned buffer. */
+ compile_time_assert((B::MAX_SIZE) <= (1 << 24));
+
+ *user_index= (partition_index << 24) + sub_index;
+ }
+
+ static void unpack_index(uint user_index, uint *partition_index, uint *sub_index)
+ {
+ *partition_index= user_index >> 24;
+ *sub_index= user_index & 0x00FFFFFF;
+ }
+
+ value_type* scan_next(uint & partition_index, uint & sub_index, uint * found_partition, uint * found_sub_index)
+ {
+ value_type *record= NULL;
+ assert(partition_index < PFS_PARTITION_COUNT);
+
+ while (partition_index < PFS_PARTITION_COUNT)
+ {
+ sub_iterator_type sub_iterator= m_partitions[partition_index]->iterate(sub_index);
+ record= sub_iterator.scan_next(found_sub_index);
+ if (record != NULL)
+ {
+ *found_partition= partition_index;
+ sub_index= *found_sub_index + 1;
+ return record;
+ }
+
+ partition_index++;
+ sub_index= 0;
+ }
+
+ *found_partition= PFS_PARTITION_COUNT;
+ *found_sub_index= 0;
+ sub_index= 0;
+ return NULL;
+ }
+
+ B *m_partitions[PFS_PARTITION_COUNT];
+};
+
+template <class B, int PFS_PARTITION_COUNT>
+class PFS_partitioned_buffer_scalable_iterator
+{
+public:
+ friend class PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT>;
+
+ typedef typename B::value_type value_type;
+ typedef PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT> container_type;
+
+ value_type* scan_next()
+ {
+ uint unused_partition;
+ uint unused_sub_index;
+ return m_container->scan_next(m_partition, m_sub_index, & unused_partition, & unused_sub_index);
+ }
+
+ value_type* scan_next(uint *found_user_index)
+ {
+ uint found_partition;
+ uint found_sub_index;
+ value_type *record;
+ record= m_container->scan_next(m_partition, m_sub_index, &found_partition, &found_sub_index);
+ container_type::pack_index(found_partition, found_sub_index, found_user_index);
+ return record;
+ }
+
+private:
+ PFS_partitioned_buffer_scalable_iterator(container_type *container, uint partition, uint sub_index)
+ : m_container(container),
+ m_partition(partition),
+ m_sub_index(sub_index)
+ {}
+
+ container_type *m_container;
+ uint m_partition;
+ uint m_sub_index;
+};
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_mutex, 1024, 1024> PFS_mutex_basic_container;
+typedef PFS_partitioned_buffer_scalable_container<PFS_mutex_basic_container, PSI_COUNT_VOLATILITY> PFS_mutex_container;
+#else
+typedef PFS_buffer_container<PFS_mutex> PFS_mutex_container;
+#endif
+typedef PFS_mutex_container::iterator_type PFS_mutex_iterator;
+extern PFS_mutex_container global_mutex_container;
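+/*
+  A typical full scan over a container (a sketch of the pattern used
+  by the performance schema table implementations):
+
+    PFS_mutex_iterator it= global_mutex_container.iterate();
+    PFS_mutex *pfs;
+    while ((pfs= it.scan_next()) != NULL)
+    {
+      // ... read the populated record ...
+    }
+*/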
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_rwlock, 1024, 1024> PFS_rwlock_container;
+#else
+typedef PFS_buffer_container<PFS_rwlock> PFS_rwlock_container;
+#endif
+typedef PFS_rwlock_container::iterator_type PFS_rwlock_iterator;
+extern PFS_rwlock_container global_rwlock_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_cond, 256, 256> PFS_cond_container;
+#else
+typedef PFS_buffer_container<PFS_cond> PFS_cond_container;
+#endif
+typedef PFS_cond_container::iterator_type PFS_cond_iterator;
+extern PFS_cond_container global_cond_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_file, 4 * 1024, 4 * 1024> PFS_file_container;
+#else
+typedef PFS_buffer_container<PFS_file> PFS_file_container;
+#endif
+typedef PFS_file_container::iterator_type PFS_file_iterator;
+extern PFS_file_container global_file_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_socket, 256, 256> PFS_socket_container;
+#else
+typedef PFS_buffer_container<PFS_socket> PFS_socket_container;
+#endif
+typedef PFS_socket_container::iterator_type PFS_socket_iterator;
+extern PFS_socket_container global_socket_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_metadata_lock, 1024, 1024> PFS_mdl_container;
+#else
+typedef PFS_buffer_container<PFS_metadata_lock> PFS_mdl_container;
+#endif
+typedef PFS_mdl_container::iterator_type PFS_mdl_iterator;
+extern PFS_mdl_container global_mdl_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_setup_actor, 128, 1024> PFS_setup_actor_container;
+#else
+typedef PFS_buffer_container<PFS_setup_actor> PFS_setup_actor_container;
+#endif
+typedef PFS_setup_actor_container::iterator_type PFS_setup_actor_iterator;
+extern PFS_setup_actor_container global_setup_actor_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_setup_object, 128, 1024> PFS_setup_object_container;
+#else
+typedef PFS_buffer_container<PFS_setup_object> PFS_setup_object_container;
+#endif
+typedef PFS_setup_object_container::iterator_type PFS_setup_object_iterator;
+extern PFS_setup_object_container global_setup_object_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_table, 1024, 1024> PFS_table_container;
+#else
+typedef PFS_buffer_container<PFS_table> PFS_table_container;
+#endif
+typedef PFS_table_container::iterator_type PFS_table_iterator;
+extern PFS_table_container global_table_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_table_share, 4 * 1024, 4 * 1024> PFS_table_share_container;
+#else
+typedef PFS_buffer_container<PFS_table_share> PFS_table_share_container;
+#endif
+typedef PFS_table_share_container::iterator_type PFS_table_share_iterator;
+extern PFS_table_share_container global_table_share_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_table_share_index, 8 * 1024, 8 * 1024> PFS_table_share_index_container;
+#else
+typedef PFS_buffer_container<PFS_table_share_index> PFS_table_share_index_container;
+#endif
+typedef PFS_table_share_index_container::iterator_type PFS_table_share_index_iterator;
+extern PFS_table_share_index_container global_table_share_index_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_table_share_lock, 4 * 1024, 4 * 1024> PFS_table_share_lock_container;
+#else
+typedef PFS_buffer_container<PFS_table_share_lock> PFS_table_share_lock_container;
+#endif
+typedef PFS_table_share_lock_container::iterator_type PFS_table_share_lock_iterator;
+extern PFS_table_share_lock_container global_table_share_lock_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_program, 1024, 1024> PFS_program_container;
+#else
+typedef PFS_buffer_container<PFS_program> PFS_program_container;
+#endif
+typedef PFS_program_container::iterator_type PFS_program_iterator;
+extern PFS_program_container global_program_container;
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_prepared_stmt, 1024, 1024> PFS_prepared_stmt_container;
+#else
+typedef PFS_buffer_container<PFS_prepared_stmt> PFS_prepared_stmt_container;
+#endif
+typedef PFS_prepared_stmt_container::iterator_type PFS_prepared_stmt_iterator;
+extern PFS_prepared_stmt_container global_prepared_stmt_container;
+
+class PFS_account_array : public PFS_buffer_default_array<PFS_account>
+{
+public:
+ PFS_single_stat *m_instr_class_waits_array;
+ PFS_stage_stat *m_instr_class_stages_array;
+ PFS_statement_stat *m_instr_class_statements_array;
+ PFS_transaction_stat *m_instr_class_transactions_array;
+ PFS_memory_stat *m_instr_class_memory_array;
+};
+
+class PFS_account_allocator
+{
+public:
+ int alloc_array(PFS_account_array *array);
+ void free_array(PFS_account_array *array);
+};
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_account,
+ 128,
+ 128,
+ PFS_account_array,
+ PFS_account_allocator> PFS_account_container;
+#else
+typedef PFS_buffer_container<PFS_account,
+ PFS_account_array,
+ PFS_account_allocator> PFS_account_container;
+#endif
+typedef PFS_account_container::iterator_type PFS_account_iterator;
+extern PFS_account_container global_account_container;
+
+class PFS_host_array : public PFS_buffer_default_array<PFS_host>
+{
+public:
+ PFS_single_stat *m_instr_class_waits_array;
+ PFS_stage_stat *m_instr_class_stages_array;
+ PFS_statement_stat *m_instr_class_statements_array;
+ PFS_transaction_stat *m_instr_class_transactions_array;
+ PFS_memory_stat *m_instr_class_memory_array;
+};
+
+class PFS_host_allocator
+{
+public:
+ int alloc_array(PFS_host_array *array);
+ void free_array(PFS_host_array *array);
+};
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_host,
+ 128,
+ 128,
+ PFS_host_array,
+ PFS_host_allocator> PFS_host_container;
+#else
+typedef PFS_buffer_container<PFS_host,
+ PFS_host_array,
+ PFS_host_allocator> PFS_host_container;
+#endif
+typedef PFS_host_container::iterator_type PFS_host_iterator;
+extern PFS_host_container global_host_container;
+
+class PFS_thread_array : public PFS_buffer_default_array<PFS_thread>
+{
+public:
+ PFS_single_stat *m_instr_class_waits_array;
+ PFS_stage_stat *m_instr_class_stages_array;
+ PFS_statement_stat *m_instr_class_statements_array;
+ PFS_transaction_stat *m_instr_class_transactions_array;
+ PFS_memory_stat *m_instr_class_memory_array;
+
+ PFS_events_waits *m_waits_history_array;
+ PFS_events_stages *m_stages_history_array;
+ PFS_events_statements *m_statements_history_array;
+ PFS_events_statements *m_statements_stack_array;
+ PFS_events_transactions *m_transactions_history_array;
+ char *m_session_connect_attrs_array;
+
+ char *m_current_stmts_text_array;
+ char *m_history_stmts_text_array;
+ unsigned char *m_current_stmts_digest_token_array;
+ unsigned char *m_history_stmts_digest_token_array;
+};
+
+class PFS_thread_allocator
+{
+public:
+ int alloc_array(PFS_thread_array *array);
+ void free_array(PFS_thread_array *array);
+};
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_thread,
+ 256,
+ 256,
+ PFS_thread_array,
+ PFS_thread_allocator> PFS_thread_container;
+#else
+typedef PFS_buffer_container<PFS_thread,
+ PFS_thread_array,
+ PFS_thread_allocator> PFS_thread_container;
+#endif
+typedef PFS_thread_container::iterator_type PFS_thread_iterator;
+extern PFS_thread_container global_thread_container;
+
+class PFS_user_array : public PFS_buffer_default_array<PFS_user>
+{
+public:
+ PFS_single_stat *m_instr_class_waits_array;
+ PFS_stage_stat *m_instr_class_stages_array;
+ PFS_statement_stat *m_instr_class_statements_array;
+ PFS_transaction_stat *m_instr_class_transactions_array;
+ PFS_memory_stat *m_instr_class_memory_array;
+};
+
+class PFS_user_allocator
+{
+public:
+ int alloc_array(PFS_user_array *array);
+ void free_array(PFS_user_array *array);
+};
+
+#ifdef USE_SCALABLE
+typedef PFS_buffer_scalable_container<PFS_user,
+ 128,
+ 128,
+ PFS_user_array,
+ PFS_user_allocator> PFS_user_container;
+#else
+typedef PFS_buffer_container<PFS_user,
+ PFS_user_array,
+ PFS_user_allocator> PFS_user_container;
+#endif
+typedef PFS_user_container::iterator_type PFS_user_iterator;
+extern PFS_user_container global_user_container;
+
+#endif
+