1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2018 Red Hat, Inc.
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rgw_putobj.h"
namespace rgw::putobj {
int ChunkProcessor::process(bufferlist&& data, uint64_t offset)
{
ceph_assert(offset >= chunk.length());
uint64_t position = offset - chunk.length();
const bool flush = (data.length() == 0);
if (flush) {
if (chunk.length() > 0) {
int r = Pipe::process(std::move(chunk), position);
if (r < 0) {
return r;
}
}
return Pipe::process({}, offset);
}
chunk.claim_append(data);
// write each full chunk
while (chunk.length() >= chunk_size) {
bufferlist bl;
chunk.splice(0, chunk_size, &bl);
int r = Pipe::process(std::move(bl), position);
if (r < 0) {
return r;
}
position += chunk_size;
}
return 0;
}
int StripeProcessor::process(bufferlist&& data, uint64_t offset)
{
ceph_assert(offset >= bounds.first);
const bool flush = (data.length() == 0);
if (flush) {
return Pipe::process({}, offset - bounds.first);
}
auto max = bounds.second - offset;
while (data.length() > max) {
if (max > 0) {
bufferlist bl;
data.splice(0, max, &bl);
int r = Pipe::process(std::move(bl), offset - bounds.first);
if (r < 0) {
return r;
}
offset += max;
}
// flush the current chunk
int r = Pipe::process({}, offset - bounds.first);
if (r < 0) {
return r;
}
// generate the next stripe
uint64_t stripe_size;
r = gen->next(offset, &stripe_size);
if (r < 0) {
return r;
}
ceph_assert(stripe_size > 0);
bounds.first = offset;
bounds.second = offset + stripe_size;
max = stripe_size;
}
if (data.length() == 0) { // don't flush the chunk here
return 0;
}
return Pipe::process(std::move(data), offset - bounds.first);
}
} // namespace rgw::putobj
|