diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rgw/rgw_flight_frontend.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rgw/rgw_flight_frontend.cc | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/src/rgw/rgw_flight_frontend.cc b/src/rgw/rgw_flight_frontend.cc new file mode 100644 index 000000000..c29703fe5 --- /dev/null +++ b/src/rgw/rgw_flight_frontend.cc @@ -0,0 +1,246 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * Ceph - scalable distributed file system + * + * Copyright 2023 IBM + * + * See file COPYING for licensing information. + */ + +#include <cstdio> +#include <filesystem> +#include <sstream> + +#include "arrow/type.h" +#include "arrow/flight/server.h" +#include "arrow/io/file.h" + +#include "parquet/arrow/reader.h" +#include "parquet/arrow/schema.h" +#include "parquet/stream_reader.h" + +#include "rgw_flight_frontend.h" +#include "rgw_flight.h" + + +// logging +constexpr unsigned dout_subsys = ceph_subsys_rgw_flight; +constexpr const char* dout_prefix_str = "rgw arrow_flight: "; + + +namespace rgw::flight { + +const FlightKey null_flight_key = 0; + +FlightFrontend::FlightFrontend(RGWProcessEnv& _env, + RGWFrontendConfig* _config, + int _port) : + env(_env), + config(_config), + port(_port), + dp(env.driver->ctx(), dout_subsys, dout_prefix_str) +{ + env.flight_store = new MemoryFlightStore(dp); + env.flight_server = new FlightServer(env, env.flight_store, dp); + INFO << "flight server started" << dendl; +} + +FlightFrontend::~FlightFrontend() { + delete env.flight_server; + env.flight_server = nullptr; + + delete env.flight_store; + env.flight_store = nullptr; + + INFO << "flight server shut down" << dendl; +} + +int FlightFrontend::init() { + if (port <= 0) { + port = FlightServer::default_port; + } + const std::string url = + std::string("grpc+tcp://localhost:") + std::to_string(port); + flt::Location location; + arw::Status s = flt::Location::Parse(url, &location); + if (!s.ok()) { + ERROR << "couldn't parse url=" << url << ", status=" << s << dendl; + return -EINVAL; + } + + flt::FlightServerOptions options(location); + options.verify_client = false; + s = env.flight_server->Init(options); + if (!s.ok()) { + ERROR << "couldn't init flight server; status=" << s << dendl; + return -EINVAL; + } + + INFO << "FlightServer inited; will use port " << port << dendl; + return 0; +} + +int FlightFrontend::run() { + try { + flight_thread = make_named_thread(server_thread_name, + &FlightServer::Serve, + env.flight_server); + + INFO << "FlightServer thread started, id=" << + flight_thread.get_id() << + ", joinable=" << flight_thread.joinable() << dendl; + return 0; + } catch (std::system_error& e) { + ERROR << "FlightServer thread failed to start" << dendl; + return -e.code().value(); + } +} + +void FlightFrontend::stop() { + env.flight_server->Shutdown(); + env.flight_server->Wait(); + INFO << "FlightServer shut down" << dendl; +} + +void FlightFrontend::join() { + flight_thread.join(); + INFO << "FlightServer thread joined" << dendl; +} + +void FlightFrontend::pause_for_new_config() { + // ignore since config changes won't alter flight_server +} + +void FlightFrontend::unpause_with_new_config() { + // ignore since config changes won't alter flight_server +} + +/* ************************************************************ */ + +FlightGetObj_Filter::FlightGetObj_Filter(const req_state* request, + RGWGetObj_Filter* next) : + RGWGetObj_Filter(next), + penv(request->penv), + dp(request->cct->get(), dout_subsys, dout_prefix_str), + current_offset(0), + expected_size(request->obj_size), + uri(request->decoded_uri), + tenant_name(request->bucket->get_tenant()), + bucket_name(request->bucket->get_name()), + object_key(request->object->get_key()), + // note: what about object namespace and instance? + schema_status(arrow::StatusCode::Cancelled, + "schema determination incomplete"), + user_id(request->user->get_id()) +{ +#warning "TODO: fix use of tmpnam" + char name[L_tmpnam]; + const char* namep = std::tmpnam(name); + if (!namep) { + // + } + temp_file_name = namep; + + temp_file.open(temp_file_name); +} + +FlightGetObj_Filter::~FlightGetObj_Filter() { + if (temp_file.is_open()) { + temp_file.close(); + } + std::error_code error; + std::filesystem::remove(temp_file_name, error); + if (error) { + ERROR << "FlightGetObj_Filter got error when removing temp file; " + "error=" << error.value() << + ", temp_file_name=" << temp_file_name << dendl; + } else { + INFO << "parquet/arrow schema determination status: " << + schema_status << dendl; + } +} + +int FlightGetObj_Filter::handle_data(bufferlist& bl, + off_t bl_ofs, off_t bl_len) { + INFO << "flight handling data from offset " << + current_offset << " (" << bl_ofs << ") of size " << bl_len << dendl; + + current_offset += bl_len; + + if (temp_file.is_open()) { + bl.write_stream(temp_file); + + if (current_offset >= expected_size) { + INFO << "data read is completed, current_offset=" << + current_offset << ", expected_size=" << expected_size << dendl; + temp_file.close(); + + std::shared_ptr<const arw::KeyValueMetadata> kv_metadata; + std::shared_ptr<arw::Schema> aw_schema; + int64_t num_rows = 0; + + auto process_metadata = [&aw_schema, &num_rows, &kv_metadata, this]() -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::ReadableFile> file, + arrow::io::ReadableFile::Open(temp_file_name)); + const std::shared_ptr<parquet::FileMetaData> metadata = parquet::ReadMetaData(file); + + file->Close(); + + num_rows = metadata->num_rows(); + kv_metadata = metadata->key_value_metadata(); + const parquet::SchemaDescriptor* pq_schema = metadata->schema(); + ARROW_RETURN_NOT_OK(parquet::arrow::FromParquetSchema(pq_schema, &aw_schema)); + + return arrow::Status::OK(); + }; + + schema_status = process_metadata(); + if (!schema_status.ok()) { + ERROR << "reading metadata to access schema, error=" << schema_status << dendl; + } else { + // INFO << "arrow_schema=" << *aw_schema << dendl; + FlightStore* store = penv.flight_store; + auto key = + store->add_flight(FlightData(uri, tenant_name, bucket_name, + object_key, num_rows, + expected_size, aw_schema, + kv_metadata, user_id)); + (void) key; // suppress unused variable warning + } + } // if last block + } // if file opened + + // chain to next filter in stream + int ret = RGWGetObj_Filter::handle_data(bl, bl_ofs, bl_len); + + return ret; +} + +#if 0 +void code_snippets() { + INFO << "num_columns:" << md->num_columns() << + " num_schema_elements:" << md->num_schema_elements() << + " num_rows:" << md->num_rows() << + " num_row_groups:" << md->num_row_groups() << dendl; + + + INFO << "file schema: name=" << schema1->name() << ", ToString:" << schema1->ToString() << ", num_columns=" << schema1->num_columns() << dendl; + for (int c = 0; c < schema1->num_columns(); ++c) { + const parquet::ColumnDescriptor* cd = schema1->Column(c); + // const parquet::ConvertedType::type t = cd->converted_type; + const std::shared_ptr<const parquet::LogicalType> lt = cd->logical_type(); + INFO << "column " << c << ": name=" << cd->name() << ", ToString=" << cd->ToString() << ", logical_type=" << lt->ToString() << dendl; + } + + INFO << "There are " << md->num_rows() << " rows and " << md->num_row_groups() << " row groups" << dendl; + for (int rg = 0; rg < md->num_row_groups(); ++rg) { + INFO << "Row Group " << rg << dendl; + auto rg_md = md->RowGroup(rg); + auto schema2 = rg_md->schema(); + } +} +#endif + +} // namespace rgw::flight |