diff options
Diffstat (limited to 'src/Reports.c')
-rw-r--r-- | src/Reports.c | 1091 |
1 files changed, 1091 insertions, 0 deletions
diff --git a/src/Reports.c b/src/Reports.c new file mode 100644 index 0000000..008f7f4 --- /dev/null +++ b/src/Reports.c @@ -0,0 +1,1091 @@ +/*--------------------------------------------------------------- + * Copyright (c) 1999,2000,2001,2002,2003 + * The Board of Trustees of the University of Illinois + * All Rights Reserved. + *--------------------------------------------------------------- + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software (Iperf) and associated + * documentation files (the "Software"), to deal in the Software + * without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, + * sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * + * Redistributions of source code must retain the above + * copyright notice, this list of conditions and + * the following disclaimers. + * + * + * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimers in the documentation and/or other materials + * provided with the distribution. + * + * + * Neither the names of the University of Illinois, NCSA, + * nor the names of its contributors may be used to endorse + * or promote products derived from this Software without + * specific prior written permission. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE CONTRIBUTORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * ________________________________________________________________ + * National Laboratory for Applied Network Research + * National Center for Supercomputing Applications + * University of Illinois at Urbana-Champaign + * http://www.ncsa.uiuc.edu + * ________________________________________________________________ + * + * Reporter.c + * rewritten by Robert McMahon + * ------------------------------------------------------------------- + * Handle instantiation and deletion of reports, including sum reports, + * in a thread safe way + * ------------------------------------------------------------------- */ + +#include "headers.h" +#include <math.h> +#include "Settings.hpp" +#include "PerfSocket.hpp" +#include "Reporter.h" +#include "Locale.h" +#include "active_hosts.h" +#include "payloads.h" +static int transferid_counter = 0; + +static inline int my_str_copy(char **dst, char *src) { + int cnt = 0; + if (src) { + *dst = (char *) calloc(strlen(src) + 1, sizeof(char)); + if (*dst == NULL) { + fprintf(stderr, "Out of Memory!!\n"); + exit(1); + } + cnt = strlen(src) + 1; + strcpy((*dst), src); + } else { + *dst = NULL; + } + return cnt; +} + +// These are the thread settings that are shared among report types +// Make a copy vs referencing the thread setting object. This will +// better encpasulate report handling. +static void common_copy (struct ReportCommon **common, struct thread_Settings *inSettings) { + // Do deep copies from settings + *common = (struct ReportCommon *) calloc(1, sizeof(struct ReportCommon)); + my_str_copy(&(*common)->Host, inSettings->mHost); + my_str_copy(&(*common)->HideHost, inSettings->mHideHost); + my_str_copy(&(*common)->Localhost, inSettings->mLocalhost); + my_str_copy(&(*common)->Ifrname, inSettings->mIfrname); + my_str_copy(&(*common)->Ifrnametx, inSettings->mIfrnametx); + my_str_copy(&(*common)->SSMMulticastStr, inSettings->mSSMMulticastStr); + my_str_copy(&(*common)->Congestion, inSettings->mCongestion); + my_str_copy(&(*common)->transferIDStr, inSettings->mTransferIDStr); + my_str_copy(&(*common)->PermitKey, inSettings->mPermitKey); + + // copy some relevant settings + (*common)->flags = inSettings->flags; + (*common)->flags_extend = inSettings->flags_extend; + (*common)->flags_extend2 = inSettings->flags_extend2; + (*common)->ThreadMode = inSettings->mThreadMode; + (*common)->ReportMode = inSettings->mReportMode; + (*common)->KeyCheck = inSettings->mKeyCheck; + (*common)->Format = inSettings->mFormat; + (*common)->TTL = inSettings->mTTL; + // copy some traffic related settings + (*common)->BufLen = inSettings->mBufLen; + (*common)->MSS = inSettings->mMSS; + (*common)->TCPWin = inSettings->mTCPWin; + (*common)->FQPacingRate = inSettings->mFQPacingRate; + (*common)->Port = inSettings->mPort; + (*common)->PortLast = inSettings->mPortLast; + (*common)->BindPort = inSettings->mBindPort; + (*common)->ListenPort = inSettings->mListenPort; + (*common)->AppRate = inSettings->mAppRate; + (*common)->BurstSize = inSettings->mBurstSize; + (*common)->AppRateUnits = inSettings->mAppRateUnits; + (*common)->socket = inSettings->mSock; + (*common)->transferID = inSettings->mTransferID; + (*common)->threads = inSettings->mThreads; + (*common)->winsize_requested = inSettings->mTCPWin; +#if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET) + (*common)->socketdrop = inSettings->mSockDrop; +#endif + (*common)->peer = inSettings->peer; + (*common)->size_peer = inSettings->size_peer; + (*common)->jitter_binwidth = inSettings->jitter_binwidth; + (*common)->local = inSettings->local; + (*common)->size_local = inSettings->size_local; + (*common)->HistBins =inSettings->mHistBins; + (*common)->HistBinsize =inSettings->mHistBinsize; + (*common)->HistUnits =inSettings->mHistUnits; + (*common)->pktIPG =inSettings->mBurstIPG; + (*common)->rtt_weight = inSettings->rtt_nearcongest_weight_factor; + (*common)->ListenerTimeout =inSettings->mListenerTimeout; + (*common)->FPS = inSettings->mFPS; + (*common)->TOS = inSettings->mTOS; + (*common)->RTOS = inSettings->mRTOS; + (*common)->bbsize = inSettings->mBounceBackBytes; + (*common)->bbhold = inSettings->mBounceBackHold; + (*common)->bbcount = inSettings->mBounceBackBurst; +#if HAVE_DECL_TCP_WINDOW_CLAMP + (*common)->ClampSize = inSettings->mClampSize; +#endif +#if HAVE_DECL_TCP_NOTSENT_LOWAT + (*common)->WritePrefetch = inSettings->mWritePrefetch; +#endif +#ifdef HAVE_THREAD_DEBUG + thread_debug("Alloc common rpt/com/size %p/%p/%d", (void *) common, (void *)(*common), sizeof(struct ReportCommon)); +#endif +} + +static void free_common_copy (struct ReportCommon *common) { + assert(common != NULL); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Free common=%p", (void *)common); +#endif + // Free deep copies + if (common->Host) + free(common->Host); + if (common->HideHost) + free(common->HideHost); + if (common->Localhost) + free(common->Localhost); + if (common->Ifrname) + free(common->Ifrname); + if (common->Ifrnametx) + free(common->Ifrnametx); + if (common->SSMMulticastStr) + free(common->SSMMulticastStr); + if (common->Congestion) + free(common->Congestion); + if (common->transferIDStr) + free(common->transferIDStr); + if (common->PermitKey) + free(common->PermitKey); + free(common); +} + +// This will set the transfer id and id string +// on the setting object. If the current id is zero +// this will get the next one. Otherwise it will use +// the value. +void setTransferID (struct thread_Settings *inSettings, int role_reversal) { + if (!inSettings->mTransferIDStr) { + if (!inSettings->mTransferID) { + Mutex_Lock(&transferid_mutex); + inSettings->mTransferID = ++transferid_counter; + Mutex_Unlock(&transferid_mutex); + } + int len = 0; + if (role_reversal) { +#ifdef HAVE_ROLE_REVERSAL_ID + if (isPermitKey(inSettings) && (inSettings->mPermitKey[0] != '\0')) { + len = snprintf(NULL, 0, "[%s(*%d)] ", \ + inSettings->mPermitKey, inSettings->mTransferID); + inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char)); + len = sprintf(inSettings->mTransferIDStr, "[%s(*%d)] ", \ + inSettings->mPermitKey, inSettings->mTransferID); + } else if (inSettings->mTransferID < 10) { + len = snprintf(NULL, 0, "[ *%d] ", inSettings->mTransferID); + inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char)); + len = sprintf(inSettings->mTransferIDStr, "[ *%d] ", inSettings->mTransferID); + } else { + len = snprintf(NULL, 0, "[*%d] ", inSettings->mTransferID); + inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char)); + len = sprintf(inSettings->mTransferIDStr, "[*%d] ", inSettings->mTransferID); + } +#endif + } else if (isPermitKey(inSettings) && (inSettings->mPermitKey[0] != '\0')) { + len = snprintf(NULL, 0, "[%s(%d)] ", \ + inSettings->mPermitKey, inSettings->mTransferID); + inSettings->mTransferIDStr = (char *) calloc(len + 1, sizeof(char)); + len = sprintf(inSettings->mTransferIDStr, "[%s(%d)] ", \ + inSettings->mPermitKey, inSettings->mTransferID); + } else { + len = snprintf(NULL, 0, "[%3d] ", inSettings->mTransferID); + inSettings->mTransferIDStr = (char *) calloc(len+1, sizeof(char)); + len = sprintf(inSettings->mTransferIDStr, "[%3d] ", inSettings->mTransferID); + } + } +} + +void SetFullDuplexHandlers (struct thread_Settings *inSettings, struct SumReport* sumreport) { + if (isUDP(inSettings)) { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_fullduplex_udp; + sumreport->info.output_handler = ((inSettings->mReportMode == kReport_CSV) ? NULL : \ + (isSumOnly(inSettings) ? NULL : \ + (isEnhanced(inSettings) ? udp_output_fullduplex_enhanced : udp_output_fullduplex))); + } else { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_fullduplex_tcp; + sumreport->info.output_handler = ((inSettings->mReportMode == kReport_CSV) ? NULL : \ + (isSumOnly(inSettings) ? NULL : \ + (isEnhanced(inSettings) ? tcp_output_fullduplex_enhanced : tcp_output_fullduplex))); + } +} + +void SetSumHandlers (struct thread_Settings *inSettings, struct SumReport* sumreport) { + switch (inSettings->mThreadMode) { + case kMode_Server : + if (isUDP(inSettings)) { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_server_udp; + if (inSettings->mReportMode == kReport_CSV) { + if (isEnhanced(inSettings)) + sumreport->info.output_handler = udp_output_enhanced_csv; + else + sumreport->info.output_handler = udp_output_basic_csv; + } else { + if (isTripTime(inSettings)) { + sumreport->info.output_handler = udp_output_sumcnt_read_triptime; + } else if (isSumOnly(inSettings)) { + sumreport->info.output_handler = udp_output_sumcnt_enhanced; + } else if (isFullDuplex(inSettings)) { + sumreport->info.output_handler = udp_output_fullduplex_sum; + } else { + sumreport->info.output_handler = (isEnhanced(inSettings) ? udp_output_sum_read_enhanced : udp_output_sum_read); + } + } + } else { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_server_tcp; + if (inSettings->mReportMode == kReport_CSV) { + if (isEnhanced(inSettings)) + sumreport->info.output_handler = tcp_output_read_enhanced_csv; + else + sumreport->info.output_handler = tcp_output_basic_csv; + } else { + if (isTripTime(inSettings)) { + sumreport->info.output_handler = tcp_output_sumcnt_read_triptime; + } else if (isSumOnly(inSettings)) { + sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sumcnt_read_enhanced : tcp_output_sumcnt_read); + } else if (isFullDuplex(inSettings)) { + sumreport->info.output_handler = tcp_output_sum_read; + } else { + sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sum_read_enhanced : tcp_output_sum_read); + } + } + } + break; + case kMode_Client : + if (isUDP(inSettings)) { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_client_udp; + if (inSettings->mReportMode == kReport_CSV) { + if (isEnhanced(inSettings)) + sumreport->info.output_handler = udp_output_enhanced_csv; + else + sumreport->info.output_handler = udp_output_basic_csv; + } else { + if (isSumOnly(inSettings)) { + sumreport->info.output_handler = ((isEnhanced(inSettings) && !isFullDuplex(inSettings)) ? \ + udp_output_sumcnt_write_enhanced : udp_output_sumcnt); + } else if (isFullDuplex(inSettings)) { + sumreport->info.output_handler = udp_output_fullduplex_sum; + } else { + sumreport->info.output_handler = (isEnhanced(inSettings) ? udp_output_sum_write_enhanced : udp_output_sum_write); + } + } + } else { + sumreport->transfer_protocol_sum_handler = reporter_transfer_protocol_sum_client_tcp; + if (inSettings->mReportMode == kReport_CSV) { + if (isEnhanced(inSettings)) + sumreport->info.output_handler = tcp_output_write_enhanced_csv; + else + sumreport->info.output_handler = tcp_output_basic_csv; + } else { + if (isSumOnly(inSettings)) { + sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sumcnt_write_enhanced : tcp_output_sumcnt_write); + } else if (isFullDuplex(inSettings)) { + sumreport->info.output_handler = tcp_output_fullduplex_sum; + } else { + sumreport->info.output_handler = (isEnhanced(inSettings) ? tcp_output_sum_write_enhanced : tcp_output_sum_write); + } + } + } + break; + default: + FAIL(1, "SetSumReport", inSettings); + } +} + +struct SumReport* InitSumReport(struct thread_Settings *inSettings, int inID, int fullduplex_report) { + struct SumReport *sumreport = (struct SumReport *) calloc(1, sizeof(struct SumReport)); + if (sumreport == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + sumreport->reference.count = 0; + sumreport->reference.maxcount = 0; + Mutex_Initialize(&sumreport->reference.lock); + sumreport->threads = 0; + common_copy(&sumreport->info.common, inSettings); + // sumreport->info.common->transferID = inID; // this is now set in the active code + sumreport->info.threadcnt = 0; + sumreport->info.isMaskOutput = false; + if (inSettings->mReportMode == kReport_CSV) { + format_ips_port_string(&sumreport->info, 1); + } + + // Only initialize the interval time here + // The startTime and nextTime for summing reports will be set by + // the reporter thread in realtime + if ((inSettings->mInterval) && (inSettings->mIntervalMode == kInterval_Time)) { + sumreport->info.ts.intervalTime.tv_sec = (long) (inSettings->mInterval / rMillion); + sumreport->info.ts.intervalTime.tv_usec = (long) (inSettings->mInterval % rMillion); + sumreport->info.ts.significant_partial = ((double) inSettings->mInterval * PARTIALPERCENT / rMillion) ; + } + // Note that for UDP the client flag settings have not been read (and set) so only use server side flags in tests + if (isEnhanced(inSettings) && (inSettings->mThreadMode == kMode_Server) && !fullduplex_report) { + if (isHistogram(inSettings)) { + if (isUDP(inSettings)) { + char name[] = "SUMT8"; + sumreport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\ + pow(10,inSettings->mHistUnits), \ + inSettings->mHistci_lower, inSettings->mHistci_upper, sumreport->info.common->transferID, name); + } else { + char name[] = "SUMF8"; + sumreport->info.framelatency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \ + pow(10,inSettings->mHistUnits), inSettings->mHistci_lower, \ + inSettings->mHistci_upper, sumreport->info.common->transferID, name); + } + } + if (isJitterHistogram(inSettings) && isUDP(inSettings)) { + char name[] = "SUMJ8"; + sumreport->info.jitter_histogram = histogram_init(JITTER_BINCNT,inSettings->jitter_binwidth,0,JITTER_UNITS, \ + JITTER_LCI, JITTER_UCI, sumreport->info.common->transferID, name); + } + } + if (fullduplex_report) { + SetFullDuplexHandlers(inSettings, sumreport); + if (!isServerReverse(inSettings)) { + sumreport->fullduplex_barrier.count = 0; + Condition_Initialize(&sumreport->fullduplex_barrier.await); + sumreport->fullduplex_barrier.timeout = ((isModeTime(inSettings) && isUDP(inSettings)) ? \ + ((int)(inSettings->mAmount / 100) + 1) : MINBARRIERTIMEOUT); + if (sumreport->fullduplex_barrier.timeout < MINBARRIERTIMEOUT) + sumreport->fullduplex_barrier.timeout = MINBARRIERTIMEOUT; + } else { + sumreport->info.ts.startTime = inSettings->accept_time; + sumreport->info.ts.nextTime = sumreport->info.ts.startTime; + TimeAdd(sumreport->info.ts.nextTime, sumreport->info.ts.intervalTime); + } + } else { + SetSumHandlers(inSettings, sumreport); + } +#ifdef HAVE_THREAD_DEBUG + thread_debug("Init sum report %p id=%d", (void *)sumreport, inID); +#endif + return sumreport; +} + +struct ConnectionInfo * InitConnectOnlyReport (struct thread_Settings *thread) { + assert(thread != NULL); + // this connection report used only by report for accumulate stats + struct ConnectionInfo *creport = (struct ConnectionInfo *) calloc(1, sizeof(struct ConnectionInfo)); + if (!creport) { + FAIL(1, "Out of Memory!!\n", thread); + } + common_copy(&creport->common, thread); + creport->connect_times.min = FLT_MAX; + creport->connect_times.max = FLT_MIN; + creport->connect_times.vd = 0; + creport->connect_times.m2 = 0; + creport->connect_times.mean = 0; + creport->txholdbacktime = thread->txholdback_timer; + return creport; +} + +void FreeSumReport (struct SumReport *sumreport) { + assert(sumreport); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Free sum report hdr=%p", (void *)sumreport); +#endif + Condition_Destroy_Reference(&sumreport->reference); + if (sumreport->info.latency_histogram) { + histogram_delete(sumreport->info.latency_histogram); + } + if (sumreport->info.framelatency_histogram) { + histogram_delete(sumreport->info.framelatency_histogram); + } + if (sumreport->info.bbrtt_histogram) { + histogram_delete(sumreport->info.bbrtt_histogram); + } + if (sumreport->info.jitter_histogram) { + histogram_delete(sumreport->info.jitter_histogram); + } + free_common_copy(sumreport->info.common); + free(sumreport); +} + + +static void Free_iReport (struct ReporterData *ireport) { + assert(ireport != NULL); + +#ifdef HAVE_THREAD_DEBUG + thread_debug("Free report hdr=%p reporter thread suspend count=%d packetring=%p histo=%p frame histo=%p", \ + (void *)ireport, ireport->reporter_thread_suspends, (void *) ireport->packetring, \ + (void *)ireport->info.latency_histogram, (void *) ireport->info.framelatency_histogram); +#endif + if (ireport->packetring && ireport->info.total.Bytes.current && !(isSingleUDP(ireport->info.common)) && \ + !TimeZero(ireport->info.ts.intervalTime) && (ireport->reporter_thread_suspends < 3)) { + fprintf(stdout, "WARN: this test may have been CPU bound (%d) (or may not be detecting the underlying network devices)\n", \ + ireport->reporter_thread_suspends); + } + if (ireport->packetring) { + packetring_free(ireport->packetring); + } + if (ireport->info.latency_histogram) { + histogram_delete(ireport->info.latency_histogram); + } + if (ireport->info.jitter_histogram) { + histogram_delete(ireport->info.jitter_histogram); + } + if (ireport->info.framelatency_histogram) { + histogram_delete(ireport->info.framelatency_histogram); + } + if (ireport->info.bbrtt_histogram) { + histogram_delete(ireport->info.bbrtt_histogram); + } + free_common_copy(ireport->info.common); + free(ireport); +} + +void FreeConnectionReport (struct ConnectionInfo *report) { + free_common_copy(report->common); + free(report); +} + +static void Free_sReport (struct ReportSettings *report) { + free_common_copy(report->common); + free(report); +} + +static void Free_srReport (struct TransferInfo *report) { + free_common_copy(report->common); + free(report); +} + +void FreeReport (struct ReportHeader *reporthdr) { + assert(reporthdr != NULL); +#ifdef HAVE_THREAD_DEBUG + char rs[REPORTTXTMAX]; + reporttype_text(reporthdr, &rs[0]); + thread_debug("Jobq *FREE* report hdr/rpt %p/%p (%s)", (void *) reporthdr, (void *) reporthdr->this_report, &rs[0]); +#endif + switch (reporthdr->type) { + case DATA_REPORT: + Free_iReport((struct ReporterData *)reporthdr->this_report); + break; + case CONNECTION_REPORT: + FreeConnectionReport((struct ConnectionInfo *)reporthdr->this_report); + break; + case SETTINGS_REPORT: + Free_sReport((struct ReportSettings *)reporthdr->this_report); + break; + case SERVER_RELAY_REPORT: + Free_srReport((struct TransferInfo *)reporthdr->this_report); + break; + default: + fprintf(stderr, "Invalid report type in free (%x)\n", reporthdr->type); + assert(0); + break; + } + free(reporthdr); +} + +/* + * InitReport is called by a transfer agent (client or + * server) to setup the needed structures to communicate + * traffic and connection information. Also initialize + * the report start time and next interval report time + * Finally, in the case of parallel clients, have them all + * synchronize on compeleting their connect() + */ + +void IncrSumReportRefCounter (struct SumReport *sumreport) { + assert(sumreport); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Sum multiheader %p ref=%d->%d", (void *)sumreport, sumreport->reference.count, (sumreport->reference.count + 1)); +#endif + Mutex_Lock(&sumreport->reference.lock); + sumreport->reference.count++; + if (sumreport->reference.count > sumreport->reference.maxcount) + sumreport->reference.maxcount = sumreport->reference.count; + Mutex_Unlock(&sumreport->reference.lock); +} + +int DecrSumReportRefCounter (struct SumReport *sumreport) { + assert(sumreport); +// thread_debug("before lock hdr=%p", (void *)sumreport); + Mutex_Lock(&sumreport->reference.lock); +#ifdef HAVE_THREAD_DEBUG + thread_debug("Sum multiheader %p ref=%d->%d", (void *)sumreport, sumreport->reference.count, (sumreport->reference.count - 1)); +#endif +// thread_debug("in lock hdr=%p", (void *)sumreport); + int refcnt = --sumreport->reference.count; + Mutex_Unlock(&sumreport->reference.lock); +// thread_debug("unlock hdr=%p", (void *)sumreport); + return refcnt; +} + + +// Note, this report structure needs to remain self contained and not coupled +// to any settings structure pointers. This allows the thread settings to +// be freed without impacting the reporter. It's not recommended that +// this be done, i.e. free the settings before the report, but be defensive +// here to allow it +struct ReportHeader* InitIndividualReport (struct thread_Settings *inSettings) { + /* + * Create the report header and an ireport (if needed) + */ + assert(inSettings!=NULL); + struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader)); + if (reporthdr == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->this_report = calloc(1, sizeof(struct ReporterData)); + if (reporthdr->this_report == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->type = DATA_REPORT; + reporthdr->ReportMode = inSettings->mReportMode; + + struct ReporterData *ireport = (struct ReporterData *)(reporthdr->this_report); + if (inSettings->mSumReport) { + ireport->GroupSumReport = inSettings->mSumReport; + } + if (isFullDuplex(inSettings)) { + assert(inSettings->mFullDuplexReport != NULL); + IncrSumReportRefCounter(inSettings->mFullDuplexReport); + ireport->FullDuplexReport = inSettings->mFullDuplexReport; + } + // Copy common settings into the transfer report section + common_copy(&ireport->info.common, inSettings); + ireport->info.final = false; + ireport->info.burstid_transition = false; + ireport->info.isEnableTcpInfo = false; + + // Create a new packet ring which is used to communicate + // packet stats from the traffic thread to the reporter + // thread. The reporter thread does all packet accounting + ireport->packetring = packetring_init((inSettings->numreportstructs ? inSettings->numreportstructs : (isSingleUDP(inSettings) ? 40 : NUM_REPORT_STRUCTS)), \ + &ReportCond, (isSingleUDP(inSettings) ? NULL : &inSettings->awake_me)); +#ifdef HAVE_THREAD_DEBUG + char rs[REPORTTXTMAX]; + reporttype_text(reporthdr, &rs[0]); + thread_debug("Init %s report hdr/rpt/com=%p/%p/%p multireport/fullduplex=%p/%p pring(bytes)/cond=%p(%d)/%p (socket=%d)", &rs[0], \ + (void *) reporthdr, (void *) ireport, (void *) ireport->info.common, \ + (void *) inSettings->mSumReport, (void *) inSettings->mFullDuplexReport, \ + (void *) ireport->packetring, ireport->packetring->bytes, (void *) ireport->packetring->awake_producer, inSettings->mSock); +#endif + if (inSettings->numreportstructs) + fprintf (stdout, "%sNUM_REPORT_STRUCTS override from %d to %d\n", inSettings->mTransferIDStr, NUM_REPORT_STRUCTS, inSettings->numreportstructs); + + // Set up the function vectors, there are three + // 1) packet_handler: does packet accounting per the test and protocol + // 2) transfer_protocol_handler: performs output, e.g. interval reports, per the test and protocol + + if (inSettings->mIntervalMode == kInterval_Time) { + ireport->info.ts.intervalTime.tv_sec = (long) (inSettings->mInterval / rMillion); + ireport->info.ts.intervalTime.tv_usec = (long) (inSettings->mInterval % rMillion); + ireport->transfer_interval_handler = reporter_condprint_time_interval_report; + ireport->info.ts.significant_partial = (double) inSettings->mInterval * PARTIALPERCENT / rMillion ; + } else { + ireport->transfer_interval_handler = NULL; + } + ireport->packet_handler_pre_report = NULL; + ireport->packet_handler_post_report = NULL; + switch (inSettings->mThreadMode) { + case kMode_Server : + if (isUDP(inSettings)) { + ireport->packet_handler_post_report = reporter_handle_packet_server_udp; + ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp; + if ((inSettings->mIntervalMode == kInterval_Frames) && isIsochronous(inSettings)) { + ireport->transfer_interval_handler = reporter_condprint_frame_interval_report_server_udp; + ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp; + } else { + ireport->transfer_protocol_handler = reporter_transfer_protocol_server_udp; + if (isSumOnly(inSettings)) { + ireport->info.output_handler = NULL; + } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) { + if (isEnhanced(inSettings)) + ireport->info.output_handler = udp_output_enhanced_csv; + else + ireport->info.output_handler = udp_output_basic_csv; + } else if (isTripTime(inSettings)) { + if (isIsochronous(inSettings)) + ireport->info.output_handler = udp_output_read_triptime_isoch; + else + ireport->info.output_handler = udp_output_read_triptime; + } else if (isEnhanced(inSettings)) { + ireport->info.output_handler = udp_output_read_enhanced; + } else if (isFullDuplex(inSettings)) { + ireport->info.output_handler = udp_output_read; + } else { + ireport->info.output_handler = udp_output_read; + } + } + } else { // TCP case + ireport->packet_handler_post_report = reporter_handle_packet_server_tcp; + ireport->transfer_protocol_handler = reporter_transfer_protocol_server_tcp; + if (isPeriodicBurst(inSettings)) { + ireport->transfer_interval_handler = reporter_condprint_burst_interval_report_server_tcp; + ireport->info.output_handler = tcp_output_burst_read; + ireport->packet_handler_pre_report = reporter_handle_packet_server_tcp; + ireport->packet_handler_post_report = NULL; + } else if ((inSettings->mIntervalMode == kInterval_Frames) && isIsochronous(inSettings)) { + ireport->transfer_interval_handler = reporter_condprint_frame_interval_report_server_tcp; + ireport->info.output_handler = tcp_output_frame_read_triptime; + ireport->packet_handler_pre_report = reporter_handle_packet_server_tcp; + ireport->packet_handler_post_report = NULL; + } else if (isSumOnly(inSettings)) { + ireport->info.output_handler = NULL; + } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) { + if (isEnhanced(inSettings)) + ireport->info.output_handler = tcp_output_read_enhanced_csv; + else + ireport->info.output_handler = tcp_output_basic_csv; + } else if (isBounceBack(inSettings)) { + ireport->packet_handler_post_report = reporter_handle_packet_bb_server; + ireport->transfer_protocol_handler = reporter_transfer_protocol_server_bb_tcp; + ireport->info.output_handler = tcp_output_write; + } else if (isTripTime(inSettings) && isIsochronous(inSettings)) { + ireport->info.output_handler = tcp_output_read_enhanced_isoch; + } else if (isTripTime(inSettings)) { + ireport->info.output_handler = tcp_output_read_triptime; + } else if (isEnhanced(inSettings)) { + ireport->info.output_handler = tcp_output_read_enhanced; + } else if (!isFullDuplex(inSettings)) { + ireport->info.output_handler = tcp_output_read; + } else { + ireport->info.output_handler = tcp_output_read; + } + } + break; + case kMode_Client : + ireport->packet_handler_post_report = reporter_handle_packet_client; + if (isUDP(inSettings)) { + ireport->transfer_protocol_handler = reporter_transfer_protocol_client_udp; + if (isSumOnly(inSettings)) { + ireport->info.output_handler = NULL; + } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) { + if (isEnhanced(inSettings)) + ireport->info.output_handler = udp_output_enhanced_csv; + else + ireport->info.output_handler = udp_output_basic_csv; + } else if (isIsochronous(inSettings)) { + ireport->info.output_handler = udp_output_write_enhanced_isoch; + } else if (isEnhanced(inSettings)) { + ireport->info.output_handler = udp_output_write_enhanced; + } else if (isFullDuplex(inSettings)) { + ireport->info.output_handler = udp_output_write; + } else { + ireport->info.output_handler = udp_output_write; + } + } else { + ireport->transfer_protocol_handler = reporter_transfer_protocol_client_tcp; + if (isSumOnly(inSettings)) { + ireport->info.output_handler = NULL; + } else if ((inSettings->mReportMode == kReport_CSV) && !isSumOnly(inSettings)) { + if (isEnhanced(inSettings)) + ireport->info.output_handler = tcp_output_write_enhanced_csv; + else + ireport->info.output_handler = tcp_output_basic_csv; + } else if (isBounceBack(inSettings)) { + ireport->packet_handler_post_report = reporter_handle_packet_bb_client; + ireport->transfer_protocol_handler = reporter_transfer_protocol_client_bb_tcp; + ireport->info.output_handler = tcp_output_write_bb; + } else if (isIsochronous(inSettings)) { + ireport->info.output_handler = tcp_output_write_enhanced_isoch; + } else if (isTcpWriteTimes(inSettings)) { + ireport->info.output_handler = tcp_output_write_enhanced_write; + } else if (isEnhanced(inSettings)) { + ireport->info.output_handler = tcp_output_write_enhanced; + } else if (isFullDuplex(inSettings)) { + ireport->info.output_handler = tcp_output_write; + } else { + ireport->info.output_handler = tcp_output_write; + } + } + break; + case kMode_Unknown : + case kMode_Reporter : + case kMode_ReporterClient : + case kMode_Listener: + default: + FAIL(1, "InitIndividualReport\n", inSettings); + } + + if (inSettings->mThreadMode == kMode_Server) { + ireport->info.sock_callstats.read.binsize = inSettings->mBufLen / 8; + if (isUDP(inSettings)) { + if (isJitterHistogram(inSettings)) { + char name[] = "J8"; + ireport->info.jitter_histogram = histogram_init(JITTER_BINCNT,inSettings->jitter_binwidth,0,JITTER_UNITS, \ + JITTER_LCI, JITTER_UCI, ireport->info.common->transferID, name); + } + if (isTripTime(inSettings) && isHistogram(inSettings)) { + char name[] = "T8"; + ireport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\ + pow(10,inSettings->mHistUnits), \ + inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name); + } + } + if (isHistogram(inSettings) && (isIsochronous(inSettings) || (!isUDP(inSettings) && isTripTime(inSettings)))) { + char name[] = "F8"; + // make sure frame bin size min is 100 microsecond + ireport->info.framelatency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \ + pow(10,inSettings->mHistUnits), inSettings->mHistci_lower, \ + inSettings->mHistci_upper, ireport->info.common->transferID, name); + } + } + if ((inSettings->mThreadMode == kMode_Client) && !isUDP(inSettings) && isHistogram(inSettings)) { + if (isTcpWriteTimes(inSettings)) { + char name[] = "W8"; + ireport->info.write_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\ + pow(10,inSettings->mHistUnits), \ + inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name); + } else if (isWritePrefetch(inSettings)) { + char name[] = "S8"; + ireport->info.latency_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0,\ + pow(10,inSettings->mHistUnits), \ + inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name); + } + } + if ((inSettings->mThreadMode == kMode_Client) && isBounceBack(inSettings)) { + char name[] = "BB8"; + if (!isHistogram(inSettings)) { + inSettings->mHistBins = 100000; // 10 seconds wide + inSettings->mHistBinsize = 100; // 100 usec bins + inSettings->mHistUnits = 6; // usecs 10 pow(x) + inSettings->mHistci_lower = 5; + inSettings->mHistci_upper = 95; + } + ireport->info.bbrtt_histogram = histogram_init(inSettings->mHistBins,inSettings->mHistBinsize,0, \ + pow(10,inSettings->mHistUnits), \ + inSettings->mHistci_lower, inSettings->mHistci_upper, ireport->info.common->transferID, name); + } + return reporthdr; +} + + +/* + * This init/update and print/finish (in the ReportDefault.c) + * is poor. It has to be done this way to preserve the + * interface to older versions where the reporter settings + * were delayed until a Transfer report came through. + * This transfer report has all the reports bound to it. + * + * The better implmementation is to treat all reports + * as independent objects that can be updated, processed, + * and output independlty per the Reporter threads job queue + * without shared state or copied state variables between these + * reports. The shared state, is really reporter state, that + * should be maintained in and by the reporter object/thread. + * + * For now, just fix it good enough. Later, write a c++ + * reporter object and use standard c++ design techniques + * to achieve this. Such code will be easier to maintain + * and to extend. + */ +struct ReportHeader* InitConnectionReport (struct thread_Settings *inSettings) { + assert(inSettings != NULL); + struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader)); + if (reporthdr == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->this_report = calloc(1, sizeof(struct ConnectionInfo)); + if (reporthdr->this_report == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->type = CONNECTION_REPORT; + reporthdr->ReportMode = inSettings->mReportMode; + + struct ConnectionInfo * creport = (struct ConnectionInfo *)(reporthdr->this_report); + common_copy(&creport->common, inSettings); + tcpstats_copy(&creport->tcpinitstats, &inSettings->tcpinitstats); + // Fill out known fields for the connection report + reporter_peerversion(creport, inSettings->peer_version_u, inSettings->peer_version_l); + if (isEnhanced(inSettings) && isTxStartTime(inSettings)) { + creport->epochStartTime.tv_sec = inSettings->txstart_epoch.tv_sec; + creport->epochStartTime.tv_usec = inSettings->txstart_epoch.tv_usec; + } else if (isTripTime(inSettings)) { + creport->epochStartTime.tv_sec = inSettings->accept_time.tv_sec; + creport->epochStartTime.tv_usec = inSettings->accept_time.tv_usec; + } + // Copy state from the settings object into the connection report + creport->connect_times.min = FLT_MAX; + creport->connect_times.max = FLT_MIN; + creport->connect_times.vd = 0; + creport->connect_times.m2 = 0; + creport->connect_times.mean = 0; + if (inSettings->mSock > 0) { + creport->winsize = getsock_tcp_windowsize(inSettings->mSock, \ + (inSettings->mThreadMode != kMode_Client ? 0 : 1) ); +#if HAVE_DECL_TCP_WINDOW_CLAMP + if (isRxClamp(inSettings)) { + getsock_tcp_windowclamp(inSettings->mSock); + } +#endif + } + creport->common->winsize_requested = inSettings->mTCPWin; + creport->txholdbacktime = inSettings->txholdback_timer; + if (isPeriodicBurst(inSettings)) { + creport->common->FPS = inSettings->mFPS; + } +#ifdef HAVE_THREAD_DEBUG + char rs[REPORTTXTMAX]; + reporttype_text(reporthdr, &rs[0]); + thread_debug("Init %s report hdr/rpt/com %p/%p/%p", &rs[0], \ + (void *) reporthdr, (void *) reporthdr->this_report, (void *) creport->common); +#endif + return reporthdr; +} + +/* + * ReportSettings will generate a summary report for + * settings being used with Listeners or Clients + */ +struct ReportHeader *InitSettingsReport (struct thread_Settings *inSettings) { + assert(inSettings != NULL); + struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader)); + if (reporthdr == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->this_report = calloc(1, sizeof(struct ReportSettings)); + if (reporthdr->this_report == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->type = SETTINGS_REPORT; + reporthdr->ReportMode = inSettings->mReportMode; + + struct ReportSettings *sreport = (struct ReportSettings *)reporthdr->this_report; + common_copy(&sreport->common, inSettings); + sreport->peer = inSettings->peer; + sreport->size_peer = inSettings->size_peer; + sreport->local = inSettings->local; + sreport->size_local = inSettings->size_local; + sreport->isochstats.mFPS = inSettings->mFPS; + sreport->isochstats.mMean = inSettings->mMean/8; + sreport->isochstats.mVariance = inSettings->mVariance/8; + sreport->isochstats.mBurstIPG = (unsigned int) (inSettings->mBurstIPG*1000.0); + sreport->isochstats.mBurstInterval = (unsigned int) (1 / inSettings->mFPS * 1000000); + if (!isUDP(inSettings)) { + if (inSettings->mMSS > 0) { + sreport->sockmaxseg = inSettings->mMSS; + } else if (isPrintMSS(inSettings) && !(inSettings->mMSS > 0)) { + sreport->sockmaxseg = getsock_tcp_mss(inSettings->mSock); + } + } +#ifdef HAVE_THREAD_DEBUG + char rs[REPORTTXTMAX]; + reporttype_text(reporthdr, &rs[0]); + thread_debug("Init %s report hdr/rpt/com %p/%p/%p", &rs[0], \ + (void *) reporthdr, (void *) reporthdr->this_report, (void *) sreport->common); +#endif + return reporthdr; +} + +/* + * This will generate a report of the UDP + * statistics as reported by the server on the client + * side. + */ +struct ReportHeader* InitServerRelayUDPReport(struct thread_Settings *inSettings, struct server_hdr *server) { + /* + * Create the report header and an ireport (if needed) + */ + struct ReportHeader *reporthdr = (struct ReportHeader *) calloc(1, sizeof(struct ReportHeader)); + if (reporthdr == NULL) { + FAIL(1, "Out of Memory!!\n", inSettings); + } + reporthdr->this_report = calloc(1, sizeof(struct ServerRelay)); + if (!reporthdr->this_report) { + FAIL(1, "Out of Memory!!\n", inSettings); + } +#ifdef HAVE_THREAD_DEBUG + thread_debug("Init server relay report %p size %ld", (void *)reporthdr, sizeof(struct ReportHeader) + sizeof(struct ServerRelay)); +#endif + reporthdr->type = SERVER_RELAY_REPORT; + reporthdr->ReportMode = inSettings->mReportMode; + struct ServerRelay *sr_report = (struct ServerRelay *)reporthdr->this_report; + common_copy(&sr_report->info.common, inSettings); + struct TransferInfo *stats = &sr_report->info; + stats->common->transferID = inSettings->mTransferID; + + stats->jitter = ntohl(server->base.jitter1); + stats->jitter += ntohl(server->base.jitter2) / (double)rMillion; +#ifdef HAVE_INT64_T + stats->cntBytes = (((intmax_t) ntohl(server->base.total_len1)) << 32) + \ + ntohl(server->base.total_len2); +#else + stats->cntBytes = (intmax_t) ntohl(server->base.total_len2); +#endif + stats->ts.iStart = 0; + stats->ts.iEnd = ntohl(server->base.stop_sec); + stats->ts.iEnd += ntohl(server->base.stop_usec) / (double)rMillion; + uint32_t flags = ntohl(server->base.flags); + if ((flags & HEADER_SEQNO64B)) { + stats->cntError = (((intmax_t) ntohl(server->extend2.error_cnt2)) << 32) + \ + ntohl(server->base.error_cnt); + stats->cntOutofOrder = (((intmax_t) ntohl(server->extend2.outorder_cnt2)) << 32) + \ + ntohl(server->base.outorder_cnt); + stats->cntDatagrams = (((intmax_t) ntohl(server->extend2.datagrams2)) << 32) + \ + ntohl(server->base.datagrams); + } else { + stats->cntError = ntohl(server->base.error_cnt); + stats->cntOutofOrder = ntohl(server->base.outorder_cnt); + stats->cntDatagrams = ntohl(server->base.datagrams); + } + if ((flags & SERVER_HEADER_EXTEND) != 0) { + setEnhanced(stats->common); + stats->transit.current.min = ntohl(server->extend.minTransit1); + stats->transit.current.min += ntohl(server->extend.minTransit2) / (double)rMillion; + stats->transit.current.max = ntohl(server->extend.maxTransit1); + stats->transit.current.max += ntohl(server->extend.maxTransit2) / (double)rMillion; + stats->transit.current.sum = ntohl(server->extend.sumTransit1); + stats->transit.current.sum += ntohl(server->extend.sumTransit2) / (double)rMillion; + stats->transit.current.mean = ntohl(server->extend.meanTransit1); + stats->transit.current.mean += ntohl(server->extend.meanTransit2) / (double)rMillion; + stats->transit.current.m2 = ntohl(server->extend.m2Transit1); + stats->transit.current.m2 += ntohl(server->extend.m2Transit2) / (double)rMillion; + stats->transit.current.m2 *= 1e-12; + stats->transit.current.vd = ntohl(server->extend.vdTransit1); + stats->transit.current.vd += ntohl(server->extend.vdTransit2) / (double)rMillion; + stats->transit.current.cnt = ntohl(server->extend.cntTransit); + stats->cntIPG = ntohl(server->extend.cntIPG); + stats->IPGsum = ntohl(server->extend.IPGsum); + } else { + unsetEnhanced(stats->common); + } + sr_report->peer = inSettings->local; + sr_report->size_peer = inSettings->size_local; + sr_report->local = inSettings->peer; + sr_report->size_local = inSettings->size_peer; + return reporthdr; +} + +/* ------------------------------------------------------------------- + * Send an AckFIN (a datagram acknowledging a FIN) on the socket, + * then select on the socket for some time to check for silence. + * If additional datagrams come in (not silent), probably our AckFIN + * was lost so the client has re-transmitted + * termination datagrams, so re-transmit our AckFIN. + * Sent by server to client + * ------------------------------------------------------------------- */ +void write_UDP_AckFIN (struct TransferInfo *stats, int len) { + assert(stats!= NULL); + int ackpacket_length = (int) (sizeof(struct UDP_datagram) + sizeof(struct server_hdr)); + int readlen = ((ackpacket_length * 2) > len * 2) ? (ackpacket_length * 2) : (len * 2); + char *ackPacket = (char *) calloc(1, readlen); + int success = 0; + assert(ackPacket); + fd_set readSet; + int rc = 1; + struct timeval timeout; + + if (ackPacket) { + struct UDP_datagram *UDP_Hdr = (struct UDP_datagram *)ackPacket; + struct server_hdr *hdr = (struct server_hdr *)(UDP_Hdr+1); + + UDP_Hdr = (struct UDP_datagram*) ackPacket; + int flags = HEADER_VERSION1; + if (isEnhanced(stats->common) || isTripTime(stats->common)) + flags |= SERVER_HEADER_EXTEND; +#ifdef HAVE_INT64_T + flags |= HEADER_SEQNO64B; +#endif + hdr->base.flags = htonl((long) flags); +#ifdef HAVE_INT64_T + hdr->base.total_len1 = htonl((long) (stats->cntBytes >> 32)); +#else + hdr->base.total_len1 = htonl(0x0); +#endif + hdr->base.total_len2 = htonl((long) (stats->cntBytes & 0xFFFFFFFF)); + hdr->base.stop_sec = htonl( (long) stats->ts.iEnd); + hdr->base.stop_usec = htonl( (long)((stats->ts.iEnd - (long)stats->ts.iEnd) * rMillion)); + hdr->base.error_cnt = htonl((long) (stats->cntError & 0xFFFFFFFF)); + hdr->base.outorder_cnt = htonl((long) (stats->cntOutofOrder & 0xFFFFFFFF)); + hdr->base.datagrams = htonl((long) (stats->cntDatagrams & 0xFFFFFFFF)); + if (flags & HEADER_SEQNO64B) { + hdr->extend2.error_cnt2 = htonl((long) (stats->cntError >> 32)); + hdr->extend2.outorder_cnt2 = htonl((long) (stats->cntOutofOrder >> 32) ); + hdr->extend2.datagrams2 = htonl((long) (stats->cntDatagrams >> 32)); + } + // printf("****** Server final estimator %f calculated average %f\n", stats->jitter, (stats->inline_jitter.total.sum / stats->inline_jitter.total.cnt)); + if (stats->inline_jitter.total.cnt > 0) + stats->jitter = (stats->inline_jitter.total.sum / stats->inline_jitter.total.cnt); // overide the final estimator with an average + hdr->base.jitter1 = htonl((long) stats->jitter); + hdr->base.jitter2 = htonl((long) ((stats->jitter - (long)stats->jitter) * rMillion)); + + hdr->extend.minTransit1 = htonl((long) stats->transit.total.min); + hdr->extend.minTransit2 = htonl((long) ((stats->transit.total.min - (long)stats->transit.total.min) * rMillion)); + hdr->extend.maxTransit1 = htonl((long) stats->transit.total.max); + hdr->extend.maxTransit2 = htonl((long) ((stats->transit.total.max - (long)stats->transit.total.max) * rMillion)); + hdr->extend.sumTransit1 = htonl((long) stats->transit.total.sum); + hdr->extend.sumTransit2 = htonl((long) ((stats->transit.total.sum - (long)stats->transit.total.sum) * rMillion)); + hdr->extend.meanTransit1 = htonl((long) stats->transit.total.mean); + hdr->extend.meanTransit2 = htonl((long) ((stats->transit.total.mean - (long)stats->transit.total.mean) * rMillion)); + stats->transit.total.m2 *= 1e12; + hdr->extend.m2Transit1 = htonl((long) stats->transit.total.m2); + hdr->extend.m2Transit2 = htonl((long) ((stats->transit.total.m2 - (long)stats->transit.total.m2) * rMillion)); + hdr->extend.vdTransit1 = htonl((long) stats->transit.total.vd); + hdr->extend.vdTransit2 = htonl((long) ((stats->transit.total.vd - (long)stats->transit.total.vd) * rMillion)); + hdr->extend.cntTransit = htonl(stats->transit.total.cnt); + hdr->extend.cntIPG = htonl((long) (stats->cntDatagrams / (stats->ts.iEnd - stats->ts.iStart))); + hdr->extend.IPGsum = htonl(1); + +#define TRYCOUNT 10 + int count = TRYCOUNT; + while (--count) { + // write data +#if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET) + // If in l2mode, use the AF_INET socket to write this packet + // +#ifdef HAVE_THREAD_DEBUG + thread_debug("UDP server send done-ack w/server-stats to client (sock=%d)", stats->common->socket); +#endif + rc = write(((stats->common->socketdrop > 0) ? stats->common->socketdrop : stats->common->socket), ackPacket, ackpacket_length); +#else + rc = write(stats->common->socket, ackPacket, ackpacket_length); +#endif + WARN_errno(rc < 0, "write-ackfin"); + // wait here is for silence, no more packets from the client + + FD_ZERO(&readSet); + FD_SET(stats->common->socket, &readSet); + timeout.tv_sec = 0; + timeout.tv_usec = 250000; + rc = select(stats->common->socket+1, &readSet, NULL, NULL, &timeout); + if (rc == 0) { +#ifdef HAVE_THREAD_DEBUG + thread_debug("UDP server detected silence - server stats assumed received by client"); +#endif + success = 1; + break; + } + rc = read(stats->common->socket, ackPacket, readlen); + // WARN_errno(rc < 0, "ack await silence"); + if ((rc < 0) && FATALUDPREADERR(errno)) { + break; + } +#ifdef HAVE_THREAD_DEBUG + if (rc > 0) { + thread_debug("UDP server thinks server stats packet maybe lost, will retransmit and try again", rc); + } +#endif + } + free(ackPacket); + } + if (!success && (stats->common->ReportMode != kReport_CSV)) { + fprintf(stderr, warn_ack_failed, stats->common->socket); + } +} +// end write_UDP_AckFIN |