From 7c720bec5600a9e607c875c670ca30ed351fa4ba Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 15 Apr 2024 20:20:54 +0200 Subject: Adding upstream version 2.1.9+dfsg. Signed-off-by: Daniel Baumann --- src/Client.cpp | 1871 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1871 insertions(+) create mode 100644 src/Client.cpp (limited to 'src/Client.cpp') diff --git a/src/Client.cpp b/src/Client.cpp new file mode 100644 index 0000000..89e47e2 --- /dev/null +++ b/src/Client.cpp @@ -0,0 +1,1871 @@ +/*--------------------------------------------------------------- + * Copyright (c) 1999,2000,2001,2002,2003 + * The Board of Trustees of the University of Illinois + * All Rights Reserved. + *--------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software (Iperf) and associated + * documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, + * sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * + * Redistributions of source code must retain the above + * copyright notice, this list of conditions and + * the following disclaimers. + * + * + * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimers in the documentation and/or other materials + * provided with the distribution. + * + * + * Neither the names of the University of Illinois, NCSA, + * nor the names of its contributors may be used to endorse + * or promote products derived from this Software without + * specific prior written permission. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTIBUTORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * ________________________________________________________________ + * National Laboratory for Applied Network Research + * National Center for Supercomputing Applications + * University of Illinois at Urbana-Champaign + * http://www.ncsa.uiuc.edu + * ________________________________________________________________ + * + * Client.cpp + * by Mark Gates + * ------------------------------------------------------------------- + * A client thread initiates a connect to the server and handles + * sending and receiving data, then closes the socket. + * ------------------------------------------------------------------- */ +#include +#include +#include "headers.h" +#include "Client.hpp" +#include "Thread.h" +#include "SocketAddr.h" +#include "PerfSocket.hpp" +#include "Extractor.h" +#include "delay.h" +#include "util.h" +#include "Locale.h" +#include "isochronous.hpp" +#include "pdfs.h" +#include "version.h" +#include "payloads.h" +#include "active_hosts.h" +#include "gettcpinfo.h" + +// const double kSecs_to_usecs = 1e6; +const double kSecs_to_nsecs = 1e9; +const int kBytes_to_Bits = 8; + +#define VARYLOAD_PERIOD 0.1 // recompute the variable load every n seconds +#define MAXUDPBUF 1470 + +Client::Client (thread_Settings *inSettings) { +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client constructor with thread %p sum=%p (flags=%x)", (void *) inSettings, (void *)inSettings->mSumReport, inSettings->flags); +#endif + mSettings = inSettings; + myJob = NULL; + myReport = NULL; + framecounter = NULL; + one_report = false; + udp_payload_minimum = 1; + apply_first_udppkt_delay = false; + + memset(&scratchpad, 0, sizeof(struct ReportStruct)); + reportstruct = &scratchpad; + reportstruct->packetID = 1; + mySocket = isServerReverse(mSettings) ? mSettings->mSock : INVALID_SOCKET; + connected = isServerReverse(mSettings); + if (isCompat(mSettings) && isPeerVerDetect(mSettings)) { + fprintf(stderr, "%s", warn_compat_and_peer_exchange); + unsetPeerVerDetect(mSettings); + } + + pattern(mSettings->mBuf, mSettings->mBufLen); + if (isIsochronous(mSettings)) { + FAIL_errno(!(mSettings->mFPS > 0.0), "Invalid value for frames per second in the isochronous settings\n", mSettings); + } + if (isFileInput(mSettings)) { + if (!isSTDIN(mSettings)) + Extractor_Initialize(mSettings->mFileName, mSettings->mBufLen, mSettings); + else + Extractor_InitializeFile(stdin, mSettings->mBufLen, mSettings); + + if (!Extractor_canRead(mSettings)) { + unsetFileInput(mSettings); + } + } + peerclose = false; + mysock_init_done = false; + isburst = (isIsochronous(mSettings) || isPeriodicBurst(mSettings) || (isTripTime(mSettings) && !isUDP(mSettings))); +} // end Client + +/* ------------------------------------------------------------------- + * Destructor + * ------------------------------------------------------------------- */ +Client::~Client () { +#if HAVE_THREAD_DEBUG + thread_debug("Client destructor sock=%d report=%p server-reverse=%s fullduplex=%s", \ + mySocket, (void *) mSettings->reporthdr, \ + (isServerReverse(mSettings) ? "true" : "false"), (isFullDuplex(mSettings) ? "true" : "false")); +#endif + DELETE_PTR(framecounter); +} // end ~Client + +/* ------------------------------------------------------------------- + * Setup a socket connected to a server. + * If inLocalhost is not null, bind to that address, specifying + * which outgoing interface to use. + * ------------------------------------------------------------------- */ +void Client::mySockInit (void) { + // create an internet socket + int type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM); + int domain = (SockAddr_isIPv6(&mSettings->peer) ? +#if HAVE_IPV6 + AF_INET6 +#else + AF_INET +#endif + : AF_INET); + + mySocket = socket(domain, type, 0); + WARN_errno(mySocket == INVALID_SOCKET, "socket"); + // Socket is carried both by the object and the thread + mSettings->mSock=mySocket; + SetSocketOptions(mSettings); + SockAddr_localAddr(mSettings); + SockAddr_remoteAddr(mSettings); + if (mSettings->mLocalhost != NULL) { + // bind socket to local address + int rc = bind(mySocket, reinterpret_cast(&mSettings->local), + SockAddr_get_sizeof_sockaddr(&mSettings->local)); + WARN_errno(rc == SOCKET_ERROR, "bind"); + } + mysock_init_done = true; + if (!isUDP(mSettings) && isReport(mSettings) && isSettingsReport(mSettings)) { + struct ReportHeader *tmp = InitSettingsReport(mSettings); + assert(tmp!=NULL); + PostReport(tmp); + setNoSettReport(mSettings); + } +} +/* ------------------------------------------------------------------- + * Setup a socket connected to a server. + * If inLocalhost is not null, bind to that address, specifying + * which outgoing interface to use. + * ------------------------------------------------------------------- */ +bool Client::my_connect (bool close_on_fail) { + int rc; + if (!mysock_init_done) { + mySockInit(); + } + // connect socket + connected = false; + mSettings->tcpinitstats.connecttime = -1; + if (!isUDP(mSettings)) { + int trycnt = mSettings->mConnectRetries + 1; + while (trycnt > 0) { + connect_start.setnow(); + rc = connect(mySocket, reinterpret_cast(&mSettings->peer), + SockAddr_get_sizeof_sockaddr(&mSettings->peer)); + WARN_errno((rc == SOCKET_ERROR), "tcp connect"); + if (rc == SOCKET_ERROR) { + if ((--trycnt) <= 0) { + if (close_on_fail) { + close(mySocket); + mySocket = INVALID_SOCKET; + } + } else { + delay_loop(200000); + } + } else { + connect_done.setnow(); + mSettings->tcpinitstats.connecttime = 1e3 * connect_done.subSec(connect_start); + connected = true; + break; + } + } + } else { + rc = connect(mySocket, reinterpret_cast(&mSettings->peer), + SockAddr_get_sizeof_sockaddr(&mSettings->peer)); + mSettings->tcpinitstats.connecttime = 0.0; // UDP doesn't have a 3WHS + WARN_errno((rc == SOCKET_ERROR), "udp connect"); + if (rc != SOCKET_ERROR) + connected = true; + } + if (connected) { +#if HAVE_TCP_STATS + assert(reportstruct); + if (!isUDP(mSettings)) { + gettcpinfo(mySocket, &mSettings->tcpinitstats); + } +#endif + // Set the send timeout for the very first write which has the test exchange + int sosndtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs + SetSocketOptionsSendTimeout(mSettings, sosndtimer); + getsockname(mySocket, reinterpret_cast(&mSettings->local), &mSettings->size_local); + getpeername(mySocket, reinterpret_cast(&mSettings->peer), &mSettings->size_peer); + SockAddr_Ifrname(mSettings); +#ifdef DEFAULT_PAYLOAD_LEN_PER_MTU_DISCOVERY + if (isUDP(mSettings) && !isBuflenSet(mSettings)) { + checksock_max_udp_payload(mSettings); + } +#endif + if (isUDP(mSettings) && !isIsochronous(mSettings) && !isIPG(mSettings)) { + mSettings->mBurstIPG = get_delay_target() / 1e3; // this is being set for the settings report only + } + if (isReport(mSettings) && isSettingsReport(mSettings)) { + struct ReportHeader *tmp = InitSettingsReport(mSettings); + assert(tmp!=NULL); + PostReport(tmp); + setNoSettReport(mSettings); + } + } else { + if (mySocket != INVALID_SOCKET) { + int rc = close(mySocket); + WARN_errno(rc == SOCKET_ERROR, "client connect close"); + mySocket = INVALID_SOCKET; + } + } + // Post the connect report unless peer version exchange is set + if (isConnectionReport(mSettings) && !isSumOnly(mSettings)) { + if (connected) { + struct ReportHeader *reporthdr = InitConnectionReport(mSettings); + struct ConnectionInfo *cr = static_cast(reporthdr->this_report); + cr->connect_timestamp.tv_sec = connect_start.getSecs(); + cr->connect_timestamp.tv_usec = connect_start.getUsecs(); + assert(reporthdr); + PostReport(reporthdr); + } else { + PostReport(InitConnectionReport(mSettings)); + } + } + return connected; +} // end Connect + +bool Client::isConnected () const { +#ifdef HAVE_THREAD_DEBUG + // thread_debug("Client is connected %d", connected); +#endif + return connected; +} + +void Client::TxDelay () { + if (isTxHoldback(mSettings)) { + clock_usleep(&mSettings->txholdback_timer); + } +} + +inline void Client::myReportPacket (void) { + ReportPacket(myReport, reportstruct); + reportstruct->packetLen = 0; +} + + +// There are multiple startup synchronizations, this code +// handles them all. The caller decides to apply them +// either before connect() or after connect() and before writes() +int Client::StartSynch () { +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client start sync enterred"); +#endif + bool delay_test_exchange = (isFullDuplex(mSettings) && isUDP(mSettings)); + if (isTxStartTime(mSettings) && delay_test_exchange) { + clock_usleep_abstime(&mSettings->txstart_epoch); + } + myJob = InitIndividualReport(mSettings); + myReport = static_cast(myJob->this_report); + myReport->info.common->socket=mySocket; + myReport->info.isEnableTcpInfo = false; // default here, set in init traffic actions + if (!isReverse(mSettings) && (mSettings->mReportMode == kReport_CSV)) { + format_ips_port_string(&myReport->info, 0); + } + + // Perform delays, usually between connect() and data xfer though before connect + // Two delays are supported: + // o First is an absolute start time per unix epoch format + // o Second is a holdback, a relative amount of seconds between the connect and data xfers + // check for an epoch based start time + reportstruct->packetLen = 0; + if (!isServerReverse(mSettings)) { + if (!isCompat(mSettings) && !isBounceBack(mSettings)) { + reportstruct->packetLen = SendFirstPayload(); + // Reverse UDP tests need to retry "first sends" a few times + // before going to server or read mode + if (isReverse(mSettings) && isUDP(mSettings)) { + reportstruct->packetLen = 0; + fd_set set; + struct timeval timeout; + int resend_udp = 100; + while (--resend_udp > 0) { + FD_ZERO(&set); + FD_SET(mySocket, &set); + timeout.tv_sec = 0; + timeout.tv_usec = rand() % 20000; // randomize IPG a bit + if (select(mySocket + 1, &set, NULL, NULL, &timeout) == 0) { + reportstruct->packetLen = SendFirstPayload(); + // printf("**** resend sock=%d count=%d\n", mySocket, resend_udp); + } else { + break; + } + } + } + } + if (isTxStartTime(mSettings) && !delay_test_exchange) { + clock_usleep_abstime(&mSettings->txstart_epoch); + } else if (isTxHoldback(mSettings)) { + TxDelay(); + } + // Server side client + } else if (isTripTime(mSettings) || isPeriodicBurst(mSettings)) { + reportstruct->packetLen = SendFirstPayload(); + } + if (isIsochronous(mSettings) || isPeriodicBurst(mSettings)) { + Timestamp tmp; + tmp.set(mSettings->txstart_epoch.tv_sec, mSettings->txstart_epoch.tv_usec); + framecounter = new Isochronous::FrameCounter(mSettings->mFPS, tmp); + // set the mbuf valid for burst period ahead of time. The same value will be set for all burst writes + if (!isUDP(mSettings) && framecounter) { + struct TCP_burst_payload * mBuf_burst = reinterpret_cast(mSettings->mBuf); + mBuf_burst->burst_period_us = htonl(framecounter->period_us()); + } + } + int setfullduplexflag = 0; + if (isFullDuplex(mSettings) && !isServerReverse(mSettings)) { + assert(mSettings->mFullDuplexReport != NULL); + if ((setfullduplexflag = fullduplex_start_barrier(&mSettings->mFullDuplexReport->fullduplex_barrier)) < 0) + return -1; + } + SetReportStartTime(); +#if HAVE_TCP_STATS + if (!isUDP(mSettings)) { + // Near congestion and peridiodic need sampling on every report packet + if (isNearCongest(mSettings) || isPeriodicBurst(mSettings)) { + myReport->info.isEnableTcpInfo = true; + myReport->info.ts.nextTCPSampleTime.tv_sec = 0; + myReport->info.ts.nextTCPSampleTime.tv_usec = 0; + } else if (isEnhanced(mSettings) || isBounceBack(mSettings)) { + myReport->info.isEnableTcpInfo = true; + myReport->info.ts.nextTCPSampleTime = myReport->info.ts.nextTime; + } + } +#endif + + if (reportstruct->packetLen > 0) { + reportstruct->packetTime = myReport->info.ts.startTime; + reportstruct->sentTime = reportstruct->packetTime; + reportstruct->prevSentTime = reportstruct->packetTime; + reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime; + if (isModeAmount(mSettings)) { + mSettings->mAmount -= reportstruct->packetLen; + } + myReportPacket(); + myReport->info.ts.prevpacketTime = reportstruct->packetTime; + reportstruct->packetID++; + } + if (setfullduplexflag) { + SetFullDuplexReportStartTime(); + } + // Full duplex sockets need to be syncronized +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client start sync exited"); +#endif + return 0; +} + +inline void Client::SetFullDuplexReportStartTime () { + assert(myReport->FullDuplexReport != NULL); + struct TransferInfo *fullduplexstats = &myReport->FullDuplexReport->info; + assert(fullduplexstats != NULL); + if (TimeZero(fullduplexstats->ts.startTime)) { + fullduplexstats->ts.startTime = myReport->info.ts.startTime; + if (isModeTime(mSettings)) { + fullduplexstats->ts.nextTime = myReport->info.ts.nextTime; + } + } +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client fullduplex report start=%ld.%ld next=%ld.%ld", fullduplexstats->ts.startTime.tv_sec, fullduplexstats->ts.startTime.tv_usec, fullduplexstats->ts.nextTime.tv_sec, fullduplexstats->ts.nextTime.tv_usec); +#endif +} +inline void Client::SetReportStartTime () { + assert(myReport!=NULL); + now.setnow(); + myReport->info.ts.startTime.tv_sec = now.getSecs(); + myReport->info.ts.startTime.tv_usec = now.getUsecs(); + myReport->info.ts.IPGstart = myReport->info.ts.startTime; + myReport->info.ts.prevpacketTime = myReport->info.ts.startTime; + if (!TimeZero(myReport->info.ts.intervalTime)) { + myReport->info.ts.nextTime = myReport->info.ts.startTime; + TimeAdd(myReport->info.ts.nextTime, myReport->info.ts.intervalTime); + } + if (myReport->GroupSumReport) { + struct TransferInfo *sumstats = &myReport->GroupSumReport->info; + assert(sumstats != NULL); + Mutex_Lock(&myReport->GroupSumReport->reference.lock); + if (TimeZero(sumstats->ts.startTime)) { + sumstats->ts.startTime = myReport->info.ts.startTime; + if (isModeTime(mSettings) || isModeInfinite(mSettings)) { + sumstats->ts.nextTime = myReport->info.ts.nextTime; + } +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client group sum report start=%ld.%ld next=%ld.%ld", sumstats->ts.startTime.tv_sec, sumstats->ts.startTime.tv_usec, sumstats->ts.nextTime.tv_sec, sumstats->ts.nextTime.tv_usec); +#endif + } + Mutex_Unlock(&myReport->GroupSumReport->reference.lock); + } +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client(%d) report start/ipg=%ld.%ld next=%ld.%ld", mSettings->mSock, myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec, myReport->info.ts.nextTime.tv_sec, myReport->info.ts.nextTime.tv_usec); +#endif +} + +void Client::ConnectPeriodic () { + Timestamp end; + Timestamp next; + unsigned int amount_usec = 1000000; + if (isModeTime(mSettings)) { + amount_usec = (mSettings->mAmount * 10000); + end.add(amount_usec); // add in micro seconds + } + setNoConnectSync(mSettings); + int num_connects = -1; + if (!(mSettings->mInterval > 0)) { + if (mSettings->connectonly_count < 0) + num_connects = 10; + else if (mSettings->connectonly_count > 0) + num_connects = mSettings->connectonly_count; + } + + do { + if (my_connect(false)){ + int rc = close(mySocket); + WARN_errno(rc == SOCKET_ERROR, "client close"); + mySocket = INVALID_SOCKET; + mysock_init_done = false; + } + if (mSettings->mInterval > 0) { + now.setnow(); + do { + next.add(mSettings->mInterval); + } while (next.before(now)); + if (next.before(end)) { + struct timeval tmp; + tmp.tv_sec = next.getSecs(); + tmp.tv_usec = next.getUsecs(); + clock_usleep_abstime(&tmp); + } + } + if (num_connects > 0) { + --num_connects; + } + } while (num_connects && !sInterupted && (next.before(end) || (isModeTime(mSettings) && !(mSettings->mInterval > 0)))); +} +/* ------------------------------------------------------------------- + * Common traffic loop intializations + * ------------------------------------------------------------------- */ +void Client::InitTrafficLoop () { + // Enable socket write timeouts for responsive reporting + // Do this after the connection establishment + // and after Client::InitiateServer as during these + // default socket timeouts are preferred. + int sosndtimer = 0; + // sosndtimer units microseconds + // mInterval units are microseconds, mAmount units is 10 ms + // SetSocketOptionsSendTimeout takes microseconds + // Set the timeout value to 1/2 the interval (per -i) or 1/2 the -t value + if (isPeriodicBurst(mSettings) && (mSettings->mFPS > 0.0)) { + sosndtimer = static_cast(round(250000.0 / mSettings->mFPS)); + } else if (mSettings->mInterval > 0) { + sosndtimer = static_cast(round(0.5 * mSettings->mInterval)); + } else { + sosndtimer = static_cast(mSettings->mAmount * 5e3); + } + SetSocketOptionsSendTimeout(mSettings, sosndtimer); + // set the lower bounds delay based of the socket timeout timer + // units needs to be in nanoseconds + delay_lower_bounds = static_cast(sosndtimer) * -1e3; + + if (isIsochronous(mSettings)) + myReport->info.matchframeID = 1; + + // set the total bytes sent to zero + totLen = 0; + if (isModeTime(mSettings)) { + mEndTime.setnow(); + mEndTime.add(mSettings->mAmount / 100.0); + // now.setnow(); fprintf(stderr, "DEBUG: end time set to %ld.%ld now is %ld.%ld\n", mEndTime.getSecs(), mEndTime.getUsecs(), now.getSecs(), now.getUsecs()); + } + readAt = mSettings->mBuf; + lastPacketTime.set(myReport->info.ts.startTime.tv_sec, myReport->info.ts.startTime.tv_usec); + reportstruct->errwrite=WriteNoErr; + reportstruct->emptyreport=0; + reportstruct->packetLen = 0; + // Finally, post this thread's "job report" which the reporter thread + // will continuously process as long as there are packets flowing + // right now the ring is empty + if (!isReverse(mSettings) && !isSingleUDP(mSettings) && isDataReport(mSettings)) { + assert(myJob!=NULL); + assert(myReport!=NULL); + PostReport(myJob); + } + one_report = (!isUDP(mSettings) && !isEnhanced(mSettings) && (mSettings->mIntervalMode != kInterval_Time) \ + && !isIsochronous(mSettings) && !isPeriodicBurst(mSettings) && !isTripTime(mSettings) && !isReverse(mSettings)); +} + +/* ------------------------------------------------------------------- + * Run the appropriate send loop between + * + * 1) TCP without rate limiting + * 2) TCP with rate limiting + * 3) UDP + * 4) UDP isochronous w/vbr + * + * ------------------------------------------------------------------- */ +void Client::Run () { + // Initialize the report struct scratch pad + // Peform common traffic setup + InitTrafficLoop(); + /* + * UDP + */ + if (isUDP(mSettings)) { + if (isFileInput(mSettings)) { + // Due to the UDP timestamps etc, included + // reduce the read size by an amount + // equal to the header size + Extractor_reduceReadSize(sizeof(struct UDP_datagram), mSettings); + readAt += sizeof(struct UDP_datagram); + } + // Launch the approprate UDP traffic loop + if (isIsochronous(mSettings)) { + RunUDPIsochronous(); + } else { + RunUDP(); + } + } else { + // Launch the approprate TCP traffic loop + if (isBounceBack(mSettings)) { + RunBounceBackTCP(); + } else if (mSettings->mAppRate > 0) { + RunRateLimitedTCP(); + } else if (isNearCongest(mSettings)) { + RunNearCongestionTCP(); +#if HAVE_DECL_TCP_NOTSENT_LOWAT + } else if (isWritePrefetch(mSettings) && \ + !isIsochronous(mSettings) && !isPeriodicBurst(mSettings)) { + RunWriteEventsTCP(); +#endif + } else { + RunTCP(); + } + } +} + +/* + * TCP send loop + */ +void Client::RunTCP () { + int burst_remaining = 0; + uint32_t burst_id = 1; + int writelen = mSettings->mBufLen; + + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->write_time = 0; + while (InProgress()) { + reportstruct->writecnt = 0; + reportstruct->scheduled = false; + if (isModeAmount(mSettings)) { + writelen = ((mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); + } + if (isburst && !(burst_remaining > 0)) { + if (isIsochronous(mSettings)) { + assert(mSettings->mMean); + burst_remaining = static_cast(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8); + } else if (isPeriodicBurst(mSettings)){ + assert(mSettings->mBurstSize); + burst_remaining = mSettings->mBurstSize; + } else { + burst_remaining = mSettings->mBufLen; + } + // check for TCP minimum payload + if (burst_remaining < static_cast(sizeof(struct TCP_burst_payload))) + burst_remaining = static_cast(sizeof(struct TCP_burst_payload)); + // apply scheduling if needed + if (framecounter) { + burst_id = framecounter->wait_tick(&reportstruct->sched_err); + reportstruct->scheduled = true; + if (isPeriodicBurst(mSettings)) { + // low duty cycle traffic needs special event handling + now.setnow(); + myReport->info.ts.prevsendTime = reportstruct->packetTime; + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + if (!InProgress()) { + reportstruct->packetLen = 0; + reportstruct->emptyreport = 1; + // wait may have crossed the termination boundry + break; + } else { + //time interval crossings may have occurred during the wait + //post a null event to cause the report to flush the packet ring + PostNullEvent(); + } + } +#if HAVE_DECL_TCP_NOTSENT_LOWAT + if (isWritePrefetch(mSettings)) { + AwaitWriteSelectEventTCP(); + } +#endif + } + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++); + reportstruct->sentTime = reportstruct->packetTime; + myReport->info.ts.prevsendTime = reportstruct->packetTime; + writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen; + // perform write, full header must succeed + if (isTcpWriteTimes(mSettings)) { + write_start.setnow(); + } + reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen, &reportstruct->writecnt); + FAIL_errno(reportstruct->packetLen < (intmax_t) sizeof(struct TCP_burst_payload), "burst written", mSettings); + if (isTcpWriteTimes(mSettings)) { + now.setnow(); + reportstruct->write_time = now.subUsec(write_start); + } + } else { + // printf("pl=%ld\n",reportstruct->packetLen); + // perform write + if (isburst) + writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen; + if (isTcpWriteTimes(mSettings)) { + write_start.setnow(); + } +#if HAVE_DECL_TCP_NOTSENT_LOWAT + if (isWritePrefetch(mSettings)) { + AwaitWriteSelectEventTCP(); + } +#endif + reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen); + now.setnow(); + reportstruct->writecnt++; + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + if (isTcpWriteTimes(mSettings)) { + reportstruct->write_time = now.subUsec(write_start); + } + reportstruct->sentTime = reportstruct->packetTime; + } + if (reportstruct->packetLen <= 0) { + if (reportstruct->packetLen == 0) { + peerclose = true; + } else if (NONFATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrAccount; + } else if (FATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrFatal; + WARN_errno(1, "tcp write"); + break; + } else { + reportstruct->errwrite=WriteErrNoAccount; + } + reportstruct->packetLen = 0; + reportstruct->emptyreport = 1; + } else { + reportstruct->emptyreport = 0; + totLen += reportstruct->packetLen; + reportstruct->errwrite=WriteNoErr; + if (isburst) { + burst_remaining -= reportstruct->packetLen; + if (burst_remaining > 0) { + reportstruct->transit_ready = 0; + } else { + reportstruct->transit_ready = 1; + reportstruct->prevSentTime = myReport->info.ts.prevsendTime; + } + } + } + if (isModeAmount(mSettings) && !reportstruct->emptyreport) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(reportstruct->packetLen)) { + mSettings->mAmount -= static_cast(reportstruct->packetLen); + } else { + mSettings->mAmount = 0; + } + } + if (!one_report) { + myReportPacket(); + } + } + FinishTrafficActions(); +} + +/* + * TCP send loop + */ +void Client::RunNearCongestionTCP () { + int burst_remaining = 0; + int burst_id = 1; + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + while (InProgress()) { + reportstruct->writecnt = 0; + if (isModeAmount(mSettings)) { + reportstruct->packetLen = ((mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); + } else { + reportstruct->packetLen = mSettings->mBufLen; + } + if (!burst_remaining) { + burst_remaining = mSettings->mBufLen; + // mAmount check + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + WriteTcpTxHdr(reportstruct, burst_remaining, burst_id++); + reportstruct->sentTime = reportstruct->packetTime; + myReport->info.ts.prevsendTime = reportstruct->packetTime; + // perform write + int writelen = (mSettings->mBufLen > burst_remaining) ? burst_remaining : mSettings->mBufLen; + reportstruct->packetLen = write(mySocket, mSettings->mBuf, writelen); + reportstruct->writecnt++; + assert(reportstruct->packetLen >= (intmax_t) sizeof(struct TCP_burst_payload)); + goto ReportNow; + } + if (reportstruct->packetLen > burst_remaining) { + reportstruct->packetLen = burst_remaining; + } + // printf("pl=%ld\n",reportstruct->packetLen); + // perform write + reportstruct->packetLen = write(mySocket, mSettings->mBuf, reportstruct->packetLen); + now.setnow(); + reportstruct->writecnt++; + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + ReportNow: + reportstruct->transit_ready = 0; + if (reportstruct->packetLen < 0) { + if (NONFATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrAccount; + } else if (FATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrFatal; + WARN_errno(1, "tcp write"); + break; + } else { + reportstruct->errwrite=WriteErrNoAccount; + } + reportstruct->packetLen = 0; + reportstruct->emptyreport = 1; + } else { + reportstruct->emptyreport = 0; + totLen += reportstruct->packetLen; + reportstruct->errwrite=WriteNoErr; + burst_remaining -= reportstruct->packetLen; + if (burst_remaining <= 0) { + reportstruct->transit_ready = 1; + } + } + if (isModeAmount(mSettings) && !reportstruct->emptyreport) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(reportstruct->packetLen)) { + mSettings->mAmount -= static_cast(reportstruct->packetLen); + } else { + mSettings->mAmount = 0; + } + } + // apply placing after write burst completes + if (reportstruct->transit_ready) { + myReportPacket(); // this will set the tcpstats in the report struct + // pacing timer is weighted by the RTT (set to 1 when RTT is not supported) + int pacing_timer = 0; +#if HAVE_TCP_STATS + pacing_timer = static_cast(std::ceil(static_cast(reportstruct->tcpstats.rtt) * mSettings->rtt_nearcongest_weight_factor)); +#else + pacing_timer = static_cast(100 * mSettings->rtt_nearcongest_weight_factor); +#endif + if (pacing_timer) + delay_loop(pacing_timer); + } + } + FinishTrafficActions(); +} + +/* + * A version of the transmit loop that supports TCP rate limiting using a token bucket + */ +void Client::RunRateLimitedTCP () { + double tokens = 0; + Timestamp time1, time2; + int burst_size = mSettings->mBufLen; + int burst_remaining = 0; + int burst_id = 1; + + long var_rate = mSettings->mAppRate; + int fatalwrite_err = 0; + + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + while (InProgress() && !fatalwrite_err) { + reportstruct->writecnt = 0; + // Add tokens per the loop time + time2.setnow(); + if (isVaryLoad(mSettings)) { + static Timestamp time3; + if (time2.subSec(time3) >= VARYLOAD_PERIOD) { + var_rate = lognormal(mSettings->mAppRate,mSettings->mVariance); + time3 = time2; + if (var_rate < 0) + var_rate = 0; + } + } + tokens += time2.subSec(time1) * (var_rate / 8.0); + time1 = time2; + if (tokens >= 0.0) { + if (isModeAmount(mSettings)) { + reportstruct->packetLen = ((mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); + } else { + reportstruct->packetLen = mSettings->mBufLen; + } + // perform write + int len = 0; + int len2 = 0; + if (burst_remaining == 0) { + burst_remaining = mSettings->mBufLen; + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + WriteTcpTxHdr(reportstruct, burst_size, burst_id++); + // perform write + if (isTripTime(mSettings)) { + len = writen(mySocket, mSettings->mBuf, mSettings->mBufLen, &reportstruct->writecnt); + WARN(len != mSettings->mBufLen, "burst write failed"); + } else { + len = writen(mySocket, mSettings->mBuf, sizeof(struct TCP_burst_payload), &reportstruct->writecnt); + WARN(len != sizeof(struct TCP_burst_payload), "burst hdr write failed"); + } + if (len < 0) { + len = 0; + if (NONFATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrAccount; + } else if (FATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrFatal; + WARN_errno(1, "write"); + fatalwrite_err = 1; + break; + } else { + reportstruct->errwrite=WriteErrNoAccount; + } + } else { + burst_remaining -= len; + } + // thread_debug("***write burst header %d id=%d", burst_size, (burst_id - 1)); + } else if (reportstruct->packetLen > burst_remaining) { + reportstruct->packetLen = burst_remaining; + } + if (burst_remaining > 0) { + len2 = write(mySocket, mSettings->mBuf, reportstruct->packetLen); + reportstruct->writecnt++; + } + if (len2 < 0) { + len2 = 0; + if (NONFATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrAccount; + } else if (FATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrFatal; + WARN_errno(1, "write"); + fatalwrite_err = 1; + break; + } else { + reportstruct->errwrite=WriteErrNoAccount; + } + } else { + // Consume tokens per the transmit + tokens -= (len + len2); + totLen += (len + len2);; + reportstruct->errwrite=WriteNoErr; + } + time2.setnow(); + reportstruct->packetLen = len + len2; + reportstruct->packetTime.tv_sec = time2.getSecs(); + reportstruct->packetTime.tv_usec = time2.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + if (isModeAmount(mSettings)) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(reportstruct->packetLen)) { + mSettings->mAmount -= static_cast(reportstruct->packetLen); + } else { + mSettings->mAmount = 0; + } + } + if (!one_report) { + myReportPacket(); + } + } else { + // Use a 4 usec delay to fill tokens +#if HAVE_DECL_TCP_NOTSENT_LOWAT + if (isWritePrefetch(mSettings)) { + AwaitWriteSelectEventTCP(); + } else +#endif + { + delay_loop(4); + } + } + } + FinishTrafficActions(); +} + +#if HAVE_DECL_TCP_NOTSENT_LOWAT +inline bool Client::AwaitWriteSelectEventTCP (void) { + int rc; + struct timeval timeout; + fd_set writeset; + FD_ZERO(&writeset); + FD_SET(mySocket, &writeset); + if (isModeTime(mSettings)) { + Timestamp write_event_timeout(0,0); + if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) { + write_event_timeout.add((double) mSettings->mInterval / 1e6 * 2.0); + } else { + write_event_timeout.add((double) mSettings->mAmount / 1e2 * 4.0); + } + timeout.tv_sec = write_event_timeout.getSecs(); + timeout.tv_usec = write_event_timeout.getUsecs(); + } else { + timeout.tv_sec = 10; // longest is 10 seconds + timeout.tv_usec = 0; + } + + if ((rc = select(mySocket + 1, NULL, &writeset, NULL, &timeout)) <= 0) { + WARN_errno((rc < 0), "select"); +#ifdef HAVE_THREAD_DEBUG + if (rc == 0) + thread_debug("AwaitWrite timeout"); +#endif + return false; + } + return true; +} + +void Client::RunWriteEventsTCP () { + int burst_id = 0; + int writelen = mSettings->mBufLen; + Timestamp write_end; + + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + while (InProgress()) { + if (isModeAmount(mSettings)) { + writelen = ((mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); + } + now.setnow(); + reportstruct->write_time = 0; + if (isTcpWriteTimes(mSettings)) { + write_start = now; + } + bool rc = AwaitWriteSelectEventTCP(); + reportstruct->emptyreport = (rc == false) ? 1 : 0; + if (rc) { + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + WriteTcpTxHdr(reportstruct, writelen, ++burst_id); + reportstruct->sentTime = reportstruct->packetTime; + reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen, &reportstruct->writecnt); + if (reportstruct->packetLen <= 0) { + WARN_errno((reportstruct->packetLen < 0), "event writen()"); + if (reportstruct->packetLen == 0) { + peerclose = true; + } + reportstruct->packetLen = 0; + reportstruct->emptyreport = 1; + } else if (isTcpWriteTimes(mSettings)) { + write_end.setnow(); + reportstruct->write_time = write_end.subUsec(write_start); + } + } + if (isModeAmount(mSettings) && !reportstruct->emptyreport) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(reportstruct->packetLen)) { + mSettings->mAmount -= static_cast(reportstruct->packetLen); + } else { + mSettings->mAmount = 0; + } + } + if (!one_report) { + myReportPacket(); + } + } + FinishTrafficActions(); +} +#endif +void Client::RunBounceBackTCP () { + int burst_id = 0; + int writelen = mSettings->mBufLen; + memset(mSettings->mBuf, 0x5A, sizeof(struct bounceback_hdr)); + if (mSettings->mInterval && (mSettings->mIntervalMode == kInterval_Time)) { + int sotimer = static_cast(round(mSettings->mInterval / 2.0)); + SetSocketOptionsReceiveTimeout(mSettings, sotimer); + SetSocketOptionsSendTimeout(mSettings, sotimer); + } else if (isModeTime(mSettings)) { + int sotimer = static_cast(round(mSettings->mAmount * 10000) / 2); + SetSocketOptionsReceiveTimeout(mSettings, sotimer); + SetSocketOptionsSendTimeout(mSettings, sotimer); + } + if (isModeTime(mSettings)) { + uintmax_t end_usecs = (mSettings->mAmount * 10000); + if (int err = set_itimer(end_usecs)) { + FAIL_errno(err != 0, "setitimer", mSettings); + } + } + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + while (InProgress()) { + int n; + reportstruct->writecnt = 0; + bool isFirst; + if (framecounter) { + burst_id = framecounter->wait_tick(&reportstruct->sched_err); + PostNullEvent(true); // this will set the now timestamp + reportstruct->sentTime.tv_sec = now.getSecs(); + reportstruct->sentTime.tv_usec = now.getUsecs(); + isFirst = true; + } else { + burst_id++; + isFirst = false; + } + int bb_burst = (mSettings->mBounceBackBurst > 0) ? mSettings->mBounceBackBurst : 1; + while (bb_burst > 0) { + bb_burst--; + if (isFirst) { + isFirst = false; + } else { + now.setnow(); + reportstruct->sentTime.tv_sec = now.getSecs(); + reportstruct->sentTime.tv_usec = now.getUsecs(); + } + WriteTcpTxBBHdr(reportstruct, burst_id, 0); + reportstruct->packetLen = writen(mySocket, mSettings->mBuf, writelen, &reportstruct->writecnt); + if (reportstruct->packetLen <= 0) { + if ((reportstruct->packetLen < 0) && FATALTCPWRITERR(errno)) { + reportstruct->errwrite=WriteErrFatal; + WARN_errno(1, "tcp bounceback write fatal error"); + peerclose = true; + } else if (reportstruct->packetLen == 0) { + peerclose = true; + } else if (reportstruct->packetLen != writelen) { + WARN_errno(1, "tcp bounceback writen incomplete"); + peerclose = true; + } else { + // retry the write + bb_burst++; + continue; + } + break; + } + if (reportstruct->packetLen == writelen) { + reportstruct->emptyreport = 0; + totLen += reportstruct->packetLen; + reportstruct->errwrite=WriteNoErr; +#if HAVE_DECL_TCP_QUICKACK + if (isTcpQuickAck(mSettings)) { + int opt = 1; + Socklen_t len = sizeof(opt); + int rc = setsockopt(mySocket, IPPROTO_TCP, TCP_QUICKACK, + reinterpret_cast(&opt), len); + WARN_errno(rc == SOCKET_ERROR, "setsockopt TCP_QUICKACK"); + } +#endif + if ((n = recvn(mySocket, mSettings->mBuf, mSettings->mBounceBackBytes, 0)) == mSettings->mBounceBackBytes) { + struct bounceback_hdr *bbhdr = reinterpret_cast(mSettings->mBuf); + now.setnow(); + reportstruct->sentTimeRX.tv_sec = ntohl(bbhdr->bbserverRx_ts.sec); + reportstruct->sentTimeRX.tv_usec = ntohl(bbhdr->bbserverRx_ts.usec); + reportstruct->sentTimeTX.tv_sec = ntohl(bbhdr->bbserverTx_ts.sec); + reportstruct->sentTimeTX.tv_usec = ntohl(bbhdr->bbserverTx_ts.usec); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->packetLen += n; + reportstruct->emptyreport = 0; + myReportPacket(); + } else if (n == 0) { + peerclose = true; + } else if (n < 0) { + if (FATALTCPREADERR(errno)) { + WARN_errno(1, "bounceback read"); + peerclose = true; + n = 0; + } else { + WARN(1, "timeout: bounceback read"); + } + } + } else if ((reportstruct->packetLen < 0 ) && NONFATALTCPWRITERR(errno)) { + reportstruct->packetLen = 0; + reportstruct->emptyreport = 1; + reportstruct->errwrite=WriteErrNoAccount; + myReportPacket(); + } else { + reportstruct->errwrite=WriteErrFatal; + reportstruct->packetLen = -1; + FAIL_errno(1, "tcp bounce-back write", mSettings); + } + } + } + WriteTcpTxBBHdr(reportstruct, 0x0, 1); + disarm_itimer(); + FinishTrafficActions(); +} +/* + * UDP send loop + */ +double Client::get_delay_target () { + double delay_target; + if (isIPG(mSettings)) { + delay_target = mSettings->mBurstIPG * 1000000; // convert from milliseconds to nanoseconds + } else { + // compute delay target in units of nanoseconds + if (mSettings->mAppRateUnits == kRate_BW) { + // compute delay for bandwidth restriction, constrained to [0,1] seconds + delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) + / mSettings->mAppRate)); + } else { + delay_target = 1e9 / mSettings->mAppRate; + } + } + return delay_target; +} + +void Client::RunUDP () { + struct UDP_datagram* mBuf_UDP = reinterpret_cast(mSettings->mBuf); + int currLen; + + double delay_target = get_delay_target(); + double delay = 0; + double adjust = 0; + + // Set this to > 0 so first loop iteration will delay the IPG + currLen = 1; + double variance = mSettings->mVariance; + if (apply_first_udppkt_delay && (delay_target > 100000)) { + //the case when a UDP first packet went out in SendFirstPayload + delay_loop(static_cast(delay_target / 1000)); + } + + while (InProgress()) { + // Test case: drop 17 packets and send 2 out-of-order: + // sequence 51, 52, 70, 53, 54, 71, 72 + //switch(datagramID) { + // case 53: datagramID = 70; break; + // case 71: datagramID = 53; break; + // case 55: datagramID = 71; break; + // default: break; + //} + now.setnow(); + reportstruct->writecnt = 1; + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + if (isVaryLoad(mSettings) && mSettings->mAppRateUnits == kRate_BW) { + static Timestamp time3; + if (now.subSec(time3) >= VARYLOAD_PERIOD) { + long var_rate = lognormal(mSettings->mAppRate,variance); + if (var_rate < 0) + var_rate = 0; + delay_target = (mSettings->mBufLen * ((kSecs_to_nsecs * kBytes_to_Bits) / var_rate)); + time3 = now; + } + } + // store datagram ID into buffer + WritePacketID(reportstruct->packetID); + mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec); + mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec); + + // Adjustment for the running delay + // o measure how long the last loop iteration took + // o calculate the delay adjust + // - If write succeeded, adjust = target IPG - the loop time + // - If write failed, adjust = the loop time + // o then adjust the overall running delay + // Note: adjust units are nanoseconds, + // packet timestamps are microseconds + if (currLen > 0) + adjust = delay_target + \ + (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime)); + else + adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime); + + lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec); + // Since linux nanosleep/busyloop can exceed delay + // there are two possible equilibriums + // 1) Try to perserve inter packet gap + // 2) Try to perserve requested transmit rate + // The latter seems preferred, hence use a running delay + // that spans the life of the thread and constantly adjust. + // A negative delay means the iperf app is behind. + delay += adjust; + // Don't let delay grow unbounded + if (delay < delay_lower_bounds) { + delay = delay_target; + } + + reportstruct->errwrite = WriteNoErr; + reportstruct->emptyreport = 0; + // perform write + if (isModeAmount(mSettings)) { + currLen = write(mySocket, mSettings->mBuf, (mSettings->mAmount < static_cast(mSettings->mBufLen)) ? mSettings->mAmount : mSettings->mBufLen); + } else { + currLen = write(mySocket, mSettings->mBuf, mSettings->mBufLen); + } + if (currLen < 0) { + reportstruct->packetID--; + if (FATALUDPWRITERR(errno)) { + reportstruct->errwrite = WriteErrFatal; + WARN_errno(1, "write"); + break; + } else { + reportstruct->errwrite = WriteErrAccount; + currLen = 0; + } + reportstruct->emptyreport = 1; + } + + if (isModeAmount(mSettings)) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(currLen)) { + mSettings->mAmount -= static_cast(currLen); + } else { + mSettings->mAmount = 0; + } + } + + // report packets + reportstruct->packetLen = static_cast(currLen); + reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime; + myReportPacket(); + reportstruct->packetID++; + myReport->info.ts.prevpacketTime = reportstruct->packetTime; + // Insert delay here only if the running delay is greater than 100 usec, + // otherwise don't delay and immediately continue with the next tx. + if (delay >= 100000) { + // Convert from nanoseconds to microseconds + // and invoke the microsecond delay + delay_loop(static_cast(delay / 1000)); + } + } + FinishTrafficActions(); +} + +/* + * UDP isochronous send loop + */ +void Client::RunUDPIsochronous () { + struct UDP_datagram* mBuf_UDP = reinterpret_cast(mSettings->mBuf); + // skip over the UDP datagram (seq no, timestamp) to reach the isoch fields + struct client_udp_testhdr *udp_payload = reinterpret_cast(mSettings->mBuf); + + double delay_target = mSettings->mBurstIPG * 1000000; // convert from milliseconds to nanoseconds + double delay = 0; + double adjust = 0; + int currLen = 1; + int frameid=0; + Timestamp t1; + + // make sure the packet can carry the isoch payload + if (!framecounter) { + framecounter = new Isochronous::FrameCounter(mSettings->mFPS); + } + udp_payload->isoch.burstperiod = htonl(framecounter->period_us()); + + int initdone = 0; + int fatalwrite_err = 0; + while (InProgress() && !fatalwrite_err) { + int bytecnt = static_cast(lognormal(mSettings->mMean,mSettings->mVariance)) / (mSettings->mFPS * 8); + if (bytecnt < udp_payload_minimum) + bytecnt = udp_payload_minimum; + delay = 0; + + // printf("bits=%d\n", (int) (mSettings->mFPS * bytecnt * 8)); + udp_payload->isoch.burstsize = htonl(bytecnt); + udp_payload->isoch.prevframeid = htonl(frameid); + reportstruct->burstsize=bytecnt; + frameid = framecounter->wait_tick(&reportstruct->sched_err); + reportstruct->scheduled = true; + udp_payload->isoch.frameid = htonl(frameid); + lastPacketTime.setnow(); + if (!initdone) { + initdone = 1; + udp_payload->isoch.start_tv_sec = htonl(lastPacketTime.getSecs()); + udp_payload->isoch.start_tv_usec = htonl(lastPacketTime.getUsecs()); + } + while ((bytecnt > 0) && InProgress()) { + t1.setnow(); + reportstruct->packetTime.tv_sec = t1.getSecs(); + reportstruct->packetTime.tv_usec = t1.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec); + mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec); + WritePacketID(reportstruct->packetID); + + // Adjustment for the running delay + // o measure how long the last loop iteration took + // o calculate the delay adjust + // - If write succeeded, adjust = target IPG - the loop time + // - If write failed, adjust = the loop time + // o then adjust the overall running delay + // Note: adjust units are nanoseconds, + // packet timestamps are microseconds + if (currLen > 0) + adjust = delay_target + \ + (1000.0 * lastPacketTime.subUsec(reportstruct->packetTime)); + else + adjust = 1000.0 * lastPacketTime.subUsec(reportstruct->packetTime); + + lastPacketTime.set(reportstruct->packetTime.tv_sec, reportstruct->packetTime.tv_usec); + // Since linux nanosleep/busyloop can exceed delay + // there are two possible equilibriums + // 1) Try to perserve inter packet gap + // 2) Try to perserve requested transmit rate + // The latter seems preferred, hence use a running delay + // that spans the life of the thread and constantly adjust. + // A negative delay means the iperf app is behind. + delay += adjust; + // Don't let delay grow unbounded + // if (delay < delay_lower_bounds) { + // delay = delay_target; + // } + + reportstruct->errwrite = WriteNoErr; + reportstruct->emptyreport = 0; + reportstruct->writecnt = 1; + + // perform write + if (isModeAmount(mSettings) && (mSettings->mAmount < static_cast(mSettings->mBufLen))) { + udp_payload->isoch.remaining = htonl(mSettings->mAmount); + reportstruct->remaining=mSettings->mAmount; + currLen = write(mySocket, mSettings->mBuf, mSettings->mAmount); + } else { + udp_payload->isoch.remaining = htonl(bytecnt); + reportstruct->remaining=bytecnt; + currLen = write(mySocket, mSettings->mBuf, (bytecnt < mSettings->mBufLen) ? bytecnt : mSettings->mBufLen); + } + + if (currLen < 0) { + reportstruct->packetID--; + reportstruct->emptyreport = 1; + currLen = 0; + if (FATALUDPWRITERR(errno)) { + reportstruct->errwrite = WriteErrFatal; + WARN_errno(1, "write"); + fatalwrite_err = 1; + } else { + reportstruct->errwrite = WriteErrAccount; + } + } else { + bytecnt -= currLen; + if (!bytecnt) + reportstruct->transit_ready = 1; + else + reportstruct->transit_ready = 0; + // adjust bytecnt so last packet of burst is greater or equal to min packet + if ((bytecnt > 0) && (bytecnt < udp_payload_minimum)) { + bytecnt = udp_payload_minimum; + udp_payload->isoch.burstsize = htonl(bytecnt); + reportstruct->burstsize=bytecnt; + } + } + if (isModeAmount(mSettings)) { + /* mAmount may be unsigned, so don't let it underflow! */ + if (mSettings->mAmount >= static_cast(currLen)) { + mSettings->mAmount -= static_cast(currLen); + } else { + mSettings->mAmount = 0; + } + } + // report packets + + reportstruct->frameID=frameid; + reportstruct->packetLen = static_cast(currLen); + reportstruct->prevPacketTime = myReport->info.ts.prevpacketTime; + myReportPacket(); + reportstruct->scheduled = false; // reset to false after the report + reportstruct->packetID++; + myReport->info.ts.prevpacketTime = reportstruct->packetTime; + // Insert delay here only if the running delay is greater than 1 usec, + // otherwise don't delay and immediately continue with the next tx. + if (delay >= 1000) { + // Convert from nanoseconds to microseconds + // and invoke the microsecond delay + delay_loop(static_cast(delay / 1000)); + } + } + } + FinishTrafficActions(); +} +// end RunUDPIsoch + +inline void Client::WritePacketID (intmax_t packetID) { + struct UDP_datagram * mBuf_UDP = reinterpret_cast(mSettings->mBuf); + // store datagram ID into buffer +#ifdef HAVE_INT64_T + // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned + // 32bit id2. A legacy server reading only id1 will still be able + // to reconstruct a valid signed packet ID number up to 2^31. + uint32_t id1, id2; + id1 = packetID & 0xFFFFFFFFLL; + id2 = (packetID & 0xFFFFFFFF00000000LL) >> 32; + + mBuf_UDP->id = htonl(id1); + mBuf_UDP->id2 = htonl(id2); + +#ifdef HAVE_PACKET_DEBUG + printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n", + packetID, packetID, id1, id2); +#endif +#else + mBuf_UDP->id = htonl((reportstruct->packetID)); +#endif +} + +inline void Client::WriteTcpTxHdr (struct ReportStruct *reportstruct, int burst_size, int burst_id) { + struct TCP_burst_payload * mBuf_burst = reinterpret_cast(mSettings->mBuf); + // store packet ID into buffer + reportstruct->packetID += burst_size; + mBuf_burst->start_tv_sec = htonl(myReport->info.ts.startTime.tv_sec); + mBuf_burst->start_tv_usec = htonl(myReport->info.ts.startTime.tv_usec); + +#ifdef HAVE_INT64_T + // Pack signed 64bit packetID into unsigned 32bit id1 + unsigned + // 32bit id2. A legacy server reading only id1 will still be able + // to reconstruct a valid signed packet ID number up to 2^31. + uint32_t id1, id2; + id1 = reportstruct->packetID & 0xFFFFFFFFLL; + id2 = (reportstruct->packetID & 0xFFFFFFFF00000000LL) >> 32; + + mBuf_burst->seqno_lower = htonl(id1); + mBuf_burst->seqno_upper = htonl(id2); + +#ifdef HAVE_PACKET_DEBUG + printf("id %" PRIdMAX " (0x%" PRIxMAX ") -> 0x%x, 0x%x\n", + reportstruct->packetID, reportstruct->packetID, id1, id2); +#endif +#else + mBuf_burst->seqno_lower = htonl((reportstruct->packetID)); + mBuf_burst->seqno_upper = htonl(0x0); +#endif + mBuf_burst->send_tt.write_tv_sec = htonl(reportstruct->packetTime.tv_sec); + mBuf_burst->send_tt.write_tv_usec = htonl(reportstruct->packetTime.tv_usec); + mBuf_burst->burst_id = htonl((uint32_t)burst_id); + mBuf_burst->burst_size = htonl((uint32_t)burst_size); + reportstruct->frameID=burst_id; + reportstruct->burstsize=burst_size; +// printf("**** Write tcp burst header size= %d id = %d\n", burst_size, burst_id); +} + +// See payloads.h +void Client::WriteTcpTxBBHdr (struct ReportStruct *reportstruct, uint32_t bbid, int final) { + struct bounceback_hdr * mBuf_bb = reinterpret_cast(mSettings->mBuf); + // store packet ID into buffer + uint32_t flags = HEADER_BOUNCEBACK; + uint32_t bbflags = 0x0; + mBuf_bb->flags = htonl(flags); + if (isTripTime(mSettings)) { + bbflags |= HEADER_BBCLOCKSYNCED; + } + if (mSettings->mTOS) { + bbflags |= HEADER_BBTOS; + mBuf_bb->tos = htons((mSettings->mTOS & 0xFF)); + } + if (isTcpQuickAck(mSettings)) { + bbflags |= HEADER_BBQUICKACK; + } + mBuf_bb->bbRunTime = 0x0; + if (final) { + bbflags |= HEADER_BBSTOP; + } + mBuf_bb->bbflags = htons(bbflags); + mBuf_bb->bbsize = htonl(mSettings->mBufLen); + mBuf_bb->bbid = htonl(bbid); + mBuf_bb->bbclientTx_ts.sec = htonl(reportstruct->packetTime.tv_sec); + mBuf_bb->bbclientTx_ts.usec = htonl(reportstruct->packetTime.tv_usec); + mBuf_bb->bbserverRx_ts.sec = -1; + mBuf_bb->bbserverRx_ts.usec = -1; + mBuf_bb->bbserverTx_ts.sec = -1; + mBuf_bb->bbserverTx_ts.usec = -1; + mBuf_bb->bbhold = htonl(mSettings->mBounceBackHold); +} + +inline bool Client::InProgress (void) { + // Read the next data block from + // the file if it's file input + if (isFileInput(mSettings)) { + Extractor_getNextDataBlock(readAt, mSettings); + return Extractor_canRead(mSettings) != 0; + } + // fprintf(stderr, "DEBUG: SI=%d PC=%d T=%d A=%d\n", sInterupted, peerclose, (isModeTime(mSettings) && mEndTime.before(reportstruct->packetTime)), (isModeAmount(mSettings) && (mSettings->mAmount <= 0))); + return !(sInterupted || peerclose || \ + (isModeTime(mSettings) && mEndTime.before(reportstruct->packetTime)) || + (isModeAmount(mSettings) && (mSettings->mAmount <= 0))); +} + +inline void Client::tcp_shutdown (void) { + if ((mySocket != INVALID_SOCKET) && isConnected()) { + int rc = shutdown(mySocket, SHUT_WR); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client calls shutdown() SHUTW_WR on tcp socket %d", mySocket); +#endif + WARN_errno(rc == SOCKET_ERROR, "shutdown"); + if (!rc && !isFullDuplex(mSettings)) + AwaitServerCloseEvent(); + } +} + +/* + * Common things to do to finish a traffic thread + * + * Notes on the negative packet count or seq no: + * A negative packet id is used to tell the server + * this UDP stream is terminating. The server will remove + * the sign. So a decrement will be seen as increments by + * the server (e.g, -1000, -1001, -1002 as 1000, 1001, 1002) + * If the retries weren't decrement here the server can get out + * of order packets per these retries actually being received + * by the server (e.g. -1000, -1000, -1000) + */ +void Client::FinishTrafficActions () { + disarm_itimer(); + // Shutdown the TCP socket's writes as the event for the server to end its traffic loop + if (!isUDP(mSettings)) { + tcp_shutdown(); + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + if (one_report) { + /* + * For TCP and if not doing interval or enhanced reporting (needed for write accounting), + * then report the entire transfer as one big packet + * + */ + reportstruct->packetLen = totLen; + } + } else { + // stop timing + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->sentTime = reportstruct->packetTime; + // send a final terminating datagram + // Don't count in the mTotalLen. The server counts this one, + // but didn't count our first datagram, so we're even now. + // The negative datagram ID signifies termination to the server. + WritePacketID(-reportstruct->packetID); + struct UDP_datagram * mBuf_UDP = reinterpret_cast(mSettings->mBuf); + mBuf_UDP->tv_sec = htonl(reportstruct->packetTime.tv_sec); + mBuf_UDP->tv_usec = htonl(reportstruct->packetTime.tv_usec); + int len = write(mySocket, mSettings->mBuf, mSettings->mBufLen); +#ifdef HAVE_THREAD_DEBUG + thread_debug("UDP client sent final packet per negative seqno %ld", -reportstruct->packetID); +#endif + if (len > 0) { + reportstruct->packetLen = len; + myReportPacket(); + } + reportstruct->packetLen = 0; + } + int do_close = EndJob(myJob, reportstruct); + if (isIsochronous(mSettings) && (myReport->info.schedule_error.cnt > 2)) { + fprintf(stderr,"%sIsoch schedule errors (mean/min/max/stdev) = %0.3f/%0.3f/%0.3f/%0.3f ms\n",mSettings->mTransferIDStr, \ + ((myReport->info.schedule_error.sum / myReport->info.schedule_error.cnt) * 1e-3), (myReport->info.schedule_error.min * 1e-3), \ + (myReport->info.schedule_error.max * 1e-3), (1e-3 * (sqrt(myReport->info.schedule_error.m2 / (myReport->info.schedule_error.cnt - 1))))); + } + if (isUDP(mSettings) && !isMulticast(mSettings) && !isNoUDPfin(mSettings)) { + /* + * For UDP, there is a final handshake between the client and the server, + * do that now (unless requested no to) + */ + AwaitServerFinPacket(); + } + if (do_close) { +#if HAVE_THREAD_DEBUG + thread_debug("client close sock=%d", mySocket); +#endif + int rc = close(mySocket); + WARN_errno(rc == SOCKET_ERROR, "client close"); + } + Iperf_remove_host(mSettings); + FreeReport(myJob); + if (framecounter) + DELETE_PTR(framecounter); +} + +/* ------------------------------------------------------------------- + * Await for the server's fin packet which also has the server + * stats to displayed on the client. Attempt to re-transmit + * until the fin is received + * ------------------------------------------------------------------- */ +#define RETRYTIMER 10000 //units of us +#define RETRYCOUNT (2 * 1000000 / RETRYTIMER) // 2 seconds worth of retries +void Client::AwaitServerFinPacket () { + int rc; + fd_set readSet; + struct timeval timeout; + int ack_success = 0; + int count = RETRYCOUNT; + while (--count >= 0) { + // wait until the socket is readable, or our timeout expires + FD_ZERO(&readSet); + FD_SET(mySocket, &readSet); + timeout.tv_sec = 0; + timeout.tv_usec = RETRYTIMER; + rc = select(mySocket+1, &readSet, NULL, NULL, &timeout); + FAIL_errno(rc == SOCKET_ERROR, "select", mSettings); + // rc= zero means select's read timed out + if (rc == 0) { + // try to trigger another FIN by resending a negative seq no + WritePacketID(-(++reportstruct->packetID)); + // write data + rc = write(mySocket, mSettings->mBuf, mSettings->mBufLen); + WARN_errno(rc < 0, "write-fin"); +#ifdef HAVE_THREAD_DEBUG + thread_debug("UDP client retransmit final packet per negative seqno %ld", -reportstruct->packetID); +#endif + } else { + // socket ready to read, this packet size + // is set by the server. Assume it's large enough + // to contain the final server packet + rc = read(mySocket, mSettings->mBuf, MAXUDPBUF); + + // dump any 2.0.13 client acks sent at the start of traffic + if (rc == sizeof(client_hdr_ack)) { + struct client_hdr_ack *ack = reinterpret_cast(mSettings->mBuf); + if (ntohl(ack->typelen.type) == CLIENTHDRACK) { + // printf("**** dump stale ack \n"); + continue; + } + } + + WARN_errno(rc < 0, "read"); + if (rc > 0) { + ack_success = 1; +#ifdef HAVE_THREAD_DEBUG + thread_debug("UDP client received server relay report ack (%d)", -reportstruct->packetID); +#endif + if (mSettings->mReportMode != kReport_CSV) { + PostReport(InitServerRelayUDPReport(mSettings, reinterpret_cast(reinterpret_cast(mSettings->mBuf) + 1))); + } + break; + } + } + } + if ((!ack_success) && (mSettings->mReportMode != kReport_CSV)) + fprintf(stderr, warn_no_ack, mySocket, (isModeTime(mSettings) ? 10 : 1)); +} + + +void Client::PostNullEvent (void) { + assert(myReport!=NULL); + // push a nonevent into the packet ring + // this will cause the reporter to process + // up to this event + memset(reportstruct, 0, sizeof(struct ReportStruct)); + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->emptyreport=1; + myReportPacket(); +} + +void Client::PostNullEvent (bool isFirst) { + assert(myReport!=NULL); + // push a nonevent into the packet ring + // this will cause the reporter to process + // up to this event + memset(reportstruct, 0, sizeof(struct ReportStruct)); + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + reportstruct->emptyreport=1; + reportstruct->scheduled = isFirst; + myReportPacket(); +} + +// The client end timer is based upon the final fin, fin-ack w/the server +// A way to detect this is to hang a recv and wait for the zero byte +// return indicating the socket is closed for recv per the server +// closing it's socket +#define MINAWAITCLOSEUSECS 2000000 +void Client::AwaitServerCloseEvent () { + // the await detection can take awhile so post a non event ahead of it + PostNullEvent(); + unsigned int amount_usec = \ + (isModeTime(mSettings) ? static_cast(mSettings->mAmount * 10000) : MINAWAITCLOSEUSECS); + if (amount_usec < MINAWAITCLOSEUSECS) + amount_usec = MINAWAITCLOSEUSECS; + SetSocketOptionsReceiveTimeout(mSettings, amount_usec); + int rc; + while ((rc = recv(mySocket, mSettings->mBuf, mSettings->mBufLen, 0) > 0)) {}; + if (rc < 0) + WARN_errno(1, "client await server close"); + + if (rc==0) { + connected = false; +#ifdef HAVE_THREAD_DEBUG + thread_debug("Client detected server close %d", mySocket); +#endif + } +} + +int Client::SendFirstPayload () { + int pktlen = 0; + if (!isConnectOnly(mSettings)) { + if (myReport && !TimeZero(myReport->info.ts.startTime) && !(mSettings->mMode == kTest_TradeOff)) { + reportstruct->packetTime = myReport->info.ts.startTime; + } else { + now.setnow(); + reportstruct->packetTime.tv_sec = now.getSecs(); + reportstruct->packetTime.tv_usec = now.getUsecs(); + } + pattern(mSettings->mBuf, mSettings->mBufLen); + if (isTxStartTime(mSettings)) { + pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, mSettings->txstart_epoch); + } else { + pktlen = Settings_GenerateClientHdr(mSettings, (void *) mSettings->mBuf, reportstruct->packetTime); + } + if (pktlen > 0) { + if (isUDP(mSettings)) { + struct client_udp_testhdr *tmphdr = reinterpret_cast(mSettings->mBuf); + WritePacketID(reportstruct->packetID); + tmphdr->seqno_ts.tv_sec = htonl(reportstruct->packetTime.tv_sec); + tmphdr->seqno_ts.tv_usec = htonl(reportstruct->packetTime.tv_usec); + udp_payload_minimum = pktlen; +#if HAVE_DECL_MSG_DONTWAIT + pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, MSG_DONTWAIT); +#else + pktlen = send(mySocket, mSettings->mBuf, (pktlen > mSettings->mBufLen) ? pktlen : mSettings->mBufLen, 0); +#endif + apply_first_udppkt_delay = true; + } else { +#if HAVE_DECL_TCP_NODELAY + if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) { + int optflag=1; + int rc; + // Disable Nagle to reduce latency of this intial message + if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast(&optflag), sizeof(int))) < 0) { + WARN_errno(rc < 0, "tcpnodelay"); + } + } +#endif +#if HAVE_DECL_MSG_DONTWAIT + pktlen = send(mySocket, mSettings->mBuf, pktlen, MSG_DONTWAIT); +#else + pktlen = send(mySocket, mSettings->mBuf, pktlen, 0); +#endif + if (isPeerVerDetect(mSettings) && !isServerReverse(mSettings)) { + PeerXchange(); + } +#if HAVE_DECL_TCP_NODELAY + if (!isNoDelay(mSettings) && isPeerVerDetect(mSettings) && isTripTime(mSettings)) { + int optflag=0; + int rc; + // Disable Nagle to reduce latency of this intial message + if ((rc = setsockopt(mSettings->mSock, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast(&optflag), sizeof(int))) < 0) { + WARN_errno(rc < 0, "tcpnodelay"); + } + } +#endif + } + WARN_errno(pktlen < 0, "send_hdr"); + } + } + return pktlen; +} + +void Client::PeerXchange () { + int n; + client_hdr_ack ack; + /* + * Hang read and see if this is a header ack message + */ + int readlen = isTripTime(mSettings) ? sizeof(struct client_hdr_ack) : (sizeof(struct client_hdr_ack) - sizeof(struct client_hdr_ack_ts)); + if ((n = recvn(mySocket, reinterpret_cast(&ack), readlen, 0)) == readlen) { + if (ntohl(ack.typelen.type) == CLIENTHDRACK) { + mSettings->peer_version_u = ntohl(ack.version_u); + mSettings->peer_version_l = ntohl(ack.version_l); + if (isTripTime(mSettings)) { + Timestamp now; + Timestamp senttx(ntohl(ack.ts.sent_tv_sec), ntohl(ack.ts.sent_tv_usec)); + Timestamp sentrx(ntohl(ack.ts.sentrx_tv_sec), ntohl(ack.ts.sentrx_tv_usec)); + Timestamp acktx(ntohl(ack.ts.ack_tv_sec), ntohl(ack.ts.ack_tv_usec)); + Timestamp ackrx(now.getSecs(), now.getUsecs()); + double str = (sentrx.get() - senttx.get()) * 1e3; + double atr = (now.get() - acktx.get()) * 1e3; + double rtt = str + atr; + double halfrtt = rtt / 2.0; + fprintf(stderr,"%sClock sync check (ms): RTT/Half=(%0.3f/%0.3f) OWD-send/ack/asym=(%0.3f/%0.3f/%0.3f)\n",mSettings->mTransferIDStr, rtt, halfrtt, str, atr, (str-atr)); + } + } + } else { + WARN_errno(1, "recvack"); + } +} + +/* + * BarrierClient allows for multiple stream clients to be syncronized + */ +int Client::BarrierClient (struct BarrierMutex *barrier) { + int last = 0; +#ifdef HAVE_THREAD + assert(barrier != NULL); + Condition_Lock(barrier->await); + if (--barrier->count <= 0) { + // store the barrier release timer +#ifdef HAVE_CLOCK_GETTIME + struct timespec t1; + clock_gettime(CLOCK_REALTIME, &t1); + barrier->release_time.tv_sec = t1.tv_sec; + barrier->release_time.tv_usec = t1.tv_nsec / 1000; +#else + gettimeofday(&barrier->release_time, NULL); +#endif + last = 1; + // last one wake's up everyone else + Condition_Broadcast(&barrier->await); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Barrier BROADCAST on condition %p", (void *)&barrier->await); +#endif + } else { +#ifdef HAVE_THREAD_DEBUG + thread_debug("Barrier WAIT on condition %p count=%d", (void *)&barrier->await, barrier->count); +#endif + Condition_Wait(&barrier->await); + } + Condition_Unlock(barrier->await); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Barrier EXIT on condition %p", (void *)&barrier->await); +#endif +#else + last = 1; +#endif // HAVE_THREAD + return last; +} -- cgit v1.2.3