diff options
Diffstat (limited to 'src/rgw/rgw_putobj.cc')
-rw-r--r-- | src/rgw/rgw_putobj.cc | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/src/rgw/rgw_putobj.cc b/src/rgw/rgw_putobj.cc new file mode 100644 index 000000000..24a4b3275 --- /dev/null +++ b/src/rgw/rgw_putobj.cc @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "rgw_putobj.h" + +namespace rgw::putobj { + +int ChunkProcessor::process(bufferlist&& data, uint64_t offset) +{ + ceph_assert(offset >= chunk.length()); + uint64_t position = offset - chunk.length(); + + const bool flush = (data.length() == 0); + if (flush) { + if (chunk.length() > 0) { + int r = Pipe::process(std::move(chunk), position); + if (r < 0) { + return r; + } + } + return Pipe::process({}, offset); + } + chunk.claim_append(data); + + // write each full chunk + while (chunk.length() >= chunk_size) { + bufferlist bl; + chunk.splice(0, chunk_size, &bl); + + int r = Pipe::process(std::move(bl), position); + if (r < 0) { + return r; + } + position += chunk_size; + } + return 0; +} + + +int StripeProcessor::process(bufferlist&& data, uint64_t offset) +{ + ceph_assert(offset >= bounds.first); + + const bool flush = (data.length() == 0); + if (flush) { + return Pipe::process({}, offset - bounds.first); + } + + auto max = bounds.second - offset; + while (data.length() > max) { + if (max > 0) { + bufferlist bl; + data.splice(0, max, &bl); + + int r = Pipe::process(std::move(bl), offset - bounds.first); + if (r < 0) { + return r; + } + offset += max; + } + + // flush the current chunk + int r = Pipe::process({}, offset - bounds.first); + if (r < 0) { + return r; + } + // generate the next stripe + uint64_t stripe_size; + r = gen->next(offset, &stripe_size); + if (r < 0) { + return r; + } + ceph_assert(stripe_size > 0); + + bounds.first = offset; + bounds.second = offset + stripe_size; + + max = stripe_size; + } + + if (data.length() == 0) { // don't flush the chunk here + return 0; + } + return Pipe::process(std::move(data), offset - bounds.first); +} + +} // namespace rgw::putobj |