diff options
Diffstat (limited to '')
-rw-r--r-- | src/Reporter.c | 466 |
1 files changed, 328 insertions, 138 deletions
diff --git a/src/Reporter.c b/src/Reporter.c index bbd6fd5..e200a80 100644 --- a/src/Reporter.c +++ b/src/Reporter.c @@ -82,12 +82,12 @@ static void reporter_reset_transfer_stats_client_tcp(struct TransferInfo *stats) static void reporter_reset_transfer_stats_client_udp(struct TransferInfo *stats); static void reporter_reset_transfer_stats_server_udp(struct TransferInfo *stats); static void reporter_reset_transfer_stats_server_tcp(struct TransferInfo *stats); +static void reporter_reset_transfer_stats_sum(struct TransferInfo *stats); // code for welfornd's algorithm to produce running mean/min/max/var static void reporter_update_mmm (struct MeanMinMaxStats *stats, double value); static void reporter_reset_mmm (struct MeanMinMaxStats *stats); - // one way delay (OWD) calculations static void reporter_handle_packet_oneway_transit(struct TransferInfo *stats, struct ReportStruct *packet); static void reporter_handle_isoch_oneway_transit_tcp(struct TransferInfo *stats, struct ReportStruct *packet); @@ -160,8 +160,10 @@ bool ReportPacket (struct ReporterData* data, struct ReportStruct *packet) { if (!TimeZero(stats->ts.nextTCPSampleTime) && (TimeDifference(stats->ts.nextTCPSampleTime, packet->packetTime) < 0)) { gettcpinfo(data->info.common->socket, &packet->tcpstats); TimeAdd(stats->ts.nextTCPSampleTime, stats->ts.intervalTime); + rc = true; } else { gettcpinfo(data->info.common->socket, &packet->tcpstats); + rc = true; } } #endif @@ -194,14 +196,14 @@ bool ReportPacket (struct ReporterData* data, struct ReportStruct *packet) { * thread to print a final report and to remove the data report from its jobq. * It also handles the freeing reports and other closing actions */ -int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) { +bool EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) { assert(reporthdr!=NULL); assert(finalpacket!=NULL); struct ReporterData *report = (struct ReporterData *) reporthdr->this_report; struct ReportStruct packet; memset(&packet, 0, sizeof(struct ReportStruct)); - int do_close = 1; + bool do_close = true; /* * Using PacketID of -1 ends reporting * It pushes a "special packet" through @@ -217,24 +219,25 @@ int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) { } #endif // clear the reporter done predicate - report->packetring->consumerdone = 0; + report->packetring->consumerdone = false; // the negative packetID is used to inform the report thread this traffic thread is done packet.packetID = -1; packet.packetLen = finalpacket->packetLen; packet.packetTime = finalpacket->packetTime; + packet.err_readwrite = NullEvent; // this is not a real event if (isSingleUDP(report->info.common)) { packetring_enqueue(report->packetring, &packet); reporter_process_transfer_report(report); } else { ReportPacket(report, &packet); -#ifdef HAVE_THREAD_DEBUG - thread_debug( "Traffic thread awaiting reporter to be done with %p and cond %p", (void *)report, (void *) report->packetring->awake_producer); -#endif Condition_Lock((*(report->packetring->awake_producer))); while (!report->packetring->consumerdone) { // This wait time is the lag between the reporter thread // and the traffic thread, a reporter thread with lots of // reports (e.g. fastsampling) can lag per the i/o +#ifdef HAVE_THREAD_DEBUG + thread_debug( "Traffic thread awaiting reporter to be done with %p and cond %p", (void *)report, (void *) report->packetring->awake_producer); +#endif Condition_TimedWait(report->packetring->awake_producer, 1); // printf("Consumer done may be stuck\n"); } @@ -249,7 +252,7 @@ int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) { #endif FreeSumReport(report->FullDuplexReport); } else { - do_close = 0; + do_close = false; } } return do_close; @@ -422,7 +425,7 @@ void reporter_spawn (struct thread_Settings *thread) { #ifdef HAVE_THREAD_DEBUG thread_debug( "Reporter thread started"); #endif - if (isEnhanced(thread)) { + if (isEnhanced(thread) && (thread->mThreadMode == kMode_Client)) { myConnectionReport = InitConnectOnlyReport(thread); } /* @@ -480,8 +483,9 @@ void reporter_spawn (struct thread_Settings *thread) { // Report process report returns true // when a report needs to be removed // from the jobq. Also, work item might - // be removed as part of processing - // Store a cached pointer for the linked list maitenance + // be free as part of its processing + // Store a cached pointer tmp + // for the next work item struct ReportHeader *tmp = (*work_item)->next; if (reporter_process_report(*work_item)) { #ifdef HAVE_THREAD_DEBUG @@ -510,11 +514,11 @@ void reporter_spawn (struct thread_Settings *thread) { } // The Transfer or Data report is by far the most complicated report -int reporter_process_transfer_report (struct ReporterData *this_ireport) { +bool reporter_process_transfer_report (struct ReporterData *this_ireport) { assert(this_ireport != NULL); struct TransferInfo *sumstats = (this_ireport->GroupSumReport ? &this_ireport->GroupSumReport->info : NULL); struct TransferInfo *fullduplexstats = (this_ireport->FullDuplexReport ? &this_ireport->FullDuplexReport->info : NULL); - int need_free = 0; + bool need_free = false; // The consumption detector applies delay to the reporter // thread when its consumption rate is too low. This allows // the traffic threads to send aggregates vs thrash @@ -530,7 +534,7 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { apply_consumption_detector(); // If there are more packets to process then handle them struct ReportStruct *packet = NULL; - int advance_jobq = 0; + bool advance_jobq = false; while (!advance_jobq && (packet = packetring_dequeue(this_ireport->packetring))) { // Increment the total packet count processed by this thread // this will be used to make decisions on if the reporter @@ -543,9 +547,21 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { reporter_handle_packet_tcpistats(this_ireport, packet); } #endif + if (this_ireport->transfer_interval_handler) { + if (sumstats && (this_ireport->packetring->uplevel != sumstats->uplevel) \ + && (TimeDifference(sumstats->ts.nextTime, packet->packetTime) > 0)) { + sumstats->slot_thread_upcount++; +#if HAVE_SUMMING_DEBUG + printf("**** %s upcnt (%p) pkt=%ld.%ld (up/down)=%d/%d uplevel (sum/pkt)=%d/%d\n", this_ireport->info.common->transferIDStr, (void *)this_ireport->packetring, \ + (long) packet->packetTime.tv_sec, (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, this_ireport->packetring->uplevel); +#endif + this_ireport->packetring->uplevel = toggleLevel(this_ireport->packetring->uplevel); + } + } if (!(packet->packetID < 0)) { // Check to output any interval reports, - // bursts need to report the packet first + // bursts need to report the packet first if (this_ireport->packet_handler_pre_report) { (*this_ireport->packet_handler_pre_report)(this_ireport, packet); } @@ -564,8 +580,8 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { if (fullduplexstats) fullduplexstats->ts.packetTime = packet->packetTime; } else { - need_free = 1; - advance_jobq = 1; + need_free = true; + advance_jobq = true; // A last packet event was detected // printf("last packet event detected\n"); fflush(stdout); this_ireport->reporter_thread_suspends = consumption_detector.reporter_thread_suspends; @@ -577,7 +593,7 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { } this_ireport->info.ts.packetTime = packet->packetTime; assert(this_ireport->transfer_protocol_handler != NULL); - (*this_ireport->transfer_protocol_handler)(this_ireport, 1); + (*this_ireport->transfer_protocol_handler)(this_ireport, true); // This is a final report so set the sum report header's packet time // Note, the thread with the max value will set this if (fullduplexstats && isEnhanced(this_ireport->info.common)) { @@ -587,20 +603,22 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { } if (DecrSumReportRefCounter(this_ireport->FullDuplexReport) == 0) { if (this_ireport->FullDuplexReport->transfer_protocol_sum_handler) { - (*this_ireport->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 1); + (*this_ireport->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, true); } // FullDuplex report gets freed by a traffic thread (per its barrier) } } if (sumstats) { - if (!this_ireport->GroupSumReport->threads_cntr_fsum) - this_ireport->GroupSumReport->threads_cntr_fsum = this_ireport->GroupSumReport->reference.maxcount; if (TimeDifference(sumstats->ts.packetTime, packet->packetTime) > 0) { sumstats->ts.packetTime = packet->packetTime; } - if (this_ireport->GroupSumReport->transfer_protocol_sum_handler && \ - (--this_ireport->GroupSumReport->threads_cntr_fsum == 0) && (this_ireport->GroupSumReport->reference.maxcount > 1)) { - (*this_ireport->GroupSumReport->transfer_protocol_sum_handler)(&this_ireport->GroupSumReport->info, 1); + if (this_ireport->GroupSumReport->transfer_protocol_sum_handler) { + Mutex_Lock(&this_ireport->GroupSumReport->reference.lock); + if ((++this_ireport->GroupSumReport->final_thread_upcount == this_ireport->GroupSumReport->reference.maxcount) && \ + ((this_ireport->GroupSumReport->reference.maxcount > 1) || isSumOnly(this_ireport->info.common))) { + (*this_ireport->GroupSumReport->transfer_protocol_sum_handler)(&this_ireport->GroupSumReport->info, true); + } + Mutex_Unlock(&this_ireport->GroupSumReport->reference.lock); } } } @@ -614,17 +632,20 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) { * can't use them anymore (except for the DATA REPORT); * */ -inline int reporter_process_report (struct ReportHeader *reporthdr) { +inline bool reporter_process_report (struct ReportHeader *reporthdr) { assert(reporthdr != NULL); - int done = 1; + bool done = true; switch (reporthdr->type) { case DATA_REPORT: done = reporter_process_transfer_report((struct ReporterData *)reporthdr->this_report); - fflush(stdout); if (done) { struct ReporterData *tmp = (struct ReporterData *)reporthdr->this_report; struct PacketRing *pr = tmp->packetring; - pr->consumerdone = 1; + pr->consumerdone = true; +# ifdef HAVE_THREAD_DEBUG + struct ReporterData *report = (struct ReporterData *) reporthdr->this_report; + thread_debug( "Reporter thread signal traffic thread %p %p", (void *)report, (void *) report->packetring->awake_producer); +#endif // Data Reports are special because the traffic thread needs to free them, just signal Condition_Signal(pr->awake_producer); } @@ -642,20 +663,23 @@ inline int reporter_process_report (struct ReportHeader *reporthdr) { } } reporter_print_connection_report(creport); - fflush(stdout); FreeReport(reporthdr); } break; case SETTINGS_REPORT: reporter_print_settings_report((struct ReportSettings *)reporthdr->this_report); - fflush(stdout); FreeReport(reporthdr); break; case SERVER_RELAY_REPORT: reporter_print_server_relay_report((struct ServerRelay *)reporthdr->this_report); - fflush(stdout); FreeReport(reporthdr); break; + case STRING_REPORT: + if (reporthdr->this_report) { + printf("%s\n", (char *)reporthdr->this_report); + free((char *)reporthdr->this_report); + } + break; default: fprintf(stderr,"Invalid report type in process report %p\n", reporthdr->this_report); assert(0); @@ -675,7 +699,7 @@ inline int reporter_process_report (struct ReportHeader *reporthdr) { // Reporter private routines void reporter_handle_packet_null (struct ReporterData *data, struct ReportStruct *packet) { } -void reporter_transfer_protocol_null (struct ReporterData *data, int final){ +void reporter_transfer_protocol_null (struct ReporterData *data, bool final){ } static inline void reporter_compute_packet_pps (struct TransferInfo *stats, struct ReportStruct *packet) { @@ -694,7 +718,7 @@ static void reporter_handle_packet_oneway_transit (struct TransferInfo *stats, s // Transit or latency updates done inline below double transit = TimeDifference(packet->packetTime, packet->sentTime); if (stats->latency_histogram) { - histogram_insert(stats->latency_histogram, transit, NULL); + histogram_insert(stats->latency_histogram, transit, &packet->packetTime); } double deltaTransit; deltaTransit = transit - stats->transit.current.last; @@ -756,7 +780,6 @@ static void reporter_handle_packet_oneway_transit (struct TransferInfo *stats, s } } - static void reporter_handle_isoch_oneway_transit_tcp (struct TransferInfo *stats, struct ReportStruct *packet) { // printf("fid=%lu bs=%lu remain=%lu\n", packet->frameID, packet->burstsize, packet->remaining); if (packet->frameID && packet->transit_ready) { @@ -908,12 +931,12 @@ static void reporter_handle_frame_isoch_oneway_transit (struct TransferInfo *sta void reporter_handle_packet_client (struct ReporterData *data, struct ReportStruct *packet) { struct TransferInfo *stats = &data->info; stats->ts.packetTime = packet->packetTime; - if (!packet->emptyreport) { + switch (packet->err_readwrite) { + case WriteSelectRetry : + stats->sock_callstats.write.WriteErr++; + stats->sock_callstats.write.totWriteErr++; + case WriteSuccess : stats->total.Bytes.current += packet->packetLen; - if (packet->errwrite && (packet->errwrite != WriteErrNoAccount)) { - stats->sock_callstats.write.WriteErr++; - stats->sock_callstats.write.totWriteErr++; - } // These are valid packets that need standard iperf accounting stats->sock_callstats.write.WriteCnt += packet->writecnt; stats->sock_callstats.write.totWriteCnt += packet->writecnt; @@ -929,7 +952,20 @@ void reporter_handle_packet_client (struct ReporterData *data, struct ReportStru histogram_insert(stats->write_histogram, (1e-6 * packet->write_time), NULL); } } + break; + case WriteTimeo: + stats->sock_callstats.write.WriteTimeo++; + stats->sock_callstats.write.totWriteTimeo++; + case WriteErrAccount : + stats->sock_callstats.write.WriteErr++; + stats->sock_callstats.write.totWriteErr++; + case WriteNoAccount: + case NullEvent: + break; + default : + fprintf(stderr, "Program error: invalid client packet->err_readwrite %d\n", packet->err_readwrite); } + if (isUDP(stats->common)) { stats->PacketID = packet->packetID; reporter_compute_packet_pps(stats, packet); @@ -945,25 +981,47 @@ void reporter_handle_packet_bb_client (struct ReporterData *data, struct ReportS if (!packet->emptyreport && (packet->packetLen > 0)) { stats->total.Bytes.current += packet->packetLen; double bbrtt = TimeDifference(packet->packetTime, packet->sentTime); - double bbowdto = TimeDifference(packet->sentTimeRX, packet->sentTime); - double bbowdfro = TimeDifference(packet->packetTime, packet->sentTimeTX); - double asym = bbowdfro - bbowdto; - stats->ts.prevpacketTime = packet->packetTime; -#if DEBUG_BB_TIMESTAMPS - fprintf(stderr, "BB Debug: ctx=%lx.%lx srx=%lx.%lx stx=%lx.%lx crx=%lx.%lx (hex)\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec); - fprintf(stderr, "BB Debug: ctx=%ld.%ld srx=%ld.%ld stx=%ld.%ld crx=%ld.%ld\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec); - fprintf(stderr, "BB RTT=%f OWDTo=%f OWDFro=%f Asym=%f\n", bbrtt, bbowdto, bbowdfro, asym); -#endif stats->iBBrunning += bbrtt; stats->fBBrunning += bbrtt; reporter_update_mmm(&stats->bbrtt.current, bbrtt); reporter_update_mmm(&stats->bbrtt.total, bbrtt); - reporter_update_mmm(&stats->bbowdto.total, bbowdto); - reporter_update_mmm(&stats->bbowdfro.total, bbowdfro); - reporter_update_mmm(&stats->bbasym.total, fabs(asym)); if (stats->bbrtt_histogram) { histogram_insert(stats->bbrtt_histogram, bbrtt, NULL); } + if (isTripTime(stats->common)) { + double bbowdto = TimeDifference(packet->sentTimeRX, packet->sentTime); + double bbowdfro = TimeDifference(packet->packetTime, packet->sentTimeTX); + double asym = bbowdfro - bbowdto; + double bbturnaround = TimeDifference(packet->sentTimeTX, packet->sentTimeRX); + double bbadj = TimeDifference(packet->packetTime, packet->sentTimeRX); + // If you measure RTT, you can detect when clock are unsync. + // If you have the sent-time, rcv-time and return-time, you can check that + // sent-time < rcv-time < return-time. As sent-time and return-time use + // the same clock, you can detect any drift bigger than RTT. JT + // + // Adjust this clock A write < clock B read < Clock A read - (clock B write - clock B read) + if ((bbowdto < 0) || ((bbadj - bbturnaround) < 0)) { + stats->bb_clocksync_error++; + } + reporter_update_mmm(&stats->bbowdto.total, bbowdto); + reporter_update_mmm(&stats->bbowdfro.total, bbowdfro); + reporter_update_mmm(&stats->bbasym.total, fabs(asym)); + reporter_update_mmm(&stats->bbowdto.current, bbowdto); + reporter_update_mmm(&stats->bbowdfro.current, bbowdfro); + reporter_update_mmm(&stats->bbasym.current, fabs(asym)); + if (stats->bbowdto_histogram) { + histogram_insert(stats->bbowdto_histogram, bbowdto, NULL); + } + if (stats->bbowdfro_histogram) { + histogram_insert(stats->bbowdfro_histogram, bbowdfro, NULL); + } + } + stats->ts.prevpacketTime = packet->packetTime; +#if DEBUG_BB_TIMESTAMPS + fprintf(stderr, "BB Debug: ctx=%lx.%lx srx=%lx.%lx stx=%lx.%lx crx=%lx.%lx (hex)\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec); + fprintf(stderr, "BB Debug: ctx=%ld.%ld srx=%ld.%ld stx=%ld.%ld crx=%ld.%ld\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec); + fprintf(stderr, "BB RTT=%f OWDTo=%f OWDFro=%f Asym=%f\n", bbrtt, bbowdto, bbowdfro, asym); +#endif } } @@ -981,8 +1039,7 @@ inline void reporter_handle_packet_server_tcp (struct ReporterData *data, struct int bin; stats->total.Bytes.current += packet->packetLen; // mean min max tests - stats->sock_callstats.read.cntRead++; - stats->sock_callstats.read.totcntRead++; + stats->sock_callstats.read.ReadCnt.current++; bin = (int)floor((packet->packetLen -1)/stats->sock_callstats.read.binsize); if (bin < TCPREADBINCOUNT) { stats->sock_callstats.read.bins[bin]++; @@ -1007,8 +1064,21 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct // Hence, set the per interval min to infinity // and the per interval max and sum to zero reporter_reset_mmm(&stats->transit.current); - } else if (packet->packetID > 0) { - stats->total.Bytes.current += packet->packetLen; + } else if (!packet->emptyreport && (packet->packetID > 0)) { + bool ooo_packet = false; + // packet loss occured if the datagram numbers aren't sequential + if (packet->packetID != stats->PacketID + 1) { + if (packet->packetID < stats->PacketID + 1) { + stats->total.OutofOrder.current++; + ooo_packet = true; + } else { + stats->total.Lost.current += packet->packetID - stats->PacketID - 1; + } + } + // never decrease datagramID (e.g. if we get an out-of-order packet) + if (packet->packetID > stats->PacketID) { + stats->PacketID = packet->packetID; + } // These are valid packets that need standard iperf accounting // Do L2 accounting first (if needed) if (packet->l2errors && (stats->total.Datagrams.current > L2DROPFILTERCOUNTER)) { @@ -1027,20 +1097,17 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct stats->l2counts.tot_udpcsumerr++; } } - // packet loss occured if the datagram numbers aren't sequential - if (packet->packetID != stats->PacketID + 1) { - if (packet->packetID < stats->PacketID + 1) { - stats->total.OutofOrder.current++; - } else { - stats->total.Lost.current += packet->packetID - stats->PacketID - 1; - } + if (packet->err_readwrite == ReadErrLen) { + stats->sock_callstats.read.ReadErrLenCnt.current++; } - // never decrease datagramID (e.g. if we get an out-of-order packet) - if (packet->packetID > stats->PacketID) { - stats->PacketID = packet->packetID; + if (!ooo_packet && \ + ((packet->err_readwrite == ReadSuccess) || + ((packet->err_readwrite == ReadErrLen) && (packet->packetLen >= sizeof(struct UDP_datagram))))) { + reporter_handle_packet_oneway_transit(stats, packet); } + stats->total.Bytes.current += packet->packetLen; reporter_compute_packet_pps(stats, packet); - reporter_handle_packet_oneway_transit(stats, packet); + if (packet->transit_ready) { if (isIsochronous(stats->common)) { reporter_handle_isoch_oneway_transit_udp(stats, packet); @@ -1049,9 +1116,14 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct } } } + if (packet->err_readwrite != ReadNoAccount) { + if (packet->emptyreport) { + stats->sock_callstats.read.ReadTimeoCnt.current++; + } else { + stats->sock_callstats.read.ReadCnt.current++; + } + } } - -// This is done in reporter thread context #if HAVE_TCP_STATS static inline void reporter_handle_packet_tcpistats (struct ReporterData *data, struct ReportStruct *packet) { assert(data!=NULL); @@ -1060,12 +1132,22 @@ static inline void reporter_handle_packet_tcpistats (struct ReporterData *data, stats->sock_callstats.write.tcpstats.retry_prev = packet->tcpstats.retry_tot; stats->sock_callstats.write.tcpstats.retry_tot = packet->tcpstats.retry_tot; stats->sock_callstats.write.tcpstats.cwnd = packet->tcpstats.cwnd; + stats->sock_callstats.write.tcpstats.cwnd_packets = packet->tcpstats.cwnd_packets; stats->sock_callstats.write.tcpstats.rtt = packet->tcpstats.rtt; stats->sock_callstats.write.tcpstats.rttvar = packet->tcpstats.rttvar; +#if HAVE_TCP_INFLIGHT + stats->sock_callstats.write.tcpstats.packets_in_flight = packet->tcpstats.packets_in_flight; + stats->sock_callstats.write.tcpstats.bytes_in_flight = packet->tcpstats.bytes_in_flight; +#else + stats->sock_callstats.write.tcpstats.bytes_in_flight = -1; + stats->sock_callstats.write.tcpstats.packets_in_flight = -1; +#endif +#if (HAVE_DECL_SO_MAX_PACING_RATE) + stats->FQPacingRateCurrent = packet->FQPacingRate; +#endif } #endif - /* * Report printing routines below */ @@ -1110,6 +1192,22 @@ static inline void reporter_set_timestamps_time (struct TransferInfo *stats, enu } } +#if HAVE_SUMMING_DEBUG +static void reporter_dump_timestamps (struct ReportStruct *packet, struct TransferInfo *stats, struct TransferInfo *sumstats) { + if (packet) + printf("**** %s pkt =%ld.%ld (up/down)=%d/%d\n", stats->common->transferIDStr, (long) packet->packetTime.tv_sec, \ + (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount); + else { + printf("**** %s pkt ts =%ld.%ld prev=%ld.%ld (up/down)=%d/%d\n", stats->common->transferIDStr, (long) stats->ts.packetTime.tv_sec, \ + (long) stats->ts.packetTime.tv_usec, (long) stats->ts.prevpacketTime.tv_sec, (long) stats->ts.prevpacketTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount); + } + printf("**** %s stats =%ld.%ld next=%ld.%ld prev=%ld.%ld\n", stats->common->transferIDStr, (long) stats->ts.packetTime.tv_sec, \ + (long) stats->ts.packetTime.tv_usec, (long) stats->ts.nextTime.tv_sec, (long) stats->ts.nextTime.tv_usec, (long) stats->ts.prevpacketTime.tv_sec, (long) stats->ts.prevpacketTime.tv_usec); + printf("**** %s sum stats=%ld.%ld next=%ld.%ld prev=%ld.%ld \n", stats->common->transferIDStr, (long) sumstats->ts.packetTime.tv_sec, \ + (long) sumstats->ts.packetTime.tv_usec, (long) sumstats->ts.nextTime.tv_sec, (long) sumstats->ts.nextTime.tv_usec, (long) sumstats->ts.prevTime.tv_sec, (long) sumstats->ts.prevTime.tv_usec); +} +#endif + // If reports were missed, catch up now static inline void reporter_transfer_protocol_missed_reports (struct TransferInfo *stats, struct ReportStruct *packet) { while (TimeDifference(packet->packetTime, stats->ts.nextTime) > TimeDouble(stats->ts.intervalTime)) { @@ -1125,10 +1223,24 @@ static inline void reporter_transfer_protocol_missed_reports (struct TransferInf } } +static inline void reporter_reset_transfer_stats_sum (struct TransferInfo *sumstats) { +#if HAVE_SUMMING_DEBUG + printf("***** [SUM] RESET %ld.%ld nxt %ld.%ld down=%d up=%d\n", (long) sumstats->ts.prevTime.tv_sec, (long) sumstats->ts.prevTime.tv_usec, \ + (long) sumstats->ts.nextTime.tv_sec, (long) sumstats->ts.nextTime.tv_usec, sumstats->slot_thread_downcount, sumstats->slot_thread_upcount); +#endif + sumstats->slot_thread_upcount -= sumstats->slot_thread_downcount; + sumstats->slot_thread_downcount = 0; + sumstats->ts.prevTime = sumstats->ts.nextTime; + sumstats->iInP = 0; + sumstats->uplevel = toggleLevel(sumstats->uplevel); + sumstats->downlevel = toggleLevel(sumstats->downlevel); +} + static inline void reporter_reset_transfer_stats_client_tcp (struct TransferInfo *stats) { stats->total.Bytes.prev = stats->total.Bytes.current; stats->sock_callstats.write.WriteCnt = 0; stats->sock_callstats.write.WriteErr = 0; + stats->sock_callstats.write.WriteTimeo = 0; stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current; stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current; stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current; @@ -1144,13 +1256,7 @@ static inline void reporter_reset_transfer_stats_client_tcp (struct TransferInfo reporter_reset_mmm(&stats->bbasym.current); } if (isTcpWriteTimes(stats->common)) { - stats->write_mmm.current.cnt = 0; - stats->write_mmm.current.min = FLT_MAX; - stats->write_mmm.current.max = FLT_MIN; - stats->write_mmm.current.sum = 0; - stats->write_mmm.current.vd = 0; - stats->write_mmm.current.mean = 0; - stats->write_mmm.current.m2 = 0; + reporter_reset_mmm(&stats->write_mmm.current); } } @@ -1164,6 +1270,7 @@ static inline void reporter_reset_transfer_stats_client_udp (struct TransferInfo stats->total.IPG.prev = stats->total.IPG.current; stats->sock_callstats.write.WriteCnt = 0; stats->sock_callstats.write.WriteErr = 0; + stats->sock_callstats.write.WriteTimeo = 0; stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current; stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current; stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current; @@ -1174,7 +1281,7 @@ static inline void reporter_reset_transfer_stats_client_udp (struct TransferInfo static inline void reporter_reset_transfer_stats_server_tcp (struct TransferInfo *stats) { int ix; stats->total.Bytes.prev = stats->total.Bytes.current; - stats->sock_callstats.read.cntRead = 0; + stats->sock_callstats.read.ReadCnt.prev = stats->sock_callstats.read.ReadCnt.current; for (ix = 0; ix < 8; ix++) { stats->sock_callstats.read.bins[ix] = 0; } @@ -1198,8 +1305,10 @@ static inline void reporter_reset_transfer_stats_server_udp (struct TransferInfo stats->l2counts.unknown = 0; stats->l2counts.udpcsumerr = 0; stats->l2counts.lengtherr = 0; - stats->threadcnt = 0; stats->iInP = 0; + stats->sock_callstats.read.ReadCnt.prev = stats->sock_callstats.read.ReadCnt.current; + stats->sock_callstats.read.ReadTimeoCnt.prev = stats->sock_callstats.read.ReadTimeoCnt.current; + stats->sock_callstats.read.ReadErrLenCnt.prev = stats->sock_callstats.read.ReadErrLenCnt.current; if (stats->cntDatagrams) stats->IPGsum = 0; } @@ -1209,7 +1318,7 @@ static inline void reporter_reset_transfer_stats_server_udp (struct TransferInfo // o) set the TransferInfo struct and then calls the individual report output handler // o) updates the sum and fullduplex reports // -void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_server_udp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL; struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL; @@ -1223,6 +1332,9 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final stats->cntError = 0; stats->cntDatagrams = stats->PacketID - stats->total.Datagrams.prev; stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev; + stats->sock_callstats.read.cntReadTimeo = stats->sock_callstats.read.ReadTimeoCnt.current - stats->sock_callstats.read.ReadTimeoCnt.prev; + stats->sock_callstats.read.cntReadErrLen = stats->sock_callstats.read.ReadErrLenCnt.current - stats->sock_callstats.read.ReadErrLenCnt.prev; if (stats->latency_histogram) { stats->latency_histogram->final = final; @@ -1258,8 +1370,24 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final sumstats->total.IPG.current += stats->cntIPG; if (sumstats->IPGsum < stats->IPGsum) sumstats->IPGsum = stats->IPGsum; - sumstats->threadcnt++; sumstats->iInP += stats->iInP; + sumstats->sock_callstats.read.cntRead += stats->sock_callstats.read.cntRead; + sumstats->sock_callstats.read.cntReadTimeo += stats->sock_callstats.read.cntReadTimeo; + sumstats->sock_callstats.read.cntReadErrLen += stats->sock_callstats.read.cntReadErrLen; + if (final) { + sumstats->threadcnt_final++; + if (data->packetring->downlevel != sumstats->downlevel) { + sumstats->slot_thread_downcount++; + } + if (data->packetring->uplevel != sumstats->uplevel){ + sumstats->slot_thread_upcount++; + } +#if HAVE_SUMMING_DEBUG + printf("**** %s downcnt (%p) (up/down)=%d/%d final true uplevel (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \ + sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, data->packetring->uplevel); +#endif + } } if (fullduplexstats) { fullduplexstats->total.Bytes.current += stats->cntBytes; @@ -1321,20 +1449,23 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final if (stats->latency_histogram) { if (sumstats && sumstats->latency_histogram) { histogram_add(sumstats->latency_histogram, stats->latency_histogram); - sumstats->latency_histogram->final = 1; + sumstats->latency_histogram->final = true; } - stats->latency_histogram->final = 1; + stats->latency_histogram->final = true; } if (stats->jitter_histogram) { if (sumstats && sumstats->jitter_histogram) { histogram_add(sumstats->jitter_histogram, stats->jitter_histogram); - sumstats->jitter_histogram->final = 1; + sumstats->jitter_histogram->final = true; } - stats->jitter_histogram->final = 1; + stats->jitter_histogram->final = true; } if (stats->framelatency_histogram) { - stats->framelatency_histogram->final = 1; + stats->framelatency_histogram->final = true; } + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current; + stats->sock_callstats.read.cntReadTimeo = stats->sock_callstats.read.ReadTimeoCnt.current; + stats->sock_callstats.read.cntReadErrLen = stats->sock_callstats.read.ReadErrLenCnt.current; } if ((stats->output_handler) && !(stats->isMaskOutput)) (*stats->output_handler)(stats); @@ -1342,7 +1473,7 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final reporter_reset_transfer_stats_server_udp(stats); } -void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, bool final) { if (final) { reporter_set_timestamps_time(stats, TOTAL); stats->cntOutofOrder = stats->total.OutofOrder.current; @@ -1368,15 +1499,16 @@ void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, int stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev; } - if ((stats->output_handler) && !(stats->isMaskOutput)) + if ((stats->output_handler) && !(stats->isMaskOutput)) { (*stats->output_handler)(stats); + } if (!final) { // there is no packet ID for sum server reports, set it to total cnt for calculation stats->PacketID = stats->total.Datagrams.current; reporter_reset_transfer_stats_server_udp(stats); } } -void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, bool final) { if (final) { reporter_set_timestamps_time(stats, TOTAL); stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr; @@ -1390,19 +1522,18 @@ void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, int stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev; stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev; } - if ((stats->output_handler) && !(stats->isMaskOutput)) + if ((stats->output_handler) && !(stats->isMaskOutput)) { (*stats->output_handler)(stats); - + } if (!final) { - stats->threadcnt = 0; reporter_reset_transfer_stats_client_udp(stats); } else if ((stats->common->ReportMode != kReport_CSV) && !(stats->isMaskOutput)) { - printf(report_sumcnt_datagrams, stats->threadcnt, stats->total.Datagrams.current); + printf(report_sumcnt_datagrams, stats->threadcnt_final, stats->total.Datagrams.current); fflush(stdout); } } -void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_client_udp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL; struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL; @@ -1418,13 +1549,28 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final sumstats->total.Bytes.current += stats->cntBytes; sumstats->sock_callstats.write.WriteErr += stats->sock_callstats.write.WriteErr; sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt; + sumstats->sock_callstats.write.WriteTimeo += stats->sock_callstats.write.WriteTimeo; sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr; sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt; + sumstats->sock_callstats.write.totWriteTimeo += stats->sock_callstats.write.WriteTimeo; sumstats->total.Datagrams.current += stats->cntDatagrams; if (sumstats->IPGsum < stats->IPGsum) sumstats->IPGsum = stats->IPGsum; sumstats->total.IPG.current += stats->cntIPG; - sumstats->threadcnt++; + if (final) { + sumstats->threadcnt_final++; + if (data->packetring->downlevel != sumstats->downlevel) { + sumstats->slot_thread_downcount++; + } + if (data->packetring->uplevel != sumstats->uplevel){ + sumstats->slot_thread_upcount++; + } +#if HAVE_SUMMING_DEBUG + printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \ + sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, data->packetring->uplevel); +#endif + } } if (fullduplexstats) { fullduplexstats->total.Bytes.current += stats->cntBytes; @@ -1438,6 +1584,7 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final stats->cntBytes = stats->total.Bytes.current; stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr; stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt; + stats->sock_callstats.write.WriteTimeo = stats->sock_callstats.write.totWriteTimeo; stats->cntIPG = stats->total.IPG.current; stats->cntDatagrams = stats->PacketID; stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime); @@ -1463,14 +1610,14 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final reporter_reset_transfer_stats_client_udp(stats); } -void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_server_tcp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL; struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL; stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; int ix; if (stats->framelatency_histogram) { - stats->framelatency_histogram->final = 0; + stats->framelatency_histogram->final = false; } double thisInP; if (!final) { @@ -1479,18 +1626,18 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final double meantransit = (double) ((stats->transit.current.cnt > 0) ? (stats->transit.current.sum / stats->transit.current.cnt) : 0.0); thisInP = lambda * meantransit; stats->iInP = thisInP; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev; } else { double bytecnt = (double) stats->cntBytes; double lambda = (stats->IPGsum > 0.0) ? (bytecnt / stats->IPGsum) : 0.0; double meantransit = (double) ((stats->transit.total.cnt > 0) ? (stats->transit.total.sum / stats->transit.total.cnt) : 0.0); thisInP = lambda * meantransit; stats->fInP = thisInP; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current; } if (sumstats) { - sumstats->threadcnt++; sumstats->total.Bytes.current += stats->cntBytes; - sumstats->sock_callstats.read.cntRead += stats->sock_callstats.read.cntRead; - sumstats->sock_callstats.read.totcntRead += stats->sock_callstats.read.cntRead; + sumstats->sock_callstats.read.ReadCnt.current += stats->sock_callstats.read.cntRead; for (ix = 0; ix < TCPREADBINCOUNT; ix++) { sumstats->sock_callstats.read.bins[ix] += stats->sock_callstats.read.bins[ix]; sumstats->sock_callstats.read.totbins[ix] += stats->sock_callstats.read.bins[ix]; @@ -1499,6 +1646,18 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final sumstats->iInP += thisInP; } else { sumstats->fInP += thisInP; + sumstats->threadcnt_final++; + if (data->packetring->downlevel != sumstats->downlevel) { + sumstats->slot_thread_downcount++; + } + if (data->packetring->uplevel != sumstats->uplevel){ + sumstats->slot_thread_upcount++; + } +#if HAVE_SUMMING_DEBUG + printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \ + sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, data->packetring->uplevel); +#endif } } if (fullduplexstats) { @@ -1520,12 +1679,12 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final } } if (stats->framelatency_histogram) { - stats->framelatency_histogram->final = 1; + stats->framelatency_histogram->final = true; } reporter_set_timestamps_time(stats, TOTAL); stats->cntBytes = stats->total.Bytes.current; stats->IPGsum = stats->ts.iEnd; - stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current; for (ix = 0; ix < TCPREADBINCOUNT; ix++) { stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix]; } @@ -1538,9 +1697,9 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final if (stats->framelatency_histogram) { if (sumstats && sumstats->framelatency_histogram) { histogram_add(sumstats->framelatency_histogram, stats->framelatency_histogram); - sumstats->framelatency_histogram->final = 1; + sumstats->framelatency_histogram->final = true; } - stats->framelatency_histogram->final = 1; + stats->framelatency_histogram->final = true; } } else if (isIsochronous(stats->common)) { stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev; @@ -1557,7 +1716,7 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final reporter_reset_transfer_stats_server_tcp(stats); } -void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_client_tcp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL; struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL; @@ -1585,7 +1744,24 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt; sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr; sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt; - sumstats->threadcnt++; + if (final) { + sumstats->threadcnt_final++; + if (data->packetring->downlevel != sumstats->downlevel) { + sumstats->slot_thread_downcount++; + } + if (data->packetring->uplevel != sumstats->uplevel){ + sumstats->slot_thread_upcount++; + } +#if HAVE_SUMMING_DEBUG + printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \ + sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, data->packetring->uplevel); +#endif + } +#if HAVE_SUMMING_DEBUG + reporter_dump_timestamps(NULL, stats, sumstats); +#endif + #if HAVE_TCP_STATS sumstats->sock_callstats.write.tcpstats.retry += stats->sock_callstats.write.tcpstats.retry; sumstats->sock_callstats.write.tcpstats.retry_tot += stats->sock_callstats.write.tcpstats.retry; @@ -1596,10 +1772,10 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final } if (final) { if (stats->latency_histogram) { - stats->latency_histogram->final = 1; + stats->latency_histogram->final = true; } if (stats->write_histogram) { - stats->write_histogram->final = 1; + stats->write_histogram->final = true; } if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) { // print a partial interval report if enable and this a final @@ -1626,7 +1802,7 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final stats->sock_callstats.write.tcpstats.retry = stats->sock_callstats.write.tcpstats.retry_tot; #endif if (stats->framelatency_histogram) { - stats->framelatency_histogram->final = 1; + stats->framelatency_histogram->final = true; } stats->cntBytes = stats->total.Bytes.current; stats->write_mmm.current = stats->write_mmm.total; @@ -1646,19 +1822,20 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final /* * Handles summing of threads */ -void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, bool final) { if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) { stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; if (final) { if ((stats->output_handler) && !(stats->isMaskOutput)) { reporter_set_timestamps_time(stats, FINALPARTIAL); - if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) + if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) { (*stats->output_handler)(stats); + } reporter_reset_transfer_stats_client_tcp(stats); } } else if ((stats->output_handler) && !(stats->isMaskOutput)) { (*stats->output_handler)(stats); - stats->threadcnt = 0; + reporter_reset_transfer_stats_sum(stats); } reporter_reset_transfer_stats_client_tcp(stats); } @@ -1675,7 +1852,7 @@ void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, int } } -void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; if (final) { @@ -1703,7 +1880,7 @@ void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, int fi } } -void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int final) { +void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, bool final) { struct TransferInfo *stats = &data->info; if (final) { if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) { @@ -1716,7 +1893,6 @@ void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int fi } } #if HAVE_TCP_STATS - stats->sock_callstats.write.tcpstats.retry = stats->sock_callstats.write.tcpstats.retry_tot; #endif stats->cntBytes = stats->total.Bytes.current; @@ -1730,26 +1906,27 @@ void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int fi reporter_reset_transfer_stats_client_tcp(stats); } -void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, bool final) { if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) { stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev; if (final) { if ((stats->output_handler) && !(stats->isMaskOutput)) { reporter_set_timestamps_time(stats, FINALPARTIAL); - if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) + if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) { (*stats->output_handler)(stats); + } } } else if ((stats->output_handler) && !(stats->isMaskOutput)) { (*stats->output_handler)(stats); - stats->threadcnt = 0; - stats->iInP = 0; + reporter_reset_transfer_stats_sum(stats); } reporter_reset_transfer_stats_server_tcp(stats); } if (final) { int ix; stats->cntBytes = stats->total.Bytes.current; - stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current; for (ix = 0; ix < TCPREADBINCOUNT; ix++) { stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix]; } @@ -1759,7 +1936,7 @@ void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, int (*stats->output_handler)(stats); } } -void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, bool final) { if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) { stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; if (final) { @@ -1781,7 +1958,7 @@ void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, int (*stats->output_handler)(stats); } -void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, int final) { +void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, bool final) { if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) { stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev; @@ -1814,11 +1991,11 @@ void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, int } // Conditional print based on time -int reporter_condprint_time_interval_report (struct ReporterData *data, struct ReportStruct *packet) { +bool reporter_condprint_time_interval_report (struct ReporterData *data, struct ReportStruct *packet) { struct TransferInfo *stats = &data->info; assert(stats!=NULL); // printf("***sum handler = %p\n", (void *) data->GroupSumReport->transfer_protocol_sum_handler); - int advance_jobq = 0; + bool advance_jobq = false; // Print a report if packet time exceeds the next report interval time, // Also signal to the caller to move to the next report (or packet ring) // if there was output. This will allow for more precise interval sum accounting. @@ -1826,7 +2003,7 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R // printf("***** nt %ld.%ld pt %ld.%ld pid=%lld empty=%d\n", stats->ts.nextTime.tv_sec, stats->ts.nextTime.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec, packet->packetID, packet->emptyreport); if (TimeDifference(stats->ts.nextTime, packet->packetTime) < 0) { assert(data->transfer_protocol_handler!=NULL); - advance_jobq = 1; + advance_jobq = true; struct TransferInfo *sumstats = (data->GroupSumReport ? &data->GroupSumReport->info : NULL); struct TransferInfo *fullduplexstats = (data->FullDuplexReport ? &data->FullDuplexReport->info : NULL); stats->ts.packetTime = packet->packetTime; @@ -1838,10 +2015,19 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R if (fullduplexstats && ((++data->FullDuplexReport->threads) == 2) && isEnhanced(stats->common)) { data->FullDuplexReport->threads = 0; assert(data->FullDuplexReport->transfer_protocol_sum_handler != NULL); - (*data->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 0); + (*data->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, false); } if (sumstats) { - if ((++data->GroupSumReport->threads) == data->GroupSumReport->reference.count) { + if (data->packetring->downlevel != sumstats->downlevel) { + sumstats->slot_thread_downcount++; + data->packetring->downlevel = toggleLevel(data->packetring->downlevel); +#if HAVE_SUMMING_DEBUG + printf("**** %s downcnt (%p) pkt=%ld.%ld (up/down)=%d/%d final false level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \ + (long) packet->packetTime.tv_sec, (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \ + sumstats->uplevel, data->packetring->uplevel); +#endif + } + if ((sumstats->slot_thread_downcount) == sumstats->slot_thread_upcount) { data->GroupSumReport->threads = 0; if ((data->GroupSumReport->reference.count > (fullduplexstats ? 2 : 1)) || \ isSumOnly(data->info.common)) { @@ -1849,9 +2035,12 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R } else { sumstats->isMaskOutput = true; } +#if HAVE_SUMMING_DEBUG + reporter_dump_timestamps(packet, stats, sumstats); +#endif reporter_set_timestamps_time(sumstats, INTERVAL); assert(data->GroupSumReport->transfer_protocol_sum_handler != NULL); - (*data->GroupSumReport->transfer_protocol_sum_handler)(sumstats, 0); + (*data->GroupSumReport->transfer_protocol_sum_handler)(sumstats, false); } } // In the (hopefully unlikely event) the reporter fell behind @@ -1863,9 +2052,9 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R } // Conditional print based on bursts or frames -int reporter_condprint_frame_interval_report_server_udp (struct ReporterData *data, struct ReportStruct *packet) { +bool reporter_condprint_frame_interval_report_server_udp (struct ReporterData *data, struct ReportStruct *packet) { struct TransferInfo *stats = &data->info; - int advance_jobq = 0; + bool advance_jobq = false; // first packet of a burst and not a duplicate if ((packet->burstsize == (packet->remaining + packet->packetLen)) && (stats->matchframeID != packet->frameID)) { stats->matchframeID=packet->frameID; @@ -1887,35 +2076,36 @@ int reporter_condprint_frame_interval_report_server_udp (struct ReporterData *da if ((stats->output_handler) && !(stats->isMaskOutput)) (*stats->output_handler)(stats); reporter_reset_transfer_stats_server_udp(stats); - advance_jobq = 1; + advance_jobq = true; } return advance_jobq; } -int reporter_condprint_frame_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) { +bool reporter_condprint_frame_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) { fprintf(stderr, "FIX ME\n"); - return 1; + return true; } -int reporter_condprint_burst_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) { +bool reporter_condprint_burst_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) { struct TransferInfo *stats = &data->info; - int advance_jobq = 0; + int advance_jobq = false; if (packet->transit_ready) { stats->ts.prevpacketTime = packet->sentTime; stats->ts.packetTime = packet->packetTime; reporter_set_timestamps_time(stats, INTERVALPARTIAL); stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev; + stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev; if ((stats->output_handler) && !(stats->isMaskOutput)) (*stats->output_handler)(stats); reporter_reset_transfer_stats_server_tcp(stats); - advance_jobq = 1; + advance_jobq = true; } return advance_jobq; } -int reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *data, struct ReportStruct *packet) { +bool reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *data, struct ReportStruct *packet) { struct TransferInfo *stats = &data->info; - int advance_jobq = 0; + int advance_jobq = false; // first packet of a burst and not a duplicate if (packet->transit_ready) { reporter_handle_packet_oneway_transit(stats, packet); @@ -1927,7 +2117,7 @@ int reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *da if ((stats->output_handler) && !(stats->isMaskOutput)) (*stats->output_handler)(stats); reporter_reset_transfer_stats_client_tcp(stats); - advance_jobq = 1; + advance_jobq = true; } return advance_jobq; } |