From 19fcec84d8d7d21e796c7624e521b60d28ee21ed Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 7 Apr 2024 20:45:59 +0200 Subject: Adding upstream version 16.2.11+ds. Signed-off-by: Daniel Baumann --- .../thrift/lib/cpp/test/concurrency/Tests.cpp | 224 ++++++++ .../lib/cpp/test/concurrency/ThreadFactoryTests.h | 308 ++++++++++ .../lib/cpp/test/concurrency/ThreadManagerTests.h | 639 +++++++++++++++++++++ .../lib/cpp/test/concurrency/TimerManagerTests.h | 273 +++++++++ 4 files changed, 1444 insertions(+) create mode 100644 src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp create mode 100644 src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h create mode 100644 src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h create mode 100644 src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h (limited to 'src/jaegertracing/thrift/lib/cpp/test/concurrency') diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp b/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp new file mode 100644 index 000000000..8c734c2d5 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/Tests.cpp @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "ThreadFactoryTests.h" +#include "TimerManagerTests.h" +#include "ThreadManagerTests.h" + +// The test weight, where 10 is 10 times more threads than baseline +// and the baseline is optimized for running in valgrind +static int WEIGHT = 10; + +int main(int argc, char** argv) { + + std::vector args(argc - 1 > 1 ? argc - 1 : 1); + + args[0] = "all"; + + for (int ix = 1; ix < argc; ix++) { + args[ix - 1] = std::string(argv[ix]); + } + + if (getenv("VALGRIND") != nullptr) { + // lower the scale of every test + WEIGHT = 1; + } + + bool runAll = args[0].compare("all") == 0; + + if (runAll || args[0].compare("thread-factory") == 0) { + + ThreadFactoryTests threadFactoryTests; + + std::cout << "ThreadFactory tests..." << std::endl; + + int reapLoops = 2 * WEIGHT; + int reapCount = 100 * WEIGHT; + size_t floodLoops = 3; + size_t floodCount = 500 * WEIGHT; + + std::cout << "\t\tThreadFactory reap N threads test: N = " << reapLoops << "x" << reapCount << std::endl; + + if (!threadFactoryTests.reapNThreads(reapLoops, reapCount)) { + std::cerr << "\t\ttThreadFactory reap N threads FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tThreadFactory flood N threads test: N = " << floodLoops << "x" << floodCount << std::endl; + + if (!threadFactoryTests.floodNTest(floodLoops, floodCount)) { + std::cerr << "\t\ttThreadFactory flood N threads FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tThreadFactory synchronous start test" << std::endl; + + if (!threadFactoryTests.synchStartTest()) { + std::cerr << "\t\ttThreadFactory synchronous start FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tThreadFactory monitor timeout test" << std::endl; + + if (!threadFactoryTests.monitorTimeoutTest()) { + std::cerr << "\t\ttThreadFactory monitor timeout FAILED" << std::endl; + return 1; + } + } + + if (runAll || args[0].compare("util") == 0) { + + std::cout << "Util tests..." << std::endl; + + std::cout << "\t\tUtil minimum time" << std::endl; + + int64_t time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + int64_t time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl; + + time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + time01 = time00; + size_t count = 0; + + while (time01 < time00 + 10) { + count++; + time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + } + + std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl; + } + + if (runAll || args[0].compare("timer-manager") == 0) { + + std::cout << "TimerManager tests..." << std::endl; + + std::cout << "\t\tTimerManager test00" << std::endl; + + TimerManagerTests timerManagerTests; + + if (!timerManagerTests.test00()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tTimerManager test01" << std::endl; + + if (!timerManagerTests.test01()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tTimerManager test02" << std::endl; + + if (!timerManagerTests.test02()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tTimerManager test03" << std::endl; + + if (!timerManagerTests.test03()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tTimerManager test04" << std::endl; + + if (!timerManagerTests.test04()) { + std::cerr << "\t\tTimerManager tests FAILED" << std::endl; + return 1; + } + } + + if (runAll || args[0].compare("thread-manager") == 0) { + + std::cout << "ThreadManager tests..." << std::endl; + + { + size_t workerCount = 10 * WEIGHT; + size_t taskCount = 500 * WEIGHT; + int64_t delay = 10LL; + + ThreadManagerTests threadManagerTests; + + std::cout << "\t\tThreadManager api test:" << std::endl; + + if (!threadManagerTests.apiTest()) { + std::cerr << "\t\tThreadManager apiTest FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tThreadManager load test: worker count: " << workerCount + << " task count: " << taskCount << " delay: " << delay << std::endl; + + if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) { + std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl; + return 1; + } + + std::cout << "\t\tThreadManager block test: worker count: " << workerCount + << " delay: " << delay << std::endl; + + if (!threadManagerTests.blockTest(delay, workerCount)) { + std::cerr << "\t\tThreadManager blockTest FAILED" << std::endl; + return 1; + } + } + } + + if (runAll || args[0].compare("thread-manager-benchmark") == 0) { + + std::cout << "ThreadManager benchmark tests..." << std::endl; + + { + + size_t minWorkerCount = 2; + + size_t maxWorkerCount = 8; + + size_t tasksPerWorker = 100 * WEIGHT; + + int64_t delay = 5LL; + + for (size_t workerCount = minWorkerCount; workerCount <= maxWorkerCount; workerCount *= 4) { + + size_t taskCount = workerCount * tasksPerWorker; + + std::cout << "\t\tThreadManager load test: worker count: " << workerCount + << " task count: " << taskCount << " delay: " << delay << std::endl; + + ThreadManagerTests threadManagerTests; + + if (!threadManagerTests.loadTest(taskCount, delay, workerCount)) + { + std::cerr << "\t\tThreadManager loadTest FAILED" << std::endl; + return 1; + } + } + } + } + + std::cout << "ALL TESTS PASSED" << std::endl; + return 0; +} diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h new file mode 100644 index 000000000..febe3f8b3 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadFactoryTests.h @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace apache { +namespace thrift { +namespace concurrency { +namespace test { + +using std::shared_ptr; +using namespace apache::thrift::concurrency; + +/** + * ThreadManagerTests class + * + * @version $Id:$ + */ +class ThreadFactoryTests { + +public: + /** + * Reap N threads + */ + class ReapNTask : public Runnable { + + public: + ReapNTask(Monitor& monitor, int& activeCount) : _monitor(monitor), _count(activeCount) {} + + void run() override { + Synchronized s(_monitor); + + if (--_count == 0) { + _monitor.notify(); + } + } + + Monitor& _monitor; + int& _count; + }; + + bool reapNThreads(int loop = 1, int count = 10) { + + ThreadFactory threadFactory = ThreadFactory(); + shared_ptr monitor(new Monitor); + + for (int lix = 0; lix < loop; lix++) { + + int activeCount = 0; + + std::vector > threads; + int tix; + + for (tix = 0; tix < count; tix++) { + try { + ++activeCount; + threads.push_back( + threadFactory.newThread(shared_ptr(new ReapNTask(*monitor, activeCount)))); + } catch (SystemResourceException& e) { + std::cout << "\t\t\tfailed to create " << lix* count + tix << " thread " << e.what() + << std::endl; + throw e; + } + } + + tix = 0; + for (std::vector >::const_iterator thread = threads.begin(); + thread != threads.end(); + tix++, ++thread) { + + try { + (*thread)->start(); + } catch (SystemResourceException& e) { + std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() + << std::endl; + throw e; + } + } + + { + Synchronized s(*monitor); + while (activeCount > 0) { + monitor->wait(1000); + } + } + + std::cout << "\t\t\treaped " << lix* count << " threads" << std::endl; + } + + std::cout << "\t\t\tSuccess!" << std::endl; + return true; + } + + class SynchStartTask : public Runnable { + + public: + enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED }; + + SynchStartTask(Monitor& monitor, volatile STATE& state) : _monitor(monitor), _state(state) {} + + void run() override { + { + Synchronized s(_monitor); + if (_state == SynchStartTask::STARTING) { + _state = SynchStartTask::STARTED; + _monitor.notify(); + } + } + + { + Synchronized s(_monitor); + while (_state == SynchStartTask::STARTED) { + _monitor.wait(); + } + + if (_state == SynchStartTask::STOPPING) { + _state = SynchStartTask::STOPPED; + _monitor.notifyAll(); + } + } + } + + private: + Monitor& _monitor; + volatile STATE& _state; + }; + + bool synchStartTest() { + + Monitor monitor; + + SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED; + + shared_ptr task + = shared_ptr(new SynchStartTask(monitor, state)); + + ThreadFactory threadFactory = ThreadFactory(); + + shared_ptr thread = threadFactory.newThread(task); + + if (state == SynchStartTask::UNINITIALIZED) { + + state = SynchStartTask::STARTING; + + thread->start(); + } + + { + Synchronized s(monitor); + while (state == SynchStartTask::STARTING) { + monitor.wait(); + } + } + + assert(state != SynchStartTask::STARTING); + + { + Synchronized s(monitor); + + try { + monitor.wait(100); + } catch (TimedOutException&) { + } + + if (state == SynchStartTask::STARTED) { + + state = SynchStartTask::STOPPING; + + monitor.notify(); + } + + while (state == SynchStartTask::STOPPING) { + monitor.wait(); + } + } + + assert(state == SynchStartTask::STOPPED); + + bool success = true; + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl; + + return true; + } + + /** + * The only guarantee a monitor timeout can give you is that + * it will take "at least" as long as the timeout, no less. + * There is absolutely no guarantee around regaining execution + * near the timeout. On a busy system (like inside a third party + * CI environment) it could take quite a bit longer than the + * requested timeout, and that's ok. + */ + + bool monitorTimeoutTest(int64_t count = 1000, int64_t timeout = 2) { + + Monitor monitor; + + int64_t startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + for (int64_t ix = 0; ix < count; ix++) { + { + Synchronized s(monitor); + try { + monitor.wait(timeout); + } catch (TimedOutException&) { + } + } + } + + int64_t endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + bool success = (endTime - startTime) >= (count * timeout); + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") + << ": minimum required time to elapse " << count * timeout + << "ms; actual elapsed time " << endTime - startTime << "ms" + << std::endl; + + return success; + } + + class FloodTask : public Runnable { + public: + FloodTask(const size_t id, Monitor& mon) : _id(id), _mon(mon) {} + ~FloodTask() override { + if (_id % 10000 == 0) { + Synchronized sync(_mon); + std::cout << "\t\tthread " << _id << " done" << std::endl; + } + } + + void run() override { + if (_id % 10000 == 0) { + Synchronized sync(_mon); + std::cout << "\t\tthread " << _id << " started" << std::endl; + } + } + const size_t _id; + Monitor& _mon; + }; + + void foo(ThreadFactory* tf) { (void)tf; } + + bool floodNTest(size_t loop = 1, size_t count = 100000) { + + bool success = false; + Monitor mon; + + for (size_t lix = 0; lix < loop; lix++) { + + ThreadFactory threadFactory = ThreadFactory(); + threadFactory.setDetached(true); + + for (size_t tix = 0; tix < count; tix++) { + + try { + + shared_ptr task(new FloodTask(lix * count + tix, mon)); + shared_ptr thread = threadFactory.newThread(task); + thread->start(); + + } catch (TException& e) { + + std::cout << "\t\t\tfailed to start " << lix* count + tix << " thread " << e.what() + << std::endl; + + return success; + } + } + + Synchronized sync(mon); + std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl; + success = true; + } + + return success; + } +}; + +} +} +} +} // apache::thrift::concurrency::test diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h new file mode 100644 index 000000000..fee7c7c51 --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace apache { +namespace thrift { +namespace concurrency { +namespace test { + +using namespace apache::thrift::concurrency; + +static std::deque > m_expired; +static void expiredNotifier(std::shared_ptr runnable) +{ + m_expired.push_back(runnable); +} + +static void sleep_(int64_t millisec) { + Monitor _sleep; + Synchronized s(_sleep); + + try { + _sleep.wait(millisec); + } catch (TimedOutException&) { + ; + } catch (...) { + assert(0); + } +} + +class ThreadManagerTests { + +public: + class Task : public Runnable { + + public: + Task(Monitor& monitor, size_t& count, int64_t timeout) + : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {} + + void run() override { + + _startTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + sleep_(_timeout); + + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + _done = true; + + { + Synchronized s(_monitor); + + // std::cout << "Thread " << _count << " completed " << std::endl; + + _count--; + if (_count % 10000 == 0) { + _monitor.notify(); + } + } + } + + Monitor& _monitor; + size_t& _count; + int64_t _timeout; + int64_t _startTime; + int64_t _endTime; + bool _done; + Monitor _sleep; + }; + + /** + * Dispatch count tasks, each of which blocks for timeout milliseconds then + * completes. Verify that all tasks completed and that thread manager cleans + * up properly on delete. + */ + bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) { + + Monitor monitor; + + size_t activeCount = count; + + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(workerCount); + + shared_ptr threadFactory + = shared_ptr(new ThreadFactory(false)); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + std::set > tasks; + + for (size_t ix = 0; ix < count; ix++) { + + tasks.insert(shared_ptr( + new ThreadManagerTests::Task(monitor, activeCount, timeout))); + } + + int64_t time00 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + for (auto ix = tasks.begin(); + ix != tasks.end(); + ix++) { + + threadManager->add(*ix); + } + + std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl; + + { + Synchronized s(monitor); + + while (activeCount > 0) { + std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl; + monitor.wait(); + } + } + + int64_t time01 = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + + int64_t firstTime = 9223372036854775807LL; + int64_t lastTime = 0; + + double averageTime = 0; + int64_t minTime = 9223372036854775807LL; + int64_t maxTime = 0; + + for (auto ix = tasks.begin(); + ix != tasks.end(); + ix++) { + + shared_ptr task = *ix; + + int64_t delta = task->_endTime - task->_startTime; + + assert(delta > 0); + + if (task->_startTime < firstTime) { + firstTime = task->_startTime; + } + + if (task->_endTime > lastTime) { + lastTime = task->_endTime; + } + + if (delta < minTime) { + minTime = delta; + } + + if (delta > maxTime) { + maxTime = delta; + } + + averageTime += delta; + } + + averageTime /= count; + + std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime + << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime + << "ms" << std::endl; + + bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount; + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") + << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00 + << "ms" << std::endl; + + return success; + } + + class BlockTask : public Runnable { + + public: + BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count) + : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {} + + void run() override { + { + Synchronized s(_entryMonitor); + _entered = true; + _entryMonitor.notify(); + } + + { + Synchronized s(_blockMonitor); + while (_blocked) { + _blockMonitor.wait(); + } + } + + { + Synchronized s(_doneMonitor); + if (--_count == 0) { + _doneMonitor.notify(); + } + } + } + + Monitor& _entryMonitor; + bool _entered; + Monitor& _blockMonitor; + bool& _blocked; + Monitor& _doneMonitor; + size_t& _count; + }; + + /** + * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the + * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */ + + bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) { + (void)timeout; + bool success = false; + + try { + + Monitor entryMonitor; // not used by this test + Monitor blockMonitor; + bool blocked[] = {true, true, true}; + Monitor doneMonitor; + + size_t pendingTaskMaxCount = workerCount; + + size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1}; + + shared_ptr threadManager + = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); + + shared_ptr threadFactory + = shared_ptr(new ThreadFactory()); + + threadManager->threadFactory(threadFactory); + + threadManager->start(); + + std::vector > tasks; + tasks.reserve(workerCount + pendingTaskMaxCount); + + for (size_t ix = 0; ix < workerCount; ix++) { + + tasks.push_back(shared_ptr( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0]))); + } + + for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) { + + tasks.push_back(shared_ptr( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1]))); + } + + for (auto ix = tasks.begin(); + ix != tasks.end(); + ix++) { + threadManager->add(*ix); + } + + if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) { + throw TException("Unexpected pending task count"); + } + + shared_ptr extraTask( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2])); + + try { + threadManager->add(extraTask, 1); + throw TException("Unexpected success adding task in excess of pending task count"); + } catch (TooManyPendingTasksException&) { + throw TException("Should have timed out adding task in excess of pending task count"); + } catch (TimedOutException&) { + // Expected result + } + + try { + threadManager->add(extraTask, -1); + throw TException("Unexpected success adding task in excess of pending task count"); + } catch (TimedOutException&) { + throw TException("Unexpected timeout adding task in excess of pending task count"); + } catch (TooManyPendingTasksException&) { + // Expected result + } + + std::cout << "\t\t\t" + << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + { + Synchronized s(blockMonitor); + blocked[0] = false; + blockMonitor.notifyAll(); + } + + { + Synchronized s(doneMonitor); + while (activeCounts[0] != 0) { + doneMonitor.wait(); + } + } + + std::cout << "\t\t\t" + << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; + + try { + threadManager->add(extraTask, 1); + } catch (TimedOutException&) { + std::cout << "\t\t\t" + << "add timed out unexpectedly" << std::endl; + throw TException("Unexpected timeout adding task"); + + } catch (TooManyPendingTasksException&) { + std::cout << "\t\t\t" + << "add encountered too many pending exepctions" << std::endl; + throw TException("Unexpected timeout adding task"); + } + + // Wake up tasks that were pending before and wait for them to complete + + { + Synchronized s(blockMonitor); + blocked[1] = false; + blockMonitor.notifyAll(); + } + + { + Synchronized s(doneMonitor); + while (activeCounts[1] != 0) { + doneMonitor.wait(); + } + } + + // Wake up the extra task and wait for it to complete + + { + Synchronized s(blockMonitor); + blocked[2] = false; + blockMonitor.notifyAll(); + } + + { + Synchronized s(doneMonitor); + while (activeCounts[2] != 0) { + doneMonitor.wait(); + } + } + + threadManager->stop(); + + if (!(success = (threadManager->totalTaskCount() == 0))) { + throw TException("Unexpected total task count"); + } + + } catch (TException& e) { + std::cout << "ERROR: " << e.what() << std::endl; + } + + std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl; + return success; + } + + + bool apiTest() { + + // prove currentTime has milliseconds granularity since many other things depend on it + int64_t a = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + sleep_(100); + int64_t b = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + if (b - a < 50 || b - a > 150) { + std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl; + return false; + } + + return apiTestWithThreadFactory(shared_ptr(new ThreadFactory())); + + } + + bool apiTestWithThreadFactory(shared_ptr threadFactory) + { + shared_ptr threadManager = ThreadManager::newSimpleThreadManager(1); + threadManager->threadFactory(threadFactory); + + std::cout << "\t\t\t\tstarting.. " << std::endl; + + threadManager->start(); + threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this)); + +#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } } + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl; + + threadManager->addWorker(); + + EXPECT(threadManager->workerCount(), 2); + EXPECT(threadManager->idleWorkerCount(), 2); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove 1st worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tadd blocking task.. " << std::endl; + + // We're going to throw a blocking task into the mix + Monitor entryMonitor; // signaled when task is running + Monitor blockMonitor; // to be signaled to unblock the task + bool blocked(true); // set to false before notifying + Monitor doneMonitor; // signaled when count reaches zero + size_t activeCount = 1; + shared_ptr blockingTask( + new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount)); + threadManager->add(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tadd other task.. " << std::endl; + + shared_ptr otherTask( + new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); + + threadManager->add(otherTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 2); + + std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl; + + threadManager->remove(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tremove next pending task.." << std::endl; + + shared_ptr nextTask = threadManager->removeNextPending(); + if (nextTask != otherTask) { + std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl; + return false; + } + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl; + + nextTask = threadManager->removeNextPending(); + if (nextTask) { + std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl; + return false; + } + + std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl; + + shared_ptr expiredTask( + new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); + + threadManager->add(expiredTask, 0, 1); + threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped + threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both + + sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 3); + EXPECT(threadManager->expiredTaskCount(), 0); + + std::cout << "\t\t\t\tremove expired tasks.." << std::endl; + + if (!m_expired.empty()) { + std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl; + return false; + } + + threadManager->removeExpiredTasks(); + + if (m_expired.size() != 2) { + std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; + return false; + } + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl; + return false; + } + m_expired.pop_front(); + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl; + return false; + } + + m_expired.clear(); + + threadManager->remove(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 2); + + std::cout << "\t\t\t\tadd expired task (again).." << std::endl; + + threadManager->add(expiredTask, 0, 1); // expires in 1ms + sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms + + std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl; + + threadManager->addWorker(); + sleep_(100); // make sure it has time to spin up and expire the task + + if (m_expired.empty()) { + std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; + return false; + } + + if (m_expired.front() != expiredTask) { + std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl; + return false; + } + + m_expired.clear(); + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 1); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 3); + + std::cout << "\t\t\t\ttry to remove too many workers" << std::endl; + try { + threadManager->removeWorker(2); + std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl; + return false; + } catch (const InvalidArgumentException&) { + /* expected */ + } + + std::cout << "\t\t\t\tremove worker.. " << std::endl; + + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + EXPECT(threadManager->expiredTaskCount(), 3); + + std::cout << "\t\t\t\tadd blocking task.. " << std::endl; + + threadManager->add(blockingTask); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 1); + + std::cout << "\t\t\t\tadd worker.. " << std::endl; + + threadManager->addWorker(); + { + Synchronized s(entryMonitor); + while (!blockingTask->_entered) { + entryMonitor.wait(); + } + } + + EXPECT(threadManager->workerCount(), 1); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl; + + { + Synchronized s(blockMonitor); + blocked = false; + blockMonitor.notifyAll(); + } + threadManager->removeWorker(); + + EXPECT(threadManager->workerCount(), 0); + EXPECT(threadManager->idleWorkerCount(), 0); + EXPECT(threadManager->pendingTaskCount(), 0); + + std::cout << "\t\t\t\tcleanup.. " << std::endl; + + blockingTask.reset(); + threadManager.reset(); + return true; + } +}; + +} +} +} +} // apache::thrift::concurrency + +using namespace apache::thrift::concurrency::test; diff --git a/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h b/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h new file mode 100644 index 000000000..2d1a2620a --- /dev/null +++ b/src/jaegertracing/thrift/lib/cpp/test/concurrency/TimerManagerTests.h @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include +#include +#include +#include + +namespace apache { +namespace thrift { +namespace concurrency { +namespace test { + +using namespace apache::thrift::concurrency; + +class TimerManagerTests { + +public: + class Task : public Runnable { + public: + Task(Monitor& monitor, uint64_t timeout) + : _timeout(timeout), + _startTime(std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count()), + _endTime(0), + _monitor(monitor), + _success(false), + _done(false) {} + + ~Task() override { std::cerr << this << std::endl; } + + void run() override { + + _endTime = std::chrono::duration_cast(std::chrono::steady_clock::now().time_since_epoch()).count(); + _success = (_endTime - _startTime) >= _timeout; + + { + Synchronized s(_monitor); + _done = true; + _monitor.notifyAll(); + } + } + + int64_t _timeout; + int64_t _startTime; + int64_t _endTime; + Monitor& _monitor; + bool _success; + bool _done; + }; + + /** + * This test creates two tasks and waits for the first to expire within 10% + * of the expected expiration time. It then verifies that the timer manager + * properly clean up itself and the remaining orphaned timeout task when the + * manager goes out of scope and its destructor is called. + */ + bool test00(uint64_t timeout = 1000LL) { + + shared_ptr orphanTask + = shared_ptr(new TimerManagerTests::Task(_monitor, 10 * timeout)); + + { + TimerManager timerManager; + timerManager.threadFactory(shared_ptr(new ThreadFactory())); + timerManager.start(); + if (timerManager.state() != TimerManager::STARTED) { + std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl; + return false; + } + + // Don't create task yet, because its constructor sets the expected completion time, and we + // need to delay between inserting the two tasks into the run queue. + shared_ptr task; + + { + Synchronized s(_monitor); + timerManager.add(orphanTask, 10 * timeout); + + std::this_thread::sleep_for(std::chrono::milliseconds(timeout)); + + task.reset(new TimerManagerTests::Task(_monitor, timeout)); + timerManager.add(task, timeout); + _monitor.wait(); + } + + if (!task->_done) { + std::cerr << "task is not done, but it should have executed" << std::endl; + return false; + } + + std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl; + } + + if (orphanTask->_done) { + std::cerr << "orphan task is done, but it should not have executed" << std::endl; + return false; + } + + return true; + } + + /** + * This test creates two tasks, removes the first one then waits for the second one. It then + * verifies that the timer manager properly clean up itself and the remaining orphaned timeout + * task when the manager goes out of scope and its destructor is called. + */ + bool test01(uint64_t timeout = 1000LL) { + TimerManager timerManager; + timerManager.threadFactory(shared_ptr(new ThreadFactory())); + timerManager.start(); + assert(timerManager.state() == TimerManager::STARTED); + + Synchronized s(_monitor); + + // Setup the two tasks + shared_ptr taskToRemove + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout / 2)); + timerManager.add(taskToRemove, taskToRemove->_timeout); + + shared_ptr task + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout)); + timerManager.add(task, task->_timeout); + + // Remove one task and wait until the other has completed + timerManager.remove(taskToRemove); + _monitor.wait(timeout * 2); + + assert(!taskToRemove->_done); + assert(task->_done); + + return true; + } + + /** + * This test creates two tasks with the same callback and another one, then removes the two + * duplicated then waits for the last one. It then verifies that the timer manager properly + * clean up itself and the remaining orphaned timeout task when the manager goes out of scope + * and its destructor is called. + */ + bool test02(uint64_t timeout = 1000LL) { + TimerManager timerManager; + timerManager.threadFactory(shared_ptr(new ThreadFactory())); + timerManager.start(); + assert(timerManager.state() == TimerManager::STARTED); + + Synchronized s(_monitor); + + // Setup the one tasks and add it twice + shared_ptr taskToRemove + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout / 3)); + timerManager.add(taskToRemove, taskToRemove->_timeout); + timerManager.add(taskToRemove, taskToRemove->_timeout * 2); + + shared_ptr task + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout)); + timerManager.add(task, task->_timeout); + + // Remove the first task (e.g. two timers) and wait until the other has completed + timerManager.remove(taskToRemove); + _monitor.wait(timeout * 2); + + assert(!taskToRemove->_done); + assert(task->_done); + + return true; + } + + /** + * This test creates two tasks, removes the first one then waits for the second one. It then + * verifies that the timer manager properly clean up itself and the remaining orphaned timeout + * task when the manager goes out of scope and its destructor is called. + */ + bool test03(uint64_t timeout = 1000LL) { + TimerManager timerManager; + timerManager.threadFactory(shared_ptr(new ThreadFactory())); + timerManager.start(); + assert(timerManager.state() == TimerManager::STARTED); + + Synchronized s(_monitor); + + // Setup the two tasks + shared_ptr taskToRemove + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout / 2)); + TimerManager::Timer timer = timerManager.add(taskToRemove, taskToRemove->_timeout); + + shared_ptr task + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout)); + timerManager.add(task, task->_timeout); + + // Remove one task and wait until the other has completed + timerManager.remove(timer); + _monitor.wait(timeout * 2); + + assert(!taskToRemove->_done); + assert(task->_done); + + // Verify behavior when removing the removed task + try { + timerManager.remove(timer); + assert(nullptr == "ERROR: This remove should send a NoSuchTaskException exception."); + } catch (NoSuchTaskException&) { + } + + return true; + } + + /** + * This test creates one task, and tries to remove it after it has expired. + */ + bool test04(uint64_t timeout = 1000LL) { + TimerManager timerManager; + timerManager.threadFactory(shared_ptr(new ThreadFactory())); + timerManager.start(); + assert(timerManager.state() == TimerManager::STARTED); + + Synchronized s(_monitor); + + // Setup the task + shared_ptr task + = shared_ptr(new TimerManagerTests::Task(_monitor, timeout / 10)); + TimerManager::Timer timer = timerManager.add(task, task->_timeout); + task.reset(); + + // Wait until the task has completed + _monitor.wait(timeout); + + // Verify behavior when removing the expired task + // notify is called inside the task so the task may still + // be running when we get here, so we need to loop... + for (;;) { + try { + timerManager.remove(timer); + assert(nullptr == "ERROR: This remove should throw NoSuchTaskException, or UncancellableTaskException."); + } catch (const NoSuchTaskException&) { + break; + } catch (const UncancellableTaskException&) { + // the thread was still exiting; try again... + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } + + return true; + } + + friend class TestTask; + + Monitor _monitor; +}; + +} +} +} +} // apache::thrift::concurrency -- cgit v1.2.3