diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/s3select/include/s3select_parquet_intrf.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/s3select/include/s3select_parquet_intrf.h')
-rw-r--r-- | src/s3select/include/s3select_parquet_intrf.h | 2079 |
1 file changed, 2079 insertions, 0 deletions
diff --git a/src/s3select/include/s3select_parquet_intrf.h b/src/s3select/include/s3select_parquet_intrf.h new file mode 100644 index 000000000..df04e1618 --- /dev/null +++ b/src/s3select/include/s3select_parquet_intrf.h @@ -0,0 +1,2079 @@ + +#pragma once + +#if ! __has_include (<arrow/api.h>) || ! __has_include (<arrow/io/api.h>) || !__has_include (<parquet/arrow/reader.h>) +# undef _ARROW_EXIST +#endif + +#ifdef _ARROW_EXIST + +#include <iostream> +#include <arrow/api.h> +#include <arrow/io/api.h> +#include <parquet/arrow/reader.h> +#include <parquet/arrow/writer.h> +#include <parquet/exception.h> +#include <set> +#include <parquet/column_reader.h> +#include <arrow/util/io_util.h> + +#include <arrow/io/interfaces.h> +#include <utility> + +#include <mutex> +#include <functional> + +#include "internal_file_decryptor.h" +#include "encryption_internal.h" + +#if ARROW_VERSION_MAJOR < 9 +#define _ARROW_FD fd_ +#define _ARROW_FD_TYPE int +#else +#define _ARROW_FD fd_.fd() +#define _ARROW_FD_TYPE arrow::internal::FileDescriptor +#endif + +/******************************************/ +/******************************************/ +class optional_yield; +namespace s3selectEngine { +class rgw_s3select_api { + + // global object for setting interface between RGW and parquet-reader + private: + + public: + + std::function<int(int64_t,int64_t,void*,optional_yield*)> range_req_fptr; + std::function<size_t(void)> get_size_fptr; + optional_yield *m_y; + + void set_range_req_api(std::function<int(int64_t,int64_t,void*,optional_yield*)> fp) + { + range_req_fptr = fp; + } + + void set_get_size_api(std::function<size_t(void)> fp) + { + get_size_fptr = fp; + } +}; +} + +/******************************************/ +/******************************************/ +/******************************************/ + +static constexpr uint8_t kParquetMagic[4] = {'P', 'A', 'R', '1'}; +static constexpr uint8_t kParquetEMagic[4] = {'P', 'A', 'R', 'E'}; +constexpr int kGcmTagLength = 16; + +namespace 
arrow { +namespace io { +namespace internal { + +ARROW_EXPORT void CloseFromDestructor(FileInterface* file); + +// Validate a (offset, size) region (as given to ReadAt) against +// the file size. Return the actual read size. +ARROW_EXPORT Result<int64_t> ValidateReadRange(int64_t offset, int64_t size, + int64_t file_size); +// Validate a (offset, size) region (as given to WriteAt) against +// the file size. Short writes are not allowed. +ARROW_EXPORT Status ValidateWriteRange(int64_t offset, int64_t size, int64_t file_size); + +// Validate a (offset, size) region (as given to ReadAt or WriteAt), without +// knowing the file size. +ARROW_EXPORT Status ValidateRange(int64_t offset, int64_t size); + +ARROW_EXPORT +std::vector<ReadRange> CoalesceReadRanges(std::vector<ReadRange> ranges, + int64_t hole_size_limit, + int64_t range_size_limit); + +ARROW_EXPORT +::arrow::internal::ThreadPool* GetIOThreadPool(); + +} // namespace internal +} // namespace io +} + + +// RGWimpl and OSFile implements the access to storage objects, OSFile(filesystem files) RGWimpl( ceph S3 ) +// ObjectInterface(temporary) is "empty base class" enables injections of access function to storage-objects +// ReadableFileImpl an implementation layer to ObjectInterface objects +// ReadableFile a layer which call to ReadableFileImpl, enable runtime switching between implementations +// ParquetFileReader is the main interface (underline implementation is transparent to this layer) +// + + +namespace arrow { +class Buffer; +namespace io { + +class ObjectInterface { + +#define NOT_IMPLEMENTED {std::cout << "not implemented" << std::endl;} + +//purpose: to implement the range-request from single object +public: + ObjectInterface() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {} + + virtual ~ObjectInterface(){} + + // Note: only one of the Open* methods below may be called on a given instance + + virtual Status OpenWritable(const std::string& path, bool truncate, bool append, bool 
write_only){return Status::OK();} + + // This is different from OpenWritable(string, ...) in that it doesn't + // truncate nor mandate a seekable file + virtual Status OpenWritable(int fd){return Status::OK();} + + virtual Status OpenReadable(const std::string& path){return Status::OK();} + + virtual Status OpenReadable(int fd){return Status::OK();} + + virtual Status CheckClosed() const {return Status::OK();} + + virtual Status Close(){return Status::OK();} + + virtual Result<int64_t> Read(int64_t nbytes, void* out){return Result<int64_t>(-1);} + + virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out){return Result<int64_t>(-1);} + + virtual Status Seek(int64_t pos){return Status::OK();} + + virtual Result<int64_t> Tell() const {return Result<int64_t>(-1);} + + virtual Status Write(const void* data, int64_t length){return Status::OK();} + + virtual int fd() const{return -1;} + + virtual bool is_open() const{return false;} + + virtual int64_t size() const{return -1;} + + virtual FileMode::type mode() const{return FileMode::READ;} + + #if 0 + std::mutex& lock(){} + #endif + + protected: + virtual Status SetFileName(const std::string& file_name){return Status::OK();} + + virtual Status SetFileName(int fd){return Status::OK();} + + virtual Status CheckPositioned(){return Status::OK();} + + ::arrow::internal::PlatformFilename file_name_; + + std::mutex lock_; + + // File descriptor + _ARROW_FD_TYPE fd_; + + FileMode::type mode_; + + bool is_open_; + int64_t size_; + // Whether ReadAt made the file position non-deterministic. 
+ std::atomic<bool> need_seeking_; + +}; //ObjectInterface + +} //namespace io +} //namespace arrow + +namespace arrow { + +using internal::IOErrorFromErrno; + +namespace io { + +class OSFile : public ObjectInterface { + public: + OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {} + + ~OSFile() {} + + // Note: only one of the Open* methods below may be called on a given instance + + Status OpenWritable(const std::string& path, bool truncate, bool append, + bool write_only) override { + RETURN_NOT_OK(SetFileName(path)); + + ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only, + truncate, append)); + is_open_ = true; + mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE; + + if (!truncate) { + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD)); + } else { + size_ = 0; + } + return Status::OK(); + } + + // This is different from OpenWritable(string, ...) in that it doesn't + // truncate nor mandate a seekable file + Status OpenWritable(int fd) override { + auto result = ::arrow::internal::FileGetSize(fd); + if (result.ok()) { + size_ = *result; + } else { + // Non-seekable file + size_ = -1; + } + RETURN_NOT_OK(SetFileName(fd)); + is_open_ = true; + mode_ = FileMode::WRITE; + #if ARROW_VERSION_MAJOR < 9 + fd_ = fd; + #else + fd_ = arrow::internal::FileDescriptor{fd}; + #endif + return Status::OK(); + } + + Status OpenReadable(const std::string& path) override { + RETURN_NOT_OK(SetFileName(path)); + + ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_)); + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(_ARROW_FD)); + + is_open_ = true; + mode_ = FileMode::READ; + return Status::OK(); + } + + Status OpenReadable(int fd) override { + ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd)); + RETURN_NOT_OK(SetFileName(fd)); + is_open_ = true; + mode_ = FileMode::READ; + #if ARROW_VERSION_MAJOR < 9 + fd_ = fd; + #else + fd_ = 
arrow::internal::FileDescriptor{fd}; + #endif + return Status::OK(); + } + + Status CheckClosed() const override { + if (!is_open_) { + return Status::Invalid("Invalid operation on closed file"); + } + return Status::OK(); + } + + Status Close() override { + if (is_open_) { + // Even if closing fails, the fd will likely be closed (perhaps it's + // already closed). + is_open_ = false; + #if ARROW_VERSION_MAJOR < 9 + int fd = fd_; + fd_ = -1; + RETURN_NOT_OK(::arrow::internal::FileClose(fd)); + #else + RETURN_NOT_OK(fd_.Close()); + #endif + } + return Status::OK(); + } + + Result<int64_t> Read(int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPositioned()); + return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast<uint8_t*>(out), nbytes); + } + + Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(internal::ValidateRange(position, nbytes)); + // ReadAt() leaves the file position undefined, so require that we seek + // before calling Read() or Write(). 
+ need_seeking_.store(true); + return ::arrow::internal::FileReadAt(_ARROW_FD, reinterpret_cast<uint8_t*>(out), position, + nbytes); + } + + Status Seek(int64_t pos) override { + RETURN_NOT_OK(CheckClosed()); + if (pos < 0) { + return Status::Invalid("Invalid position"); + } + Status st = ::arrow::internal::FileSeek(_ARROW_FD, pos); + if (st.ok()) { + need_seeking_.store(false); + } + return st; + } + + Result<int64_t> Tell() const override { + RETURN_NOT_OK(CheckClosed()); + return ::arrow::internal::FileTell(_ARROW_FD); + } + + Status Write(const void* data, int64_t length) override { + RETURN_NOT_OK(CheckClosed()); + + std::lock_guard<std::mutex> guard(lock_); + RETURN_NOT_OK(CheckPositioned()); + if (length < 0) { + return Status::IOError("Length must be non-negative"); + } + return ::arrow::internal::FileWrite(_ARROW_FD, reinterpret_cast<const uint8_t*>(data), + length); + } + + int fd() const override { return _ARROW_FD; } + + bool is_open() const override { return is_open_; } + + int64_t size() const override { return size_; } + + FileMode::type mode() const override { return mode_; } + + std::mutex& lock() { return lock_; } + + protected: + Status SetFileName(const std::string& file_name) override { + return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_); + } + + Status SetFileName(int fd) override { + std::stringstream ss; + ss << "<fd " << fd << ">"; + return SetFileName(ss.str()); + } + + Status CheckPositioned() override { + if (need_seeking_.load()) { + return Status::Invalid( + "Need seeking after ReadAt() before " + "calling implicitly-positioned operation"); + } + return Status::OK(); + } + + ::arrow::internal::PlatformFilename file_name_; + + std::mutex lock_; + + // File descriptor + _ARROW_FD_TYPE fd_; + + FileMode::type mode_; + + bool is_open_; + int64_t size_; + // Whether ReadAt made the file position non-deterministic. 
+ std::atomic<bool> need_seeking_; +}; +} // namespace io +} // namespace arrow + +namespace arrow { +class Buffer; +namespace io { + +class RGWimpl : public ObjectInterface { + +//purpose: to implement the range-request from single object +public: + RGWimpl(s3selectEngine::rgw_s3select_api* rgw) : fd_(-1), is_open_(false), size_(-1), need_seeking_(false),m_rgw_impl(rgw) {} + + ~RGWimpl(){} + +#define NOT_IMPLEMENT { \ + std::stringstream ss; \ + ss << " method " << __FUNCTION__ << " is not implemented;"; \ + throw parquet::ParquetException(ss.str()); \ + } + + // Note: only one of the Open* methods below may be called on a given instance + + Status OpenWritable(const std::string& path, bool truncate, bool append, bool write_only) { NOT_IMPLEMENT;return Status::OK(); } + + // This is different from OpenWritable(string, ...) in that it doesn't + // truncate nor mandate a seekable file + Status OpenWritable(int fd) {NOT_IMPLEMENT;return Status::OK(); } + + Status OpenReadable(const std::string& path) { + //RGW-implement + + RETURN_NOT_OK(SetFileName(path));//TODO can skip that + size_ = m_rgw_impl->get_size_fptr(); + + is_open_ = true; + mode_ = FileMode::READ; + return Status::OK(); + } + + Status OpenReadable(int fd) {NOT_IMPLEMENT;return Status::OK(); } + + Status CheckClosed() const { + //RGW-implement + if (!is_open_) { + return Status::Invalid("Invalid operation on closed file"); + } + return Status::OK(); + } + + Status Close() { + //RGW-implement + if (is_open_) { + // Even if closing fails, the fd will likely be closed (perhaps it's + // already closed). 
+ is_open_ = false; + //int fd = fd_; + #if ARROW_VERSION_MAJOR < 9 + fd_ = -1; + #else + fd_.Close(); + #endif + //RETURN_NOT_OK(::arrow::internal::FileClose(fd)); + } + return Status::OK(); + } + + Result<int64_t> Read(int64_t nbytes, void* out) { + NOT_IMPLEMENT; + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPositioned()); + return ::arrow::internal::FileRead(_ARROW_FD, reinterpret_cast<uint8_t*>(out), nbytes); + } + + Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) { + + Result<int64_t> status = m_rgw_impl->range_req_fptr(position,nbytes,out,m_rgw_impl->m_y); + + return status; + } + + Status Seek(int64_t pos) {NOT_IMPLEMENT;return Status::OK(); } + + Result<int64_t> Tell() const { + NOT_IMPLEMENT; + return Result<int64_t>(0); + } + + Status Write(const void* data, int64_t length) { + NOT_IMPLEMENT; + return Status::OK(); + } + + int fd() const { return _ARROW_FD; } + + bool is_open() const { return is_open_; } + + int64_t size() const { return size_; } + + FileMode::type mode() const { return mode_; } + + std::mutex& lock() { return lock_; } //TODO skip + + protected: + Status SetFileName(const std::string& file_name) override { + return ::arrow::internal::PlatformFilename::FromString(file_name).Value(&file_name_); + } + + Status SetFileName(int fd) {NOT_IMPLEMENT; return Status::OK(); } + + Status CheckPositioned() {NOT_IMPLEMENT; return Status::OK(); } + + ::arrow::internal::PlatformFilename file_name_; + + std::mutex lock_; + + // File descriptor + _ARROW_FD_TYPE fd_; + + FileMode::type mode_; + + bool is_open_; + int64_t size_; + // Whether ReadAt made the file position non-deterministic. + std::atomic<bool> need_seeking_; + +private: + + s3selectEngine::rgw_s3select_api* m_rgw_impl; +}; + +} //namespace io +} //namespace arrow + +namespace arrow { + +class Buffer; +class MemoryPool; +class Status; + +namespace io { +namespace ceph { + +/// \brief An operating system file open in read-only mode. 
+/// +/// Reads through this implementation are unbuffered. If many small reads +/// need to be issued, it is recommended to use a buffering layer for good +/// performance. +class ARROW_EXPORT ReadableFile + : public internal::RandomAccessFileConcurrencyWrapper<ReadableFile> { + public: + ~ReadableFile() override; + + /// \brief Open a local file for reading + /// \param[in] path with UTF8 encoding + /// \param[in] pool a MemoryPool for memory allocations + /// \return ReadableFile instance + static Result<std::shared_ptr<ReadableFile>> Open( + const std::string& path,s3selectEngine::rgw_s3select_api* rgw,MemoryPool* pool = default_memory_pool()); + + /// \brief Open a local file for reading + /// \param[in] fd file descriptor + /// \param[in] pool a MemoryPool for memory allocations + /// \return ReadableFile instance + /// + /// The file descriptor becomes owned by the ReadableFile, and will be closed + /// on Close() or destruction. + static Result<std::shared_ptr<ReadableFile>> Open( + int fd, MemoryPool* pool = default_memory_pool()); + + bool closed() const override; + + int file_descriptor() const; + + Status WillNeed(const std::vector<ReadRange>& ranges) override; + + private: + friend RandomAccessFileConcurrencyWrapper<ReadableFile>; + + explicit ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw); + + Status DoClose(); + Result<int64_t> DoTell() const; + Result<int64_t> DoRead(int64_t nbytes, void* buffer); + Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes); + + /// \brief Thread-safe implementation of ReadAt + Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out); + + /// \brief Thread-safe implementation of ReadAt + Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes); + + Result<int64_t> DoGetSize(); + Status DoSeek(int64_t position); + + class ARROW_NO_EXPORT ReadableFileImpl; + std::unique_ptr<ReadableFileImpl> impl_; +}; + + +} // namespace ceph +} // namespace io +} // namespace arrow + 
+// ---------------------------------------------------------------------- +// ReadableFileImpl implementation + +namespace arrow { +namespace io { +namespace ceph { + +class ReadableFile::ReadableFileImpl : public ObjectInterface { + public: + + ~ReadableFileImpl() + { + if(IMPL != nullptr) + { + delete IMPL; + } + } + +#ifdef CEPH_USE_FS + explicit ReadableFileImpl(MemoryPool* pool) : pool_(pool) {IMPL=new OSFile();} +#endif + explicit ReadableFileImpl(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) : pool_(pool) {IMPL=new RGWimpl(rgw);} + + Status Open(const std::string& path) { return IMPL->OpenReadable(path); } + + Status Open(int fd) { return IMPL->OpenReadable(fd); } + + Result<std::shared_ptr<Buffer>> ReadBuffer(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); + + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, IMPL->Read(nbytes, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + buffer->ZeroPadding(); + } + return buffer; + } + + Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_)); + + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, + IMPL->ReadAt(position, nbytes, buffer->mutable_data())); + if (bytes_read < nbytes) { + RETURN_NOT_OK(buffer->Resize(bytes_read)); + buffer->ZeroPadding(); + } + return buffer; + } + + Status WillNeed(const std::vector<ReadRange>& ranges) { + RETURN_NOT_OK(CheckClosed()); + for (const auto& range : ranges) { + RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length)); +#if defined(POSIX_FADV_WILLNEED) + if (posix_fadvise(_ARROW_FD, range.offset, range.length, POSIX_FADV_WILLNEED)) { + return IOErrorFromErrno(errno, "posix_fadvise failed"); + } +#elif defined(F_RDADVISE) // macOS, BSD? 
+ struct { + off_t ra_offset; + int ra_count; + } radvisory{range.offset, static_cast<int>(range.length)}; + if (radvisory.ra_count > 0 && fcntl(_ARROW_FD, F_RDADVISE, &radvisory) == -1) { + return IOErrorFromErrno(errno, "fcntl(fd, F_RDADVISE, ...) failed"); + } +#endif + } + return Status::OK(); + } + + ObjectInterface *IMPL;//TODO to declare in ObjectInterface + + private: + + MemoryPool* pool_; + +}; + +// ReadableFile implemmetation +ReadableFile::ReadableFile(MemoryPool* pool,s3selectEngine::rgw_s3select_api* rgw) { impl_.reset(new ReadableFileImpl(pool,rgw)); } + +ReadableFile::~ReadableFile() { internal::CloseFromDestructor(this); } + +Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(const std::string& path, + s3selectEngine::rgw_s3select_api* rgw, + MemoryPool* pool + ) { + auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,rgw)); + RETURN_NOT_OK(file->impl_->Open(path)); + return file; +} + +Result<std::shared_ptr<ReadableFile>> ReadableFile::Open(int fd, MemoryPool* pool) { + NOT_IMPLEMENT; + auto file = std::shared_ptr<ReadableFile>(new ReadableFile(pool,0)); + RETURN_NOT_OK(file->impl_->Open(fd)); + return file; +} + +Status ReadableFile::DoClose() { return impl_->Close(); } + +bool ReadableFile::closed() const { return !impl_->is_open(); } + +Status ReadableFile::WillNeed(const std::vector<ReadRange>& ranges) { + return impl_->WillNeed(ranges); +} + +Result<int64_t> ReadableFile::DoTell() const { return impl_->Tell(); } + +Result<int64_t> ReadableFile::DoRead(int64_t nbytes, void* out) { + return impl_->IMPL->Read(nbytes, out); +} + +Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void* out) { + return impl_->IMPL->ReadAt(position, nbytes, out); +} + +Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position, int64_t nbytes) { + return impl_->ReadBufferAt(position, nbytes); +} + +Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) { + return impl_->ReadBuffer(nbytes); +} + 
+Result<int64_t> ReadableFile::DoGetSize() { return impl_->IMPL->size(); } + +Status ReadableFile::DoSeek(int64_t pos) { return impl_->IMPL->Seek(pos); } + +int ReadableFile::file_descriptor() const { return impl_->IMPL->fd(); } + +} // namepace ceph +} // namespace io +} // namespace arrow + + +namespace parquet { + +class ColumnReader; +class FileMetaData; +class PageReader; +class RandomAccessSource; +class RowGroupMetaData; + +namespace ceph { +class PARQUET_EXPORT RowGroupReader { + public: + // Forward declare a virtual class 'Contents' to aid dependency injection and more + // easily create test fixtures + // An implementation of the Contents class is defined in the .cc file + struct Contents { + virtual ~Contents() {} + virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0; + virtual const RowGroupMetaData* metadata() const = 0; + virtual const ReaderProperties* properties() const = 0; + }; + + explicit RowGroupReader(std::unique_ptr<Contents> contents); + + // Returns the rowgroup metadata + const RowGroupMetaData* metadata() const; + + // Construct a ColumnReader for the indicated row group-relative + // column. Ownership is shared with the RowGroupReader. 
+ std::shared_ptr<ColumnReader> Column(int i); + + std::unique_ptr<PageReader> GetColumnPageReader(int i); + + private: + // Holds a pointer to an instance of Contents implementation + std::unique_ptr<Contents> contents_; +}; + +class PARQUET_EXPORT ParquetFileReader { + public: + // Declare a virtual class 'Contents' to aid dependency injection and more + // easily create test fixtures + // An implementation of the Contents class is defined in the .cc file + struct PARQUET_EXPORT Contents { + static std::unique_ptr<Contents> Open( + std::shared_ptr<::arrow::io::RandomAccessFile> source, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr<FileMetaData> metadata = NULLPTR); + + virtual ~Contents() = default; + // Perform any cleanup associated with the file contents + virtual void Close() = 0; + virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0; + virtual std::shared_ptr<FileMetaData> metadata() const = 0; + }; + + ParquetFileReader(); + ~ParquetFileReader(); + + // Create a reader from some implementation of parquet-cpp's generic file + // input interface + // + // If you cannot provide exclusive access to your file resource, create a + // subclass of RandomAccessSource that wraps the shared resource + ARROW_DEPRECATED("Use arrow::io::RandomAccessFile version") + static std::unique_ptr<ParquetFileReader> Open( + std::unique_ptr<RandomAccessSource> source, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr<FileMetaData> metadata = NULLPTR); + + // Create a file reader instance from an Arrow file object. Thread-safety is + // the responsibility of the file implementation + static std::unique_ptr<ParquetFileReader> Open( + std::shared_ptr<::arrow::io::RandomAccessFile> source, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr<FileMetaData> metadata = NULLPTR); + + // API Convenience to open a serialized Parquet file on disk, using Arrow IO + // interfaces. 
+ static std::unique_ptr<ParquetFileReader> OpenFile( + const std::string& path,s3selectEngine::rgw_s3select_api* rgw, bool memory_map = true, + const ReaderProperties& props = default_reader_properties(), + std::shared_ptr<FileMetaData> metadata = NULLPTR + ); + + void Open(std::unique_ptr<Contents> contents); + void Close(); + + // The RowGroupReader is owned by the FileReader + std::shared_ptr<RowGroupReader> RowGroup(int i); + + // Returns the file metadata. Only one instance is ever created + std::shared_ptr<FileMetaData> metadata() const; + + /// Pre-buffer the specified column indices in all row groups. + /// + /// Readers can optionally call this to cache the necessary slices + /// of the file in-memory before deserialization. Arrow readers can + /// automatically do this via an option. This is intended to + /// increase performance when reading from high-latency filesystems + /// (e.g. Amazon S3). + /// + /// After calling this, creating readers for row groups/column + /// indices that were not buffered may fail. Creating multiple + /// readers for the a subset of the buffered regions is + /// acceptable. This may be called again to buffer a different set + /// of row groups/columns. + /// + /// If memory usage is a concern, note that data will remain + /// buffered in memory until either \a PreBuffer() is called again, + /// or the reader itself is destructed. Reading - and buffering - + /// only one row group at a time may be useful. + void PreBuffer(const std::vector<int>& row_groups, + const std::vector<int>& column_indices, + const ::arrow::io::IOContext& ctx, + const ::arrow::io::CacheOptions& options); + + private: + // Holds a pointer to an instance of Contents implementation + std::unique_ptr<Contents> contents_; +}; + +// Read only Parquet file metadata +std::shared_ptr<FileMetaData> PARQUET_EXPORT +ReadMetaData(const std::shared_ptr<::arrow::io::RandomAccessFile>& source); + +/// \brief Scan all values in file. 
Useful for performance testing +/// \param[in] columns the column numbers to scan. If empty scans all +/// \param[in] column_batch_size number of values to read at a time when scanning column +/// \param[in] reader a ParquetFileReader instance +/// \return number of semantic rows in file +PARQUET_EXPORT +int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size, + ParquetFileReader* reader); + +}//namespace ceph +}//namespace parquet + + +namespace parquet { + +namespace ceph { + +// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file +static constexpr int64_t kDefaultFooterReadSize = 64 * 1024; +static constexpr uint32_t kFooterSize = 8; + +// For PARQUET-816 +static constexpr int64_t kMaxDictHeaderSize = 100; + +// ---------------------------------------------------------------------- +// RowGroupReader public API + +RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents) + : contents_(std::move(contents)) {} + +std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) { + if (i >= metadata()->num_columns()) { + std::stringstream ss; + ss << "Trying to read column index " << i << " but row group metadata has only " + << metadata()->num_columns() << " columns"; + throw ParquetException(ss.str()); + } + const ColumnDescriptor* descr = metadata()->schema()->Column(i); + + std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i); + return ColumnReader::Make( + descr, std::move(page_reader), + const_cast<ReaderProperties*>(contents_->properties())->memory_pool()); +} + +std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) { + if (i >= metadata()->num_columns()) { + std::stringstream ss; + ss << "Trying to read column index " << i << " but row group metadata has only " + << metadata()->num_columns() << " columns"; + throw ParquetException(ss.str()); + } + return contents_->GetColumnPageReader(i); +} + +// Returns the rowgroup metadata +const RowGroupMetaData* 
RowGroupReader::metadata() const { return contents_->metadata(); } + +/// Compute the section of the file that should be read for the given +/// row group and column chunk. +::arrow::io::ReadRange ComputeColumnChunkRange(FileMetaData* file_metadata, + int64_t source_size, int row_group_index, + int column_index) { + auto row_group_metadata = file_metadata->RowGroup(row_group_index); + auto column_metadata = row_group_metadata->ColumnChunk(column_index); + + int64_t col_start = column_metadata->data_page_offset(); + if (column_metadata->has_dictionary_page() && + column_metadata->dictionary_page_offset() > 0 && + col_start > column_metadata->dictionary_page_offset()) { + col_start = column_metadata->dictionary_page_offset(); + } + + int64_t col_length = column_metadata->total_compressed_size(); + // PARQUET-816 workaround for old files created by older parquet-mr + const ApplicationVersion& version = file_metadata->writer_version(); + if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION())) { + // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the + // dictionary page header size in total_compressed_size and total_uncompressed_size + // (see IMPALA-694). We add padding to compensate. 
+ int64_t bytes_remaining = source_size - (col_start + col_length); + int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining); + col_length += padding; + } + + return {col_start, col_length}; +} + +// RowGroupReader::Contents implementation for the Parquet file specification +class SerializedRowGroup : public RowGroupReader::Contents { + public: + SerializedRowGroup(std::shared_ptr<ArrowInputFile> source, + std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source, + int64_t source_size, FileMetaData* file_metadata, + int row_group_number, const ReaderProperties& props, + std::shared_ptr<parquet::InternalFileDecryptor> file_decryptor = nullptr) + : source_(std::move(source)), + cached_source_(std::move(cached_source)), + source_size_(source_size), + file_metadata_(file_metadata), + properties_(props), + row_group_ordinal_(row_group_number), + file_decryptor_(file_decryptor) { + row_group_metadata_ = file_metadata->RowGroup(row_group_number); + } + + const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); } + + const ReaderProperties* properties() const override { return &properties_; } + + std::unique_ptr<PageReader> GetColumnPageReader(int i) override { + // Read column chunk from the file + auto col = row_group_metadata_->ColumnChunk(i); + + ::arrow::io::ReadRange col_range = + ComputeColumnChunkRange(file_metadata_, source_size_, row_group_ordinal_, i); + std::shared_ptr<ArrowInputStream> stream; + if (cached_source_) { + // PARQUET-1698: if read coalescing is enabled, read from pre-buffered + // segments. + PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range)); + stream = std::make_shared<::arrow::io::BufferReader>(buffer); + } else { + stream = properties_.GetStream(source_, col_range.offset, col_range.length); + } + + std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col->crypto_metadata(); + + // Column is encrypted only if crypto_metadata exists. 
+ if (!crypto_metadata) { + return PageReader::Open(stream, col->num_values(), col->compression(), + properties_.memory_pool()); + } + + if (file_decryptor_ == nullptr) { + throw ParquetException("RowGroup is noted as encrypted but no file decryptor"); + } + + constexpr auto kEncryptedRowGroupsLimit = 32767; + if (i > kEncryptedRowGroupsLimit) { + throw ParquetException("Encrypted files cannot contain more than 32767 row groups"); + } + + // The column is encrypted + std::shared_ptr<::parquet::Decryptor> meta_decryptor; + std::shared_ptr<Decryptor> data_decryptor; + // The column is encrypted with footer key + if (crypto_metadata->encrypted_with_footer_key()) { + meta_decryptor = file_decryptor_->GetFooterDecryptorForColumnMeta(); + data_decryptor = file_decryptor_->GetFooterDecryptorForColumnData(); + + CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, + static_cast<int16_t>(i), meta_decryptor, data_decryptor); + return PageReader::Open(stream, col->num_values(), col->compression(), + #if ARROW_VERSION_MAJOR > 8 + false, + #endif + properties_.memory_pool(), &ctx); + } + + // The column is encrypted with its own key + std::string column_key_metadata = crypto_metadata->key_metadata(); + const std::string column_path = crypto_metadata->path_in_schema()->ToDotString(); + + meta_decryptor = + file_decryptor_->GetColumnMetaDecryptor(column_path, column_key_metadata); + data_decryptor = + file_decryptor_->GetColumnDataDecryptor(column_path, column_key_metadata); + + CryptoContext ctx(col->has_dictionary_page(), row_group_ordinal_, + static_cast<int16_t>(i), meta_decryptor, data_decryptor); + return PageReader::Open(stream, col->num_values(), col->compression(), + #if ARROW_VERSION_MAJOR > 8 + false, + #endif + properties_.memory_pool(), &ctx); + } + + private: + std::shared_ptr<ArrowInputFile> source_; + // Will be nullptr if PreBuffer() is not called. 
  std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_;
  int64_t source_size_;
  FileMetaData* file_metadata_;           // not owned; owned by SerializedFile
  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
  ReaderProperties properties_;
  int row_group_ordinal_;
  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};

// ----------------------------------------------------------------------
// SerializedFile: An implementation of ParquetFileReader::Contents that deals
// with the Parquet file structure, Thrift deserialization, and other internal
// matters

// This class takes ownership of the provided data source
class SerializedFile : public ParquetFileReader::Contents {
 public:
  SerializedFile(std::shared_ptr<ArrowInputFile> source,
                 const ReaderProperties& props = default_reader_properties())
      : source_(std::move(source)), properties_(props) {
    PARQUET_ASSIGN_OR_THROW(source_size_, source_->GetSize());
  }

  ~SerializedFile() override {
    // Close() can throw; a destructor must not.
    try {
      Close();
    } catch (...) {
    }
  }

  void Close() override {
    // Scrub key material; safe to call repeatedly.
    if (file_decryptor_) file_decryptor_->WipeOutDecryptionKeys();
  }

  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
    std::unique_ptr<SerializedRowGroup> contents(
        new SerializedRowGroup(source_, cached_source_, source_size_,
                               file_metadata_.get(), i, properties_, file_decryptor_));
    return std::make_shared<RowGroupReader>(std::move(contents));
  }

  std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }

  void set_metadata(std::shared_ptr<FileMetaData> metadata) {
    file_metadata_ = std::move(metadata);
  }

  // Coalesce and cache the byte ranges of the requested (row group, column)
  // pairs so later page reads are served from memory (PARQUET-1698).
  void PreBuffer(const std::vector<int>& row_groups,
                 const std::vector<int>& column_indices,
                 const ::arrow::io::IOContext& ctx,
                 const ::arrow::io::CacheOptions& options) {
    cached_source_ =
        std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, options);
    //std::vector<arrow::io::ReadRange> ranges;
    std::vector<::arrow::io::ReadRange> ranges;
    for (int row : row_groups) {
      for (int col : column_indices) {
        ranges.push_back(
            ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, col));
      }
    }
    PARQUET_THROW_NOT_OK(cached_source_->Cache(ranges));
  }

  // Validate the footer magic ('PAR1' plaintext / 'PARE' encrypted footer)
  // and dispatch to the matching metadata parser.
  void ParseMetaData() {
    if (source_size_ == 0) {
      throw ParquetInvalidOrCorruptedFileException("Parquet file size is 0 bytes");
    } else if (source_size_ < kFooterSize) {
      throw ParquetInvalidOrCorruptedFileException(
          "Parquet file size is ", source_size_,
          " bytes, smaller than the minimum file footer (", kFooterSize, " bytes)");
    }

    int64_t footer_read_size = std::min(source_size_, kDefaultFooterReadSize);
    PARQUET_ASSIGN_OR_THROW(
        auto footer_buffer,
        source_->ReadAt(source_size_ - footer_read_size, footer_read_size));

    // Check if all bytes are read. Check if last 4 bytes read have the magic bits
    if (footer_buffer->size() != footer_read_size ||
        (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetMagic, 4) != 0 &&
         memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) != 0)) {
      throw ParquetInvalidOrCorruptedFileException(
          "Parquet magic bytes not found in footer. Either the file is corrupted or this "
          "is not a parquet file.");
    }

    if (memcmp(footer_buffer->data() + footer_read_size - 4, kParquetEMagic, 4) == 0) {
      // Encrypted file with Encrypted footer.
      ParseMetaDataOfEncryptedFileWithEncryptedFooter(footer_buffer, footer_read_size);
      return;
    }

    // No encryption or encryption with plaintext footer mode.
    std::shared_ptr<Buffer> metadata_buffer;
    uint32_t metadata_len, read_metadata_len;
    ParseUnencryptedFileMetadata(footer_buffer, footer_read_size, &metadata_buffer,
                                 &metadata_len, &read_metadata_len);

    auto file_decryption_properties = properties_.file_decryption_properties().get();
    if (!file_metadata_->is_encryption_algorithm_set()) {  // Non encrypted file.
+ if (file_decryption_properties != nullptr) { + if (!file_decryption_properties->plaintext_files_allowed()) { + throw ParquetException("Applying decryption properties on plaintext file"); + } + } + } else { + // Encrypted file with plaintext footer mode. + ParseMetaDataOfEncryptedFileWithPlaintextFooter( + file_decryption_properties, metadata_buffer, metadata_len, read_metadata_len); + } + } + + private: + std::shared_ptr<ArrowInputFile> source_; + std::shared_ptr<::arrow::io::internal::ReadRangeCache> cached_source_; + int64_t source_size_; + std::shared_ptr<FileMetaData> file_metadata_; + ReaderProperties properties_; + + std::shared_ptr<::parquet::InternalFileDecryptor> file_decryptor_; + + void ParseUnencryptedFileMetadata(const std::shared_ptr<Buffer>& footer_buffer, + int64_t footer_read_size, + std::shared_ptr<Buffer>* metadata_buffer, + uint32_t* metadata_len, uint32_t* read_metadata_len); + + std::string HandleAadPrefix(FileDecryptionProperties* file_decryption_properties, + EncryptionAlgorithm& algo); + + void ParseMetaDataOfEncryptedFileWithPlaintextFooter( + FileDecryptionProperties* file_decryption_properties, + const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len, + uint32_t read_metadata_len); + + void ParseMetaDataOfEncryptedFileWithEncryptedFooter( + const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size); +}; + +void SerializedFile::ParseUnencryptedFileMetadata( + const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size, + std::shared_ptr<Buffer>* metadata_buffer, uint32_t* metadata_len, + uint32_t* read_metadata_len) { + *metadata_len = ::arrow::util::SafeLoadAs<uint32_t>( + reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size - + kFooterSize); + int64_t metadata_start = source_size_ - kFooterSize - *metadata_len; + if (*metadata_len > source_size_ - kFooterSize) { + throw ParquetInvalidOrCorruptedFileException( + "Parquet file size is ", source_size_, + " bytes, smaller than 
the size reported by metadata (", metadata_len, "bytes)"); + } + + // Check if the footer_buffer contains the entire metadata + if (footer_read_size >= (*metadata_len + kFooterSize)) { + *metadata_buffer = SliceBuffer( + footer_buffer, footer_read_size - *metadata_len - kFooterSize, *metadata_len); + } else { + PARQUET_ASSIGN_OR_THROW(*metadata_buffer, + source_->ReadAt(metadata_start, *metadata_len)); + if ((*metadata_buffer)->size() != *metadata_len) { + throw ParquetException("Failed reading metadata buffer (requested " + + std::to_string(*metadata_len) + " bytes but got " + + std::to_string((*metadata_buffer)->size()) + " bytes)"); + } + } + + *read_metadata_len = *metadata_len; + file_metadata_ = FileMetaData::Make((*metadata_buffer)->data(), read_metadata_len); +} + +void SerializedFile::ParseMetaDataOfEncryptedFileWithEncryptedFooter( + const std::shared_ptr<Buffer>& footer_buffer, int64_t footer_read_size) { + // encryption with encrypted footer + // both metadata & crypto metadata length + uint32_t footer_len = ::arrow::util::SafeLoadAs<uint32_t>( + reinterpret_cast<const uint8_t*>(footer_buffer->data()) + footer_read_size - + kFooterSize); + int64_t crypto_metadata_start = source_size_ - kFooterSize - footer_len; + if (kFooterSize + footer_len > source_size_) { + throw ParquetInvalidOrCorruptedFileException( + "Parquet file size is ", source_size_, + " bytes, smaller than the size reported by footer's (", footer_len, "bytes)"); + } + std::shared_ptr<Buffer> crypto_metadata_buffer; + // Check if the footer_buffer contains the entire metadata + if (footer_read_size >= (footer_len + kFooterSize)) { + crypto_metadata_buffer = SliceBuffer( + footer_buffer, footer_read_size - footer_len - kFooterSize, footer_len); + } else { + PARQUET_ASSIGN_OR_THROW(crypto_metadata_buffer, + source_->ReadAt(crypto_metadata_start, footer_len)); + if (crypto_metadata_buffer->size() != footer_len) { + throw ParquetException("Failed reading encrypted metadata buffer (requested " + 
                             std::to_string(footer_len) + " bytes but got " +
                             std::to_string(crypto_metadata_buffer->size()) + " bytes)");
    }
  }
  auto file_decryption_properties = properties_.file_decryption_properties().get();
  if (file_decryption_properties == nullptr) {
    throw ParquetException(
        "Could not read encrypted metadata, no decryption found in reader's properties");
  }
  uint32_t crypto_metadata_len = footer_len;
  std::shared_ptr<FileCryptoMetaData> file_crypto_metadata =
      FileCryptoMetaData::Make(crypto_metadata_buffer->data(), &crypto_metadata_len);
  // Handle AAD prefix
  EncryptionAlgorithm algo = file_crypto_metadata->encryption_algorithm();
  std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
  file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
      file_decryption_properties, file_aad, algo.algorithm,
      file_crypto_metadata->key_metadata(), properties_.memory_pool());

  // The encrypted FileMetaData starts right after the crypto metadata
  // (crypto_metadata_len was updated by FileCryptoMetaData::Make above).
  int64_t metadata_offset = source_size_ - kFooterSize - footer_len + crypto_metadata_len;
  uint32_t metadata_len = footer_len - crypto_metadata_len;
  PARQUET_ASSIGN_OR_THROW(auto metadata_buffer,
                          source_->ReadAt(metadata_offset, metadata_len));
  if (metadata_buffer->size() != metadata_len) {
    throw ParquetException("Failed reading metadata buffer (requested " +
                           std::to_string(metadata_len) + " bytes but got " +
                           std::to_string(metadata_buffer->size()) + " bytes)");
  }

  file_metadata_ =
      FileMetaData::Make(metadata_buffer->data(), &metadata_len, file_decryptor_);
  //FileMetaData::Make(metadata_buffer->data(), &metadata_len, default_reader_properties(), file_decryptor_); //version>9
}

// Plaintext-footer mode: metadata is readable without keys, but when
// decryption properties are supplied we build the decryptor and (optionally)
// verify the footer's GCM signature appended after the plaintext metadata.
void SerializedFile::ParseMetaDataOfEncryptedFileWithPlaintextFooter(
    FileDecryptionProperties* file_decryption_properties,
    const std::shared_ptr<Buffer>& metadata_buffer, uint32_t metadata_len,
    uint32_t read_metadata_len) {
  // Providing decryption properties in plaintext footer mode is not mandatory, for
  // example when reading by legacy reader.
  if (file_decryption_properties != nullptr) {
    EncryptionAlgorithm algo = file_metadata_->encryption_algorithm();
    // Handle AAD prefix
    std::string file_aad = HandleAadPrefix(file_decryption_properties, algo);
    file_decryptor_ = std::make_shared<::parquet::InternalFileDecryptor>(
        file_decryption_properties, file_aad, algo.algorithm,
        file_metadata_->footer_signing_key_metadata(), properties_.memory_pool());
    // set the InternalFileDecryptor in the metadata as well, as it's used
    // for signature verification and for ColumnChunkMetaData creation.
#if GAL_set_file_decryptor_declare_private
    file_metadata_->set_file_decryptor(file_decryptor_);
#endif
    if (file_decryption_properties->check_plaintext_footer_integrity()) {
      // The signature is nonce + GCM tag appended after the plaintext
      // metadata; its presence is implied by metadata_len > read_metadata_len.
      if (metadata_len - read_metadata_len !=
          (parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength)) {
        throw ParquetInvalidOrCorruptedFileException(
            "Failed reading metadata for encryption signature (requested ",
            parquet::encryption::kGcmTagLength + parquet::encryption::kNonceLength,
            " bytes but have ", metadata_len - read_metadata_len, " bytes)");
      }

      if (!file_metadata_->VerifySignature(metadata_buffer->data() + read_metadata_len)) {
        throw ParquetInvalidOrCorruptedFileException(
            "Parquet crypto signature verification failed");
      }
    }
  }
}

std::string SerializedFile::HandleAadPrefix(
    FileDecryptionProperties* file_decryption_properties, EncryptionAlgorithm& algo) {
  std::string aad_prefix_in_properties = file_decryption_properties->aad_prefix();
  std::string aad_prefix = aad_prefix_in_properties;
  bool file_has_aad_prefix = algo.aad.aad_prefix.empty() ?
false : true;
  std::string aad_prefix_in_file = algo.aad.aad_prefix;

  // The prefix must be supplied somewhere when the file requires it.
  if (algo.aad.supply_aad_prefix && aad_prefix_in_properties.empty()) {
    throw ParquetException(
        "AAD prefix used for file encryption, "
        "but not stored in file and not supplied "
        "in decryption properties");
  }

  if (file_has_aad_prefix) {
    // File stores a prefix: it wins, but must agree with any supplied one.
    if (!aad_prefix_in_properties.empty()) {
      if (aad_prefix_in_properties.compare(aad_prefix_in_file) != 0) {
        throw ParquetException(
            "AAD Prefix in file and in properties "
            "is not the same");
      }
    }
    aad_prefix = aad_prefix_in_file;
    std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
        file_decryption_properties->aad_prefix_verifier();
    if (aad_prefix_verifier != nullptr) aad_prefix_verifier->Verify(aad_prefix);
  } else {
    if (!algo.aad.supply_aad_prefix && !aad_prefix_in_properties.empty()) {
      throw ParquetException(
          "AAD Prefix set in decryption properties, but was not used "
          "for file encryption");
    }
    std::shared_ptr<AADPrefixVerifier> aad_prefix_verifier =
        file_decryption_properties->aad_prefix_verifier();
    if (aad_prefix_verifier != nullptr) {
      throw ParquetException(
          "AAD Prefix Verifier is set, but AAD Prefix not found in file");
    }
  }
  return aad_prefix + algo.aad.aad_file_unique;
}

// ----------------------------------------------------------------------
// ParquetFileReader public API

ParquetFileReader::ParquetFileReader() {}

ParquetFileReader::~ParquetFileReader() {
  // Close() may throw; destructors must not.
  try {
    Close();
  } catch (...) {
  }
}

// Open the file.  If no metadata is passed, it is parsed from the footer of
// the file
std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
    std::shared_ptr<ArrowInputFile> source, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  std::unique_ptr<ParquetFileReader::Contents> result(
      new SerializedFile(std::move(source), props));

  // Access private methods here, but otherwise unavailable
  SerializedFile* file = static_cast<SerializedFile*>(result.get());

  if (metadata == nullptr) {
    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
    file->ParseMetaData();
  } else {
    file->set_metadata(std::move(metadata));
  }

  return result;
}

std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::shared_ptr<::arrow::io::RandomAccessFile> source, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  auto contents = SerializedFile::Open(std::move(source), props, std::move(metadata));
  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
  result->Open(std::move(contents));
  return result;
}

#if GAL_NOT_IMPLEMENTED
std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  auto wrapper = std::make_shared<ParquetInputWrapper>(std::move(source));
  return Open(std::move(wrapper), props, std::move(metadata));
}
#endif

// S3select-specific entry point: the non-mmap path reads through the
// RGW range-request callbacks carried by rgw_s3select_api.
std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
    const std::string& path, s3selectEngine::rgw_s3select_api* rgw, bool memory_map, const ReaderProperties& props,
    std::shared_ptr<FileMetaData> metadata) {
  std::shared_ptr<::arrow::io::RandomAccessFile> source;
  if (memory_map) {
    PARQUET_ASSIGN_OR_THROW(
        source, ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ));//GAL change that also, or to remove?
  } else {
    PARQUET_ASSIGN_OR_THROW(source,
                            ::arrow::io::ceph::ReadableFile::Open(path, rgw, props.memory_pool()));
  }

  return Open(std::move(source), props, std::move(metadata));
}

void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
  contents_ = std::move(contents);
}

void ParquetFileReader::Close() {
  if (contents_) {
    contents_->Close();
  }
}

std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
  return contents_->metadata();
}

std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
  if (i >= metadata()->num_row_groups()) {
    std::stringstream ss;
    ss << "Trying to read row group " << i << " but file only has "
       << metadata()->num_row_groups() << " row groups";
    throw ParquetException(ss.str());
  }
  return contents_->GetRowGroup(i);
}

void ParquetFileReader::PreBuffer(const std::vector<int>& row_groups,
                                  const std::vector<int>& column_indices,
                                  const ::arrow::io::IOContext& ctx,
                                  const ::arrow::io::CacheOptions& options) {
  // Access private methods here
  SerializedFile* file =
      ::arrow::internal::checked_cast<SerializedFile*>(contents_.get());
  file->PreBuffer(row_groups, column_indices, ctx, options);
}

// ----------------------------------------------------------------------
// File metadata helpers

std::shared_ptr<FileMetaData> ReadMetaData(
    const std::shared_ptr<::arrow::io::RandomAccessFile>& source) {
  return ParquetFileReader::Open(source)->metadata();
}

// ----------------------------------------------------------------------
// File scanner for performance testing
#if GAL_ScanAllValues_is_no_declare
int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
                         ParquetFileReader* reader) {
  std::vector<int16_t> rep_levels(column_batch_size);
  std::vector<int16_t> def_levels(column_batch_size);

  int num_columns = static_cast<int>(columns.size());

  // columns are not specified explicitly.  Add all columns
  if (columns.size() == 0) {
    num_columns = reader->metadata()->num_columns();
    columns.resize(num_columns);
    for (int i = 0; i < num_columns; i++) {
      columns[i] = i;
    }
  }

  std::vector<int64_t> total_rows(num_columns, 0);

  for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
    auto group_reader = reader->RowGroup(r);
    int col = 0;
    for (auto i : columns) {
      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
      size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
      std::vector<uint8_t> values(column_batch_size * value_byte_size);

      int64_t values_read = 0;
      while (col_reader->HasNext()) {
        int64_t levels_read =
            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
                          values.data(), &values_read, col_reader.get());
        if (col_reader->descr()->max_repetition_level() > 0) {
          // Repeated columns: only rep_level==0 starts a new top-level row.
          for (int64_t i = 0; i < levels_read; i++) {
            if (rep_levels[i] == 0) {
              total_rows[col]++;
            }
          }
        } else {
          total_rows[col] += levels_read;
        }
      }
      col++;
    }
  }

  for (int i = 1; i < num_columns; ++i) {
    if (total_rows[0] != total_rows[i]) {
      throw ParquetException("Parquet error: Total rows among columns do not match");
    }
  }

  return total_rows[0];
}
#endif

} //namespace ceph
} //namespace parquet

/******************************************/
/******************************************/
/******************************************/
// Wraps a single parquet column across all row groups of a file, exposing a
// row-at-a-time Read() with skip/row-group-advance handled internally.
class column_reader_wrap
{

private:

  int64_t m_rownum;                 // last row delivered; -1 before first read
  parquet::Type::type m_type;
  std::shared_ptr<parquet::ceph::RowGroupReader> m_row_group_reader;
  int m_row_grouop_id;              // current row-group ordinal
  uint16_t m_col_id;
  parquet::ceph::ParquetFileReader* m_parquet_reader;  // not owned
  std::shared_ptr<parquet::ColumnReader> m_ColumnReader;
  bool m_end_of_stream;
  bool m_read_last_value;


public:

  enum class parquet_type
  {
    NA_TYPE,
    STRING,
    INT32,
    INT64,
    FLOAT,
    DOUBLE,
    TIMESTAMP,
    PARQUET_NULL
  };

  // Tagged value; exactly one of num/str/dbl is meaningful per `type`.
  struct parquet_value
  {
    int64_t
num;
    char *str; //str is pointing to offset in string which is NOT null terminated.
    uint16_t str_len;
    double dbl;
    parquet_type type;

    parquet_value():type(parquet_type::NA_TYPE){}
  };

  typedef struct parquet_value parquet_value_t;

  enum class parquet_column_read_state {PARQUET_OUT_OF_RANGE,PARQUET_READ_OK};

  private:
  // Cache of the most recently read value, re-served when the same rownum
  // is requested again.
  parquet_value_t m_last_value;

  public:
  column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id);

  parquet::Type::type get_type();

  bool HasNext();//TODO template

  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                    parquet_value_t* values, int64_t* values_read);

  int64_t Skip(int64_t rows_to_skip);

  parquet_column_read_state Read(uint64_t rownum,parquet_value_t & value);

};

// Top-level parquet access object used by the s3select engine: owns the
// file reader, the schema mapping, and one column_reader_wrap per column.
class parquet_file_parser
{

public:

  typedef std::vector<std::pair<std::string, column_reader_wrap::parquet_type>> schema_t;
  typedef std::set<uint16_t> column_pos_t;
  typedef std::vector<column_reader_wrap::parquet_value_t> row_values_t;

  typedef column_reader_wrap::parquet_value_t parquet_value_t;
  typedef column_reader_wrap::parquet_type parquet_type;

private:

  std::string m_parquet_file_name;
  uint32_t m_num_of_columms;
  uint64_t m_num_of_rows;
  uint64_t m_rownum;       // cursor over [0, m_num_of_rows)
  schema_t m_schm;
  int m_num_row_groups;
  std::shared_ptr<parquet::FileMetaData> m_file_metadata;
  std::unique_ptr<parquet::ceph::ParquetFileReader> m_parquet_reader;
  std::vector<column_reader_wrap*> m_column_readers;   // owned; freed in dtor
  s3selectEngine::rgw_s3select_api* m_rgw_s3select_api;

  public:

  parquet_file_parser(std::string parquet_file_name,s3selectEngine::rgw_s3select_api* rgw_api) :
      m_parquet_file_name(parquet_file_name),
      m_num_of_columms(0),
      m_num_of_rows(0),
      m_rownum(0),
      m_num_row_groups(0),
      m_rgw_s3select_api(rgw_api)


  {
    load_meta_data();
  }

  ~parquet_file_parser()
  {
    for(auto r : m_column_readers)
    {
      delete r;
    }
  }

  int
load_meta_data() + { + m_parquet_reader = parquet::ceph::ParquetFileReader::OpenFile(m_parquet_file_name,m_rgw_s3select_api,false); + m_file_metadata = m_parquet_reader->metadata(); + m_num_of_columms = m_parquet_reader->metadata()->num_columns(); + m_num_row_groups = m_file_metadata->num_row_groups(); + m_num_of_rows = m_file_metadata->num_rows(); + + for (uint32_t i = 0; i < m_num_of_columms; i++) + { + parquet::Type::type tp = m_file_metadata->schema()->Column(i)->physical_type(); + std::pair<std::string, column_reader_wrap::parquet_type> elm; + + switch (tp) + { + case parquet::Type::type::INT32: + elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT32); + m_schm.push_back(elm); + break; + + case parquet::Type::type::INT64: + elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::INT64); + m_schm.push_back(elm); + break; + + case parquet::Type::type::FLOAT: + elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::FLOAT); + m_schm.push_back(elm); + break; + + case parquet::Type::type::DOUBLE: + elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::DOUBLE); + m_schm.push_back(elm); + break; + + case parquet::Type::type::BYTE_ARRAY: + elm = std::pair<std::string, column_reader_wrap::parquet_type>(m_file_metadata->schema()->Column(i)->name(), column_reader_wrap::parquet_type::STRING); + m_schm.push_back(elm); + break; + + default: + { + std::stringstream err; + err << "some parquet type not supported"; + throw std::runtime_error(err.str()); + } + } + + m_column_readers.push_back(new column_reader_wrap(m_parquet_reader,i)); + } + + return 0; + } + + bool end_of_stream() + { + + if (m_rownum > (m_num_of_rows-1)) + 
      return true;
    return false;
  }

  uint64_t get_number_of_rows()
  {
    return m_num_of_rows;
  }

  uint64_t rownum()
  {
    return m_rownum;
  }

  // Advance the cursor one row; returns false (without moving) at EOS.
  bool increase_rownum()
  {
    if (end_of_stream())
      return false;

    m_rownum++;
    return true;
  }

  uint64_t get_rownum()
  {
    return m_rownum;
  }

  uint32_t get_num_of_columns()
  {
    return m_num_of_columms;
  }

  // Fetch the current row's values for the given column positions, in
  // position order. Returns 0 on success, -1 on an out-of-range column or
  // out-of-range row.
  int get_column_values_by_positions(column_pos_t positions, row_values_t &row_values)
  {
    column_reader_wrap::parquet_value_t column_value;
    row_values.clear();

    for(auto col : positions)
    {
      if((col)>=m_num_of_columms)
      {//TODO should verified upon syntax phase
        //TODO throw exception
        return -1;
      }
      auto status = m_column_readers[col]->Read(m_rownum,column_value);
      if(status == column_reader_wrap::parquet_column_read_state::PARQUET_OUT_OF_RANGE) return -1;
      row_values.push_back(column_value);//TODO intensive (should move)
    }
    return 0;
  }

  schema_t get_schema()
  {
    return m_schm;
  }
};

/******************************************/


  // Binds to the first row group of the file; Read() advances through
  // subsequent row groups as needed.
  column_reader_wrap::column_reader_wrap(std::unique_ptr<parquet::ceph::ParquetFileReader> & parquet_reader,uint16_t col_id):
      m_rownum(-1),
      m_type(parquet::Type::type::UNDEFINED),
      m_row_grouop_id(0),
      m_col_id(col_id),
      m_end_of_stream(false),
      m_read_last_value(false)
  {
    m_parquet_reader = parquet_reader.get();
    m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
    m_ColumnReader = m_row_group_reader->Column(m_col_id);
  }

  parquet::Type::type column_reader_wrap::get_type()
  {//TODO if UNDEFINED
    return m_parquet_reader->metadata()->schema()->Column(m_col_id)->physical_type();
  }

  // Dispatches HasNext() to the concrete typed reader for this column.
  bool column_reader_wrap::HasNext()//TODO template
  {
    parquet::Int32Reader* int32_reader;
    parquet::Int64Reader* int64_reader;
    parquet::FloatReader* float_reader;
    parquet::DoubleReader* double_reader;
    parquet::ByteArrayReader* byte_array_reader;

    switch (get_type())
    {
      case parquet::Type::type::INT32:
        int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
        return int32_reader->HasNext();
        break;

      case parquet::Type::type::INT64:
        int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
        return int64_reader->HasNext();
        break;

      case parquet::Type::type::FLOAT:
        float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
        return float_reader->HasNext();
        break;

      case parquet::Type::type::DOUBLE:
        double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
        return double_reader->HasNext();
        break;

      case parquet::Type::type::BYTE_ARRAY:
        byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
        return byte_array_reader->HasNext();
        break;

      default:

        std::stringstream err;
        err << "HasNext():" << "wrong type or type not exist" << std::endl;
        throw std::runtime_error(err.str());

        return false;
        //TODO throw exception
    }

    return false;
  }

  // Reads ONE value (despite the batch_size parameter — every call below
  // passes 1 to the underlying typed reader) into `values`, tagging it with
  // the engine type or PARQUET_NULL when the definition level is 0.
  // def_levels/rep_levels parameters are unused; returns rows read.
  int64_t column_reader_wrap::ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
                                        parquet_value_t* values, int64_t* values_read)
  {
    parquet::Int32Reader* int32_reader;
    parquet::Int64Reader* int64_reader;
    parquet::FloatReader* float_reader;
    parquet::DoubleReader* double_reader;
    parquet::ByteArrayReader* byte_array_reader;

    parquet::ByteArray str_value;
    int64_t rows_read;
    int32_t i32_val;

    auto error_msg = [&](std::exception &e)
    {
      std::stringstream err;
      err << "what() :" << e.what() << std::endl;
      err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
      return err;
    };
    // sic: "defintion_level" spelling is consistent across this file.
    int16_t defintion_level;
    int16_t repeat_level;

    switch (get_type())
    {
      case parquet::Type::type::INT32:
        int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
        try {
          rows_read = int32_reader->ReadBatch(1, &defintion_level, &repeat_level, &i32_val , values_read);
          // definition level 0 => value absent at this row => SQL NULL.
          if(defintion_level == 0)
          {
            values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
          } else
          {
            values->num = i32_val;
            values->type = column_reader_wrap::parquet_type::INT32;
          }
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }

        break;

      case parquet::Type::type::INT64:
        int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
        try{
          rows_read = int64_reader->ReadBatch(1, &defintion_level, &repeat_level, (int64_t *)&(values->num), values_read);
          if(defintion_level == 0)
          {
            values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
          } else
          {
            auto logical_type = m_parquet_reader->metadata()->schema()->Column(m_col_id)->logical_type();

            if (logical_type.get()->type() == parquet::LogicalType::Type::type::TIMESTAMP) //TODO missing sub-type (milli,micro)
              values->type = column_reader_wrap::parquet_type::TIMESTAMP;
            else
              values->type = column_reader_wrap::parquet_type::INT64;
          }
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::FLOAT:
        float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
        try{
          float data_source_float = 0;
          rows_read = float_reader->ReadBatch(1, &defintion_level, &repeat_level, &data_source_float , values_read);//TODO proper cast
          if(defintion_level == 0)
          {
            values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
          } else
          {
            // floats are widened and surfaced to the engine as DOUBLE.
            values->type = column_reader_wrap::parquet_type::DOUBLE;
            values->dbl = data_source_float;

          }
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::DOUBLE:
        double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
        try{
          rows_read = double_reader->ReadBatch(1, &defintion_level, &repeat_level, (double *)&(values->dbl), values_read);
          if(defintion_level == 0)
          {
            values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
          } else
          {
            values->type = column_reader_wrap::parquet_type::DOUBLE;
          }
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::BYTE_ARRAY:
        byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
        try{
          rows_read = byte_array_reader->ReadBatch(1, &defintion_level, &repeat_level, &str_value , values_read);
          if(defintion_level == 0)
          {
            values->type = column_reader_wrap::parquet_type::PARQUET_NULL;
          } else
          {
            // Points into the reader's buffer: NOT null terminated, and only
            // valid until the next read on this column.
            values->type = column_reader_wrap::parquet_type::STRING;
            values->str = (char*)str_value.ptr;
            values->str_len = str_value.len;
          }
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      default:
      {
        std::stringstream err;
        err << "wrong type" << std::endl;
        throw std::runtime_error(err.str());
      }

    }

    return rows_read;
  }

  // Skips up to rows_to_skip rows on the typed reader; returns rows skipped.
  int64_t column_reader_wrap::Skip(int64_t rows_to_skip)
  {
    parquet::Int32Reader* int32_reader;
    parquet::Int64Reader* int64_reader;
    parquet::DoubleReader* double_reader;
    parquet::FloatReader* float_reader;
    parquet::ByteArrayReader* byte_array_reader;

    parquet::ByteArray str_value;
    int64_t rows_read;

    auto error_msg = [&](std::exception &e)
    {
      std::stringstream err;
      err << "what() :" << e.what() << std::endl;
      err << "failed to parse column id:" << this->m_col_id << " name:" <<this->m_parquet_reader->metadata()->schema()->Column(m_col_id)->name();
      return err;
    };

    switch (get_type())
    {
      case parquet::Type::type::INT32:
        int32_reader = static_cast<parquet::Int32Reader *>(m_ColumnReader.get());
        try{
          rows_read = int32_reader->Skip(rows_to_skip);
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::INT64:
        int64_reader = static_cast<parquet::Int64Reader *>(m_ColumnReader.get());
        try{
          rows_read = int64_reader->Skip(rows_to_skip);
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::FLOAT:
        float_reader = static_cast<parquet::FloatReader *>(m_ColumnReader.get());
        try {
          rows_read = float_reader->Skip(rows_to_skip);
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::DOUBLE:
        double_reader = static_cast<parquet::DoubleReader *>(m_ColumnReader.get());
        try {
          rows_read = double_reader->Skip(rows_to_skip);
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      case parquet::Type::type::BYTE_ARRAY:
        byte_array_reader = static_cast<parquet::ByteArrayReader *>(m_ColumnReader.get());
        try{
          rows_read = byte_array_reader->Skip(rows_to_skip);
        }
        catch(std::exception &e)
        {
          throw std::runtime_error(error_msg(e).str());
        }
        break;

      default:
      {
        std::stringstream err;
        err << "wrong type" << std::endl;
        throw std::runtime_error(err.str());
      }
    }

    return rows_read;
  }


  // Position the column on `rownum` (skipping forward and advancing across
  // row groups as needed) and return its value. The last value is cached so
  // repeated calls for the same rownum do not re-read.
  // Returns PARQUET_OUT_OF_RANGE once the last row group is exhausted.
  // NOTE(review): callers must request monotonically non-decreasing rownums;
  // there is no backward seek.
  column_reader_wrap::parquet_column_read_state column_reader_wrap::Read(const uint64_t rownum,parquet_value_t & value)
  {
    int64_t values_read = 0;

    if (m_rownum < (int64_t)rownum)
    { //should skip
      m_read_last_value = false;

      //TODO what about Skip(0)
      uint64_t skipped_rows = Skip(rownum - m_rownum -1);
      m_rownum += skipped_rows;

      // Keep skipping until the target row is next, hopping to the next
      // row group whenever the current one runs out of values.
      while (((m_rownum+1) < (int64_t)rownum) || HasNext() == false)
      {
        uint64_t skipped_rows = Skip(rownum - m_rownum -1);
        m_rownum += skipped_rows;

        if (HasNext() == false)
        {
          if ((m_row_grouop_id + 1) >= m_parquet_reader->metadata()->num_row_groups())
          {
            m_end_of_stream = true;
            return column_reader_wrap::parquet_column_read_state::PARQUET_OUT_OF_RANGE;//end-of-stream
          }
          else
          {
            m_row_grouop_id++;
            m_row_group_reader = m_parquet_reader->RowGroup(m_row_grouop_id);
            m_ColumnReader = m_row_group_reader->Column(m_col_id);
          }
        }
      } //end-while

      ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
      m_read_last_value = true;
      m_rownum++;
      value = m_last_value;
    }
    else
    {
      // Same row requested again: serve the cached value, reading it once
      // if it has not been materialized yet.
      if (m_read_last_value == false)
      {
        ReadBatch(1, nullptr, nullptr, &m_last_value, &values_read);
        m_read_last_value = true;
        m_rownum++;
      }

      value = m_last_value;
    }

    return column_reader_wrap::parquet_column_read_state::PARQUET_READ_OK;
  }

#endif