summary refs log tree commit diff stats
path: root/src/tools/immutable_object_cache/CacheClient.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/tools/immutable_object_cache/CacheClient.cc')
-rw-r--r-- src/tools/immutable_object_cache/CacheClient.cc 435
1 files changed, 435 insertions, 0 deletions
diff --git a/src/tools/immutable_object_cache/CacheClient.cc b/src/tools/immutable_object_cache/CacheClient.cc
new file mode 100644
index 000000000..2b837be51
--- /dev/null
+++ b/src/tools/immutable_object_cache/CacheClient.cc
@@ -0,0 +1,435 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <boost/bind/bind.hpp>
+#include "CacheClient.h"
+#include "common/Cond.h"
+#include "common/version.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_immutable_obj_cache
+#undef dout_prefix
+#define dout_prefix *_dout << "ceph::cache::CacheClient: " << this << " " \
+ << __func__ << ": "
+
+namespace ceph {
+namespace immutable_obj_cache {
+
+  // Construct a client for the immutable-object-cache daemon.
+  // @param file      path to the daemon's unix domain socket
+  // @param ceph_ctx  context used for configuration lookup and logging
+  CacheClient::CacheClient(const std::string& file, CephContext* ceph_ctx)
+    : m_cct(ceph_ctx), m_io_service_work(m_io_service),
+      m_dm_socket(m_io_service), m_ep(stream_protocol::endpoint(file)),
+      m_io_thread(nullptr), m_session_work(false), m_writing(false),
+      m_reading(false), m_sequence_id(0) {
+    m_worker_thread_num =
+      m_cct->_conf.get_val<uint64_t>(
+        "immutable_object_cache_client_dedicated_thread_num");
+
+    // Optionally create a dedicated io_service plus thread pool that runs
+    // only reply callbacks (see process()), keeping them off the socket
+    // I/O thread.  The io_service::work object keeps run() from returning
+    // while no handler is queued.
+    if (m_worker_thread_num != 0) {
+      m_worker = new boost::asio::io_service();
+      m_worker_io_service_work = new boost::asio::io_service::work(*m_worker);
+      for (uint64_t i = 0; i < m_worker_thread_num; i++) {
+        std::thread* thd = new std::thread([this](){m_worker->run();});
+        m_worker_threads.push_back(thd);
+      }
+    }
+    // Reusable buffer for the fixed-size reply header in register_client().
+    m_bp_header = buffer::create(get_header_size());
+  }
+
+  // Destructor: tear down all I/O threads and services via stop().
+  CacheClient::~CacheClient() {
+    stop();
+  }
+
+  // Start the thread that drives the socket io_service event loop.
+  void CacheClient::run() {
+    m_io_thread.reset(new std::thread([this](){m_io_service.run(); }));
+  }
+
+  // True while a registered session with the cache daemon is active.
+  bool CacheClient::is_session_work() {
+    return m_session_work.load();
+  }
+
+  // Stop the io_service event loops and join/release every thread.
+  // Made idempotent: joined threads and freed worker objects are reset to
+  // null, so an explicit stop() followed by the destructor's stop() does
+  // not re-join an already-joined thread or double-free the worker pool.
+  // @return always 0
+  int CacheClient::stop() {
+    m_session_work.store(false);
+    m_io_service.stop();
+
+    if (m_io_thread != nullptr) {
+      m_io_thread->join();
+      m_io_thread.reset();
+    }
+    if (m_worker_thread_num != 0 && m_worker != nullptr) {
+      m_worker->stop();
+      for (auto thd : m_worker_threads) {
+        thd->join();
+        delete thd;
+      }
+      m_worker_threads.clear();
+      delete m_worker_io_service_work;
+      m_worker_io_service_work = nullptr;
+      delete m_worker;
+      m_worker = nullptr;
+    }
+    return 0;
+  }
+
+ // close domain socket
+ void CacheClient::close() {
+ m_session_work.store(false);
+ boost::system::error_code close_ec;
+ m_dm_socket.close(close_ec);
+ if (close_ec) {
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+ }
+ }
+
+ // sync connect
+ int CacheClient::connect() {
+ int ret = -1;
+ C_SaferCond cond;
+ Context* on_finish = new LambdaContext([&cond, &ret](int err) {
+ ret = err;
+ cond.complete(err);
+ });
+
+ connect(on_finish);
+ cond.wait();
+
+ return ret;
+ }
+
+ // async connect
+ void CacheClient::connect(Context* on_finish) {
+ m_dm_socket.async_connect(m_ep,
+ boost::bind(&CacheClient::handle_connect, this,
+ on_finish, boost::asio::placeholders::error));
+ }
+
+  // Completion handler for the asynchronous connect.
+  void CacheClient::handle_connect(Context* on_finish,
+                                   const boost::system::error_code& err) {
+    if (!err) {
+      ldout(m_cct, 20) << "successfully connected to cache server." << dendl;
+      on_finish->complete(0);
+      return;
+    }
+
+    ldout(m_cct, 20) << "fails to connect to cache server. error : "
+                     << err.message() << dendl;
+    fault(ASIO_ERROR_CONNECT, err);
+    on_finish->complete(-1);
+  }
+
+  // Issue an asynchronous lookup for one object to the cache daemon.
+  // The encoded request is appended to the shared outgoing buffer under
+  // m_lock and tracked in m_seq_to_req by its sequence id until a reply
+  // (or fault()) completes on_finish.
+  void CacheClient::lookup_object(std::string pool_nspace, uint64_t pool_id,
+                                  uint64_t snap_id, uint64_t object_size,
+                                  std::string oid,
+                                  CacheGenContextURef&& on_finish) {
+    ldout(m_cct, 20) << dendl;
+    ObjectCacheRequest* req = new ObjectCacheReadData(RBDSC_READ,
+                                    ++m_sequence_id, 0, 0, pool_id,
+                                    snap_id, object_size, oid, pool_nspace);
+    req->process_msg = std::move(on_finish);
+    req->encode();
+
+    {
+      std::lock_guard locker{m_lock};
+      m_outcoming_bl.append(req->get_payload_bufferlist());
+      // sequence ids are unique per client, so no entry may exist yet
+      ceph_assert(m_seq_to_req.find(req->seq) == m_seq_to_req.end());
+      m_seq_to_req[req->seq] = req;
+    }
+
+    // try to send message to server.
+    try_send();
+
+    // try to receive ack from server.
+    try_receive();
+  }
+
+  // Kick off an asynchronous send unless one is already in flight.
+  // Uses an atomic exchange so two concurrent callers cannot both observe
+  // m_writing == false and start duplicate send_message() chains (the
+  // separate load-then-store pair was a check-then-act race).
+  void CacheClient::try_send() {
+    ldout(m_cct, 20) << dendl;
+    if (!m_writing.exchange(true)) {
+      send_message();
+    }
+  }
+
+  // Drain the shared outgoing buffer: swap it out under the lock and
+  // async-write the whole chunk.  The completion handler loops back into
+  // send_message() while more bytes have accumulated, and clears m_writing
+  // once the buffer is empty.  The lambda captures `bl` by value, which
+  // keeps the underlying buffers alive for the duration of the write.
+  void CacheClient::send_message() {
+    ldout(m_cct, 20) << dendl;
+    bufferlist bl;
+    {
+      std::lock_guard locker{m_lock};
+      bl.swap(m_outcoming_bl);
+      ceph_assert(m_outcoming_bl.length() == 0);
+    }
+
+    // send bytes as many as possible.
+    boost::asio::async_write(m_dm_socket,
+        boost::asio::buffer(bl.c_str(), bl.length()),
+        boost::asio::transfer_exactly(bl.length()),
+        [this, bl](const boost::system::error_code& err, size_t cb) {
+        if (err || cb != bl.length()) {
+          fault(ASIO_ERROR_WRITE, err);
+          return;
+        }
+
+        ceph_assert(cb == bl.length());
+
+        {
+          std::lock_guard locker{m_lock};
+          // nothing more queued: release the writer slot
+          if (m_outcoming_bl.length() == 0) {
+            m_writing.store(false);
+            return;
+          }
+        }
+
+        // still have left bytes, continue to send.
+        send_message();
+    });
+    try_receive();
+  }
+
+  // Arm the receive loop unless it is already running.  The atomic
+  // exchange closes the check-then-act race where two threads could both
+  // observe m_reading == false and start duplicate read chains.
+  void CacheClient::try_receive() {
+    ldout(m_cct, 20) << dendl;
+    if (!m_reading.exchange(true)) {
+      receive_message();
+    }
+  }
+
+  // Begin reading the next reply; only entered while m_reading is set.
+  void CacheClient::receive_message() {
+    ldout(m_cct, 20) << dendl;
+    ceph_assert(m_reading.load());
+    read_reply_header();
+  }
+
+  // Async-read the fixed-size reply header.  The bufferptr copy bound into
+  // the handler keeps the underlying buffer (and raw_ptr) alive until the
+  // read completes.
+  void CacheClient::read_reply_header() {
+    ldout(m_cct, 20) << dendl;
+    /* create new head buffer for every reply */
+    bufferptr bp_head(buffer::create(get_header_size()));
+    auto raw_ptr = bp_head.c_str();
+
+    boost::asio::async_read(m_dm_socket,
+      boost::asio::buffer(raw_ptr, get_header_size()),
+      boost::asio::transfer_exactly(get_header_size()),
+      boost::bind(&CacheClient::handle_reply_header,
+                  this, bp_head,
+                  boost::asio::placeholders::error,
+                  boost::asio::placeholders::bytes_transferred));
+  }
+
+  // Completion handler for the header read: extract the payload length
+  // from the header and chain into the data read.
+  void CacheClient::handle_reply_header(bufferptr bp_head,
+                                        const boost::system::error_code& ec,
+                                        size_t bytes_transferred) {
+    ldout(m_cct, 20) << dendl;
+    if (ec || bytes_transferred != get_header_size()) {
+      fault(ASIO_ERROR_READ, ec);
+      return;
+    }
+
+    ceph_assert(bytes_transferred == bp_head.length());
+
+    uint32_t data_len = get_data_len(bp_head.c_str());
+
+    // allocate the payload buffer sized from the header and keep reading
+    bufferptr bp_data(buffer::create(data_len));
+    read_reply_data(std::move(bp_head), std::move(bp_data), data_len);
+  }
+
+  // Async-read the variable-size reply payload.  Both bufferptrs are moved
+  // into the bound handler, which keeps their storage alive until the read
+  // completes.
+  void CacheClient::read_reply_data(bufferptr&& bp_head,
+                                    bufferptr&& bp_data,
+                                    const uint64_t data_len) {
+    ldout(m_cct, 20) << dendl;
+    auto raw_ptr = bp_data.c_str();
+    boost::asio::async_read(m_dm_socket, boost::asio::buffer(raw_ptr, data_len),
+      boost::asio::transfer_exactly(data_len),
+      boost::bind(&CacheClient::handle_reply_data,
+        this, std::move(bp_head), std::move(bp_data), data_len,
+        boost::asio::placeholders::error,
+        boost::asio::placeholders::bytes_transferred));
+  }
+
+  // Completion handler for the payload read: decode the reply, dispatch it
+  // to its pending request, and keep the receive loop running while
+  // requests are outstanding.
+  void CacheClient::handle_reply_data(bufferptr bp_head,
+                                      bufferptr bp_data,
+                                      const uint64_t data_len,
+                                      const boost::system::error_code& ec,
+                                      size_t bytes_transferred) {
+    ldout(m_cct, 20) << dendl;
+    if (ec || bytes_transferred != data_len) {
+      // This is a failed *read*: report ASIO_ERROR_READ.  Reporting
+      // ASIO_ERROR_WRITE here (as before) trips the ceph_assert(0) in
+      // fault() and aborts the process on a plain socket read error.
+      fault(ASIO_ERROR_READ, ec);
+      return;
+    }
+    ceph_assert(bp_data.length() == data_len);
+
+    // re-assemble header + payload and decode them into a reply object
+    bufferlist data_buffer;
+    data_buffer.append(std::move(bp_head));
+    data_buffer.append(std::move(bp_data));
+
+    ObjectCacheRequest* reply = decode_object_cache_request(data_buffer);
+    data_buffer.clear();
+    ceph_assert(data_buffer.length() == 0);
+
+    // match the reply to its pending request and run its callback
+    process(reply, reply->seq);
+
+    {
+      std::lock_guard locker{m_lock};
+      if (m_seq_to_req.size() == 0 && m_outcoming_bl.length()) {
+        m_reading.store(false);
+        return;
+      }
+    }
+    if (is_session_work()) {
+      receive_message();
+    }
+  }
+
+  // Match a reply to its pending request by sequence id, then run the
+  // request's callback — posted to the dedicated worker pool when one is
+  // configured, otherwise invoked inline on the I/O thread.  The context
+  // owns and frees both the request and the reply after completion.
+  void CacheClient::process(ObjectCacheRequest* reply, uint64_t seq_id) {
+    ldout(m_cct, 20) << dendl;
+    ObjectCacheRequest* current_request = nullptr;
+    {
+      std::lock_guard locker{m_lock};
+      ceph_assert(m_seq_to_req.find(seq_id) != m_seq_to_req.end());
+      current_request = m_seq_to_req[seq_id];
+      m_seq_to_req.erase(seq_id);
+    }
+
+    ceph_assert(current_request != nullptr);
+    auto process_reply = new LambdaContext([current_request, reply]
+      (bool dedicated) {
+      if (dedicated) {
+        // dedicated thread executes this context.
+      }
+      current_request->process_msg.release()->complete(reply);
+      delete current_request;
+      delete reply;
+    });
+
+    if (m_worker_thread_num != 0) {
+      m_worker->post([process_reply]() {
+        process_reply->complete(true);
+      });
+    } else {
+      process_reply->complete(false);
+    }
+  }
+
+ // if there is one request fails, just execute fault, then shutdown RO.
+ void CacheClient::fault(const int err_type,
+ const boost::system::error_code& ec) {
+ ldout(m_cct, 20) << "fault." << ec.message() << dendl;
+
+ if (err_type == ASIO_ERROR_CONNECT) {
+ ceph_assert(!m_session_work.load());
+ if (ec == boost::asio::error::connection_refused) {
+ ldout(m_cct, 20) << "Connecting RO daenmon fails : "<< ec.message()
+ << ". Immutable-object-cache daemon is down ? "
+ << "Data will be read from ceph cluster " << dendl;
+ } else {
+ ldout(m_cct, 20) << "Connecting RO daemon fails : "
+ << ec.message() << dendl;
+ }
+
+ if (m_dm_socket.is_open()) {
+ // Set to indicate what error occurred, if any.
+ // Note that, even if the function indicates an error,
+ // the underlying descriptor is closed.
+ boost::system::error_code close_ec;
+ m_dm_socket.close(close_ec);
+ if (close_ec) {
+ ldout(m_cct, 20) << "close: " << close_ec.message() << dendl;
+ }
+ }
+ return;
+ }
+
+ if (!m_session_work.load()) {
+ return;
+ }
+
+ /* when current session don't work, ASIO will don't receive any new request from hook.
+ * On the other hand, for pending request of ASIO, cancle these request,
+ * then call their callback. these request which are cancled by this method,
+ * will be re-dispatched to RADOS layer.
+ * make sure just have one thread to modify execute below code. */
+ m_session_work.store(false);
+
+ if (err_type == ASIO_ERROR_MSG_INCOMPLETE) {
+ ldout(m_cct, 20) << "ASIO In-complete message." << ec.message() << dendl;
+ ceph_assert(0);
+ }
+
+ if (err_type == ASIO_ERROR_READ) {
+ ldout(m_cct, 20) << "ASIO async read fails : " << ec.message() << dendl;
+ }
+
+ if (err_type == ASIO_ERROR_WRITE) {
+ ldout(m_cct, 20) << "ASIO asyn write fails : " << ec.message() << dendl;
+ // CacheClient should not occur this error.
+ ceph_assert(0);
+ }
+
+ // currently, for any asio error, just shutdown RO.
+ close();
+
+ /* all pending request, which have entered into ASIO,
+ * will be re-dispatched to RADOS.*/
+ {
+ std::lock_guard locker{m_lock};
+ for (auto it : m_seq_to_req) {
+ it.second->type = RBDSC_READ_RADOS;
+ it.second->process_msg->complete(it.second);
+ }
+ m_seq_to_req.clear();
+ }
+
+ ldout(m_cct, 20) << "Because ASIO domain socket fails, just shutdown RO.\
+ Later all reading will be re-dispatched RADOS layer"
+ << ec.message() << dendl;
+ }
+
+ // TODO : re-implement this method
+ int CacheClient::register_client(Context* on_finish) {
+ ObjectCacheRequest* reg_req = new ObjectCacheRegData(RBDSC_REGISTER,
+ m_sequence_id++,
+ ceph_version_to_str());
+ reg_req->encode();
+
+ bufferlist bl;
+ bl.append(reg_req->get_payload_bufferlist());
+
+ uint64_t ret;
+ boost::system::error_code ec;
+
+ ret = boost::asio::write(m_dm_socket,
+ boost::asio::buffer(bl.c_str(), bl.length()), ec);
+
+ if (ec || ret != bl.length()) {
+ fault(ASIO_ERROR_WRITE, ec);
+ return -1;
+ }
+ delete reg_req;
+
+ ret = boost::asio::read(m_dm_socket,
+ boost::asio::buffer(m_bp_header.c_str(), get_header_size()), ec);
+ if (ec || ret != get_header_size()) {
+ fault(ASIO_ERROR_READ, ec);
+ return -1;
+ }
+
+ uint64_t data_len = get_data_len(m_bp_header.c_str());
+ bufferptr bp_data(buffer::create(data_len));
+
+ ret = boost::asio::read(m_dm_socket, boost::asio::buffer(bp_data.c_str(),
+ data_len), ec);
+ if (ec || ret != data_len) {
+ fault(ASIO_ERROR_READ, ec);
+ return -1;
+ }
+
+ bufferlist data_buffer;
+ data_buffer.append(m_bp_header);
+ data_buffer.append(std::move(bp_data));
+ ObjectCacheRequest* req = decode_object_cache_request(data_buffer);
+ if (req->type == RBDSC_REGISTER_REPLY) {
+ m_session_work.store(true);
+ on_finish->complete(0);
+ } else {
+ on_finish->complete(-1);
+ }
+
+ delete req;
+ return 0;
+ }
+
+} // namespace immutable_obj_cache
+} // namespace ceph