From 04fc174d50fd19d6ae78fd2fd2faae221acff807 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 30 May 2024 05:25:49 +0200 Subject: Adding upstream version 2.2.0+dfsg. Signed-off-by: Daniel Baumann --- src/Client.cpp | 798 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 521 insertions(+), 277 deletions(-) (limited to 'src/Client.cpp') diff --git a/src/Client.cpp b/src/Client.cpp index 89e47e2..d56ee1c 100644 --- a/src/Client.cpp +++ b/src/Client.cpp @@ -67,6 +67,7 @@ #include "payloads.h" #include "active_hosts.h" #include "gettcpinfo.h" +#include "iperf_formattime.h" // const double kSecs_to_usecs = 1e6; const double kSecs_to_nsecs = 1e9; @@ -74,6 +75,7 @@ const int kBytes_to_Bits = 8; #define VARYLOAD_PERIOD 0.1 // recompute the variable load every n seconds #define MAXUDPBUF 1470 +#define TCPDELAYDEFAULTQUANTUM 4000 // units usecs Client::Client (thread_Settings *inSettings) { #ifdef HAVE_THREAD_DEBUG @@ -136,13 +138,7 @@ Client::~Client () { void Client::mySockInit (void) { // create an internet socket int type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM); - int domain = (SockAddr_isIPv6(&mSettings->peer) ? -#if HAVE_IPV6 - AF_INET6 -#else - AF_INET -#endif - : AF_INET); + int domain = SockAddr_getAFdomain(&mSettings->peer); mySocket = socket(domain, type, 0); WARN_errno(mySocket == INVALID_SOCKET, "socket"); @@ -151,20 +147,23 @@ void Client::mySockInit (void) { SetSocketOptions(mSettings); SockAddr_localAddr(mSettings); SockAddr_remoteAddr(mSettings); - if (mSettings->mLocalhost != NULL) { - // bind socket to local address - int rc = bind(mySocket, reinterpret_cast(&mSettings->local), - SockAddr_get_sizeof_sockaddr(&mSettings->local)); - WARN_errno(rc == SOCKET_ERROR, "bind"); + // do this bind to device after IP addr name lookups per above + SetSocketBindToDeviceIfNeeded(mSettings); + if (mSettings->mLocalhost != NULL) { // bind src ip if needed + // bind socket to local address + int rc = bind(mSettings->mSock, (struct sockaddr *)(&mSettings->local), \ + SockAddr_get_sizeof_sockaddr(&mSettings->local)); + WARN_errno(rc == SOCKET_ERROR, "bind"); } - mysock_init_done = true; - if (!isUDP(mSettings) && isReport(mSettings) && isSettingsReport(mSettings)) { + if (isSettingsReport(mSettings)) { struct ReportHeader *tmp = InitSettingsReport(mSettings); assert(tmp!=NULL); PostReport(tmp); - setNoSettReport(mSettings); } + mysock_init_done = true; } + + /* ------------------------------------------------------------------- * Setup a socket connected to a server. * If inLocalhost is not null, bind to that address, specifying @@ -177,29 +176,69 @@ bool Client::my_connect (bool close_on_fail) { } // connect socket connected = false; + int connect_errno = 0; mSettings->tcpinitstats.connecttime = -1; if (!isUDP(mSettings)) { - int trycnt = mSettings->mConnectRetries + 1; - while (trycnt > 0) { + Timestamp end_connect_retry; + end_connect_retry.add(mSettings->connect_retry_time); + do { connect_start.setnow(); rc = connect(mySocket, reinterpret_cast(&mSettings->peer), SockAddr_get_sizeof_sockaddr(&mSettings->peer)); - WARN_errno((rc == SOCKET_ERROR), "tcp connect"); + connect_done.setnow(); if (rc == SOCKET_ERROR) { - if ((--trycnt) <= 0) { - if (close_on_fail) { - close(mySocket); - mySocket = INVALID_SOCKET; - } - } else { - delay_loop(200000); + char timestr[120]; + char tmpaddr[200]; + char errtext[50]; + connect_errno = errno_decode(errtext, sizeof(errtext)); + unsigned short port = SockAddr_getPort(&mSettings->peer); + SockAddr_getHostAddress(&mSettings->peer, tmpaddr, sizeof(tmpaddr)); + struct timeval t; + t.tv_sec = connect_done.getSecs(); + t.tv_usec = connect_done.getUsecs(); + iperf_formattime(timestr, sizeof(timestr), t, isEnhanced(mSettings), isUTC(mSettings), YearThruSecTZ); + int slen = snprintf(NULL, 0, "%stcp connect to %s port %d failed (%s) on %s", \ + mSettings->mTransferIDStr, tmpaddr, port, errtext, timestr); + char *text = (char *) calloc((slen+1), sizeof(char)); + if (text) { + snprintf(text, (slen+1), "%stcp connect to %s port %d failed (%s) on %s", \ + mSettings->mTransferIDStr, tmpaddr, port, errtext, timestr); + PostReport(InitStringReport(text)); + FREE_ARRAY(text); + } + bool need_open = false; + if (close_on_fail || FATALTCPCONNECTERR(errno)) { // MAC OS kicks out invalid argument at times, not sure why + close(mySocket); + mySockInit(); + delay_loop(10000); // delay the minimum of 10ms + need_open = true; + } + if (!need_open && connect_done.before(end_connect_retry)) { + int delay = mSettings->connect_retry_timer - (connect_done.subUsec(connect_start)); + delay_loop((delay < 10000) ? 10000 : delay); // minimum of 10ms } } else { - connect_done.setnow(); mSettings->tcpinitstats.connecttime = 1e3 * connect_done.subSec(connect_start); connected = true; break; } + } while (connect_done.before(end_connect_retry)); + if (!connected && (mSettings->connect_retry_time > 0)) { + char timestr[120]; + struct timeval t; + t.tv_sec = end_connect_retry.getSecs(); + t.tv_usec = end_connect_retry.getUsecs(); + iperf_formattime(timestr, sizeof(timestr), t, isEnhanced(mSettings), isUTC(mSettings), YearThruSecTZ); + int len = snprintf(NULL, 0, "%stcp connect attempt timer expired on %s\n", \ + mSettings->mTransferIDStr, timestr); + char *text = (char *) calloc(len+1, sizeof(char)); + if (text) { + snprintf(text, len, "%stcp connect attempt timer expried on %s\n", \ + mSettings->mTransferIDStr, timestr); + PostReport(InitStringReport(text)); + FREE_ARRAY(text); + return false; + } } } else { rc = connect(mySocket, reinterpret_cast(&mSettings->peer), @@ -217,8 +256,6 @@ bool Client::my_connect (bool close_on_fail) { } #endif // Set the send timeout for the very first write which has the test exchange - int sosndtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs - SetSocketOptionsSendTimeout(mSettings, sosndtimer); getsockname(mySocket, reinterpret_cast(&mSettings->local), &mSettings->size_local); getpeername(mySocket, reinterpret_cast(&mSettings->peer), &mSettings->size_peer); SockAddr_Ifrname(mSettings); @@ -230,12 +267,6 @@ bool Client::my_connect (bool close_on_fail) { if (isUDP(mSettings) && !isIsochronous(mSettings) && !isIPG(mSettings)) { mSettings->mBurstIPG = get_delay_target() / 1e3; // this is being set for the settings report only } - if (isReport(mSettings) && isSettingsReport(mSettings)) { - struct ReportHeader *tmp = InitSettingsReport(mSettings); - assert(tmp!=NULL); - PostReport(tmp); - setNoSettReport(mSettings); - } } else { if (mySocket != INVALID_SOCKET) { int rc = close(mySocket); @@ -252,7 +283,7 @@ bool Client::my_connect (bool close_on_fail) { cr->connect_timestamp.tv_usec = connect_start.getUsecs(); assert(reporthdr); PostReport(reporthdr); - } else { + } else if (!connect_errno) { PostReport(InitConnectionReport(mSettings)); } } @@ -277,6 +308,10 @@ inline void Client::myReportPacket (void) { reportstruct->packetLen = 0; } +inline void Client::myReportPacket (struct ReportStruct *reportptr) { + ReportPacket(myReport, reportptr); +} + // There are multiple startup synchronizations, this code // handles them all. The caller decides to apply them @@ -303,8 +338,8 @@ int Client::StartSynch () { // o Second is a holdback, a relative amount of seconds between the connect and data xfers // check for an epoch based start time reportstruct->packetLen = 0; - if (!isServerReverse(mSettings)) { - if (!isCompat(mSettings) && !isBounceBack(mSettings)) { + if (!isServerReverse(mSettings) && !isCompat(mSettings)) { + if (!isBounceBack(mSettings)) { reportstruct->packetLen = SendFirstPayload(); // Reverse UDP tests need to retry "first sends" a few times // before going to server or read mode @@ -336,14 +371,18 @@ int Client::StartSynch () { } else if (isTripTime(mSettings) || isPeriodicBurst(mSettings)) { reportstruct->packetLen = SendFirstPayload(); } - if (isIsochronous(mSettings) || isPeriodicBurst(mSettings)) { + if (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || isBounceBack(mSettings)) { Timestamp tmp; - tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec); - framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp); - // set the mbuf valid for burst period ahead of time. The same value will be set for all burst writes - if (!isUDP(mSettings) && framecounter) { - struct TCP_burst_payload * mBuf_burst = reinterpret_cast(mSettings->mBuf); - mBuf_burst->burst_period_us = htonl(framecounter->period_us()); + if (isTxStartTime(mSettings)) { + tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec); + } + if (mSettings->mFPS > 0) { + framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp); + // set the mbuf valid for burst period ahead of time. The same value will be set for all burst writes + if (!isUDP(mSettings) && framecounter) { + struct TCP_burst_payload * mBuf_burst = reinterpret_cast(mSettings->mBuf); + mBuf_burst->burst_period_us = htonl(framecounter->period_us()); + } } } int setfullduplexflag = 0; @@ -355,12 +394,12 @@ int Client::StartSynch () { SetReportStartTime(); #if HAVE_TCP_STATS if (!isUDP(mSettings)) { - // Near congestion and peridiodic need sampling on every report packet + // Near congestion and periodic need sampling on every report packet if (isNearCongest(mSettings) || isPeriodicBurst(mSettings)) { myReport->info.isEnableTcpInfo = true; myReport->info.ts.nextTCPSampleTime.tv_sec = 0; myReport->info.ts.nextTCPSampleTime.tv_usec = 0; - } else if (isEnhanced(mSettings) || isBounceBack(mSettings)) { + } else if (isEnhanced(mSettings) || isBounceBack(mSettings) || isFQPacing(mSettings)) { myReport->info.isEnableTcpInfo = true; myReport->info.ts.nextTCPSampleTime = myReport->info.ts.nextTime; } @@ -368,6 +407,7 @@ int Client::StartSynch () { #endif if (reportstruct->packetLen > 0) { + reportstruct->err_readwrite = WriteSuccess; reportstruct->packetTime = myReport->info.ts.startTime; reportstruct->sentTime = reportstruct->packetTime; reportstruct->prevSentTime = reportstruct->packetTime; @@ -406,6 +446,9 @@ inline void Client::SetFullDuplexReportStartTime () { inline void Client::SetReportStartTime () { assert(myReport!=NULL); now.setnow(); + if (isUDP(mSettings) && (mSettings->sendfirst_pacing > 0)) { + now.add(static_cast(mSettings->sendfirst_pacing)); + } myReport->info.ts.startTime.tv_sec = now.getSecs(); myReport->info.ts.startTime.tv_usec = now.getUsecs(); myReport->info.ts.IPGstart = myReport->info.ts.startTime; @@ -491,11 +534,27 @@ void Client::InitTrafficLoop () { if (isPeriodicBurst(mSettings) && (mSettings->mFPS > 0.0)) { sosndtimer = static_cast(round(250000.0 / mSettings->mFPS)); } else if (mSettings->mInterval > 0) { - sosndtimer = static_cast(round(0.5 * mSettings->mInterval)); + sosndtimer = static_cast(round(((mSettings->mThreads > 1) ? 0.25 : 0.5) * mSettings->mInterval)); } else { sosndtimer = static_cast(mSettings->mAmount * 5e3); } - SetSocketOptionsSendTimeout(mSettings, sosndtimer); + // set to 1 second for wraps or too large + if ((sosndtimer < 0) || (sosndtimer > 1000000)) { + sosndtimer = 1000000; + } + if (sosndtimer < 1000) { + sosndtimer = 1000; //lower bound of 1 ms + } + if ((mSettings->mInterval > 0) && (mSettings->mIntervalMode == kInterval_Time)) { + int interval_half = static_cast(round(mSettings->mAmount * 10000) / 2); + if (sosndtimer > interval_half) { + sosndtimer = interval_half; + } + } + if (!isUDP(mSettings)) { + mSettings->sosndtimer = sosndtimer; + SetSocketOptionsSendTimeout(mSettings, sosndtimer); + } // set the lower bounds delay based of the socket timeout timer // units needs to be in nanoseconds delay_lower_bounds = static_cast(sosndtimer) * -1e3; @@ -510,10 +569,18 @@ void Client::InitTrafficLoop () { mEndTime.add(mSettings->mAmount / 100.0); // now.setnow(); fprintf(stderr, "DEBUG: end time set to %ld.%ld now is %ld.%ld\n", mEndTime.getSecs(), mEndTime.getUsecs(), now.getSecs(), now.getUsecs()); } +#if HAVE_DECL_TCP_TX_DELAY + current_state = NO_DELAY; + if (isTcpTxDelay(mSettings)) { + state_tokens[NO_DELAY] = (int) (mSettings->mTcpTxDelayMean * (1 - mSettings->mTcpTxDelayProb)); + state_tokens[ADD_DELAY] = (int) (mSettings->mTcpTxDelayMean * mSettings->mTcpTxDelayProb); + TcpTxDelayQuantumEnd.setnow(); + } +#endif readAt = mSettings->mBuf; lastPacketTime.set(myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec); - reportstruct->errwrite=WriteNoErr; - reportstruct->emptyreport=0; + reportstruct->err_readwrite=WriteSuccess; + reportstruct->emptyreport = false; reportstruct->packetLen = 0; // Finally, post this thread's "job report" which the reporter thread // will continuously process as long as there are packets flowing @@ -576,6 +643,48 @@ void Client::Run () { } } +#if HAVE_DECL_TCP_TX_DELAY +inline void Client::apply_txdelay_func (void) { + now.setnow(); + if (isTcpTxDelay(mSettings) && TcpTxDelayQuantumEnd.before(now)) { + // expense the tokens for the current state + state_tokens[current_state] -= now.subUsec(TcpTxDelayQuantumEnd); + // add tokens + do { + state_tokens[NO_DELAY] += (int) (mSettings->mTcpTxDelayMean * (1 - mSettings->mTcpTxDelayProb)); + state_tokens[ADD_DELAY] += (int) (mSettings->mTcpTxDelayMean * mSettings->mTcpTxDelayProb); + } while ((state_tokens[NO_DELAY] < 0) && (state_tokens[ADD_DELAY] < 0)); + // set the next quantum end + while (TcpTxDelayQuantumEnd.before(now)) + TcpTxDelayQuantumEnd.add((unsigned int) TCPDELAYDEFAULTQUANTUM); + // do any state change + if ((state_tokens[NO_DELAY] < 0) && (current_state == NO_DELAY)) { +// printf("**** f state change to 0->1 current=%d %d %d\n", current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]); + SetSocketTcpTxDelay(mSettings, (mSettings->mTcpTxDelayMean * 1000.0)); + current_state = ADD_DELAY; + } else if ((state_tokens[ADD_DELAY] < 0) && (current_state == ADD_DELAY)) { +// printf("**** f state change to 1->0 current=%d %d %d\n", current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]); + SetSocketTcpTxDelay(mSettings, 0.0); + current_state = NO_DELAY; + } else { + int rval = (random() % 2); + // printf("**** curr=%d rval=%d tokens 0:%d 1:%d\n", current_state, rval, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]); + if (rval != current_state) { + if (rval && (state_tokens[ADD_DELAY] > 0)) { +// printf("**** state change to 0->1 rval=%d current=%d %d %d\n", rval, current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]); + SetSocketTcpTxDelay(mSettings, (mSettings->mTcpTxDelayMean * 1000.0)); + current_state = ADD_DELAY; + } else if ((rval != 1) && (state_tokens[NO_DELAY] > 0)) { +// printf("**** state change to 1->0 rval=%d current=%d %d %d\n", rval, current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]); + SetSocketTcpTxDelay(mSettings, 0.0); + current_state = NO_DELAY; + } + } + } + } +} +#endif + /* * TCP send loop */ @@ -588,6 +697,12 @@ void Client::RunTCP () { reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); reportstruct->write_time = 0; +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacingStep(mSettings)) { + PacingStepTime = now; + PacingStepTime.add(mSettings->mFQPacingRateStepInterval); + } +#endif while (InProgress()) { reportstruct->writecnt = 0; reportstruct->scheduled = false; @@ -598,7 +713,7 @@ void Client::RunTCP () { if (isIsochronous(mSettings)) { assert(mSettings->mMean); burst_remaining = static_cast(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8); - } else if (isPeriodicBurst(mSettings)){ + } else if (isBurstSize(mSettings)){ assert(mSettings->mBurstSize); burst_remaining = mSettings->mBurstSize; } else { @@ -609,7 +724,7 @@ void Client::RunTCP () { burst_remaining = static_cast(sizeof(struct TCP_burst_payload)); // apply scheduling if needed if (framecounter) { - burst_id = framecounter->wait_tick(&reportstruct->sched_err); + burst_id = framecounter->wait_tick(&reportstruct->sched_err, true); reportstruct->scheduled = true; if (isPeriodicBurst(mSettings)) { // low duty cycle traffic needs special event handling @@ -619,20 +734,17 @@ void Client::RunTCP () { reportstruct->packetTime.tv_usec = now.getUsecs(); if (!InProgress()) { reportstruct->packetLen = 0; - reportstruct->emptyreport = 1; + reportstruct->emptyreport = true; // wait may have crossed the termination boundry break; } else { //time interval crossings may have occurred during the wait //post a null event to cause the report to flush the packet ring - PostNullEvent(); + PostNullEvent(false, false); } } -#if HAVE_DECL_TCP_NOTSENT_LOWAT - if (isWritePrefetch(mSettings)) { - AwaitWriteSelectEventTCP(); - } -#endif + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) + continue; } now.setnow(); reportstruct->packetTime.tv_sec = now.getSecs(); @@ -659,11 +771,8 @@ void Client::RunTCP () { if (isTcpWriteTimes(mSettings)) { write_start.setnow(); } -#if HAVE_DECL_TCP_NOTSENT_LOWAT - if (isWritePrefetch(mSettings)) { - AwaitWriteSelectEventTCP(); - } -#endif + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) + continue; reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen); now.setnow(); reportstruct->writecnt++; @@ -676,28 +785,33 @@ void Client::RunTCP () { } if (reportstruct->packetLen <= 0) { if (reportstruct->packetLen == 0) { + reportstruct->err_readwrite=WriteErrFatal; peerclose = true; } else if (NONFATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrAccount; + reportstruct->err_readwrite=WriteErrAccount; } else if (FATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrFatal; - WARN_errno(1, "tcp write"); + reportstruct->err_readwrite=WriteErrFatal; + now.setnow(); + char warnbuf[WARNBUFSIZE]; + snprintf(warnbuf, sizeof(warnbuf), "%stcp write (%ld.%ld)", mSettings->mTransferIDStr, now.getSecs(), now.getUsecs()); + warnbuf[sizeof(warnbuf)-1] = '\0'; + WARN(1, warnbuf); break; } else { - reportstruct->errwrite=WriteErrNoAccount; + reportstruct->err_readwrite=WriteNoAccount; } reportstruct->packetLen = 0; - reportstruct->emptyreport = 1; + reportstruct->emptyreport = true; } else { - reportstruct->emptyreport = 0; + reportstruct->emptyreport = false; totLen += reportstruct->packetLen; - reportstruct->errwrite=WriteNoErr; + reportstruct->err_readwrite=WriteSuccess; if (isburst) { burst_remaining -= reportstruct->packetLen; if (burst_remaining > 0) { - reportstruct->transit_ready = 0; + reportstruct->transit_ready = false; } else { - reportstruct->transit_ready = 1; + reportstruct->transit_ready = true; reportstruct->prevSentTime = myReport->info.ts.prevsendTime; } } @@ -711,7 +825,20 @@ void Client::RunTCP () { } } if (!one_report) { +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacing(mSettings)) + reportstruct->FQPacingRate = mSettings->mFQPacingRateCurrent; +#endif myReportPacket(); +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacingStep(mSettings) && PacingStepTime.before(now)) { + mSettings->mFQPacingRateCurrent += mSettings->mFQPacingRateStep; + setsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &mSettings->mFQPacingRateCurrent, sizeof(mSettings->mFQPacingRateCurrent)); + PacingStepTime.add(mSettings->mFQPacingRateStepInterval); + socklen_t len = sizeof(reportstruct->FQPacingRate); + getsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &reportstruct->FQPacingRate, &len); + } +#endif } } FinishTrafficActions(); @@ -761,26 +888,26 @@ void Client::RunNearCongestionTCP () { reportstruct->packetTime.tv_usec = now.getUsecs(); reportstruct->sentTime = reportstruct->packetTime; ReportNow: - reportstruct->transit_ready = 0; + reportstruct->transit_ready = false; if (reportstruct->packetLen < 0) { if (NONFATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrAccount; + reportstruct->err_readwrite=WriteErrAccount; } else if (FATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrFatal; + reportstruct->err_readwrite=WriteErrFatal; WARN_errno(1, "tcp write"); break; } else { - reportstruct->errwrite=WriteErrNoAccount; + reportstruct->err_readwrite=WriteNoAccount; } reportstruct->packetLen = 0; - reportstruct->emptyreport = 1; + reportstruct->emptyreport = true; } else { - reportstruct->emptyreport = 0; + reportstruct->emptyreport = false; totLen += reportstruct->packetLen; - reportstruct->errwrite=WriteNoErr; + reportstruct->err_readwrite=WriteSuccess; burst_remaining -= reportstruct->packetLen; if (burst_remaining <= 0) { - reportstruct->transit_ready = 1; + reportstruct->transit_ready = true; } } if (isModeAmount(mSettings) && !reportstruct->emptyreport) { @@ -814,8 +941,7 @@ void Client::RunNearCongestionTCP () { void Client::RunRateLimitedTCP () { double tokens = 0; Timestamp time1, time2; - int burst_size = mSettings->mBufLen; - int burst_remaining = 0; + int write_remaining = 0; int burst_id = 1; long var_rate = mSettings->mAppRate; @@ -848,61 +974,79 @@ void Client::RunRateLimitedTCP () { // perform write int len = 0; int len2 = 0; - if (burst_remaining == 0) { - burst_remaining = mSettings->mBufLen; + + if (isburst && !(write_remaining > 0)) { + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) + continue; + write_remaining = mSettings->mBufLen; + // check for TCP minimum payload + if (write_remaining < static_cast(sizeof(struct TCP_burst_payload))) + write_remaining = static_cast(sizeof(struct TCP_burst_payload)); now.setnow(); reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); reportstruct->sentTime = reportstruct->packetTime; - WriteTcpTxHdr(reportstruct, burst_size, burst_id++); + WriteTcpTxHdr(reportstruct, write_remaining, burst_id++); // perform write - if (isTripTime(mSettings)) { - len = writen(mySocket, mSettings->mBuf, mSettings->mBufLen, &reportstruct->writecnt); - WARN(len != mSettings->mBufLen, "burst write failed"); - } else { - len = writen(mySocket, mSettings->mBuf, sizeof(struct TCP_burst_payload), &reportstruct->writecnt); - WARN(len != sizeof(struct TCP_burst_payload), "burst hdr write failed"); + // perform write, full header must succeed + if (isTcpWriteTimes(mSettings)) { + write_start.setnow(); + } + len = writen(mySocket, mSettings->mBuf, write_remaining, &reportstruct->writecnt); + WARN((len < static_cast (sizeof(struct TCP_burst_payload))), "burst hdr write failed"); + if (isTcpWriteTimes(mSettings)) { + now.setnow(); + reportstruct->write_time = now.subUsec(write_start); } if (len < 0) { len = 0; if (NONFATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrAccount; + reportstruct->err_readwrite=WriteErrAccount; } else if (FATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrFatal; + reportstruct->err_readwrite=WriteErrFatal; WARN_errno(1, "write"); fatalwrite_err = 1; break; } else { - reportstruct->errwrite=WriteErrNoAccount; + reportstruct->err_readwrite=WriteNoAccount; } } else { - burst_remaining -= len; + write_remaining -= len; } // thread_debug("***write burst header %d id=%d", burst_size, (burst_id - 1)); - } else if (reportstruct->packetLen > burst_remaining) { - reportstruct->packetLen = burst_remaining; + } else { + write_remaining = mSettings->mBufLen; } - if (burst_remaining > 0) { - len2 = write(mySocket, mSettings->mBuf, reportstruct->packetLen); + if (write_remaining > 0) { + if (isTcpWriteTimes(mSettings)) { + write_start.setnow(); + } + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) + continue; + len2 = write(mySocket, mSettings->mBuf, write_remaining); + if (isTcpWriteTimes(mSettings)) { + now.setnow(); + reportstruct->write_time = now.subUsec(write_start); + } reportstruct->writecnt++; } if (len2 < 0) { len2 = 0; if (NONFATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrAccount; + reportstruct->err_readwrite=WriteErrAccount; } else if (FATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrFatal; + reportstruct->err_readwrite=WriteErrFatal; WARN_errno(1, "write"); fatalwrite_err = 1; break; } else { - reportstruct->errwrite=WriteErrNoAccount; + reportstruct->err_readwrite=WriteNoAccount; } } else { // Consume tokens per the transmit tokens -= (len + len2); - totLen += (len + len2);; - reportstruct->errwrite=WriteNoErr; + totLen += (len + len2); + reportstruct->err_readwrite=WriteSuccess; } time2.setnow(); reportstruct->packetLen = len + len2; @@ -922,51 +1066,60 @@ void Client::RunRateLimitedTCP () { } } else { // Use a 4 usec delay to fill tokens -#if HAVE_DECL_TCP_NOTSENT_LOWAT - if (isWritePrefetch(mSettings)) { - AwaitWriteSelectEventTCP(); - } else -#endif - { - delay_loop(4); + delay_loop(4); + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) { + continue; } } } FinishTrafficActions(); } +inline bool Client::AwaitSelectWrite (void) { #if HAVE_DECL_TCP_NOTSENT_LOWAT -inline bool Client::AwaitWriteSelectEventTCP (void) { - int rc; - struct timeval timeout; - fd_set writeset; - FD_ZERO(&writeset); - FD_SET(mySocket, &writeset); - if (isModeTime(mSettings)) { - Timestamp write_event_timeout(0,0); - if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) { - write_event_timeout.add((double) mSettings->mInterval / 1e6 * 2.0); + do { + int rc; + struct timeval timeout; + fd_set writeset; + FD_ZERO(&writeset); + FD_SET(mySocket, &writeset); + if (isModeTime(mSettings)) { + Timestamp write_event_timeout(0,0); + if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) { + write_event_timeout.add((double) mSettings->mInterval / ((mSettings->mThreads > 1) ? 4e6 : 2e6)); + } else { + write_event_timeout.add((double) mSettings->mAmount / 4e2); + } + timeout.tv_sec = write_event_timeout.getSecs(); + timeout.tv_usec = write_event_timeout.getUsecs(); } else { - write_event_timeout.add((double) mSettings->mAmount / 1e2 * 4.0); + timeout.tv_sec = 1; // longest is 1 seconds + timeout.tv_usec = 0; } - timeout.tv_sec = write_event_timeout.getSecs(); - timeout.tv_usec = write_event_timeout.getUsecs(); - } else { - timeout.tv_sec = 10; // longest is 10 seconds - timeout.tv_usec = 0; - } - if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) { - WARN_errno((rc < 0), "select"); -#ifdef HAVE_THREAD_DEBUG - if (rc == 0) - thread_debug("AwaitWrite timeout"); + if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) { + WARN_errno((rc < 0), "select"); + if (rc <= 0) + PostNullEvent(false, true); +#if HAVE_SUMMING_DEBUG + if (rc == 0) { + char warnbuf[WARNBUFSIZE]; + snprintf(warnbuf, sizeof(warnbuf), "%sTimeout: write select", mSettings->mTransferIDStr); + warnbuf[sizeof(warnbuf)-1] = '\0'; + WARN(1, warnbuf); + } #endif - return false; - } + } else { + return true; + } + } while (InProgress()); + return false; +#else return true; +#endif } +#if HAVE_DECL_TCP_NOTSENT_LOWAT void Client::RunWriteEventsTCP () { int burst_id = 0; int writelen = mSettings->mBufLen; @@ -975,18 +1128,28 @@ void Client::RunWriteEventsTCP () { now.setnow(); reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacingStep(mSettings)) { + PacingStepTime = now; + PacingStepTime.add(mSettings->mFQPacingRateStepInterval); + } +#endif while (InProgress()) { +#if HAVE_DECL_TCP_TX_DELAY +// apply_txdelay_func(); +#endif if (isModeAmount(mSettings)) { writelen = ((mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); } now.setnow(); reportstruct->write_time = 0; + reportstruct->writecnt = 0; if (isTcpWriteTimes(mSettings)) { write_start = now; } - bool rc = AwaitWriteSelectEventTCP(); - reportstruct->emptyreport = (rc == false) ? 1 : 0; - if (rc) { + if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) + continue; + if (!reportstruct->emptyreport) { now.setnow(); reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); @@ -999,7 +1162,7 @@ void Client::RunWriteEventsTCP () { peerclose = true; } reportstruct->packetLen = 0; - reportstruct->emptyreport = 1; + reportstruct->emptyreport = true; } else if (isTcpWriteTimes(mSettings)) { write_end.setnow(); reportstruct->write_time = write_end.subUsec(write_start); @@ -1014,7 +1177,20 @@ void Client::RunWriteEventsTCP () { } } if (!one_report) { +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacing(mSettings)) + reportstruct->FQPacingRate = mSettings->mFQPacingRateCurrent; +#endif myReportPacket(); +#if (HAVE_DECL_SO_MAX_PACING_RATE) + if (isFQPacingStep(mSettings) && PacingStepTime.before(now)) { + mSettings->mFQPacingRateCurrent += mSettings->mFQPacingRateStep; + setsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &mSettings->mFQPacingRateCurrent, sizeof(mSettings->mFQPacingRateCurrent)); + PacingStepTime.add(mSettings->mFQPacingRateStepInterval); + socklen_t len = sizeof(reportstruct->FQPacingRate); + getsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &reportstruct->FQPacingRate, &len); + } +#endif } } FinishTrafficActions(); @@ -1022,7 +1198,7 @@ void Client::RunWriteEventsTCP () { #endif void Client::RunBounceBackTCP () { int burst_id = 0; - int writelen = mSettings->mBufLen; + int writelen = mSettings->mBounceBackBytes; memset(mSettings->mBuf, 0x5A, sizeof(struct bounceback_hdr)); if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) { int sotimer = static_cast(round(mSettings->mInterval / 2.0)); @@ -1044,11 +1220,12 @@ void Client::RunBounceBackTCP () { reportstruct->packetTime.tv_usec = now.getUsecs(); while (InProgress()) { int n; + long remaining; reportstruct->writecnt = 0; bool isFirst; if (framecounter) { - burst_id = framecounter->wait_tick(&reportstruct->sched_err); - PostNullEvent(true); // this will set the now timestamp + burst_id = framecounter->wait_tick(&reportstruct->sched_err, false); + PostNullEvent(true, false); // now is set in this call reportstruct->sentTime.tv_sec = now.getSecs(); reportstruct->sentTime.tv_usec = now.getUsecs(); isFirst = true; @@ -1057,7 +1234,7 @@ void Client::RunBounceBackTCP () { isFirst = false; } int bb_burst = (mSettings->mBounceBackBurst > 0) ? mSettings->mBounceBackBurst : 1; - while (bb_burst > 0) { + while ((bb_burst > 0) && InProgress() && (!framecounter || (framecounter->get(&remaining)) == (unsigned int) burst_id)) { bb_burst--; if (isFirst) { isFirst = false; @@ -1067,28 +1244,31 @@ void Client::RunBounceBackTCP () { reportstruct->sentTime.tv_usec = now.getUsecs(); } WriteTcpTxBBHdr(reportstruct, burst_id, 0); - reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen, &reportstruct->writecnt); - if (reportstruct->packetLen <= 0) { - if ((reportstruct->packetLen < 0) && FATALTCPWRITERR(errno)) { - reportstruct->errwrite=WriteErrFatal; + int write_offset = 0; + reportstruct->writecnt = 0; + RETRY_WRITE: + n = writen(mySocket, (mSettings->mBuf + write_offset), (writelen - write_offset), &reportstruct->writecnt); + if (n < 0) { + if (FATALTCPWRITERR(errno)) { + reportstruct->err_readwrite=WriteErrFatal; WARN_errno(1, "tcp bounceback write fatal error"); peerclose = true; - } else if (reportstruct->packetLen == 0) { - peerclose = true; - } else if (reportstruct->packetLen != writelen) { - WARN_errno(1, "tcp bounceback writen incomplete"); - peerclose = true; - } else { - // retry the write - bb_burst++; - continue; + break; + } else if (InProgress()) { + PostNullEvent(false,false); + goto RETRY_WRITE; } - break; } - if (reportstruct->packetLen == writelen) { - reportstruct->emptyreport = 0; - totLen += reportstruct->packetLen; - reportstruct->errwrite=WriteNoErr; + write_offset += n; + if ((write_offset < writelen) && InProgress()) { + WARN_errno(1, "tcp bounceback writen incomplete"); + PostNullEvent(false,false); + goto RETRY_WRITE; + } + if (write_offset == writelen) { + reportstruct->emptyreport = false; + totLen += writelen; + reportstruct->err_readwrite=WriteSuccess; #if HAVE_DECL_TCP_QUICKACK if (isTcpQuickAck(mSettings)) { int opt = 1; @@ -1098,43 +1278,50 @@ void Client::RunBounceBackTCP () { WARN_errno(rc == SOCKET_ERROR, "setsockopt TCP_QUICKACK"); } #endif - if ((n = recvn(mySocket, mSettings->mBuf, mSettings->mBounceBackBytes, 0)) == mSettings->mBounceBackBytes) { - struct bounceback_hdr *bbhdr = reinterpret_cast(mSettings->mBuf); - now.setnow(); - reportstruct->sentTimeRX.tv_sec = ntohl(bbhdr->bbserverRx_ts.sec); - reportstruct->sentTimeRX.tv_usec = ntohl(bbhdr->bbserverRx_ts.usec); - reportstruct->sentTimeTX.tv_sec = ntohl(bbhdr->bbserverTx_ts.sec); - reportstruct->sentTimeTX.tv_usec = ntohl(bbhdr->bbserverTx_ts.usec); - reportstruct->packetTime.tv_sec = now.getSecs(); - reportstruct->packetTime.tv_usec = now.getUsecs(); - reportstruct->packetLen += n; - reportstruct->emptyreport = 0; - myReportPacket(); + int read_offset = 0; + RETRY_READ: + n = recvn(mySocket, (mSettings->mBuf + read_offset), (mSettings->mBounceBackReplyBytes - read_offset), 0); + if (n > 0) { + read_offset += n; + if (read_offset == mSettings->mBounceBackReplyBytes) { + struct bounceback_hdr *bbhdr = reinterpret_cast(mSettings->mBuf); + now.setnow(); + reportstruct->sentTimeRX.tv_sec = ntohl(bbhdr->bbserverRx_ts.sec); + reportstruct->sentTimeRX.tv_usec = ntohl(bbhdr->bbserverRx_ts.usec); + reportstruct->sentTimeTX.tv_sec = ntohl(bbhdr->bbserverTx_ts.sec); + reportstruct->sentTimeTX.tv_usec = ntohl(bbhdr->bbserverTx_ts.usec); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->packetLen += n; + reportstruct->emptyreport = false; + reportstruct->packetID = burst_id; + myReportPacket(); + } else if (InProgress()) { + PostNullEvent(false,false); + goto RETRY_READ; + } else { + break; + } } else if (n == 0) { peerclose = true; - } else if (n < 0) { + break; + } else { if (FATALTCPREADERR(errno)) { - WARN_errno(1, "bounceback read"); + FAIL_errno(1, "fatal bounceback read", mSettings); peerclose = true; - n = 0; + break; } else { - WARN(1, "timeout: bounceback read"); + WARN_errno(1, "timeout: bounceback read"); + PostNullEvent(false,false); + if (InProgress()) + goto RETRY_READ; } } - } else if ((reportstruct->packetLen < 0 ) && NONFATALTCPWRITERR(errno)) { - reportstruct->packetLen = 0; - reportstruct->emptyreport = 1; - reportstruct->errwrite=WriteErrNoAccount; - myReportPacket(); - } else { - reportstruct->errwrite=WriteErrFatal; - reportstruct->packetLen = -1; - FAIL_errno(1, "tcp bounce-back write", mSettings); } } } - WriteTcpTxBBHdr(reportstruct, 0x0, 1); - disarm_itimer(); + if (!peerclose) + WriteTcpTxBBHdr(reportstruct, 0x0, 1); // Signal end of BB test FinishTrafficActions(); } /* @@ -1148,10 +1335,11 @@ double Client::get_delay_target () { // compute delay target in units of nanoseconds if (mSettings->mAppRateUnits == kRate_BW) { // compute delay for bandwidth restriction, constrained to [0,1] seconds - delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) - / mSettings->mAppRate)); + delay_target = ((mSettings->mAppRate > 0) ? \ + (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) / mSettings->mAppRate)) \ + : 0); } else { - delay_target = 1e9 / mSettings->mAppRate; + delay_target = (mSettings->mAppRate > 0) ? (1e9 / mSettings->mAppRate) : 0; } } return delay_target; @@ -1202,53 +1390,77 @@ void Client::RunUDP () { mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec); mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec); - // Adjustment for the running delay - // o measure how long the last loop iteration took - // o calculate the delay adjust - // - If write succeeded, adjust = target IPG - the loop time - // - If write failed, adjust = the loop time - // o then adjust the overall running delay - // Note: adjust units are nanoseconds, - // packet timestamps are microseconds - if (currLen > 0) - adjust = delay_target + \ - (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime)); - else - adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime); - - lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec); - // Since linux nanosleep/busyloop can exceed delay - // there are two possible equilibriums - // 1) Try to perserve inter packet gap - // 2) Try to perserve requested transmit rate - // The latter seems preferred, hence use a running delay - // that spans the life of the thread and constantly adjust. - // A negative delay means the iperf app is behind. - delay += adjust; - // Don't let delay grow unbounded - if (delay < delay_lower_bounds) { - delay = delay_target; - } + if (delay_target > 0) { + // Adjustment for the running delay + // o measure how long the last loop iteration took + // o calculate the delay adjust + // - If write succeeded, adjust = target IPG - the loop time + // - If write failed, adjust = the loop time + // o then adjust the overall running delay + // Note: adjust units are nanoseconds, + // packet timestamps are microseconds + if (currLen > 0) + adjust = delay_target + \ + (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime)); + else + adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime); - reportstruct->errwrite = WriteNoErr; - reportstruct->emptyreport = 0; + lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec); + // Since linux nanosleep/busyloop can exceed delay + // there are two possible equilibriums + // 1) Try to perserve inter packet gap + // 2) Try to perserve requested transmit rate + // The latter seems preferred, hence use a running delay + // that spans the life of the thread and constantly adjust. + // A negative delay means the iperf app is behind. + delay += adjust; + // Don't let delay grow unbounded + if (delay < delay_lower_bounds) { + delay = delay_target; + } + } + reportstruct->err_readwrite = WriteSuccess; + reportstruct->emptyreport = false; // perform write if (isModeAmount(mSettings)) { currLen = write(mySocket, mSettings->mBuf, (mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); } else { + currLen = -1; +#if (HAVE_USE_WRITE_SELECT) && (HAVE_SELECT) +#if HAVE_DECL_MSG_DONTWAIT + currLen = send(mySocket, mSettings->mBuf, mSettings->mBufLen, MSG_DONTWAIT); + if ((currLen < 0) && !FATALUDPWRITERR(errno)) { + if (AwaitSelectWrite()) { + currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen); + reportstruct->err_readwrite = WriteSelectRetry; + } else { + reportstruct->err_readwrite = WriteTimeo; + } + } +#else + if (AwaitSelectWrite()) { + currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen); + } else { + reportstruct->err_readwrite = WriteTimeo; + } +#endif +#else currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen); +#endif } if (currLen < 0) { reportstruct->packetID--; if (FATALUDPWRITERR(errno)) { - reportstruct->errwrite = WriteErrFatal; + reportstruct->err_readwrite = WriteErrFatal; WARN_errno(1, "write"); + currLen = 0; break; } else { - reportstruct->errwrite = WriteErrAccount; + //WARN_errno(1, "write n"); currLen = 0; + reportstruct->err_readwrite = WriteErrAccount; } - reportstruct->emptyreport = 1; + reportstruct->emptyreport = true; } if (isModeAmount(mSettings)) { @@ -1310,7 +1522,7 @@ void Client::RunUDPIsochronous () { udp_payload->isoch.burstsize = htonl(bytecnt); udp_payload->isoch.prevframeid = htonl(frameid); reportstruct->burstsize=bytecnt; - frameid = framecounter->wait_tick(&reportstruct->sched_err); + frameid = framecounter->wait_tick(&reportstruct->sched_err, true); reportstruct->scheduled = true; udp_payload->isoch.frameid = htonl(frameid); lastPacketTime.setnow(); @@ -1356,8 +1568,8 @@ void Client::RunUDPIsochronous () { // delay = delay_target; // } - reportstruct->errwrite = WriteNoErr; - reportstruct->emptyreport = 0; + reportstruct->err_readwrite = WriteSuccess; + reportstruct->emptyreport = false; reportstruct->writecnt = 1; // perform write @@ -1373,21 +1585,22 @@ void Client::RunUDPIsochronous () { if (currLen < 0) { reportstruct->packetID--; - reportstruct->emptyreport = 1; - currLen = 0; + reportstruct->emptyreport = true; if (FATALUDPWRITERR(errno)) { - reportstruct->errwrite = WriteErrFatal; + reportstruct->err_readwrite = WriteErrFatal; WARN_errno(1, "write"); fatalwrite_err = 1; - } else { - reportstruct->errwrite = WriteErrAccount; + currLen = 0; + } else { + currLen = 0; + reportstruct->err_readwrite = WriteErrAccount; } } else { bytecnt -= currLen; if (!bytecnt) - reportstruct->transit_ready = 1; + reportstruct->transit_ready = true; else - reportstruct->transit_ready = 0; + reportstruct->transit_ready = false; // adjust bytecnt so last packet of burst is greater or equal to min packet if ((bytecnt > 0) && (bytecnt < udp_payload_minimum)) { bytecnt = udp_payload_minimum; @@ -1504,8 +1717,11 @@ void Client::WriteTcpTxBBHdr (struct ReportStruct *reportstruct, uint32_t bbid, if (final) { bbflags |= HEADER_BBSTOP; } + if ((mSettings->mBounceBackReplyBytes > 0) && (mSettings->mBounceBackReplyBytes != mSettings->mBounceBackBytes)) { + bbflags |= HEADER_BBREPLYSIZE; + } mBuf_bb->bbflags = htons(bbflags); - mBuf_bb->bbsize = htonl(mSettings->mBufLen); + mBuf_bb->bbsize = htonl(mSettings->mBounceBackBytes); mBuf_bb->bbid = htonl(bbid); mBuf_bb->bbclientTx_ts.sec = htonl(reportstruct->packetTime.tv_sec); mBuf_bb->bbclientTx_ts.usec = htonl(reportstruct->packetTime.tv_usec); @@ -1514,6 +1730,7 @@ void Client::WriteTcpTxBBHdr (struct ReportStruct *reportstruct, uint32_t bbid, mBuf_bb->bbserverTx_ts.sec = -1; mBuf_bb->bbserverTx_ts.usec = -1; mBuf_bb->bbhold = htonl(mSettings->mBounceBackHold); + mBuf_bb->bbreplysize = htonl(mSettings->mBounceBackReplyBytes); } inline bool Client::InProgress (void) { @@ -1535,7 +1752,10 @@ inline void Client::tcp_shutdown (void) { #ifdef HAVE_THREAD_DEBUG thread_debug("Client calls shutdown() SHUTW_WR on tcp socket %d", mySocket); #endif - WARN_errno(rc == SOCKET_ERROR, "shutdown"); + char warnbuf[256]; + snprintf(warnbuf, sizeof(warnbuf), "%sshutdown", mSettings->mTransferIDStr); + warnbuf[sizeof(warnbuf)-1] = '\0'; + WARN_errno(rc == SOCKET_ERROR, warnbuf); if (!rc && !isFullDuplex(mSettings)) AwaitServerCloseEvent(); } @@ -1557,7 +1777,9 @@ void Client::FinishTrafficActions () { disarm_itimer(); // Shutdown the TCP socket's writes as the event for the server to end its traffic loop if (!isUDP(mSettings)) { - tcp_shutdown(); + if (!isIgnoreShutdown(mSettings)) { + tcp_shutdown(); + } now.setnow(); reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); @@ -1568,6 +1790,8 @@ void Client::FinishTrafficActions () { * */ reportstruct->packetLen = totLen; + reportstruct->err_readwrite = WriteSuccess; + myReportPacket(); } } else { // stop timing @@ -1593,7 +1817,7 @@ void Client::FinishTrafficActions () { } reportstruct->packetLen = 0; } - int do_close = EndJob(myJob, reportstruct); + bool do_close = EndJob(myJob, reportstruct); if (isIsochronous(mSettings) && (myReport->info.schedule_error.cnt > 2)) { fprintf(stderr,"%sIsoch schedule errors (mean/min/max/stdev) = %0.3f/%0.3f/%0.3f/%0.3f ms\n",mSettings->mTransferIDStr, \ ((myReport->info.schedule_error.sum / myReport->info.schedule_error.cnt) * 1e-3), (myReport->info.schedule_error.min * 1e-3), \ @@ -1632,6 +1856,7 @@ void Client::AwaitServerFinPacket () { struct timeval timeout; int ack_success = 0; int count = RETRYCOUNT; + static int read_warn_rate_limiter = 3; // rate limit read warn msgs while (--count >= 0) { // wait until the socket is readable, or our timeout expires FD_ZERO(&readSet); @@ -1664,8 +1889,16 @@ void Client::AwaitServerFinPacket () { continue; } } - - WARN_errno(rc < 0, "read"); + // only warn when threads is small, too many warnings are too much outputs + if (rc < 0 && (--read_warn_rate_limiter > 0)) { + int len = snprintf(NULL, 0, "%sRead UDP fin", mSettings->mTransferIDStr); + char *tmpbuf = (char *)calloc(1, len + 2); + if (tmpbuf) { + len = snprintf(tmpbuf, len + 1, "%sRead UDP fin", mSettings->mTransferIDStr); + WARN_errno(1, tmpbuf); + free(tmpbuf); + } + } if (rc > 0) { ack_success = 1; #ifdef HAVE_THREAD_DEBUG @@ -1682,32 +1915,23 @@ void Client::AwaitServerFinPacket () { fprintf(stderr, warn_no_ack, mySocket, (isModeTime(mSettings) ? 10 : 1)); } - -void Client::PostNullEvent (void) { +// isFirst indicates first event occurred per wait_tick +void Client::PostNullEvent (bool isFirst, bool select_retry) { assert(myReport!=NULL); // push a nonevent into the packet ring // this will cause the reporter to process // up to this event - memset(reportstruct, 0, sizeof(struct ReportStruct)); + struct ReportStruct report_nopacket; + memset(&report_nopacket, 0, sizeof(struct ReportStruct)); now.setnow(); - reportstruct->packetTime.tv_sec = now.getSecs(); - reportstruct->packetTime.tv_usec = now.getUsecs(); - reportstruct->emptyreport=1; - myReportPacket(); -} - -void Client::PostNullEvent (bool isFirst) { - assert(myReport!=NULL); - // push a nonevent into the packet ring - // this will cause the reporter to process - // up to this event - memset(reportstruct, 0, sizeof(struct ReportStruct)); - now.setnow(); - reportstruct->packetTime.tv_sec = now.getSecs(); - reportstruct->packetTime.tv_usec = now.getUsecs(); - reportstruct->emptyreport=1; - reportstruct->scheduled = isFirst; - myReportPacket(); + report_nopacket.packetTime.tv_sec = now.getSecs(); + report_nopacket.packetTime.tv_usec = now.getUsecs(); + report_nopacket.emptyreport = true; + report_nopacket.scheduled = isFirst; + report_nopacket.packetID = 0; + report_nopacket.err_readwrite = (select_retry ? WriteSelectRetry : WriteNoAccount); + reportstruct->packetTime = report_nopacket.packetTime; // needed for the InProgress loop test + myReportPacket(&report_nopacket); } // The client end timer is based upon the final fin, fin-ack w/the server @@ -1717,7 +1941,7 @@ void Client::PostNullEvent (bool isFirst) { #define MINAWAITCLOSEUSECS 2000000 void Client::AwaitServerCloseEvent () { // the await detection can take awhile so post a non event ahead of it - PostNullEvent(); + PostNullEvent(false,false); unsigned int amount_usec = \ (isModeTime(mSettings) ? static_cast(mSettings->mAmount * 10000) : MINAWAITCLOSEUSECS); if (amount_usec < MINAWAITCLOSEUSECS) @@ -1739,6 +1963,9 @@ void Client::AwaitServerCloseEvent () { int Client::SendFirstPayload () { int pktlen = 0; if (!isConnectOnly(mSettings)) { + if (isUDP(mSettings) && (mSettings->sendfirst_pacing > 0)) { + delay_loop(mSettings->sendfirst_pacing); + } if (myReport && !TimeZero(myReport->info.ts.startTime) && !(mSettings->mMode == kTest_TradeOff)) { reportstruct->packetTime = myReport->info.ts.startTime; } else { @@ -1746,7 +1973,7 @@ int Client::SendFirstPayload () { reportstruct->packetTime.tv_sec = now.getSecs(); reportstruct->packetTime.tv_usec = now.getUsecs(); } - pattern(mSettings->mBuf, mSettings->mBufLen); + if (isTxStartTime(mSettings)) { pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, mSettings->txstart_epoch); } else { @@ -1766,6 +1993,9 @@ int Client::SendFirstPayload () { #endif apply_first_udppkt_delay = true; } else { + // Set the send timeout for the very first write which has the test exchange + int sosndtimer = DEFAULT_TESTEXCHANGETIMEOUT; //in usecs + SetSocketOptionsSendTimeout(mSettings, sosndtimer); #if HAVE_DECL_TCP_NODELAY if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) { int optflag=1; @@ -1781,9 +2011,19 @@ int Client::SendFirstPayload () { #else pktlen = send(mySocket, mSettings->mBuf, pktlen, 0); #endif + SetSocketOptionsSendTimeout(mSettings, mSettings->sosndtimer); if (isPeerVerDetect(mSettings) && !isServerReverse(mSettings)) { PeerXchange(); } + if (!isFileInput(mSettings)) { + int buflen = (mSettings->mBufLen < (int) sizeof(struct client_tcp_testhdr)) ? mSettings->mBufLen \ + : sizeof(struct client_tcp_testhdr); + if (isTripTime(mSettings)) { + memset(mSettings->mBuf, 0xA5, buflen); + } else { + pattern(mSettings->mBuf, buflen); // reset the pattern in the payload for future writes + } + } #if HAVE_DECL_TCP_NODELAY if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) { int optflag=0; @@ -1833,22 +2073,15 @@ void Client::PeerXchange () { /* * BarrierClient allows for multiple stream clients to be syncronized */ -int Client::BarrierClient (struct BarrierMutex *barrier) { - int last = 0; +#define BARRIER_MIN 100 // units is seconds to inform the server +bool Client::BarrierClient (struct BarrierMutex *barrier) { + bool last = false; #ifdef HAVE_THREAD assert(barrier != NULL); + Timestamp now; Condition_Lock(barrier->await); if (--barrier->count <= 0) { - // store the barrier release timer -#ifdef HAVE_CLOCK_GETTIME - struct timespec t1; - clock_gettime(CLOCK_REALTIME, &t1); - barrier->release_time.tv_sec = t1.tv_sec; - barrier->release_time.tv_usec = t1.tv_nsec / 1000; -#else - gettimeofday(&barrier->release_time, NULL); -#endif - last = 1; + last = true; // last one wake's up everyone else Condition_Broadcast(&barrier->await); #ifdef HAVE_THREAD_DEBUG @@ -1858,14 +2091,25 @@ int Client::BarrierClient (struct BarrierMutex *barrier) { #ifdef HAVE_THREAD_DEBUG thread_debug("Barrier WAIT on condition %p count=%d", (void *)&barrier->await, barrier->count); #endif - Condition_Wait(&barrier->await); + if (isModeTime(mSettings)) { + int barrier_wait_secs = int(mSettings->mAmount / 100); // convert from 10 ms to seconds + if (barrier_wait_secs <= 0) + barrier_wait_secs = 1; + Condition_TimedWait(&barrier->await, barrier_wait_secs); + } else { + Condition_Wait(&barrier->await); + } } Condition_Unlock(barrier->await); #ifdef HAVE_THREAD_DEBUG thread_debug("Barrier EXIT on condition %p", (void *)&barrier->await); #endif + mSettings->barrier_time = now.delta_usec(); + if (mSettings->barrier_time < BARRIER_MIN) { + mSettings->barrier_time = 0; + } #else - last = 1; + last = true; #endif // HAVE_THREAD return last; } -- cgit v1.2.3