diff options
Diffstat (limited to 'src/jaegertracing/opentelemetry-cpp/sdk/test/common/circular_buffer_test.cc')
-rw-r--r-- | src/jaegertracing/opentelemetry-cpp/sdk/test/common/circular_buffer_test.cc | 161 |
1 files changed, 161 insertions, 0 deletions
diff --git a/src/jaegertracing/opentelemetry-cpp/sdk/test/common/circular_buffer_test.cc b/src/jaegertracing/opentelemetry-cpp/sdk/test/common/circular_buffer_test.cc new file mode 100644 index 000000000..a20c3e42a --- /dev/null +++ b/src/jaegertracing/opentelemetry-cpp/sdk/test/common/circular_buffer_test.cc @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +#include "opentelemetry/sdk/common/circular_buffer.h" + +#include <algorithm> +#include <cassert> +#include <random> +#include <thread> + +#include <gtest/gtest.h> +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; + +static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; + +static void GenerateRandomNumbers(CircularBuffer<uint32_t> &buffer, + std::vector<uint32_t> &numbers, + int n) +{ + for (int i = 0; i < n; ++i) + { + auto value = static_cast<uint32_t>(RandomNumberGenerator()); + std::unique_ptr<uint32_t> x{new uint32_t{value}}; + if (buffer.Add(x)) + { + numbers.push_back(value); + } + } +} + +static void RunNumberProducers(CircularBuffer<uint32_t> &buffer, + std::vector<uint32_t> &numbers, + int num_threads, + int n) +{ + std::vector<std::vector<uint32_t>> thread_numbers(num_threads); + std::vector<std::thread> threads(num_threads); + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer), + std::ref(thread_numbers[thread_index]), n}; + } + for (auto &thread : threads) + { + thread.join(); + } + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), + thread_numbers[thread_index].end()); + } +} + +void RunNumberConsumer(CircularBuffer<uint32_t> &buffer, + std::atomic<bool> &exit, + std::vector<uint32_t> &numbers) +{ + while (true) + { + if (exit && buffer.Peek().empty()) + { + return; + } + auto n = std::uniform_int_distribution<size_t>{0, buffer.Peek().size()}(RandomNumberGenerator); + buffer.Consume(n, [&](CircularBufferRange<AtomicUniquePtr<uint32_t>> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr<uint32_t> &ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); + } +} + +TEST(CircularBufferTest, Add) +{ + CircularBuffer<int> buffer{10}; + + std::unique_ptr<int> x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + auto range = buffer.Peek(); + EXPECT_EQ(range.size(), 1); + range.ForEach([](const AtomicUniquePtr<int> &y) { + EXPECT_EQ(*y, 11); + return true; + }); +} + +TEST(CircularBufferTest, Clear) +{ + CircularBuffer<int> buffer{10}; + + std::unique_ptr<int> x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + buffer.Clear(); + EXPECT_TRUE(buffer.empty()); +} + +TEST(CircularBufferTest, AddOnFull) +{ + CircularBuffer<int> buffer{10}; + for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i) + { + std::unique_ptr<int> x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + std::unique_ptr<int> x{new int{33}}; + EXPECT_FALSE(buffer.Add(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); +} + +TEST(CircularBufferTest, Consume) +{ + CircularBuffer<int> buffer{10}; + for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i) + { + std::unique_ptr<int> x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + int count = 0; + buffer.Consume(5, [&](CircularBufferRange<AtomicUniquePtr<int>> range) noexcept { + range.ForEach([&](AtomicUniquePtr<int> &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); + EXPECT_EQ(count, 5); +} + +TEST(CircularBufferTest, Simulation) +{ + const int num_producer_threads = 4; + const int n = 25000; + for (size_t max_size : {1, 2, 10, 50, 100, 1000}) + { + CircularBuffer<uint32_t> buffer{max_size}; + std::vector<uint32_t> producer_numbers; + std::vector<uint32_t> consumer_numbers; + auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers), + num_producer_threads, n}; + std::atomic<bool> exit{false}; + auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit), + std::ref(consumer_numbers)}; + producers.join(); + exit = true; + consumer.join(); + std::sort(producer_numbers.begin(), producer_numbers.end()); + std::sort(consumer_numbers.begin(), consumer_numbers.end()); + + EXPECT_EQ(producer_numbers.size(), consumer_numbers.size()); + EXPECT_EQ(producer_numbers, consumer_numbers); + } +} |