# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
UNTESTED:
    read_message
"""

import sys
import sysconfig

import pytest

import pyarrow as pa
import numpy as np

# Skip the whole module when the CUDA extension is not built/installed.
cuda = pytest.importorskip("pyarrow.cuda")

platform = sysconfig.get_platform()
# TODO: enable ppc64 when Arrow C++ supports IPC in ppc64 systems:
has_ipc_support = platform == 'linux-x86_64'  # or 'ppc64' in platform

cuda_ipc = pytest.mark.skipif(
    not has_ipc_support,
    reason='CUDA IPC not supported in platform `%s`' % (platform))

# Module-level CUDA contexts, populated by setup_module().
global_context = None  # for flake8
global_context1 = None  # for flake8


def setup_module(module):
    """Create one context on the first and one on the last CUDA device."""
    module.global_context = cuda.Context(0)
    module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1)


def teardown_module(module):
    """Release the primary context created in setup_module()."""
    del module.global_context


def test_Context():
    # Sanity-check device enumeration and context/device association.
    assert cuda.Context.get_num_devices() > 0
    assert global_context.device_number == 0
    assert global_context1.device_number == cuda.Context.get_num_devices() - 1

    with pytest.raises(ValueError,
                       match=("device_number argument must "
                              "be non-negative less than")):
        cuda.Context(cuda.Context.get_num_devices())


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
    # Host (pinned) buffer: writes through one numpy view must be visible
    # through a second view over the same memory.
    buf = cuda.new_host_buffer(size)
    arr = np.frombuffer(buf, dtype=np.uint8)
    arr[size//4:3*size//4] = 1
    arr_cp = arr.copy()
    arr2 = np.frombuffer(buf, dtype=np.uint8)
    np.testing.assert_equal(arr2, arr_cp)
    assert buf.size == size


def test_context_allocate_del():
    # bytes_allocated must track allocation and deallocation exactly.
    bytes_allocated = global_context.bytes_allocated
    cudabuf = global_context.new_buffer(128)
    assert global_context.bytes_allocated == bytes_allocated + 128
    del cudabuf
    assert global_context.bytes_allocated == bytes_allocated


def make_random_buffer(size, target='host'):
    """Return a host or device buffer with random data.
    """
    if target == 'host':
        assert size >= 0
        buf = pa.allocate_buffer(size)
        assert buf.size == size
        arr = np.frombuffer(buf, dtype=np.uint8)
        assert arr.size == size
        arr[:] = np.random.randint(low=1, high=255, size=size,
                                   dtype=np.uint8)
        assert arr.sum() > 0 or size == 0
        arr_ = np.frombuffer(buf, dtype=np.uint8)
        np.testing.assert_equal(arr, arr_)
        return arr, buf
    elif target == 'device':
        arr, buf = make_random_buffer(size, target='host')
        dbuf = global_context.new_buffer(size)
        assert dbuf.size == size
        dbuf.copy_from_host(buf, position=0, nbytes=size)
        return arr, dbuf
    raise ValueError('invalid target value')


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
    # Creating device buffer from host buffer;
    arr, buf = make_random_buffer(size)
    cudabuf = global_context.buffer_from_data(buf)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # CudaBuffer does not support buffer protocol
    with pytest.raises(BufferError):
        memoryview(cudabuf)

    # Creating device buffer from array:
    cudabuf = global_context.buffer_from_data(arr)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from bytes:
    cudabuf = global_context.buffer_from_data(arr.tobytes())
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating a device buffer from another device buffer, view:
    cudabuf2 = cudabuf.slice(0, cudabuf.size)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    if size > 1:
        # Writing through the view must be visible in the parent buffer.
        cudabuf2.copy_from_host(arr[size//2:])
        arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
        np.testing.assert_equal(np.concatenate((arr[size//2:],
                                                arr[size//2:])),
                                arr3)
        cudabuf2.copy_from_host(arr[:size//2])  # restoring arr

    # Creating a device buffer from another device buffer, copy:
    cudabuf2 = global_context.buffer_from_data(cudabuf)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # A copy must NOT be affected by writes to the source buffer.
    cudabuf2.copy_from_host(arr[size//2:])
    arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr3)

    # Slice of a device buffer
    cudabuf2 = cudabuf.slice(0, cudabuf.size+10)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    cudabuf2 = cudabuf.slice(size//4, size+10)
    assert cudabuf2.size == size - size//4
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[size//4:], arr2)

    # Creating a device buffer from a slice of host buffer
    soffset = size//4
    ssize = 2*size//4
    cudabuf = global_context.buffer_from_data(buf, offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    cudabuf = global_context.buffer_from_data(buf.slice(offset=soffset,
                                                        length=ssize))
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from a slice of an array
    cudabuf = global_context.buffer_from_data(arr, offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    cudabuf = global_context.buffer_from_data(arr[soffset:soffset+ssize])
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from a slice of bytes
    cudabuf = global_context.buffer_from_data(arr.tobytes(),
                                              offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from size
    cudabuf = global_context.new_buffer(size)
    assert cudabuf.size == size

    # Creating device buffer from a slice of another device buffer:
    cudabuf = global_context.buffer_from_data(arr)
    cudabuf2 = cudabuf.slice(soffset, ssize)
    assert cudabuf2.size == ssize
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)

    # Creating device buffer from HostBuffer
    buf = cuda.new_host_buffer(size)
    arr_ = np.frombuffer(buf, dtype=np.uint8)
    arr_[:] = arr
    cudabuf = global_context.buffer_from_data(buf)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from HostBuffer slice
    cudabuf = global_context.buffer_from_data(buf, offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)

    cudabuf = global_context.buffer_from_data(
        buf.slice(offset=soffset, length=ssize))
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_from_object(size):
    ctx = global_context
    arr, cbuf = make_random_buffer(size, target='device')
    dtype = arr.dtype

    # Creating device buffer from a CUDA host buffer
    hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize)
    np.frombuffer(hbuf, dtype=dtype)[:] = arr
    cbuf2 = ctx.buffer_from_object(hbuf)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from a device buffer
    cbuf2 = ctx.buffer_from_object(cbuf2)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)

    # Trying to create a device buffer from a Buffer
    with pytest.raises(pa.ArrowTypeError,
                       match=('buffer is not backed by a CudaBuffer')):
        ctx.buffer_from_object(pa.py_buffer(b"123"))

    # Trying to create a device buffer from numpy.array
    with pytest.raises(pa.ArrowTypeError,
                       match=("cannot create device buffer view from "
                              ".* \'numpy.ndarray\'")):
        ctx.buffer_from_object(np.array([1, 2, 3]))


def test_foreign_buffer():
    ctx = global_context
    dtype = np.dtype(np.uint8)
    size = 10
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)

    # test host buffer memory reference counting
    rc = sys.getrefcount(hbuf)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    assert sys.getrefcount(hbuf) == rc + 1
    del fbuf
    assert sys.getrefcount(hbuf) == rc

    # test postponed deallocation of host buffer memory
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    del hbuf
    fbuf.copy_to_host()

    # test deallocating the host buffer memory making it inaccessible
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size)
    del hbuf
    with pytest.raises(pa.ArrowIOError,
                       match=('Cuda error ')):
        fbuf.copy_to_host()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    cbuf = global_context.buffer_from_data(buf)
    assert cbuf.size == size
    assert not cbuf.is_cpu
    assert arr.tobytes() == cbuf.to_pybytes()
    if size > 0:
        assert cbuf.address > 0

    for i in range(size):
        assert cbuf[i] == arr[i]

    for s in [
            slice(None),
            slice(size//4, size//2),
    ]:
        assert cbuf[s].to_pybytes() == arr[s].tobytes()

    sbuf = cbuf.slice(size//4, size//2)
    assert sbuf.parent == cbuf

    with pytest.raises(TypeError,
                       match="Do not call CudaBuffer's constructor directly"):
        cuda.CudaBuffer()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    hbuf = cuda.new_host_buffer(size)
    np.frombuffer(hbuf, dtype=np.uint8)[:] = arr
    assert hbuf.size == size
    assert hbuf.is_cpu
    assert arr.tobytes() == hbuf.to_pybytes()

    for i in range(size):
        assert hbuf[i] == arr[i]

    for s in [
            slice(None),
            slice(size//4, size//2),
    ]:
        assert hbuf[s].to_pybytes() == arr[s].tobytes()

    sbuf = hbuf.slice(size//4, size//2)
    assert sbuf.parent == hbuf

    del hbuf

    with pytest.raises(TypeError,
                       match="Do not call HostBuffer's constructor directly"):
        cuda.HostBuffer()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_to_host(size):
    # Create a buffer in host containing range(size)
    buf = pa.allocate_buffer(size, resizable=True)  # in host
    assert isinstance(buf, pa.Buffer)
    assert not isinstance(buf, cuda.CudaBuffer)
    arr = np.frombuffer(buf, dtype=np.uint8)
    assert arr.size == size
    arr[:] = range(size)
    arr_ = np.frombuffer(buf, dtype=np.uint8)
    np.testing.assert_equal(arr, arr_)

    device_buffer = global_context.new_buffer(size)
    assert isinstance(device_buffer, cuda.CudaBuffer)
    assert isinstance(device_buffer, pa.Buffer)
    assert device_buffer.size == size
    assert not device_buffer.is_cpu

    device_buffer.copy_from_host(buf, position=0, nbytes=size)

    buf2 = device_buffer.copy_to_host(position=0, nbytes=size)
    arr2 = np.frombuffer(buf2, dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_to_host(size):
    arr, dbuf = make_random_buffer(size, target='device')

    buf = dbuf.copy_to_host()
    assert buf.is_cpu
    np.testing.assert_equal(arr, np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4)
    assert buf.is_cpu
    np.testing.assert_equal(arr[size//4:],
                            np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4, nbytes=size//8)
    assert buf.is_cpu
    np.testing.assert_equal(arr[size//4:size//4+size//8],
                            np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4, nbytes=0)
    assert buf.is_cpu
    assert buf.size == 0

    for (position, nbytes) in [
        (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            dbuf.copy_to_host(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1), (size//2, (size+1)//2+1), (size, 1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from device buffer')):
            dbuf.copy_to_host(position=position, nbytes=nbytes)

    buf = pa.allocate_buffer(size//4)
    dbuf.copy_to_host(buf=buf)
    np.testing.assert_equal(arr[:size//4],
                            np.frombuffer(buf, dtype=np.uint8))

    if size < 12:
        return

    dbuf.copy_to_host(buf=buf, position=12)
    np.testing.assert_equal(arr[12:12+size//4],
                            np.frombuffer(buf, dtype=np.uint8))

    dbuf.copy_to_host(buf=buf, nbytes=12)
    np.testing.assert_equal(arr[:12],
                            np.frombuffer(buf, dtype=np.uint8)[:12])

    dbuf.copy_to_host(buf=buf, nbytes=12, position=6)
    np.testing.assert_equal(arr[6:6+12],
                            np.frombuffer(buf, dtype=np.uint8)[:12])

    for (position, nbytes) in [
        (0, size+10), (10, size-5), (0, size//2), (size//4, size//4+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested copy does not '
                                  'fit into host buffer')):
            dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)


@pytest.mark.parametrize("dest_ctx", ['same', 'another'])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_device(dest_ctx, size):
    arr, buf = make_random_buffer(size=size, target='device')
    lst = arr.tolist()
    if dest_ctx == 'another':
        dest_ctx = global_context1
        if buf.context.device_number == dest_ctx.device_number:
            pytest.skip("not a multi-GPU system")
    else:
        dest_ctx = buf.context
    dbuf = dest_ctx.new_buffer(size)

    def put(*args, **kwargs):
        # Copy into dbuf, then read everything back for comparison.
        dbuf.copy_from_device(buf, *args, **kwargs)
        rbuf = dbuf.copy_to_host()
        return np.frombuffer(rbuf, dtype=np.uint8).tolist()

    assert put() == lst
    if size > 4:
        assert put(position=size//4) == lst[:size//4]+lst[:-size//4]
        assert put() == lst
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]

    for (position, nbytes) in [
        (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            put(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1),
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from device buffer')):
            put(position=position, nbytes=nbytes)

    if size < 4:
        return

    for (position, nbytes) in [
        (size//2, (size+1)//2+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available in device buffer')):
            put(position=position, nbytes=nbytes)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_host(size):
    arr, buf = make_random_buffer(size=size, target='host')
    lst = arr.tolist()
    dbuf = global_context.new_buffer(size)

    def put(*args, **kwargs):
        # Copy into dbuf, then read everything back for comparison.
        dbuf.copy_from_host(buf, *args, **kwargs)
        rbuf = dbuf.copy_to_host()
        return np.frombuffer(rbuf, dtype=np.uint8).tolist()

    assert put() == lst
    if size > 4:
        assert put(position=size//4) == lst[:size//4]+lst[:-size//4]
        assert put() == lst
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]

    for (position, nbytes) in [
        (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            put(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1),
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from host buffer')):
            put(position=position, nbytes=nbytes)

    if size < 4:
        return

    for (position, nbytes) in [
        (size//2, (size+1)//2+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available in device buffer')):
            put(position=position, nbytes=nbytes)


def test_BufferWriter():
    def allocate(size):
        cbuf = global_context.new_buffer(size)
        writer = cuda.BufferWriter(cbuf)
        return cbuf, writer

    def test_writes(total_size, chunksize, buffer_size=0):
        cbuf, writer = allocate(total_size)
        arr, buf = make_random_buffer(size=total_size, target='host')

        if buffer_size > 0:
            writer.buffer_size = buffer_size

        position = writer.tell()
        assert position == 0
        writer.write(buf.slice(length=chunksize))
        assert writer.tell() == chunksize
        writer.seek(0)
        position = writer.tell()
        assert position == 0

        while position < total_size:
            bytes_to_write = min(chunksize, total_size - position)
            writer.write(buf.slice(offset=position, length=bytes_to_write))
            position += bytes_to_write

        writer.flush()
        assert cbuf.size == total_size
        cbuf.context.synchronize()
        buf2 = cbuf.copy_to_host()
        cbuf.context.synchronize()
        assert buf2.size == total_size
        arr2 = np.frombuffer(buf2, dtype=np.uint8)
        np.testing.assert_equal(arr, arr2)

    total_size, chunk_size = 1 << 16, 1000
    test_writes(total_size, chunk_size)
    test_writes(total_size, chunk_size, total_size // 16)

    cbuf, writer = allocate(100)
    writer.write(np.arange(100, dtype=np.uint8))
    writer.writeat(50, np.arange(25, dtype=np.uint8))
    writer.write(np.arange(25, dtype=np.uint8))
    writer.flush()

    arr = np.frombuffer(cbuf.copy_to_host(), np.uint8)
    np.testing.assert_equal(arr[:50], np.arange(50, dtype=np.uint8))
    np.testing.assert_equal(arr[50:75], np.arange(25, dtype=np.uint8))
    np.testing.assert_equal(arr[75:], np.arange(25, dtype=np.uint8))


def test_BufferWriter_edge_cases():
    # edge cases, see cuda-test.cc for more information:
    size = 1000
    cbuf = global_context.new_buffer(size)
    writer = cuda.BufferWriter(cbuf)
    arr, buf = make_random_buffer(size=size, target='host')

    assert writer.buffer_size == 0
    writer.buffer_size = 100
    assert writer.buffer_size == 100

    writer.write(buf.slice(length=0))
    assert writer.tell() == 0

    writer.write(buf.slice(length=10))
    writer.buffer_size = 200
    assert writer.buffer_size == 200
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=10, length=300))
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=310, length=200))
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=510, length=390))
    writer.write(buf.slice(offset=900, length=100))

    writer.flush()
    buf2 = cbuf.copy_to_host()
    assert buf2.size == size
    arr2 = np.frombuffer(buf2, dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def test_BufferReader():
    size = 1000
    arr, cbuf = make_random_buffer(size=size, target='device')

    reader = cuda.BufferReader(cbuf)
    reader.seek(950)
    assert reader.tell() == 950

    # Reads past the end are truncated to the available bytes.
    data = reader.read(100)
    assert len(data) == 50
    assert reader.tell() == 1000

    reader.seek(925)
    arr2 = np.zeros(100, dtype=np.uint8)
    n = reader.readinto(arr2)
    assert n == 75
    assert reader.tell() == 1000
    np.testing.assert_equal(arr[925:], arr2[:75])

    reader.seek(0)
    assert reader.tell() == 0
    buf2 = reader.read_buffer()
    arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def test_BufferReader_zero_size():
    arr, cbuf = make_random_buffer(size=0, target='device')
    reader = cuda.BufferReader(cbuf)
    reader.seek(0)
    data = reader.read()
    assert len(data) == 0
    assert reader.tell() == 0
    buf2 = reader.read_buffer()
    arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def make_recordbatch(length):
    """Return a two-column int16 record batch with random values."""
    schema = pa.schema([pa.field('f0', pa.int16()),
                        pa.field('f1', pa.int16())])
    a0 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
    a1 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
    batch = pa.record_batch([a0, a1], schema=schema)
    return batch


def test_batch_serialize():
    batch = make_recordbatch(10)
    hbuf = batch.serialize()
    cbuf = cuda.serialize_record_batch(batch, global_context)

    # Test that read_record_batch works properly
    cbatch = cuda.read_record_batch(cbuf, batch.schema)
    assert isinstance(cbatch, pa.RecordBatch)
    assert batch.schema == cbatch.schema
    assert batch.num_columns == cbatch.num_columns
    assert batch.num_rows == cbatch.num_rows

    # Deserialize CUDA-serialized batch on host
    buf = cbuf.copy_to_host()
    assert hbuf.equals(buf)
    batch2 = pa.ipc.read_record_batch(buf, batch.schema)
    assert hbuf.equals(batch2.serialize())
    assert batch.num_columns == batch2.num_columns
    assert batch.num_rows == batch2.num_rows
    assert batch.column(0).equals(batch2.column(0))
    assert batch.equals(batch2)


def make_table():
    """Return a host table covering primitive, nested and dict types."""
    a0 = pa.array([0, 1, 42, None], type=pa.int16())
    a1 = pa.array([[0, 1], [2], [], None], type=pa.list_(pa.int32()))
    a2 = pa.array([("ab", True), ("cde", False), (None, None), None],
                  type=pa.struct([("strs", pa.utf8()),
                                  ("bools", pa.bool_())]))
    # Dictionaries are validated on the IPC read path, but that can produce
    # issues for GPU-located dictionaries. Check that they work fine.
    a3 = pa.DictionaryArray.from_arrays(
        indices=[0, 1, 1, None],
        dictionary=pa.array(['foo', 'bar']))
    a4 = pa.DictionaryArray.from_arrays(
        indices=[2, 1, 2, None],
        dictionary=a1)
    a5 = pa.DictionaryArray.from_arrays(
        indices=[2, 1, 0, None],
        dictionary=a2)

    arrays = [a0, a1, a2, a3, a4, a5]
    schema = pa.schema([('f{}'.format(i), arr.type)
                        for i, arr in enumerate(arrays)])
    batch = pa.record_batch(arrays, schema=schema)
    table = pa.Table.from_batches([batch])
    return table


def make_table_cuda():
    """Round-trip make_table() through a device buffer.

    Returns (host IPC bytes, host table, device buffer, device table).
    """
    htable = make_table()
    # Serialize the host table to bytes
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, htable.schema) as out:
        out.write_table(htable)
    hbuf = pa.py_buffer(sink.getvalue().to_pybytes())
    # Copy the host bytes to a device buffer
    dbuf = global_context.new_buffer(len(hbuf))
    dbuf.copy_from_host(hbuf, nbytes=len(hbuf))
    # Deserialize the device buffer into a Table
    dtable = pa.ipc.open_stream(cuda.BufferReader(dbuf)).read_all()
    return hbuf, htable, dbuf, dtable


def test_table_deserialize():
    # ARROW-9659: make sure that we can deserialize a GPU-located table
    # without crashing when initializing or validating the underlying
    # arrays.
    hbuf, htable, dbuf, dtable = make_table_cuda()
    # Assert basic fields the same between host and device tables
    assert htable.schema == dtable.schema
    assert htable.num_rows == dtable.num_rows
    assert htable.num_columns == dtable.num_columns
    # Assert byte-level equality
    assert hbuf.equals(dbuf.copy_to_host())
    # Copy DtoH and assert the tables are still equivalent
    assert htable.equals(pa.ipc.open_stream(
        dbuf.copy_to_host()
    ).read_all())


def test_create_table_with_device_buffers():
    # ARROW-11872: make sure that we can create an Arrow Table from
    # GPU-located Arrays without crashing.
    hbuf, htable, dbuf, dtable = make_table_cuda()
    # Construct a new Table from the device Table
    dtable2 = pa.Table.from_arrays(dtable.columns, dtable.column_names)
    # Assert basic fields the same between host and device tables
    assert htable.schema == dtable2.schema
    assert htable.num_rows == dtable2.num_rows
    assert htable.num_columns == dtable2.num_columns
    # Assert byte-level equality
    assert hbuf.equals(dbuf.copy_to_host())
    # Copy DtoH and assert the tables are still equivalent
    assert htable.equals(pa.ipc.open_stream(
        dbuf.copy_to_host()
    ).read_all())


def other_process_for_test_IPC(handle_buffer, expected_arr):
    # Runs in a spawned child process: open the IPC handle exported by the
    # parent and verify the device buffer contents.
    other_context = pa.cuda.Context(0)
    ipc_handle = pa.cuda.IpcMemHandle.from_buffer(handle_buffer)
    ipc_buf = other_context.open_ipc_buffer(ipc_handle)
    ipc_buf.context.synchronize()
    buf = ipc_buf.copy_to_host()
    assert buf.size == expected_arr.size, repr((buf.size,
                                                expected_arr.size))
    arr = np.frombuffer(buf, dtype=expected_arr.dtype)
    np.testing.assert_equal(arr, expected_arr)


@cuda_ipc
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_IPC(size):
    import multiprocessing
    ctx = multiprocessing.get_context('spawn')
    arr, cbuf = make_random_buffer(size=size, target='device')
    ipc_handle = cbuf.export_for_ipc()
    handle_buffer = ipc_handle.serialize()
    p = ctx.Process(target=other_process_for_test_IPC,
                    args=(handle_buffer, arr))
    p.start()
    p.join()
    assert p.exitcode == 0