// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 #include "opentelemetry/sdk/common/circular_buffer.h" #include #include #include #include #include using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; static void GenerateRandomNumbers(CircularBuffer &buffer, std::vector &numbers, int n) { for (int i = 0; i < n; ++i) { auto value = static_cast(RandomNumberGenerator()); std::unique_ptr x{new uint32_t{value}}; if (buffer.Add(x)) { numbers.push_back(value); } } } static void RunNumberProducers(CircularBuffer &buffer, std::vector &numbers, int num_threads, int n) { std::vector> thread_numbers(num_threads); std::vector threads(num_threads); for (int thread_index = 0; thread_index < num_threads; ++thread_index) { threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer), std::ref(thread_numbers[thread_index]), n}; } for (auto &thread : threads) { thread.join(); } for (int thread_index = 0; thread_index < num_threads; ++thread_index) { numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), thread_numbers[thread_index].end()); } } void RunNumberConsumer(CircularBuffer &buffer, std::atomic &exit, std::vector &numbers) { while (true) { if (exit && buffer.Peek().empty()) { return; } auto n = std::uniform_int_distribution{0, buffer.Peek().size()}(RandomNumberGenerator); buffer.Consume(n, [&](CircularBufferRange> range) noexcept { assert(range.size() == n); range.ForEach([&](AtomicUniquePtr &ptr) noexcept { assert(!ptr.IsNull()); numbers.push_back(*ptr); ptr.Reset(); return true; }); }); } } TEST(CircularBufferTest, Add) { CircularBuffer buffer{10}; std::unique_ptr x{new int{11}}; EXPECT_TRUE(buffer.Add(x)); EXPECT_EQ(x, nullptr); auto range = buffer.Peek(); EXPECT_EQ(range.size(), 1); range.ForEach([](const AtomicUniquePtr &y) { EXPECT_EQ(*y, 11); return true; }); } TEST(CircularBufferTest, Clear) { CircularBuffer buffer{10}; std::unique_ptr x{new int{11}}; EXPECT_TRUE(buffer.Add(x)); EXPECT_EQ(x, nullptr); buffer.Clear(); EXPECT_TRUE(buffer.empty()); } TEST(CircularBufferTest, AddOnFull) { CircularBuffer buffer{10}; for (int i = 0; i < static_cast(buffer.max_size()); ++i) { std::unique_ptr x{new int{i}}; EXPECT_TRUE(buffer.Add(x)); } std::unique_ptr x{new int{33}}; EXPECT_FALSE(buffer.Add(x)); EXPECT_NE(x, nullptr); EXPECT_EQ(*x, 33); } TEST(CircularBufferTest, Consume) { CircularBuffer buffer{10}; for (int i = 0; i < static_cast(buffer.max_size()); ++i) { std::unique_ptr x{new int{i}}; EXPECT_TRUE(buffer.Add(x)); } int count = 0; buffer.Consume(5, [&](CircularBufferRange> range) noexcept { range.ForEach([&](AtomicUniquePtr &ptr) { EXPECT_EQ(*ptr, count++); ptr.Reset(); return true; }); }); EXPECT_EQ(count, 5); } TEST(CircularBufferTest, Simulation) { const int num_producer_threads = 4; const int n = 25000; for (size_t max_size : {1, 2, 10, 50, 100, 1000}) { CircularBuffer buffer{max_size}; std::vector producer_numbers; std::vector consumer_numbers; auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers), num_producer_threads, n}; std::atomic exit{false}; auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit), std::ref(consumer_numbers)}; producers.join(); exit = true; consumer.join(); std::sort(producer_numbers.begin(), producer_numbers.end()); std::sort(consumer_numbers.begin(), consumer_numbers.end()); EXPECT_EQ(producer_numbers.size(), consumer_numbers.size()); EXPECT_EQ(producer_numbers, consumer_numbers); } }