/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #ifdef HAVE_ARROW_CUDA # include #endif #include #include G_BEGIN_DECLS /** * SECTION: client * @section_id: client-classes * @title: Client related classes * @include: plasma-glib/plasma-glib.h * * #GPlasmaClientOptions is a class for customizing plasma store * connection. * * #GPlasmaClientCreateOptions is a class for customizing object creation. * * #GPlasmaClient is a class for an interface with a plasma store. * * Since: 0.12.0 */ typedef struct GPlasmaClientCreatePrivate_ { gint n_retries; } GPlasmaClientOptionsPrivate; enum { PROP_N_RETRIES = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientOptions, gplasma_client_options, G_TYPE_OBJECT) #define GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object) \ static_cast( \ gplasma_client_options_get_instance_private( \ GPLASMA_CLIENT_OPTIONS(object))) static void gplasma_client_options_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object); switch (prop_id) { case PROP_N_RETRIES: priv->n_retries = g_value_get_int(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void gplasma_client_options_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(object); switch (prop_id) { case PROP_N_RETRIES: g_value_set_int(value, priv->n_retries); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void gplasma_client_options_init(GPlasmaClientOptions *object) { } static void gplasma_client_options_class_init(GPlasmaClientOptionsClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->set_property = gplasma_client_options_set_property; gobject_class->get_property = gplasma_client_options_get_property; GParamSpec *spec; spec = g_param_spec_int("n-retries", "N retries", "The number of retries to connect plasma store. " "-1 means that the system default value is used.", -1, G_MAXINT, -1, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); g_object_class_install_property(gobject_class, PROP_N_RETRIES, spec); } /** * gplasma_client_options_new: * * Returns: A newly created #GPlasmaClientOptions. * * Since: 0.12.0 */ GPlasmaClientOptions * gplasma_client_options_new(void) { auto options = g_object_new(GPLASMA_TYPE_CLIENT_OPTIONS, NULL); return GPLASMA_CLIENT_OPTIONS(options); } /** * gplasma_client_options_set_n_retries: * @options: A #GPlasmaClientOptions. * @n_retries: The number of retires on connect. * * Since: 0.12.0 */ void gplasma_client_options_set_n_retries(GPlasmaClientOptions *options, gint n_retries) { auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options); priv->n_retries = n_retries; } /** * gplasma_client_options_get_n_retries: * @options: A #GPlasmaClientOptions. * * Returns: The number of retries on connect. * * Since: 0.12.0 */ gint gplasma_client_options_get_n_retries(GPlasmaClientOptions *options) { auto priv = GPLASMA_CLIENT_OPTIONS_GET_PRIVATE(options); return priv->n_retries; } typedef struct GPlasmaClientCreateOptionsPrivate_ { guint8 *metadata; gsize metadata_size; gint gpu_device; } GPlasmaClientCreateOptionsPrivate; enum { PROP_GPU_DEVICE = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClientCreateOptions, gplasma_client_create_options, G_TYPE_OBJECT) #define GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object) \ static_cast( \ gplasma_client_create_options_get_instance_private( \ GPLASMA_CLIENT_CREATE_OPTIONS(object))) static void gplasma_client_create_options_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object); switch (prop_id) { case PROP_GPU_DEVICE: priv->gpu_device = g_value_get_int(value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void gplasma_client_create_options_get_property(GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(object); switch (prop_id) { case PROP_GPU_DEVICE: g_value_set_int(value, priv->gpu_device); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void gplasma_client_create_options_init(GPlasmaClientCreateOptions *object) { } static void gplasma_client_create_options_class_init(GPlasmaClientCreateOptionsClass *klass) { auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->set_property = gplasma_client_create_options_set_property; gobject_class->get_property = gplasma_client_create_options_get_property; GParamSpec *spec; spec = g_param_spec_int("gpu-device", "GPU device", "The GPU device number. -1 means GPU isn't used.", -1, G_MAXINT, -1, static_cast(G_PARAM_READWRITE | G_PARAM_CONSTRUCT)); g_object_class_install_property(gobject_class, PROP_GPU_DEVICE, spec); } /** * gplasma_client_create_options_new: * * Returns: A newly created #GPlasmaClientCreateOptions. * * Since: 0.12.0 */ GPlasmaClientCreateOptions * gplasma_client_create_options_new(void) { auto options = g_object_new(GPLASMA_TYPE_CLIENT_CREATE_OPTIONS, NULL); return GPLASMA_CLIENT_CREATE_OPTIONS(options); } #if !GLIB_CHECK_VERSION(2, 68, 0) # define g_memdup2(memory, byte_size) g_memdup(memory, byte_size) #endif /** * gplasma_client_create_options_set_metadata: * @options: A #GPlasmaClientCreateOptions. * @metadata: (nullable) (array length=size): The metadata of a created object. * @size: The number of bytes of the metadata. * * Since: 0.12.0 */ void gplasma_client_create_options_set_metadata(GPlasmaClientCreateOptions *options, const guint8 *metadata, gsize size) { auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options); if (priv->metadata) { g_free(priv->metadata); } priv->metadata = static_cast(g_memdup2(metadata, size)); priv->metadata_size = size; } /** * gplasma_client_create_options_get_metadata: * @options: A #GPlasmaClientCreateOptions. * @size: (nullable) (out): The number of bytes of the metadata. * * Returns: (nullable) (array length=size): The metadata of a created object. * * Since: 0.12.0 */ const guint8 * gplasma_client_create_options_get_metadata(GPlasmaClientCreateOptions *options, gsize *size) { auto priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options); if (size) { *size = priv->metadata_size; } return priv->metadata; } typedef struct GPlasmaClientPrivate_ { plasma::PlasmaClient *client; bool disconnected; } GPlasmaClientPrivate; enum { PROP_CLIENT = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GPlasmaClient, gplasma_client, G_TYPE_OBJECT) #define GPLASMA_CLIENT_GET_PRIVATE(object) \ static_cast( \ gplasma_client_get_instance_private( \ GPLASMA_CLIENT(object))) static void gplasma_client_finalize(GObject *object) { auto priv = GPLASMA_CLIENT_GET_PRIVATE(object); if (!priv->disconnected) { auto status = priv->client->Disconnect(); if (!status.ok()) { g_warning("[plasma][client][finalize] Failed to disconnect: %s", status.ToString().c_str()); } } delete priv->client; G_OBJECT_CLASS(gplasma_client_parent_class)->finalize(object); } static void gplasma_client_set_property(GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { auto priv = GPLASMA_CLIENT_GET_PRIVATE(object); switch (prop_id) { case PROP_CLIENT: priv->client = static_cast(g_value_get_pointer(value)); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; } } static void gplasma_client_init(GPlasmaClient *object) { } static void gplasma_client_class_init(GPlasmaClientClass *klass) { GParamSpec *spec; auto gobject_class = G_OBJECT_CLASS(klass); gobject_class->finalize = gplasma_client_finalize; gobject_class->set_property = gplasma_client_set_property; spec = g_param_spec_pointer("client", "Client", "The raw plasma::PlasmaClient *", static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_CLIENT, spec); } /** * gplasma_client_new: * @store_socket_name: The name of the UNIX domain socket. * @options: (nullable): The options to custom how to connect to plasma store. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable): A newly created #GPlasmaClient on success, * %NULL on error. * * Since: 0.12.0 */ GPlasmaClient * gplasma_client_new(const gchar *store_socket_name, GPlasmaClientOptions *options, GError **error) { auto plasma_client = new plasma::PlasmaClient(); int n_retries = -1; if (options) { n_retries = gplasma_client_options_get_n_retries(options); } auto status = plasma_client->Connect(store_socket_name, "", 0, n_retries); if (garrow_error_check(error, status, "[plasma][client][new]")) { return gplasma_client_new_raw(plasma_client); } else { return NULL; } } /** * gplasma_client_create: * @client: A #GPlasmaClient. * @id: The ID for a newly created object. * @data_size: The number of bytes of data for a newly created object. * @options: (nullable): The option for creating an object. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable) (transfer full): A newly created #GPlasmaCreatedObject * on success, %NULL on error. * * Since: 0.12.0 */ GPlasmaCreatedObject * gplasma_client_create(GPlasmaClient *client, GPlasmaObjectID *id, gsize data_size, GPlasmaClientCreateOptions *options, GError **error) { const auto context = "[plasma][client][create]"; auto plasma_client = gplasma_client_get_raw(client); auto plasma_id = gplasma_object_id_get_raw(id); const uint8_t *raw_metadata = nullptr; int64_t raw_metadata_size = 0; int device_number = 0; if (options) { auto options_priv = GPLASMA_CLIENT_CREATE_OPTIONS_GET_PRIVATE(options); raw_metadata = options_priv->metadata; raw_metadata_size = options_priv->metadata_size; if (options_priv->gpu_device >= 0) { #ifndef HAVE_ARROW_CUDA g_set_error(error, GARROW_ERROR, GARROW_ERROR_INVALID, "%s Arrow CUDA GLib is needed to use GPU", context); return NULL; #endif device_number = options_priv->gpu_device + 1; } } std::shared_ptr plasma_data; auto status = plasma_client->Create(plasma_id, data_size, raw_metadata, raw_metadata_size, &plasma_data, device_number); if (!garrow_error_check(error, status, context)) { return NULL; } GArrowBuffer *data = nullptr; if (device_number == 0) { auto plasma_mutable_data = std::static_pointer_cast(plasma_data); data = GARROW_BUFFER(garrow_mutable_buffer_new_raw(&plasma_mutable_data)); #ifdef HAVE_ARROW_CUDA } else { auto plasma_cuda_data = std::static_pointer_cast(plasma_data); data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data)); #endif } std::shared_ptr plasma_metadata; GArrowBuffer *metadata = nullptr; if (raw_metadata_size > 0) { plasma_metadata = std::make_shared(raw_metadata, raw_metadata_size); metadata = garrow_buffer_new_raw(&plasma_metadata); } return gplasma_created_object_new_raw(client, id, &plasma_data, data, metadata ? &plasma_metadata : nullptr, metadata, device_number - 1); } /** * gplasma_client_refer_object: * @client: A #GPlasmaClient. * @id: The ID of the target object. * @timeout_ms: The timeout in milliseconds. -1 means no timeout. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: (nullable) (transfer full): A found #GPlasmaReferredObject * on success, %NULL on error. * * Since: 0.12.0 */ GPlasmaReferredObject * gplasma_client_refer_object(GPlasmaClient *client, GPlasmaObjectID *id, gint64 timeout_ms, GError **error) { const auto context = "[plasma][client][refer-object]"; auto plasma_client = gplasma_client_get_raw(client); auto plasma_id = gplasma_object_id_get_raw(id); std::vector plasma_ids; plasma_ids.push_back(plasma_id); std::vector plasma_object_buffers; auto status = plasma_client->Get(plasma_ids, timeout_ms, &plasma_object_buffers); if (!garrow_error_check(error, status, context)) { return NULL; } auto plasma_object_buffer = plasma_object_buffers[0]; auto plasma_data = plasma_object_buffer.data; auto plasma_metadata = plasma_object_buffer.metadata; GArrowBuffer *data = nullptr; GArrowBuffer *metadata = nullptr; if (plasma_object_buffer.device_num == 0) { data = garrow_buffer_new_raw(&plasma_data); metadata = garrow_buffer_new_raw(&plasma_metadata); } else { #ifdef HAVE_ARROW_CUDA auto plasma_cuda_data = arrow::cuda::CudaBuffer::FromBuffer(plasma_data); if (!garrow::check(error, plasma_cuda_data, context)) { return NULL; } auto plasma_cuda_metadata = arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata); if (!garrow::check(error, plasma_cuda_metadata, context)) { return NULL; } data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_data))); metadata = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_metadata))); #else g_set_error(error, GARROW_ERROR, GARROW_ERROR_INVALID, "%s Arrow CUDA GLib is needed to use GPU", context); return NULL; #endif } return gplasma_referred_object_new_raw(client, id, &plasma_data, data, &plasma_metadata, metadata, plasma_object_buffer.device_num - 1); } /** * gplasma_client_disconnect: * @client: A #GPlasmaClient. * @error: (nullable): Return location for a #GError or %NULL. * * Returns: %TRUE on success, %FALSE if there was an error. * * Since: 0.12.0 */ gboolean gplasma_client_disconnect(GPlasmaClient *client, GError **error) { auto priv = GPLASMA_CLIENT_GET_PRIVATE(client); auto status = priv->client->Disconnect(); if (garrow_error_check(error, status, "[plasma][client][disconnect]")) { priv->disconnected = true; return TRUE; } else { return FALSE; } } G_END_DECLS GPlasmaClient * gplasma_client_new_raw(plasma::PlasmaClient *plasma_client) { auto client = g_object_new(GPLASMA_TYPE_CLIENT, "client", plasma_client, NULL); return GPLASMA_CLIENT(client); } plasma::PlasmaClient * gplasma_client_get_raw(GPlasmaClient *client) { auto priv = GPLASMA_CLIENT_GET_PRIVATE(client); return priv->client; }