# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Tests for the pyarrow.plasma client and object store."""

import multiprocessing
import os
import pytest
import random
import signal
import struct
import subprocess
import sys
import time

import numpy as np
import pyarrow as pa

# Bytes of memory given to the store started by most tests (10**8 = 100MB).
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
# Run the store under valgrind when PLASMA_VALGRIND=1.
USE_VALGRIND = os.getenv("PLASMA_VALGRIND") == "1"
# External store spec passed to start_plasma_store by
# TestEvictionToExternalStore.
EXTERNAL_STORE = "hashtable://test"
# Extra bytes added to a request size that is expected NOT to fit.
SMALL_OBJECT_SIZE = 9000


def random_name():
    """Return a random decimal string in the range [0, 99999999]."""
    return str(random.randint(0, 99999999))


def random_object_id():
    """Return a plasma ObjectID built from 20 random bytes."""
    import pyarrow.plasma as plasma
    return plasma.ObjectID(np.random.bytes(20))


def _scribble(buf, length):
    """Overwrite a sample of the first *length* bytes of *buf* at random.

    The first and last byte plus 100 randomly chosen positions in
    [0, length - 1] receive random values in [0, 255]. Nothing is written
    when *length* is 0.
    """
    if length > 0:
        buf[0] = random.randint(0, 255)
        buf[-1] = random.randint(0, 255)
        for _ in range(100):
            buf[random.randint(0, length - 1)] = random.randint(0, 255)


def generate_metadata(length):
    """Return a zero-initialised bytearray of *length* bytes, partly
    overwritten with random values."""
    metadata = bytearray(length)
    _scribble(metadata, length)
    return metadata


def write_to_data_buffer(buff, length):
    """Write random values into a sample of positions of buffer *buff*.

    *buff* is viewed (without copying) as a uint8 array, so it must be a
    writable buffer.
    """
    _scribble(np.frombuffer(buff, dtype="uint8"), length)


def create_object_with_id(client, object_id, data_size, metadata_size,
                          seal=True):
    """Create an object under *object_id* with random data and metadata.

    Returns:
        (memory_buffer, metadata): the buffer returned by ``client.create``
        and the metadata bytearray that was passed to it.
    """
    metadata = generate_metadata(metadata_size)
    memory_buffer = client.create(object_id, data_size, metadata)
    write_to_data_buffer(memory_buffer, data_size)
    if seal:
        client.seal(object_id)
    return memory_buffer, metadata


def create_object(client, data_size, metadata_size=0, seal=True):
    """Create an object with random contents under a fresh random ID.

    Returns:
        (object_id, memory_buffer, metadata)
    """
    object_id = random_object_id()
    memory_buffer, metadata = create_object_with_id(client, object_id,
                                                    data_size, metadata_size,
                                                    seal=seal)
    return object_id, memory_buffer, metadata


@pytest.mark.plasma
class TestPlasmaClient:
    """Tests run against a fresh store (DEFAULT_PLASMA_STORE_MEMORY bytes)
    started for each test method, with two connected clients."""

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
            use_valgrind=USE_VALGRIND)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)
        self.plasma_client2 = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            # Ensure Valgrind and/or coverage have a clean exit
            # Valgrind misses SIGTERM if it is delivered before the
            # event loop is ready; this race condition is mitigated
            # but not solved by time.sleep().
            if USE_VALGRIND:
                time.sleep(1.0)
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
            assert self.p.returncode == 0
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_connection_failure_raises_exception(self):
        import pyarrow.plasma as plasma
        # ARROW-1264
        with pytest.raises(IOError):
            plasma.connect('unknown-store-name', num_retries=1)

    def test_create(self):
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 50
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                length),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        # Seal the object.
        self.plasma_client.seal(object_id)
        # Get the object.
        memory_buffer = np.frombuffer(
            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
        for i in range(length):
            assert memory_buffer[i] == i % 256

    def test_create_with_metadata(self):
        for length in range(0, 1000, 3):
            # Create an object id string.
            object_id = random_object_id()
            # Create a random metadata string.
            metadata = generate_metadata(length)
            # Create a new buffer and write to it.
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length,
                                                                    metadata),
                                          dtype="uint8")
            for i in range(length):
                memory_buffer[i] = i % 256
            # Seal the object.
            self.plasma_client.seal(object_id)
            # Get the object.
            memory_buffer = np.frombuffer(
                self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
            for i in range(length):
                assert memory_buffer[i] == i % 256
            # Get the metadata.
            metadata_buffer = np.frombuffer(
                self.plasma_client.get_metadata([object_id])[0], dtype="uint8")
            assert len(metadata) == len(metadata_buffer)
            for i in range(len(metadata)):
                assert metadata[i] == metadata_buffer[i]

    def test_create_existing(self):
        # This test is partially used to test the code path in which we create
        # an object with an ID that already exists
        length = 100
        for _ in range(1000):
            object_id = random_object_id()
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
            # TODO(pcm): Introduce a more specific error type here.
            with pytest.raises(pa.lib.ArrowException):
                self.plasma_client.create(object_id, length,
                                          generate_metadata(length))

    def test_create_and_seal(self):

        # Create a bunch of objects.
        object_ids = []
        for i in range(1000):
            object_id = random_object_id()
            object_ids.append(object_id)
            self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')

        for i in range(1000):
            [data_tuple] = self.plasma_client.get_buffers([object_ids[i]],
                                                          with_meta=True)
            assert data_tuple[1].to_pybytes() == i * b'a'
            assert (self.plasma_client.get_metadata(
                [object_ids[i]])[0].to_pybytes() ==
                i * b'b')

        # Make sure that creating the same object twice raises an exception.
        object_id = random_object_id()
        self.plasma_client.create_and_seal(object_id, b'a', b'b')
        with pytest.raises(pa.plasma.PlasmaObjectExists):
            self.plasma_client.create_and_seal(object_id, b'a', b'b')

        # Make sure that these objects can be evicted. Each object uses 20%
        # of the store (10% data + 10% metadata), so creating 20 of them
        # must evict the oldest ones.
        big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
        object_ids = []
        for _ in range(20):
            object_id = random_object_id()
            object_ids.append(object_id)
            # Store under the recorded ID; using a fresh random ID here would
            # make the eviction check below vacuous.
            self.plasma_client.create_and_seal(object_id, big_object,
                                               big_object)
        for i in range(10):
            assert not self.plasma_client.contains(object_ids[i])

    def test_get(self):
        num_object_ids = 60
        # Test timing out of get with various timeouts.
        for timeout in [0, 10, 100, 1000]:
            object_ids = [random_object_id() for _ in range(num_object_ids)]
            results = self.plasma_client.get_buffers(object_ids,
                                                     timeout_ms=timeout)
            assert results == num_object_ids * [None]

        data_buffers = []
        metadata_buffers = []
        for i in range(num_object_ids):
            if i % 2 == 0:
                data_buffer, metadata_buffer = create_object_with_id(
                    self.plasma_client, object_ids[i], 2000, 2000)
                data_buffers.append(data_buffer)
                metadata_buffers.append(metadata_buffer)

        # Test timing out from some but not all get calls with various
        # timeouts.
        for timeout in [0, 10, 100, 1000]:
            data_results = self.plasma_client.get_buffers(object_ids,
                                                          timeout_ms=timeout)
            # metadata_results = self.plasma_client.get_metadata(
            #     object_ids, timeout_ms=timeout)
            for i in range(num_object_ids):
                if i % 2 == 0:
                    array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
                    array2 = np.frombuffer(data_results[i], dtype="uint8")
                    np.testing.assert_equal(array1, array2)
                    # TODO(rkn): We should compare the metadata as well. But
                    # currently the types are different (e.g., memoryview
                    # versus bytearray).
                    # assert plasma.buffers_equal(
                    #     metadata_buffers[i // 2], metadata_results[i])
                else:
                    # Odd-indexed IDs were never created, so this get must
                    # have timed out on them.
                    assert data_results[i] is None

        # Test trying to get an object that was created by the same client but
        # not sealed.
        object_id = random_object_id()
        self.plasma_client.create(object_id, 10, b"metadata")
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is None
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=1, with_meta=True)[0][1] is None
        self.plasma_client.seal(object_id)
        assert self.plasma_client.get_buffers(
            [object_id], timeout_ms=0, with_meta=True)[0][1] is not None

    def test_buffer_lifetime(self):
        # ARROW-2195
        arr = pa.array([1, 12, 23, 3, 34], pa.int32())
        batch = pa.RecordBatch.from_arrays([arr], ['field1'])

        # Serialize RecordBatch into Plasma store
        sink = pa.MockOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()

        object_id = random_object_id()
        data_buffer = self.plasma_client.create(object_id, sink.size())
        stream = pa.FixedSizeBufferWriter(data_buffer)
        writer = pa.RecordBatchStreamWriter(stream, batch.schema)
        writer.write_batch(batch)
        writer.close()
        self.plasma_client.seal(object_id)
        del data_buffer

        # Unserialize RecordBatch from Plasma store
        [data_buffer] = self.plasma_client2.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(data_buffer)
        read_batch = reader.read_next_batch()
        # Lose reference to returned buffer. The RecordBatch must still
        # be backed by valid memory.
        del data_buffer, reader

        assert read_batch.equals(batch)

    def test_put_and_get(self):
        for value in [["hello", "world", 3, 1.0], None, "hello"]:
            object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get([object_id])
            assert result == value

            result = self.plasma_client.get(object_id)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get([object_id], timeout_ms=0)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'pyarrow.deserialize':FutureWarning")
    def test_put_and_get_raw_buffer(self):
        temp_id = random_object_id()
        use_meta = b"RAW"

        def deserialize_or_output(data_tuple):
            if data_tuple[0] == use_meta:
                return data_tuple[1].to_pybytes()
            else:
                if data_tuple[1] is None:
                    return pa.plasma.ObjectNotAvailable
                else:
                    return pa.deserialize(data_tuple[1])

        for value in [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]:
            if isinstance(value, bytes):
                object_id = self.plasma_client.put_raw_buffer(
                    value, metadata=use_meta)
            else:
                object_id = self.plasma_client.put(value)
            [result] = self.plasma_client.get_buffers([object_id],
                                                      with_meta=True)
            result = deserialize_or_output(result)
            assert result == value

        object_id = random_object_id()
        [result] = self.plasma_client.get_buffers([object_id], timeout_ms=0,
                                                  with_meta=True)
        result = deserialize_or_output(result)
        assert result == pa.plasma.ObjectNotAvailable

    @pytest.mark.filterwarnings(
        "ignore:'serialization_context':FutureWarning")
    def test_put_and_get_serialization_context(self):

        class CustomType:
            def __init__(self, val):
                self.val = val

        val = CustomType(42)

        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.put(val)

        serialization_context = pa.lib.SerializationContext()
        serialization_context.register_type(CustomType, 20*"\x00")

        object_id = self.plasma_client.put(
            val, None, serialization_context=serialization_context)

        with pytest.raises(pa.ArrowSerializationError):
            self.plasma_client.get(object_id)

        result = self.plasma_client.get(
            object_id, -1, serialization_context=serialization_context)
        assert result.val == val.val

    def test_store_arrow_objects(self):
        data = np.random.randn(10, 4)
        # Write an arrow object.
        object_id = random_object_id()
        tensor = pa.Tensor.from_numpy(data)
        data_size = pa.ipc.get_tensor_size(tensor)
        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        pa.ipc.write_tensor(tensor, stream)
        self.plasma_client.seal(object_id)
        # Read the arrow object.
        [tensor] = self.plasma_client.get_buffers([object_id])
        reader = pa.BufferReader(tensor)
        array = pa.ipc.read_tensor(reader).to_numpy()
        # Assert that they are equal.
        np.testing.assert_equal(data, array)

    @pytest.mark.pandas
    def test_store_pandas_dataframe(self):
        import pandas as pd
        import pyarrow.plasma as plasma
        d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
             'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
        df = pd.DataFrame(d)

        # Write the DataFrame.
        record_batch = pa.RecordBatch.from_pandas(df)
        # Determine the size.
        s = pa.MockOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
        stream_writer.write_batch(record_batch)
        data_size = s.size()
        object_id = plasma.ObjectID(np.random.bytes(20))

        buf = self.plasma_client.create(object_id, data_size)
        stream = pa.FixedSizeBufferWriter(buf)
        stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
        stream_writer.write_batch(record_batch)

        self.plasma_client.seal(object_id)

        # Read the DataFrame.
        [data] = self.plasma_client.get_buffers([object_id])
        reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
        result = reader.read_next_batch().to_pandas()

        pd.testing.assert_frame_equal(df, result)

    def test_pickle_object_ids(self):
        # This can be used for sharing object IDs between processes.
        import pickle
        object_id = random_object_id()
        data = pickle.dumps(object_id)
        object_id2 = pickle.loads(data)
        assert object_id == object_id2

    def test_store_full(self):
        # The store is started with DEFAULT_PLASMA_STORE_MEMORY bytes
        # (100MB), so make sure that create throws an exception when it is
        # full.
        def assert_create_raises_plasma_full(unit_test, size):
            partial_size = np.random.randint(size)
            # TODO(pcm): More specific error here.
            with pytest.raises(pa.lib.ArrowException):
                create_object(unit_test.plasma_client, partial_size,
                              size - partial_size)

        PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100

        # Create a list to keep some of the buffers in scope.
        memory_buffers = []
        _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 50%. Make sure that we can't create an
        # object of size 50% + SMALL_OBJECT_SIZE, but we can create one of
        # size 20%.
        assert_create_raises_plasma_full(
            self, 50 * PERCENT + SMALL_OBJECT_SIZE)
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        del memory_buffer
        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        del memory_buffer
        assert_create_raises_plasma_full(
            self, 50 * PERCENT + SMALL_OBJECT_SIZE)

        _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 30%.
        assert_create_raises_plasma_full(
            self, 30 * PERCENT + SMALL_OBJECT_SIZE)

        _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
        memory_buffers.append(memory_buffer)
        # Remaining space is 20%.
        assert_create_raises_plasma_full(
            self, 20 * PERCENT + SMALL_OBJECT_SIZE)

    def test_contains(self):
        fake_object_ids = [random_object_id() for _ in range(100)]
        real_object_ids = [random_object_id() for _ in range(100)]
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id) is False
            self.plasma_client.create(object_id, 100)
            self.plasma_client.seal(object_id)
            assert self.plasma_client.contains(object_id)
        for object_id in fake_object_ids:
            assert not self.plasma_client.contains(object_id)
        for object_id in real_object_ids:
            assert self.plasma_client.contains(object_id)

    def test_hash(self):
        # Check the hash of an object that doesn't exist.
        object_id1 = random_object_id()
        # TODO(pcm): Introduce a more specific error type here
        with pytest.raises(pa.lib.ArrowException):
            self.plasma_client.hash(object_id1)

        length = 1000
        # Create a random object, and check that the hash function always
        # returns the same value.
        metadata = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id1,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        self.plasma_client.seal(object_id1)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id1))

        # Create a second object with the same value as the first, and check
        # that their hashes are equal.
        object_id2 = random_object_id()
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id2,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = i % 256
        self.plasma_client.seal(object_id2)
        assert (self.plasma_client.hash(object_id1) ==
                self.plasma_client.hash(object_id2))

        # Create a third object with a different value from the first two, and
        # check that its hash is different.
        object_id3 = random_object_id()
        metadata = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id3,
                                                                length,
                                                                metadata),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = (i + 1) % 256
        self.plasma_client.seal(object_id3)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id3))

        # Create a fourth object with the same value as the third, but
        # different metadata. Check that its hash is different from any of the
        # previous three.
        object_id4 = random_object_id()
        metadata4 = generate_metadata(length)
        memory_buffer = np.frombuffer(self.plasma_client.create(object_id4,
                                                                length,
                                                                metadata4),
                                      dtype="uint8")
        for i in range(length):
            memory_buffer[i] = (i + 1) % 256
        self.plasma_client.seal(object_id4)
        assert (self.plasma_client.hash(object_id1) !=
                self.plasma_client.hash(object_id4))
        assert (self.plasma_client.hash(object_id3) !=
                self.plasma_client.hash(object_id4))

    def test_many_hashes(self):
        hashes = []
        length = 2 ** 10

        for i in range(256):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length),
                                          dtype="uint8")
            for j in range(length):
                memory_buffer[j] = i
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of the same length that are all zero except for a
        # single 1 at a different position, so any two differ in two bytes.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    length),
                                          dtype="uint8")
            for j in range(length):
                memory_buffer[j] = 0
            memory_buffer[i] = 1
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Create objects of varying length, all with value 0.
        for i in range(length):
            object_id = random_object_id()
            memory_buffer = np.frombuffer(self.plasma_client.create(object_id,
                                                                    i),
                                          dtype="uint8")
            for j in range(i):
                memory_buffer[j] = 0
            self.plasma_client.seal(object_id)
            hashes.append(self.plasma_client.hash(object_id))

        # Check that all hashes were unique.
        assert len(set(hashes)) == 256 + length + length

    # def test_individual_delete(self):
    #     length = 100
    #     # Create an object id string.
    #     object_id = random_object_id()
    #     # Create a random metadata string.
    #     metadata = generate_metadata(100)
    #     # Create a new buffer and write to it.
    #     memory_buffer = self.plasma_client.create(object_id, length,
    #                                               metadata)
    #     for i in range(length):
    #         memory_buffer[i] = chr(i % 256)
    #     # Seal the object.
    #     self.plasma_client.seal(object_id)
    #     # Check that the object is present.
    #     assert self.plasma_client.contains(object_id)
    #     # Delete the object.
    #     self.plasma_client.delete(object_id)
    #     # Make sure the object is no longer present.
    #     self.assertFalse(self.plasma_client.contains(object_id))
    #
    # def test_delete(self):
    #     # Create some objects.
    #     object_ids = [random_object_id() for _ in range(100)]
    #     for object_id in object_ids:
    #         length = 100
    #         # Create a random metadata string.
    #         metadata = generate_metadata(100)
    #         # Create a new buffer and write to it.
    #         memory_buffer = self.plasma_client.create(object_id, length,
    #                                                   metadata)
    #         for i in range(length):
    #             memory_buffer[i] = chr(i % 256)
    #         # Seal the object.
    #         self.plasma_client.seal(object_id)
    #         # Check that the object is present.
    #         assert self.plasma_client.contains(object_id)
    #
    #     # Delete the objects and make sure they are no longer present.
    #     for object_id in object_ids:
    #         # Delete the object.
    #         self.plasma_client.delete(object_id)
    #         # Make sure the object is no longer present.
    #         self.assertFalse(self.plasma_client.contains(object_id))

    def test_illegal_functionality(self):
        # Create an object id string.
        object_id = random_object_id()
        # Create a new buffer and write to it.
        length = 1000
        memory_buffer = self.plasma_client.create(object_id, length)
        # Make sure we cannot access memory out of bounds.
        with pytest.raises(Exception):
            memory_buffer[length]
        # Seal the object.
        self.plasma_client.seal(object_id)
        # This test is commented out because it currently fails.
        # # Make sure the object is read only now.
        # def illegal_assignment():
        #     memory_buffer[0] = chr(0)
        # with pytest.raises(Exception):
        #     illegal_assignment()
        # Get the object.
        memory_buffer = self.plasma_client.get_buffers([object_id])[0]

        # Make sure the object is read only.
        def illegal_assignment():
            memory_buffer[0] = chr(0)
        with pytest.raises(Exception):
            illegal_assignment()

    def test_evict(self):
        client = self.plasma_client2
        object_id1 = random_object_id()
        b1 = client.create(object_id1, 1000)
        client.seal(object_id1)
        del b1
        assert client.evict(1) == 1000

        object_id2 = random_object_id()
        object_id3 = random_object_id()
        b2 = client.create(object_id2, 999)
        b3 = client.create(object_id3, 998)
        client.seal(object_id3)
        del b3
        assert client.evict(1000) == 998

        object_id4 = random_object_id()
        b4 = client.create(object_id4, 997)
        client.seal(object_id4)
        del b4
        client.seal(object_id2)
        del b2
        assert client.evict(1) == 997
        assert client.evict(1) == 999

        object_id5 = random_object_id()
        object_id6 = random_object_id()
        object_id7 = random_object_id()
        b5 = client.create(object_id5, 996)
        b6 = client.create(object_id6, 995)
        b7 = client.create(object_id7, 994)
        client.seal(object_id5)
        client.seal(object_id6)
        client.seal(object_id7)
        del b5
        del b6
        del b7
        assert client.evict(2000) == 996 + 995 + 994

    # Mitigate valgrind-induced slowness
    SUBSCRIBE_TEST_SIZES = ([1, 10, 100, 1000] if USE_VALGRIND
                            else [1, 10, 100, 1000, 10000])

    def test_subscribe(self):
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]
            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])
            # Check that we received notifications for all of the objects.
            for j in range(i):
                notification_info = self.plasma_client.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

    def test_subscribe_socket(self):
        # Subscribe to notifications from the Plasma Store.
        self.plasma_client.subscribe()
        rsock = self.plasma_client.get_notification_socket()
        for i in self.SUBSCRIBE_TEST_SIZES:
            # Get notification from socket.
            object_ids = [random_object_id() for _ in range(i)]
            metadata_sizes = [np.random.randint(1000) for _ in range(i)]
            data_sizes = [np.random.randint(1000) for _ in range(i)]

            for j in range(i):
                self.plasma_client.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client.seal(object_ids[j])

            # Check that we received notifications for all of the objects.
            for j in range(i):
                # Assume the plasma store will not be full,
                # so we always get the data size instead of -1.
                # NOTE(review): recv() may return fewer bytes than requested;
                # this relies on each message arriving in one piece.
                msg_len, = struct.unpack('L', rsock.recv(8))
                content = rsock.recv(msg_len)
                recv_objids, recv_dsizes, recv_msizes = (
                    self.plasma_client.decode_notifications(content))
                assert object_ids[j] == recv_objids[0]
                assert data_sizes[j] == recv_dsizes[0]
                assert metadata_sizes[j] == recv_msizes[0]

    def test_subscribe_deletions(self):
        # Subscribe to notifications from the Plasma Store. We use
        # plasma_client2 to make sure that all used objects will get evicted
        # properly.
        self.plasma_client2.subscribe()
        for i in self.SUBSCRIBE_TEST_SIZES:
            object_ids = [random_object_id() for _ in range(i)]
            # Add 1 to the sizes to make sure we have nonzero object sizes.
            metadata_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            data_sizes = [np.random.randint(1000) + 1 for _ in range(i)]
            for j in range(i):
                x = self.plasma_client2.create(
                    object_ids[j], data_sizes[j],
                    metadata=bytearray(np.random.bytes(metadata_sizes[j])))
                self.plasma_client2.seal(object_ids[j])
            del x
            # Check that we received notifications for creating all of the
            # objects.
            for j in range(i):
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert data_sizes[j] == recv_dsize
                assert metadata_sizes[j] == recv_msize

            # Check that we receive notifications for deleting all objects, as
            # we evict them.
            for j in range(i):
                assert (self.plasma_client2.evict(1) ==
                        data_sizes[j] + metadata_sizes[j])
                notification_info = self.plasma_client2.get_next_notification()
                recv_objid, recv_dsize, recv_msize = notification_info
                assert object_ids[j] == recv_objid
                assert -1 == recv_dsize
                assert -1 == recv_msize

        # Test multiple deletion notifications. The first 9 object IDs have
        # size 0, and the last has a nonzero size. When Plasma evicts 1 byte,
        # it will evict all objects, so we should receive deletion
        # notifications for each.
        num_object_ids = 10
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        metadata_sizes = [0] * (num_object_ids - 1)
        data_sizes = [0] * (num_object_ids - 1)
        # Add 1 so the last object is guaranteed to be nonzero-sized.
        metadata_sizes.append(np.random.randint(1000) + 1)
        data_sizes.append(np.random.randint(1000) + 1)
        for i in range(num_object_ids):
            x = self.plasma_client2.create(
                object_ids[i], data_sizes[i],
                metadata=bytearray(np.random.bytes(metadata_sizes[i])))
            self.plasma_client2.seal(object_ids[i])
        del x
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert data_sizes[i] == recv_dsize
            assert metadata_sizes[i] == recv_msize
        assert (self.plasma_client2.evict(1) ==
                data_sizes[-1] + metadata_sizes[-1])
        for i in range(num_object_ids):
            notification_info = self.plasma_client2.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_ids[i] == recv_objid
            assert -1 == recv_dsize
            assert -1 == recv_msize

    def test_use_full_memory(self):
        # Fill the object store up with a large number of small objects and let
        # them go out of scope.
        for _ in range(100):
            create_object(
                self.plasma_client2,
                np.random.randint(1, DEFAULT_PLASMA_STORE_MEMORY // 20), 0)
        # Create large objects that require the full object store size, and
        # verify that they fit.
        for _ in range(2):
            create_object(self.plasma_client2, DEFAULT_PLASMA_STORE_MEMORY, 0)
        # Verify that an object that is too large does not fit.
        # Also verifies that the right error is thrown, and does not
        # create the object ID prematurely.
        object_id = random_object_id()
        for _ in range(3):
            with pytest.raises(pa.plasma.PlasmaStoreFull):
                self.plasma_client2.create(
                    object_id, DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE)

    @staticmethod
    def _client_blocked_in_get(plasma_store_name, object_id):
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get([object_id])

    def test_client_death_during_get(self):
        object_id = random_object_id()

        p = multiprocessing.Process(target=self._client_blocked_in_get,
                                    args=(self.plasma_store_name, object_id))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Kill the client process and reap it (bounded, so a stuck child
        # cannot hang the test).
        p.terminate()
        p.join(timeout=5)
        # Wait a little for the store to process the disconnect event.
        time.sleep(0.1)

        # Create the object.
        self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

    @staticmethod
    def _client_get_multiple(plasma_store_name, object_ids):
        import pyarrow.plasma as plasma
        client = plasma.connect(plasma_store_name)
        # Try to get object IDs that don't exist yet. This should block.
        client.get(object_ids)

    def test_client_getting_multiple_objects(self):
        object_ids = [random_object_id() for _ in range(10)]

        p = multiprocessing.Process(target=self._client_get_multiple,
                                    args=(self.plasma_store_name, object_ids))
        p.start()
        # Make sure the process is running.
        time.sleep(0.2)
        assert p.is_alive()

        # Create the objects one by one.
        for object_id in object_ids:
            self.plasma_client.put(1, object_id=object_id)

        # Check that the store is still alive. This will raise an exception if
        # the store is dead.
        self.plasma_client.contains(random_object_id())

        # Make sure that the blocked client finishes. join() waits without
        # busy-spinning; on timeout the child is terminated before failing.
        p.join(timeout=5)
        if p.is_alive():
            p.terminate()
            raise Exception("Timing out while waiting for blocked client "
                            "to finish.")


@pytest.mark.plasma
class TestEvictionToExternalStore:
    """Tests run against a 1000 * 1024 byte store configured with
    EXTERNAL_STORE as its external store."""

    def setup_method(self, test_method):
        import pyarrow.plasma as plasma
        # Start Plasma store.
        self.plasma_store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        try:
            # Check that the Plasma store is still alive.
            assert self.p.poll() is None
            self.p.send_signal(signal.SIGTERM)
            self.p.wait(timeout=5)
        finally:
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        client = self.plasma_client

        object_ids = [random_object_id() for _ in range(0, 20)]
        data = b'x' * 100 * 1024
        metadata = b''

        for i in range(0, 20):
            # Test for object non-existence.
            assert not client.contains(object_ids[i])

            # Create and seal the object.
            client.create_and_seal(object_ids[i], data, metadata)

            # Test that the client can get the object.
            assert client.contains(object_ids[i])

        for i in range(0, 20):
            # Since we are accessing objects sequentially, every object we
            # access would be a cache "miss" owing to LRU eviction.
            # Try and access the object from the plasma store first, and then
            # try external store on failure. This should succeed to fetch the
            # object. However, it may evict the next few objects.
            [result] = client.get_buffers([object_ids[i]])
            assert result.to_pybytes() == data

        # Make sure we still cannot fetch objects that do not exist
        [result] = client.get_buffers([random_object_id()], timeout_ms=100)
        assert result is None


@pytest.mark.plasma
def test_object_id_size():
    import pyarrow.plasma as plasma
    with pytest.raises(ValueError):
        plasma.ObjectID("hello")
    plasma.ObjectID(20 * b"0")


@pytest.mark.plasma
def test_object_id_equality_operators():
    import pyarrow.plasma as plasma

    oid1 = plasma.ObjectID(20 * b'0')
    oid2 = plasma.ObjectID(20 * b'0')
    oid3 = plasma.ObjectID(19 * b'0' + b'1')

    assert oid1 == oid2
    assert oid2 != oid3
    assert oid1 != 'foo'


@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    import pyarrow.plasma as plasma
    with plasma.start_plasma_store(
            plasma_store_memory=2*10**9,
            plasma_directory="/mnt/hugepages",
            use_hugepages=True) as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)
        create_object(plasma_client, 10**8)


# This is checking to make sure plasma_clients cannot be destroyed
# before all the PlasmaBuffers that have handles to them are
# destroyed, see ARROW-2448.
@pytest.mark.plasma
def test_plasma_client_sharing():
    import pyarrow.plasma as plasma

    with plasma.start_plasma_store(
            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
            as (plasma_store_name, p):
        plasma_client = plasma.connect(plasma_store_name)
        object_id = plasma_client.put(np.zeros(3))
        buf = plasma_client.get(object_id)
        del plasma_client
        assert (buf == np.zeros(3)).all()
        del buf  # This segfaulted pre ARROW-2448.
@pytest.mark.plasma
def test_plasma_list():
    """Check the per-object fields reported by ``PlasmaClient.list()``."""
    import pyarrow.plasma as plasma

    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store_ctx as (store_socket, _store_proc):
        client = plasma.connect(store_socket)

        # Sizes of an unsealed object are reported as requested.
        sized_id = create_object(client, 11, metadata_size=7, seal=False)[0]
        sized_entry = client.list()[sized_id]
        assert sized_entry["data_size"] == 11
        assert sized_entry["metadata_size"] == 7

        # ref_count: while the value returned by get() is alive, the object
        # is referenced exactly once.
        put_id = client.put(np.zeros(3))
        # The reference taken by put() has already been released, but
        # asserting ref_count == 0 here was flaky (ARROW-3344), so it is
        # not checked.
        held_value = client.get(put_id)
        assert client.list()[put_id]["ref_count"] == 1
        del held_value

        # The state goes from "created" to "sealed".
        state_id = create_object(client, 3, metadata_size=0, seal=False)[0]
        assert client.list()[state_id]["state"] == "created"
        client.seal(state_id)
        assert client.list()[state_id]["state"] == "sealed"

        # create_time and construct_duration are compared against
        # time.time() readings, with `slack` seconds of tolerance.
        slack = 1.5
        before_create = time.time()
        timed_id = create_object(client, 3, metadata_size=0, seal=False)[0]
        after_create = time.time()
        created_at = client.list()[timed_id]["create_time"]
        assert before_create - slack <= created_at <= after_create + slack
        time.sleep(2.0)
        before_seal = time.time()
        client.seal(timed_id)
        after_seal = time.time()
        duration = client.list()[timed_id]["construct_duration"]
        assert before_seal - after_create - slack <= duration
        assert duration <= after_seal - before_create + slack


@pytest.mark.plasma
def test_object_id_randomness():
    """ObjectID.from_random() must differ between two fresh interpreters."""
    code = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
    argv = [sys.executable, "-c", code]
    outputs = [subprocess.check_output(argv) for _ in range(2)]
    assert outputs[0] != outputs[1]


@pytest.mark.plasma
def test_store_capacity():
    """store_capacity() reports the plasma_store_memory the store got."""
    import pyarrow.plasma as plasma
    capacity = 10000
    with plasma.start_plasma_store(plasma_store_memory=capacity) as (
            store_socket, _store_proc):
        client = plasma.connect(store_socket)
        assert client.store_capacity() == capacity