summaryrefslogtreecommitdiffstats
path: root/tpool/tpool_structs.h
diff options
context:
space:
mode:
Diffstat (limited to 'tpool/tpool_structs.h')
-rw-r--r--tpool/tpool_structs.h440
1 files changed, 440 insertions, 0 deletions
diff --git a/tpool/tpool_structs.h b/tpool/tpool_structs.h
new file mode 100644
index 00000000..099ae5c7
--- /dev/null
+++ b/tpool/tpool_structs.h
@@ -0,0 +1,440 @@
+/* Copyright(C) 2019, 20222, MariaDB Corporation.
+
+This program is free software; you can redistribute itand /or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/
+
+#pragma once
+#include <my_global.h>
+#include <my_pthread.h>
+#include <vector>
+#include <stack>
+#include <assert.h>
+#include <algorithm>
+
+/* Suppress TSAN warnings, that we believe are not critical. */
+#if defined(__has_feature)
+#define TPOOL_HAS_FEATURE(...) __has_feature(__VA_ARGS__)
+#else
+#define TPOOL_HAS_FEATURE(...) 0
+#endif
+
+#if TPOOL_HAS_FEATURE(address_sanitizer)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize("thread"),noinline))
+#elif defined(__GNUC__) && defined (__SANITIZE_THREAD__)
+#define TPOOL_SUPPRESS_TSAN __attribute__((no_sanitize_thread,noinline))
+#else
+#define TPOOL_SUPPRESS_TSAN
+#endif
+
+#ifdef HAVE_PSI_INTERFACE
+typedef unsigned int mysql_pfs_key_t;
+extern mysql_pfs_key_t tpool_cache_mutex_key;
+#endif
+
+namespace tpool
+{
+
+/**
+ Generic "pointer" cache of a fixed size
+ with fast put/get operations.
+
+ Compared to STL containers,e.g stack or queue
+ is faster/does not do allocations.
+
+ However, get() operation will wait if there is no free items.
+
+ We assume that put() will only put back the elements that
+ were retrieved previously with get().
+*/
+template<typename T> class cache
+{
+ /** Protects updates of m_pos and m_cache members */
+ mysql_mutex_t m_mtx;
+
+ /**
+ Notify waiting threads about "cache full" or "cache not empty" conditions
+ @see get() and wait()
+ */
+ pthread_cond_t m_cv;
+
+ /** Cached items vector.Does not change after construction */
+ std::vector<T> m_base;
+
+ /**
+ Pointers to cached items. Protected by m_mtx. Does not grow after
+ construction. Elements in position [0,m_pos-1] are "borrowed",
+ elements in position [m_pos,capacity()-1] are "free"
+ */
+ std::vector<T*> m_cache;
+
+ /** Number of threads waiting for "cache full" condition (s. wait())
+ Protected by m_mtx */
+ int m_waiters;
+
+ /** Current cache size. Protected by m_mtx*/
+ size_t m_pos;
+
+private:
+
+ inline size_t capacity()
+ {
+ return m_base.size();
+ }
+
+ /**
+ @return true if cache is full (no items are borrowed)
+ */
+ bool is_full()
+ {
+ return m_pos == 0;
+ }
+
+ /**
+ @return true if cache is empty (all items are borrowed)
+ */
+ bool is_empty()
+ {
+ return m_pos == capacity();
+ }
+
+public:
+ /**
+ Constructor
+ @param size - maximum number of items in cache
+ */
+ cache(size_t size) : m_base(size), m_cache(size),
+ m_waiters(), m_pos(0)
+ {
+ mysql_mutex_init(tpool_cache_mutex_key, &m_mtx, nullptr);
+ pthread_cond_init(&m_cv, nullptr);
+
+ for(size_t i= 0 ; i < size; i++)
+ m_cache[i]= &m_base[i];
+ }
+
+ ~cache()
+ {
+ mysql_mutex_destroy(&m_mtx);
+ pthread_cond_destroy(&m_cv);
+ }
+
+ /**
+ Retrieve an item from cache. Waits for free item, if cache is
+ currently empty.
+ @return borrowed item
+ */
+ T* get()
+ {
+ mysql_mutex_lock(&m_mtx);
+ while (is_empty())
+ my_cond_wait(&m_cv, &m_mtx.m_mutex);
+ assert(m_pos < capacity());
+ // return last element
+ T *t= m_cache[m_pos++];
+ mysql_mutex_unlock(&m_mtx);
+ return t;
+ }
+
+ mysql_mutex_t &mutex() { return m_mtx; }
+
+ /**
+ Put back an element to cache.
+ @param ele element to put back
+ */
+ void put(T *ele)
+ {
+ mysql_mutex_lock(&m_mtx);
+ assert(!is_full());
+ // put element to the logical end of the array
+ m_cache[--m_pos] = ele;
+
+ /* Notify waiters when the cache becomes
+ not empty, or when it becomes full */
+ if (m_pos == 1 || (m_waiters && is_full()))
+ pthread_cond_broadcast(&m_cv);
+ mysql_mutex_unlock(&m_mtx);
+ }
+
+ /** Check if pointer represents cached element */
+ bool contains(T* ele)
+ {
+ // No locking required, m_base does not change after construction.
+ return ele >= &m_base[0] && ele <= &m_base[capacity() - 1];
+ }
+
+ /** Wait until cache is full
+ @param m cache mutex (locked) */
+ void wait(mysql_mutex_t &m)
+ {
+ mysql_mutex_assert_owner(&m);
+ m_waiters++;
+ while (!is_full())
+ my_cond_wait(&m_cv, &m.m_mutex);
+ m_waiters--;
+ }
+
+ /* Wait until cache is full.*/
+ void wait()
+ {
+ mysql_mutex_lock(&m_mtx);
+ wait(m_mtx);
+ mysql_mutex_unlock(&m_mtx);
+ }
+
+ /**
+ @return approximate number of "borrowed" items.
+ A "dirty" read, not used in any critical functionality.
+ */
+ TPOOL_SUPPRESS_TSAN size_t pos()
+ {
+ return m_pos;
+ }
+
+ void resize(size_t count)
+ {
+ mysql_mutex_assert_owner(&m_mtx);
+ assert(is_full());
+ m_base.resize(count);
+ m_cache.resize(count);
+ for (size_t i = 0; i < count; i++)
+ m_cache[i] = &m_base[i];
+ }
+};
+
+
+/**
+ Circular, fixed size queue
+ used for the task queue.
+
+ Compared to STL queue, this one is
+ faster, and does not do memory allocations
+*/
+template <typename T> class circular_queue
+{
+
+public:
+ circular_queue(size_t N = 16)
+ : m_capacity(N + 1), m_buffer(m_capacity), m_head(), m_tail()
+ {
+ }
+ bool empty() { return m_head == m_tail; }
+ bool full() { return (m_head + 1) % m_capacity == m_tail; }
+ void clear() { m_head = m_tail = 0; }
+ void resize(size_t new_size)
+ {
+ auto current_size = size();
+ if (new_size <= current_size)
+ return;
+ size_t new_capacity = new_size - 1;
+ std::vector<T> new_buffer(new_capacity);
+ /* Figure out faster way to copy*/
+ size_t i = 0;
+ while (!empty())
+ {
+ T& ele = front();
+ pop();
+ new_buffer[i++] = ele;
+ }
+ m_buffer = new_buffer;
+ m_capacity = new_capacity;
+ m_tail = 0;
+ m_head = current_size;
+ }
+ void push(T ele)
+ {
+ if (full())
+ {
+ assert(size() == m_capacity - 1);
+ resize(size() + 1024);
+ }
+ m_buffer[m_head] = ele;
+ m_head = (m_head + 1) % m_capacity;
+ }
+ void push_front(T ele)
+ {
+ if (full())
+ {
+ resize(size() + 1024);
+ }
+ if (m_tail == 0)
+ m_tail = m_capacity - 1;
+ else
+ m_tail--;
+ m_buffer[m_tail] = ele;
+ }
+ T& front()
+ {
+ assert(!empty());
+ return m_buffer[m_tail];
+ }
+ void pop()
+ {
+ assert(!empty());
+ m_tail = (m_tail + 1) % m_capacity;
+ }
+ size_t size()
+ {
+ if (m_head < m_tail)
+ {
+ return m_capacity - m_tail + m_head;
+ }
+ else
+ {
+ return m_head - m_tail;
+ }
+ }
+
+ /*Iterator over elements in queue.*/
+ class iterator
+ {
+ size_t m_pos;
+ circular_queue<T>* m_queue;
+ public:
+ explicit iterator(size_t pos , circular_queue<T>* q) : m_pos(pos), m_queue(q) {}
+ iterator& operator++()
+ {
+ m_pos= (m_pos + 1) % m_queue->m_capacity;
+ return *this;
+ }
+ iterator operator++(int)
+ {
+ iterator retval= *this;
+ ++*this;
+ return retval;
+ }
+ bool operator==(iterator other) const { return m_pos == other.m_pos; }
+ bool operator!=(iterator other) const { return !(*this == other); }
+ T& operator*() const { return m_queue->m_buffer[m_pos]; }
+ };
+
+ iterator begin() { return iterator(m_tail, this); }
+ iterator end() { return iterator(m_head, this); }
+private:
+ size_t m_capacity;
+ std::vector<T> m_buffer;
+ size_t m_head;
+ size_t m_tail;
+};
+
+/* Doubly linked list. Intrusive,
+ requires element to have m_next and m_prev pointers.
+*/
+template<typename T> class doubly_linked_list
+{
+public:
+ T* m_first;
+ T* m_last;
+ size_t m_count;
+ doubly_linked_list():m_first(),m_last(),m_count()
+ {}
+ void check()
+ {
+ assert(!m_first || !m_first->m_prev);
+ assert(!m_last || !m_last->m_next);
+ assert((!m_first && !m_last && m_count == 0)
+ || (m_first != 0 && m_last != 0 && m_count > 0));
+ T* current = m_first;
+ for(size_t i=1; i< m_count;i++)
+ {
+ current = current->m_next;
+ }
+ assert(current == m_last);
+ current = m_last;
+ for (size_t i = 1; i < m_count; i++)
+ {
+ current = current->m_prev;
+ }
+ assert(current == m_first);
+ }
+ T* front()
+ {
+ return m_first;
+ }
+ size_t size()
+ {
+ return m_count;
+ }
+ void push_back(T* ele)
+ {
+ ele->m_prev = m_last;
+ if (m_last)
+ m_last->m_next = ele;
+
+ ele->m_next = 0;
+ m_last = ele;
+ if (!m_first)
+ m_first = m_last;
+
+ m_count++;
+ }
+ T* back()
+ {
+ return m_last;
+ }
+ bool empty()
+ {
+ return m_count == 0;
+ }
+ void pop_back()
+ {
+ m_last = m_last->m_prev;
+ if (m_last)
+ m_last->m_next = 0;
+ else
+ m_first = 0;
+ m_count--;
+ }
+ bool contains(T* ele)
+ {
+ if (!ele)
+ return false;
+ T* current = m_first;
+ while(current)
+ {
+ if(current == ele)
+ return true;
+ current = current->m_next;
+ }
+ return false;
+ }
+
+ void erase(T* ele)
+ {
+ assert(contains(ele));
+
+ if (ele == m_first)
+ {
+ m_first = ele->m_next;
+ if (m_first)
+ m_first->m_prev = 0;
+ else
+ m_last = 0;
+ }
+ else if (ele == m_last)
+ {
+ assert(ele->m_prev);
+ m_last = ele->m_prev;
+ m_last->m_next = 0;
+ }
+ else
+ {
+ assert(ele->m_next);
+ assert(ele->m_prev);
+ ele->m_next->m_prev = ele->m_prev;
+ ele->m_prev->m_next = ele->m_next;
+ }
+ m_count--;
+ }
+};
+
+}