// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 * ============
 *
 * This is a common read/write reference framework, which works
 * similarly to a RW lock. The difference is that the "readers" do
 * not hold any lock; instead they increase a reference count when
 * the "require" state is matched, or a flag is set to tell the
 * caller that the "require" state is not matched. There is no wait
 * mechanism for "readers" to wait until the state matches; the
 * callers decide what to do next.
 *
 * Example usage, as in the libcephfs client/Client.cc case:
 *
 * The Readers:
 *
 *   Functions such as ll_read()/ll_write() act as "readers". At the
 *   beginning they just define a RWRef object, whose constructor
 *   checks whether the state is MOUNTED or MOUNTING; if not, it
 *   fails and returns directly without doing anything, otherwise it
 *   increases the reference and continues. When the RWRef object is
 *   destructed, the destructor decreases the reference and notifies
 *   any "writers" that may be waiting, as sketched below.
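 *
 *   A minimal reader sketch (the signature, the mount_state object
 *   and the CLIENT_MOUNTING value are hypothetical, for illustration
 *   only):
 *
 *     int Client::ll_read(...)
 *     {
 *       // Takes a ref only if the state is MOUNTING or MOUNTED
 *       RWRef<int> mref_reader(mount_state, CLIENT_MOUNTING);
 *       if (!mref_reader.is_state_satisfied())
 *         return -ENOTCONN;
 *
 *       ...  // do the read; the ref is dropped when mref_reader
 *            // goes out of scope
 *     }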
 *
 * The Writers:
 *
 *   The _unmount() function acts as a "writer". At the beginning it
 *   also just defines a RWRef object, whose constructor first
 *   advances the state to the next stage, which makes all newly
 *   arriving "readers" fail, and then waits for all in-flight
 *   "readers" to finish, as sketched below.
 *
 * With this we can get rid of the locks for all the "readers", and
 * they can run in parallel. RWRef also avoids potential deadlock
 * issues such as:
 *
 * With RWLock:
 *
 *     ThreadA:                           ThreadB:
 *
 *     write_lock<RWLock1>.lock();        another_lock.lock();
 *     state = NEXT_STATE;                ...
 *     another_lock.lock();               read_lock<RWLock1>.lock();
 *     ...                                if (state == STATE) {
 *                                          ...
 *                                        }
 *                                        ...
 *
 * With RWRef:
 *
 *     ThreadA:                           ThreadB:
 *
 *     w = RWRef(myS, NEXT_STATE, false); another_lock.lock();
 *     another_lock.lock();               r = RWRef(myS, STATE);
 *     ...                                if (r.is_state_satisfied()) {
 *                                          ...
 *                                        }
 *                                        ...
 *
 * Also, if ThreadA needs to do cond.wait(&another_lock), in the
 * RWLock case it will go to sleep while holding write_lock<RWLock1>.
 * If the ThreadBs are doing IOs, they may then be stuck for a very
 * long time and time out in the upper layer, which may keep retrying.
 * With RWRef, ThreadB fails or continues immediately without getting
 * stuck, and the upper layer quickly knows what to do next.
 */

#ifndef CEPH_RWRef_Posix__H
#define CEPH_RWRef_Posix__H

#include <string>
#include "include/ceph_assert.h"
#include "include/likely.h"   // for the likely() macro used below
#include "common/ceph_mutex.h"

/* The state machine info */
template<typename T>
struct RWRefState {
  public:
    template <typename T1> friend class RWRef;

    /*
     * The current state of the state machine. Currently you need
     * to define the states yourself.
     */
    T state;

    /*
     * User-defined method to check whether the "require" state
     * is in the proper range we need.
     *
     * For example, in client/Client.cc:
     * some reader operations need the client state to be in the
     * mounting or mounted states, so they set "require = mounting"
     * in class RWRef's constructor. check_reader_state() should
     * then return true if the state is already mounting or mounted.
     */
    virtual int check_reader_state(T require) const = 0;

    /*
     * User-defined method to check whether migrating to the
     * "require" state is allowed from the current state.
     *
     * This will usually be the state migration check for writers.
     */
    virtual int check_writer_state(T require) const = 0;

    /*
     * User-defined method to check whether the "require"
     * state is valid.
     */
    virtual bool is_valid_state(T require) const = 0;

    T get_state() const {
      std::scoped_lock l{lock};
      return state;
    }

    bool check_current_state(T require) const {
      ceph_assert(is_valid_state(require));

      std::scoped_lock l{lock};
      return state == require;
    }

    RWRefState(T init_state, const char *lockname, uint64_t _reader_cnt=0)
      : state(init_state), lock(ceph::make_mutex(lockname)), reader_cnt(_reader_cnt) {}
    virtual ~RWRefState() {}

  private:
    mutable ceph::mutex lock;
    ceph::condition_variable cond;
    uint64_t reader_cnt = 0;
};
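
/*
 * A minimal sketch of a concrete state machine built on RWRefState
 * (the MountState name, the CLIENT_* values and the "only move
 * forward" migration rule are hypothetical, for illustration only):
 *
 *   enum {
 *     CLIENT_UNMOUNTED,
 *     CLIENT_MOUNTING,
 *     CLIENT_MOUNTED,
 *     CLIENT_UNMOUNTING,
 *   };
 *
 *   struct MountState : RWRefState<int> {
 *     MountState() : RWRefState(CLIENT_UNMOUNTED, "MountState::lock") {}
 *
 *     // Readers requiring CLIENT_MOUNTING are also satisfied once
 *     // the mount has completed.
 *     int check_reader_state(int require) const override {
 *       return state == require ||
 *              (require == CLIENT_MOUNTING && state == CLIENT_MOUNTED);
 *     }
 *
 *     // Writers may only advance the state machine.
 *     int check_writer_state(int require) const override {
 *       return require > state;
 *     }
 *
 *     bool is_valid_state(int require) const override {
 *       return require >= CLIENT_UNMOUNTED && require <= CLIENT_UNMOUNTING;
 *     }
 *   };
 */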

template<typename T>
class RWRef {
public:
  RWRef(const RWRef& other) = delete;
  const RWRef& operator=(const RWRef& other) = delete;

  RWRef(RWRefState<T> &s, T require, bool ir=true)
    :S(s), is_reader(ir) {
    ceph_assert(S.is_valid_state(require));

    std::scoped_lock l{S.lock};
    if (likely(is_reader)) { // Readers will update the reader_cnt
      if (S.check_reader_state(require)) {
        S.reader_cnt++;
        satisfied = true;
      }
    } else { // Writers will update the state
      /*
       * If the writer state check passes, update the state and
       * mark ourselves as the first writer.
       *
       * Otherwise another writer is already running or has
       * finished, and the user can choose to continue or bail out.
       */
      if (S.check_writer_state(require)) {
        first_writer = true;
        S.state = require;
      }
      satisfied = true;
    }
  }

  /*
   * Whether the "require" state was in the proper range of
   * the states when this RWRef was constructed.
   */
  bool is_state_satisfied() const {
    return satisfied;
  }

  /*
   * Update the state; only a writer may do this.
   */
  void update_state(T new_state) {
    ceph_assert(!is_reader);
    ceph_assert(S.is_valid_state(new_state));

    std::scoped_lock l{S.lock};
    S.state = new_state;
  }

  /*
   * Whether we are the first writer for the current state.
   */
  bool is_first_writer() const {
    return first_writer;
  }

  /*
   * Wait for all the in-flight "readers" to finish.
   */
  void wait_readers_done() {
    // Only writers can wait
    ceph_assert(!is_reader);

    std::unique_lock l{S.lock};

    S.cond.wait(l, [this] {
      return !S.reader_cnt;
    });
  }

  ~RWRef() {
    std::scoped_lock l{S.lock};
    if (!is_reader)
      return;

    if (!satisfied)
      return;

    /*
     * Decrease the refcnt and notify the waiters
     */
    if (--S.reader_cnt == 0)
      S.cond.notify_all();
  }

private:
  RWRefState<T> &S;
  bool satisfied = false;
  bool first_writer = false;
  bool is_reader = true;
};

#endif // !CEPH_RWRef_Posix__H