summaryrefslogtreecommitdiffstats
path: root/src/test/immutable_object_cache/test_multi_session.cc
blob: c0c629ab036435bf1f7645427e4ca2ce07248069 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include <iostream>
#include <unistd.h>

#include "gtest/gtest.h"
#include "include/Context.h"
#include "global/global_init.h"
#include "global/global_context.h"

#include "test/immutable_object_cache/test_common.h"
#include "tools/immutable_object_cache/CacheClient.h"
#include "tools/immutable_object_cache/CacheServer.h"

using namespace ceph::immutable_obj_cache;

class TestMultiSession : public ::testing::Test {
public:
  std::string m_local_path;
  CacheServer* m_cache_server;
  std::thread* m_cache_server_thread;
  std::vector<CacheClient*> m_cache_client_vec;
  WaitEvent m_wait_event;
  std::atomic<uint64_t> m_send_request_index;
  std::atomic<uint64_t> m_recv_ack_index;
  uint64_t m_session_num = 110;

  TestMultiSession() : m_local_path("/tmp/ceph_test_multisession_socket"),
                       m_cache_server_thread(nullptr),  m_send_request_index(0),
                       m_recv_ack_index(0) {
    m_cache_client_vec.resize(m_session_num + 1, nullptr);
  }

  ~TestMultiSession() {}

  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  void SetUp() override {
    std::remove(m_local_path.c_str());
    m_cache_server = new CacheServer(g_ceph_context, m_local_path,
      [this](CacheSession* session_id, ObjectCacheRequest* req){
        server_handle_request(session_id, req);
    });
    ASSERT_TRUE(m_cache_server != nullptr);

    m_cache_server_thread = new std::thread(([this]() {
        m_wait_event.signal();
        m_cache_server->run();
    }));

    // waiting for thread running.
    m_wait_event.wait();

    // waiting for io_service run.
    usleep(2);
  }

  void TearDown() override {
    for (uint64_t i = 0; i < m_session_num; i++) {
      if (m_cache_client_vec[i] != nullptr) {
        m_cache_client_vec[i]->close();
        delete m_cache_client_vec[i];
      }
    }
    m_cache_server->stop();
    if (m_cache_server_thread->joinable()) {
      m_cache_server_thread->join();
    }
    delete m_cache_server;
    delete m_cache_server_thread;

    std::remove(m_local_path.c_str());
  }

  CacheClient* create_session(uint64_t random_index) {
     CacheClient* cache_client = new CacheClient(m_local_path, g_ceph_context);
     cache_client->run();
     while (true) {
       if (0 == cache_client->connect()) {
         break;
       }
     }
    m_cache_client_vec[random_index] = cache_client;
    return cache_client;
  }

  void server_handle_request(CacheSession* session_id, ObjectCacheRequest* req) {

    switch (req->get_request_type()) {
      case RBDSC_REGISTER: {
        ObjectCacheRequest* reply = new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY,
                                                                req->seq);
        session_id->send(reply);
        break;
      }
      case RBDSC_READ: {
        ObjectCacheRequest* reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
                                                                req->seq);
        session_id->send(reply);
        break;
      }
    }
  }

  void test_register_client(uint64_t random_index) {
    ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr);

    auto ctx = new LambdaContext([](int ret){
       ASSERT_TRUE(ret == 0);
    });
    auto session = create_session(random_index);
    session->register_client(ctx);

    ASSERT_TRUE(m_cache_client_vec[random_index] != nullptr);
    ASSERT_TRUE(session->is_session_work());
  }

  void test_lookup_object(std::string pool_nspace, uint64_t index,
                          uint64_t request_num, bool is_last) {

    for (uint64_t i = 0; i < request_num; i++) {
      auto ctx = make_gen_lambda_context<ObjectCacheRequest*,
            std::function<void(ObjectCacheRequest*)>>([this](ObjectCacheRequest* ack) {
        m_recv_ack_index++;
      });
      m_send_request_index++;
      // here just for concurrently testing register + lookup, so fix object id.
      m_cache_client_vec[index]->lookup_object(pool_nspace, 1, 2, 3, "1234", std::move(ctx));
    }

    if (is_last) {
      while(m_send_request_index != m_recv_ack_index) {
        usleep(1);
      }
      m_wait_event.signal();
    }
  }
};

// test concurrent : multi-session + register_client + lookup_request
TEST_F(TestMultiSession, test_multi_session) {

  uint64_t test_times = 1000;
  uint64_t test_session_num = 100;

  for (uint64_t i = 0; i <= test_times; i++) {
    uint64_t random_index = random() % test_session_num;
    if (m_cache_client_vec[random_index] == nullptr) {
      test_register_client(random_index);
    } else {
      test_lookup_object(string("test_nspace") + std::to_string(random_index),
                         random_index, 4, i == test_times ? true : false);
    }
  }

  // make sure all ack will be received.
  m_wait_event.wait();

  ASSERT_TRUE(m_send_request_index == m_recv_ack_index);
}