diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/tools/immutable_object_cache/CacheSession.cc | |
parent | Initial commit. (diff) | |
download | ceph-upstream/18.2.2.tar.xz ceph-upstream/18.2.2.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/tools/immutable_object_cache/CacheSession.cc')
-rw-r--r-- | src/tools/immutable_object_cache/CacheSession.cc | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/src/tools/immutable_object_cache/CacheSession.cc b/src/tools/immutable_object_cache/CacheSession.cc new file mode 100644 index 000000000..38c38c97d --- /dev/null +++ b/src/tools/immutable_object_cache/CacheSession.cc @@ -0,0 +1,140 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include <boost/bind/bind.hpp> +#include "common/debug.h" +#include "common/ceph_context.h" +#include "CacheSession.h" + +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_immutable_obj_cache +#undef dout_prefix +#define dout_prefix *_dout << "ceph::cache::CacheSession: " << this << " " \ + << __func__ << ": " + + +namespace ceph { +namespace immutable_obj_cache { + +CacheSession::CacheSession(io_service& io_service, + ProcessMsg processmsg, + CephContext* cct) + : m_dm_socket(io_service), + m_server_process_msg(processmsg), m_cct(cct) { + m_bp_header = buffer::create(get_header_size()); +} + +CacheSession::~CacheSession() { + close(); +} + +stream_protocol::socket& CacheSession::socket() { + return m_dm_socket; +} + +void CacheSession::set_client_version(const std::string &version) { + m_client_version = version; +} + +const std::string &CacheSession::client_version() const { + return m_client_version; +} + +void CacheSession::close() { + if (m_dm_socket.is_open()) { + boost::system::error_code close_ec; + m_dm_socket.close(close_ec); + if (close_ec) { + ldout(m_cct, 20) << "close: " << close_ec.message() << dendl; + } + } +} + +void CacheSession::start() { + read_request_header(); +} + +void CacheSession::read_request_header() { + ldout(m_cct, 20) << dendl; + boost::asio::async_read(m_dm_socket, + boost::asio::buffer(m_bp_header.c_str(), get_header_size()), + boost::asio::transfer_exactly(get_header_size()), + boost::bind(&CacheSession::handle_request_header, + shared_from_this(), boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void CacheSession::handle_request_header(const boost::system::error_code& err, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (err || bytes_transferred != get_header_size()) { + fault(err); + return; + } + + read_request_data(get_data_len(m_bp_header.c_str())); +} + +void CacheSession::read_request_data(uint64_t data_len) { + ldout(m_cct, 20) << dendl; + bufferptr bp_data(buffer::create(data_len)); + boost::asio::async_read(m_dm_socket, + boost::asio::buffer(bp_data.c_str(), bp_data.length()), + boost::asio::transfer_exactly(data_len), + boost::bind(&CacheSession::handle_request_data, + shared_from_this(), bp_data, data_len, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); +} + +void CacheSession::handle_request_data(bufferptr bp, uint64_t data_len, + const boost::system::error_code& err, + size_t bytes_transferred) { + ldout(m_cct, 20) << dendl; + if (err || bytes_transferred != data_len) { + fault(err); + return; + } + + bufferlist bl_data; + + bl_data.append(m_bp_header); + bl_data.append(std::move(bp)); + + ObjectCacheRequest* req = decode_object_cache_request(bl_data); + + process(req); + delete req; + read_request_header(); +} + +void CacheSession::process(ObjectCacheRequest* req) { + ldout(m_cct, 20) << dendl; + m_server_process_msg(this, req); +} + +void CacheSession::send(ObjectCacheRequest* reply) { + ldout(m_cct, 20) << dendl; + bufferlist bl; + reply->encode(); + bl.append(reply->get_payload_bufferlist()); + + boost::asio::async_write(m_dm_socket, + boost::asio::buffer(bl.c_str(), bl.length()), + boost::asio::transfer_exactly(bl.length()), + [this, bl, reply](const boost::system::error_code& err, + size_t bytes_transferred) { + delete reply; + if (err || bytes_transferred != bl.length()) { + fault(err); + return; + } + }); +} + +void CacheSession::fault(const boost::system::error_code& ec) { + ldout(m_cct, 20) << "session fault : " << ec.message() << dendl; +} + +} // namespace immutable_obj_cache +} // namespace ceph |