diff options
Diffstat (limited to 'src/arrow/java/plasma')
11 files changed, 946 insertions, 0 deletions
diff --git a/src/arrow/java/plasma/README.md b/src/arrow/java/plasma/README.md new file mode 100644 index 000000000..0dcb4e21f --- /dev/null +++ b/src/arrow/java/plasma/README.md @@ -0,0 +1,39 @@ +<!--- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Java Plasma Client + +## Setup Build Environment + +Install: + - java 8 or later + - maven 3.3 or later + - the same requirement of build [Arrow C++](https://github.com/apache/arrow/tree/master/cpp) + +## Build the jar of plasma client + +``` +cd .. +mvn clean install -pl plasma -am -Dmaven.test.skip +``` + +## Building and running tests +``` +./test.sh +``` diff --git a/src/arrow/java/plasma/pom.xml b/src/arrow/java/plasma/pom.xml new file mode 100644 index 000000000..725b414b1 --- /dev/null +++ b/src/arrow/java/plasma/pom.xml @@ -0,0 +1,34 @@ +<?xml version="1.0"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-java-root</artifactId> + <version>6.0.1</version> + </parent> + <artifactId>arrow-plasma</artifactId> + <name>Arrow Plasma Client</name> + <description>(Experimental/Contrib) Java client for the Plasma object store.</description> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java new file mode 100644 index 000000000..93a2d483c --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/ObjectStoreLink.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.util.List; + +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException; + +/** + * Object store interface, which provides the capabilities to put and get raw byte array, and serves. + */ +public interface ObjectStoreLink { + + /** + * Tuple for data and metadata stored in Plasma. + */ + class ObjectStoreData { + + public ObjectStoreData(byte[] metadata, byte[] data) { + this.data = data; + this.metadata = metadata; + } + + public final byte[] metadata; + public final byte[] data; + } + + /** + * Put value in the local plasma store with object ID <tt>objectId</tt>. + * + * @param objectId The object ID of the value to be put. + * @param value The value to put in the object store. + * @param metadata encodes whatever metadata the user wishes to encode. + */ + void put(byte[] objectId, byte[] value, byte[] metadata) + throws DuplicateObjectException, PlasmaOutOfMemoryException; + + /** + * Get a buffer from the PlasmaStore based on the <tt>objectId</tt>. + * + * @param objectId The object ID used to identify the object. + * @param timeoutMs The number of milliseconds that the get call should block before timing out + * and returning. Pass -1 if the call should block and 0 if the call should return immediately. + * @param isMetadata false if get data, otherwise get metadata. + * @return A PlasmaBuffer wrapping the object. + */ + default byte[] get(byte[] objectId, int timeoutMs, boolean isMetadata) { + byte[][] objectIds = {objectId}; + return get(objectIds, timeoutMs, isMetadata).get(0); + } + + /** + * Get buffers from the PlasmaStore based on <tt>objectIds</tt>. + * + * @param objectIds List of object IDs used to identify some objects. + * @param timeoutMs The number of milliseconds that the get call should block before timing out + * and returning. Pass -1 if the call should block and 0 if the call should return immediately. + * @param isMetadata false if get data, otherwise get metadata. + * @return List of PlasmaBuffers wrapping objects. + */ + List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata); + + /** + * Get buffer pairs (data & metadata) from the PlasmaStore based on <tt>objectIds</tt>. + * + * @param objectIds List of object IDs used to identify some objects. + * @param timeoutMs The number of milliseconds that the get call should block before timing out + * and returning. Pass -1 if the call should block and 0 if the call should return immediately. + * @return List of Pairs of PlasmaBuffer wrapping objects and its metadata. + */ + List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs); + + /** + * Compute the hash of an object in the object store. + * + * @param objectId The object ID used to identify the object. + * @return A digest byte array contains object's SHA256 hash. <tt>null</tt> means that the object + * isn't in the object store. + */ + byte[] hash(byte[] objectId); + + /** + * Evict some objects to recover given count of bytes. + * + * @param numBytes The number of bytes to attempt to recover. + * @return The number of bytes that have been evicted. + */ + long evict(long numBytes); + + /** + * Release the reference of the object. + * + * @param objectId The object ID used to release the reference of the object. + */ + void release(byte[] objectId); + + /** + * Removes object with given objectId from plasma store. + * + * @param objectId used to identify an object. + */ + void delete(byte[] objectId); + + /** + * Check if the object is present and has been sealed in the PlasmaStore. + * + * @param objectId used to identify an object. + */ + boolean contains(byte[] objectId); + + /** + * List all objects in the PlasmaStore. + */ + List<byte[]> list(); +} diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java new file mode 100644 index 000000000..fdd7114f1 --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClient.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException; + +/** + * The PlasmaClient is used to interface with a plasma store and manager. + * + * <p>The PlasmaClient can ask the PlasmaStore to allocate a new buffer, seal a buffer, and get a + * buffer. Buffers are referred to by object IDs. + */ +public class PlasmaClient implements ObjectStoreLink { + + private final long conn; + + protected void finalize() { + PlasmaClientJNI.disconnect(this.conn); + } + + // use plasma client to initialize the underlying jni system as well via config and config-overwrites + public PlasmaClient(String storeSocketName, String managerSocketName, int releaseDelay) { + this.conn = PlasmaClientJNI.connect(storeSocketName, managerSocketName, releaseDelay); + } + + // interface methods -------------------- + + @Override + public void put(byte[] objectId, byte[] value, byte[] metadata) + throws DuplicateObjectException, PlasmaOutOfMemoryException { + ByteBuffer buf = PlasmaClientJNI.create(conn, objectId, value.length, metadata); + buf.put(value); + PlasmaClientJNI.seal(conn, objectId); + PlasmaClientJNI.release(conn, objectId); + } + + @Override + public List<byte[]> get(byte[][] objectIds, int timeoutMs, boolean isMetadata) { + ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs); + assert bufs.length == objectIds.length; + + List<byte[]> ret = new ArrayList<>(); + for (int i = 0; i < bufs.length; i++) { + ByteBuffer buf = bufs[i][isMetadata ? 1 : 0]; + if (buf == null) { + ret.add(null); + } else { + byte[] bb = new byte[buf.remaining()]; + buf.get(bb); + ret.add(bb); + } + } + return ret; + } + + @Override + public byte[] hash(byte[] objectId) { + return PlasmaClientJNI.hash(conn, objectId); + } + + @Override + public List<ObjectStoreData> get(byte[][] objectIds, int timeoutMs) { + ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs); + assert bufs.length == objectIds.length; + + List<ObjectStoreData> ret = new ArrayList<>(); + for (int i = 0; i < bufs.length; i++) { + ByteBuffer databuf = bufs[i][0]; + ByteBuffer metabuf = bufs[i][1]; + if (databuf == null) { + ret.add(new ObjectStoreData(null, null)); + } else { + byte[] data = new byte[databuf.remaining()]; + databuf.get(data); + byte[] meta; + if (metabuf != null) { + meta = new byte[metabuf.remaining()]; + metabuf.get(meta); + } else { + meta = null; + } + ret.add(new ObjectStoreData(meta, data)); + } + } + return ret; + } + + /** + * Get an object in Plasma Store with objectId. Will return an off-heap ByteBuffer. + * + * @param objectId used to identify an object. + * @param timeoutMs time in milliseconfs to wait before this request time out. + * @param isMetadata get this object's metadata or data. + */ + public ByteBuffer getObjAsByteBuffer(byte[] objectId, int timeoutMs, boolean isMetadata) { + byte[][] objectIds = new byte[][]{objectId}; + ByteBuffer[][] bufs = PlasmaClientJNI.get(conn, objectIds, timeoutMs); + return bufs[0][isMetadata ? 1 : 0]; + } + + @Override + public List<byte[]> list() { + return Arrays.asList(PlasmaClientJNI.list(conn)); + } + + @Override + public long evict(long numBytes) { + return PlasmaClientJNI.evict(conn, numBytes); + } + + // wrapper methods -------------------- + + /** + * Create an object in Plasma Store with particular size. Will return an off-heap ByteBuffer. + * + * @param objectId used to identify an object. + * @param size size in bytes to be allocated for this object. + * @param metadata this object's metadata. It should be null if there is no metadata. + */ + public ByteBuffer create(byte[] objectId, int size, byte[] metadata) + throws DuplicateObjectException, PlasmaOutOfMemoryException { + return PlasmaClientJNI.create(conn, objectId, size, metadata); + } + + /** + * Seal the buffer in the PlasmaStore for a particular object ID. + * Once a buffer has been sealed, the buffer is immutable and can only be accessed through get. + * + * @param objectId used to identify an object. + */ + public void seal(byte[] objectId) { + PlasmaClientJNI.seal(conn, objectId); + } + + /** + * Notify Plasma that the object is no longer needed. + * + * @param objectId used to identify an object. + */ + public void release(byte[] objectId) { + PlasmaClientJNI.release(conn, objectId); + } + + /** + * Removes object with given objectId from plasma store. + * + * @param objectId used to identify an object. + */ + @Override + public void delete(byte[] objectId) { + PlasmaClientJNI.delete(conn, objectId); + } + + /** + * Check if the object is present and has been sealed in the PlasmaStore. + * + * @param objectId used to identify an object. + */ + @Override + public boolean contains(byte[] objectId) { + return PlasmaClientJNI.contains(conn, objectId); + } +} diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java new file mode 100644 index 000000000..da5c17e6b --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/PlasmaClientJNI.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.nio.ByteBuffer; + +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException; + +/** + * JNI static methods for PlasmaClient. + */ +public class PlasmaClientJNI { + + public static native long connect(String storeSocketName, String managerSocketName, int releaseDelay); + + public static native void disconnect(long conn); + + public static native ByteBuffer create(long conn, byte[] objectId, int size, byte[] metadata) + throws DuplicateObjectException, PlasmaOutOfMemoryException; + + public static native byte[] hash(long conn, byte[] objectId); + + public static native void seal(long conn, byte[] objectId); + + public static native void release(long conn, byte[] objectId); + + public static native ByteBuffer[][] get(long conn, byte[][] objectIds, int timeoutMs); + + public static native void delete(long conn, byte[] objectId); + + public static native boolean contains(long conn, byte[] objectId); + + public static native void fetch(long conn, byte[][] objectIds); + + public static native byte[][] wait(long conn, byte[][] objectIds, int timeoutMs, + int numReturns); + + public static native long evict(long conn, long numBytes); + + public static native byte[][] list(long conn); +} diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java new file mode 100644 index 000000000..cb735282c --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/DuplicateObjectException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma.exceptions; + +/** + * Thrown when attempting to place an object into the store for an ID that already exists. + */ +public class DuplicateObjectException extends RuntimeException { + + public DuplicateObjectException(String objectId) { + super("An object with ID " + objectId + " already exists in the plasma store."); + } + + public DuplicateObjectException(String objectId, Throwable t) { + super("An object with ID " + objectId + " already exists in the plasma store.", t); + } +} diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java new file mode 100644 index 000000000..ff9d96b17 --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaClientException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma.exceptions; + +/** + * Generic exception thrown by the plasma client (for example on failure to connect). + */ +public class PlasmaClientException extends RuntimeException { + + public PlasmaClientException(String message) { + super(message); + } + + public PlasmaClientException(String message, Throwable t) { + super(message, t); + } +} diff --git a/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java new file mode 100644 index 000000000..ffc4177eb --- /dev/null +++ b/src/arrow/java/plasma/src/main/java/org/apache/arrow/plasma/exceptions/PlasmaOutOfMemoryException.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma.exceptions; + +/** + * Indicates no more memory is available in Plasma. + */ +public class PlasmaOutOfMemoryException extends RuntimeException { + + public PlasmaOutOfMemoryException(String message) { + super("The plasma store ran out of memory." + message); + } + + public PlasmaOutOfMemoryException(String message, Throwable t) { + super("The plasma store ran out of memory." + message, t); + } + + public PlasmaOutOfMemoryException() { + super("The plasma store ran out of memory."); + } + + public PlasmaOutOfMemoryException(Throwable t) { + super("The plasma store ran out of memory.", t); + } +} diff --git a/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java b/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java new file mode 100644 index 000000000..e02ee51f9 --- /dev/null +++ b/src/arrow/java/plasma/src/test/java/org/apache/arrow/plasma/PlasmaClientTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.plasma; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import org.apache.arrow.plasma.exceptions.DuplicateObjectException; +import org.apache.arrow.plasma.exceptions.PlasmaClientException; +import org.apache.arrow.plasma.exceptions.PlasmaOutOfMemoryException; +import org.junit.Assert; + +public class PlasmaClientTest { + + private String storeSuffix = "/tmp/store"; + + private Process storeProcess; + + private int storePort; + + private ObjectStoreLink pLink; + + + public PlasmaClientTest() throws Exception { + try { + String plasmaStorePath = System.getenv("PLASMA_STORE"); + if (plasmaStorePath == null) { + throw new Exception("Please set plasma store path in env PLASMA_STORE"); + } + + this.startObjectStore(plasmaStorePath); + System.loadLibrary("plasma_java"); + pLink = new PlasmaClient(this.getStoreAddress(), "", 0); + } catch (Throwable t) { + cleanup(); + throw t; + } + + } + + private Process startProcess(String[] cmd) { + ProcessBuilder builder; + List<String> newCmd = Arrays.stream(cmd).filter(s -> s.length() > 0).collect(Collectors.toList()); + builder = new ProcessBuilder(newCmd); + builder.inheritIO(); + Process p = null; + try { + p = builder.start(); + } catch (IOException e) { + e.printStackTrace(); + return null; + } + System.out.println("Start process " + p.hashCode() + " OK, cmd = " + Arrays.toString(cmd).replace(',', ' ')); + return p; + } + + private void startObjectStore(String plasmaStorePath) { + int occupiedMemoryMB = 10; + long memoryBytes = occupiedMemoryMB * 1000000; + int numRetries = 10; + Process p = null; + while (numRetries-- > 0) { + int currentPort = java.util.concurrent.ThreadLocalRandom.current().nextInt(0, 100000); + String name = storeSuffix + currentPort; + String cmd = plasmaStorePath + " -s " + name + " -m " + memoryBytes; + + p = startProcess(cmd.split(" ")); + + if (p != null && p.isAlive()) { + try { + TimeUnit.MILLISECONDS.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (p.isAlive()) { + storePort = currentPort; + break; + } + } + } + + + if (p == null || !p.isAlive()) { + throw new RuntimeException("Start object store failed ..."); + } else { + storeProcess = p; + System.out.println("Start object store success"); + } + } + + private void cleanup() { + if (storeProcess != null && killProcess(storeProcess)) { + System.out.println("Kill plasma store process forcibly"); + } + } + + private static boolean killProcess(Process p) { + if (p.isAlive()) { + p.destroyForcibly(); + return true; + } else { + return false; + } + } + + public void doTest() { + System.out.println("Start test."); + int timeoutMs = 3000; + byte[] id1 = new byte[20]; + Arrays.fill(id1, (byte) 1); + byte[] value1 = new byte[20]; + Arrays.fill(value1, (byte) 11); + pLink.put(id1, value1, null); + + byte[] id2 = new byte[20]; + Arrays.fill(id2, (byte) 2); + byte[] value2 = new byte[20]; + Arrays.fill(value2, (byte) 12); + pLink.put(id2, value2, null); + System.out.println("Plasma java client put test success."); + byte[] getValue1 = pLink.get(id1, timeoutMs, false); + assert Arrays.equals(value1, getValue1); + + byte[] getValue2 = pLink.get(id2, timeoutMs, false); + assert Arrays.equals(value2, getValue2); + System.out.println("Plasma java client get single object test success."); + byte[][] ids = {id1, id2}; + List<byte[]> values = pLink.get(ids, timeoutMs, false); + assert Arrays.equals(values.get(0), value1); + assert Arrays.equals(values.get(1), value2); + System.out.println("Plasma java client get multi-object test success."); + try { + pLink.put(id1, value1, null); + Assert.fail("Fail to throw DuplicateObjectException when put an object into plasma store twice."); + } catch (DuplicateObjectException e) { + System.out.println("Plasma java client put same object twice exception test success."); + } + byte[] id1Hash = pLink.hash(id1); + assert id1Hash != null; + System.out.println("Plasma java client hash test success."); + + boolean exist = pLink.contains(id2); + assert exist; + byte[] id3 = new byte[20]; + Arrays.fill(id3, (byte) 3); + boolean notExist = pLink.contains(id3); + assert !notExist; + System.out.println("Plasma java client contains test success."); + + byte[] id4 = new byte[20]; + Arrays.fill(id4, (byte) 4); + byte[] value4 = new byte[20]; + byte[] meta4 = "META4".getBytes(); + Arrays.fill(value4, (byte) 14); + pLink.put(id4, value4, meta4); + + List<byte[]> existIds = Arrays.asList(id1, id2, id3, id4); + List<byte[]> listIds = pLink.list(); + assert listIds.size() == 4; + for (byte[] existId : existIds) { + boolean found = false; + for (byte[] listId : listIds) { + if (Arrays.equals(listId, existId)) { + found = true; + } + } + assert found; + } + System.out.println("Plasma java client list test success."); + + byte[] id5 = new byte[20]; + Arrays.fill(id5, (byte) 5); + byte[] value5 = new byte[20]; + byte[] meta5 = "META5".getBytes(); + Arrays.fill(value5, (byte) 15); + pLink.put(id5, value5, meta5); + + byte[] getMeta4 = pLink.get(id4, timeoutMs, true); + assert Arrays.equals(meta4, getMeta4); + byte[] getValue4 = pLink.get(id4, timeoutMs, false); + assert Arrays.equals(value4, getValue4); + byte[][] ids4 = new byte[1][]; + ids4[0] = id4; + ObjectStoreLink.ObjectStoreData fullData4 = pLink.get(ids4, timeoutMs).get(0); + assert Arrays.equals(meta4, fullData4.metadata); + assert Arrays.equals(value4, fullData4.data); + + byte[] getMeta5 = pLink.get(id5, timeoutMs, true); + assert Arrays.equals(meta5, getMeta5); + byte[] getValue5 = pLink.get(id5, timeoutMs, false); + assert Arrays.equals(value5, getValue5); + byte[][] ids5 = new byte[1][]; + ids5[0] = id5; + ObjectStoreLink.ObjectStoreData fullData5 = pLink.get(ids5, timeoutMs).get(0); + assert Arrays.equals(meta5, fullData5.metadata); + assert Arrays.equals(value5, fullData5.data); + System.out.println("Plasma java client metadata get test success."); + + byte[] id6 = getArrayFilledWithValue(20, (byte) 6); + byte[] val6 = getArrayFilledWithValue(21, (byte) 6); + pLink.put(id6, val6, null); + assert pLink.contains(id6); + pLink.delete(id6); + assert !pLink.contains(id6); + System.out.println("Plasma java client delete test success."); + + // Test calling shutdown while getting the object. + Thread thread = new Thread(() -> { + try { + TimeUnit.SECONDS.sleep(1); + cleanup(); + } catch (InterruptedException e) { + throw new RuntimeException("Got InterruptedException when sleeping.", e); + } + }); + thread.start(); + + try { + byte[] idNone = new byte[20]; + Arrays.fill(idNone, (byte) 987); + pLink.get(idNone, timeoutMs, false); + Assert.fail("Fail to throw PlasmaClientException when get an object " + + "when object store shutdown."); + } catch (PlasmaClientException e) { + System.out.println(String.format("Expected PlasmaClientException: %s", e)); + } + + try { + thread.join(); + } catch (Exception e) { + System.out.println(String.format("Exception caught: %s", e)); + } + System.out.println("All test success."); + + } + + public void doByteBufferTest() { + System.out.println("Start ByteBuffer test."); + PlasmaClient client = (PlasmaClient) pLink; + byte[] id = new byte[20]; + Arrays.fill(id, (byte) 10); + ByteBuffer buf = client.create(id, 100, null); + assert buf.isDirect(); + for (int i = 0; i < 10; i++) { + buf.putInt(i); + } + client.seal(id); + client.release(id); + // buf is not available now. + assert client.contains(id); + System.out.println("Plasma java client create test success."); + + ByteBuffer buf1 = client.getObjAsByteBuffer(id, -1, false); + assert buf1.limit() == 100; + for (int i = 0; i < 10; i++) { + assert buf1.getInt() == i; + } + System.out.println("Plasma java client getObjAsByteBuffer test success"); + client.release(id); + } + + public void doPlasmaOutOfMemoryExceptionTest() { + System.out.println("Start PlasmaOutOfMemoryException test."); + PlasmaClient client = (PlasmaClient) pLink; + byte[] objectId = new byte[20]; + Arrays.fill(objectId, (byte) 1); + try { + ByteBuffer byteBuffer = client.create(objectId, 200000000, null); + Assert.fail("Fail to create an object, The plasma store ran out of memory."); + } catch (PlasmaOutOfMemoryException e) { + System.out.println(String.format("Expected PlasmaOutOfMemoryException: %s", e)); + System.out.println("PlasmaOutOfMemoryException test success."); + } + } + + private byte[] getArrayFilledWithValue(int arrayLength, byte val) { + byte[] arr = new byte[arrayLength]; + Arrays.fill(arr, val); + return arr; + } + + public String getStoreAddress() { + return storeSuffix + storePort; + } + + public static void main(String[] args) throws Exception { + + PlasmaClientTest plasmaClientTest = new PlasmaClientTest(); + plasmaClientTest.doPlasmaOutOfMemoryExceptionTest(); + plasmaClientTest.doByteBufferTest(); + plasmaClientTest.doTest(); + } + +} diff --git a/src/arrow/java/plasma/src/test/resources/logback.xml b/src/arrow/java/plasma/src/test/resources/logback.xml new file mode 100644 index 000000000..4c54d18a2 --- /dev/null +++ b/src/arrow/java/plasma/src/test/resources/logback.xml @@ -0,0 +1,28 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> + +<configuration> + <statusListener class="ch.qos.logback.core.status.NopStatusListener"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <logger name="org.apache.arrow" additivity="false"> + <level value="info" /> + <appender-ref ref="STDOUT" /> + </logger> + +</configuration> diff --git a/src/arrow/java/plasma/test.sh b/src/arrow/java/plasma/test.sh new file mode 100755 index 000000000..dbfae646c --- /dev/null +++ b/src/arrow/java/plasma/test.sh @@ -0,0 +1,56 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +ROOT_DIR=$(cd "$(dirname "${BASH_SOURCE:-$0}")"; pwd) +unamestr="$(uname)" +if [[ "$unamestr" == "Linux" ]]; then + PARALLEL=$(nproc) +elif [[ "$unamestr" == "Darwin" ]]; then + PARALLEL=$(sysctl -n hw.ncpu) +else + echo "Unrecognized platform." + exit 1 +fi +pushd ../../cpp + if [ ! -d "release" ]; then + mkdir release + fi + pushd release + cmake -DCMAKE_BUILD_TYPE=Release \ + -DCMAKE_C_FLAGS="-g -O3" \ + -DCMAKE_CXX_FLAGS="-g -O3" \ + -DARROW_BUILD_TESTS=off \ + -DARROW_HDFS=on \ + -DARROW_BOOST_USE_SHARED=on \ + -DARROW_PYTHON=on \ + -DARROW_PLASMA=on \ + -DPLASMA_PYTHON=on \ + -DARROW_JEMALLOC=off \ + -DARROW_WITH_BROTLI=off \ + -DARROW_WITH_LZ4=off \ + -DARROW_WITH_ZLIB=off \ + -DARROW_WITH_ZSTD=off \ + -DARROW_PLASMA_JAVA_CLIENT=on \ + .. + make VERBOSE=1 -j$PARALLEL + popd +popd + +mvn clean install +export PLASMA_STORE=$ROOT_DIR/../../cpp/release/release/plasma-store-server +java -cp target/test-classes:target/classes -Djava.library.path=$ROOT_DIR/../../cpp/release/release/ org.apache.arrow.plasma.PlasmaClientTest |