// Source path: root/src/librbd/AsyncObjectThrottle.cc
// Blob: e0fcefff18a03f81be5000b421906bc130ac1838 (plain)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#include "librbd/AsyncObjectThrottle.h"
#include "librbd/AsyncRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/Utils.h"
#include "librbd/asio/ContextWQ.h"

namespace librbd
{

// Construct a throttle over the object numbers [object_no, end_object_no).
//
// async_request   - optional (may be null); polled for cancellation before
//                   each new op is started
// image_ctx       - image whose owner_lock and op_work_queue are used
// context_factory - invoked as factory(*this, object_no) to build each op
// ctx             - completion invoked with the final result once no ops
//                   remain in flight
// prog_ctx        - optional (may be null); receives update_progress() calls
// object_no       - first object number to process
// end_object_no   - one past the last object number to process
//
// The throttle starts with no ops in flight and a result of 0.
template <typename T>
AsyncObjectThrottle<T>::AsyncObjectThrottle(
    const AsyncRequest<T>* async_request,
    T &image_ctx,
    const ContextFactory& context_factory,
    Context *ctx,
    ProgressContext *prog_ctx,
    uint64_t object_no,
    uint64_t end_object_no)
  : m_lock(ceph::make_mutex(
      util::unique_lock_name("librbd::AsyncThrottle::m_lock", this))),
    m_async_request(async_request),
    m_image_ctx(image_ctx),
    m_context_factory(context_factory),
    m_ctx(ctx),
    m_prog_ctx(prog_ctx),
    m_object_no(object_no),
    m_end_object_no(end_object_no),
    m_current_ops(0),
    m_ret(0)
{
}

// Kick off the throttle by making up to max_concurrent attempts to start an
// op. The caller must already hold the image's owner_lock (asserted below).
//
// If nothing is in flight once the attempts are done, the completion context
// is handed to the op work queue together with the recorded result and this
// object deletes itself, so the caller must not touch it after this returns.
template <typename T>
void AsyncObjectThrottle<T>::start_ops(uint64_t max_concurrent) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  bool nothing_in_flight = false;
  {
    std::lock_guard locker{m_lock};
    for (uint64_t remaining = max_concurrent; remaining > 0; --remaining) {
      start_next_op();
      // an error was recorded and no op is outstanding: stop trying
      if (m_current_ops == 0 && m_ret < 0) {
        break;
      }
    }
    nothing_in_flight = (m_current_ops == 0);
  }

  if (!nothing_in_flight) {
    return;
  }

  // avoid re-entrant callback
  m_image_ctx.op_work_queue->queue(m_ctx, m_ret);
  delete this;
}

// Record the completion of one in-flight op whose result was r.
//
// Takes owner_lock (shared) and then m_lock, decrements the in-flight count,
// latches r as the overall result if it is the first error other than
// -ENOENT, and then tries to start a replacement op. When no ops remain in
// flight, the completion context is invoked directly with the recorded
// result and this object deletes itself.
template <typename T>
void AsyncObjectThrottle<T>::finish_op(int r) {
  bool nothing_in_flight = false;
  {
    std::shared_lock owner_locker{m_image_ctx.owner_lock};
    std::lock_guard locker{m_lock};

    --m_current_ops;

    // only the first failure is kept; a missing object is not a failure
    const bool first_real_error = (r < 0) && (r != -ENOENT) && (m_ret == 0);
    if (first_real_error) {
      m_ret = r;
    }

    start_next_op();
    nothing_in_flight = (m_current_ops == 0);
  }

  if (!nothing_in_flight) {
    return;
  }

  m_ctx->complete(m_ret);
  delete this;
}

// Advance through the remaining object numbers until one op is left in
// flight, or until there is nothing more to start.
//
// Stops without starting anything when the associated request has been
// canceled (recording -ERESTART if no result is set yet), when a non-zero
// result is already recorded, or when the object range is exhausted. Ops
// whose send() returns a positive value are treated as already complete and
// the loop moves on to the next object; a negative value is recorded as the
// result and stops the loop. Progress is reported after each op that did not
// fail to send. Both callers in this file invoke this with m_lock held.
template <typename T>
void AsyncObjectThrottle<T>::start_next_op() {
  for (;;) {
    const bool canceled =
      (m_async_request != nullptr) && m_async_request->is_canceled();
    if (canceled && m_ret == 0) {
      // allow in-flight ops to complete, but don't start new ops
      m_ret = -ERESTART;
      return;
    }
    if (m_ret != 0 || m_object_no >= m_end_object_no) {
      return;
    }

    const uint64_t object_no = m_object_no;
    ++m_object_no;
    C_AsyncObjectThrottle<T> *op_ctx = m_context_factory(*this, object_no);

    const int send_result = op_ctx->send();
    if (send_result < 0) {
      m_ret = send_result;
      delete op_ctx;
      return;
    }

    const bool in_flight = (send_result == 0);
    if (in_flight) {
      ++m_current_ops;
    } else {
      // op completed immediately
      delete op_ctx;
    }

    if (m_prog_ctx != nullptr) {
      const int progress_result =
        m_prog_ctx->update_progress(object_no, m_end_object_no);
      if (progress_result < 0) {
        m_ret = progress_result;
      }
    }

    if (in_flight) {
      return;
    }
  }
}

} // namespace librbd

// Explicitly instantiate the throttle for librbd::ImageCtx, since the member
// definitions above live in this translation unit rather than in the header.
// The instantiation is skipped whenever TEST_F is defined (presumably a
// unit-test build that supplies its own image context type -- confirm).
#ifndef TEST_F
template class librbd::AsyncObjectThrottle<librbd::ImageCtx>;
#endif