summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/opentelemetry-cpp/sdk/test/logs/batch_log_processor_test.cc
blob: 63e44676c83cfc704d5352cce4599f97d1f11575 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#ifdef ENABLE_LOGS_PREVIEW

#  include "opentelemetry/sdk/logs/batch_log_processor.h"
#  include "opentelemetry/sdk/logs/exporter.h"
#  include "opentelemetry/sdk/logs/log_record.h"

#  include <gtest/gtest.h>
#  include <chrono>
#  include <thread>

using namespace opentelemetry::sdk::logs;
using namespace opentelemetry::sdk::common;

/**
 * A sample log exporter
 * for testing the batch log processor
 */
class MockLogExporter final : public LogExporter
{
public:
  MockLogExporter(std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received,
                  std::shared_ptr<std::atomic<bool>> is_shutdown,
                  std::shared_ptr<std::atomic<bool>> is_export_completed,
                  const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0))
      : logs_received_(logs_received),
        is_shutdown_(is_shutdown),
        is_export_completed_(is_export_completed),
        export_delay_(export_delay)
  {}

  std::unique_ptr<Recordable> MakeRecordable() noexcept
  {
    return std::unique_ptr<Recordable>(new LogRecord());
  }

  // Export method stores the logs received into a shared list of record names
  ExportResult Export(
      const opentelemetry::nostd::span<std::unique_ptr<Recordable>> &records) noexcept override
  {
    *is_export_completed_ = false;  // Meant exclusively to test scheduled_delay_millis

    for (auto &record : records)
    {
      auto log = std::unique_ptr<LogRecord>(static_cast<LogRecord *>(record.release()));
      if (log != nullptr)
      {
        logs_received_->push_back(std::move(log));
      }
    }

    *is_export_completed_ = true;
    return ExportResult::kSuccess;
  }

  // toggles the boolean flag marking this exporter as shut down
  bool Shutdown(
      std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override
  {
    *is_shutdown_ = true;
    return true;
  }

private:
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received_;
  std::shared_ptr<std::atomic<bool>> is_shutdown_;
  std::shared_ptr<std::atomic<bool>> is_export_completed_;
  const std::chrono::milliseconds export_delay_;
};

/**
 * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above.
 */
class BatchLogProcessorTest : public testing::Test  // ::testing::Test
{
public:
  // returns a batch log processor that received a batch of log records, a shared pointer to a
  // is_shutdown flag, and the processor configuration options (default if unspecified)
  std::shared_ptr<LogProcessor> GetMockProcessor(
      std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received,
      std::shared_ptr<std::atomic<bool>> is_shutdown,
      std::shared_ptr<std::atomic<bool>> is_export_completed =
          std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)),
      const std::chrono::milliseconds export_delay           = std::chrono::milliseconds(0),
      const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
      const size_t max_queue_size                            = 2048,
      const size_t max_export_batch_size                     = 512)
  {
    return std::shared_ptr<LogProcessor>(
        new BatchLogProcessor(std::unique_ptr<LogExporter>(new MockLogExporter(
                                  logs_received, is_shutdown, is_export_completed, export_delay)),
                              max_queue_size, scheduled_delay_millis, max_export_batch_size));
  }
};

TEST_F(BatchLogProcessorTest, TestShutdown)
{
  // initialize a batch log processor with the test exporter
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
      new std::vector<std::unique_ptr<LogRecord>>);
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));

  auto batch_processor = GetMockProcessor(logs_received, is_shutdown);

  // Create a few test log records and send them to the processor
  const int num_logs = 3;

  for (int i = 0; i < num_logs; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }

  // Test that shutting down the processor will first wait for the
  // current batch of logs to be sent to the log exporter
  // by checking the number of logs sent and the names of the logs sent
  EXPECT_EQ(true, batch_processor->Shutdown());
  // It's safe to shutdown again
  EXPECT_TRUE(batch_processor->Shutdown());

  EXPECT_EQ(num_logs, logs_received->size());

  // Assume logs are received by exporter in same order as sent by processor
  for (int i = 0; i < num_logs; ++i)
  {
    EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
  }

  // Also check that the processor is shut down at the end
  EXPECT_TRUE(is_shutdown->load());
}

TEST_F(BatchLogProcessorTest, TestForceFlush)
{
  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
      new std::vector<std::unique_ptr<LogRecord>>);

  auto batch_processor = GetMockProcessor(logs_received, is_shutdown);
  const int num_logs   = 2048;

  for (int i = 0; i < num_logs; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }

  EXPECT_TRUE(batch_processor->ForceFlush());

  EXPECT_EQ(num_logs, logs_received->size());
  for (int i = 0; i < num_logs; ++i)
  {
    EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
  }

  // Create some more logs to make sure that the processor still works
  for (int i = 0; i < num_logs; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }

  EXPECT_TRUE(batch_processor->ForceFlush());

  EXPECT_EQ(num_logs * 2, logs_received->size());
  for (int i = 0; i < num_logs * 2; ++i)
  {
    EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetBody());
  }
}

TEST_F(BatchLogProcessorTest, TestManyLogsLoss)
{
  /* Test that when exporting more than max_queue_size logs, some are most likely lost*/

  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
      new std::vector<std::unique_ptr<LogRecord>>);

  const int max_queue_size = 4096;

  auto batch_processor = GetMockProcessor(logs_received, is_shutdown);

  // Create max_queue_size log records
  for (int i = 0; i < max_queue_size; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }

  EXPECT_TRUE(batch_processor->ForceFlush());

  // Log should be exported by now
  EXPECT_GE(max_queue_size, logs_received->size());
}

TEST_F(BatchLogProcessorTest, TestManyLogsLossLess)
{
  /* Test that no logs are lost when sending max_queue_size logs */

  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
      new std::vector<std::unique_ptr<LogRecord>>);
  auto batch_processor = GetMockProcessor(logs_received, is_shutdown);

  const int num_logs = 2048;

  for (int i = 0; i < num_logs; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }

  EXPECT_TRUE(batch_processor->ForceFlush());

  EXPECT_EQ(num_logs, logs_received->size());
  for (int i = 0; i < num_logs; ++i)
  {
    EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
  }
}

TEST_F(BatchLogProcessorTest, TestScheduledDelayMillis)
{
  /* Test that max_export_batch_size logs are exported every scheduled_delay_millis
     seconds */

  std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false));
  std::shared_ptr<std::vector<std::unique_ptr<LogRecord>>> logs_received(
      new std::vector<std::unique_ptr<LogRecord>>);

  const std::chrono::milliseconds export_delay(0);
  const std::chrono::milliseconds scheduled_delay_millis(2000);
  const size_t max_export_batch_size = 512;

  auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed,
                                          export_delay, scheduled_delay_millis);

  for (std::size_t i = 0; i < max_export_batch_size; ++i)
  {
    auto log = batch_processor->MakeRecordable();
    log->SetBody("Log" + std::to_string(i));
    batch_processor->OnReceive(std::move(log));
  }
  // Sleep for scheduled_delay_millis milliseconds
  std::this_thread::sleep_for(scheduled_delay_millis);

  // small delay to give time to export, which is being performed
  // asynchronously by the worker thread (this thread will not
  // forcibly join() the main thread unless processor's shutdown() is called).
  std::this_thread::sleep_for(std::chrono::milliseconds(50));

  // Logs should be exported by now
  EXPECT_TRUE(is_export_completed->load());
  EXPECT_EQ(max_export_batch_size, logs_received->size());
  for (size_t i = 0; i < max_export_batch_size; ++i)
  {
    EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetBody());
  }
}
#endif