summaryrefslogtreecommitdiffstats
path: root/src/Listener.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/Listener.cpp')
-rw-r--r--src/Listener.cpp522
1 files changed, 190 insertions, 332 deletions
diff --git a/src/Listener.cpp b/src/Listener.cpp
index a517cbc..c6ea969 100644
--- a/src/Listener.cpp
+++ b/src/Listener.cpp
@@ -1,16 +1,15 @@
/*---------------------------------------------------------------
- * Copyright (c) 1999,2000,2001,2002,2003
- * The Board of Trustees of the University of Illinois
- * All Rights Reserved.
- *---------------------------------------------------------------
+ * Copyright (c) 1999,2000,2001,2002,2003 The Board of Trustees of the
+ * University of Illinois All Rights Reserved.
+ * ---------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software (Iperf) and associated
* documentation files (the "Software"), to deal in the Software
- * without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute,
- * sublicense, and/or sell copies of the Software, and to permit
- * persons to whom the Software is furnished to do
- * so, subject to the following conditions:
+ * without restriction, including without limitation the rights to
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
*
*
* Redistributions of source code must retain the above
@@ -139,7 +138,8 @@ void Listener::Run () {
}
if (!isUDP(mSettings)) {
// TCP needs just one listen
- my_listen(); // This will set ListenSocket to a new sock fd
+ if (!my_listen()) // This will set ListenSocket to a new sock fd
+ return;
}
bool mMode_Time = isServerModeTime(mSettings) && !isDaemon(mSettings);
if (mMode_Time) {
@@ -150,7 +150,9 @@ void Listener::Run () {
mEndTime.add(mSettings->mListenerTimeout);
}
Timestamp now;
+ bool need_listen = true;
#define SINGLECLIENTDELAY_DURATION 50000 // units is microseconds
+
while (!sInterupted && mCount) {
#ifdef HAVE_THREAD_DEBUG
thread_debug("Listener main loop port %d ", mSettings->mPort);
@@ -163,8 +165,8 @@ void Listener::Run () {
break;
}
// Serialize in the event the -1 option or --singleclient is set
- int tc;
- if ((isSingleClient(mSettings) || isMulticast(mSettings)) && \
+ int tc = 0;
+ if ((isSingleClient(mSettings) || (isUDP(mSettings) && isMulticast(mSettings))) && \
mCount && (tc = (thread_numtrafficthreads()) > 0)) {
// Start with a delay in the event some traffic
// threads are pending to be scheduled and haven't
@@ -178,9 +180,14 @@ void Listener::Run () {
#endif
continue;
}
- if (isUDP(mSettings)) {
+ // This will set ListenSocket to a new sock fd
+ if (isUDP(mSettings) && need_listen) {
// UDP needs a new listen per every new socket
- my_listen(); // This will set ListenSocket to a new sock fd
+ if (!my_listen()) {
+ break;
+ } else {
+ need_listen = false;
+ }
}
// Use a select() with a timeout if -t is set or if this is a v1 -r or -d test
fd_set set;
@@ -195,12 +202,6 @@ void Listener::Run () {
timeout.tv_sec = static_cast<long>(mSettings->mListenerTimeout);
timeout.tv_usec = (static_cast<long>(mSettings->mListenerTimeout) * 1000000) % 1000000;
}
- if (isTxStartTime(mSettings)) {
- now.setnow();
- long adjsecs = (mSettings->txstart_epoch.tv_sec - now.getSecs());
- if (adjsecs > 0)
- timeout.tv_sec += adjsecs + 1;
- }
FD_ZERO(&set);
FD_SET(ListenSocket, &set);
if (!(select(ListenSocket + 1, &set, NULL, NULL, &timeout) > 0)) {
@@ -234,7 +235,7 @@ void Listener::Run () {
Settings_Destroy(server);
continue;
}
-
+ need_listen = true;
#ifdef HAVE_THREAD_DEBUG
thread_debug("Listener thread accepted server sock=%d transferID", server->mSock, server->mTransferID);
#endif
@@ -309,7 +310,7 @@ void Listener::Run () {
if (isUDP(server) && isCompat(mSettings)) {
setSeqNo64b(server);
}
- setTransferID(server, 0);
+ setTransferID(server, NORMAL);
if ((mSettings->mReportMode == kReport_CSV) && server->mSumReport && !server->mSumReport->sum_reverse_set) {
format_ips_port_string(&server->mSumReport->info, 1);
server->mSumReport->sum_reverse_set = true;
@@ -334,7 +335,7 @@ void Listener::Run () {
if (listener_client_settings) {
if (server->mMode != kTest_Normal)
listener_client_settings->mTransferID = 0;
- setTransferID(listener_client_settings, 1);
+ setTransferID(listener_client_settings, REVERSED);
if (isFullDuplex(listener_client_settings) || isReverse(listener_client_settings))
Iperf_push_host(listener_client_settings);
if (isFullDuplex(server)) {
@@ -344,9 +345,9 @@ void Listener::Run () {
// now that it's know to be full duplex. This wasn't known
// during accept()
SetSumHandlers(server, server->mSumReport);
- server->mSumReport->sum_fd_set = 1;
+ server->mSumReport->sum_fd_set = true;
}
- server->mFullDuplexReport = InitSumReport(server, server->mSock, 1);
+ server->mFullDuplexReport = InitSumReport(server, server->mSock, true);
listener_client_settings->mFullDuplexReport = server->mFullDuplexReport;
#if HAVE_THREAD_DEBUG
thread_debug("FullDuplex report client=%p/%p server=%p/%p", (void *) listener_client_settings, (void *) listener_client_settings->mFullDuplexReport, (void *) server, (void *) server->mFullDuplexReport);
@@ -368,7 +369,7 @@ void Listener::Run () {
}
}
}
- setTransferID(server, 0);
+ setTransferID(server, NORMAL);
if (isConnectionReport(server) && !isSumOnly(server)) {
struct ReportHeader *reporthdr = InitConnectionReport(server);
struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report);
@@ -393,7 +394,7 @@ void Listener::Run () {
* wildcard server address, specifying what incoming interface to
* accept connections on.
* ------------------------------------------------------------------- */
-void Listener::my_listen () {
+bool Listener::my_listen () {
int rc;
int type;
int domain;
@@ -414,20 +415,13 @@ void Listener::my_listen () {
// for the case of L2 testing and UDP, a new AF_PACKET
// will be created to supercede this one
type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM);
- domain = (SockAddr_isIPv6(&mSettings->local) ?
-#if HAVE_IPV6
- AF_INET6
-#else
- AF_INET
-#endif
- : AF_INET);
+ domain = SockAddr_getAFdomain(&mSettings->local);
#ifdef WIN32
- if (SockAddr_isMulticast(&mSettings->local)) {
+ if (SockAddr_isMulticast(&mSettings->multicast_group)) {
// Multicast on Win32 requires special handling
ListenSocket = WSASocket(domain, type, 0, 0, 0, WSA_FLAG_MULTIPOINT_C_LEAF | WSA_FLAG_MULTIPOINT_D_LEAF);
WARN_errno(ListenSocket == INVALID_SOCKET, "socket");
-
} else
#endif
{
@@ -436,31 +430,46 @@ void Listener::my_listen () {
}
mSettings->mSock = ListenSocket;
SetSocketOptions(mSettings);
- // reuse the address, so we can run if a former server was killed off
- int boolean = 1;
- Socklen_t len = sizeof(boolean);
- rc = setsockopt(ListenSocket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&boolean), len);
- // bind socket to server address
+ SetSocketBindToDeviceIfNeeded(mSettings);
+ rc = SOCKET_ERROR;
+ if (isUDP(mSettings) && SockAddr_isMulticast(&mSettings->multicast_group)) {
+#if HAVE_MULTICAST
#ifdef WIN32
- if (SockAddr_isMulticast(&mSettings->local)) {
// Multicast on Win32 requires special handling
- rc = WSAJoinLeaf(ListenSocket, (sockaddr*) &mSettings->local, mSettings->size_local,0,0,0,0,JL_BOTH);
+ rc = WSAJoinLeaf(ListenSocket, reinterpret_cast<sockaddr*> (&mSettings->local), mSettings->size_local,0,0,0,0,JL_BOTH);
WARN_errno(rc == SOCKET_ERROR, "WSAJoinLeaf (aka bind)");
- } else
+#else
+#if 0 && (HAVE_DECL_IP_ADD_MEMBERSHIP || HAVE_DECL_MCAST_JOIN_GROUP) // possible future, bind to all including unicast
+ iperf_sockaddr tmp;
+ memcpy(&tmp, &mSettings->local, sizeof(tmp));
+ SockAddr_setAddressAny(&tmp); // the multicast join will take care of this
+ rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&tmp), mSettings->size_local);
+ printf("***** any bind\n");
+#else
+ rc = bind(ListenSocket, reinterpret_cast<sockaddr*> (&mSettings->local), mSettings->size_local);
+// printf("***** single bind\n");
#endif
- {
- rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&mSettings->local), mSettings->size_local);
- FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings);
+ FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings);
+ // if UDP and multicast, join the group
+ if (iperf_multicast_join(mSettings) != IPERF_MULTICAST_JOIN_SUCCESS) {
+ rc = SOCKET_ERROR;
}
+#endif
+#else
+ fprintf(stderr, "Multicast not supported");
+#endif // HAVE_MULTICAST
+ } else {
+ // bind socket for unicast
+ rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&mSettings->local), mSettings->size_local);
+ }
+ FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings);
}
-
// update the reporter thread
- if (isReport(mSettings) && isSettingsReport(mSettings)) {
- struct ReportHeader *report_settings = InitSettingsReport(mSettings);
- assert(report_settings != NULL);
- // disable future settings reports, listener should only do it once
- unsetReport(mSettings);
- PostReport(report_settings);
+ if (isSettingsReport(mSettings)) {
+ struct ReportHeader *tmp = InitSettingsReport(mSettings);
+ setNoSettReport(mSettings);
+ assert(tmp!=NULL);
+ PostReport(tmp);
}
// listen for connections (TCP only).
@@ -472,253 +481,10 @@ void Listener::my_listen () {
rc = listen(ListenSocket, INT_MAX);
}
WARN_errno(rc == SOCKET_ERROR, "listen");
- } else {
-#ifndef WIN32
- // if UDP and multicast, join the group
- if (SockAddr_isMulticast(&mSettings->local)) {
-#ifdef HAVE_MULTICAST
- my_multicast_join();
-#else
- fprintf(stderr, "Multicast not supported");
-#endif // HAVE_MULTICAST
- }
-#endif
}
+ return true;
} // end my_listen()
-/* -------------------------------------------------------------------
- * Joins the multicast group or source and group (SSM S,G)
- *
- * taken from: https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.1.0/com.ibm.zos.v2r1.hale001/ipv6d0141001708.htm
- *
- * Multicast function IPv4 IPv6 Protocol-independent
- * ================== ==== ==== ====================
- * Level of specified option on setsockopt()/getsockopt() IPPROTO_IP IPPROTO_IPV6 IPPROTO_IP or IPPROTO_IPV6
- * Join a multicast group IP_ADD_MEMBERSHIP IPV6_JOIN_GROUP MCAST_JOIN_GROUP
- * Leave a multicast group or leave all sources of that
- * multicast group IP_DROP_MEMBERSHIP IPV6_LEAVE_GROUP MCAST_LEAVE_GROUP
- * Select outbound interface for sending multicast datagrams IP_MULTICAST_IF IPV6_MULTICAST_IF NA
- * Set maximum hop count IP_MULTICAST_TTL IPV6_MULTICAST_HOPS NA
- * Enable multicast loopback IP_MULTICAST_LOOP IPV6_MULTICAST_LOOP NA
- * Join a source multicast group IP_ADD_SOURCE_MEMBERSHIP NA MCAST_JOIN_SOURCE_GROUP
- * Leave a source multicast group IP_DROP_SOURCE_MEMBERSHIP NA MCAST_LEAVE_SOURCE_GROUP
- * Block data from a source to a multicast group IP_BLOCK_SOURCE NA MCAST_BLOCK_SOURCE
- * Unblock a previously blocked source for a multicast group IP_UNBLOCK_SOURCE NA MCAST_UNBLOCK_SOURCE
- *
- *
- * Reminder: The os will decide which version of IGMP or MLD to use. This may be controlled by system settings, e.g.:
- *
- * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep mld | grep force
- * net.ipv6.conf.all.force_mld_version = 0
- * net.ipv6.conf.default.force_mld_version = 0
- * net.ipv6.conf.lo.force_mld_version = 0
- * net.ipv6.conf.eth0.force_mld_version = 0
- *
- * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep igmp | grep force
- * net.ipv4.conf.all.force_igmp_version = 0
- * net.ipv4.conf.default.force_igmp_version = 0
- * net.ipv4.conf.lo.force_igmp_version = 0
- * net.ipv4.conf.eth0.force_igmp_version = 0
- *
- * ------------------------------------------------------------------- */
-void Listener::my_multicast_join () {
- // This is the older mulitcast join code. Both SSM and binding the
- // an interface requires the newer socket options. Using the older
- // code here will maintain compatiblity with previous iperf versions
- if (!isSSMMulticast(mSettings) && !mSettings->mIfrname) {
- if (!SockAddr_isIPv6(&mSettings->local)) {
- struct ip_mreq mreq;
- memcpy(&mreq.imr_multiaddr, SockAddr_get_in_addr(&mSettings->local), \
- sizeof(mreq.imr_multiaddr));
- mreq.imr_interface.s_addr = htonl(INADDR_ANY);
- int rc = setsockopt(ListenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP,
- reinterpret_cast<char*>(&mreq), sizeof(mreq));
- WARN_errno(rc == SOCKET_ERROR, "multicast join");
-#if HAVE_DECL_IP_MULTICAST_ALL
- int mc_all = 0;
- rc = setsockopt(ListenSocket, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all));
- WARN_errno(rc == SOCKET_ERROR, "ip_multicast_all");
-#endif
- } else {
-#if (HAVE_IPV6 && HAVE_IPV6_MULTICAST && (HAVE_DECL_IPV6_JOIN_GROUP || HAVE_DECL_IPV6_ADD_MEMBERSHIP))
- struct ipv6_mreq mreq;
- memcpy(&mreq.ipv6mr_multiaddr, SockAddr_get_in6_addr(&mSettings->local), sizeof(mreq.ipv6mr_multiaddr));
- mreq.ipv6mr_interface = 0;
-#if HAVE_DECL_IPV6_JOIN_GROUP
- int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_JOIN_GROUP, \
- reinterpret_cast<char*>(&mreq), sizeof(mreq));
-#else
- int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, \
- reinterpret_cast<char*>(&mreq), sizeof(mreq));
-#endif
- FAIL_errno(rc == SOCKET_ERROR, "multicast v6 join", mSettings);
-#else
- fprintf(stderr, "IPv6 multicast is not supported on this platform\n");
-#endif
- }
- } else {
- int rc;
-#ifdef HAVE_SSM_MULTICAST
- // Here it's either an SSM S,G multicast join or a *,G with an interface specifier
- // Use the newer socket options when these are specified
- socklen_t socklen = sizeof(struct sockaddr_storage);
- int iface=0;
-#ifdef HAVE_NET_IF_H
- /* Set the interface or any */
- if (mSettings->mIfrname) {
- iface = if_nametoindex(mSettings->mIfrname);
- FAIL_errno(!iface, "mcast if_nametoindex",mSettings);
- } else {
- iface = 0;
- }
-#endif
-
- if (isIPV6(mSettings)) {
-#if HAVE_IPV6_MULTICAST
- if (mSettings->mSSMMulticastStr) {
- struct group_source_req group_source_req;
- struct sockaddr_in6 *group;
- struct sockaddr_in6 *source;
-
- memset(&group_source_req, 0, sizeof(struct group_source_req));
-
- group_source_req.gsr_interface = iface;
- group=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_group);
- source=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_source);
- source->sin6_family = AF_INET6;
- group->sin6_family = AF_INET6;
- /* Set the group */
- rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen);
- FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings);
- group->sin6_port = 0; /* Ignored */
-
- /* Set the source, apply the S,G */
- rc=inet_pton(AF_INET6, mSettings->mSSMMulticastStr,&source->sin6_addr);
- FAIL_errno(rc != 1, "mcast v6 join source group pton",mSettings);
- source->sin6_port = 0; /* Ignored */
-#ifdef HAVE_STRUCT_SOCKADDR_IN6_SIN6_LEN
- source->sin6_len = group->sin6_len;
-#endif
- rc = -1;
-#if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP
- rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_SOURCE_GROUP, reinterpret_cast<const char *>(&group_source_req),
- sizeof(group_source_req));
-#endif
- FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join source group",mSettings);
- } else {
- struct group_req group_req;
- struct sockaddr_in6 *group;
-
- memset(&group_req, 0, sizeof(struct group_req));
-
- group_req.gr_interface = iface;
- group=reinterpret_cast<struct sockaddr_in6*>(&group_req.gr_group);
- group->sin6_family = AF_INET6;
- /* Set the group */
- rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen);
- FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group getsockname",mSettings);
- group->sin6_port = 0; /* Ignored */
- rc = -1;
-#if HAVE_DECL_MCAST_JOIN_GROUP
- rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_GROUP, reinterpret_cast<const char *>(&group_req),
- sizeof(group_source_req));
-#endif
- FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group",mSettings);
- }
-#else
- fprintf(stderr, "Unfortunately, IPv6 multicast is not supported on this platform\n");
-#endif
- } else {
- if (mSettings->mSSMMulticastStr) {
- struct sockaddr_in *group;
- struct sockaddr_in *source;
-
- // Fill out both structures because we don't which one will succeed
- // and both may need to be tried
-#ifdef HAVE_STRUCT_IP_MREQ_SOURCE
- struct ip_mreq_source imr;
- memset (&imr, 0, sizeof (imr));
-#endif
-#ifdef HAVE_STRUCT_GROUP_SOURCE_REQ
- struct group_source_req group_source_req;
- memset(&group_source_req, 0, sizeof(struct group_source_req));
- group_source_req.gsr_interface = iface;
- group=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_group);
- source=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_source);
-#else
- struct sockaddr_in imrgroup;
- struct sockaddr_in imrsource;
- group = &imrgroup;
- source = &imrsource;
-#endif
- source->sin_family = AF_INET;
- group->sin_family = AF_INET;
- /* Set the group */
- rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen);
- FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings);
- group->sin_port = 0; /* Ignored */
-
- /* Set the source, apply the S,G */
- rc=inet_pton(AF_INET,mSettings->mSSMMulticastStr,&source->sin_addr);
- FAIL_errno(rc != 1, "mcast join source pton",mSettings);
-#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
- source->sin_len = group->sin_len;
-#endif
- source->sin_port = 0; /* Ignored */
- rc = -1;
-
-#if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP
- rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_SOURCE_GROUP, reinterpret_cast<const char *>(&group_source_req),
- sizeof(group_source_req));
-#endif
-
-#if HAVE_DECL_IP_ADD_SOURCE_MEMBERSHIP
-#ifdef HAVE_STRUCT_IP_MREQ_SOURCE
- // Some operating systems will have MCAST_JOIN_SOURCE_GROUP but still fail
- // In those cases try the IP_ADD_SOURCE_MEMBERSHIP
- if (rc < 0) {
-#ifdef HAVE_STRUCT_IP_MREQ_SOURCE_IMR_MULTIADDR_S_ADDR
- imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr;
- imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr;
-#else
- // Some Android versions declare mreq_source without an s_addr
- imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr.s_addr;
- imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr.s_addr;
-#endif
- rc = setsockopt (ListenSocket, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, reinterpret_cast<char*>(&imr), sizeof (imr));
- }
-#endif
-#endif
- FAIL_errno(rc == SOCKET_ERROR, "mcast join source group",mSettings);
- } else {
- struct group_req group_req;
- struct sockaddr_in *group;
-
- memset(&group_req, 0, sizeof(struct group_req));
-
- group_req.gr_interface = iface;
- group=reinterpret_cast<struct sockaddr_in*>(&group_req.gr_group);
- group->sin_family = AF_INET;
- /* Set the group */
- rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen);
- FAIL_errno(rc == SOCKET_ERROR, "mcast join group getsockname",mSettings);
- group->sin_port = 0; /* Ignored */
- rc = -1;
-#if HAVE_DECL_MCAST_JOIN_GROUP
- rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_GROUP, reinterpret_cast<const char *>(&group_req),
- sizeof(group_source_req));
-#endif
- FAIL_errno(rc == SOCKET_ERROR, "mcast join group",mSettings);
- }
- }
-
-#else
- fprintf(stderr, "Unfortunately, SSM is not supported on this platform\n");
- exit(-1);
-#endif
- }
-}
-// end my_multicast_join()
bool Listener::L2_setup (thread_Settings *server, int sockfd) {
#if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET)
@@ -905,6 +671,7 @@ int Listener::udp_accept (thread_Settings *server) {
// most likely not a new client thread requiring a new server thread, but remnants of an
// old one that already ended. Hence, the Listener should ignore "first packets" when
// they have negative seq numbers.
+ RETRYREAD:
do {
packetID = 0;
nread = recvfrom(ListenSocket, server->mBuf, server->mBufLen, 0, \
@@ -925,22 +692,12 @@ int Listener::udp_accept (thread_Settings *server) {
Timestamp now;
server->accept_time.tv_sec = now.getSecs();
server->accept_time.tv_usec = now.getUsecs();
-#if HAVE_THREAD_DEBUG
- {
- char tmpaddr[200];
- size_t len=200;
- unsigned short port = SockAddr_getPort(&server->peer);
- SockAddr_getHostAddress(&server->peer, tmpaddr, len);
- thread_debug("rcvfrom peer: %s port %d len=%d", tmpaddr, port, nread);
- }
-#endif
- // Handle connection for UDP sockets
- int gid = Iperf_push_host_port_conditional(server);
-#if HAVE_THREAD_DEBUG
- if (gid < 0)
- thread_debug("rcvfrom peer: drop duplicate");
-#endif
- if (gid > 0) {
+ // Drop duplicates, may need to use a BPF drop for better performance
+ // or ebfs
+ if (!Iperf_push_host(server)) {
+ packetID = 0;
+ goto RETRYREAD;
+ } else {
int rc;
// We have a new UDP flow (based upon key of quintuple)
// so let's hand off this socket
@@ -953,11 +710,28 @@ int Listener::udp_accept (thread_Settings *server) {
// This connect() routing is only supported with AF_INET or AF_INET6 sockets,
// e.g. AF_PACKET sockets can't do this. We'll handle packet sockets later
// All UDP accepts here will use AF_INET. This is intentional and needed
- rc = connect(server->mSock, reinterpret_cast<struct sockaddr*>(&server->peer), server->size_peer);
- FAIL_errno(rc == SOCKET_ERROR, "connect UDP", mSettings);
- server->size_local = sizeof(iperf_sockaddr);
- getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local);
- SockAddr_Ifrname(server);
+ if (!isMulticast(server)) {
+ rc = connect(server->mSock, reinterpret_cast<struct sockaddr*>(&server->peer), server->size_peer);
+ FAIL_errno(rc == SOCKET_ERROR, "connect UDP", mSettings);
+ server->size_local = sizeof(iperf_sockaddr);
+ getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local);
+ SockAddr_Ifrname(server);
+ } else {
+ server->size_multicast_group = sizeof(iperf_sockaddr);
+ iperf_sockaddr sent_dstaddr;
+ getsockname(server->mSock, reinterpret_cast<sockaddr*>(&sent_dstaddr), &server->size_multicast_group);
+ int join_send_match = SockAddr_Hostare_Equal(&sent_dstaddr, &server->multicast_group);
+#if DEBUG_MCAST
+ char joinaddr[200];
+ char pktaddr[200];
+ size_t len=200;
+ SockAddr_getHostAddress(&sent_dstaddr, joinaddr, len);
+ SockAddr_getHostAddress(&server->multicast_group, pktaddr, len);
+ printf("mcast(%d): join addr %s pkt group addr %s\n", join_send_match, joinaddr, pktaddr);
+#endif
+ WARN(!join_send_match, "mcast join and packet group addr");
+ // RJM - use a cmesg to read the interface name
+ }
server->firstreadbytes = nread;
}
}
@@ -1046,8 +820,12 @@ bool Listener::apply_client_settings (thread_Settings *server) {
bool rc = false;
// Set the receive timeout for the very first read
- int sorcvtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs
- SetSocketOptionsReceiveTimeout(server, sorcvtimer);
+ if (mSettings->mListenerTimeout >= 0) {
+ SetSocketOptionsReceiveTimeout(server, (int) (mSettings->mListenerTimeout * 1000000));
+ } else {
+ int sorcvtimer = DEFAULT_TESTEXCHANGETIMEOUT; // in usecs
+ SetSocketOptionsReceiveTimeout(server, sorcvtimer);
+ }
server->peer_version_u = 0;
server->peer_version_l = 0;
server->mMode = kTest_Normal;
@@ -1111,26 +889,39 @@ bool Listener::apply_client_settings_udp (thread_Settings *server) {
#if HAVE_THREAD_DEBUG
thread_debug("UDP small header");
#endif
+ struct client_udpsmall_testhdr *smallhdr = reinterpret_cast<struct client_udpsmall_testhdr *>(server->mBuf + server->l4payloadoffset);
server->sent_time.tv_sec = ntohl(hdr->seqno_ts.tv_sec);
server->sent_time.tv_usec = ntohl(hdr->seqno_ts.tv_usec);
+ server->txstart_epoch.tv_sec = ntohl(smallhdr->start_tv_sec);
+ server->txstart_epoch.tv_usec = ntohl(smallhdr->start_tv_usec);
uint32_t seqno = ntohl(hdr->seqno_ts.id);
+ if (server->txstart_epoch.tv_sec > 0) {
+ setTxStartTime(server);
+ }
if (seqno != 1) {
fprintf(stderr, "WARN: first received packet (id=%d) was not first sent packet, report start time will be off\n", seqno);
}
Timestamp now;
if (!isTxStartTime(server) && ((abs(now.getSecs() - server->sent_time.tv_sec)) > (MAXDIFFTIMESTAMPSECS + 1))) {
- fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS);
+ fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now (now=%ld send=%ld)\n", MAXDIFFTIMESTAMPSECS, now.getSecs(), server->sent_time.tv_sec);
unsetTripTime(server);
} else {
setTripTime(server);
}
setEnhanced(server);
} else if ((flags & HEADER_VERSION1) || (flags & HEADER_VERSION2) || (flags & HEADER_EXTEND)) {
- if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) {
- if (flags & RUN_NOW)
- server->mMode = kTest_DualTest;
- else
- server->mMode = kTest_TradeOff;
+ if (flags & HEADER_VERSION1) {
+ uint32_t tidthreads = ntohl(hdr->base.numThreads);
+ if (tidthreads & HEADER_HASTRANSFERID) {
+ tidthreads &= (~HEADER_HASTRANSFERID & HEADER_TRANSFERIDMASK);
+ server->mPeerTransferID = tidthreads >> HEADER_TRANSFERIDSHIFT;
+ setSyncTransferID(server);
+ } else if (!(flags & HEADER_VERSION2)) {
+ if (flags & RUN_NOW)
+ server->mMode = kTest_DualTest;
+ else
+ server->mMode = kTest_TradeOff;
+ }
}
if (flags & HEADER_EXTEND) {
upperflags = htons(hdr->extend.upperflags);
@@ -1234,6 +1025,18 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) {
}
readptr += nread;
server->mBounceBackBytes = ntohl(bbhdr->bbsize);
+ if (server->mBounceBackBytes > server->mBufLen) {
+ if (isBuflenSet(server)) {
+ WARN(1, "Buffer length (-l) too small for bounceback request. Increase -l size or don't set (for auto-adjust)");
+ rc = false;
+ goto DONE;
+ } else {
+ int read_offset = readptr - server->mBuf;
+ Settings_Grow_mBuf(server, server->mBounceBackBytes);
+ readptr = server->mBuf + read_offset;
+ bbhdr = reinterpret_cast<struct bounceback_hdr *>(server->mBuf);
+ }
+ }
server->mBounceBackHold = ntohl(bbhdr->bbhold);
uint16_t bbflags = ntohs(bbhdr->bbflags);
if (bbflags & HEADER_BBCLOCKSYNCED) {
@@ -1249,6 +1052,23 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) {
setTcpQuickAck(server);
}
#endif
+ if (bbflags & HEADER_BBREPLYSIZE) {
+ server->mBounceBackReplyBytes = ntohl(bbhdr->bbreplysize);
+ } else {
+ server->mBounceBackReplyBytes = server->mBounceBackBytes;
+ }
+ if (server->mBounceBackReplyBytes > server->mBufLen) {
+ if (isBuflenSet(server)) {
+ WARN(1, "Buffer length (-l) too small for bounceback reply. Increase -l size or don't set (for auto-adjust)");
+ rc = false;
+ goto DONE;
+ } else {
+ int read_offset = readptr - server->mBuf;
+ Settings_Grow_mBuf(server, server->mBounceBackReplyBytes);
+ readptr = server->mBuf + read_offset;
+ bbhdr = reinterpret_cast<struct bounceback_hdr *>(server->mBuf);
+ }
+ }
int remaining = server->mBounceBackBytes - (sizeof(struct bounceback_hdr) + sizeof(uint32_t));
if (remaining < 0) {
WARN(1, "bounce back bytes too small");
@@ -1267,9 +1087,15 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) {
bbhdr->bbserverRx_ts.usec = htonl(now.getUsecs());
} else {
uint16_t upperflags = 0;
+ uint16_t lowerflags = 0;
int readlen;
// figure out the length of the test header
if ((readlen = Settings_ClientTestHdrLen(flags, server)) > 0) {
+ if (readlen > (server->mBufLen - nread)) {
+ WARN(1, "read tcp header too large");
+ rc = false;
+ goto DONE;
+ }
// read the test settings passed to the server by the client
nread += recvn(server->mSock, readptr, (readlen - (int) sizeof(uint32_t)), 0);
FAIL_errno((nread < readlen), "read tcp test info", server);
@@ -1285,14 +1111,22 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) {
}
server->firstreadbytes = nread;
struct client_tcp_testhdr *hdr = reinterpret_cast<struct client_tcp_testhdr*>(server->mBuf);
- if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) {
- if (flags & RUN_NOW)
- server->mMode = kTest_DualTest;
- else
- server->mMode = kTest_TradeOff;
+ if (flags & HEADER_VERSION1) {
+ uint32_t tidthreads = ntohl(hdr->base.numThreads);
+ if (tidthreads & HEADER_HASTRANSFERID) {
+ tidthreads &= (~HEADER_HASTRANSFERID & HEADER_TRANSFERIDMASK);
+ server->mPeerTransferID = tidthreads >> HEADER_TRANSFERIDSHIFT;
+ setSyncTransferID(server);
+ } else if (!(flags & HEADER_VERSION2)) {
+ if (flags & RUN_NOW)
+ server->mMode = kTest_DualTest;
+ else
+ server->mMode = kTest_TradeOff;
+ }
}
if (flags & HEADER_EXTEND) {
upperflags = htons(hdr->extend.upperflags);
+ lowerflags = htons(hdr->extend.lowerflags);
server->mTOS = ntohs(hdr->extend.tos);
server->peer_version_u = ntohl(hdr->extend.version_u);
server->peer_version_l = ntohl(hdr->extend.version_l);
@@ -1335,6 +1169,30 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) {
server->mFPS = 1.0;
}
}
+ if ((lowerflags & HEADER_CCA) && !isCongestionControl(server)) {
+#if HAVE_DECL_TCP_CONGESTION
+ int ccalen = ntohs(hdr->cca.cca_length);
+ setCongestionControl(server);
+ setEnhanced(server);
+ server->mCongestion = new char[ccalen+1];
+ if (server->mCongestion) {
+ strncpy(server->mCongestion, hdr->cca.value, ccalen);
+ server->mCongestion[ccalen] = '\0';
+ Socklen_t len = strlen(server->mCongestion) + 1;
+ int rc = setsockopt(server->mSock, IPPROTO_TCP, TCP_CONGESTION,
+ server->mCongestion, len);
+ if (rc == SOCKET_ERROR) {
+ fprintf(stderr, "Attempt to set '%s' congestion control failed: %s\n",
+ server->mCongestion, strerror(errno));
+ unsetCongestionControl(server);
+ DELETE_ARRAY(server->mCongestion);
+ }
+ }
+#endif
+ }
+ if (lowerflags & HEADER_BARRIER_TIME) {
+ server->barrier_time = ntohl(hdr->extend.barrier_usecs);
+ }
if (flags & HEADER_VERSION2) {
if (upperflags & HEADER_FULLDUPLEX) {
setFullDuplex(server);