author     Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-21 11:54:28 +0000
committer  Daniel Baumann <daniel.baumann@progress-linux.org>   2024-04-21 11:54:28 +0000
commit     e6918187568dbd01842d8d1d2c808ce16a894239
tree       64f88b554b444a49f656b6c656111a145cbbaa28   /src/arrow/python/examples
parent     Initial commit.
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/python/examples')
-rw-r--r--  src/arrow/python/examples/flight/client.py                189
-rw-r--r--  src/arrow/python/examples/flight/middleware.py             167
-rw-r--r--  src/arrow/python/examples/flight/server.py                 154
-rw-r--r--  src/arrow/python/examples/minimal_build/Dockerfile.fedora   31
-rw-r--r--  src/arrow/python/examples/minimal_build/Dockerfile.ubuntu   38
-rw-r--r--  src/arrow/python/examples/minimal_build/README.md           73
-rwxr-xr-x  src/arrow/python/examples/minimal_build/build_conda.sh     119
-rwxr-xr-x  src/arrow/python/examples/minimal_build/build_venv.sh       84
-rw-r--r--  src/arrow/python/examples/plasma/sorting/multimerge.pyx    102
-rw-r--r--  src/arrow/python/examples/plasma/sorting/setup.py           27
-rw-r--r--  src/arrow/python/examples/plasma/sorting/sort_df.py         203
11 files changed, 1187 insertions(+), 0 deletions(-)
diff --git a/src/arrow/python/examples/flight/client.py b/src/arrow/python/examples/flight/client.py
new file mode 100644
index 000000000..ed6ce54ce
--- /dev/null
+++ b/src/arrow/python/examples/flight/client.py
@@ -0,0 +1,189 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight CLI client."""
+
+import argparse
+import sys
+
+import pyarrow
+import pyarrow.flight
+import pyarrow.csv as csv
+
+
+def list_flights(args, client, connection_args={}):
+ print('Flights\n=======')
+ for flight in client.list_flights():
+ descriptor = flight.descriptor
+ if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
+ print("Path:", descriptor.path)
+ elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
+ print("Command:", descriptor.command)
+ else:
+ print("Unknown descriptor type")
+
+ print("Total records:", end=" ")
+ if flight.total_records >= 0:
+ print(flight.total_records)
+ else:
+ print("Unknown")
+
+ print("Total bytes:", end=" ")
+ if flight.total_bytes >= 0:
+ print(flight.total_bytes)
+ else:
+ print("Unknown")
+
+ print("Number of endpoints:", len(flight.endpoints))
+ print("Schema:")
+ print(flight.schema)
+ print('---')
+
+ print('\nActions\n=======')
+ for action in client.list_actions():
+ print("Type:", action.type)
+ print("Description:", action.description)
+ print('---')
+
+
+def do_action(args, client, connection_args={}):
+ try:
+ buf = pyarrow.allocate_buffer(0)
+ action = pyarrow.flight.Action(args.action_type, buf)
+ print('Running action', args.action_type)
+ for result in client.do_action(action):
+ print("Got result", result.body.to_pybytes())
+ except pyarrow.lib.ArrowIOError as e:
+ print("Error calling action:", e)
+
+
+def push_data(args, client, connection_args={}):
+ print('File Name:', args.file)
+ my_table = csv.read_csv(args.file)
+ print('Table rows=', str(len(my_table)))
+ df = my_table.to_pandas()
+ print(df.head())
+ writer, _ = client.do_put(
+ pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema)
+ writer.write_table(my_table)
+ writer.close()
+
+
+def get_flight(args, client, connection_args={}):
+ if args.path:
+ descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
+ else:
+ descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)
+
+ info = client.get_flight_info(descriptor)
+ for endpoint in info.endpoints:
+ print('Ticket:', endpoint.ticket)
+ for location in endpoint.locations:
+ print(location)
+ get_client = pyarrow.flight.FlightClient(location,
+ **connection_args)
+ reader = get_client.do_get(endpoint.ticket)
+ df = reader.read_pandas()
+ print(df)
+
+
+def _add_common_arguments(parser):
+ parser.add_argument('--tls', action='store_true',
+ help='Enable transport-level security')
+ parser.add_argument('--tls-roots', default=None,
+ help='Path to trusted TLS certificate(s)')
+ parser.add_argument("--mtls", nargs=2, default=None,
+ metavar=('CERTFILE', 'KEYFILE'),
+ help="Enable mutual TLS (supply a client certificate and key)")
+ parser.add_argument('host', type=str,
+ help="Host and port to connect to, e.g. localhost:5005")
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ subcommands = parser.add_subparsers()
+
+ cmd_list = subcommands.add_parser('list')
+ cmd_list.set_defaults(action='list')
+ _add_common_arguments(cmd_list)
+ cmd_list.add_argument('-l', '--list', action='store_true',
+ help="Print more details.")
+
+ cmd_do = subcommands.add_parser('do')
+ cmd_do.set_defaults(action='do')
+ _add_common_arguments(cmd_do)
+ cmd_do.add_argument('action_type', type=str,
+ help="The action type to run.")
+
+ cmd_put = subcommands.add_parser('put')
+ cmd_put.set_defaults(action='put')
+ _add_common_arguments(cmd_put)
+ cmd_put.add_argument('file', type=str,
+ help="CSV file to upload.")
+
+ cmd_get = subcommands.add_parser('get')
+ cmd_get.set_defaults(action='get')
+ _add_common_arguments(cmd_get)
+ cmd_get_descriptor = cmd_get.add_mutually_exclusive_group(required=True)
+ cmd_get_descriptor.add_argument('-p', '--path', type=str, action='append',
+ help="The path for the descriptor.")
+ cmd_get_descriptor.add_argument('-c', '--command', type=str,
+ help="The command for the descriptor.")
+
+ args = parser.parse_args()
+ if not hasattr(args, 'action'):
+ parser.print_help()
+ sys.exit(1)
+
+ commands = {
+ 'list': list_flights,
+ 'do': do_action,
+ 'get': get_flight,
+ 'put': push_data,
+ }
+ host, port = args.host.split(':')
+ port = int(port)
+ scheme = "grpc+tcp"
+ connection_args = {}
+ if args.tls:
+ scheme = "grpc+tls"
+ if args.tls_roots:
+ with open(args.tls_roots, "rb") as root_certs:
+ connection_args["tls_root_certs"] = root_certs.read()
+ if args.mtls:
+ with open(args.mtls[0], "rb") as cert_file:
+ tls_cert_chain = cert_file.read()
+ with open(args.mtls[1], "rb") as key_file:
+ tls_private_key = key_file.read()
+ connection_args["cert_chain"] = tls_cert_chain
+ connection_args["private_key"] = tls_private_key
+ client = pyarrow.flight.FlightClient(f"{scheme}://{host}:{port}",
+ **connection_args)
+ while True:
+ try:
+ action = pyarrow.flight.Action("healthcheck", b"")
+ options = pyarrow.flight.FlightCallOptions(timeout=1)
+ list(client.do_action(action, options=options))
+ break
+ except pyarrow.ArrowIOError as e:
+ if "Deadline" in str(e):
+ print("Server is not ready, waiting...")
+ commands[args.action](args, client, connection_args)
+
+
+if __name__ == '__main__':
+ main()
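
The client is meant to be pointed at the example server defined in server.py later in this diff (its default port is 5005). A hypothetical session, where `data.csv` is only a placeholder file name:
```
# start the server from server.py first, then:
python client.py list localhost:5005              # list flights and available actions
python client.py put localhost:5005 data.csv      # upload a CSV file as a new flight
python client.py get -p data.csv localhost:5005   # fetch the uploaded table back by path
python client.py do localhost:5005 shutdown       # invoke the server's shutdown action
```
When the server is started with TLS, add `--tls --tls-roots <ca.pem>` (and, for mutual TLS, `--mtls <cert> <key>`) to each command; the file names here are placeholders.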
diff --git a/src/arrow/python/examples/flight/middleware.py b/src/arrow/python/examples/flight/middleware.py
new file mode 100644
index 000000000..2056bae1f
--- /dev/null
+++ b/src/arrow/python/examples/flight/middleware.py
@@ -0,0 +1,167 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example of invisibly propagating a request ID with middleware."""
+
+import argparse
+import sys
+import threading
+import uuid
+
+import pyarrow as pa
+import pyarrow.flight as flight
+
+
+class TraceContext:
+ _locals = threading.local()
+ _locals.trace_id = None
+
+ @classmethod
+ def current_trace_id(cls):
+ if not getattr(cls._locals, "trace_id", None):
+ cls.set_trace_id(uuid.uuid4().hex)
+ return cls._locals.trace_id
+
+ @classmethod
+ def set_trace_id(cls, trace_id):
+ cls._locals.trace_id = trace_id
+
+
+TRACE_HEADER = "x-tracing-id"
+
+
+class TracingServerMiddleware(flight.ServerMiddleware):
+ def __init__(self, trace_id):
+ self.trace_id = trace_id
+
+ def sending_headers(self):
+ return {
+ TRACE_HEADER: self.trace_id,
+ }
+
+
+class TracingServerMiddlewareFactory(flight.ServerMiddlewareFactory):
+ def start_call(self, info, headers):
+ print("Starting new call:", info)
+ if TRACE_HEADER in headers:
+ trace_id = headers[TRACE_HEADER][0]
+ print("Found trace header with value:", trace_id)
+ TraceContext.set_trace_id(trace_id)
+ return TracingServerMiddleware(TraceContext.current_trace_id())
+
+
+class TracingClientMiddleware(flight.ClientMiddleware):
+ def sending_headers(self):
+ print("Sending trace ID:", TraceContext.current_trace_id())
+ return {
+ TRACE_HEADER: TraceContext.current_trace_id(),
+ }
+
+ def received_headers(self, headers):
+ if TRACE_HEADER in headers:
+ trace_id = headers[TRACE_HEADER][0]
+ print("Found trace header with value:", trace_id)
+ # Don't overwrite our trace ID
+
+
+class TracingClientMiddlewareFactory(flight.ClientMiddlewareFactory):
+ def start_call(self, info):
+ print("Starting new call:", info)
+ return TracingClientMiddleware()
+
+
+class FlightServer(flight.FlightServerBase):
+ def __init__(self, delegate, **kwargs):
+ super().__init__(**kwargs)
+ if delegate:
+ self.delegate = flight.connect(
+ delegate,
+ middleware=(TracingClientMiddlewareFactory(),))
+ else:
+ self.delegate = None
+
+ def list_actions(self, context):
+ return [
+ ("get-trace-id", "Get the trace context ID."),
+ ]
+
+ def do_action(self, context, action):
+ trace_middleware = context.get_middleware("trace")
+ if trace_middleware:
+ TraceContext.set_trace_id(trace_middleware.trace_id)
+ if action.type == "get-trace-id":
+ if self.delegate:
+ for result in self.delegate.do_action(action):
+ yield result
+ else:
+ trace_id = TraceContext.current_trace_id().encode("utf-8")
+ print("Returning trace ID:", trace_id)
+ buf = pa.py_buffer(trace_id)
+ yield pa.flight.Result(buf)
+ else:
+ raise KeyError(f"Unknown action {action.type!r}")
+
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ subparsers = parser.add_subparsers(dest="command")
+ client = subparsers.add_parser("client", help="Run the client.")
+ client.add_argument("server")
+ client.add_argument("--request-id", default=None)
+
+ server = subparsers.add_parser("server", help="Run the server.")
+ server.add_argument(
+ "--listen",
+ required=True,
+ help="The location to listen on (example: grpc://localhost:5050)",
+ )
+ server.add_argument(
+ "--delegate",
+ required=False,
+ default=None,
+ help=("A location to delegate to. That is, this server will "
+ "simply call the given server for the response. Demonstrates "
+ "propagation of the trace ID between servers."),
+ )
+
+ args = parser.parse_args()
+ if not getattr(args, "command"):
+ parser.print_help()
+ return 1
+
+ if args.command == "server":
+ server = FlightServer(
+ args.delegate,
+ location=args.listen,
+ middleware={"trace": TracingServerMiddlewareFactory()})
+ server.serve()
+ elif args.command == "client":
+ client = flight.connect(
+ args.server,
+ middleware=(TracingClientMiddlewareFactory(),))
+ if args.request_id:
+ TraceContext.set_trace_id(args.request_id)
+ else:
+ TraceContext.set_trace_id("client-chosen-id")
+
+ for result in client.do_action(flight.Action("get-trace-id", b"")):
+ print(result.body.to_pybytes())
+
+
+if __name__ == "__main__":
+ sys.exit(main() or 0)
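
A hypothetical multi-terminal run of the middleware example; the ports are arbitrary and `my-trace-id` is just a placeholder:
```
# terminal 1: a middleware-enabled server
python middleware.py server --listen grpc://localhost:5050

# terminal 2 (optional): a second server that delegates to the first
python middleware.py server --listen grpc://localhost:5051 --delegate grpc://localhost:5050

# terminal 3: request the trace ID; the same ID should appear in both servers' logs
python middleware.py client grpc://localhost:5051 --request-id my-trace-id
```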
diff --git a/src/arrow/python/examples/flight/server.py b/src/arrow/python/examples/flight/server.py
new file mode 100644
index 000000000..7a6b6697e
--- /dev/null
+++ b/src/arrow/python/examples/flight/server.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""An example Flight Python server."""
+
+import argparse
+import ast
+import threading
+import time
+
+import pyarrow
+import pyarrow.flight
+
+
+class FlightServer(pyarrow.flight.FlightServerBase):
+ def __init__(self, host="localhost", location=None,
+ tls_certificates=None, verify_client=False,
+ root_certificates=None, auth_handler=None):
+ super(FlightServer, self).__init__(
+ location, auth_handler, tls_certificates, verify_client,
+ root_certificates)
+ self.flights = {}
+ self.host = host
+ self.tls_certificates = tls_certificates
+
+ @classmethod
+ def descriptor_to_key(cls, descriptor):
+ return (descriptor.descriptor_type.value, descriptor.command,
+ tuple(descriptor.path or tuple()))
+
+ def _make_flight_info(self, key, descriptor, table):
+ if self.tls_certificates:
+ location = pyarrow.flight.Location.for_grpc_tls(
+ self.host, self.port)
+ else:
+ location = pyarrow.flight.Location.for_grpc_tcp(
+ self.host, self.port)
+ endpoints = [pyarrow.flight.FlightEndpoint(repr(key), [location]), ]
+
+ mock_sink = pyarrow.MockOutputStream()
+ stream_writer = pyarrow.RecordBatchStreamWriter(
+ mock_sink, table.schema)
+ stream_writer.write_table(table)
+ stream_writer.close()
+ data_size = mock_sink.size()
+
+ return pyarrow.flight.FlightInfo(table.schema,
+ descriptor, endpoints,
+ table.num_rows, data_size)
+
+ def list_flights(self, context, criteria):
+ for key, table in self.flights.items():
+ if key[1] is not None:
+ descriptor = \
+ pyarrow.flight.FlightDescriptor.for_command(key[1])
+ else:
+ descriptor = pyarrow.flight.FlightDescriptor.for_path(*key[2])
+
+ yield self._make_flight_info(key, descriptor, table)
+
+ def get_flight_info(self, context, descriptor):
+ key = FlightServer.descriptor_to_key(descriptor)
+ if key in self.flights:
+ table = self.flights[key]
+ return self._make_flight_info(key, descriptor, table)
+ raise KeyError('Flight not found.')
+
+ def do_put(self, context, descriptor, reader, writer):
+ key = FlightServer.descriptor_to_key(descriptor)
+ print(key)
+ self.flights[key] = reader.read_all()
+ print(self.flights[key])
+
+ def do_get(self, context, ticket):
+ key = ast.literal_eval(ticket.ticket.decode())
+ if key not in self.flights:
+ return None
+ return pyarrow.flight.RecordBatchStream(self.flights[key])
+
+ def list_actions(self, context):
+ return [
+ ("clear", "Clear the stored flights."),
+ ("shutdown", "Shut down this server."),
+ ]
+
+ def do_action(self, context, action):
+ if action.type == "clear":
+ raise NotImplementedError(
+ "{} is not implemented.".format(action.type))
+ elif action.type == "healthcheck":
+ pass
+ elif action.type == "shutdown":
+ yield pyarrow.flight.Result(pyarrow.py_buffer(b'Shutdown!'))
+ # Shut down on background thread to avoid blocking current
+ # request
+ threading.Thread(target=self._shutdown).start()
+ else:
+ raise KeyError("Unknown action {!r}".format(action.type))
+
+ def _shutdown(self):
+ """Shut down after a delay."""
+ print("Server is shutting down...")
+ time.sleep(2)
+ self.shutdown()
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--host", type=str, default="localhost",
+ help="Address or hostname to listen on")
+ parser.add_argument("--port", type=int, default=5005,
+ help="Port number to listen on")
+ parser.add_argument("--tls", nargs=2, default=None,
+ metavar=('CERTFILE', 'KEYFILE'),
+ help="Enable transport-level security")
+ parser.add_argument("--verify_client", action="store_true",
+ help="Enable mutual TLS and verify the client certificate")
+
+ args = parser.parse_args()
+ tls_certificates = []
+ scheme = "grpc+tcp"
+ if args.tls:
+ scheme = "grpc+tls"
+ with open(args.tls[0], "rb") as cert_file:
+ tls_cert_chain = cert_file.read()
+ with open(args.tls[1], "rb") as key_file:
+ tls_private_key = key_file.read()
+ tls_certificates.append((tls_cert_chain, tls_private_key))
+
+ location = "{}://{}:{}".format(scheme, args.host, args.port)
+
+ server = FlightServer(args.host, location,
+ tls_certificates=tls_certificates,
+ verify_client=args.verify_client)
+ print("Serving on", location)
+ server.serve()
+
+
+if __name__ == '__main__':
+ main()
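
Hypothetical ways to start this server; the certificate paths are placeholders for files you would generate yourself:
```
# plain gRPC on the default port 5005
python server.py

# with TLS; add --verify_client to require and verify client certificates
python server.py --tls server.crt server.key --verify_client
```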
diff --git a/src/arrow/python/examples/minimal_build/Dockerfile.fedora b/src/arrow/python/examples/minimal_build/Dockerfile.fedora
new file mode 100644
index 000000000..7dc329193
--- /dev/null
+++ b/src/arrow/python/examples/minimal_build/Dockerfile.fedora
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM fedora:31
+
+RUN dnf update -y && \
+ dnf install -y \
+ autoconf \
+ gcc \
+ gcc-g++ \
+ git \
+ wget \
+ make \
+ cmake \
+ ninja-build \
+ python3-devel \
+ python3-virtualenv
\ No newline at end of file
diff --git a/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu b/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu
new file mode 100644
index 000000000..d7b84085e
--- /dev/null
+++ b/src/arrow/python/examples/minimal_build/Dockerfile.ubuntu
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+FROM ubuntu:bionic
+
+ENV DEBIAN_FRONTEND=noninteractive
+
+RUN apt-get update -y -q && \
+ apt-get install -y -q --no-install-recommends \
+ apt-transport-https \
+ software-properties-common \
+ wget && \
+ apt-get install -y -q --no-install-recommends \
+ build-essential \
+ cmake \
+ git \
+ ninja-build \
+ python3-dev \
+ python3-pip && \
+ apt-get clean && rm -rf /var/lib/apt/lists*
+
+RUN pip3 install wheel && \
+ pip3 install -U setuptools && \
+ pip3 install wheel virtualenv
\ No newline at end of file
diff --git a/src/arrow/python/examples/minimal_build/README.md b/src/arrow/python/examples/minimal_build/README.md
new file mode 100644
index 000000000..9803e18aa
--- /dev/null
+++ b/src/arrow/python/examples/minimal_build/README.md
@@ -0,0 +1,73 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Minimal Python source build on Linux
+
+This directory shows how to bootstrap a local build from source on Linux with
+an eye toward maximum portability across different Linux distributions. It may
+help contributors debug build issues caused by their local environments.
+
+## Fedora 31
+
+First, build the Docker image using:
+```
+docker build -t arrow_fedora_minimal -f Dockerfile.fedora .
+```
+
+Then build PyArrow with pip/virtualenv or conda:
+```
+# With pip/virtualenv
+docker run --rm -t -i -v $PWD:/io arrow_fedora_minimal /io/build_venv.sh
+
+# With conda
+docker run --rm -t -i -v $PWD:/io arrow_fedora_minimal /io/build_conda.sh
+```
+
+## Ubuntu 18.04
+
+First, build the Docker image using:
+```
+docker build -t arrow_ubuntu_minimal -f Dockerfile.ubuntu .
+```
+
+Then build PyArrow with pip/virtualenv or conda:
+```
+# With pip/virtualenv
+docker run --rm -t -i -v $PWD:/io arrow_ubuntu_minimal /io/build_venv.sh
+
+# With conda
+docker run --rm -t -i -v $PWD:/io arrow_ubuntu_minimal /io/build_conda.sh
+```
+
+## Building on Fedora - Podman and SELinux
+
+In addition to using Podman instead of Docker, you need to specify `:Z`
+for SELinux relabelling when binding a volume.
+
+First, build the image using:
+```
+podman build -t arrow_fedora_minimal -f Dockerfile.fedora .
+```
+
+Then build PyArrow with pip/virtualenv:
+```
+# With pip/virtualenv
+podman run --rm -i -v $PWD:/io:Z -t arrow_fedora_minimal /io/build_venv.sh
+```
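
If one of the build scripts fails inside the container, it can help to open an interactive shell in the same image and re-run the steps by hand; this is plain Docker usage rather than anything the scripts require:
```
docker run --rm -t -i -v $PWD:/io arrow_fedora_minimal bash
# then, inside the container
/io/build_venv.sh
```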
diff --git a/src/arrow/python/examples/minimal_build/build_conda.sh b/src/arrow/python/examples/minimal_build/build_conda.sh
new file mode 100755
index 000000000..416cac0a7
--- /dev/null
+++ b/src/arrow/python/examples/minimal_build/build_conda.sh
@@ -0,0 +1,119 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+#----------------------------------------------------------------------
+# Change this to whatever makes sense for your system
+
+HOME=
+MINICONDA=$HOME/miniconda-for-arrow
+LIBRARY_INSTALL_DIR=$HOME/local-libs
+CPP_BUILD_DIR=$HOME/arrow-cpp-build
+ARROW_ROOT=/arrow
+PYTHON=3.7
+
+git clone https://github.com/apache/arrow.git /arrow
+
+#----------------------------------------------------------------------
+# Run these only once
+
+function setup_miniconda() {
+ MINICONDA_URL="https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh"
+ wget -O miniconda.sh $MINICONDA_URL
+ bash miniconda.sh -b -p $MINICONDA
+ rm -f miniconda.sh
+ LOCAL_PATH=$PATH
+ export PATH="$MINICONDA/bin:$PATH"
+
+ conda update -y -q conda
+ conda config --set auto_update_conda false
+ conda info -a
+
+ conda config --set show_channel_urls True
+ conda config --add channels https://repo.continuum.io/pkgs/free
+ conda config --add channels conda-forge
+
+ conda create -y -n pyarrow-$PYTHON -c conda-forge \
+ --file arrow/ci/conda_env_unix.txt \
+ --file arrow/ci/conda_env_cpp.txt \
+ --file arrow/ci/conda_env_python.txt \
+ compilers \
+ python=$PYTHON \
+ pandas
+
+ export PATH=$LOCAL_PATH
+}
+
+setup_miniconda
+
+#----------------------------------------------------------------------
+# Activate conda in bash and activate conda environment
+
+. $MINICONDA/etc/profile.d/conda.sh
+conda activate pyarrow-$PYTHON
+export ARROW_HOME=$CONDA_PREFIX
+
+#----------------------------------------------------------------------
+# Build C++ library
+
+mkdir -p $CPP_BUILD_DIR
+pushd $CPP_BUILD_DIR
+
+cmake -GNinja \
+ -DCMAKE_BUILD_TYPE=DEBUG \
+ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
+ -DCMAKE_INSTALL_LIBDIR=lib \
+ -DARROW_FLIGHT=ON \
+ -DARROW_WITH_BZ2=ON \
+ -DARROW_WITH_ZLIB=ON \
+ -DARROW_WITH_ZSTD=ON \
+ -DARROW_WITH_LZ4=ON \
+ -DARROW_WITH_SNAPPY=ON \
+ -DARROW_WITH_BROTLI=ON \
+ -DARROW_PARQUET=ON \
+ -DARROW_PLASMA=ON \
+ -DARROW_PYTHON=ON \
+ $ARROW_ROOT/cpp
+
+ninja install
+
+popd
+
+#----------------------------------------------------------------------
+# Build and test Python library
+pushd $ARROW_ROOT/python
+
+rm -rf build/ # remove any pesky pre-existing build directory
+
+export PYARROW_BUILD_TYPE=Debug
+export PYARROW_CMAKE_GENERATOR=Ninja
+export PYARROW_WITH_FLIGHT=1
+export PYARROW_WITH_PARQUET=1
+
+# You can run either "develop" or "build_ext --inplace". Your pick
+
+# python setup.py build_ext --inplace
+python setup.py develop
+
+# git submodules are required for unit tests
+git submodule update --init
+export PARQUET_TEST_DATA="$ARROW_ROOT/cpp/submodules/parquet-testing/data"
+export ARROW_TEST_DATA="$ARROW_ROOT/testing/data"
+
+py.test pyarrow
diff --git a/src/arrow/python/examples/minimal_build/build_venv.sh b/src/arrow/python/examples/minimal_build/build_venv.sh
new file mode 100755
index 000000000..f92f4af3e
--- /dev/null
+++ b/src/arrow/python/examples/minimal_build/build_venv.sh
@@ -0,0 +1,84 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -e
+
+#----------------------------------------------------------------------
+# Change this to whatever makes sense for your system
+
+WORKDIR=${WORKDIR:-$HOME}
+MINICONDA=$WORKDIR/miniconda-for-arrow
+LIBRARY_INSTALL_DIR=$WORKDIR/local-libs
+CPP_BUILD_DIR=$WORKDIR/arrow-cpp-build
+ARROW_ROOT=$WORKDIR/arrow
+export ARROW_HOME=$WORKDIR/dist
+export LD_LIBRARY_PATH=$ARROW_HOME/lib:$LD_LIBRARY_PATH
+
+virtualenv $WORKDIR/venv
+source $WORKDIR/venv/bin/activate
+
+git clone https://github.com/apache/arrow.git $ARROW_ROOT
+
+pip install -r $ARROW_ROOT/python/requirements-build.txt \
+ -r $ARROW_ROOT/python/requirements-test.txt
+
+#----------------------------------------------------------------------
+# Build C++ library
+
+mkdir -p $CPP_BUILD_DIR
+pushd $CPP_BUILD_DIR
+
+cmake -GNinja \
+ -DCMAKE_BUILD_TYPE=DEBUG \
+ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
+ -DCMAKE_INSTALL_LIBDIR=lib \
+ -DARROW_WITH_BZ2=ON \
+ -DARROW_WITH_ZLIB=ON \
+ -DARROW_WITH_ZSTD=ON \
+ -DARROW_WITH_LZ4=ON \
+ -DARROW_WITH_SNAPPY=ON \
+ -DARROW_WITH_BROTLI=ON \
+ -DARROW_PARQUET=ON \
+ -DARROW_PYTHON=ON \
+ $ARROW_ROOT/cpp
+
+ninja install
+
+popd
+
+#----------------------------------------------------------------------
+# Build and test Python library
+pushd $ARROW_ROOT/python
+
+rm -rf build/ # remove any pesky pre-existing build directory
+
+export PYARROW_BUILD_TYPE=Debug
+export PYARROW_CMAKE_GENERATOR=Ninja
+export PYARROW_WITH_PARQUET=1
+
+# You can run either "develop" or "build_ext --inplace". Your pick
+
+# python setup.py build_ext --inplace
+python setup.py develop
+
+# git submodules are required for unit tests
+git submodule update --init
+export PARQUET_TEST_DATA="$ARROW_ROOT/cpp/submodules/parquet-testing/data"
+export ARROW_TEST_DATA="$ARROW_ROOT/testing/data"
+
+py.test pyarrow
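
Because everything is rooted at `WORKDIR` (defaulting to `$HOME`), the same script can also be run directly on a host that already has the toolchain it expects (git, cmake, ninja, a C++ compiler, virtualenv); a hypothetical out-of-container run:
```
# keep all sources and build artifacts under /tmp/arrow-minimal
WORKDIR=/tmp/arrow-minimal ./build_venv.sh
```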
diff --git a/src/arrow/python/examples/plasma/sorting/multimerge.pyx b/src/arrow/python/examples/plasma/sorting/multimerge.pyx
new file mode 100644
index 000000000..5e77fdfcc
--- /dev/null
+++ b/src/arrow/python/examples/plasma/sorting/multimerge.pyx
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from libc.stdint cimport uintptr_t
+from libcpp.vector cimport vector
+from libcpp.pair cimport pair
+
+import numpy as np
+
+cimport numpy as np
+
+cdef extern from "<queue>" namespace "std" nogil:
+ cdef cppclass priority_queue[T]:
+ priority_queue() except +
+ priority_queue(priority_queue&) except +
+ bint empty()
+ void pop()
+ void push(T&)
+ size_t size()
+ T& top()
+
+
+def multimerge2d(*arrays):
+ """Merge a list of sorted 2d arrays into a sorted 2d array.
+
+ This assumes C style ordering for both input and output arrays. For
+ each input array we have array[i,0] <= array[i+1,0] and for the output
+ array the same will hold.
+
+ Ideally this code would be simpler and also support both C style
+ and Fortran style ordering.
+ """
+ cdef int num_arrays = len(arrays)
+ assert num_arrays > 0
+
+ cdef int num_cols = arrays[0].shape[1]
+
+ for i in range(num_arrays):
+ assert arrays[i].ndim == 2
+ assert arrays[i].dtype == np.float64
+ assert arrays[i].shape[1] == num_cols
+ assert not np.isfortran(arrays[i])
+
+ cdef vector[double*] data
+
+ # The indices vector keeps track of the index of the next row to process in
+ # each array.
+ cdef vector[int] indices = num_arrays * [0]
+
+ # The sizes vector stores the total number of elements that each array has.
+ cdef vector[int] sizes
+
+ cdef priority_queue[pair[double, int]] queue
+ cdef pair[double, int] top
+ cdef int num_rows = sum([array.shape[0] for array in arrays])
+ cdef np.ndarray[np.float64_t, ndim=2] result = np.zeros(
+ (num_rows, num_cols), dtype=np.float64)
+ cdef double* result_ptr = <double*> np.PyArray_DATA(result)
+ for i in range(num_arrays):
+ if arrays[i].size > 0:
+ sizes.push_back(arrays[i].size)
+ data.push_back(<double*> np.PyArray_DATA(arrays[i]))
+ queue.push(pair[double, int](-data[i][0], i))
+
+ cdef int curr_idx = 0
+ cdef int j
+ cdef int col = 0
+
+ for j in range(num_rows):
+ top = queue.top()
+ for col in range(num_cols):
+ result_ptr[curr_idx + col] = (
+ data[top.second][indices[top.second] + col])
+
+ indices[top.second] += num_cols
+ curr_idx += num_cols
+
+ queue.pop()
+ if indices[top.second] < sizes[top.second]:
+ queue.push(
+ pair[double, int](-data[top.second][indices[top.second]],
+ top.second))
+
+ return result
diff --git a/src/arrow/python/examples/plasma/sorting/setup.py b/src/arrow/python/examples/plasma/sorting/setup.py
new file mode 100644
index 000000000..a5dfa5ae0
--- /dev/null
+++ b/src/arrow/python/examples/plasma/sorting/setup.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+from setuptools import setup
+from Cython.Build import cythonize
+
+setup(
+ name="multimerge",
+ extra_compile_args=["-O3", "-mtune=native", "-march=native"],
+ ext_modules=cythonize("multimerge.pyx"),
+ include_dirs=[np.get_include()],
+)
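
To sanity-check the extension on its own, before wiring it into the Plasma pipeline, you can build it in place and merge two tiny pre-sorted arrays; a minimal sketch, assuming NumPy, Cython and a C++ compiler are installed:
```
python setup.py build_ext --inplace
python -c "import numpy as np, multimerge; print(multimerge.multimerge2d(np.array([[1.0], [3.0]]), np.array([[2.0], [4.0]])))"
# prints the merged (4, 1) array containing 1., 2., 3., 4. in order
```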
diff --git a/src/arrow/python/examples/plasma/sorting/sort_df.py b/src/arrow/python/examples/plasma/sorting/sort_df.py
new file mode 100644
index 000000000..2a51759a6
--- /dev/null
+++ b/src/arrow/python/examples/plasma/sorting/sort_df.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from multiprocessing import Pool
+import numpy as np
+import pandas as pd
+import pyarrow as pa
+import pyarrow.plasma as plasma
+import subprocess
+import time
+
+import multimerge
+
+# To run this example, you will first need to run "python setup.py install" in
+# this directory to build the Cython module.
+#
+# You will only see speedups if you run this code on more data; this is just a
+# small example that can run on a laptop.
+#
+# The values we used to get a speedup (on an m4.10xlarge instance on EC2) were
+# object_store_size = 84 * 10 ** 9
+# num_cores = 20
+# num_rows = 10 ** 9
+# num_cols = 1
+
+client = None
+object_store_size = 2 * 10 ** 9 # 2 GB
+num_cores = 8
+num_rows = 200000
+num_cols = 2
+column_names = [str(i) for i in range(num_cols)]
+column_to_sort = column_names[0]
+
+
+# Connect to clients
+def connect():
+ global client
+ client = plasma.connect('/tmp/store')
+ np.random.seed(int(time.time() * 10e7) % 10000000)
+
+
+def put_df(df):
+ record_batch = pa.RecordBatch.from_pandas(df)
+
+ # Get size of record batch and schema
+ mock_sink = pa.MockOutputStream()
+ stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
+ stream_writer.write_batch(record_batch)
+ data_size = mock_sink.size()
+
+ # Generate an ID and allocate a buffer in the object store for the
+ # serialized DataFrame
+ object_id = plasma.ObjectID(np.random.bytes(20))
+ buf = client.create(object_id, data_size)
+
+ # Write the serialized DataFrame to the object store
+ sink = pa.FixedSizeBufferWriter(buf)
+ stream_writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
+ stream_writer.write_batch(record_batch)
+
+ # Seal the object
+ client.seal(object_id)
+
+ return object_id
+
+
+def get_dfs(object_ids):
+ """Retrieve dataframes from the object store given their object IDs."""
+ buffers = client.get_buffers(object_ids)
+ return [pa.RecordBatchStreamReader(buf).read_next_batch().to_pandas()
+ for buf in buffers]
+
+
+def local_sort(object_id):
+ """Sort a partition of a dataframe."""
+ # Get the dataframe from the object store.
+ [df] = get_dfs([object_id])
+ # Sort the dataframe.
+ sorted_df = df.sort_values(by=column_to_sort)
+ # Get evenly spaced values from the dataframe.
+ indices = np.linspace(0, len(df) - 1, num=num_cores, dtype=np.int64)
+ # Put the sorted dataframe in the object store and return the corresponding
+ # object ID as well as the sampled values.
+ return put_df(sorted_df), sorted_df.to_numpy().take(indices)
+
+
+def local_partitions(object_id_and_pivots):
+ """Take a sorted partition of a dataframe and split it into more pieces."""
+ object_id, pivots = object_id_and_pivots
+ [df] = get_dfs([object_id])
+ split_at = df[column_to_sort].searchsorted(pivots)
+ split_at = [0] + list(split_at) + [len(df)]
+ # Partition the sorted dataframe and put each partition into the object
+ # store.
+ return [put_df(df[i:j]) for i, j in zip(split_at[:-1], split_at[1:])]
+
+
+def merge(object_ids):
+ """Merge a number of sorted dataframes into a single sorted dataframe."""
+ dfs = get_dfs(object_ids)
+
+ # In order to use our multimerge code, we have to convert the arrays from
+ # the Fortran format to the C format.
+ arrays = [np.ascontiguousarray(df.to_numpy()) for df in dfs]
+ for a in arrays:
+ assert a.dtype == np.float64
+ assert not np.isfortran(a)
+
+ # Filter out empty arrays.
+ arrays = [a for a in arrays if a.shape[0] > 0]
+
+ if len(arrays) == 0:
+ return None
+
+ resulting_array = multimerge.multimerge2d(*arrays)
+ merged_df2 = pd.DataFrame(resulting_array, columns=column_names)
+
+ return put_df(merged_df2)
+
+
+if __name__ == '__main__':
+ # Start the plasma store.
+ p = subprocess.Popen(['plasma_store',
+ '-s', '/tmp/store',
+ '-m', str(object_store_size)])
+
+ # Connect to the plasma store.
+ connect()
+
+ # Connect the processes in the pool.
+ pool = Pool(initializer=connect, initargs=(), processes=num_cores)
+
+ # Create a DataFrame from a numpy array.
+ df = pd.DataFrame(np.random.randn(num_rows, num_cols),
+ columns=column_names)
+
+ partition_ids = [put_df(partition) for partition
+ in np.split(df, num_cores)]
+
+ # Begin timing the parallel sort example.
+ parallel_sort_start = time.time()
+
+ # Sort each partition and subsample them. The subsampled values will be
+ # used to create buckets.
+ sorted_df_ids, pivot_groups = list(zip(*pool.map(local_sort,
+ partition_ids)))
+
+ # Choose the pivots.
+ all_pivots = np.concatenate(pivot_groups)
+ indices = np.linspace(0, len(all_pivots) - 1, num=num_cores,
+ dtype=np.int64)
+ pivots = np.take(np.sort(all_pivots), indices)
+
+ # Break all of the sorted partitions into even smaller partitions. Group
+ # the object IDs from each bucket together.
+ results = list(zip(*pool.map(local_partitions,
+ zip(sorted_df_ids,
+ len(sorted_df_ids) * [pivots]))))
+
+ # Merge each of the buckets and store the results in the object store.
+ object_ids = pool.map(merge, results)
+
+ resulting_ids = [object_id for object_id in object_ids
+ if object_id is not None]
+
+ # Stop timing the parallel sort example.
+ parallel_sort_end = time.time()
+
+ print('Parallel sort took {} seconds.'
+ .format(parallel_sort_end - parallel_sort_start))
+
+ serial_sort_start = time.time()
+
+ original_sorted_df = df.sort_values(by=column_to_sort)
+
+ serial_sort_end = time.time()
+
+ # Check that we sorted the DataFrame properly.
+
+ sorted_dfs = get_dfs(resulting_ids)
+ sorted_df = pd.concat(sorted_dfs)
+
+ print('Serial sort took {} seconds.'
+ .format(serial_sort_end - serial_sort_start))
+
+ assert np.allclose(sorted_df.values, original_sorted_df.values)
+
+ # Kill the object store.
+ p.kill()
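
A hypothetical end-to-end run, assuming pyarrow was built with Plasma enabled (e.g. -DARROW_PLASMA=ON, as in build_conda.sh above) so that the plasma_store executable is on PATH:
```
cd src/arrow/python/examples/plasma/sorting
python setup.py install    # builds and installs the multimerge Cython extension
python sort_df.py          # starts plasma_store itself and prints both timings
```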