diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/python/pyarrow/tests/test_plasma.py | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/pyarrow/tests/test_plasma.py')
-rw-r--r-- | src/arrow/python/pyarrow/tests/test_plasma.py | 1073 |
1 files changed, 1073 insertions, 0 deletions
diff --git a/src/arrow/python/pyarrow/tests/test_plasma.py b/src/arrow/python/pyarrow/tests/test_plasma.py new file mode 100644 index 000000000..ed08a6872 --- /dev/null +++ b/src/arrow/python/pyarrow/tests/test_plasma.py @@ -0,0 +1,1073 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +import multiprocessing +import os +import pytest +import random +import signal +import struct +import subprocess +import sys +import time + +import numpy as np +import pyarrow as pa + + +DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8 +USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1" +EXTERNAL_STORE = "hashtable://test" +SMALL_OBJECT_SIZE = 9000 + + +def random_name(): + return str(random.randint(0, 99999999)) + + +def random_object_id(): + import pyarrow.plasma as plasma + return plasma.ObjectID(np.random.bytes(20)) + + +def generate_metadata(length): + metadata = bytearray(length) + if length > 0: + metadata[0] = random.randint(0, 255) + metadata[-1] = random.randint(0, 255) + for _ in range(100): + metadata[random.randint(0, length - 1)] = random.randint(0, 255) + return metadata + + +def write_to_data_buffer(buff, length): + array = np.frombuffer(buff, dtype="uint8") + if length > 0: + array[0] = random.randint(0, 255) + array[-1] = random.randint(0, 255) + for _ in range(100): + array[random.randint(0, length - 1)] = random.randint(0, 255) + + +def create_object_with_id(client, object_id, data_size, metadata_size, + seal=True): + metadata = generate_metadata(metadata_size) + memory_buffer = client.create(object_id, data_size, metadata) + write_to_data_buffer(memory_buffer, data_size) + if seal: + client.seal(object_id) + return memory_buffer, metadata + + +def create_object(client, data_size, metadata_size=0, seal=True): + object_id = random_object_id() + memory_buffer, metadata = create_object_with_id(client, object_id, + data_size, metadata_size, + seal=seal) + return object_id, memory_buffer, metadata + + +@pytest.mark.plasma +class TestPlasmaClient: + + def setup_method(self, test_method): + import pyarrow.plasma as plasma + # Start Plasma store. 
+ self.plasma_store_ctx = plasma.start_plasma_store( + plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY, + use_valgrind=USE_VALGRIND) + self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__() + # Connect to Plasma. + self.plasma_client = plasma.connect(self.plasma_store_name) + self.plasma_client2 = plasma.connect(self.plasma_store_name) + + def teardown_method(self, test_method): + try: + # Check that the Plasma store is still alive. + assert self.p.poll() is None + # Ensure Valgrind and/or coverage have a clean exit + # Valgrind misses SIGTERM if it is delivered before the + # event loop is ready; this race condition is mitigated + # but not solved by time.sleep(). + if USE_VALGRIND: + time.sleep(1.0) + self.p.send_signal(signal.SIGTERM) + self.p.wait(timeout=5) + assert self.p.returncode == 0 + finally: + self.plasma_store_ctx.__exit__(None, None, None) + + def test_connection_failure_raises_exception(self): + import pyarrow.plasma as plasma + # ARROW-1264 + with pytest.raises(IOError): + plasma.connect('unknown-store-name', num_retries=1) + + def test_create(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer and write to it. + length = 50 + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. + self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer( + self.plasma_client.get_buffers([object_id])[0], dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + + def test_create_with_metadata(self): + for length in range(0, 1000, 3): + # Create an object id string. + object_id = random_object_id() + # Create a random metadata string. + metadata = generate_metadata(length) + # Create a new buffer and write to it. 
+ memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + # Seal the object. + self.plasma_client.seal(object_id) + # Get the object. + memory_buffer = np.frombuffer( + self.plasma_client.get_buffers([object_id])[0], dtype="uint8") + for i in range(length): + assert memory_buffer[i] == i % 256 + # Get the metadata. + metadata_buffer = np.frombuffer( + self.plasma_client.get_metadata([object_id])[0], dtype="uint8") + assert len(metadata) == len(metadata_buffer) + for i in range(len(metadata)): + assert metadata[i] == metadata_buffer[i] + + def test_create_existing(self): + # This test is partially used to test the code path in which we create + # an object with an ID that already exists + length = 100 + for _ in range(1000): + object_id = random_object_id() + self.plasma_client.create(object_id, length, + generate_metadata(length)) + try: + self.plasma_client.create(object_id, length, + generate_metadata(length)) + # TODO(pcm): Introduce a more specific error type here. + except pa.lib.ArrowException: + pass + else: + assert False + + def test_create_and_seal(self): + + # Create a bunch of objects. + object_ids = [] + for i in range(1000): + object_id = random_object_id() + object_ids.append(object_id) + self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b') + + for i in range(1000): + [data_tuple] = self.plasma_client.get_buffers([object_ids[i]], + with_meta=True) + assert data_tuple[1].to_pybytes() == i * b'a' + assert (self.plasma_client.get_metadata( + [object_ids[i]])[0].to_pybytes() == + i * b'b') + + # Make sure that creating the same object twice raises an exception. + object_id = random_object_id() + self.plasma_client.create_and_seal(object_id, b'a', b'b') + with pytest.raises(pa.plasma.PlasmaObjectExists): + self.plasma_client.create_and_seal(object_id, b'a', b'b') + + # Make sure that these objects can be evicted. 
+ big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a' + object_ids = [] + for _ in range(20): + object_id = random_object_id() + object_ids.append(object_id) + self.plasma_client.create_and_seal(random_object_id(), big_object, + big_object) + for i in range(10): + assert not self.plasma_client.contains(object_ids[i]) + + def test_get(self): + num_object_ids = 60 + # Test timing out of get with various timeouts. + for timeout in [0, 10, 100, 1000]: + object_ids = [random_object_id() for _ in range(num_object_ids)] + results = self.plasma_client.get_buffers(object_ids, + timeout_ms=timeout) + assert results == num_object_ids * [None] + + data_buffers = [] + metadata_buffers = [] + for i in range(num_object_ids): + if i % 2 == 0: + data_buffer, metadata_buffer = create_object_with_id( + self.plasma_client, object_ids[i], 2000, 2000) + data_buffers.append(data_buffer) + metadata_buffers.append(metadata_buffer) + + # Test timing out from some but not all get calls with various + # timeouts. + for timeout in [0, 10, 100, 1000]: + data_results = self.plasma_client.get_buffers(object_ids, + timeout_ms=timeout) + # metadata_results = self.plasma_client.get_metadata( + # object_ids, timeout_ms=timeout) + for i in range(num_object_ids): + if i % 2 == 0: + array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8") + array2 = np.frombuffer(data_results[i], dtype="uint8") + np.testing.assert_equal(array1, array2) + # TODO(rkn): We should compare the metadata as well. But + # currently the types are different (e.g., memoryview + # versus bytearray). + # assert plasma.buffers_equal( + # metadata_buffers[i // 2], metadata_results[i]) + else: + assert results[i] is None + + # Test trying to get an object that was created by the same client but + # not sealed. 
+ object_id = random_object_id() + self.plasma_client.create(object_id, 10, b"metadata") + assert self.plasma_client.get_buffers( + [object_id], timeout_ms=0, with_meta=True)[0][1] is None + assert self.plasma_client.get_buffers( + [object_id], timeout_ms=1, with_meta=True)[0][1] is None + self.plasma_client.seal(object_id) + assert self.plasma_client.get_buffers( + [object_id], timeout_ms=0, with_meta=True)[0][1] is not None + + def test_buffer_lifetime(self): + # ARROW-2195 + arr = pa.array([1, 12, 23, 3, 34], pa.int32()) + batch = pa.RecordBatch.from_arrays([arr], ['field1']) + + # Serialize RecordBatch into Plasma store + sink = pa.MockOutputStream() + writer = pa.RecordBatchStreamWriter(sink, batch.schema) + writer.write_batch(batch) + writer.close() + + object_id = random_object_id() + data_buffer = self.plasma_client.create(object_id, sink.size()) + stream = pa.FixedSizeBufferWriter(data_buffer) + writer = pa.RecordBatchStreamWriter(stream, batch.schema) + writer.write_batch(batch) + writer.close() + self.plasma_client.seal(object_id) + del data_buffer + + # Unserialize RecordBatch from Plasma store + [data_buffer] = self.plasma_client2.get_buffers([object_id]) + reader = pa.RecordBatchStreamReader(data_buffer) + read_batch = reader.read_next_batch() + # Lose reference to returned buffer. The RecordBatch must still + # be backed by valid memory. 
+ del data_buffer, reader + + assert read_batch.equals(batch) + + def test_put_and_get(self): + for value in [["hello", "world", 3, 1.0], None, "hello"]: + object_id = self.plasma_client.put(value) + [result] = self.plasma_client.get([object_id]) + assert result == value + + result = self.plasma_client.get(object_id) + assert result == value + + object_id = random_object_id() + [result] = self.plasma_client.get([object_id], timeout_ms=0) + assert result == pa.plasma.ObjectNotAvailable + + @pytest.mark.filterwarnings( + "ignore:'pyarrow.deserialize':FutureWarning") + def test_put_and_get_raw_buffer(self): + temp_id = random_object_id() + use_meta = b"RAW" + + def deserialize_or_output(data_tuple): + if data_tuple[0] == use_meta: + return data_tuple[1].to_pybytes() + else: + if data_tuple[1] is None: + return pa.plasma.ObjectNotAvailable + else: + return pa.deserialize(data_tuple[1]) + + for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]: + if isinstance(value, bytes): + object_id = self.plasma_client.put_raw_buffer( + value, metadata=use_meta) + else: + object_id = self.plasma_client.put(value) + [result] = self.plasma_client.get_buffers([object_id], + with_meta=True) + result = deserialize_or_output(result) + assert result == value + + object_id = random_object_id() + [result] = self.plasma_client.get_buffers([object_id], + timeout_ms=0, + with_meta=True) + result = deserialize_or_output(result) + assert result == pa.plasma.ObjectNotAvailable + + @pytest.mark.filterwarnings( + "ignore:'serialization_context':FutureWarning") + def test_put_and_get_serialization_context(self): + + class CustomType: + def __init__(self, val): + self.val = val + + val = CustomType(42) + + with pytest.raises(pa.ArrowSerializationError): + self.plasma_client.put(val) + + serialization_context = pa.lib.SerializationContext() + serialization_context.register_type(CustomType, 20*"\x00") + + object_id = self.plasma_client.put( + val, None, 
serialization_context=serialization_context) + + with pytest.raises(pa.ArrowSerializationError): + result = self.plasma_client.get(object_id) + + result = self.plasma_client.get( + object_id, -1, serialization_context=serialization_context) + assert result.val == val.val + + def test_store_arrow_objects(self): + data = np.random.randn(10, 4) + # Write an arrow object. + object_id = random_object_id() + tensor = pa.Tensor.from_numpy(data) + data_size = pa.ipc.get_tensor_size(tensor) + buf = self.plasma_client.create(object_id, data_size) + stream = pa.FixedSizeBufferWriter(buf) + pa.ipc.write_tensor(tensor, stream) + self.plasma_client.seal(object_id) + # Read the arrow object. + [tensor] = self.plasma_client.get_buffers([object_id]) + reader = pa.BufferReader(tensor) + array = pa.ipc.read_tensor(reader).to_numpy() + # Assert that they are equal. + np.testing.assert_equal(data, array) + + @pytest.mark.pandas + def test_store_pandas_dataframe(self): + import pandas as pd + import pyarrow.plasma as plasma + d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']), + 'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])} + df = pd.DataFrame(d) + + # Write the DataFrame. + record_batch = pa.RecordBatch.from_pandas(df) + # Determine the size. + s = pa.MockOutputStream() + stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema) + stream_writer.write_batch(record_batch) + data_size = s.size() + object_id = plasma.ObjectID(np.random.bytes(20)) + + buf = self.plasma_client.create(object_id, data_size) + stream = pa.FixedSizeBufferWriter(buf) + stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema) + stream_writer.write_batch(record_batch) + + self.plasma_client.seal(object_id) + + # Read the DataFrame. 
+ [data] = self.plasma_client.get_buffers([object_id]) + reader = pa.RecordBatchStreamReader(pa.BufferReader(data)) + result = reader.read_next_batch().to_pandas() + + pd.testing.assert_frame_equal(df, result) + + def test_pickle_object_ids(self): + # This can be used for sharing object IDs between processes. + import pickle + object_id = random_object_id() + data = pickle.dumps(object_id) + object_id2 = pickle.loads(data) + assert object_id == object_id2 + + def test_store_full(self): + # The store is started with 1GB, so make sure that create throws an + # exception when it is full. + def assert_create_raises_plasma_full(unit_test, size): + partial_size = np.random.randint(size) + try: + _, memory_buffer, _ = create_object(unit_test.plasma_client, + partial_size, + size - partial_size) + # TODO(pcm): More specific error here. + except pa.lib.ArrowException: + pass + else: + # For some reason the above didn't throw an exception, so fail. + assert False + + PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100 + + # Create a list to keep some of the buffers in scope. + memory_buffers = [] + _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT) + memory_buffers.append(memory_buffer) + # Remaining space is 50%. Make sure that we can't create an + # object of size 50% + 1, but we can create one of size 20%. + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) + _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) + del memory_buffer + _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) + del memory_buffer + assert_create_raises_plasma_full( + self, 50 * PERCENT + SMALL_OBJECT_SIZE) + + _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT) + memory_buffers.append(memory_buffer) + # Remaining space is 30%. 
+ assert_create_raises_plasma_full( + self, 30 * PERCENT + SMALL_OBJECT_SIZE) + + _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT) + memory_buffers.append(memory_buffer) + # Remaining space is 20%. + assert_create_raises_plasma_full( + self, 20 * PERCENT + SMALL_OBJECT_SIZE) + + def test_contains(self): + fake_object_ids = [random_object_id() for _ in range(100)] + real_object_ids = [random_object_id() for _ in range(100)] + for object_id in real_object_ids: + assert self.plasma_client.contains(object_id) is False + self.plasma_client.create(object_id, 100) + self.plasma_client.seal(object_id) + assert self.plasma_client.contains(object_id) + for object_id in fake_object_ids: + assert not self.plasma_client.contains(object_id) + for object_id in real_object_ids: + assert self.plasma_client.contains(object_id) + + def test_hash(self): + # Check the hash of an object that doesn't exist. + object_id1 = random_object_id() + try: + self.plasma_client.hash(object_id1) + # TODO(pcm): Introduce a more specific error type here + except pa.lib.ArrowException: + pass + else: + assert False + + length = 1000 + # Create a random object, and check that the hash function always + # returns the same value. + metadata = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id1, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + self.plasma_client.seal(object_id1) + assert (self.plasma_client.hash(object_id1) == + self.plasma_client.hash(object_id1)) + + # Create a second object with the same value as the first, and check + # that their hashes are equal. 
+ object_id2 = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id2, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = i % 256 + self.plasma_client.seal(object_id2) + assert (self.plasma_client.hash(object_id1) == + self.plasma_client.hash(object_id2)) + + # Create a third object with a different value from the first two, and + # check that its hash is different. + object_id3 = random_object_id() + metadata = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id3, + length, + metadata), + dtype="uint8") + for i in range(length): + memory_buffer[i] = (i + 1) % 256 + self.plasma_client.seal(object_id3) + assert (self.plasma_client.hash(object_id1) != + self.plasma_client.hash(object_id3)) + + # Create a fourth object with the same value as the third, but + # different metadata. Check that its hash is different from any of the + # previous three. + object_id4 = random_object_id() + metadata4 = generate_metadata(length) + memory_buffer = np.frombuffer(self.plasma_client.create(object_id4, + length, + metadata4), + dtype="uint8") + for i in range(length): + memory_buffer[i] = (i + 1) % 256 + self.plasma_client.seal(object_id4) + assert (self.plasma_client.hash(object_id1) != + self.plasma_client.hash(object_id4)) + assert (self.plasma_client.hash(object_id3) != + self.plasma_client.hash(object_id4)) + + def test_many_hashes(self): + hashes = [] + length = 2 ** 10 + + for i in range(256): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for j in range(length): + memory_buffer[j] = i + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Create objects of varying length. Each pair has two bits different. 
+ for i in range(length): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + length), + dtype="uint8") + for j in range(length): + memory_buffer[j] = 0 + memory_buffer[i] = 1 + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Create objects of varying length, all with value 0. + for i in range(length): + object_id = random_object_id() + memory_buffer = np.frombuffer(self.plasma_client.create(object_id, + i), + dtype="uint8") + for j in range(i): + memory_buffer[j] = 0 + self.plasma_client.seal(object_id) + hashes.append(self.plasma_client.hash(object_id)) + + # Check that all hashes were unique. + assert len(set(hashes)) == 256 + length + length + + # def test_individual_delete(self): + # length = 100 + # # Create an object id string. + # object_id = random_object_id() + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it. + # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. + # assert self.plasma_client.contains(object_id) + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id)) + # + # def test_delete(self): + # # Create some objects. + # object_ids = [random_object_id() for _ in range(100)] + # for object_id in object_ids: + # length = 100 + # # Create a random metadata string. + # metadata = generate_metadata(100) + # # Create a new buffer and write to it. + # memory_buffer = self.plasma_client.create(object_id, length, + # metadata) + # for i in range(length): + # memory_buffer[i] = chr(i % 256) + # # Seal the object. + # self.plasma_client.seal(object_id) + # # Check that the object is present. 
+ # assert self.plasma_client.contains(object_id) + # + # # Delete the objects and make sure they are no longer present. + # for object_id in object_ids: + # # Delete the object. + # self.plasma_client.delete(object_id) + # # Make sure the object is no longer present. + # self.assertFalse(self.plasma_client.contains(object_id)) + + def test_illegal_functionality(self): + # Create an object id string. + object_id = random_object_id() + # Create a new buffer and write to it. + length = 1000 + memory_buffer = self.plasma_client.create(object_id, length) + # Make sure we cannot access memory out of bounds. + with pytest.raises(Exception): + memory_buffer[length] + # Seal the object. + self.plasma_client.seal(object_id) + # This test is commented out because it currently fails. + # # Make sure the object is ready only now. + # def illegal_assignment(): + # memory_buffer[0] = chr(0) + # with pytest.raises(Exception): + # illegal_assignment() + # Get the object. + memory_buffer = self.plasma_client.get_buffers([object_id])[0] + + # Make sure the object is read only. 
+ def illegal_assignment(): + memory_buffer[0] = chr(0) + with pytest.raises(Exception): + illegal_assignment() + + def test_evict(self): + client = self.plasma_client2 + object_id1 = random_object_id() + b1 = client.create(object_id1, 1000) + client.seal(object_id1) + del b1 + assert client.evict(1) == 1000 + + object_id2 = random_object_id() + object_id3 = random_object_id() + b2 = client.create(object_id2, 999) + b3 = client.create(object_id3, 998) + client.seal(object_id3) + del b3 + assert client.evict(1000) == 998 + + object_id4 = random_object_id() + b4 = client.create(object_id4, 997) + client.seal(object_id4) + del b4 + client.seal(object_id2) + del b2 + assert client.evict(1) == 997 + assert client.evict(1) == 999 + + object_id5 = random_object_id() + object_id6 = random_object_id() + object_id7 = random_object_id() + b5 = client.create(object_id5, 996) + b6 = client.create(object_id6, 995) + b7 = client.create(object_id7, 994) + client.seal(object_id5) + client.seal(object_id6) + client.seal(object_id7) + del b5 + del b6 + del b7 + assert client.evict(2000) == 996 + 995 + 994 + + # Mitigate valgrind-induced slowness + SUBSCRIBE_TEST_SIZES = ([1, 10, 100, 1000] if USE_VALGRIND + else [1, 10, 100, 1000, 10000]) + + def test_subscribe(self): + # Subscribe to notifications from the Plasma Store. + self.plasma_client.subscribe() + for i in self.SUBSCRIBE_TEST_SIZES: + object_ids = [random_object_id() for _ in range(i)] + metadata_sizes = [np.random.randint(1000) for _ in range(i)] + data_sizes = [np.random.randint(1000) for _ in range(i)] + for j in range(i): + self.plasma_client.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client.seal(object_ids[j]) + # Check that we received notifications for all of the objects. 
+ for j in range(i): + notification_info = self.plasma_client.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert data_sizes[j] == recv_dsize + assert metadata_sizes[j] == recv_msize + + def test_subscribe_socket(self): + # Subscribe to notifications from the Plasma Store. + self.plasma_client.subscribe() + rsock = self.plasma_client.get_notification_socket() + for i in self.SUBSCRIBE_TEST_SIZES: + # Get notification from socket. + object_ids = [random_object_id() for _ in range(i)] + metadata_sizes = [np.random.randint(1000) for _ in range(i)] + data_sizes = [np.random.randint(1000) for _ in range(i)] + + for j in range(i): + self.plasma_client.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client.seal(object_ids[j]) + + # Check that we received notifications for all of the objects. + for j in range(i): + # Assume the plasma store will not be full, + # so we always get the data size instead of -1. + msg_len, = struct.unpack('L', rsock.recv(8)) + content = rsock.recv(msg_len) + recv_objids, recv_dsizes, recv_msizes = ( + self.plasma_client.decode_notifications(content)) + assert object_ids[j] == recv_objids[0] + assert data_sizes[j] == recv_dsizes[0] + assert metadata_sizes[j] == recv_msizes[0] + + def test_subscribe_deletions(self): + # Subscribe to notifications from the Plasma Store. We use + # plasma_client2 to make sure that all used objects will get evicted + # properly. + self.plasma_client2.subscribe() + for i in self.SUBSCRIBE_TEST_SIZES: + object_ids = [random_object_id() for _ in range(i)] + # Add 1 to the sizes to make sure we have nonzero object sizes. 
+ metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + data_sizes = [np.random.randint(1000) + 1 for _ in range(i)] + for j in range(i): + x = self.plasma_client2.create( + object_ids[j], data_sizes[j], + metadata=bytearray(np.random.bytes(metadata_sizes[j]))) + self.plasma_client2.seal(object_ids[j]) + del x + # Check that we received notifications for creating all of the + # objects. + for j in range(i): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert data_sizes[j] == recv_dsize + assert metadata_sizes[j] == recv_msize + + # Check that we receive notifications for deleting all objects, as + # we evict them. + for j in range(i): + assert (self.plasma_client2.evict(1) == + data_sizes[j] + metadata_sizes[j]) + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[j] == recv_objid + assert -1 == recv_dsize + assert -1 == recv_msize + + # Test multiple deletion notifications. The first 9 object IDs have + # size 0, and the last has a nonzero size. When Plasma evicts 1 byte, + # it will evict all objects, so we should receive deletion + # notifications for each. 
+ num_object_ids = 10 + object_ids = [random_object_id() for _ in range(num_object_ids)] + metadata_sizes = [0] * (num_object_ids - 1) + data_sizes = [0] * (num_object_ids - 1) + metadata_sizes.append(np.random.randint(1000)) + data_sizes.append(np.random.randint(1000)) + for i in range(num_object_ids): + x = self.plasma_client2.create( + object_ids[i], data_sizes[i], + metadata=bytearray(np.random.bytes(metadata_sizes[i]))) + self.plasma_client2.seal(object_ids[i]) + del x + for i in range(num_object_ids): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[i] == recv_objid + assert data_sizes[i] == recv_dsize + assert metadata_sizes[i] == recv_msize + assert (self.plasma_client2.evict(1) == + data_sizes[-1] + metadata_sizes[-1]) + for i in range(num_object_ids): + notification_info = self.plasma_client2.get_next_notification() + recv_objid, recv_dsize, recv_msize = notification_info + assert object_ids[i] == recv_objid + assert -1 == recv_dsize + assert -1 == recv_msize + + def test_use_full_memory(self): + # Fill the object store up with a large number of small objects and let + # them go out of scope. + for _ in range(100): + create_object( + self.plasma_client2, + np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0) + # Create large objects that require the full object store size, and + # verify that they fit. + for _ in range(2): + create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0) + # Verify that an object that is too large does not fit. + # Also verifies that the right error is thrown, and does not + # create the object ID prematurely. 
        # NOTE(review): continuation of a method whose `def` line is above
        # this chunk. Asking for more memory than the whole store holds must
        # keep raising PlasmaStoreFull on every attempt, not just the first.
        object_id = random_object_id()
        for i in range(3):
            with pytest.raises(pa.plasma.PlasmaStoreFull):
                self.plasma_client2.create(
                    object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)

    @staticmethod
    def _client_blocked_in_get(plasma_store_name, object_id):
        """Child-process helper: connect and block in get() on a missing ID.

        Run via multiprocessing.Process; it never returns on its own because
        the requested object is never sealed by this process.
        """
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get([object_id])

    def test_client_death_during_get(self):
        """Killing a client blocked in get() must not take the store down."""
        object_id = random_object_id()

        p = multiprocessing.Process(target=self._client_blocked_in_get,
                                    args=(self.plasma_store_name, object_id))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Kill the client process.
        p.terminate()
        # Wait a little for the store to process the disconnect event.
        time.sleep(0.1)

        # Create the object.
        self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

    @staticmethod
    def _client_get_multiple(plasma_store_name, object_ids):
        """Child-process helper: block in get() until all IDs are sealed."""
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get(object_ids)

    def test_client_getting_multiple_objects(self):
        """A client blocked on several IDs wakes up once all are created."""
        object_ids = [random_object_id() for _ in range(10)]

        p = multiprocessing.Process(target=self._client_get_multiple,
                                    args=(self.plasma_store_name, object_ids))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Create the objects one by one.
        for object_id in object_ids:
            self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

        # Make sure that the blocked client finishes.
        # NOTE(review): busy-poll with a 5 second deadline; no sleep between
        # iterations, so this spins until the child exits or times out.
        start_time = time.time()
        while True:
            if time.time() - start_time > 5:
                raise Exception("Timing out while waiting for blocked client "
                                "to finish.")
            if not p.is_alive():
                break


@pytest.mark.plasma
class TestEvictionToExternalStore:
    """Tests for a small store backed by the 'hashtable://' external store.

    The store is deliberately tiny (1000 KiB) so that writing twenty
    100 KiB objects forces eviction to the external store.
    """

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
        finally:
            # Always run the context manager's exit, even if the liveness
            # assertion above failed, so the store process is reaped.
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        """Objects evicted to the external store can still be fetched."""
        client = self.plasma_client

        # 20 objects of 100 KiB each exceed the 1000 KiB store capacity.
        object_ids = [random_object_id() for _ in range(0, 20)]
        data = b'x' * 100 * 1024
        metadata = b''

        for i in range(0, 20):
            # Test for object non-existence.
            assert not client.contains(object_ids[i])

            # Create and seal the object.
            client.create_and_seal(object_ids[i], data, metadata)

            # Test that the client can get the object.
            assert client.contains(object_ids[i])

        for i in range(0, 20):
            # Since we are accessing objects sequentially, every object we
            # access would be a cache "miss" owing to LRU eviction.
            # Try and access the object from the plasma store first, and then
            # try external store on failure. This should succeed to fetch the
            # object. However, it may evict the next few objects.
            [result] = client.get_buffers([object_ids[i]])
            assert result.to_pybytes() == data

        # Make sure we still cannot fetch objects that do not exist
        [result] = client.get_buffers([random_object_id()], timeout_ms=100)
        assert result is None


@pytest.mark.plasma
def test_object_id_size():
    """ObjectID requires exactly 20 bytes; a str argument is rejected."""
    import pyarrow.plasma as plasma
    with pytest.raises(ValueError):
        plasma.ObjectID("hello")
    # A 20-byte bytes value constructs without raising.
    plasma.ObjectID(20 * b"0")


@pytest.mark.plasma
def test_object_id_equality_operators():
    """ObjectIDs compare by value; comparison to non-IDs is unequal."""
    import pyarrow.plasma as plasma

    oid1 = plasma.ObjectID(20 * b'0')
    oid2 = plasma.ObjectID(20 * b'0')
    oid3 = plasma.ObjectID(19 * b'0' + b'1')

    assert oid1 == oid2
    assert oid2 != oid3
    assert oid1 != 'foo'


@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """A store backed by hugepages can allocate a 100 MB object."""
    import pyarrow.plasma as plasma
    with plasma.start_plasma_store(
            plasma_store_memory=2*10**9,
            plasma_directory="/mnt/hugepages",
            use_hugepages=True) as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)
        create_object(plasma_client, 10**8)


# This is checking to make sure plasma_clients cannot be destroyed
# before all the PlasmaBuffers that have handles to them are
# destroyed, see ARROW-2448.
@pytest.mark.plasma
def test_plasma_client_sharing():
    import pyarrow.plasma as plasma

    with plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
            as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)
        object_id = plasma_client.put(np.zeros(3))
        buf = plasma_client.get(object_id)
        # Dropping the client while `buf` still references its memory must
        # keep the underlying mapping alive.
        del plasma_client
        assert (buf == np.zeros(3)).all()
        del buf  # This segfaulted pre ARROW-2448.


@pytest.mark.plasma
def test_plasma_list():
    """client.list() reports sizes, ref counts, state, and timestamps."""
    import pyarrow.plasma as plasma

    with plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
            as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)

        # Test sizes
        u, _, _ = create_object(plasma_client, 11, metadata_size=7, seal=False)
        l1 = plasma_client.list()
        assert l1[u]["data_size"] == 11
        assert l1[u]["metadata_size"] == 7

        # Test ref_count
        v = plasma_client.put(np.zeros(3))
        # Ref count has already been released
        # XXX flaky test, disabled (ARROW-3344)
        # l2 = plasma_client.list()
        # assert l2[v]["ref_count"] == 0
        a = plasma_client.get(v)
        l3 = plasma_client.list()
        assert l3[v]["ref_count"] == 1
        del a

        # Test state
        w, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
        l4 = plasma_client.list()
        assert l4[w]["state"] == "created"
        plasma_client.seal(w)
        l5 = plasma_client.list()
        assert l5[w]["state"] == "sealed"

        # Test timestamps
        # Timestamps are compared with generous slack to tolerate clock
        # granularity and scheduling delays.
        slack = 1.5  # seconds
        t1 = time.time()
        x, _, _ = create_object(plasma_client, 3, metadata_size=0, seal=False)
        t2 = time.time()
        l6 = plasma_client.list()
        assert t1 - slack <= l6[x]["create_time"] <= t2 + slack
        time.sleep(2.0)
        t3 = time.time()
        plasma_client.seal(x)
        t4 = time.time()
        l7 = plasma_client.list()
        assert t3 - t2 - slack <= l7[x]["construct_duration"]
        assert l7[x]["construct_duration"] <= t4 - t1 + slack


@pytest.mark.plasma
def test_object_id_randomness():
    """from_random() in two fresh interpreters yields distinct IDs."""
    cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
    first_object_id = subprocess.check_output([sys.executable, "-c", cmd])
    second_object_id = subprocess.check_output([sys.executable, "-c", cmd])
    assert first_object_id != second_object_id


@pytest.mark.plasma
def test_store_capacity():
    """store_capacity() reflects the memory the store was started with."""
    import pyarrow.plasma as plasma
    with plasma.start_plasma_store(plasma_store_memory=10000) as (name, p):
        plasma_client = plasma.connect(name)
        assert plasma_client.store_capacity() == 10000