summaryrefslogtreecommitdiffstats
path: root/tpool/task_group.cc
blob: 97fbb0911c8d13c795b05d54f018c3009449b3ba (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/* Copyright(C) 2019 MariaDB Corporation.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */

#include <tpool.h>
#include <chrono>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <tpool_structs.h>
#include <thread>
#include <assert.h>
#ifndef _WIN32
#include <unistd.h> // usleep
#endif
namespace tpool
{
  /*
    Construct a task group.

    @param max_concurrency  stored as the concurrency limit that execute()
                            compares the running-task counter against.

    The pending-task queue is constructed with the argument 8 (presumably
    its initial capacity -- confirm against the queue type), the mutex is
    default-constructed and the running-task counter is value-initialized.
  */
  task_group::task_group(unsigned int max_concurrency)
    : m_queue(8),
      m_mtx(),
      m_tasks_running(),
      m_max_concurrent_tasks(max_concurrency)
  {
  }

  /*
    Replace the concurrency limit of this group.

    @param max_concurrency  new value for the limit.

    The assignment is done while holding m_mtx, the same mutex under which
    execute() reads the limit.
  */
  void task_group::set_max_tasks(unsigned int max_concurrency)
  {
    std::lock_guard<std::mutex> guard(m_mtx);
    m_max_concurrent_tasks = max_concurrency;
  }
  /*
    Run a task on the calling thread, or queue it when the group is
    already at its concurrency limit.

    @param t  task to run. A null pointer is tolerated: nothing is invoked
              for it, but the pending queue is still drained.

    If the limit is reached, t is appended to the pending queue and the
    call returns at once; a thread that is already inside this function
    picks it up from the queue. Otherwise the calling thread counts itself
    as a runner, executes t, and then keeps taking tasks from the pending
    queue until the queue is empty, at which point it gives up its slot.

    The mutex is released while a task callback runs and re-acquired
    before the queue and the counter are touched again.
  */
  void task_group::execute(task* t)
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    /*
      Compare with >= rather than ==: set_max_tasks() may lower the limit
      below the number of tasks that are running right now, and an equality
      test would then never match, letting every new task bypass the limit.
      Queuing is safe in that case because the counter is only non-zero
      while some thread is inside the loop below, and that thread checks
      the queue under the mutex before leaving.
    */
    if (m_tasks_running >= m_max_concurrent_tasks)
    {
      /* Queue for later execution by another thread.*/
      m_queue.push(t);
      return;
    }
    m_tasks_running++;
    for (;;)
    {
      lk.unlock();
      /* Queue slots may hold nullptr: cancel_pending() nulls cancelled
         entries instead of removing them. */
      if (t)
      {
        t->m_func(t->m_arg);
        dbug_execute_after_task_callback();
        t->release();
      }
      lk.lock();

      if (m_queue.empty())
        break;
      t = m_queue.front();
      m_queue.pop();
    }
    m_tasks_running--;
  }

  /*
    Cancel work that is still waiting in the pending queue.

    @param t  task to cancel, or nullptr to discard the whole queue.

    With a null argument the queue is cleared; note that entries dropped
    this way are not release()d by this function. With a non-null argument
    every queue slot holding t is release()d and overwritten with nullptr
    (the slot itself stays in the queue; execute() skips null entries).

    Only queued tasks are affected; nothing is done about a task that is
    already being executed.
  */
  void task_group::cancel_pending(task* t)
  {
    std::lock_guard<std::mutex> guard(m_mtx);
    if (t == nullptr)
      m_queue.clear();

    for (auto slot = m_queue.begin(); slot != m_queue.end(); slot++)
    {
      if (*slot != t)
        continue;
      (*slot)->release();
      *slot = nullptr;
    }
  }

  /*
    Destroy the group after all of its running tasks have finished.

    The pending queue is expected to be empty already (checked by assert,
    i.e. only in builds where assert is active). The destructor then polls
    the running-task counter, sleeping one millisecond between checks with
    the mutex released so that threads inside execute() can make progress
    and decrement the counter.
  */
  task_group::~task_group()
  {
    std::unique_lock<std::mutex> lk(m_mtx);
    assert(m_queue.empty());

    while (m_tasks_running)
    {
      lk.unlock();
      /* Portable 1ms pause; replaces the usleep()/Sleep() #ifdef pair. */
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
      lk.lock();
    }
  }
}