summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/plasma
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/java/plasma')
-rw-r--r--src/arrow/java/plasma/README.md39
-rw-r--r--src/arrow/java/plasma/pom.xml34
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java131
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java184
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java57
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java32
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java32
-rw-r--r--src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java40
-rw-r--r--src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java313
-rw-r--r--src/arrow/java/plasma/src/test/resources/logback.xml28
-rwxr-xr-xsrc/arrow/java/plasma/test.sh56
11 files changed, 946 insertions, 0 deletions
diff --git a/src/arrow/java/plasma/README.md b/src/arrow/java/plasma/README.md
new file mode 100644
index 000000000..0dcb4e21f
--- /dev/null
+++ b/src/arrow/java/plasma/README.md
@@ -0,0 +1,39 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Java Plasma Client
+
+## Setup Build Environment
+
+Install:
+ - java 8 or later
+ - maven 3.3 or later
+ - the same requirement of build [Arrow C++](https://github.com/apache/arrow/tree/master/cpp)
+
+## Build the jar of plasma client
+
+```
+cd ..
+mvn clean install -pl plasma -am -Dmaven.test.skip
+```
+
+## Building and running tests
+```
+./test.sh
+```
diff --git a/src/arrow/java/plasma/pom.xml b/src/arrow/java/plasma/pom.xml
new file mode 100644
index 000000000..725b414b1
--- /dev/null
+++ b/src/arrow/java/plasma/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-java-root</artifactId>
+ <version>6.0.1</version>
+ </parent>
+ <artifactId>arrow-plasma</artifactId>
+ <name>Arrow Plasma Client</name>
+ <description>(Experimental/Contrib) Java client for the Plasma object store.</description>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
new file mode 100644
index 000000000..93a2d483c
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma;
+
+import java.util.List;
+
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
+import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException;
+
+/**
+ * Object store interface, which provides the capabilities to put and get raw byte array, and serves.
+ */
+public interface ObjectStoreLink {
+
+ /**
+ * Tuple for data and metadata stored in Plasma.
+ */
+ class ObjectStoreData {
+
+ public ObjectStoreData(byte[] metadata, byte[] data) {
+ this.data = data;
+ this.metadata = metadata;
+ }
+
+ public final byte[] metadata;
+ public final byte[] data;
+ }
+
+ /**
+ * Put value in the local plasma store with object ID <tt>objectId</tt>.
+ *
+ * @param objectId The object ID of the value to be put.
+ * @param value The value to put in the object store.
+ * @param metadata encodes whatever metadata the user wishes to encode.
+ */
+ void put(byte[] objectId, byte[] value, byte[] metadata)
+ throws DuplicateObjectException, PlasmaOutOfMemoryException;
+
+ /**
+ * Get a buffer from the PlasmaStore based on the <tt>objectId</tt>.
+ *
+ * @param objectId The object ID used to identify the object.
+ * @param timeoutMs The number of milliseconds that the get call should block before timing out
+ * and returning. Pass -1 if the call should block and 0 if the call should return immediately.
+ * @param isMetadata false if get data, otherwise get metadata.
+ * @return A PlasmaBuffer wrapping the object.
+ */
+ default byte[] get(byte[] objectId, int timeoutMs, boolean isMetadata) {
+ byte[][] objectIds = {objectId};
+ return get(objectIds, timeoutMs, isMetadata).get(0);
+ }
+
+ /**
+ * Get buffers from the PlasmaStore based on <tt>objectIds</tt>.
+ *
+ * @param objectIds List of object IDs used to identify some objects.
+ * @param timeoutMs The number of milliseconds that the get call should block before timing out
+ * and returning. Pass -1 if the call should block and 0 if the call should return immediately.
+ * @param isMetadata false if get data, otherwise get metadata.
+ * @return List of PlasmaBuffers wrapping objects.
+ */
+ List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata);
+
+ /**
+ * Get buffer pairs (data & metadata) from the PlasmaStore based on <tt>objectIds</tt>.
+ *
+ * @param objectIds List of object IDs used to identify some objects.
+ * @param timeoutMs The number of milliseconds that the get call should block before timing out
+ * and returning. Pass -1 if the call should block and 0 if the call should return immediately.
+ * @return List of Pairs of PlasmaBuffer wrapping objects and its metadata.
+ */
+ List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs);
+
+ /**
+ * Compute the hash of an object in the object store.
+ *
+ * @param objectId The object ID used to identify the object.
+ * @return A digest byte array contains object's SHA256 hash. <tt>null</tt> means that the object
+ * isn't in the object store.
+ */
+ byte[] hash(byte[] objectId);
+
+ /**
+ * Evict some objects to recover given count of bytes.
+ *
+ * @param numBytes The number of bytes to attempt to recover.
+ * @return The number of bytes that have been evicted.
+ */
+ long evict(long numBytes);
+
+ /**
+ * Release the reference of the object.
+ *
+ * @param objectId The object ID used to release the reference of the object.
+ */
+ void release(byte[] objectId);
+
+ /**
+ * Removes object with given objectId from plasma store.
+ *
+ * @param objectId used to identify an object.
+ */
+ void delete(byte[] objectId);
+
+ /**
+ * Check if the object is present and has been sealed in the PlasmaStore.
+ *
+ * @param objectId used to identify an object.
+ */
+ boolean contains(byte[] objectId);
+
+ /**
+ * List all objects in the PlasmaStore.
+ */
+ List<byte[]> list();
+}
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
new file mode 100644
index 000000000..fdd7114f1
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
+import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException;
+
+/**
+ * The PlasmaClient is used to interface with a plasma store and manager.
+ *
+ * <p>The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a buffer, and get a
+ * buffer. Buffers are referred to by object IDs.
+ */
+public class PlasmaClient implements ObjectStoreLink {
+
+ private final long conn;
+
+ protected void finalize() {
+ PlasmaClientJNI.disconnect(this.conn);
+ }
+
+ // use plasma client to initialize the underlying jni system as well via config and config-overwrites
+ public PlasmaClient(String storeSocketName, String managerSocketName, int releaseDelay) {
+ this.conn = PlasmaClientJNI.connect(storeSocketName, managerSocketName, releaseDelay);
+ }
+
+ // interface methods --------------------
+
+ @Override
+ public void put(byte[] objectId, byte[] value, byte[] metadata)
+ throws DuplicateObjectException, PlasmaOutOfMemoryException {
+ ByteBuffer buf = PlasmaClientJNI.create(conn, objectId, value.length, metadata);
+ buf.put(value);
+ PlasmaClientJNI.seal(conn, objectId);
+ PlasmaClientJNI.release(conn, objectId);
+ }
+
+ @Override
+ public List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata) {
+ ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
+ assert bufs.length == objectIds.length;
+
+ List<byte[]> ret = new ArrayList<>();
+ for (int i = 0; i < bufs.length; i++) {
+ ByteBuffer buf = bufs[i][isMetadata ? 1 : 0];
+ if (buf == null) {
+ ret.add(null);
+ } else {
+ byte[] bb = new byte[buf.remaining()];
+ buf.get(bb);
+ ret.add(bb);
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public byte[] hash(byte[] objectId) {
+ return PlasmaClientJNI.hash(conn, objectId);
+ }
+
+ @Override
+ public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) {
+ ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
+ assert bufs.length == objectIds.length;
+
+ List<ObjectStoreData> ret = new ArrayList<>();
+ for (int i = 0; i < bufs.length; i++) {
+ ByteBuffer databuf = bufs[i][0];
+ ByteBuffer metabuf = bufs[i][1];
+ if (databuf == null) {
+ ret.add(new ObjectStoreData(null, null));
+ } else {
+ byte[] data = new byte[databuf.remaining()];
+ databuf.get(data);
+ byte[] meta;
+ if (metabuf != null) {
+ meta = new byte[metabuf.remaining()];
+ metabuf.get(meta);
+ } else {
+ meta = null;
+ }
+ ret.add(new ObjectStoreData(meta, data));
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Get an object in Plasma Store with objectId. Will return an off-heap ByteBuffer.
+ *
+ * @param objectId used to identify an object.
+ * @param timeoutMs time in milliseconfs to wait before this request time out.
+ * @param isMetadata get this object's metadata or data.
+ */
+ public ByteBuffer getObjAsByteBuffer(byte[] objectId, int timeoutMs, boolean isMetadata) {
+ byte[][] objectIds = new byte[][]{objectId};
+ ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs);
+ return bufs[0][isMetadata ? 1 : 0];
+ }
+
+ @Override
+ public List<byte[]> list() {
+ return Arrays.asList(PlasmaClientJNI.list(conn));
+ }
+
+ @Override
+ public long evict(long numBytes) {
+ return PlasmaClientJNI.evict(conn, numBytes);
+ }
+
+ // wrapper methods --------------------
+
+ /**
+ * Create an object in Plasma Store with particular size. Will return an off-heap ByteBuffer.
+ *
+ * @param objectId used to identify an object.
+ * @param size size in bytes to be allocated for this object.
+ * @param metadata this object's metadata. It should be null if there is no metadata.
+ */
+ public ByteBuffer create(byte[] objectId, int size, byte[] metadata)
+ throws DuplicateObjectException, PlasmaOutOfMemoryException {
+ return PlasmaClientJNI.create(conn, objectId, size, metadata);
+ }
+
+ /**
+ * Seal the buffer in the PlasmaStore for a particular object ID.
+ * Once a buffer has been sealed, the buffer is immutable and can only be accessed through get.
+ *
+ * @param objectId used to identify an object.
+ */
+ public void seal(byte[] objectId) {
+ PlasmaClientJNI.seal(conn, objectId);
+ }
+
+ /**
+ * Notify Plasma that the object is no longer needed.
+ *
+ * @param objectId used to identify an object.
+ */
+ public void release(byte[] objectId) {
+ PlasmaClientJNI.release(conn, objectId);
+ }
+
+ /**
+ * Removes object with given objectId from plasma store.
+ *
+ * @param objectId used to identify an object.
+ */
+ @Override
+ public void delete(byte[] objectId) {
+ PlasmaClientJNI.delete(conn, objectId);
+ }
+
+ /**
+ * Check if the object is present and has been sealed in the PlasmaStore.
+ *
+ * @param objectId used to identify an object.
+ */
+ @Override
+ public boolean contains(byte[] objectId) {
+ return PlasmaClientJNI.contains(conn, objectId);
+ }
+}
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java
new file mode 100644
index 000000000..da5c17e6b
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
+import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException;
+
+/**
+ * JNI static methods for PlasmaClient.
+ */
+public class PlasmaClientJNI {
+
+ public static native long connect(String storeSocketName, String managerSocketName, int releaseDelay);
+
+ public static native void disconnect(long conn);
+
+ public static native ByteBuffer create(long conn, byte[] objectId, int size, byte[] metadata)
+ throws DuplicateObjectException, PlasmaOutOfMemoryException;
+
+ public static native byte[] hash(long conn, byte[] objectId);
+
+ public static native void seal(long conn, byte[] objectId);
+
+ public static native void release(long conn, byte[] objectId);
+
+ public static native ByteBuffer[][] get(long conn, byte[][] objectIds, int timeoutMs);
+
+ public static native void delete(long conn, byte[] objectId);
+
+ public static native boolean contains(long conn, byte[] objectId);
+
+ public static native void fetch(long conn, byte[][] objectIds);
+
+ public static native byte[][] wait(long conn, byte[][] objectIds, int timeoutMs,
+ int numReturns);
+
+ public static native long evict(long conn, long numBytes);
+
+ public static native byte[][] list(long conn);
+}
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java
new file mode 100644
index 000000000..cb735282c
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma.exceptions;
+
+/**
+ * Thrown when attempting to place an object into the store for an ID that already exists.
+ */
+public class DuplicateObjectException extends RuntimeException {
+
+ public DuplicateObjectException(String objectId) {
+ super("An object with ID " + objectId + " already exists in the plasma store.");
+ }
+
+ public DuplicateObjectException(String objectId, Throwable t) {
+ super("An object with ID " + objectId + " already exists in the plasma store.", t);
+ }
+}
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java
new file mode 100644
index 000000000..ff9d96b17
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma.exceptions;
+
+/**
+ * Generic exception thrown by the plasma client (for example on failure to connect).
+ */
+public class PlasmaClientException extends RuntimeException {
+
+ public PlasmaClientException(String message) {
+ super(message);
+ }
+
+ public PlasmaClientException(String message, Throwable t) {
+ super(message, t);
+ }
+}
diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java
new file mode 100644
index 000000000..ffc4177eb
--- /dev/null
+++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma.exceptions;
+
+/**
+ * Indicates no more memory is available in Plasma.
+ */
+public class PlasmaOutOfMemoryException extends RuntimeException {
+
+ public PlasmaOutOfMemoryException(String message) {
+ super("The plasma store ran out of memory." + message);
+ }
+
+ public PlasmaOutOfMemoryException(String message, Throwable t) {
+ super("The plasma store ran out of memory." + message, t);
+ }
+
+ public PlasmaOutOfMemoryException() {
+ super("The plasma store ran out of memory.");
+ }
+
+ public PlasmaOutOfMemoryException(Throwable t) {
+ super("The plasma store ran out of memory.", t);
+ }
+}
diff --git a/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java b/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java
new file mode 100644
index 000000000..e02ee51f9
--- /dev/null
+++ b/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.plasma;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.plasma.exceptions.DuplicateObjectException;
+import org.apache.arrow.plasma.exceptions.PlasmaClientException;
+import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException;
+import org.junit.Assert;
+
+public class PlasmaClientTest {
+
+ private String storeSuffix = "/tmp/store";
+
+ private Process storeProcess;
+
+ private int storePort;
+
+ private ObjectStoreLink pLink;
+
+
+ public PlasmaClientTest() throws Exception {
+ try {
+ String plasmaStorePath = System.getenv("PLASMA_STORE");
+ if (plasmaStorePath == null) {
+ throw new Exception("Please set plasma store path in env PLASMA_STORE");
+ }
+
+ this.startObjectStore(plasmaStorePath);
+ System.loadLibrary("plasma_java");
+ pLink = new PlasmaClient(this.getStoreAddress(), "", 0);
+ } catch (Throwable t) {
+ cleanup();
+ throw t;
+ }
+
+ }
+
+ private Process startProcess(String[] cmd) {
+ ProcessBuilder builder;
+ List<String> newCmd = Arrays.stream(cmd).filter(s -> s.length() > 0).collect(Collectors.toList());
+ builder = new ProcessBuilder(newCmd);
+ builder.inheritIO();
+ Process p = null;
+ try {
+ p = builder.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ System.out.println("Start process " + p.hashCode() + " OK, cmd = " + Arrays.toString(cmd).replace(',', ' '));
+ return p;
+ }
+
+ private void startObjectStore(String plasmaStorePath) {
+ int occupiedMemoryMB = 10;
+ long memoryBytes = occupiedMemoryMB * 1000000;
+ int numRetries = 10;
+ Process p = null;
+ while (numRetries-- > 0) {
+ int currentPort = java.util.concurrent.ThreadLocalRandom.current().nextInt(0, 100000);
+ String name = storeSuffix + currentPort;
+ String cmd = plasmaStorePath + " -s " + name + " -m " + memoryBytes;
+
+ p = startProcess(cmd.split(" "));
+
+ if (p != null && p.isAlive()) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ if (p.isAlive()) {
+ storePort = currentPort;
+ break;
+ }
+ }
+ }
+
+
+ if (p == null || !p.isAlive()) {
+ throw new RuntimeException("Start object store failed ...");
+ } else {
+ storeProcess = p;
+ System.out.println("Start object store success");
+ }
+ }
+
+ private void cleanup() {
+ if (storeProcess != null && killProcess(storeProcess)) {
+ System.out.println("Kill plasma store process forcibly");
+ }
+ }
+
+ private static boolean killProcess(Process p) {
+ if (p.isAlive()) {
+ p.destroyForcibly();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ public void doTest() {
+ System.out.println("Start test.");
+ int timeoutMs = 3000;
+ byte[] id1 = new byte[20];
+ Arrays.fill(id1, (byte) 1);
+ byte[] value1 = new byte[20];
+ Arrays.fill(value1, (byte) 11);
+ pLink.put(id1, value1, null);
+
+ byte[] id2 = new byte[20];
+ Arrays.fill(id2, (byte) 2);
+ byte[] value2 = new byte[20];
+ Arrays.fill(value2, (byte) 12);
+ pLink.put(id2, value2, null);
+ System.out.println("Plasma java client put test success.");
+ byte[] getValue1 = pLink.get(id1, timeoutMs, false);
+ assert Arrays.equals(value1, getValue1);
+
+ byte[] getValue2 = pLink.get(id2, timeoutMs, false);
+ assert Arrays.equals(value2, getValue2);
+ System.out.println("Plasma java client get single object test success.");
+ byte[][] ids = {id1, id2};
+ List<byte[]> values = pLink.get(ids, timeoutMs, false);
+ assert Arrays.equals(values.get(0), value1);
+ assert Arrays.equals(values.get(1), value2);
+ System.out.println("Plasma java client get multi-object test success.");
+ try {
+ pLink.put(id1, value1, null);
+ Assert.fail("Fail to throw DuplicateObjectException when put an object into plasma store twice.");
+ } catch (DuplicateObjectException e) {
+ System.out.println("Plasma java client put same object twice exception test success.");
+ }
+ byte[] id1Hash = pLink.hash(id1);
+ assert id1Hash != null;
+ System.out.println("Plasma java client hash test success.");
+
+ boolean exist = pLink.contains(id2);
+ assert exist;
+ byte[] id3 = new byte[20];
+ Arrays.fill(id3, (byte) 3);
+ boolean notExist = pLink.contains(id3);
+ assert !notExist;
+ System.out.println("Plasma java client contains test success.");
+
+ byte[] id4 = new byte[20];
+ Arrays.fill(id4, (byte) 4);
+ byte[] value4 = new byte[20];
+ byte[] meta4 = "META4".getBytes();
+ Arrays.fill(value4, (byte) 14);
+ pLink.put(id4, value4, meta4);
+
+ List<byte[]> existIds = Arrays.asList(id1, id2, id3, id4);
+ List<byte[]> listIds = pLink.list();
+ assert listIds.size() == 4;
+ for (byte[] existId : existIds) {
+ boolean found = false;
+ for (byte[] listId : listIds) {
+ if (Arrays.equals(listId, existId)) {
+ found = true;
+ }
+ }
+ assert found;
+ }
+ System.out.println("Plasma java client list test success.");
+
+ byte[] id5 = new byte[20];
+ Arrays.fill(id5, (byte) 5);
+ byte[] value5 = new byte[20];
+ byte[] meta5 = "META5".getBytes();
+ Arrays.fill(value5, (byte) 15);
+ pLink.put(id5, value5, meta5);
+
+ byte[] getMeta4 = pLink.get(id4, timeoutMs, true);
+ assert Arrays.equals(meta4, getMeta4);
+ byte[] getValue4 = pLink.get(id4, timeoutMs, false);
+ assert Arrays.equals(value4, getValue4);
+ byte[][] ids4 = new byte[1][];
+ ids4[0] = id4;
+ ObjectStoreLink.ObjectStoreData fullData4 = pLink.get(ids4, timeoutMs).get(0);
+ assert Arrays.equals(meta4, fullData4.metadata);
+ assert Arrays.equals(value4, fullData4.data);
+
+ byte[] getMeta5 = pLink.get(id5, timeoutMs, true);
+ assert Arrays.equals(meta5, getMeta5);
+ byte[] getValue5 = pLink.get(id5, timeoutMs, false);
+ assert Arrays.equals(value5, getValue5);
+ byte[][] ids5 = new byte[1][];
+ ids5[0] = id5;
+ ObjectStoreLink.ObjectStoreData fullData5 = pLink.get(ids5, timeoutMs).get(0);
+ assert Arrays.equals(meta5, fullData5.metadata);
+ assert Arrays.equals(value5, fullData5.data);
+ System.out.println("Plasma java client metadata get test success.");
+
+ byte[] id6 = getArrayFilledWithValue(20, (byte) 6);
+ byte[] val6 = getArrayFilledWithValue(21, (byte) 6);
+ pLink.put(id6, val6, null);
+ assert pLink.contains(id6);
+ pLink.delete(id6);
+ assert !pLink.contains(id6);
+ System.out.println("Plasma java client delete test success.");
+
+ // Test calling shutdown while getting the object.
+ Thread thread = new Thread(() -> {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ cleanup();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Got InterruptedException when sleeping.", e);
+ }
+ });
+ thread.start();
+
+ try {
+ byte[] idNone = new byte[20];
+ Arrays.fill(idNone, (byte) 987);
+ pLink.get(idNone, timeoutMs, false);
+ Assert.fail("Fail to throw PlasmaClientException when get an object " +
+ "when object store shutdown.");
+ } catch (PlasmaClientException e) {
+ System.out.println(String.format("Expected PlasmaClientException: %s", e));
+ }
+
+ try {
+ thread.join();
+ } catch (Exception e) {
+ System.out.println(String.format("Exception caught: %s", e));
+ }
+ System.out.println("All test success.");
+
+ }
+
+ public void doByteBufferTest() {
+ System.out.println("Start ByteBuffer test.");
+ PlasmaClient client = (PlasmaClient) pLink;
+ byte[] id = new byte[20];
+ Arrays.fill(id, (byte) 10);
+ ByteBuffer buf = client.create(id, 100, null);
+ assert buf.isDirect();
+ for (int i = 0; i < 10; i++) {
+ buf.putInt(i);
+ }
+ client.seal(id);
+ client.release(id);
+ // buf is not available now.
+ assert client.contains(id);
+ System.out.println("Plasma java client create test success.");
+
+ ByteBuffer buf1 = client.getObjAsByteBuffer(id, -1, false);
+ assert buf1.limit() == 100;
+ for (int i = 0; i < 10; i++) {
+ assert buf1.getInt() == i;
+ }
+ System.out.println("Plasma java client getObjAsByteBuffer test success");
+ client.release(id);
+ }
+
+ public void doPlasmaOutOfMemoryExceptionTest() {
+ System.out.println("Start PlasmaOutOfMemoryException test.");
+ PlasmaClient client = (PlasmaClient) pLink;
+ byte[] objectId = new byte[20];
+ Arrays.fill(objectId, (byte) 1);
+ try {
+ ByteBuffer byteBuffer = client.create(objectId, 200000000, null);
+ Assert.fail("Fail to create an object, The plasma store ran out of memory.");
+ } catch (PlasmaOutOfMemoryException e) {
+ System.out.println(String.format("Expected PlasmaOutOfMemoryException: %s", e));
+ System.out.println("PlasmaOutOfMemoryException test success.");
+ }
+ }
+
+ private byte[] getArrayFilledWithValue(int arrayLength, byte val) {
+ byte[] arr = new byte[arrayLength];
+ Arrays.fill(arr, val);
+ return arr;
+ }
+
+ public String getStoreAddress() {
+ return storeSuffix + storePort;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ PlasmaClientTest plasmaClientTest = new PlasmaClientTest();
+ plasmaClientTest.doPlasmaOutOfMemoryExceptionTest();
+ plasmaClientTest.doByteBufferTest();
+ plasmaClientTest.doTest();
+ }
+
+}
diff --git a/src/arrow/java/plasma/src/test/resources/logback.xml b/src/arrow/java/plasma/src/test/resources/logback.xml
new file mode 100644
index 000000000..4c54d18a2
--- /dev/null
+++ b/src/arrow/java/plasma/src/test/resources/logback.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<configuration>
+ <statusListener class="ch.qos.logback.core.status.NopStatusListener"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <!-- encoders are assigned the type
+ ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.arrow" additivity="false">
+ <level value="info" />
+ <appender-ref ref="STDOUT" />
+ </logger>
+
+</configuration>
diff --git a/src/arrow/java/plasma/test.sh b/src/arrow/java/plasma/test.sh
new file mode 100755
index 000000000..dbfae646c
--- /dev/null
+++ b/src/arrow/java/plasma/test.sh
@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd)
+unamestr="$(uname)"
+if [[ "$unamestr" == "Linux" ]]; then
+ PARALLEL=$(nproc)
+elif [[ "$unamestr" == "Darwin" ]]; then
+ PARALLEL=$(sysctl -n hw.ncpu)
+else
+ echo "Unrecognized platform."
+ exit 1
+fi
+pushd ../../cpp
+ if [ ! -d "release" ]; then
+ mkdir release
+ fi
+ pushd release
+ cmake -DCMAKE_BUILD_TYPE=Release \
+ -DCMAKE_C_FLAGS="-g -O3" \
+ -DCMAKE_CXX_FLAGS="-g -O3" \
+ -DARROW_BUILD_TESTS=off \
+ -DARROW_HDFS=on \
+ -DARROW_BOOST_USE_SHARED=on \
+ -DARROW_PYTHON=on \
+ -DARROW_PLASMA=on \
+ -DPLASMA_PYTHON=on \
+ -DARROW_JEMALLOC=off \
+ -DARROW_WITH_BROTLI=off \
+ -DARROW_WITH_LZ4=off \
+ -DARROW_WITH_ZLIB=off \
+ -DARROW_WITH_ZSTD=off \
+ -DARROW_PLASMA_JAVA_CLIENT=on \
+ ..
+ make VERBOSE=1 -j$PARALLEL
+ popd
+popd
+
+mvn clean install
+export PLASMA_STORE=$ROOT_DIR/../../cpp/release/release/plasma-store-server
+java -cp target/test-classes:target/classes -Djava.library.path=$ROOT_DIR/../../cpp/release/release/ org.apache.arrow.plasma.PlasmaClientTest