summaryrefslogtreecommitdiffstats
path: root/src/Reporter.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/Reporter.c')
-rw-r--r--src/Reporter.c466
1 files changed, 328 insertions, 138 deletions
diff --git a/src/Reporter.c b/src/Reporter.c
index bbd6fd5..e200a80 100644
--- a/src/Reporter.c
+++ b/src/Reporter.c
@@ -82,12 +82,12 @@ static void reporter_reset_transfer_stats_client_tcp(struct TransferInfo *stats)
static void reporter_reset_transfer_stats_client_udp(struct TransferInfo *stats);
static void reporter_reset_transfer_stats_server_udp(struct TransferInfo *stats);
static void reporter_reset_transfer_stats_server_tcp(struct TransferInfo *stats);
+static void reporter_reset_transfer_stats_sum(struct TransferInfo *stats);
// code for welfornd's algorithm to produce running mean/min/max/var
static void reporter_update_mmm (struct MeanMinMaxStats *stats, double value);
static void reporter_reset_mmm (struct MeanMinMaxStats *stats);
-
// one way delay (OWD) calculations
static void reporter_handle_packet_oneway_transit(struct TransferInfo *stats, struct ReportStruct *packet);
static void reporter_handle_isoch_oneway_transit_tcp(struct TransferInfo *stats, struct ReportStruct *packet);
@@ -160,8 +160,10 @@ bool ReportPacket (struct ReporterData* data, struct ReportStruct *packet) {
if (!TimeZero(stats->ts.nextTCPSampleTime) && (TimeDifference(stats->ts.nextTCPSampleTime, packet->packetTime) < 0)) {
gettcpinfo(data->info.common->socket, &packet->tcpstats);
TimeAdd(stats->ts.nextTCPSampleTime, stats->ts.intervalTime);
+ rc = true;
} else {
gettcpinfo(data->info.common->socket, &packet->tcpstats);
+ rc = true;
}
}
#endif
@@ -194,14 +196,14 @@ bool ReportPacket (struct ReporterData* data, struct ReportStruct *packet) {
* thread to print a final report and to remove the data report from its jobq.
* It also handles the freeing reports and other closing actions
*/
-int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) {
+bool EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) {
assert(reporthdr!=NULL);
assert(finalpacket!=NULL);
struct ReporterData *report = (struct ReporterData *) reporthdr->this_report;
struct ReportStruct packet;
memset(&packet, 0, sizeof(struct ReportStruct));
- int do_close = 1;
+ bool do_close = true;
/*
* Using PacketID of -1 ends reporting
* It pushes a "special packet" through
@@ -217,24 +219,25 @@ int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) {
}
#endif
// clear the reporter done predicate
- report->packetring->consumerdone = 0;
+ report->packetring->consumerdone = false;
// the negative packetID is used to inform the report thread this traffic thread is done
packet.packetID = -1;
packet.packetLen = finalpacket->packetLen;
packet.packetTime = finalpacket->packetTime;
+ packet.err_readwrite = NullEvent; // this is not a real event
if (isSingleUDP(report->info.common)) {
packetring_enqueue(report->packetring, &packet);
reporter_process_transfer_report(report);
} else {
ReportPacket(report, &packet);
-#ifdef HAVE_THREAD_DEBUG
- thread_debug( "Traffic thread awaiting reporter to be done with %p and cond %p", (void *)report, (void *) report->packetring->awake_producer);
-#endif
Condition_Lock((*(report->packetring->awake_producer)));
while (!report->packetring->consumerdone) {
// This wait time is the lag between the reporter thread
// and the traffic thread, a reporter thread with lots of
// reports (e.g. fastsampling) can lag per the i/o
+#ifdef HAVE_THREAD_DEBUG
+ thread_debug( "Traffic thread awaiting reporter to be done with %p and cond %p", (void *)report, (void *) report->packetring->awake_producer);
+#endif
Condition_TimedWait(report->packetring->awake_producer, 1);
// printf("Consumer done may be stuck\n");
}
@@ -249,7 +252,7 @@ int EndJob (struct ReportHeader *reporthdr, struct ReportStruct *finalpacket) {
#endif
FreeSumReport(report->FullDuplexReport);
} else {
- do_close = 0;
+ do_close = false;
}
}
return do_close;
@@ -422,7 +425,7 @@ void reporter_spawn (struct thread_Settings *thread) {
#ifdef HAVE_THREAD_DEBUG
thread_debug( "Reporter thread started");
#endif
- if (isEnhanced(thread)) {
+ if (isEnhanced(thread) && (thread->mThreadMode == kMode_Client)) {
myConnectionReport = InitConnectOnlyReport(thread);
}
/*
@@ -480,8 +483,9 @@ void reporter_spawn (struct thread_Settings *thread) {
// Report process report returns true
// when a report needs to be removed
// from the jobq. Also, work item might
- // be removed as part of processing
- // Store a cached pointer for the linked list maitenance
+ // be free as part of its processing
+ // Store a cached pointer tmp
+ // for the next work item
struct ReportHeader *tmp = (*work_item)->next;
if (reporter_process_report(*work_item)) {
#ifdef HAVE_THREAD_DEBUG
@@ -510,11 +514,11 @@ void reporter_spawn (struct thread_Settings *thread) {
}
// The Transfer or Data report is by far the most complicated report
-int reporter_process_transfer_report (struct ReporterData *this_ireport) {
+bool reporter_process_transfer_report (struct ReporterData *this_ireport) {
assert(this_ireport != NULL);
struct TransferInfo *sumstats = (this_ireport->GroupSumReport ? &this_ireport->GroupSumReport->info : NULL);
struct TransferInfo *fullduplexstats = (this_ireport->FullDuplexReport ? &this_ireport->FullDuplexReport->info : NULL);
- int need_free = 0;
+ bool need_free = false;
// The consumption detector applies delay to the reporter
// thread when its consumption rate is too low. This allows
// the traffic threads to send aggregates vs thrash
@@ -530,7 +534,7 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
apply_consumption_detector();
// If there are more packets to process then handle them
struct ReportStruct *packet = NULL;
- int advance_jobq = 0;
+ bool advance_jobq = false;
while (!advance_jobq && (packet = packetring_dequeue(this_ireport->packetring))) {
// Increment the total packet count processed by this thread
// this will be used to make decisions on if the reporter
@@ -543,9 +547,21 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
reporter_handle_packet_tcpistats(this_ireport, packet);
}
#endif
+ if (this_ireport->transfer_interval_handler) {
+ if (sumstats && (this_ireport->packetring->uplevel != sumstats->uplevel) \
+ && (TimeDifference(sumstats->ts.nextTime, packet->packetTime) > 0)) {
+ sumstats->slot_thread_upcount++;
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s upcnt (%p) pkt=%ld.%ld (up/down)=%d/%d uplevel (sum/pkt)=%d/%d\n", this_ireport->info.common->transferIDStr, (void *)this_ireport->packetring, \
+ (long) packet->packetTime.tv_sec, (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, this_ireport->packetring->uplevel);
+#endif
+ this_ireport->packetring->uplevel = toggleLevel(this_ireport->packetring->uplevel);
+ }
+ }
if (!(packet->packetID < 0)) {
// Check to output any interval reports,
- // bursts need to report the packet first
+ // bursts need to report the packet first
if (this_ireport->packet_handler_pre_report) {
(*this_ireport->packet_handler_pre_report)(this_ireport, packet);
}
@@ -564,8 +580,8 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
if (fullduplexstats)
fullduplexstats->ts.packetTime = packet->packetTime;
} else {
- need_free = 1;
- advance_jobq = 1;
+ need_free = true;
+ advance_jobq = true;
// A last packet event was detected
// printf("last packet event detected\n"); fflush(stdout);
this_ireport->reporter_thread_suspends = consumption_detector.reporter_thread_suspends;
@@ -577,7 +593,7 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
}
this_ireport->info.ts.packetTime = packet->packetTime;
assert(this_ireport->transfer_protocol_handler != NULL);
- (*this_ireport->transfer_protocol_handler)(this_ireport, 1);
+ (*this_ireport->transfer_protocol_handler)(this_ireport, true);
// This is a final report so set the sum report header's packet time
// Note, the thread with the max value will set this
if (fullduplexstats && isEnhanced(this_ireport->info.common)) {
@@ -587,20 +603,22 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
}
if (DecrSumReportRefCounter(this_ireport->FullDuplexReport) == 0) {
if (this_ireport->FullDuplexReport->transfer_protocol_sum_handler) {
- (*this_ireport->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 1);
+ (*this_ireport->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, true);
}
// FullDuplex report gets freed by a traffic thread (per its barrier)
}
}
if (sumstats) {
- if (!this_ireport->GroupSumReport->threads_cntr_fsum)
- this_ireport->GroupSumReport->threads_cntr_fsum = this_ireport->GroupSumReport->reference.maxcount;
if (TimeDifference(sumstats->ts.packetTime, packet->packetTime) > 0) {
sumstats->ts.packetTime = packet->packetTime;
}
- if (this_ireport->GroupSumReport->transfer_protocol_sum_handler && \
- (--this_ireport->GroupSumReport->threads_cntr_fsum == 0) && (this_ireport->GroupSumReport->reference.maxcount > 1)) {
- (*this_ireport->GroupSumReport->transfer_protocol_sum_handler)(&this_ireport->GroupSumReport->info, 1);
+ if (this_ireport->GroupSumReport->transfer_protocol_sum_handler) {
+ Mutex_Lock(&this_ireport->GroupSumReport->reference.lock);
+ if ((++this_ireport->GroupSumReport->final_thread_upcount == this_ireport->GroupSumReport->reference.maxcount) && \
+ ((this_ireport->GroupSumReport->reference.maxcount > 1) || isSumOnly(this_ireport->info.common))) {
+ (*this_ireport->GroupSumReport->transfer_protocol_sum_handler)(&this_ireport->GroupSumReport->info, true);
+ }
+ Mutex_Unlock(&this_ireport->GroupSumReport->reference.lock);
}
}
}
@@ -614,17 +632,20 @@ int reporter_process_transfer_report (struct ReporterData *this_ireport) {
* can't use them anymore (except for the DATA REPORT);
*
*/
-inline int reporter_process_report (struct ReportHeader *reporthdr) {
+inline bool reporter_process_report (struct ReportHeader *reporthdr) {
assert(reporthdr != NULL);
- int done = 1;
+ bool done = true;
switch (reporthdr->type) {
case DATA_REPORT:
done = reporter_process_transfer_report((struct ReporterData *)reporthdr->this_report);
- fflush(stdout);
if (done) {
struct ReporterData *tmp = (struct ReporterData *)reporthdr->this_report;
struct PacketRing *pr = tmp->packetring;
- pr->consumerdone = 1;
+ pr->consumerdone = true;
+# ifdef HAVE_THREAD_DEBUG
+ struct ReporterData *report = (struct ReporterData *) reporthdr->this_report;
+ thread_debug( "Reporter thread signal traffic thread %p %p", (void *)report, (void *) report->packetring->awake_producer);
+#endif
// Data Reports are special because the traffic thread needs to free them, just signal
Condition_Signal(pr->awake_producer);
}
@@ -642,20 +663,23 @@ inline int reporter_process_report (struct ReportHeader *reporthdr) {
}
}
reporter_print_connection_report(creport);
- fflush(stdout);
FreeReport(reporthdr);
}
break;
case SETTINGS_REPORT:
reporter_print_settings_report((struct ReportSettings *)reporthdr->this_report);
- fflush(stdout);
FreeReport(reporthdr);
break;
case SERVER_RELAY_REPORT:
reporter_print_server_relay_report((struct ServerRelay *)reporthdr->this_report);
- fflush(stdout);
FreeReport(reporthdr);
break;
+ case STRING_REPORT:
+ if (reporthdr->this_report) {
+ printf("%s\n", (char *)reporthdr->this_report);
+ free((char *)reporthdr->this_report);
+ }
+ break;
default:
fprintf(stderr,"Invalid report type in process report %p\n", reporthdr->this_report);
assert(0);
@@ -675,7 +699,7 @@ inline int reporter_process_report (struct ReportHeader *reporthdr) {
// Reporter private routines
void reporter_handle_packet_null (struct ReporterData *data, struct ReportStruct *packet) {
}
-void reporter_transfer_protocol_null (struct ReporterData *data, int final){
+void reporter_transfer_protocol_null (struct ReporterData *data, bool final){
}
static inline void reporter_compute_packet_pps (struct TransferInfo *stats, struct ReportStruct *packet) {
@@ -694,7 +718,7 @@ static void reporter_handle_packet_oneway_transit (struct TransferInfo *stats, s
// Transit or latency updates done inline below
double transit = TimeDifference(packet->packetTime, packet->sentTime);
if (stats->latency_histogram) {
- histogram_insert(stats->latency_histogram, transit, NULL);
+ histogram_insert(stats->latency_histogram, transit, &packet->packetTime);
}
double deltaTransit;
deltaTransit = transit - stats->transit.current.last;
@@ -756,7 +780,6 @@ static void reporter_handle_packet_oneway_transit (struct TransferInfo *stats, s
}
}
-
static void reporter_handle_isoch_oneway_transit_tcp (struct TransferInfo *stats, struct ReportStruct *packet) {
// printf("fid=%lu bs=%lu remain=%lu\n", packet->frameID, packet->burstsize, packet->remaining);
if (packet->frameID && packet->transit_ready) {
@@ -908,12 +931,12 @@ static void reporter_handle_frame_isoch_oneway_transit (struct TransferInfo *sta
void reporter_handle_packet_client (struct ReporterData *data, struct ReportStruct *packet) {
struct TransferInfo *stats = &data->info;
stats->ts.packetTime = packet->packetTime;
- if (!packet->emptyreport) {
+ switch (packet->err_readwrite) {
+ case WriteSelectRetry :
+ stats->sock_callstats.write.WriteErr++;
+ stats->sock_callstats.write.totWriteErr++;
+ case WriteSuccess :
stats->total.Bytes.current += packet->packetLen;
- if (packet->errwrite && (packet->errwrite != WriteErrNoAccount)) {
- stats->sock_callstats.write.WriteErr++;
- stats->sock_callstats.write.totWriteErr++;
- }
// These are valid packets that need standard iperf accounting
stats->sock_callstats.write.WriteCnt += packet->writecnt;
stats->sock_callstats.write.totWriteCnt += packet->writecnt;
@@ -929,7 +952,20 @@ void reporter_handle_packet_client (struct ReporterData *data, struct ReportStru
histogram_insert(stats->write_histogram, (1e-6 * packet->write_time), NULL);
}
}
+ break;
+ case WriteTimeo:
+ stats->sock_callstats.write.WriteTimeo++;
+ stats->sock_callstats.write.totWriteTimeo++;
+ case WriteErrAccount :
+ stats->sock_callstats.write.WriteErr++;
+ stats->sock_callstats.write.totWriteErr++;
+ case WriteNoAccount:
+ case NullEvent:
+ break;
+ default :
+ fprintf(stderr, "Program error: invalid client packet->err_readwrite %d\n", packet->err_readwrite);
}
+
if (isUDP(stats->common)) {
stats->PacketID = packet->packetID;
reporter_compute_packet_pps(stats, packet);
@@ -945,25 +981,47 @@ void reporter_handle_packet_bb_client (struct ReporterData *data, struct ReportS
if (!packet->emptyreport && (packet->packetLen > 0)) {
stats->total.Bytes.current += packet->packetLen;
double bbrtt = TimeDifference(packet->packetTime, packet->sentTime);
- double bbowdto = TimeDifference(packet->sentTimeRX, packet->sentTime);
- double bbowdfro = TimeDifference(packet->packetTime, packet->sentTimeTX);
- double asym = bbowdfro - bbowdto;
- stats->ts.prevpacketTime = packet->packetTime;
-#if DEBUG_BB_TIMESTAMPS
- fprintf(stderr, "BB Debug: ctx=%lx.%lx srx=%lx.%lx stx=%lx.%lx crx=%lx.%lx (hex)\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec);
- fprintf(stderr, "BB Debug: ctx=%ld.%ld srx=%ld.%ld stx=%ld.%ld crx=%ld.%ld\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec);
- fprintf(stderr, "BB RTT=%f OWDTo=%f OWDFro=%f Asym=%f\n", bbrtt, bbowdto, bbowdfro, asym);
-#endif
stats->iBBrunning += bbrtt;
stats->fBBrunning += bbrtt;
reporter_update_mmm(&stats->bbrtt.current, bbrtt);
reporter_update_mmm(&stats->bbrtt.total, bbrtt);
- reporter_update_mmm(&stats->bbowdto.total, bbowdto);
- reporter_update_mmm(&stats->bbowdfro.total, bbowdfro);
- reporter_update_mmm(&stats->bbasym.total, fabs(asym));
if (stats->bbrtt_histogram) {
histogram_insert(stats->bbrtt_histogram, bbrtt, NULL);
}
+ if (isTripTime(stats->common)) {
+ double bbowdto = TimeDifference(packet->sentTimeRX, packet->sentTime);
+ double bbowdfro = TimeDifference(packet->packetTime, packet->sentTimeTX);
+ double asym = bbowdfro - bbowdto;
+ double bbturnaround = TimeDifference(packet->sentTimeTX, packet->sentTimeRX);
+ double bbadj = TimeDifference(packet->packetTime, packet->sentTimeRX);
+ // If you measure RTT, you can detect when clock are unsync.
+ // If you have the sent-time, rcv-time and return-time, you can check that
+ // sent-time < rcv-time < return-time. As sent-time and return-time use
+ // the same clock, you can detect any drift bigger than RTT. JT
+ //
+ // Adjust this clock A write < clock B read < Clock A read - (clock B write - clock B read)
+ if ((bbowdto < 0) || ((bbadj - bbturnaround) < 0)) {
+ stats->bb_clocksync_error++;
+ }
+ reporter_update_mmm(&stats->bbowdto.total, bbowdto);
+ reporter_update_mmm(&stats->bbowdfro.total, bbowdfro);
+ reporter_update_mmm(&stats->bbasym.total, fabs(asym));
+ reporter_update_mmm(&stats->bbowdto.current, bbowdto);
+ reporter_update_mmm(&stats->bbowdfro.current, bbowdfro);
+ reporter_update_mmm(&stats->bbasym.current, fabs(asym));
+ if (stats->bbowdto_histogram) {
+ histogram_insert(stats->bbowdto_histogram, bbowdto, NULL);
+ }
+ if (stats->bbowdfro_histogram) {
+ histogram_insert(stats->bbowdfro_histogram, bbowdfro, NULL);
+ }
+ }
+ stats->ts.prevpacketTime = packet->packetTime;
+#if DEBUG_BB_TIMESTAMPS
+ fprintf(stderr, "BB Debug: ctx=%lx.%lx srx=%lx.%lx stx=%lx.%lx crx=%lx.%lx (hex)\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec);
+ fprintf(stderr, "BB Debug: ctx=%ld.%ld srx=%ld.%ld stx=%ld.%ld crx=%ld.%ld\n", packet->sentTime.tv_sec, packet->sentTime.tv_usec, packet->sentTimeRX.tv_sec, packet->sentTimeRX.tv_usec, packet->sentTimeTX.tv_sec, packet->sentTimeTX.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec);
+ fprintf(stderr, "BB RTT=%f OWDTo=%f OWDFro=%f Asym=%f\n", bbrtt, bbowdto, bbowdfro, asym);
+#endif
}
}
@@ -981,8 +1039,7 @@ inline void reporter_handle_packet_server_tcp (struct ReporterData *data, struct
int bin;
stats->total.Bytes.current += packet->packetLen;
// mean min max tests
- stats->sock_callstats.read.cntRead++;
- stats->sock_callstats.read.totcntRead++;
+ stats->sock_callstats.read.ReadCnt.current++;
bin = (int)floor((packet->packetLen -1)/stats->sock_callstats.read.binsize);
if (bin < TCPREADBINCOUNT) {
stats->sock_callstats.read.bins[bin]++;
@@ -1007,8 +1064,21 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct
// Hence, set the per interval min to infinity
// and the per interval max and sum to zero
reporter_reset_mmm(&stats->transit.current);
- } else if (packet->packetID > 0) {
- stats->total.Bytes.current += packet->packetLen;
+ } else if (!packet->emptyreport && (packet->packetID > 0)) {
+ bool ooo_packet = false;
+ // packet loss occured if the datagram numbers aren't sequential
+ if (packet->packetID != stats->PacketID + 1) {
+ if (packet->packetID < stats->PacketID + 1) {
+ stats->total.OutofOrder.current++;
+ ooo_packet = true;
+ } else {
+ stats->total.Lost.current += packet->packetID - stats->PacketID - 1;
+ }
+ }
+ // never decrease datagramID (e.g. if we get an out-of-order packet)
+ if (packet->packetID > stats->PacketID) {
+ stats->PacketID = packet->packetID;
+ }
// These are valid packets that need standard iperf accounting
// Do L2 accounting first (if needed)
if (packet->l2errors && (stats->total.Datagrams.current > L2DROPFILTERCOUNTER)) {
@@ -1027,20 +1097,17 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct
stats->l2counts.tot_udpcsumerr++;
}
}
- // packet loss occured if the datagram numbers aren't sequential
- if (packet->packetID != stats->PacketID + 1) {
- if (packet->packetID < stats->PacketID + 1) {
- stats->total.OutofOrder.current++;
- } else {
- stats->total.Lost.current += packet->packetID - stats->PacketID - 1;
- }
+ if (packet->err_readwrite == ReadErrLen) {
+ stats->sock_callstats.read.ReadErrLenCnt.current++;
}
- // never decrease datagramID (e.g. if we get an out-of-order packet)
- if (packet->packetID > stats->PacketID) {
- stats->PacketID = packet->packetID;
+ if (!ooo_packet && \
+ ((packet->err_readwrite == ReadSuccess) ||
+ ((packet->err_readwrite == ReadErrLen) && (packet->packetLen >= sizeof(struct UDP_datagram))))) {
+ reporter_handle_packet_oneway_transit(stats, packet);
}
+ stats->total.Bytes.current += packet->packetLen;
reporter_compute_packet_pps(stats, packet);
- reporter_handle_packet_oneway_transit(stats, packet);
+
if (packet->transit_ready) {
if (isIsochronous(stats->common)) {
reporter_handle_isoch_oneway_transit_udp(stats, packet);
@@ -1049,9 +1116,14 @@ inline void reporter_handle_packet_server_udp (struct ReporterData *data, struct
}
}
}
+ if (packet->err_readwrite != ReadNoAccount) {
+ if (packet->emptyreport) {
+ stats->sock_callstats.read.ReadTimeoCnt.current++;
+ } else {
+ stats->sock_callstats.read.ReadCnt.current++;
+ }
+ }
}
-
-// This is done in reporter thread context
#if HAVE_TCP_STATS
static inline void reporter_handle_packet_tcpistats (struct ReporterData *data, struct ReportStruct *packet) {
assert(data!=NULL);
@@ -1060,12 +1132,22 @@ static inline void reporter_handle_packet_tcpistats (struct ReporterData *data,
stats->sock_callstats.write.tcpstats.retry_prev = packet->tcpstats.retry_tot;
stats->sock_callstats.write.tcpstats.retry_tot = packet->tcpstats.retry_tot;
stats->sock_callstats.write.tcpstats.cwnd = packet->tcpstats.cwnd;
+ stats->sock_callstats.write.tcpstats.cwnd_packets = packet->tcpstats.cwnd_packets;
stats->sock_callstats.write.tcpstats.rtt = packet->tcpstats.rtt;
stats->sock_callstats.write.tcpstats.rttvar = packet->tcpstats.rttvar;
+#if HAVE_TCP_INFLIGHT
+ stats->sock_callstats.write.tcpstats.packets_in_flight = packet->tcpstats.packets_in_flight;
+ stats->sock_callstats.write.tcpstats.bytes_in_flight = packet->tcpstats.bytes_in_flight;
+#else
+ stats->sock_callstats.write.tcpstats.bytes_in_flight = -1;
+ stats->sock_callstats.write.tcpstats.packets_in_flight = -1;
+#endif
+#if (HAVE_DECL_SO_MAX_PACING_RATE)
+ stats->FQPacingRateCurrent = packet->FQPacingRate;
+#endif
}
#endif
-
/*
* Report printing routines below
*/
@@ -1110,6 +1192,22 @@ static inline void reporter_set_timestamps_time (struct TransferInfo *stats, enu
}
}
+#if HAVE_SUMMING_DEBUG
+static void reporter_dump_timestamps (struct ReportStruct *packet, struct TransferInfo *stats, struct TransferInfo *sumstats) {
+ if (packet)
+ printf("**** %s pkt =%ld.%ld (up/down)=%d/%d\n", stats->common->transferIDStr, (long) packet->packetTime.tv_sec, \
+ (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount);
+ else {
+ printf("**** %s pkt ts =%ld.%ld prev=%ld.%ld (up/down)=%d/%d\n", stats->common->transferIDStr, (long) stats->ts.packetTime.tv_sec, \
+ (long) stats->ts.packetTime.tv_usec, (long) stats->ts.prevpacketTime.tv_sec, (long) stats->ts.prevpacketTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount);
+ }
+ printf("**** %s stats =%ld.%ld next=%ld.%ld prev=%ld.%ld\n", stats->common->transferIDStr, (long) stats->ts.packetTime.tv_sec, \
+ (long) stats->ts.packetTime.tv_usec, (long) stats->ts.nextTime.tv_sec, (long) stats->ts.nextTime.tv_usec, (long) stats->ts.prevpacketTime.tv_sec, (long) stats->ts.prevpacketTime.tv_usec);
+ printf("**** %s sum stats=%ld.%ld next=%ld.%ld prev=%ld.%ld \n", stats->common->transferIDStr, (long) sumstats->ts.packetTime.tv_sec, \
+ (long) sumstats->ts.packetTime.tv_usec, (long) sumstats->ts.nextTime.tv_sec, (long) sumstats->ts.nextTime.tv_usec, (long) sumstats->ts.prevTime.tv_sec, (long) sumstats->ts.prevTime.tv_usec);
+}
+#endif
+
// If reports were missed, catch up now
static inline void reporter_transfer_protocol_missed_reports (struct TransferInfo *stats, struct ReportStruct *packet) {
while (TimeDifference(packet->packetTime, stats->ts.nextTime) > TimeDouble(stats->ts.intervalTime)) {
@@ -1125,10 +1223,24 @@ static inline void reporter_transfer_protocol_missed_reports (struct TransferInf
}
}
+static inline void reporter_reset_transfer_stats_sum (struct TransferInfo *sumstats) {
+#if HAVE_SUMMING_DEBUG
+ printf("***** [SUM] RESET %ld.%ld nxt %ld.%ld down=%d up=%d\n", (long) sumstats->ts.prevTime.tv_sec, (long) sumstats->ts.prevTime.tv_usec, \
+ (long) sumstats->ts.nextTime.tv_sec, (long) sumstats->ts.nextTime.tv_usec, sumstats->slot_thread_downcount, sumstats->slot_thread_upcount);
+#endif
+ sumstats->slot_thread_upcount -= sumstats->slot_thread_downcount;
+ sumstats->slot_thread_downcount = 0;
+ sumstats->ts.prevTime = sumstats->ts.nextTime;
+ sumstats->iInP = 0;
+ sumstats->uplevel = toggleLevel(sumstats->uplevel);
+ sumstats->downlevel = toggleLevel(sumstats->downlevel);
+}
+
static inline void reporter_reset_transfer_stats_client_tcp (struct TransferInfo *stats) {
stats->total.Bytes.prev = stats->total.Bytes.current;
stats->sock_callstats.write.WriteCnt = 0;
stats->sock_callstats.write.WriteErr = 0;
+ stats->sock_callstats.write.WriteTimeo = 0;
stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current;
stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current;
stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current;
@@ -1144,13 +1256,7 @@ static inline void reporter_reset_transfer_stats_client_tcp (struct TransferInfo
reporter_reset_mmm(&stats->bbasym.current);
}
if (isTcpWriteTimes(stats->common)) {
- stats->write_mmm.current.cnt = 0;
- stats->write_mmm.current.min = FLT_MAX;
- stats->write_mmm.current.max = FLT_MIN;
- stats->write_mmm.current.sum = 0;
- stats->write_mmm.current.vd = 0;
- stats->write_mmm.current.mean = 0;
- stats->write_mmm.current.m2 = 0;
+ reporter_reset_mmm(&stats->write_mmm.current);
}
}
@@ -1164,6 +1270,7 @@ static inline void reporter_reset_transfer_stats_client_udp (struct TransferInfo
stats->total.IPG.prev = stats->total.IPG.current;
stats->sock_callstats.write.WriteCnt = 0;
stats->sock_callstats.write.WriteErr = 0;
+ stats->sock_callstats.write.WriteTimeo = 0;
stats->isochstats.framecnt.prev = stats->isochstats.framecnt.current;
stats->isochstats.framelostcnt.prev = stats->isochstats.framelostcnt.current;
stats->isochstats.slipcnt.prev = stats->isochstats.slipcnt.current;
@@ -1174,7 +1281,7 @@ static inline void reporter_reset_transfer_stats_client_udp (struct TransferInfo
static inline void reporter_reset_transfer_stats_server_tcp (struct TransferInfo *stats) {
int ix;
stats->total.Bytes.prev = stats->total.Bytes.current;
- stats->sock_callstats.read.cntRead = 0;
+ stats->sock_callstats.read.ReadCnt.prev = stats->sock_callstats.read.ReadCnt.current;
for (ix = 0; ix < 8; ix++) {
stats->sock_callstats.read.bins[ix] = 0;
}
@@ -1198,8 +1305,10 @@ static inline void reporter_reset_transfer_stats_server_udp (struct TransferInfo
stats->l2counts.unknown = 0;
stats->l2counts.udpcsumerr = 0;
stats->l2counts.lengtherr = 0;
- stats->threadcnt = 0;
stats->iInP = 0;
+ stats->sock_callstats.read.ReadCnt.prev = stats->sock_callstats.read.ReadCnt.current;
+ stats->sock_callstats.read.ReadTimeoCnt.prev = stats->sock_callstats.read.ReadTimeoCnt.current;
+ stats->sock_callstats.read.ReadErrLenCnt.prev = stats->sock_callstats.read.ReadErrLenCnt.current;
if (stats->cntDatagrams)
stats->IPGsum = 0;
}
@@ -1209,7 +1318,7 @@ static inline void reporter_reset_transfer_stats_server_udp (struct TransferInfo
// o) set the TransferInfo struct and then calls the individual report output handler
// o) updates the sum and fullduplex reports
//
-void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_server_udp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
@@ -1223,6 +1332,9 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final
stats->cntError = 0;
stats->cntDatagrams = stats->PacketID - stats->total.Datagrams.prev;
stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev;
+ stats->sock_callstats.read.cntReadTimeo = stats->sock_callstats.read.ReadTimeoCnt.current - stats->sock_callstats.read.ReadTimeoCnt.prev;
+ stats->sock_callstats.read.cntReadErrLen = stats->sock_callstats.read.ReadErrLenCnt.current - stats->sock_callstats.read.ReadErrLenCnt.prev;
if (stats->latency_histogram) {
stats->latency_histogram->final = final;
@@ -1258,8 +1370,24 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final
sumstats->total.IPG.current += stats->cntIPG;
if (sumstats->IPGsum < stats->IPGsum)
sumstats->IPGsum = stats->IPGsum;
- sumstats->threadcnt++;
sumstats->iInP += stats->iInP;
+ sumstats->sock_callstats.read.cntRead += stats->sock_callstats.read.cntRead;
+ sumstats->sock_callstats.read.cntReadTimeo += stats->sock_callstats.read.cntReadTimeo;
+ sumstats->sock_callstats.read.cntReadErrLen += stats->sock_callstats.read.cntReadErrLen;
+ if (final) {
+ sumstats->threadcnt_final++;
+ if (data->packetring->downlevel != sumstats->downlevel) {
+ sumstats->slot_thread_downcount++;
+ }
+ if (data->packetring->uplevel != sumstats->uplevel){
+ sumstats->slot_thread_upcount++;
+ }
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s downcnt (%p) (up/down)=%d/%d final true uplevel (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \
+ sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, data->packetring->uplevel);
+#endif
+ }
}
if (fullduplexstats) {
fullduplexstats->total.Bytes.current += stats->cntBytes;
@@ -1321,20 +1449,23 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final
if (stats->latency_histogram) {
if (sumstats && sumstats->latency_histogram) {
histogram_add(sumstats->latency_histogram, stats->latency_histogram);
- sumstats->latency_histogram->final = 1;
+ sumstats->latency_histogram->final = true;
}
- stats->latency_histogram->final = 1;
+ stats->latency_histogram->final = true;
}
if (stats->jitter_histogram) {
if (sumstats && sumstats->jitter_histogram) {
histogram_add(sumstats->jitter_histogram, stats->jitter_histogram);
- sumstats->jitter_histogram->final = 1;
+ sumstats->jitter_histogram->final = true;
}
- stats->jitter_histogram->final = 1;
+ stats->jitter_histogram->final = true;
}
if (stats->framelatency_histogram) {
- stats->framelatency_histogram->final = 1;
+ stats->framelatency_histogram->final = true;
}
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current;
+ stats->sock_callstats.read.cntReadTimeo = stats->sock_callstats.read.ReadTimeoCnt.current;
+ stats->sock_callstats.read.cntReadErrLen = stats->sock_callstats.read.ReadErrLenCnt.current;
}
if ((stats->output_handler) && !(stats->isMaskOutput))
(*stats->output_handler)(stats);
@@ -1342,7 +1473,7 @@ void reporter_transfer_protocol_server_udp (struct ReporterData *data, int final
reporter_reset_transfer_stats_server_udp(stats);
}
-void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, bool final) {
if (final) {
reporter_set_timestamps_time(stats, TOTAL);
stats->cntOutofOrder = stats->total.OutofOrder.current;
@@ -1368,15 +1499,16 @@ void reporter_transfer_protocol_sum_server_udp (struct TransferInfo *stats, int
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
}
- if ((stats->output_handler) && !(stats->isMaskOutput))
+ if ((stats->output_handler) && !(stats->isMaskOutput)) {
(*stats->output_handler)(stats);
+ }
if (!final) {
// there is no packet ID for sum server reports, set it to total cnt for calculation
stats->PacketID = stats->total.Datagrams.current;
reporter_reset_transfer_stats_server_udp(stats);
}
}
-void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, bool final) {
if (final) {
reporter_set_timestamps_time(stats, TOTAL);
stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
@@ -1390,19 +1522,18 @@ void reporter_transfer_protocol_sum_client_udp (struct TransferInfo *stats, int
stats->cntIPG = stats->total.IPG.current - stats->total.IPG.prev;
stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
}
- if ((stats->output_handler) && !(stats->isMaskOutput))
+ if ((stats->output_handler) && !(stats->isMaskOutput)) {
(*stats->output_handler)(stats);
-
+ }
if (!final) {
- stats->threadcnt = 0;
reporter_reset_transfer_stats_client_udp(stats);
} else if ((stats->common->ReportMode != kReport_CSV) && !(stats->isMaskOutput)) {
- printf(report_sumcnt_datagrams, stats->threadcnt, stats->total.Datagrams.current);
+ printf(report_sumcnt_datagrams, stats->threadcnt_final, stats->total.Datagrams.current);
fflush(stdout);
}
}
-void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_client_udp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
@@ -1418,13 +1549,28 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final
sumstats->total.Bytes.current += stats->cntBytes;
sumstats->sock_callstats.write.WriteErr += stats->sock_callstats.write.WriteErr;
sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt;
+ sumstats->sock_callstats.write.WriteTimeo += stats->sock_callstats.write.WriteTimeo;
sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr;
sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt;
+ sumstats->sock_callstats.write.totWriteTimeo += stats->sock_callstats.write.WriteTimeo;
sumstats->total.Datagrams.current += stats->cntDatagrams;
if (sumstats->IPGsum < stats->IPGsum)
sumstats->IPGsum = stats->IPGsum;
sumstats->total.IPG.current += stats->cntIPG;
- sumstats->threadcnt++;
+ if (final) {
+ sumstats->threadcnt_final++;
+ if (data->packetring->downlevel != sumstats->downlevel) {
+ sumstats->slot_thread_downcount++;
+ }
+ if (data->packetring->uplevel != sumstats->uplevel){
+ sumstats->slot_thread_upcount++;
+ }
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \
+ sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, data->packetring->uplevel);
+#endif
+ }
}
if (fullduplexstats) {
fullduplexstats->total.Bytes.current += stats->cntBytes;
@@ -1438,6 +1584,7 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final
stats->cntBytes = stats->total.Bytes.current;
stats->sock_callstats.write.WriteErr = stats->sock_callstats.write.totWriteErr;
stats->sock_callstats.write.WriteCnt = stats->sock_callstats.write.totWriteCnt;
+ stats->sock_callstats.write.WriteTimeo = stats->sock_callstats.write.totWriteTimeo;
stats->cntIPG = stats->total.IPG.current;
stats->cntDatagrams = stats->PacketID;
stats->IPGsum = TimeDifference(stats->ts.packetTime, stats->ts.startTime);
@@ -1463,14 +1610,14 @@ void reporter_transfer_protocol_client_udp (struct ReporterData *data, int final
reporter_reset_transfer_stats_client_udp(stats);
}
-void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_server_tcp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
int ix;
if (stats->framelatency_histogram) {
- stats->framelatency_histogram->final = 0;
+ stats->framelatency_histogram->final = false;
}
double thisInP;
if (!final) {
@@ -1479,18 +1626,18 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final
double meantransit = (double) ((stats->transit.current.cnt > 0) ? (stats->transit.current.sum / stats->transit.current.cnt) : 0.0);
thisInP = lambda * meantransit;
stats->iInP = thisInP;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev;
} else {
double bytecnt = (double) stats->cntBytes;
double lambda = (stats->IPGsum > 0.0) ? (bytecnt / stats->IPGsum) : 0.0;
double meantransit = (double) ((stats->transit.total.cnt > 0) ? (stats->transit.total.sum / stats->transit.total.cnt) : 0.0);
thisInP = lambda * meantransit;
stats->fInP = thisInP;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current;
}
if (sumstats) {
- sumstats->threadcnt++;
sumstats->total.Bytes.current += stats->cntBytes;
- sumstats->sock_callstats.read.cntRead += stats->sock_callstats.read.cntRead;
- sumstats->sock_callstats.read.totcntRead += stats->sock_callstats.read.cntRead;
+ sumstats->sock_callstats.read.ReadCnt.current += stats->sock_callstats.read.cntRead;
for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
sumstats->sock_callstats.read.bins[ix] += stats->sock_callstats.read.bins[ix];
sumstats->sock_callstats.read.totbins[ix] += stats->sock_callstats.read.bins[ix];
@@ -1499,6 +1646,18 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final
sumstats->iInP += thisInP;
} else {
sumstats->fInP += thisInP;
+ sumstats->threadcnt_final++;
+ if (data->packetring->downlevel != sumstats->downlevel) {
+ sumstats->slot_thread_downcount++;
+ }
+ if (data->packetring->uplevel != sumstats->uplevel){
+ sumstats->slot_thread_upcount++;
+ }
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \
+ sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, data->packetring->uplevel);
+#endif
}
}
if (fullduplexstats) {
@@ -1520,12 +1679,12 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final
}
}
if (stats->framelatency_histogram) {
- stats->framelatency_histogram->final = 1;
+ stats->framelatency_histogram->final = true;
}
reporter_set_timestamps_time(stats, TOTAL);
stats->cntBytes = stats->total.Bytes.current;
stats->IPGsum = stats->ts.iEnd;
- stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current;
for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix];
}
@@ -1538,9 +1697,9 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final
if (stats->framelatency_histogram) {
if (sumstats && sumstats->framelatency_histogram) {
histogram_add(sumstats->framelatency_histogram, stats->framelatency_histogram);
- sumstats->framelatency_histogram->final = 1;
+ sumstats->framelatency_histogram->final = true;
}
- stats->framelatency_histogram->final = 1;
+ stats->framelatency_histogram->final = true;
}
} else if (isIsochronous(stats->common)) {
stats->isochstats.cntFrames = stats->isochstats.framecnt.current - stats->isochstats.framecnt.prev;
@@ -1557,7 +1716,7 @@ void reporter_transfer_protocol_server_tcp (struct ReporterData *data, int final
reporter_reset_transfer_stats_server_tcp(stats);
}
-void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_client_tcp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
struct TransferInfo *sumstats = (data->GroupSumReport != NULL) ? &data->GroupSumReport->info : NULL;
struct TransferInfo *fullduplexstats = (data->FullDuplexReport != NULL) ? &data->FullDuplexReport->info : NULL;
@@ -1585,7 +1744,24 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final
sumstats->sock_callstats.write.WriteCnt += stats->sock_callstats.write.WriteCnt;
sumstats->sock_callstats.write.totWriteErr += stats->sock_callstats.write.WriteErr;
sumstats->sock_callstats.write.totWriteCnt += stats->sock_callstats.write.WriteCnt;
- sumstats->threadcnt++;
+ if (final) {
+ sumstats->threadcnt_final++;
+ if (data->packetring->downlevel != sumstats->downlevel) {
+ sumstats->slot_thread_downcount++;
+ }
+ if (data->packetring->uplevel != sumstats->uplevel){
+ sumstats->slot_thread_upcount++;
+ }
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s downcnt (%p) (up/down)=%d/%d final true level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \
+ sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, data->packetring->uplevel);
+#endif
+ }
+#if HAVE_SUMMING_DEBUG
+ reporter_dump_timestamps(NULL, stats, sumstats);
+#endif
+
#if HAVE_TCP_STATS
sumstats->sock_callstats.write.tcpstats.retry += stats->sock_callstats.write.tcpstats.retry;
sumstats->sock_callstats.write.tcpstats.retry_tot += stats->sock_callstats.write.tcpstats.retry;
@@ -1596,10 +1772,10 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final
}
if (final) {
if (stats->latency_histogram) {
- stats->latency_histogram->final = 1;
+ stats->latency_histogram->final = true;
}
if (stats->write_histogram) {
- stats->write_histogram->final = 1;
+ stats->write_histogram->final = true;
}
if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) {
// print a partial interval report if enable and this a final
@@ -1626,7 +1802,7 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final
stats->sock_callstats.write.tcpstats.retry = stats->sock_callstats.write.tcpstats.retry_tot;
#endif
if (stats->framelatency_histogram) {
- stats->framelatency_histogram->final = 1;
+ stats->framelatency_histogram->final = true;
}
stats->cntBytes = stats->total.Bytes.current;
stats->write_mmm.current = stats->write_mmm.total;
@@ -1646,19 +1822,20 @@ void reporter_transfer_protocol_client_tcp (struct ReporterData *data, int final
/*
* Handles summing of threads
*/
-void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, bool final) {
if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
if (final) {
if ((stats->output_handler) && !(stats->isMaskOutput)) {
reporter_set_timestamps_time(stats, FINALPARTIAL);
- if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
+ if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) {
(*stats->output_handler)(stats);
+ }
reporter_reset_transfer_stats_client_tcp(stats);
}
} else if ((stats->output_handler) && !(stats->isMaskOutput)) {
(*stats->output_handler)(stats);
- stats->threadcnt = 0;
+ reporter_reset_transfer_stats_sum(stats);
}
reporter_reset_transfer_stats_client_tcp(stats);
}
@@ -1675,7 +1852,7 @@ void reporter_transfer_protocol_sum_client_tcp (struct TransferInfo *stats, int
}
}
-void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
if (final) {
@@ -1703,7 +1880,7 @@ void reporter_transfer_protocol_client_bb_tcp (struct ReporterData *data, int fi
}
}
-void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int final) {
+void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, bool final) {
struct TransferInfo *stats = &data->info;
if (final) {
if ((stats->cntBytes > 0) && stats->output_handler && !TimeZero(stats->ts.intervalTime)) {
@@ -1716,7 +1893,6 @@ void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int fi
}
}
#if HAVE_TCP_STATS
-
stats->sock_callstats.write.tcpstats.retry = stats->sock_callstats.write.tcpstats.retry_tot;
#endif
stats->cntBytes = stats->total.Bytes.current;
@@ -1730,26 +1906,27 @@ void reporter_transfer_protocol_server_bb_tcp (struct ReporterData *data, int fi
reporter_reset_transfer_stats_client_tcp(stats);
}
-void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, bool final) {
if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev;
if (final) {
if ((stats->output_handler) && !(stats->isMaskOutput)) {
reporter_set_timestamps_time(stats, FINALPARTIAL);
- if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial)
+ if ((stats->ts.iEnd - stats->ts.iStart) > stats->ts.significant_partial) {
(*stats->output_handler)(stats);
+ }
}
} else if ((stats->output_handler) && !(stats->isMaskOutput)) {
(*stats->output_handler)(stats);
- stats->threadcnt = 0;
- stats->iInP = 0;
+ reporter_reset_transfer_stats_sum(stats);
}
reporter_reset_transfer_stats_server_tcp(stats);
}
if (final) {
int ix;
stats->cntBytes = stats->total.Bytes.current;
- stats->sock_callstats.read.cntRead = stats->sock_callstats.read.totcntRead;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current;
for (ix = 0; ix < TCPREADBINCOUNT; ix++) {
stats->sock_callstats.read.bins[ix] = stats->sock_callstats.read.totbins[ix];
}
@@ -1759,7 +1936,7 @@ void reporter_transfer_protocol_sum_server_tcp (struct TransferInfo *stats, int
(*stats->output_handler)(stats);
}
}
-void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, bool final) {
if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
if (final) {
@@ -1781,7 +1958,7 @@ void reporter_transfer_protocol_fullduplex_tcp (struct TransferInfo *stats, int
(*stats->output_handler)(stats);
}
-void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, int final) {
+void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, bool final) {
if (!final || (final && (stats->cntBytes > 0) && !TimeZero(stats->ts.intervalTime))) {
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
stats->cntDatagrams = stats->total.Datagrams.current - stats->total.Datagrams.prev;
@@ -1814,11 +1991,11 @@ void reporter_transfer_protocol_fullduplex_udp (struct TransferInfo *stats, int
}
// Conditional print based on time
-int reporter_condprint_time_interval_report (struct ReporterData *data, struct ReportStruct *packet) {
+bool reporter_condprint_time_interval_report (struct ReporterData *data, struct ReportStruct *packet) {
struct TransferInfo *stats = &data->info;
assert(stats!=NULL);
// printf("***sum handler = %p\n", (void *) data->GroupSumReport->transfer_protocol_sum_handler);
- int advance_jobq = 0;
+ bool advance_jobq = false;
// Print a report if packet time exceeds the next report interval time,
// Also signal to the caller to move to the next report (or packet ring)
// if there was output. This will allow for more precise interval sum accounting.
@@ -1826,7 +2003,7 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R
// printf("***** nt %ld.%ld pt %ld.%ld pid=%lld empty=%d\n", stats->ts.nextTime.tv_sec, stats->ts.nextTime.tv_usec, packet->packetTime.tv_sec, packet->packetTime.tv_usec, packet->packetID, packet->emptyreport);
if (TimeDifference(stats->ts.nextTime, packet->packetTime) < 0) {
assert(data->transfer_protocol_handler!=NULL);
- advance_jobq = 1;
+ advance_jobq = true;
struct TransferInfo *sumstats = (data->GroupSumReport ? &data->GroupSumReport->info : NULL);
struct TransferInfo *fullduplexstats = (data->FullDuplexReport ? &data->FullDuplexReport->info : NULL);
stats->ts.packetTime = packet->packetTime;
@@ -1838,10 +2015,19 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R
if (fullduplexstats && ((++data->FullDuplexReport->threads) == 2) && isEnhanced(stats->common)) {
data->FullDuplexReport->threads = 0;
assert(data->FullDuplexReport->transfer_protocol_sum_handler != NULL);
- (*data->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, 0);
+ (*data->FullDuplexReport->transfer_protocol_sum_handler)(fullduplexstats, false);
}
if (sumstats) {
- if ((++data->GroupSumReport->threads) == data->GroupSumReport->reference.count) {
+ if (data->packetring->downlevel != sumstats->downlevel) {
+ sumstats->slot_thread_downcount++;
+ data->packetring->downlevel = toggleLevel(data->packetring->downlevel);
+#if HAVE_SUMMING_DEBUG
+ printf("**** %s downcnt (%p) pkt=%ld.%ld (up/down)=%d/%d final false level (sum/pkt)=%d/%d\n", stats->common->transferIDStr, (void *)data->packetring, \
+ (long) packet->packetTime.tv_sec, (long) packet->packetTime.tv_usec, sumstats->slot_thread_upcount, sumstats->slot_thread_downcount, \
+ sumstats->uplevel, data->packetring->uplevel);
+#endif
+ }
+ if ((sumstats->slot_thread_downcount) == sumstats->slot_thread_upcount) {
data->GroupSumReport->threads = 0;
if ((data->GroupSumReport->reference.count > (fullduplexstats ? 2 : 1)) || \
isSumOnly(data->info.common)) {
@@ -1849,9 +2035,12 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R
} else {
sumstats->isMaskOutput = true;
}
+#if HAVE_SUMMING_DEBUG
+ reporter_dump_timestamps(packet, stats, sumstats);
+#endif
reporter_set_timestamps_time(sumstats, INTERVAL);
assert(data->GroupSumReport->transfer_protocol_sum_handler != NULL);
- (*data->GroupSumReport->transfer_protocol_sum_handler)(sumstats, 0);
+ (*data->GroupSumReport->transfer_protocol_sum_handler)(sumstats, false);
}
}
// In the (hopefully unlikely event) the reporter fell behind
@@ -1863,9 +2052,9 @@ int reporter_condprint_time_interval_report (struct ReporterData *data, struct R
}
// Conditional print based on bursts or frames
-int reporter_condprint_frame_interval_report_server_udp (struct ReporterData *data, struct ReportStruct *packet) {
+bool reporter_condprint_frame_interval_report_server_udp (struct ReporterData *data, struct ReportStruct *packet) {
struct TransferInfo *stats = &data->info;
- int advance_jobq = 0;
+ bool advance_jobq = false;
// first packet of a burst and not a duplicate
if ((packet->burstsize == (packet->remaining + packet->packetLen)) && (stats->matchframeID != packet->frameID)) {
stats->matchframeID=packet->frameID;
@@ -1887,35 +2076,36 @@ int reporter_condprint_frame_interval_report_server_udp (struct ReporterData *da
if ((stats->output_handler) && !(stats->isMaskOutput))
(*stats->output_handler)(stats);
reporter_reset_transfer_stats_server_udp(stats);
- advance_jobq = 1;
+ advance_jobq = true;
}
return advance_jobq;
}
-int reporter_condprint_frame_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
+bool reporter_condprint_frame_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
fprintf(stderr, "FIX ME\n");
- return 1;
+ return true;
}
-int reporter_condprint_burst_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
+bool reporter_condprint_burst_interval_report_server_tcp (struct ReporterData *data, struct ReportStruct *packet) {
struct TransferInfo *stats = &data->info;
- int advance_jobq = 0;
+ int advance_jobq = false;
if (packet->transit_ready) {
stats->ts.prevpacketTime = packet->sentTime;
stats->ts.packetTime = packet->packetTime;
reporter_set_timestamps_time(stats, INTERVALPARTIAL);
stats->cntBytes = stats->total.Bytes.current - stats->total.Bytes.prev;
+ stats->sock_callstats.read.cntRead = stats->sock_callstats.read.ReadCnt.current - stats->sock_callstats.read.ReadCnt.prev;
if ((stats->output_handler) && !(stats->isMaskOutput))
(*stats->output_handler)(stats);
reporter_reset_transfer_stats_server_tcp(stats);
- advance_jobq = 1;
+ advance_jobq = true;
}
return advance_jobq;
}
-int reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *data, struct ReportStruct *packet) {
+bool reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *data, struct ReportStruct *packet) {
struct TransferInfo *stats = &data->info;
- int advance_jobq = 0;
+ int advance_jobq = false;
// first packet of a burst and not a duplicate
if (packet->transit_ready) {
reporter_handle_packet_oneway_transit(stats, packet);
@@ -1927,7 +2117,7 @@ int reporter_condprint_burst_interval_report_client_tcp (struct ReporterData *da
if ((stats->output_handler) && !(stats->isMaskOutput))
(*stats->output_handler)(stats);
reporter_reset_transfer_stats_client_tcp(stats);
- advance_jobq = 1;
+ advance_jobq = true;
}
return advance_jobq;
}