summaryrefslogtreecommitdiffstats
path: root/src/Reports.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/Reports.c')
-rw-r--r--src/Reports.c1091
1 files changed, 1091 insertions, 0 deletions
diff --git a/src/Reports.c b/src/Reports.c
new file mode 100644
index 0000000..008f7f4
--- /dev/null
+++ b/src/Reports.c
@@ -0,0 +1,1091 @@
+/*---------------------------------------------------------------
+ * Copyright (c) 1999,2000,2001,2002,2003
+ * The Board of Trustees of the University of Illinois
+ * All Rights Reserved.
+ *---------------------------------------------------------------
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software (Iperf) and associated
+ * documentation files (the "Software"), to deal in the Software
+ * without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute,
+ * sublicense, and/or sell copies of the Software, and to permit
+ * persons to whom the Software is furnished to do
+ * so, subject to the following conditions:
+ *
+ *
+ * Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and
+ * the following disclaimers.
+ *
+ *
+ * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimers in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ *
+ * Neither the names of the University of Illinois, NCSA,
+ * nor the names of its contributors may be used to endorse
+ * or promote products derived from this Software without
+ * specific prior written permission.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTRIBUTORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ * ________________________________________________________________
+ * National Laboratory for Applied Network Research
+ * National Center for Supercomputing Applications
+ * University of Illinois at Urbana-Champaign
+ * http://www.ncsa.uiuc.edu
+ * ________________________________________________________________
+ *
+ * Reporter.c
+ * rewritten by Robert McMahon
+ * -------------------------------------------------------------------
+ * Handle instantiation and deletion of reports, including sum reports,
+ * in a thread safe way
+ * ------------------------------------------------------------------- */
+
+#include "headers.h"
+#include <math.h>
+#include "Settings.hpp"
+#include "PerfSocket.hpp"
+#include "Reporter.h"
+#include "Locale.h"
+#include "active_hosts.h"
+#include "payloads.h"
+static int transferid_counter = 0;
+
+static inline int my_str_copy(char **dst, char *src) {
+ int cnt = 0;
+ if (src) {
+ *dst = (char *) calloc(strlen(src) + 1, sizeof(char));
+ if (*dst == NULL) {
+ fprintf(stderr, "Out of Memory!!\n");
+ exit(1);
+ }
+ cnt = strlen(src) + 1;
+ strcpy((*dst), src);
+ } else {
+ *dst = NULL;
+ }
+ return cnt;
+}
+
+// These are the thread settings that are shared among report types
+// Make a copy vs referencing the thread setting object. This will
+// better encpasulate report handling.
+static void common_copy (struct ReportCommon **common, struct thread_Settings *inSettings) {
+ // Do deep copies from settings
+ *common = (struct ReportCommon *) calloc(1, sizeof(struct ReportCommon));
+ my_str_copy(&(*common)->Host, inSettings->mHost);
+ my_str_copy(&(*common)->HideHost, inSettings->mHideHost);
+ my_str_copy(&(*common)->Localhost, inSettings->mLocalhost);
+ my_str_copy(&(*common)->Ifrname, inSettings->mIfrname);
+ my_str_copy(&(*common)->Ifrnametx, inSettings->mIfrnametx);
+ my_str_copy(&(*common)->SSMMulticastStr, inSettings->mSSMMulticastStr);
+ my_str_copy(&(*common)->Congestion, inSettings->mCongestion);
+ my_str_copy(&(*common)->transferIDStr, inSettings->mTransferIDStr);
+ my_str_copy(&(*common)->PermitKey, inSettings->mPermitKey);
+
+ // copy some relevant settings
+ (*common)->flags = inSettings->flags;
+ (*common)->flags_extend = inSettings->flags_extend;
+ (*common)->flags_extend2 = inSettings->flags_extend2;
+ (*common)->ThreadMode = inSettings->mThreadMode;
+ (*common)->ReportMode = inSettings->mReportMode;
+ (*common)->KeyCheck = inSettings->mKeyCheck;
+ (*common)->Format = inSettings->mFormat;
+ (*common)->TTL = inSettings->mTTL;
+ // copy some traffic related settings
+ (*common)->BufLen = inSettings->mBufLen;
+ (*common)->MSS = inSettings->mMSS;
+ (*common)->TCPWin = inSettings->mTCPWin;
+ (*common)->FQPacingRate = inSettings->mFQPacingRate;
+ (*common)->Port = inSettings->mPort;
+ (*common)->PortLast = inSettings->mPortLast;
+ (*common)->BindPort = inSettings->mBindPort;
+ (*common)->ListenPort = inSettings->mListenPort;
+ (*common)->AppRate = inSettings->mAppRate;
+ (*common)->BurstSize = inSettings->mBurstSize;
+ (*common)->AppRateUnits = inSettings->mAppRateUnits;
+ (*common)->socket = inSettings->mSock;
+ (*common)->transferID = inSettings->mTransferID;
+ (*common)->threads = inSettings->mThreads;
+ (*common)->winsize_requested = inSettings->mTCPWin;
+#if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
+ (*common)->socketdrop = inSettings->mSockDrop;
+#endif
+ (*common)->peer = inSettings->peer;
+ (*common)->size_peer = inSettings->size_peer;
+ (*common)->jitter_binwidth = inSettings->jitter_binwidth;
+ (*common)->local = inSettings->local;
+ (*common)->size_local = inSettings->size_local;
+ (*common)->HistBins =inSettings->mHistBins;
+ (*common)->HistBinsize =inSettings->mHistBinsize;
+ (*common)->HistUnits =inSettings->mHistUnits;
+ (*common)->pktIPG =inSettings->mBurstIPG;
+ (*common)->rtt_weight = inSettings->rtt_nearcongest_weight_factor;
+ (*common)->ListenerTimeout =inSettings->mListenerTimeout;
+ (*common)->FPS = inSettings->mFPS;
+ (*common)->TOS = inSettings->mTOS;
+ (*common)->RTOS = inSettings->mRTOS;
+ (*common)->bbsize = inSettings->mBounceBackBytes;
+ (*common)->bbhold = inSettings->mBounceBackHold;
+ (*common)->bbcount = inSettings->mBounceBackBurst;
+#if HAVE_DECL_TCP_WINDOW_CLAMP
+ (*common)->ClampSize = inSettings->mClampSize;
+#endif
+#if HAVE_DECL_TCP_NOTSENT_LOWAT
+ (*common)->WritePrefetch = inSettings->mWritePrefetch;
+#endif
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Alloc common rpt/com/size %p/%p/%d", (void *) common, (void *)(*common), sizeof(struct ReportCommon));
+#endif
+}
+
+static void free_common_copy (struct ReportCommon *common) {
+ assert(common != NULL);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Free common=%p", (void *)common);
+#endif
+ // Free deep copies
+ if (common->Host)
+ free(common->Host);
+ if (common->HideHost)
+ free(common->HideHost);
+ if (common->Localhost)
+ free(common->Localhost);
+ if (common->Ifrname)
+ free(common->Ifrname);
+ if (common->Ifrnametx)
+ free(common->Ifrnametx);
+ if (common->SSMMulticastStr)
+ free(common->SSMMulticastStr);
+ if (common->Congestion)
+ free(common->Congestion);
+ if (common->transferIDStr)
+ free(common->transferIDStr);
+ if (common->PermitKey)
+ free(common->PermitKey);
+ free(common);
+}
+
+// This will set the transfer id and id string
+// on the setting object. If the current id is zero
+// this will get the next one. Otherwise it will use
+// the value.
+void setTransferID (struct thread_Settings *inSettings, int role_reversal) {
+ if (!inSettings->mTransferIDStr) {
+ if (!inSettings->mTransferID) {
+ Mutex_Lock(&transferid_mutex);
+ inSettings->mTransferID = ++transferid_counter;
+ Mutex_Unlock(&transferid_mutex);
+ }
+ int len = 0;
+ if (role_reversal) {
+#ifdef HAVE_ROLE_REVERSAL_ID
+ if (isPermitKey(inSettings) && (inSettings->mPermitKey[0] != '\0')) {
+ len = snprintf(NULL, 0, "[%s(*%d)] ", \
+ inSettings->mPermitKey, inSettings->mTransferID);
+ inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char));
+ len = sprintf(inSettings->mTransferIDStr, "[%s(*%d)] ", \
+ inSettings->mPermitKey, inSettings->mTransferID);
+ } else if (inSettings->mTransferID < 10) {
+ len = snprintf(NULL, 0, "[ *%d] ", inSettings->mTransferID);
+ inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char));
+ len = sprintf(inSettings->mTransferIDStr, "[ *%d] ", inSettings->mTransferID);
+ } else {
+ len = snprintf(NULL, 0, "[*%d] ", inSettings->mTransferID);
+ inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char));
+ len = sprintf(inSettings->mTransferIDStr, "[*%d] ", inSettings->mTransferID);
+ }
+#endif
+ } else if (isPermitKey(inSettings) && (inSettings->mPermitKey[0] != '\0')) {
+ len = snprintf(NULL, 0, "[%s(%d)] ", \
+ inSettings->mPermitKey, inSettings->mTransferID);
+ inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char));
+ len = sprintf(inSettings->mTransferIDStr, "[%s(%d)] ", \
+ inSettings->mPermitKey, inSettings->mTransferID);
+ } else {
+ len = snprintf(NULL, 0, "[%3d] ", inSettings->mTransferID);
+ inSettings->mTransferIDStr = (char *) calloc(len+1, sizeof(char));
+ len = sprintf(inSettings->mTransferIDStr, "[%3d] ", inSettings->mTransferID);
+ }
+ }
+}
+
+void SetFullDuplexHandlers (struct thread_Settings *inSettings, struct SumReport* sumreport) {
+ if (isUDP(inSettings)) {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_fullduplex_udp;
+ sumreport->info.output_handler = ((inSettings->mReportMode == kReport_CSV) ? NULL : \
+ (isSumOnly(inSettings) ? NULL : \
+ (isEnhanced(inSettings) ? udp_output_fullduplex_enhanced : udp_output_fullduplex)));
+ } else {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_fullduplex_tcp;
+ sumreport->info.output_handler = ((inSettings->mReportMode == kReport_CSV) ? NULL : \
+ (isSumOnly(inSettings) ? NULL : \
+ (isEnhanced(inSettings) ? tcp_output_fullduplex_enhanced : tcp_output_fullduplex)));
+ }
+}
+
+void SetSumHandlers (struct thread_Settings *inSettings, struct SumReport* sumreport) {
+ switch (inSettings->mThreadMode) {
+ case kMode_Server :
+ if (isUDP(inSettings)) {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_server_udp;
+ if (inSettings->mReportMode == kReport_CSV) {
+ if (isEnhanced(inSettings))
+ sumreport->info.output_handler = udp_output_enhanced_csv;
+ else
+ sumreport->info.output_handler = udp_output_basic_csv;
+ } else {
+ if (isTripTime(inSettings)) {
+ sumreport->info.output_handler = udp_output_sumcnt_read_triptime;
+ } else if (isSumOnly(inSettings)) {
+ sumreport->info.output_handler = udp_output_sumcnt_enhanced;
+ } else if (isFullDuplex(inSettings)) {
+ sumreport->info.output_handler = udp_output_fullduplex_sum;
+ } else {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? udp_output_sum_read_enhanced : udp_output_sum_read);
+ }
+ }
+ } else {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_server_tcp;
+ if (inSettings->mReportMode == kReport_CSV) {
+ if (isEnhanced(inSettings))
+ sumreport->info.output_handler = tcp_output_read_enhanced_csv;
+ else
+ sumreport->info.output_handler = tcp_output_basic_csv;
+ } else {
+ if (isTripTime(inSettings)) {
+ sumreport->info.output_handler = tcp_output_sumcnt_read_triptime;
+ } else if (isSumOnly(inSettings)) {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sumcnt_read_enhanced : tcp_output_sumcnt_read);
+ } else if (isFullDuplex(inSettings)) {
+ sumreport->info.output_handler = tcp_output_sum_read;
+ } else {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sum_read_enhanced : tcp_output_sum_read);
+ }
+ }
+ }
+ break;
+ case kMode_Client :
+ if (isUDP(inSettings)) {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_client_udp;
+ if (inSettings->mReportMode == kReport_CSV) {
+ if (isEnhanced(inSettings))
+ sumreport->info.output_handler = udp_output_enhanced_csv;
+ else
+ sumreport->info.output_handler = udp_output_basic_csv;
+ } else {
+ if (isSumOnly(inSettings)) {
+ sumreport->info.output_handler = ((isEnhanced(inSettings) && !isFullDuplex(inSettings)) ? \
+ udp_output_sumcnt_write_enhanced : udp_output_sumcnt);
+ } else if (isFullDuplex(inSettings)) {
+ sumreport->info.output_handler = udp_output_fullduplex_sum;
+ } else {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? udp_output_sum_write_enhanced : udp_output_sum_write);
+ }
+ }
+ } else {
+ sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_client_tcp;
+ if (inSettings->mReportMode == kReport_CSV) {
+ if (isEnhanced(inSettings))
+ sumreport->info.output_handler = tcp_output_write_enhanced_csv;
+ else
+ sumreport->info.output_handler = tcp_output_basic_csv;
+ } else {
+ if (isSumOnly(inSettings)) {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sumcnt_write_enhanced : tcp_output_sumcnt_write);
+ } else if (isFullDuplex(inSettings)) {
+ sumreport->info.output_handler = tcp_output_fullduplex_sum;
+ } else {
+ sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sum_write_enhanced : tcp_output_sum_write);
+ }
+ }
+ }
+ break;
+ default:
+ FAIL(1, "SetSumReport", inSettings);
+ }
+}
+
+struct SumReport* InitSumReport(struct thread_Settings *inSettings, int inID, int fullduplex_report) {
+ struct SumReport *sumreport = (struct SumReport *) calloc(1, sizeof(struct SumReport));
+ if (sumreport == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ sumreport->reference.count = 0;
+ sumreport->reference.maxcount = 0;
+ Mutex_Initialize(&sumreport->reference.lock);
+ sumreport->threads = 0;
+ common_copy(&sumreport->info.common, inSettings);
+ // sumreport->info.common->transferID = inID; // this is now set in the active code
+ sumreport->info.threadcnt = 0;
+ sumreport->info.isMaskOutput = false;
+ if (inSettings->mReportMode == kReport_CSV) {
+ format_ips_port_string(&sumreport->info, 1);
+ }
+
+ // Only initialize the interval time here
+ // The startTime and nextTime for summing reports will be set by
+ // the reporter thread in realtime
+ if ((inSettings->mInterval) && (inSettings->mIntervalMode == kInterval_Time)) {
+ sumreport->info.ts.intervalTime.tv_sec = (long) (inSettings->mInterval / rMillion);
+ sumreport->info.ts.intervalTime.tv_usec = (long) (inSettings->mInterval % rMillion);
+ sumreport->info.ts.significant_partial = ((double) inSettings->mInterval * PARTIALPERCENT / rMillion) ;
+ }
+ // Note that for UDP the client flag settings have not been read (and set) so only use server side flags in tests
+ if (isEnhanced(inSettings) && (inSettings->mThreadMode == kMode_Server) && !fullduplex_report) {
+ if (isHistogram(inSettings)) {
+ if (isUDP(inSettings)) {
+ char name[] = "SUMT8";
+ sumreport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\
+ pow(10,inSettings->mHistUnits), \
+ inSettings->mHistci_lower, inSettings->mHistci_upper, sumreport->info.common->transferID, name);
+ } else {
+ char name[] = "SUMF8";
+ sumreport->info.framelatency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \
+ pow(10,inSettings->mHistUnits), inSettings->mHistci_lower, \
+ inSettings->mHistci_upper, sumreport->info.common->transferID, name);
+ }
+ }
+ if (isJitterHistogram(inSettings) && isUDP(inSettings)) {
+ char name[] = "SUMJ8";
+ sumreport->info.jitter_histogram = histogram_init(JITTER_BINCNT,inSettings->jitter_binwidth,0,JITTER_UNITS, \
+ JITTER_LCI, JITTER_UCI, sumreport->info.common->transferID, name);
+ }
+ }
+ if (fullduplex_report) {
+ SetFullDuplexHandlers(inSettings, sumreport);
+ if (!isServerReverse(inSettings)) {
+ sumreport->fullduplex_barrier.count = 0;
+ Condition_Initialize(&sumreport->fullduplex_barrier.await);
+ sumreport->fullduplex_barrier.timeout = ((isModeTime(inSettings) && isUDP(inSettings)) ? \
+ ((int)(inSettings->mAmount / 100) + 1) : MINBARRIERTIMEOUT);
+ if (sumreport->fullduplex_barrier.timeout < MINBARRIERTIMEOUT)
+ sumreport->fullduplex_barrier.timeout = MINBARRIERTIMEOUT;
+ } else {
+ sumreport->info.ts.startTime = inSettings->accept_time;
+ sumreport->info.ts.nextTime = sumreport->info.ts.startTime;
+ TimeAdd(sumreport->info.ts.nextTime, sumreport->info.ts.intervalTime);
+ }
+ } else {
+ SetSumHandlers(inSettings, sumreport);
+ }
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Init sum report %p id=%d", (void *)sumreport, inID);
+#endif
+ return sumreport;
+}
+
+struct ConnectionInfo * InitConnectOnlyReport (struct thread_Settings *thread) {
+ assert(thread != NULL);
+ // this connection report used only by report for accumulate stats
+ struct ConnectionInfo *creport = (struct ConnectionInfo *) calloc(1, sizeof(struct ConnectionInfo));
+ if (!creport) {
+ FAIL(1, "Out of Memory!!\n", thread);
+ }
+ common_copy(&creport->common, thread);
+ creport->connect_times.min = FLT_MAX;
+ creport->connect_times.max = FLT_MIN;
+ creport->connect_times.vd = 0;
+ creport->connect_times.m2 = 0;
+ creport->connect_times.mean = 0;
+ creport->txholdbacktime = thread->txholdback_timer;
+ return creport;
+}
+
+void FreeSumReport (struct SumReport *sumreport) {
+ assert(sumreport);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Free sum report hdr=%p", (void *)sumreport);
+#endif
+ Condition_Destroy_Reference(&sumreport->reference);
+ if (sumreport->info.latency_histogram) {
+ histogram_delete(sumreport->info.latency_histogram);
+ }
+ if (sumreport->info.framelatency_histogram) {
+ histogram_delete(sumreport->info.framelatency_histogram);
+ }
+ if (sumreport->info.bbrtt_histogram) {
+ histogram_delete(sumreport->info.bbrtt_histogram);
+ }
+ if (sumreport->info.jitter_histogram) {
+ histogram_delete(sumreport->info.jitter_histogram);
+ }
+ free_common_copy(sumreport->info.common);
+ free(sumreport);
+}
+
+
+static void Free_iReport (struct ReporterData *ireport) {
+ assert(ireport != NULL);
+
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Free report hdr=%p reporter thread suspend count=%d packetring=%p histo=%p frame histo=%p", \
+ (void *)ireport, ireport->reporter_thread_suspends, (void *) ireport->packetring, \
+ (void *)ireport->info.latency_histogram, (void *) ireport->info.framelatency_histogram);
+#endif
+ if (ireport->packetring && ireport->info.total.Bytes.current && !(isSingleUDP(ireport->info.common)) && \
+ !TimeZero(ireport->info.ts.intervalTime) && (ireport->reporter_thread_suspends < 3)) {
+ fprintf(stdout, "WARN: this test may have been CPU bound (%d) (or may not be detecting the underlying network devices)\n", \
+ ireport->reporter_thread_suspends);
+ }
+ if (ireport->packetring) {
+ packetring_free(ireport->packetring);
+ }
+ if (ireport->info.latency_histogram) {
+ histogram_delete(ireport->info.latency_histogram);
+ }
+ if (ireport->info.jitter_histogram) {
+ histogram_delete(ireport->info.jitter_histogram);
+ }
+ if (ireport->info.framelatency_histogram) {
+ histogram_delete(ireport->info.framelatency_histogram);
+ }
+ if (ireport->info.bbrtt_histogram) {
+ histogram_delete(ireport->info.bbrtt_histogram);
+ }
+ free_common_copy(ireport->info.common);
+ free(ireport);
+}
+
+void FreeConnectionReport (struct ConnectionInfo *report) {
+ free_common_copy(report->common);
+ free(report);
+}
+
+static void Free_sReport (struct ReportSettings *report) {
+ free_common_copy(report->common);
+ free(report);
+}
+
+static void Free_srReport (struct TransferInfo *report) {
+ free_common_copy(report->common);
+ free(report);
+}
+
+void FreeReport (struct ReportHeader *reporthdr) {
+ assert(reporthdr != NULL);
+#ifdef HAVE_THREAD_DEBUG
+ char rs[REPORTTXTMAX];
+ reporttype_text(reporthdr, &rs[0]);
+ thread_debug("Jobq *FREE* report hdr/rpt %p/%p (%s)", (void *) reporthdr, (void *) reporthdr->this_report, &rs[0]);
+#endif
+ switch (reporthdr->type) {
+ case DATA_REPORT:
+ Free_iReport((struct ReporterData *)reporthdr->this_report);
+ break;
+ case CONNECTION_REPORT:
+ FreeConnectionReport((struct ConnectionInfo *)reporthdr->this_report);
+ break;
+ case SETTINGS_REPORT:
+ Free_sReport((struct ReportSettings *)reporthdr->this_report);
+ break;
+ case SERVER_RELAY_REPORT:
+ Free_srReport((struct TransferInfo *)reporthdr->this_report);
+ break;
+ default:
+ fprintf(stderr, "Invalid report type in free (%x)\n", reporthdr->type);
+ assert(0);
+ break;
+ }
+ free(reporthdr);
+}
+
+/*
+ * InitReport is called by a transfer agent (client or
+ * server) to setup the needed structures to communicate
+ * traffic and connection information. Also initialize
+ * the report start time and next interval report time
+ * Finally, in the case of parallel clients, have them all
+ * synchronize on compeleting their connect()
+ */
+
+void IncrSumReportRefCounter (struct SumReport *sumreport) {
+ assert(sumreport);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Sum multiheader %p ref=%d->%d", (void *)sumreport, sumreport->reference.count, (sumreport->reference.count + 1));
+#endif
+ Mutex_Lock(&sumreport->reference.lock);
+ sumreport->reference.count++;
+ if (sumreport->reference.count > sumreport->reference.maxcount)
+ sumreport->reference.maxcount = sumreport->reference.count;
+ Mutex_Unlock(&sumreport->reference.lock);
+}
+
+int DecrSumReportRefCounter (struct SumReport *sumreport) {
+ assert(sumreport);
+// thread_debug("before lock hdr=%p", (void *)sumreport);
+ Mutex_Lock(&sumreport->reference.lock);
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Sum multiheader %p ref=%d->%d", (void *)sumreport, sumreport->reference.count, (sumreport->reference.count - 1));
+#endif
+// thread_debug("in lock hdr=%p", (void *)sumreport);
+ int refcnt = --sumreport->reference.count;
+ Mutex_Unlock(&sumreport->reference.lock);
+// thread_debug("unlock hdr=%p", (void *)sumreport);
+ return refcnt;
+}
+
+
+// Note, this report structure needs to remain self contained and not coupled
+// to any settings structure pointers. This allows the thread settings to
+// be freed without impacting the reporter. It's not recommended that
+// this be done, i.e. free the settings before the report, but be defensive
+// here to allow it
+struct ReportHeader* InitIndividualReport (struct thread_Settings *inSettings) {
+ /*
+ * Create the report header and an ireport (if needed)
+ */
+ assert(inSettings!=NULL);
+ struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader));
+ if (reporthdr == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->this_report = calloc(1, sizeof(struct ReporterData));
+ if (reporthdr->this_report == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->type = DATA_REPORT;
+ reporthdr->ReportMode = inSettings->mReportMode;
+
+ struct ReporterData *ireport = (struct ReporterData *)(reporthdr->this_report);
+ if (inSettings->mSumReport) {
+ ireport->GroupSumReport = inSettings->mSumReport;
+ }
+ if (isFullDuplex(inSettings)) {
+ assert(inSettings->mFullDuplexReport != NULL);
+ IncrSumReportRefCounter(inSettings->mFullDuplexReport);
+ ireport->FullDuplexReport = inSettings->mFullDuplexReport;
+ }
+ // Copy common settings into the transfer report section
+ common_copy(&ireport->info.common, inSettings);
+ ireport->info.final = false;
+ ireport->info.burstid_transition = false;
+ ireport->info.isEnableTcpInfo = false;
+
+ // Create a new packet ring which is used to communicate
+ // packet stats from the traffic thread to the reporter
+ // thread. The reporter thread does all packet accounting
+ ireport->packetring = packetring_init((inSettings->numreportstructs ? inSettings->numreportstructs : (isSingleUDP(inSettings) ? 40 : NUM_REPORT_STRUCTS)), \
+ &ReportCond, (isSingleUDP(inSettings) ? NULL : &inSettings->awake_me));
+#ifdef HAVE_THREAD_DEBUG
+ char rs[REPORTTXTMAX];
+ reporttype_text(reporthdr, &rs[0]);
+ thread_debug("Init %s report hdr/rpt/com=%p/%p/%p multireport/fullduplex=%p/%p pring(bytes)/cond=%p(%d)/%p (socket=%d)", &rs[0], \
+ (void *) reporthdr, (void *) ireport, (void *) ireport->info.common, \
+ (void *) inSettings->mSumReport, (void *) inSettings->mFullDuplexReport, \
+ (void *) ireport->packetring, ireport->packetring->bytes, (void *) ireport->packetring->awake_producer, inSettings->mSock);
+#endif
+ if (inSettings->numreportstructs)
+ fprintf (stdout, "%sNUM_REPORT_STRUCTS override from %d to %d\n", inSettings->mTransferIDStr, NUM_REPORT_STRUCTS, inSettings->numreportstructs);
+
+ // Set up the function vectors, there are three
+ // 1) packet_handler: does packet accounting per the test and protocol
+ // 2) transfer_protocol_handler: performs output, e.g. interval reports, per the test and protocol
+
+ if (inSettings->mIntervalMode == kInterval_Time) {
+ ireport->info.ts.intervalTime.tv_sec = (long) (inSettings->mInterval / rMillion);
+ ireport->info.ts.intervalTime.tv_usec = (long) (inSettings->mInterval % rMillion);
+ ireport->transfer_interval_handler = reporter_condprint_time_interval_report;
+ ireport->info.ts.significant_partial = (double) inSettings->mInterval * PARTIALPERCENT / rMillion ;
+ } else {
+ ireport->transfer_interval_handler = NULL;
+ }
+ ireport->packet_handler_pre_report = NULL;
+ ireport->packet_handler_post_report = NULL;
+ switch (inSettings->mThreadMode) {
+ case kMode_Server :
+ if (isUDP(inSettings)) {
+ ireport->packet_handler_post_report = reporter_handle_packet_server_udp;
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp;
+ if ((inSettings->mIntervalMode == kInterval_Frames) && isIsochronous(inSettings)) {
+ ireport->transfer_interval_handler = reporter_condprint_frame_interval_report_server_udp;
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp;
+ } else {
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp;
+ if (isSumOnly(inSettings)) {
+ ireport->info.output_handler = NULL;
+ } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) {
+ if (isEnhanced(inSettings))
+ ireport->info.output_handler = udp_output_enhanced_csv;
+ else
+ ireport->info.output_handler = udp_output_basic_csv;
+ } else if (isTripTime(inSettings)) {
+ if (isIsochronous(inSettings))
+ ireport->info.output_handler = udp_output_read_triptime_isoch;
+ else
+ ireport->info.output_handler = udp_output_read_triptime;
+ } else if (isEnhanced(inSettings)) {
+ ireport->info.output_handler = udp_output_read_enhanced;
+ } else if (isFullDuplex(inSettings)) {
+ ireport->info.output_handler = udp_output_read;
+ } else {
+ ireport->info.output_handler = udp_output_read;
+ }
+ }
+ } else { // TCP case
+ ireport->packet_handler_post_report = reporter_handle_packet_server_tcp;
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_server_tcp;
+ if (isPeriodicBurst(inSettings)) {
+ ireport->transfer_interval_handler = reporter_condprint_burst_interval_report_server_tcp;
+ ireport->info.output_handler = tcp_output_burst_read;
+ ireport->packet_handler_pre_report = reporter_handle_packet_server_tcp;
+ ireport->packet_handler_post_report = NULL;
+ } else if ((inSettings->mIntervalMode == kInterval_Frames) && isIsochronous(inSettings)) {
+ ireport->transfer_interval_handler = reporter_condprint_frame_interval_report_server_tcp;
+ ireport->info.output_handler = tcp_output_frame_read_triptime;
+ ireport->packet_handler_pre_report = reporter_handle_packet_server_tcp;
+ ireport->packet_handler_post_report = NULL;
+ } else if (isSumOnly(inSettings)) {
+ ireport->info.output_handler = NULL;
+ } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) {
+ if (isEnhanced(inSettings))
+ ireport->info.output_handler = tcp_output_read_enhanced_csv;
+ else
+ ireport->info.output_handler = tcp_output_basic_csv;
+ } else if (isBounceBack(inSettings)) {
+ ireport->packet_handler_post_report = reporter_handle_packet_bb_server;
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_server_bb_tcp;
+ ireport->info.output_handler = tcp_output_write;
+ } else if (isTripTime(inSettings) && isIsochronous(inSettings)) {
+ ireport->info.output_handler = tcp_output_read_enhanced_isoch;
+ } else if (isTripTime(inSettings)) {
+ ireport->info.output_handler = tcp_output_read_triptime;
+ } else if (isEnhanced(inSettings)) {
+ ireport->info.output_handler = tcp_output_read_enhanced;
+ } else if (!isFullDuplex(inSettings)) {
+ ireport->info.output_handler = tcp_output_read;
+ } else {
+ ireport->info.output_handler = tcp_output_read;
+ }
+ }
+ break;
+ case kMode_Client :
+ ireport->packet_handler_post_report = reporter_handle_packet_client;
+ if (isUDP(inSettings)) {
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_client_udp;
+ if (isSumOnly(inSettings)) {
+ ireport->info.output_handler = NULL;
+ } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) {
+ if (isEnhanced(inSettings))
+ ireport->info.output_handler = udp_output_enhanced_csv;
+ else
+ ireport->info.output_handler = udp_output_basic_csv;
+ } else if (isIsochronous(inSettings)) {
+ ireport->info.output_handler = udp_output_write_enhanced_isoch;
+ } else if (isEnhanced(inSettings)) {
+ ireport->info.output_handler = udp_output_write_enhanced;
+ } else if (isFullDuplex(inSettings)) {
+ ireport->info.output_handler = udp_output_write;
+ } else {
+ ireport->info.output_handler = udp_output_write;
+ }
+ } else {
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_client_tcp;
+ if (isSumOnly(inSettings)) {
+ ireport->info.output_handler = NULL;
+ } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) {
+ if (isEnhanced(inSettings))
+ ireport->info.output_handler = tcp_output_write_enhanced_csv;
+ else
+ ireport->info.output_handler = tcp_output_basic_csv;
+ } else if (isBounceBack(inSettings)) {
+ ireport->packet_handler_post_report = reporter_handle_packet_bb_client;
+ ireport->transfer_protocol_handler = reporter_transfer_protocol_client_bb_tcp;
+ ireport->info.output_handler = tcp_output_write_bb;
+ } else if (isIsochronous(inSettings)) {
+ ireport->info.output_handler = tcp_output_write_enhanced_isoch;
+ } else if (isTcpWriteTimes(inSettings)) {
+ ireport->info.output_handler = tcp_output_write_enhanced_write;
+ } else if (isEnhanced(inSettings)) {
+ ireport->info.output_handler = tcp_output_write_enhanced;
+ } else if (isFullDuplex(inSettings)) {
+ ireport->info.output_handler = tcp_output_write;
+ } else {
+ ireport->info.output_handler = tcp_output_write;
+ }
+ }
+ break;
+ case kMode_Unknown :
+ case kMode_Reporter :
+ case kMode_ReporterClient :
+ case kMode_Listener:
+ default:
+ FAIL(1, "InitIndividualReport\n", inSettings);
+ }
+
+ if (inSettings->mThreadMode == kMode_Server) {
+ ireport->info.sock_callstats.read.binsize = inSettings->mBufLen / 8;
+ if (isUDP(inSettings)) {
+ if (isJitterHistogram(inSettings)) {
+ char name[] = "J8";
+ ireport->info.jitter_histogram = histogram_init(JITTER_BINCNT,inSettings->jitter_binwidth,0,JITTER_UNITS, \
+ JITTER_LCI, JITTER_UCI, ireport->info.common->transferID, name);
+ }
+ if (isTripTime(inSettings) && isHistogram(inSettings)) {
+ char name[] = "T8";
+ ireport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\
+ pow(10,inSettings->mHistUnits), \
+ inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name);
+ }
+ }
+ if (isHistogram(inSettings) && (isIsochronous(inSettings) || (!isUDP(inSettings) && isTripTime(inSettings)))) {
+ char name[] = "F8";
+ // make sure frame bin size min is 100 microsecond
+ ireport->info.framelatency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \
+ pow(10,inSettings->mHistUnits), inSettings->mHistci_lower, \
+ inSettings->mHistci_upper, ireport->info.common->transferID, name);
+ }
+ }
+ if ((inSettings->mThreadMode == kMode_Client) && !isUDP(inSettings) && isHistogram(inSettings)) {
+ if (isTcpWriteTimes(inSettings)) {
+ char name[] = "W8";
+ ireport->info.write_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\
+ pow(10,inSettings->mHistUnits), \
+ inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name);
+ } else if (isWritePrefetch(inSettings)) {
+ char name[] = "S8";
+ ireport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\
+ pow(10,inSettings->mHistUnits), \
+ inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name);
+ }
+ }
+ if ((inSettings->mThreadMode == kMode_Client) && isBounceBack(inSettings)) {
+ char name[] = "BB8";
+ if (!isHistogram(inSettings)) {
+ inSettings->mHistBins = 100000; // 10 seconds wide
+ inSettings->mHistBinsize = 100; // 100 usec bins
+ inSettings->mHistUnits = 6; // usecs 10 pow(x)
+ inSettings->mHistci_lower = 5;
+ inSettings->mHistci_upper = 95;
+ }
+ ireport->info.bbrtt_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \
+ pow(10,inSettings->mHistUnits), \
+ inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name);
+ }
+ return reporthdr;
+}
+
+
+/*
+ * This init/update and print/finish (in the ReportDefault.c)
+ * is poor. It has to be done this way to preserve the
+ * interface to older versions where the reporter settings
+ * were delayed until a Transfer report came through.
+ * This transfer report has all the reports bound to it.
+ *
+ * The better implmementation is to treat all reports
+ * as independent objects that can be updated, processed,
+ * and output independlty per the Reporter threads job queue
+ * without shared state or copied state variables between these
+ * reports. The shared state, is really reporter state, that
+ * should be maintained in and by the reporter object/thread.
+ *
+ * For now, just fix it good enough. Later, write a c++
+ * reporter object and use standard c++ design techniques
+ * to achieve this. Such code will be easier to maintain
+ * and to extend.
+ */
+struct ReportHeader* InitConnectionReport (struct thread_Settings *inSettings) {
+ assert(inSettings != NULL);
+ struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader));
+ if (reporthdr == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->this_report = calloc(1, sizeof(struct ConnectionInfo));
+ if (reporthdr->this_report == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->type = CONNECTION_REPORT;
+ reporthdr->ReportMode = inSettings->mReportMode;
+
+ struct ConnectionInfo * creport = (struct ConnectionInfo *)(reporthdr->this_report);
+ common_copy(&creport->common, inSettings);
+ tcpstats_copy(&creport->tcpinitstats, &inSettings->tcpinitstats);
+ // Fill out known fields for the connection report
+ reporter_peerversion(creport, inSettings->peer_version_u, inSettings->peer_version_l);
+ if (isEnhanced(inSettings) && isTxStartTime(inSettings)) {
+ creport->epochStartTime.tv_sec = inSettings->txstart_epoch.tv_sec;
+ creport->epochStartTime.tv_usec = inSettings->txstart_epoch.tv_usec;
+ } else if (isTripTime(inSettings)) {
+ creport->epochStartTime.tv_sec = inSettings->accept_time.tv_sec;
+ creport->epochStartTime.tv_usec = inSettings->accept_time.tv_usec;
+ }
+ // Copy state from the settings object into the connection report
+ creport->connect_times.min = FLT_MAX;
+ creport->connect_times.max = FLT_MIN;
+ creport->connect_times.vd = 0;
+ creport->connect_times.m2 = 0;
+ creport->connect_times.mean = 0;
+ if (inSettings->mSock > 0) {
+ creport->winsize = getsock_tcp_windowsize(inSettings->mSock, \
+ (inSettings->mThreadMode != kMode_Client ? 0 : 1) );
+#if HAVE_DECL_TCP_WINDOW_CLAMP
+ if (isRxClamp(inSettings)) {
+ getsock_tcp_windowclamp(inSettings->mSock);
+ }
+#endif
+ }
+ creport->common->winsize_requested = inSettings->mTCPWin;
+ creport->txholdbacktime = inSettings->txholdback_timer;
+ if (isPeriodicBurst(inSettings)) {
+ creport->common->FPS = inSettings->mFPS;
+ }
+#ifdef HAVE_THREAD_DEBUG
+ char rs[REPORTTXTMAX];
+ reporttype_text(reporthdr, &rs[0]);
+ thread_debug("Init %s report hdr/rpt/com %p/%p/%p", &rs[0], \
+ (void *) reporthdr, (void *) reporthdr->this_report, (void *) creport->common);
+#endif
+ return reporthdr;
+}
+
+/*
+ * ReportSettings will generate a summary report for
+ * settings being used with Listeners or Clients
+ */
+struct ReportHeader *InitSettingsReport (struct thread_Settings *inSettings) {
+ assert(inSettings != NULL);
+ struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader));
+ if (reporthdr == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->this_report = calloc(1, sizeof(struct ReportSettings));
+ if (reporthdr->this_report == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->type = SETTINGS_REPORT;
+ reporthdr->ReportMode = inSettings->mReportMode;
+
+ struct ReportSettings *sreport = (struct ReportSettings *)reporthdr->this_report;
+ common_copy(&sreport->common, inSettings);
+ sreport->peer = inSettings->peer;
+ sreport->size_peer = inSettings->size_peer;
+ sreport->local = inSettings->local;
+ sreport->size_local = inSettings->size_local;
+ sreport->isochstats.mFPS = inSettings->mFPS;
+ sreport->isochstats.mMean = inSettings->mMean/8;
+ sreport->isochstats.mVariance = inSettings->mVariance/8;
+ sreport->isochstats.mBurstIPG = (unsigned int) (inSettings->mBurstIPG*1000.0);
+ sreport->isochstats.mBurstInterval = (unsigned int) (1 / inSettings->mFPS * 1000000);
+ if (!isUDP(inSettings)) {
+ if (inSettings->mMSS > 0) {
+ sreport->sockmaxseg = inSettings->mMSS;
+ } else if (isPrintMSS(inSettings) && !(inSettings->mMSS > 0)) {
+ sreport->sockmaxseg = getsock_tcp_mss(inSettings->mSock);
+ }
+ }
+#ifdef HAVE_THREAD_DEBUG
+ char rs[REPORTTXTMAX];
+ reporttype_text(reporthdr, &rs[0]);
+ thread_debug("Init %s report hdr/rpt/com %p/%p/%p", &rs[0], \
+ (void *) reporthdr, (void *) reporthdr->this_report, (void *) sreport->common);
+#endif
+ return reporthdr;
+}
+
+/*
+ * This will generate a report of the UDP
+ * statistics as reported by the server on the client
+ * side.
+ */
+struct ReportHeader* InitServerRelayUDPReport(struct thread_Settings *inSettings, struct server_hdr *server) {
+ /*
+ * Create the report header and an ireport (if needed)
+ */
+ struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader));
+ if (reporthdr == NULL) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+ reporthdr->this_report = calloc(1, sizeof(struct ServerRelay));
+ if (!reporthdr->this_report) {
+ FAIL(1, "Out of Memory!!\n", inSettings);
+ }
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("Init server relay report %p size %ld", (void *)reporthdr, sizeof(struct ReportHeader) + sizeof(struct ServerRelay));
+#endif
+ reporthdr->type = SERVER_RELAY_REPORT;
+ reporthdr->ReportMode = inSettings->mReportMode;
+ struct ServerRelay *sr_report = (struct ServerRelay *)reporthdr->this_report;
+ common_copy(&sr_report->info.common, inSettings);
+ struct TransferInfo *stats = &sr_report->info;
+ stats->common->transferID = inSettings->mTransferID;
+
+ stats->jitter = ntohl(server->base.jitter1);
+ stats->jitter += ntohl(server->base.jitter2) / (double)rMillion;
+#ifdef HAVE_INT64_T
+ stats->cntBytes = (((intmax_t) ntohl(server->base.total_len1)) << 32) + \
+ ntohl(server->base.total_len2);
+#else
+ stats->cntBytes = (intmax_t) ntohl(server->base.total_len2);
+#endif
+ stats->ts.iStart = 0;
+ stats->ts.iEnd = ntohl(server->base.stop_sec);
+ stats->ts.iEnd += ntohl(server->base.stop_usec) / (double)rMillion;
+ uint32_t flags = ntohl(server->base.flags);
+ if ((flags & HEADER_SEQNO64B)) {
+ stats->cntError = (((intmax_t) ntohl(server->extend2.error_cnt2)) << 32) + \
+ ntohl(server->base.error_cnt);
+ stats->cntOutofOrder = (((intmax_t) ntohl(server->extend2.outorder_cnt2)) << 32) + \
+ ntohl(server->base.outorder_cnt);
+ stats->cntDatagrams = (((intmax_t) ntohl(server->extend2.datagrams2)) << 32) + \
+ ntohl(server->base.datagrams);
+ } else {
+ stats->cntError = ntohl(server->base.error_cnt);
+ stats->cntOutofOrder = ntohl(server->base.outorder_cnt);
+ stats->cntDatagrams = ntohl(server->base.datagrams);
+ }
+ if ((flags & SERVER_HEADER_EXTEND) != 0) {
+ setEnhanced(stats->common);
+ stats->transit.current.min = ntohl(server->extend.minTransit1);
+ stats->transit.current.min += ntohl(server->extend.minTransit2) / (double)rMillion;
+ stats->transit.current.max = ntohl(server->extend.maxTransit1);
+ stats->transit.current.max += ntohl(server->extend.maxTransit2) / (double)rMillion;
+ stats->transit.current.sum = ntohl(server->extend.sumTransit1);
+ stats->transit.current.sum += ntohl(server->extend.sumTransit2) / (double)rMillion;
+ stats->transit.current.mean = ntohl(server->extend.meanTransit1);
+ stats->transit.current.mean += ntohl(server->extend.meanTransit2) / (double)rMillion;
+ stats->transit.current.m2 = ntohl(server->extend.m2Transit1);
+ stats->transit.current.m2 += ntohl(server->extend.m2Transit2) / (double)rMillion;
+ stats->transit.current.m2 *= 1e-12;
+ stats->transit.current.vd = ntohl(server->extend.vdTransit1);
+ stats->transit.current.vd += ntohl(server->extend.vdTransit2) / (double)rMillion;
+ stats->transit.current.cnt = ntohl(server->extend.cntTransit);
+ stats->cntIPG = ntohl(server->extend.cntIPG);
+ stats->IPGsum = ntohl(server->extend.IPGsum);
+ } else {
+ unsetEnhanced(stats->common);
+ }
+ sr_report->peer = inSettings->local;
+ sr_report->size_peer = inSettings->size_local;
+ sr_report->local = inSettings->peer;
+ sr_report->size_local = inSettings->size_peer;
+ return reporthdr;
+}
+
+/* -------------------------------------------------------------------
+ * Send an AckFIN (a datagram acknowledging a FIN) on the socket,
+ * then select on the socket for some time to check for silence.
+ * If additional datagrams come in (not silent), probably our AckFIN
+ * was lost so the client has re-transmitted
+ * termination datagrams, so re-transmit our AckFIN.
+ * Sent by server to client
+ * ------------------------------------------------------------------- */
+void write_UDP_AckFIN (struct TransferInfo *stats, int len) {
+ assert(stats!= NULL);
+ int ackpacket_length = (int) (sizeof(struct UDP_datagram) + sizeof(struct server_hdr));
+ int readlen = ((ackpacket_length * 2) > len * 2) ? (ackpacket_length * 2) : (len * 2);
+ char *ackPacket = (char *) calloc(1, readlen);
+ int success = 0;
+ assert(ackPacket);
+ fd_set readSet;
+ int rc = 1;
+ struct timeval timeout;
+
+ if (ackPacket) {
+ struct UDP_datagram *UDP_Hdr = (struct UDP_datagram *)ackPacket;
+ struct server_hdr *hdr = (struct server_hdr *)(UDP_Hdr+1);
+
+ UDP_Hdr = (struct UDP_datagram*) ackPacket;
+ int flags = HEADER_VERSION1;
+ if (isEnhanced(stats->common) || isTripTime(stats->common))
+ flags |= SERVER_HEADER_EXTEND;
+#ifdef HAVE_INT64_T
+ flags |= HEADER_SEQNO64B;
+#endif
+ hdr->base.flags = htonl((long) flags);
+#ifdef HAVE_INT64_T
+ hdr->base.total_len1 = htonl((long) (stats->cntBytes >> 32));
+#else
+ hdr->base.total_len1 = htonl(0x0);
+#endif
+ hdr->base.total_len2 = htonl((long) (stats->cntBytes & 0xFFFFFFFF));
+ hdr->base.stop_sec = htonl( (long) stats->ts.iEnd);
+ hdr->base.stop_usec = htonl( (long)((stats->ts.iEnd - (long)stats->ts.iEnd) * rMillion));
+ hdr->base.error_cnt = htonl((long) (stats->cntError & 0xFFFFFFFF));
+ hdr->base.outorder_cnt = htonl((long) (stats->cntOutofOrder & 0xFFFFFFFF));
+ hdr->base.datagrams = htonl((long) (stats->cntDatagrams & 0xFFFFFFFF));
+ if (flags & HEADER_SEQNO64B) {
+ hdr->extend2.error_cnt2 = htonl((long) (stats->cntError >> 32));
+ hdr->extend2.outorder_cnt2 = htonl((long) (stats->cntOutofOrder >> 32) );
+ hdr->extend2.datagrams2 = htonl((long) (stats->cntDatagrams >> 32));
+ }
+ // printf("****** Server final estimator %f calculated average %f\n", stats->jitter, (stats->inline_jitter.total.sum / stats->inline_jitter.total.cnt));
+ if (stats->inline_jitter.total.cnt > 0)
+ stats->jitter = (stats->inline_jitter.total.sum / stats->inline_jitter.total.cnt); // overide the final estimator with an average
+ hdr->base.jitter1 = htonl((long) stats->jitter);
+ hdr->base.jitter2 = htonl((long) ((stats->jitter - (long)stats->jitter) * rMillion));
+
+ hdr->extend.minTransit1 = htonl((long) stats->transit.total.min);
+ hdr->extend.minTransit2 = htonl((long) ((stats->transit.total.min - (long)stats->transit.total.min) * rMillion));
+ hdr->extend.maxTransit1 = htonl((long) stats->transit.total.max);
+ hdr->extend.maxTransit2 = htonl((long) ((stats->transit.total.max - (long)stats->transit.total.max) * rMillion));
+ hdr->extend.sumTransit1 = htonl((long) stats->transit.total.sum);
+ hdr->extend.sumTransit2 = htonl((long) ((stats->transit.total.sum - (long)stats->transit.total.sum) * rMillion));
+ hdr->extend.meanTransit1 = htonl((long) stats->transit.total.mean);
+ hdr->extend.meanTransit2 = htonl((long) ((stats->transit.total.mean - (long)stats->transit.total.mean) * rMillion));
+ stats->transit.total.m2 *= 1e12;
+ hdr->extend.m2Transit1 = htonl((long) stats->transit.total.m2);
+ hdr->extend.m2Transit2 = htonl((long) ((stats->transit.total.m2 - (long)stats->transit.total.m2) * rMillion));
+ hdr->extend.vdTransit1 = htonl((long) stats->transit.total.vd);
+ hdr->extend.vdTransit2 = htonl((long) ((stats->transit.total.vd - (long)stats->transit.total.vd) * rMillion));
+ hdr->extend.cntTransit = htonl(stats->transit.total.cnt);
+ hdr->extend.cntIPG = htonl((long) (stats->cntDatagrams / (stats->ts.iEnd - stats->ts.iStart)));
+ hdr->extend.IPGsum = htonl(1);
+
+#define TRYCOUNT 10
+ int count = TRYCOUNT;
+ while (--count) {
+ // write data
+#if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
+ // If in l2mode, use the AF_INET socket to write this packet
+ //
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("UDP server send done-ack w/server-stats to client (sock=%d)", stats->common->socket);
+#endif
+ rc = write(((stats->common->socketdrop > 0) ? stats->common->socketdrop : stats->common->socket), ackPacket, ackpacket_length);
+#else
+ rc = write(stats->common->socket, ackPacket, ackpacket_length);
+#endif
+ WARN_errno(rc < 0, "write-ackfin");
+ // wait here is for silence, no more packets from the client
+
+ FD_ZERO(&readSet);
+ FD_SET(stats->common->socket, &readSet);
+ timeout.tv_sec = 0;
+ timeout.tv_usec = 250000;
+ rc = select(stats->common->socket+1, &readSet, NULL, NULL, &timeout);
+ if (rc == 0) {
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug("UDP server detected silence - server stats assumed received by client");
+#endif
+ success = 1;
+ break;
+ }
+ rc = read(stats->common->socket, ackPacket, readlen);
+ // WARN_errno(rc < 0, "ack await silence");
+ if ((rc < 0) && FATALUDPREADERR(errno)) {
+ break;
+ }
+#ifdef HAVE_THREAD_DEBUG
+ if (rc > 0) {
+ thread_debug("UDP server thinks server stats packet maybe lost, will retransmit and try again", rc);
+ }
+#endif
+ }
+ free(ackPacket);
+ }
+ if (!success && (stats->common->ReportMode != kReport_CSV)) {
+ fprintf(stderr, warn_ack_failed, stats->common->socket);
+ }
+}
+// end write_UDP_AckFIN