# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import contextlib import os import pyarrow as pa import shutil import subprocess import sys import tempfile import time from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa PlasmaBuffer, PlasmaClient, connect, PlasmaObjectExists, PlasmaObjectNotFound, PlasmaStoreFull) # The Plasma TensorFlow Operator needs to be compiled on the end user's # machine since the TensorFlow ABI is not stable between versions. # The following code checks if the operator is already present. If not, # the function build_plasma_tensorflow_op can be used to compile it. 
# Location, inside the installed pyarrow package, where the compiled
# TensorFlow op library is expected.
TF_PLASMA_OP_PATH = os.path.join(pa.__path__[0], "tensorflow", "plasma_op.so")

# Handle to the loaded op library; stays None until one of the functions
# below loads it.
tf_plasma_op = None


def load_plasma_tensorflow_op():
    """Load the Plasma TensorFlow op library from TF_PLASMA_OP_PATH.

    The result of ``tf.load_op_library`` is stored in the module-level
    ``tf_plasma_op``. TensorFlow is imported inside the function, so
    importing this module does not require TensorFlow.
    """
    global tf_plasma_op
    import tensorflow as tf
    tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)


def build_plasma_tensorflow_op():
    """Compile the Plasma TensorFlow op with g++ and load it.

    If TensorFlow cannot be imported the function does nothing. Otherwise
    it compiles ``tensorflow/plasma_op.cc`` (located next to this module)
    into ``tensorflow/plasma_op.so`` using the compile and link flags
    reported by ``tf.sysconfig`` together with pyarrow's include and
    library directories, then loads the library into the module-level
    ``tf_plasma_op``.

    Raises:
        subprocess.CalledProcessError: If the compiler exits with a
            non-zero status.
    """
    global tf_plasma_op
    try:
        import tensorflow as tf
        print("TensorFlow version: " + tf.__version__)
    except ImportError:
        # Deliberate best effort: without TensorFlow there is nothing to
        # build.
        return

    print("Compiling Plasma TensorFlow Op...")
    dir_path = os.path.dirname(os.path.realpath(__file__))
    cc_path = os.path.join(dir_path, "tensorflow", "plasma_op.cc")
    so_path = os.path.join(dir_path, "tensorflow", "plasma_op.so")
    tf_cflags = tf.sysconfig.get_compile_flags()
    if sys.platform == 'darwin':
        tf_cflags = ["-undefined", "dynamic_lookup"] + tf_cflags
    cmd = ["g++", "-std=c++11", "-g", "-shared", cc_path,
           "-o", so_path, "-DNDEBUG", "-I" + pa.get_include()]
    cmd += ["-L" + lib_dir for lib_dir in pa.get_library_dirs()]
    cmd += ["-lplasma", "-larrow_python", "-larrow", "-fPIC"]
    cmd += tf_cflags
    cmd += tf.sysconfig.get_link_flags()
    cmd += ["-O2"]
    if tf.test.is_built_with_cuda():
        cmd += ["-DGOOGLE_CUDA"]
    print("Running command " + str(cmd))
    subprocess.check_call(cmd)
    # NOTE(review): the library is written to so_path but loaded from
    # TF_PLASMA_OP_PATH; these only coincide when this module lives in
    # pa.__path__[0] -- confirm.
    tf_plasma_op = tf.load_op_library(TF_PLASMA_OP_PATH)


@contextlib.contextmanager
def start_plasma_store(plasma_store_memory,
                       use_valgrind=False, use_profiler=False,
                       plasma_directory=None, use_hugepages=False,
                       external_store=None):
    """Start a plasma store process.

    This is a context manager: the store runs for the duration of the
    ``with`` block. On exit the process is killed if it is still running
    and the temporary directory holding its socket is removed.

    Args:
        plasma_store_memory (int): Capacity of the plasma store in bytes.
        use_valgrind (bool): True if the plasma store should be started inside
            of valgrind. If this is True, use_profiler must be False.
        use_profiler (bool): True if the plasma store should be started inside
            a profiler. If this is True, use_valgrind must be False.
        plasma_directory (str): Directory where plasma memory mapped files
            will be stored.
        use_hugepages (bool): True if the plasma store should use huge pages.
        external_store (str): External store to use for evicted objects.

    Yields:
        A tuple of the name of the plasma store socket and the
        ``subprocess.Popen`` object of the plasma store process.

    Raises:
        ValueError: If both use_valgrind and use_profiler are True.
        RuntimeError: If the store process has already exited when it is
            checked shortly after launch.
    """
    if use_valgrind and use_profiler:
        raise ValueError("Cannot use valgrind and profiler at the same time.")

    tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
    # Bound before the try block so the cleanup code can tell whether the
    # process was ever started (Popen itself may raise).
    proc = None
    try:
        plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
        plasma_store_executable = os.path.join(
            pa.__path__[0], "plasma-store-server")
        if not os.path.exists(plasma_store_executable):
            # Fallback to sys.prefix/bin/ (conda)
            plasma_store_executable = os.path.join(
                sys.prefix, "bin", "plasma-store-server")

        command = [plasma_store_executable,
                   "-s", plasma_store_name,
                   "-m", str(plasma_store_memory)]
        if plasma_directory:
            command += ["-d", plasma_directory]
        if use_hugepages:
            command += ["-h"]
        if external_store is not None:
            command += ["-e", external_store]

        # Pick the wrapper command and how long to wait before checking
        # that the process is still alive.
        if use_valgrind:
            command = ["valgrind",
                       "--track-origins=yes",
                       "--leak-check=full",
                       "--show-leak-kinds=all",
                       "--leak-check-heuristics=stdstring",
                       "--error-exitcode=1"] + command
            startup_delay = 1.0
        elif use_profiler:
            command = ["valgrind", "--tool=callgrind"] + command
            startup_delay = 1.0
        else:
            startup_delay = 0.1

        # stdout/stderr are not redirected; the child inherits them.
        proc = subprocess.Popen(command, stdout=None, stderr=None)
        time.sleep(startup_delay)
        rc = proc.poll()
        if rc is not None:
            raise RuntimeError("plasma_store exited unexpectedly with "
                               "code %d" % (rc,))

        yield plasma_store_name, proc
    finally:
        try:
            if proc is not None and proc.poll() is None:
                proc.kill()
                # Reap the killed child so it does not linger as a zombie.
                proc.wait()
        finally:
            # Always remove the socket directory, even if stopping the
            # process failed.
            shutil.rmtree(tmpdir)