1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include "include/rados/librados_fwd.hpp"
#include <memory>
#include "common/ceph_mutex.h"
#include "services/svc_rados.h"
#include "rgw_aio.h"
namespace rgw {
// a throttle for aio operations that enforces a maximum window on outstanding
// bytes. only supports a single waiter, so all public functions must be called
// from the same thread
class AioThrottle : public Aio {
protected:
const uint64_t window;
uint64_t pending_size = 0;
bool is_available() const { return pending_size <= window; }
bool has_completion() const { return !completed.empty(); }
bool is_drained() const { return pending.empty(); }
struct Pending : AioResultEntry {
AioThrottle *parent = nullptr;
uint64_t cost = 0;
librados::AioCompletion *completion = nullptr;
};
OwningList<Pending> pending;
AioResultList completed;
enum class Wait { None, Available, Completion, Drained };
Wait waiter = Wait::None;
bool waiter_ready() const;
ceph::mutex mutex = ceph::make_mutex("AioThrottle");
ceph::condition_variable cond;
void get(Pending& p);
void put(Pending& p);
static void aio_cb(void *cb, void *arg);
public:
AioThrottle(uint64_t window) : window(window) {}
virtual ~AioThrottle() {
// must drain before destructing
ceph_assert(pending.empty());
ceph_assert(completed.empty());
}
AioResultList submit(RGWSI_RADOS::Obj& obj,
librados::ObjectReadOperation *op,
uint64_t cost, uint64_t id) override;
AioResultList submit(RGWSI_RADOS::Obj& obj,
librados::ObjectWriteOperation *op,
uint64_t cost, uint64_t id) override;
AioResultList poll() override;
AioResultList wait() override;
AioResultList drain() override;
};
} // namespace rgw
|