summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/d/test/client_pool_test.d
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/d/test/client_pool_test.d')
-rw-r--r--src/jaegertracing/thrift/lib/d/test/client_pool_test.d442
1 files changed, 442 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/d/test/client_pool_test.d b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d
new file mode 100644
index 000000000..b24c97afd
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module client_pool_test;
+
+import core.sync.semaphore : Semaphore;
+import core.time : Duration, dur;
+import core.thread : Thread;
+import std.algorithm;
+import std.array;
+import std.conv;
+import std.exception;
+import std.getopt;
+import std.range;
+import std.stdio;
+import std.typecons;
+import std.variant : Variant;
+import thrift.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.codegen.base;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.client;
+import thrift.codegen.client_pool;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.socket;
+import thrift.util.cancellation;
+import thrift.util.future;
+
+// We use this as our RPC-layer exception here to make sure socket/… problems
+// (that would usually considered to be RPC layer faults) cause the tests to
+// fail, even though we are testing the RPC exception handling.
+class TestServiceException : TException {
+ int port;
+}
+
+interface TestService {
+ int getPort();
+ alias .TestServiceException TestServiceException;
+ enum methodMeta = [TMethodMeta("getPort", [],
+ [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+// Use some derived service, just to check that the pools handle inheritance
+// correctly.
+interface ExTestService : TestService {
+ int[] getPortInArray();
+ enum methodMeta = [TMethodMeta("getPortInArray", [],
+ [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+class ExTestHandler : ExTestService {
+ this(ushort port, Duration delay, bool failing, bool trace) {
+ this.port = port;
+ this.delay = delay;
+ this.failing = failing;
+ this.trace = trace;
+ }
+
+ override int getPort() {
+ if (trace) {
+ stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
+ delay, failing);
+ }
+ sleep();
+ failIfEnabled();
+ return port;
+ }
+
+ override int[] getPortInArray() {
+ return [getPort()];
+ }
+
+ ushort port;
+ Duration delay;
+ bool failing;
+ bool trace;
+
+private:
+ void sleep() {
+ if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
+ }
+
+ void failIfEnabled() {
+ if (!failing) return;
+
+ auto e = new TestServiceException;
+ e.port = port;
+ throw e;
+ }
+}
+
+class ServerPreServeHandler : TServerEventHandler {
+ this(Semaphore sem) {
+ sem_ = sem;
+ }
+
+ override void preServe() {
+ sem_.notify();
+ }
+
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Semaphore sem_;
+}
+
+class ServerThread : Thread {
+ this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
+ super(&run);
+ handler_ = handler;
+ cancellation_ = cancellation;
+ serverHandler_ = serverHandler;
+ }
+private:
+ void run() {
+ try {
+ auto protocolFactory = new TBinaryProtocolFactory!();
+ auto processor = new TServiceProcessor!ExTestService(handler_);
+ auto serverTransport = new TServerSocket(handler_.port);
+ serverTransport.recvTimeout = dur!"seconds"(3);
+ auto transportFactory = new TBufferedTransportFactory;
+
+ auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
+ server.eventHandler = serverHandler_;
+ server.serve(cancellation_);
+ } catch (Exception e) {
+ writefln("Server thread on port %s failed: %s", handler_.port, e);
+ }
+ }
+
+ ExTestHandler handler_;
+ ServerPreServeHandler serverHandler_;
+ TCancellation cancellation_;
+}
+
+void main(string[] args) {
+ bool trace;
+ ushort port = 9090;
+ getopt(args, "port", &port, "trace", &trace);
+
+ auto serverCancellation = new TCancellationOrigin;
+ scope (exit) serverCancellation.trigger();
+
+ immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+
+ // semaphore that will be incremented whenever each server thread has bound and started listening
+ Semaphore sem = new Semaphore(0);
+
+version (none) {
+ // Cannot use this due to multiple DMD @@BUG@@s:
+ // 1. »function D main is a nested function and cannot be accessed from array«
+ // when calling array() on the result of the outer map() – would have to
+ // manually do the eager evaluation/array conversion.
+ // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
+ // can be worked around by calling array() on the map result first.
+ // 3. Even when using the workarounds for the last two points, the DMD-built
+ // executable crashes when building without (sic!) inlining enabled,
+ // the backtrace points into the first delegate literal.
+ auto handlers = array(map!((args){
+ return new ExTestHandler(args._0, args._1, args._2, trace);
+ })(zip(
+ ports,
+ map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
+ [false, false, false, true, true, true]
+ )));
+} else {
+ auto handlers = [
+ new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
+ new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
+ new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
+ new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
+ new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
+ new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
+ ];
+}
+
+ // Fire up the server threads.
+ foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
+
+ // wait until all the handlers signal that they're ready to serve
+ foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
+
+ syncClientPoolTest(ports, handlers);
+ asyncClientPoolTest(ports, handlers);
+ asyncFastestClientPoolTest(ports, handlers);
+ asyncAggregatorTest(ports, handlers);
+}
+
+
+void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto clients = array(map!((a){
+ return cast(TClientBase!ExTestService)tClient!ExTestService(
+ tBinaryProtocol(new TSocket("127.0.0.1", a))
+ );
+ })(ports));
+
+ scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
+
+ // Try the case where the first client succeeds.
+ {
+ enforce(makePool(clients).getPort() == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makePool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
+ enforce(pool.getPortInArray() == [ports[0]]);
+ }
+
+ // Make sure a client is properly deactivated when it has failed too often.
+ {
+ auto pool = makePool(clients);
+ pool.faultDisableCount = 1;
+ pool.faultDisableDuration = dur!"msecs"(50);
+
+ handlers[0].failing = true;
+ enforce(pool.getPort() == ports[1]);
+
+ handlers[0].failing = false;
+ enforce(pool.getPort() == ports[1]);
+
+ Thread.sleep(dur!"msecs"(50));
+ enforce(pool.getPort() == ports[0]);
+ }
+}
+
+auto makePool(TClientBase!ExTestService[] clients) {
+ auto p = tClientPool(clients);
+ p.permuteClients = false;
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+
+void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ // Try the case where the first client succeeds.
+ {
+ enforce(makeAsyncPool(clients).getPort() == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makeAsyncPool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
+ enforce(pool.getPortInArray() == [ports[0]]);
+ }
+
+ // Make sure a client is properly deactivated when it has failed too often.
+ {
+ auto pool = makeAsyncPool(clients);
+ pool.faultDisableCount = 1;
+ pool.faultDisableDuration = dur!"msecs"(50);
+
+ handlers[0].failing = true;
+ enforce(pool.getPort() == ports[1]);
+
+ handlers[0].failing = false;
+ enforce(pool.getPort() == ports[1]);
+
+ Thread.sleep(dur!"msecs"(50));
+ enforce(pool.getPort() == ports[0]);
+ }
+}
+
+auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
+ auto p = tAsyncClientPool(clients);
+ p.permuteClients = false;
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
+ // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
+ // to »function D main is a nested function and cannot be accessed from array«.
+ // Thus, we manually do the array conversion.
+ auto lazyClients = map!((a){
+ return new TAsyncClient!ExTestService(
+ new TAsyncSocket(manager, "127.0.0.1", a),
+ new TBufferedTransportFactory,
+ new TBinaryProtocolFactory!(TBufferedTransport)
+ );
+ })(ports);
+ TAsyncClientBase!ExTestService[] clients;
+ foreach (c; lazyClients) clients ~= c;
+ return clients;
+}
+
+
+void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ // Make sure the fastest client wins, even if they are called in some other
+ // order.
+ {
+ auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
+ enforce(result == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makeAsyncFastestPool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makeAsyncFastestPool(clients[1 .. $]);
+ enforce(pool.getPortInArray() == [ports[1]]);
+ }
+}
+
+auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
+ auto p = tAsyncFastestClientPool(clients);
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+
+void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ auto aggregator = tAsyncAggregator(
+ cast(TAsyncClientBase!ExTestService[])clients);
+
+ // Test aggregator range interface.
+ {
+ auto range = aggregator.getPort().range(dur!"msecs"(50));
+ enforce(equal(range, ports[0 .. 2][]));
+ enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
+ ports[3 .. $ - 1]));
+ enforce(range.completedCount == 4);
+ }
+
+ // Test default accumulator for scalars.
+ {
+ auto fullResult = aggregator.getPort().accumulate();
+ enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+ auto partialResult = aggregator.getPort().accumulate();
+ Thread.sleep(dur!"msecs"(20));
+ enforce(partialResult.finishGet() == ports[0 .. 2]);
+
+ }
+
+ // Test default accumulator for arrays.
+ {
+ auto fullResult = aggregator.getPortInArray().accumulate();
+ enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+ auto partialResult = aggregator.getPortInArray().accumulate();
+ Thread.sleep(dur!"msecs"(20));
+ enforce(partialResult.finishGet() == ports[0 .. 2]);
+ }
+
+ // Test custom accumulator.
+ {
+ auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
+ return reduce!"a + b"(results);
+ })();
+ enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
+
+ auto partialResult = aggregator.getPort().accumulate!(
+ function(int[] results, Exception[] exceptions) {
+ // Return a tuple of the parameters so we can check them outside of
+ // this function (to verify the values, we need access to »ports«, but
+ // due to DMD @@BUG5710@@, we can't use a delegate literal).f
+ return tuple(results, exceptions);
+ }
+ )();
+ Thread.sleep(dur!"msecs"(20));
+ auto resultTuple = partialResult.finishGet();
+ enforce(resultTuple[0] == ports[0 .. 2]);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),
+ ports[3 .. $ - 1]));
+ }
+}