diff options
Diffstat (limited to 'src/active_hosts.cpp')
-rw-r--r-- | src/active_hosts.cpp | 198 |
1 files changed, 129 insertions, 69 deletions
diff --git a/src/active_hosts.cpp b/src/active_hosts.cpp index a3455ab..24358c3 100644 --- a/src/active_hosts.cpp +++ b/src/active_hosts.cpp @@ -61,10 +61,18 @@ * Global table with active hosts, their sum reports and active thread counts */ static struct Iperf_Table active_table; -static bool Iperf_host_port_present (iperf_sockaddr *find); static struct Iperf_ListEntry* Iperf_host_present (iperf_sockaddr *find); +static struct Iperf_ListEntry* Iperf_flow_present (iperf_sockaddr *find); #if HAVE_THREAD_DEBUG +static void rcvfrom_peer_debug (thread_Settings *server, bool duplicate) { + char tmpaddr[200]; + size_t len=200; + unsigned short port = SockAddr_getPort(&server->peer); + SockAddr_getHostAddress(&server->peer, tmpaddr, len); + thread_debug("rcvfrom peer: %s port %d dup=%s", tmpaddr, port, (duplicate ? "true" : "false")); +} + static void active_table_show_entry(const char *action, Iperf_ListEntry *entry, int found) { assert(action != NULL); assert(entry != NULL); @@ -72,9 +80,9 @@ static void active_table_show_entry(const char *action, Iperf_ListEntry *entry, size_t len=200; unsigned short port = SockAddr_getPort(&(entry->host)); SockAddr_getHostAddress(&(entry->host), tmpaddr, len); - thread_debug("active table: %s %s port %d (flag=%d) rootp=%p entryp=%p totcnt/activecnt/hostcnt = %d/%d/%d", \ - action, tmpaddr, port, found, (void *) active_table.root, (void *) entry, active_table.total_count, \ - active_table.count, entry->thread_count); + thread_debug("active table: %s %s port %d (flag=%d) rootp=%p entryp=%p hostcnt/flowcnt/threadcnt = %d/%d/%d", \ + action, tmpaddr, port, found, (void *) active_table.sum_root, (void *) entry->sumreport, active_table.sum_count, \ + active_table.flow_count, entry->thread_count); } static void active_table_show_compare(const char *action, Iperf_ListEntry *entry, iperf_sockaddr *host, const char *type) { assert(action != NULL); @@ -92,79 +100,108 @@ static void active_table_show_compare(const char *action, Iperf_ListEntry *entry void Iperf_initialize_active_table () { Mutex_Initialize(&active_table.my_mutex); - active_table.root = NULL; + active_table.flow_root = NULL; + active_table.sum_root = NULL; active_table.groupid = 0; +#if HAVE_THREAD_DEBUG + active_table.sum_count = 0; + active_table.flow_count = 0; +#endif } /* - * Add Entry add to the list or update thread count + * Add Entry add to the list or update thread count, return 0 on UDP tuple duplicate */ -static void active_table_update (iperf_sockaddr *host, struct thread_Settings *agent) { - assert(host != NULL); - assert(agent != NULL); - Iperf_ListEntry *this_entry = Iperf_host_present(host); - active_table.total_count++; - if (this_entry == NULL) { - this_entry = new Iperf_ListEntry(); - assert(this_entry != NULL); - this_entry->host = *host; - this_entry->next = active_table.root; - this_entry->thread_count = 1; - this_entry->socket = agent->mSock; - active_table.count++; - active_table.groupid++; - active_table.root = this_entry; - this_entry->sum_report = InitSumReport(agent, active_table.total_count, 0); - IncrSumReportRefCounter(this_entry->sum_report); - agent->mSumReport = this_entry->sum_report; - this_entry->sum_report->info.common->transferID = -active_table.groupid; // sum ids are negative +static inline struct Iperf_ListEntry *hostkey_insert (iperf_sockaddr *host) { + struct Iperf_ListEntry *this_key = new Iperf_ListEntry(); + assert(this_key != NULL); + if (!this_key) { + fprintf(stderr, "Memory alloc failure in key insert\n"); + exit(1); + } + this_key->next = active_table.sum_root; + active_table.sum_root = this_key; + this_key->host = *host; + this_key->thread_count = 0; #if HAVE_THREAD_DEBUG - active_table_show_entry("new entry", this_entry, ((SockAddr_are_Equal(&this_entry->host, host) && SockAddr_Hostare_Equal(&this_entry->host, host)))); + active_table.sum_count++; + active_table_show_entry("new host entry", this_key, ((SockAddr_are_Equal(&this_key->host, host) && SockAddr_Hostare_Equal(&this_key->host, host)))); #endif - } else { - this_entry->thread_count++; - agent->mSumReport = this_entry->sum_report; - IncrSumReportRefCounter(this_entry->sum_report); + return this_key; +} + +static inline struct Iperf_ListEntry *flowkey_insert (iperf_sockaddr *host) { + struct Iperf_ListEntry *this_key = new Iperf_ListEntry(); + assert(this_key != NULL); + if (!this_key) { + fprintf(stderr, "Memory alloc failure in key insert\n"); + exit(1); + } + this_key->next = active_table.flow_root; + active_table.flow_root = this_key; + this_key->host = *host; #if HAVE_THREAD_DEBUG - active_table_show_entry("incr entry", this_entry, 1); + active_table.flow_count++; +// active_table_show_flow_entry("new flow entry", this_key, ((SockAddr_are_Equal(&this_key->host, host) && SockAddr_Hostare_Equal(&this_key->host, host)))); #endif - } + return this_key; } static inline iperf_sockaddr *active_table_get_host_key (struct thread_Settings *agent) { - iperf_sockaddr *key = (isSumServerDstIP(agent) ? &agent->local : &agent->peer); + iperf_sockaddr *key = ((isIncrDstIP(agent) || isSumServerDstIP(agent)) ? &agent->local : &agent->peer); return key; } -// Thread access to store a host -int Iperf_push_host (struct thread_Settings *agent) { - iperf_sockaddr *host = active_table_get_host_key(agent); - Mutex_Lock(&active_table.my_mutex); - active_table_update(host, agent); - int groupid = active_table.groupid; - Mutex_Unlock(&active_table.my_mutex); - return groupid; +static bool Iperf_push_flow (iperf_sockaddr *host) { + bool rc; + if (Iperf_flow_present(host)) { + rc = false; + } else { + flowkey_insert(host); + rc = true; + } + return rc; } -// Used for UDP push of a new host, returns negative value if the host/port is already present -// This is critical because UDP is connectionless and designed to be stateless -int Iperf_push_host_port_conditional (struct thread_Settings *agent) { - iperf_sockaddr *host = active_table_get_host_key(agent); - int rc = -1; +// Thread access to store a host +bool Iperf_push_host (struct thread_Settings *agent) { Mutex_Lock(&active_table.my_mutex); - if (!Iperf_host_port_present(host)) { - active_table_update(host, agent); - rc = active_table.groupid; + if (isUDP(agent) && (agent->mThreadMode == kMode_Server)) { + if (!Iperf_push_flow(&agent->peer)) { + // this is a duplicate on UDP, should just ignore + Mutex_Unlock(&active_table.my_mutex); +#if HAVE_THREAD_DEBUG + rcvfrom_peer_debug(agent, true); +#endif + return false; + } } + struct Iperf_ListEntry *this_host = Iperf_host_present(active_table_get_host_key(agent)); + if (!this_host) { + this_host = hostkey_insert(active_table_get_host_key(agent)); + active_table.groupid++; + this_host->sumreport = InitSumReport(agent, -active_table.groupid, false); + this_host->sumreport->info.common->transferID = -active_table.groupid; +#if HAVE_THREAD_DEBUG + active_table_show_entry("new sum report", this_host , 0); +#endif + } + agent->mSumReport = this_host->sumreport; + this_host->thread_count++; + IncrSumReportRefCounter(this_host->sumreport); + this_host->socket = agent->mSock; +#if HAVE_THREAD_DEBUG + active_table_show_entry("bind sum report", this_host, 0); +#endif Mutex_Unlock(&active_table.my_mutex); - return (rc); + return true; } /* * Remove a host from the table */ void Iperf_remove_host (struct thread_Settings *agent) { - iperf_sockaddr *del = active_table_get_host_key(agent); + iperf_sockaddr *del; // remove_list_entry(entry) { // indirect = &head; // while ((*indirect) != entry) { @@ -172,7 +209,25 @@ void Iperf_remove_host (struct thread_Settings *agent) { // } // *indirect = entry->next Mutex_Lock(&active_table.my_mutex); - Iperf_ListEntry **tmp = &active_table.root; + // Delete any flow entries first + if (isUDP(agent)) { + del = &agent->peer; + Iperf_ListEntry **tmp = &active_table.flow_root; + while ((*tmp) && !(SockAddr_are_Equal(&(*tmp)->host, del))) { + tmp = &(*tmp)->next; + } + if (*tmp) { + Iperf_ListEntry *remove = (*tmp); +#if HAVE_THREAD_DEBUG + active_table.flow_count--; +#endif + *tmp = remove->next; + delete remove; + } + } + + del = active_table_get_host_key(agent); + Iperf_ListEntry **tmp = &active_table.sum_root; while ((*tmp) && !(SockAddr_Hostare_Equal(&(*tmp)->host, del))) { #if HAVE_THREAD_DEBUG active_table_show_compare("miss", *tmp, del, "client ip"); @@ -182,16 +237,16 @@ void Iperf_remove_host (struct thread_Settings *agent) { if (*tmp) { if (--(*tmp)->thread_count == 0) { Iperf_ListEntry *remove = (*tmp); - active_table.count--; agent->mSumReport = NULL; #if HAVE_THREAD_DEBUG + active_table.sum_count--; active_table_show_entry("delete", remove, 1); #endif *tmp = remove->next; - FreeSumReport(remove->sum_report); + FreeSumReport(remove->sumreport); delete remove; } else { - DecrSumReportRefCounter((*tmp)->sum_report); + DecrSumReportRefCounter((*tmp)->sumreport); #if HAVE_THREAD_DEBUG active_table_show_entry("decr", (*tmp), 1); #endif @@ -204,55 +259,60 @@ void Iperf_remove_host (struct thread_Settings *agent) { * Destroy the table */ void Iperf_destroy_active_table () { - Iperf_ListEntry *itr1 = active_table.root, *itr2; + Iperf_ListEntry *itr1 = active_table.sum_root, *itr2; + while (itr1 != NULL) { + itr2 = itr1->next; + delete itr1; + itr1 = itr2; + } + itr1 = active_table.flow_root; while (itr1 != NULL) { itr2 = itr1->next; delete itr1; itr1 = itr2; } Mutex_Destroy(&active_table.my_mutex); - active_table.root = NULL; - active_table.count = 0; - active_table.total_count = 0; + active_table.sum_root = NULL; +#if HAVE_THREAD_DEBUG + active_table.sum_count = 0; +#endif } /* * Check if the host and port are present in the active table */ -bool Iperf_host_port_present (iperf_sockaddr *find) { - Iperf_ListEntry *itr = active_table.root; - bool rc = false; +struct Iperf_ListEntry* Iperf_flow_present (iperf_sockaddr *find) { + Iperf_ListEntry *itr = active_table.flow_root; while (itr != NULL) { if (SockAddr_are_Equal(&itr->host, find)) { #if HAVE_THREAD_DEBUG - active_table_show_compare("match", itr, find, "client ip/port"); + active_table_show_compare("match host/port", itr, find, "client ip/port"); #endif - rc = true; break; } else { #if HAVE_THREAD_DEBUG - active_table_show_compare("miss", itr, find, "client ip/port"); + active_table_show_compare("miss host/port", itr, find, "client ip/port"); #endif itr = itr->next; } } - return rc; + return itr; } /* * Check if the host is present in the active table */ static Iperf_ListEntry* Iperf_host_present (iperf_sockaddr *find) { - Iperf_ListEntry *itr = active_table.root; + Iperf_ListEntry *itr = active_table.sum_root; while (itr != NULL) { if (SockAddr_Hostare_Equal(&itr->host, find)) { #if HAVE_THREAD_DEBUG - active_table_show_compare("match", itr, find, "client ip"); + active_table_show_compare("match host", itr, find, "client ip"); #endif break; } else { #if HAVE_THREAD_DEBUG - active_table_show_compare("miss", itr, find, "client ip"); + active_table_show_compare("miss host", itr, find, "client ip"); #endif itr = itr->next; } |