summaryrefslogtreecommitdiffstats
path: root/src/seastar/include/seastar/rpc/rpc_types.hh
blob: b411bbd37ad1244af98c5232d43fe585d663a65d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2015 Cloudius Systems, Ltd.
 */

#pragma once

#include <seastar/net/api.hh>
#include <stdexcept>
#include <string>
#include <boost/any.hpp>
#include <boost/type.hpp>
#include <seastar/util/std-compat.hh>
#include <seastar/util/variant_utils.hh>
#include <seastar/core/timer.hh>
#include <seastar/core/circular_buffer.hh>
#include <seastar/core/simple-stream.hh>
#include <seastar/core/lowres_clock.hh>
#include <boost/functional/hash.hpp>
#include <seastar/core/sharded.hh>

namespace seastar {

namespace rpc {

// Clock type used by rpc; opt_time_point in this header wraps
// rpc_clock_type::time_point.
using rpc_clock_type = lowres_clock;

// Used to tag a type for serializers. This is an alias for boost::type<T>,
// so `type<T>` is a distinct type for every T.
template<typename T>
using type = boost::type<T>;

// Plain bundle of rpc counters. Every counter starts at zero; this header
// only declares the fields, it does not show the code that updates them.
struct stats {
    using counter_type = uint64_t;

    counter_type replied{0};
    counter_type pending{0};
    counter_type exception_received{0};
    counter_type sent_messages{0};
    counter_type wait_reply{0};
    counter_type timeout{0};
};


struct client_info {
    socket_address addr;
    std::unordered_map<sstring, boost::any> user_data;
    template <typename T>
    void attach_auxiliary(const sstring& key, T&& object) {
        user_data.emplace(key, boost::any(std::forward<T>(object)));
    }
    template <typename T>
    T& retrieve_auxiliary(const sstring& key) {
        auto it = user_data.find(key);
        assert(it != user_data.end());
        return boost::any_cast<T&>(it->second);
    }
    template <typename T>
    typename std::add_const<T>::type& retrieve_auxiliary(const sstring& key) const {
        return const_cast<client_info*>(this)->retrieve_auxiliary<typename std::add_const<T>::type>(key);
    }
};

// Base class of the rpc exception types declared in this header: a
// std::runtime_error that carries the supplied message unchanged.
class error : public std::runtime_error {
public:
    error(const std::string& msg)
        : std::runtime_error(msg)
    {}
};

class closed_error : public error {
public:
    closed_error() : error("connection is closed") {}
};

class timeout_error : public error {
public:
    timeout_error() : error("rpc call timed out") {}
};

class unknown_verb_error : public error {
public:
    uint64_t type;
    unknown_verb_error(uint64_t type_) : error("unknown verb"), type(type_) {}
};

class unknown_exception_error : public error {
public:
    unknown_exception_error() : error("unknown exception") {}
};

class rpc_protocol_error : public error {
public:
    rpc_protocol_error() : error("rpc protocol exception") {}
};

class canceled_error : public error {
public:
    canceled_error() : error("rpc call was canceled") {}
};

class stream_closed : public error {
public:
    stream_closed() : error("rpc stream was closed by peer") {}
};

// Empty tag type; `no_wait` below is the instance meant to be returned.
struct no_wait_type {};

// Return this from a callback if the client does not want to wait for a reply.
extern no_wait_type no_wait;

/// \addtogroup rpc
/// @{

/// Wrapper that derives from std::optional<T> and inherits all of its
/// constructors, adding no members of its own. The rpc::tuple documentation
/// in this header describes wrapping newly appended return types in
/// rpc::optional to stay backwards compatible.
template <typename T>
class optional : public std::optional<T> {
public:
    using std::optional<T>::optional;
};

class opt_time_point : public std::optional<rpc_clock_type::time_point> {
public:
     using std::optional<rpc_clock_type::time_point>::optional;
     opt_time_point(std::optional<rpc_clock_type::time_point> time_point) {
         static_cast<std::optional<rpc_clock_type::time_point>&>(*this) = time_point;
     }
};

/// @}

// Bundle of cancellation callbacks.
//
// cancel_send / cancel_wait are optional callbacks; cancel() invokes each
// one that is set, and the destructor calls cancel().
//
// send_back_pointer / wait_back_pointer, when non-null, point at a
// `cancellable*` slot owned elsewhere. Move construction rewrites that slot
// to the address of the new object, so the slot keeps tracking the live
// instance, and clears the pointer in the moved-from object.
struct cancellable {
    std::function<void()> cancel_send;
    std::function<void()> cancel_wait;
    cancellable** send_back_pointer = nullptr;
    cancellable** wait_back_pointer = nullptr;

    cancellable() = default;

    // Takes over the callbacks and back pointers of `x`.
    // noexcept: operator= below destroys *this before re-constructing it,
    // which is only sound if this constructor cannot throw.
    cancellable(cancellable&& x) noexcept
        : cancel_send(std::move(x.cancel_send))
        , cancel_wait(std::move(x.cancel_wait))
        , send_back_pointer(x.send_back_pointer)
        , wait_back_pointer(x.wait_back_pointer) {
        // A moved-from std::function is only guaranteed to be in a valid but
        // unspecified state. Empty both explicitly so that destroying (or
        // cancelling) `x` can never invoke the callbacks this object now owns.
        x.cancel_send = nullptr;
        x.cancel_wait = nullptr;
        if (send_back_pointer) {
            *send_back_pointer = this;
            x.send_back_pointer = nullptr;
        }
        if (wait_back_pointer) {
            *wait_back_pointer = this;
            x.wait_back_pointer = nullptr;
        }
    }

    // Move assignment: runs the destructor on *this (which cancels whatever
    // it currently holds) and then move-constructs from `x` in place.
    // Self-assignment is a no-op.
    cancellable& operator=(cancellable&& x) noexcept {
        if (&x != this) {
            this->~cancellable();
            new (this) cancellable(std::move(x));
        }
        return *this;
    }

    // Invokes cancel_send and then cancel_wait, skipping any that are empty.
    void cancel() {
        if (cancel_send) {
            cancel_send();
        }
        if (cancel_wait) {
            cancel_wait();
        }
    }

    ~cancellable() {
        cancel();
    }
};

// Received payload: `bufs` holds either a single temporary_buffer or a
// vector of them, and `size` records the associated length as a 32-bit value.
struct rcv_buf {
    uint32_t size = 0;
    // Optional semaphore units carried with the buffer; nothing in this
    // struct touches them (presumably a memory reservation -- TODO confirm).
    std::optional<semaphore_units<>> su;
    std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
    using iterator = std::vector<temporary_buffer<char>>::iterator;

    rcv_buf() {}

    // Records a size only; `bufs` stays default-constructed.
    explicit rcv_buf(size_t size_)
        : size(static_cast<uint32_t>(size_)) {}

    // Single-buffer form: `size` is taken from b.size() before b is moved.
    explicit rcv_buf(temporary_buffer<char> b)
        : size(static_cast<uint32_t>(b.size()))
        , bufs(std::move(b)) {}

    // Fragmented form: the caller supplies the fragments and their size.
    explicit rcv_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
        : size(static_cast<uint32_t>(size))
        , bufs(std::move(bufs)) {}
};

// Outgoing payload: `bufs` holds either a single temporary_buffer or a
// vector of them, and `size` records the associated length as a 32-bit value.
struct snd_buf {
    // Preferred, but not required, chunk size.
    static constexpr size_t chunk_size = 128 * 1024;

    uint32_t size = 0;
    std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
    using iterator = std::vector<temporary_buffer<char>>::iterator;

    snd_buf() {}

    // Move operations and the size-only constructor are declared here and
    // defined out of line.
    snd_buf(snd_buf&&) noexcept;
    snd_buf& operator=(snd_buf&&) noexcept;
    explicit snd_buf(size_t size_);

    // Single-buffer form: `size` is taken from b.size() before b is moved.
    explicit snd_buf(temporary_buffer<char> b)
        : size(static_cast<uint32_t>(b.size()))
        , bufs(std::move(b)) {}

    // Fragmented form: the caller supplies the fragments and their size.
    explicit snd_buf(std::vector<temporary_buffer<char>> bufs, size_t size)
        : size(static_cast<uint32_t>(size))
        , bufs(std::move(bufs)) {}

    // Declared here, defined out of line.
    temporary_buffer<char>& front();
};

// Builds a memory_input_stream over the contents of `input`.
// If `input.bufs` holds a single temporary_buffer, a "simple" stream over
// that buffer is returned. Otherwise a "fragmented" stream is returned that
// starts at the first fragment of the vector and is given input.size.
// The stream refers to `input`'s buffers; it does not copy them.
static inline memory_input_stream<rcv_buf::iterator> make_deserializer_stream(rcv_buf& input) {
    using stream_type = memory_input_stream<rcv_buf::iterator>;
    using fragment_vector = std::vector<temporary_buffer<char>>;

    if (auto* single = std::get_if<temporary_buffer<char>>(&input.bufs)) {
        return stream_type(stream_type::simple(single->begin(), single->size()));
    }

    auto& fragments = std::get<fragment_vector>(input.bufs);
    return stream_type(stream_type::fragmented(fragments.begin(), input.size));
}

// Abstract interface for a compression algorithm: compress() maps a snd_buf
// to a snd_buf, decompress() maps a rcv_buf to a rcv_buf. Instances are
// obtained through the nested `factory` interface.
class compressor {
public:
    virtual ~compressor() {}
    // compress data and leave head_space bytes at the beginning of returned buffer
    virtual snd_buf compress(size_t head_space, snd_buf data) = 0;
    // decompress data
    virtual rcv_buf decompress(rcv_buf data) = 0;
    // name of this compressor, as a string
    virtual sstring name() const = 0;
    
    // factory to create compressor for a connection
    class factory {
    public:
        virtual ~factory() {}
        // return feature string that will be sent as part of protocol negotiation
        virtual const sstring& supported() const = 0;
        // negotiate compress algorithm
        // Returns an owning pointer to a compressor. `feature` is presumably
        // the feature string received from the peer and `is_server` tells
        // which side of the connection is negotiating -- TODO confirm.
        virtual std::unique_ptr<compressor> negotiate(sstring feature, bool is_server) const = 0;
    };
};

// Forward declaration only: this header refers to connection solely through
// smart pointers (see xshard_connection_ptr).
class connection;

// Identifier of an rpc connection.
//
// The low 16 bits of `id` hold a shard number and the bits above them hold
// the caller-supplied id value (see make_id). A shard field of 0xffff marks
// the identifier as invalid, which is what the conversion to bool tests.
struct connection_id {
    uint64_t id;

    constexpr bool operator==(const connection_id& o) const {
        return id == o.id;
    }
    // Defined explicitly: without it `a != b` still compiles, but through
    // the implicit bool conversion below, so it would compare validity
    // instead of comparing the ids.
    constexpr bool operator!=(const connection_id& o) const {
        return !(*this == o);
    }
    // True unless the shard field is the invalid marker 0xffff.
    constexpr operator bool() const {
        return shard() != 0xffff;
    }
    // Shard number stored in the low 16 bits.
    constexpr size_t shard() const {
        return size_t(id & 0xffff);
    }
    // Builds an identifier that converts to false; `id` fills the upper bits.
    constexpr static connection_id make_invalid_id(uint64_t id = 0) {
        return make_id(id, 0xffff);
    }
    // Packs `id` above a 16-bit `shard`. The top 16 bits of `id` are
    // shifted out and lost.
    constexpr static connection_id make_id(uint64_t id, uint16_t shard) {
        return {(id << 16) | shard};
    }
};

// An id whose shard field is 0xffff, i.e. one that converts to false.
constexpr connection_id invalid_connection_id = connection_id::make_invalid_id();

// Stream output for connection_id; declaration only, no definition is visible in this header.
std::ostream& operator<<(std::ostream&, const connection_id&);

// Connection handle type held by sink::impl and source::impl.
using xshard_connection_ptr = lw_shared_ptr<foreign_ptr<shared_ptr<connection>>>;
// Capacity reserved up front for source::impl's buffer queue.
constexpr size_t max_queued_stream_buffers = 50;
// Initial count of the semaphore owned by sink::impl.
constexpr size_t max_stream_buffers_memory = 100 * 1024;

/// \addtogroup rpc
/// @{

// Sending side of an rpc stream: sends data Out...
// sink is a thin handle around a shared `impl`; every operation forwards to it.
template<typename... Out>
class sink {
public:
    // Abstract implementation behind a sink. It holds the connection handle,
    // a semaphore initialised to max_stream_buffers_memory, and an exception
    // slot; how derived classes use these is not visible in this header.
    class impl {
    protected:
        xshard_connection_ptr _con;
        semaphore _sem;
        std::exception_ptr _ex;

        impl(xshard_connection_ptr con)
            : _con(std::move(con))
            , _sem(max_stream_buffers_memory) {}

    public:
        virtual ~impl() {}
        virtual future<> operator()(const Out&... args) = 0;
        virtual future<> close() = 0;
        virtual future<> flush() = 0;
        friend sink;
    };

private:
    shared_ptr<impl> _impl;

public:
    sink(shared_ptr<impl> impl) : _impl(std::move(impl)) {}

    // Forwards `args` to the implementation's call operator.
    future<> operator()(const Out&... args) {
        return (*_impl)(args...);
    }

    // Forwards to the implementation's close().
    future<> close() {
        return _impl->close();
    }

    // Calling this function makes sure that any data buffered
    // by the stream sink will be flushed to the network.
    // It does not mean the data was received by the corresponding
    // source.
    future<> flush() {
        return _impl->flush();
    }

    // Declared here, defined out of line.
    connection_id get_id() const;
};

// Receiving side of an rpc stream: receives data In...
// source is a thin handle around a shared `impl`; the call operator forwards to it.
template<typename... In>
class source {
public:
    // Abstract implementation behind a source. It holds the connection
    // handle and a queue of received buffers whose capacity is reserved to
    // max_queued_stream_buffers at construction.
    class impl {
    protected:
        xshard_connection_ptr _con;
        circular_buffer<foreign_ptr<std::unique_ptr<rcv_buf>>> _bufs;

        impl(xshard_connection_ptr con)
            : _con(std::move(con)) {
            _bufs.reserve(max_queued_stream_buffers);
        }

    public:
        virtual ~impl() {}
        virtual future<std::optional<std::tuple<In...>>> operator()() = 0;
        friend source;
    };

private:
    shared_ptr<impl> _impl;

public:
    source(shared_ptr<impl> impl) : _impl(std::move(impl)) {}

    // Forwards to the implementation's call operator. The future resolves
    // to an optional tuple of In... (an empty optional presumably signals
    // end of stream -- confirm against the implementation).
    future<std::optional<std::tuple<In...>>> operator()() {
        return (*_impl)();
    }

    // Both declared here and defined out of line.
    connection_id get_id() const;
    template<typename Serializer, typename... Out> sink<Out...> make_sink();
};

/// Used to return multiple values in rpc without variadic futures
///
/// If you wish to return multiple values from an rpc procedure, use a
/// signature `future<rpc::tuple<return type list>> (argument list)`. This
/// will be marshalled by rpc, so you do not need to have your Serializer
/// serialize/deserialize this tuple type. The serialization format is
/// compatible with the deprecated variadic future support, and is compatible
/// with adding new return types in a backwards compatible way provided new
/// parameters are appended only, and wrapped with rpc::optional:
/// `future<rpc::tuple<existing return type list, rpc::optional<new_return_type>>> (argument list)`
///
/// You may also use another tuple type, such as std::tuple. In this case,
/// your Serializer type must recognize your tuple type and provide serialization
/// and deserialization for it.
template <typename... T>
class tuple : public std::tuple<T...> {
public:
    // Inherit the std::tuple<T...> constructors.
    using std::tuple<T...>::tuple;

    // Construct from an rvalue std::tuple<T...> by moving it into the base.
    tuple(std::tuple<T...>&& x)
        : std::tuple<T...>(std::move(x))
    {}
};

/// @}

// Deduction guide for rpc::tuple(args...). The parameters are forwarding
// references, so an rvalue argument of type X deduces element type X while
// an lvalue argument deduces X&.
// NOTE(review): std::tuple's own guide takes its arguments by value and so
// never deduces references; confirm that reference elements for lvalue
// arguments are intended here.
template <typename... T>
tuple(T&&...) ->  tuple<T...>;

} // namespace rpc

}

namespace std {
// std::hash specialisation for connection_id: hashes the raw 64-bit `id`
// with std::hash<uint64_t> and mixes the result into a zero seed using
// boost::hash_combine.
template<>
struct hash<seastar::rpc::connection_id> {
    size_t operator()(const seastar::rpc::connection_id& id) const {
        size_t seed = 0;
        const size_t id_hash = std::hash<uint64_t>{}(id.id);
        boost::hash_combine(seed, id_hash);
        return seed;
    }
};

// seastar::rpc::tuple<T...> reports the same tuple_size as std::tuple<T...>.
template <typename... T>
struct tuple_size<seastar::rpc::tuple<T...>>
    : std::tuple_size<std::tuple<T...>> {};

// seastar::rpc::tuple<T...> reports the same element types as std::tuple<T...>.
template <size_t I, typename... T>
struct tuple_element<I, seastar::rpc::tuple<T...>>
    : std::tuple_element<I, std::tuple<T...>> {};

}