summaryrefslogtreecommitdiffstats
path: root/src/active_hosts.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/active_hosts.cpp')
-rw-r--r--src/active_hosts.cpp198
1 files changed, 129 insertions, 69 deletions
diff --git a/src/active_hosts.cpp b/src/active_hosts.cpp
index a3455ab..24358c3 100644
--- a/src/active_hosts.cpp
+++ b/src/active_hosts.cpp
@@ -61,10 +61,18 @@
* Global table with active hosts, their sum reports and active thread counts
*/
static struct Iperf_Table active_table;
-static bool Iperf_host_port_present (iperf_sockaddr *find);
static struct Iperf_ListEntry* Iperf_host_present (iperf_sockaddr *find);
+static struct Iperf_ListEntry* Iperf_flow_present (iperf_sockaddr *find);
#if HAVE_THREAD_DEBUG
+static void rcvfrom_peer_debug (thread_Settings *server, bool duplicate) {
+ char tmpaddr[200];
+ size_t len=200;
+ unsigned short port = SockAddr_getPort(&server->peer);
+ SockAddr_getHostAddress(&server->peer, tmpaddr, len);
+ thread_debug("rcvfrom peer: %s port %d dup=%s", tmpaddr, port, (duplicate ? "true" : "false"));
+}
+
static void active_table_show_entry(const char *action, Iperf_ListEntry *entry, int found) {
assert(action != NULL);
assert(entry != NULL);
@@ -72,9 +80,9 @@ static void active_table_show_entry(const char *action, Iperf_ListEntry *entry,
size_t len=200;
unsigned short port = SockAddr_getPort(&(entry->host));
SockAddr_getHostAddress(&(entry->host), tmpaddr, len);
- thread_debug("active table: %s %s port %d (flag=%d) rootp=%p entryp=%p totcnt/activecnt/hostcnt = %d/%d/%d", \
- action, tmpaddr, port, found, (void *) active_table.root, (void *) entry, active_table.total_count, \
- active_table.count, entry->thread_count);
+ thread_debug("active table: %s %s port %d (flag=%d) rootp=%p entryp=%p hostcnt/flowcnt/threadcnt = %d/%d/%d", \
+ action, tmpaddr, port, found, (void *) active_table.sum_root, (void *) entry->sumreport, active_table.sum_count, \
+ active_table.flow_count, entry->thread_count);
}
static void active_table_show_compare(const char *action, Iperf_ListEntry *entry, iperf_sockaddr *host, const char *type) {
assert(action != NULL);
@@ -92,79 +100,108 @@ static void active_table_show_compare(const char *action, Iperf_ListEntry *entry
void Iperf_initialize_active_table () {
Mutex_Initialize(&active_table.my_mutex);
- active_table.root = NULL;
+ active_table.flow_root = NULL;
+ active_table.sum_root = NULL;
active_table.groupid = 0;
+#if HAVE_THREAD_DEBUG
+ active_table.sum_count = 0;
+ active_table.flow_count = 0;
+#endif
}
/*
- * Add Entry add to the list or update thread count
+ * Add Entry add to the list or update thread count, return 0 on UDP tuple duplicate
*/
-static void active_table_update (iperf_sockaddr *host, struct thread_Settings *agent) {
- assert(host != NULL);
- assert(agent != NULL);
- Iperf_ListEntry *this_entry = Iperf_host_present(host);
- active_table.total_count++;
- if (this_entry == NULL) {
- this_entry = new Iperf_ListEntry();
- assert(this_entry != NULL);
- this_entry->host = *host;
- this_entry->next = active_table.root;
- this_entry->thread_count = 1;
- this_entry->socket = agent->mSock;
- active_table.count++;
- active_table.groupid++;
- active_table.root = this_entry;
- this_entry->sum_report = InitSumReport(agent, active_table.total_count, 0);
- IncrSumReportRefCounter(this_entry->sum_report);
- agent->mSumReport = this_entry->sum_report;
- this_entry->sum_report->info.common->transferID = -active_table.groupid; // sum ids are negative
+static inline struct Iperf_ListEntry *hostkey_insert (iperf_sockaddr *host) {
+ struct Iperf_ListEntry *this_key = new Iperf_ListEntry();
+ assert(this_key != NULL);
+ if (!this_key) {
+ fprintf(stderr, "Memory alloc failure in key insert\n");
+ exit(1);
+ }
+ this_key->next = active_table.sum_root;
+ active_table.sum_root = this_key;
+ this_key->host = *host;
+ this_key->thread_count = 0;
#if HAVE_THREAD_DEBUG
- active_table_show_entry("new entry", this_entry, ((SockAddr_are_Equal(&this_entry->host, host) && SockAddr_Hostare_Equal(&this_entry->host, host))));
+ active_table.sum_count++;
+ active_table_show_entry("new host entry", this_key, ((SockAddr_are_Equal(&this_key->host, host) && SockAddr_Hostare_Equal(&this_key->host, host))));
#endif
- } else {
- this_entry->thread_count++;
- agent->mSumReport = this_entry->sum_report;
- IncrSumReportRefCounter(this_entry->sum_report);
+ return this_key;
+}
+
+static inline struct Iperf_ListEntry *flowkey_insert (iperf_sockaddr *host) {
+ struct Iperf_ListEntry *this_key = new Iperf_ListEntry();
+ assert(this_key != NULL);
+ if (!this_key) {
+ fprintf(stderr, "Memory alloc failure in key insert\n");
+ exit(1);
+ }
+ this_key->next = active_table.flow_root;
+ active_table.flow_root = this_key;
+ this_key->host = *host;
#if HAVE_THREAD_DEBUG
- active_table_show_entry("incr entry", this_entry, 1);
+ active_table.flow_count++;
+// active_table_show_flow_entry("new flow entry", this_key, ((SockAddr_are_Equal(&this_key->host, host) && SockAddr_Hostare_Equal(&this_key->host, host))));
#endif
- }
+ return this_key;
}
static inline iperf_sockaddr *active_table_get_host_key (struct thread_Settings *agent) {
- iperf_sockaddr *key = (isSumServerDstIP(agent) ? &agent->local : &agent->peer);
+ iperf_sockaddr *key = ((isIncrDstIP(agent) || isSumServerDstIP(agent)) ? &agent->local : &agent->peer);
return key;
}
-// Thread access to store a host
-int Iperf_push_host (struct thread_Settings *agent) {
- iperf_sockaddr *host = active_table_get_host_key(agent);
- Mutex_Lock(&active_table.my_mutex);
- active_table_update(host, agent);
- int groupid = active_table.groupid;
- Mutex_Unlock(&active_table.my_mutex);
- return groupid;
+static bool Iperf_push_flow (iperf_sockaddr *host) {
+ bool rc;
+ if (Iperf_flow_present(host)) {
+ rc = false;
+ } else {
+ flowkey_insert(host);
+ rc = true;
+ }
+ return rc;
}
-// Used for UDP push of a new host, returns negative value if the host/port is already present
-// This is critical because UDP is connectionless and designed to be stateless
-int Iperf_push_host_port_conditional (struct thread_Settings *agent) {
- iperf_sockaddr *host = active_table_get_host_key(agent);
- int rc = -1;
+// Thread access to store a host
+bool Iperf_push_host (struct thread_Settings *agent) {
Mutex_Lock(&active_table.my_mutex);
- if (!Iperf_host_port_present(host)) {
- active_table_update(host, agent);
- rc = active_table.groupid;
+ if (isUDP(agent) && (agent->mThreadMode == kMode_Server)) {
+ if (!Iperf_push_flow(&agent->peer)) {
+ // this is a duplicate on UDP, should just ignore
+ Mutex_Unlock(&active_table.my_mutex);
+#if HAVE_THREAD_DEBUG
+ rcvfrom_peer_debug(agent, true);
+#endif
+ return false;
+ }
}
+ struct Iperf_ListEntry *this_host = Iperf_host_present(active_table_get_host_key(agent));
+ if (!this_host) {
+ this_host = hostkey_insert(active_table_get_host_key(agent));
+ active_table.groupid++;
+ this_host->sumreport = InitSumReport(agent, -active_table.groupid, false);
+ this_host->sumreport->info.common->transferID = -active_table.groupid;
+#if HAVE_THREAD_DEBUG
+ active_table_show_entry("new sum report", this_host , 0);
+#endif
+ }
+ agent->mSumReport = this_host->sumreport;
+ this_host->thread_count++;
+ IncrSumReportRefCounter(this_host->sumreport);
+ this_host->socket = agent->mSock;
+#if HAVE_THREAD_DEBUG
+ active_table_show_entry("bind sum report", this_host, 0);
+#endif
Mutex_Unlock(&active_table.my_mutex);
- return (rc);
+ return true;
}
/*
* Remove a host from the table
*/
void Iperf_remove_host (struct thread_Settings *agent) {
- iperf_sockaddr *del = active_table_get_host_key(agent);
+ iperf_sockaddr *del;
// remove_list_entry(entry) {
// indirect = &head;
// while ((*indirect) != entry) {
@@ -172,7 +209,25 @@ void Iperf_remove_host (struct thread_Settings *agent) {
// }
// *indirect = entry->next
Mutex_Lock(&active_table.my_mutex);
- Iperf_ListEntry **tmp = &active_table.root;
+ // Delete any flow entries first
+ if (isUDP(agent)) {
+ del = &agent->peer;
+ Iperf_ListEntry **tmp = &active_table.flow_root;
+ while ((*tmp) && !(SockAddr_are_Equal(&(*tmp)->host, del))) {
+ tmp = &(*tmp)->next;
+ }
+ if (*tmp) {
+ Iperf_ListEntry *remove = (*tmp);
+#if HAVE_THREAD_DEBUG
+ active_table.flow_count--;
+#endif
+ *tmp = remove->next;
+ delete remove;
+ }
+ }
+
+ del = active_table_get_host_key(agent);
+ Iperf_ListEntry **tmp = &active_table.sum_root;
while ((*tmp) && !(SockAddr_Hostare_Equal(&(*tmp)->host, del))) {
#if HAVE_THREAD_DEBUG
active_table_show_compare("miss", *tmp, del, "client ip");
@@ -182,16 +237,16 @@ void Iperf_remove_host (struct thread_Settings *agent) {
if (*tmp) {
if (--(*tmp)->thread_count == 0) {
Iperf_ListEntry *remove = (*tmp);
- active_table.count--;
agent->mSumReport = NULL;
#if HAVE_THREAD_DEBUG
+ active_table.sum_count--;
active_table_show_entry("delete", remove, 1);
#endif
*tmp = remove->next;
- FreeSumReport(remove->sum_report);
+ FreeSumReport(remove->sumreport);
delete remove;
} else {
- DecrSumReportRefCounter((*tmp)->sum_report);
+ DecrSumReportRefCounter((*tmp)->sumreport);
#if HAVE_THREAD_DEBUG
active_table_show_entry("decr", (*tmp), 1);
#endif
@@ -204,55 +259,60 @@ void Iperf_remove_host (struct thread_Settings *agent) {
* Destroy the table
*/
void Iperf_destroy_active_table () {
- Iperf_ListEntry *itr1 = active_table.root, *itr2;
+ Iperf_ListEntry *itr1 = active_table.sum_root, *itr2;
+ while (itr1 != NULL) {
+ itr2 = itr1->next;
+ delete itr1;
+ itr1 = itr2;
+ }
+ itr1 = active_table.flow_root;
while (itr1 != NULL) {
itr2 = itr1->next;
delete itr1;
itr1 = itr2;
}
Mutex_Destroy(&active_table.my_mutex);
- active_table.root = NULL;
- active_table.count = 0;
- active_table.total_count = 0;
+ active_table.sum_root = NULL;
+#if HAVE_THREAD_DEBUG
+ active_table.sum_count = 0;
+#endif
}
/*
* Check if the host and port are present in the active table
*/
-bool Iperf_host_port_present (iperf_sockaddr *find) {
- Iperf_ListEntry *itr = active_table.root;
- bool rc = false;
+struct Iperf_ListEntry* Iperf_flow_present (iperf_sockaddr *find) {
+ Iperf_ListEntry *itr = active_table.flow_root;
while (itr != NULL) {
if (SockAddr_are_Equal(&itr->host, find)) {
#if HAVE_THREAD_DEBUG
- active_table_show_compare("match", itr, find, "client ip/port");
+ active_table_show_compare("match host/port", itr, find, "client ip/port");
#endif
- rc = true;
break;
} else {
#if HAVE_THREAD_DEBUG
- active_table_show_compare("miss", itr, find, "client ip/port");
+ active_table_show_compare("miss host/port", itr, find, "client ip/port");
#endif
itr = itr->next;
}
}
- return rc;
+ return itr;
}
/*
* Check if the host is present in the active table
*/
static Iperf_ListEntry* Iperf_host_present (iperf_sockaddr *find) {
- Iperf_ListEntry *itr = active_table.root;
+ Iperf_ListEntry *itr = active_table.sum_root;
while (itr != NULL) {
if (SockAddr_Hostare_Equal(&itr->host, find)) {
#if HAVE_THREAD_DEBUG
- active_table_show_compare("match", itr, find, "client ip");
+ active_table_show_compare("match host", itr, find, "client ip");
#endif
break;
} else {
#if HAVE_THREAD_DEBUG
- active_table_show_compare("miss", itr, find, "client ip");
+ active_table_show_compare("miss host", itr, find, "client ip");
#endif
itr = itr->next;
}