summaryrefslogtreecommitdiffstats
path: root/src/arrow/java/flight/flight-grpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/arrow/java/flight/flight-grpc')
-rw-r--r--src/arrow/java/flight/flight-grpc/pom.xml132
-rw-r--r--src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java161
-rw-r--r--src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java193
-rw-r--r--src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto26
4 files changed, 512 insertions, 0 deletions
diff --git a/src/arrow/java/flight/flight-grpc/pom.xml b/src/arrow/java/flight/flight-grpc/pom.xml
new file mode 100644
index 000000000..1968484a1
--- /dev/null
+++ b/src/arrow/java/flight/flight-grpc/pom.xml
@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>arrow-java-root</artifactId>
+ <groupId>org.apache.arrow</groupId>
+ <version>6.0.1</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flight-grpc</artifactId>
+ <name>Arrow Flight GRPC</name>
+ <description>(Experimental)Contains utility class to expose Flight gRPC service and client</description>
+ <packaging>jar</packaging>
+
+ <properties>
+ <dep.grpc.version>1.41.0</dep.grpc.version>
+ <dep.protobuf.version>3.7.1</dep.protobuf.version>
+ <forkCount>1</forkCount>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>flight-core</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-unix-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-kqueue</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${dep.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${dep.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-core</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-netty</artifactId>
+ <version>${project.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${dep.grpc.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${dep.protobuf.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-api</artifactId>
+ <version>${dep.grpc.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <extensions>
+ <!-- provides os.detected.classifier (i.e. linux-x86_64, osx-x86_64) property -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.5.0.Final</version>
+ </extension>
+ </extensions>
+ <plugins>
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:${dep.protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
+ <clearOutputDirectory>false</clearOutputDirectory>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:${dep.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <id>test</id>
+ <configuration>
+ <protoSourceRoot>${basedir}/src/test/protobuf</protoSourceRoot>
+ <outputDirectory>${project.build.directory}/generated-test-sources//protobuf</outputDirectory>
+ </configuration>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java
new file mode 100644
index 000000000..eb5e492b4
--- /dev/null
+++ b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.arrow.flight.auth.ServerAuthHandler;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.VisibleForTesting;
+
+import io.grpc.BindableService;
+import io.grpc.CallOptions;
+import io.grpc.ClientCall;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.MethodDescriptor;
+
+/**
+ * Exposes Flight GRPC service & client.
+ */
+public class FlightGrpcUtils {
+ /**
+ * Proxy class for ManagedChannel that makes closure a no-op.
+ */
+ @VisibleForTesting
+ static class NonClosingProxyManagedChannel extends ManagedChannel {
+ private final ManagedChannel channel;
+ private boolean isShutdown;
+
+ NonClosingProxyManagedChannel(ManagedChannel channel) {
+ this.channel = channel;
+ this.isShutdown = channel.isShutdown();
+ }
+
+ @Override
+ public ManagedChannel shutdown() {
+ isShutdown = true;
+ return this;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ if (this.channel.isShutdown()) {
+ // If the underlying channel is shut down, ensure we're updated to match.
+ shutdown();
+ }
+ return isShutdown;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return this.isShutdown();
+ }
+
+ @Override
+ public ManagedChannel shutdownNow() {
+ return shutdown();
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit) {
+ // Don't actually await termination, since it'll be a no-op, so simply return whether or not
+ // the channel has been shut down already.
+ return this.isShutdown();
+ }
+
+ @Override
+ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
+ MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
+ if (this.isShutdown()) {
+ throw new IllegalStateException("Channel has been shut down.");
+ }
+
+ return this.channel.newCall(methodDescriptor, callOptions);
+ }
+
+ @Override
+ public String authority() {
+ return this.channel.authority();
+ }
+
+ @Override
+ public ConnectivityState getState(boolean requestConnection) {
+ if (this.isShutdown()) {
+ return ConnectivityState.SHUTDOWN;
+ }
+
+ return this.channel.getState(requestConnection);
+ }
+
+ @Override
+ public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
+ // The proxy has no insight into the underlying channel state changes, so we'll have to leak the abstraction
+ // a bit here and simply pass to the underlying channel, even though it will never transition to shutdown via
+ // the proxy. This should be fine, since it's mainly targeted at the FlightClient and there's no getter for
+ // the channel.
+ this.channel.notifyWhenStateChanged(source, callback);
+ }
+
+ @Override
+ public void resetConnectBackoff() {
+ this.channel.resetConnectBackoff();
+ }
+
+ @Override
+ public void enterIdle() {
+ this.channel.enterIdle();
+ }
+ }
+
+ private FlightGrpcUtils() {}
+
+ /**
+ * Creates a Flight service.
+ * @param allocator Memory allocator
+ * @param producer Specifies the service api
+ * @param authHandler Authentication handler
+ * @param executor Executor service
+ * @return FlightBindingService
+ */
+ public static BindableService createFlightService(BufferAllocator allocator, FlightProducer producer,
+ ServerAuthHandler authHandler, ExecutorService executor) {
+ return new FlightBindingService(allocator, producer, authHandler, executor);
+ }
+
+ /**
+ * Creates a Flight client.
+ * @param incomingAllocator Memory allocator
+ * @param channel provides a connection to a gRPC server.
+ */
+ public static FlightClient createFlightClient(BufferAllocator incomingAllocator, ManagedChannel channel) {
+ return new FlightClient(incomingAllocator, channel, Collections.emptyList());
+ }
+
+ /**
+ * Creates a Flight client.
+ * @param incomingAllocator Memory allocator
+ * @param channel provides a connection to a gRPC server. Will not be closed on closure of the returned FlightClient.
+ */
+ public static FlightClient createFlightClientWithSharedChannel(
+ BufferAllocator incomingAllocator, ManagedChannel channel) {
+ return new FlightClient(incomingAllocator, new NonClosingProxyManagedChannel(channel), Collections.emptyList());
+ }
+}
diff --git a/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java
new file mode 100644
index 000000000..142a0f937
--- /dev/null
+++ b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.arrow.flight.auth.ServerAuthHandler;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.protobuf.Empty;
+
+import io.grpc.BindableService;
+import io.grpc.ConnectivityState;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * Unit test which adds 2 services to same server end point.
+ */
+public class TestFlightGrpcUtils {
+ private Server server;
+ private BufferAllocator allocator;
+ private String serverName;
+
+ @Before
+ public void setup() throws IOException {
+ //Defines flight service
+ allocator = new RootAllocator(Integer.MAX_VALUE);
+ final NoOpFlightProducer producer = new NoOpFlightProducer();
+ final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP;
+ final ExecutorService exec = Executors.newCachedThreadPool();
+ final BindableService flightBindingService = FlightGrpcUtils.createFlightService(allocator, producer,
+ authHandler, exec);
+
+ //initializes server with 2 services - FlightBindingService & TestService
+ serverName = InProcessServerBuilder.generateName();
+ server = InProcessServerBuilder.forName(serverName)
+ .directExecutor()
+ .addService(flightBindingService)
+ .addService(new TestServiceAdapter())
+ .build();
+ server.start();
+ }
+
+ @After
+ public void cleanup() {
+ server.shutdownNow();
+ }
+
+ /**
+ * This test checks if multiple gRPC services can be added to the same
+ * server endpoint and if they can be used by different clients via the same channel.
+ * @throws IOException If server fails to start.
+ */
+ @Test
+ public void testMultipleGrpcServices() throws IOException {
+ //Initializes channel so that multiple clients can communicate with server
+ final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName)
+ .directExecutor()
+ .build();
+
+ //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service
+ //to throw a RunTimeException
+ final FlightClient flightClient = FlightGrpcUtils.createFlightClient(allocator, managedChannel);
+ final Iterable<ActionType> actionTypes = flightClient.listActions();
+ assertThrows(FlightRuntimeException.class, () -> actionTypes.forEach(
+ actionType -> System.out.println(actionType.toString())));
+
+ //Define Test client as a blocking stub and call test method which correctly returns an empty protobuf object
+ final TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(managedChannel);
+ Assert.assertEquals(Empty.newBuilder().build(), blockingStub.test(Empty.newBuilder().build()));
+ }
+
+ @Test
+ public void testShutdown() throws IOException, InterruptedException {
+ //Initializes channel so that multiple clients can communicate with server
+ final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName)
+ .directExecutor()
+ .build();
+
+ //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service
+ //to throw a RunTimeException
+ final FlightClient flightClient = FlightGrpcUtils.createFlightClientWithSharedChannel(allocator, managedChannel);
+
+ // Should be a no-op.
+ flightClient.close();
+ Assert.assertFalse(managedChannel.isShutdown());
+ Assert.assertFalse(managedChannel.isTerminated());
+ Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false));
+ managedChannel.shutdownNow();
+ }
+
+ @Test
+ public void testProxyChannel() throws IOException, InterruptedException {
+ //Initializes channel so that multiple clients can communicate with server
+ final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName)
+ .directExecutor()
+ .build();
+
+ final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel =
+ new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel);
+ Assert.assertFalse(proxyChannel.isShutdown());
+ Assert.assertFalse(proxyChannel.isTerminated());
+ proxyChannel.shutdown();
+ Assert.assertTrue(proxyChannel.isShutdown());
+ Assert.assertTrue(proxyChannel.isTerminated());
+ Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false));
+ try {
+ proxyChannel.newCall(null, null);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // This is expected, since the proxy channel is shut down.
+ }
+
+ Assert.assertFalse(managedChannel.isShutdown());
+ Assert.assertFalse(managedChannel.isTerminated());
+ Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false));
+
+ managedChannel.shutdownNow();
+ }
+
+ @Test
+ public void testProxyChannelWithClosedChannel() throws IOException, InterruptedException {
+ //Initializes channel so that multiple clients can communicate with server
+ final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName)
+ .directExecutor()
+ .build();
+
+ final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel =
+ new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel);
+ Assert.assertFalse(proxyChannel.isShutdown());
+ Assert.assertFalse(proxyChannel.isTerminated());
+ managedChannel.shutdownNow();
+ Assert.assertTrue(proxyChannel.isShutdown());
+ Assert.assertTrue(proxyChannel.isTerminated());
+ Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false));
+ try {
+ proxyChannel.newCall(null, null);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ // This is expected, since the proxy channel is shut down.
+ }
+
+ Assert.assertTrue(managedChannel.isShutdown());
+ Assert.assertTrue(managedChannel.isTerminated());
+ Assert.assertEquals(ConnectivityState.SHUTDOWN, managedChannel.getState(false));
+ }
+
+ /**
+ * Private class used for testing purposes that overrides service behavior.
+ */
+ private class TestServiceAdapter extends TestServiceGrpc.TestServiceImplBase {
+
+ /**
+ * gRPC service that receives an empty object & returns and empty protobuf object.
+ * @param request google.protobuf.Empty
+ * @param responseObserver google.protobuf.Empty
+ */
+ @Override
+ public void test(Empty request, StreamObserver<Empty> responseObserver) {
+ responseObserver.onNext(Empty.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ }
+}
+
diff --git a/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto
new file mode 100644
index 000000000..6fa1890b2
--- /dev/null
+++ b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+option java_package = "org.apache.arrow.flight";
+
+import "google/protobuf/empty.proto";
+
+service TestService {
+ rpc Test(google.protobuf.Empty) returns (google.protobuf.Empty) {}
+}