/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include G_BEGIN_DECLS /** * SECTION: input-stream * @section_id: input-stream-classes * @title: Input stream classes * @include: arrow-glib/arrow-glib.h * * #GArrowInputStream is a base class for input stream. * * #GArrowSeekableInputStream is a base class for input stream that * supports random access. * * #GArrowBufferInputStream is a class to read data on buffer. * * #GArrowFileInputStream is a class to read data in file. * * #GArrowMemoryMappedInputStream is a class to read data in file by * mapping the file on memory. It supports zero copy. * * #GArrowGIOInputStream is a class for `GInputStream` based input * stream. * * #GArrowCompressedInputStream is a class to read data from * compressed input stream. 
*/ typedef struct GArrowInputStreamPrivate_ { std::shared_ptr input_stream; } GArrowInputStreamPrivate; enum { PROP_INPUT_STREAM = 1 }; static std::shared_ptr garrow_input_stream_get_raw_file_interface(GArrowFile *file) { auto input_stream = GARROW_INPUT_STREAM(file); auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); return arrow_input_stream; } static void garrow_input_stream_file_interface_init(GArrowFileInterface *iface) { iface->get_raw = garrow_input_stream_get_raw_file_interface; } static std::shared_ptr garrow_input_stream_get_raw_readable_interface(GArrowReadable *readable) { auto input_stream = GARROW_INPUT_STREAM(readable); auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); return arrow_input_stream; } static void garrow_input_stream_readable_interface_init(GArrowReadableInterface *iface) { iface->get_raw = garrow_input_stream_get_raw_readable_interface; } G_DEFINE_TYPE_WITH_CODE(GArrowInputStream, garrow_input_stream, G_TYPE_INPUT_STREAM, G_ADD_PRIVATE(GArrowInputStream) G_IMPLEMENT_INTERFACE(GARROW_TYPE_FILE, garrow_input_stream_file_interface_init) G_IMPLEMENT_INTERFACE(GARROW_TYPE_READABLE, garrow_input_stream_readable_interface_init)) #define GARROW_INPUT_STREAM_GET_PRIVATE(obj) \ static_cast( \ garrow_input_stream_get_instance_private( \ GARROW_INPUT_STREAM(obj))) static void garrow_input_stream_finalize(GObject *object) { auto priv = GARROW_INPUT_STREAM_GET_PRIVATE(object); priv->input_stream.~shared_ptr(); G_OBJECT_CLASS(garrow_input_stream_parent_class)->finalize(object); } static void garrow_input_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GARROW_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_INPUT_STREAM: priv->input_stream = *static_cast *>(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_input_stream_get_property(GObject *object, guint prop_id, 
GValue *value, GParamSpec *pspec) { switch (prop_id) { default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static gssize garrow_input_stream_read(GInputStream *stream, void *buffer, gsize count, GCancellable *cancellable, GError **error) { if (g_cancellable_set_error_if_cancelled(cancellable, error)) { return -1; } auto arrow_input_stream = garrow_input_stream_get_raw(GARROW_INPUT_STREAM(stream)); auto n_read_bytes = arrow_input_stream->Read(count, buffer); if (!garrow::check(error, n_read_bytes, "[input-stream][read]")) { return -1; } return n_read_bytes.ValueOrDie(); } static gssize garrow_input_stream_skip(GInputStream *stream, gsize count, GCancellable *cancellable, GError **error) { if (g_cancellable_set_error_if_cancelled(cancellable, error)) { return -1; } auto arrow_input_stream = garrow_input_stream_get_raw(GARROW_INPUT_STREAM(stream)); auto status = arrow_input_stream->Advance(count); if (!garrow_error_check(error, status, "[input-stream][skip]")) { return -1; } return count; } static gboolean garrow_input_stream_close(GInputStream *stream, GCancellable *cancellable, GError **error) { if (g_cancellable_set_error_if_cancelled(cancellable, error)) { return FALSE; } auto arrow_input_stream = garrow_input_stream_get_raw(GARROW_INPUT_STREAM(stream)); auto status = arrow_input_stream->Close(); return garrow_error_check(error, status, "[input-stream][close]"); } static void garrow_input_stream_init(GArrowInputStream *object) { auto priv = GARROW_INPUT_STREAM_GET_PRIVATE(object); new(&priv->input_stream) std::shared_ptr; } static void garrow_input_stream_class_init(GArrowInputStreamClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->finalize = garrow_input_stream_finalize; gobject_class->set_property = garrow_input_stream_set_property; gobject_class->get_property = garrow_input_stream_get_property; auto input_stream_class = G_INPUT_STREAM_CLASS(klass); input_stream_class->read_fn = garrow_input_stream_read; 
input_stream_class->skip = garrow_input_stream_skip; input_stream_class->close_fn = garrow_input_stream_close; GParamSpec *spec; spec = g_param_spec_pointer("input-stream", "Input stream", "The raw std::shared *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_INPUT_STREAM, spec); } /** * garrow_input_stream_advance: * @input_stream: A #GArrowInputStream. * @n_bytes: The number of bytes to be advanced. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: %TRUE on success, %FALSE on error. * * Since: 0.11.0 */ gboolean garrow_input_stream_advance(GArrowInputStream *input_stream, gint64 n_bytes, GError **error) { auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); auto status = arrow_input_stream->Advance(n_bytes); return garrow_error_check(error, status, "[input-stream][advance]"); } /** * garrow_input_stream_align: * @input_stream: A #GArrowInputStream. * @alignment: The byte multiple for the metadata prefix, usually 8 * or 64, to ensure the body starts on a multiple of that alignment. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: %TRUE on success, %FALSE on error. * * Since: 0.11.0 */ gboolean garrow_input_stream_align(GArrowInputStream *input_stream, gint32 alignment, GError **error) { auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); auto status = arrow::ipc::AlignStream(arrow_input_stream.get(), alignment); return garrow_error_check(error, status, "[input-stream][align]"); } /** * garrow_input_stream_read_tensor: * @input_stream: A #GArrowInputStream. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (transfer full) (nullable): * #GArrowTensor on success, %NULL on error. 
* * Since: 0.11.0 */ GArrowTensor * garrow_input_stream_read_tensor(GArrowInputStream *input_stream, GError **error) { auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); auto arrow_tensor = arrow::ipc::ReadTensor(arrow_input_stream.get()); if (garrow::check(error, arrow_tensor, "[input-stream][read-tensor]")) { return garrow_tensor_new_raw(&(arrow_tensor.ValueOrDie())); } else { return NULL; } } /** * garrow_input_stream_read_record_batch: * @input_stream: A #GArrowInputStream. * @schema: A #GArrowSchema for a read record batch. * @options: (nullable): A #GArrowReadOptions. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (transfer full) (nullable): * #GArrowRecordBatch on success, %NULL on error. * * Since: 1.0.0 */ GArrowRecordBatch * garrow_input_stream_read_record_batch(GArrowInputStream *input_stream, GArrowSchema *schema, GArrowReadOptions *options, GError **error) { auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); auto arrow_schema = garrow_schema_get_raw(schema); if (options) { auto arrow_options = garrow_read_options_get_raw(options); auto arrow_dictionary_memo = garrow_read_options_get_dictionary_memo_raw(options); auto arrow_record_batch = arrow::ipc::ReadRecordBatch(arrow_schema, arrow_dictionary_memo, *arrow_options, arrow_input_stream.get()); if (garrow::check(error, arrow_record_batch, "[input-stream][read-record-batch]")) { return garrow_record_batch_new_raw(&(*arrow_record_batch)); } else { return NULL; } } else { auto arrow_options = arrow::ipc::IpcReadOptions::Defaults(); auto arrow_record_batch = arrow::ipc::ReadRecordBatch(arrow_schema, nullptr, arrow_options, arrow_input_stream.get()); if (garrow::check(error, arrow_record_batch, "[input-stream][read-record-batch]")) { return garrow_record_batch_new_raw(&(*arrow_record_batch)); } else { return NULL; } } } G_DEFINE_TYPE(GArrowSeekableInputStream, garrow_seekable_input_stream, GARROW_TYPE_INPUT_STREAM); static void 
garrow_seekable_input_stream_init(GArrowSeekableInputStream *object) { } static void garrow_seekable_input_stream_class_init(GArrowSeekableInputStreamClass *klass) { } /** * garrow_seekable_input_stream_get_size: * @input_stream: A #GArrowSeekableInputStream. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: The size of the file. */ guint64 garrow_seekable_input_stream_get_size(GArrowSeekableInputStream *input_stream, GError **error) { auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(input_stream); auto size = arrow_random_access_file->GetSize(); if (garrow::check(error, size, "[seekable-input-stream][get-size]")) { return size.ValueOrDie(); } else { return 0; } } /** * garrow_seekable_input_stream_get_support_zero_copy: * @input_stream: A #GArrowSeekableInputStream. * * Returns: Whether zero copy read is supported or not. */ gboolean garrow_seekable_input_stream_get_support_zero_copy(GArrowSeekableInputStream *input_stream) { auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(input_stream); return arrow_random_access_file->supports_zero_copy(); } /** * garrow_seekable_input_stream_read_at: * @input_stream: A #GArrowSeekableInputStream. * @position: The read start position. * @n_bytes: The number of bytes to be read. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (transfer full) (nullable): #GArrowBuffer that has read * data on success, %NULL if there was an error. 
*/
GArrowBuffer *
garrow_seekable_input_stream_read_at(GArrowSeekableInputStream *input_stream,
                                     gint64 position,
                                     gint64 n_bytes,
                                     GError **error)
{
  auto arrow_random_access_file =
    garrow_seekable_input_stream_get_raw(input_stream);
  auto buffer_result = arrow_random_access_file->ReadAt(position, n_bytes);
  if (!garrow::check(error, buffer_result, "[seekable-input-stream][read-at]")) {
    return NULL;
  }
  return garrow_buffer_new_raw(&(buffer_result.ValueOrDie()));
}

/**
 * garrow_seekable_input_stream_read_at_bytes:
 * @input_stream: A #GArrowSeekableInputStream.
 * @position: The read start position.
 * @n_bytes: The number of bytes to be read.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Returns: (transfer full) (nullable): #GBytes that has read data on
 *   success, %NULL if there was an error.
 *
 * Since: 0.15.0
 */
GBytes *
garrow_seekable_input_stream_read_at_bytes(GArrowSeekableInputStream *input_stream,
                                           gint64 position,
                                           gint64 n_bytes,
                                           GError **error)
{
  auto arrow_random_access_file =
    garrow_seekable_input_stream_get_raw(input_stream);
  auto read_result = arrow_random_access_file->ReadAt(position, n_bytes);
  if (!garrow::check(error, read_result, "[seekable-input-stream][read-at][bytes]")) {
    return NULL;
  }

  // g_bytes_new() copies from a host pointer, so make sure the data is
  // CPU-addressable first (ViewOrCopy to the default CPU memory manager).
  auto view_result =
    arrow::Buffer::ViewOrCopy(*read_result, arrow::default_cpu_memory_manager());
  if (!garrow::check(error,
                     view_result,
                     "[seekable-input-stream][read-at][bytes][view-or-copy]")) {
    return NULL;
  }
  const auto &cpu_buffer = *view_result;
  return g_bytes_new(cpu_buffer->data(), cpu_buffer->size());
}

/**
 * garrow_seekable_input_stream_peek:
 * @input_stream: A #GArrowSeekableInputStream.
 * @n_bytes: The number of bytes to be peeked.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Returns: (transfer full): The data of the buffer, up to the
 *   indicated number. The data becomes invalid after any operation on
 *   the stream.
If the stream is unbuffered, the data is empty. * * It should be freed with g_bytes_unref() when no longer needed. * * Since: 0.12.0 */ GBytes * garrow_seekable_input_stream_peek(GArrowSeekableInputStream *input_stream, gint64 n_bytes, GError **error) { auto arrow_random_access_file = garrow_seekable_input_stream_get_raw(input_stream); auto view_result = arrow_random_access_file->Peek(n_bytes); if (garrow::check(error, view_result, "[seekable-input-stream][peek]")) { auto view = view_result.ValueOrDie(); return g_bytes_new_static(view.data(), view.size()); } else { return NULL; } } typedef struct GArrowBufferInputStreamPrivate_ { GArrowBuffer *buffer; } GArrowBufferInputStreamPrivate; enum { PROP_BUFFER = 1, }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowBufferInputStream, garrow_buffer_input_stream, GARROW_TYPE_SEEKABLE_INPUT_STREAM); #define GARROW_BUFFER_INPUT_STREAM_GET_PRIVATE(obj) \ static_cast( \ garrow_buffer_input_stream_get_instance_private( \ GARROW_BUFFER_INPUT_STREAM(obj))) static void garrow_buffer_input_stream_dispose(GObject *object) { auto priv = GARROW_BUFFER_INPUT_STREAM_GET_PRIVATE(object); if (priv->buffer) { g_object_unref(priv->buffer); priv->buffer = nullptr; } G_OBJECT_CLASS(garrow_buffer_input_stream_parent_class)->dispose(object); } static void garrow_buffer_input_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GARROW_BUFFER_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_BUFFER: priv->buffer = GARROW_BUFFER(g_value_dup_object(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_buffer_input_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GARROW_BUFFER_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_BUFFER: g_value_set_object(value, priv->buffer); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void 
garrow_buffer_input_stream_init(GArrowBufferInputStream *object) { } static void garrow_buffer_input_stream_class_init(GArrowBufferInputStreamClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->dispose = garrow_buffer_input_stream_dispose; gobject_class->set_property = garrow_buffer_input_stream_set_property; gobject_class->get_property = garrow_buffer_input_stream_get_property; GParamSpec *spec; spec = g_param_spec_object("buffer", "Buffer", "The data", GARROW_TYPE_BUFFER, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_BUFFER, spec); } /** * garrow_buffer_input_stream_new: * @buffer: The buffer to be read. * * Returns: A newly created #GArrowBufferInputStream. */ GArrowBufferInputStream * garrow_buffer_input_stream_new(GArrowBuffer *buffer) { auto arrow_buffer = garrow_buffer_get_raw(buffer); auto arrow_buffer_reader = std::make_shared(arrow_buffer); return garrow_buffer_input_stream_new_raw(&arrow_buffer_reader, buffer); } /** * garrow_buffer_input_stream_get_buffer: * @input_stream: A #GArrowBufferInputStream. * * Returns: (transfer full): The data of the stream as #GArrowBuffer. */ GArrowBuffer * garrow_buffer_input_stream_get_buffer(GArrowBufferInputStream *input_stream) { auto priv = GARROW_BUFFER_INPUT_STREAM_GET_PRIVATE(input_stream); if (priv->buffer) { g_object_ref(priv->buffer); return priv->buffer; } auto arrow_buffer_reader = garrow_buffer_input_stream_get_raw(input_stream); auto arrow_buffer = arrow_buffer_reader->buffer(); return garrow_buffer_new_raw(&arrow_buffer); } G_DEFINE_TYPE(GArrowFileInputStream, garrow_file_input_stream, GARROW_TYPE_SEEKABLE_INPUT_STREAM); static void garrow_file_input_stream_init(GArrowFileInputStream *object) { } static void garrow_file_input_stream_class_init(GArrowFileInputStreamClass *klass) { } /** * garrow_file_input_stream_new: * @path: The path of the file to be opened. * @error: (nullable): Return location for a #GError or %NULL. 
* * Returns: (nullable): A newly created #GArrowFileInputStream * or %NULL on error. * * Since: 6.0.0 */ GArrowFileInputStream * garrow_file_input_stream_new(const gchar *path, GError **error) { auto arrow_stream_result = arrow::io::ReadableFile::Open(path); if (garrow::check(error, arrow_stream_result, "[file-input-stream][new]")) { auto arrow_stream = *arrow_stream_result; return garrow_file_input_stream_new_raw(&arrow_stream); } else { return NULL; } } /** * garrow_file_input_stream_new_file_descriptor: * @file_descriptor: The file descriptor of this input stream. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable): A newly created #GArrowFileInputStream * or %NULL on error. * * Since: 6.0.0 */ GArrowFileInputStream * garrow_file_input_stream_new_file_descriptor(gint file_descriptor, GError **error) { auto arrow_stream_result = arrow::io::ReadableFile::Open(file_descriptor); if (garrow::check(error, arrow_stream_result, "[file-input-stream][new-file-descriptor]")) { auto arrow_stream = *arrow_stream_result; return garrow_file_input_stream_new_raw(&arrow_stream); } else { return NULL; } } /** * garrow_file_input_stream_get_file_descriptor: * @stream: A #GArrowFileInuptStream. * * Returns: The file descriptor of @stream. * * Since: 6.0.0 */ gint garrow_file_input_stream_get_file_descriptor(GArrowFileInputStream *stream) { auto arrow_stream = std::static_pointer_cast( garrow_input_stream_get_raw(GARROW_INPUT_STREAM(stream))); return arrow_stream->file_descriptor(); } G_DEFINE_TYPE(GArrowMemoryMappedInputStream, garrow_memory_mapped_input_stream, GARROW_TYPE_SEEKABLE_INPUT_STREAM); static void garrow_memory_mapped_input_stream_init(GArrowMemoryMappedInputStream *object) { } static void garrow_memory_mapped_input_stream_class_init(GArrowMemoryMappedInputStreamClass *klass) { } /** * garrow_memory_mapped_input_stream_new: * @path: The path of the file to be mapped on memory. 
* @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable): A newly created #GArrowMemoryMappedInputStream * or %NULL on error. */ GArrowMemoryMappedInputStream * garrow_memory_mapped_input_stream_new(const gchar *path, GError **error) { auto arrow_stream_result = arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ); if (garrow::check(error, arrow_stream_result, "[memory-mapped-input-stream][new]")) { auto arrow_stream = *arrow_stream_result; return garrow_memory_mapped_input_stream_new_raw(&arrow_stream); } else { return NULL; } } G_END_DECLS namespace garrow { class GIOInputStream : public arrow::io::RandomAccessFile { public: GIOInputStream(GInputStream *input_stream) : input_stream_(input_stream), lock_() { g_object_ref(input_stream_); } ~GIOInputStream() { g_object_unref(input_stream_); } GInputStream *get_input_stream() { return input_stream_; } bool closed() const override { return static_cast(g_input_stream_is_closed(input_stream_)); } arrow::Status Close() override { std::lock_guard guard(lock_); GError *error = NULL; if (g_input_stream_close(input_stream_, NULL, &error)) { return arrow::Status::OK(); } else { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][close]"); } } arrow::Result Tell() const override { if (!G_IS_SEEKABLE(input_stream_)) { std::string message("[gio-input-stream][tell] " "not seekable input stream: <"); message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_)); message += ">"; return arrow::Status::NotImplemented(message); } return g_seekable_tell(G_SEEKABLE(input_stream_)); } arrow::Result Read(int64_t n_bytes, void *out) override { std::lock_guard guard(lock_); GError *error = NULL; auto n_read_bytes = g_input_stream_read(input_stream_, out, n_bytes, NULL, &error); if (n_read_bytes == -1) { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][read]"); } else { return n_read_bytes; } } arrow::Result ReadAt(int64_t 
position, int64_t n_bytes, void* out) override { return arrow::io::RandomAccessFile::ReadAt(position, n_bytes, out); } arrow::Result> ReadAt(int64_t position, int64_t n_bytes) override { return arrow::io::RandomAccessFile::ReadAt(position, n_bytes); } arrow::Result> Read(int64_t n_bytes) override { ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(n_bytes)); std::lock_guard guard(lock_); GError *error = NULL; auto n_read_bytes = g_input_stream_read(input_stream_, buffer->mutable_data(), n_bytes, NULL, &error); if (n_read_bytes == -1) { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][read][buffer]"); } else { if (n_read_bytes < n_bytes) { RETURN_NOT_OK(buffer->Resize(n_read_bytes)); } return std::move(buffer); } } arrow::Result Peek(int64_t nbytes) override { if (!G_IS_BUFFERED_INPUT_STREAM(input_stream_)) { std::string message("[gio-input-stream][peek] " "not peekable input stream: <"); message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_)); message += ">"; return arrow::Status::NotImplemented(message); } auto stream = G_BUFFERED_INPUT_STREAM(input_stream_); auto available_n_bytes = g_buffered_input_stream_get_available(stream); if (available_n_bytes < static_cast(nbytes)) { GError *error = NULL; auto filled_size = g_buffered_input_stream_fill(stream, nbytes, NULL, &error); if (filled_size == -1) { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][peek] " "failed to fill"); } } gsize data_size; auto data = g_buffered_input_stream_peek_buffer(stream, &data_size); if (data_size > static_cast(nbytes)) { data_size = nbytes; } return arrow::util::string_view(static_cast(data), data_size); } arrow::Status Seek(int64_t position) override { if (!G_IS_SEEKABLE(input_stream_)) { std::string message("[gio-input-stream][seek] " "not seekable input stream: <"); message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_)); message += ">"; return 
arrow::Status::NotImplemented(message); } std::lock_guard guard(lock_); GError *error = NULL; if (g_seekable_seek(G_SEEKABLE(input_stream_), position, G_SEEK_SET, NULL, &error)) { return arrow::Status::OK(); } else { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][seek]"); } } arrow::Result GetSize() override { if (!G_IS_SEEKABLE(input_stream_)) { std::string message("[gio-input-stream][size] " "not seekable input stream: <"); message += G_OBJECT_CLASS_NAME(G_OBJECT_GET_CLASS(input_stream_)); message += ">"; return arrow::Status::NotImplemented(message); } std::lock_guard guard(lock_); auto current_position = g_seekable_tell(G_SEEKABLE(input_stream_)); GError *error = NULL; if (!g_seekable_seek(G_SEEKABLE(input_stream_), 0, G_SEEK_END, NULL, &error)) { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][size][seek]"); } auto size = g_seekable_tell(G_SEEKABLE(input_stream_)); if (!g_seekable_seek(G_SEEKABLE(input_stream_), current_position, G_SEEK_SET, NULL, &error)) { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-input-stream][size][seek][restore]"); } return size; } bool supports_zero_copy() const override { return false; } private: GInputStream *input_stream_; std::mutex lock_; }; }; G_BEGIN_DECLS typedef struct GArrowGIOInputStreamPrivate_ { GInputStream *raw; } GArrowGIOInputStreamPrivate; enum { PROP_GIO_RAW = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowGIOInputStream, garrow_gio_input_stream, GARROW_TYPE_SEEKABLE_INPUT_STREAM); #define GARROW_GIO_INPUT_STREAM_GET_PRIVATE(object) \ static_cast( \ garrow_gio_input_stream_get_instance_private( \ GARROW_GIO_INPUT_STREAM(object))) static void garrow_gio_input_stream_dispose(GObject *object) { auto priv = GARROW_GIO_INPUT_STREAM_GET_PRIVATE(object); if (priv->raw) { g_object_unref(priv->raw); priv->raw = nullptr; } G_OBJECT_CLASS(garrow_gio_input_stream_parent_class)->dispose(object); } static void 
garrow_gio_input_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GARROW_GIO_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_GIO_RAW: priv->raw = G_INPUT_STREAM(g_value_dup_object(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_gio_input_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GARROW_GIO_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_GIO_RAW: g_value_set_object(value, priv->raw); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_gio_input_stream_init(GArrowGIOInputStream *object) { } static void garrow_gio_input_stream_class_init(GArrowGIOInputStreamClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->dispose = garrow_gio_input_stream_dispose; gobject_class->set_property = garrow_gio_input_stream_set_property; gobject_class->get_property = garrow_gio_input_stream_get_property; GParamSpec *spec; spec = g_param_spec_object("raw", "Raw", "The raw GInputStream *", G_TYPE_INPUT_STREAM, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_GIO_RAW, spec); } /** * garrow_gio_input_stream_new: * @gio_input_stream: The stream to be read. * * Returns: (transfer full): A newly created #GArrowGIOInputStream. * * Since: 0.5.0 */ GArrowGIOInputStream * garrow_gio_input_stream_new(GInputStream *gio_input_stream) { auto arrow_input_stream = std::make_shared(gio_input_stream); auto object = g_object_new(GARROW_TYPE_GIO_INPUT_STREAM, "input-stream", &arrow_input_stream, "raw", gio_input_stream, NULL); auto input_stream = GARROW_GIO_INPUT_STREAM(object); return input_stream; } /** * garrow_gio_input_stream_get_raw: * @input_stream: A #GArrowGIOInputStream. * * Returns: (transfer none): The wrapped #GInputStream. 
* * Since: 0.5.0 * * Deprecated: 0.12.0: Use GArrowGIOInputStream::raw property instead. */ GInputStream * garrow_gio_input_stream_get_raw(GArrowGIOInputStream *input_stream) { auto priv = GARROW_GIO_INPUT_STREAM_GET_PRIVATE(input_stream); return priv->raw; } typedef struct GArrowCompressedInputStreamPrivate_ { GArrowCodec *codec; GArrowInputStream *raw; } GArrowCompressedInputStreamPrivate; enum { PROP_CODEC = 1, PROP_RAW }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedInputStream, garrow_compressed_input_stream, GARROW_TYPE_INPUT_STREAM) #define GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object) \ static_cast( \ garrow_compressed_input_stream_get_instance_private( \ GARROW_COMPRESSED_INPUT_STREAM(object))) static void garrow_compressed_input_stream_dispose(GObject *object) { auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); if (priv->codec) { g_object_unref(priv->codec); priv->codec = NULL; } if (priv->raw) { g_object_unref(priv->raw); priv->raw = NULL; } G_OBJECT_CLASS(garrow_compressed_input_stream_parent_class)->dispose(object); } static void garrow_compressed_input_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_CODEC: priv->codec = GARROW_CODEC(g_value_dup_object(value)); break; case PROP_RAW: priv->raw = GARROW_INPUT_STREAM(g_value_dup_object(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_compressed_input_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GARROW_COMPRESSED_INPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_CODEC: g_value_set_object(value, priv->codec); break; case PROP_RAW: g_value_set_object(value, priv->raw); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void 
garrow_compressed_input_stream_init(GArrowCompressedInputStream *object)
{
}

static void
garrow_compressed_input_stream_class_init(GArrowCompressedInputStreamClass *klass)
{
  auto gobject_class = G_OBJECT_CLASS(klass);
  gobject_class->dispose = garrow_compressed_input_stream_dispose;
  gobject_class->set_property = garrow_compressed_input_stream_set_property;
  gobject_class->get_property = garrow_compressed_input_stream_get_property;

  // Both properties are read-write but may only be given at construction.
  const auto flags =
    static_cast<GParamFlags>(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY);

  auto codec_spec = g_param_spec_object("codec",
                                        "Codec",
                                        "The codec for the stream",
                                        GARROW_TYPE_CODEC,
                                        flags);
  g_object_class_install_property(gobject_class, PROP_CODEC, codec_spec);

  auto raw_spec = g_param_spec_object("raw",
                                      "Raw",
                                      "The underlying raw input stream",
                                      GARROW_TYPE_INPUT_STREAM,
                                      flags);
  g_object_class_install_property(gobject_class, PROP_RAW, raw_spec);
}

/**
 * garrow_compressed_input_stream_new:
 * @codec: A #GArrowCodec for compressed data in the @raw.
 * @raw: A #GArrowInputStream that contains compressed data.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Returns: (nullable): A newly created #GArrowCompressedInputStream
 *   or %NULL on error.
* * Since: 0.12.0 */ GArrowCompressedInputStream * garrow_compressed_input_stream_new(GArrowCodec *codec, GArrowInputStream *raw, GError **error) { auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_input_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedInputStream::Make(arrow_codec, arrow_raw); if (garrow::check(error, arrow_stream, "[compressed-input-stream][new]")) { return garrow_compressed_input_stream_new_raw(&(arrow_stream.ValueOrDie()), codec, raw); } else { return NULL; } } G_END_DECLS GArrowInputStream * garrow_input_stream_new_raw(std::shared_ptr *arrow_input_stream) { auto input_stream = GARROW_INPUT_STREAM(g_object_new(GARROW_TYPE_INPUT_STREAM, "input-stream", arrow_input_stream, NULL)); return input_stream; } std::shared_ptr garrow_input_stream_get_raw(GArrowInputStream *input_stream) { auto priv = GARROW_INPUT_STREAM_GET_PRIVATE(input_stream); return priv->input_stream; } GArrowSeekableInputStream * garrow_seekable_input_stream_new_raw( std::shared_ptr *arrow_random_access_file) { auto object = g_object_new(GARROW_TYPE_SEEKABLE_INPUT_STREAM, "input-stream", arrow_random_access_file, NULL); return GARROW_SEEKABLE_INPUT_STREAM(object); } std::shared_ptr garrow_seekable_input_stream_get_raw( GArrowSeekableInputStream *seekable_input_stream) { auto arrow_input_stream = garrow_input_stream_get_raw(GARROW_INPUT_STREAM(seekable_input_stream)); auto arrow_random_access_file = std::static_pointer_cast(arrow_input_stream); return arrow_random_access_file; } GArrowBufferInputStream * garrow_buffer_input_stream_new_raw(std::shared_ptr *arrow_buffer_reader, GArrowBuffer *buffer) { auto buffer_input_stream = GARROW_BUFFER_INPUT_STREAM(g_object_new(GARROW_TYPE_BUFFER_INPUT_STREAM, "input-stream", arrow_buffer_reader, "buffer", buffer, NULL)); return buffer_input_stream; } std::shared_ptr garrow_buffer_input_stream_get_raw(GArrowBufferInputStream *buffer_input_stream) { auto arrow_input_stream = 
garrow_input_stream_get_raw(GARROW_INPUT_STREAM(buffer_input_stream)); auto arrow_buffer_reader = std::static_pointer_cast(arrow_input_stream); return arrow_buffer_reader; } GArrowFileInputStream * garrow_file_input_stream_new_raw( std::shared_ptr *arrow_stream) { return GARROW_FILE_INPUT_STREAM(g_object_new(GARROW_TYPE_FILE_INPUT_STREAM, "input-stream", arrow_stream, NULL)); } GArrowMemoryMappedInputStream * garrow_memory_mapped_input_stream_new_raw( std::shared_ptr *arrow_stream) { return GARROW_MEMORY_MAPPED_INPUT_STREAM( g_object_new(GARROW_TYPE_MEMORY_MAPPED_INPUT_STREAM, "input-stream", arrow_stream, NULL)); } GArrowCompressedInputStream * garrow_compressed_input_stream_new_raw(std::shared_ptr *arrow_raw, GArrowCodec *codec, GArrowInputStream *raw) { auto compressed_input_stream = g_object_new(GARROW_TYPE_COMPRESSED_INPUT_STREAM, "input-stream", arrow_raw, "codec", codec, "raw", raw, NULL); return GARROW_COMPRESSED_INPUT_STREAM(compressed_input_stream); } std::shared_ptr garrow_compressed_input_stream_get_raw(GArrowCompressedInputStream *compressed_input_stream) { auto input_stream = GARROW_INPUT_STREAM(compressed_input_stream); auto arrow_input_stream = garrow_input_stream_get_raw(input_stream); auto arrow_compressed_input_stream = std::static_pointer_cast(arrow_input_stream); return arrow_compressed_input_stream->raw(); }