summaryrefslogtreecommitdiffstats
path: root/src/common/Continuation.h
blob: 1c61e7c4eba3c8be9e5d62748d00c5b64279b075 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include "include/Context.h"

/**
 * The Continuation interface is designed to help easily create multi-step
 * operations that share data without having to pass it around or create
 * custom Context classes for each step. To write a Continuation:
 * 1) create a child class with a function for each stage.
 * 2) Put all your shared data members into the class.
 * 3) In the constructor, register each function stage with set_callback().
 * 4) Whenever you need to provide a Context callback that activates the next
 * stage, call get_callback(stage_number). If you need to proceed to another
 * stage immediately, call immediate(stage, retcode) and return its result.
 *
 * To use a class:
 * 1) Construct the child class on the heap.
 * 2) Call begin().
 * 3) The destructor will be called once one of your functions returns true to
 * indicate it is done.
 *
 * Please note that while you can skip stages and get multiple Callback
 * objects at once, you *cannot* have any stage report that the Continuation
 * is completed while any other stage Callbacks are outstanding. It's best to
 * be serial unless you want to maintain your own metadata about which stages
 * are still pending.
 *
 * In fact, there are only two situations in which a stage should return
 * true while others are running:
 * 1) A Callback was issued and completed in the same thread,
 * 2) you called immediate(stage, r) and it is returning true.
 */

class Continuation {
  // Stages that have been scheduled (by begin(), get_callback() or
  // immediate()) and whose stage function has not finished running yet.
  std::set<int> stages_in_flight;
  // Stages whose stage function is executing right now.
  std::set<int> stages_processing;
  // Return code handed to on_finish by the default _done().
  int rval;
  // Context completed by the default _done(); reset to nullptr afterwards so
  // the destructor can check that completion actually happened.
  Context *on_finish;
  // Latched to true as soon as any stage function returns true.
  bool reported_done;

  /**
   * The Context returned by get_callback(). Finishing it runs the stage it
   * was created for, forwarding the return code it was finished with.
   */
  class Callback : public Context {
    Continuation *continuation;
    int stage_to_activate;
  public:
    Callback(Continuation *c, int stage) :
      continuation(c),
      stage_to_activate(stage) {}
    void finish(int r) override {
      continuation->continue_function(r, stage_to_activate);
    }
  };

protected:
  /// Signature of a stage function: takes the return code of the previous
  /// step and returns true when the whole Continuation is done.
  using stagePtr = bool (Continuation::*)(int r);

  /**
   * Continue immediately to the given stage. It will be executed
   * immediately, in the given thread.
   * @pre You are in a callback function.
   * @pre The stage is neither in flight nor currently processing.
   * @param stage The stage to execute
   * @param r The return code that will be provided to the next stage
   * @return Whatever the executed stage function returned.
   */
  bool immediate(int stage, int r) {
    ceph_assert(!stages_in_flight.count(stage));
    ceph_assert(!stages_processing.count(stage));
    stages_in_flight.insert(stage);
    stages_processing.insert(stage);
    return _continue_function(r, stage);
  }

  /**
   * Obtain a Context * that when complete()ed calls back into the given stage.
   * The stage is recorded as in flight from this point on.
   * @pre You are in a callback function.
   * @param stage The stage this Context should activate
   * @return A newly allocated Callback for the stage.
   */
  Context *get_callback(int stage) {
    stages_in_flight.insert(stage);
    return new Callback(this, stage);
  }

  /**
   * Set the return code that is passed to the finally-activated Context.
   * @param new_rval The return code to use.
   */
  void set_rval(int new_rval) { rval = new_rval; }
  /**
   * @return The return code currently set for the finally-activated Context.
   */
  int get_rval() { return rval; }

  /**
   * Register member functions as associated with a given stage. Start
   * your stage IDs at 0 and make that one the setup phase.
   * @pre There are no other functions associated with the stage.
   * @param stage The stage to associate this function with
   * @param func The function to use
   */
  void set_callback(int stage, stagePtr func) {
    // emplace() leaves the map untouched and reports false if the stage is
    // already registered, so one lookup both checks the precondition and
    // stores the function.
    const bool inserted = callbacks.emplace(stage, func).second;
    ceph_assert(inserted);
  }

  /**
   * Called when the Continuation is done, as determined by a stage returning
   * true and us having finished all the currently-processing ones.
   *
   * The default implementation completes on_finish with the stored return
   * code and clears the pointer; it does nothing if on_finish is null.
   */
  virtual void _done() {
    if (on_finish) {
      on_finish->complete(rval);
      on_finish = nullptr;
    }
  }

private:
  // Stage number -> member function registered through set_callback().
  std::map<int, stagePtr> callbacks;

  /**
   * Run the function registered for stage n and update the bookkeeping sets.
   * @pre Stage n is in flight and has a registered function.
   * @param r The return code to pass to the stage function
   * @param n The stage to run
   * @return The stage function's result (true means "done").
   */
  bool _continue_function(int r, int n) {
    auto stage_iter = stages_in_flight.find(n);
    ceph_assert(stage_iter != stages_in_flight.end());
    auto callback_iter = callbacks.find(n);
    ceph_assert(callback_iter != callbacks.end());
    // Copy the pointer out before running the stage.
    stagePtr p = callback_iter->second;

    // When entered through immediate() the stage is already present; insert()
    // then just returns the iterator to the existing element.
    auto insert_r = stages_processing.insert(n);

    bool done = (this->*p)(r);
    if (done)
      reported_done = true;

    stages_processing.erase(insert_r.first);
    stages_in_flight.erase(stage_iter);
    return done;
  }

  /**
   * Entry point used by Callback and begin(): run the stage, then finish and
   * destroy the Continuation if it is done and nothing is still processing.
   * @param r The return code to pass to the stage function
   * @param stage The stage to run
   */
  void continue_function(int r, int stage) {
    bool done = _continue_function(r, stage);

    // A stage may only report completion when every stage still in flight is
    // one that is currently processing. Uses ceph_assert like every other
    // invariant check in this class.
    ceph_assert(!done ||
                stages_in_flight.size() == stages_processing.size());

    if ((done || reported_done) && stages_processing.empty()) {
      _done();
      // The object destroys itself here, which is why it must live on the
      // heap; no member may be touched after this line.
      delete this;
    }
  }

public:
  /**
   * Construct a new Continuation object. Call this from your child class,
   * obviously.
   *
   * @param c The Context which should be complete()ed when this Continuation
   * is done.
   */
  Continuation(Context *c) :
    rval(0), on_finish(c), reported_done(false) {}
  /**
   * Clean up. Asserts that on_finish has already been completed and cleared
   * (or was null to begin with).
   */
  virtual ~Continuation() { ceph_assert(on_finish == nullptr); }
  /**
   * Begin running the Continuation by executing stage 0 in the calling
   * thread with a return code of 0.
   */
  void begin() { stages_in_flight.insert(0); continue_function(0, 0); }
};