/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless enforced by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ module async_test; import core.atomic; import core.sync.condition : Condition; import core.sync.mutex : Mutex; import core.thread : dur, Thread, ThreadGroup; import std.conv : text; import std.datetime; import std.getopt; import std.exception : collectException, enforce; import std.parallelism : TaskPool; import std.stdio; import std.string; import std.variant : Variant; import thrift.base; import thrift.async.base; import thrift.async.libevent; import thrift.async.socket; import thrift.async.ssl; import thrift.codegen.async_client; import thrift.codegen.async_client_pool; import thrift.codegen.base; import thrift.codegen.processor; import thrift.protocol.base; import thrift.protocol.binary; import thrift.server.base; import thrift.server.simple; import thrift.server.transport.socket; import thrift.server.transport.ssl; import thrift.transport.base; import thrift.transport.buffered; import thrift.transport.ssl; import thrift.util.cancellation; version (Posix) { import core.stdc.signal; import core.sys.posix.signal; // Disable SIGPIPE because SSL server will write to broken socket after // client disconnected (see TSSLSocket docs). shared static this() { signal(SIGPIPE, SIG_IGN); } } interface AsyncTest { string echo(string value); string delayedEcho(string value, long milliseconds); void fail(string reason); void delayedFail(string reason, long milliseconds); enum methodMeta = [ TMethodMeta("fail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]), TMethodMeta("delayedFail", [], [TExceptionMeta("ate", 1, "AsyncTestException")]) ]; alias .AsyncTestException AsyncTestException; } class AsyncTestException : TException { string reason; mixin TStructHelpers!(); } void main(string[] args) { ushort port = 9090; ushort managerCount = 2; ushort serversPerManager = 5; ushort threadsPerServer = 10; uint iterations = 10; bool ssl; bool trace; getopt(args, "iterations", &iterations, "managers", &managerCount, "port", &port, "servers-per-manager", &serversPerManager, "ssl", &ssl, "threads-per-server", &threadsPerServer, "trace", &trace, ); TTransportFactory clientTransportFactory; TSSLContext serverSSLContext; if (ssl) { auto clientSSLContext = new TSSLContext(); with (clientSSLContext) { authenticate = true; ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; loadTrustedCertificates("../../../test/keys/CA.pem"); } clientTransportFactory = new TAsyncSSLSocketFactory(clientSSLContext); serverSSLContext = new TSSLContext(); with (serverSSLContext) { serverSide = true; ciphers = "ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH"; loadCertificate("../../../test/keys/server.crt"); loadPrivateKey("../../../test/keys/server.key"); } } else { clientTransportFactory = new TBufferedTransportFactory; } auto serverCancel = new TCancellationOrigin; scope(exit) { writeln("Triggering server shutdown..."); serverCancel.trigger(); writeln("done."); } auto managers = new TLibeventAsyncManager[managerCount]; scope (exit) foreach (ref m; managers) destroy(m); auto clientsThreads = new ThreadGroup; foreach (managerIndex, ref manager; managers) { manager = new TLibeventAsyncManager; foreach (serverIndex; 0 .. serversPerManager) { auto currentPort = cast(ushort) (port + managerIndex * serversPerManager + serverIndex); // Start the server and wait until it is up and running. auto servingMutex = new Mutex; auto servingCondition = new Condition(servingMutex); auto handler = new PreServeNotifyHandler(servingMutex, servingCondition); synchronized (servingMutex) { (new ServerThread!TSimpleServer(currentPort, serverSSLContext, trace, serverCancel, handler)).start(); servingCondition.wait(); } // We only run the timing tests for the first server on each async // manager, so that we don't get spurious timing errors becaue of // ordering issues. auto runTimingTests = (serverIndex == 0); auto c = new ClientsThread(manager, currentPort, clientTransportFactory, threadsPerServer, iterations, runTimingTests, trace); clientsThreads.add(c); c.start(); } } clientsThreads.joinAll(); } class AsyncTestHandler : AsyncTest { this(bool trace) { trace_ = trace; } override string echo(string value) { if (trace_) writefln(`echo("%s")`, value); return value; } override string delayedEcho(string value, long milliseconds) { if (trace_) writef(`delayedEcho("%s", %s ms)... `, value, milliseconds); Thread.sleep(dur!"msecs"(milliseconds)); if (trace_) writeln("returning."); return value; } override void fail(string reason) { if (trace_) writefln(`fail("%s")`, reason); auto ate = new AsyncTestException; ate.reason = reason; throw ate; } override void delayedFail(string reason, long milliseconds) { if (trace_) writef(`delayedFail("%s", %s ms)... `, reason, milliseconds); Thread.sleep(dur!"msecs"(milliseconds)); if (trace_) writeln("returning."); auto ate = new AsyncTestException; ate.reason = reason; throw ate; } private: bool trace_; AsyncTestException ate_; } class PreServeNotifyHandler : TServerEventHandler { this(Mutex servingMutex, Condition servingCondition) { servingMutex_ = servingMutex; servingCondition_ = servingCondition; } void preServe() { synchronized (servingMutex_) { servingCondition_.notifyAll(); } } Variant createContext(TProtocol input, TProtocol output) { return Variant.init; } void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {} void preProcess(Variant serverContext, TTransport transport) {} private: Mutex servingMutex_; Condition servingCondition_; } class ServerThread(ServerType) : Thread { this(ushort port, TSSLContext sslContext, bool trace, TCancellation cancellation, TServerEventHandler eventHandler ) { port_ = port; sslContext_ = sslContext; trace_ = trace; cancellation_ = cancellation; eventHandler_ = eventHandler; super(&run); } void run() { TServerSocket serverSocket; if (sslContext_) { serverSocket = new TSSLServerSocket(port_, sslContext_); } else { serverSocket = new TServerSocket(port_); } auto transportFactory = new TBufferedTransportFactory; auto protocolFactory = new TBinaryProtocolFactory!(); auto processor = new TServiceProcessor!AsyncTest(new AsyncTestHandler(trace_)); auto server = new ServerType(processor, serverSocket, transportFactory, protocolFactory); server.eventHandler = eventHandler_; writefln("Starting server on port %s...", port_); server.serve(cancellation_); writefln("Server thread on port %s done.", port_); } private: ushort port_; bool trace_; TCancellation cancellation_; TSSLContext sslContext_; TServerEventHandler eventHandler_; } class ClientsThread : Thread { this(TAsyncSocketManager manager, ushort port, TTransportFactory tf, ushort threads, uint iterations, bool runTimingTests, bool trace ) { manager_ = manager; port_ = port; transportFactory_ = tf; threads_ = threads; iterations_ = iterations; runTimingTests_ = runTimingTests; trace_ = trace; super(&run); } void run() { auto transport = new TAsyncSocket(manager_, "localhost", port_); { auto client = new TAsyncClient!AsyncTest( transport, transportFactory_, new TBinaryProtocolFactory!() ); transport.open(); auto clientThreads = new ThreadGroup; foreach (clientId; 0 .. threads_) { clientThreads.create({ auto c = clientId; return { foreach (i; 0 .. iterations_) { immutable id = text(port_, ":", c, ":", i); { if (trace_) writefln(`Calling echo("%s")... `, id); auto a = client.echo(id); enforce(a == id); if (trace_) writefln(`echo("%s") done.`, id); } { if (trace_) writefln(`Calling fail("%s")... `, id); auto a = cast(AsyncTestException)collectException(client.fail(id).waitGet()); enforce(a && a.reason == id); if (trace_) writefln(`fail("%s") done.`, id); } } }; }()); } clientThreads.joinAll(); transport.close(); } if (runTimingTests_) { auto client = new TAsyncClient!AsyncTest( transport, transportFactory_, new TBinaryProtocolFactory!TBufferedTransport ); // Temporarily redirect error logs to stdout, as SSL errors on the server // side are expected when the client terminates aburptly (as is the case // in the timeout test). auto oldErrorLogSink = g_errorLogSink; g_errorLogSink = g_infoLogSink; scope (exit) g_errorLogSink = oldErrorLogSink; foreach (i; 0 .. iterations_) { transport.open(); immutable id = text(port_, ":", i); { if (trace_) writefln(`Calling delayedEcho("%s", 100 ms)...`, id); auto a = client.delayedEcho(id, 100); enforce(!a.completion.wait(dur!"usecs"(1)), text("wait() succeeded early (", a.get(), ", ", id, ").")); enforce(!a.completion.wait(dur!"usecs"(1)), text("wait() succeeded early (", a.get(), ", ", id, ").")); enforce(a.completion.wait(dur!"msecs"(200)), text("wait() didn't succeed as expected (", id, ").")); enforce(a.get() == id); if (trace_) writefln(`... delayedEcho("%s") done.`, id); } { if (trace_) writefln(`Calling delayedFail("%s", 100 ms)... `, id); auto a = client.delayedFail(id, 100); enforce(!a.completion.wait(dur!"usecs"(1)), text("wait() succeeded early (", id, ", ", collectException(a.get()), ").")); enforce(!a.completion.wait(dur!"usecs"(1)), text("wait() succeeded early (", id, ", ", collectException(a.get()), ").")); enforce(a.completion.wait(dur!"msecs"(200)), text("wait() didn't succeed as expected (", id, ").")); auto e = cast(AsyncTestException)collectException(a.get()); enforce(e && e.reason == id); if (trace_) writefln(`... delayedFail("%s") done.`, id); } { transport.recvTimeout = dur!"msecs"(50); if (trace_) write(`Calling delayedEcho("socketTimeout", 100 ms)... `); auto a = client.delayedEcho("socketTimeout", 100); auto e = cast(TTransportException)collectException(a.waitGet()); enforce(e, text("Operation didn't fail as expected (", id, ").")); enforce(e.type == TTransportException.Type.TIMED_OUT, text("Wrong timeout exception type (", id, "): ", e)); if (trace_) writeln(`timed out as expected.`); // Wait until the server thread reset before the next iteration. Thread.sleep(dur!"msecs"(50)); transport.recvTimeout = dur!"hnsecs"(0); } transport.close(); } } writefln("Clients thread for port %s done.", port_); } TAsyncSocketManager manager_; ushort port_; TTransportFactory transportFactory_; ushort threads_; uint iterations_; bool runTimingTests_; bool trace_; }