diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/_plasma.pyx | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/_plasma.pyx')
-rw-r--r-- | src/arrow/python/pyarrow/_plasma.pyx | 867 |
1 files changed, 867 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/_plasma.pyx b/src/arrow/python/pyarrow/_plasma.pyx new file mode 100644 index 000000000..e38c81f80 --- /dev/null +++ b/src/arrow/python/pyarrow/_plasma.pyx @@ -0,0 +1,867 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: language_level = 3 + +from libcpp cimport bool as c_bool, nullptr +from libcpp.memory cimport shared_ptr, unique_ptr, make_shared +from libcpp.string cimport string as c_string +from libcpp.vector cimport vector as c_vector +from libcpp.unordered_map cimport unordered_map +from libc.stdint cimport int64_t, uint8_t, uintptr_t +from cython.operator cimport dereference as deref, preincrement as inc +from cpython.pycapsule cimport * + +from collections.abc import Sequence +import random +import socket +import warnings + +import pyarrow +from pyarrow.lib cimport (Buffer, NativeFile, _Weakrefable, + check_status, pyarrow_wrap_buffer) +from pyarrow.lib import ArrowException, frombytes +from pyarrow.includes.libarrow cimport (CBuffer, CMutableBuffer, + CFixedSizeBufferWriter, CStatus) +from pyarrow.includes.libplasma cimport * + +PLASMA_WAIT_TIMEOUT = 2 ** 30 + + +cdef extern from "plasma/common.h" nogil: + cdef cppclass CCudaIpcPlaceholder" plasma::internal::CudaIpcPlaceholder": + pass + + cdef cppclass CUniqueID" plasma::UniqueID": + + @staticmethod + CUniqueID from_binary(const c_string& binary) + + @staticmethod + CUniqueID from_random() + + c_bool operator==(const CUniqueID& rhs) const + + c_string hex() const + + c_string binary() const + + @staticmethod + int64_t size() + + cdef enum CObjectState" plasma::ObjectState": + PLASMA_CREATED" plasma::ObjectState::PLASMA_CREATED" + PLASMA_SEALED" plasma::ObjectState::PLASMA_SEALED" + + cdef struct CObjectTableEntry" plasma::ObjectTableEntry": + int fd + int device_num + int64_t map_size + ptrdiff_t offset + uint8_t* pointer + int64_t data_size + int64_t metadata_size + int ref_count + int64_t create_time + int64_t construct_duration + CObjectState state + shared_ptr[CCudaIpcPlaceholder] ipc_handle + + ctypedef unordered_map[CUniqueID, unique_ptr[CObjectTableEntry]] \ + CObjectTable" plasma::ObjectTable" + + +cdef extern from "plasma/common.h": + cdef int64_t kDigestSize" plasma::kDigestSize" + +cdef extern from "plasma/client.h" nogil: + + cdef cppclass CPlasmaClient" plasma::PlasmaClient": + + CPlasmaClient() + + CStatus Connect(const c_string& store_socket_name, + const c_string& manager_socket_name, + int release_delay, int num_retries) + + CStatus Create(const CUniqueID& object_id, + int64_t data_size, const uint8_t* metadata, int64_t + metadata_size, const shared_ptr[CBuffer]* data) + + CStatus CreateAndSeal(const CUniqueID& object_id, + const c_string& data, const c_string& metadata) + + CStatus Get(const c_vector[CUniqueID] object_ids, int64_t timeout_ms, + c_vector[CObjectBuffer]* object_buffers) + + CStatus Seal(const CUniqueID& object_id) + + CStatus Evict(int64_t num_bytes, int64_t& num_bytes_evicted) + + CStatus Hash(const CUniqueID& object_id, uint8_t* digest) + + CStatus Release(const CUniqueID& object_id) + + CStatus Contains(const CUniqueID& object_id, c_bool* has_object) + + CStatus List(CObjectTable* objects) + + CStatus Subscribe(int* fd) + + CStatus DecodeNotifications(const uint8_t* buffer, + c_vector[CUniqueID]* object_ids, + c_vector[int64_t]* data_sizes, + c_vector[int64_t]* metadata_sizes) + + CStatus GetNotification(int fd, CUniqueID* object_id, + int64_t* data_size, int64_t* metadata_size) + + CStatus Disconnect() + + CStatus Delete(const c_vector[CUniqueID] object_ids) + + CStatus SetClientOptions(const c_string& client_name, + int64_t limit_output_memory) + + c_string DebugString() + + int64_t store_capacity() + +cdef extern from "plasma/client.h" nogil: + + cdef struct CObjectBuffer" plasma::ObjectBuffer": + shared_ptr[CBuffer] data + shared_ptr[CBuffer] metadata + + +def make_object_id(object_id): + return ObjectID(object_id) + + +cdef class ObjectID(_Weakrefable): + """ + An ObjectID represents a string of bytes used to identify Plasma objects. + """ + + cdef: + CUniqueID data + + def __cinit__(self, object_id): + if (not isinstance(object_id, bytes) or + len(object_id) != CUniqueID.size()): + raise ValueError("Object ID must by 20 bytes," + " is " + str(object_id)) + self.data = CUniqueID.from_binary(object_id) + + def __eq__(self, other): + try: + return self.data == (<ObjectID?>other).data + except TypeError: + return False + + def __hash__(self): + return hash(self.data.binary()) + + def __repr__(self): + return "ObjectID(" + self.data.hex().decode() + ")" + + def __reduce__(self): + return (make_object_id, (self.data.binary(),)) + + def binary(self): + """ + Return the binary representation of this ObjectID. + + Returns + ------- + bytes + Binary representation of the ObjectID. + """ + return self.data.binary() + + @staticmethod + def from_random(): + """ + Returns a randomly generated ObjectID. + + Returns + ------- + ObjectID + A randomly generated ObjectID. + """ + random_id = bytes(bytearray( + random.getrandbits(8) for _ in range(CUniqueID.size()))) + return ObjectID(random_id) + + +cdef class ObjectNotAvailable(_Weakrefable): + """ + Placeholder for an object that was not available within the given timeout. + """ + pass + + +cdef class PlasmaBuffer(Buffer): + """ + This is the type returned by calls to get with a PlasmaClient. + + We define our own class instead of directly returning a buffer object so + that we can add a custom destructor which notifies Plasma that the object + is no longer being used, so the memory in the Plasma store backing the + object can potentially be freed. + + Attributes + ---------- + object_id : ObjectID + The ID of the object in the buffer. + client : PlasmaClient + The PlasmaClient that we use to communicate with the store and manager. + """ + + cdef: + ObjectID object_id + PlasmaClient client + + @staticmethod + cdef PlasmaBuffer create(ObjectID object_id, PlasmaClient client, + const shared_ptr[CBuffer]& buffer): + cdef PlasmaBuffer self = PlasmaBuffer.__new__(PlasmaBuffer) + self.object_id = object_id + self.client = client + self.init(buffer) + return self + + def __init__(self): + raise TypeError("Do not call PlasmaBuffer's constructor directly, use " + "`PlasmaClient.create` instead.") + + def __dealloc__(self): + """ + Notify Plasma that the object is no longer needed. + + If the plasma client has been shut down, then don't do anything. + """ + self.client._release(self.object_id) + + +class PlasmaObjectNotFound(ArrowException): + pass + + +class PlasmaStoreFull(ArrowException): + pass + + +class PlasmaObjectExists(ArrowException): + pass + + +cdef int plasma_check_status(const CStatus& status) nogil except -1: + if status.ok(): + return 0 + + with gil: + message = frombytes(status.message()) + if IsPlasmaObjectExists(status): + raise PlasmaObjectExists(message) + elif IsPlasmaObjectNotFound(status): + raise PlasmaObjectNotFound(message) + elif IsPlasmaStoreFull(status): + raise PlasmaStoreFull(message) + + return check_status(status) + + +def get_socket_from_fd(fileno, family, type): + import socket + return socket.socket(fileno=fileno, family=family, type=type) + + +cdef class PlasmaClient(_Weakrefable): + """ + The PlasmaClient is used to interface with a plasma store and manager. + + The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a + buffer, and get a buffer. Buffers are referred to by object IDs, which are + strings. + """ + + cdef: + shared_ptr[CPlasmaClient] client + int notification_fd + c_string store_socket_name + + def __cinit__(self): + self.client.reset(new CPlasmaClient()) + self.notification_fd = -1 + self.store_socket_name = b"" + + cdef _get_object_buffers(self, object_ids, int64_t timeout_ms, + c_vector[CObjectBuffer]* result): + cdef: + c_vector[CUniqueID] ids + ObjectID object_id + + for object_id in object_ids: + ids.push_back(object_id.data) + with nogil: + plasma_check_status(self.client.get().Get(ids, timeout_ms, result)) + + # XXX C++ API should instead expose some kind of CreateAuto() + cdef _make_mutable_plasma_buffer(self, ObjectID object_id, uint8_t* data, + int64_t size): + cdef shared_ptr[CBuffer] buffer + buffer.reset(new CMutableBuffer(data, size)) + return PlasmaBuffer.create(object_id, self, buffer) + + @property + def store_socket_name(self): + return self.store_socket_name.decode() + + def create(self, ObjectID object_id, int64_t data_size, + c_string metadata=b""): + """ + Create a new buffer in the PlasmaStore for a particular object ID. + + The returned buffer is mutable until seal is called. + + Parameters + ---------- + object_id : ObjectID + The object ID used to identify an object. + size : int + The size in bytes of the created buffer. + metadata : bytes + An optional string of bytes encoding whatever metadata the user + wishes to encode. + + Raises + ------ + PlasmaObjectExists + This exception is raised if the object could not be created because + there already is an object with the same ID in the plasma store. + + PlasmaStoreFull: This exception is raised if the object could + not be created because the plasma store is unable to evict + enough objects to create room for it. + """ + cdef shared_ptr[CBuffer] data + with nogil: + plasma_check_status( + self.client.get().Create(object_id.data, data_size, + <uint8_t*>(metadata.data()), + metadata.size(), &data)) + return self._make_mutable_plasma_buffer(object_id, + data.get().mutable_data(), + data_size) + + def create_and_seal(self, ObjectID object_id, c_string data, + c_string metadata=b""): + """ + Store a new object in the PlasmaStore for a particular object ID. + + Parameters + ---------- + object_id : ObjectID + The object ID used to identify an object. + data : bytes + The object to store. + metadata : bytes + An optional string of bytes encoding whatever metadata the user + wishes to encode. + + Raises + ------ + PlasmaObjectExists + This exception is raised if the object could not be created because + there already is an object with the same ID in the plasma store. + + PlasmaStoreFull: This exception is raised if the object could + not be created because the plasma store is unable to evict + enough objects to create room for it. + """ + with nogil: + plasma_check_status( + self.client.get().CreateAndSeal(object_id.data, data, + metadata)) + + def get_buffers(self, object_ids, timeout_ms=-1, with_meta=False): + """ + Returns data buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms : int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + with_meta : bool + + Returns + ------- + list + If with_meta=False, this is a list of PlasmaBuffers for the data + associated with the object_ids and None if the object was not + available. If with_meta=True, this is a list of tuples of + PlasmaBuffer and metadata bytes. + """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + if object_buffers[i].data.get() != nullptr: + data = pyarrow_wrap_buffer(object_buffers[i].data) + else: + data = None + if not with_meta: + result.append(data) + else: + if object_buffers[i].metadata.get() != nullptr: + size = object_buffers[i].metadata.get().size() + metadata = object_buffers[i].metadata.get().data()[:size] + else: + metadata = None + result.append((metadata, data)) + return result + + def get_metadata(self, object_ids, timeout_ms=-1): + """ + Returns metadata buffer from the PlasmaStore based on object ID. + + If the object has not been sealed yet, this call will block. The + retrieved buffer is immutable. + + Parameters + ---------- + object_ids : list + A list of ObjectIDs used to identify some objects. + timeout_ms : int + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + + Returns + ------- + list + List of PlasmaBuffers for the metadata associated with the + object_ids and None if the object was not available. + """ + cdef c_vector[CObjectBuffer] object_buffers + self._get_object_buffers(object_ids, timeout_ms, &object_buffers) + result = [] + for i in range(object_buffers.size()): + if object_buffers[i].metadata.get() != nullptr: + result.append(pyarrow_wrap_buffer(object_buffers[i].metadata)) + else: + result.append(None) + return result + + def put_raw_buffer(self, object value, ObjectID object_id=None, + c_string metadata=b"", int memcopy_threads=6): + """ + Store Python buffer into the object store. + + Parameters + ---------- + value : Python object that implements the buffer protocol + A Python buffer object to store. + object_id : ObjectID, default None + If this is provided, the specified object ID will be used to refer + to the object. + metadata : bytes + An optional string of bytes encoding whatever metadata the user + wishes to encode. + memcopy_threads : int, default 6 + The number of threads to use to write the serialized object into + the object store for large objects. + + Returns + ------- + The object ID associated to the Python buffer object. + """ + cdef ObjectID target_id = (object_id if object_id + else ObjectID.from_random()) + cdef Buffer arrow_buffer = pyarrow.py_buffer(value) + write_buffer = self.create(target_id, len(value), metadata) + stream = pyarrow.FixedSizeBufferWriter(write_buffer) + stream.set_memcopy_threads(memcopy_threads) + stream.write(arrow_buffer) + self.seal(target_id) + return target_id + + def put(self, object value, ObjectID object_id=None, int memcopy_threads=6, + serialization_context=None): + """ + Store a Python value into the object store. + + Parameters + ---------- + value : object + A Python object to store. + object_id : ObjectID, default None + If this is provided, the specified object ID will be used to refer + to the object. + memcopy_threads : int, default 6 + The number of threads to use to write the serialized object into + the object store for large objects. + serialization_context : pyarrow.SerializationContext, default None + Custom serialization and deserialization context. + + Returns + ------- + The object ID associated to the Python object. + """ + cdef ObjectID target_id = (object_id if object_id + else ObjectID.from_random()) + if serialization_context is not None: + warnings.warn( + "'serialization_context' is deprecated and will be removed " + "in a future version.", + FutureWarning, stacklevel=2 + ) + serialized = pyarrow.lib._serialize(value, serialization_context) + buffer = self.create(target_id, serialized.total_bytes) + stream = pyarrow.FixedSizeBufferWriter(buffer) + stream.set_memcopy_threads(memcopy_threads) + serialized.write_to(stream) + self.seal(target_id) + return target_id + + def get(self, object_ids, int timeout_ms=-1, serialization_context=None): + """ + Get one or more Python values from the object store. + + Parameters + ---------- + object_ids : list or ObjectID + Object ID or list of object IDs associated to the values we get + from the store. + timeout_ms : int, default -1 + The number of milliseconds that the get call should block before + timing out and returning. Pass -1 if the call should block and 0 + if the call should return immediately. + serialization_context : pyarrow.SerializationContext, default None + Custom serialization and deserialization context. + + Returns + ------- + list or object + Python value or list of Python values for the data associated with + the object_ids and ObjectNotAvailable if the object was not + available. + """ + if serialization_context is not None: + warnings.warn( + "'serialization_context' is deprecated and will be removed " + "in a future version.", + FutureWarning, stacklevel=2 + ) + if isinstance(object_ids, Sequence): + results = [] + buffers = self.get_buffers(object_ids, timeout_ms) + for i in range(len(object_ids)): + # buffers[i] is None if this object was not available within + # the timeout + if buffers[i]: + val = pyarrow.lib._deserialize(buffers[i], + serialization_context) + results.append(val) + else: + results.append(ObjectNotAvailable) + return results + else: + return self.get([object_ids], timeout_ms, serialization_context)[0] + + def seal(self, ObjectID object_id): + """ + Seal the buffer in the PlasmaStore for a particular object ID. + + Once a buffer has been sealed, the buffer is immutable and can only be + accessed through get. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + with nogil: + plasma_check_status(self.client.get().Seal(object_id.data)) + + def _release(self, ObjectID object_id): + """ + Notify Plasma that the object is no longer needed. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + with nogil: + plasma_check_status(self.client.get().Release(object_id.data)) + + def contains(self, ObjectID object_id): + """ + Check if the object is present and sealed in the PlasmaStore. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + """ + cdef c_bool is_contained + with nogil: + plasma_check_status(self.client.get().Contains(object_id.data, + &is_contained)) + return is_contained + + def hash(self, ObjectID object_id): + """ + Compute the checksum of an object in the object store. + + Parameters + ---------- + object_id : ObjectID + A string used to identify an object. + + Returns + ------- + bytes + A digest string object's hash. If the object isn't in the object + store, the string will have length zero. + """ + cdef c_vector[uint8_t] digest = c_vector[uint8_t](kDigestSize) + with nogil: + plasma_check_status(self.client.get().Hash(object_id.data, + digest.data())) + return bytes(digest[:]) + + def evict(self, int64_t num_bytes): + """ + Evict some objects until to recover some bytes. + + Recover at least num_bytes bytes if possible. + + Parameters + ---------- + num_bytes : int + The number of bytes to attempt to recover. + """ + cdef int64_t num_bytes_evicted = -1 + with nogil: + plasma_check_status( + self.client.get().Evict(num_bytes, num_bytes_evicted)) + return num_bytes_evicted + + def subscribe(self): + """Subscribe to notifications about sealed objects.""" + with nogil: + plasma_check_status( + self.client.get().Subscribe(&self.notification_fd)) + + def get_notification_socket(self): + """ + Get the notification socket. + """ + return get_socket_from_fd(self.notification_fd, + family=socket.AF_UNIX, + type=socket.SOCK_STREAM) + + def decode_notifications(self, const uint8_t* buf): + """ + Get the notification from the buffer. + + Returns + ------- + [ObjectID] + The list of object IDs in the notification message. + c_vector[int64_t] + The data sizes of the objects in the notification message. + c_vector[int64_t] + The metadata sizes of the objects in the notification message. + """ + cdef c_vector[CUniqueID] ids + cdef c_vector[int64_t] data_sizes + cdef c_vector[int64_t] metadata_sizes + with nogil: + status = self.client.get().DecodeNotifications(buf, + &ids, + &data_sizes, + &metadata_sizes) + plasma_check_status(status) + object_ids = [] + for object_id in ids: + object_ids.append(ObjectID(object_id.binary())) + return object_ids, data_sizes, metadata_sizes + + def get_next_notification(self): + """ + Get the next notification from the notification socket. + + Returns + ------- + ObjectID + The object ID of the object that was stored. + int + The data size of the object that was stored. + int + The metadata size of the object that was stored. + """ + cdef ObjectID object_id = ObjectID(CUniqueID.size() * b"\0") + cdef int64_t data_size + cdef int64_t metadata_size + with nogil: + status = self.client.get().GetNotification(self.notification_fd, + &object_id.data, + &data_size, + &metadata_size) + plasma_check_status(status) + return object_id, data_size, metadata_size + + def to_capsule(self): + return PyCapsule_New(<void *>self.client.get(), "plasma", NULL) + + def disconnect(self): + """ + Disconnect this client from the Plasma store. + """ + with nogil: + plasma_check_status(self.client.get().Disconnect()) + + def delete(self, object_ids): + """ + Delete the objects with the given IDs from other object store. + + Parameters + ---------- + object_ids : list + A list of strings used to identify the objects. + """ + cdef c_vector[CUniqueID] ids + cdef ObjectID object_id + for object_id in object_ids: + ids.push_back(object_id.data) + with nogil: + plasma_check_status(self.client.get().Delete(ids)) + + def set_client_options(self, client_name, int64_t limit_output_memory): + cdef c_string name + name = client_name.encode() + with nogil: + plasma_check_status( + self.client.get().SetClientOptions(name, limit_output_memory)) + + def debug_string(self): + cdef c_string result + with nogil: + result = self.client.get().DebugString() + return result.decode() + + def list(self): + """ + Experimental: List the objects in the store. + + Returns + ------- + dict + Dictionary from ObjectIDs to an "info" dictionary describing the + object. The "info" dictionary has the following entries: + + data_size + size of the object in bytes + + metadata_size + size of the object metadata in bytes + + ref_count + Number of clients referencing the object buffer + + create_time + Unix timestamp of the creation of the object + + construct_duration + Time the creation of the object took in seconds + + state + "created" if the object is still being created and + "sealed" if it is already sealed + """ + cdef CObjectTable objects + with nogil: + plasma_check_status(self.client.get().List(&objects)) + result = dict() + cdef ObjectID object_id + cdef CObjectTableEntry entry + it = objects.begin() + while it != objects.end(): + object_id = ObjectID(deref(it).first.binary()) + entry = deref(deref(it).second) + if entry.state == CObjectState.PLASMA_CREATED: + state = "created" + else: + state = "sealed" + result[object_id] = { + "data_size": entry.data_size, + "metadata_size": entry.metadata_size, + "ref_count": entry.ref_count, + "create_time": entry.create_time, + "construct_duration": entry.construct_duration, + "state": state + } + inc(it) + return result + + def store_capacity(self): + """ + Get the memory capacity of the store. + + Returns + ------- + + int + The memory capacity of the store in bytes. + """ + return self.client.get().store_capacity() + + +def connect(store_socket_name, int num_retries=-1): + """ + Return a new PlasmaClient that is connected a plasma store and + optionally a manager. + + Parameters + ---------- + store_socket_name : str + Name of the socket the plasma store is listening at. + num_retries : int, default -1 + Number of times to try to connect to plasma store. Default value of -1 + uses the default (50) + """ + cdef PlasmaClient result = PlasmaClient() + cdef int deprecated_release_delay = 0 + result.store_socket_name = store_socket_name.encode() + with nogil: + plasma_check_status( + result.client.get().Connect(result.store_socket_name, b"", + deprecated_release_delay, num_retries)) + return result |