diff options
Diffstat (limited to 'src/librbd/watcher')
-rw-r--r-- | src/librbd/watcher/Notifier.cc | 99 | ||||
-rw-r--r-- | src/librbd/watcher/Notifier.h | 64 | ||||
-rw-r--r-- | src/librbd/watcher/RewatchRequest.cc | 108 | ||||
-rw-r--r-- | src/librbd/watcher/RewatchRequest.h | 75 | ||||
-rw-r--r-- | src/librbd/watcher/Types.cc | 45 | ||||
-rw-r--r-- | src/librbd/watcher/Types.h | 71 | ||||
-rw-r--r-- | src/librbd/watcher/Utils.h | 74 |
7 files changed, 536 insertions, 0 deletions
diff --git a/src/librbd/watcher/Notifier.cc b/src/librbd/watcher/Notifier.cc new file mode 100644 index 000000000..9a4134402 --- /dev/null +++ b/src/librbd/watcher/Notifier.cc @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/Notifier.h" +#include "librbd/ImageCtx.h" +#include "librbd/Utils.h" +#include "librbd/asio/ContextWQ.h" +#include "librbd/watcher/Types.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::watcher::Notifier: " \ + << this << " " << __func__ << ": " + +namespace librbd { +namespace watcher { + +const uint64_t Notifier::NOTIFY_TIMEOUT = 5000; + +Notifier::C_AioNotify::C_AioNotify(Notifier *notifier, NotifyResponse *response, + Context *on_finish) + : notifier(notifier), response(response), on_finish(on_finish) { +} + +void Notifier::C_AioNotify::finish(int r) { + if (response != nullptr) { + if (r == 0 || r == -ETIMEDOUT) { + try { + auto it = out_bl.cbegin(); + decode(*response, it); + } catch (const buffer::error &err) { + r = -EBADMSG; + } + } + } + notifier->handle_notify(r, on_finish); +} + +Notifier::Notifier(asio::ContextWQ *work_queue, IoCtx &ioctx, + const std::string &oid) + : m_work_queue(work_queue), m_ioctx(ioctx), m_oid(oid), + m_aio_notify_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::object_watcher::Notifier::m_aio_notify_lock", this))) { + m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); +} + +Notifier::~Notifier() { + std::lock_guard aio_notify_locker{m_aio_notify_lock}; + ceph_assert(m_pending_aio_notifies == 0); +} + +void Notifier::flush(Context *on_finish) { + std::lock_guard aio_notify_locker{m_aio_notify_lock}; + if (m_pending_aio_notifies == 0) { + m_work_queue->queue(on_finish, 0); + return; + } + + m_aio_notify_flush_ctxs.push_back(on_finish); +} + +void Notifier::notify(bufferlist &bl, NotifyResponse *response, + Context *on_finish) { + { + std::lock_guard aio_notify_locker{m_aio_notify_lock}; + ++m_pending_aio_notifies; + + ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl; + } + + C_AioNotify *ctx = new C_AioNotify(this, response, on_finish); + librados::AioCompletion *comp = util::create_rados_callback(ctx); + int r = m_ioctx.aio_notify(m_oid, comp, bl, NOTIFY_TIMEOUT, &ctx->out_bl); + ceph_assert(r == 0); + comp->release(); +} + +void Notifier::handle_notify(int r, Context *on_finish) { + ldout(m_cct, 20) << "r=" << r << dendl; + + std::lock_guard aio_notify_locker{m_aio_notify_lock}; + ceph_assert(m_pending_aio_notifies > 0); + --m_pending_aio_notifies; + + ldout(m_cct, 20) << "pending=" << m_pending_aio_notifies << dendl; + if (m_pending_aio_notifies == 0) { + for (auto ctx : m_aio_notify_flush_ctxs) { + m_work_queue->queue(ctx, 0); + } + m_aio_notify_flush_ctxs.clear(); + } + + if (on_finish != nullptr) { + m_work_queue->queue(on_finish, r); + } +} + +} // namespace watcher +} // namespace librbd diff --git a/src/librbd/watcher/Notifier.h b/src/librbd/watcher/Notifier.h new file mode 100644 index 000000000..79546b505 --- /dev/null +++ b/src/librbd/watcher/Notifier.h @@ -0,0 +1,64 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_NOTIFIER_H +#define CEPH_LIBRBD_WATCHER_NOTIFIER_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/Context.h" +#include "include/rados/librados.hpp" +#include "common/ceph_mutex.h" +#include <list> + +namespace librbd { + +namespace asio { struct ContextWQ; } + +namespace watcher { + +struct NotifyResponse; + +class Notifier { +public: + static const uint64_t NOTIFY_TIMEOUT; + + Notifier(asio::ContextWQ *work_queue, librados::IoCtx &ioctx, + const std::string &oid); + ~Notifier(); + + void flush(Context *on_finish); + void notify(bufferlist &bl, NotifyResponse *response, Context *on_finish); + +private: + typedef std::list<Context*> Contexts; + + struct C_AioNotify : public Context { + Notifier *notifier; + NotifyResponse *response; + Context *on_finish; + bufferlist out_bl; + + C_AioNotify(Notifier *notifier, NotifyResponse *response, + Context *on_finish); + + void finish(int r) override; + }; + + asio::ContextWQ *m_work_queue; + librados::IoCtx &m_ioctx; + CephContext *m_cct; + std::string m_oid; + + ceph::mutex m_aio_notify_lock; + size_t m_pending_aio_notifies = 0; + Contexts m_aio_notify_flush_ctxs; + + void handle_notify(int r, Context *on_finish); + +}; + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_NOTIFIER_H diff --git a/src/librbd/watcher/RewatchRequest.cc b/src/librbd/watcher/RewatchRequest.cc new file mode 100644 index 000000000..b890cb3c5 --- /dev/null +++ b/src/librbd/watcher/RewatchRequest.cc @@ -0,0 +1,108 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/RewatchRequest.h" +#include "common/ceph_mutex.h" +#include "common/errno.h" +#include "librbd/Utils.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::watcher::RewatchRequest: " \ + << this << " " << __func__ << " " + +namespace librbd { + +using util::create_context_callback; +using util::create_rados_callback; + +namespace watcher { + +using std::string; + +RewatchRequest::RewatchRequest(librados::IoCtx& ioctx, const string& oid, + ceph::shared_mutex &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) + : m_ioctx(ioctx), m_oid(oid), m_watch_lock(watch_lock), + m_watch_ctx(watch_ctx), m_watch_handle(watch_handle), + m_on_finish(on_finish) { +} + +void RewatchRequest::send() { + unwatch(); +} + +void RewatchRequest::unwatch() { + ceph_assert(ceph_mutex_is_wlocked(m_watch_lock)); + if (*m_watch_handle == 0) { + rewatch(); + return; + } + + CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + ldout(cct, 10) << dendl; + + uint64_t watch_handle = 0; + std::swap(*m_watch_handle, watch_handle); + + librados::AioCompletion *aio_comp = create_rados_callback< + RewatchRequest, &RewatchRequest::handle_unwatch>(this); + int r = m_ioctx.aio_unwatch(watch_handle, aio_comp); + ceph_assert(r == 0); + aio_comp->release(); +} + +void RewatchRequest::handle_unwatch(int r) { + CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + + if (r == -EBLOCKLISTED) { + lderr(cct) << "client blocklisted" << dendl; + finish(r); + return; + } else if (r < 0) { + lderr(cct) << "failed to unwatch: " << cpp_strerror(r) << dendl; + } + rewatch(); +} + +void RewatchRequest::rewatch() { + CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + ldout(cct, 10) << dendl; + + librados::AioCompletion *aio_comp = create_rados_callback< + RewatchRequest, &RewatchRequest::handle_rewatch>(this); + int r = m_ioctx.aio_watch(m_oid, aio_comp, &m_rewatch_handle, m_watch_ctx); + ceph_assert(r == 0); + aio_comp->release(); +} + +void RewatchRequest::handle_rewatch(int r) { + CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + if (r < 0) { + lderr(cct) << "failed to watch object: " << cpp_strerror(r) + << dendl; + m_rewatch_handle = 0; + } + + { + std::unique_lock watch_locker{m_watch_lock}; + *m_watch_handle = m_rewatch_handle; + } + + finish(r); +} + +void RewatchRequest::finish(int r) { + CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct()); + ldout(cct, 10) << "r=" << r << dendl; + + m_on_finish->complete(r); + delete this; +} + +} // namespace watcher +} // namespace librbd + diff --git a/src/librbd/watcher/RewatchRequest.h b/src/librbd/watcher/RewatchRequest.h new file mode 100644 index 000000000..ce5e31539 --- /dev/null +++ b/src/librbd/watcher/RewatchRequest.h @@ -0,0 +1,75 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H +#define CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H + +#include "common/ceph_mutex.h" +#include "include/int_types.h" +#include "include/rados/librados.hpp" + +struct Context; + +namespace librbd { + +namespace watcher { + +class RewatchRequest { +public: + + static RewatchRequest *create(librados::IoCtx& ioctx, const std::string& oid, + ceph::shared_mutex &watch_lock, + librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish) { + return new RewatchRequest(ioctx, oid, watch_lock, watch_ctx, watch_handle, + on_finish); + } + + RewatchRequest(librados::IoCtx& ioctx, const std::string& oid, + ceph::shared_mutex &watch_lock, librados::WatchCtx2 *watch_ctx, + uint64_t *watch_handle, Context *on_finish); + + void send(); + +private: + /** + * @verbatim + * + * <start> + * | + * v + * UNWATCH + * | + * | . . . . + * | . . (recoverable error) + * v v . + * REWATCH . . . + * | + * v + * <finish> + * + * @endverbatim + */ + + librados::IoCtx& m_ioctx; + std::string m_oid; + ceph::shared_mutex &m_watch_lock; + librados::WatchCtx2 *m_watch_ctx; + uint64_t *m_watch_handle; + Context *m_on_finish; + + uint64_t m_rewatch_handle = 0; + + void unwatch(); + void handle_unwatch(int r); + + void rewatch(); + void handle_rewatch(int r); + + void finish(int r); +}; + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_REWATCH_REQUEST_H diff --git a/src/librbd/watcher/Types.cc b/src/librbd/watcher/Types.cc new file mode 100644 index 000000000..8f1991d7b --- /dev/null +++ b/src/librbd/watcher/Types.cc @@ -0,0 +1,45 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/watcher/Types.h" +#include "common/Formatter.h" + +namespace librbd { +namespace watcher { + +void ClientId::encode(bufferlist &bl) const { + using ceph::encode; + encode(gid, bl); + encode(handle, bl); +} + +void ClientId::decode(bufferlist::const_iterator &iter) { + using ceph::decode; + decode(gid, iter); + decode(handle, iter); +} + +void ClientId::dump(Formatter *f) const { + f->dump_unsigned("gid", gid); + f->dump_unsigned("handle", handle); +} + +void NotifyResponse::encode(bufferlist& bl) const { + using ceph::encode; + encode(acks, bl); + encode(timeouts, bl); +} + +void NotifyResponse::decode(bufferlist::const_iterator& iter) { + using ceph::decode; + decode(acks, iter); + decode(timeouts, iter); +} +std::ostream &operator<<(std::ostream &out, + const ClientId &client_id) { + out << "[" << client_id.gid << "," << client_id.handle << "]"; + return out; +} + +} // namespace watcher +} // namespace librbd diff --git a/src/librbd/watcher/Types.h b/src/librbd/watcher/Types.h new file mode 100644 index 000000000..d1517fb0f --- /dev/null +++ b/src/librbd/watcher/Types.h @@ -0,0 +1,71 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_TYPES_H +#define CEPH_LIBRBD_WATCHER_TYPES_H + +#include "include/int_types.h" +#include "include/buffer_fwd.h" +#include "include/encoding.h" + +namespace ceph { class Formatter; } + +namespace librbd { + +class Watcher; + +namespace watcher { + +struct ClientId { + uint64_t gid; + uint64_t handle; + + ClientId() : gid(0), handle(0) {} + ClientId(uint64_t gid, uint64_t handle) : gid(gid), handle(handle) {} + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); + void dump(Formatter *f) const; + + inline bool is_valid() const { + return (*this != ClientId()); + } + + inline bool operator==(const ClientId &rhs) const { + return (gid == rhs.gid && handle == rhs.handle); + } + inline bool operator!=(const ClientId &rhs) const { + return !(*this == rhs); + } + inline bool operator<(const ClientId &rhs) const { + if (gid != rhs.gid) { + return gid < rhs.gid; + } else { + return handle < rhs.handle; + } + } +}; + +struct NotifyResponse { + std::map<ClientId, bufferlist> acks; + std::vector<ClientId> timeouts; + + void encode(bufferlist& bl) const; + void decode(bufferlist::const_iterator& it); +}; + +template <typename ImageCtxT> +struct Traits { + typedef librbd::Watcher Watcher; +}; + +std::ostream &operator<<(std::ostream &out, + const ClientId &client); + +WRITE_CLASS_ENCODER(ClientId); +WRITE_CLASS_ENCODER(NotifyResponse); + +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_TYPES_H diff --git a/src/librbd/watcher/Utils.h b/src/librbd/watcher/Utils.h new file mode 100644 index 000000000..d2510aaf3 --- /dev/null +++ b/src/librbd/watcher/Utils.h @@ -0,0 +1,74 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_WATCHER_UTILS_H +#define CEPH_LIBRBD_WATCHER_UTILS_H + +#include "include/buffer_fwd.h" +#include "include/encoding.h" +#include "include/Context.h" +#include "librbd/Watcher.h" + +namespace ceph { class Formatter; } + +namespace librbd { +namespace watcher { +namespace util { + +template <typename Watcher> +struct HandlePayloadVisitor : public boost::static_visitor<void> { + Watcher *watcher; + uint64_t notify_id; + uint64_t handle; + + HandlePayloadVisitor(Watcher *watcher_, uint64_t notify_id_, + uint64_t handle_) + : watcher(watcher_), notify_id(notify_id_), handle(handle_) + { + } + + template <typename P> + inline void operator()(const P &payload) const { + typename Watcher::C_NotifyAck *ctx = + new typename Watcher::C_NotifyAck(watcher, notify_id, handle); + if (watcher->handle_payload(payload, ctx)) { + ctx->complete(0); + } + } +}; + +class EncodePayloadVisitor : public boost::static_visitor<void> { +public: + explicit EncodePayloadVisitor(bufferlist &bl) : m_bl(bl) {} + + template <typename P> + inline void operator()(const P &payload) const { + using ceph::encode; + encode(static_cast<uint32_t>(P::NOTIFY_OP), m_bl); + payload.encode(m_bl); + } + +private: + bufferlist &m_bl; +}; + +class DecodePayloadVisitor : public boost::static_visitor<void> { +public: + DecodePayloadVisitor(__u8 version, bufferlist::const_iterator &iter) + : m_version(version), m_iter(iter) {} + + template <typename P> + inline void operator()(P &payload) const { + payload.decode(m_version, m_iter); + } + +private: + __u8 m_version; + bufferlist::const_iterator &m_iter; +}; + +} // namespace util +} // namespace watcher +} // namespace librbd + +#endif // CEPH_LIBRBD_WATCHER_UTILS_H |