From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/python/examples/flight/client.py | 189 +++++++++++++++++++ src/arrow/python/examples/flight/middleware.py | 167 +++++++++++++++++ src/arrow/python/examples/flight/server.py | 154 ++++++++++++++++ .../examples/minimal_build/Dockerfile.fedora | 31 ++++ .../examples/minimal_build/Dockerfile.ubuntu | 38 ++++ src/arrow/python/examples/minimal_build/README.md | 73 ++++++++ .../python/examples/minimal_build/build_conda.sh | 119 ++++++++++++ .../python/examples/minimal_build/build_venv.sh | 84 +++++++++ .../python/examples/plasma/sorting/multimerge.pyx | 102 +++++++++++ src/arrow/python/examples/plasma/sorting/setup.py | 27 +++ .../python/examples/plasma/sorting/sort_df.py | 203 +++++++++++++++++++++ 11 files changed, 1187 insertions(+) create mode 100644 src/arrow/python/examples/flight/client.py create mode 100644 src/arrow/python/examples/flight/middleware.py create mode 100644 src/arrow/python/examples/flight/server.py create mode 100644 src/arrow/python/examples/minimal_build/Dockerfile.fedora create mode 100644 src/arrow/python/examples/minimal_build/Dockerfile.ubuntu create mode 100644 src/arrow/python/examples/minimal_build/README.md create mode 100755 src/arrow/python/examples/minimal_build/build_conda.sh create mode 100755 src/arrow/python/examples/minimal_build/build_venv.sh create mode 100644 src/arrow/python/examples/plasma/sorting/multimerge.pyx create mode 100644 src/arrow/python/examples/plasma/sorting/setup.py create mode 100644 src/arrow/python/examples/plasma/sorting/sort_df.py (limited to 'src/arrow/python/examples') diff --git a/src/arrow/python/examples/flight/client.py b/src/arrow/python/examples/flight/client.py new file mode 100644 index 000000000..ed6ce54ce --- /dev/null +++ b/src/arrow/python/examples/flight/client.py @@ -0,0 +1,189 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""An example Flight CLI client.""" + +import argparse +import sys + +import pyarrow +import pyarrow.flight +import pyarrow.csv as csv + + +def list_flights(args, client, connection_args={}): + print('Flights\n=======') + for flight in client.list_flights(): + descriptor = flight.descriptor + if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH: + print("Path:", descriptor.path) + elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD: + print("Command:", descriptor.command) + else: + print("Unknown descriptor type") + + print("Total records:", end=" ") + if flight.total_records >= 0: + print(flight.total_records) + else: + print("Unknown") + + print("Total bytes:", end=" ") + if flight.total_bytes >= 0: + print(flight.total_bytes) + else: + print("Unknown") + + print("Number of endpoints:", len(flight.endpoints)) + print("Schema:") + print(flight.schema) + print('---') + + print('\nActions\n=======') + for action in client.list_actions(): + print("Type:", action.type) + print("Description:", action.description) + print('---') + + +def do_action(args, client, connection_args={}): + try: + buf = pyarrow.allocate_buffer(0) + action = pyarrow.flight.Action(args.action_type, buf) + print('Running action', args.action_type) + for result in client.do_action(action): + print("Got result", result.body.to_pybytes()) + except pyarrow.lib.ArrowIOError as e: + print("Error calling action:", e) + + +def push_data(args, client, connection_args={}): + print('File Name:', args.file) + my_table = csv.read_csv(args.file) + print('Table rows=', str(len(my_table))) + df = my_table.to_pandas() + print(df.head()) + writer, _ = client.do_put( + pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema) + writer.write_table(my_table) + writer.close() + + +def get_flight(args, client, connection_args={}): + if args.path: + descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path) + else: + descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command) + + info = client.get_flight_info(descriptor) + for endpoint in info.endpoints: + print('Ticket:', endpoint.ticket) + for location in endpoint.locations: + print(location) + get_client = pyarrow.flight.FlightClient(location, + **connection_args) + reader = get_client.do_get(endpoint.ticket) + df = reader.read_pandas() + print(df) + + +def _add_common_arguments(parser): + parser.add_argument('--tls', action='store_true', + help='Enable transport-level security') + parser.add_argument('--tls-roots', default=None, + help='Path to trusted TLS certificate(s)') + parser.add_argument("--mtls", nargs=2, default=None, + metavar=('CERTFILE', 'KEYFILE'), + help="Enable transport-level security") + parser.add_argument('host', type=str, + help="Address or hostname to connect to") + + +def main(): + parser = argparse.ArgumentParser() + subcommands = parser.add_subparsers() + + cmd_list = subcommands.add_parser('list') + cmd_list.set_defaults(action='list') + _add_common_arguments(cmd_list) + cmd_list.add_argument('-l', '--list', action='store_true', + help="Print more details.") + + cmd_do = subcommands.add_parser('do') + cmd_do.set_defaults(action='do') + _add_common_arguments(cmd_do) + cmd_do.add_argument('action_type', type=str, + help="The action type to run.") + + cmd_put = subcommands.add_parser('put') + cmd_put.set_defaults(action='put') + _add_common_arguments(cmd_put) + cmd_put.add_argument('file', type=str, + help="CSV file to upload.") + + cmd_get = subcommands.add_parser('get') + cmd_get.set_defaults(action='get') + _add_common_arguments(cmd_get) + cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True) + cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append', + help="The path for the descriptor.") + cmd_get_descriptor.add_argument('-c', '--command', type=str, + help="The command for the descriptor.") + + args = parser.parse_args() + if not hasattr(args, 'action'): + parser.print_help() + sys.exit(1) + + commands = { + 'list': list_flights, + 'do': do_action, + 'get': get_flight, + 'put': push_data, + } + host, port = args.host.split(':') + port = int(port) + scheme = "grpc+tcp" + connection_args = {} + if args.tls: + scheme = "grpc+tls" + if args.tls_roots: + with open(args.tls_roots, "rb") as root_certs: + connection_args["tls_root_certs"] = root_certs.read() + if args.mtls: + with open(args.mtls[0], "rb") as cert_file: + tls_cert_chain = cert_file.read() + with open(args.mtls[1], "rb") as key_file: + tls_private_key = key_file.read() + connection_args["cert_chain"] = tls_cert_chain + connection_args["private_key"] = tls_private_key + client = pyarrow.flight.FlightClient(f"{scheme}://{host}:{port}", + **connection_args) + while True: + try: + action = pyarrow.flight.Action("healthcheck", b"") + options = pyarrow.flight.FlightCallOptions(timeout=1) + list(client.do_action(action, options=options)) + break + except pyarrow.ArrowIOError as e: + if "Deadline" in str(e): + print("Server is not ready, waiting...") + commands[args.action](args, client, connection_args) + + +if __name__ == '__main__': + main() diff --git a/src/arrow/python/examples/flight/middleware.py b/src/arrow/python/examples/flight/middleware.py new file mode 100644 index 000000000..2056bae1f --- /dev/null +++ b/src/arrow/python/examples/flight/middleware.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Example of invisibly propagating a request ID with middleware.""" + +import argparse +import sys +import threading +import uuid + +import pyarrow as pa +import pyarrow.flight as flight + + +class TraceContext: + _locals = threading.local() + _locals.trace_id = None + + @classmethod + def current_trace_id(cls): + if not getattr(cls._locals, "trace_id", None): + cls.set_trace_id(uuid.uuid4().hex) + return cls._locals.trace_id + + @classmethod + def set_trace_id(cls, trace_id): + cls._locals.trace_id = trace_id + + +TRACE_HEADER = "x-tracing-id" + + +class TracingServerMiddleware(flight.ServerMiddleware): + def __init__(self, trace_id): + self.trace_id = trace_id + + def sending_headers(self): + return { + TRACE_HEADER: self.trace_id, + } + + +class TracingServerMiddlewareFactory(flight.ServerMiddlewareFactory): + def start_call(self, info, headers): + print("Starting new call:", info) + if TRACE_HEADER in headers: + trace_id = headers[TRACE_HEADER][0] + print("Found trace header with value:", trace_id) + TraceContext.set_trace_id(trace_id) + return TracingServerMiddleware(TraceContext.current_trace_id()) + + +class TracingClientMiddleware(flight.ClientMiddleware): + def sending_headers(self): + print("Sending trace ID:", TraceContext.current_trace_id()) + return { + "x-tracing-id": TraceContext.current_trace_id(), + } + + def received_headers(self, headers): + if TRACE_HEADER in headers: + trace_id = headers[TRACE_HEADER][0] + print("Found trace header with value:", trace_id) + # Don't overwrite our trace ID + + +class TracingClientMiddlewareFactory(flight.ClientMiddlewareFactory): + def start_call(self, info): + print("Starting new call:", info) + return TracingClientMiddleware() + + +class FlightServer(flight.FlightServerBase): + def __init__(self, delegate, **kwargs): + super().__init__(**kwargs) + if delegate: + self.delegate = flight.connect( + delegate, + middleware=(TracingClientMiddlewareFactory(),)) + else: + self.delegate = None + + def list_actions(self, context): + return [ + ("get-trace-id", "Get the trace context ID."), + ] + + def do_action(self, context, action): + trace_middleware = context.get_middleware("trace") + if trace_middleware: + TraceContext.set_trace_id(trace_middleware.trace_id) + if action.type == "get-trace-id": + if self.delegate: + for result in self.delegate.do_action(action): + yield result + else: + trace_id = TraceContext.current_trace_id().encode("utf-8") + print("Returning trace ID:", trace_id) + buf = pa.py_buffer(trace_id) + yield pa.flight.Result(buf) + else: + raise KeyError(f"Unknown action {action.type!r}") + + +def main(): + parser = argparse.ArgumentParser() + + subparsers = parser.add_subparsers(dest="command") + client = subparsers.add_parser("client", help="Run the client.") + client.add_argument("server") + client.add_argument("--request-id", default=None) + + server = subparsers.add_parser("server", help="Run the server.") + server.add_argument( + "--listen", + required=True, + help="The location to listen on (example: grpc://localhost:5050)", + ) + server.add_argument( + "--delegate", + required=False, + default=None, + help=("A location to delegate to. That is, this server will " + "simply call the given server for the response. Demonstrates " + "propagation of the trace ID between servers."), + ) + + args = parser.parse_args() + if not getattr(args, "command"): + parser.print_help() + return 1 + + if args.command == "server": + server = FlightServer( + args.delegate, + location=args.listen, + middleware={"trace": TracingServerMiddlewareFactory()}) + server.serve() + elif args.command == "client": + client = flight.connect( + args.server, + middleware=(TracingClientMiddlewareFactory(),)) + if args.request_id: + TraceContext.set_trace_id(args.request_id) + else: + TraceContext.set_trace_id("client-chosen-id") + + for result in client.do_action(flight.Action("get-trace-id", b"")): + print(result.body.to_pybytes()) + + +if __name__ == "__main__": + sys.exit(main() or 0) diff --git a/src/arrow/python/examples/flight/server.py b/src/arrow/python/examples/flight/server.py new file mode 100644 index 000000000..7a6b6697e --- /dev/null +++ b/src/arrow/python/examples/flight/server.py @@ -0,0 +1,154 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""An example Flight Python server.""" + +import argparse +import ast +import threading +import time + +import pyarrow +import pyarrow.flight + + +class FlightServer(pyarrow.flight.FlightServerBase): + def __init__(self, host="localhost", location=None, + tls_certificates=None, verify_client=False, + root_certificates=None, auth_handler=None): + super(FlightServer, self).__init__( + location, auth_handler, tls_certificates, verify_client, + root_certificates) + self.flights = {} + self.host = host + self.tls_certificates = tls_certificates + + @classmethod + def descriptor_to_key(self, descriptor): + return (descriptor.descriptor_type.value, descriptor.command, + tuple(descriptor.path or tuple())) + + def _make_flight_info(self, key, descriptor, table): + if self.tls_certificates: + location = pyarrow.flight.Location.for_grpc_tls( + self.host, self.port) + else: + location = pyarrow.flight.Location.for_grpc_tcp( + self.host, self.port) + endpoints = [pyarrow.flight.FlightEndpoint(repr(key), [location]), ] + + mock_sink = pyarrow.MockOutputStream() + stream_writer = pyarrow.RecordBatchStreamWriter( + mock_sink, table.schema) + stream_writer.write_table(table) + stream_writer.close() + data_size = mock_sink.size() + + return pyarrow.flight.FlightInfo(table.schema, + descriptor, endpoints, + table.num_rows, data_size) + + def list_flights(self, context, criteria): + for key, table in self.flights.items(): + if key[1] is not None: + descriptor = \ + pyarrow.flight.FlightDescriptor.for_command(key[1]) + else: + descriptor = pyarrow.flight.FlightDescriptor.for_path(*key[2]) + + yield self._make_flight_info(key, descriptor, table) + + def get_flight_info(self, context, descriptor): + key = FlightServer.descriptor_to_key(descriptor) + if key in self.flights: + table = self.flights[key] + return self._make_flight_info(key, descriptor, table) + raise KeyError('Flight not found.') + + def do_put(self, context, descriptor, reader, writer): + key = FlightServer.descriptor_to_key(descriptor) + print(key) + self.flights[key] = reader.read_all() + print(self.flights[key]) + + def do_get(self, context, ticket): + key = ast.literal_eval(ticket.ticket.decode()) + if key not in self.flights: + return None + return pyarrow.flight.RecordBatchStream(self.flights[key]) + + def list_actions(self, context): + return [ + ("clear", "Clear the stored flights."), + ("shutdown", "Shut down this server."), + ] + + def do_action(self, context, action): + if action.type == "clear": + raise NotImplementedError( + "{} is not implemented.".format(action.type)) + elif action.type == "healthcheck": + pass + elif action.type == "shutdown": + yield pyarrow.flight.Result(pyarrow.py_buffer(b'Shutdown!')) + # Shut down on background thread to avoid blocking current + # request + threading.Thread(target=self._shutdown).start() + else: + raise KeyError("Unknown action {!r}".format(action.type)) + + def _shutdown(self): + """Shut down after a delay.""" + print("Server is shutting down...") + time.sleep(2) + self.shutdown() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--host", type=str, default="localhost", + help="Address or hostname to listen on") + parser.add_argument("--port", type=int, default=5005, + help="Port number to listen on") + parser.add_argument("--tls", nargs=2, default=None, + metavar=('CERTFILE', 'KEYFILE'), + help="Enable transport-level security") + parser.add_argument("--verify_client", type=bool, default=False, + help="enable mutual TLS and verify the client if True") + + args = parser.parse_args() + tls_certificates = [] + scheme = "grpc+tcp" + if args.tls: + scheme = "grpc+tls" + with open(args.tls[0], "rb") as cert_file: + tls_cert_chain = cert_file.read() + with open(args.tls[1], "rb") as key_file: + tls_private_key = key_file.read() + tls_certificates.append((tls_cert_chain, tls_private_key)) + + location = "{}://{}:{}".format(scheme, args.host, args.port) + + server = FlightServer(args.host, location, + tls_certificates=tls_certificates, + verify_client=args.verify_client) + print("Serving on", location) + server.serve() + + +if __name__ == '__main__': + main() diff --git a/src/arrow/python/examples/minimal_build/Dockerfile.fedora b/src/arrow/python/examples/minimal_build/Dockerfile.fedora new file mode 100644 index 000000000..7dc329193 --- /dev/null +++ b/src/arrow/python/examples/minimal_build/Dockerfile.fedora @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM fedora:31 + +RUN dnf update -y && \ + dnf install -y \ + autoconf \ + gcc \ + gcc-g++ \ + git \ + wget \ + make \ + cmake \ + ninja-build \ + python3-devel \ + python3-virtualenv \ No newline at end of file diff --git a/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu b/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu new file mode 100644 index 000000000..d7b84085e --- /dev/null +++ b/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +FROM ubuntu:bionic + +ENV DEBIAN_FRONTEND=noninteractive + +RUN apt-get update -y -q && \ + apt-get install -y -q --no-install-recommends \ + apt-transport-https \ + software-properties-common \ + wget && \ + apt-get install -y -q --no-install-recommends \ + build-essential \ + cmake \ + git \ + ninja-build \ + python3-dev \ + python3-pip && \ + apt-get clean && rm -rf /var/lib/apt/lists* + +RUN pip3 install wheel && \ + pip3 install -U setuptools && \ + pip3 install wheel virtualenv \ No newline at end of file diff --git a/src/arrow/python/examples/minimal_build/README.md b/src/arrow/python/examples/minimal_build/README.md new file mode 100644 index 000000000..9803e18aa --- /dev/null +++ b/src/arrow/python/examples/minimal_build/README.md @@ -0,0 +1,73 @@ + + +# Minimal Python source build on Linux + +This directory shows how to bootstrap a local build from source on Linux with +an eye toward maximum portability across different Linux distributions. This +may help for contributors debugging build issues caused by their local +environments. + +## Fedora 31 + +First, build the Docker image using: +``` +docker build -t arrow_fedora_minimal -f Dockerfile.fedora . +``` + +Then build PyArrow with conda or pip/virtualenv, respectively: +``` +# With pip/virtualenv +docker run --rm -t -i -v $PWD:/io arrow_fedora_minimal /io/build_venv.sh + +# With conda +docker run --rm -t -i -v $PWD:/io arrow_fedora_minimal /io/build_conda.sh +``` + +## Ubuntu 18.04 + +First, build the Docker image using: +``` +docker build -t arrow_ubuntu_minimal -f Dockerfile.ubuntu . +``` + +Then build PyArrow with conda or pip/virtualenv, respectively: +``` +# With pip/virtualenv +docker run --rm -t -i -v $PWD:/io arrow_ubuntu_minimal /io/build_venv.sh + +# With conda +docker run --rm -t -i -v $PWD:/io arrow_ubuntu_minimal /io/build_conda.sh +``` + +## Building on Fedora - Podman and SELinux + +In addition to using Podman instead of Docker, you need to specify `:Z` +for SELinux relabelling when binding a volume. + +First, build the image using: +``` +podman build -t arrow_fedora_minimal -f Dockerfile.fedora +``` + +Then build PyArrow with pip/virtualenv: +``` +# With pip/virtualenv +podman run --rm -i -v $PWD:/io:Z -t arrow_fedora_minimal /io/build_venv.sh +``` diff --git a/src/arrow/python/examples/minimal_build/build_conda.sh b/src/arrow/python/examples/minimal_build/build_conda.sh new file mode 100755 index 000000000..416cac0a7 --- /dev/null +++ b/src/arrow/python/examples/minimal_build/build_conda.sh @@ -0,0 +1,119 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +#---------------------------------------------------------------------- +# Change this to whatever makes sense for your system + +HOME= +MINICONDA=$HOME/miniconda-for-arrow +LIBRARY_INSTALL_DIR=$HOME/local-libs +CPP_BUILD_DIR=$HOME/arrow-cpp-build +ARROW_ROOT=/arrow +PYTHON=3.7 + +git clone https://github.com/apache/arrow.git /arrow + +#---------------------------------------------------------------------- +# Run these only once + +function setup_miniconda() { + MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh" + wget -O miniconda.sh $MINICONDA_URL + bash miniconda.sh -b -p $MINICONDA + rm -f miniconda.sh + LOCAL_PATH=$PATH + export PATH="$MINICONDA/bin:$PATH" + + conda update -y -q conda + conda config --set auto_update_conda false + conda info -a + + conda config --set show_channel_urls True + conda config --add channels https://repo.continuum.io/pkgs/free + conda config --add channels conda-forge + + conda create -y -n pyarrow-$PYTHON -c conda-forge \ + --file arrow/ci/conda_env_unix.txt \ + --file arrow/ci/conda_env_cpp.txt \ + --file arrow/ci/conda_env_python.txt \ + compilers \ + python=3.7 \ + pandas + + export PATH=$LOCAL_PATH +} + +setup_miniconda + +#---------------------------------------------------------------------- +# Activate conda in bash and activate conda environment + +. $MINICONDA/etc/profile.d/conda.sh +conda activate pyarrow-$PYTHON +export ARROW_HOME=$CONDA_PREFIX + +#---------------------------------------------------------------------- +# Build C++ library + +mkdir -p $CPP_BUILD_DIR +pushd $CPP_BUILD_DIR + +cmake -GNinja \ + -DCMAKE_BUILD_TYPE=DEBUG \ + -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ + -DCMAKE_INSTALL_LIBDIR=lib \ + -DARROW_FLIGHT=ON \ + -DARROW_WITH_BZ2=ON \ + -DARROW_WITH_ZLIB=ON \ + -DARROW_WITH_ZSTD=ON \ + -DARROW_WITH_LZ4=ON \ + -DARROW_WITH_SNAPPY=ON \ + -DARROW_WITH_BROTLI=ON \ + -DARROW_PARQUET=ON \ + -DARROW_PLASMA=ON \ + -DARROW_PYTHON=ON \ + $ARROW_ROOT/cpp + +ninja install + +popd + +#---------------------------------------------------------------------- +# Build and test Python library +pushd $ARROW_ROOT/python + +rm -rf build/ # remove any pesky pre-existing build directory + +export PYARROW_BUILD_TYPE=Debug +export PYARROW_CMAKE_GENERATOR=Ninja +export PYARROW_WITH_FLIGHT=1 +export PYARROW_WITH_PARQUET=1 + +# You can run either "develop" or "build_ext --inplace". Your pick + +# python setup.py build_ext --inplace +python setup.py develop + +# git submodules are required for unit tests +git submodule update --init +export PARQUET_TEST_DATA="$ARROW_ROOT/cpp/submodules/parquet-testing/data" +export ARROW_TEST_DATA="$ARROW_ROOT/testing/data" + +py.test pyarrow diff --git a/src/arrow/python/examples/minimal_build/build_venv.sh b/src/arrow/python/examples/minimal_build/build_venv.sh new file mode 100755 index 000000000..f92f4af3e --- /dev/null +++ b/src/arrow/python/examples/minimal_build/build_venv.sh @@ -0,0 +1,84 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +#---------------------------------------------------------------------- +# Change this to whatever makes sense for your system + +WORKDIR=${WORKDIR:-$HOME} +MINICONDA=$WORKDIR/miniconda-for-arrow +LIBRARY_INSTALL_DIR=$WORKDIR/local-libs +CPP_BUILD_DIR=$WORKDIR/arrow-cpp-build +ARROW_ROOT=$WORKDIR/arrow +export ARROW_HOME=$WORKDIR/dist +export LD_LIBRARY_PATH=$ARROW_HOME/lib:$LD_LIBRARY_PATH + +virtualenv $WORKDIR/venv +source $WORKDIR/venv/bin/activate + +git clone https://github.com/apache/arrow.git $ARROW_ROOT + +pip install -r $ARROW_ROOT/python/requirements-build.txt \ + -r $ARROW_ROOT/python/requirements-test.txt + +#---------------------------------------------------------------------- +# Build C++ library + +mkdir -p $CPP_BUILD_DIR +pushd $CPP_BUILD_DIR + +cmake -GNinja \ + -DCMAKE_BUILD_TYPE=DEBUG \ + -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ + -DCMAKE_INSTALL_LIBDIR=lib \ + -DARROW_WITH_BZ2=ON \ + -DARROW_WITH_ZLIB=ON \ + -DARROW_WITH_ZSTD=ON \ + -DARROW_WITH_LZ4=ON \ + -DARROW_WITH_SNAPPY=ON \ + -DARROW_WITH_BROTLI=ON \ + -DARROW_PARQUET=ON \ + -DARROW_PYTHON=ON \ + $ARROW_ROOT/cpp + +ninja install + +popd + +#---------------------------------------------------------------------- +# Build and test Python library +pushd $ARROW_ROOT/python + +rm -rf build/ # remove any pesky pre-existing build directory + +export PYARROW_BUILD_TYPE=Debug +export PYARROW_CMAKE_GENERATOR=Ninja +export PYARROW_WITH_PARQUET=1 + +# You can run either "develop" or "build_ext --inplace". Your pick + +# python setup.py build_ext --inplace +python setup.py develop + +# git submodules are required for unit tests +git submodule update --init +export PARQUET_TEST_DATA="$ARROW_ROOT/cpp/submodules/parquet-testing/data" +export ARROW_TEST_DATA="$ARROW_ROOT/testing/data" + +py.test pyarrow diff --git a/src/arrow/python/examples/plasma/sorting/multimerge.pyx b/src/arrow/python/examples/plasma/sorting/multimerge.pyx new file mode 100644 index 000000000..5e77fdfcc --- /dev/null +++ b/src/arrow/python/examples/plasma/sorting/multimerge.pyx @@ -0,0 +1,102 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from libc.stdint cimport uintptr_t +from libcpp.vector cimport vector +from libcpp.pair cimport pair + +import numpy as np + +cimport numpy as np + +cdef extern from "" namespace "std" nogil: + cdef cppclass priority_queue[T]: + priority_queue() except + + priority_queue(priority_queue&) except + + bint empty() + void pop() + void push(T&) + size_t size() + T& top() + + +def multimerge2d(*arrays): + """Merge a list of sorted 2d arrays into a sorted 2d array. + + This assumes C style ordering for both input and output arrays. For + each input array we have array[i,0] <= array[i+1,0] and for the output + array the same will hold. + + Ideally this code would be simpler and also support both C style + and Fortran style ordering. + """ + cdef int num_arrays = len(arrays) + assert num_arrays > 0 + + cdef int num_cols = arrays[0].shape[1] + + for i in range(num_arrays): + assert arrays[i].ndim == 2 + assert arrays[i].dtype == np.float64 + assert arrays[i].shape[1] == num_cols + assert not np.isfortran(arrays[i]) + + cdef vector[double*] data + + # The indices vector keeps track of the index of the next row to process in + # each array. + cdef vector[int] indices = num_arrays * [0] + + # The sizes vector stores the total number of elements that each array has. + cdef vector[int] sizes + + cdef priority_queue[pair[double, int]] queue + cdef pair[double, int] top + cdef int num_rows = sum([array.shape[0] for array in arrays]) + cdef np.ndarray[np.float64_t, ndim=2] result = np.zeros( + (num_rows, num_cols), dtype=np.float64) + cdef double* result_ptr = np.PyArray_DATA(result) + for i in range(num_arrays): + if arrays[i].size > 0: + sizes.push_back(arrays[i].size) + data.push_back( np.PyArray_DATA(arrays[i])) + queue.push(pair[double, int](-data[i][0], i)) + + cdef int curr_idx = 0 + cdef int j + cdef int col = 0 + + for j in range(num_rows): + top = queue.top() + for col in range(num_cols): + result_ptr[curr_idx + col] = ( + data[top.second][indices[top.second] + col]) + + indices[top.second] += num_cols + curr_idx += num_cols + + queue.pop() + if indices[top.second] < sizes[top.second]: + queue.push( + pair[double, int](-data[top.second][indices[top.second]], + top.second)) + + return result diff --git a/src/arrow/python/examples/plasma/sorting/setup.py b/src/arrow/python/examples/plasma/sorting/setup.py new file mode 100644 index 000000000..a5dfa5ae0 --- /dev/null +++ b/src/arrow/python/examples/plasma/sorting/setup.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import numpy as np +from setuptools import setup +from Cython.Build import cythonize + +setup( + name="multimerge", + extra_compile_args=["-O3", "-mtune=native", "-march=native"], + ext_modules=cythonize("multimerge.pyx"), + include_dirs=[np.get_include()], +) diff --git a/src/arrow/python/examples/plasma/sorting/sort_df.py b/src/arrow/python/examples/plasma/sorting/sort_df.py new file mode 100644 index 000000000..2a51759a6 --- /dev/null +++ b/src/arrow/python/examples/plasma/sorting/sort_df.py @@ -0,0 +1,203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from multiprocessing import Pool +import numpy as np +import pandas as pd +import pyarrow as pa +import pyarrow.plasma as plasma +import subprocess +import time + +import multimerge + +# To run this example, you will first need to run "python setup.py install" in +# this directory to build the Cython module. +# +# You will only see speedups if you run this code on more data, this is just a +# small example that can run on a laptop. +# +# The values we used to get a speedup (on a m4.10xlarge instance on EC2) were +# object_store_size = 84 * 10 ** 9 +# num_cores = 20 +# num_rows = 10 ** 9 +# num_cols = 1 + +client = None +object_store_size = 2 * 10 ** 9 # 2 GB +num_cores = 8 +num_rows = 200000 +num_cols = 2 +column_names = [str(i) for i in range(num_cols)] +column_to_sort = column_names[0] + + +# Connect to clients +def connect(): + global client + client = plasma.connect('/tmp/store') + np.random.seed(int(time.time() * 10e7) % 10000000) + + +def put_df(df): + record_batch = pa.RecordBatch.from_pandas(df) + + # Get size of record batch and schema + mock_sink = pa.MockOutputStream() + stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) + stream_writer.write_batch(record_batch) + data_size = mock_sink.size() + + # Generate an ID and allocate a buffer in the object store for the + # serialized DataFrame + object_id = plasma.ObjectID(np.random.bytes(20)) + buf = client.create(object_id, data_size) + + # Write the serialized DataFrame to the object store + sink = pa.FixedSizeBufferWriter(buf) + stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema) + stream_writer.write_batch(record_batch) + + # Seal the object + client.seal(object_id) + + return object_id + + +def get_dfs(object_ids): + """Retrieve dataframes from the object store given their object IDs.""" + buffers = client.get_buffers(object_ids) + return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas() + for buf in buffers] + + +def local_sort(object_id): + """Sort a partition of a dataframe.""" + # Get the dataframe from the object store. + [df] = get_dfs([object_id]) + # Sort the dataframe. + sorted_df = df.sort_values(by=column_to_sort) + # Get evenly spaced values from the dataframe. + indices = np.linspace(0, len(df) - 1, num=num_cores, dtype=np.int64) + # Put the sorted dataframe in the object store and return the corresponding + # object ID as well as the sampled values. + return put_df(sorted_df), sorted_df.as_matrix().take(indices) + + +def local_partitions(object_id_and_pivots): + """Take a sorted partition of a dataframe and split it into more pieces.""" + object_id, pivots = object_id_and_pivots + [df] = get_dfs([object_id]) + split_at = df[column_to_sort].searchsorted(pivots) + split_at = [0] + list(split_at) + [len(df)] + # Partition the sorted dataframe and put each partition into the object + # store. + return [put_df(df[i:j]) for i, j in zip(split_at[:-1], split_at[1:])] + + +def merge(object_ids): + """Merge a number of sorted dataframes into a single sorted dataframe.""" + dfs = get_dfs(object_ids) + + # In order to use our multimerge code, we have to convert the arrays from + # the Fortran format to the C format. + arrays = [np.ascontiguousarray(df.as_matrix()) for df in dfs] + for a in arrays: + assert a.dtype == np.float64 + assert not np.isfortran(a) + + # Filter out empty arrays. + arrays = [a for a in arrays if a.shape[0] > 0] + + if len(arrays) == 0: + return None + + resulting_array = multimerge.multimerge2d(*arrays) + merged_df2 = pd.DataFrame(resulting_array, columns=column_names) + + return put_df(merged_df2) + + +if __name__ == '__main__': + # Start the plasma store. + p = subprocess.Popen(['plasma_store', + '-s', '/tmp/store', + '-m', str(object_store_size)]) + + # Connect to the plasma store. + connect() + + # Connect the processes in the pool. + pool = Pool(initializer=connect, initargs=(), processes=num_cores) + + # Create a DataFrame from a numpy array. + df = pd.DataFrame(np.random.randn(num_rows, num_cols), + columns=column_names) + + partition_ids = [put_df(partition) for partition + in np.split(df, num_cores)] + + # Begin timing the parallel sort example. + parallel_sort_start = time.time() + + # Sort each partition and subsample them. The subsampled values will be + # used to create buckets. + sorted_df_ids, pivot_groups = list(zip(*pool.map(local_sort, + partition_ids))) + + # Choose the pivots. + all_pivots = np.concatenate(pivot_groups) + indices = np.linspace(0, len(all_pivots) - 1, num=num_cores, + dtype=np.int64) + pivots = np.take(np.sort(all_pivots), indices) + + # Break all of the sorted partitions into even smaller partitions. Group + # the object IDs from each bucket together. + results = list(zip(*pool.map(local_partitions, + zip(sorted_df_ids, + len(sorted_df_ids) * [pivots])))) + + # Merge each of the buckets and store the results in the object store. + object_ids = pool.map(merge, results) + + resulting_ids = [object_id for object_id in object_ids + if object_id is not None] + + # Stop timing the paralle sort example. + parallel_sort_end = time.time() + + print('Parallel sort took {} seconds.' + .format(parallel_sort_end - parallel_sort_start)) + + serial_sort_start = time.time() + + original_sorted_df = df.sort_values(by=column_to_sort) + + serial_sort_end = time.time() + + # Check that we sorted the DataFrame properly. + + sorted_dfs = get_dfs(resulting_ids) + sorted_df = pd.concat(sorted_dfs) + + print('Serial sort took {} seconds.' + .format(serial_sort_end - serial_sort_start)) + + assert np.allclose(sorted_df.values, original_sorted_df.values) + + # Kill the object store. + p.kill() -- cgit v1.2.3