summaryrefslogtreecommitdiffstats
path: root/src/Launch.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Launch.cpp')
-rw-r--r--src/Launch.cpp478
1 files changed, 478 insertions, 0 deletions
diff --git a/src/Launch.cpp b/src/Launch.cpp
new file mode 100644
index 0000000..9f0c633
--- /dev/null
+++ b/src/Launch.cpp
@@ -0,0 +1,478 @@
+/*---------------------------------------------------------------
+ * Copyright (c) 1999,2000,2001,2002,2003
+ * The Board of Trustees of the University of Illinois
+ * All Rights Reserved.
+ *---------------------------------------------------------------
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software (Iperf) and associated
+ * documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute,
+ * sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ *
+ * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and
+ * the following disclaimers.
+ *
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimers in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * Neither the names of the University of Illinois, NCSA,
+ * nor the names of its contributors may be used to endorse
+ * or promote products derived from this Software without
+ * specific prior written permission.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ________________________________________________________________
+ * National Laboratory for Applied Network Research
+ * National Center for Supercomputing Applications
+ * University of Illinois at Urbana-Champaign
+ * http://www.ncsa.uiuc.edu
+ * ________________________________________________________________
+ *
+ * Launch.cpp
+ * by Kevin Gibbs <kgibbs@nlanr.net>
+ * -------------------------------------------------------------------
+ * Functions to launch new server and client threads from C while
+ * the server and client are in C++.
+ * The launch function for reporters is in Reporter.c since it is
+ * in C and does not need a special launching function.
+ * ------------------------------------------------------------------- */
+
+#include "headers.h"
+#include "Thread.h"
+#include "Settings.hpp"
+#include "Client.hpp"
+#include "Listener.hpp"
+#include "Server.hpp"
+#include "PerfSocket.hpp"
+#include "active_hosts.h"
+#include "SocketAddr.h"
+#include "delay.h"
+
+static int fullduplex_startstop_barrier (struct BarrierMutex *barrier) {
+ int rc = 0;
+ assert(barrier != NULL);
+ Condition_Lock(barrier->await);
+ ++barrier->count;
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Fullduplex barrier incr to %d %p ", barrier->count, (void *)&barrier->await, rc);
+#endif
+ if (barrier->count == 2) {
+ rc = 1;
+ // last one wake's up everyone else'
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Fullduplex startstop broadcast on condition %p ", (void *)&barrier->await, rc);
+#endif
+ Condition_Broadcast(&barrier->await);
+ } else {
+ int timeout = barrier->timeout;
+ while ((barrier->count != 2) && (timeout > 0)) {
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Fullduplex startstop barrier wait %p %d/2 (%d)", (void *)&barrier->await, barrier->count, timeout);
+#endif
+ Condition_TimedWait(&barrier->await, 1);
+ timeout--;
+ if ((timeout == 0) && (barrier->count != 2)) {
+ fprintf(stdout, "Barrier timeout per full duplex traffic\n");
+ Condition_Unlock(barrier->await);
+ return -1;
+ }
+ }
+ barrier->count=0;
+ }
+ Condition_Unlock(barrier->await);
+ return rc;
+}
+int fullduplex_start_barrier (struct BarrierMutex *barrier) {
+ int rc = fullduplex_startstop_barrier(barrier);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Fullduplex start barrier done on condition %p rc=%d", (void *)&barrier->await, rc);
+#endif
+ return rc;
+}
+int fullduplex_stop_barrier (struct BarrierMutex *barrier) {
+ int rc = fullduplex_startstop_barrier(barrier);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Fullduplex stop barrier done on condition %p rc=%d", (void *)&barrier->await, rc);
+#endif
+ return rc;
+}
+
+/*
+ * listener_spawn is responsible for creating a Listener class
+ * and launching the listener. It is provided as a means for
+ * the C thread subsystem to launch the listener C++ object.
+ */
+void listener_spawn(struct thread_Settings *thread) {
+ Listener *theListener = NULL;
+ // the Listener need to trigger a settings report
+ setReport(thread);
+ // start up a listener
+ theListener = new Listener(thread);
+ // Start listening
+ theListener->Run();
+ DELETE_PTR(theListener);
+}
+
+/*
+ * server_spawn is responsible for creating a Server class
+ * and launching the server. It is provided as a means for
+ * the C thread subsystem to launch the server C++ object.
+ */
+void server_spawn(struct thread_Settings *thread) {
+ Server *theServer = NULL;
+#ifdef HAVE_THREAD_DEBUG
+ if (isBounceBack(thread)) {
+ thread_debug("spawn server bounce-back");
+ } else {
+ thread_debug("spawn server settings=%p GroupSumReport=%p sock=%d", \
+ (void *) thread, (void *)thread->mSumReport, thread->mSock);
+ }
+#endif
+ // set traffic thread to realtime if needed
+#if HAVE_SCHED_SETSCHEDULER
+ thread_setscheduler(thread);
+#endif
+ // Start up the server
+ theServer = new Server(thread);
+ if (isTxStartTime(thread)) {
+ clock_usleep_abstime(&thread->txstart_epoch);
+ }
+ // Run the test
+ if (isUDP(thread)) {
+ theServer->RunUDP();
+ } else {
+ if (isBounceBack(thread)) {
+ theServer->RunBounceBackTCP();
+ } else {
+ theServer->RunTCP();
+ }
+ }
+ DELETE_PTR(theServer);
+}
+
+static void clientside_client_basic (struct thread_Settings *thread, Client *theClient) {
+ setTransferID(thread, 0);
+#ifdef HAVE_THREAD_DEBUG
+ if (isBounceBack(thread)) {
+ thread_debug("Launch: spawn client bounce-back mode, size = %d", thread->mBurstSize);
+ } else {
+ thread_debug("Launch: client spawn thread basic (sock=%d)", thread->mSock);
+ }
+#endif
+ SockAddr_remoteAddr(thread);
+ // Bounceback start delays the connect too
+ if (isBounceBack(thread)) {
+ if (isTxStartTime(thread)) {
+ theClient->mySockInit();
+ clock_usleep_abstime(&thread->txstart_epoch);
+ } else if (isTxHoldback(thread)) {
+ unsetTxHoldback(thread);
+ theClient->mySockInit();
+ clock_usleep(&thread->txholdback_timer);
+ }
+ }
+ theClient->my_connect(true);
+ if ((thread->mThreads > 1) && !isNoConnectSync(thread) && !isCompat(thread))
+ // When -P > 1 then all threads finish connect before starting traffic
+ theClient->BarrierClient(thread->connects_done);
+ if (theClient->isConnected()) {
+ if ((thread->mThreads > 1) || isSumOnly(thread))
+ Iperf_push_host(thread);
+ theClient->StartSynch();
+ theClient->Run();
+ }
+}
+
+static void clientside_client_reverse (struct thread_Settings *thread, \
+ struct thread_Settings *reverse_client, Client *theClient) {
+ setTransferID(thread, 0);
+ SockAddr_remoteAddr(thread);
+ theClient->my_connect(true);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Launch: client spawn thread reverse (sock=%d)", thread->mSock);
+#endif
+ if ((thread->mThreads > 1) && !isNoConnectSync(thread))
+ // When -P > 1 then all threads finish connect before starting traffic
+ theClient->BarrierClient(thread->connects_done);
+ if (theClient->isConnected()) {
+ FAIL((!reverse_client || !(thread->mSock > 0)), "Reverse test failed to start per thread settings or socket problem", thread);
+ setTransferID(reverse_client, 1);
+ theClient->StartSynch();
+ reverse_client->mSock = thread->mSock; // use the same socket for both directions
+ reverse_client->mThreadMode = kMode_Server;
+ setReverse(reverse_client);
+ setNoUDPfin(reverse_client); // disable the fin report - no need
+ reverse_client->size_local = sizeof(iperf_sockaddr);
+ getsockname(reverse_client->mSock, reinterpret_cast<sockaddr*>(&reverse_client->local), &reverse_client->size_local);
+ if ((thread->mThreads > 1) || isSumOnly(thread))
+ Iperf_push_host(reverse_client);
+ thread_start(reverse_client);
+ if (theClient->myJob)
+ FreeReport(theClient->myJob);
+ }
+}
+
+static void clientside_client_fullduplex (struct thread_Settings *thread, \
+ struct thread_Settings *reverse_client, Client *theClient) {
+ setTransferID(thread, 0);
+ SockAddr_remoteAddr(thread);
+ if (!isBounceBack(thread)) {
+ thread->mFullDuplexReport = InitSumReport(thread, -1, 1);
+ }
+ Settings_Copy(thread, &reverse_client, SHALLOW_COPY);
+ if ((thread->mThreads > 1) || isSumOnly(thread) || \
+ (!(thread->mThreads > 1) && !isEnhanced(thread))) {
+ Iperf_push_host(thread);
+ Iperf_push_host(reverse_client);
+ }
+ assert(reverse_client != NULL);
+ setTransferID(reverse_client, 1);
+ theClient->my_connect(true);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Launch: client spawn thread fullduplex (sock=%d)", thread->mSock);
+#endif
+ if ((thread->mThreads > 1) && !isNoConnectSync(thread))
+ // When -P > 1 then all threads finish connect before starting traffic
+ theClient->BarrierClient(thread->connects_done);
+ if (theClient->isConnected()) {
+ thread->mFullDuplexReport->info.common->socket = thread->mSock;
+ FAIL((!reverse_client || !(thread->mSock > 0)), "Reverse test failed to start per thread settings or socket problem", thread);
+ reverse_client->mSock = thread->mSock; // use the same socket for both directions
+ reverse_client->mThreadMode = kMode_Server;
+ setReverse(reverse_client);
+ if (isModeTime(reverse_client)) {
+ reverse_client->mAmount += (SLOPSECS * 100); // add 2 sec for slop on reverse, units are 10 ms
+ }
+ thread_start(reverse_client);
+ if (theClient->StartSynch() != -1) {
+ theClient->Run();
+ }
+ }
+}
+
+static void serverside_client_fullduplex (struct thread_Settings *thread, Client *theClient) {
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Launch: Listener spawn client thread (fd sock=%d)", thread->mSock);
+#endif
+ setTransferID(thread, 1);
+ if (theClient->StartSynch() != -1) {
+ theClient->Run();
+ }
+}
+
+static void serverside_client_bidir (struct thread_Settings *thread, Client *theClient) {
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Launch: Listener spawn client thread (bidir sock=%d)", thread->mSock);
+#endif
+ setTransferID(thread, 1);
+ SockAddr_zeroAddress(&thread->peer);
+ SockAddr_remoteAddr(thread);
+ if (thread->mReportMode == kReport_CSV) {
+ setNoSettReport(thread);
+ setNoConnReport(thread);
+ } else {
+ unsetNoSettReport(thread);
+ unsetNoConnReport(thread);
+ }
+ setReport(thread);
+ theClient->my_connect(false);
+ if (theClient->isConnected()) {
+ Iperf_push_host(thread);
+ if (theClient->StartSynch() != -1) {
+ theClient->Run();
+ }
+ }
+}
+
+/*
+ * client_spawn is responsible for creating a Client class
+ * and launching the client. It is provided as a means for
+ * the C thread subsystem to launch the client C++ object.
+ *
+ * There are a few different client startup modes
+ * o) Normal
+ * o) Dual (-d or -r) (legacy)
+ * o) Reverse (Client side) (client acts like server)
+ * o) FullDuplex (Client side) client starts server
+ * o) ServerReverse (Server side) (listener starts a client)
+ * o) FullDuplex (Server side) (listener starts server & client)
+ * o) WriteAck
+ *
+ * Note: This runs in client thread context
+ */
+void client_spawn (struct thread_Settings *thread) {
+ Client *theClient = NULL;
+
+ // set traffic thread to realtime if needed
+#if HAVE_SCHED_SETSCHEDULER
+ thread_setscheduler(thread);
+#endif
+ // start up the client
+ setTransferID(thread, 0);
+ theClient = new Client(thread);
+ // let the reporter thread go first in the case of -P greater than 1
+ Condition_Lock(reporter_state.await);
+ while (!reporter_state.ready) {
+ Condition_TimedWait(&reporter_state.await, 1);
+ }
+ Condition_Unlock(reporter_state.await);
+
+ if (isConnectOnly(thread)) {
+ theClient->ConnectPeriodic();
+ } else if (!isServerReverse(thread)) {
+ // These are the client side spawning of clients
+ if (!isReverse(thread) && !isFullDuplex(thread)) {
+ clientside_client_basic(thread, theClient);
+ } else if (isReverse(thread) && !isFullDuplex(thread)) {
+ struct thread_Settings *reverse_thread = NULL;
+ Settings_Copy(thread, &reverse_thread, DEEP_COPY);
+ FAIL((reverse_thread == NULL), "Reverse thread alloc failed", thread);
+ clientside_client_reverse(thread, reverse_thread, theClient);
+ } else if (isFullDuplex(thread)) {
+ struct thread_Settings *reverse_thread = NULL;
+ Settings_Copy(thread, &reverse_thread, DEEP_COPY);
+ FAIL((reverse_thread == NULL), "Reverse in full-duplex thread alloc failed", thread);
+ clientside_client_fullduplex(thread, reverse_thread, theClient);
+ } else {
+ fprintf(stdout, "Program error in client side client_spawn");
+ _exit(-1);
+ }
+ } else {
+ if (thread->mMode != kTest_Normal) {
+ setCompat(thread);
+ // These are the server or listener side spawning of clients
+ serverside_client_bidir(thread, theClient);
+ } else {
+ serverside_client_fullduplex(thread, theClient);
+ }
+ }
+ // Call the client's destructor
+ DELETE_PTR(theClient);
+}
+
+/*
+ * client_init handles multiple threaded connects. It creates
+ * a listener object if either the dual test or tradeoff were
+ * specified. It also creates settings structures for all the
+ * threads and arranges them so they can be managed and started
+ * via the one settings structure that was passed in.
+ *
+ * Note: This runs in main thread context
+ */
+void client_init(struct thread_Settings *clients) {
+ struct thread_Settings *itr = NULL;
+ struct thread_Settings *next = NULL;
+
+ itr = clients;
+ setReport(clients);
+ // See if we need to start a listener as well
+ Settings_GenerateListenerSettings(clients, &next);
+#ifdef HAVE_THREAD
+ if (next != NULL) {
+ // We have threads and we need to start a listener so
+ // have it ran before the client is launched
+ itr->runNow = next;
+ itr = next;
+ }
+ // For each of the needed threads create a copy of the
+ // provided settings, unsetting the report flag and add
+ // to the list of threads to start
+ for (int i = 1; i < clients->mThreads; i++) {
+ Settings_Copy(clients, &next, DEEP_COPY);
+ // printf("*****port/thread = %d/%d\n", next->mPort + i, i);
+ if (next) {
+ if (isIncrSrcIP(clients) && (clients->mLocalhost != NULL)) {
+ next->incrsrcip = i;
+ }
+ if (isIncrDstIP(clients)) {
+ next->incrdstip = i;
+ // force a setHostname
+ SockAddr_zeroAddress(&next->peer);
+ } else if (clients->mBindPort) {
+ // Increment the source port of none of the quintuple is being change or the user requests it
+ if ((!isIncrDstPort(clients) && !isIncrDstIP(clients) && !isIncrSrcIP(clients)) || isIncrSrcPort(clients)) {
+ // case -B with src port and -P > 1
+ next->incrsrcport = i;
+ }
+ }
+ if (isIncrDstPort(clients)) {
+ next->mPort += i;
+ SockAddr_zeroAddress(&next->peer);
+ }
+ }
+ itr->runNow = next;
+ itr = next;
+ }
+ if ((isBounceBack(clients) || isConnectOnly(clients) || isPeriodicBurst(clients)) \
+ && (isWorkingLoadUp(clients) || isWorkingLoadDown(clients))) {
+ int working_load_threads = (clients->mWorkingLoadThreads == 0) ? 1 : clients->mWorkingLoadThreads;
+ while (working_load_threads--) {
+ Settings_Copy(clients, &next, DEEP_COPY);
+ if (next != NULL) {
+ unsetBounceBack(next);
+ unsetConnectOnly(next);
+ unsetPeriodicBurst(next);
+ unsetTxHoldback(next); // don't delay working load threads
+ next->mTOS = 0; // disable any QoS on the congestion stream
+#if HAVE_DECL_TCP_NOTSENT_LOWAT
+ next->mWritePrefetch = SMALL_WRITE_PREFETCH;
+ setWritePrefetch(next);
+#endif
+ if (isWorkingLoadUp(clients) && isWorkingLoadDown(clients)) {
+ setFullDuplex(next);
+ } else if (isWorkingLoadDown(clients)) {
+ setReverse(next);
+ }
+ if (isBounceBack(clients) && (clients->mWorkingLoadThreads > 1)) {
+ Iperf_push_host(clients);
+ }
+ // Bump the bounceback time to include the delay time
+ if (next->txholdback_timer.tv_sec || next->txholdback_timer.tv_usec) {
+ // mAmount units are 10 ms
+ next->mAmount += (next->txholdback_timer.tv_sec * 100);
+ next->mAmount += (next->txholdback_timer.tv_usec / 10000);
+ }
+ itr->runNow = next;
+ itr = next;
+ }
+ }
+ }
+#else
+ if (next != NULL) {
+ // We don't have threads and we need to start a listener so
+ // have it ran after the client is finished
+ itr->runNext = next;
+ }
+#endif
+}
+
+void listeners_init(struct thread_Settings *listener) {
+ struct thread_Settings *itr = listener;
+ struct thread_Settings *next = NULL;
+ for (int ix = 1; ix < (listener->mPortLast - listener->mPort + 1); ix++) {
+ Settings_Copy(listener, &next, DEEP_COPY);
+ if (next != NULL) {
+ setNoSettReport(next);
+ next->mPort = listener->mPort + ix;
+ next->mThreadMode = kMode_Listener;
+ itr->runNow = next;
+ itr = next;
+ }
+ }
+}