summaryrefslogtreecommitdiffstats
path: root/src/Client.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Client.cpp')
-rw-r--r--src/Client.cpp798
1 files changed, 521 insertions, 277 deletions
diff --git a/src/Client.cpp b/src/Client.cpp
index 89e47e2..d56ee1c 100644
--- a/src/Client.cpp
+++ b/src/Client.cpp
@@ -67,6 +67,7 @@
#include "payloads.h"
#include "active_hosts.h"
#include "gettcpinfo.h"
+#include "iperf_formattime.h"
// const double kSecs_to_usecs = 1e6;
const double kSecs_to_nsecs = 1e9;
@@ -74,6 +75,7 @@ const int kBytes_to_Bits = 8;
#define VARYLOAD_PERIOD 0.1 // recompute the variable load every n seconds
#define MAXUDPBUF 1470
+#define TCPDELAYDEFAULTQUANTUM 4000 // units usecs
Client::Client (thread_Settings *inSettings) {
#ifdef HAVE_THREAD_DEBUG
@@ -136,13 +138,7 @@ Client::~Client () {
void Client::mySockInit (void) {
// create an internet socket
int type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM);
- int domain = (SockAddr_isIPv6(&mSettings->peer) ?
-#if HAVE_IPV6
- AF_INET6
-#else
- AF_INET
-#endif
- : AF_INET);
+ int domain = SockAddr_getAFdomain(&mSettings->peer);
mySocket = socket(domain, type, 0);
WARN_errno(mySocket == INVALID_SOCKET, "socket");
@@ -151,20 +147,23 @@ void Client::mySockInit (void) {
SetSocketOptions(mSettings);
SockAddr_localAddr(mSettings);
SockAddr_remoteAddr(mSettings);
- if (mSettings->mLocalhost != NULL) {
- // bind socket to local address
- int rc = bind(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local),
- SockAddr_get_sizeof_sockaddr(&mSettings->local));
- WARN_errno(rc == SOCKET_ERROR, "bind");
+ // do this bind to device after IP addr name lookups per above
+ SetSocketBindToDeviceIfNeeded(mSettings);
+ if (mSettings->mLocalhost != NULL) { // bind src ip if needed
+ // bind socket to local address
+ int rc = bind(mSettings->mSock, (struct sockaddr *)(&mSettings->local), \
+ SockAddr_get_sizeof_sockaddr(&mSettings->local));
+ WARN_errno(rc == SOCKET_ERROR, "bind");
}
- mysock_init_done = true;
- if (!isUDP(mSettings) && isReport(mSettings) && isSettingsReport(mSettings)) {
+ if (isSettingsReport(mSettings)) {
struct ReportHeader *tmp = InitSettingsReport(mSettings);
assert(tmp!=NULL);
PostReport(tmp);
- setNoSettReport(mSettings);
}
+ mysock_init_done = true;
}
+
+
/* -------------------------------------------------------------------
* Setup a socket connected to a server.
* If inLocalhost is not null, bind to that address, specifying
@@ -177,29 +176,69 @@ bool Client::my_connect (bool close_on_fail) {
}
// connect socket
connected = false;
+ int connect_errno = 0;
mSettings->tcpinitstats.connecttime = -1;
if (!isUDP(mSettings)) {
- int trycnt = mSettings->mConnectRetries + 1;
- while (trycnt > 0) {
+ Timestamp end_connect_retry;
+ end_connect_retry.add(mSettings->connect_retry_time);
+ do {
connect_start.setnow();
rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
SockAddr_get_sizeof_sockaddr(&mSettings->peer));
- WARN_errno((rc == SOCKET_ERROR), "tcp connect");
+ connect_done.setnow();
if (rc == SOCKET_ERROR) {
- if ((--trycnt) <= 0) {
- if (close_on_fail) {
- close(mySocket);
- mySocket = INVALID_SOCKET;
- }
- } else {
- delay_loop(200000);
+ char timestr[120];
+ char tmpaddr[200];
+ char errtext[50];
+ connect_errno = errno_decode(errtext, sizeof(errtext));
+ unsigned short port = SockAddr_getPort(&mSettings->peer);
+ SockAddr_getHostAddress(&mSettings->peer, tmpaddr, sizeof(tmpaddr));
+ struct timeval t;
+ t.tv_sec = connect_done.getSecs();
+ t.tv_usec = connect_done.getUsecs();
+ iperf_formattime(timestr, sizeof(timestr), t, isEnhanced(mSettings), isUTC(mSettings), YearThruSecTZ);
+ int slen = snprintf(NULL, 0, "%stcp connect to %s port %d failed (%s) on %s", \
+ mSettings->mTransferIDStr, tmpaddr, port, errtext, timestr);
+ char *text = (char *) calloc((slen+1), sizeof(char));
+ if (text) {
+ snprintf(text, (slen+1), "%stcp connect to %s port %d failed (%s) on %s", \
+ mSettings->mTransferIDStr, tmpaddr, port, errtext, timestr);
+ PostReport(InitStringReport(text));
+ FREE_ARRAY(text);
+ }
+ bool need_open = false;
+ if (close_on_fail || FATALTCPCONNECTERR(errno)) { // MAC OS kicks out invalid argument at times, not sure why
+ close(mySocket);
+ mySockInit();
+ delay_loop(10000); // delay the minimum of 10ms
+ need_open = true;
+ }
+ if (!need_open && connect_done.before(end_connect_retry)) {
+ int delay = mSettings->connect_retry_timer - (connect_done.subUsec(connect_start));
+ delay_loop((delay < 10000) ? 10000 : delay); // minimum of 10ms
}
} else {
- connect_done.setnow();
mSettings->tcpinitstats.connecttime = 1e3 * connect_done.subSec(connect_start);
connected = true;
break;
}
+ } while (connect_done.before(end_connect_retry));
+ if (!connected && (mSettings->connect_retry_time > 0)) {
+ char timestr[120];
+ struct timeval t;
+ t.tv_sec = end_connect_retry.getSecs();
+ t.tv_usec = end_connect_retry.getUsecs();
+ iperf_formattime(timestr, sizeof(timestr), t, isEnhanced(mSettings), isUTC(mSettings), YearThruSecTZ);
+ int len = snprintf(NULL, 0, "%stcp connect attempt timer expired on %s\n", \
+ mSettings->mTransferIDStr, timestr);
+ char *text = (char *) calloc(len+1, sizeof(char));
+ if (text) {
+ snprintf(text, len, "%stcp connect attempt timer expried on %s\n", \
+ mSettings->mTransferIDStr, timestr);
+ PostReport(InitStringReport(text));
+ FREE_ARRAY(text);
+ return false;
+ }
}
} else {
rc = connect(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer),
@@ -217,8 +256,6 @@ bool Client::my_connect (bool close_on_fail) {
}
#endif
// Set the send timeout for the very first write which has the test exchange
- int sosndtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs
- SetSocketOptionsSendTimeout(mSettings, sosndtimer);
getsockname(mySocket, reinterpret_cast<sockaddr*>(&mSettings->local), &mSettings->size_local);
getpeername(mySocket, reinterpret_cast<sockaddr*>(&mSettings->peer), &mSettings->size_peer);
SockAddr_Ifrname(mSettings);
@@ -230,12 +267,6 @@ bool Client::my_connect (bool close_on_fail) {
if (isUDP(mSettings) && !isIsochronous(mSettings) && !isIPG(mSettings)) {
mSettings->mBurstIPG = get_delay_target() / 1e3; // this is being set for the settings report only
}
- if (isReport(mSettings) && isSettingsReport(mSettings)) {
- struct ReportHeader *tmp = InitSettingsReport(mSettings);
- assert(tmp!=NULL);
- PostReport(tmp);
- setNoSettReport(mSettings);
- }
} else {
if (mySocket != INVALID_SOCKET) {
int rc = close(mySocket);
@@ -252,7 +283,7 @@ bool Client::my_connect (bool close_on_fail) {
cr->connect_timestamp.tv_usec = connect_start.getUsecs();
assert(reporthdr);
PostReport(reporthdr);
- } else {
+ } else if (!connect_errno) {
PostReport(InitConnectionReport(mSettings));
}
}
@@ -277,6 +308,10 @@ inline void Client::myReportPacket (void) {
reportstruct->packetLen = 0;
}
+inline void Client::myReportPacket (struct ReportStruct *reportptr) {
+ ReportPacket(myReport, reportptr);
+}
+
// There are multiple startup synchronizations, this code
// handles them all. The caller decides to apply them
@@ -303,8 +338,8 @@ int Client::StartSynch () {
// o Second is a holdback, a relative amount of seconds between the connect and data xfers
// check for an epoch based start time
reportstruct->packetLen = 0;
- if (!isServerReverse(mSettings)) {
- if (!isCompat(mSettings) && !isBounceBack(mSettings)) {
+ if (!isServerReverse(mSettings) && !isCompat(mSettings)) {
+ if (!isBounceBack(mSettings)) {
reportstruct->packetLen = SendFirstPayload();
// Reverse UDP tests need to retry "first sends" a few times
// before going to server or read mode
@@ -336,14 +371,18 @@ int Client::StartSynch () {
} else if (isTripTime(mSettings) || isPeriodicBurst(mSettings)) {
reportstruct->packetLen = SendFirstPayload();
}
- if (isIsochronous(mSettings) || isPeriodicBurst(mSettings)) {
+ if (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || isBounceBack(mSettings)) {
Timestamp tmp;
- tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec);
- framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp);
- // set the mbuf valid for burst period ahead of time. The same value will be set for all burst writes
- if (!isUDP(mSettings) && framecounter) {
- struct TCP_burst_payload * mBuf_burst = reinterpret_cast<struct TCP_burst_payload *>(mSettings->mBuf);
- mBuf_burst->burst_period_us = htonl(framecounter->period_us());
+ if (isTxStartTime(mSettings)) {
+ tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec);
+ }
+ if (mSettings->mFPS > 0) {
+ framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp);
+ // set the mbuf valid for burst period ahead of time. The same value will be set for all burst writes
+ if (!isUDP(mSettings) && framecounter) {
+ struct TCP_burst_payload * mBuf_burst = reinterpret_cast<struct TCP_burst_payload *>(mSettings->mBuf);
+ mBuf_burst->burst_period_us = htonl(framecounter->period_us());
+ }
}
}
int setfullduplexflag = 0;
@@ -355,12 +394,12 @@ int Client::StartSynch () {
SetReportStartTime();
#if HAVE_TCP_STATS
if (!isUDP(mSettings)) {
- // Near congestion and peridiodic need sampling on every report packet
+ // Near congestion and periodic need sampling on every report packet
if (isNearCongest(mSettings) || isPeriodicBurst(mSettings)) {
myReport->info.isEnableTcpInfo = true;
myReport->info.ts.nextTCPSampleTime.tv_sec = 0;
myReport->info.ts.nextTCPSampleTime.tv_usec = 0;
- } else if (isEnhanced(mSettings) || isBounceBack(mSettings)) {
+ } else if (isEnhanced(mSettings) || isBounceBack(mSettings) || isFQPacing(mSettings)) {
myReport->info.isEnableTcpInfo = true;
myReport->info.ts.nextTCPSampleTime = myReport->info.ts.nextTime;
}
@@ -368,6 +407,7 @@ int Client::StartSynch () {
#endif
if (reportstruct->packetLen > 0) {
+ reportstruct->err_readwrite = WriteSuccess;
reportstruct->packetTime = myReport->info.ts.startTime;
reportstruct->sentTime = reportstruct->packetTime;
reportstruct->prevSentTime = reportstruct->packetTime;
@@ -406,6 +446,9 @@ inline void Client::SetFullDuplexReportStartTime () {
inline void Client::SetReportStartTime () {
assert(myReport!=NULL);
now.setnow();
+ if (isUDP(mSettings) && (mSettings->sendfirst_pacing > 0)) {
+ now.add(static_cast<unsigned int>(mSettings->sendfirst_pacing));
+ }
myReport->info.ts.startTime.tv_sec = now.getSecs();
myReport->info.ts.startTime.tv_usec = now.getUsecs();
myReport->info.ts.IPGstart = myReport->info.ts.startTime;
@@ -491,11 +534,27 @@ void Client::InitTrafficLoop () {
if (isPeriodicBurst(mSettings) && (mSettings->mFPS > 0.0)) {
sosndtimer = static_cast<int>(round(250000.0 / mSettings->mFPS));
} else if (mSettings->mInterval > 0) {
- sosndtimer = static_cast<int>(round(0.5 * mSettings->mInterval));
+ sosndtimer = static_cast<int>(round(((mSettings->mThreads > 1) ? 0.25 : 0.5) * mSettings->mInterval));
} else {
sosndtimer = static_cast<int>(mSettings->mAmount * 5e3);
}
- SetSocketOptionsSendTimeout(mSettings, sosndtimer);
+ // set to 1 second for wraps or too large
+ if ((sosndtimer < 0) || (sosndtimer > 1000000)) {
+ sosndtimer = 1000000;
+ }
+ if (sosndtimer < 1000) {
+ sosndtimer = 1000; //lower bound of 1 ms
+ }
+ if ((mSettings->mInterval > 0) && (mSettings->mIntervalMode == kInterval_Time)) {
+ int interval_half = static_cast<int>(round(mSettings->mAmount * 10000) / 2);
+ if (sosndtimer > interval_half) {
+ sosndtimer = interval_half;
+ }
+ }
+ if (!isUDP(mSettings)) {
+ mSettings->sosndtimer = sosndtimer;
+ SetSocketOptionsSendTimeout(mSettings, sosndtimer);
+ }
// set the lower bounds delay based of the socket timeout timer
// units needs to be in nanoseconds
delay_lower_bounds = static_cast<double>(sosndtimer) * -1e3;
@@ -510,10 +569,18 @@ void Client::InitTrafficLoop () {
mEndTime.add(mSettings->mAmount / 100.0);
// now.setnow(); fprintf(stderr, "DEBUG: end time set to %ld.%ld now is %ld.%ld\n", mEndTime.getSecs(), mEndTime.getUsecs(), now.getSecs(), now.getUsecs());
}
+#if HAVE_DECL_TCP_TX_DELAY
+ current_state = NO_DELAY;
+ if (isTcpTxDelay(mSettings)) {
+ state_tokens[NO_DELAY] = (int) (mSettings->mTcpTxDelayMean * (1 - mSettings->mTcpTxDelayProb));
+ state_tokens[ADD_DELAY] = (int) (mSettings->mTcpTxDelayMean * mSettings->mTcpTxDelayProb);
+ TcpTxDelayQuantumEnd.setnow();
+ }
+#endif
readAt = mSettings->mBuf;
lastPacketTime.set(myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec);
- reportstruct->errwrite=WriteNoErr;
- reportstruct->emptyreport=0;
+ reportstruct->err_readwrite=WriteSuccess;
+ reportstruct->emptyreport = false;
reportstruct->packetLen = 0;
// Finally, post this thread's "job report" which the reporter thread
// will continuously process as long as there are packets flowing
@@ -576,6 +643,48 @@ void Client::Run () {
}
}
+#if HAVE_DECL_TCP_TX_DELAY
+inline void Client::apply_txdelay_func (void) {
+ now.setnow();
+ if (isTcpTxDelay(mSettings) && TcpTxDelayQuantumEnd.before(now)) {
+ // expense the tokens for the current state
+ state_tokens[current_state] -= now.subUsec(TcpTxDelayQuantumEnd);
+ // add tokens
+ do {
+ state_tokens[NO_DELAY] += (int) (mSettings->mTcpTxDelayMean * (1 - mSettings->mTcpTxDelayProb));
+ state_tokens[ADD_DELAY] += (int) (mSettings->mTcpTxDelayMean * mSettings->mTcpTxDelayProb);
+ } while ((state_tokens[NO_DELAY] < 0) && (state_tokens[ADD_DELAY] < 0));
+ // set the next quantum end
+ while (TcpTxDelayQuantumEnd.before(now))
+ TcpTxDelayQuantumEnd.add((unsigned int) TCPDELAYDEFAULTQUANTUM);
+ // do any state change
+ if ((state_tokens[NO_DELAY] < 0) && (current_state == NO_DELAY)) {
+// printf("**** f state change to 0->1 current=%d %d %d\n", current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]);
+ SetSocketTcpTxDelay(mSettings, (mSettings->mTcpTxDelayMean * 1000.0));
+ current_state = ADD_DELAY;
+ } else if ((state_tokens[ADD_DELAY] < 0) && (current_state == ADD_DELAY)) {
+// printf("**** f state change to 1->0 current=%d %d %d\n", current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]);
+ SetSocketTcpTxDelay(mSettings, 0.0);
+ current_state = NO_DELAY;
+ } else {
+ int rval = (random() % 2);
+ // printf("**** curr=%d rval=%d tokens 0:%d 1:%d\n", current_state, rval, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]);
+ if (rval != current_state) {
+ if (rval && (state_tokens[ADD_DELAY] > 0)) {
+// printf("**** state change to 0->1 rval=%d current=%d %d %d\n", rval, current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]);
+ SetSocketTcpTxDelay(mSettings, (mSettings->mTcpTxDelayMean * 1000.0));
+ current_state = ADD_DELAY;
+ } else if ((rval != 1) && (state_tokens[NO_DELAY] > 0)) {
+// printf("**** state change to 1->0 rval=%d current=%d %d %d\n", rval, current_state, state_tokens[NO_DELAY], state_tokens[ADD_DELAY]);
+ SetSocketTcpTxDelay(mSettings, 0.0);
+ current_state = NO_DELAY;
+ }
+ }
+ }
+ }
+}
+#endif
+
/*
* TCP send loop
*/
@@ -588,6 +697,12 @@ void Client::RunTCP () {
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
reportstruct->write_time = 0;
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacingStep(mSettings)) {
+ PacingStepTime = now;
+ PacingStepTime.add(mSettings->mFQPacingRateStepInterval);
+ }
+#endif
while (InProgress()) {
reportstruct->writecnt = 0;
reportstruct->scheduled = false;
@@ -598,7 +713,7 @@ void Client::RunTCP () {
if (isIsochronous(mSettings)) {
assert(mSettings->mMean);
burst_remaining = static_cast<int>(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8);
- } else if (isPeriodicBurst(mSettings)){
+ } else if (isBurstSize(mSettings)){
assert(mSettings->mBurstSize);
burst_remaining = mSettings->mBurstSize;
} else {
@@ -609,7 +724,7 @@ void Client::RunTCP () {
burst_remaining = static_cast<int>(sizeof(struct TCP_burst_payload));
// apply scheduling if needed
if (framecounter) {
- burst_id = framecounter->wait_tick(&reportstruct->sched_err);
+ burst_id = framecounter->wait_tick(&reportstruct->sched_err, true);
reportstruct->scheduled = true;
if (isPeriodicBurst(mSettings)) {
// low duty cycle traffic needs special event handling
@@ -619,20 +734,17 @@ void Client::RunTCP () {
reportstruct->packetTime.tv_usec = now.getUsecs();
if (!InProgress()) {
reportstruct->packetLen = 0;
- reportstruct->emptyreport = 1;
+ reportstruct->emptyreport = true;
// wait may have crossed the termination boundry
break;
} else {
//time interval crossings may have occurred during the wait
//post a null event to cause the report to flush the packet ring
- PostNullEvent();
+ PostNullEvent(false, false);
}
}
-#if HAVE_DECL_TCP_NOTSENT_LOWAT
- if (isWritePrefetch(mSettings)) {
- AwaitWriteSelectEventTCP();
- }
-#endif
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite())
+ continue;
}
now.setnow();
reportstruct->packetTime.tv_sec = now.getSecs();
@@ -659,11 +771,8 @@ void Client::RunTCP () {
if (isTcpWriteTimes(mSettings)) {
write_start.setnow();
}
-#if HAVE_DECL_TCP_NOTSENT_LOWAT
- if (isWritePrefetch(mSettings)) {
- AwaitWriteSelectEventTCP();
- }
-#endif
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite())
+ continue;
reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen);
now.setnow();
reportstruct->writecnt++;
@@ -676,28 +785,33 @@ void Client::RunTCP () {
}
if (reportstruct->packetLen <= 0) {
if (reportstruct->packetLen == 0) {
+ reportstruct->err_readwrite=WriteErrFatal;
peerclose = true;
} else if (NONFATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrAccount;
+ reportstruct->err_readwrite=WriteErrAccount;
} else if (FATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrFatal;
- WARN_errno(1, "tcp write");
+ reportstruct->err_readwrite=WriteErrFatal;
+ now.setnow();
+ char warnbuf[WARNBUFSIZE];
+ snprintf(warnbuf, sizeof(warnbuf), "%stcp write (%ld.%ld)", mSettings->mTransferIDStr, now.getSecs(), now.getUsecs());
+ warnbuf[sizeof(warnbuf)-1] = '\0';
+ WARN(1, warnbuf);
break;
} else {
- reportstruct->errwrite=WriteErrNoAccount;
+ reportstruct->err_readwrite=WriteNoAccount;
}
reportstruct->packetLen = 0;
- reportstruct->emptyreport = 1;
+ reportstruct->emptyreport = true;
} else {
- reportstruct->emptyreport = 0;
+ reportstruct->emptyreport = false;
totLen += reportstruct->packetLen;
- reportstruct->errwrite=WriteNoErr;
+ reportstruct->err_readwrite=WriteSuccess;
if (isburst) {
burst_remaining -= reportstruct->packetLen;
if (burst_remaining > 0) {
- reportstruct->transit_ready = 0;
+ reportstruct->transit_ready = false;
} else {
- reportstruct->transit_ready = 1;
+ reportstruct->transit_ready = true;
reportstruct->prevSentTime = myReport->info.ts.prevsendTime;
}
}
@@ -711,7 +825,20 @@ void Client::RunTCP () {
}
}
if (!one_report) {
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacing(mSettings))
+ reportstruct->FQPacingRate = mSettings->mFQPacingRateCurrent;
+#endif
myReportPacket();
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacingStep(mSettings) && PacingStepTime.before(now)) {
+ mSettings->mFQPacingRateCurrent += mSettings->mFQPacingRateStep;
+ setsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &mSettings->mFQPacingRateCurrent, sizeof(mSettings->mFQPacingRateCurrent));
+ PacingStepTime.add(mSettings->mFQPacingRateStepInterval);
+ socklen_t len = sizeof(reportstruct->FQPacingRate);
+ getsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &reportstruct->FQPacingRate, &len);
+ }
+#endif
}
}
FinishTrafficActions();
@@ -761,26 +888,26 @@ void Client::RunNearCongestionTCP () {
reportstruct->packetTime.tv_usec = now.getUsecs();
reportstruct->sentTime = reportstruct->packetTime;
ReportNow:
- reportstruct->transit_ready = 0;
+ reportstruct->transit_ready = false;
if (reportstruct->packetLen < 0) {
if (NONFATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrAccount;
+ reportstruct->err_readwrite=WriteErrAccount;
} else if (FATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrFatal;
+ reportstruct->err_readwrite=WriteErrFatal;
WARN_errno(1, "tcp write");
break;
} else {
- reportstruct->errwrite=WriteErrNoAccount;
+ reportstruct->err_readwrite=WriteNoAccount;
}
reportstruct->packetLen = 0;
- reportstruct->emptyreport = 1;
+ reportstruct->emptyreport = true;
} else {
- reportstruct->emptyreport = 0;
+ reportstruct->emptyreport = false;
totLen += reportstruct->packetLen;
- reportstruct->errwrite=WriteNoErr;
+ reportstruct->err_readwrite=WriteSuccess;
burst_remaining -= reportstruct->packetLen;
if (burst_remaining <= 0) {
- reportstruct->transit_ready = 1;
+ reportstruct->transit_ready = true;
}
}
if (isModeAmount(mSettings) && !reportstruct->emptyreport) {
@@ -814,8 +941,7 @@ void Client::RunNearCongestionTCP () {
void Client::RunRateLimitedTCP () {
double tokens = 0;
Timestamp time1, time2;
- int burst_size = mSettings->mBufLen;
- int burst_remaining = 0;
+ int write_remaining = 0;
int burst_id = 1;
long var_rate = mSettings->mAppRate;
@@ -848,61 +974,79 @@ void Client::RunRateLimitedTCP () {
// perform write
int len = 0;
int len2 = 0;
- if (burst_remaining == 0) {
- burst_remaining = mSettings->mBufLen;
+
+ if (isburst && !(write_remaining > 0)) {
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite())
+ continue;
+ write_remaining = mSettings->mBufLen;
+ // check for TCP minimum payload
+ if (write_remaining < static_cast<int>(sizeof(struct TCP_burst_payload)))
+ write_remaining = static_cast<int>(sizeof(struct TCP_burst_payload));
now.setnow();
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
reportstruct->sentTime = reportstruct->packetTime;
- WriteTcpTxHdr(reportstruct, burst_size, burst_id++);
+ WriteTcpTxHdr(reportstruct, write_remaining, burst_id++);
// perform write
- if (isTripTime(mSettings)) {
- len = writen(mySocket, mSettings->mBuf, mSettings->mBufLen, &reportstruct->writecnt);
- WARN(len != mSettings->mBufLen, "burst write failed");
- } else {
- len = writen(mySocket, mSettings->mBuf, sizeof(struct TCP_burst_payload), &reportstruct->writecnt);
- WARN(len != sizeof(struct TCP_burst_payload), "burst hdr write failed");
+ // perform write, full header must succeed
+ if (isTcpWriteTimes(mSettings)) {
+ write_start.setnow();
+ }
+ len = writen(mySocket, mSettings->mBuf, write_remaining, &reportstruct->writecnt);
+ WARN((len < static_cast<int> (sizeof(struct TCP_burst_payload))), "burst hdr write failed");
+ if (isTcpWriteTimes(mSettings)) {
+ now.setnow();
+ reportstruct->write_time = now.subUsec(write_start);
}
if (len < 0) {
len = 0;
if (NONFATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrAccount;
+ reportstruct->err_readwrite=WriteErrAccount;
} else if (FATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrFatal;
+ reportstruct->err_readwrite=WriteErrFatal;
WARN_errno(1, "write");
fatalwrite_err = 1;
break;
} else {
- reportstruct->errwrite=WriteErrNoAccount;
+ reportstruct->err_readwrite=WriteNoAccount;
}
} else {
- burst_remaining -= len;
+ write_remaining -= len;
}
// thread_debug("***write burst header %d id=%d", burst_size, (burst_id - 1));
- } else if (reportstruct->packetLen > burst_remaining) {
- reportstruct->packetLen = burst_remaining;
+ } else {
+ write_remaining = mSettings->mBufLen;
}
- if (burst_remaining > 0) {
- len2 = write(mySocket, mSettings->mBuf, reportstruct->packetLen);
+ if (write_remaining > 0) {
+ if (isTcpWriteTimes(mSettings)) {
+ write_start.setnow();
+ }
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite())
+ continue;
+ len2 = write(mySocket, mSettings->mBuf, write_remaining);
+ if (isTcpWriteTimes(mSettings)) {
+ now.setnow();
+ reportstruct->write_time = now.subUsec(write_start);
+ }
reportstruct->writecnt++;
}
if (len2 < 0) {
len2 = 0;
if (NONFATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrAccount;
+ reportstruct->err_readwrite=WriteErrAccount;
} else if (FATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrFatal;
+ reportstruct->err_readwrite=WriteErrFatal;
WARN_errno(1, "write");
fatalwrite_err = 1;
break;
} else {
- reportstruct->errwrite=WriteErrNoAccount;
+ reportstruct->err_readwrite=WriteNoAccount;
}
} else {
// Consume tokens per the transmit
tokens -= (len + len2);
- totLen += (len + len2);;
- reportstruct->errwrite=WriteNoErr;
+ totLen += (len + len2);
+ reportstruct->err_readwrite=WriteSuccess;
}
time2.setnow();
reportstruct->packetLen = len + len2;
@@ -922,51 +1066,60 @@ void Client::RunRateLimitedTCP () {
}
} else {
// Use a 4 usec delay to fill tokens
-#if HAVE_DECL_TCP_NOTSENT_LOWAT
- if (isWritePrefetch(mSettings)) {
- AwaitWriteSelectEventTCP();
- } else
-#endif
- {
- delay_loop(4);
+ delay_loop(4);
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite()) {
+ continue;
}
}
}
FinishTrafficActions();
}
+inline bool Client::AwaitSelectWrite (void) {
#if HAVE_DECL_TCP_NOTSENT_LOWAT
-inline bool Client::AwaitWriteSelectEventTCP (void) {
- int rc;
- struct timeval timeout;
- fd_set writeset;
- FD_ZERO(&writeset);
- FD_SET(mySocket, &writeset);
- if (isModeTime(mSettings)) {
- Timestamp write_event_timeout(0,0);
- if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
- write_event_timeout.add((double) mSettings->mInterval / 1e6 * 2.0);
+ do {
+ int rc;
+ struct timeval timeout;
+ fd_set writeset;
+ FD_ZERO(&writeset);
+ FD_SET(mySocket, &writeset);
+ if (isModeTime(mSettings)) {
+ Timestamp write_event_timeout(0,0);
+ if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
+ write_event_timeout.add((double) mSettings->mInterval / ((mSettings->mThreads > 1) ? 4e6 : 2e6));
+ } else {
+ write_event_timeout.add((double) mSettings->mAmount / 4e2);
+ }
+ timeout.tv_sec = write_event_timeout.getSecs();
+ timeout.tv_usec = write_event_timeout.getUsecs();
} else {
- write_event_timeout.add((double) mSettings->mAmount / 1e2 * 4.0);
+ timeout.tv_sec = 1; // longest is 1 seconds
+ timeout.tv_usec = 0;
}
- timeout.tv_sec = write_event_timeout.getSecs();
- timeout.tv_usec = write_event_timeout.getUsecs();
- } else {
- timeout.tv_sec = 10; // longest is 10 seconds
- timeout.tv_usec = 0;
- }
- if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) {
- WARN_errno((rc < 0), "select");
-#ifdef HAVE_THREAD_DEBUG
- if (rc == 0)
- thread_debug("AwaitWrite timeout");
+ if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) {
+ WARN_errno((rc < 0), "select");
+ if (rc <= 0)
+ PostNullEvent(false, true);
+#if HAVE_SUMMING_DEBUG
+ if (rc == 0) {
+ char warnbuf[WARNBUFSIZE];
+ snprintf(warnbuf, sizeof(warnbuf), "%sTimeout: write select", mSettings->mTransferIDStr);
+ warnbuf[sizeof(warnbuf)-1] = '\0';
+ WARN(1, warnbuf);
+ }
#endif
- return false;
- }
+ } else {
+ return true;
+ }
+ } while (InProgress());
+ return false;
+#else
return true;
+#endif
}
+#if HAVE_DECL_TCP_NOTSENT_LOWAT
void Client::RunWriteEventsTCP () {
int burst_id = 0;
int writelen = mSettings->mBufLen;
@@ -975,18 +1128,28 @@ void Client::RunWriteEventsTCP () {
now.setnow();
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacingStep(mSettings)) {
+ PacingStepTime = now;
+ PacingStepTime.add(mSettings->mFQPacingRateStepInterval);
+ }
+#endif
while (InProgress()) {
+#if HAVE_DECL_TCP_TX_DELAY
+// apply_txdelay_func();
+#endif
if (isModeAmount(mSettings)) {
writelen = ((mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
}
now.setnow();
reportstruct->write_time = 0;
+ reportstruct->writecnt = 0;
if (isTcpWriteTimes(mSettings)) {
write_start = now;
}
- bool rc = AwaitWriteSelectEventTCP();
- reportstruct->emptyreport = (rc == false) ? 1 : 0;
- if (rc) {
+ if (isWritePrefetch(mSettings) && !AwaitSelectWrite())
+ continue;
+ if (!reportstruct->emptyreport) {
now.setnow();
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
@@ -999,7 +1162,7 @@ void Client::RunWriteEventsTCP () {
peerclose = true;
}
reportstruct->packetLen = 0;
- reportstruct->emptyreport = 1;
+ reportstruct->emptyreport = true;
} else if (isTcpWriteTimes(mSettings)) {
write_end.setnow();
reportstruct->write_time = write_end.subUsec(write_start);
@@ -1014,7 +1177,20 @@ void Client::RunWriteEventsTCP () {
}
}
if (!one_report) {
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacing(mSettings))
+ reportstruct->FQPacingRate = mSettings->mFQPacingRateCurrent;
+#endif
myReportPacket();
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ if (isFQPacingStep(mSettings) && PacingStepTime.before(now)) {
+ mSettings->mFQPacingRateCurrent += mSettings->mFQPacingRateStep;
+ setsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &mSettings->mFQPacingRateCurrent, sizeof(mSettings->mFQPacingRateCurrent));
+ PacingStepTime.add(mSettings->mFQPacingRateStepInterval);
+ socklen_t len = sizeof(reportstruct->FQPacingRate);
+ getsockopt(mSettings->mSock, SOL_SOCKET, SO_MAX_PACING_RATE, &reportstruct->FQPacingRate, &len);
+ }
+#endif
}
}
FinishTrafficActions();
@@ -1022,7 +1198,7 @@ void Client::RunWriteEventsTCP () {
#endif
void Client::RunBounceBackTCP () {
int burst_id = 0;
- int writelen = mSettings->mBufLen;
+ int writelen = mSettings->mBounceBackBytes;
memset(mSettings->mBuf, 0x5A, sizeof(struct bounceback_hdr));
if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) {
int sotimer = static_cast<int>(round(mSettings->mInterval / 2.0));
@@ -1044,11 +1220,12 @@ void Client::RunBounceBackTCP () {
reportstruct->packetTime.tv_usec = now.getUsecs();
while (InProgress()) {
int n;
+ long remaining;
reportstruct->writecnt = 0;
bool isFirst;
if (framecounter) {
- burst_id = framecounter->wait_tick(&reportstruct->sched_err);
- PostNullEvent(true); // this will set the now timestamp
+ burst_id = framecounter->wait_tick(&reportstruct->sched_err, false);
+ PostNullEvent(true, false); // now is set in this call
reportstruct->sentTime.tv_sec = now.getSecs();
reportstruct->sentTime.tv_usec = now.getUsecs();
isFirst = true;
@@ -1057,7 +1234,7 @@ void Client::RunBounceBackTCP () {
isFirst = false;
}
int bb_burst = (mSettings->mBounceBackBurst > 0) ? mSettings->mBounceBackBurst : 1;
- while (bb_burst > 0) {
+ while ((bb_burst > 0) && InProgress() && (!framecounter || (framecounter->get(&remaining)) == (unsigned int) burst_id)) {
bb_burst--;
if (isFirst) {
isFirst = false;
@@ -1067,28 +1244,31 @@ void Client::RunBounceBackTCP () {
reportstruct->sentTime.tv_usec = now.getUsecs();
}
WriteTcpTxBBHdr(reportstruct, burst_id, 0);
- reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen, &reportstruct->writecnt);
- if (reportstruct->packetLen <= 0) {
- if ((reportstruct->packetLen < 0) && FATALTCPWRITERR(errno)) {
- reportstruct->errwrite=WriteErrFatal;
+ int write_offset = 0;
+ reportstruct->writecnt = 0;
+ RETRY_WRITE:
+ n = writen(mySocket, (mSettings->mBuf + write_offset), (writelen - write_offset), &reportstruct->writecnt);
+ if (n < 0) {
+ if (FATALTCPWRITERR(errno)) {
+ reportstruct->err_readwrite=WriteErrFatal;
WARN_errno(1, "tcp bounceback write fatal error");
peerclose = true;
- } else if (reportstruct->packetLen == 0) {
- peerclose = true;
- } else if (reportstruct->packetLen != writelen) {
- WARN_errno(1, "tcp bounceback writen incomplete");
- peerclose = true;
- } else {
- // retry the write
- bb_burst++;
- continue;
+ break;
+ } else if (InProgress()) {
+ PostNullEvent(false,false);
+ goto RETRY_WRITE;
}
- break;
}
- if (reportstruct->packetLen == writelen) {
- reportstruct->emptyreport = 0;
- totLen += reportstruct->packetLen;
- reportstruct->errwrite=WriteNoErr;
+ write_offset += n;
+ if ((write_offset < writelen) && InProgress()) {
+ WARN_errno(1, "tcp bounceback writen incomplete");
+ PostNullEvent(false,false);
+ goto RETRY_WRITE;
+ }
+ if (write_offset == writelen) {
+ reportstruct->emptyreport = false;
+ totLen += writelen;
+ reportstruct->err_readwrite=WriteSuccess;
#if HAVE_DECL_TCP_QUICKACK
if (isTcpQuickAck(mSettings)) {
int opt = 1;
@@ -1098,43 +1278,50 @@ void Client::RunBounceBackTCP () {
WARN_errno(rc == SOCKET_ERROR, "setsockopt TCP_QUICKACK");
}
#endif
- if ((n = recvn(mySocket, mSettings->mBuf, mSettings->mBounceBackBytes, 0)) == mSettings->mBounceBackBytes) {
- struct bounceback_hdr *bbhdr = reinterpret_cast<struct bounceback_hdr *>(mSettings->mBuf);
- now.setnow();
- reportstruct->sentTimeRX.tv_sec = ntohl(bbhdr->bbserverRx_ts.sec);
- reportstruct->sentTimeRX.tv_usec = ntohl(bbhdr->bbserverRx_ts.usec);
- reportstruct->sentTimeTX.tv_sec = ntohl(bbhdr->bbserverTx_ts.sec);
- reportstruct->sentTimeTX.tv_usec = ntohl(bbhdr->bbserverTx_ts.usec);
- reportstruct->packetTime.tv_sec = now.getSecs();
- reportstruct->packetTime.tv_usec = now.getUsecs();
- reportstruct->packetLen += n;
- reportstruct->emptyreport = 0;
- myReportPacket();
+ int read_offset = 0;
+ RETRY_READ:
+ n = recvn(mySocket, (mSettings->mBuf + read_offset), (mSettings->mBounceBackReplyBytes - read_offset), 0);
+ if (n > 0) {
+ read_offset += n;
+ if (read_offset == mSettings->mBounceBackReplyBytes) {
+ struct bounceback_hdr *bbhdr = reinterpret_cast<struct bounceback_hdr *>(mSettings->mBuf);
+ now.setnow();
+ reportstruct->sentTimeRX.tv_sec = ntohl(bbhdr->bbserverRx_ts.sec);
+ reportstruct->sentTimeRX.tv_usec = ntohl(bbhdr->bbserverRx_ts.usec);
+ reportstruct->sentTimeTX.tv_sec = ntohl(bbhdr->bbserverTx_ts.sec);
+ reportstruct->sentTimeTX.tv_usec = ntohl(bbhdr->bbserverTx_ts.usec);
+ reportstruct->packetTime.tv_sec = now.getSecs();
+ reportstruct->packetTime.tv_usec = now.getUsecs();
+ reportstruct->packetLen += n;
+ reportstruct->emptyreport = false;
+ reportstruct->packetID = burst_id;
+ myReportPacket();
+ } else if (InProgress()) {
+ PostNullEvent(false,false);
+ goto RETRY_READ;
+ } else {
+ break;
+ }
} else if (n == 0) {
peerclose = true;
- } else if (n < 0) {
+ break;
+ } else {
if (FATALTCPREADERR(errno)) {
- WARN_errno(1, "bounceback read");
+ FAIL_errno(1, "fatal bounceback read", mSettings);
peerclose = true;
- n = 0;
+ break;
} else {
- WARN(1, "timeout: bounceback read");
+ WARN_errno(1, "timeout: bounceback read");
+ PostNullEvent(false,false);
+ if (InProgress())
+ goto RETRY_READ;
}
}
- } else if ((reportstruct->packetLen < 0 ) && NONFATALTCPWRITERR(errno)) {
- reportstruct->packetLen = 0;
- reportstruct->emptyreport = 1;
- reportstruct->errwrite=WriteErrNoAccount;
- myReportPacket();
- } else {
- reportstruct->errwrite=WriteErrFatal;
- reportstruct->packetLen = -1;
- FAIL_errno(1, "tcp bounce-back write", mSettings);
}
}
}
- WriteTcpTxBBHdr(reportstruct, 0x0, 1);
- disarm_itimer();
+ if (!peerclose)
+ WriteTcpTxBBHdr(reportstruct, 0x0, 1); // Signal end of BB test
FinishTrafficActions();
}
/*
@@ -1148,10 +1335,11 @@ double Client::get_delay_target () {
// compute delay target in units of nanoseconds
if (mSettings->mAppRateUnits == kRate_BW) {
// compute delay for bandwidth restriction, constrained to [0,1] seconds
- delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits)
- / mSettings->mAppRate));
+ delay_target = ((mSettings->mAppRate > 0) ? \
+ (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) / mSettings->mAppRate)) \
+ : 0);
} else {
- delay_target = 1e9 / mSettings->mAppRate;
+ delay_target = (mSettings->mAppRate > 0) ? (1e9 / mSettings->mAppRate) : 0;
}
}
return delay_target;
@@ -1202,53 +1390,77 @@ void Client::RunUDP () {
mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec);
mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec);
- // Adjustment for the running delay
- // o measure how long the last loop iteration took
- // o calculate the delay adjust
- // - If write succeeded, adjust = target IPG - the loop time
- // - If write failed, adjust = the loop time
- // o then adjust the overall running delay
- // Note: adjust units are nanoseconds,
- // packet timestamps are microseconds
- if (currLen > 0)
- adjust = delay_target + \
- (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
- else
- adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
-
- lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
- // Since linux nanosleep/busyloop can exceed delay
- // there are two possible equilibriums
- // 1) Try to perserve inter packet gap
- // 2) Try to perserve requested transmit rate
- // The latter seems preferred, hence use a running delay
- // that spans the life of the thread and constantly adjust.
- // A negative delay means the iperf app is behind.
- delay += adjust;
- // Don't let delay grow unbounded
- if (delay < delay_lower_bounds) {
- delay = delay_target;
- }
+ if (delay_target > 0) {
+ // Adjustment for the running delay
+ // o measure how long the last loop iteration took
+ // o calculate the delay adjust
+ // - If write succeeded, adjust = target IPG - the loop time
+ // - If write failed, adjust = the loop time
+ // o then adjust the overall running delay
+ // Note: adjust units are nanoseconds,
+ // packet timestamps are microseconds
+ if (currLen > 0)
+ adjust = delay_target + \
+ (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime));
+ else
+ adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime);
- reportstruct->errwrite = WriteNoErr;
- reportstruct->emptyreport = 0;
+ lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec);
+ // Since linux nanosleep/busyloop can exceed delay
+ // there are two possible equilibriums
+ // 1) Try to perserve inter packet gap
+ // 2) Try to perserve requested transmit rate
+ // The latter seems preferred, hence use a running delay
+ // that spans the life of the thread and constantly adjust.
+ // A negative delay means the iperf app is behind.
+ delay += adjust;
+ // Don't let delay grow unbounded
+ if (delay < delay_lower_bounds) {
+ delay = delay_target;
+ }
+ }
+ reportstruct->err_readwrite = WriteSuccess;
+ reportstruct->emptyreport = false;
// perform write
if (isModeAmount(mSettings)) {
currLen = write(mySocket, mSettings->mBuf, (mSettings->mAmount < static_cast<unsigned>(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen);
} else {
+ currLen = -1;
+#if (HAVE_USE_WRITE_SELECT) && (HAVE_SELECT)
+#if HAVE_DECL_MSG_DONTWAIT
+ currLen = send(mySocket, mSettings->mBuf, mSettings->mBufLen, MSG_DONTWAIT);
+ if ((currLen < 0) && !FATALUDPWRITERR(errno)) {
+ if (AwaitSelectWrite()) {
+ currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
+ reportstruct->err_readwrite = WriteSelectRetry;
+ } else {
+ reportstruct->err_readwrite = WriteTimeo;
+ }
+ }
+#else
+ if (AwaitSelectWrite()) {
+ currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
+ } else {
+ reportstruct->err_readwrite = WriteTimeo;
+ }
+#endif
+#else
currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen);
+#endif
}
if (currLen < 0) {
reportstruct->packetID--;
if (FATALUDPWRITERR(errno)) {
- reportstruct->errwrite = WriteErrFatal;
+ reportstruct->err_readwrite = WriteErrFatal;
WARN_errno(1, "write");
+ currLen = 0;
break;
} else {
- reportstruct->errwrite = WriteErrAccount;
+ //WARN_errno(1, "write n");
currLen = 0;
+ reportstruct->err_readwrite = WriteErrAccount;
}
- reportstruct->emptyreport = 1;
+ reportstruct->emptyreport = true;
}
if (isModeAmount(mSettings)) {
@@ -1310,7 +1522,7 @@ void Client::RunUDPIsochronous () {
udp_payload->isoch.burstsize = htonl(bytecnt);
udp_payload->isoch.prevframeid = htonl(frameid);
reportstruct->burstsize=bytecnt;
- frameid = framecounter->wait_tick(&reportstruct->sched_err);
+ frameid = framecounter->wait_tick(&reportstruct->sched_err, true);
reportstruct->scheduled = true;
udp_payload->isoch.frameid = htonl(frameid);
lastPacketTime.setnow();
@@ -1356,8 +1568,8 @@ void Client::RunUDPIsochronous () {
// delay = delay_target;
// }
- reportstruct->errwrite = WriteNoErr;
- reportstruct->emptyreport = 0;
+ reportstruct->err_readwrite = WriteSuccess;
+ reportstruct->emptyreport = false;
reportstruct->writecnt = 1;
// perform write
@@ -1373,21 +1585,22 @@ void Client::RunUDPIsochronous () {
if (currLen < 0) {
reportstruct->packetID--;
- reportstruct->emptyreport = 1;
- currLen = 0;
+ reportstruct->emptyreport = true;
if (FATALUDPWRITERR(errno)) {
- reportstruct->errwrite = WriteErrFatal;
+ reportstruct->err_readwrite = WriteErrFatal;
WARN_errno(1, "write");
fatalwrite_err = 1;
- } else {
- reportstruct->errwrite = WriteErrAccount;
+ currLen = 0;
+ } else {
+ currLen = 0;
+ reportstruct->err_readwrite = WriteErrAccount;
}
} else {
bytecnt -= currLen;
if (!bytecnt)
- reportstruct->transit_ready = 1;
+ reportstruct->transit_ready = true;
else
- reportstruct->transit_ready = 0;
+ reportstruct->transit_ready = false;
// adjust bytecnt so last packet of burst is greater or equal to min packet
if ((bytecnt > 0) && (bytecnt < udp_payload_minimum)) {
bytecnt = udp_payload_minimum;
@@ -1504,8 +1717,11 @@ void Client::WriteTcpTxBBHdr (struct ReportStruct *reportstruct, uint32_t bbid,
if (final) {
bbflags |= HEADER_BBSTOP;
}
+ if ((mSettings->mBounceBackReplyBytes > 0) && (mSettings->mBounceBackReplyBytes != mSettings->mBounceBackBytes)) {
+ bbflags |= HEADER_BBREPLYSIZE;
+ }
mBuf_bb->bbflags = htons(bbflags);
- mBuf_bb->bbsize = htonl(mSettings->mBufLen);
+ mBuf_bb->bbsize = htonl(mSettings->mBounceBackBytes);
mBuf_bb->bbid = htonl(bbid);
mBuf_bb->bbclientTx_ts.sec = htonl(reportstruct->packetTime.tv_sec);
mBuf_bb->bbclientTx_ts.usec = htonl(reportstruct->packetTime.tv_usec);
@@ -1514,6 +1730,7 @@ void Client::WriteTcpTxBBHdr (struct ReportStruct *reportstruct, uint32_t bbid,
mBuf_bb->bbserverTx_ts.sec = -1;
mBuf_bb->bbserverTx_ts.usec = -1;
mBuf_bb->bbhold = htonl(mSettings->mBounceBackHold);
+ mBuf_bb->bbreplysize = htonl(mSettings->mBounceBackReplyBytes);
}
inline bool Client::InProgress (void) {
@@ -1535,7 +1752,10 @@ inline void Client::tcp_shutdown (void) {
#ifdef HAVE_THREAD_DEBUG
thread_debug("Client calls shutdown() SHUTW_WR on tcp socket %d", mySocket);
#endif
- WARN_errno(rc == SOCKET_ERROR, "shutdown");
+ char warnbuf[256];
+ snprintf(warnbuf, sizeof(warnbuf), "%sshutdown", mSettings->mTransferIDStr);
+ warnbuf[sizeof(warnbuf)-1] = '\0';
+ WARN_errno(rc == SOCKET_ERROR, warnbuf);
if (!rc && !isFullDuplex(mSettings))
AwaitServerCloseEvent();
}
@@ -1557,7 +1777,9 @@ void Client::FinishTrafficActions () {
disarm_itimer();
// Shutdown the TCP socket's writes as the event for the server to end its traffic loop
if (!isUDP(mSettings)) {
- tcp_shutdown();
+ if (!isIgnoreShutdown(mSettings)) {
+ tcp_shutdown();
+ }
now.setnow();
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
@@ -1568,6 +1790,8 @@ void Client::FinishTrafficActions () {
*
*/
reportstruct->packetLen = totLen;
+ reportstruct->err_readwrite = WriteSuccess;
+ myReportPacket();
}
} else {
// stop timing
@@ -1593,7 +1817,7 @@ void Client::FinishTrafficActions () {
}
reportstruct->packetLen = 0;
}
- int do_close = EndJob(myJob, reportstruct);
+ bool do_close = EndJob(myJob, reportstruct);
if (isIsochronous(mSettings) && (myReport->info.schedule_error.cnt > 2)) {
fprintf(stderr,"%sIsoch schedule errors (mean/min/max/stdev) = %0.3f/%0.3f/%0.3f/%0.3f ms\n",mSettings->mTransferIDStr, \
((myReport->info.schedule_error.sum / myReport->info.schedule_error.cnt) * 1e-3), (myReport->info.schedule_error.min * 1e-3), \
@@ -1632,6 +1856,7 @@ void Client::AwaitServerFinPacket () {
struct timeval timeout;
int ack_success = 0;
int count = RETRYCOUNT;
+ static int read_warn_rate_limiter = 3; // rate limit read warn msgs
while (--count >= 0) {
// wait until the socket is readable, or our timeout expires
FD_ZERO(&readSet);
@@ -1664,8 +1889,16 @@ void Client::AwaitServerFinPacket () {
continue;
}
}
-
- WARN_errno(rc < 0, "read");
+ // only warn when threads is small, too many warnings are too much outputs
+ if (rc < 0 && (--read_warn_rate_limiter > 0)) {
+ int len = snprintf(NULL, 0, "%sRead UDP fin", mSettings->mTransferIDStr);
+ char *tmpbuf = (char *)calloc(1, len + 2);
+ if (tmpbuf) {
+ len = snprintf(tmpbuf, len + 1, "%sRead UDP fin", mSettings->mTransferIDStr);
+ WARN_errno(1, tmpbuf);
+ free(tmpbuf);
+ }
+ }
if (rc > 0) {
ack_success = 1;
#ifdef HAVE_THREAD_DEBUG
@@ -1682,32 +1915,23 @@ void Client::AwaitServerFinPacket () {
fprintf(stderr, warn_no_ack, mySocket, (isModeTime(mSettings) ? 10 : 1));
}
-
-void Client::PostNullEvent (void) {
+// isFirst indicates first event occurred per wait_tick
+void Client::PostNullEvent (bool isFirst, bool select_retry) {
assert(myReport!=NULL);
// push a nonevent into the packet ring
// this will cause the reporter to process
// up to this event
- memset(reportstruct, 0, sizeof(struct ReportStruct));
+ struct ReportStruct report_nopacket;
+ memset(&report_nopacket, 0, sizeof(struct ReportStruct));
now.setnow();
- reportstruct->packetTime.tv_sec = now.getSecs();
- reportstruct->packetTime.tv_usec = now.getUsecs();
- reportstruct->emptyreport=1;
- myReportPacket();
-}
-
-void Client::PostNullEvent (bool isFirst) {
- assert(myReport!=NULL);
- // push a nonevent into the packet ring
- // this will cause the reporter to process
- // up to this event
- memset(reportstruct, 0, sizeof(struct ReportStruct));
- now.setnow();
- reportstruct->packetTime.tv_sec = now.getSecs();
- reportstruct->packetTime.tv_usec = now.getUsecs();
- reportstruct->emptyreport=1;
- reportstruct->scheduled = isFirst;
- myReportPacket();
+ report_nopacket.packetTime.tv_sec = now.getSecs();
+ report_nopacket.packetTime.tv_usec = now.getUsecs();
+ report_nopacket.emptyreport = true;
+ report_nopacket.scheduled = isFirst;
+ report_nopacket.packetID = 0;
+ report_nopacket.err_readwrite = (select_retry ? WriteSelectRetry : WriteNoAccount);
+ reportstruct->packetTime = report_nopacket.packetTime; // needed for the InProgress loop test
+ myReportPacket(&report_nopacket);
}
// The client end timer is based upon the final fin, fin-ack w/the server
@@ -1717,7 +1941,7 @@ void Client::PostNullEvent (bool isFirst) {
#define MINAWAITCLOSEUSECS 2000000
void Client::AwaitServerCloseEvent () {
// the await detection can take awhile so post a non event ahead of it
- PostNullEvent();
+ PostNullEvent(false,false);
unsigned int amount_usec = \
(isModeTime(mSettings) ? static_cast<int>(mSettings->mAmount * 10000) : MINAWAITCLOSEUSECS);
if (amount_usec < MINAWAITCLOSEUSECS)
@@ -1739,6 +1963,9 @@ void Client::AwaitServerCloseEvent () {
int Client::SendFirstPayload () {
int pktlen = 0;
if (!isConnectOnly(mSettings)) {
+ if (isUDP(mSettings) && (mSettings->sendfirst_pacing > 0)) {
+ delay_loop(mSettings->sendfirst_pacing);
+ }
if (myReport && !TimeZero(myReport->info.ts.startTime) && !(mSettings->mMode == kTest_TradeOff)) {
reportstruct->packetTime = myReport->info.ts.startTime;
} else {
@@ -1746,7 +1973,7 @@ int Client::SendFirstPayload () {
reportstruct->packetTime.tv_sec = now.getSecs();
reportstruct->packetTime.tv_usec = now.getUsecs();
}
- pattern(mSettings->mBuf, mSettings->mBufLen);
+
if (isTxStartTime(mSettings)) {
pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, mSettings->txstart_epoch);
} else {
@@ -1766,6 +1993,9 @@ int Client::SendFirstPayload () {
#endif
apply_first_udppkt_delay = true;
} else {
+ // Set the send timeout for the very first write which has the test exchange
+ int sosndtimer = DEFAULT_TESTEXCHANGETIMEOUT; //in usecs
+ SetSocketOptionsSendTimeout(mSettings, sosndtimer);
#if HAVE_DECL_TCP_NODELAY
if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
int optflag=1;
@@ -1781,9 +2011,19 @@ int Client::SendFirstPayload () {
#else
pktlen = send(mySocket, mSettings->mBuf, pktlen, 0);
#endif
+ SetSocketOptionsSendTimeout(mSettings, mSettings->sosndtimer);
if (isPeerVerDetect(mSettings) && !isServerReverse(mSettings)) {
PeerXchange();
}
+ if (!isFileInput(mSettings)) {
+ int buflen = (mSettings->mBufLen < (int) sizeof(struct client_tcp_testhdr)) ? mSettings->mBufLen \
+ : sizeof(struct client_tcp_testhdr);
+ if (isTripTime(mSettings)) {
+ memset(mSettings->mBuf, 0xA5, buflen);
+ } else {
+ pattern(mSettings->mBuf, buflen); // reset the pattern in the payload for future writes
+ }
+ }
#if HAVE_DECL_TCP_NODELAY
if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) {
int optflag=0;
@@ -1833,22 +2073,15 @@ void Client::PeerXchange () {
/*
* BarrierClient allows for multiple stream clients to be syncronized
*/
-int Client::BarrierClient (struct BarrierMutex *barrier) {
- int last = 0;
+#define BARRIER_MIN 100 // units is seconds to inform the server
+bool Client::BarrierClient (struct BarrierMutex *barrier) {
+ bool last = false;
#ifdef HAVE_THREAD
assert(barrier != NULL);
+ Timestamp now;
Condition_Lock(barrier->await);
if (--barrier->count <= 0) {
- // store the barrier release timer
-#ifdef HAVE_CLOCK_GETTIME
- struct timespec t1;
- clock_gettime(CLOCK_REALTIME, &t1);
- barrier->release_time.tv_sec = t1.tv_sec;
- barrier->release_time.tv_usec = t1.tv_nsec / 1000;
-#else
- gettimeofday(&barrier->release_time, NULL);
-#endif
- last = 1;
+ last = true;
// last one wake's up everyone else
Condition_Broadcast(&barrier->await);
#ifdef HAVE_THREAD_DEBUG
@@ -1858,14 +2091,25 @@ int Client::BarrierClient (struct BarrierMutex *barrier) {
#ifdef HAVE_THREAD_DEBUG
thread_debug("Barrier WAIT on condition %p count=%d", (void *)&barrier->await, barrier->count);
#endif
- Condition_Wait(&barrier->await);
+ if (isModeTime(mSettings)) {
+ int barrier_wait_secs = int(mSettings->mAmount / 100); // convert from 10 ms to seconds
+ if (barrier_wait_secs <= 0)
+ barrier_wait_secs = 1;
+ Condition_TimedWait(&barrier->await, barrier_wait_secs);
+ } else {
+ Condition_Wait(&barrier->await);
+ }
}
Condition_Unlock(barrier->await);
#ifdef HAVE_THREAD_DEBUG
thread_debug("Barrier EXIT on condition %p", (void *)&barrier->await);
#endif
+ mSettings->barrier_time = now.delta_usec();
+ if (mSettings->barrier_time < BARRIER_MIN) {
+ mSettings->barrier_time = 0;
+ }
#else
- last = 1;
+ last = true;
#endif // HAVE_THREAD
return last;
}