summaryrefslogtreecommitdiffstats
path: root/src/tools/immutable_object_cache/CacheController.cc
blob: ae16368392e9be0284dacb01a96263489e7f28c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "CacheController.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_immutable_obj_cache
#undef dout_prefix
#define dout_prefix *_dout << "ceph::cache::CacheController: " << this << " " \
                           << __func__ << ": "

namespace ceph {
namespace immutable_obj_cache {

// Builds a controller that keeps a copy of the given argument vector and
// the Ceph context pointer. Nothing else is set up here; init() and run()
// create the cache store and the cache server respectively.
CacheController::CacheController(CephContext *cct,
                                 const std::vector<const char*> &args)
  : m_args(args),
    m_cct(cct) {
  // Trace-level log entry; the prefix macro supplies the function name.
  ldout(m_cct, 20) << dendl;
}

// Frees the cache server and the object cache store held through raw
// pointers by this controller.
CacheController::~CacheController() {
  // The server is released before the store. NOTE(review): the server is
  // given a callback into this object (see run()), so this order is
  // presumably intentional -- confirm before reordering.
  delete m_cache_server;
  delete m_object_cache_store;
}

// Creates the object cache store and runs its two initialization steps:
// init(true) followed by init_cache().
//
// Returns the negative code of the first step that fails (init_cache() is
// skipped when init() fails); otherwise returns the result of init_cache().
int CacheController::init() {
  ldout(m_cct, 20) << dendl;

  m_object_cache_store = new ObjectCacheStore(m_cct);

  // TODO(dehao): make this configurable
  int rc = m_object_cache_store->init(true);
  if (rc >= 0) {
    // Only attempt the second step once the first one succeeded.
    rc = m_object_cache_store->init_cache();
  }

  // Both steps report failure with the same message.
  if (rc < 0) {
    lderr(m_cct) << "init error\n" << dendl;
  }

  return rc;
}

// Stops the cache server (if one was created by run()) and then shuts down
// the object cache store (if one was created by init()).
//
// Returns the negative code of the first step that fails; in that case the
// remaining steps are skipped. Otherwise returns the result of the last
// step that ran, or 0 when there was nothing to shut down.
int CacheController::shutdown() {
  ldout(m_cct, 20) << dendl;

  // Start from success so the function has a defined result even when
  // neither component exists yet.
  int r = 0;
  if (m_cache_server != nullptr) {
    r = m_cache_server->stop();
    if (r < 0) {
      lderr(m_cct) << "stop error\n" << dendl;
      return r;
    }
  }

  // shutdown() can be reached (e.g. via handle_signal()) before init() has
  // created the store, so guard it the same way as the server above.
  // NOTE(review): relies on the member being null until init() runs, which
  // the unconditional delete in the destructor already assumes -- confirm
  // against the header.
  if (m_object_cache_store != nullptr) {
    r = m_object_cache_store->shutdown();
    if (r < 0) {
      lderr(m_cct) << "stop error\n" << dendl;
      return r;
    }
  }

  return r;
}

// Signal hook: every signal is treated the same way and triggers
// shutdown(). The signal number itself is not inspected.
void CacheController::handle_signal(int signum) {
  // The result is deliberately dropped; shutdown() logs its own failures.
  static_cast<void>(shutdown());
}

int CacheController::run() {
  try {
    std::string controller_path =
      m_cct->_conf.get_val<std::string>("immutable_object_cache_sock");
    if (controller_path.empty()) {
      lderr(m_cct) << "'immutable_object_cache_sock' path not set" << dendl;
      return -EINVAL;
    }

    std::remove(controller_path.c_str());

    m_cache_server = new CacheServer(m_cct, controller_path,
      std::bind(&CacheController::handle_request, this,
                std::placeholders::_1, std::placeholders::_2));

    int ret = m_cache_server->run();
    if (ret != 0) {
      return ret;
    }

    return 0;
  } catch (std::exception& e) {
    lderr(m_cct) << "Exception: " << e.what() << dendl;
    return -EFAULT;
  }
}

// Dispatches one client request by its request type and sends a reply on
// the same session.
//
//  - RBDSC_REGISTER: records the client's version on the session and
//    answers with RBDSC_REGISTER_REPLY.
//  - RBDSC_READ: looks the object up in the cache store and answers with
//    RBDSC_READ_REPLY (carrying the cache path) when the lookup returns
//    OBJ_CACHE_PROMOTED or OBJ_CACHE_DNE, and with RBDSC_READ_RADOS for
//    any other result.
//  - anything else: logged, then asserts.
//
// Each reply carries the sequence number of the request it answers.
void CacheController::handle_request(CacheSession* session,
                                     ObjectCacheRequest* req) {
  ldout(m_cct, 20) << dendl;

  switch (req->get_request_type()) {
    case RBDSC_REGISTER: {
      // TODO(dehao): skip register and allow clients to lookup directly

      ObjectCacheRegData* reg_req =
        reinterpret_cast <ObjectCacheRegData*> (req);
      session->set_client_version(reg_req->version);

      ObjectCacheRequest* reg_reply =
        new ObjectCacheRegReplyData(RBDSC_REGISTER_REPLY, req->seq);
      session->send(reg_reply);
      break;
    }
    case RBDSC_READ: {
      // lookup object in local cache store
      std::string object_path;
      auto* read_req = reinterpret_cast <ObjectCacheReadData*> (req);

      // A session with no recorded client version asks the store to
      // return a path for objects that do not exist as well.
      const bool want_dne_path = session->client_version().empty();
      const int lookup_result = m_object_cache_store->lookup_object(
        read_req->pool_namespace, read_req->pool_id,
        read_req->snap_id, read_req->object_size,
        read_req->oid, want_dne_path, object_path);

      ObjectCacheRequest* read_reply = nullptr;
      if (lookup_result == OBJ_CACHE_PROMOTED ||
          lookup_result == OBJ_CACHE_DNE) {
        read_reply = new ObjectCacheReadReplyData(RBDSC_READ_REPLY,
                                                  req->seq, object_path);
      } else {
        // Any other lookup result redirects the client to RADOS.
        read_reply = new ObjectCacheReadRadosData(RBDSC_READ_RADOS, req->seq);
      }
      session->send(read_reply);
      break;
    }
    default:
      ldout(m_cct, 5) << "can't recongize request" << dendl;
      ceph_assert(0);
  }
}

}  // namespace immutable_obj_cache
}  // namespace ceph