From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/rgw/rgw_putobj_processor.h | 263 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 263 insertions(+) create mode 100644 src/rgw/rgw_putobj_processor.h (limited to 'src/rgw/rgw_putobj_processor.h') diff --git a/src/rgw/rgw_putobj_processor.h b/src/rgw/rgw_putobj_processor.h new file mode 100644 index 00000000..8d265f17 --- /dev/null +++ b/src/rgw/rgw_putobj_processor.h @@ -0,0 +1,263 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2018 Red Hat, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +#include "rgw_putobj.h" +#include "rgw_rados.h" +#include "services/svc_rados.h" + +namespace rgw { + +class Aio; + +namespace putobj { + +// a data consumer that writes an object in a bucket +class ObjectProcessor : public DataProcessor { + public: + // prepare to start processing object data + virtual int prepare() = 0; + + // complete the operation and make its result visible to clients + virtual int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled) = 0; +}; + +// an object processor with special handling for the first chunk of the head. +// the virtual process_first_chunk() function returns a processor to handle the +// rest of the object +class HeadObjectProcessor : public ObjectProcessor { + uint64_t head_chunk_size; + // buffer to capture the first chunk of the head object + bufferlist head_data; + // initialized after process_first_chunk() to process everything else + DataProcessor *processor = nullptr; + uint64_t data_offset = 0; // maximum offset of data written (ie compressed) + protected: + uint64_t get_actual_size() const { return data_offset; } + + // process the first chunk of data and return a processor for the rest + virtual int process_first_chunk(bufferlist&& data, + DataProcessor **processor) = 0; + public: + HeadObjectProcessor(uint64_t head_chunk_size) + : head_chunk_size(head_chunk_size) + {} + + void set_head_chunk_size(uint64_t size) { head_chunk_size = size; } + + // cache first chunk for process_first_chunk(), then forward everything else + // to the returned processor + int process(bufferlist&& data, uint64_t logical_offset) final override; +}; + + +using RawObjSet = std::set; + +// a data sink that writes to rados objects and deletes them on cancelation +class RadosWriter : public DataProcessor { + Aio *const aio; + RGWRados *const store; + const RGWBucketInfo& bucket_info; + RGWObjectCtx& obj_ctx; + const rgw_obj head_obj; + RGWSI_RADOS::Obj stripe_obj; // current stripe object + RawObjSet written; // set of written objects for deletion + + public: + RadosWriter(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info, + RGWObjectCtx& obj_ctx, const rgw_obj& head_obj) + : aio(aio), store(store), bucket_info(bucket_info), + obj_ctx(obj_ctx), head_obj(head_obj) + {} + ~RadosWriter(); + + // change the current stripe object + int set_stripe_obj(const rgw_raw_obj& obj); + + // write the data at the given offset of the current stripe object + int process(bufferlist&& data, uint64_t stripe_offset) override; + + // write the data as an exclusive create and wait for it to complete + int write_exclusive(const bufferlist& data); + + int drain(); + + // when the operation completes successfully, clear the set of written objects + // so they aren't deleted on destruction + void clear_written() { written.clear(); } +}; + +// a rados object processor that stripes according to RGWObjManifest +class ManifestObjectProcessor : public HeadObjectProcessor, + public StripeGenerator { + protected: + RGWRados *const store; + const RGWBucketInfo& bucket_info; + rgw_placement_rule tail_placement_rule; + const rgw_user& owner; + RGWObjectCtx& obj_ctx; + rgw_obj head_obj; + + RadosWriter writer; + RGWObjManifest manifest; + RGWObjManifest::generator manifest_gen; + ChunkProcessor chunk; + StripeProcessor stripe; + + // implements StripeGenerator + int next(uint64_t offset, uint64_t *stripe_size) override; + + public: + ManifestObjectProcessor(Aio *aio, RGWRados *store, + const RGWBucketInfo& bucket_info, + const rgw_placement_rule *ptail_placement_rule, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_obj& head_obj) + : HeadObjectProcessor(0), + store(store), bucket_info(bucket_info), + owner(owner), + obj_ctx(obj_ctx), head_obj(head_obj), + writer(aio, store, bucket_info, obj_ctx, head_obj), + chunk(&writer, 0), stripe(&chunk, this, 0) { + if (ptail_placement_rule) { + tail_placement_rule = *ptail_placement_rule; + } + } + + void set_tail_placement(const rgw_placement_rule&& tpr) { + tail_placement_rule = tpr; + } +}; + + +// a processor that completes with an atomic write to the head object as part of +// a bucket index transaction +class AtomicObjectProcessor : public ManifestObjectProcessor { + const std::optional olh_epoch; + const std::string unique_tag; + bufferlist first_chunk; // written with the head in complete() + + int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + public: + AtomicObjectProcessor(Aio *aio, RGWRados *store, + const RGWBucketInfo& bucket_info, + const rgw_placement_rule *ptail_placement_rule, + const rgw_user& owner, + RGWObjectCtx& obj_ctx, const rgw_obj& head_obj, + std::optional olh_epoch, + const std::string& unique_tag) + : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, + owner, obj_ctx, head_obj), + olh_epoch(olh_epoch), unique_tag(unique_tag) + {} + + // prepare a trivial manifest + int prepare() override; + // write the head object atomically in a bucket index transaction + int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled) override; + +}; + + +// a processor for multipart parts, which don't require atomic completion. the +// part's head is written with an exclusive create to detect racing uploads of +// the same part/upload id, which are restarted with a random oid prefix +class MultipartObjectProcessor : public ManifestObjectProcessor { + const rgw_obj target_obj; // target multipart object + const std::string upload_id; + const int part_num; + const std::string part_num_str; + RGWMPObj mp; + + // write the first chunk and wait on aio->drain() for its completion. + // on EEXIST, retry with random prefix + int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + // prepare the head stripe and manifest + int prepare_head(); + public: + MultipartObjectProcessor(Aio *aio, RGWRados *store, + const RGWBucketInfo& bucket_info, + const rgw_placement_rule *ptail_placement_rule, + const rgw_user& owner, RGWObjectCtx& obj_ctx, + const rgw_obj& head_obj, + const std::string& upload_id, uint64_t part_num, + const std::string& part_num_str) + : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, + owner, obj_ctx, head_obj), + target_obj(head_obj), upload_id(upload_id), + part_num(part_num), part_num_str(part_num_str), + mp(head_obj.key.name, upload_id) + {} + + // prepare a multipart manifest + int prepare() override; + // write the head object attributes in a bucket index transaction, then + // register the completed part with the multipart meta object + int complete(size_t accounted_size, const std::string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + std::map& attrs, + ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, + const std::string *user_data, + rgw_zone_set *zones_trace, bool *canceled) override; +}; + + class AppendObjectProcessor : public ManifestObjectProcessor { + uint64_t cur_part_num; + uint64_t position; + uint64_t cur_size; + uint64_t *cur_accounted_size; + string cur_etag; + const std::string unique_tag; + + RGWObjManifest *cur_manifest; + + int process_first_chunk(bufferlist&& data, DataProcessor **processor) override; + + public: + AppendObjectProcessor(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info, + const rgw_placement_rule *ptail_placement_rule, + const rgw_user& owner, RGWObjectCtx& obj_ctx,const rgw_obj& head_obj, + const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size) + : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, owner, obj_ctx, head_obj), + position(position), cur_size(0), cur_accounted_size(cur_accounted_size), + unique_tag(unique_tag), cur_manifest(nullptr) + {} + int prepare() override; + int complete(size_t accounted_size, const string& etag, + ceph::real_time *mtime, ceph::real_time set_mtime, + map& attrs, ceph::real_time delete_at, + const char *if_match, const char *if_nomatch, const string *user_data, + rgw_zone_set *zones_trace, bool *canceled) override; + }; + +} // namespace putobj +} // namespace rgw + -- cgit v1.2.3