# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
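"""
Tests for round-tripping Arrow types, schemas, arrays, record batches and
record batch readers through the C Data Interface and C Stream Interface,
using the ``pyarrow.cffi`` bindings.
"""
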
import gc

import pyarrow as pa
try:
    from pyarrow.cffi import ffi
except ImportError:
    ffi = None

import pytest

try:
    import pandas as pd
    import pandas.testing as tm
except ImportError:
    pd = tm = None


needs_cffi = pytest.mark.skipif(ffi is None,
                                reason="test needs cffi package installed")


assert_schema_released = pytest.raises(
    ValueError, match="Cannot import released ArrowSchema")

assert_array_released = pytest.raises(
    ValueError, match="Cannot import released ArrowArray")

assert_stream_released = pytest.raises(
    ValueError, match="Cannot import released ArrowArrayStream")


class ParamExtType(pa.PyExtensionType):

    def __init__(self, width):
        self._width = width
        pa.PyExtensionType.__init__(self, pa.binary(width))

    @property
    def width(self):
        return self._width

    def __reduce__(self):
        return ParamExtType, (self.width,)


def make_schema():
    return pa.schema([('ints', pa.list_(pa.int32()))],
                     metadata={b'key1': b'value1'})


def make_extension_schema():
    return pa.schema([('ext', ParamExtType(3))],
                     metadata={b'key1': b'value1'})


def make_batch():
    return pa.record_batch([[[1], [2, 42]]], make_schema())


def make_extension_batch():
    schema = make_extension_schema()
    ext_col = schema[0].type.wrap_array(pa.array([b"foo", b"bar"],
                                                 type=pa.binary(3)))
    return pa.record_batch([ext_col], schema)


def make_batches():
    schema = make_schema()
    return [
        pa.record_batch([[[1], [2, 42]]], schema),
        pa.record_batch([[None, [], [5, 6]]], schema),
    ]


def make_serialized(schema, batches):
    with pa.BufferOutputStream() as sink:
        with pa.ipc.new_stream(sink, schema) as out:
            for batch in batches:
                out.write(batch)
    return sink.getvalue()
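

# Each test below follows the same pattern: allocate an ArrowSchema,
# ArrowArray or ArrowArrayStream struct with cffi, export a pyarrow object
# into it through the private ``_export_to_c`` hook, re-import it with
# ``_import_from_c``, and check that ``pa.total_allocated_bytes()`` returns
# to its baseline once the re-imported object is deleted.  Importing a
# second time from the now-released struct must raise.
#
# A minimal sketch of that round trip for a single array (using the same
# private hooks exercised throughout this file):
#
#   c_schema = ffi.new("struct ArrowSchema*")
#   c_array = ffi.new("struct ArrowArray*")
#   ptr_schema = int(ffi.cast("uintptr_t", c_schema))
#   ptr_array = int(ffi.cast("uintptr_t", c_array))
#   pa.array([1, 2, 3])._export_to_c(ptr_array, ptr_schema)
#   roundtripped = pa.Array._import_from_c(ptr_array, ptr_schema)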


@needs_cffi
def test_export_import_type():
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    typ = pa.list_(pa.int32())
    typ._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del typ
    assert pa.total_allocated_bytes() > old_allocated
    typ_new = pa.DataType._import_from_c(ptr_schema)
    assert typ_new == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)

    # Invalid format string
    pa.int32()._export_to_c(ptr_schema)
    bad_format = ffi.new("char[]", b"zzz")
    c_schema.format = bad_format
    with pytest.raises(ValueError,
                       match="Invalid or unsupported format string"):
        pa.DataType._import_from_c(ptr_schema)
    # Now released
    with assert_schema_released:
        pa.DataType._import_from_c(ptr_schema)


@needs_cffi
def test_export_import_field():
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    field = pa.field("test", pa.list_(pa.int32()), nullable=True)
    field._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del field
    assert pa.total_allocated_bytes() > old_allocated

    field_new = pa.Field._import_from_c(ptr_schema)
    assert field_new == pa.field("test", pa.list_(pa.int32()), nullable=True)
    assert pa.total_allocated_bytes() == old_allocated

    # Now released
    with assert_schema_released:
        pa.Field._import_from_c(ptr_schema)


@needs_cffi
def test_export_import_array():
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new("struct ArrowArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Type is known up front
    typ = pa.list_(pa.int32())
    arr = pa.array([[1], [2, 42]], type=typ)
    py_value = arr.to_pylist()
    arr._export_to_c(ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del arr
    arr_new = pa.Array._import_from_c(ptr_array, typ)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > old_allocated
    del arr_new, typ
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        pa.Array._import_from_c(ptr_array, pa.list_(pa.int32()))

    # Type is exported and imported at the same time
    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
    py_value = arr.to_pylist()
    arr._export_to_c(ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del arr
    arr_new = pa.Array._import_from_c(ptr_array, ptr_schema)
    assert arr_new.to_pylist() == py_value
    assert arr_new.type == pa.list_(pa.int32())
    assert pa.total_allocated_bytes() > old_allocated
    del arr_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.Array._import_from_c(ptr_array, ptr_schema)


def check_export_import_schema(schema_factory):
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    schema_factory()._export_to_c(ptr_schema)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    schema_new = pa.Schema._import_from_c(ptr_schema)
    assert schema_new == schema_factory()
    assert pa.total_allocated_bytes() == old_allocated
    del schema_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)

    # Not a struct type
    pa.int32()._export_to_c(ptr_schema)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.Schema._import_from_c(ptr_schema)
    # Now released
    with assert_schema_released:
        pa.Schema._import_from_c(ptr_schema)


@needs_cffi
def test_export_import_schema():
    check_export_import_schema(make_schema)


@needs_cffi
def test_export_import_schema_with_extension():
    check_export_import_schema(make_extension_schema)


def check_export_import_batch(batch_factory):
    c_schema = ffi.new("struct ArrowSchema*")
    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
    c_array = ffi.new("struct ArrowArray*")
    ptr_array = int(ffi.cast("uintptr_t", c_array))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    # Schema is known up front
    batch = batch_factory()
    schema = batch.schema
    py_value = batch.to_pydict()
    batch._export_to_c(ptr_array)
    assert pa.total_allocated_bytes() > old_allocated
    # Delete and recreate C++ object from exported pointer
    del batch
    batch_new = pa.RecordBatch._import_from_c(ptr_array, schema)
    assert batch_new.to_pydict() == py_value
    assert batch_new.schema == schema
    assert pa.total_allocated_bytes() > old_allocated
    del batch_new, schema
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_array_released:
        pa.RecordBatch._import_from_c(ptr_array, make_schema())

    # Schema is exported and imported at the same time
    batch = batch_factory()
    py_value = batch.to_pydict()
    batch._export_to_c(ptr_array, ptr_schema)
    # Delete and recreate C++ objects from exported pointers
    del batch
    batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
    assert batch_new.to_pydict() == py_value
    assert batch_new.schema == batch_factory().schema
    assert pa.total_allocated_bytes() > old_allocated
    del batch_new
    assert pa.total_allocated_bytes() == old_allocated
    # Now released
    with assert_schema_released:
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)

    # Not a struct type
    pa.int32()._export_to_c(ptr_schema)
    batch_factory()._export_to_c(ptr_array)
    with pytest.raises(ValueError,
                       match="ArrowSchema describes non-struct type"):
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)
    # Now released
    with assert_schema_released:
        pa.RecordBatch._import_from_c(ptr_array, ptr_schema)


@needs_cffi
def test_export_import_batch():
    check_export_import_batch(make_batch)


@needs_cffi
def test_export_import_batch_with_extension():
    check_export_import_batch(make_extension_batch)
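

# The remaining tests exercise the C Stream Interface: a RecordBatchReader
# is exported into an ArrowArrayStream struct and re-imported through
# ``pa.ipc.RecordBatchReader._import_from_c``.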


def _export_import_batch_reader(ptr_stream, reader_factory):
    # Prepare input
    batches = make_batches()
    schema = batches[0].schema

    reader = reader_factory(schema, batches)
    reader._export_to_c(ptr_stream)
    # Delete and recreate C++ object from exported pointer
    del reader, batches

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    assert reader_new.schema == schema
    got_batches = list(reader_new)
    del reader_new
    assert got_batches == make_batches()

    # Test read_pandas()
    if pd is not None:
        batches = make_batches()
        schema = batches[0].schema
        expected_df = pa.Table.from_batches(batches).to_pandas()

        reader = reader_factory(schema, batches)
        reader._export_to_c(ptr_stream)
        del reader, batches

        reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
        got_df = reader_new.read_pandas()
        del reader_new
        tm.assert_frame_equal(expected_df, got_df)


def make_ipc_stream_reader(schema, batches):
    return pa.ipc.open_stream(make_serialized(schema, batches))


def make_py_record_batch_reader(schema, batches):
    return pa.ipc.RecordBatchReader.from_batches(schema, batches)


@needs_cffi
@pytest.mark.parametrize('reader_factory',
                         [make_ipc_stream_reader,
                          make_py_record_batch_reader])
def test_export_import_batch_reader(reader_factory):
    c_stream = ffi.new("struct ArrowArrayStream*")
    ptr_stream = int(ffi.cast("uintptr_t", c_stream))

    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
    old_allocated = pa.total_allocated_bytes()

    _export_import_batch_reader(ptr_stream, reader_factory)

    assert pa.total_allocated_bytes() == old_allocated

    # Now released
    with assert_stream_released:
        pa.ipc.RecordBatchReader._import_from_c(ptr_stream)


@needs_cffi
def test_imported_batch_reader_error():
    c_stream = ffi.new("struct ArrowArrayStream*")
    ptr_stream = int(ffi.cast("uintptr_t", c_stream))

    schema = pa.schema([('foo', pa.int32())])
    batches = [pa.record_batch([[1, 2, 3]], schema=schema),
               pa.record_batch([[4, 5, 6]], schema=schema)]
    buf = make_serialized(schema, batches)

    # Open a corrupt/incomplete stream and export it
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(ptr_stream)
    del reader

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    batch = reader_new.read_next_batch()
    assert batch == batches[0]
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        reader_new.read_next_batch()

    # Again, but call read_all()
    reader = pa.ipc.open_stream(buf[:-16])
    reader._export_to_c(ptr_stream)
    del reader

    reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream)
    with pytest.raises(OSError,
                       match="Expected to be able to read 16 bytes "
                             "for message body, got 8"):
        reader_new.read_all()