From e6918187568dbd01842d8d1d2c808ce16a894239 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sun, 21 Apr 2024 13:54:28 +0200 Subject: Adding upstream version 18.2.2. Signed-off-by: Daniel Baumann --- src/arrow/java/flight/flight-grpc/pom.xml | 132 ++++++++++++++ .../org/apache/arrow/flight/FlightGrpcUtils.java | 161 +++++++++++++++++ .../apache/arrow/flight/TestFlightGrpcUtils.java | 193 +++++++++++++++++++++ .../flight-grpc/src/test/protobuf/test.proto | 26 +++ 4 files changed, 512 insertions(+) create mode 100644 src/arrow/java/flight/flight-grpc/pom.xml create mode 100644 src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java create mode 100644 src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java create mode 100644 src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto (limited to 'src/arrow/java/flight/flight-grpc') diff --git a/src/arrow/java/flight/flight-grpc/pom.xml b/src/arrow/java/flight/flight-grpc/pom.xml new file mode 100644 index 000000000..1968484a1 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/pom.xml @@ -0,0 +1,132 @@ + + + + + arrow-java-root + org.apache.arrow + 6.0.1 + ../../pom.xml + + 4.0.0 + + flight-grpc + Arrow Flight GRPC + (Experimental)Contains utility class to expose Flight gRPC service and client + jar + + + 1.41.0 + 3.7.1 + 1 + + + + + org.apache.arrow + flight-core + ${project.version} + + + io.netty + netty-transport-native-unix-common + + + io.netty + netty-transport-native-kqueue + + + io.netty + netty-transport-native-epoll + + + + + io.grpc + grpc-core + ${dep.grpc.version} + + + io.grpc + grpc-stub + ${dep.grpc.version} + + + org.apache.arrow + arrow-memory-core + ${project.version} + compile + + + org.apache.arrow + arrow-memory-netty + ${project.version} + runtime + + + io.grpc + grpc-protobuf + ${dep.grpc.version} + + + com.google.guava + guava + + + com.google.protobuf + protobuf-java + ${dep.protobuf.version} + + + io.grpc + grpc-api + ${dep.grpc.version} + + + + + + + + kr.motd.maven + os-maven-plugin + 1.5.0.Final + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.0 + + com.google.protobuf:protoc:${dep.protobuf.version}:exe:${os.detected.classifier} + false + grpc-java + io.grpc:protoc-gen-grpc-java:${dep.grpc.version}:exe:${os.detected.classifier} + + + + test + + ${basedir}/src/test/protobuf + ${project.build.directory}/generated-test-sources//protobuf + + + compile + compile-custom + + + + + + + + diff --git a/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java new file mode 100644 index 000000000..eb5e492b4 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.arrow.flight.auth.ServerAuthHandler; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.VisibleForTesting; + +import io.grpc.BindableService; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; + +/** + * Exposes Flight GRPC service & client. + */ +public class FlightGrpcUtils { + /** + * Proxy class for ManagedChannel that makes closure a no-op. + */ + @VisibleForTesting + static class NonClosingProxyManagedChannel extends ManagedChannel { + private final ManagedChannel channel; + private boolean isShutdown; + + NonClosingProxyManagedChannel(ManagedChannel channel) { + this.channel = channel; + this.isShutdown = channel.isShutdown(); + } + + @Override + public ManagedChannel shutdown() { + isShutdown = true; + return this; + } + + @Override + public boolean isShutdown() { + if (this.channel.isShutdown()) { + // If the underlying channel is shut down, ensure we're updated to match. + shutdown(); + } + return isShutdown; + } + + @Override + public boolean isTerminated() { + return this.isShutdown(); + } + + @Override + public ManagedChannel shutdownNow() { + return shutdown(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) { + // Don't actually await termination, since it'll be a no-op, so simply return whether or not + // the channel has been shut down already. + return this.isShutdown(); + } + + @Override + public ClientCall newCall( + MethodDescriptor methodDescriptor, CallOptions callOptions) { + if (this.isShutdown()) { + throw new IllegalStateException("Channel has been shut down."); + } + + return this.channel.newCall(methodDescriptor, callOptions); + } + + @Override + public String authority() { + return this.channel.authority(); + } + + @Override + public ConnectivityState getState(boolean requestConnection) { + if (this.isShutdown()) { + return ConnectivityState.SHUTDOWN; + } + + return this.channel.getState(requestConnection); + } + + @Override + public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) { + // The proxy has no insight into the underlying channel state changes, so we'll have to leak the abstraction + // a bit here and simply pass to the underlying channel, even though it will never transition to shutdown via + // the proxy. This should be fine, since it's mainly targeted at the FlightClient and there's no getter for + // the channel. + this.channel.notifyWhenStateChanged(source, callback); + } + + @Override + public void resetConnectBackoff() { + this.channel.resetConnectBackoff(); + } + + @Override + public void enterIdle() { + this.channel.enterIdle(); + } + } + + private FlightGrpcUtils() {} + + /** + * Creates a Flight service. + * @param allocator Memory allocator + * @param producer Specifies the service api + * @param authHandler Authentication handler + * @param executor Executor service + * @return FlightBindingService + */ + public static BindableService createFlightService(BufferAllocator allocator, FlightProducer producer, + ServerAuthHandler authHandler, ExecutorService executor) { + return new FlightBindingService(allocator, producer, authHandler, executor); + } + + /** + * Creates a Flight client. + * @param incomingAllocator Memory allocator + * @param channel provides a connection to a gRPC server. + */ + public static FlightClient createFlightClient(BufferAllocator incomingAllocator, ManagedChannel channel) { + return new FlightClient(incomingAllocator, channel, Collections.emptyList()); + } + + /** + * Creates a Flight client. + * @param incomingAllocator Memory allocator + * @param channel provides a connection to a gRPC server. Will not be closed on closure of the returned FlightClient. + */ + public static FlightClient createFlightClientWithSharedChannel( + BufferAllocator incomingAllocator, ManagedChannel channel) { + return new FlightClient(incomingAllocator, new NonClosingProxyManagedChannel(channel), Collections.emptyList()); + } +} diff --git a/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java new file mode 100644 index 000000000..142a0f937 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.arrow.flight.auth.ServerAuthHandler; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.protobuf.Empty; + +import io.grpc.BindableService; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; + +/** + * Unit test which adds 2 services to same server end point. + */ +public class TestFlightGrpcUtils { + private Server server; + private BufferAllocator allocator; + private String serverName; + + @Before + public void setup() throws IOException { + //Defines flight service + allocator = new RootAllocator(Integer.MAX_VALUE); + final NoOpFlightProducer producer = new NoOpFlightProducer(); + final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP; + final ExecutorService exec = Executors.newCachedThreadPool(); + final BindableService flightBindingService = FlightGrpcUtils.createFlightService(allocator, producer, + authHandler, exec); + + //initializes server with 2 services - FlightBindingService & TestService + serverName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(flightBindingService) + .addService(new TestServiceAdapter()) + .build(); + server.start(); + } + + @After + public void cleanup() { + server.shutdownNow(); + } + + /** + * This test checks if multiple gRPC services can be added to the same + * server endpoint and if they can be used by different clients via the same channel. + * @throws IOException If server fails to start. + */ + @Test + public void testMultipleGrpcServices() throws IOException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service + //to throw a RunTimeException + final FlightClient flightClient = FlightGrpcUtils.createFlightClient(allocator, managedChannel); + final Iterable actionTypes = flightClient.listActions(); + assertThrows(FlightRuntimeException.class, () -> actionTypes.forEach( + actionType -> System.out.println(actionType.toString()))); + + //Define Test client as a blocking stub and call test method which correctly returns an empty protobuf object + final TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(managedChannel); + Assert.assertEquals(Empty.newBuilder().build(), blockingStub.test(Empty.newBuilder().build())); + } + + @Test + public void testShutdown() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service + //to throw a RunTimeException + final FlightClient flightClient = FlightGrpcUtils.createFlightClientWithSharedChannel(allocator, managedChannel); + + // Should be a no-op. + flightClient.close(); + Assert.assertFalse(managedChannel.isShutdown()); + Assert.assertFalse(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false)); + managedChannel.shutdownNow(); + } + + @Test + public void testProxyChannel() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel = + new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel); + Assert.assertFalse(proxyChannel.isShutdown()); + Assert.assertFalse(proxyChannel.isTerminated()); + proxyChannel.shutdown(); + Assert.assertTrue(proxyChannel.isShutdown()); + Assert.assertTrue(proxyChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false)); + try { + proxyChannel.newCall(null, null); + Assert.fail(); + } catch (IllegalStateException e) { + // This is expected, since the proxy channel is shut down. + } + + Assert.assertFalse(managedChannel.isShutdown()); + Assert.assertFalse(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false)); + + managedChannel.shutdownNow(); + } + + @Test + public void testProxyChannelWithClosedChannel() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel = + new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel); + Assert.assertFalse(proxyChannel.isShutdown()); + Assert.assertFalse(proxyChannel.isTerminated()); + managedChannel.shutdownNow(); + Assert.assertTrue(proxyChannel.isShutdown()); + Assert.assertTrue(proxyChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false)); + try { + proxyChannel.newCall(null, null); + Assert.fail(); + } catch (IllegalStateException e) { + // This is expected, since the proxy channel is shut down. + } + + Assert.assertTrue(managedChannel.isShutdown()); + Assert.assertTrue(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, managedChannel.getState(false)); + } + + /** + * Private class used for testing purposes that overrides service behavior. + */ + private class TestServiceAdapter extends TestServiceGrpc.TestServiceImplBase { + + /** + * gRPC service that receives an empty object & returns and empty protobuf object. + * @param request google.protobuf.Empty + * @param responseObserver google.protobuf.Empty + */ + @Override + public void test(Empty request, StreamObserver responseObserver) { + responseObserver.onNext(Empty.newBuilder().build()); + responseObserver.onCompleted(); + } + } +} + diff --git a/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto new file mode 100644 index 000000000..6fa1890b2 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_package = "org.apache.arrow.flight"; + +import "google/protobuf/empty.proto"; + +service TestService { + rpc Test(google.protobuf.Empty) returns (google.protobuf.Empty) {} +} -- cgit v1.2.3