summaryrefslogtreecommitdiffstats
path: root/src/librbd/MirroringWatcher.cc
blob: c0cda5fa16a9f9d144db83d2049c435304ffe18f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/MirroringWatcher.h"
#include "include/rbd_types.h"
#include "include/rados/librados.hpp"
#include "common/errno.h"
#include "common/Cond.h"
#include "librbd/Utils.h"
#include "librbd/watcher/Utils.h"

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::MirroringWatcher: "

namespace librbd {

using namespace mirroring_watcher;
using namespace watcher;

using librbd::util::create_rados_callback;

namespace {

static const uint64_t NOTIFY_TIMEOUT_MS = 5000;

} // anonymous namespace

template <typename I>
MirroringWatcher<I>::MirroringWatcher(librados::IoCtx &io_ctx,
                                      asio::ContextWQ *work_queue)
  : Watcher(io_ctx, work_queue, RBD_MIRRORING) {
}

template <typename I>
int MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
                                              cls::rbd::MirrorMode mirror_mode) {
  C_SaferCond ctx;
  notify_mode_updated(io_ctx, mirror_mode, &ctx);
  return ctx.wait();
}

template <typename I>
void MirroringWatcher<I>::notify_mode_updated(librados::IoCtx &io_ctx,
                                              cls::rbd::MirrorMode mirror_mode,
                                              Context *on_finish) {
  CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
  ldout(cct, 20) << dendl;

  bufferlist bl;
  encode(NotifyMessage{ModeUpdatedPayload{mirror_mode}}, bl);

  librados::AioCompletion *comp = create_rados_callback(on_finish);
  int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
                            nullptr);
  ceph_assert(r == 0);
  comp->release();
}

template <typename I>
int MirroringWatcher<I>::notify_image_updated(
    librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
    const std::string &image_id, const std::string &global_image_id) {
  C_SaferCond ctx;
  notify_image_updated(io_ctx, mirror_image_state, image_id, global_image_id,
                       &ctx);
  return ctx.wait();
}

template <typename I>
void MirroringWatcher<I>::notify_image_updated(
    librados::IoCtx &io_ctx, cls::rbd::MirrorImageState mirror_image_state,
    const std::string &image_id, const std::string &global_image_id,
    Context *on_finish) {

  CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
  ldout(cct, 20) << dendl;

  bufferlist bl;
  encode(NotifyMessage{ImageUpdatedPayload{
      mirror_image_state, image_id, global_image_id}}, bl);

  librados::AioCompletion *comp = create_rados_callback(on_finish);
  int r = io_ctx.aio_notify(RBD_MIRRORING, comp, bl, NOTIFY_TIMEOUT_MS,
                            nullptr);
  ceph_assert(r == 0);
  comp->release();

}

template <typename I>
void MirroringWatcher<I>::handle_notify(uint64_t notify_id, uint64_t handle,
                                        uint64_t notifier_id, bufferlist &bl) {
  CephContext *cct = this->m_cct;
  ldout(cct, 15) << ": notify_id=" << notify_id << ", "
                 << "handle=" << handle << dendl;


  NotifyMessage notify_message;
  try {
    auto iter = bl.cbegin();
    decode(notify_message, iter);
  } catch (const buffer::error &err) {
    lderr(cct) << ": error decoding image notification: " << err.what()
               << dendl;
    Context *ctx = new C_NotifyAck(this, notify_id, handle);
    ctx->complete(0);
    return;
  }

  apply_visitor(watcher::util::HandlePayloadVisitor<MirroringWatcher<I>>(
                  this, notify_id, handle), notify_message.payload);
}

template <typename I>
bool MirroringWatcher<I>::handle_payload(const ModeUpdatedPayload &payload,
                                         Context *on_notify_ack) {
  CephContext *cct = this->m_cct;
  ldout(cct, 20) << ": mode updated: " << payload.mirror_mode << dendl;
  handle_mode_updated(payload.mirror_mode);
  return true;
}

template <typename I>
bool MirroringWatcher<I>::handle_payload(const ImageUpdatedPayload &payload,
                                         Context *on_notify_ack) {
  CephContext *cct = this->m_cct;
  ldout(cct, 20) << ": image state updated" << dendl;
  handle_image_updated(payload.mirror_image_state, payload.image_id,
                       payload.global_image_id);
  return true;
}

template <typename I>
bool MirroringWatcher<I>::handle_payload(const UnknownPayload &payload,
                                         Context *on_notify_ack) {
  return true;
}

} // namespace librbd

template class librbd::MirroringWatcher<librbd::ImageCtx>;