summary | refs | log | tree | commit | diff | stats
path: root/src/tools/immutable_object_cache/CacheSession.cc
blob: 38c38c97d44e67c7113e8f7ccf772ff2ff622cc7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <memory>

#include <boost/bind/bind.hpp>
#include "common/debug.h"
#include "common/ceph_context.h"
#include "CacheSession.h"

// Logging configuration for this file, picked up by the ldout()/dendl
// statements below.
// NOTE(review): the exact meaning of dout_context / dout_subsys is defined
// by common/debug.h, which is not visible here -- confirm there.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_immutable_obj_cache
#undef dout_prefix
// Prefix streamed in front of each log line: a class tag, the session's
// address (`this`) and the name of the emitting function.
// NOTE(review): the tag reads "ceph::cache" although the class lives in
// ceph::immutable_obj_cache -- presumably historical; confirm before
// changing, since log consumers may match on it.
#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \
                           << __func__ << ": "


namespace ceph {
namespace immutable_obj_cache {

// Build a session whose socket is bound to `io_service`.
//
// `processmsg` is stored and later invoked by process() for each decoded
// request; `cct` is the context handed to ldout() for logging.  The header
// buffer is allocated once here, sized by get_header_size(); it is the
// buffer read_request_header() reads into.
CacheSession::CacheSession(io_service& io_service,
                           ProcessMsg processmsg,
                           CephContext* cct)
    : m_dm_socket(io_service),
      m_server_process_msg(processmsg),
      m_cct(cct) {
  const auto header_len = get_header_size();
  m_bp_header = buffer::create(header_len);
}

// Closes the socket (if it is still open) when the session is destroyed.
CacheSession::~CacheSession() {
  this->close();
}

// Gives mutable access to the session's underlying socket.
stream_protocol::socket& CacheSession::socket() {
  return this->m_dm_socket;
}

// Remembers the version string reported for the connected client.
void CacheSession::set_client_version(const std::string &version) {
  m_client_version.assign(version);
}

const std::string &CacheSession::client_version() const {
  return m_client_version;
}

void CacheSession::close() {
  if (m_dm_socket.is_open()) {
    boost::system::error_code close_ec;
    m_dm_socket.close(close_ec);
    if (close_ec) {
       ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
    }
  }
}

void CacheSession::start() {
  read_request_header();
}

void CacheSession::read_request_header() {
  ldout(m_cct, 20) << dendl;
  boost::asio::async_read(m_dm_socket,
    boost::asio::buffer(m_bp_header.c_str(), get_header_size()),
    boost::asio::transfer_exactly(get_header_size()),
    boost::bind(&CacheSession::handle_request_header,
     shared_from_this(), boost::asio::placeholders::error,
     boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the header read.
//
//   err                result of the asynchronous read
//   bytes_transferred  number of header bytes actually received
//
// When a complete header arrived without error, the payload length is
// extracted from it with get_data_len() and the payload read is started.
// Otherwise the problem is handed to fault() and no further read is queued.
void CacheSession::handle_request_header(const boost::system::error_code& err,
                                         size_t bytes_transferred) {
  ldout(m_cct, 20) << dendl;
  if (!err && bytes_transferred == get_header_size()) {
    const auto payload_len = get_data_len(m_bp_header.c_str());
    read_request_data(payload_len);
    return;
  }

  fault(err);
}

void CacheSession::read_request_data(uint64_t data_len) {
  ldout(m_cct, 20) << dendl;
  bufferptr bp_data(buffer::create(data_len));
  boost::asio::async_read(m_dm_socket,
    boost::asio::buffer(bp_data.c_str(), bp_data.length()),
    boost::asio::transfer_exactly(data_len),
    boost::bind(&CacheSession::handle_request_data,
      shared_from_this(), bp_data, data_len,
      boost::asio::placeholders::error,
      boost::asio::placeholders::bytes_transferred));
}

// Completion handler for the payload read started by read_request_data().
//
//   bp                 buffer the payload was read into
//   data_len           number of payload bytes that were requested
//   err                result of the asynchronous read
//   bytes_transferred  number of payload bytes actually received
//
// On error or short read the problem is handed to fault() and no further
// read is queued.  Otherwise header and payload are joined into a single
// bufferlist, decoded into a request, passed to process(), and the next
// header read is queued.
void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len,
                                      const boost::system::error_code& err,
                                      size_t bytes_transferred) {
  ldout(m_cct, 20) << dendl;
  if (err || bytes_transferred != data_len) {
    fault(err);
    return;
  }

  // The decoder is given the full message: header first, then payload.
  bufferlist bl_data;
  bl_data.append(m_bp_header);
  bl_data.append(std::move(bp));

  {
    // Own the decoded request for the duration of process() so that it is
    // released even if the callback throws.  The scope ends before the next
    // read is queued, matching the original delete-then-read ordering.
    std::unique_ptr<ObjectCacheRequest> req(
      decode_object_cache_request(bl_data));
    process(req.get());
  }

  read_request_header();
}

// Hands a decoded request to the callback supplied at construction time,
// together with a pointer to this session.  `req` is only passed through;
// this function does not delete it.
void CacheSession::process(ObjectCacheRequest* req) {
  ldout(m_cct, 20) << dendl;
  this->m_server_process_msg(this, req);
}

// Encodes `reply` and writes it asynchronously to the client socket.
//
// `reply` is deleted by the write completion handler, so the caller must
// not touch it after this call.  A write error or short write is reported
// through fault().
//
// The session must be owned by a shared_ptr when this is called (the read
// paths already rely on that via shared_from_this()).
void CacheSession::send(ObjectCacheRequest* reply) {
  ldout(m_cct, 20) << dendl;
  bufferlist bl;
  reply->encode();
  bl.append(reply->get_payload_bufferlist());

  // Take the data pointer *before* the async_write call.  The original took
  // it in one call argument and copied `bl` into the handler in another;
  // the order in which call arguments are evaluated is unspecified.
  // NOTE(review): bufferlist::c_str() presumably may re-allocate to make the
  // list contiguous -- if so, a handler copy made first would not keep the
  // returned memory alive.  Confirm against the bufferlist implementation.
  char* const data = bl.c_str();
  const auto len = bl.length();

  // Keep the session alive until the handler has run; a raw `this` could
  // dangle if the session were destroyed while the write is still pending.
  auto self = shared_from_this();

  boost::asio::async_write(m_dm_socket,
        boost::asio::buffer(data, len),
        boost::asio::transfer_exactly(len),
        // `bl` is captured by value so the bytes being written stay valid
        // for the lifetime of the operation.
        [self, bl, reply](const boost::system::error_code& err,
          size_t bytes_transferred) {
          delete reply;
          if (err || bytes_transferred != bl.length()) {
            self->fault(err);
            return;
          }
        });
}

// Reports a session-level I/O failure by logging the error's message at
// debug level 20.
// NOTE(review): this only logs; it does not close the socket or queue
// another read -- confirm that is the intended recovery policy.
void CacheSession::fault(const boost::system::error_code& ec) {
  ldout(m_cct, 20) << "session fault : "
                   << ec.message()
                   << dendl;
}

}  // namespace immutable_obj_cache
}  // namespace ceph