diff options
Diffstat (limited to 'src/arrow/cpp/src/plasma/common.cc')
-rw-r--r-- | src/arrow/cpp/src/plasma/common.cc | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/src/arrow/cpp/src/plasma/common.cc b/src/arrow/cpp/src/plasma/common.cc new file mode 100644 index 000000000..e7d2643d7 --- /dev/null +++ b/src/arrow/cpp/src/plasma/common.cc @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "plasma/common.h" + +#include <limits> +#include <utility> + +#include "arrow/util/ubsan.h" + +#include "plasma/plasma_generated.h" + +namespace fb = plasma::flatbuf; + +namespace plasma { + +namespace { + +const char kErrorDetailTypeId[] = "plasma::PlasmaStatusDetail"; + +class PlasmaStatusDetail : public arrow::StatusDetail { + public: + explicit PlasmaStatusDetail(PlasmaErrorCode code) : code_(code) {} + const char* type_id() const override { return kErrorDetailTypeId; } + std::string ToString() const override { + const char* type; + switch (code()) { + case PlasmaErrorCode::PlasmaObjectExists: + type = "Plasma object is exists"; + break; + case PlasmaErrorCode::PlasmaObjectNotFound: + type = "Plasma object is not found"; + break; + case PlasmaErrorCode::PlasmaStoreFull: + type = "Plasma store is full"; + break; + case PlasmaErrorCode::PlasmaObjectAlreadySealed: + type = "Plasma object is already sealed"; + break; + default: + type = "Unknown plasma error"; + break; + } + return std::string(type); + } + PlasmaErrorCode code() const { return code_; } + + private: + PlasmaErrorCode code_; +}; + +bool IsPlasmaStatus(const arrow::Status& status, PlasmaErrorCode code) { + if (status.ok()) { + return false; + } + auto* detail = status.detail().get(); + return detail != nullptr && detail->type_id() == kErrorDetailTypeId && + static_cast<PlasmaStatusDetail*>(detail)->code() == code; +} + +} // namespace + +using arrow::Status; + +arrow::Status MakePlasmaError(PlasmaErrorCode code, std::string message) { + arrow::StatusCode arrow_code = arrow::StatusCode::UnknownError; + switch (code) { + case PlasmaErrorCode::PlasmaObjectExists: + arrow_code = arrow::StatusCode::AlreadyExists; + break; + case PlasmaErrorCode::PlasmaObjectNotFound: + arrow_code = arrow::StatusCode::KeyError; + break; + case PlasmaErrorCode::PlasmaStoreFull: + arrow_code = arrow::StatusCode::CapacityError; + break; + case PlasmaErrorCode::PlasmaObjectAlreadySealed: + // Maybe a stretch? + arrow_code = arrow::StatusCode::TypeError; + break; + } + return arrow::Status(arrow_code, std::move(message), + std::make_shared<PlasmaStatusDetail>(code)); +} + +bool IsPlasmaObjectExists(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectExists); +} +bool IsPlasmaObjectNotFound(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectNotFound); +} +bool IsPlasmaObjectAlreadySealed(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaObjectAlreadySealed); +} +bool IsPlasmaStoreFull(const arrow::Status& status) { + return IsPlasmaStatus(status, PlasmaErrorCode::PlasmaStoreFull); +} + +UniqueID UniqueID::from_binary(const std::string& binary) { + UniqueID id; + std::memcpy(&id, binary.data(), sizeof(id)); + return id; +} + +const uint8_t* UniqueID::data() const { return id_; } + +uint8_t* UniqueID::mutable_data() { return id_; } + +std::string UniqueID::binary() const { + return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize); +} + +std::string UniqueID::hex() const { + constexpr char hex[] = "0123456789abcdef"; + std::string result; + for (int i = 0; i < kUniqueIDSize; i++) { + unsigned int val = id_[i]; + result.push_back(hex[val >> 4]); + result.push_back(hex[val & 0xf]); + } + return result; +} + +// This code is from https://sites.google.com/site/murmurhash/ +// and is public domain. +uint64_t MurmurHash64A(const void* key, int len, unsigned int seed) { + const uint64_t m = 0xc6a4a7935bd1e995; + const int r = 47; + + uint64_t h = seed ^ (len * m); + + const uint64_t* data = reinterpret_cast<const uint64_t*>(key); + const uint64_t* end = data + (len / 8); + + while (data != end) { + uint64_t k = arrow::util::SafeLoad(data++); + + k *= m; + k ^= k >> r; + k *= m; + + h ^= k; + h *= m; + } + + const unsigned char* data2 = reinterpret_cast<const unsigned char*>(data); + + switch (len & 7) { + case 7: + h ^= uint64_t(data2[6]) << 48; // fall through + case 6: + h ^= uint64_t(data2[5]) << 40; // fall through + case 5: + h ^= uint64_t(data2[4]) << 32; // fall through + case 4: + h ^= uint64_t(data2[3]) << 24; // fall through + case 3: + h ^= uint64_t(data2[2]) << 16; // fall through + case 2: + h ^= uint64_t(data2[1]) << 8; // fall through + case 1: + h ^= uint64_t(data2[0]); + h *= m; + } + + h ^= h >> r; + h *= m; + h ^= h >> r; + + return h; +} + +size_t UniqueID::hash() const { return MurmurHash64A(&id_[0], kUniqueIDSize, 0); } + +bool UniqueID::operator==(const UniqueID& rhs) const { + return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0; +} + +const PlasmaStoreInfo* plasma_config; + +} // namespace plasma |