diff options
Diffstat (limited to 'src/jaegertracing/thrift/lib/d/test')
12 files changed, 2939 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/d/test/Makefile.am b/src/jaegertracing/thrift/lib/d/test/Makefile.am new file mode 100755 index 000000000..5ec8255bb --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/Makefile.am @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +AUTOMAKE_OPTIONS = serial-tests + +# Thrift compiler rules + +debug_proto_gen = $(addprefix gen-d/, DebugProtoTest_types.d) + +$(debug_proto_gen): $(top_srcdir)/test/DebugProtoTest.thrift + $(THRIFT) --gen d -nowarn $< + +stress_test_gen = $(addprefix gen-d/thrift/test/stress/, Service.d \ + StressTest_types.d) + +$(stress_test_gen): $(top_srcdir)/test/StressTest.thrift + $(THRIFT) --gen d $< + +thrift_test_gen = $(addprefix gen-d/thrift/test/, SecondService.d \ + ThriftTest.d ThriftTest_constants.d ThriftTest_types.d) + +$(thrift_test_gen): $(top_srcdir)/test/ThriftTest.thrift + $(THRIFT) --gen d $< + + +# The actual test targets. +# There just must be some way to reassign a variable without warnings in +# Automake... +targets__ = async_test client_pool_test serialization_benchmark \ + stress_test_server thrift_test_client thrift_test_server transport_test +ran_tests__ = client_pool_test \ + transport_test \ + async_test_runner.sh \ + thrift_test_runner.sh + +libevent_dependent_targets = async_test_client client_pool_test \ + stress_test_server thrift_test_server +libevent_dependent_ran_tests = client_pool_test async_test_runner.sh thrift_test_runner.sh + +openssl_dependent_targets = async_test thrift_test_client thrift_test_server +openssl_dependent_ran_tests = async_test_runner.sh thrift_test_runner.sh + +d_test_flags = + +if WITH_D_EVENT_TESTS +d_test_flags += $(DMD_LIBEVENT_FLAGS) ../$(D_EVENT_LIB_NAME) +targets_ = $(targets__) +ran_tests_ = $(ran_tests__) +else +targets_ = $(filter-out $(libevent_dependent_targets), $(targets__)) +ran_tests_ = $(filter-out $(libevent_dependent_ran_tests), $(ran_tests__)) +endif + +if WITH_D_SSL_TESTS +d_test_flags += $(DMD_OPENSSL_FLAGS) ../$(D_SSL_LIB_NAME) +targets = $(targets_) +ran_tests = $(ran_tests_) +else +targets = $(filter-out $(openssl_dependent_targets), $(targets_)) +ran_tests = $(filter-out $(openssl_dependent_ran_tests), $(ran_tests_)) +endif + +d_test_flags += -w -wi -O -release -inline -I$(top_srcdir)/lib/d/src -Igen-d \ + $(top_builddir)/lib/d/$(D_LIB_NAME) + + +async_test client_pool_test transport_test: %: %.d + $(DMD) $(d_test_flags) -of$@ $^ + +serialization_benchmark: %: %.d $(debug_proto_gen) + $(DMD) $(d_test_flags) -of$@ $^ + +stress_test_server: %: %.d test_utils.d $(stress_test_gen) + $(DMD) $(d_test_flags) -of$@ $^ + +thrift_test_client: %: %.d thrift_test_common.d $(thrift_test_gen) + $(DMD) $(d_test_flags) -of$@ $^ + +thrift_test_server: %: %.d thrift_test_common.d test_utils.d $(thrift_test_gen) + $(DMD) $(d_test_flags) -of$@ $^ + + +check-local: $(targets) + +clean-local: + $(RM) -rf gen-d $(targets) $(addsuffix .o, $(targets)) + + +# Tests ran as part of make check. + +async_test_runner.sh: async_test +thrift_test_runner.sh: thrift_test_client thrift_test_server + +TESTS = $(ran_tests) + +precross: $(targets) diff --git a/src/jaegertracing/thrift/lib/d/test/async_test.d b/src/jaegertracing/thrift/lib/d/test/async_test.d new file mode 100644 index 000000000..51529ba86 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/async_test.d @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless enforced by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +module async_test; + +import core.atomic; +import core.sync.condition : Condition; +import core.sync.mutex : Mutex; +import core.thread : dur, Thread, ThreadGroup; +import std.conv : text; +import std.datetime; +import std.getopt; +import std.exception : collectException, enforce; +import std.parallelism : TaskPool; +import std.stdio; +import std.string; +import std.variant : Variant; +import thrift.base; +import thrift.async.base; +import thrift.async.libevent; +import thrift.async.socket; +import thrift.async.ssl; +import thrift.codegen.async_client; +import thrift.codegen.async_client_pool; +import thrift.codegen.base; +import thrift.codegen.processor; +import thrift.protocol.base; +import thrift.protocol.binary; +import thrift.server.base; +import thrift.server.simple; +import thrift.server.transport.socket; +import thrift.server.transport.ssl; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.ssl; +import thrift.util.cancellation; + +version (Posix) { + import core.stdc.signal; + import core.sys.posix.signal; + + // Disable SIGPIPE because SSL server will write to broken socket after + // client disconnected (see TSSLSocket docs). + shared static this() { + signal(SIGPIPE, SIG_IGN); + } +} + +interface AsyncTest { + string echo(string value); + string delayedEcho(string value, long milliseconds); + + void fail(string reason); + void delayedFail(string reason, long milliseconds); + + enum methodMeta = [ + TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]), + TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]) + ]; + alias .AsyncTestException AsyncTestException; +} + +class AsyncTestException : TException { + string reason; + mixin TStructHelpers!(); +} + +void main(string[] args) { + ushort port = 9090; + ushort managerCount = 2; + ushort serversPerManager = 5; + ushort threadsPerServer = 10; + uint iterations = 10; + bool ssl; + bool trace; + + getopt(args, + "iterations", &iterations, + "managers", &managerCount, + "port", &port, + "servers-per-manager", &serversPerManager, + "ssl", &ssl, + "threads-per-server", &threadsPerServer, + "trace", &trace, + ); + + TTransportFactory clientTransportFactory; + TSSLContext serverSSLContext; + if (ssl) { + auto clientSSLContext = new TSSLContext(); + with (clientSSLContext) { + authenticate = true; + ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; + loadTrustedCertificates("../../../test/keys/CA.pem"); + } + clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext); + + serverSSLContext = new TSSLContext(); + with (serverSSLContext) { + serverSide = true; + ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; + loadCertificate("../../../test/keys/server.crt"); + loadPrivateKey("../../../test/keys/server.key"); + } + } else { + clientTransportFactory = new TBufferedTransportFactory; + } + + + auto serverCancel = new TCancellationOrigin; + scope(exit) { + writeln("Triggering server shutdown..."); + serverCancel.trigger(); + writeln("done."); + } + + auto managers = new TLibeventAsyncManager[managerCount]; + scope (exit) foreach (ref m; managers) destroy(m); + + auto clientsThreads = new ThreadGroup; + foreach (managerIndex, ref manager; managers) { + manager = new TLibeventAsyncManager; + foreach (serverIndex; 0 .. serversPerManager) { + auto currentPort = cast(ushort) + (port + managerIndex * serversPerManager + serverIndex); + + // Start the server and wait until it is up and running. + auto servingMutex = new Mutex; + auto servingCondition = new Condition(servingMutex); + auto handler = new PreServeNotifyHandler(servingMutex, servingCondition); + synchronized (servingMutex) { + (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace, + serverCancel, handler)).start(); + servingCondition.wait(); + } + + // We only run the timing tests for the first server on each async + // manager, so that we don't get spurious timing errors becaue of + // ordering issues. + auto runTimingTests = (serverIndex == 0); + + auto c = new ClientsThread(manager, currentPort, clientTransportFactory, + threadsPerServer, iterations, runTimingTests, trace); + clientsThreads.add(c); + c.start(); + } + } + clientsThreads.joinAll(); +} + +class AsyncTestHandler : AsyncTest { + this(bool trace) { + trace_ = trace; + } + + override string echo(string value) { + if (trace_) writefln(`echo("%s")`, value); + return value; + } + + override string delayedEcho(string value, long milliseconds) { + if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds); + Thread.sleep(dur!"msecs"(milliseconds)); + if (trace_) writeln("returning."); + + return value; + } + + override void fail(string reason) { + if (trace_) writefln(`fail("%s")`, reason); + auto ate = new AsyncTestException; + ate.reason = reason; + throw ate; + } + + override void delayedFail(string reason, long milliseconds) { + if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds); + Thread.sleep(dur!"msecs"(milliseconds)); + if (trace_) writeln("returning."); + + auto ate = new AsyncTestException; + ate.reason = reason; + throw ate; + } + +private: + bool trace_; + AsyncTestException ate_; +} + +class PreServeNotifyHandler : TServerEventHandler { + this(Mutex servingMutex, Condition servingCondition) { + servingMutex_ = servingMutex; + servingCondition_ = servingCondition; + } + + void preServe() { + synchronized (servingMutex_) { + servingCondition_.notifyAll(); + } + } + Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } + void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} + void preProcess(Variant serverContext, TTransport transport) {} + +private: + Mutex servingMutex_; + Condition servingCondition_; +} + +class ServerThread(ServerType) : Thread { + this(ushort port, TSSLContext sslContext, bool trace, + TCancellation cancellation, TServerEventHandler eventHandler + ) { + port_ = port; + sslContext_ = sslContext; + trace_ = trace; + cancellation_ = cancellation; + eventHandler_ = eventHandler; + + super(&run); + } + + void run() { + TServerSocket serverSocket; + if (sslContext_) { + serverSocket = new TSSLServerSocket(port_, sslContext_); + } else { + serverSocket = new TServerSocket(port_); + } + auto transportFactory = new TBufferedTransportFactory; + auto protocolFactory = new TBinaryProtocolFactory!(); + auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_)); + + auto server = new ServerType(processor, serverSocket, transportFactory, + protocolFactory); + server.eventHandler = eventHandler_; + + writefln("Starting server on port %s...", port_); + server.serve(cancellation_); + writefln("Server thread on port %s done.", port_); + } + +private: + ushort port_; + bool trace_; + TCancellation cancellation_; + TSSLContext sslContext_; + TServerEventHandler eventHandler_; +} + +class ClientsThread : Thread { + this(TAsyncSocketManager manager, ushort port, TTransportFactory tf, + ushort threads, uint iterations, bool runTimingTests, bool trace + ) { + manager_ = manager; + port_ = port; + transportFactory_ = tf; + threads_ = threads; + iterations_ = iterations; + runTimingTests_ = runTimingTests; + trace_ = trace; + super(&run); + } + + void run() { + auto transport = new TAsyncSocket(manager_, "localhost", port_); + + { + auto client = new TAsyncClient!AsyncTest( + transport, + transportFactory_, + new TBinaryProtocolFactory!() + ); + transport.open(); + auto clientThreads = new ThreadGroup; + foreach (clientId; 0 .. threads_) { + clientThreads.create({ + auto c = clientId; + return { + foreach (i; 0 .. iterations_) { + immutable id = text(port_, ":", c, ":", i); + + { + if (trace_) writefln(`Calling echo("%s")... `, id); + auto a = client.echo(id); + enforce(a == id); + if (trace_) writefln(`echo("%s") done.`, id); + } + + { + if (trace_) writefln(`Calling fail("%s")... `, id); + auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet()); + enforce(a && a.reason == id); + if (trace_) writefln(`fail("%s") done.`, id); + } + } + }; + }()); + } + clientThreads.joinAll(); + transport.close(); + } + + if (runTimingTests_) { + auto client = new TAsyncClient!AsyncTest( + transport, + transportFactory_, + new TBinaryProtocolFactory!TBufferedTransport + ); + + // Temporarily redirect error logs to stdout, as SSL errors on the server + // side are expected when the client terminates aburptly (as is the case + // in the timeout test). + auto oldErrorLogSink = g_errorLogSink; + g_errorLogSink = g_infoLogSink; + scope (exit) g_errorLogSink = oldErrorLogSink; + + foreach (i; 0 .. iterations_) { + transport.open(); + + immutable id = text(port_, ":", i); + + { + if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id); + auto a = client.delayedEcho(id, 100); + enforce(!a.completion.wait(dur!"usecs"(1)), + text("wait() succeeded early (", a.get(), ", ", id, ").")); + enforce(!a.completion.wait(dur!"usecs"(1)), + text("wait() succeeded early (", a.get(), ", ", id, ").")); + enforce(a.completion.wait(dur!"msecs"(200)), + text("wait() didn't succeed as expected (", id, ").")); + enforce(a.get() == id); + if (trace_) writefln(`... delayedEcho("%s") done.`, id); + } + + { + if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id); + auto a = client.delayedFail(id, 100); + enforce(!a.completion.wait(dur!"usecs"(1)), + text("wait() succeeded early (", id, ", ", collectException(a.get()), ").")); + enforce(!a.completion.wait(dur!"usecs"(1)), + text("wait() succeeded early (", id, ", ", collectException(a.get()), ").")); + enforce(a.completion.wait(dur!"msecs"(200)), + text("wait() didn't succeed as expected (", id, ").")); + auto e = cast(AsyncTestException)collectException(a.get()); + enforce(e && e.reason == id); + if (trace_) writefln(`... delayedFail("%s") done.`, id); + } + + { + transport.recvTimeout = dur!"msecs"(50); + + if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `); + auto a = client.delayedEcho("socketTimeout", 100); + auto e = cast(TTransportException)collectException(a.waitGet()); + enforce(e, text("Operation didn't fail as expected (", id, ").")); + enforce(e.type == TTransportException.Type.TIMED_OUT, + text("Wrong timeout exception type (", id, "): ", e)); + if (trace_) writeln(`timed out as expected.`); + + // Wait until the server thread reset before the next iteration. + Thread.sleep(dur!"msecs"(50)); + transport.recvTimeout = dur!"hnsecs"(0); + } + + transport.close(); + } + } + + writefln("Clients thread for port %s done.", port_); + } + + TAsyncSocketManager manager_; + ushort port_; + TTransportFactory transportFactory_; + ushort threads_; + uint iterations_; + bool runTimingTests_; + bool trace_; +} diff --git a/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh b/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh new file mode 100755 index 000000000..d56654f50 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh @@ -0,0 +1,31 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# Runs the async test in both SSL and non-SSL mode. +${CUR}/async_test > /dev/null || exit 1 +echo "Non-SSL tests done." + +# THRIFT-4905: disabled the following test as it deadlocks / hangs +# ${CUR}/async_test --ssl > /dev/null || exit 1 +# echo "SSL tests done." +echo "THRIFT-4905: SSL tests are disabled. Fix them." diff --git a/src/jaegertracing/thrift/lib/d/test/client_pool_test.d b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d new file mode 100644 index 000000000..b24c97afd --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +module client_pool_test; + +import core.sync.semaphore : Semaphore; +import core.time : Duration, dur; +import core.thread : Thread; +import std.algorithm; +import std.array; +import std.conv; +import std.exception; +import std.getopt; +import std.range; +import std.stdio; +import std.typecons; +import std.variant : Variant; +import thrift.base; +import thrift.async.libevent; +import thrift.async.socket; +import thrift.codegen.base; +import thrift.codegen.async_client; +import thrift.codegen.async_client_pool; +import thrift.codegen.client; +import thrift.codegen.client_pool; +import thrift.codegen.processor; +import thrift.protocol.base; +import thrift.protocol.binary; +import thrift.server.base; +import thrift.server.simple; +import thrift.server.transport.socket; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.socket; +import thrift.util.cancellation; +import thrift.util.future; + +// We use this as our RPC-layer exception here to make sure socket/… problems +// (that would usually considered to be RPC layer faults) cause the tests to +// fail, even though we are testing the RPC exception handling. +class TestServiceException : TException { + int port; +} + +interface TestService { + int getPort(); + alias .TestServiceException TestServiceException; + enum methodMeta = [TMethodMeta("getPort", [], + [TExceptionMeta("a", 1, "TestServiceException")])]; +} + +// Use some derived service, just to check that the pools handle inheritance +// correctly. +interface ExTestService : TestService { + int[] getPortInArray(); + enum methodMeta = [TMethodMeta("getPortInArray", [], + [TExceptionMeta("a", 1, "TestServiceException")])]; +} + +class ExTestHandler : ExTestService { + this(ushort port, Duration delay, bool failing, bool trace) { + this.port = port; + this.delay = delay; + this.failing = failing; + this.trace = trace; + } + + override int getPort() { + if (trace) { + stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port, + delay, failing); + } + sleep(); + failIfEnabled(); + return port; + } + + override int[] getPortInArray() { + return [getPort()]; + } + + ushort port; + Duration delay; + bool failing; + bool trace; + +private: + void sleep() { + if (delay > dur!"hnsecs"(0)) Thread.sleep(delay); + } + + void failIfEnabled() { + if (!failing) return; + + auto e = new TestServiceException; + e.port = port; + throw e; + } +} + +class ServerPreServeHandler : TServerEventHandler { + this(Semaphore sem) { + sem_ = sem; + } + + override void preServe() { + sem_.notify(); + } + + Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } + void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} + void preProcess(Variant serverContext, TTransport transport) {} + +private: + Semaphore sem_; +} + +class ServerThread : Thread { + this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) { + super(&run); + handler_ = handler; + cancellation_ = cancellation; + serverHandler_ = serverHandler; + } +private: + void run() { + try { + auto protocolFactory = new TBinaryProtocolFactory!(); + auto processor = new TServiceProcessor!ExTestService(handler_); + auto serverTransport = new TServerSocket(handler_.port); + serverTransport.recvTimeout = dur!"seconds"(3); + auto transportFactory = new TBufferedTransportFactory; + + auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory); + server.eventHandler = serverHandler_; + server.serve(cancellation_); + } catch (Exception e) { + writefln("Server thread on port %s failed: %s", handler_.port, e); + } + } + + ExTestHandler handler_; + ServerPreServeHandler serverHandler_; + TCancellation cancellation_; +} + +void main(string[] args) { + bool trace; + ushort port = 9090; + getopt(args, "port", &port, "trace", &trace); + + auto serverCancellation = new TCancellationOrigin; + scope (exit) serverCancellation.trigger(); + + immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6))); + + // semaphore that will be incremented whenever each server thread has bound and started listening + Semaphore sem = new Semaphore(0); + +version (none) { + // Cannot use this due to multiple DMD @@BUG@@s: + // 1. »function D main is a nested function and cannot be accessed from array« + // when calling array() on the result of the outer map() – would have to + // manually do the eager evaluation/array conversion. + // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument, + // can be worked around by calling array() on the map result first. + // 3. Even when using the workarounds for the last two points, the DMD-built + // executable crashes when building without (sic!) inlining enabled, + // the backtrace points into the first delegate literal. + auto handlers = array(map!((args){ + return new ExTestHandler(args._0, args._1, args._2, trace); + })(zip( + ports, + map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]), + [false, false, false, true, true, true] + ))); +} else { + auto handlers = [ + new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace), + new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace), + new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace), + new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace), + new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace), + new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace) + ]; +} + + // Fire up the server threads. + foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start(); + + // wait until all the handlers signal that they're ready to serve + foreach (h; handlers) (sem.wait(dur!`seconds`(1))); + + syncClientPoolTest(ports, handlers); + asyncClientPoolTest(ports, handlers); + asyncFastestClientPoolTest(ports, handlers); + asyncAggregatorTest(ports, handlers); +} + + +void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto clients = array(map!((a){ + return cast(TClientBase!ExTestService)tClient!ExTestService( + tBinaryProtocol(new TSocket("127.0.0.1", a)) + ); + })(ports)); + + scope(exit) foreach (c; clients) c.outputProtocol.transport.close(); + + // Try the case where the first client succeeds. + { + enforce(makePool(clients).getPort() == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makePool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]); + enforce(pool.getPortInArray() == [ports[0]]); + } + + // Make sure a client is properly deactivated when it has failed too often. + { + auto pool = makePool(clients); + pool.faultDisableCount = 1; + pool.faultDisableDuration = dur!"msecs"(50); + + handlers[0].failing = true; + enforce(pool.getPort() == ports[1]); + + handlers[0].failing = false; + enforce(pool.getPort() == ports[1]); + + Thread.sleep(dur!"msecs"(50)); + enforce(pool.getPort() == ports[0]); + } +} + +auto makePool(TClientBase!ExTestService[] clients) { + auto p = tClientPool(clients); + p.permuteClients = false; + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + + +void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + // Try the case where the first client succeeds. + { + enforce(makeAsyncPool(clients).getPort() == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makeAsyncPool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]); + enforce(pool.getPortInArray() == [ports[0]]); + } + + // Make sure a client is properly deactivated when it has failed too often. + { + auto pool = makeAsyncPool(clients); + pool.faultDisableCount = 1; + pool.faultDisableDuration = dur!"msecs"(50); + + handlers[0].failing = true; + enforce(pool.getPort() == ports[1]); + + handlers[0].failing = false; + enforce(pool.getPort() == ports[1]); + + Thread.sleep(dur!"msecs"(50)); + enforce(pool.getPort() == ports[0]); + } +} + +auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) { + auto p = tAsyncClientPool(clients); + p.permuteClients = false; + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + +auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) { + // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads + // to »function D main is a nested function and cannot be accessed from array«. + // Thus, we manually do the array conversion. + auto lazyClients = map!((a){ + return new TAsyncClient!ExTestService( + new TAsyncSocket(manager, "127.0.0.1", a), + new TBufferedTransportFactory, + new TBinaryProtocolFactory!(TBufferedTransport) + ); + })(ports); + TAsyncClientBase!ExTestService[] clients; + foreach (c; lazyClients) clients ~= c; + return clients; +} + + +void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + // Make sure the fastest client wins, even if they are called in some other + // order. + { + auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet(); + enforce(result == ports[0]); + } + + // Try the case where all clients fail. + { + auto pool = makeAsyncFastestPool(clients[3 .. $]); + auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet()); + enforce(e); + enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions), + ports[3 .. $])); + } + + // Try the case where the first clients fail, but a later one succeeds. + { + auto pool = makeAsyncFastestPool(clients[1 .. $]); + enforce(pool.getPortInArray() == [ports[1]]); + } +} + +auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) { + auto p = tAsyncFastestClientPool(clients); + p.rpcFaultFilter = (Exception e) { + return (cast(TestServiceException)e !is null); + }; + return p; +} + + +void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) { + auto manager = new TLibeventAsyncManager; + scope (exit) manager.stop(dur!"hnsecs"(0)); + + auto clients = makeAsyncClients(manager, ports); + scope(exit) foreach (c; clients) c.transport.close(); + + auto aggregator = tAsyncAggregator( + cast(TAsyncClientBase!ExTestService[])clients); + + // Test aggregator range interface. + { + auto range = aggregator.getPort().range(dur!"msecs"(50)); + enforce(equal(range, ports[0 .. 2][])); + enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions), + ports[3 .. $ - 1])); + enforce(range.completedCount == 4); + } + + // Test default accumulator for scalars. + { + auto fullResult = aggregator.getPort().accumulate(); + enforce(fullResult.waitGet() == ports[0 .. 3]); + + auto partialResult = aggregator.getPort().accumulate(); + Thread.sleep(dur!"msecs"(20)); + enforce(partialResult.finishGet() == ports[0 .. 2]); + + } + + // Test default accumulator for arrays. + { + auto fullResult = aggregator.getPortInArray().accumulate(); + enforce(fullResult.waitGet() == ports[0 .. 3]); + + auto partialResult = aggregator.getPortInArray().accumulate(); + Thread.sleep(dur!"msecs"(20)); + enforce(partialResult.finishGet() == ports[0 .. 2]); + } + + // Test custom accumulator. + { + auto fullResult = aggregator.getPort().accumulate!(function(int[] results){ + return reduce!"a + b"(results); + })(); + enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]); + + auto partialResult = aggregator.getPort().accumulate!( + function(int[] results, Exception[] exceptions) { + // Return a tuple of the parameters so we can check them outside of + // this function (to verify the values, we need access to »ports«, but + // due to DMD @@BUG5710@@, we can't use a delegate literal).f + return tuple(results, exceptions); + } + )(); + Thread.sleep(dur!"msecs"(20)); + auto resultTuple = partialResult.finishGet(); + enforce(resultTuple[0] == ports[0 .. 2]); + enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]), + ports[3 .. $ - 1])); + } +} diff --git a/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d b/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d new file mode 100644 index 000000000..40d048094 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d @@ -0,0 +1,70 @@ +/** + * An implementation of the mini serialization benchmark also available for + * C++ and Java. + * + * For meaningful results, you might want to make sure that + * the Thrift library is compiled with release build flags, + * e.g. by including the source files with the build instead + * of linking libthriftd: + * + dmd -w -O -release -inline -I../src -Igen-d -ofserialization_benchmark \ + $(find ../src/thrift -name '*.d' -not -name index.d) \ + gen-d/DebugProtoTest_types.d serialization_benchmark.d + */ +module serialization_benchmark; + +import std.datetime.stopwatch : AutoStart, StopWatch; +import std.math : PI; +import std.stdio; +import thrift.protocol.binary; +import thrift.transport.memory; +import thrift.transport.range; +import DebugProtoTest_types; + +void main() { + auto buf = new TMemoryBuffer; + enum ITERATIONS = 10_000_000; + + { + auto ooe = OneOfEach(); + ooe.im_true = true; + ooe.im_false = false; + ooe.a_bite = 0x7f; + ooe.integer16 = 27_000; + ooe.integer32 = 1 << 24; + ooe.integer64 = 6_000_000_000; + ooe.double_precision = PI; + ooe.some_characters = "JSON THIS! \"\1"; + ooe.zomg_unicode = "\xd7\n\a\t"; + ooe.base64 = "\1\2\3\255"; + + auto prot = tBinaryProtocol(buf); + auto sw = StopWatch(AutoStart.yes); + foreach (i; 0 .. ITERATIONS) { + buf.reset(120); + ooe.write(prot); + } + sw.stop(); + + auto msecs = sw.peek().total!"msecs"; + writefln("Write: %s ms (%s kHz)", msecs, ITERATIONS / msecs); + } + + auto data = buf.getContents().dup; + + { + auto readBuf = tInputRangeTransport(data); + auto prot = tBinaryProtocol(readBuf); + auto ooe = OneOfEach(); + + auto sw = StopWatch(AutoStart.yes); + foreach (i; 0 .. ITERATIONS) { + readBuf.reset(data); + ooe.read(prot); + } + sw.stop(); + + auto msecs = sw.peek().total!"msecs"; + writefln(" Read: %s ms (%s kHz)", msecs, ITERATIONS / msecs); + } +} diff --git a/src/jaegertracing/thrift/lib/d/test/stress_test_server.d b/src/jaegertracing/thrift/lib/d/test/stress_test_server.d new file mode 100644 index 000000000..ddda098b3 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/stress_test_server.d @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +module stress_test_server; + +import std.getopt; +import std.parallelism : totalCPUs; +import std.stdio; +import std.typetuple; +import thrift.codegen.processor; +import thrift.protocol.binary; +import thrift.server.base; +import thrift.server.transport.socket; +import thrift.transport.buffered; +import thrift.transport.memory; +import thrift.transport.socket; +import thrift.util.hashset; +import test_utils; + +import thrift.test.stress.Service; + +class ServiceHandler : Service { + void echoVoid() { return; } + byte echoByte(byte arg) { return arg; } + int echoI32(int arg) { return arg; } + long echoI64(long arg) { return arg; } + byte[] echoList(byte[] arg) { return arg; } + HashSet!byte echoSet(HashSet!byte arg) { return arg; } + byte[byte] echoMap(byte[byte] arg) { return arg; } + + string echoString(string arg) { + if (arg != "hello") { + stderr.writefln(`Wrong string received: %s instead of "hello"`, arg); + throw new Exception("Wrong string received."); + } + return arg; + } +} + +void main(string[] args) { + ushort port = 9091; + auto serverType = ServerType.threaded; + TransportType transportType; + size_t numIOThreads = 1; + size_t taskPoolSize = totalCPUs; + + getopt(args, "port", &port, "server-type", &serverType, + "transport-type", &transportType, "task-pool-size", &taskPoolSize, + "num-io-threads", &numIOThreads); + + alias TypeTuple!(TBufferedTransport, TMemoryBuffer) AvailableTransports; + + auto processor = new TServiceProcessor!(Service, + staticMap!(TBinaryProtocol, AvailableTransports))(new ServiceHandler()); + auto serverSocket = new TServerSocket(port); + auto transportFactory = createTransportFactory(transportType); + auto protocolFactory = new TBinaryProtocolFactory!AvailableTransports; + + auto server = createServer(serverType, taskPoolSize, numIOThreads, + processor, serverSocket, transportFactory, protocolFactory); + + writefln("Starting %s %s StressTest server on port %s...", transportType, + serverType, port); + server.serve(); + writeln("done."); +} diff --git a/src/jaegertracing/thrift/lib/d/test/test_utils.d b/src/jaegertracing/thrift/lib/d/test/test_utils.d new file mode 100644 index 000000000..174100b79 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/test_utils.d @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Various helpers used by more than a single test. + */ +module test_utils; + +import std.parallelism : TaskPool; +import thrift.protocol.base; +import thrift.protocol.processor; +import thrift.server.base; +import thrift.server.nonblocking; +import thrift.server.simple; +import thrift.server.taskpool; +import thrift.server.threaded; +import thrift.server.transport.socket; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.framed; +import thrift.transport.http; + +// This is a likely victim of @@BUG4744@@ when used with command argument +// parsing. +enum ServerType { + simple, + nonblocking, + pooledNonblocking, + taskpool, + threaded +} + +TServer createServer(ServerType type, size_t taskPoolSize, size_t numIOThreads, + TProcessor processor, TServerSocket serverTransport, + TTransportFactory transportFactory, TProtocolFactory protocolFactory) +{ + final switch (type) { + case ServerType.simple: + return new TSimpleServer(processor, serverTransport, + transportFactory, protocolFactory); + case ServerType.nonblocking: + auto nb = new TNonblockingServer(processor, serverTransport.port, + transportFactory, protocolFactory); + nb.numIOThreads = numIOThreads; + return nb; + case ServerType.pooledNonblocking: + auto nb = new TNonblockingServer(processor, serverTransport.port, + transportFactory, protocolFactory, new TaskPool(taskPoolSize)); + nb.numIOThreads = numIOThreads; + return nb; + case ServerType.taskpool: + auto tps = new TTaskPoolServer(processor, serverTransport, + transportFactory, protocolFactory); + tps.taskPool = new TaskPool(taskPoolSize); + return tps; + case ServerType.threaded: + return new TThreadedServer(processor, serverTransport, + transportFactory, protocolFactory); + } +} + +enum TransportType { + buffered, + framed, + http, + raw +} + +TTransportFactory createTransportFactory(TransportType type) { + final switch (type) { + case TransportType.buffered: + return new TBufferedTransportFactory; + case TransportType.framed: + return new TFramedTransportFactory; + case TransportType.http: + return new TServerHttpTransportFactory; + case TransportType.raw: + return new TTransportFactory; + } +} diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d new file mode 100644 index 000000000..49419f71a --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +module thrift_test_client; + +import std.conv; +import std.datetime; +import std.exception : enforce; +import std.getopt; +import std.stdio; +import std.string; +import std.traits; +import thrift.base; +import thrift.codegen.client; +import thrift.protocol.base; +import thrift.protocol.binary; +import thrift.protocol.compact; +import thrift.protocol.json; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.framed; +import thrift.transport.http; +import thrift.transport.socket; +import thrift.transport.ssl; +import thrift.util.hashset; + +import thrift_test_common; +import thrift.test.ThriftTest; +import thrift.test.ThriftTest_types; + +enum TransportType { + buffered, + framed, + http, + raw +} + +TProtocol createProtocol(T)(T trans, ProtocolType type) { + final switch (type) { + case ProtocolType.binary: + return tBinaryProtocol(trans); + case ProtocolType.compact: + return tCompactProtocol(trans); + case ProtocolType.json: + return tJsonProtocol(trans); + } +} + +void main(string[] args) { + string host = "localhost"; + ushort port = 9090; + uint numTests = 1; + bool ssl; + ProtocolType protocolType; + TransportType transportType; + bool trace; + + getopt(args, + "numTests|n", &numTests, + "protocol", &protocolType, + "ssl", &ssl, + "transport", &transportType, + "trace", &trace, + "port", &port, + "host", (string _, string value) { + auto parts = split(value, ":"); + if (parts.length > 1) { + // IPv6 addresses can contain colons, so take the last part for the + // port. + host = join(parts[0 .. $ - 1], ":"); + port = to!ushort(parts[$ - 1]); + } else { + host = value; + } + } + ); + port = to!ushort(port); + + TSocket socket; + if (ssl) { + auto sslContext = new TSSLContext(); + sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; + sslContext.authenticate = true; + sslContext.loadTrustedCertificates("../../../test/keys/CA.pem"); + socket = new TSSLSocket(sslContext, host, port); + } else { + socket = new TSocket(host, port); + } + + TProtocol protocol; + final switch (transportType) { + case TransportType.buffered: + protocol = createProtocol(new TBufferedTransport(socket), protocolType); + break; + case TransportType.framed: + protocol = createProtocol(new TFramedTransport(socket), protocolType); + break; + case TransportType.http: + protocol = createProtocol( + new TClientHttpTransport(socket, host, "/service"), protocolType); + break; + case TransportType.raw: + protocol = createProtocol(socket, protocolType); + break; + } + + auto client = tClient!ThriftTest(protocol); + + ulong time_min; + ulong time_max; + ulong time_tot; + + StopWatch sw; + foreach(test; 0 .. numTests) { + sw.start(); + + protocol.transport.open(); + + if (trace) writefln("Test #%s, connect %s:%s", test + 1, host, port); + + if (trace) write("testVoid()"); + client.testVoid(); + if (trace) writeln(" = void"); + + if (trace) write("testString(\"Test\")"); + string s = client.testString("Test"); + if (trace) writefln(" = \"%s\"", s); + enforce(s == "Test"); + + if (trace) write("testByte(1)"); + byte u8 = client.testByte(1); + if (trace) writefln(" = %s", u8); + enforce(u8 == 1); + + if (trace) write("testI32(-1)"); + int i32 = client.testI32(-1); + if (trace) writefln(" = %s", i32); + enforce(i32 == -1); + + if (trace) write("testI64(-34359738368)"); + long i64 = client.testI64(-34359738368L); + if (trace) writefln(" = %s", i64); + enforce(i64 == -34359738368L); + + if (trace) write("testDouble(-5.2098523)"); + double dub = client.testDouble(-5.2098523); + if (trace) writefln(" = %s", dub); + enforce(dub == -5.2098523); + + // TODO: add testBinary() call + + Xtruct out1; + out1.string_thing = "Zero"; + out1.byte_thing = 1; + out1.i32_thing = -3; + out1.i64_thing = -5; + if (trace) writef("testStruct(%s)", out1); + auto in1 = client.testStruct(out1); + if (trace) writefln(" = %s", in1); + enforce(in1 == out1); + + if (trace) write("testNest({1, {\"Zero\", 1, -3, -5}), 5}"); + Xtruct2 out2; + out2.byte_thing = 1; + out2.struct_thing = out1; + out2.i32_thing = 5; + auto in2 = client.testNest(out2); + in1 = in2.struct_thing; + if (trace) writefln(" = {%s, {\"%s\", %s, %s, %s}, %s}", in2.byte_thing, + in1.string_thing, in1.byte_thing, in1.i32_thing, in1.i64_thing, + in2.i32_thing); + enforce(in2 == out2); + + int[int] mapout; + for (int i = 0; i < 5; ++i) { + mapout[i] = i - 10; + } + if (trace) writef("testMap({%s})", mapout); + auto mapin = client.testMap(mapout); + if (trace) writefln(" = {%s}", mapin); + enforce(mapin == mapout); + + auto setout = new HashSet!int; + for (int i = -2; i < 3; ++i) { + setout ~= i; + } + if (trace) writef("testSet(%s)", setout); + auto setin = client.testSet(setout); + if (trace) writefln(" = %s", setin); + enforce(setin == setout); + + int[] listout; + for (int i = -2; i < 3; ++i) { + listout ~= i; + } + if (trace) writef("testList(%s)", listout); + auto listin = client.testList(listout); + if (trace) writefln(" = %s", listin); + enforce(listin == listout); + + { + if (trace) write("testEnum(ONE)"); + auto ret = client.testEnum(Numberz.ONE); + if (trace) writefln(" = %s", ret); + enforce(ret == Numberz.ONE); + + if (trace) write("testEnum(TWO)"); + ret = client.testEnum(Numberz.TWO); + if (trace) writefln(" = %s", ret); + enforce(ret == Numberz.TWO); + + if (trace) write("testEnum(THREE)"); + ret = client.testEnum(Numberz.THREE); + if (trace) writefln(" = %s", ret); + enforce(ret == Numberz.THREE); + + if (trace) write("testEnum(FIVE)"); + ret = client.testEnum(Numberz.FIVE); + if (trace) writefln(" = %s", ret); + enforce(ret == Numberz.FIVE); + + if (trace) write("testEnum(EIGHT)"); + ret = client.testEnum(Numberz.EIGHT); + if (trace) writefln(" = %s", ret); + enforce(ret == Numberz.EIGHT); + } + + if (trace) write("testTypedef(309858235082523)"); + UserId uid = client.testTypedef(309858235082523L); + if (trace) writefln(" = %s", uid); + enforce(uid == 309858235082523L); + + if (trace) write("testMapMap(1)"); + auto mm = client.testMapMap(1); + if (trace) writefln(" = {%s}", mm); + // Simply doing == doesn't seem to work for nested AAs. + foreach (key, value; mm) { + enforce(testMapMapReturn[key] == value); + } + foreach (key, value; testMapMapReturn) { + enforce(mm[key] == value); + } + + Insanity insane; + insane.userMap[Numberz.FIVE] = 5000; + Xtruct truck; + truck.string_thing = "Truck"; + truck.byte_thing = 8; + truck.i32_thing = 8; + truck.i64_thing = 8; + insane.xtructs ~= truck; + if (trace) write("testInsanity()"); + auto whoa = client.testInsanity(insane); + if (trace) writefln(" = %s", whoa); + + // Commented for now, this is cumbersome to write without opEqual getting + // called on AA comparison. + // enforce(whoa == testInsanityReturn); + + { + try { + if (trace) write("client.testException(\"Xception\") =>"); + client.testException("Xception"); + if (trace) writeln(" void\nFAILURE"); + throw new Exception("testException failed."); + } catch (Xception e) { + if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message); + } + + try { + if (trace) write("client.testException(\"TException\") =>"); + client.testException("Xception"); + if (trace) writeln(" void\nFAILURE"); + throw new Exception("testException failed."); + } catch (TException e) { + if (trace) writefln(" {%s}", e.msg); + } + + try { + if (trace) write("client.testException(\"success\") =>"); + client.testException("success"); + if (trace) writeln(" void"); + } catch (Exception e) { + if (trace) writeln(" exception\nFAILURE"); + throw new Exception("testException failed."); + } + } + + { + try { + if (trace) write("client.testMultiException(\"Xception\", \"test 1\") =>"); + auto result = client.testMultiException("Xception", "test 1"); + if (trace) writeln(" result\nFAILURE"); + throw new Exception("testMultiException failed."); + } catch (Xception e) { + if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message); + } + + try { + if (trace) write("client.testMultiException(\"Xception2\", \"test 2\") =>"); + auto result = client.testMultiException("Xception2", "test 2"); + if (trace) writeln(" result\nFAILURE"); + throw new Exception("testMultiException failed."); + } catch (Xception2 e) { + if (trace) writefln(" {%s, {\"%s\"}}", + e.errorCode, e.struct_thing.string_thing); + } + + try { + if (trace) writef("client.testMultiException(\"success\", \"test 3\") =>"); + auto result = client.testMultiException("success", "test 3"); + if (trace) writefln(" {{\"%s\"}}", result.string_thing); + } catch (Exception e) { + if (trace) writeln(" exception\nFAILURE"); + throw new Exception("testMultiException failed."); + } + } + + // Do not run oneway test when doing multiple iterations, as it blocks the + // server for three seconds. + if (numTests == 1) { + if (trace) writef("client.testOneway(3) =>"); + auto onewayWatch = StopWatch(AutoStart.yes); + client.testOneway(3); + onewayWatch.stop(); + if (onewayWatch.peek().msecs > 200) { + if (trace) { + writefln(" FAILURE - took %s ms", onewayWatch.peek().usecs / 1000.0); + } + throw new Exception("testOneway failed."); + } else { + if (trace) { + writefln(" success - took %s ms", onewayWatch.peek().usecs / 1000.0); + } + } + + // Redo a simple test after the oneway to make sure we aren't "off by + // one", which would be the case if the server treated oneway methods + // like normal ones. + if (trace) write("re-test testI32(-1)"); + i32 = client.testI32(-1); + if (trace) writefln(" = %s", i32); + } + + // Time metering. + sw.stop(); + + immutable tot = sw.peek().usecs; + if (trace) writefln("Total time: %s us\n", tot); + + time_tot += tot; + if (time_min == 0 || tot < time_min) { + time_min = tot; + } + if (tot > time_max) { + time_max = tot; + } + protocol.transport.close(); + + sw.reset(); + } + + writeln("All tests done."); + + if (numTests > 1) { + auto time_avg = time_tot / numTests; + writefln("Min time: %s us", time_min); + writefln("Max time: %s us", time_max); + writefln("Avg time: %s us", time_avg); + } +} diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d new file mode 100644 index 000000000..13a568613 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d @@ -0,0 +1,92 @@ +module thrift_test_common; + +import std.stdio; +import thrift.test.ThriftTest_types; + +enum ProtocolType { + binary, + compact, + json +} + +void writeInsanityReturn(in Insanity[Numberz][UserId] insane) { + write("{"); + foreach(key1, value1; insane) { + writef("%s => {", key1); + foreach(key2, value2; value1) { + writef("%s => {", key2); + write("{"); + foreach(key3, value3; value2.userMap) { + writef("%s => %s, ", key3, value3); + } + write("}, "); + + write("{"); + foreach (x; value2.xtructs) { + writef("{\"%s\", %s, %s, %s}, ", + x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing); + } + write("}"); + + write("}, "); + } + write("}, "); + } + write("}"); +} + +Insanity[Numberz][UserId] testInsanityReturn; +int[int][int] testMapMapReturn; + +static this() { + testInsanityReturn = { + Insanity[Numberz][UserId] insane; + + Xtruct hello; + hello.string_thing = "Hello2"; + hello.byte_thing = 2; + hello.i32_thing = 2; + hello.i64_thing = 2; + + Xtruct goodbye; + goodbye.string_thing = "Goodbye4"; + goodbye.byte_thing = 4; + goodbye.i32_thing = 4; + goodbye.i64_thing = 4; + + Insanity crazy; + crazy.userMap[Numberz.EIGHT] = 8; + crazy.xtructs ~= goodbye; + + Insanity looney; + // The C++ TestServer also assigns these to crazy, but that is probably + // an oversight. + looney.userMap[Numberz.FIVE] = 5; + looney.xtructs ~= hello; + + Insanity[Numberz] first_map; + first_map[Numberz.TWO] = crazy; + first_map[Numberz.THREE] = crazy; + insane[1] = first_map; + + Insanity[Numberz] second_map; + second_map[Numberz.SIX] = looney; + insane[2] = second_map; + return insane; + }(); + + testMapMapReturn = { + int[int] pos; + int[int] neg; + + for (int i = 1; i < 5; i++) { + pos[i] = i; + neg[-i] = -i; + } + + int[int][int] result; + result[4] = pos; + result[-4] = neg; + return result; + }(); +} diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh b/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh new file mode 100755 index 000000000..51bfe9999 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh @@ -0,0 +1,93 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Runs the D ThriftTest client and servers for all combinations of transport, +# protocol, SSL-mode and server type. +# Pass -k to keep going after failed tests. + +CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +protocols="binary compact json" +# TODO: fix and enable http +# transports="buffered framed raw http" +transports="buffered framed raw" +servers="simple taskpool threaded" +framed_only_servers="nonblocking pooledNonblocking" + +# Don't leave any server instances behind when interrupted (e.g. by Ctrl+C) +# or terminated. +trap "kill $(jobs -p) 2>/dev/null" INT TERM + +for protocol in $protocols; do + for ssl in "" " --ssl"; do + for transport in $transports; do + for server in $servers $framed_only_servers; do + case $framed_only_servers in + *$server*) if [ $transport != "framed" ] || [ $ssl != "" ]; then continue; fi;; + esac + + args="--transport=$transport --protocol=$protocol$ssl" + ${CUR}/thrift_test_server $args --server-type=$server > /dev/null & + server_pid=$! + + # Give the server some time to get up and check if it runs (yes, this + # is a huge kludge, should add a connect timeout to test client). + client_rc=-1 + if [ "$server" = "taskpool" ]; then + sleep 0.5 + else + sleep 0.02 + fi + kill -0 $server_pid 2>/dev/null + if [ $? -eq 0 ]; then + ${CUR}/thrift_test_client $args --numTests=10 > /dev/null + client_rc=$? + + # Temporarily redirect stderr to null to avoid job control messages, + # restore it afterwards. + exec 3>&2 + exec 2>/dev/null + kill $server_pid + exec 3>&2 + fi + + # Get the server exit code (wait should immediately return). + wait $server_pid + server_rc=$? + + if [ $client_rc -ne 0 -o $server_rc -eq 1 ]; then + echo -e "\nTests failed for: $args --server-type=$server" + failed="true" + if [ "$1" != "-k" ]; then + exit 1 + fi + else + echo -n "." + fi + done + done + done +done + +echo +if [ -z "$failed" ]; then + echo "All tests passed." +fi diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d new file mode 100644 index 000000000..ce820d699 --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +module thrift_test_server; + +import core.stdc.errno : errno; +import core.stdc.signal : signal, SIGINT, SIG_DFL, SIG_ERR; +import core.thread : dur, Thread; +import std.algorithm; +import std.exception : enforce; +import std.getopt; +import std.parallelism : totalCPUs; +import std.string; +import std.stdio; +import std.typetuple : TypeTuple, staticMap; +import thrift.base; +import thrift.codegen.processor; +import thrift.protocol.base; +import thrift.protocol.binary; +import thrift.protocol.compact; +import thrift.protocol.json; +import thrift.server.base; +import thrift.server.transport.socket; +import thrift.server.transport.ssl; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.framed; +import thrift.transport.http; +import thrift.transport.ssl; +import thrift.util.cancellation; +import thrift.util.hashset; +import test_utils; + +import thrift_test_common; +import thrift.test.ThriftTest_types; +import thrift.test.ThriftTest; + +class TestHandler : ThriftTest { + this(bool trace) { + trace_ = trace; + } + + override void testVoid() { + if (trace_) writeln("testVoid()"); + } + + override string testString(string thing) { + if (trace_) writefln("testString(\"%s\")", thing); + return thing; + } + + override byte testByte(byte thing) { + if (trace_) writefln("testByte(%s)", thing); + return thing; + } + + override int testI32(int thing) { + if (trace_) writefln("testI32(%s)", thing); + return thing; + } + + override long testI64(long thing) { + if (trace_) writefln("testI64(%s)", thing); + return thing; + } + + override double testDouble(double thing) { + if (trace_) writefln("testDouble(%s)", thing); + return thing; + } + + override string testBinary(string thing) { + if (trace_) writefln("testBinary(\"%s\")", thing); + return thing; + } + + override bool testBool(bool thing) { + if (trace_) writefln("testBool(\"%s\")", thing); + return thing; + } + + override Xtruct testStruct(ref const(Xtruct) thing) { + if (trace_) writefln("testStruct({\"%s\", %s, %s, %s})", + thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing); + return thing; + } + + override Xtruct2 testNest(ref const(Xtruct2) nest) { + auto thing = nest.struct_thing; + if (trace_) writefln("testNest({%s, {\"%s\", %s, %s, %s}, %s})", + nest.byte_thing, thing.string_thing, thing.byte_thing, thing.i32_thing, + thing.i64_thing, nest.i32_thing); + return nest; + } + + override int[int] testMap(int[int] thing) { + if (trace_) writefln("testMap({%s})", thing); + return thing; + } + + override HashSet!int testSet(HashSet!int thing) { + if (trace_) writefln("testSet({%s})", + join(map!`to!string(a)`(thing[]), ", ")); + return thing; + } + + override int[] testList(int[] thing) { + if (trace_) writefln("testList(%s)", thing); + return thing; + } + + override Numberz testEnum(Numberz thing) { + if (trace_) writefln("testEnum(%s)", thing); + return thing; + } + + override UserId testTypedef(UserId thing) { + if (trace_) writefln("testTypedef(%s)", thing); + return thing; + } + + override string[string] testStringMap(string[string] thing) { + if (trace_) writefln("testStringMap(%s)", thing); + return thing; + } + + override int[int][int] testMapMap(int hello) { + if (trace_) writefln("testMapMap(%s)", hello); + return testMapMapReturn; + } + + override Insanity[Numberz][UserId] testInsanity(ref const(Insanity) argument) { + if (trace_) writeln("testInsanity()"); + Insanity[Numberz][UserId] ret; + Insanity[Numberz] m1; + Insanity[Numberz] m2; + Insanity tmp; + tmp = cast(Insanity)argument; + m1[Numberz.TWO] = tmp; + m1[Numberz.THREE] = tmp; + m2[Numberz.SIX] = Insanity(); + ret[1] = m1; + ret[2] = m2; + return ret; + } + + override Xtruct testMulti(byte arg0, int arg1, long arg2, string[short] arg3, + Numberz arg4, UserId arg5) + { + if (trace_) writeln("testMulti()"); + return Xtruct("Hello2", arg0, arg1, arg2); + } + + override void testException(string arg) { + if (trace_) writefln("testException(%s)", arg); + if (arg == "Xception") { + auto e = new Xception(); + e.errorCode = 1001; + e.message = arg; + throw e; + } else if (arg == "TException") { + throw new TException(); + } else if (arg == "ApplicationException") { + throw new TException(); + } + } + + override Xtruct testMultiException(string arg0, string arg1) { + if (trace_) writefln("testMultiException(%s, %s)", arg0, arg1); + + if (arg0 == "Xception") { + auto e = new Xception(); + e.errorCode = 1001; + e.message = "This is an Xception"; + throw e; + } else if (arg0 == "Xception2") { + auto e = new Xception2(); + e.errorCode = 2002; + e.struct_thing.string_thing = "This is an Xception2"; + throw e; + } else { + return Xtruct(arg1); + } + } + + override void testOneway(int sleepFor) { + if (trace_) writefln("testOneway(%s): Sleeping...", sleepFor); + Thread.sleep(dur!"seconds"(sleepFor)); + if (trace_) writefln("testOneway(%s): done sleeping!", sleepFor); + } + +private: + bool trace_; +} + +shared(bool) gShutdown = false; + +nothrow @nogc extern(C) void handleSignal(int sig) { + gShutdown = true; +} + +// Runs a thread that waits for shutdown to be +// signaled and then triggers cancellation, +// causing the server to stop. While we could +// use a signalfd for this purpose, we are instead +// opting for a busy waiting scheme for maximum +// portability since signalfd is a linux thing. + +class ShutdownThread : Thread { + this(TCancellationOrigin cancellation) { + cancellation_ = cancellation; + super(&run); + } + +private: + void run() { + while (!gShutdown) { + Thread.sleep(dur!("msecs")(25)); + } + cancellation_.trigger(); + } + + TCancellationOrigin cancellation_; +} + +void main(string[] args) { + ushort port = 9090; + ServerType serverType; + ProtocolType protocolType; + size_t numIOThreads = 1; + TransportType transportType; + bool ssl = false; + bool trace = true; + size_t taskPoolSize = totalCPUs; + + getopt(args, "port", &port, "protocol", &protocolType, "server-type", + &serverType, "ssl", &ssl, "num-io-threads", &numIOThreads, + "task-pool-size", &taskPoolSize, "trace", &trace, + "transport", &transportType); + + if (serverType == ServerType.nonblocking || + serverType == ServerType.pooledNonblocking + ) { + enforce(transportType == TransportType.framed, + "Need to use framed transport with non-blocking server."); + enforce(!ssl, "The non-blocking server does not support SSL yet."); + + // Don't wrap the contents into another layer of framing. + transportType = TransportType.raw; + } + + version (ThriftTestTemplates) { + // Only exercise the specialized template code paths if explicitly enabled + // to reduce memory consumption on regular test suite runs – there should + // not be much that can go wrong with that specifically anyway. + alias TypeTuple!(TBufferedTransport, TFramedTransport, TServerHttpTransport) + AvailableTransports; + alias TypeTuple!( + staticMap!(TBinaryProtocol, AvailableTransports), + staticMap!(TCompactProtocol, AvailableTransports) + ) AvailableProtocols; + } else { + alias TypeTuple!() AvailableTransports; + alias TypeTuple!() AvailableProtocols; + } + + TProtocolFactory protocolFactory; + final switch (protocolType) { + case ProtocolType.binary: + protocolFactory = new TBinaryProtocolFactory!AvailableTransports; + break; + case ProtocolType.compact: + protocolFactory = new TCompactProtocolFactory!AvailableTransports; + break; + case ProtocolType.json: + protocolFactory = new TJsonProtocolFactory!AvailableTransports; + break; + } + + auto processor = new TServiceProcessor!(ThriftTest, AvailableProtocols)( + new TestHandler(trace)); + + TServerSocket serverSocket; + if (ssl) { + auto sslContext = new TSSLContext(); + sslContext.serverSide = true; + sslContext.loadCertificate("../../../test/keys/server.crt"); + sslContext.loadPrivateKey("../../../test/keys/server.key"); + sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; + serverSocket = new TSSLServerSocket(port, sslContext); + } else { + serverSocket = new TServerSocket(port); + } + + auto transportFactory = createTransportFactory(transportType); + + auto server = createServer(serverType, numIOThreads, taskPoolSize, + processor, serverSocket, transportFactory, protocolFactory); + + // Set up SIGINT signal handling + enforce(signal(SIGINT, &handleSignal) != SIG_ERR, + "Could not replace the SIGINT signal handler: errno {0}".format(errno())); + + // Set up a server cancellation trigger + auto cancel = new TCancellationOrigin(); + + // Set up a listener for the shutdown condition - this will + // wake up when the signal occurs and trigger cancellation. + auto shutdown = new ShutdownThread(cancel); + shutdown.start(); + + // Serve from this thread; the signal will stop the server + // and control will return here + writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType, + transportType, serverType, ssl ? "(using SSL) ": "", port); + server.serve(cancel); + shutdown.join(); + signal(SIGINT, SIG_DFL); + + writeln("done."); +} diff --git a/src/jaegertracing/thrift/lib/d/test/transport_test.d b/src/jaegertracing/thrift/lib/d/test/transport_test.d new file mode 100644 index 000000000..623e03f0e --- /dev/null +++ b/src/jaegertracing/thrift/lib/d/test/transport_test.d @@ -0,0 +1,803 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Exercises various transports, combined with the buffered/framed wrappers. + * + * Originally ported from the C++ version, with Windows support code added. + */ +module transport_test; + +import core.atomic; +import core.time : Duration; +import core.thread : Thread; +import std.conv : to; +import std.datetime; +import std.exception : enforce; +static import std.file; +import std.getopt; +import std.random : rndGen, uniform, unpredictableSeed; +import std.socket; +import std.stdio; +import std.string; +import std.typetuple; +import thrift.transport.base; +import thrift.transport.buffered; +import thrift.transport.framed; +import thrift.transport.file; +import thrift.transport.http; +import thrift.transport.memory; +import thrift.transport.socket; +import thrift.transport.zlib; + +/* + * Size generation helpers – used to be able to run the same testing code + * with both constant and random total/chunk sizes. + */ + +interface SizeGenerator { + size_t nextSize(); + string toString(); +} + +class ConstantSizeGenerator : SizeGenerator { + this(size_t value) { + value_ = value; + } + + override size_t nextSize() { + return value_; + } + + override string toString() const { + return to!string(value_); + } + +private: + size_t value_; +} + +class RandomSizeGenerator : SizeGenerator { + this(size_t min, size_t max) { + min_ = min; + max_ = max; + } + + override size_t nextSize() { + return uniform!"[]"(min_, max_); + } + + override string toString() const { + return format("rand(%s, %s)", min_, max_); + } + + size_t min() const @property { + return min_; + } + + size_t max() const @property { + return max_; + } + +private: + size_t min_; + size_t max_; +} + + +/* + * Classes to set up coupled transports + */ + +/** + * Helper class to represent a coupled pair of transports. + * + * Data written to the output transport can be read from the input transport. + * + * This is used as the base class for the various coupled transport + * implementations. It shouldn't be used directly. + */ +class CoupledTransports(Transport) if (isTTransport!Transport) { + Transport input; + Transport output; +} + +template isCoupledTransports(T) { + static if (is(T _ : CoupledTransports!U, U)) { + enum isCoupledTransports = true; + } else { + enum isCoupledTransports = false; + } +} + +/** + * Helper template class for creating coupled transports that wrap + * another transport. + */ +class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if ( + isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports +) : CoupledTransports!WrapperTransport { + this() { + inner_ = new InnerCoupledTransports(); + if (inner_.input) { + input = new WrapperTransport(inner_.input); + } + if (inner_.output) { + output = new WrapperTransport(inner_.output); + } + } + + ~this() { + destroy(inner_); + } + +private: + InnerCoupledTransports inner_; +} + +import thrift.internal.codegen : PApply; +alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports; +alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports; +alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports; + +/** + * Coupled TMemoryBuffers. + */ +class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer { + this() { + buf = new TMemoryBuffer; + input = buf; + output = buf; + } + + TMemoryBuffer buf; +} + +/** + * Coupled TSockets. + */ +class CoupledSocketTransports : CoupledTransports!TSocket { + this() { + auto sockets = socketPair(); + input = new TSocket(sockets[0]); + output = new TSocket(sockets[1]); + } + + ~this() { + input.close(); + output.close(); + } +} + +/** + * Coupled TFileTransports + */ +class CoupledFileTransports : CoupledTransports!TTransport { + this() { + // We actually need the file name of the temp file here, so we can't just + // use the usual tempfile facilities. + do { + fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front); + rndGen().popFront(); + } while (std.file.exists(fileName_)); + + writefln("Using temp file: %s", fileName_); + + auto writer = new TFileWriterTransport(fileName_); + writer.open(); + output = writer; + + // Wait until the file has been created. + writer.flush(); + + auto reader = new TFileReaderTransport(fileName_); + reader.open(); + reader.readTimeout(dur!"msecs"(-1)); + input = reader; + } + + ~this() { + input.close(); + output.close(); + std.file.remove(fileName_); + } + + static string tmpDir; + +private: + string fileName_; +} + + +/* + * Test functions + */ + +/** + * Test interleaved write and read calls. + * + * Generates a buffer totalSize bytes long, then writes it to the transport, + * and verifies the written data can be read back correctly. + * + * Mode of operation: + * - call wChunkGenerator to figure out how large of a chunk to write + * - call wSizeGenerator to get the size for individual write() calls, + * and do this repeatedly until the entire chunk is written. + * - call rChunkGenerator to figure out how large of a chunk to read + * - call rSizeGenerator to get the size for individual read() calls, + * and do this repeatedly until the entire chunk is read. + * - repeat until the full buffer is written and read back, + * then compare the data read back against the original buffer + * + * + * - If any of the size generators return 0, this means to use the maximum + * possible size. + * + * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that + * there are never more than maxOutstanding bytes waiting to be read back. + */ +void testReadWrite(CoupledTransports)( + size_t totalSize, + SizeGenerator wSizeGenerator, + SizeGenerator rSizeGenerator, + SizeGenerator wChunkGenerator, + SizeGenerator rChunkGenerator, + size_t maxOutstanding +) if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + auto wbuf = new ubyte[totalSize]; + auto rbuf = new ubyte[totalSize]; + + // Store some data in wbuf. + foreach (i, ref b; wbuf) { + b = i & 0xff; + } + + size_t totalWritten; + size_t totalRead; + while (totalRead < totalSize) { + // Determine how large a chunk of data to write. + auto wChunkSize = wChunkGenerator.nextSize(); + if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) { + wChunkSize = totalSize - totalWritten; + } + + // Make sure (totalWritten - totalRead) + wChunkSize is less than + // maxOutstanding. + if (maxOutstanding > 0 && + wChunkSize > maxOutstanding - (totalWritten - totalRead)) { + wChunkSize = maxOutstanding - (totalWritten - totalRead); + } + + // Write the chunk. + size_t chunkWritten = 0; + while (chunkWritten < wChunkSize) { + auto writeSize = wSizeGenerator.nextSize(); + if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) { + writeSize = wChunkSize - chunkWritten; + } + + transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]); + chunkWritten += writeSize; + totalWritten += writeSize; + } + + // Flush the data, so it will be available in the read transport + // Don't flush if wChunkSize is 0. (This should only happen if + // totalWritten == totalSize already, and we're only reading now.) + if (wChunkSize > 0) { + transports.output.flush(); + } + + // Determine how large a chunk of data to read back. + auto rChunkSize = rChunkGenerator.nextSize(); + if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) { + rChunkSize = totalWritten - totalRead; + } + + // Read the chunk. + size_t chunkRead; + while (chunkRead < rChunkSize) { + auto readSize = rSizeGenerator.nextSize(); + if (readSize == 0 || readSize > rChunkSize - chunkRead) { + readSize = rChunkSize - chunkRead; + } + + size_t bytesRead; + try { + bytesRead = transports.input.read( + rbuf[totalRead .. totalRead + readSize]); + } catch (TTransportException e) { + throw new Exception(format(`read(pos = %s, size = %s) threw ` ~ + `exception "%s"; written so far: %s/%s bytes`, totalRead, readSize, + e.msg, totalWritten, totalSize)); + } + + enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~ + `written so far: %s/%s bytes`, totalRead, readSize, bytesRead, + totalWritten, totalSize)); + + chunkRead += bytesRead; + totalRead += bytesRead; + } + } + + // make sure the data read back is identical to the data written + if (rbuf != wbuf) { + stderr.writefln("%s vs. %s", wbuf[$ - 4 .. $], rbuf[$ - 4 .. $]); + stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length); + } + enforce(rbuf == wbuf); +} + +void testReadPartAvailable(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + ubyte[10] writeBuf = 'a'; + ubyte[10] readBuf; + + // Attemping to read 10 bytes when only 9 are available should return 9 + // immediately. + transports.output.write(writeBuf[0 .. 9]); + transports.output.flush(); + + auto t = Trigger(dur!"seconds"(3), transports.output, 1); + auto bytesRead = transports.input.read(readBuf); + enforce(t.fired == 0); + enforce(bytesRead == 9); +} + +void testReadPartialMidframe(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + ubyte[13] writeBuf = 'a'; + ubyte[14] readBuf; + + // Attempt to read 10 bytes, when only 9 are available, but after we have + // already read part of the data that is available. This exercises a + // different code path for several of the transports. + // + // For transports that add their own framing (e.g., TFramedTransport and + // TFileTransport), the two flush calls break up the data in to a 10 byte + // frame and a 3 byte frame. The first read then puts us partway through the + // first frame, and then we attempt to read past the end of that frame, and + // through the next frame, too. + // + // For buffered transports that perform read-ahead (e.g., + // TBufferedTransport), the read-ahead will most likely see all 13 bytes + // written on the first read. The next read will then attempt to read past + // the end of the read-ahead buffer. + // + // Flush 10 bytes, then 3 bytes. This creates 2 separate frames for + // transports that track framing internally. + transports.output.write(writeBuf[0 .. 10]); + transports.output.flush(); + transports.output.write(writeBuf[10 .. 13]); + transports.output.flush(); + + // Now read 4 bytes, so that we are partway through the written data. + auto bytesRead = transports.input.read(readBuf[0 .. 4]); + enforce(bytesRead == 4); + + // Now attempt to read 10 bytes. Only 9 more are available. + // + // We should be able to get all 9 bytes, but it might take multiple read + // calls, since it is valid for read() to return fewer bytes than requested. + // (Most transports do immediately return 9 bytes, but the framing transports + // tend to only return to the end of the current frame, which is 6 bytes in + // this case.) + size_t totalRead = 0; + while (totalRead < 9) { + auto t = Trigger(dur!"seconds"(3), transports.output, 1); + bytesRead = transports.input.read(readBuf[4 + totalRead .. 14]); + enforce(t.fired == 0); + enforce(bytesRead > 0); + totalRead += bytesRead; + enforce(totalRead <= 9); + } + + enforce(totalRead == 9); +} + +void testBorrowPartAvailable(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + ubyte[9] writeBuf = 'a'; + ubyte[10] readBuf; + + // Attemping to borrow 10 bytes when only 9 are available should return NULL + // immediately. + transports.output.write(writeBuf); + transports.output.flush(); + + auto t = Trigger(dur!"seconds"(3), transports.output, 1); + auto borrowLen = readBuf.length; + auto borrowedBuf = transports.input.borrow(readBuf.ptr, borrowLen); + enforce(t.fired == 0); + enforce(borrowedBuf is null); +} + +void testReadNoneAvailable(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + // Attempting to read when no data is available should either block until + // some data is available, or fail immediately. (e.g., TSocket blocks, + // TMemoryBuffer just fails.) + // + // If the transport blocks, it should succeed once some data is available, + // even if less than the amount requested becomes available. + ubyte[10] readBuf; + + auto t = Trigger(dur!"seconds"(1), transports.output, 2); + t.add(dur!"seconds"(1), transports.output, 8); + + auto bytesRead = transports.input.read(readBuf); + if (bytesRead == 0) { + enforce(t.fired == 0); + } else { + enforce(t.fired == 1); + enforce(bytesRead == 2); + } +} + +void testBorrowNoneAvailable(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + scope transports = new CoupledTransports; + assert(transports.input); + assert(transports.output); + + ubyte[16] writeBuf = 'a'; + + // Attempting to borrow when no data is available should fail immediately + auto t = Trigger(dur!"seconds"(1), transports.output, 10); + + auto borrowLen = 10; + auto borrowedBuf = transports.input.borrow(null, borrowLen); + enforce(borrowedBuf is null); + enforce(t.fired == 0); +} + + +void doRwTest(CoupledTransports)( + size_t totalSize, + SizeGenerator wSizeGen, + SizeGenerator rSizeGen, + SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0), + SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0), + size_t maxOutstanding = 0 +) if ( + isCoupledTransports!CoupledTransports +) { + totalSize = cast(size_t)(totalSize * g_sizeMultiplier); + + scope(failure) { + writefln("Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)", + CoupledTransports.stringof, totalSize, wSizeGen, rSizeGen, + wChunkSizeGen, rChunkSizeGen, maxOutstanding); + } + + testReadWrite!CoupledTransports(totalSize, wSizeGen, rSizeGen, + wChunkSizeGen, rChunkSizeGen, maxOutstanding); +} + +void doBlockingTest(CoupledTransports)() if ( + isCoupledTransports!CoupledTransports +) { + void writeFailure(string name) { + writefln("Test failed for %s: %s()", CoupledTransports.stringof, name); + } + + { + scope(failure) writeFailure("testReadPartAvailable"); + testReadPartAvailable!CoupledTransports(); + } + + { + scope(failure) writeFailure("testReadPartialMidframe"); + testReadPartialMidframe!CoupledTransports(); + } + + { + scope(failure) writeFailure("testReadNoneAvaliable"); + testReadNoneAvailable!CoupledTransports(); + } + + { + scope(failure) writeFailure("testBorrowPartAvailable"); + testBorrowPartAvailable!CoupledTransports(); + } + + { + scope(failure) writeFailure("testBorrowNoneAvailable"); + testBorrowNoneAvailable!CoupledTransports(); + } +} + +SizeGenerator getGenerator(T)(T t) { + static if (is(T : SizeGenerator)) { + return t; + } else { + return new ConstantSizeGenerator(t); + } +} + +template WrappedTransports(T) if (isCoupledTransports!T) { + alias TypeTuple!( + T, + CoupledBufferedTransports!T, + CoupledFramedTransports!T, + CoupledZlibTransports!T + ) WrappedTransports; +} + +void testRw(C, R, S)( + size_t totalSize, + R wSize, + S rSize +) if ( + isCoupledTransports!C && is(typeof(getGenerator(wSize))) && + is(typeof(getGenerator(rSize))) +) { + testRw!C(totalSize, wSize, rSize, 0, 0, 0); +} + +void testRw(C, R, S, T, U)( + size_t totalSize, + R wSize, + S rSize, + T wChunkSize, + U rChunkSize, + size_t maxOutstanding = 0 +) if ( + isCoupledTransports!C && is(typeof(getGenerator(wSize))) && + is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) && + is(typeof(getGenerator(rChunkSize))) +) { + foreach (T; WrappedTransports!C) { + doRwTest!T( + totalSize, + getGenerator(wSize), + getGenerator(rSize), + getGenerator(wChunkSize), + getGenerator(rChunkSize), + maxOutstanding + ); + } +} + +void testBlocking(C)() if (isCoupledTransports!C) { + foreach (T; WrappedTransports!C) { + doBlockingTest!T(); + } +} + +// A quick hack, for the sake of brevity… +float g_sizeMultiplier = 1; + +version (Posix) { + immutable defaultTempDir = "/tmp"; +} else version (Windows) { + import core.sys.windows.windows; + extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer); + + string defaultTempDir() @property { + char[MAX_PATH + 1] dir; + enforce(GetTempPathA(dir.length, dir.ptr)); + return to!string(dir.ptr)[0 .. $ - 1]; + } +} else static assert(false); + +void main(string[] args) { + int seed = unpredictableSeed(); + string tmpDir = defaultTempDir; + + getopt(args, "seed", &seed, "size-multiplier", &g_sizeMultiplier, + "tmp-dir", &tmpDir); + enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative."); + + writefln("Using seed: %s", seed); + rndGen().seed(seed); + CoupledFileTransports.tmpDir = tmpDir; + + auto rand4k = new RandomSizeGenerator(1, 4096); + + /* + * We do the basically the same set of tests for each transport type, + * although we tweak the parameters in some places. + */ + + // TMemoryBuffer tests + testRw!CoupledMemoryBuffers(1024 * 1024, 0, 0); + testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k); + testRw!CoupledMemoryBuffers(1024 * 256, 167, 163); + testRw!CoupledMemoryBuffers(1024 * 16, 1, 1); + + testRw!CoupledMemoryBuffers(1024 * 256, 0, 0, rand4k, rand4k); + testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k, rand4k, rand4k); + testRw!CoupledMemoryBuffers(1024 * 256, 167, 163, rand4k, rand4k); + testRw!CoupledMemoryBuffers(1024 * 16, 1, 1, rand4k, rand4k); + + testBlocking!CoupledMemoryBuffers(); + + // TSocket tests + enum socketMaxOutstanding = 4096; + testRw!CoupledSocketTransports(1024 * 1024, 0, 0, + 0, 0, socketMaxOutstanding); + testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k, + 0, 0, socketMaxOutstanding); + testRw!CoupledSocketTransports(1024 * 256, 167, 163, + 0, 0, socketMaxOutstanding); + // Doh. Apparently writing to a socket has some additional overhead for + // each send() call. If we have more than ~400 outstanding 1-byte write + // requests, additional send() calls start blocking. + testRw!CoupledSocketTransports(1024 * 16, 1, 1, + 0, 0, 250); + testRw!CoupledSocketTransports(1024 * 256, 0, 0, + rand4k, rand4k, socketMaxOutstanding); + testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k, + rand4k, rand4k, socketMaxOutstanding); + testRw!CoupledSocketTransports(1024 * 256, 167, 163, + rand4k, rand4k, socketMaxOutstanding); + testRw!CoupledSocketTransports(1024 * 16, 1, 1, + rand4k, rand4k, 250); + + testBlocking!CoupledSocketTransports(); + + // File transport tests. + + // Cannot write more than the frame size at once. + enum maxWriteAtOnce = 1024 * 1024 * 16 - 4; + + testRw!CoupledFileTransports(1024 * 1024, maxWriteAtOnce, 0); + testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k); + testRw!CoupledFileTransports(1024 * 256, 167, 163); + testRw!CoupledFileTransports(1024 * 16, 1, 1); + + testRw!CoupledFileTransports(1024 * 256, 0, 0, rand4k, rand4k); + testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k, rand4k, rand4k); + testRw!CoupledFileTransports(1024 * 256, 167, 163, rand4k, rand4k); + testRw!CoupledFileTransports(1024 * 16, 1, 1, rand4k, rand4k); + + testBlocking!CoupledFileTransports(); +} + + +/* + * Timer handling code for use in tests that check the transport blocking + * semantics. + * + * The implementation has been hacked together in a hurry and wastes a lot of + * threads, but speed should not be the concern here. + */ + +struct Trigger { + this(Duration timeout, TTransport transport, size_t writeLength) { + mutex_ = new Mutex; + cancelCondition_ = new Condition(mutex_); + info_ = new Info(timeout, transport, writeLength); + startThread(); + } + + ~this() { + synchronized (mutex_) { + info_ = null; + cancelCondition_.notifyAll(); + } + if (thread_) thread_.join(); + } + + @disable this(this) { assert(0); } + + void add(Duration timeout, TTransport transport, size_t writeLength) { + synchronized (mutex_) { + auto info = new Info(timeout, transport, writeLength); + if (info_) { + auto prev = info_; + while (prev.next) prev = prev.next; + prev.next = info; + } else { + info_ = info; + startThread(); + } + } + } + + @property short fired() { + return atomicLoad(fired_); + } + +private: + void timerThread() { + // KLUDGE: Make sure the std.concurrency mbox is initialized on the timer + // thread to be able to unblock the file transport. + import std.concurrency; + thisTid; + + synchronized (mutex_) { + while (info_) { + auto cancelled = cancelCondition_.wait(info_.timeout); + if (cancelled) { + info_ = null; + break; + } + + atomicOp!"+="(fired_, 1); + + // Write some data to the transport to unblock it. + auto buf = new ubyte[info_.writeLength]; + buf[] = 'b'; + info_.transport.write(buf); + info_.transport.flush(); + + info_ = info_.next; + } + } + + thread_ = null; + } + + void startThread() { + thread_ = new Thread(&timerThread); + thread_.start(); + } + + struct Info { + this(Duration timeout, TTransport transport, size_t writeLength) { + this.timeout = timeout; + this.transport = transport; + this.writeLength = writeLength; + } + + Duration timeout; + TTransport transport; + size_t writeLength; + Info* next; + } + + Info* info_; + Thread thread_; + shared short fired_; + + import core.sync.mutex; + Mutex mutex_; + import core.sync.condition; + Condition cancelCondition_; +} |