diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_flight.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rgw/rgw_flight.h')
-rw-r--r-- | src/rgw/rgw_flight.h | 221 |
1 files changed, 221 insertions, 0 deletions
diff --git a/src/rgw/rgw_flight.h b/src/rgw/rgw_flight.h new file mode 100644 index 000000000..bb0a987d0 --- /dev/null +++ b/src/rgw/rgw_flight.h @@ -0,0 +1,221 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright 2023 IBM + * + * See file COPYING for licensing information. + */ + +#pragma once + +#include <map> +#include <mutex> +#include <atomic> + +#include "include/common_fwd.h" +#include "common/ceph_context.h" +#include "common/Thread.h" +#include "common/ceph_time.h" +#include "rgw_frontend.h" +#include "arrow/type.h" +#include "arrow/flight/server.h" +#include "arrow/util/string_view.h" + +#include "rgw_flight_frontend.h" + + +#define INFO_F(dp) ldpp_dout(&dp, 20) << "INFO: " << __func__ << ": " +#define STATUS_F(dp) ldpp_dout(&dp, 10) << "STATUS: " << __func__ << ": " +#define WARN_F(dp) ldpp_dout(&dp, 0) << "WARNING: " << __func__ << ": " +#define ERROR_F(dp) ldpp_dout(&dp, 0) << "ERROR: " << __func__ << ": " + +#define INFO INFO_F(dp) +#define STATUS STATUS_F(dp) +#define WARN WARN_F(dp) +#define ERROR ERROR_F(dp) + + +namespace arw = arrow; +namespace flt = arrow::flight; + + +struct req_state; + +namespace rgw::flight { + +static const coarse_real_clock::duration lifespan = std::chrono::hours(1); + +struct FlightData { + FlightKey key; + // coarse_real_clock::time_point expires; + std::string uri; + std::string tenant_name; + std::string bucket_name; + rgw_obj_key object_key; + // NB: what about object's namespace and instance? + uint64_t num_records; + uint64_t obj_size; + std::shared_ptr<arw::Schema> schema; + std::shared_ptr<const arw::KeyValueMetadata> kv_metadata; + + rgw_user user_id; // TODO: this should be removed when we do + // proper flight authentication + + FlightData(const std::string& _uri, + const std::string& _tenant_name, + const std::string& _bucket_name, + const rgw_obj_key& _object_key, + uint64_t _num_records, + uint64_t _obj_size, + std::shared_ptr<arw::Schema>& _schema, + std::shared_ptr<const arw::KeyValueMetadata>& _kv_metadata, + rgw_user _user_id); +}; + +// stores flights that have been created and helps expire them +class FlightStore { + +protected: + + const DoutPrefix& dp; + +public: + + FlightStore(const DoutPrefix& dp); + virtual ~FlightStore(); + virtual FlightKey add_flight(FlightData&& flight) = 0; + + // TODO consider returning const shared pointers to FlightData in + // the following two functions + virtual arw::Result<FlightData> get_flight(const FlightKey& key) const = 0; + virtual std::optional<FlightData> after_key(const FlightKey& key) const = 0; + + virtual int remove_flight(const FlightKey& key) = 0; + virtual int expire_flights() = 0; +}; + +class MemoryFlightStore : public FlightStore { + std::map<FlightKey, FlightData> map; + mutable std::mutex mtx; // for map + +public: + + MemoryFlightStore(const DoutPrefix& dp); + virtual ~MemoryFlightStore(); + FlightKey add_flight(FlightData&& flight) override; + arw::Result<FlightData> get_flight(const FlightKey& key) const override; + std::optional<FlightData> after_key(const FlightKey& key) const override; + int remove_flight(const FlightKey& key) override; + int expire_flights() override; +}; + +class FlightServer : public flt::FlightServerBase { + + using Data1 = std::vector<std::shared_ptr<arw::RecordBatch>>; + + RGWProcessEnv& env; + rgw::sal::Driver* driver; + const DoutPrefix& dp; + FlightStore* flight_store; + + std::map<std::string, Data1> data; + +public: + + static constexpr int default_port = 8077; + + FlightServer(RGWProcessEnv& env, + FlightStore* flight_store, + const DoutPrefix& dp); + ~FlightServer() override; + + FlightStore* get_flight_store() { + return flight_store; + } + + arw::Status ListFlights(const flt::ServerCallContext& context, + const flt::Criteria* criteria, + std::unique_ptr<flt::FlightListing>* listings) override; + + arw::Status GetFlightInfo(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr<flt::FlightInfo> *info) override; + + arw::Status GetSchema(const flt::ServerCallContext &context, + const flt::FlightDescriptor &request, + std::unique_ptr<flt::SchemaResult> *schema) override; + + arw::Status DoGet(const flt::ServerCallContext &context, + const flt::Ticket &request, + std::unique_ptr<flt::FlightDataStream> *stream) override; +}; // class FlightServer + +class OwningStringView : public arw::util::string_view { + + uint8_t* buffer; + int64_t capacity; + int64_t consumed; + + OwningStringView(uint8_t* _buffer, int64_t _size) : + arw::util::string_view((const char*) _buffer, _size), + buffer(_buffer), + capacity(_size), + consumed(_size) + { } + + OwningStringView(OwningStringView&& from, int64_t new_size) : + buffer(nullptr), + capacity(from.capacity), + consumed(new_size) + { + // should be impossible due to static function check + ceph_assertf(consumed <= capacity, "new size cannot exceed capacity"); + + std::swap(buffer, from.buffer); + from.capacity = 0; + from.consumed = 0; + } + +public: + + OwningStringView(OwningStringView&&) = default; + OwningStringView& operator=(OwningStringView&&) = default; + + uint8_t* writeable_data() { + return buffer; + } + + ~OwningStringView() { + if (buffer) { + delete[] buffer; + } + } + + static arw::Result<OwningStringView> make(int64_t size) { + uint8_t* buffer = new uint8_t[size]; + if (!buffer) { + return arw::Status::OutOfMemory("could not allocated buffer of size %" PRId64, size); + } + return OwningStringView(buffer, size); + } + + static arw::Result<OwningStringView> shrink(OwningStringView&& from, + int64_t new_size) { + if (new_size > from.capacity) { + return arw::Status::Invalid("new size cannot exceed capacity"); + } else { + return OwningStringView(std::move(from), new_size); + } + } + +}; + +// GLOBAL + +flt::Ticket FlightKeyToTicket(const FlightKey& key); +arw::Status TicketToFlightKey(const flt::Ticket& t, FlightKey& key); + +} // namespace rgw::flight |