diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/arrow/java/flight/flight-grpc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/arrow/java/flight/flight-grpc')
4 files changed, 512 insertions, 0 deletions
diff --git a/src/arrow/java/flight/flight-grpc/pom.xml b/src/arrow/java/flight/flight-grpc/pom.xml new file mode 100644 index 000000000..1968484a1 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/pom.xml @@ -0,0 +1,132 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>arrow-java-root</artifactId> + <groupId>org.apache.arrow</groupId> + <version>6.0.1</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flight-grpc</artifactId> + <name>Arrow Flight GRPC</name> + <description>(Experimental)Contains utility class to expose Flight gRPC service and client</description> + <packaging>jar</packaging> + + <properties> + <dep.grpc.version>1.41.0</dep.grpc.version> + <dep.protobuf.version>3.7.1</dep.protobuf.version> + <forkCount>1</forkCount> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>flight-core</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-unix-common</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-kqueue</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-core</artifactId> + <version>${dep.grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${dep.grpc.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-core</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-netty</artifactId> + <version>${project.version}</version> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${dep.grpc.version}</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${dep.protobuf.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-api</artifactId> + <version>${dep.grpc.version}</version> + </dependency> + </dependencies> + + <build> + <extensions> + <!-- provides os.detected.classifier (i.e. linux-x86_64, osx-x86_64) property --> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> + <plugin> + <groupId>org.xolstice.maven.plugins</groupId> + <artifactId>protobuf-maven-plugin</artifactId> + <version>0.5.0</version> + <configuration> + <protocArtifact>com.google.protobuf:protoc:${dep.protobuf.version}:exe:${os.detected.classifier}</protocArtifact> + <clearOutputDirectory>false</clearOutputDirectory> + <pluginId>grpc-java</pluginId> + <pluginArtifact>io.grpc:protoc-gen-grpc-java:${dep.grpc.version}:exe:${os.detected.classifier}</pluginArtifact> + </configuration> + <executions> + <execution> + <id>test</id> + <configuration> + <protoSourceRoot>${basedir}/src/test/protobuf</protoSourceRoot> + <outputDirectory>${project.build.directory}/generated-test-sources//protobuf</outputDirectory> + </configuration> + <goals> + <goal>compile</goal> + <goal>compile-custom</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java new file mode 100644 index 000000000..eb5e492b4 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/main/java/org/apache/arrow/flight/FlightGrpcUtils.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.arrow.flight.auth.ServerAuthHandler; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.VisibleForTesting; + +import io.grpc.BindableService; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; + +/** + * Exposes Flight GRPC service & client. + */ +public class FlightGrpcUtils { + /** + * Proxy class for ManagedChannel that makes closure a no-op. + */ + @VisibleForTesting + static class NonClosingProxyManagedChannel extends ManagedChannel { + private final ManagedChannel channel; + private boolean isShutdown; + + NonClosingProxyManagedChannel(ManagedChannel channel) { + this.channel = channel; + this.isShutdown = channel.isShutdown(); + } + + @Override + public ManagedChannel shutdown() { + isShutdown = true; + return this; + } + + @Override + public boolean isShutdown() { + if (this.channel.isShutdown()) { + // If the underlying channel is shut down, ensure we're updated to match. + shutdown(); + } + return isShutdown; + } + + @Override + public boolean isTerminated() { + return this.isShutdown(); + } + + @Override + public ManagedChannel shutdownNow() { + return shutdown(); + } + + @Override + public boolean awaitTermination(long l, TimeUnit timeUnit) { + // Don't actually await termination, since it'll be a no-op, so simply return whether or not + // the channel has been shut down already. + return this.isShutdown(); + } + + @Override + public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall( + MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { + if (this.isShutdown()) { + throw new IllegalStateException("Channel has been shut down."); + } + + return this.channel.newCall(methodDescriptor, callOptions); + } + + @Override + public String authority() { + return this.channel.authority(); + } + + @Override + public ConnectivityState getState(boolean requestConnection) { + if (this.isShutdown()) { + return ConnectivityState.SHUTDOWN; + } + + return this.channel.getState(requestConnection); + } + + @Override + public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) { + // The proxy has no insight into the underlying channel state changes, so we'll have to leak the abstraction + // a bit here and simply pass to the underlying channel, even though it will never transition to shutdown via + // the proxy. This should be fine, since it's mainly targeted at the FlightClient and there's no getter for + // the channel. + this.channel.notifyWhenStateChanged(source, callback); + } + + @Override + public void resetConnectBackoff() { + this.channel.resetConnectBackoff(); + } + + @Override + public void enterIdle() { + this.channel.enterIdle(); + } + } + + private FlightGrpcUtils() {} + + /** + * Creates a Flight service. + * @param allocator Memory allocator + * @param producer Specifies the service api + * @param authHandler Authentication handler + * @param executor Executor service + * @return FlightBindingService + */ + public static BindableService createFlightService(BufferAllocator allocator, FlightProducer producer, + ServerAuthHandler authHandler, ExecutorService executor) { + return new FlightBindingService(allocator, producer, authHandler, executor); + } + + /** + * Creates a Flight client. + * @param incomingAllocator Memory allocator + * @param channel provides a connection to a gRPC server. + */ + public static FlightClient createFlightClient(BufferAllocator incomingAllocator, ManagedChannel channel) { + return new FlightClient(incomingAllocator, channel, Collections.emptyList()); + } + + /** + * Creates a Flight client. + * @param incomingAllocator Memory allocator + * @param channel provides a connection to a gRPC server. Will not be closed on closure of the returned FlightClient. + */ + public static FlightClient createFlightClientWithSharedChannel( + BufferAllocator incomingAllocator, ManagedChannel channel) { + return new FlightClient(incomingAllocator, new NonClosingProxyManagedChannel(channel), Collections.emptyList()); + } +} diff --git a/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java new file mode 100644 index 000000000..142a0f937 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/test/java/org/apache/arrow/flight/TestFlightGrpcUtils.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.arrow.flight.auth.ServerAuthHandler; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.protobuf.Empty; + +import io.grpc.BindableService; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; + +/** + * Unit test which adds 2 services to same server end point. + */ +public class TestFlightGrpcUtils { + private Server server; + private BufferAllocator allocator; + private String serverName; + + @Before + public void setup() throws IOException { + //Defines flight service + allocator = new RootAllocator(Integer.MAX_VALUE); + final NoOpFlightProducer producer = new NoOpFlightProducer(); + final ServerAuthHandler authHandler = ServerAuthHandler.NO_OP; + final ExecutorService exec = Executors.newCachedThreadPool(); + final BindableService flightBindingService = FlightGrpcUtils.createFlightService(allocator, producer, + authHandler, exec); + + //initializes server with 2 services - FlightBindingService & TestService + serverName = InProcessServerBuilder.generateName(); + server = InProcessServerBuilder.forName(serverName) + .directExecutor() + .addService(flightBindingService) + .addService(new TestServiceAdapter()) + .build(); + server.start(); + } + + @After + public void cleanup() { + server.shutdownNow(); + } + + /** + * This test checks if multiple gRPC services can be added to the same + * server endpoint and if they can be used by different clients via the same channel. + * @throws IOException If server fails to start. + */ + @Test + public void testMultipleGrpcServices() throws IOException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service + //to throw a RunTimeException + final FlightClient flightClient = FlightGrpcUtils.createFlightClient(allocator, managedChannel); + final Iterable<ActionType> actionTypes = flightClient.listActions(); + assertThrows(FlightRuntimeException.class, () -> actionTypes.forEach( + actionType -> System.out.println(actionType.toString()))); + + //Define Test client as a blocking stub and call test method which correctly returns an empty protobuf object + final TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(managedChannel); + Assert.assertEquals(Empty.newBuilder().build(), blockingStub.test(Empty.newBuilder().build())); + } + + @Test + public void testShutdown() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + //Defines flight client and calls service method. Since we use a NoOpFlightProducer we expect the service + //to throw a RunTimeException + final FlightClient flightClient = FlightGrpcUtils.createFlightClientWithSharedChannel(allocator, managedChannel); + + // Should be a no-op. + flightClient.close(); + Assert.assertFalse(managedChannel.isShutdown()); + Assert.assertFalse(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false)); + managedChannel.shutdownNow(); + } + + @Test + public void testProxyChannel() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel = + new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel); + Assert.assertFalse(proxyChannel.isShutdown()); + Assert.assertFalse(proxyChannel.isTerminated()); + proxyChannel.shutdown(); + Assert.assertTrue(proxyChannel.isShutdown()); + Assert.assertTrue(proxyChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false)); + try { + proxyChannel.newCall(null, null); + Assert.fail(); + } catch (IllegalStateException e) { + // This is expected, since the proxy channel is shut down. + } + + Assert.assertFalse(managedChannel.isShutdown()); + Assert.assertFalse(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.IDLE, managedChannel.getState(false)); + + managedChannel.shutdownNow(); + } + + @Test + public void testProxyChannelWithClosedChannel() throws IOException, InterruptedException { + //Initializes channel so that multiple clients can communicate with server + final ManagedChannel managedChannel = InProcessChannelBuilder.forName(serverName) + .directExecutor() + .build(); + + final FlightGrpcUtils.NonClosingProxyManagedChannel proxyChannel = + new FlightGrpcUtils.NonClosingProxyManagedChannel(managedChannel); + Assert.assertFalse(proxyChannel.isShutdown()); + Assert.assertFalse(proxyChannel.isTerminated()); + managedChannel.shutdownNow(); + Assert.assertTrue(proxyChannel.isShutdown()); + Assert.assertTrue(proxyChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, proxyChannel.getState(false)); + try { + proxyChannel.newCall(null, null); + Assert.fail(); + } catch (IllegalStateException e) { + // This is expected, since the proxy channel is shut down. + } + + Assert.assertTrue(managedChannel.isShutdown()); + Assert.assertTrue(managedChannel.isTerminated()); + Assert.assertEquals(ConnectivityState.SHUTDOWN, managedChannel.getState(false)); + } + + /** + * Private class used for testing purposes that overrides service behavior. + */ + private class TestServiceAdapter extends TestServiceGrpc.TestServiceImplBase { + + /** + * gRPC service that receives an empty object & returns and empty protobuf object. + * @param request google.protobuf.Empty + * @param responseObserver google.protobuf.Empty + */ + @Override + public void test(Empty request, StreamObserver<Empty> responseObserver) { + responseObserver.onNext(Empty.newBuilder().build()); + responseObserver.onCompleted(); + } + } +} + diff --git a/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto new file mode 100644 index 000000000..6fa1890b2 --- /dev/null +++ b/src/arrow/java/flight/flight-grpc/src/test/protobuf/test.proto @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_package = "org.apache.arrow.flight"; + +import "google/protobuf/empty.proto"; + +service TestService { + rpc Test(google.protobuf.Empty) returns (google.protobuf.Empty) {} +} |