summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/cpp/test/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/cpp/test/concurrency')
-rw-r--r--src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp224
-rw-r--r--src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h308
-rw-r--r--src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h639
-rw-r--r--src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h273
4 files changed, 1444 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp b/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp
new file mode 100644
index 000000000..8c734c2d5
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+#include <vector>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
+
+// The test weight, where 10 is 10 times more threads than baseline
+// and the baseline is optimized for running in valgrind
+static int WEIGHT = 10;
+
+int main(int argc, char** argv) {
+
+ std::vector<std::string> args(argc - 1 > 1 ? argc - 1 : 1);
+
+ args[0] = "all";
+
+ for (int ix = 1; ix < argc; ix++) {
+ args[ix - 1] = std::string(argv[ix]);
+ }
+
+ if (getenv("VALGRIND") != nullptr) {
+ // lower the scale of every test
+ WEIGHT = 1;
+ }
+
+ bool runAll = args[0].compare("all") == 0;
+
+ if (runAll || args[0].compare("thread-factory") == 0) {
+
+ ThreadFactoryTests threadFactoryTests;
+
+ std::cout << "ThreadFactory tests..." << std::endl;
+
+ int reapLoops = 2 * WEIGHT;
+ int reapCount = 100 * WEIGHT;
+ size_t floodLoops = 3;
+ size_t floodCount = 500 * WEIGHT;
+
+ std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl;
+
+ if (!threadFactoryTests.reapNThreads(reapLoops, reapCount)) {
+ std::cerr << "\t\ttThreadFactory reap N threads FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tThreadFactory flood N threads test: N = " << floodLoops << "x" << floodCount << std::endl;
+
+ if (!threadFactoryTests.floodNTest(floodLoops, floodCount)) {
+ std::cerr << "\t\ttThreadFactory flood N threads FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
+
+ if (!threadFactoryTests.synchStartTest()) {
+ std::cerr << "\t\ttThreadFactory synchronous start FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
+
+ if (!threadFactoryTests.monitorTimeoutTest()) {
+ std::cerr << "\t\ttThreadFactory monitor timeout FAILED" << std::endl;
+ return 1;
+ }
+ }
+
+ if (runAll || args[0].compare("util") == 0) {
+
+ std::cout << "Util tests..." << std::endl;
+
+ std::cout << "\t\tUtil minimum time" << std::endl;
+
+ int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
+
+ time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ time01 = time00;
+ size_t count = 0;
+
+ while (time01 < time00 + 10) {
+ count++;
+ time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ }
+
+ std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
+ }
+
+ if (runAll || args[0].compare("timer-manager") == 0) {
+
+ std::cout << "TimerManager tests..." << std::endl;
+
+ std::cout << "\t\tTimerManager test00" << std::endl;
+
+ TimerManagerTests timerManagerTests;
+
+ if (!timerManagerTests.test00()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tTimerManager test01" << std::endl;
+
+ if (!timerManagerTests.test01()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tTimerManager test02" << std::endl;
+
+ if (!timerManagerTests.test02()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tTimerManager test03" << std::endl;
+
+ if (!timerManagerTests.test03()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tTimerManager test04" << std::endl;
+
+ if (!timerManagerTests.test04()) {
+ std::cerr << "\t\tTimerManager tests FAILED" << std::endl;
+ return 1;
+ }
+ }
+
+ if (runAll || args[0].compare("thread-manager") == 0) {
+
+ std::cout << "ThreadManager tests..." << std::endl;
+
+ {
+ size_t workerCount = 10 * WEIGHT;
+ size_t taskCount = 500 * WEIGHT;
+ int64_t delay = 10LL;
+
+ ThreadManagerTests threadManagerTests;
+
+ std::cout << "\t\tThreadManager api test:" << std::endl;
+
+ if (!threadManagerTests.apiTest()) {
+ std::cerr << "\t\tThreadManager apiTest FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tThreadManager load test: worker count: " << workerCount
+ << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+ if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) {
+ std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+ return 1;
+ }
+
+ std::cout << "\t\tThreadManager block test: worker count: " << workerCount
+ << " delay: " << delay << std::endl;
+
+ if (!threadManagerTests.blockTest(delay, workerCount)) {
+ std::cerr << "\t\tThreadManager blockTest FAILED" << std::endl;
+ return 1;
+ }
+ }
+ }
+
+ if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
+
+ std::cout << "ThreadManager benchmark tests..." << std::endl;
+
+ {
+
+ size_t minWorkerCount = 2;
+
+ size_t maxWorkerCount = 8;
+
+ size_t tasksPerWorker = 100 * WEIGHT;
+
+ int64_t delay = 5LL;
+
+ for (size_t workerCount = minWorkerCount; workerCount <= maxWorkerCount; workerCount *= 4) {
+
+ size_t taskCount = workerCount * tasksPerWorker;
+
+ std::cout << "\t\tThreadManager load test: worker count: " << workerCount
+ << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+ ThreadManagerTests threadManagerTests;
+
+ if (!threadManagerTests.loadTest(taskCount, delay, workerCount))
+ {
+ std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl;
+ return 1;
+ }
+ }
+ }
+ }
+
+ std::cout << "ALL TESTS PASSED" << std::endl;
+ return 0;
+}
diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h
new file mode 100644
index 000000000..febe3f8b3
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+#include <thrift/concurrency/Thread.h>
+#include <thrift/concurrency/ThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Mutex.h>
+
+#include <assert.h>
+#include <iostream>
+#include <vector>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+namespace test {
+
+using std::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadFactoryTests {
+
+public:
+ /**
+ * Reap N threads
+ */
+ class ReapNTask : public Runnable {
+
+ public:
+ ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {}
+
+ void run() override {
+ Synchronized s(_monitor);
+
+ if (--_count == 0) {
+ _monitor.notify();
+ }
+ }
+
+ Monitor& _monitor;
+ int& _count;
+ };
+
+ bool reapNThreads(int loop = 1, int count = 10) {
+
+ ThreadFactory threadFactory = ThreadFactory();
+ shared_ptr<Monitor> monitor(new Monitor);
+
+ for (int lix = 0; lix < loop; lix++) {
+
+ int activeCount = 0;
+
+ std::vector<shared_ptr<Thread> > threads;
+ int tix;
+
+ for (tix = 0; tix < count; tix++) {
+ try {
+ ++activeCount;
+ threads.push_back(
+ threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, activeCount))));
+ } catch (SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what()
+ << std::endl;
+ throw e;
+ }
+ }
+
+ tix = 0;
+ for (std::vector<shared_ptr<Thread> >::const_iterator thread = threads.begin();
+ thread != threads.end();
+ tix++, ++thread) {
+
+ try {
+ (*thread)->start();
+ } catch (SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what()
+ << std::endl;
+ throw e;
+ }
+ }
+
+ {
+ Synchronized s(*monitor);
+ while (activeCount > 0) {
+ monitor->wait(1000);
+ }
+ }
+
+ std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl;
+ }
+
+ std::cout << "\t\t\tSuccess!" << std::endl;
+ return true;
+ }
+
+ class SynchStartTask : public Runnable {
+
+ public:
+ enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
+
+ SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {}
+
+ void run() override {
+ {
+ Synchronized s(_monitor);
+ if (_state == SynchStartTask::STARTING) {
+ _state = SynchStartTask::STARTED;
+ _monitor.notify();
+ }
+ }
+
+ {
+ Synchronized s(_monitor);
+ while (_state == SynchStartTask::STARTED) {
+ _monitor.wait();
+ }
+
+ if (_state == SynchStartTask::STOPPING) {
+ _state = SynchStartTask::STOPPED;
+ _monitor.notifyAll();
+ }
+ }
+ }
+
+ private:
+ Monitor& _monitor;
+ volatile STATE& _state;
+ };
+
+ bool synchStartTest() {
+
+ Monitor monitor;
+
+ SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+ shared_ptr<SynchStartTask> task
+ = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
+
+ ThreadFactory threadFactory = ThreadFactory();
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ if (state == SynchStartTask::UNINITIALIZED) {
+
+ state = SynchStartTask::STARTING;
+
+ thread->start();
+ }
+
+ {
+ Synchronized s(monitor);
+ while (state == SynchStartTask::STARTING) {
+ monitor.wait();
+ }
+ }
+
+ assert(state != SynchStartTask::STARTING);
+
+ {
+ Synchronized s(monitor);
+
+ try {
+ monitor.wait(100);
+ } catch (TimedOutException&) {
+ }
+
+ if (state == SynchStartTask::STARTED) {
+
+ state = SynchStartTask::STOPPING;
+
+ monitor.notify();
+ }
+
+ while (state == SynchStartTask::STOPPING) {
+ monitor.wait();
+ }
+ }
+
+ assert(state == SynchStartTask::STOPPED);
+
+ bool success = true;
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+
+ return true;
+ }
+
+ /**
+ * The only guarantee a monitor timeout can give you is that
+ * it will take "at least" as long as the timeout, no less.
+ * There is absolutely no guarantee around regaining execution
+ * near the timeout. On a busy system (like inside a third party
+ * CI environment) it could take quite a bit longer than the
+ * requested timeout, and that's ok.
+ */
+
+ bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) {
+
+ Monitor monitor;
+
+ int64_t startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ for (int64_t ix = 0; ix < count; ix++) {
+ {
+ Synchronized s(monitor);
+ try {
+ monitor.wait(timeout);
+ } catch (TimedOutException&) {
+ }
+ }
+ }
+
+ int64_t endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ bool success = (endTime - startTime) >= (count * timeout);
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure")
+ << ": minimum required time to elapse " << count * timeout
+ << "ms; actual elapsed time " << endTime - startTime << "ms"
+ << std::endl;
+
+ return success;
+ }
+
+ class FloodTask : public Runnable {
+ public:
+ FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {}
+ ~FloodTask() override {
+ if (_id % 10000 == 0) {
+ Synchronized sync(_mon);
+ std::cout << "\t\tthread " << _id << " done" << std::endl;
+ }
+ }
+
+ void run() override {
+ if (_id % 10000 == 0) {
+ Synchronized sync(_mon);
+ std::cout << "\t\tthread " << _id << " started" << std::endl;
+ }
+ }
+ const size_t _id;
+ Monitor& _mon;
+ };
+
+ void foo(ThreadFactory* tf) { (void)tf; }
+
+ bool floodNTest(size_t loop = 1, size_t count = 100000) {
+
+ bool success = false;
+ Monitor mon;
+
+ for (size_t lix = 0; lix < loop; lix++) {
+
+ ThreadFactory threadFactory = ThreadFactory();
+ threadFactory.setDetached(true);
+
+ for (size_t tix = 0; tix < count; tix++) {
+
+ try {
+
+ shared_ptr<FloodTask> task(new FloodTask(lix * count + tix, mon));
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+ thread->start();
+
+ } catch (TException& e) {
+
+ std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what()
+ << std::endl;
+
+ return success;
+ }
+ }
+
+ Synchronized sync(mon);
+ std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+ success = true;
+ }
+
+ return success;
+ }
+};
+
+}
+}
+}
+} // apache::thrift::concurrency::test
diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h
new file mode 100644
index 000000000..fee7c7c51
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/thrift-config.h>
+#include <thrift/concurrency/ThreadManager.h>
+#include <thrift/concurrency/ThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+
+#include <assert.h>
+#include <deque>
+#include <set>
+#include <iostream>
+#include <stdint.h>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+namespace test {
+
+using namespace apache::thrift::concurrency;
+
+static std::deque<std::shared_ptr<Runnable> > m_expired;
+static void expiredNotifier(std::shared_ptr<Runnable> runnable)
+{
+ m_expired.push_back(runnable);
+}
+
+static void sleep_(int64_t millisec) {
+ Monitor _sleep;
+ Synchronized s(_sleep);
+
+ try {
+ _sleep.wait(millisec);
+ } catch (TimedOutException&) {
+ ;
+ } catch (...) {
+ assert(0);
+ }
+}
+
+class ThreadManagerTests {
+
+public:
+ class Task : public Runnable {
+
+ public:
+ Task(Monitor& monitor, size_t& count, int64_t timeout)
+ : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
+
+ void run() override {
+
+ _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ sleep_(_timeout);
+
+ _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ _done = true;
+
+ {
+ Synchronized s(_monitor);
+
+ // std::cout << "Thread " << _count << " completed " << std::endl;
+
+ _count--;
+ if (_count % 10000 == 0) {
+ _monitor.notify();
+ }
+ }
+ }
+
+ Monitor& _monitor;
+ size_t& _count;
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
+ bool _done;
+ Monitor _sleep;
+ };
+
+ /**
+ * Dispatch count tasks, each of which blocks for timeout milliseconds then
+ * completes. Verify that all tasks completed and that thread manager cleans
+ * up properly on delete.
+ */
+ bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
+
+ Monitor monitor;
+
+ size_t activeCount = count;
+
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+ shared_ptr<ThreadFactory> threadFactory
+ = shared_ptr<ThreadFactory>(new ThreadFactory(false));
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+ for (size_t ix = 0; ix < count; ix++) {
+
+ tasks.insert(shared_ptr<ThreadManagerTests::Task>(
+ new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+ }
+
+ int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ for (auto ix = tasks.begin();
+ ix != tasks.end();
+ ix++) {
+
+ threadManager->add(*ix);
+ }
+
+ std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
+
+ {
+ Synchronized s(monitor);
+
+ while (activeCount > 0) {
+ std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
+ monitor.wait();
+ }
+ }
+
+ int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+
+ int64_t firstTime = 9223372036854775807LL;
+ int64_t lastTime = 0;
+
+ double averageTime = 0;
+ int64_t minTime = 9223372036854775807LL;
+ int64_t maxTime = 0;
+
+ for (auto ix = tasks.begin();
+ ix != tasks.end();
+ ix++) {
+
+ shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+ int64_t delta = task->_endTime - task->_startTime;
+
+ assert(delta > 0);
+
+ if (task->_startTime < firstTime) {
+ firstTime = task->_startTime;
+ }
+
+ if (task->_endTime > lastTime) {
+ lastTime = task->_endTime;
+ }
+
+ if (delta < minTime) {
+ minTime = delta;
+ }
+
+ if (delta > maxTime) {
+ maxTime = delta;
+ }
+
+ averageTime += delta;
+ }
+
+ averageTime /= count;
+
+ std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
+ << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
+ << "ms" << std::endl;
+
+ bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure")
+ << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
+ << "ms" << std::endl;
+
+ return success;
+ }
+
+ class BlockTask : public Runnable {
+
+ public:
+ BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
+ : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
+
+ void run() override {
+ {
+ Synchronized s(_entryMonitor);
+ _entered = true;
+ _entryMonitor.notify();
+ }
+
+ {
+ Synchronized s(_blockMonitor);
+ while (_blocked) {
+ _blockMonitor.wait();
+ }
+ }
+
+ {
+ Synchronized s(_doneMonitor);
+ if (--_count == 0) {
+ _doneMonitor.notify();
+ }
+ }
+ }
+
+ Monitor& _entryMonitor;
+ bool _entered;
+ Monitor& _blockMonitor;
+ bool& _blocked;
+ Monitor& _doneMonitor;
+ size_t& _count;
+ };
+
+ /**
+ * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
+ * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
+
+ bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
+ (void)timeout;
+ bool success = false;
+
+ try {
+
+ Monitor entryMonitor; // not used by this test
+ Monitor blockMonitor;
+ bool blocked[] = {true, true, true};
+ Monitor doneMonitor;
+
+ size_t pendingTaskMaxCount = workerCount;
+
+ size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+ shared_ptr<ThreadManager> threadManager
+ = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+ shared_ptr<ThreadFactory> threadFactory
+ = shared_ptr<ThreadFactory>(new ThreadFactory());
+
+ threadManager->threadFactory(threadFactory);
+
+ threadManager->start();
+
+ std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+ tasks.reserve(workerCount + pendingTaskMaxCount);
+
+ for (size_t ix = 0; ix < workerCount; ix++) {
+
+ tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
+ }
+
+ for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+ tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
+ }
+
+ for (auto ix = tasks.begin();
+ ix != tasks.end();
+ ix++) {
+ threadManager->add(*ix);
+ }
+
+ if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+ throw TException("Unexpected pending task count");
+ }
+
+ shared_ptr<ThreadManagerTests::BlockTask> extraTask(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
+
+ try {
+ threadManager->add(extraTask, 1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch (TooManyPendingTasksException&) {
+ throw TException("Should have timed out adding task in excess of pending task count");
+ } catch (TimedOutException&) {
+ // Expected result
+ }
+
+ try {
+ threadManager->add(extraTask, -1);
+ throw TException("Unexpected success adding task in excess of pending task count");
+ } catch (TimedOutException&) {
+ throw TException("Unexpected timeout adding task in excess of pending task count");
+ } catch (TooManyPendingTasksException&) {
+ // Expected result
+ }
+
+ std::cout << "\t\t\t"
+ << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ {
+ Synchronized s(blockMonitor);
+ blocked[0] = false;
+ blockMonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(doneMonitor);
+ while (activeCounts[0] != 0) {
+ doneMonitor.wait();
+ }
+ }
+
+ std::cout << "\t\t\t"
+ << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+ try {
+ threadManager->add(extraTask, 1);
+ } catch (TimedOutException&) {
+ std::cout << "\t\t\t"
+ << "add timed out unexpectedly" << std::endl;
+ throw TException("Unexpected timeout adding task");
+
+ } catch (TooManyPendingTasksException&) {
+ std::cout << "\t\t\t"
+ << "add encountered too many pending exepctions" << std::endl;
+ throw TException("Unexpected timeout adding task");
+ }
+
+ // Wake up tasks that were pending before and wait for them to complete
+
+ {
+ Synchronized s(blockMonitor);
+ blocked[1] = false;
+ blockMonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(doneMonitor);
+ while (activeCounts[1] != 0) {
+ doneMonitor.wait();
+ }
+ }
+
+ // Wake up the extra task and wait for it to complete
+
+ {
+ Synchronized s(blockMonitor);
+ blocked[2] = false;
+ blockMonitor.notifyAll();
+ }
+
+ {
+ Synchronized s(doneMonitor);
+ while (activeCounts[2] != 0) {
+ doneMonitor.wait();
+ }
+ }
+
+ threadManager->stop();
+
+ if (!(success = (threadManager->totalTaskCount() == 0))) {
+ throw TException("Unexpected total task count");
+ }
+
+ } catch (TException& e) {
+ std::cout << "ERROR: " << e.what() << std::endl;
+ }
+
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+ return success;
+ }
+
+
+ bool apiTest() {
+
+ // prove currentTime has milliseconds granularity since many other things depend on it
+ int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ sleep_(100);
+ int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ if (b - a < 50 || b - a > 150) {
+ std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
+ return false;
+ }
+
+ return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+
+ }
+
+ bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
+ {
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
+ threadManager->threadFactory(threadFactory);
+
+ std::cout << "\t\t\t\tstarting.. " << std::endl;
+
+ threadManager->start();
+ threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));
+
+#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
+
+ threadManager->addWorker();
+
+ EXPECT(threadManager->workerCount(), 2);
+ EXPECT(threadManager->idleWorkerCount(), 2);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+ // We're going to throw a blocking task into the mix
+ Monitor entryMonitor; // signaled when task is running
+ Monitor blockMonitor; // to be signaled to unblock the task
+ bool blocked(true); // set to false before notifying
+ Monitor doneMonitor; // signaled when count reaches zero
+ size_t activeCount = 1;
+ shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
+ new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
+ threadManager->add(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tadd other task.. " << std::endl;
+
+ shared_ptr<ThreadManagerTests::Task> otherTask(
+ new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+ threadManager->add(otherTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 2);
+
+ std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
+
+ threadManager->remove(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tremove next pending task.." << std::endl;
+
+ shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
+ if (nextTask != otherTask) {
+ std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
+ return false;
+ }
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
+
+ nextTask = threadManager->removeNextPending();
+ if (nextTask) {
+ std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
+ return false;
+ }
+
+ std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
+
+ shared_ptr<ThreadManagerTests::Task> expiredTask(
+ new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
+
+ threadManager->add(expiredTask, 0, 1);
+ threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
+ threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
+
+ sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 3);
+ EXPECT(threadManager->expiredTaskCount(), 0);
+
+ std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
+
+ if (!m_expired.empty()) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
+ return false;
+ }
+
+ threadManager->removeExpiredTasks();
+
+ if (m_expired.size() != 2) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+ return false;
+ }
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
+ return false;
+ }
+ m_expired.pop_front();
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
+ return false;
+ }
+
+ m_expired.clear();
+
+ threadManager->remove(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 2);
+
+ std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
+
+ threadManager->add(expiredTask, 0, 1); // expires in 1ms
+ sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
+
+ std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
+
+ threadManager->addWorker();
+ sleep_(100); // make sure it has time to spin up and expire the task
+
+ if (m_expired.empty()) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
+ return false;
+ }
+
+ if (m_expired.front() != expiredTask) {
+ std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
+ return false;
+ }
+
+ m_expired.clear();
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 1);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 3);
+
+ std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
+ try {
+ threadManager->removeWorker(2);
+ std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
+ return false;
+ } catch (const InvalidArgumentException&) {
+ /* expected */
+ }
+
+ std::cout << "\t\t\t\tremove worker.. " << std::endl;
+
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+ EXPECT(threadManager->expiredTaskCount(), 3);
+
+ std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
+
+ threadManager->add(blockingTask);
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 1);
+
+ std::cout << "\t\t\t\tadd worker.. " << std::endl;
+
+ threadManager->addWorker();
+ {
+ Synchronized s(entryMonitor);
+ while (!blockingTask->_entered) {
+ entryMonitor.wait();
+ }
+ }
+
+ EXPECT(threadManager->workerCount(), 1);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
+
+ {
+ Synchronized s(blockMonitor);
+ blocked = false;
+ blockMonitor.notifyAll();
+ }
+ threadManager->removeWorker();
+
+ EXPECT(threadManager->workerCount(), 0);
+ EXPECT(threadManager->idleWorkerCount(), 0);
+ EXPECT(threadManager->pendingTaskCount(), 0);
+
+ std::cout << "\t\t\t\tcleanup.. " << std::endl;
+
+ blockingTask.reset();
+ threadManager.reset();
+ return true;
+ }
+};
+
+}
+}
+}
+} // apache::thrift::concurrency
+
+using namespace apache::thrift::concurrency::test;
diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h
new file mode 100644
index 000000000..2d1a2620a
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/concurrency/TimerManager.h>
+#include <thrift/concurrency/ThreadFactory.h>
+#include <thrift/concurrency/Monitor.h>
+
+#include <assert.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+namespace apache {
+namespace thrift {
+namespace concurrency {
+namespace test {
+
+using namespace apache::thrift::concurrency;
+
+class TimerManagerTests {
+
+public:
+ class Task : public Runnable {
+ public:
+ Task(Monitor& monitor, uint64_t timeout)
+ : _timeout(timeout),
+ _startTime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count()),
+ _endTime(0),
+ _monitor(monitor),
+ _success(false),
+ _done(false) {}
+
+ ~Task() override { std::cerr << this << std::endl; }
+
+ void run() override {
+
+ _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
+ _success = (_endTime - _startTime) >= _timeout;
+
+ {
+ Synchronized s(_monitor);
+ _done = true;
+ _monitor.notifyAll();
+ }
+ }
+
+ int64_t _timeout;
+ int64_t _startTime;
+ int64_t _endTime;
+ Monitor& _monitor;
+ bool _success;
+ bool _done;
+ };
+
+ /**
+ * This test creates two tasks and waits for the first to expire within 10%
+ * of the expected expiration time. It then verifies that the timer manager
+ * properly clean up itself and the remaining orphaned timeout task when the
+ * manager goes out of scope and its destructor is called.
+ */
+ bool test00(uint64_t timeout = 1000LL) {
+
+ shared_ptr<TimerManagerTests::Task> orphanTask
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
+
+ {
+ TimerManager timerManager;
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+ timerManager.start();
+ if (timerManager.state() != TimerManager::STARTED) {
+ std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
+ return false;
+ }
+
+ // Don't create task yet, because its constructor sets the expected completion time, and we
+ // need to delay between inserting the two tasks into the run queue.
+ shared_ptr<TimerManagerTests::Task> task;
+
+ {
+ Synchronized s(_monitor);
+ timerManager.add(orphanTask, 10 * timeout);
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(timeout));
+
+ task.reset(new TimerManagerTests::Task(_monitor, timeout));
+ timerManager.add(task, timeout);
+ _monitor.wait();
+ }
+
+ if (!task->_done) {
+ std::cerr << "task is not done, but it should have executed" << std::endl;
+ return false;
+ }
+
+ std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+ }
+
+ if (orphanTask->_done) {
+ std::cerr << "orphan task is done, but it should not have executed" << std::endl;
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * This test creates two tasks, removes the first one then waits for the second one. It then
+ * verifies that the timer manager properly clean up itself and the remaining orphaned timeout
+ * task when the manager goes out of scope and its destructor is called.
+ */
+ bool test01(uint64_t timeout = 1000LL) {
+ TimerManager timerManager;
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+ timerManager.start();
+ assert(timerManager.state() == TimerManager::STARTED);
+
+ Synchronized s(_monitor);
+
+ // Setup the two tasks
+ shared_ptr<TimerManagerTests::Task> taskToRemove
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout / 2));
+ timerManager.add(taskToRemove, taskToRemove->_timeout);
+
+ shared_ptr<TimerManagerTests::Task> task
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+ timerManager.add(task, task->_timeout);
+
+ // Remove one task and wait until the other has completed
+ timerManager.remove(taskToRemove);
+ _monitor.wait(timeout * 2);
+
+ assert(!taskToRemove->_done);
+ assert(task->_done);
+
+ return true;
+ }
+
+ /**
+ * This test creates two tasks with the same callback and another one, then removes the two
+ * duplicated then waits for the last one. It then verifies that the timer manager properly
+ * clean up itself and the remaining orphaned timeout task when the manager goes out of scope
+ * and its destructor is called.
+ */
+ bool test02(uint64_t timeout = 1000LL) {
+ TimerManager timerManager;
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+ timerManager.start();
+ assert(timerManager.state() == TimerManager::STARTED);
+
+ Synchronized s(_monitor);
+
+ // Setup the one tasks and add it twice
+ shared_ptr<TimerManagerTests::Task> taskToRemove
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout / 3));
+ timerManager.add(taskToRemove, taskToRemove->_timeout);
+ timerManager.add(taskToRemove, taskToRemove->_timeout * 2);
+
+ shared_ptr<TimerManagerTests::Task> task
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+ timerManager.add(task, task->_timeout);
+
+ // Remove the first task (e.g. two timers) and wait until the other has completed
+ timerManager.remove(taskToRemove);
+ _monitor.wait(timeout * 2);
+
+ assert(!taskToRemove->_done);
+ assert(task->_done);
+
+ return true;
+ }
+
+ /**
+ * This test creates two tasks, removes the first one then waits for the second one. It then
+ * verifies that the timer manager properly clean up itself and the remaining orphaned timeout
+ * task when the manager goes out of scope and its destructor is called.
+ */
+ bool test03(uint64_t timeout = 1000LL) {
+ TimerManager timerManager;
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+ timerManager.start();
+ assert(timerManager.state() == TimerManager::STARTED);
+
+ Synchronized s(_monitor);
+
+ // Setup the two tasks
+ shared_ptr<TimerManagerTests::Task> taskToRemove
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout / 2));
+ TimerManager::Timer timer = timerManager.add(taskToRemove, taskToRemove->_timeout);
+
+ shared_ptr<TimerManagerTests::Task> task
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+ timerManager.add(task, task->_timeout);
+
+ // Remove one task and wait until the other has completed
+ timerManager.remove(timer);
+ _monitor.wait(timeout * 2);
+
+ assert(!taskToRemove->_done);
+ assert(task->_done);
+
+ // Verify behavior when removing the removed task
+ try {
+ timerManager.remove(timer);
+ assert(nullptr == "ERROR: This remove should send a NoSuchTaskException exception.");
+ } catch (NoSuchTaskException&) {
+ }
+
+ return true;
+ }
+
+ /**
+ * This test creates one task, and tries to remove it after it has expired.
+ */
+ bool test04(uint64_t timeout = 1000LL) {
+ TimerManager timerManager;
+ timerManager.threadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
+ timerManager.start();
+ assert(timerManager.state() == TimerManager::STARTED);
+
+ Synchronized s(_monitor);
+
+ // Setup the task
+ shared_ptr<TimerManagerTests::Task> task
+ = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout / 10));
+ TimerManager::Timer timer = timerManager.add(task, task->_timeout);
+ task.reset();
+
+ // Wait until the task has completed
+ _monitor.wait(timeout);
+
+ // Verify behavior when removing the expired task
+ // notify is called inside the task so the task may still
+ // be running when we get here, so we need to loop...
+ for (;;) {
+ try {
+ timerManager.remove(timer);
+ assert(nullptr == "ERROR: This remove should throw NoSuchTaskException, or UncancellableTaskException.");
+ } catch (const NoSuchTaskException&) {
+ break;
+ } catch (const UncancellableTaskException&) {
+ // the thread was still exiting; try again...
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ }
+
+ return true;
+ }
+
+ friend class TestTask;
+
+ Monitor _monitor;
+};
+
+}
+}
+}
+} // apache::thrift::concurrency