author    Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org>  2024-04-21 11:54:28 +0000
commit    e6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree      64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/tests/test_plasma.py
parent    Initial commit. (diff)
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/tests/test_plasma.py')
-rw-r--r--  src/arrow/python/pyarrow/tests/test_plasma.py  1073
1 file changed, 1073 insertions(+), 0 deletions(-)
diff --git a/src/arrow/python/pyarrow/tests/test_plasma.py b/src/arrow/python/pyarrow/tests/test_plasma.py
new file mode 100644
index 000000000..ed08a6872
--- /dev/null
+++ b/src/arrow/python/pyarrow/tests/test_plasma.py
@@ -0,0 +1,1073 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import multiprocessing
+import os
+import random
+import signal
+import struct
+import subprocess
+import sys
+import time
+
+import numpy as np
+import pytest
+
+import pyarrow as pa
+
+
+DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
+USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
+EXTERNAL_STORE = "hashtable://test"
+SMALL_OBJECT_SIZE = 9000
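+
+# EXTERNAL_STORE uses the "hashtable://" scheme: an in-process external
+# store that the eviction tests below spill evicted objects into and
+# fetch them back from on demand.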
+
+
+def random_name():
+ return str(random.randint(0, 99999999))
+
+
+def random_object_id():
+ import pyarrow.plasma as plasma
+ return plasma.ObjectID(np.random.bytes(20))
+
+
+def generate_metadata(length):
+ metadata = bytearray(length)
+ if length > 0:
+ metadata[0] = random.randint(0, 255)
+ metadata[-1] = random.randint(0, 255)
+ for _ in range(100):
+ metadata[random.randint(0, length - 1)] = random.randint(0, 255)
+ return metadata
+
+
+def write_to_data_buffer(buff, length):
+ array = np.frombuffer(buff, dtype="uint8")
+ if length > 0:
+ array[0] = random.randint(0, 255)
+ array[-1] = random.randint(0, 255)
+ for _ in range(100):
+ array[random.randint(0, length - 1)] = random.randint(0, 255)
+
+
+def create_object_with_id(client, object_id, data_size, metadata_size,
+ seal=True):
+ metadata = generate_metadata(metadata_size)
+ memory_buffer = client.create(object_id, data_size, metadata)
+ write_to_data_buffer(memory_buffer, data_size)
+ if seal:
+ client.seal(object_id)
+ return memory_buffer, metadata
+
+
+def create_object(client, data_size, metadata_size=0, seal=True):
+ object_id = random_object_id()
+ memory_buffer, metadata = create_object_with_id(client, object_id,
+ data_size, metadata_size,
+ seal=seal)
+ return object_id, memory_buffer, metadata
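+
+# A minimal sketch of the two-phase write protocol the helpers above wrap,
+# assuming a store is already running and `store_name` (hypothetical) is
+# its socket path:
+#
+#     client = plasma.connect(store_name)
+#     oid = plasma.ObjectID(np.random.bytes(20))
+#     buf = client.create(oid, 1000)   # allocate; object not yet visible
+#     np.frombuffer(buf, dtype="uint8")[:] = 0
+#     client.seal(oid)                 # now immutable and visible to all
+#     [view] = client.get_buffers([oid])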
+
+
+@pytest.mark.plasma
+class TestPlasmaClient:
+
+ def setup_method(self, test_method):
+ import pyarrow.plasma as plasma
+ # Start Plasma store.
+ self.plasma_store_ctx = plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
+ use_valgrind=USE_VALGRIND)
+ self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
+ # Connect to Plasma.
+ self.plasma_client = plasma.connect(self.plasma_store_name)
+ self.plasma_client2 = plasma.connect(self.plasma_store_name)
+
+ def teardown_method(self, test_method):
+ try:
+ # Check that the Plasma store is still alive.
+ assert self.p.poll() is None
+ # Ensure Valgrind and/or coverage have a clean exit
+ # Valgrind misses SIGTERM if it is delivered before the
+ # event loop is ready; this race condition is mitigated
+ # but not solved by time.sleep().
+ if USE_VALGRIND:
+ time.sleep(1.0)
+ self.p.send_signal(signal.SIGTERM)
+ self.p.wait(timeout=5)
+ assert self.p.returncode == 0
+ finally:
+ self.plasma_store_ctx.__exit__(None, None, None)
+
+ def test_connection_failure_raises_exception(self):
+ import pyarrow.plasma as plasma
+ # ARROW-1264
+ with pytest.raises(IOError):
+ plasma.connect('unknown-store-name', num_retries=1)
+
+ def test_create(self):
+ # Create an object id string.
+ object_id = random_object_id()
+ # Create a new buffer and write to it.
+ length = 50
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+ length),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = i % 256
+ # Seal the object.
+ self.plasma_client.seal(object_id)
+ # Get the object.
+ memory_buffer = np.frombuffer(
+ self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
+ for i in range(length):
+ assert memory_buffer[i] == i % 256
+
+ def test_create_with_metadata(self):
+ for length in range(0, 1000, 3):
+ # Create an object id string.
+ object_id = random_object_id()
+ # Create a random metadata string.
+ metadata = generate_metadata(length)
+ # Create a new buffer and write to it.
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+ length,
+ metadata),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = i % 256
+ # Seal the object.
+ self.plasma_client.seal(object_id)
+ # Get the object.
+ memory_buffer = np.frombuffer(
+ self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
+ for i in range(length):
+ assert memory_buffer[i] == i % 256
+ # Get the metadata.
+ metadata_buffer = np.frombuffer(
+ self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
+ assert len(metadata) == len(metadata_buffer)
+ for i in range(len(metadata)):
+ assert metadata[i] == metadata_buffer[i]
+
+    def test_create_existing(self):
+        # This test exercises the code path in which we create an object
+        # with an ID that already exists.
+        length = 100
+        for _ in range(1000):
+            object_id = random_object_id()
+            self.plasma_client.create(object_id, length,
+                                      generate_metadata(length))
+            # TODO(pcm): Introduce a more specific error type here.
+            with pytest.raises(pa.lib.ArrowException):
+                self.plasma_client.create(object_id, length,
+                                          generate_metadata(length))
+
+ def test_create_and_seal(self):
+
+ # Create a bunch of objects.
+ object_ids = []
+ for i in range(1000):
+ object_id = random_object_id()
+ object_ids.append(object_id)
+ self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')
+
+ for i in range(1000):
+ [data_tuple] = self.plasma_client.get_buffers([object_ids[i]],
+ with_meta=True)
+ assert data_tuple[1].to_pybytes() == i * b'a'
+ assert (self.plasma_client.get_metadata(
+ [object_ids[i]])[0].to_pybytes() ==
+ i * b'b')
+
+ # Make sure that creating the same object twice raises an exception.
+ object_id = random_object_id()
+ self.plasma_client.create_and_seal(object_id, b'a', b'b')
+ with pytest.raises(pa.plasma.PlasmaObjectExists):
+ self.plasma_client.create_and_seal(object_id, b'a', b'b')
+
+ # Make sure that these objects can be evicted.
+ big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
+ object_ids = []
+ for _ in range(20):
+ object_id = random_object_id()
+ object_ids.append(object_id)
+            # Create under the tracked ID so that the eviction check below
+            # is meaningful.
+            self.plasma_client.create_and_seal(object_id, big_object,
+                                               big_object)
+ for i in range(10):
+ assert not self.plasma_client.contains(object_ids[i])
+
+ def test_get(self):
+ num_object_ids = 60
+ # Test timing out of get with various timeouts.
+ for timeout in [0, 10, 100, 1000]:
+ object_ids = [random_object_id() for _ in range(num_object_ids)]
+ results = self.plasma_client.get_buffers(object_ids,
+ timeout_ms=timeout)
+ assert results == num_object_ids * [None]
+
+ data_buffers = []
+ metadata_buffers = []
+ for i in range(num_object_ids):
+ if i % 2 == 0:
+ data_buffer, metadata_buffer = create_object_with_id(
+ self.plasma_client, object_ids[i], 2000, 2000)
+ data_buffers.append(data_buffer)
+ metadata_buffers.append(metadata_buffer)
+
+ # Test timing out from some but not all get calls with various
+ # timeouts.
+ for timeout in [0, 10, 100, 1000]:
+ data_results = self.plasma_client.get_buffers(object_ids,
+ timeout_ms=timeout)
+ # metadata_results = self.plasma_client.get_metadata(
+ # object_ids, timeout_ms=timeout)
+ for i in range(num_object_ids):
+ if i % 2 == 0:
+ array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
+ array2 = np.frombuffer(data_results[i], dtype="uint8")
+ np.testing.assert_equal(array1, array2)
+ # TODO(rkn): We should compare the metadata as well. But
+ # currently the types are different (e.g., memoryview
+ # versus bytearray).
+ # assert plasma.buffers_equal(
+ # metadata_buffers[i // 2], metadata_results[i])
+                else:
+                    assert data_results[i] is None
+
+ # Test trying to get an object that was created by the same client but
+ # not sealed.
+ object_id = random_object_id()
+ self.plasma_client.create(object_id, 10, b"metadata")
+ assert self.plasma_client.get_buffers(
+ [object_id], timeout_ms=0, with_meta=True)[0][1] is None
+ assert self.plasma_client.get_buffers(
+ [object_id], timeout_ms=1, with_meta=True)[0][1] is None
+ self.plasma_client.seal(object_id)
+ assert self.plasma_client.get_buffers(
+ [object_id], timeout_ms=0, with_meta=True)[0][1] is not None
+
+ def test_buffer_lifetime(self):
+ # ARROW-2195
+ arr = pa.array([1, 12, 23, 3, 34], pa.int32())
+ batch = pa.RecordBatch.from_arrays([arr], ['field1'])
+
+ # Serialize RecordBatch into Plasma store
+ sink = pa.MockOutputStream()
+ writer = pa.RecordBatchStreamWriter(sink, batch.schema)
+ writer.write_batch(batch)
+ writer.close()
+
+ object_id = random_object_id()
+ data_buffer = self.plasma_client.create(object_id, sink.size())
+ stream = pa.FixedSizeBufferWriter(data_buffer)
+ writer = pa.RecordBatchStreamWriter(stream, batch.schema)
+ writer.write_batch(batch)
+ writer.close()
+ self.plasma_client.seal(object_id)
+ del data_buffer
+
+ # Unserialize RecordBatch from Plasma store
+ [data_buffer] = self.plasma_client2.get_buffers([object_id])
+ reader = pa.RecordBatchStreamReader(data_buffer)
+ read_batch = reader.read_next_batch()
+ # Lose reference to returned buffer. The RecordBatch must still
+ # be backed by valid memory.
+ del data_buffer, reader
+
+ assert read_batch.equals(batch)
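+
+        # The pattern above generalizes: do a dry-run write into a
+        # MockOutputStream to learn the exact serialized size, then stream
+        # the real bytes into the plasma-allocated buffer. A sketch, with
+        # `client` standing in for any connected plasma client:
+        #
+        #     sink = pa.MockOutputStream()
+        #     with pa.RecordBatchStreamWriter(sink, batch.schema) as w:
+        #         w.write_batch(batch)            # counts bytes only
+        #     buf = client.create(object_id, sink.size())
+        #     stream = pa.FixedSizeBufferWriter(buf)
+        #     with pa.RecordBatchStreamWriter(stream, batch.schema) as w:
+        #         w.write_batch(batch)            # real write, exact fit
+        #     client.seal(object_id)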
+
+ def test_put_and_get(self):
+ for value in [["hello", "world", 3, 1.0], None, "hello"]:
+ object_id = self.plasma_client.put(value)
+ [result] = self.plasma_client.get([object_id])
+ assert result == value
+
+ result = self.plasma_client.get(object_id)
+ assert result == value
+
+ object_id = random_object_id()
+ [result] = self.plasma_client.get([object_id], timeout_ms=0)
+ assert result == pa.plasma.ObjectNotAvailable
+
+ @pytest.mark.filterwarnings(
+ "ignore:'pyarrow.deserialize':FutureWarning")
+ def test_put_and_get_raw_buffer(self):
+ temp_id = random_object_id()
+ use_meta = b"RAW"
+
+ def deserialize_or_output(data_tuple):
+ if data_tuple[0] == use_meta:
+ return data_tuple[1].to_pybytes()
+ else:
+ if data_tuple[1] is None:
+ return pa.plasma.ObjectNotAvailable
+ else:
+ return pa.deserialize(data_tuple[1])
+
+ for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]:
+ if isinstance(value, bytes):
+ object_id = self.plasma_client.put_raw_buffer(
+ value, metadata=use_meta)
+ else:
+ object_id = self.plasma_client.put(value)
+ [result] = self.plasma_client.get_buffers([object_id],
+ with_meta=True)
+ result = deserialize_or_output(result)
+ assert result == value
+
+ object_id = random_object_id()
+ [result] = self.plasma_client.get_buffers([object_id],
+ timeout_ms=0,
+ with_meta=True)
+ result = deserialize_or_output(result)
+ assert result == pa.plasma.ObjectNotAvailable
+
+ @pytest.mark.filterwarnings(
+ "ignore:'serialization_context':FutureWarning")
+ def test_put_and_get_serialization_context(self):
+
+ class CustomType:
+ def __init__(self, val):
+ self.val = val
+
+ val = CustomType(42)
+
+ with pytest.raises(pa.ArrowSerializationError):
+ self.plasma_client.put(val)
+
+ serialization_context = pa.lib.SerializationContext()
+ serialization_context.register_type(CustomType, 20*"\x00")
+
+ object_id = self.plasma_client.put(
+ val, None, serialization_context=serialization_context)
+
+ with pytest.raises(pa.ArrowSerializationError):
+ result = self.plasma_client.get(object_id)
+
+ result = self.plasma_client.get(
+ object_id, -1, serialization_context=serialization_context)
+ assert result.val == val.val
+
+ def test_store_arrow_objects(self):
+ data = np.random.randn(10, 4)
+ # Write an arrow object.
+ object_id = random_object_id()
+ tensor = pa.Tensor.from_numpy(data)
+ data_size = pa.ipc.get_tensor_size(tensor)
+ buf = self.plasma_client.create(object_id, data_size)
+ stream = pa.FixedSizeBufferWriter(buf)
+ pa.ipc.write_tensor(tensor, stream)
+ self.plasma_client.seal(object_id)
+ # Read the arrow object.
+ [tensor] = self.plasma_client.get_buffers([object_id])
+ reader = pa.BufferReader(tensor)
+ array = pa.ipc.read_tensor(reader).to_numpy()
+ # Assert that they are equal.
+ np.testing.assert_equal(data, array)
+
+ @pytest.mark.pandas
+ def test_store_pandas_dataframe(self):
+ import pandas as pd
+ import pyarrow.plasma as plasma
+ d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
+ 'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
+ df = pd.DataFrame(d)
+
+ # Write the DataFrame.
+ record_batch = pa.RecordBatch.from_pandas(df)
+ # Determine the size.
+ s = pa.MockOutputStream()
+ stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
+ stream_writer.write_batch(record_batch)
+ data_size = s.size()
+ object_id = plasma.ObjectID(np.random.bytes(20))
+
+ buf = self.plasma_client.create(object_id, data_size)
+ stream = pa.FixedSizeBufferWriter(buf)
+ stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
+ stream_writer.write_batch(record_batch)
+
+ self.plasma_client.seal(object_id)
+
+ # Read the DataFrame.
+ [data] = self.plasma_client.get_buffers([object_id])
+ reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
+ result = reader.read_next_batch().to_pandas()
+
+ pd.testing.assert_frame_equal(df, result)
+
+ def test_pickle_object_ids(self):
+ # This can be used for sharing object IDs between processes.
+ import pickle
+ object_id = random_object_id()
+ data = pickle.dumps(object_id)
+ object_id2 = pickle.loads(data)
+ assert object_id == object_id2
+
+    def test_store_full(self):
+        # The store is started with DEFAULT_PLASMA_STORE_MEMORY (100 MB), so
+        # make sure that create throws an exception when it is full.
+        def assert_create_raises_plasma_full(unit_test, size):
+            partial_size = np.random.randint(size)
+            # TODO(pcm): Use a more specific error type here.
+            with pytest.raises(pa.lib.ArrowException):
+                create_object(unit_test.plasma_client,
+                              partial_size,
+                              size - partial_size)
+
+ PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100
+
+ # Create a list to keep some of the buffers in scope.
+ memory_buffers = []
+ _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT)
+ memory_buffers.append(memory_buffer)
+ # Remaining space is 50%. Make sure that we can't create an
+ # object of size 50% + 1, but we can create one of size 20%.
+ assert_create_raises_plasma_full(
+ self, 50 * PERCENT + SMALL_OBJECT_SIZE)
+ _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
+ del memory_buffer
+ _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
+ del memory_buffer
+ assert_create_raises_plasma_full(
+ self, 50 * PERCENT + SMALL_OBJECT_SIZE)
+
+ _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
+ memory_buffers.append(memory_buffer)
+ # Remaining space is 30%.
+ assert_create_raises_plasma_full(
+ self, 30 * PERCENT + SMALL_OBJECT_SIZE)
+
+ _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
+ memory_buffers.append(memory_buffer)
+ # Remaining space is 20%.
+ assert_create_raises_plasma_full(
+ self, 20 * PERCENT + SMALL_OBJECT_SIZE)
+
+ def test_contains(self):
+ fake_object_ids = [random_object_id() for _ in range(100)]
+ real_object_ids = [random_object_id() for _ in range(100)]
+ for object_id in real_object_ids:
+ assert self.plasma_client.contains(object_id) is False
+ self.plasma_client.create(object_id, 100)
+ self.plasma_client.seal(object_id)
+ assert self.plasma_client.contains(object_id)
+ for object_id in fake_object_ids:
+ assert not self.plasma_client.contains(object_id)
+ for object_id in real_object_ids:
+ assert self.plasma_client.contains(object_id)
+
+    def test_hash(self):
+        # Check that hashing an object that doesn't exist raises an error.
+        object_id1 = random_object_id()
+        # TODO(pcm): Introduce a more specific error type here.
+        with pytest.raises(pa.lib.ArrowException):
+            self.plasma_client.hash(object_id1)
+
+ length = 1000
+ # Create a random object, and check that the hash function always
+ # returns the same value.
+ metadata = generate_metadata(length)
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id1,
+ length,
+ metadata),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = i % 256
+ self.plasma_client.seal(object_id1)
+ assert (self.plasma_client.hash(object_id1) ==
+ self.plasma_client.hash(object_id1))
+
+ # Create a second object with the same value as the first, and check
+ # that their hashes are equal.
+ object_id2 = random_object_id()
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id2,
+ length,
+ metadata),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = i % 256
+ self.plasma_client.seal(object_id2)
+ assert (self.plasma_client.hash(object_id1) ==
+ self.plasma_client.hash(object_id2))
+
+ # Create a third object with a different value from the first two, and
+ # check that its hash is different.
+ object_id3 = random_object_id()
+ metadata = generate_metadata(length)
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id3,
+ length,
+ metadata),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = (i + 1) % 256
+ self.plasma_client.seal(object_id3)
+ assert (self.plasma_client.hash(object_id1) !=
+ self.plasma_client.hash(object_id3))
+
+ # Create a fourth object with the same value as the third, but
+ # different metadata. Check that its hash is different from any of the
+ # previous three.
+ object_id4 = random_object_id()
+ metadata4 = generate_metadata(length)
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id4,
+ length,
+ metadata4),
+ dtype="uint8")
+ for i in range(length):
+ memory_buffer[i] = (i + 1) % 256
+ self.plasma_client.seal(object_id4)
+ assert (self.plasma_client.hash(object_id1) !=
+ self.plasma_client.hash(object_id4))
+ assert (self.plasma_client.hash(object_id3) !=
+ self.plasma_client.hash(object_id4))
+
+ def test_many_hashes(self):
+ hashes = []
+ length = 2 ** 10
+
+ for i in range(256):
+ object_id = random_object_id()
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+ length),
+ dtype="uint8")
+ for j in range(length):
+ memory_buffer[j] = i
+ self.plasma_client.seal(object_id)
+ hashes.append(self.plasma_client.hash(object_id))
+
+        # Create objects of the same length whose values pairwise differ in
+        # exactly two bits.
+ for i in range(length):
+ object_id = random_object_id()
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+ length),
+ dtype="uint8")
+ for j in range(length):
+ memory_buffer[j] = 0
+ memory_buffer[i] = 1
+ self.plasma_client.seal(object_id)
+ hashes.append(self.plasma_client.hash(object_id))
+
+ # Create objects of varying length, all with value 0.
+ for i in range(length):
+ object_id = random_object_id()
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
+ i),
+ dtype="uint8")
+ for j in range(i):
+ memory_buffer[j] = 0
+ self.plasma_client.seal(object_id)
+ hashes.append(self.plasma_client.hash(object_id))
+
+ # Check that all hashes were unique.
+ assert len(set(hashes)) == 256 + length + length
+
+ # def test_individual_delete(self):
+ # length = 100
+ # # Create an object id string.
+ # object_id = random_object_id()
+ # # Create a random metadata string.
+ # metadata = generate_metadata(100)
+ # # Create a new buffer and write to it.
+ # memory_buffer = self.plasma_client.create(object_id, length,
+ # metadata)
+ # for i in range(length):
+ # memory_buffer[i] = chr(i % 256)
+ # # Seal the object.
+ # self.plasma_client.seal(object_id)
+ # # Check that the object is present.
+ # assert self.plasma_client.contains(object_id)
+ # # Delete the object.
+ # self.plasma_client.delete(object_id)
+ # # Make sure the object is no longer present.
+ # self.assertFalse(self.plasma_client.contains(object_id))
+ #
+ # def test_delete(self):
+ # # Create some objects.
+ # object_ids = [random_object_id() for _ in range(100)]
+ # for object_id in object_ids:
+ # length = 100
+ # # Create a random metadata string.
+ # metadata = generate_metadata(100)
+ # # Create a new buffer and write to it.
+ # memory_buffer = self.plasma_client.create(object_id, length,
+ # metadata)
+ # for i in range(length):
+ # memory_buffer[i] = chr(i % 256)
+ # # Seal the object.
+ # self.plasma_client.seal(object_id)
+ # # Check that the object is present.
+ # assert self.plasma_client.contains(object_id)
+ #
+ # # Delete the objects and make sure they are no longer present.
+ # for object_id in object_ids:
+ # # Delete the object.
+ # self.plasma_client.delete(object_id)
+ # # Make sure the object is no longer present.
+ # self.assertFalse(self.plasma_client.contains(object_id))
+
+ def test_illegal_functionality(self):
+ # Create an object id string.
+ object_id = random_object_id()
+ # Create a new buffer and write to it.
+ length = 1000
+ memory_buffer = self.plasma_client.create(object_id, length)
+ # Make sure we cannot access memory out of bounds.
+ with pytest.raises(Exception):
+ memory_buffer[length]
+ # Seal the object.
+ self.plasma_client.seal(object_id)
+ # This test is commented out because it currently fails.
+        # # Make sure the object is read-only now.
+ # def illegal_assignment():
+ # memory_buffer[0] = chr(0)
+ # with pytest.raises(Exception):
+ # illegal_assignment()
+ # Get the object.
+ memory_buffer = self.plasma_client.get_buffers([object_id])[0]
+
+ # Make sure the object is read only.
+ def illegal_assignment():
+ memory_buffer[0] = chr(0)
+ with pytest.raises(Exception):
+ illegal_assignment()
+
+ def test_evict(self):
+ client = self.plasma_client2
+ object_id1 = random_object_id()
+ b1 = client.create(object_id1, 1000)
+ client.seal(object_id1)
+ del b1
+ assert client.evict(1) == 1000
+
+ object_id2 = random_object_id()
+ object_id3 = random_object_id()
+ b2 = client.create(object_id2, 999)
+ b3 = client.create(object_id3, 998)
+ client.seal(object_id3)
+ del b3
+ assert client.evict(1000) == 998
+
+ object_id4 = random_object_id()
+ b4 = client.create(object_id4, 997)
+ client.seal(object_id4)
+ del b4
+ client.seal(object_id2)
+ del b2
+ assert client.evict(1) == 997
+ assert client.evict(1) == 999
+
+ object_id5 = random_object_id()
+ object_id6 = random_object_id()
+ object_id7 = random_object_id()
+ b5 = client.create(object_id5, 996)
+ b6 = client.create(object_id6, 995)
+ b7 = client.create(object_id7, 994)
+ client.seal(object_id5)
+ client.seal(object_id6)
+ client.seal(object_id7)
+ del b5
+ del b6
+ del b7
+ assert client.evict(2000) == 996 + 995 + 994
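+
+    # Note on the accounting above: evict(num_bytes) releases sealed,
+    # unreferenced objects in LRU order until at least num_bytes are freed
+    # (or nothing evictable remains) and returns the number of bytes
+    # actually freed; unsealed objects and objects still referenced by a
+    # client buffer are skipped.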
+
+ # Mitigate valgrind-induced slowness
+ SUBSCRIBE_TEST_SIZES = ([1, 10, 100, 1000] if USE_VALGRIND
+ else [1, 10, 100, 1000, 10000])
+
+ def test_subscribe(self):
+ # Subscribe to notifications from the Plasma Store.
+ self.plasma_client.subscribe()
+ for i in self.SUBSCRIBE_TEST_SIZES:
+ object_ids = [random_object_id() for _ in range(i)]
+ metadata_sizes = [np.random.randint(1000) for _ in range(i)]
+ data_sizes = [np.random.randint(1000) for _ in range(i)]
+ for j in range(i):
+ self.plasma_client.create(
+ object_ids[j], data_sizes[j],
+ metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+ self.plasma_client.seal(object_ids[j])
+ # Check that we received notifications for all of the objects.
+ for j in range(i):
+ notification_info = self.plasma_client.get_next_notification()
+ recv_objid, recv_dsize, recv_msize = notification_info
+ assert object_ids[j] == recv_objid
+ assert data_sizes[j] == recv_dsize
+ assert metadata_sizes[j] == recv_msize
+
+ def test_subscribe_socket(self):
+ # Subscribe to notifications from the Plasma Store.
+ self.plasma_client.subscribe()
+ rsock = self.plasma_client.get_notification_socket()
+ for i in self.SUBSCRIBE_TEST_SIZES:
+ # Get notification from socket.
+ object_ids = [random_object_id() for _ in range(i)]
+ metadata_sizes = [np.random.randint(1000) for _ in range(i)]
+ data_sizes = [np.random.randint(1000) for _ in range(i)]
+
+ for j in range(i):
+ self.plasma_client.create(
+ object_ids[j], data_sizes[j],
+ metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+ self.plasma_client.seal(object_ids[j])
+
+ # Check that we received notifications for all of the objects.
+ for j in range(i):
+ # Assume the plasma store will not be full,
+ # so we always get the data size instead of -1.
+ msg_len, = struct.unpack('L', rsock.recv(8))
+ content = rsock.recv(msg_len)
+ recv_objids, recv_dsizes, recv_msizes = (
+ self.plasma_client.decode_notifications(content))
+ assert object_ids[j] == recv_objids[0]
+ assert data_sizes[j] == recv_dsizes[0]
+ assert metadata_sizes[j] == recv_msizes[0]
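+
+        # Note: socket.recv(n) may legally return fewer than n bytes, so the
+        # reads above rely on these small notifications arriving whole. A
+        # robust reader would loop until the full length is consumed; a
+        # minimal sketch (illustrative only, not used by these tests):
+        #
+        #     def recv_exact(sock, n):
+        #         data = b""
+        #         while len(data) < n:
+        #             chunk = sock.recv(n - len(data))
+        #             if not chunk:
+        #                 raise EOFError("notification socket closed")
+        #             data += chunk
+        #         return data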
+
+ def test_subscribe_deletions(self):
+ # Subscribe to notifications from the Plasma Store. We use
+ # plasma_client2 to make sure that all used objects will get evicted
+ # properly.
+ self.plasma_client2.subscribe()
+ for i in self.SUBSCRIBE_TEST_SIZES:
+ object_ids = [random_object_id() for _ in range(i)]
+ # Add 1 to the sizes to make sure we have nonzero object sizes.
+ metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
+ data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
+ for j in range(i):
+ x = self.plasma_client2.create(
+ object_ids[j], data_sizes[j],
+ metadata=bytearray(np.random.bytes(metadata_sizes[j])))
+ self.plasma_client2.seal(object_ids[j])
+ del x
+ # Check that we received notifications for creating all of the
+ # objects.
+ for j in range(i):
+ notification_info = self.plasma_client2.get_next_notification()
+ recv_objid, recv_dsize, recv_msize = notification_info
+ assert object_ids[j] == recv_objid
+ assert data_sizes[j] == recv_dsize
+ assert metadata_sizes[j] == recv_msize
+
+ # Check that we receive notifications for deleting all objects, as
+ # we evict them.
+ for j in range(i):
+ assert (self.plasma_client2.evict(1) ==
+ data_sizes[j] + metadata_sizes[j])
+ notification_info = self.plasma_client2.get_next_notification()
+ recv_objid, recv_dsize, recv_msize = notification_info
+ assert object_ids[j] == recv_objid
+ assert -1 == recv_dsize
+ assert -1 == recv_msize
+
+ # Test multiple deletion notifications. The first 9 object IDs have
+ # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
+ # it will evict all objects, so we should receive deletion
+ # notifications for each.
+ num_object_ids = 10
+ object_ids = [random_object_id() for _ in range(num_object_ids)]
+ metadata_sizes = [0] * (num_object_ids - 1)
+ data_sizes = [0] * (num_object_ids - 1)
+ metadata_sizes.append(np.random.randint(1000))
+ data_sizes.append(np.random.randint(1000))
+ for i in range(num_object_ids):
+ x = self.plasma_client2.create(
+ object_ids[i], data_sizes[i],
+ metadata=bytearray(np.random.bytes(metadata_sizes[i])))
+ self.plasma_client2.seal(object_ids[i])
+ del x
+ for i in range(num_object_ids):
+ notification_info = self.plasma_client2.get_next_notification()
+ recv_objid, recv_dsize, recv_msize = notification_info
+ assert object_ids[i] == recv_objid
+ assert data_sizes[i] == recv_dsize
+ assert metadata_sizes[i] == recv_msize
+ assert (self.plasma_client2.evict(1) ==
+ data_sizes[-1] + metadata_sizes[-1])
+ for i in range(num_object_ids):
+ notification_info = self.plasma_client2.get_next_notification()
+ recv_objid, recv_dsize, recv_msize = notification_info
+ assert object_ids[i] == recv_objid
+ assert -1 == recv_dsize
+ assert -1 == recv_msize
+
+ def test_use_full_memory(self):
+ # Fill the object store up with a large number of small objects and let
+ # them go out of scope.
+ for _ in range(100):
+ create_object(
+ self.plasma_client2,
+ np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
+ # Create large objects that require the full object store size, and
+ # verify that they fit.
+ for _ in range(2):
+ create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
+        # Verify that an object that is too large does not fit.
+        # Also verify that the right error is raised and that the object ID
+        # is not created prematurely.
+ object_id = random_object_id()
+ for i in range(3):
+ with pytest.raises(pa.plasma.PlasmaStoreFull):
+ self.plasma_client2.create(
+ object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)
+
+ @staticmethod
+ def _client_blocked_in_get(plasma_store_name, object_id):
+ import pyarrow.plasma as plasma
+ client = plasma.connect(plasma_store_name)
+ # Try to get an object ID that doesn't exist. This should block.
+ client.get([object_id])
+
+ def test_client_death_during_get(self):
+ object_id = random_object_id()
+
+ p = multiprocessing.Process(target=self._client_blocked_in_get,
+ args=(self.plasma_store_name, object_id))
+ p.start()
+ # Make sure the process is running.
+ time.sleep(0.2)
+ assert p.is_alive()
+
+ # Kill the client process.
+ p.terminate()
+ # Wait a little for the store to process the disconnect event.
+ time.sleep(0.1)
+
+ # Create the object.
+ self.plasma_client.put(1, object_id=object_id)
+
+ # Check that the store is still alive. This will raise an exception if
+ # the store is dead.
+ self.plasma_client.contains(random_object_id())
+
+ @staticmethod
+ def _client_get_multiple(plasma_store_name, object_ids):
+ import pyarrow.plasma as plasma
+ client = plasma.connect(plasma_store_name)
+ # Try to get an object ID that doesn't exist. This should block.
+ client.get(object_ids)
+
+ def test_client_getting_multiple_objects(self):
+ object_ids = [random_object_id() for _ in range(10)]
+
+ p = multiprocessing.Process(target=self._client_get_multiple,
+ args=(self.plasma_store_name, object_ids))
+ p.start()
+ # Make sure the process is running.
+ time.sleep(0.2)
+ assert p.is_alive()
+
+ # Create the objects one by one.
+ for object_id in object_ids:
+ self.plasma_client.put(1, object_id=object_id)
+
+ # Check that the store is still alive. This will raise an exception if
+ # the store is dead.
+ self.plasma_client.contains(random_object_id())
+
+        # Make sure that the blocked client finishes.
+        start_time = time.time()
+        while True:
+            if time.time() - start_time > 5:
+                raise Exception("Timed out while waiting for blocked client "
+                                "to finish.")
+            if not p.is_alive():
+                break
+            time.sleep(0.1)  # Avoid a busy-wait while polling.
+
+
+@pytest.mark.plasma
+class TestEvictionToExternalStore:
+
+ def setup_method(self, test_method):
+ import pyarrow.plasma as plasma
+ # Start Plasma store.
+ self.plasma_store_ctx = plasma.start_plasma_store(
+ plasma_store_memory=1000 * 1024,
+ use_valgrind=USE_VALGRIND,
+ external_store=EXTERNAL_STORE)
+ self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
+ # Connect to Plasma.
+ self.plasma_client = plasma.connect(self.plasma_store_name)
+
+ def teardown_method(self, test_method):
+ try:
+ # Check that the Plasma store is still alive.
+ assert self.p.poll() is None
+ self.p.send_signal(signal.SIGTERM)
+ self.p.wait(timeout=5)
+ finally:
+ self.plasma_store_ctx.__exit__(None, None, None)
+
+ def test_eviction(self):
+ client = self.plasma_client
+
+ object_ids = [random_object_id() for _ in range(0, 20)]
+ data = b'x' * 100 * 1024
+ metadata = b''
+
+ for i in range(0, 20):
+ # Test for object non-existence.
+ assert not client.contains(object_ids[i])
+
+ # Create and seal the object.
+ client.create_and_seal(object_ids[i], data, metadata)
+
+            # The object should now be present in the store.
+            assert client.contains(object_ids[i])
+
+ for i in range(0, 20):
+            # Since we access objects sequentially, every access is a cache
+            # "miss" owing to LRU eviction. The client tries the plasma
+            # store first and falls back to the external store on failure,
+            # so the fetch should succeed, though it may evict the next few
+            # objects.
+ [result] = client.get_buffers([object_ids[i]])
+ assert result.to_pybytes() == data
+
+ # Make sure we still cannot fetch objects that do not exist
+ [result] = client.get_buffers([random_object_id()], timeout_ms=100)
+ assert result is None
+
+
+@pytest.mark.plasma
+def test_object_id_size():
+ import pyarrow.plasma as plasma
+    with pytest.raises(ValueError):
+        plasma.ObjectID("hello")
+    # A 20-byte binary ID is valid.
+    plasma.ObjectID(20 * b"0")
+
+
+@pytest.mark.plasma
+def test_object_id_equality_operators():
+ import pyarrow.plasma as plasma
+
+ oid1 = plasma.ObjectID(20 * b'0')
+ oid2 = plasma.ObjectID(20 * b'0')
+ oid3 = plasma.ObjectID(19 * b'0' + b'1')
+
+ assert oid1 == oid2
+ assert oid2 != oid3
+ assert oid1 != 'foo'
+
+
+@pytest.mark.xfail(reason="often fails on travis")
+@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
+ reason="requires hugepage support")
+def test_use_huge_pages():
+ import pyarrow.plasma as plasma
+ with plasma.start_plasma_store(
+ plasma_store_memory=2*10**9,
+ plasma_directory="/mnt/hugepages",
+ use_hugepages=True) as (plasma_store_name, p):
+ plasma_client = plasma.connect(plasma_store_name)
+ create_object(plasma_client, 10**8)
+
+
+# Check that a plasma client is kept alive until every PlasmaBuffer that
+# holds a handle to it has been destroyed; see ARROW-2448.
+@pytest.mark.plasma
+def test_plasma_client_sharing():
+ import pyarrow.plasma as plasma
+
+ with plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+ as (plasma_store_name, p):
+ plasma_client = plasma.connect(plasma_store_name)
+ object_id = plasma_client.put(np.zeros(3))
+ buf = plasma_client.get(object_id)
+ del plasma_client
+ assert (buf == np.zeros(3)).all()
+ del buf # This segfaulted pre ARROW-2448.
+
+
+@pytest.mark.plasma
+def test_plasma_list():
+ import pyarrow.plasma as plasma
+
+ with plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+ as (plasma_store_name, p):
+ plasma_client = plasma.connect(plasma_store_name)
+
+ # Test sizes
+ u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
+ l1 = plasma_client.list()
+ assert l1[u]["data_size"] == 11
+ assert l1[u]["metadata_size"] == 7
+
+ # Test ref_count
+ v = plasma_client.put(np.zeros(3))
+ # Ref count has already been released
+ # XXX flaky test, disabled (ARROW-3344)
+ # l2 = plasma_client.list()
+ # assert l2[v]["ref_count"] == 0
+ a = plasma_client.get(v)
+ l3 = plasma_client.list()
+ assert l3[v]["ref_count"] == 1
+ del a
+
+ # Test state
+ w, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+ l4 = plasma_client.list()
+ assert l4[w]["state"] == "created"
+ plasma_client.seal(w)
+ l5 = plasma_client.list()
+ assert l5[w]["state"] == "sealed"
+
+ # Test timestamps
+ slack = 1.5 # seconds
+ t1 = time.time()
+ x, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
+ t2 = time.time()
+ l6 = plasma_client.list()
+ assert t1 - slack <= l6[x]["create_time"] <= t2 + slack
+ time.sleep(2.0)
+ t3 = time.time()
+ plasma_client.seal(x)
+ t4 = time.time()
+ l7 = plasma_client.list()
+ assert t3 - t2 - slack <= l7[x]["construct_duration"]
+ assert l7[x]["construct_duration"] <= t4 - t1 + slack
+
+
+@pytest.mark.plasma
+def test_object_id_randomness():
+ cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
+ first_object_id = subprocess.check_output([sys.executable, "-c", cmd])
+ second_object_id = subprocess.check_output([sys.executable, "-c", cmd])
+ assert first_object_id != second_object_id
+
+
+@pytest.mark.plasma
+def test_store_capacity():
+ import pyarrow.plasma as plasma
+ with plasma.start_plasma_store(plasma_store_memory=10000) as (name, p):
+ plasma_client = plasma.connect(name)
+ assert plasma_client.store_capacity() == 10000