summaryrefslogtreecommitdiffstats
path: root/tpool/task_group.cc
blob: eb57a8bee37fd6846fca37df774ddb8b11bd79c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/* Copyright(C) 2019 MariaDB Corporation.

This program is free software; you can redistribute itand /or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111 - 1301 USA*/

#include <tpool.h>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <tpool_structs.h>
#include <thread>
#include <assert.h>
#ifndef _WIN32
#include <unistd.h> // usleep
#endif
namespace tpool
{

  /**
    Task_group constructor

     @param max_threads - maximum number of threads allowed to execute
     tasks from the group at the same time.

     @param enable_task_release - if true (default), task::release() will be
     called after task execution.'false' should only be used in rare cases
     when accessing memory, pointed by task structures, would be unsafe after.
     the callback. Also 'false' is only possible ,if task::release() is a trivial function
  */
  task_group::task_group(unsigned int max_concurrency,
                       bool enable_task_release)
    :
    m_queue(8),
    m_mtx(),
    m_tasks_running(),
    m_max_concurrent_tasks(max_concurrency),
    m_enable_task_release(enable_task_release)
  {};

  void task_group::set_max_tasks(unsigned int max_concurrency)
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    m_max_concurrent_tasks = max_concurrency;
  }
  void task_group::execute(task* t)
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    if (m_tasks_running == m_max_concurrent_tasks)
    {
      /* Queue for later execution by another thread.*/
      m_queue.push(t);
      return;
    }
    m_tasks_running++;
    for (;;)
    {
      lk.unlock();
      if (t)
      {
        t->m_func(t->m_arg);
        if (m_enable_task_release)
          t->release();
      }
      lk.lock();

      if (m_queue.empty())
        break;
      t = m_queue.front();
      m_queue.pop();
    }
    m_tasks_running--;
  }

  void task_group::cancel_pending(task* t)
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    if (!t)
      m_queue.clear();
    for (auto it = m_queue.begin(); it != m_queue.end(); it++)
    {
      if (*it == t)
      {
        (*it)->release();
        (*it) = nullptr;
      }
    }
  }

  task_group::~task_group()
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    assert(m_queue.empty());

    while (m_tasks_running)
    {
      lk.unlock();
#ifndef _WIN32
      usleep(1000);
#else
      Sleep(1);
#endif
      lk.lock();
    }
  }
}