/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include #include #include #include #include #include #include #include #include #include #include G_BEGIN_DECLS /** * SECTION: output-stream * @section_id: output-stream-classes * @title: Output stream classes * @include: arrow-glib/arrow-glib.h * * #GArrowOutputStream is an interface for stream output. Stream * output is file based and writable * * #GArrowFileOutputStream is a class for file output stream. * * #GArrowBufferOutputStream is a class for buffer output stream. * * #GArrowGIOOutputStream is a class for `GOutputStream` based output * stream. * * #GArrowCompressedOutputStream is a class to write compressed data to * output stream. 
*/

/* Instance-private data of GArrowOutputStream. */
typedef struct GArrowOutputStreamPrivate_ {
  /* The wrapped raw stream. It is constructed with placement new in
   * garrow_output_stream_init() and destroyed explicitly in
   * garrow_output_stream_finalize(). */
  std::shared_ptr output_stream;
} GArrowOutputStreamPrivate;

/* Property IDs of GArrowOutputStream. PROP_0 only takes the value 0, so
 * PROP_OUTPUT_STREAM is 1. */
enum {
  PROP_0,
  PROP_OUTPUT_STREAM
};

/* get_raw implementation installed into GArrowFileInterface: treats @file
 * as a GArrowOutputStream and returns whatever
 * garrow_output_stream_get_raw() returns for it. */
static std::shared_ptr
garrow_output_stream_get_raw_file_interface(GArrowFile *file)
{
  auto output_stream = GARROW_OUTPUT_STREAM(file);
  auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
  return arrow_output_stream;
}

/* Interface initializer for GARROW_TYPE_FILE: fills in the get_raw slot. */
static void
garrow_output_stream_file_interface_init(GArrowFileInterface *iface)
{
  iface->get_raw = garrow_output_stream_get_raw_file_interface;
}

/* get_raw implementation installed into GArrowWritableInterface: treats
 * @writable as a GArrowOutputStream and returns whatever
 * garrow_output_stream_get_raw() returns for it. */
static std::shared_ptr
garrow_output_stream_get_raw_writable_interface(GArrowWritable *writable)
{
  auto output_stream = GARROW_OUTPUT_STREAM(writable);
  auto arrow_output_stream = garrow_output_stream_get_raw(output_stream);
  return arrow_output_stream;
}

/* Interface initializer for GARROW_TYPE_WRITABLE: fills in the get_raw
 * slot. */
static void
garrow_output_stream_writable_interface_init(GArrowWritableInterface *iface)
{
  iface->get_raw = garrow_output_stream_get_raw_writable_interface;
}

/* Registers GArrowOutputStream as a G_TYPE_OBJECT subclass with private
 * data that implements the GARROW_TYPE_FILE and GARROW_TYPE_WRITABLE
 * interfaces through the two initializers defined above. */
G_DEFINE_TYPE_WITH_CODE(GArrowOutputStream,
                        garrow_output_stream,
                        G_TYPE_OBJECT,
                        G_ADD_PRIVATE(GArrowOutputStream)
                        G_IMPLEMENT_INTERFACE(GARROW_TYPE_FILE,
                                              garrow_output_stream_file_interface_init)
                        G_IMPLEMENT_INTERFACE(GARROW_TYPE_WRITABLE,
                                              garrow_output_stream_writable_interface_init));

/* Evaluates to the instance-private data of @obj. */
#define GARROW_OUTPUT_STREAM_GET_PRIVATE(obj)         \
  static_cast(                                        \
    garrow_output_stream_get_instance_private(        \
      GARROW_OUTPUT_STREAM(obj)))

/* GObject finalize handler. */
static void
garrow_output_stream_finalize(GObject *object)
{
  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);

  /* Counterpart of the placement new in garrow_output_stream_init(): the
   * destructor has to be invoked by hand. */
  priv->output_stream.~shared_ptr();

  G_OBJECT_CLASS(garrow_output_stream_parent_class)->finalize(object);
}

/* GObject set_property handler. Only PROP_OUTPUT_STREAM is handled; any
 * other ID is reported through G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
garrow_output_stream_set_property(GObject *object,
                                  guint prop_id,
                                  const GValue *value,
                                  GParamSpec *pspec)
{
  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);

  switch (prop_id) {
  case PROP_OUTPUT_STREAM:
    /* @value carries a pointer; the object it points to is copied into the
     * private data, so the pointed-to object does not have to outlive this
     * call. */
    priv->output_stream =
      *static_cast *>(g_value_get_pointer(value));
    break;
  default:
    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
    break;
  }
}

/* GObject get_property handler. No property is readable here, so every ID
 * ends up in G_OBJECT_WARN_INVALID_PROPERTY_ID(). */
static void
garrow_output_stream_get_property(GObject *object,
                                  guint prop_id,
                                  GValue *value,
                                  GParamSpec *pspec)
{
  switch (prop_id) {
  default:
    G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
    break;
  }
}

/* Instance initializer: constructs the shared_ptr member in place inside
 * the private data. */
static void
garrow_output_stream_init(GArrowOutputStream *object)
{
  auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(object);
  new(&priv->output_stream) std::shared_ptr;
}

/* Class initializer: hooks up finalize/set_property/get_property and
 * installs the "output-stream" pointer property, which is writable and
 * construct-only. */
static void
garrow_output_stream_class_init(GArrowOutputStreamClass *klass)
{
  auto gobject_class = G_OBJECT_CLASS(klass);

  gobject_class->finalize     = garrow_output_stream_finalize;
  gobject_class->set_property = garrow_output_stream_set_property;
  gobject_class->get_property = garrow_output_stream_get_property;

  GParamSpec *spec;
  spec = g_param_spec_pointer("output-stream",
                              "io::OutputStream",
                              "The raw std::shared *",
                              static_cast(G_PARAM_WRITABLE |
                                          G_PARAM_CONSTRUCT_ONLY));
  g_object_class_install_property(gobject_class, PROP_OUTPUT_STREAM, spec);
}

/**
 * garrow_output_stream_align:
 * @stream: A #GArrowOutputStream.
 * @alignment: The byte multiple for the metadata prefix, usually 8
 *   or 64, to ensure the body starts on a multiple of that alignment.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Calls `arrow::ipc::AlignStream()` on the stream wrapped by @stream
 * with @alignment.
 *
 * Returns: %TRUE on success, %FALSE on error.
 *
 * Since: 0.11.0
 */
gboolean
garrow_output_stream_align(GArrowOutputStream *stream,
                           gint32 alignment,
                           GError **error)
{
  auto arrow_stream = garrow_output_stream_get_raw(stream);
  auto status = arrow::ipc::AlignStream(arrow_stream.get(), alignment);
  return garrow::check(error, status, "[output-stream][align]");
}

/**
 * garrow_output_stream_write_tensor:
 * @stream: A #GArrowOutputStream.
 * @tensor: A #GArrowTensor to be written.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Writes @tensor to @stream with `arrow::ipc::WriteTensor()`.
 *
 * Returns: The number of written bytes on success, -1 on error.
 *
 * Since: 0.4.0
 */
gint64
garrow_output_stream_write_tensor(GArrowOutputStream *stream,
                                  GArrowTensor *tensor,
                                  GError **error)
{
  auto arrow_stream = garrow_output_stream_get_raw(stream);
  auto arrow_tensor = garrow_tensor_get_raw(tensor);
  /* Both lengths are passed by address to WriteTensor() below and are only
   * read on the success path. */
  int32_t metadata_length;
  int64_t body_length;
  auto status = arrow::ipc::WriteTensor(*arrow_tensor,
                                        arrow_stream.get(),
                                        &metadata_length,
                                        &body_length);
  if (garrow::check(error, status, "[output-stream][write-tensor]")) {
    /* The reported byte count is the metadata length plus the body
     * length. */
    return metadata_length + body_length;
  } else {
    return -1;
  }
}

/**
 * garrow_output_stream_write_record_batch:
 * @stream: A #GArrowOutputStream.
 * @record_batch: A #GArrowRecordBatch to be written.
 * @options: (nullable): A #GArrowWriteOptions.
 * @error: (nullable): Return location for a #GError or %NULL.
 *
 * Writes @record_batch to @stream with `arrow::ipc::WriteRecordBatch()`.
 * When @options is %NULL, `arrow::ipc::IpcWriteOptions::Defaults()` is
 * used.
 *
 * Returns: The number of written bytes on success, -1 on error.
 *
 * Since: 1.0.0
 */
gint64
garrow_output_stream_write_record_batch(GArrowOutputStream *stream,
                                        GArrowRecordBatch *record_batch,
                                        GArrowWriteOptions *options,
                                        GError **error)
{
  auto arrow_stream = garrow_output_stream_get_raw(stream);
  auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
  /* The buffer start offset handed to WriteRecordBatch() is always 0. */
  int64_t buffer_start_offset = 0;
  /* Both lengths are passed by address to WriteRecordBatch() below and are
   * only read on the success path. */
  int32_t metadata_length;
  int64_t body_length;
  arrow::Status status;
  if (options) {
    auto arrow_options = garrow_write_options_get_raw(options);
    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
                                          buffer_start_offset,
                                          arrow_stream.get(),
                                          &metadata_length,
                                          &body_length,
                                          *arrow_options);
  } else {
    auto arrow_options = arrow::ipc::IpcWriteOptions::Defaults();
    status = arrow::ipc::WriteRecordBatch(*arrow_record_batch,
                                          buffer_start_offset,
                                          arrow_stream.get(),
                                          &metadata_length,
                                          &body_length,
                                          arrow_options);
  }
  if (garrow::check(error, status, "[output-stream][write-record-batch]")) {
    /* The reported byte count is the metadata length plus the body
     * length. */
    return metadata_length + body_length;
  } else {
    return -1;
  }
}


/* Registers GArrowFileOutputStream as a subclass of
 * GARROW_TYPE_OUTPUT_STREAM. */
G_DEFINE_TYPE(GArrowFileOutputStream,
              garrow_file_output_stream,
              GARROW_TYPE_OUTPUT_STREAM);

/* Instance initializer: intentionally empty. */
static void
garrow_file_output_stream_init(GArrowFileOutputStream *file_output_stream)
{
}
static void garrow_file_output_stream_class_init(GArrowFileOutputStreamClass *klass) { } /** * garrow_file_output_stream_new: * @path: The path of the file output stream. * @append: Whether the path is opened as append mode or recreate mode. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable): A newly opened #GArrowFileOutputStream or * %NULL on error. */ GArrowFileOutputStream * garrow_file_output_stream_new(const gchar *path, gboolean append, GError **error) { auto arrow_file_output_stream_result = arrow::io::FileOutputStream::Open(std::string(path), append); if (arrow_file_output_stream_result.ok()) { auto arrow_file_output_stream = arrow_file_output_stream_result.ValueOrDie(); return garrow_file_output_stream_new_raw(&arrow_file_output_stream); } else { std::string context("[io][file-output-stream][open]: <"); context += path; context += ">"; garrow::check(error, arrow_file_output_stream_result, context.c_str()); return NULL; } } G_DEFINE_TYPE(GArrowBufferOutputStream, garrow_buffer_output_stream, GARROW_TYPE_OUTPUT_STREAM); static void garrow_buffer_output_stream_init(GArrowBufferOutputStream *buffer_output_stream) { } static void garrow_buffer_output_stream_class_init(GArrowBufferOutputStreamClass *klass) { } /** * garrow_buffer_output_stream_new: * @buffer: The resizable buffer to be output. * * Returns: (transfer full): A newly created #GArrowBufferOutputStream. 
*/ GArrowBufferOutputStream * garrow_buffer_output_stream_new(GArrowResizableBuffer *buffer) { auto arrow_buffer = garrow_buffer_get_raw(GARROW_BUFFER(buffer)); auto arrow_resizable_buffer = std::static_pointer_cast(arrow_buffer); auto arrow_buffer_output_stream = std::make_shared(arrow_resizable_buffer); return garrow_buffer_output_stream_new_raw(&arrow_buffer_output_stream); } G_END_DECLS namespace garrow { class GIOOutputStream : public arrow::io::OutputStream { public: GIOOutputStream(GOutputStream *output_stream) : output_stream_(output_stream), position_(0) { g_object_ref(output_stream_); } ~GIOOutputStream() { g_object_unref(output_stream_); } GOutputStream *get_output_stream() { return output_stream_; } bool closed() const override { return static_cast(g_output_stream_is_closed(output_stream_)); } arrow::Status Close() override { GError *error = NULL; if (g_output_stream_close(output_stream_, NULL, &error)) { return arrow::Status::OK(); } else { return garrow_error_to_status(error, arrow::StatusCode::IOError, "[gio-output-stream][close]"); } } arrow::Result Tell() const override { if (G_IS_SEEKABLE(output_stream_)) { return g_seekable_tell(G_SEEKABLE(output_stream_)); } else { return position_; } } arrow::Status Write(const void *data, int64_t n_bytes) override { GError *error = NULL; gsize n_written_bytes; auto successed = g_output_stream_write_all(output_stream_, data, n_bytes, &n_written_bytes, NULL, &error); if (successed) { position_ += n_written_bytes; return arrow::Status::OK(); } else { std::stringstream message("[gio-output-stream][write]"); message << "[" << n_written_bytes << "/" << n_bytes << "]"; return garrow_error_to_status(error, arrow::StatusCode::IOError, message.str().c_str()); } } arrow::Status Flush() override { GError *error = NULL; auto successed = g_output_stream_flush(output_stream_, NULL, &error); if (successed) { return arrow::Status::OK(); } else { return garrow_error_to_status(error, arrow::StatusCode::IOError, 
"[gio-output-stream][flush]"); } } private: GOutputStream *output_stream_; int64_t position_; }; }; G_BEGIN_DECLS typedef struct GArrowGIOOutputStreamPrivate_ { GOutputStream *raw; } GArrowGIOOutputStreamPrivate; enum { PROP_GIO_RAW = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowGIOOutputStream, garrow_gio_output_stream, GARROW_TYPE_OUTPUT_STREAM); #define GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object) \ static_cast( \ garrow_gio_output_stream_get_instance_private( \ GARROW_GIO_OUTPUT_STREAM(object))) static void garrow_gio_output_stream_dispose(GObject *object) { auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); if (priv->raw) { g_object_unref(priv->raw); priv->raw = nullptr; } G_OBJECT_CLASS(garrow_gio_output_stream_parent_class)->dispose(object); } static void garrow_gio_output_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_GIO_RAW: priv->raw = G_OUTPUT_STREAM(g_value_dup_object(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_gio_output_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_GIO_RAW: g_value_set_object(value, priv->raw); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_gio_output_stream_init(GArrowGIOOutputStream *object) { } static void garrow_gio_output_stream_class_init(GArrowGIOOutputStreamClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->dispose = garrow_gio_output_stream_dispose; gobject_class->set_property = garrow_gio_output_stream_set_property; gobject_class->get_property = garrow_gio_output_stream_get_property; GParamSpec *spec; spec = g_param_spec_object("raw", "Raw", "The raw GOutputStream *", G_TYPE_OUTPUT_STREAM, 
static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_GIO_RAW, spec); } /** * garrow_gio_output_stream_new: * @gio_output_stream: The stream to be output. * * Returns: (transfer full): A newly created #GArrowGIOOutputStream. */ GArrowGIOOutputStream * garrow_gio_output_stream_new(GOutputStream *gio_output_stream) { auto arrow_output_stream = std::make_shared(gio_output_stream); auto object = g_object_new(GARROW_TYPE_GIO_OUTPUT_STREAM, "output-stream", &arrow_output_stream, "raw", gio_output_stream, NULL); auto output_stream = GARROW_GIO_OUTPUT_STREAM(object); return output_stream; } /** * garrow_gio_output_stream_get_raw: * @output_stream: A #GArrowGIOOutputStream. * * Returns: (transfer none): The wrapped #GOutputStream. * * Since: 0.5.0 * * Deprecated: 0.12.0: Use GArrowGIOOutputStream::raw property instead. */ GOutputStream * garrow_gio_output_stream_get_raw(GArrowGIOOutputStream *output_stream) { auto priv = GARROW_GIO_OUTPUT_STREAM_GET_PRIVATE(output_stream); return priv->raw; } typedef struct GArrowCompressedOutputStreamPrivate_ { GArrowCodec *codec; GArrowOutputStream *raw; } GArrowCompressedOutputStreamPrivate; enum { PROP_CODEC = 1, PROP_RAW }; G_DEFINE_TYPE_WITH_PRIVATE(GArrowCompressedOutputStream, garrow_compressed_output_stream, GARROW_TYPE_OUTPUT_STREAM) #define GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object) \ static_cast( \ garrow_compressed_output_stream_get_instance_private( \ GARROW_COMPRESSED_OUTPUT_STREAM(object))) static void garrow_compressed_output_stream_dispose(GObject *object) { auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); if (priv->codec) { g_object_unref(priv->codec); priv->codec = NULL; } if (priv->raw) { g_object_unref(priv->raw); priv->raw = NULL; } G_OBJECT_CLASS(garrow_compressed_output_stream_parent_class)->dispose(object); } static void garrow_compressed_output_stream_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) 
{ auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_CODEC: priv->codec = GARROW_CODEC(g_value_dup_object(value)); break; case PROP_RAW: priv->raw = GARROW_OUTPUT_STREAM(g_value_dup_object(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_compressed_output_stream_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GARROW_COMPRESSED_OUTPUT_STREAM_GET_PRIVATE(object); switch (prop_id) { case PROP_CODEC: g_value_set_object(value, priv->codec); break; case PROP_RAW: g_value_set_object(value, priv->raw); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void garrow_compressed_output_stream_init(GArrowCompressedOutputStream *object) { } static void garrow_compressed_output_stream_class_init(GArrowCompressedOutputStreamClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->dispose = garrow_compressed_output_stream_dispose; gobject_class->set_property = garrow_compressed_output_stream_set_property; gobject_class->get_property = garrow_compressed_output_stream_get_property; GParamSpec *spec; spec = g_param_spec_object("codec", "Codec", "The codec for the stream", GARROW_TYPE_CODEC, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_CODEC, spec); spec = g_param_spec_object("raw", "Raw", "The underlying raw output stream", GARROW_TYPE_OUTPUT_STREAM, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_RAW, spec); } /** * garrow_compressed_output_stream_new: * @codec: A #GArrowCodec for compressed data in the @raw. * @raw: A #GArrowOutputStream that is a sink for compressed data. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: A newly created #GArrowCompressedOutputStream. 
* * Since: 0.12.0 */ GArrowCompressedOutputStream * garrow_compressed_output_stream_new(GArrowCodec *codec, GArrowOutputStream *raw, GError **error) { auto arrow_codec = garrow_codec_get_raw(codec).get(); auto arrow_raw = garrow_output_stream_get_raw(raw); auto arrow_stream = arrow::io::CompressedOutputStream::Make(arrow_codec, arrow_raw); if (garrow::check(error, arrow_stream, "[compressed-output-stream][new]")) { return garrow_compressed_output_stream_new_raw(&(arrow_stream.ValueOrDie()), codec, raw); } else { return NULL; } } G_END_DECLS GArrowOutputStream * garrow_output_stream_new_raw(std::shared_ptr *arrow_output_stream) { auto output_stream = GARROW_OUTPUT_STREAM(g_object_new(GARROW_TYPE_OUTPUT_STREAM, "output-stream", arrow_output_stream, NULL)); return output_stream; } std::shared_ptr garrow_output_stream_get_raw(GArrowOutputStream *output_stream) { auto priv = GARROW_OUTPUT_STREAM_GET_PRIVATE(output_stream); return priv->output_stream; } GArrowFileOutputStream * garrow_file_output_stream_new_raw(std::shared_ptr *arrow_file_output_stream) { auto file_output_stream = GARROW_FILE_OUTPUT_STREAM(g_object_new(GARROW_TYPE_FILE_OUTPUT_STREAM, "output-stream", arrow_file_output_stream, NULL)); return file_output_stream; } GArrowBufferOutputStream * garrow_buffer_output_stream_new_raw(std::shared_ptr *arrow_buffer_output_stream) { auto buffer_output_stream = GARROW_BUFFER_OUTPUT_STREAM(g_object_new(GARROW_TYPE_BUFFER_OUTPUT_STREAM, "output-stream", arrow_buffer_output_stream, NULL)); return buffer_output_stream; } GArrowCompressedOutputStream * garrow_compressed_output_stream_new_raw(std::shared_ptr *arrow_raw, GArrowCodec *codec, GArrowOutputStream *raw) { auto compressed_output_stream = g_object_new(GARROW_TYPE_COMPRESSED_OUTPUT_STREAM, "output-stream", arrow_raw, "codec", codec, "raw", raw, NULL); return GARROW_COMPRESSED_OUTPUT_STREAM(compressed_output_stream); } std::shared_ptr garrow_compressed_output_stream_get_raw(GArrowCompressedOutputStream 
*compressed_output_stream) { auto output_stream = GARROW_OUTPUT_STREAM(compressed_output_stream); auto arrow_output_stream = garrow_output_stream_get_raw(output_stream); auto arrow_compressed_output_stream = std::static_pointer_cast(arrow_output_stream); return arrow_compressed_output_stream->raw(); }