/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ /* vim: set ts=8 sts=2 et sw=2 tw=80: */ /* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this file, * You can obtain one at http://mozilla.org/MPL/2.0/. */ #include "MultiWriterQueue.h" #include "DDTimeStamp.h" #include "mozilla/Assertions.h" #include "nsDeque.h" #include "nsIThread.h" #include "nsThreadUtils.h" #include #include using mozilla::MultiWriterQueue; using mozilla::MultiWriterQueueDefaultBufferSize; using mozilla::MultiWriterQueueReaderLocking_Mutex; using mozilla::MultiWriterQueueReaderLocking_None; template static void TestMultiWriterQueueST(const int loops) { using Q = MultiWriterQueue; Q q; int pushes = 0; // Go through 2 cycles of pushes&pops, to exercize reusable buffers. for (int max = loops; max <= loops * 2; max *= 2) { // Push all numbers. for (int i = 1; i <= max; ++i) { bool newBuffer = q.Push(i); // A new buffer should be added at the last push of each buffer. EXPECT_EQ(++pushes % BufferSize == 0, newBuffer); } // Pop numbers, should be FIFO. int x = 0; q.PopAll([&](int& i) { EXPECT_EQ(++x, i); }); // We should have got all numbers. EXPECT_EQ(max, x); // Nothing left. q.PopAll([&](int&) { EXPECT_TRUE(false); }); } } TEST(MultiWriterQueue, SingleThreaded) { TestMultiWriterQueueST<1>(10); TestMultiWriterQueueST<2>(10); TestMultiWriterQueueST<4>(10); TestMultiWriterQueueST<10>(9); TestMultiWriterQueueST<10>(10); TestMultiWriterQueueST<10>(11); TestMultiWriterQueueST<10>(19); TestMultiWriterQueueST<10>(20); TestMultiWriterQueueST<10>(21); TestMultiWriterQueueST<10>(999); TestMultiWriterQueueST<10>(1000); TestMultiWriterQueueST<10>(1001); TestMultiWriterQueueST<8192>(8192 * 4 + 1); } template static void TestMultiWriterQueueMT(int aWriterThreads, int aReaderThreads, int aTotalLoops, const char* aPrintPrefix) { Q q; const int threads = aWriterThreads + aReaderThreads; const int loops = aTotalLoops / aWriterThreads; nsIThread** array = new nsIThread*[threads]; mozilla::Atomic pushThreadsCompleted{0}; int pops = 0; nsCOMPtr popper = NS_NewRunnableFunction("MWQPopper", [&]() { // int popsBefore = pops; // int allocsBefore = q.AllocatedBuffersStats().mCount; q.PopAll([&pops](const int& i) { ++pops; }); // if (pops != popsBefore || // q.AllocatedBuffersStats().mCount != allocsBefore) { // printf("%s threads=1+%d loops/thread=%d pops=%d " // "buffers: live=%d (w %d) reusable=%d (w %d) " // "alloc=%d (w %d)\n", // aPrintPrefix, // aWriterThreads, // loops, // pops, // q.LiveBuffersStats().mCount, // q.LiveBuffersStats().mWatermark, // q.ReusableBuffersStats().mCount, // q.ReusableBuffersStats().mWatermark, // q.AllocatedBuffersStats().mCount, // q.AllocatedBuffersStats().mWatermark); // } }); // Cycle through reader threads. mozilla::Atomic readerThread{0}; double start = mozilla::ToSeconds(mozilla::DDNow()); for (int k = 0; k < threads; k++) { // First `aReaderThreads` threads to pop, all others to push. if (k < aReaderThreads) { nsCOMPtr t; nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t)); EXPECT_TRUE(NS_SUCCEEDED(rv)); NS_ADDREF(array[k] = t); } else { nsCOMPtr t; nsCOMPtr r = NS_NewRunnableFunction("MWQPusher", [&, k]() { // Give a bit of breathing space to construct other threads. PR_Sleep(PR_MillisecondsToInterval(100)); for (int i = 0; i < loops; ++i) { if (q.Push(k * threads + i) && aReaderThreads != 0) { // Run a popper task every time we push the last element of a // buffer. array[++readerThread % aReaderThreads]->Dispatch( popper, nsIThread::DISPATCH_NORMAL); } } ++pushThreadsCompleted; }); nsresult rv = NS_NewNamedThread("MWQThread", getter_AddRefs(t), r); EXPECT_TRUE(NS_SUCCEEDED(rv)); NS_ADDREF(array[k] = t); } } for (int k = threads - 1; k >= 0; k--) { array[k]->Shutdown(); NS_RELEASE(array[k]); } delete[] array; // There may be a few more elements that haven't been read yet. q.PopAll([&pops](const int& i) { ++pops; }); const int pushes = aWriterThreads * loops; EXPECT_EQ(pushes, pops); q.PopAll([](const int& i) { EXPECT_TRUE(false); }); double duration = mozilla::ToSeconds(mozilla::DDNow()) - start - 0.1; printf( "%s threads=%dw+%dr loops/thread=%d pushes=pops=%d duration=%fs " "pushes/s=%f buffers: live=%d (w %d) reusable=%d (w %d) " "alloc=%d (w %d)\n", aPrintPrefix, aWriterThreads, aReaderThreads, loops, pushes, duration, pushes / duration, q.LiveBuffersStats().mCount, q.LiveBuffersStats().mWatermark, q.ReusableBuffersStats().mCount, q.ReusableBuffersStats().mWatermark, q.AllocatedBuffersStats().mCount, q.AllocatedBuffersStats().mWatermark); } // skip test on windows10-aarch64 due to unexpected test timeout at // MultiWriterSingleReader, bug 1526001 #if !defined(_M_ARM64) TEST(MultiWriterQueue, MultiWriterSingleReader) { // Small BufferSize, to exercize the buffer management code. TestMultiWriterQueueMT< MultiWriterQueue>( 1, 0, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 1, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 2, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 3, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 4, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 5, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 6, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 7, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 8, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 9, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 10, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 16, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 32, 1, 2 * 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 64, 1, 2 * 1024 * 1024, "MultiWriterQueue"); // A more real-life buffer size. TestMultiWriterQueueMT< MultiWriterQueue>( 64, 1, 2 * 1024 * 1024, "MultiWriterQueue"); // DEBUG-mode thread-safety checks should make the following (multi-reader // with no locking) crash; uncomment to verify. // TestMultiWriterQueueMT< // MultiWriterQueue>(64, 2, 2*1024*1024); } #endif // skip test on windows10-aarch64 due to unexpected test timeout at // MultiWriterMultiReade, bug 1526001 #if !defined(_M_ARM64) TEST(MultiWriterQueue, MultiWriterMultiReader) { static_assert( std::is_same_v< MultiWriterQueue, MultiWriterQueue>, "MultiWriterQueue reader locking should use Mutex by default"); // Small BufferSize, to exercize the buffer management code. TestMultiWriterQueueMT< MultiWriterQueue>( 1, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 2, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 3, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 4, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 5, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 6, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 7, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 8, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 9, 2, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 10, 4, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 16, 8, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 32, 16, 1024 * 1024, "MultiWriterQueue"); TestMultiWriterQueueMT< MultiWriterQueue>( 64, 32, 1024 * 1024, "MultiWriterQueue"); // A more real-life buffer size. TestMultiWriterQueueMT< MultiWriterQueue>( 64, 32, 1024 * 1024, "MultiWriterQueue"); } #endif // Single-threaded use only. struct DequeWrapperST { nsDeque mDQ; bool Push(int i) { mDQ.PushFront(reinterpret_cast(static_cast(i))); return true; } template void PopAll(F&& aF) { while (mDQ.GetSize() != 0) { int i = static_cast(reinterpret_cast(mDQ.Pop())); aF(i); } } struct CountAndWatermark { int mCount = 0; int mWatermark = 0; } mLiveBuffersStats, mReusableBuffersStats, mAllocatedBuffersStats; CountAndWatermark LiveBuffersStats() const { return mLiveBuffersStats; } CountAndWatermark ReusableBuffersStats() const { return mReusableBuffersStats; } CountAndWatermark AllocatedBuffersStats() const { return mAllocatedBuffersStats; } }; // Multi-thread (atomic) writes allowed, make sure you don't pop unless writes // can't happen. struct DequeWrapperAW : DequeWrapperST { mozilla::Atomic mWriting{false}; bool Push(int i) { while (!mWriting.compareExchange(false, true)) { } mDQ.PushFront(reinterpret_cast(static_cast(i))); mWriting = false; return true; } }; // Multi-thread writes allowed, make sure you don't pop unless writes can't // happen. struct DequeWrapperMW : DequeWrapperST { mozilla::Mutex mMutex; DequeWrapperMW() : mMutex("DequeWrapperMW/MT") {} bool Push(int i) { mozilla::MutexAutoLock lock(mMutex); mDQ.PushFront(reinterpret_cast(static_cast(i))); return true; } }; // Multi-thread read&writes allowed. struct DequeWrapperMT : DequeWrapperMW { template void PopAll(F&& aF) { while (mDQ.GetSize() != 0) { int i; { mozilla::MutexAutoLock lock(mMutex); i = static_cast(reinterpret_cast(mDQ.Pop())); } aF(i); } } }; TEST(MultiWriterQueue, nsDequeBenchmark) { TestMultiWriterQueueMT(1, 0, 2 * 1024 * 1024, "DequeWrapperST "); TestMultiWriterQueueMT(1, 0, 2 * 1024 * 1024, "DequeWrapperAW "); TestMultiWriterQueueMT(1, 0, 2 * 1024 * 1024, "DequeWrapperMW "); TestMultiWriterQueueMT(1, 0, 2 * 1024 * 1024, "DequeWrapperMT "); TestMultiWriterQueueMT(1, 1, 2 * 1024 * 1024, "DequeWrapperMT "); TestMultiWriterQueueMT(8, 0, 2 * 1024 * 1024, "DequeWrapperAW "); TestMultiWriterQueueMT(8, 0, 2 * 1024 * 1024, "DequeWrapperMW "); TestMultiWriterQueueMT(8, 0, 2 * 1024 * 1024, "DequeWrapperMT "); TestMultiWriterQueueMT(8, 1, 2 * 1024 * 1024, "DequeWrapperMT "); }