summaryrefslogtreecommitdiffstats
path: root/src/jaegertracing/thrift/lib/d/test/async_test.d
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-07 18:45:59 +0000
commit19fcec84d8d7d21e796c7624e521b60d28ee21ed (patch)
tree42d26aa27d1e3f7c0b8bd3fd14e7d7082f5008dc /src/jaegertracing/thrift/lib/d/test/async_test.d
parentInitial commit. (diff)
downloadceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.tar.xz
ceph-6d07fdb6bb33b1af39833b850bb6cf8af79fe293.zip
Adding upstream version 16.2.11+ds.upstream/16.2.11+dsupstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/jaegertracing/thrift/lib/d/test/async_test.d')
-rw-r--r--src/jaegertracing/thrift/lib/d/test/async_test.d396
1 files changed, 396 insertions, 0 deletions
diff --git a/src/jaegertracing/thrift/lib/d/test/async_test.d b/src/jaegertracing/thrift/lib/d/test/async_test.d
new file mode 100644
index 000000000..51529ba86
--- /dev/null
+++ b/src/jaegertracing/thrift/lib/d/test/async_test.d
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless enforced by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module async_test;
+
+import core.atomic;
+import core.sync.condition : Condition;
+import core.sync.mutex : Mutex;
+import core.thread : dur, Thread, ThreadGroup;
+import std.conv : text;
+import std.datetime;
+import std.getopt;
+import std.exception : collectException, enforce;
+import std.parallelism : TaskPool;
+import std.stdio;
+import std.string;
+import std.variant : Variant;
+import thrift.base;
+import thrift.async.base;
+import thrift.async.libevent;
+import thrift.async.socket;
+import thrift.async.ssl;
+import thrift.codegen.async_client;
+import thrift.codegen.async_client_pool;
+import thrift.codegen.base;
+import thrift.codegen.processor;
+import thrift.protocol.base;
+import thrift.protocol.binary;
+import thrift.server.base;
+import thrift.server.simple;
+import thrift.server.transport.socket;
+import thrift.server.transport.ssl;
+import thrift.transport.base;
+import thrift.transport.buffered;
+import thrift.transport.ssl;
+import thrift.util.cancellation;
+
+version (Posix) {
+ import core.stdc.signal;
+ import core.sys.posix.signal;
+
+ // Disable SIGPIPE because SSL server will write to broken socket after
+ // client disconnected (see TSSLSocket docs).
+ shared static this() {
+ signal(SIGPIPE, SIG_IGN);
+ }
+}
+
+interface AsyncTest {
+ string echo(string value);
+ string delayedEcho(string value, long milliseconds);
+
+ void fail(string reason);
+ void delayedFail(string reason, long milliseconds);
+
+ enum methodMeta = [
+ TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]),
+ TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")])
+ ];
+ alias .AsyncTestException AsyncTestException;
+}
+
+class AsyncTestException : TException {
+ string reason;
+ mixin TStructHelpers!();
+}
+
+void main(string[] args) {
+ ushort port = 9090;
+ ushort managerCount = 2;
+ ushort serversPerManager = 5;
+ ushort threadsPerServer = 10;
+ uint iterations = 10;
+ bool ssl;
+ bool trace;
+
+ getopt(args,
+ "iterations", &iterations,
+ "managers", &managerCount,
+ "port", &port,
+ "servers-per-manager", &serversPerManager,
+ "ssl", &ssl,
+ "threads-per-server", &threadsPerServer,
+ "trace", &trace,
+ );
+
+ TTransportFactory clientTransportFactory;
+ TSSLContext serverSSLContext;
+ if (ssl) {
+ auto clientSSLContext = new TSSLContext();
+ with (clientSSLContext) {
+ authenticate = true;
+ ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ loadTrustedCertificates("../../../test/keys/CA.pem");
+ }
+ clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext);
+
+ serverSSLContext = new TSSLContext();
+ with (serverSSLContext) {
+ serverSide = true;
+ ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH";
+ loadCertificate("../../../test/keys/server.crt");
+ loadPrivateKey("../../../test/keys/server.key");
+ }
+ } else {
+ clientTransportFactory = new TBufferedTransportFactory;
+ }
+
+
+ auto serverCancel = new TCancellationOrigin;
+ scope(exit) {
+ writeln("Triggering server shutdown...");
+ serverCancel.trigger();
+ writeln("done.");
+ }
+
+ auto managers = new TLibeventAsyncManager[managerCount];
+ scope (exit) foreach (ref m; managers) destroy(m);
+
+ auto clientsThreads = new ThreadGroup;
+ foreach (managerIndex, ref manager; managers) {
+ manager = new TLibeventAsyncManager;
+ foreach (serverIndex; 0 .. serversPerManager) {
+ auto currentPort = cast(ushort)
+ (port + managerIndex * serversPerManager + serverIndex);
+
+ // Start the server and wait until it is up and running.
+ auto servingMutex = new Mutex;
+ auto servingCondition = new Condition(servingMutex);
+ auto handler = new PreServeNotifyHandler(servingMutex, servingCondition);
+ synchronized (servingMutex) {
+ (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace,
+ serverCancel, handler)).start();
+ servingCondition.wait();
+ }
+
+ // We only run the timing tests for the first server on each async
+ // manager, so that we don't get spurious timing errors becaue of
+ // ordering issues.
+ auto runTimingTests = (serverIndex == 0);
+
+ auto c = new ClientsThread(manager, currentPort, clientTransportFactory,
+ threadsPerServer, iterations, runTimingTests, trace);
+ clientsThreads.add(c);
+ c.start();
+ }
+ }
+ clientsThreads.joinAll();
+}
+
+class AsyncTestHandler : AsyncTest {
+ this(bool trace) {
+ trace_ = trace;
+ }
+
+ override string echo(string value) {
+ if (trace_) writefln(`echo("%s")`, value);
+ return value;
+ }
+
+ override string delayedEcho(string value, long milliseconds) {
+ if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds);
+ Thread.sleep(dur!"msecs"(milliseconds));
+ if (trace_) writeln("returning.");
+
+ return value;
+ }
+
+ override void fail(string reason) {
+ if (trace_) writefln(`fail("%s")`, reason);
+ auto ate = new AsyncTestException;
+ ate.reason = reason;
+ throw ate;
+ }
+
+ override void delayedFail(string reason, long milliseconds) {
+ if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds);
+ Thread.sleep(dur!"msecs"(milliseconds));
+ if (trace_) writeln("returning.");
+
+ auto ate = new AsyncTestException;
+ ate.reason = reason;
+ throw ate;
+ }
+
+private:
+ bool trace_;
+ AsyncTestException ate_;
+}
+
+class PreServeNotifyHandler : TServerEventHandler {
+ this(Mutex servingMutex, Condition servingCondition) {
+ servingMutex_ = servingMutex;
+ servingCondition_ = servingCondition;
+ }
+
+ void preServe() {
+ synchronized (servingMutex_) {
+ servingCondition_.notifyAll();
+ }
+ }
+ Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
+ void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
+ void preProcess(Variant serverContext, TTransport transport) {}
+
+private:
+ Mutex servingMutex_;
+ Condition servingCondition_;
+}
+
+class ServerThread(ServerType) : Thread {
+ this(ushort port, TSSLContext sslContext, bool trace,
+ TCancellation cancellation, TServerEventHandler eventHandler
+ ) {
+ port_ = port;
+ sslContext_ = sslContext;
+ trace_ = trace;
+ cancellation_ = cancellation;
+ eventHandler_ = eventHandler;
+
+ super(&run);
+ }
+
+ void run() {
+ TServerSocket serverSocket;
+ if (sslContext_) {
+ serverSocket = new TSSLServerSocket(port_, sslContext_);
+ } else {
+ serverSocket = new TServerSocket(port_);
+ }
+ auto transportFactory = new TBufferedTransportFactory;
+ auto protocolFactory = new TBinaryProtocolFactory!();
+ auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_));
+
+ auto server = new ServerType(processor, serverSocket, transportFactory,
+ protocolFactory);
+ server.eventHandler = eventHandler_;
+
+ writefln("Starting server on port %s...", port_);
+ server.serve(cancellation_);
+ writefln("Server thread on port %s done.", port_);
+ }
+
+private:
+ ushort port_;
+ bool trace_;
+ TCancellation cancellation_;
+ TSSLContext sslContext_;
+ TServerEventHandler eventHandler_;
+}
+
+class ClientsThread : Thread {
+ this(TAsyncSocketManager manager, ushort port, TTransportFactory tf,
+ ushort threads, uint iterations, bool runTimingTests, bool trace
+ ) {
+ manager_ = manager;
+ port_ = port;
+ transportFactory_ = tf;
+ threads_ = threads;
+ iterations_ = iterations;
+ runTimingTests_ = runTimingTests;
+ trace_ = trace;
+ super(&run);
+ }
+
+ void run() {
+ auto transport = new TAsyncSocket(manager_, "localhost", port_);
+
+ {
+ auto client = new TAsyncClient!AsyncTest(
+ transport,
+ transportFactory_,
+ new TBinaryProtocolFactory!()
+ );
+ transport.open();
+ auto clientThreads = new ThreadGroup;
+ foreach (clientId; 0 .. threads_) {
+ clientThreads.create({
+ auto c = clientId;
+ return {
+ foreach (i; 0 .. iterations_) {
+ immutable id = text(port_, ":", c, ":", i);
+
+ {
+ if (trace_) writefln(`Calling echo("%s")... `, id);
+ auto a = client.echo(id);
+ enforce(a == id);
+ if (trace_) writefln(`echo("%s") done.`, id);
+ }
+
+ {
+ if (trace_) writefln(`Calling fail("%s")... `, id);
+ auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet());
+ enforce(a && a.reason == id);
+ if (trace_) writefln(`fail("%s") done.`, id);
+ }
+ }
+ };
+ }());
+ }
+ clientThreads.joinAll();
+ transport.close();
+ }
+
+ if (runTimingTests_) {
+ auto client = new TAsyncClient!AsyncTest(
+ transport,
+ transportFactory_,
+ new TBinaryProtocolFactory!TBufferedTransport
+ );
+
+ // Temporarily redirect error logs to stdout, as SSL errors on the server
+ // side are expected when the client terminates aburptly (as is the case
+ // in the timeout test).
+ auto oldErrorLogSink = g_errorLogSink;
+ g_errorLogSink = g_infoLogSink;
+ scope (exit) g_errorLogSink = oldErrorLogSink;
+
+ foreach (i; 0 .. iterations_) {
+ transport.open();
+
+ immutable id = text(port_, ":", i);
+
+ {
+ if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id);
+ auto a = client.delayedEcho(id, 100);
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", a.get(), ", ", id, ")."));
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", a.get(), ", ", id, ")."));
+ enforce(a.completion.wait(dur!"msecs"(200)),
+ text("wait() didn't succeed as expected (", id, ")."));
+ enforce(a.get() == id);
+ if (trace_) writefln(`... delayedEcho("%s") done.`, id);
+ }
+
+ {
+ if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id);
+ auto a = client.delayedFail(id, 100);
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
+ enforce(!a.completion.wait(dur!"usecs"(1)),
+ text("wait() succeeded early (", id, ", ", collectException(a.get()), ")."));
+ enforce(a.completion.wait(dur!"msecs"(200)),
+ text("wait() didn't succeed as expected (", id, ")."));
+ auto e = cast(AsyncTestException)collectException(a.get());
+ enforce(e && e.reason == id);
+ if (trace_) writefln(`... delayedFail("%s") done.`, id);
+ }
+
+ {
+ transport.recvTimeout = dur!"msecs"(50);
+
+ if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `);
+ auto a = client.delayedEcho("socketTimeout", 100);
+ auto e = cast(TTransportException)collectException(a.waitGet());
+ enforce(e, text("Operation didn't fail as expected (", id, ")."));
+ enforce(e.type == TTransportException.Type.TIMED_OUT,
+ text("Wrong timeout exception type (", id, "): ", e));
+ if (trace_) writeln(`timed out as expected.`);
+
+ // Wait until the server thread reset before the next iteration.
+ Thread.sleep(dur!"msecs"(50));
+ transport.recvTimeout = dur!"hnsecs"(0);
+ }
+
+ transport.close();
+ }
+ }
+
+ writefln("Clients thread for port %s done.", port_);
+ }
+
+ TAsyncSocketManager manager_;
+ ushort port_;
+ TTransportFactory transportFactory_;
+ ushort threads_;
+ uint iterations_;
+ bool runTimingTests_;
+ bool trace_;
+}