diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-07 18:45:59 +0000 |
commit | 19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch) | |
tree | 42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/lib/d/test/client_pool_test.d | |
parent | Initial commit. (diff) | |
download | ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip |
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/jaegertracing/thrift/lib/d/test/client_pool_test.d | 442 |
1 files changed, 442 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/d/test/client_pool_test.d b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d new file mode 100644 index 000000000..b24c97afd --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +module client_pool_test; + +import core.sync.semaphore : Semaphore; +import core.time : Duration, dur; +import core.thread : Thread; +import std.algorithm; +import std.array; +import std.conv; +import std.exception; +import std.getopt; +import std.range; +import std.stdio; +import std.typecons; +import std.variant : Variant; +import thrift.base; +import thrift.async.libevent; +import thrift.async.socket; +import thrift.codegen.base; +import thrift.codegen.async_client; +import thrift.codegen.async_client_pool; +import thrift.codegen.client; +import thrift.codegen.client_pool; +import thrift.codegen.processor; +import thrift.protocol.base; +import thrift.protocol.binary; +import thrift.server.base; +import thrift.server.simple; +import thrift.server.transport.socket; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.socket; +import thrift.util.cancellation; +import thrift.util.future; + +// We use this as our RPC-layer exception here to make sure socket/… problems +// (that would usually considered to be RPC layer faults) cause the tests to +// fail, even though we are testing the RPC exception handling. +class TestServiceException : TException { + int port; +} + +interface TestService { + int getPort(); + alias .TestServiceException TestServiceException; + enum methodMeta = [TMethodMeta("getPort", [], + [TExceptionMeta("a", 1, "TestServiceException")])]; +} + +// Use some derived service, just to check that the pools handle inheritance +// correctly. +interface ExTestService : TestService { + int[] getPortInArray(); + enum methodMeta = [TMethodMeta("getPortInArray", [], + [TExceptionMeta("a", 1, "TestServiceException")])]; +} + +class ExTestHandler : ExTestService { + this(ushort port, Duration delay, bool failing, bool trace) { + this.port = port; + this.delay = delay; + this.failing = failing; + this.trace = trace; + } + + override int getPort() { + if (trace) { + stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port, + delay, failing); + } + sleep(); + failIfEnabled(); + return port; + } + + override int[] getPortInArray() { + return [getPort()]; + } + + ushort port; + Duration delay; + bool failing; + bool trace; + +private: + void sleep() { + if (delay > dur!"hnsecs"(0)) Thread.sleep(delay); + } + + void failIfEnabled() { + if (!failing) return; + + auto e = new TestServiceException; + e.port = port; + throw e; + } +} + +class ServerPreServeHandler : TServerEventHandler { + this(Semaphore sem) { + sem_ = sem; + } + + override void preServe() { + sem_.notify(); + } + + Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } + void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} + void preProcess(Variant serverContext, TTransport transport) {} + +private: + Semaphore sem_; +} + +class ServerThread : Thread { + this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) { + super(&run); + handler_ = handler; + cancellation_ = cancellation; + serverHandler_ = serverHandler; + } +private: + void run() { + try { + auto protocolFactory = new TBinaryProtocolFactory!(); + auto processor = new TServiceProcessor!ExTestService(handler_); + auto serverTransport = new TServerSocket(handler_.port); + serverTransport.recvTimeout = dur!"seconds"(3); + auto transportFactory = new TBufferedTransportFactory; + + auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory); + server.eventHandler = serverHandler_; + server.serve(cancellation_); + } catch (Exception e) { + writefln("Server thread on port %s failed: %s", handler_.port, e); + } + } + + ExTestHandler handler_; + ServerPreServeHandler serverHandler_; + TCancellation cancellation_; +} + +void main(string[] args) { + bool trace; + ushort port = 9090; + getopt(args, "port", &port, "trace", &trace); + + auto serverCancellation = new TCancellationOrigin; + scope (exit) serverCancellation.trigger(); + + immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6))); + + // semaphore that will be incremented whenever each server thread has bound and started listening + Semaphore sem = new Semaphore(0); + +version (none) { + // Cannot use this due to multiple DMD @@BUG@@s: + // 1. »function D main is a nested function and cannot be accessed from array« + // when calling array() on the result of the outer map() – would have to + // manually do the eager evaluation/array conversion. + // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument, + // can be worked around by calling array() on the map result first. + // 3. Even when using the workarounds for the last two points, the DMD-built + // executable crashes when building without (sic!) inlining enabled, + // the backtrace points into the first delegate literal. + auto handlers = array(map!((args){ + return new ExTestHandler(args._0, args._1, args._2, trace); + })(zip( + ports, + map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]), + [false, false, false, true, true, true] + ))); +} else { + auto handlers = [ + new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace), + new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace), + new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace), + new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace), + new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace), + new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace) + ]; +} + + // Fire up the server threads. + foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start(); + + // wait until all the handlers signal that they're ready to serve + foreach (h; handlers) (sem.wait(dur!`seconds`(1))); + + syncClientPoolTest(ports, handlers); + asyncClientPoolTest(ports, handlers); + asyncFastestClientPoolTest(ports, handlers); + asyncAggregatorTest(ports, handlers); +} + + +void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto clients = array(map!((a){ + return cast(TClientBase!ExTestService)tClient!ExTestService( + tBinaryProtocol(new TSocket("127.0.0.1", a)) + ); + })(ports)); + + scope(exit) foreach (c; clients) c.outputProtocol.transport.close(); + + // Try the case where the first client succeeds. + { + enforce(makePool(clients).getPort() == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makePool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]); + enforce(pool.getPortInArray() == [ports[0]]); + } + + // Make sure a client is properly deactivated when it has failed too often. + { + auto pool = makePool(clients); + pool.faultDisableCount = 1; + pool.faultDisableDuration = dur!"msecs"(50); + + handlers[0].failing = true; + enforce(pool.getPort() == ports[1]); + + handlers[0].failing = false; + enforce(pool.getPort() == ports[1]); + + Thread.sleep(dur!"msecs"(50)); + enforce(pool.getPort() == ports[0]); + } +} + +auto makePool(TClientBase!ExTestService[] clients) { + auto p = tClientPool(clients); + p.permuteClients = false; + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + + +void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + // Try the case where the first client succeeds. + { + enforce(makeAsyncPool(clients).getPort() == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makeAsyncPool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]); + enforce(pool.getPortInArray() == [ports[0]]); + } + + // Make sure a client is properly deactivated when it has failed too often. + { + auto pool = makeAsyncPool(clients); + pool.faultDisableCount = 1; + pool.faultDisableDuration = dur!"msecs"(50); + + handlers[0].failing = true; + enforce(pool.getPort() == ports[1]); + + handlers[0].failing = false; + enforce(pool.getPort() == ports[1]); + + Thread.sleep(dur!"msecs"(50)); + enforce(pool.getPort() == ports[0]); + } +} + +auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) { + auto p = tAsyncClientPool(clients); + p.permuteClients = false; + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + +auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) { + // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads + // to »function D main is a nested function and cannot be accessed from array«. + // Thus, we manually do the array conversion. + auto lazyClients = map!((a){ + return new TAsyncClient!ExTestService( + new TAsyncSocket(manager, "127.0.0.1", a), + new TBufferedTransportFactory, + new TBinaryProtocolFactory!(TBufferedTransport) + ); + })(ports); + TAsyncClientBase!ExTestService[] clients; + foreach (c; lazyClients) clients ~= c; + return clients; +} + + +void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + // Make sure the fastest client wins, even if they are called in some other + // order. + { + auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet(); + enforce(result == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makeAsyncFastestPool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makeAsyncFastestPool(clients[1 .. $]); + enforce(pool.getPortInArray() == [ports[1]]); + } +} + +auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) { + auto p = tAsyncFastestClientPool(clients); + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + + +void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + auto aggregator = tAsyncAggregator( + cast(TAsyncClientBase!ExTestService[])clients); + + // Test aggregator range interface. + { + auto range = aggregator.getPort().range(dur!"msecs"(50)); + enforce(equal(range, ports[0 .. 2][])); + enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions), + ports[3 .. $ - 1])); + enforce(range.completedCount == 4); + } + + // Test default accumulator for scalars. + { + auto fullResult = aggregator.getPort().accumulate(); + enforce(fullResult.waitGet() == ports[0 .. 3]); + + auto partialResult = aggregator.getPort().accumulate(); + Thread.sleep(dur!"msecs"(20)); + enforce(partialResult.finishGet() == ports[0 .. 2]); + + } + + // Test default accumulator for arrays. + { + auto fullResult = aggregator.getPortInArray().accumulate(); + enforce(fullResult.waitGet() == ports[0 .. 3]); + + auto partialResult = aggregator.getPortInArray().accumulate(); + Thread.sleep(dur!"msecs"(20)); + enforce(partialResult.finishGet() == ports[0 .. 2]); + } + + // Test custom accumulator. + { + auto fullResult = aggregator.getPort().accumulate!(function(int[] results){ + return reduce!"a + b"(results); + })(); + enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]); + + auto partialResult = aggregator.getPort().accumulate!( + function(int[] results, Exception[] exceptions) { + // Return a tuple of the parameters so we can check them outside of + // this function (to verify the values, we need access to »ports«, but + // due to DMD @@BUG5710@@, we can't use a delegate literal).f + return tuple(results, exceptions); + } + )(); + Thread.sleep(dur!"msecs"(20)); + auto resultTuple = partialResult.finishGet(); + enforce(resultTuple[0] == ports[0 .. 2]); + enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]), + ports[3 .. $ - 1])); + } +} |