/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ module client_pool_test; import core.sync.semaphore : Semaphore; import core.time : Duration, dur; import core.thread : Thread; import std.algorithm; import std.array; import std.conv; import std.exception; import std.getopt; import std.range; import std.stdio; import std.typecons; import std.variant : Variant; import thrift.base; import thrift.async.libevent; import thrift.async.socket; import thrift.codegen.base; import thrift.codegen.async_client; import thrift.codegen.async_client_pool; import thrift.codegen.client; import thrift.codegen.client_pool; import thrift.codegen.processor; import thrift.protocol.base; import thrift.protocol.binary; import thrift.server.base; import thrift.server.simple; import thrift.server.transport.socket; import thrift.transport.base; import thrift.transport.buffered; import thrift.transport.socket; import thrift.util.cancellation; import thrift.util.future; // We use this as our RPC-layer exception here to make sure socket/… problems // (that would usually considered to be RPC layer faults) cause the tests to // fail, even though we are testing the RPC exception handling. class TestServiceException : TException { int port; } interface TestService { int getPort(); alias .TestServiceException TestServiceException; enum methodMeta = [TMethodMeta("getPort", [], [TExceptionMeta("a", 1, "TestServiceException")])]; } // Use some derived service, just to check that the pools handle inheritance // correctly. interface ExTestService : TestService { int[] getPortInArray(); enum methodMeta = [TMethodMeta("getPortInArray", [], [TExceptionMeta("a", 1, "TestServiceException")])]; } class ExTestHandler : ExTestService { this(ushort port, Duration delay, bool failing, bool trace) { this.port = port; this.delay = delay; this.failing = failing; this.trace = trace; } override int getPort() { if (trace) { stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port, delay, failing); } sleep(); failIfEnabled(); return port; } override int[] getPortInArray() { return [getPort()]; } ushort port; Duration delay; bool failing; bool trace; private: void sleep() { if (delay > dur!"hnsecs"(0)) Thread.sleep(delay); } void failIfEnabled() { if (!failing) return; auto e = new TestServiceException; e.port = port; throw e; } } class ServerPreServeHandler : TServerEventHandler { this(Semaphore sem) { sem_ = sem; } override void preServe() { sem_.notify(); } Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} void preProcess(Variant serverContext, TTransport transport) {} private: Semaphore sem_; } class ServerThread : Thread { this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) { super(&run); handler_ = handler; cancellation_ = cancellation; serverHandler_ = serverHandler; } private: void run() { try { auto protocolFactory = new TBinaryProtocolFactory!(); auto processor = new TServiceProcessor!ExTestService(handler_); auto serverTransport = new TServerSocket(handler_.port); serverTransport.recvTimeout = dur!"seconds"(3); auto transportFactory = new TBufferedTransportFactory; auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory); server.eventHandler = serverHandler_; server.serve(cancellation_); } catch (Exception e) { writefln("Server thread on port %s failed: %s", handler_.port, e); } } ExTestHandler handler_; ServerPreServeHandler serverHandler_; TCancellation cancellation_; } void main(string[] args) { bool trace; ushort port = 9090; getopt(args, "port", &port, "trace", &trace); auto serverCancellation = new TCancellationOrigin; scope (exit) serverCancellation.trigger(); immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6))); // semaphore that will be incremented whenever each server thread has bound and started listening Semaphore sem = new Semaphore(0); version (none) { // Cannot use this due to multiple DMD @@BUG@@s: // 1. »function D main is a nested function and cannot be accessed from array« // when calling array() on the result of the outer map() – would have to // manually do the eager evaluation/array conversion. // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument, // can be worked around by calling array() on the map result first. // 3. Even when using the workarounds for the last two points, the DMD-built // executable crashes when building without (sic!) inlining enabled, // the backtrace points into the first delegate literal. auto handlers = array(map!((args){ return new ExTestHandler(args._0, args._1, args._2, trace); })(zip( ports, map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]), [false, false, false, true, true, true] ))); } else { auto handlers = [ new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace), new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace), new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace), new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace), new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace), new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace) ]; } // Fire up the server threads. foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start(); // wait until all the handlers signal that they're ready to serve foreach (h; handlers) (sem.wait(dur!`seconds`(1))); syncClientPoolTest(ports, handlers); asyncClientPoolTest(ports, handlers); asyncFastestClientPoolTest(ports, handlers); asyncAggregatorTest(ports, handlers); } void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { auto clients = array(map!((a){ return cast(TClientBase!ExTestService)tClient!ExTestService( tBinaryProtocol(new TSocket("127.0.0.1", a)) ); })(ports)); scope(exit) foreach (c; clients) c.outputProtocol.transport.close(); // Try the case where the first client succeeds. { enforce(makePool(clients).getPort() == ports[0]); } // Try the case where all clients fail. { auto pool = makePool(clients[3 .. $]); auto e = cast(TCompoundOperationException)collectException(pool.getPort()); enforce(e); enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), ports[3 .. $])); } // Try the case where the first clients fail, but a later one succeeds. { auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]); enforce(pool.getPortInArray() == [ports[0]]); } // Make sure a client is properly deactivated when it has failed too often. { auto pool = makePool(clients); pool.faultDisableCount = 1; pool.faultDisableDuration = dur!"msecs"(50); handlers[0].failing = true; enforce(pool.getPort() == ports[1]); handlers[0].failing = false; enforce(pool.getPort() == ports[1]); Thread.sleep(dur!"msecs"(50)); enforce(pool.getPort() == ports[0]); } } auto makePool(TClientBase!ExTestService[] clients) { auto p = tClientPool(clients); p.permuteClients = false; p.rpcFaultFilter = (Exception e) { return (cast(TestServiceException)e !is null); }; return p; } void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { auto manager = new TLibeventAsyncManager; scope (exit) manager.stop(dur!"hnsecs"(0)); auto clients = makeAsyncClients(manager, ports); scope(exit) foreach (c; clients) c.transport.close(); // Try the case where the first client succeeds. { enforce(makeAsyncPool(clients).getPort() == ports[0]); } // Try the case where all clients fail. { auto pool = makeAsyncPool(clients[3 .. $]); auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); enforce(e); enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), ports[3 .. $])); } // Try the case where the first clients fail, but a later one succeeds. { auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]); enforce(pool.getPortInArray() == [ports[0]]); } // Make sure a client is properly deactivated when it has failed too often. { auto pool = makeAsyncPool(clients); pool.faultDisableCount = 1; pool.faultDisableDuration = dur!"msecs"(50); handlers[0].failing = true; enforce(pool.getPort() == ports[1]); handlers[0].failing = false; enforce(pool.getPort() == ports[1]); Thread.sleep(dur!"msecs"(50)); enforce(pool.getPort() == ports[0]); } } auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) { auto p = tAsyncClientPool(clients); p.permuteClients = false; p.rpcFaultFilter = (Exception e) { return (cast(TestServiceException)e !is null); }; return p; } auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) { // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads // to »function D main is a nested function and cannot be accessed from array«. // Thus, we manually do the array conversion. auto lazyClients = map!((a){ return new TAsyncClient!ExTestService( new TAsyncSocket(manager, "127.0.0.1", a), new TBufferedTransportFactory, new TBinaryProtocolFactory!(TBufferedTransport) ); })(ports); TAsyncClientBase!ExTestService[] clients; foreach (c; lazyClients) clients ~= c; return clients; } void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { auto manager = new TLibeventAsyncManager; scope (exit) manager.stop(dur!"hnsecs"(0)); auto clients = makeAsyncClients(manager, ports); scope(exit) foreach (c; clients) c.transport.close(); // Make sure the fastest client wins, even if they are called in some other // order. { auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet(); enforce(result == ports[0]); } // Try the case where all clients fail. { auto pool = makeAsyncFastestPool(clients[3 .. $]); auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); enforce(e); enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), ports[3 .. $])); } // Try the case where the first clients fail, but a later one succeeds. { auto pool = makeAsyncFastestPool(clients[1 .. $]); enforce(pool.getPortInArray() == [ports[1]]); } } auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) { auto p = tAsyncFastestClientPool(clients); p.rpcFaultFilter = (Exception e) { return (cast(TestServiceException)e !is null); }; return p; } void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) { auto manager = new TLibeventAsyncManager; scope (exit) manager.stop(dur!"hnsecs"(0)); auto clients = makeAsyncClients(manager, ports); scope(exit) foreach (c; clients) c.transport.close(); auto aggregator = tAsyncAggregator( cast(TAsyncClientBase!ExTestService[])clients); // Test aggregator range interface. { auto range = aggregator.getPort().range(dur!"msecs"(50)); enforce(equal(range, ports[0 .. 2][])); enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions), ports[3 .. $ - 1])); enforce(range.completedCount == 4); } // Test default accumulator for scalars. { auto fullResult = aggregator.getPort().accumulate(); enforce(fullResult.waitGet() == ports[0 .. 3]); auto partialResult = aggregator.getPort().accumulate(); Thread.sleep(dur!"msecs"(20)); enforce(partialResult.finishGet() == ports[0 .. 2]); } // Test default accumulator for arrays. { auto fullResult = aggregator.getPortInArray().accumulate(); enforce(fullResult.waitGet() == ports[0 .. 3]); auto partialResult = aggregator.getPortInArray().accumulate(); Thread.sleep(dur!"msecs"(20)); enforce(partialResult.finishGet() == ports[0 .. 2]); } // Test custom accumulator. { auto fullResult = aggregator.getPort().accumulate!(function(int[] results){ return reduce!"a + b"(results); })(); enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]); auto partialResult = aggregator.getPort().accumulate!( function(int[] results, Exception[] exceptions) { // Return a tuple of the parameters so we can check them outside of // this function (to verify the values, we need access to »ports«, but // due to DMD @@BUG5710@@, we can't use a delegate literal).f return tuple(results, exceptions); } )(); Thread.sleep(dur!"msecs"(20)); auto resultTuple = partialResult.finishGet(); enforce(resultTuple[0] == ports[0 .. 2]); enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]), ports[3 .. $ - 1])); } }