diff options
Diffstat (limited to 'src/arrow/cpp/src/plasma/protocol.cc')
-rw-r--r-- | src/arrow/cpp/src/plasma/protocol.cc | 829 |
1 files changed, 829 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/plasma/protocol.cc b/src/arrow/cpp/src/plasma/protocol.cc new file mode 100644 index 000000000..735636cda --- /dev/null +++ b/src/arrow/cpp/src/plasma/protocol.cc @@ -0,0 +1,829 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "plasma/protocol.h" + +#include <utility> + +#include "flatbuffers/flatbuffers.h" +#include "plasma/common.h" +#include "plasma/io.h" +#include "plasma/plasma_generated.h" + +#ifdef PLASMA_CUDA +#include "arrow/gpu/cuda_api.h" +#endif +#include "arrow/util/ubsan.h" + +namespace fb = plasma::flatbuf; + +namespace plasma { + +using fb::MessageType; +using fb::PlasmaError; +using fb::PlasmaObjectSpec; + +using flatbuffers::uoffset_t; + +#define PLASMA_CHECK_ENUM(x, y) \ + static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch") + +flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> +ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids, + int64_t num_objects) { + std::vector<flatbuffers::Offset<flatbuffers::String>> results; + for (int64_t i = 0; i < num_objects; i++) { + results.push_back(fbb->CreateString(object_ids[i].binary())); + } + return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); +} + +flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>> +ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, + const std::vector<std::string>& strings) { + std::vector<flatbuffers::Offset<flatbuffers::String>> results; + for (size_t i = 0; i < strings.size(); i++) { + results.push_back(fbb->CreateString(strings[i])); + } + + return fbb->CreateVector(arrow::util::MakeNonNull(results.data()), results.size()); +} + +flatbuffers::Offset<flatbuffers::Vector<int64_t>> ToFlatbuffer( + flatbuffers::FlatBufferBuilder* fbb, const std::vector<int64_t>& data) { + return fbb->CreateVector(arrow::util::MakeNonNull(data.data()), data.size()); +} + +Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* buffer) { + MessageType type; + RETURN_NOT_OK(ReadMessage(sock, &type, buffer)); + ARROW_CHECK(type == message_type) + << "type = " << static_cast<int64_t>(type) + << ", message_type = " << static_cast<int64_t>(message_type); + return Status::OK(); +} + +// Helper function to create a vector of elements from Data (Request/Reply struct). +// The Getter function is used to extract one element from Data. +template <typename T, typename Data, typename Getter> +void ToVector(const Data& request, std::vector<T>* out, const Getter& getter) { + int count = request.count(); + out->clear(); + out->reserve(count); + for (int i = 0; i < count; ++i) { + out->push_back(getter(request, i)); + } +} + +template <typename T, typename FlatbufferVectorPointer, typename Converter> +void ConvertToVector(const FlatbufferVectorPointer fbvector, std::vector<T>* out, + const Converter& converter) { + out->clear(); + out->reserve(fbvector->size()); + for (size_t i = 0; i < fbvector->size(); ++i) { + out->push_back(converter(*fbvector->Get(i))); + } +} + +template <typename Message> +Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBuilder* fbb, + const Message& message) { + fbb->Finish(message); + return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer()); +} + +Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { + switch (plasma_error) { + case fb::PlasmaError::OK: + return Status::OK(); + case fb::PlasmaError::ObjectExists: + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectExists, + "object already exists in the plasma store"); + case fb::PlasmaError::ObjectNotFound: + return MakePlasmaError(PlasmaErrorCode::PlasmaObjectNotFound, + "object does not exist in the plasma store"); + case fb::PlasmaError::OutOfMemory: + return MakePlasmaError(PlasmaErrorCode::PlasmaStoreFull, + "object does not fit in the plasma store"); + default: + ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error); + } + return Status::OK(); +} + +// Set options messages. + +Status SendSetOptionsRequest(int sock, const std::string& client_name, + int64_t output_memory_limit) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSetOptionsRequest(fbb, fbb.CreateString(client_name), + output_memory_limit); + return PlasmaSend(sock, MessageType::PlasmaSetOptionsRequest, &fbb, message); +} + +Status ReadSetOptionsRequest(const uint8_t* data, size_t size, std::string* client_name, + int64_t* output_memory_quota) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *client_name = std::string(message->client_name()->str()); + *output_memory_quota = message->output_memory_quota(); + return Status::OK(); +} + +Status SendSetOptionsReply(int sock, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSetOptionsReply(fbb, error); + return PlasmaSend(sock, MessageType::PlasmaSetOptionsReply, &fbb, message); +} + +Status ReadSetOptionsReply(const uint8_t* data, size_t size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaSetOptionsReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + return PlasmaErrorStatus(message->error()); +} + +// Get debug string messages. + +Status SendGetDebugStringRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaGetDebugStringRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaGetDebugStringRequest, &fbb, message); +} + +Status SendGetDebugStringReply(int sock, const std::string& debug_string) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaGetDebugStringReply(fbb, fbb.CreateString(debug_string)); + return PlasmaSend(sock, MessageType::PlasmaGetDebugStringReply, &fbb, message); +} + +Status ReadGetDebugStringReply(const uint8_t* data, size_t size, + std::string* debug_string) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaGetDebugStringReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *debug_string = message->debug_string()->str(); + return Status::OK(); +} + +// Create messages. + +Status SendCreateRequest(int sock, ObjectID object_id, bool evict_if_full, + int64_t data_size, int64_t metadata_size, int device_num) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()), + evict_if_full, data_size, metadata_size, device_num); + return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message); +} + +Status ReadCreateRequest(const uint8_t* data, size_t size, ObjectID* object_id, + bool* evict_if_full, int64_t* data_size, int64_t* metadata_size, + int* device_num) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *evict_if_full = message->evict_if_full(); + *data_size = message->data_size(); + *metadata_size = message->metadata_size(); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *device_num = message->device_num(); + return Status::OK(); +} + +Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, + PlasmaError error_code, int64_t mmap_size) { + flatbuffers::FlatBufferBuilder fbb; + PlasmaObjectSpec plasma_object(object->store_fd, object->data_offset, object->data_size, + object->metadata_offset, object->metadata_size, + object->device_num); + auto object_string = fbb.CreateString(object_id.binary()); +#ifdef PLASMA_CUDA + flatbuffers::Offset<fb::CudaHandle> ipc_handle; + if (object->device_num != 0) { + std::shared_ptr<arrow::Buffer> handle; + ARROW_ASSIGN_OR_RAISE(handle, object->ipc_handle->Serialize()); + ipc_handle = + fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())); + } +#endif + fb::PlasmaCreateReplyBuilder crb(fbb); + crb.add_error(static_cast<PlasmaError>(error_code)); + crb.add_plasma_object(&plasma_object); + crb.add_object_id(object_string); + crb.add_store_fd(object->store_fd); + crb.add_mmap_size(mmap_size); + if (object->device_num != 0) { +#ifdef PLASMA_CUDA + crb.add_ipc_handle(ipc_handle); +#else + ARROW_LOG(FATAL) << "This should be unreachable."; +#endif + } + auto message = crb.Finish(); + return PlasmaSend(sock, MessageType::PlasmaCreateReply, &fbb, message); +} + +Status ReadCreateReply(const uint8_t* data, size_t size, ObjectID* object_id, + PlasmaObject* object, int* store_fd, int64_t* mmap_size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + object->store_fd = message->plasma_object()->segment_index(); + object->data_offset = message->plasma_object()->data_offset(); + object->data_size = message->plasma_object()->data_size(); + object->metadata_offset = message->plasma_object()->metadata_offset(); + object->metadata_size = message->plasma_object()->metadata_size(); + + *store_fd = message->store_fd(); + *mmap_size = message->mmap_size(); + + object->device_num = message->plasma_object()->device_num(); +#ifdef PLASMA_CUDA + if (object->device_num != 0) { + ARROW_ASSIGN_OR_RAISE( + object->ipc_handle, + CudaIpcMemHandle::FromBuffer(message->ipc_handle()->handle()->data())); + } +#endif + return PlasmaErrorStatus(message->error()); +} + +Status SendCreateAndSealRequest(int sock, const ObjectID& object_id, bool evict_if_full, + const std::string& data, const std::string& metadata, + unsigned char* digest) { + flatbuffers::FlatBufferBuilder fbb; + auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize); + auto message = fb::CreatePlasmaCreateAndSealRequest( + fbb, fbb.CreateString(object_id.binary()), evict_if_full, fbb.CreateString(data), + fbb.CreateString(metadata), digest_string); + return PlasmaSend(sock, MessageType::PlasmaCreateAndSealRequest, &fbb, message); +} + +Status ReadCreateAndSealRequest(const uint8_t* data, size_t size, ObjectID* object_id, + bool* evict_if_full, std::string* object_data, + std::string* metadata, std::string* digest) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + + *object_id = ObjectID::from_binary(message->object_id()->str()); + *evict_if_full = message->evict_if_full(); + *object_data = message->data()->str(); + *metadata = message->metadata()->str(); + ARROW_CHECK(message->digest()->size() == kDigestSize); + digest->assign(message->digest()->data(), kDigestSize); + return Status::OK(); +} + +Status SendCreateAndSealBatchRequest(int sock, const std::vector<ObjectID>& object_ids, + bool evict_if_full, + const std::vector<std::string>& data, + const std::vector<std::string>& metadata, + const std::vector<std::string>& digests) { + flatbuffers::FlatBufferBuilder fbb; + + auto message = fb::CreatePlasmaCreateAndSealBatchRequest( + fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size()), evict_if_full, + ToFlatbuffer(&fbb, data), ToFlatbuffer(&fbb, metadata), + ToFlatbuffer(&fbb, digests)); + + return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchRequest, &fbb, message); +} + +Status ReadCreateAndSealBatchRequest(const uint8_t* data, size_t size, + std::vector<ObjectID>* object_ids, + bool* evict_if_full, + std::vector<std::string>* object_data, + std::vector<std::string>* metadata, + std::vector<std::string>* digests) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + + *evict_if_full = message->evict_if_full(); + ConvertToVector(message->object_ids(), object_ids, + [](const flatbuffers::String& element) { + return ObjectID::from_binary(element.str()); + }); + + ConvertToVector(message->data(), object_data, + [](const flatbuffers::String& element) { return element.str(); }); + + ConvertToVector(message->metadata(), metadata, + [](const flatbuffers::String& element) { return element.str(); }); + + ConvertToVector(message->digest(), digests, + [](const flatbuffers::String& element) { return element.str(); }); + + return Status::OK(); +} + +Status SendCreateAndSealReply(int sock, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaCreateAndSealReply(fbb, static_cast<PlasmaError>(error)); + return PlasmaSend(sock, MessageType::PlasmaCreateAndSealReply, &fbb, message); +} + +Status ReadCreateAndSealReply(const uint8_t* data, size_t size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + return PlasmaErrorStatus(message->error()); +} + +Status SendCreateAndSealBatchReply(int sock, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaCreateAndSealBatchReply(fbb, static_cast<PlasmaError>(error)); + return PlasmaSend(sock, MessageType::PlasmaCreateAndSealBatchReply, &fbb, message); +} + +Status ReadCreateAndSealBatchReply(const uint8_t* data, size_t size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaCreateAndSealBatchReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + return PlasmaErrorStatus(message->error()); +} + +Status SendAbortRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message); +} + +Status ReadAbortRequest(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendAbortReply(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message); +} + +Status ReadAbortReply(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +// Seal messages. + +Status SendSealRequest(int sock, ObjectID object_id, const std::string& digest) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), + fbb.CreateString(digest)); + return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message); +} + +Status ReadSealRequest(const uint8_t* data, size_t size, ObjectID* object_id, + std::string* digest) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + ARROW_CHECK_EQ(message->digest()->size(), kDigestSize); + digest->assign(message->digest()->data(), kDigestSize); + return Status::OK(); +} + +Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error); + return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message); +} + +Status ReadSealReply(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return PlasmaErrorStatus(message->error()); +} + +// Release messages. + +Status SendReleaseRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message); +} + +Status ReadReleaseRequest(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error); + return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message); +} + +Status ReadReleaseReply(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return PlasmaErrorStatus(message->error()); +} + +// Delete objects messages. + +Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaDeleteRequest( + fbb, static_cast<int32_t>(object_ids.size()), + ToFlatbuffer(&fbb, &object_ids[0], object_ids.size())); + return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message); +} + +Status ReadDeleteRequest(const uint8_t* data, size_t size, + std::vector<ObjectID>* object_ids) { + using fb::PlasmaDeleteRequest; + + DCHECK(data); + DCHECK(object_ids); + auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) { + return ObjectID::from_binary(request.object_ids()->Get(i)->str()); + }); + return Status::OK(); +} + +Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids, + const std::vector<PlasmaError>& errors) { + DCHECK(object_ids.size() == errors.size()); + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaDeleteReply( + fbb, static_cast<int32_t>(object_ids.size()), + ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()), + fbb.CreateVector( + arrow::util::MakeNonNull(reinterpret_cast<const int32_t*>(errors.data())), + object_ids.size())); + return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message); +} + +Status ReadDeleteReply(const uint8_t* data, size_t size, + std::vector<ObjectID>* object_ids, + std::vector<PlasmaError>* errors) { + using fb::PlasmaDeleteReply; + + DCHECK(data); + DCHECK(object_ids); + DCHECK(errors); + auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) { + return ObjectID::from_binary(request.object_ids()->Get(i)->str()); + }); + ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) { + return static_cast<PlasmaError>(request.errors()->Get(i)); + }); + return Status::OK(); +} + +// Contains messages. + +Status SendContainsRequest(int sock, ObjectID object_id) { + flatbuffers::FlatBufferBuilder fbb; + auto message = + fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary())); + return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message); +} + +Status ReadContainsRequest(const uint8_t* data, size_t size, ObjectID* object_id) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + return Status::OK(); +} + +Status SendContainsReply(int sock, ObjectID object_id, bool has_object) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), + has_object); + return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message); +} + +Status ReadContainsReply(const uint8_t* data, size_t size, ObjectID* object_id, + bool* has_object) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *has_object = message->has_object(); + return Status::OK(); +} + +// List messages. + +Status SendListRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaListRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaListRequest, &fbb, message); +} + +Status ReadListRequest(const uint8_t* data, size_t size) { return Status::OK(); } + +Status SendListReply(int sock, const ObjectTable& objects) { + flatbuffers::FlatBufferBuilder fbb; + std::vector<flatbuffers::Offset<fb::ObjectInfo>> object_infos; + for (auto const& entry : objects) { + auto digest = entry.second->state == ObjectState::PLASMA_CREATED + ? fbb.CreateString("") + : fbb.CreateString(reinterpret_cast<char*>(entry.second->digest), + kDigestSize); + auto info = fb::CreateObjectInfo(fbb, fbb.CreateString(entry.first.binary()), + entry.second->data_size, entry.second->metadata_size, + entry.second->ref_count, entry.second->create_time, + entry.second->construct_duration, digest); + object_infos.push_back(info); + } + auto message = fb::CreatePlasmaListReply( + fbb, fbb.CreateVector(arrow::util::MakeNonNull(object_infos.data()), + object_infos.size())); + return PlasmaSend(sock, MessageType::PlasmaListReply, &fbb, message); +} + +Status ReadListReply(const uint8_t* data, size_t size, ObjectTable* objects) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaListReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + for (auto const object : *message->objects()) { + ObjectID object_id = ObjectID::from_binary(object->object_id()->str()); + auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry()); + entry->data_size = object->data_size(); + entry->metadata_size = object->metadata_size(); + entry->ref_count = object->ref_count(); + entry->create_time = object->create_time(); + entry->construct_duration = object->construct_duration(); + entry->state = object->digest()->size() == 0 ? ObjectState::PLASMA_CREATED + : ObjectState::PLASMA_SEALED; + (*objects)[object_id] = std::move(entry); + } + return Status::OK(); +} + +// Connect messages. + +Status SendConnectRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaConnectRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaConnectRequest, &fbb, message); +} + +Status ReadConnectRequest(const uint8_t* data) { return Status::OK(); } + +Status SendConnectReply(int sock, int64_t memory_capacity) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaConnectReply(fbb, memory_capacity); + return PlasmaSend(sock, MessageType::PlasmaConnectReply, &fbb, message); +} + +Status ReadConnectReply(const uint8_t* data, size_t size, int64_t* memory_capacity) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaConnectReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *memory_capacity = message->memory_capacity(); + return Status::OK(); +} + +// Evict messages. + +Status SendEvictRequest(int sock, int64_t num_bytes) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaEvictRequest(fbb, num_bytes); + return PlasmaSend(sock, MessageType::PlasmaEvictRequest, &fbb, message); +} + +Status ReadEvictRequest(const uint8_t* data, size_t size, int64_t* num_bytes) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaEvictRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *num_bytes = message->num_bytes(); + return Status::OK(); +} + +Status SendEvictReply(int sock, int64_t num_bytes) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaEvictReply(fbb, num_bytes); + return PlasmaSend(sock, MessageType::PlasmaEvictReply, &fbb, message); +} + +Status ReadEvictReply(const uint8_t* data, size_t size, int64_t& num_bytes) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaEvictReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + num_bytes = message->num_bytes(); + return Status::OK(); +} + +// Get messages. + +Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects, + int64_t timeout_ms) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaGetRequest( + fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms); + return PlasmaSend(sock, MessageType::PlasmaGetRequest, &fbb, message); +} + +Status ReadGetRequest(const uint8_t* data, size_t size, std::vector<ObjectID>& object_ids, + int64_t* timeout_ms) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { + auto object_id = message->object_ids()->Get(i)->str(); + object_ids.push_back(ObjectID::from_binary(object_id)); + } + *timeout_ms = message->timeout_ms(); + return Status::OK(); +} + +Status SendGetReply(int sock, ObjectID object_ids[], + std::unordered_map<ObjectID, PlasmaObject>& plasma_objects, + int64_t num_objects, const std::vector<int>& store_fds, + const std::vector<int64_t>& mmap_sizes) { + flatbuffers::FlatBufferBuilder fbb; + std::vector<PlasmaObjectSpec> objects; + + std::vector<flatbuffers::Offset<fb::CudaHandle>> handles; + for (int64_t i = 0; i < num_objects; ++i) { + const PlasmaObject& object = plasma_objects[object_ids[i]]; + objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset, + object.data_size, object.metadata_offset, + object.metadata_size, object.device_num)); +#ifdef PLASMA_CUDA + if (object.device_num != 0) { + std::shared_ptr<arrow::Buffer> handle; + ARROW_ASSIGN_OR_RAISE(handle, object.ipc_handle->Serialize()); + handles.push_back( + fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()))); + } +#endif + } + auto message = fb::CreatePlasmaGetReply( + fbb, ToFlatbuffer(&fbb, object_ids, num_objects), + fbb.CreateVectorOfStructs(arrow::util::MakeNonNull(objects.data()), num_objects), + fbb.CreateVector(arrow::util::MakeNonNull(store_fds.data()), store_fds.size()), + fbb.CreateVector(arrow::util::MakeNonNull(mmap_sizes.data()), mmap_sizes.size()), + fbb.CreateVector(arrow::util::MakeNonNull(handles.data()), handles.size())); + return PlasmaSend(sock, MessageType::PlasmaGetReply, &fbb, message); +} + +Status ReadGetReply(const uint8_t* data, size_t size, ObjectID object_ids[], + PlasmaObject plasma_objects[], int64_t num_objects, + std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaGetReply>(data); +#ifdef PLASMA_CUDA + int handle_pos = 0; +#endif + DCHECK(VerifyFlatbuffer(message, data, size)); + for (uoffset_t i = 0; i < num_objects; ++i) { + object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str()); + } + for (uoffset_t i = 0; i < num_objects; ++i) { + const PlasmaObjectSpec* object = message->plasma_objects()->Get(i); + plasma_objects[i].store_fd = object->segment_index(); + plasma_objects[i].data_offset = object->data_offset(); + plasma_objects[i].data_size = object->data_size(); + plasma_objects[i].metadata_offset = object->metadata_offset(); + plasma_objects[i].metadata_size = object->metadata_size(); + plasma_objects[i].device_num = object->device_num(); +#ifdef PLASMA_CUDA + if (object->device_num() != 0) { + const void* ipc_handle = message->handles()->Get(handle_pos)->handle()->data(); + ARROW_ASSIGN_OR_RAISE(plasma_objects[i].ipc_handle, + CudaIpcMemHandle::FromBuffer(ipc_handle)); + handle_pos++; + } +#endif + } + ARROW_CHECK(message->store_fds()->size() == message->mmap_sizes()->size()); + for (uoffset_t i = 0; i < message->store_fds()->size(); i++) { + store_fds.push_back(message->store_fds()->Get(i)); + mmap_sizes.push_back(message->mmap_sizes()->Get(i)); + } + return Status::OK(); +} + +// Subscribe messages. + +Status SendSubscribeRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSubscribeRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaSubscribeRequest, &fbb, message); +} + +// Data messages. + +Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) { + flatbuffers::FlatBufferBuilder fbb; + auto addr = fbb.CreateString(address, strlen(address)); + auto message = + fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port); + return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message); +} + +Status ReadDataRequest(const uint8_t* data, size_t size, ObjectID* object_id, + char** address, int* port) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + DCHECK(message->object_id()->size() == sizeof(ObjectID)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *address = strdup(message->address()->c_str()); + *port = message->port(); + return Status::OK(); +} + +Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, + int64_t metadata_size) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()), + object_size, metadata_size); + return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message); +} + +Status ReadDataReply(const uint8_t* data, size_t size, ObjectID* object_id, + int64_t* object_size, int64_t* metadata_size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *object_id = ObjectID::from_binary(message->object_id()->str()); + *object_size = static_cast<int64_t>(message->object_size()); + *metadata_size = static_cast<int64_t>(message->metadata_size()); + return Status::OK(); +} + +// RefreshLRU messages. + +Status SendRefreshLRURequest(int sock, const std::vector<ObjectID>& object_ids) { + flatbuffers::FlatBufferBuilder fbb; + + auto message = fb::CreatePlasmaRefreshLRURequest( + fbb, ToFlatbuffer(&fbb, object_ids.data(), object_ids.size())); + + return PlasmaSend(sock, MessageType::PlasmaRefreshLRURequest, &fbb, message); +} + +Status ReadRefreshLRURequest(const uint8_t* data, size_t size, + std::vector<ObjectID>* object_ids) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRURequest>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) { + auto object_id = message->object_ids()->Get(i)->str(); + object_ids->push_back(ObjectID::from_binary(object_id)); + } + return Status::OK(); +} + +Status SendRefreshLRUReply(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaRefreshLRUReply(fbb); + return PlasmaSend(sock, MessageType::PlasmaRefreshLRUReply, &fbb, message); +} + +Status ReadRefreshLRUReply(const uint8_t* data, size_t size) { + DCHECK(data); + auto message = flatbuffers::GetRoot<fb::PlasmaRefreshLRUReply>(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + return Status::OK(); +} + +} // namespace plasma |