1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include "include/rados/librados_fwd.hpp"
#include <memory>
#include "common/ceph_mutex.h"
#include "common/async/completion.h"
#include "common/async/yield_context.h"
#include "services/svc_rados.h"
#include "rgw_aio.h"
namespace rgw {
// Common state shared by the aio throttles: a fixed window, the running
// pending_size, and the lists of pending and completed results.
class Throttle {
 protected:
  // condition a waiter may be blocked on
  // NOTE(review): the exact meaning of each value is decided by the
  // out-of-line waiter_ready(); confirm against the implementation.
  enum class Wait {
    None,
    Available,
    Completion,
    Drained
  };

  // upper bound that pending_size is compared against
  const uint64_t window;
  uint64_t pending_size{0};

  AioResultList pending;
  AioResultList completed;

  // true while pending_size does not exceed the window
  bool is_available() const
  {
    return !(pending_size > window);
  }

  // true when at least one result sits in the completed list
  bool has_completion() const
  {
    return !completed.empty();
  }

  // true when nothing is left in the pending list
  bool is_drained() const
  {
    return pending.empty();
  }

  Wait waiter{Wait::None};

  // defined out of line
  bool waiter_ready() const;

 public:
  Throttle(uint64_t window)
    : window(window)
  {}

  virtual ~Throttle()
  {
    // both lists have to be emptied by the owner before destruction
    ceph_assert(pending.empty());
    ceph_assert(completed.empty());
  }
};
// a throttle for aio operations. all public functions must be called from
// the same thread
class BlockingAioThrottle final : public Aio, private Throttle {
ceph::mutex mutex = ceph::make_mutex("AioThrottle");
ceph::condition_variable cond;
struct Pending : AioResultEntry {
BlockingAioThrottle *parent = nullptr;
uint64_t cost = 0;
librados::AioCompletion *completion = nullptr;
};
public:
BlockingAioThrottle(uint64_t window) : Throttle(window) {}
virtual ~BlockingAioThrottle() override {};
AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
uint64_t cost, uint64_t id) override final;
void put(AioResult& r) override final;
AioResultList poll() override final;
AioResultList wait() override final;
AioResultList drain() override final;
};
// a throttle that yields the coroutine instead of blocking. all public
// functions must be called within the coroutine strand
class YieldingAioThrottle final : public Aio, private Throttle {
boost::asio::io_context& context;
yield_context yield;
struct Handler;
// completion callback associated with the waiter
using Completion = ceph::async::Completion<void(boost::system::error_code)>;
std::unique_ptr<Completion> completion;
template <typename CompletionToken>
auto async_wait(CompletionToken&& token);
struct Pending : AioResultEntry { uint64_t cost = 0; };
public:
YieldingAioThrottle(uint64_t window, boost::asio::io_context& context,
yield_context yield)
: Throttle(window), context(context), yield(yield)
{}
virtual ~YieldingAioThrottle() override {};
AioResultList get(const RGWSI_RADOS::Obj& obj, OpFunc&& f,
uint64_t cost, uint64_t id) override final;
void put(AioResult& r) override final;
AioResultList poll() override final;
AioResultList wait() override final;
AioResultList drain() override final;
};
// Build an Aio throttle with the given window and hand it back as an owning
// std::unique_ptr<Aio>: the blocking flavour when y is empty, otherwise the
// yielding flavour bound to y's io_context and yield_context.
inline auto make_throttle(uint64_t window_size, optional_yield y)
{
  if (!y) {
    // no coroutine context available
    return std::unique_ptr<Aio>(
        std::make_unique<BlockingAioThrottle>(window_size));
  }
  // both returns name std::unique_ptr<Aio> so the deduced type agrees
  return std::unique_ptr<Aio>(
      std::make_unique<YieldingAioThrottle>(window_size,
                                            y.get_io_context(),
                                            y.get_yield_context()));
}
} // namespace rgw
|