summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/d/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/jaegertracing/thrift/lib/d/test')
-rwxr-xr-xsrc/jaegertracing/thrift/lib/d/test/Makefile.am112
-rw-r--r--src/jaegertracing/thrift/lib/d/test/async_test.d396
-rwxr-xr-xsrc/jaegertracing/thrift/lib/d/test/async_test_runner.sh31
-rw-r--r--src/jaegertracing/thrift/lib/d/test/client_pool_test.d442
-rw-r--r--src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d70
-rw-r--r--src/jaegertracing/thrift/lib/d/test/stress_test_server.d81
-rw-r--r--src/jaegertracing/thrift/lib/d/test/test_utils.d96
-rw-r--r--src/jaegertracing/thrift/lib/d/test/thrift_test_client.d386
-rw-r--r--src/jaegertracing/thrift/lib/d/test/thrift_test_common.d92
-rwxr-xr-xsrc/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh93
-rw-r--r--src/jaegertracing/thrift/lib/d/test/thrift_test_server.d337
-rw-r--r--src/jaegertracing/thrift/lib/d/test/transport_test.d803
12 files changed, 2939 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/d/test/Makefile.am b/src/jaegertracing/thrift/lib/d/test/Makefile.am
new file mode 100755
index 000000000..5ec8255bb
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/Makefile.am
@@ -0,0 +1,112 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+AUTOMAKE_OPTIONS = serial-tests
+
+# Thrift compiler rules
+
+debug_proto_gen = $(addprefix gen-d/, DebugProtoTest_types.d)
+
+$(debug_proto_gen): $(top_srcdir)/test/DebugProtoTest.thrift
+ $(THRIFT) --gen d -nowarn $<
+
+stress_test_gen = $(addprefix gen-d/thrift/test/stress/, Service.d \
+ StressTest_types.d)
+
+$(stress_test_gen): $(top_srcdir)/test/StressTest.thrift
+ $(THRIFT) --gen d $<
+
+thrift_test_gen = $(addprefix gen-d/thrift/test/, SecondService.d \
+ ThriftTest.d ThriftTest_constants.d ThriftTest_types.d)
+
+$(thrift_test_gen): $(top_srcdir)/test/ThriftTest.thrift
+ $(THRIFT) --gen d $<
+
+
+# The actual test targets.
+# There just must be some way to reassign a variable without warnings in
+# Automake...
+targets__ = async_test client_pool_test serialization_benchmark \
+ stress_test_server thrift_test_client thrift_test_server transport_test
+ran_tests__ = client_pool_test \
+ transport_test \
+ async_test_runner.sh \
+ thrift_test_runner.sh
+
+libevent_dependent_targets = async_test_client client_pool_test \
+ stress_test_server thrift_test_server
+libevent_dependent_ran_tests = client_pool_test async_test_runner.sh thrift_test_runner.sh
+
+openssl_dependent_targets = async_test thrift_test_client thrift_test_server
+openssl_dependent_ran_tests = async_test_runner.sh thrift_test_runner.sh
+
+d_test_flags =
+
+if WITH_D_EVENT_TESTS
+d_test_flags += $(DMD_LIBEVENT_FLAGS) ../$(D_EVENT_LIB_NAME)
+targets_ = $(targets__)
+ran_tests_ = $(ran_tests__)
+else
+targets_ = $(filter-out $(libevent_dependent_targets), $(targets__))
+ran_tests_ = $(filter-out $(libevent_dependent_ran_tests), $(ran_tests__))
+endif
+
+if WITH_D_SSL_TESTS
+d_test_flags += $(DMD_OPENSSL_FLAGS) ../$(D_SSL_LIB_NAME)
+targets = $(targets_)
+ran_tests = $(ran_tests_)
+else
+targets = $(filter-out $(openssl_dependent_targets), $(targets_))
+ran_tests = $(filter-out $(openssl_dependent_ran_tests), $(ran_tests_))
+endif
+
+d_test_flags += -w -wi -O -release -inline -I$(top_srcdir)/lib/d/src -Igen-d \
+ $(top_builddir)/lib/d/$(D_LIB_NAME)
+
+
+async_test client_pool_test transport_test: %: %.d
+ $(DMD) $(d_test_flags) -of$@ $^
+
+serialization_benchmark: %: %.d $(debug_proto_gen)
+ $(DMD) $(d_test_flags) -of$@ $^
+
+stress_test_server: %: %.d test_utils.d $(stress_test_gen)
+ $(DMD) $(d_test_flags) -of$@ $^
+
+thrift_test_client: %: %.d thrift_test_common.d $(thrift_test_gen)
+ $(DMD) $(d_test_flags) -of$@ $^
+
+thrift_test_server: %: %.d thrift_test_common.d test_utils.d $(thrift_test_gen)
+ $(DMD) $(d_test_flags) -of$@ $^
+
+
+check-local: $(targets)
+
+clean-local:
+ $(RM) -rf gen-d $(targets) $(addsuffix .o, $(targets))
+
+
+# Tests ran as part of make check.
+
+async_test_runner.sh: async_test
+thrift_test_runner.sh: thrift_test_client thrift_test_server
+
+TESTS = $(ran_tests)
+
+precross: $(targets)
diff --git a/src/jaegertracing/thrift/lib/d/test/async_test.d b/src/jaegertracing/thrift/lib/d/test/async_test.d
new file mode 100644
index 000000000..51529ba86
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/async_test.d
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless enforced by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module async_test;
+
+import core.atomic;
+import core.sync.condition : Condition;
+import core.sync.mutex : Mutex;
+import core.thread : dur, Thread, ThreadGroup;
+import std.conv : text;
+import std.datetime;
+import std.getopt;
+import std.exception : collectException, enforce;
+import std.parallelism : TaskPool;
+import std.stdio;
+import std.string;
+import std.variant : Variant;
+import thrift.base;
+import thrift.async.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.async.ssl;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.base;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.server.transport.ssl;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.ssl;
+import thrift.util.cancellation;
+
+version (Posix) {
+ import core.stdc.signal;
+ import core.sys.posix.signal;
+
+ // Disable SIGPIPE because SSL server will write to broken socket after
+ // client disconnected (see TSSLSocket docs).
+ shared static this() {
+ signal(SIGPIPE, SIG_IGN);
+ }
+}
+
+interface AsyncTest {
+ string echo(string value);
+ string delayedEcho(string value, long milliseconds);
+
+ void fail(string reason);
+ void delayedFail(string reason, long milliseconds);
+
+ enum methodMeta = [
+ TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
+ TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
+ ];
+ alias .AsyncTestException AsyncTestException;
+}
+
+class AsyncTestException : TException {
+ string reason;
+ mixin TStructHelpers!();
+}
+
+void main(string[] args) {
+ ushort port = 9090;
+ ushort managerCount = 2;
+ ushort serversPerManager = 5;
+ ushort threadsPerServer = 10;
+ uint iterations = 10;
+ bool ssl;
+ bool trace;
+
+ getopt(args,
+ "iterations", &iterations,
+ "managers", &managerCount,
+ "port", &port,
+ "servers-per-manager", &serversPerManager,
+ "ssl", &ssl,
+ "threads-per-server", &threadsPerServer,
+ "trace", &trace,
+ );
+
+ TTransportFactory clientTransportFactory;
+ TSSLContext serverSSLContext;
+ if (ssl) {
+ auto clientSSLContext = new TSSLContext();
+ with (clientSSLContext) {
+ authenticate = true;
+ ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ loadTrustedCertificates("../../../test/keys/CA.pem");
+ }
+ clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);
+
+ serverSSLContext = new TSSLContext();
+ with (serverSSLContext) {
+ serverSide = true;
+ ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ loadCertificate("../../../test/keys/server.crt");
+ loadPrivateKey("../../../test/keys/server.key");
+ }
+ } else {
+ clientTransportFactory = new TBufferedTransportFactory;
+ }
+
+
+ auto serverCancel = new TCancellationOrigin;
+ scope(exit) {
+ writeln("Triggering server shutdown...");
+ serverCancel.trigger();
+ writeln("done.");
+ }
+
+ auto managers = new TLibeventAsyncManager[managerCount];
+ scope (exit) foreach (ref m; managers) destroy(m);
+
+ auto clientsThreads = new ThreadGroup;
+ foreach (managerIndex, ref manager; managers) {
+ manager = new TLibeventAsyncManager;
+ foreach (serverIndex; 0 .. serversPerManager) {
+ auto currentPort = cast(ushort)
+ (port + managerIndex * serversPerManager + serverIndex);
+
+ // Start the server and wait until it is up and running.
+ auto servingMutex = new Mutex;
+ auto servingCondition = new Condition(servingMutex);
+ auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
+ synchronized (servingMutex) {
+ (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
+ serverCancel, handler)).start();
+ servingCondition.wait();
+ }
+
+ // We only run the timing tests for the first server on each async
+ // manager, so that we don't get spurious timing errors becaue of
+ // ordering issues.
+ auto runTimingTests = (serverIndex == 0);
+
+ auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
+ threadsPerServer, iterations, runTimingTests, trace);
+ clientsThreads.add(c);
+ c.start();
+ }
+ }
+ clientsThreads.joinAll();
+}
+
+class AsyncTestHandler : AsyncTest {
+ this(bool trace) {
+ trace_ = trace;
+ }
+
+ override string echo(string value) {
+ if (trace_) writefln(`echo("%s")`, value);
+ return value;
+ }
+
+ override string delayedEcho(string value, long milliseconds) {
+ if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
+ Thread.sleep(dur!"msecs"(milliseconds));
+ if (trace_) writeln("returning.");
+
+ return value;
+ }
+
+ override void fail(string reason) {
+ if (trace_) writefln(`fail("%s")`, reason);
+ auto ate = new AsyncTestException;
+ ate.reason = reason;
+ throw ate;
+ }
+
+ override void delayedFail(string reason, long milliseconds) {
+ if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
+ Thread.sleep(dur!"msecs"(milliseconds));
+ if (trace_) writeln("returning.");
+
+ auto ate = new AsyncTestException;
+ ate.reason = reason;
+ throw ate;
+ }
+
+private:
+ bool trace_;
+ AsyncTestException ate_;
+}
+
+class PreServeNotifyHandler : TServerEventHandler {
+ this(Mutex servingMutex, Condition servingCondition) {
+ servingMutex_ = servingMutex;
+ servingCondition_ = servingCondition;
+ }
+
+ void preServe() {
+ synchronized (servingMutex_) {
+ servingCondition_.notifyAll();
+ }
+ }
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Mutex servingMutex_;
+ Condition servingCondition_;
+}
+
+class ServerThread(ServerType) : Thread {
+ this(ushort port, TSSLContext sslContext, bool trace,
+ TCancellation cancellation, TServerEventHandler eventHandler
+ ) {
+ port_ = port;
+ sslContext_ = sslContext;
+ trace_ = trace;
+ cancellation_ = cancellation;
+ eventHandler_ = eventHandler;
+
+ super(&run);
+ }
+
+ void run() {
+ TServerSocket serverSocket;
+ if (sslContext_) {
+ serverSocket = new TSSLServerSocket(port_, sslContext_);
+ } else {
+ serverSocket = new TServerSocket(port_);
+ }
+ auto transportFactory = new TBufferedTransportFactory;
+ auto protocolFactory = new TBinaryProtocolFactory!();
+ auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));
+
+ auto server = new ServerType(processor, serverSocket, transportFactory,
+ protocolFactory);
+ server.eventHandler = eventHandler_;
+
+ writefln("Starting server on port %s...", port_);
+ server.serve(cancellation_);
+ writefln("Server thread on port %s done.", port_);
+ }
+
+private:
+ ushort port_;
+ bool trace_;
+ TCancellation cancellation_;
+ TSSLContext sslContext_;
+ TServerEventHandler eventHandler_;
+}
+
+class ClientsThread : Thread {
+ this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
+ ushort threads, uint iterations, bool runTimingTests, bool trace
+ ) {
+ manager_ = manager;
+ port_ = port;
+ transportFactory_ = tf;
+ threads_ = threads;
+ iterations_ = iterations;
+ runTimingTests_ = runTimingTests;
+ trace_ = trace;
+ super(&run);
+ }
+
+ void run() {
+ auto transport = new TAsyncSocket(manager_, "localhost", port_);
+
+ {
+ auto client = new TAsyncClient!AsyncTest(
+ transport,
+ transportFactory_,
+ new TBinaryProtocolFactory!()
+ );
+ transport.open();
+ auto clientThreads = new ThreadGroup;
+ foreach (clientId; 0 .. threads_) {
+ clientThreads.create({
+ auto c = clientId;
+ return {
+ foreach (i; 0 .. iterations_) {
+ immutable id = text(port_, ":", c, ":", i);
+
+ {
+ if (trace_) writefln(`Calling echo("%s")... `, id);
+ auto a = client.echo(id);
+ enforce(a == id);
+ if (trace_) writefln(`echo("%s") done.`, id);
+ }
+
+ {
+ if (trace_) writefln(`Calling fail("%s")... `, id);
+ auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
+ enforce(a && a.reason == id);
+ if (trace_) writefln(`fail("%s") done.`, id);
+ }
+ }
+ };
+ }());
+ }
+ clientThreads.joinAll();
+ transport.close();
+ }
+
+ if (runTimingTests_) {
+ auto client = new TAsyncClient!AsyncTest(
+ transport,
+ transportFactory_,
+ new TBinaryProtocolFactory!TBufferedTransport
+ );
+
+ // Temporarily redirect error logs to stdout, as SSL errors on the server
+ // side are expected when the client terminates aburptly (as is the case
+ // in the timeout test).
+ auto oldErrorLogSink = g_errorLogSink;
+ g_errorLogSink = g_infoLogSink;
+ scope (exit) g_errorLogSink = oldErrorLogSink;
+
+ foreach (i; 0 .. iterations_) {
+ transport.open();
+
+ immutable id = text(port_, ":", i);
+
+ {
+ if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
+ auto a = client.delayedEcho(id, 100);
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", a.get(), ", ", id, ")."));
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", a.get(), ", ", id, ")."));
+ enforce(a.completion.wait(dur!"msecs"(200)),
+ text("wait() didn't succeed as expected (", id, ")."));
+ enforce(a.get() == id);
+ if (trace_) writefln(`... delayedEcho("%s") done.`, id);
+ }
+
+ {
+ if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
+ auto a = client.delayedFail(id, 100);
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
+ enforce(a.completion.wait(dur!"msecs"(200)),
+ text("wait() didn't succeed as expected (", id, ")."));
+ auto e = cast(AsyncTestException)collectException(a.get());
+ enforce(e && e.reason == id);
+ if (trace_) writefln(`... delayedFail("%s") done.`, id);
+ }
+
+ {
+ transport.recvTimeout = dur!"msecs"(50);
+
+ if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
+ auto a = client.delayedEcho("socketTimeout", 100);
+ auto e = cast(TTransportException)collectException(a.waitGet());
+ enforce(e, text("Operation didn't fail as expected (", id, ")."));
+ enforce(e.type == TTransportException.Type.TIMED_OUT,
+ text("Wrong timeout exception type (", id, "): ", e));
+ if (trace_) writeln(`timed out as expected.`);
+
+ // Wait until the server thread reset before the next iteration.
+ Thread.sleep(dur!"msecs"(50));
+ transport.recvTimeout = dur!"hnsecs"(0);
+ }
+
+ transport.close();
+ }
+ }
+
+ writefln("Clients thread for port %s done.", port_);
+ }
+
+ TAsyncSocketManager manager_;
+ ushort port_;
+ TTransportFactory transportFactory_;
+ ushort threads_;
+ uint iterations_;
+ bool runTimingTests_;
+ bool trace_;
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh b/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh
new file mode 100755
index 000000000..d56654f50
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/async_test_runner.sh
@@ -0,0 +1,31 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+# Runs the async test in both SSL and non-SSL mode.
+${CUR}/async_test > /dev/null || exit 1
+echo "Non-SSL tests done."
+
+# THRIFT-4905: disabled the following test as it deadlocks / hangs
+# ${CUR}/async_test --ssl > /dev/null || exit 1
+# echo "SSL tests done."
+echo "THRIFT-4905: SSL tests are disabled. Fix them."
diff --git a/src/jaegertracing/thrift/lib/d/test/client_pool_test.d b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d
new file mode 100644
index 000000000..b24c97afd
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/client_pool_test.d
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module client_pool_test;
+
+import core.sync.semaphore : Semaphore;
+import core.time : Duration, dur;
+import core.thread : Thread;
+import std.algorithm;
+import std.array;
+import std.conv;
+import std.exception;
+import std.getopt;
+import std.range;
+import std.stdio;
+import std.typecons;
+import std.variant : Variant;
+import thrift.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.codegen.base;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.client;
+import thrift.codegen.client_pool;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.socket;
+import thrift.util.cancellation;
+import thrift.util.future;
+
+// We use this as our RPC-layer exception here to make sure socket/… problems
+// (that would usually considered to be RPC layer faults) cause the tests to
+// fail, even though we are testing the RPC exception handling.
+class TestServiceException : TException {
+ int port;
+}
+
+interface TestService {
+ int getPort();
+ alias .TestServiceException TestServiceException;
+ enum methodMeta = [TMethodMeta("getPort", [],
+ [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+// Use some derived service, just to check that the pools handle inheritance
+// correctly.
+interface ExTestService : TestService {
+ int[] getPortInArray();
+ enum methodMeta = [TMethodMeta("getPortInArray", [],
+ [TExceptionMeta("a", 1, "TestServiceException")])];
+}
+
+class ExTestHandler : ExTestService {
+ this(ushort port, Duration delay, bool failing, bool trace) {
+ this.port = port;
+ this.delay = delay;
+ this.failing = failing;
+ this.trace = trace;
+ }
+
+ override int getPort() {
+ if (trace) {
+ stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
+ delay, failing);
+ }
+ sleep();
+ failIfEnabled();
+ return port;
+ }
+
+ override int[] getPortInArray() {
+ return [getPort()];
+ }
+
+ ushort port;
+ Duration delay;
+ bool failing;
+ bool trace;
+
+private:
+ void sleep() {
+ if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
+ }
+
+ void failIfEnabled() {
+ if (!failing) return;
+
+ auto e = new TestServiceException;
+ e.port = port;
+ throw e;
+ }
+}
+
+class ServerPreServeHandler : TServerEventHandler {
+ this(Semaphore sem) {
+ sem_ = sem;
+ }
+
+ override void preServe() {
+ sem_.notify();
+ }
+
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Semaphore sem_;
+}
+
+class ServerThread : Thread {
+ this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
+ super(&run);
+ handler_ = handler;
+ cancellation_ = cancellation;
+ serverHandler_ = serverHandler;
+ }
+private:
+ void run() {
+ try {
+ auto protocolFactory = new TBinaryProtocolFactory!();
+ auto processor = new TServiceProcessor!ExTestService(handler_);
+ auto serverTransport = new TServerSocket(handler_.port);
+ serverTransport.recvTimeout = dur!"seconds"(3);
+ auto transportFactory = new TBufferedTransportFactory;
+
+ auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
+ server.eventHandler = serverHandler_;
+ server.serve(cancellation_);
+ } catch (Exception e) {
+ writefln("Server thread on port %s failed: %s", handler_.port, e);
+ }
+ }
+
+ ExTestHandler handler_;
+ ServerPreServeHandler serverHandler_;
+ TCancellation cancellation_;
+}
+
+void main(string[] args) {
+ bool trace;
+ ushort port = 9090;
+ getopt(args, "port", &port, "trace", &trace);
+
+ auto serverCancellation = new TCancellationOrigin;
+ scope (exit) serverCancellation.trigger();
+
+ immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
+
+ // semaphore that will be incremented whenever each server thread has bound and started listening
+ Semaphore sem = new Semaphore(0);
+
+version (none) {
+ // Cannot use this due to multiple DMD @@BUG@@s:
+ // 1. »function D main is a nested function and cannot be accessed from array«
+ // when calling array() on the result of the outer map() – would have to
+ // manually do the eager evaluation/array conversion.
+ // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
+ // can be worked around by calling array() on the map result first.
+ // 3. Even when using the workarounds for the last two points, the DMD-built
+ // executable crashes when building without (sic!) inlining enabled,
+ // the backtrace points into the first delegate literal.
+ auto handlers = array(map!((args){
+ return new ExTestHandler(args._0, args._1, args._2, trace);
+ })(zip(
+ ports,
+ map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
+ [false, false, false, true, true, true]
+ )));
+} else {
+ auto handlers = [
+ new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
+ new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
+ new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
+ new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
+ new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
+ new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
+ ];
+}
+
+ // Fire up the server threads.
+ foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
+
+ // wait until all the handlers signal that they're ready to serve
+ foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
+
+ syncClientPoolTest(ports, handlers);
+ asyncClientPoolTest(ports, handlers);
+ asyncFastestClientPoolTest(ports, handlers);
+ asyncAggregatorTest(ports, handlers);
+}
+
+
+void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto clients = array(map!((a){
+ return cast(TClientBase!ExTestService)tClient!ExTestService(
+ tBinaryProtocol(new TSocket("127.0.0.1", a))
+ );
+ })(ports));
+
+ scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
+
+ // Try the case where the first client succeeds.
+ {
+ enforce(makePool(clients).getPort() == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makePool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
+ enforce(pool.getPortInArray() == [ports[0]]);
+ }
+
+ // Make sure a client is properly deactivated when it has failed too often.
+ {
+ auto pool = makePool(clients);
+ pool.faultDisableCount = 1;
+ pool.faultDisableDuration = dur!"msecs"(50);
+
+ handlers[0].failing = true;
+ enforce(pool.getPort() == ports[1]);
+
+ handlers[0].failing = false;
+ enforce(pool.getPort() == ports[1]);
+
+ Thread.sleep(dur!"msecs"(50));
+ enforce(pool.getPort() == ports[0]);
+ }
+}
+
+auto makePool(TClientBase!ExTestService[] clients) {
+ auto p = tClientPool(clients);
+ p.permuteClients = false;
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+
+void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ // Try the case where the first client succeeds.
+ {
+ enforce(makeAsyncPool(clients).getPort() == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makeAsyncPool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
+ enforce(pool.getPortInArray() == [ports[0]]);
+ }
+
+ // Make sure a client is properly deactivated when it has failed too often.
+ {
+ auto pool = makeAsyncPool(clients);
+ pool.faultDisableCount = 1;
+ pool.faultDisableDuration = dur!"msecs"(50);
+
+ handlers[0].failing = true;
+ enforce(pool.getPort() == ports[1]);
+
+ handlers[0].failing = false;
+ enforce(pool.getPort() == ports[1]);
+
+ Thread.sleep(dur!"msecs"(50));
+ enforce(pool.getPort() == ports[0]);
+ }
+}
+
+auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
+ auto p = tAsyncClientPool(clients);
+ p.permuteClients = false;
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
+ // DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
+ // to »function D main is a nested function and cannot be accessed from array«.
+ // Thus, we manually do the array conversion.
+ auto lazyClients = map!((a){
+ return new TAsyncClient!ExTestService(
+ new TAsyncSocket(manager, "127.0.0.1", a),
+ new TBufferedTransportFactory,
+ new TBinaryProtocolFactory!(TBufferedTransport)
+ );
+ })(ports);
+ TAsyncClientBase!ExTestService[] clients;
+ foreach (c; lazyClients) clients ~= c;
+ return clients;
+}
+
+
+void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ // Make sure the fastest client wins, even if they are called in some other
+ // order.
+ {
+ auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
+ enforce(result == ports[0]);
+ }
+
+ // Try the case where all clients fail.
+ {
+ auto pool = makeAsyncFastestPool(clients[3 .. $]);
+ auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
+ enforce(e);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
+ ports[3 .. $]));
+ }
+
+ // Try the case where the first clients fail, but a later one succeeds.
+ {
+ auto pool = makeAsyncFastestPool(clients[1 .. $]);
+ enforce(pool.getPortInArray() == [ports[1]]);
+ }
+}
+
+auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
+ auto p = tAsyncFastestClientPool(clients);
+ p.rpcFaultFilter = (Exception e) {
+ return (cast(TestServiceException)e !is null);
+ };
+ return p;
+}
+
+
+void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
+ auto manager = new TLibeventAsyncManager;
+ scope (exit) manager.stop(dur!"hnsecs"(0));
+
+ auto clients = makeAsyncClients(manager, ports);
+ scope(exit) foreach (c; clients) c.transport.close();
+
+ auto aggregator = tAsyncAggregator(
+ cast(TAsyncClientBase!ExTestService[])clients);
+
+ // Test aggregator range interface.
+ {
+ auto range = aggregator.getPort().range(dur!"msecs"(50));
+ enforce(equal(range, ports[0 .. 2][]));
+ enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
+ ports[3 .. $ - 1]));
+ enforce(range.completedCount == 4);
+ }
+
+ // Test default accumulator for scalars.
+ {
+ auto fullResult = aggregator.getPort().accumulate();
+ enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+ auto partialResult = aggregator.getPort().accumulate();
+ Thread.sleep(dur!"msecs"(20));
+ enforce(partialResult.finishGet() == ports[0 .. 2]);
+
+ }
+
+ // Test default accumulator for arrays.
+ {
+ auto fullResult = aggregator.getPortInArray().accumulate();
+ enforce(fullResult.waitGet() == ports[0 .. 3]);
+
+ auto partialResult = aggregator.getPortInArray().accumulate();
+ Thread.sleep(dur!"msecs"(20));
+ enforce(partialResult.finishGet() == ports[0 .. 2]);
+ }
+
+ // Test custom accumulator.
+ {
+ auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
+ return reduce!"a + b"(results);
+ })();
+ enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
+
+ auto partialResult = aggregator.getPort().accumulate!(
+ function(int[] results, Exception[] exceptions) {
+ // Return a tuple of the parameters so we can check them outside of
+ // this function (to verify the values, we need access to »ports«, but
+ // due to DMD @@BUG5710@@, we can't use a delegate literal).f
+ return tuple(results, exceptions);
+ }
+ )();
+ Thread.sleep(dur!"msecs"(20));
+ auto resultTuple = partialResult.finishGet();
+ enforce(resultTuple[0] == ports[0 .. 2]);
+ enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),
+ ports[3 .. $ - 1]));
+ }
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d b/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d
new file mode 100644
index 000000000..40d048094
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/serialization_benchmark.d
@@ -0,0 +1,70 @@
+/**
+ * An implementation of the mini serialization benchmark also available for
+ * C++ and Java.
+ *
+ * For meaningful results, you might want to make sure that
+ * the Thrift library is compiled with release build flags,
+ * e.g. by including the source files with the build instead
+ * of linking libthriftd:
+ *
+ dmd -w -O -release -inline -I../src -Igen-d -ofserialization_benchmark \
+ $(find ../src/thrift -name '*.d' -not -name index.d) \
+ gen-d/DebugProtoTest_types.d serialization_benchmark.d
+ */
+module serialization_benchmark;
+
+import std.datetime.stopwatch : AutoStart, StopWatch;
+import std.math : PI;
+import std.stdio;
+import thrift.protocol.binary;
+import thrift.transport.memory;
+import thrift.transport.range;
+import DebugProtoTest_types;
+
+void main() {
+ auto buf = new TMemoryBuffer;
+ enum ITERATIONS = 10_000_000;
+
+ {
+ auto ooe = OneOfEach();
+ ooe.im_true = true;
+ ooe.im_false = false;
+ ooe.a_bite = 0x7f;
+ ooe.integer16 = 27_000;
+ ooe.integer32 = 1 << 24;
+ ooe.integer64 = 6_000_000_000;
+ ooe.double_precision = PI;
+ ooe.some_characters = "JSON THIS! \"\1";
+ ooe.zomg_unicode = "\xd7\n\a\t";
+ ooe.base64 = "\1\2\3\255";
+
+ auto prot = tBinaryProtocol(buf);
+ auto sw = StopWatch(AutoStart.yes);
+ foreach (i; 0 .. ITERATIONS) {
+ buf.reset(120);
+ ooe.write(prot);
+ }
+ sw.stop();
+
+ auto msecs = sw.peek().total!"msecs";
+ writefln("Write: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
+ }
+
+ auto data = buf.getContents().dup;
+
+ {
+ auto readBuf = tInputRangeTransport(data);
+ auto prot = tBinaryProtocol(readBuf);
+ auto ooe = OneOfEach();
+
+ auto sw = StopWatch(AutoStart.yes);
+ foreach (i; 0 .. ITERATIONS) {
+ readBuf.reset(data);
+ ooe.read(prot);
+ }
+ sw.stop();
+
+ auto msecs = sw.peek().total!"msecs";
+ writefln(" Read: %s ms (%s kHz)", msecs, ITERATIONS / msecs);
+ }
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/stress_test_server.d b/src/jaegertracing/thrift/lib/d/test/stress_test_server.d
new file mode 100644
index 000000000..ddda098b3
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/stress_test_server.d
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module stress_test_server;
+
+import std.getopt;
+import std.parallelism : totalCPUs;
+import std.stdio;
+import std.typetuple;
+import thrift.codegen.processor;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.transport.socket;
+import thrift.transport.buffered;
+import thrift.transport.memory;
+import thrift.transport.socket;
+import thrift.util.hashset;
+import test_utils;
+
+import thrift.test.stress.Service;
+
+class ServiceHandler : Service {
+ void echoVoid() { return; }
+ byte echoByte(byte arg) { return arg; }
+ int echoI32(int arg) { return arg; }
+ long echoI64(long arg) { return arg; }
+ byte[] echoList(byte[] arg) { return arg; }
+ HashSet!byte echoSet(HashSet!byte arg) { return arg; }
+ byte[byte] echoMap(byte[byte] arg) { return arg; }
+
+ string echoString(string arg) {
+ if (arg != "hello") {
+ stderr.writefln(`Wrong string received: %s instead of "hello"`, arg);
+ throw new Exception("Wrong string received.");
+ }
+ return arg;
+ }
+}
+
+void main(string[] args) {
+ ushort port = 9091;
+ auto serverType = ServerType.threaded;
+ TransportType transportType;
+ size_t numIOThreads = 1;
+ size_t taskPoolSize = totalCPUs;
+
+ getopt(args, "port", &port, "server-type", &serverType,
+ "transport-type", &transportType, "task-pool-size", &taskPoolSize,
+ "num-io-threads", &numIOThreads);
+
+ alias TypeTuple!(TBufferedTransport, TMemoryBuffer) AvailableTransports;
+
+ auto processor = new TServiceProcessor!(Service,
+ staticMap!(TBinaryProtocol, AvailableTransports))(new ServiceHandler());
+ auto serverSocket = new TServerSocket(port);
+ auto transportFactory = createTransportFactory(transportType);
+ auto protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
+
+ auto server = createServer(serverType, taskPoolSize, numIOThreads,
+ processor, serverSocket, transportFactory, protocolFactory);
+
+ writefln("Starting %s %s StressTest server on port %s...", transportType,
+ serverType, port);
+ server.serve();
+ writeln("done.");
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/test_utils.d b/src/jaegertracing/thrift/lib/d/test/test_utils.d
new file mode 100644
index 000000000..174100b79
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/test_utils.d
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Various helpers used by more than a single test.
+ */
+module test_utils;
+
+import std.parallelism : TaskPool;
+import thrift.protocol.base;
+import thrift.protocol.processor;
+import thrift.server.base;
+import thrift.server.nonblocking;
+import thrift.server.simple;
+import thrift.server.taskpool;
+import thrift.server.threaded;
+import thrift.server.transport.socket;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+
+// This is a likely victim of @@BUG4744@@ when used with command argument
+// parsing.
+enum ServerType {
+ simple,
+ nonblocking,
+ pooledNonblocking,
+ taskpool,
+ threaded
+}
+
+TServer createServer(ServerType type, size_t taskPoolSize, size_t numIOThreads,
+ TProcessor processor, TServerSocket serverTransport,
+ TTransportFactory transportFactory, TProtocolFactory protocolFactory)
+{
+ final switch (type) {
+ case ServerType.simple:
+ return new TSimpleServer(processor, serverTransport,
+ transportFactory, protocolFactory);
+ case ServerType.nonblocking:
+ auto nb = new TNonblockingServer(processor, serverTransport.port,
+ transportFactory, protocolFactory);
+ nb.numIOThreads = numIOThreads;
+ return nb;
+ case ServerType.pooledNonblocking:
+ auto nb = new TNonblockingServer(processor, serverTransport.port,
+ transportFactory, protocolFactory, new TaskPool(taskPoolSize));
+ nb.numIOThreads = numIOThreads;
+ return nb;
+ case ServerType.taskpool:
+ auto tps = new TTaskPoolServer(processor, serverTransport,
+ transportFactory, protocolFactory);
+ tps.taskPool = new TaskPool(taskPoolSize);
+ return tps;
+ case ServerType.threaded:
+ return new TThreadedServer(processor, serverTransport,
+ transportFactory, protocolFactory);
+ }
+}
+
+enum TransportType {
+ buffered,
+ framed,
+ http,
+ raw
+}
+
+TTransportFactory createTransportFactory(TransportType type) {
+ final switch (type) {
+ case TransportType.buffered:
+ return new TBufferedTransportFactory;
+ case TransportType.framed:
+ return new TFramedTransportFactory;
+ case TransportType.http:
+ return new TServerHttpTransportFactory;
+ case TransportType.raw:
+ return new TTransportFactory;
+ }
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d
new file mode 100644
index 000000000..49419f71a
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_client.d
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift_test_client;
+
+import std.conv;
+import std.datetime;
+import std.exception : enforce;
+import std.getopt;
+import std.stdio;
+import std.string;
+import std.traits;
+import thrift.base;
+import thrift.codegen.client;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.protocol.compact;
+import thrift.protocol.json;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+import thrift.transport.socket;
+import thrift.transport.ssl;
+import thrift.util.hashset;
+
+import thrift_test_common;
+import thrift.test.ThriftTest;
+import thrift.test.ThriftTest_types;
+
+enum TransportType {
+ buffered,
+ framed,
+ http,
+ raw
+}
+
+TProtocol createProtocol(T)(T trans, ProtocolType type) {
+ final switch (type) {
+ case ProtocolType.binary:
+ return tBinaryProtocol(trans);
+ case ProtocolType.compact:
+ return tCompactProtocol(trans);
+ case ProtocolType.json:
+ return tJsonProtocol(trans);
+ }
+}
+
+void main(string[] args) {
+ string host = "localhost";
+ ushort port = 9090;
+ uint numTests = 1;
+ bool ssl;
+ ProtocolType protocolType;
+ TransportType transportType;
+ bool trace;
+
+ getopt(args,
+ "numTests|n", &numTests,
+ "protocol", &protocolType,
+ "ssl", &ssl,
+ "transport", &transportType,
+ "trace", &trace,
+ "port", &port,
+ "host", (string _, string value) {
+ auto parts = split(value, ":");
+ if (parts.length > 1) {
+ // IPv6 addresses can contain colons, so take the last part for the
+ // port.
+ host = join(parts[0 .. $ - 1], ":");
+ port = to!ushort(parts[$ - 1]);
+ } else {
+ host = value;
+ }
+ }
+ );
+ port = to!ushort(port);
+
+ TSocket socket;
+ if (ssl) {
+ auto sslContext = new TSSLContext();
+ sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ sslContext.authenticate = true;
+ sslContext.loadTrustedCertificates("../../../test/keys/CA.pem");
+ socket = new TSSLSocket(sslContext, host, port);
+ } else {
+ socket = new TSocket(host, port);
+ }
+
+ TProtocol protocol;
+ final switch (transportType) {
+ case TransportType.buffered:
+ protocol = createProtocol(new TBufferedTransport(socket), protocolType);
+ break;
+ case TransportType.framed:
+ protocol = createProtocol(new TFramedTransport(socket), protocolType);
+ break;
+ case TransportType.http:
+ protocol = createProtocol(
+ new TClientHttpTransport(socket, host, "/service"), protocolType);
+ break;
+ case TransportType.raw:
+ protocol = createProtocol(socket, protocolType);
+ break;
+ }
+
+ auto client = tClient!ThriftTest(protocol);
+
+ ulong time_min;
+ ulong time_max;
+ ulong time_tot;
+
+ StopWatch sw;
+ foreach(test; 0 .. numTests) {
+ sw.start();
+
+ protocol.transport.open();
+
+ if (trace) writefln("Test #%s, connect %s:%s", test + 1, host, port);
+
+ if (trace) write("testVoid()");
+ client.testVoid();
+ if (trace) writeln(" = void");
+
+ if (trace) write("testString(\"Test\")");
+ string s = client.testString("Test");
+ if (trace) writefln(" = \"%s\"", s);
+ enforce(s == "Test");
+
+ if (trace) write("testByte(1)");
+ byte u8 = client.testByte(1);
+ if (trace) writefln(" = %s", u8);
+ enforce(u8 == 1);
+
+ if (trace) write("testI32(-1)");
+ int i32 = client.testI32(-1);
+ if (trace) writefln(" = %s", i32);
+ enforce(i32 == -1);
+
+ if (trace) write("testI64(-34359738368)");
+ long i64 = client.testI64(-34359738368L);
+ if (trace) writefln(" = %s", i64);
+ enforce(i64 == -34359738368L);
+
+ if (trace) write("testDouble(-5.2098523)");
+ double dub = client.testDouble(-5.2098523);
+ if (trace) writefln(" = %s", dub);
+ enforce(dub == -5.2098523);
+
+ // TODO: add testBinary() call
+
+ Xtruct out1;
+ out1.string_thing = "Zero";
+ out1.byte_thing = 1;
+ out1.i32_thing = -3;
+ out1.i64_thing = -5;
+ if (trace) writef("testStruct(%s)", out1);
+ auto in1 = client.testStruct(out1);
+ if (trace) writefln(" = %s", in1);
+ enforce(in1 == out1);
+
+ if (trace) write("testNest({1, {\"Zero\", 1, -3, -5}), 5}");
+ Xtruct2 out2;
+ out2.byte_thing = 1;
+ out2.struct_thing = out1;
+ out2.i32_thing = 5;
+ auto in2 = client.testNest(out2);
+ in1 = in2.struct_thing;
+ if (trace) writefln(" = {%s, {\"%s\", %s, %s, %s}, %s}", in2.byte_thing,
+ in1.string_thing, in1.byte_thing, in1.i32_thing, in1.i64_thing,
+ in2.i32_thing);
+ enforce(in2 == out2);
+
+ int[int] mapout;
+ for (int i = 0; i < 5; ++i) {
+ mapout[i] = i - 10;
+ }
+ if (trace) writef("testMap({%s})", mapout);
+ auto mapin = client.testMap(mapout);
+ if (trace) writefln(" = {%s}", mapin);
+ enforce(mapin == mapout);
+
+ auto setout = new HashSet!int;
+ for (int i = -2; i < 3; ++i) {
+ setout ~= i;
+ }
+ if (trace) writef("testSet(%s)", setout);
+ auto setin = client.testSet(setout);
+ if (trace) writefln(" = %s", setin);
+ enforce(setin == setout);
+
+ int[] listout;
+ for (int i = -2; i < 3; ++i) {
+ listout ~= i;
+ }
+ if (trace) writef("testList(%s)", listout);
+ auto listin = client.testList(listout);
+ if (trace) writefln(" = %s", listin);
+ enforce(listin == listout);
+
+ {
+ if (trace) write("testEnum(ONE)");
+ auto ret = client.testEnum(Numberz.ONE);
+ if (trace) writefln(" = %s", ret);
+ enforce(ret == Numberz.ONE);
+
+ if (trace) write("testEnum(TWO)");
+ ret = client.testEnum(Numberz.TWO);
+ if (trace) writefln(" = %s", ret);
+ enforce(ret == Numberz.TWO);
+
+ if (trace) write("testEnum(THREE)");
+ ret = client.testEnum(Numberz.THREE);
+ if (trace) writefln(" = %s", ret);
+ enforce(ret == Numberz.THREE);
+
+ if (trace) write("testEnum(FIVE)");
+ ret = client.testEnum(Numberz.FIVE);
+ if (trace) writefln(" = %s", ret);
+ enforce(ret == Numberz.FIVE);
+
+ if (trace) write("testEnum(EIGHT)");
+ ret = client.testEnum(Numberz.EIGHT);
+ if (trace) writefln(" = %s", ret);
+ enforce(ret == Numberz.EIGHT);
+ }
+
+ if (trace) write("testTypedef(309858235082523)");
+ UserId uid = client.testTypedef(309858235082523L);
+ if (trace) writefln(" = %s", uid);
+ enforce(uid == 309858235082523L);
+
+ if (trace) write("testMapMap(1)");
+ auto mm = client.testMapMap(1);
+ if (trace) writefln(" = {%s}", mm);
+ // Simply doing == doesn't seem to work for nested AAs.
+ foreach (key, value; mm) {
+ enforce(testMapMapReturn[key] == value);
+ }
+ foreach (key, value; testMapMapReturn) {
+ enforce(mm[key] == value);
+ }
+
+ Insanity insane;
+ insane.userMap[Numberz.FIVE] = 5000;
+ Xtruct truck;
+ truck.string_thing = "Truck";
+ truck.byte_thing = 8;
+ truck.i32_thing = 8;
+ truck.i64_thing = 8;
+ insane.xtructs ~= truck;
+ if (trace) write("testInsanity()");
+ auto whoa = client.testInsanity(insane);
+ if (trace) writefln(" = %s", whoa);
+
+ // Commented for now, this is cumbersome to write without opEqual getting
+ // called on AA comparison.
+ // enforce(whoa == testInsanityReturn);
+
+ {
+ try {
+ if (trace) write("client.testException(\"Xception\") =>");
+ client.testException("Xception");
+ if (trace) writeln(" void\nFAILURE");
+ throw new Exception("testException failed.");
+ } catch (Xception e) {
+ if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message);
+ }
+
+ try {
+ if (trace) write("client.testException(\"TException\") =>");
+ client.testException("Xception");
+ if (trace) writeln(" void\nFAILURE");
+ throw new Exception("testException failed.");
+ } catch (TException e) {
+ if (trace) writefln(" {%s}", e.msg);
+ }
+
+ try {
+ if (trace) write("client.testException(\"success\") =>");
+ client.testException("success");
+ if (trace) writeln(" void");
+ } catch (Exception e) {
+ if (trace) writeln(" exception\nFAILURE");
+ throw new Exception("testException failed.");
+ }
+ }
+
+ {
+ try {
+ if (trace) write("client.testMultiException(\"Xception\", \"test 1\") =>");
+ auto result = client.testMultiException("Xception", "test 1");
+ if (trace) writeln(" result\nFAILURE");
+ throw new Exception("testMultiException failed.");
+ } catch (Xception e) {
+ if (trace) writefln(" {%s, \"%s\"}", e.errorCode, e.message);
+ }
+
+ try {
+ if (trace) write("client.testMultiException(\"Xception2\", \"test 2\") =>");
+ auto result = client.testMultiException("Xception2", "test 2");
+ if (trace) writeln(" result\nFAILURE");
+ throw new Exception("testMultiException failed.");
+ } catch (Xception2 e) {
+ if (trace) writefln(" {%s, {\"%s\"}}",
+ e.errorCode, e.struct_thing.string_thing);
+ }
+
+ try {
+ if (trace) writef("client.testMultiException(\"success\", \"test 3\") =>");
+ auto result = client.testMultiException("success", "test 3");
+ if (trace) writefln(" {{\"%s\"}}", result.string_thing);
+ } catch (Exception e) {
+ if (trace) writeln(" exception\nFAILURE");
+ throw new Exception("testMultiException failed.");
+ }
+ }
+
+ // Do not run oneway test when doing multiple iterations, as it blocks the
+ // server for three seconds.
+ if (numTests == 1) {
+ if (trace) writef("client.testOneway(3) =>");
+ auto onewayWatch = StopWatch(AutoStart.yes);
+ client.testOneway(3);
+ onewayWatch.stop();
+ if (onewayWatch.peek().msecs > 200) {
+ if (trace) {
+ writefln(" FAILURE - took %s ms", onewayWatch.peek().usecs / 1000.0);
+ }
+ throw new Exception("testOneway failed.");
+ } else {
+ if (trace) {
+ writefln(" success - took %s ms", onewayWatch.peek().usecs / 1000.0);
+ }
+ }
+
+ // Redo a simple test after the oneway to make sure we aren't "off by
+ // one", which would be the case if the server treated oneway methods
+ // like normal ones.
+ if (trace) write("re-test testI32(-1)");
+ i32 = client.testI32(-1);
+ if (trace) writefln(" = %s", i32);
+ }
+
+ // Time metering.
+ sw.stop();
+
+ immutable tot = sw.peek().usecs;
+ if (trace) writefln("Total time: %s us\n", tot);
+
+ time_tot += tot;
+ if (time_min == 0 || tot < time_min) {
+ time_min = tot;
+ }
+ if (tot > time_max) {
+ time_max = tot;
+ }
+ protocol.transport.close();
+
+ sw.reset();
+ }
+
+ writeln("All tests done.");
+
+ if (numTests > 1) {
+ auto time_avg = time_tot / numTests;
+ writefln("Min time: %s us", time_min);
+ writefln("Max time: %s us", time_max);
+ writefln("Avg time: %s us", time_avg);
+ }
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d
new file mode 100644
index 000000000..13a568613
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_common.d
@@ -0,0 +1,92 @@
+module thrift_test_common;
+
+import std.stdio;
+import thrift.test.ThriftTest_types;
+
+enum ProtocolType {
+ binary,
+ compact,
+ json
+}
+
+void writeInsanityReturn(in Insanity[Numberz][UserId] insane) {
+ write("{");
+ foreach(key1, value1; insane) {
+ writef("%s => {", key1);
+ foreach(key2, value2; value1) {
+ writef("%s => {", key2);
+ write("{");
+ foreach(key3, value3; value2.userMap) {
+ writef("%s => %s, ", key3, value3);
+ }
+ write("}, ");
+
+ write("{");
+ foreach (x; value2.xtructs) {
+ writef("{\"%s\", %s, %s, %s}, ",
+ x.string_thing, x.byte_thing, x.i32_thing, x.i64_thing);
+ }
+ write("}");
+
+ write("}, ");
+ }
+ write("}, ");
+ }
+ write("}");
+}
+
+Insanity[Numberz][UserId] testInsanityReturn;
+int[int][int] testMapMapReturn;
+
+static this() {
+ testInsanityReturn = {
+ Insanity[Numberz][UserId] insane;
+
+ Xtruct hello;
+ hello.string_thing = "Hello2";
+ hello.byte_thing = 2;
+ hello.i32_thing = 2;
+ hello.i64_thing = 2;
+
+ Xtruct goodbye;
+ goodbye.string_thing = "Goodbye4";
+ goodbye.byte_thing = 4;
+ goodbye.i32_thing = 4;
+ goodbye.i64_thing = 4;
+
+ Insanity crazy;
+ crazy.userMap[Numberz.EIGHT] = 8;
+ crazy.xtructs ~= goodbye;
+
+ Insanity looney;
+ // The C++ TestServer also assigns these to crazy, but that is probably
+ // an oversight.
+ looney.userMap[Numberz.FIVE] = 5;
+ looney.xtructs ~= hello;
+
+ Insanity[Numberz] first_map;
+ first_map[Numberz.TWO] = crazy;
+ first_map[Numberz.THREE] = crazy;
+ insane[1] = first_map;
+
+ Insanity[Numberz] second_map;
+ second_map[Numberz.SIX] = looney;
+ insane[2] = second_map;
+ return insane;
+ }();
+
+ testMapMapReturn = {
+ int[int] pos;
+ int[int] neg;
+
+ for (int i = 1; i < 5; i++) {
+ pos[i] = i;
+ neg[-i] = -i;
+ }
+
+ int[int][int] result;
+ result[4] = pos;
+ result[-4] = neg;
+ return result;
+ }();
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh b/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh
new file mode 100755
index 000000000..51bfe9999
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_runner.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Runs the D ThriftTest client and servers for all combinations of transport,
+# protocol, SSL-mode and server type.
+# Pass -k to keep going after failed tests.
+
+CUR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+protocols="binary compact json"
+# TODO: fix and enable http
+# transports="buffered framed raw http"
+transports="buffered framed raw"
+servers="simple taskpool threaded"
+framed_only_servers="nonblocking pooledNonblocking"
+
+# Don't leave any server instances behind when interrupted (e.g. by Ctrl+C)
+# or terminated.
+trap "kill $(jobs -p) 2>/dev/null" INT TERM
+
+for protocol in $protocols; do
+ for ssl in "" " --ssl"; do
+ for transport in $transports; do
+ for server in $servers $framed_only_servers; do
+ case $framed_only_servers in
+ *$server*) if [ $transport != "framed" ] || [ $ssl != "" ]; then continue; fi;;
+ esac
+
+ args="--transport=$transport --protocol=$protocol$ssl"
+ ${CUR}/thrift_test_server $args --server-type=$server > /dev/null &
+ server_pid=$!
+
+ # Give the server some time to get up and check if it runs (yes, this
+ # is a huge kludge, should add a connect timeout to test client).
+ client_rc=-1
+ if [ "$server" = "taskpool" ]; then
+ sleep 0.5
+ else
+ sleep 0.02
+ fi
+ kill -0 $server_pid 2>/dev/null
+ if [ $? -eq 0 ]; then
+ ${CUR}/thrift_test_client $args --numTests=10 > /dev/null
+ client_rc=$?
+
+ # Temporarily redirect stderr to null to avoid job control messages,
+ # restore it afterwards.
+ exec 3>&2
+ exec 2>/dev/null
+ kill $server_pid
+ exec 3>&2
+ fi
+
+ # Get the server exit code (wait should immediately return).
+ wait $server_pid
+ server_rc=$?
+
+ if [ $client_rc -ne 0 -o $server_rc -eq 1 ]; then
+ echo -e "\nTests failed for: $args --server-type=$server"
+ failed="true"
+ if [ "$1" != "-k" ]; then
+ exit 1
+ fi
+ else
+ echo -n "."
+ fi
+ done
+ done
+ done
+done
+
+echo
+if [ -z "$failed" ]; then
+ echo "All tests passed."
+fi
diff --git a/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d b/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d
new file mode 100644
index 000000000..ce820d699
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/thrift_test_server.d
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+module thrift_test_server;
+
+import core.stdc.errno : errno;
+import core.stdc.signal : signal, SIGINT, SIG_DFL, SIG_ERR;
+import core.thread : dur, Thread;
+import std.algorithm;
+import std.exception : enforce;
+import std.getopt;
+import std.parallelism : totalCPUs;
+import std.string;
+import std.stdio;
+import std.typetuple : TypeTuple, staticMap;
+import thrift.base;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.protocol.compact;
+import thrift.protocol.json;
+import thrift.server.base;
+import thrift.server.transport.socket;
+import thrift.server.transport.ssl;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.http;
+import thrift.transport.ssl;
+import thrift.util.cancellation;
+import thrift.util.hashset;
+import test_utils;
+
+import thrift_test_common;
+import thrift.test.ThriftTest_types;
+import thrift.test.ThriftTest;
+
+class TestHandler : ThriftTest {
+ this(bool trace) {
+ trace_ = trace;
+ }
+
+ override void testVoid() {
+ if (trace_) writeln("testVoid()");
+ }
+
+ override string testString(string thing) {
+ if (trace_) writefln("testString(\"%s\")", thing);
+ return thing;
+ }
+
+ override byte testByte(byte thing) {
+ if (trace_) writefln("testByte(%s)", thing);
+ return thing;
+ }
+
+ override int testI32(int thing) {
+ if (trace_) writefln("testI32(%s)", thing);
+ return thing;
+ }
+
+ override long testI64(long thing) {
+ if (trace_) writefln("testI64(%s)", thing);
+ return thing;
+ }
+
+ override double testDouble(double thing) {
+ if (trace_) writefln("testDouble(%s)", thing);
+ return thing;
+ }
+
+ override string testBinary(string thing) {
+ if (trace_) writefln("testBinary(\"%s\")", thing);
+ return thing;
+ }
+
+ override bool testBool(bool thing) {
+ if (trace_) writefln("testBool(\"%s\")", thing);
+ return thing;
+ }
+
+ override Xtruct testStruct(ref const(Xtruct) thing) {
+ if (trace_) writefln("testStruct({\"%s\", %s, %s, %s})",
+ thing.string_thing, thing.byte_thing, thing.i32_thing, thing.i64_thing);
+ return thing;
+ }
+
+ override Xtruct2 testNest(ref const(Xtruct2) nest) {
+ auto thing = nest.struct_thing;
+ if (trace_) writefln("testNest({%s, {\"%s\", %s, %s, %s}, %s})",
+ nest.byte_thing, thing.string_thing, thing.byte_thing, thing.i32_thing,
+ thing.i64_thing, nest.i32_thing);
+ return nest;
+ }
+
+ override int[int] testMap(int[int] thing) {
+ if (trace_) writefln("testMap({%s})", thing);
+ return thing;
+ }
+
+ override HashSet!int testSet(HashSet!int thing) {
+ if (trace_) writefln("testSet({%s})",
+ join(map!`to!string(a)`(thing[]), ", "));
+ return thing;
+ }
+
+ override int[] testList(int[] thing) {
+ if (trace_) writefln("testList(%s)", thing);
+ return thing;
+ }
+
+ override Numberz testEnum(Numberz thing) {
+ if (trace_) writefln("testEnum(%s)", thing);
+ return thing;
+ }
+
+ override UserId testTypedef(UserId thing) {
+ if (trace_) writefln("testTypedef(%s)", thing);
+ return thing;
+ }
+
+ override string[string] testStringMap(string[string] thing) {
+ if (trace_) writefln("testStringMap(%s)", thing);
+ return thing;
+ }
+
+ override int[int][int] testMapMap(int hello) {
+ if (trace_) writefln("testMapMap(%s)", hello);
+ return testMapMapReturn;
+ }
+
+ override Insanity[Numberz][UserId] testInsanity(ref const(Insanity) argument) {
+ if (trace_) writeln("testInsanity()");
+ Insanity[Numberz][UserId] ret;
+ Insanity[Numberz] m1;
+ Insanity[Numberz] m2;
+ Insanity tmp;
+ tmp = cast(Insanity)argument;
+ m1[Numberz.TWO] = tmp;
+ m1[Numberz.THREE] = tmp;
+ m2[Numberz.SIX] = Insanity();
+ ret[1] = m1;
+ ret[2] = m2;
+ return ret;
+ }
+
+ override Xtruct testMulti(byte arg0, int arg1, long arg2, string[short] arg3,
+ Numberz arg4, UserId arg5)
+ {
+ if (trace_) writeln("testMulti()");
+ return Xtruct("Hello2", arg0, arg1, arg2);
+ }
+
+ override void testException(string arg) {
+ if (trace_) writefln("testException(%s)", arg);
+ if (arg == "Xception") {
+ auto e = new Xception();
+ e.errorCode = 1001;
+ e.message = arg;
+ throw e;
+ } else if (arg == "TException") {
+ throw new TException();
+ } else if (arg == "ApplicationException") {
+ throw new TException();
+ }
+ }
+
+ override Xtruct testMultiException(string arg0, string arg1) {
+ if (trace_) writefln("testMultiException(%s, %s)", arg0, arg1);
+
+ if (arg0 == "Xception") {
+ auto e = new Xception();
+ e.errorCode = 1001;
+ e.message = "This is an Xception";
+ throw e;
+ } else if (arg0 == "Xception2") {
+ auto e = new Xception2();
+ e.errorCode = 2002;
+ e.struct_thing.string_thing = "This is an Xception2";
+ throw e;
+ } else {
+ return Xtruct(arg1);
+ }
+ }
+
+ override void testOneway(int sleepFor) {
+ if (trace_) writefln("testOneway(%s): Sleeping...", sleepFor);
+ Thread.sleep(dur!"seconds"(sleepFor));
+ if (trace_) writefln("testOneway(%s): done sleeping!", sleepFor);
+ }
+
+private:
+ bool trace_;
+}
+
+shared(bool) gShutdown = false;
+
+nothrow @nogc extern(C) void handleSignal(int sig) {
+ gShutdown = true;
+}
+
+// Runs a thread that waits for shutdown to be
+// signaled and then triggers cancellation,
+// causing the server to stop. While we could
+// use a signalfd for this purpose, we are instead
+// opting for a busy waiting scheme for maximum
+// portability since signalfd is a linux thing.
+
+class ShutdownThread : Thread {
+ this(TCancellationOrigin cancellation) {
+ cancellation_ = cancellation;
+ super(&run);
+ }
+
+private:
+ void run() {
+ while (!gShutdown) {
+ Thread.sleep(dur!("msecs")(25));
+ }
+ cancellation_.trigger();
+ }
+
+ TCancellationOrigin cancellation_;
+}
+
+void main(string[] args) {
+ ushort port = 9090;
+ ServerType serverType;
+ ProtocolType protocolType;
+ size_t numIOThreads = 1;
+ TransportType transportType;
+ bool ssl = false;
+ bool trace = true;
+ size_t taskPoolSize = totalCPUs;
+
+ getopt(args, "port", &port, "protocol", &protocolType, "server-type",
+ &serverType, "ssl", &ssl, "num-io-threads", &numIOThreads,
+ "task-pool-size", &taskPoolSize, "trace", &trace,
+ "transport", &transportType);
+
+ if (serverType == ServerType.nonblocking ||
+ serverType == ServerType.pooledNonblocking
+ ) {
+ enforce(transportType == TransportType.framed,
+ "Need to use framed transport with non-blocking server.");
+ enforce(!ssl, "The non-blocking server does not support SSL yet.");
+
+ // Don't wrap the contents into another layer of framing.
+ transportType = TransportType.raw;
+ }
+
+ version (ThriftTestTemplates) {
+ // Only exercise the specialized template code paths if explicitly enabled
+ // to reduce memory consumption on regular test suite runs – there should
+ // not be much that can go wrong with that specifically anyway.
+ alias TypeTuple!(TBufferedTransport, TFramedTransport, TServerHttpTransport)
+ AvailableTransports;
+ alias TypeTuple!(
+ staticMap!(TBinaryProtocol, AvailableTransports),
+ staticMap!(TCompactProtocol, AvailableTransports)
+ ) AvailableProtocols;
+ } else {
+ alias TypeTuple!() AvailableTransports;
+ alias TypeTuple!() AvailableProtocols;
+ }
+
+ TProtocolFactory protocolFactory;
+ final switch (protocolType) {
+ case ProtocolType.binary:
+ protocolFactory = new TBinaryProtocolFactory!AvailableTransports;
+ break;
+ case ProtocolType.compact:
+ protocolFactory = new TCompactProtocolFactory!AvailableTransports;
+ break;
+ case ProtocolType.json:
+ protocolFactory = new TJsonProtocolFactory!AvailableTransports;
+ break;
+ }
+
+ auto processor = new TServiceProcessor!(ThriftTest, AvailableProtocols)(
+ new TestHandler(trace));
+
+ TServerSocket serverSocket;
+ if (ssl) {
+ auto sslContext = new TSSLContext();
+ sslContext.serverSide = true;
+ sslContext.loadCertificate("../../../test/keys/server.crt");
+ sslContext.loadPrivateKey("../../../test/keys/server.key");
+ sslContext.ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ serverSocket = new TSSLServerSocket(port, sslContext);
+ } else {
+ serverSocket = new TServerSocket(port);
+ }
+
+ auto transportFactory = createTransportFactory(transportType);
+
+ auto server = createServer(serverType, numIOThreads, taskPoolSize,
+ processor, serverSocket, transportFactory, protocolFactory);
+
+ // Set up SIGINT signal handling
+ enforce(signal(SIGINT, &handleSignal) != SIG_ERR,
+ "Could not replace the SIGINT signal handler: errno {0}".format(errno()));
+
+ // Set up a server cancellation trigger
+ auto cancel = new TCancellationOrigin();
+
+ // Set up a listener for the shutdown condition - this will
+ // wake up when the signal occurs and trigger cancellation.
+ auto shutdown = new ShutdownThread(cancel);
+ shutdown.start();
+
+ // Serve from this thread; the signal will stop the server
+ // and control will return here
+ writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
+ transportType, serverType, ssl ? "(using SSL) ": "", port);
+ server.serve(cancel);
+ shutdown.join();
+ signal(SIGINT, SIG_DFL);
+
+ writeln("done.");
+}
diff --git a/src/jaegertracing/thrift/lib/d/test/transport_test.d b/src/jaegertracing/thrift/lib/d/test/transport_test.d
new file mode 100644
index 000000000..623e03f0e
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/transport_test.d
@@ -0,0 +1,803 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Exercises various transports, combined with the buffered/framed wrappers.
+ *
+ * Originally ported from the C++ version, with Windows support code added.
+ */
+module transport_test;
+
+import core.atomic;
+import core.time : Duration;
+import core.thread : Thread;
+import std.conv : to;
+import std.datetime;
+import std.exception : enforce;
+static import std.file;
+import std.getopt;
+import std.random : rndGen, uniform, unpredictableSeed;
+import std.socket;
+import std.stdio;
+import std.string;
+import std.typetuple;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.framed;
+import thrift.transport.file;
+import thrift.transport.http;
+import thrift.transport.memory;
+import thrift.transport.socket;
+import thrift.transport.zlib;
+
+/*
+ * Size generation helpers – used to be able to run the same testing code
+ * with both constant and random total/chunk sizes.
+ */
+
+interface SizeGenerator {
+ size_t nextSize();
+ string toString();
+}
+
+class ConstantSizeGenerator : SizeGenerator {
+ this(size_t value) {
+ value_ = value;
+ }
+
+ override size_t nextSize() {
+ return value_;
+ }
+
+ override string toString() const {
+ return to!string(value_);
+ }
+
+private:
+ size_t value_;
+}
+
+class RandomSizeGenerator : SizeGenerator {
+ this(size_t min, size_t max) {
+ min_ = min;
+ max_ = max;
+ }
+
+ override size_t nextSize() {
+ return uniform!"[]"(min_, max_);
+ }
+
+ override string toString() const {
+ return format("rand(%s, %s)", min_, max_);
+ }
+
+ size_t min() const @property {
+ return min_;
+ }
+
+ size_t max() const @property {
+ return max_;
+ }
+
+private:
+ size_t min_;
+ size_t max_;
+}
+
+
+/*
+ * Classes to set up coupled transports
+ */
+
+/**
+ * Helper class to represent a coupled pair of transports.
+ *
+ * Data written to the output transport can be read from the input transport.
+ *
+ * This is used as the base class for the various coupled transport
+ * implementations. It shouldn't be used directly.
+ */
+class CoupledTransports(Transport) if (isTTransport!Transport) {
+ Transport input;
+ Transport output;
+}
+
+template isCoupledTransports(T) {
+ static if (is(T _ : CoupledTransports!U, U)) {
+ enum isCoupledTransports = true;
+ } else {
+ enum isCoupledTransports = false;
+ }
+}
+
+/**
+ * Helper template class for creating coupled transports that wrap
+ * another transport.
+ */
+class CoupledWrapperTransports(WrapperTransport, InnerCoupledTransports) if (
+ isTTransport!WrapperTransport && isCoupledTransports!InnerCoupledTransports
+) : CoupledTransports!WrapperTransport {
+ this() {
+ inner_ = new InnerCoupledTransports();
+ if (inner_.input) {
+ input = new WrapperTransport(inner_.input);
+ }
+ if (inner_.output) {
+ output = new WrapperTransport(inner_.output);
+ }
+ }
+
+ ~this() {
+ destroy(inner_);
+ }
+
+private:
+ InnerCoupledTransports inner_;
+}
+
+import thrift.internal.codegen : PApply;
+alias PApply!(CoupledWrapperTransports, TBufferedTransport) CoupledBufferedTransports;
+alias PApply!(CoupledWrapperTransports, TFramedTransport) CoupledFramedTransports;
+alias PApply!(CoupledWrapperTransports, TZlibTransport) CoupledZlibTransports;
+
+/**
+ * Coupled TMemoryBuffers.
+ */
+class CoupledMemoryBuffers : CoupledTransports!TMemoryBuffer {
+ this() {
+ buf = new TMemoryBuffer;
+ input = buf;
+ output = buf;
+ }
+
+ TMemoryBuffer buf;
+}
+
+/**
+ * Coupled TSockets.
+ */
+class CoupledSocketTransports : CoupledTransports!TSocket {
+ this() {
+ auto sockets = socketPair();
+ input = new TSocket(sockets[0]);
+ output = new TSocket(sockets[1]);
+ }
+
+ ~this() {
+ input.close();
+ output.close();
+ }
+}
+
+/**
+ * Coupled TFileTransports
+ */
+class CoupledFileTransports : CoupledTransports!TTransport {
+ this() {
+ // We actually need the file name of the temp file here, so we can't just
+ // use the usual tempfile facilities.
+ do {
+ fileName_ = tmpDir ~ "/thrift.transport_test." ~ to!string(rndGen().front);
+ rndGen().popFront();
+ } while (std.file.exists(fileName_));
+
+ writefln("Using temp file: %s", fileName_);
+
+ auto writer = new TFileWriterTransport(fileName_);
+ writer.open();
+ output = writer;
+
+ // Wait until the file has been created.
+ writer.flush();
+
+ auto reader = new TFileReaderTransport(fileName_);
+ reader.open();
+ reader.readTimeout(dur!"msecs"(-1));
+ input = reader;
+ }
+
+ ~this() {
+ input.close();
+ output.close();
+ std.file.remove(fileName_);
+ }
+
+ static string tmpDir;
+
+private:
+ string fileName_;
+}
+
+
+/*
+ * Test functions
+ */
+
+/**
+ * Test interleaved write and read calls.
+ *
+ * Generates a buffer totalSize bytes long, then writes it to the transport,
+ * and verifies the written data can be read back correctly.
+ *
+ * Mode of operation:
+ * - call wChunkGenerator to figure out how large of a chunk to write
+ * - call wSizeGenerator to get the size for individual write() calls,
+ * and do this repeatedly until the entire chunk is written.
+ * - call rChunkGenerator to figure out how large of a chunk to read
+ * - call rSizeGenerator to get the size for individual read() calls,
+ * and do this repeatedly until the entire chunk is read.
+ * - repeat until the full buffer is written and read back,
+ * then compare the data read back against the original buffer
+ *
+ *
+ * - If any of the size generators return 0, this means to use the maximum
+ * possible size.
+ *
+ * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
+ * there are never more than maxOutstanding bytes waiting to be read back.
+ */
+void testReadWrite(CoupledTransports)(
+ size_t totalSize,
+ SizeGenerator wSizeGenerator,
+ SizeGenerator rSizeGenerator,
+ SizeGenerator wChunkGenerator,
+ SizeGenerator rChunkGenerator,
+ size_t maxOutstanding
+) if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ auto wbuf = new ubyte[totalSize];
+ auto rbuf = new ubyte[totalSize];
+
+ // Store some data in wbuf.
+ foreach (i, ref b; wbuf) {
+ b = i & 0xff;
+ }
+
+ size_t totalWritten;
+ size_t totalRead;
+ while (totalRead < totalSize) {
+ // Determine how large a chunk of data to write.
+ auto wChunkSize = wChunkGenerator.nextSize();
+ if (wChunkSize == 0 || wChunkSize > totalSize - totalWritten) {
+ wChunkSize = totalSize - totalWritten;
+ }
+
+ // Make sure (totalWritten - totalRead) + wChunkSize is less than
+ // maxOutstanding.
+ if (maxOutstanding > 0 &&
+ wChunkSize > maxOutstanding - (totalWritten - totalRead)) {
+ wChunkSize = maxOutstanding - (totalWritten - totalRead);
+ }
+
+ // Write the chunk.
+ size_t chunkWritten = 0;
+ while (chunkWritten < wChunkSize) {
+ auto writeSize = wSizeGenerator.nextSize();
+ if (writeSize == 0 || writeSize > wChunkSize - chunkWritten) {
+ writeSize = wChunkSize - chunkWritten;
+ }
+
+ transports.output.write(wbuf[totalWritten .. totalWritten + writeSize]);
+ chunkWritten += writeSize;
+ totalWritten += writeSize;
+ }
+
+ // Flush the data, so it will be available in the read transport
+ // Don't flush if wChunkSize is 0. (This should only happen if
+ // totalWritten == totalSize already, and we're only reading now.)
+ if (wChunkSize > 0) {
+ transports.output.flush();
+ }
+
+ // Determine how large a chunk of data to read back.
+ auto rChunkSize = rChunkGenerator.nextSize();
+ if (rChunkSize == 0 || rChunkSize > totalWritten - totalRead) {
+ rChunkSize = totalWritten - totalRead;
+ }
+
+ // Read the chunk.
+ size_t chunkRead;
+ while (chunkRead < rChunkSize) {
+ auto readSize = rSizeGenerator.nextSize();
+ if (readSize == 0 || readSize > rChunkSize - chunkRead) {
+ readSize = rChunkSize - chunkRead;
+ }
+
+ size_t bytesRead;
+ try {
+ bytesRead = transports.input.read(
+ rbuf[totalRead .. totalRead + readSize]);
+ } catch (TTransportException e) {
+ throw new Exception(format(`read(pos = %s, size = %s) threw ` ~
+ `exception "%s"; written so far: %s/%s bytes`, totalRead, readSize,
+ e.msg, totalWritten, totalSize));
+ }
+
+ enforce(bytesRead > 0, format(`read(pos = %s, size = %s) returned %s; ` ~
+ `written so far: %s/%s bytes`, totalRead, readSize, bytesRead,
+ totalWritten, totalSize));
+
+ chunkRead += bytesRead;
+ totalRead += bytesRead;
+ }
+ }
+
+ // make sure the data read back is identical to the data written
+ if (rbuf != wbuf) {
+ stderr.writefln("%s vs. %s", wbuf[$ - 4 .. $], rbuf[$ - 4 .. $]);
+ stderr.writefln("rbuf: %s vs. wbuf: %s", rbuf.length, wbuf.length);
+ }
+ enforce(rbuf == wbuf);
+}
+
+void testReadPartAvailable(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ ubyte[10] writeBuf = 'a';
+ ubyte[10] readBuf;
+
+ // Attemping to read 10 bytes when only 9 are available should return 9
+ // immediately.
+ transports.output.write(writeBuf[0 .. 9]);
+ transports.output.flush();
+
+ auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+ auto bytesRead = transports.input.read(readBuf);
+ enforce(t.fired == 0);
+ enforce(bytesRead == 9);
+}
+
+void testReadPartialMidframe(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ ubyte[13] writeBuf = 'a';
+ ubyte[14] readBuf;
+
+ // Attempt to read 10 bytes, when only 9 are available, but after we have
+ // already read part of the data that is available. This exercises a
+ // different code path for several of the transports.
+ //
+ // For transports that add their own framing (e.g., TFramedTransport and
+ // TFileTransport), the two flush calls break up the data in to a 10 byte
+ // frame and a 3 byte frame. The first read then puts us partway through the
+ // first frame, and then we attempt to read past the end of that frame, and
+ // through the next frame, too.
+ //
+ // For buffered transports that perform read-ahead (e.g.,
+ // TBufferedTransport), the read-ahead will most likely see all 13 bytes
+ // written on the first read. The next read will then attempt to read past
+ // the end of the read-ahead buffer.
+ //
+ // Flush 10 bytes, then 3 bytes. This creates 2 separate frames for
+ // transports that track framing internally.
+ transports.output.write(writeBuf[0 .. 10]);
+ transports.output.flush();
+ transports.output.write(writeBuf[10 .. 13]);
+ transports.output.flush();
+
+ // Now read 4 bytes, so that we are partway through the written data.
+ auto bytesRead = transports.input.read(readBuf[0 .. 4]);
+ enforce(bytesRead == 4);
+
+ // Now attempt to read 10 bytes. Only 9 more are available.
+ //
+ // We should be able to get all 9 bytes, but it might take multiple read
+ // calls, since it is valid for read() to return fewer bytes than requested.
+ // (Most transports do immediately return 9 bytes, but the framing transports
+ // tend to only return to the end of the current frame, which is 6 bytes in
+ // this case.)
+ size_t totalRead = 0;
+ while (totalRead < 9) {
+ auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+ bytesRead = transports.input.read(readBuf[4 + totalRead .. 14]);
+ enforce(t.fired == 0);
+ enforce(bytesRead > 0);
+ totalRead += bytesRead;
+ enforce(totalRead <= 9);
+ }
+
+ enforce(totalRead == 9);
+}
+
+void testBorrowPartAvailable(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ ubyte[9] writeBuf = 'a';
+ ubyte[10] readBuf;
+
+ // Attemping to borrow 10 bytes when only 9 are available should return NULL
+ // immediately.
+ transports.output.write(writeBuf);
+ transports.output.flush();
+
+ auto t = Trigger(dur!"seconds"(3), transports.output, 1);
+ auto borrowLen = readBuf.length;
+ auto borrowedBuf = transports.input.borrow(readBuf.ptr, borrowLen);
+ enforce(t.fired == 0);
+ enforce(borrowedBuf is null);
+}
+
+void testReadNoneAvailable(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ // Attempting to read when no data is available should either block until
+ // some data is available, or fail immediately. (e.g., TSocket blocks,
+ // TMemoryBuffer just fails.)
+ //
+ // If the transport blocks, it should succeed once some data is available,
+ // even if less than the amount requested becomes available.
+ ubyte[10] readBuf;
+
+ auto t = Trigger(dur!"seconds"(1), transports.output, 2);
+ t.add(dur!"seconds"(1), transports.output, 8);
+
+ auto bytesRead = transports.input.read(readBuf);
+ if (bytesRead == 0) {
+ enforce(t.fired == 0);
+ } else {
+ enforce(t.fired == 1);
+ enforce(bytesRead == 2);
+ }
+}
+
+void testBorrowNoneAvailable(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ scope transports = new CoupledTransports;
+ assert(transports.input);
+ assert(transports.output);
+
+ ubyte[16] writeBuf = 'a';
+
+ // Attempting to borrow when no data is available should fail immediately
+ auto t = Trigger(dur!"seconds"(1), transports.output, 10);
+
+ auto borrowLen = 10;
+ auto borrowedBuf = transports.input.borrow(null, borrowLen);
+ enforce(borrowedBuf is null);
+ enforce(t.fired == 0);
+}
+
+
+void doRwTest(CoupledTransports)(
+ size_t totalSize,
+ SizeGenerator wSizeGen,
+ SizeGenerator rSizeGen,
+ SizeGenerator wChunkSizeGen = new ConstantSizeGenerator(0),
+ SizeGenerator rChunkSizeGen = new ConstantSizeGenerator(0),
+ size_t maxOutstanding = 0
+) if (
+ isCoupledTransports!CoupledTransports
+) {
+ totalSize = cast(size_t)(totalSize * g_sizeMultiplier);
+
+ scope(failure) {
+ writefln("Test failed for %s: testReadWrite(%s, %s, %s, %s, %s, %s)",
+ CoupledTransports.stringof, totalSize, wSizeGen, rSizeGen,
+ wChunkSizeGen, rChunkSizeGen, maxOutstanding);
+ }
+
+ testReadWrite!CoupledTransports(totalSize, wSizeGen, rSizeGen,
+ wChunkSizeGen, rChunkSizeGen, maxOutstanding);
+}
+
+void doBlockingTest(CoupledTransports)() if (
+ isCoupledTransports!CoupledTransports
+) {
+ void writeFailure(string name) {
+ writefln("Test failed for %s: %s()", CoupledTransports.stringof, name);
+ }
+
+ {
+ scope(failure) writeFailure("testReadPartAvailable");
+ testReadPartAvailable!CoupledTransports();
+ }
+
+ {
+ scope(failure) writeFailure("testReadPartialMidframe");
+ testReadPartialMidframe!CoupledTransports();
+ }
+
+ {
+ scope(failure) writeFailure("testReadNoneAvaliable");
+ testReadNoneAvailable!CoupledTransports();
+ }
+
+ {
+ scope(failure) writeFailure("testBorrowPartAvailable");
+ testBorrowPartAvailable!CoupledTransports();
+ }
+
+ {
+ scope(failure) writeFailure("testBorrowNoneAvailable");
+ testBorrowNoneAvailable!CoupledTransports();
+ }
+}
+
+SizeGenerator getGenerator(T)(T t) {
+ static if (is(T : SizeGenerator)) {
+ return t;
+ } else {
+ return new ConstantSizeGenerator(t);
+ }
+}
+
+template WrappedTransports(T) if (isCoupledTransports!T) {
+ alias TypeTuple!(
+ T,
+ CoupledBufferedTransports!T,
+ CoupledFramedTransports!T,
+ CoupledZlibTransports!T
+ ) WrappedTransports;
+}
+
+void testRw(C, R, S)(
+ size_t totalSize,
+ R wSize,
+ S rSize
+) if (
+ isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
+ is(typeof(getGenerator(rSize)))
+) {
+ testRw!C(totalSize, wSize, rSize, 0, 0, 0);
+}
+
+void testRw(C, R, S, T, U)(
+ size_t totalSize,
+ R wSize,
+ S rSize,
+ T wChunkSize,
+ U rChunkSize,
+ size_t maxOutstanding = 0
+) if (
+ isCoupledTransports!C && is(typeof(getGenerator(wSize))) &&
+ is(typeof(getGenerator(rSize))) && is(typeof(getGenerator(wChunkSize))) &&
+ is(typeof(getGenerator(rChunkSize)))
+) {
+ foreach (T; WrappedTransports!C) {
+ doRwTest!T(
+ totalSize,
+ getGenerator(wSize),
+ getGenerator(rSize),
+ getGenerator(wChunkSize),
+ getGenerator(rChunkSize),
+ maxOutstanding
+ );
+ }
+}
+
+void testBlocking(C)() if (isCoupledTransports!C) {
+ foreach (T; WrappedTransports!C) {
+ doBlockingTest!T();
+ }
+}
+
+// A quick hack, for the sake of brevity…
+float g_sizeMultiplier = 1;
+
+version (Posix) {
+ immutable defaultTempDir = "/tmp";
+} else version (Windows) {
+ import core.sys.windows.windows;
+ extern(Windows) DWORD GetTempPathA(DWORD nBufferLength, LPTSTR lpBuffer);
+
+ string defaultTempDir() @property {
+ char[MAX_PATH + 1] dir;
+ enforce(GetTempPathA(dir.length, dir.ptr));
+ return to!string(dir.ptr)[0 .. $ - 1];
+ }
+} else static assert(false);
+
+void main(string[] args) {
+ int seed = unpredictableSeed();
+ string tmpDir = defaultTempDir;
+
+ getopt(args, "seed", &seed, "size-multiplier", &g_sizeMultiplier,
+ "tmp-dir", &tmpDir);
+ enforce(g_sizeMultiplier >= 0, "Size multiplier must not be negative.");
+
+ writefln("Using seed: %s", seed);
+ rndGen().seed(seed);
+ CoupledFileTransports.tmpDir = tmpDir;
+
+ auto rand4k = new RandomSizeGenerator(1, 4096);
+
+ /*
+ * We do the basically the same set of tests for each transport type,
+ * although we tweak the parameters in some places.
+ */
+
+ // TMemoryBuffer tests
+ testRw!CoupledMemoryBuffers(1024 * 1024, 0, 0);
+ testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k);
+ testRw!CoupledMemoryBuffers(1024 * 256, 167, 163);
+ testRw!CoupledMemoryBuffers(1024 * 16, 1, 1);
+
+ testRw!CoupledMemoryBuffers(1024 * 256, 0, 0, rand4k, rand4k);
+ testRw!CoupledMemoryBuffers(1024 * 256, rand4k, rand4k, rand4k, rand4k);
+ testRw!CoupledMemoryBuffers(1024 * 256, 167, 163, rand4k, rand4k);
+ testRw!CoupledMemoryBuffers(1024 * 16, 1, 1, rand4k, rand4k);
+
+ testBlocking!CoupledMemoryBuffers();
+
+ // TSocket tests
+ enum socketMaxOutstanding = 4096;
+ testRw!CoupledSocketTransports(1024 * 1024, 0, 0,
+ 0, 0, socketMaxOutstanding);
+ testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
+ 0, 0, socketMaxOutstanding);
+ testRw!CoupledSocketTransports(1024 * 256, 167, 163,
+ 0, 0, socketMaxOutstanding);
+ // Doh. Apparently writing to a socket has some additional overhead for
+ // each send() call. If we have more than ~400 outstanding 1-byte write
+ // requests, additional send() calls start blocking.
+ testRw!CoupledSocketTransports(1024 * 16, 1, 1,
+ 0, 0, 250);
+ testRw!CoupledSocketTransports(1024 * 256, 0, 0,
+ rand4k, rand4k, socketMaxOutstanding);
+ testRw!CoupledSocketTransports(1024 * 256, rand4k, rand4k,
+ rand4k, rand4k, socketMaxOutstanding);
+ testRw!CoupledSocketTransports(1024 * 256, 167, 163,
+ rand4k, rand4k, socketMaxOutstanding);
+ testRw!CoupledSocketTransports(1024 * 16, 1, 1,
+ rand4k, rand4k, 250);
+
+ testBlocking!CoupledSocketTransports();
+
+ // File transport tests.
+
+ // Cannot write more than the frame size at once.
+ enum maxWriteAtOnce = 1024 * 1024 * 16 - 4;
+
+ testRw!CoupledFileTransports(1024 * 1024, maxWriteAtOnce, 0);
+ testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k);
+ testRw!CoupledFileTransports(1024 * 256, 167, 163);
+ testRw!CoupledFileTransports(1024 * 16, 1, 1);
+
+ testRw!CoupledFileTransports(1024 * 256, 0, 0, rand4k, rand4k);
+ testRw!CoupledFileTransports(1024 * 256, rand4k, rand4k, rand4k, rand4k);
+ testRw!CoupledFileTransports(1024 * 256, 167, 163, rand4k, rand4k);
+ testRw!CoupledFileTransports(1024 * 16, 1, 1, rand4k, rand4k);
+
+ testBlocking!CoupledFileTransports();
+}
+
+
+/*
+ * Timer handling code for use in tests that check the transport blocking
+ * semantics.
+ *
+ * The implementation has been hacked together in a hurry and wastes a lot of
+ * threads, but speed should not be the concern here.
+ */
+
+struct Trigger {
+ this(Duration timeout, TTransport transport, size_t writeLength) {
+ mutex_ = new Mutex;
+ cancelCondition_ = new Condition(mutex_);
+ info_ = new Info(timeout, transport, writeLength);
+ startThread();
+ }
+
+ ~this() {
+ synchronized (mutex_) {
+ info_ = null;
+ cancelCondition_.notifyAll();
+ }
+ if (thread_) thread_.join();
+ }
+
+ @disable this(this) { assert(0); }
+
+ void add(Duration timeout, TTransport transport, size_t writeLength) {
+ synchronized (mutex_) {
+ auto info = new Info(timeout, transport, writeLength);
+ if (info_) {
+ auto prev = info_;
+ while (prev.next) prev = prev.next;
+ prev.next = info;
+ } else {
+ info_ = info;
+ startThread();
+ }
+ }
+ }
+
+ @property short fired() {
+ return atomicLoad(fired_);
+ }
+
+private:
+ void timerThread() {
+ // KLUDGE: Make sure the std.concurrency mbox is initialized on the timer
+ // thread to be able to unblock the file transport.
+ import std.concurrency;
+ thisTid;
+
+ synchronized (mutex_) {
+ while (info_) {
+ auto cancelled = cancelCondition_.wait(info_.timeout);
+ if (cancelled) {
+ info_ = null;
+ break;
+ }
+
+ atomicOp!"+="(fired_, 1);
+
+ // Write some data to the transport to unblock it.
+ auto buf = new ubyte[info_.writeLength];
+ buf[] = 'b';
+ info_.transport.write(buf);
+ info_.transport.flush();
+
+ info_ = info_.next;
+ }
+ }
+
+ thread_ = null;
+ }
+
+ void startThread() {
+ thread_ = new Thread(&timerThread);
+ thread_.start();
+ }
+
+ struct Info {
+ this(Duration timeout, TTransport transport, size_t writeLength) {
+ this.timeout = timeout;
+ this.transport = transport;
+ this.writeLength = writeLength;
+ }
+
+ Duration timeout;
+ TTransport transport;
+ size_t writeLength;
+ Info* next;
+ }
+
+ Info* info_;
+ Thread thread_;
+ shared short fired_;
+
+ import core.sync.mutex;
+ Mutex mutex_;
+ import core.sync.condition;
+ Condition cancelCondition_;
+}