summaryrefslogtreecommitdiffstats
path: root/src/arrow/cpp/src/plasma/protocol.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/cpp/src/plasma/protocol.cc')
-rw-r--r--src/arrow/cpp/src/plasma/protocol.cc829
1 files changed, 829 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/plasma/protocol.cc b/src/arrow/cpp/src/plasma/protocol.cc
new file mode 100644
index 000000000..735636cda
--- /dev/null
+++ b/src/arrow/cpp/src/plasma/protocol.cc
@@ -0,0 +1,829 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/protocol.h"
+
+#include <utility>
+
+#include "flatbuffers/flatbuffers.h"
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/plasma_generated.h"
+
+#ifdef PLASMA_CUDA
+#include "arrow/gpu/cuda_api.h"
+#endif
+#include "arrow/util/ubsan.h"
+
+namespace fb = plasma::flatbuf;
+
+namespace plasma {
+
+using fb::MessageType;
+using fb::PlasmaError;
+using fb::PlasmaObjectSpec;
+
+using flatbuffers::uoffset_t;
+
+#define PLASMA_CHECK_ENUM(x, y) \
+ static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
+
+flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
+ int64_t num_objects) {
+ std::vector<flatbuffers::Offset<flatbuffers::String>> results;
+ for (int64_t i = 0; i < num_objects; i++) {
+ results.push_back(fbb->CreateString(object_ids[i].binary()));
+ }
+ return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size());
+}
+
+flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
+ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb,
+ const std::vector<std::string>& strings) {
+ std::vector<flatbuffers::Offset<flatbuffers::String>> results;
+ for (size_t i = 0; i < strings.size(); i++) {
+ results.push_back(fbb->CreateString(strings[i]));
+ }
+
+ return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size());
+}
+
+flatbuffers::Offset<flatbuffers::Vector<int64_t>> ToFlatbuffer(
+ flatbuffers::FlatBufferBuilder* fbb, const std::vector<int64_t>& data) {
+ return fbb->CreateVector(arrow::util::MakeNonNull(data.data()), data.size());
+}
+
+Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* buffer) {
+ MessageType type;
+ RETURN_NOT_OK(ReadMessage(sock, &type, buffer));
+ ARROW_CHECK(type == message_type)
+ << "type = " << static_cast<int64_t>(type)
+ << ", message_type = " << static_cast<int64_t>(message_type);
+ return Status::OK();
+}
+
+// Helper function to create a vector of elements from Data (Request/Reply struct).
+// The Getter function is used to extract one element from Data.
+template <typename T, typename Data, typename Getter>
+void ToVector(const Data& request, std::vector<T>* out, const Getter& getter) {
+ int count = request.count();
+ out->clear();
+ out->reserve(count);
+ for (int i = 0; i < count; ++i) {
+ out->push_back(getter(request, i));
+ }
+}
+
+template <typename T, typename FlatbufferVectorPointer, typename Converter>
+void ConvertToVector(const FlatbufferVectorPointer fbvector, std::vector<T>* out,
+ const Converter& converter) {
+ out->clear();
+ out->reserve(fbvector->size());
+ for (size_t i = 0; i < fbvector->size(); ++i) {
+ out->push_back(converter(*fbvector->Get(i)));
+ }
+}
+
+template <typename Message>
+Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBuilder* fbb,
+ const Message& message) {
+ fbb->Finish(message);
+ return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer());
+}
+
+Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
+ switch (plasma_error) {
+ case fb::PlasmaError::OK:
+ return Status::OK();
+ case fb::PlasmaError::ObjectExists:
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectExists,
+ "object already exists in the plasma store");
+ case fb::PlasmaError::ObjectNotFound:
+ return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNotFound,
+ "object does not exist in the plasma store");
+ case fb::PlasmaError::OutOfMemory:
+ return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull,
+ "object does not fit in the plasma store");
+ default:
+ ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
+ }
+ return Status::OK();
+}
+
+// Set options messages.
+
+Status SendSetOptionsRequest(int sock, const std::string& client_name,
+ int64_t output_memory_limit) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaSetOptionsRequest(fbb, fbb.CreateString(client_name),
+ output_memory_limit);
+ return PlasmaSend(sock, MessageType::PlasmaSetOptionsRequest, &fbb, message);
+}
+
+Status ReadSetOptionsRequest(const uint8_t* data, size_t size, std::string* client_name,
+ int64_t* output_memory_quota) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *client_name = std::string(message->client_name()->str());
+ *output_memory_quota = message->output_memory_quota();
+ return Status::OK();
+}
+
+Status SendSetOptionsReply(int sock, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaSetOptionsReply(fbb, error);
+ return PlasmaSend(sock, MessageType::PlasmaSetOptionsReply, &fbb, message);
+}
+
+Status ReadSetOptionsReply(const uint8_t* data, size_t size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ return PlasmaErrorStatus(message->error());
+}
+
+// Get debug string messages.
+
+Status SendGetDebugStringRequest(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaGetDebugStringRequest(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaGetDebugStringRequest, &fbb, message);
+}
+
+Status SendGetDebugStringReply(int sock, const std::string& debug_string) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaGetDebugStringReply(fbb, fbb.CreateString(debug_string));
+ return PlasmaSend(sock, MessageType::PlasmaGetDebugStringReply, &fbb, message);
+}
+
+Status ReadGetDebugStringReply(const uint8_t* data, size_t size,
+ std::string* debug_string) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaGetDebugStringReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *debug_string = message->debug_string()->str();
+ return Status::OK();
+}
+
+// Create messages.
+
+Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full,
+ int64_t data_size, int64_t metadata_size, int device_num) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()),
+ evict_if_full, data_size, metadata_size, device_num);
+ return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message);
+}
+
+Status ReadCreateRequest(const uint8_t* data, size_t size, ObjectID* object_id,
+ bool* evict_if_full, int64_t* data_size, int64_t* metadata_size,
+ int* device_num) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *evict_if_full = message->evict_if_full();
+ *data_size = message->data_size();
+ *metadata_size = message->metadata_size();
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *device_num = message->device_num();
+ return Status::OK();
+}
+
+Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
+ PlasmaError error_code, int64_t mmap_size) {
+ flatbuffers::FlatBufferBuilder fbb;
+ PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size,
+ object->metadata_offset, object->metadata_size,
+ object->device_num);
+ auto object_string = fbb.CreateString(object_id.binary());
+#ifdef PLASMA_CUDA
+ flatbuffers::Offset<fb::CudaHandle> ipc_handle;
+ if (object->device_num != 0) {
+ std::shared_ptr<arrow::Buffer> handle;
+ ARROW_ASSIGN_OR_RAISE(handle, object->ipc_handle->Serialize());
+ ipc_handle =
+ fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()));
+ }
+#endif
+ fb::PlasmaCreateReplyBuilder crb(fbb);
+ crb.add_error(static_cast<PlasmaError>(error_code));
+ crb.add_plasma_object(&plasma_object);
+ crb.add_object_id(object_string);
+ crb.add_store_fd(object->store_fd);
+ crb.add_mmap_size(mmap_size);
+ if (object->device_num != 0) {
+#ifdef PLASMA_CUDA
+ crb.add_ipc_handle(ipc_handle);
+#else
+ ARROW_LOG(FATAL) << "This should be unreachable.";
+#endif
+ }
+ auto message = crb.Finish();
+ return PlasmaSend(sock, MessageType::PlasmaCreateReply, &fbb, message);
+}
+
+Status ReadCreateReply(const uint8_t* data, size_t size, ObjectID* object_id,
+ PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ object->store_fd = message->plasma_object()->segment_index();
+ object->data_offset = message->plasma_object()->data_offset();
+ object->data_size = message->plasma_object()->data_size();
+ object->metadata_offset = message->plasma_object()->metadata_offset();
+ object->metadata_size = message->plasma_object()->metadata_size();
+
+ *store_fd = message->store_fd();
+ *mmap_size = message->mmap_size();
+
+ object->device_num = message->plasma_object()->device_num();
+#ifdef PLASMA_CUDA
+ if (object->device_num != 0) {
+ ARROW_ASSIGN_OR_RAISE(
+ object->ipc_handle,
+ CudaIpcMemHandle::FromBuffer(message->ipc_handle()->handle()->data()));
+ }
+#endif
+ return PlasmaErrorStatus(message->error());
+}
+
+Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full,
+ const std::string& data, const std::string& metadata,
+ unsigned char* digest) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
+ auto message = fb::CreatePlasmaCreateAndSealRequest(
+ fbb, fbb.CreateString(object_id.binary()), evict_if_full, fbb.CreateString(data),
+ fbb.CreateString(metadata), digest_string);
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message);
+}
+
+Status ReadCreateAndSealRequest(const uint8_t* data, size_t size, ObjectID* object_id,
+ bool* evict_if_full, std::string* object_data,
+ std::string* metadata, std::string* digest) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *evict_if_full = message->evict_if_full();
+ *object_data = message->data()->str();
+ *metadata = message->metadata()->str();
+ ARROW_CHECK(message->digest()->size() == kDigestSize);
+ digest->assign(message->digest()->data(), kDigestSize);
+ return Status::OK();
+}
+
+Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids,
+ bool evict_if_full,
+ const std::vector<std::string>& data,
+ const std::vector<std::string>& metadata,
+ const std::vector<std::string>& digests) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ auto message = fb::CreatePlasmaCreateAndSealBatchRequest(
+ fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full,
+ ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata),
+ ToFlatbuffer(&fbb, digests));
+
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message);
+}
+
+Status ReadCreateAndSealBatchRequest(const uint8_t* data, size_t size,
+ std::vector<ObjectID>* object_ids,
+ bool* evict_if_full,
+ std::vector<std::string>* object_data,
+ std::vector<std::string>* metadata,
+ std::vector<std::string>* digests) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+
+ *evict_if_full = message->evict_if_full();
+ ConvertToVector(message->object_ids(), object_ids,
+ [](const flatbuffers::String& element) {
+ return ObjectID::from_binary(element.str());
+ });
+
+ ConvertToVector(message->data(), object_data,
+ [](const flatbuffers::String& element) { return element.str(); });
+
+ ConvertToVector(message->metadata(), metadata,
+ [](const flatbuffers::String& element) { return element.str(); });
+
+ ConvertToVector(message->digest(), digests,
+ [](const flatbuffers::String& element) { return element.str(); });
+
+ return Status::OK();
+}
+
+Status SendCreateAndSealReply(int sock, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaCreateAndSealReply(fbb, static_cast<PlasmaError>(error));
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealReply, &fbb, message);
+}
+
+Status ReadCreateAndSealReply(const uint8_t* data, size_t size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ return PlasmaErrorStatus(message->error());
+}
+
+Status SendCreateAndSealBatchReply(int sock, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaCreateAndSealBatchReply(fbb, static_cast<PlasmaError>(error));
+ return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchReply, &fbb, message);
+}
+
+Status ReadCreateAndSealBatchReply(const uint8_t* data, size_t size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ return PlasmaErrorStatus(message->error());
+}
+
+Status SendAbortRequest(int sock, ObjectID object_id) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary()));
+ return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message);
+}
+
+Status ReadAbortRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return Status::OK();
+}
+
+Status SendAbortReply(int sock, ObjectID object_id) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary()));
+ return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message);
+}
+
+Status ReadAbortReply(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return Status::OK();
+}
+
+// Seal messages.
+
+Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()),
+ fbb.CreateString(digest));
+ return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message);
+}
+
+Status ReadSealRequest(const uint8_t* data, size_t size, ObjectID* object_id,
+ std::string* digest) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ ARROW_CHECK_EQ(message->digest()->size(), kDigestSize);
+ digest->assign(message->digest()->data(), kDigestSize);
+ return Status::OK();
+}
+
+Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error);
+ return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message);
+}
+
+Status ReadSealReply(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return PlasmaErrorStatus(message->error());
+}
+
+// Release messages.
+
+Status SendReleaseRequest(int sock, ObjectID object_id) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
+ return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message);
+}
+
+Status ReadReleaseRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return Status::OK();
+}
+
+Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error);
+ return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message);
+}
+
+Status ReadReleaseReply(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return PlasmaErrorStatus(message->error());
+}
+
+// Delete objects messages.
+
+Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaDeleteRequest(
+ fbb, static_cast<int32_t>(object_ids.size()),
+ ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()));
+ return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
+}
+
+Status ReadDeleteRequest(const uint8_t* data, size_t size,
+ std::vector<ObjectID>* object_ids) {
+ using fb::PlasmaDeleteRequest;
+
+ DCHECK(data);
+ DCHECK(object_ids);
+ auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
+ return ObjectID::from_binary(request.object_ids()->Get(i)->str());
+ });
+ return Status::OK();
+}
+
+Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
+ const std::vector<PlasmaError>& errors) {
+ DCHECK(object_ids.size() == errors.size());
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaDeleteReply(
+ fbb, static_cast<int32_t>(object_ids.size()),
+ ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()),
+ fbb.CreateVector(
+ arrow::util::MakeNonNull(reinterpret_cast<const int32_t*>(errors.data())),
+ object_ids.size()));
+ return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
+}
+
+Status ReadDeleteReply(const uint8_t* data, size_t size,
+ std::vector<ObjectID>* object_ids,
+ std::vector<PlasmaError>* errors) {
+ using fb::PlasmaDeleteReply;
+
+ DCHECK(data);
+ DCHECK(object_ids);
+ DCHECK(errors);
+ auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
+ return ObjectID::from_binary(request.object_ids()->Get(i)->str());
+ });
+ ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
+ return static_cast<PlasmaError>(request.errors()->Get(i));
+ });
+ return Status::OK();
+}
+
+// Contains messages.
+
+Status SendContainsRequest(int sock, ObjectID object_id) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message =
+ fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary()));
+ return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message);
+}
+
+Status ReadContainsRequest(const uint8_t* data, size_t size, ObjectID* object_id) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ return Status::OK();
+}
+
+Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()),
+ has_object);
+ return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message);
+}
+
+Status ReadContainsReply(const uint8_t* data, size_t size, ObjectID* object_id,
+ bool* has_object) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *has_object = message->has_object();
+ return Status::OK();
+}
+
+// List messages.
+
+Status SendListRequest(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaListRequest(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message);
+}
+
+Status ReadListRequest(const uint8_t* data, size_t size) { return Status::OK(); }
+
+Status SendListReply(int sock, const ObjectTable& objects) {
+ flatbuffers::FlatBufferBuilder fbb;
+ std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos;
+ for (auto const& entry : objects) {
+ auto digest = entry.second->state == ObjectState::PLASMA_CREATED
+ ? fbb.CreateString("")
+ : fbb.CreateString(reinterpret_cast<char*>(entry.second->digest),
+ kDigestSize);
+ auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()),
+ entry.second->data_size, entry.second->metadata_size,
+ entry.second->ref_count, entry.second->create_time,
+ entry.second->construct_duration, digest);
+ object_infos.push_back(info);
+ }
+ auto message = fb::CreatePlasmaListReply(
+ fbb, fbb.CreateVector(arrow::util::MakeNonNull(object_infos.data()),
+ object_infos.size()));
+ return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message);
+}
+
+Status ReadListReply(const uint8_t* data, size_t size, ObjectTable* objects) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ for (auto const object : *message->objects()) {
+ ObjectID object_id = ObjectID::from_binary(object->object_id()->str());
+ auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+ entry->data_size = object->data_size();
+ entry->metadata_size = object->metadata_size();
+ entry->ref_count = object->ref_count();
+ entry->create_time = object->create_time();
+ entry->construct_duration = object->construct_duration();
+ entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED
+ : ObjectState::PLASMA_SEALED;
+ (*objects)[object_id] = std::move(entry);
+ }
+ return Status::OK();
+}
+
+// Connect messages.
+
+Status SendConnectRequest(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaConnectRequest(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaConnectRequest, &fbb, message);
+}
+
+Status ReadConnectRequest(const uint8_t* data) { return Status::OK(); }
+
+Status SendConnectReply(int sock, int64_t memory_capacity) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaConnectReply(fbb, memory_capacity);
+ return PlasmaSend(sock, MessageType::PlasmaConnectReply, &fbb, message);
+}
+
+Status ReadConnectReply(const uint8_t* data, size_t size, int64_t* memory_capacity) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaConnectReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *memory_capacity = message->memory_capacity();
+ return Status::OK();
+}
+
+// Evict messages.
+
+Status SendEvictRequest(int sock, int64_t num_bytes) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaEvictRequest(fbb, num_bytes);
+ return PlasmaSend(sock, MessageType::PlasmaEvictRequest, &fbb, message);
+}
+
+Status ReadEvictRequest(const uint8_t* data, size_t size, int64_t* num_bytes) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaEvictRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *num_bytes = message->num_bytes();
+ return Status::OK();
+}
+
+Status SendEvictReply(int sock, int64_t num_bytes) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaEvictReply(fbb, num_bytes);
+ return PlasmaSend(sock, MessageType::PlasmaEvictReply, &fbb, message);
+}
+
+Status ReadEvictReply(const uint8_t* data, size_t size, int64_t& num_bytes) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaEvictReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ num_bytes = message->num_bytes();
+ return Status::OK();
+}
+
+// Get messages.
+
+Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
+ int64_t timeout_ms) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaGetRequest(
+ fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms);
+ return PlasmaSend(sock, MessageType::PlasmaGetRequest, &fbb, message);
+}
+
+Status ReadGetRequest(const uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
+ int64_t* timeout_ms) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
+ auto object_id = message->object_ids()->Get(i)->str();
+ object_ids.push_back(ObjectID::from_binary(object_id));
+ }
+ *timeout_ms = message->timeout_ms();
+ return Status::OK();
+}
+
+Status SendGetReply(int sock, ObjectID object_ids[],
+ std::unordered_map<ObjectID, PlasmaObject>& plasma_objects,
+ int64_t num_objects, const std::vector<int>& store_fds,
+ const std::vector<int64_t>& mmap_sizes) {
+ flatbuffers::FlatBufferBuilder fbb;
+ std::vector<PlasmaObjectSpec> objects;
+
+ std::vector<flatbuffers::Offset<fb::CudaHandle>> handles;
+ for (int64_t i = 0; i < num_objects; ++i) {
+ const PlasmaObject& object = plasma_objects[object_ids[i]];
+ objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
+ object.data_size, object.metadata_offset,
+ object.metadata_size, object.device_num));
+#ifdef PLASMA_CUDA
+ if (object.device_num != 0) {
+ std::shared_ptr<arrow::Buffer> handle;
+ ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize());
+ handles.push_back(
+ fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())));
+ }
+#endif
+ }
+ auto message = fb::CreatePlasmaGetReply(
+ fbb, ToFlatbuffer(&fbb, object_ids, num_objects),
+ fbb.CreateVectorOfStructs(arrow::util::MakeNonNull(objects.data()), num_objects),
+ fbb.CreateVector(arrow::util::MakeNonNull(store_fds.data()), store_fds.size()),
+ fbb.CreateVector(arrow::util::MakeNonNull(mmap_sizes.data()), mmap_sizes.size()),
+ fbb.CreateVector(arrow::util::MakeNonNull(handles.data()), handles.size()));
+ return PlasmaSend(sock, MessageType::PlasmaGetReply, &fbb, message);
+}
+
+Status ReadGetReply(const uint8_t* data, size_t size, ObjectID object_ids[],
+ PlasmaObject plasma_objects[], int64_t num_objects,
+ std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaGetReply>(data);
+#ifdef PLASMA_CUDA
+ int handle_pos = 0;
+#endif
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ for (uoffset_t i = 0; i < num_objects; ++i) {
+ object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
+ }
+ for (uoffset_t i = 0; i < num_objects; ++i) {
+ const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
+ plasma_objects[i].store_fd = object->segment_index();
+ plasma_objects[i].data_offset = object->data_offset();
+ plasma_objects[i].data_size = object->data_size();
+ plasma_objects[i].metadata_offset = object->metadata_offset();
+ plasma_objects[i].metadata_size = object->metadata_size();
+ plasma_objects[i].device_num = object->device_num();
+#ifdef PLASMA_CUDA
+ if (object->device_num() != 0) {
+ const void* ipc_handle = message->handles()->Get(handle_pos)->handle()->data();
+ ARROW_ASSIGN_OR_RAISE(plasma_objects[i].ipc_handle,
+ CudaIpcMemHandle::FromBuffer(ipc_handle));
+ handle_pos++;
+ }
+#endif
+ }
+ ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size());
+ for (uoffset_t i = 0; i < message->store_fds()->size(); i++) {
+ store_fds.push_back(message->store_fds()->Get(i));
+ mmap_sizes.push_back(message->mmap_sizes()->Get(i));
+ }
+ return Status::OK();
+}
+
+// Subscribe messages.
+
+Status SendSubscribeRequest(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaSubscribeRequest(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaSubscribeRequest, &fbb, message);
+}
+
+// Data messages.
+
+Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto addr = fbb.CreateString(address, strlen(address));
+ auto message =
+ fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port);
+ return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message);
+}
+
+Status ReadDataRequest(const uint8_t* data, size_t size, ObjectID* object_id,
+ char** address, int* port) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ DCHECK(message->object_id()->size() == sizeof(ObjectID));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *address = strdup(message->address()->c_str());
+ *port = message->port();
+ return Status::OK();
+}
+
+Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
+ int64_t metadata_size) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()),
+ object_size, metadata_size);
+ return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message);
+}
+
+Status ReadDataReply(const uint8_t* data, size_t size, ObjectID* object_id,
+ int64_t* object_size, int64_t* metadata_size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ *object_id = ObjectID::from_binary(message->object_id()->str());
+ *object_size = static_cast<int64_t>(message->object_size());
+ *metadata_size = static_cast<int64_t>(message->metadata_size());
+ return Status::OK();
+}
+
+// RefreshLRU messages.
+
+Status SendRefreshLRURequest(int sock, const std::vector<ObjectID>& object_ids) {
+ flatbuffers::FlatBufferBuilder fbb;
+
+ auto message = fb::CreatePlasmaRefreshLRURequest(
+ fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()));
+
+ return PlasmaSend(sock, MessageType::PlasmaRefreshLRURequest, &fbb, message);
+}
+
+Status ReadRefreshLRURequest(const uint8_t* data, size_t size,
+ std::vector<ObjectID>* object_ids) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRURequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
+ auto object_id = message->object_ids()->Get(i)->str();
+ object_ids->push_back(ObjectID::from_binary(object_id));
+ }
+ return Status::OK();
+}
+
+Status SendRefreshLRUReply(int sock) {
+ flatbuffers::FlatBufferBuilder fbb;
+ auto message = fb::CreatePlasmaRefreshLRUReply(fbb);
+ return PlasmaSend(sock, MessageType::PlasmaRefreshLRUReply, &fbb, message);
+}
+
+Status ReadRefreshLRUReply(const uint8_t* data, size_t size) {
+ DCHECK(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRUReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ return Status::OK();
+}
+
+} // namespace plasma