summaryrefslogtreecommitdiffstats
path: root/src/arrow/c_glib/plasma-glib/client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/c_glib/plasma-glib/client.cpp')
-rw-r--r--src/arrow/c_glib/plasma-glib/client.cpp612
1 files changed, 612 insertions, 0 deletions
diff --git a/src/arrow/c_glib/plasma-glib/client.cpp b/src/arrow/c_glib/plasma-glib/client.cpp
new file mode 100644
index 000000000..26476f4d6
--- /dev/null
+++ b/src/arrow/c_glib/plasma-glib/client.cpp
@@ -0,0 +1,612 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow-glib/buffer.hpp>
+#include <arrow-glib/error.hpp>
+
+#ifdef HAVE_ARROW_CUDA
+# include <arrow-cuda-glib/cuda.hpp>
+#endif
+
+#include <plasma-glib/client.hpp>
+#include <plasma-glib/object.hpp>
+
+G_BEGIN_DECLS
+
+/**
+ * SECTION: client
+ * @section_id: client-classes
+ * @title: Client related classes
+ * @include: plasma-glib/plasma-glib.h
+ *
+ * #GPlasmaClientOptions is a class for customizing plasma store
+ * connection.
+ *
+ * #GPlasmaClientCreateOptions is a class for customizing object creation.
+ *
+ * #GPlasmaClient is a class for an interface with a plasma store.
+ *
+ * Since: 0.12.0
+ */
+
+typedef struct GPlasmaClientCreatePrivate_ {
+ gint n_retries;
+} GPlasmaClientOptionsPrivate;
+
+enum {
+ PROP_N_RETRIES = 1
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions,
+ gplasma_client_options,
+ G_TYPE_OBJECT)
+
+#define GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object) \
+ static_cast<GPlasmaClientOptionsPrivate *>( \
+ gplasma_client_options_get_instance_private( \
+ GPLASMA_CLIENT_OPTIONS(object)))
+
+static void
+gplasma_client_options_set_property(GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_N_RETRIES:
+ priv->n_retries = g_value_get_int(value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gplasma_client_options_get_property(GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_N_RETRIES:
+ g_value_set_int(value, priv->n_retries);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gplasma_client_options_init(GPlasmaClientOptions *object)
+{
+}
+
+static void
+gplasma_client_options_class_init(GPlasmaClientOptionsClass *klass)
+{
+ auto gobject_class = G_OBJECT_CLASS(klass);
+
+ gobject_class->set_property = gplasma_client_options_set_property;
+ gobject_class->get_property = gplasma_client_options_get_property;
+
+ GParamSpec *spec;
+ spec = g_param_spec_int("n-retries",
+ "N retries",
+ "The number of retries to connect plasma store. "
+ "-1 means that the system default value is used.",
+ -1,
+ G_MAXINT,
+ -1,
+ static_cast<GParamFlags>(G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT));
+ g_object_class_install_property(gobject_class, PROP_N_RETRIES, spec);
+}
+
+/**
+ * gplasma_client_options_new:
+ *
+ * Returns: A newly created #GPlasmaClientOptions.
+ *
+ * Since: 0.12.0
+ */
+GPlasmaClientOptions *
+gplasma_client_options_new(void)
+{
+ auto options = g_object_new(GPLASMA_TYPE_CLIENT_OPTIONS,
+ NULL);
+ return GPLASMA_CLIENT_OPTIONS(options);
+}
+
+/**
+ * gplasma_client_options_set_n_retries:
+ * @options: A #GPlasmaClientOptions.
+ * @n_retries: The number of retires on connect.
+ *
+ * Since: 0.12.0
+ */
+void
+gplasma_client_options_set_n_retries(GPlasmaClientOptions *options,
+ gint n_retries)
+{
+ auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
+ priv->n_retries = n_retries;
+}
+
+/**
+ * gplasma_client_options_get_n_retries:
+ * @options: A #GPlasmaClientOptions.
+ *
+ * Returns: The number of retries on connect.
+ *
+ * Since: 0.12.0
+ */
+gint
+gplasma_client_options_get_n_retries(GPlasmaClientOptions *options)
+{
+ auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options);
+ return priv->n_retries;
+}
+
+
+typedef struct GPlasmaClientCreateOptionsPrivate_ {
+ guint8 *metadata;
+ gsize metadata_size;
+ gint gpu_device;
+} GPlasmaClientCreateOptionsPrivate;
+
+enum {
+ PROP_GPU_DEVICE = 1
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions,
+ gplasma_client_create_options,
+ G_TYPE_OBJECT)
+
+#define GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object) \
+ static_cast<GPlasmaClientCreateOptionsPrivate *>( \
+ gplasma_client_create_options_get_instance_private( \
+ GPLASMA_CLIENT_CREATE_OPTIONS(object)))
+
+static void
+gplasma_client_create_options_set_property(GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_GPU_DEVICE:
+ priv->gpu_device = g_value_get_int(value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gplasma_client_create_options_get_property(GObject *object,
+ guint prop_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_GPU_DEVICE:
+ g_value_set_int(value, priv->gpu_device);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gplasma_client_create_options_init(GPlasmaClientCreateOptions *object)
+{
+}
+
+static void
+gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass *klass)
+{
+ auto gobject_class = G_OBJECT_CLASS(klass);
+
+ gobject_class->set_property = gplasma_client_create_options_set_property;
+ gobject_class->get_property = gplasma_client_create_options_get_property;
+
+ GParamSpec *spec;
+ spec = g_param_spec_int("gpu-device",
+ "GPU device",
+ "The GPU device number. -1 means GPU isn't used.",
+ -1,
+ G_MAXINT,
+ -1,
+ static_cast<GParamFlags>(G_PARAM_READWRITE |
+ G_PARAM_CONSTRUCT));
+ g_object_class_install_property(gobject_class, PROP_GPU_DEVICE, spec);
+}
+
+/**
+ * gplasma_client_create_options_new:
+ *
+ * Returns: A newly created #GPlasmaClientCreateOptions.
+ *
+ * Since: 0.12.0
+ */
+GPlasmaClientCreateOptions *
+gplasma_client_create_options_new(void)
+{
+ auto options = g_object_new(GPLASMA_TYPE_CLIENT_CREATE_OPTIONS,
+ NULL);
+ return GPLASMA_CLIENT_CREATE_OPTIONS(options);
+}
+
+#if !GLIB_CHECK_VERSION(2, 68, 0)
+# define g_memdup2(memory, byte_size) g_memdup(memory, byte_size)
+#endif
+
+/**
+ * gplasma_client_create_options_set_metadata:
+ * @options: A #GPlasmaClientCreateOptions.
+ * @metadata: (nullable) (array length=size): The metadata of a created object.
+ * @size: The number of bytes of the metadata.
+ *
+ * Since: 0.12.0
+ */
+void
+gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions *options,
+ const guint8 *metadata,
+ gsize size)
+{
+ auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
+ if (priv->metadata) {
+ g_free(priv->metadata);
+ }
+ priv->metadata = static_cast<guint8 *>(g_memdup2(metadata, size));
+ priv->metadata_size = size;
+}
+
+/**
+ * gplasma_client_create_options_get_metadata:
+ * @options: A #GPlasmaClientCreateOptions.
+ * @size: (nullable) (out): The number of bytes of the metadata.
+ *
+ * Returns: (nullable) (array length=size): The metadata of a created object.
+ *
+ * Since: 0.12.0
+ */
+const guint8 *
+gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions *options,
+ gsize *size)
+{
+ auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
+ if (size) {
+ *size = priv->metadata_size;
+ }
+ return priv->metadata;
+}
+
+
+typedef struct GPlasmaClientPrivate_ {
+ plasma::PlasmaClient *client;
+ bool disconnected;
+} GPlasmaClientPrivate;
+
+enum {
+ PROP_CLIENT = 1
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient,
+ gplasma_client,
+ G_TYPE_OBJECT)
+
+#define GPLASMA_CLIENT_GET_PRIVATE(object) \
+ static_cast<GPlasmaClientPrivate *>( \
+ gplasma_client_get_instance_private( \
+ GPLASMA_CLIENT(object)))
+
+static void
+gplasma_client_finalize(GObject *object)
+{
+ auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
+
+ if (!priv->disconnected) {
+ auto status = priv->client->Disconnect();
+ if (!status.ok()) {
+ g_warning("[plasma][client][finalize] Failed to disconnect: %s",
+ status.ToString().c_str());
+ }
+ }
+ delete priv->client;
+
+ G_OBJECT_CLASS(gplasma_client_parent_class)->finalize(object);
+}
+
+static void
+gplasma_client_set_property(GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GPLASMA_CLIENT_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_CLIENT:
+ priv->client =
+ static_cast<plasma::PlasmaClient *>(g_value_get_pointer(value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+gplasma_client_init(GPlasmaClient *object)
+{
+}
+
+static void
+gplasma_client_class_init(GPlasmaClientClass *klass)
+{
+ GParamSpec *spec;
+
+ auto gobject_class = G_OBJECT_CLASS(klass);
+
+ gobject_class->finalize = gplasma_client_finalize;
+ gobject_class->set_property = gplasma_client_set_property;
+
+ spec = g_param_spec_pointer("client",
+ "Client",
+ "The raw plasma::PlasmaClient *",
+ static_cast<GParamFlags>(G_PARAM_WRITABLE |
+ G_PARAM_CONSTRUCT_ONLY));
+ g_object_class_install_property(gobject_class, PROP_CLIENT, spec);
+}
+
+/**
+ * gplasma_client_new:
+ * @store_socket_name: The name of the UNIX domain socket.
+ * @options: (nullable): The options to custom how to connect to plasma store.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable): A newly created #GPlasmaClient on success,
+ * %NULL on error.
+ *
+ * Since: 0.12.0
+ */
+GPlasmaClient *
+gplasma_client_new(const gchar *store_socket_name,
+ GPlasmaClientOptions *options,
+ GError **error)
+{
+ auto plasma_client = new plasma::PlasmaClient();
+ int n_retries = -1;
+ if (options) {
+ n_retries = gplasma_client_options_get_n_retries(options);
+ }
+ auto status = plasma_client->Connect(store_socket_name, "", 0, n_retries);
+ if (garrow_error_check(error, status, "[plasma][client][new]")) {
+ return gplasma_client_new_raw(plasma_client);
+ } else {
+ return NULL;
+ }
+}
+
+/**
+ * gplasma_client_create:
+ * @client: A #GPlasmaClient.
+ * @id: The ID for a newly created object.
+ * @data_size: The number of bytes of data for a newly created object.
+ * @options: (nullable): The option for creating an object.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable) (transfer full): A newly created #GPlasmaCreatedObject
+ * on success, %NULL on error.
+ *
+ * Since: 0.12.0
+ */
+GPlasmaCreatedObject *
+gplasma_client_create(GPlasmaClient *client,
+ GPlasmaObjectID *id,
+ gsize data_size,
+ GPlasmaClientCreateOptions *options,
+ GError **error)
+{
+ const auto context = "[plasma][client][create]";
+ auto plasma_client = gplasma_client_get_raw(client);
+ auto plasma_id = gplasma_object_id_get_raw(id);
+ const uint8_t *raw_metadata = nullptr;
+ int64_t raw_metadata_size = 0;
+ int device_number = 0;
+ if (options) {
+ auto options_priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options);
+ raw_metadata = options_priv->metadata;
+ raw_metadata_size = options_priv->metadata_size;
+ if (options_priv->gpu_device >= 0) {
+#ifndef HAVE_ARROW_CUDA
+ g_set_error(error,
+ GARROW_ERROR,
+ GARROW_ERROR_INVALID,
+ "%s Arrow CUDA GLib is needed to use GPU",
+ context);
+ return NULL;
+#endif
+ device_number = options_priv->gpu_device + 1;
+ }
+ }
+ std::shared_ptr<arrow::Buffer> plasma_data;
+ auto status = plasma_client->Create(plasma_id,
+ data_size,
+ raw_metadata,
+ raw_metadata_size,
+ &plasma_data,
+ device_number);
+ if (!garrow_error_check(error, status, context)) {
+ return NULL;
+ }
+
+ GArrowBuffer *data = nullptr;
+ if (device_number == 0) {
+ auto plasma_mutable_data =
+ std::static_pointer_cast<arrow::MutableBuffer>(plasma_data);
+ data = GARROW_BUFFER(garrow_mutable_buffer_new_raw(&plasma_mutable_data));
+#ifdef HAVE_ARROW_CUDA
+ } else {
+ auto plasma_cuda_data =
+ std::static_pointer_cast<arrow::cuda::CudaBuffer>(plasma_data);
+ data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
+#endif
+ }
+ std::shared_ptr<arrow::Buffer> plasma_metadata;
+ GArrowBuffer *metadata = nullptr;
+ if (raw_metadata_size > 0) {
+ plasma_metadata =
+ std::make_shared<arrow::Buffer>(raw_metadata, raw_metadata_size);
+ metadata = garrow_buffer_new_raw(&plasma_metadata);
+ }
+ return gplasma_created_object_new_raw(client,
+ id,
+ &plasma_data,
+ data,
+ metadata ? &plasma_metadata : nullptr,
+ metadata,
+ device_number - 1);
+}
+
+/**
+ * gplasma_client_refer_object:
+ * @client: A #GPlasmaClient.
+ * @id: The ID of the target object.
+ * @timeout_ms: The timeout in milliseconds. -1 means no timeout.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: (nullable) (transfer full): A found #GPlasmaReferredObject
+ * on success, %NULL on error.
+ *
+ * Since: 0.12.0
+ */
+GPlasmaReferredObject *
+gplasma_client_refer_object(GPlasmaClient *client,
+ GPlasmaObjectID *id,
+ gint64 timeout_ms,
+ GError **error)
+{
+ const auto context = "[plasma][client][refer-object]";
+ auto plasma_client = gplasma_client_get_raw(client);
+ auto plasma_id = gplasma_object_id_get_raw(id);
+ std::vector<plasma::ObjectID> plasma_ids;
+ plasma_ids.push_back(plasma_id);
+ std::vector<plasma::ObjectBuffer> plasma_object_buffers;
+ auto status = plasma_client->Get(plasma_ids,
+ timeout_ms,
+ &plasma_object_buffers);
+ if (!garrow_error_check(error, status, context)) {
+ return NULL;
+ }
+
+ auto plasma_object_buffer = plasma_object_buffers[0];
+ auto plasma_data = plasma_object_buffer.data;
+ auto plasma_metadata = plasma_object_buffer.metadata;
+ GArrowBuffer *data = nullptr;
+ GArrowBuffer *metadata = nullptr;
+ if (plasma_object_buffer.device_num == 0) {
+ data = garrow_buffer_new_raw(&plasma_data);
+ metadata = garrow_buffer_new_raw(&plasma_metadata);
+ } else {
+#ifdef HAVE_ARROW_CUDA
+ auto plasma_cuda_data = arrow::cuda::CudaBuffer::FromBuffer(plasma_data);
+ if (!garrow::check(error, plasma_cuda_data, context)) {
+ return NULL;
+ }
+ auto plasma_cuda_metadata =
+ arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata);
+ if (!garrow::check(error, plasma_cuda_metadata, context)) {
+ return NULL;
+ }
+
+ data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_data)));
+ metadata =
+ GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_metadata)));
+#else
+ g_set_error(error,
+ GARROW_ERROR,
+ GARROW_ERROR_INVALID,
+ "%s Arrow CUDA GLib is needed to use GPU",
+ context);
+ return NULL;
+#endif
+ }
+ return gplasma_referred_object_new_raw(client,
+ id,
+ &plasma_data,
+ data,
+ &plasma_metadata,
+ metadata,
+ plasma_object_buffer.device_num - 1);
+}
+
+/**
+ * gplasma_client_disconnect:
+ * @client: A #GPlasmaClient.
+ * @error: (nullable): Return location for a #GError or %NULL.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error.
+ *
+ * Since: 0.12.0
+ */
+gboolean
+gplasma_client_disconnect(GPlasmaClient *client,
+ GError **error)
+{
+ auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
+ auto status = priv->client->Disconnect();
+ if (garrow_error_check(error, status, "[plasma][client][disconnect]")) {
+ priv->disconnected = true;
+ return TRUE;
+ } else {
+ return FALSE;
+ }
+}
+
+G_END_DECLS
+
+GPlasmaClient *
+gplasma_client_new_raw(plasma::PlasmaClient *plasma_client)
+{
+ auto client = g_object_new(GPLASMA_TYPE_CLIENT,
+ "client", plasma_client,
+ NULL);
+ return GPLASMA_CLIENT(client);
+}
+
+plasma::PlasmaClient *
+gplasma_client_get_raw(GPlasmaClient *client)
+{
+ auto priv = GPLASMA_CLIENT_GET_PRIVATE(client);
+ return priv->client;
+}