summaryrefslogtreecommitdiffstats
path: root/src/arrow/python/benchmarks/streaming.py
blob: c0c63e6ef4c97bfc218797d6f8be2d913052b81d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import numpy as np
import pandas as pd
import pyarrow as pa

from . import common
from .common import KILOBYTE, MEGABYTE


def generate_chunks(total_size, nchunks, ncols, dtype=np.dtype('int64')):
    """
    Build a list of DataFrames filled with bytes from
    ``common.get_random_bytes``.

    Parameters
    ----------
    total_size : int
        Overall byte budget, divided evenly (floor division) between
        all chunks and then between all columns of a chunk.
    nchunks : int
        Number of DataFrames to generate.
    ncols : int
        Number of columns per DataFrame; columns are named
        'c0', 'c1', ..., 'c<ncols - 1>'.
    dtype : numpy.dtype or anything accepted by ``np.dtype``
        Element type of every column (default: int64).

    Returns
    -------
    list of pandas.DataFrame
        ``nchunks`` frames with ``ncols`` columns each.

    Raises
    ------
    AssertionError
        If the per-column byte size is not a multiple of the dtype's
        item size.
    """
    # Accept dtype-likes ('int32', np.float32, ...) as well as dtype objects.
    dtype = np.dtype(dtype)
    # Number of bytes backing a single column of a single chunk.
    column_nbytes = total_size // nchunks // ncols
    assert column_nbytes % dtype.itemsize == 0, (
        "per-column size must be a multiple of the dtype item size")

    def make_column(col, chunk):
        # The seed mixes both indices so that each (column, chunk) pair
        # requests its own byte string (seeds are distinct as long as
        # ncols <= 997).
        raw = common.get_random_bytes(column_nbytes,
                                      seed=col + 997 * chunk)
        # Interpret the bytes directly as `dtype`.  Going through
        # frombuffer's default float64 first would additionally require
        # the buffer length to be a multiple of 8, which the assertion
        # above does not guarantee for narrower dtypes.
        return np.frombuffer(raw, dtype=dtype)

    chunks = []
    for chunk in range(nchunks):
        columns = {'c' + str(col): make_column(col, chunk)
                   for col in range(ncols)}
        chunks.append(pd.DataFrame(columns))
    return chunks


class StreamReader(object):
    """
    Benchmark in-memory streaming to a Pandas dataframe.

    ``setup`` serializes ``total_size`` bytes worth of record batches into
    an in-memory Arrow stream; ``time_read_to_dataframe`` reads that
    stream back and converts it to a DataFrame.
    """
    # Total amount of column data written to the stream.
    total_size = 64 * MEGABYTE
    # Number of columns in every record batch.
    ncols = 8
    # Size of a single record batch; the number of batches is
    # total_size // chunk_size.
    chunk_sizes = [16 * KILOBYTE, 256 * KILOBYTE, 8 * MEGABYTE]

    # Parametrization: one run per entry of `chunk_sizes`
    # (presumably consumed by the benchmark runner -- confirm against
    # the harness in use).
    param_names = ['chunk_size']
    params = [chunk_sizes]

    def setup(self, chunk_size):
        """
        Prepare ``self.source``: a buffer holding the whole Arrow stream.

        Parameters
        ----------
        chunk_size : int
            Byte size of each record batch written to the stream.
        """
        # Note we're careful to stream different chunks instead of
        # streaming N times the same chunk, so that we avoid operating
        # entirely out of the L1/L2 CPU caches.
        chunks = generate_chunks(self.total_size,
                                 nchunks=self.total_size // chunk_size,
                                 ncols=self.ncols)
        batches = [pa.RecordBatch.from_pandas(df)
                   for df in chunks]
        schema = batches[0].schema
        sink = pa.BufferOutputStream()
        stream_writer = pa.RecordBatchStreamWriter(sink, schema)
        try:
            for batch in batches:
                stream_writer.write_batch(batch)
        finally:
            # Finalize the stream (writes the end-of-stream marker) before
            # the sink's contents are snapshotted below.
            stream_writer.close()
        self.source = sink.getvalue()

    def time_read_to_dataframe(self, *args):
        """
        Timed body: read every batch of the stream and convert to Pandas.

        The extra positional arguments (the benchmark parameters) are
        ignored; the input was already prepared by ``setup``.
        """
        reader = pa.RecordBatchStreamReader(self.source)
        table = reader.read_all()
        # Only the conversion cost matters; the result is discarded.
        table.to_pandas()