diff options
Diffstat (limited to 'src/Listener.cpp')
-rw-r--r-- | src/Listener.cpp | 522 |
1 files changed, 190 insertions, 332 deletions
diff --git a/src/Listener.cpp b/src/Listener.cpp index a517cbc..c6ea969 100644 --- a/src/Listener.cpp +++ b/src/Listener.cpp @@ -1,16 +1,15 @@ /*--------------------------------------------------------------- - * Copyright (c) 1999,2000,2001,2002,2003 - * The Board of Trustees of the University of Illinois - * All Rights Reserved. - *--------------------------------------------------------------- + * Copyright (c) 1999,2000,2001,2002,2003 The Board of Trustees of the + * University of Illinois All Rights Reserved. + * --------------------------------------------------------------- * Permission is hereby granted, free of charge, to any person * obtaining a copy of this software (Iperf) and associated * documentation files (the "Software"), to deal in the Software - * without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, - * sublicense, and/or sell copies of the Software, and to permit - * persons to whom the Software is furnished to do - * so, subject to the following conditions: + * without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: * * * Redistributions of source code must retain the above @@ -139,7 +138,8 @@ void Listener::Run () { } if (!isUDP(mSettings)) { // TCP needs just one listen - my_listen(); // This will set ListenSocket to a new sock fd + if (!my_listen()) // This will set ListenSocket to a new sock fd + return; } bool mMode_Time = isServerModeTime(mSettings) && !isDaemon(mSettings); if (mMode_Time) { @@ -150,7 +150,9 @@ void Listener::Run () { mEndTime.add(mSettings->mListenerTimeout); } Timestamp now; + bool need_listen = true; #define SINGLECLIENTDELAY_DURATION 50000 // units is microseconds + while (!sInterupted && mCount) { #ifdef HAVE_THREAD_DEBUG thread_debug("Listener main loop port %d ", mSettings->mPort); @@ -163,8 +165,8 @@ void Listener::Run () { break; } // Serialize in the event the -1 option or --singleclient is set - int tc; - if ((isSingleClient(mSettings) || isMulticast(mSettings)) && \ + int tc = 0; + if ((isSingleClient(mSettings) || (isUDP(mSettings) && isMulticast(mSettings))) && \ mCount && (tc = (thread_numtrafficthreads()) > 0)) { // Start with a delay in the event some traffic // threads are pending to be scheduled and haven't @@ -178,9 +180,14 @@ void Listener::Run () { #endif continue; } - if (isUDP(mSettings)) { + // This will set ListenSocket to a new sock fd + if (isUDP(mSettings) && need_listen) { // UDP needs a new listen per every new socket - my_listen(); // This will set ListenSocket to a new sock fd + if (!my_listen()) { + break; + } else { + need_listen = false; + } } // Use a select() with a timeout if -t is set or if this is a v1 -r or -d test fd_set set; @@ -195,12 +202,6 @@ void Listener::Run () { timeout.tv_sec = static_cast<long>(mSettings->mListenerTimeout); timeout.tv_usec = (static_cast<long>(mSettings->mListenerTimeout) * 1000000) % 1000000; } - if (isTxStartTime(mSettings)) { - now.setnow(); - long adjsecs = (mSettings->txstart_epoch.tv_sec - now.getSecs()); - if (adjsecs > 0) - timeout.tv_sec += adjsecs + 1; - } FD_ZERO(&set); FD_SET(ListenSocket, &set); if (!(select(ListenSocket + 1, &set, NULL, NULL, &timeout) > 0)) { @@ -234,7 +235,7 @@ void Listener::Run () { Settings_Destroy(server); continue; } - + need_listen = true; #ifdef HAVE_THREAD_DEBUG thread_debug("Listener thread accepted server sock=%d transferID", server->mSock, server->mTransferID); #endif @@ -309,7 +310,7 @@ void Listener::Run () { if (isUDP(server) && isCompat(mSettings)) { setSeqNo64b(server); } - setTransferID(server, 0); + setTransferID(server, NORMAL); if ((mSettings->mReportMode == kReport_CSV) && server->mSumReport && !server->mSumReport->sum_reverse_set) { format_ips_port_string(&server->mSumReport->info, 1); server->mSumReport->sum_reverse_set = true; @@ -334,7 +335,7 @@ void Listener::Run () { if (listener_client_settings) { if (server->mMode != kTest_Normal) listener_client_settings->mTransferID = 0; - setTransferID(listener_client_settings, 1); + setTransferID(listener_client_settings, REVERSED); if (isFullDuplex(listener_client_settings) || isReverse(listener_client_settings)) Iperf_push_host(listener_client_settings); if (isFullDuplex(server)) { @@ -344,9 +345,9 @@ void Listener::Run () { // now that it's know to be full duplex. This wasn't known // during accept() SetSumHandlers(server, server->mSumReport); - server->mSumReport->sum_fd_set = 1; + server->mSumReport->sum_fd_set = true; } - server->mFullDuplexReport = InitSumReport(server, server->mSock, 1); + server->mFullDuplexReport = InitSumReport(server, server->mSock, true); listener_client_settings->mFullDuplexReport = server->mFullDuplexReport; #if HAVE_THREAD_DEBUG thread_debug("FullDuplex report client=%p/%p server=%p/%p", (void *) listener_client_settings, (void *) listener_client_settings->mFullDuplexReport, (void *) server, (void *) server->mFullDuplexReport); @@ -368,7 +369,7 @@ void Listener::Run () { } } } - setTransferID(server, 0); + setTransferID(server, NORMAL); if (isConnectionReport(server) && !isSumOnly(server)) { struct ReportHeader *reporthdr = InitConnectionReport(server); struct ConnectionInfo *cr = static_cast<struct ConnectionInfo *>(reporthdr->this_report); @@ -393,7 +394,7 @@ void Listener::Run () { * wildcard server address, specifying what incoming interface to * accept connections on. * ------------------------------------------------------------------- */ -void Listener::my_listen () { +bool Listener::my_listen () { int rc; int type; int domain; @@ -414,20 +415,13 @@ void Listener::my_listen () { // for the case of L2 testing and UDP, a new AF_PACKET // will be created to supercede this one type = (isUDP(mSettings) ? SOCK_DGRAM : SOCK_STREAM); - domain = (SockAddr_isIPv6(&mSettings->local) ? -#if HAVE_IPV6 - AF_INET6 -#else - AF_INET -#endif - : AF_INET); + domain = SockAddr_getAFdomain(&mSettings->local); #ifdef WIN32 - if (SockAddr_isMulticast(&mSettings->local)) { + if (SockAddr_isMulticast(&mSettings->multicast_group)) { // Multicast on Win32 requires special handling ListenSocket = WSASocket(domain, type, 0, 0, 0, WSA_FLAG_MULTIPOINT_C_LEAF | WSA_FLAG_MULTIPOINT_D_LEAF); WARN_errno(ListenSocket == INVALID_SOCKET, "socket"); - } else #endif { @@ -436,31 +430,46 @@ void Listener::my_listen () { } mSettings->mSock = ListenSocket; SetSocketOptions(mSettings); - // reuse the address, so we can run if a former server was killed off - int boolean = 1; - Socklen_t len = sizeof(boolean); - rc = setsockopt(ListenSocket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&boolean), len); - // bind socket to server address + SetSocketBindToDeviceIfNeeded(mSettings); + rc = SOCKET_ERROR; + if (isUDP(mSettings) && SockAddr_isMulticast(&mSettings->multicast_group)) { +#if HAVE_MULTICAST #ifdef WIN32 - if (SockAddr_isMulticast(&mSettings->local)) { // Multicast on Win32 requires special handling - rc = WSAJoinLeaf(ListenSocket, (sockaddr*) &mSettings->local, mSettings->size_local,0,0,0,0,JL_BOTH); + rc = WSAJoinLeaf(ListenSocket, reinterpret_cast<sockaddr*> (&mSettings->local), mSettings->size_local,0,0,0,0,JL_BOTH); WARN_errno(rc == SOCKET_ERROR, "WSAJoinLeaf (aka bind)"); - } else +#else +#if 0 && (HAVE_DECL_IP_ADD_MEMBERSHIP || HAVE_DECL_MCAST_JOIN_GROUP) // possible future, bind to all including unicast + iperf_sockaddr tmp; + memcpy(&tmp, &mSettings->local, sizeof(tmp)); + SockAddr_setAddressAny(&tmp); // the multicast join will take care of this + rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&tmp), mSettings->size_local); + printf("***** any bind\n"); +#else + rc = bind(ListenSocket, reinterpret_cast<sockaddr*> (&mSettings->local), mSettings->size_local); +// printf("***** single bind\n"); #endif - { - rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&mSettings->local), mSettings->size_local); - FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings); + FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings); + // if UDP and multicast, join the group + if (iperf_multicast_join(mSettings) != IPERF_MULTICAST_JOIN_SUCCESS) { + rc = SOCKET_ERROR; } +#endif +#else + fprintf(stderr, "Multicast not supported"); +#endif // HAVE_MULTICAST + } else { + // bind socket for unicast + rc = bind(ListenSocket, reinterpret_cast<sockaddr*>(&mSettings->local), mSettings->size_local); + } + FAIL_errno(rc == SOCKET_ERROR, "listener bind", mSettings); } - // update the reporter thread - if (isReport(mSettings) && isSettingsReport(mSettings)) { - struct ReportHeader *report_settings = InitSettingsReport(mSettings); - assert(report_settings != NULL); - // disable future settings reports, listener should only do it once - unsetReport(mSettings); - PostReport(report_settings); + if (isSettingsReport(mSettings)) { + struct ReportHeader *tmp = InitSettingsReport(mSettings); + setNoSettReport(mSettings); + assert(tmp!=NULL); + PostReport(tmp); } // listen for connections (TCP only). @@ -472,253 +481,10 @@ void Listener::my_listen () { rc = listen(ListenSocket, INT_MAX); } WARN_errno(rc == SOCKET_ERROR, "listen"); - } else { -#ifndef WIN32 - // if UDP and multicast, join the group - if (SockAddr_isMulticast(&mSettings->local)) { -#ifdef HAVE_MULTICAST - my_multicast_join(); -#else - fprintf(stderr, "Multicast not supported"); -#endif // HAVE_MULTICAST - } -#endif } + return true; } // end my_listen() -/* ------------------------------------------------------------------- - * Joins the multicast group or source and group (SSM S,G) - * - * taken from: https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.1.0/com.ibm.zos.v2r1.hale001/ipv6d0141001708.htm - * - * Multicast function IPv4 IPv6 Protocol-independent - * ================== ==== ==== ==================== - * Level of specified option on setsockopt()/getsockopt() IPPROTO_IP IPPROTO_IPV6 IPPROTO_IP or IPPROTO_IPV6 - * Join a multicast group IP_ADD_MEMBERSHIP IPV6_JOIN_GROUP MCAST_JOIN_GROUP - * Leave a multicast group or leave all sources of that - * multicast group IP_DROP_MEMBERSHIP IPV6_LEAVE_GROUP MCAST_LEAVE_GROUP - * Select outbound interface for sending multicast datagrams IP_MULTICAST_IF IPV6_MULTICAST_IF NA - * Set maximum hop count IP_MULTICAST_TTL IPV6_MULTICAST_HOPS NA - * Enable multicast loopback IP_MULTICAST_LOOP IPV6_MULTICAST_LOOP NA - * Join a source multicast group IP_ADD_SOURCE_MEMBERSHIP NA MCAST_JOIN_SOURCE_GROUP - * Leave a source multicast group IP_DROP_SOURCE_MEMBERSHIP NA MCAST_LEAVE_SOURCE_GROUP - * Block data from a source to a multicast group IP_BLOCK_SOURCE NA MCAST_BLOCK_SOURCE - * Unblock a previously blocked source for a multicast group IP_UNBLOCK_SOURCE NA MCAST_UNBLOCK_SOURCE - * - * - * Reminder: The os will decide which version of IGMP or MLD to use. This may be controlled by system settings, e.g.: - * - * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep mld | grep force - * net.ipv6.conf.all.force_mld_version = 0 - * net.ipv6.conf.default.force_mld_version = 0 - * net.ipv6.conf.lo.force_mld_version = 0 - * net.ipv6.conf.eth0.force_mld_version = 0 - * - * [rmcmahon@lvnvdb0987:~/Code/ssm/iperf2-code] $ sysctl -a | grep igmp | grep force - * net.ipv4.conf.all.force_igmp_version = 0 - * net.ipv4.conf.default.force_igmp_version = 0 - * net.ipv4.conf.lo.force_igmp_version = 0 - * net.ipv4.conf.eth0.force_igmp_version = 0 - * - * ------------------------------------------------------------------- */ -void Listener::my_multicast_join () { - // This is the older mulitcast join code. Both SSM and binding the - // an interface requires the newer socket options. Using the older - // code here will maintain compatiblity with previous iperf versions - if (!isSSMMulticast(mSettings) && !mSettings->mIfrname) { - if (!SockAddr_isIPv6(&mSettings->local)) { - struct ip_mreq mreq; - memcpy(&mreq.imr_multiaddr, SockAddr_get_in_addr(&mSettings->local), \ - sizeof(mreq.imr_multiaddr)); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); - int rc = setsockopt(ListenSocket, IPPROTO_IP, IP_ADD_MEMBERSHIP, - reinterpret_cast<char*>(&mreq), sizeof(mreq)); - WARN_errno(rc == SOCKET_ERROR, "multicast join"); -#if HAVE_DECL_IP_MULTICAST_ALL - int mc_all = 0; - rc = setsockopt(ListenSocket, IPPROTO_IP, IP_MULTICAST_ALL, (void*) &mc_all, sizeof(mc_all)); - WARN_errno(rc == SOCKET_ERROR, "ip_multicast_all"); -#endif - } else { -#if (HAVE_IPV6 && HAVE_IPV6_MULTICAST && (HAVE_DECL_IPV6_JOIN_GROUP || HAVE_DECL_IPV6_ADD_MEMBERSHIP)) - struct ipv6_mreq mreq; - memcpy(&mreq.ipv6mr_multiaddr, SockAddr_get_in6_addr(&mSettings->local), sizeof(mreq.ipv6mr_multiaddr)); - mreq.ipv6mr_interface = 0; -#if HAVE_DECL_IPV6_JOIN_GROUP - int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_JOIN_GROUP, \ - reinterpret_cast<char*>(&mreq), sizeof(mreq)); -#else - int rc = setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, \ - reinterpret_cast<char*>(&mreq), sizeof(mreq)); -#endif - FAIL_errno(rc == SOCKET_ERROR, "multicast v6 join", mSettings); -#else - fprintf(stderr, "IPv6 multicast is not supported on this platform\n"); -#endif - } - } else { - int rc; -#ifdef HAVE_SSM_MULTICAST - // Here it's either an SSM S,G multicast join or a *,G with an interface specifier - // Use the newer socket options when these are specified - socklen_t socklen = sizeof(struct sockaddr_storage); - int iface=0; -#ifdef HAVE_NET_IF_H - /* Set the interface or any */ - if (mSettings->mIfrname) { - iface = if_nametoindex(mSettings->mIfrname); - FAIL_errno(!iface, "mcast if_nametoindex",mSettings); - } else { - iface = 0; - } -#endif - - if (isIPV6(mSettings)) { -#if HAVE_IPV6_MULTICAST - if (mSettings->mSSMMulticastStr) { - struct group_source_req group_source_req; - struct sockaddr_in6 *group; - struct sockaddr_in6 *source; - - memset(&group_source_req, 0, sizeof(struct group_source_req)); - - group_source_req.gsr_interface = iface; - group=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_group); - source=reinterpret_cast<struct sockaddr_in6*>(&group_source_req.gsr_source); - source->sin6_family = AF_INET6; - group->sin6_family = AF_INET6; - /* Set the group */ - rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); - FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings); - group->sin6_port = 0; /* Ignored */ - - /* Set the source, apply the S,G */ - rc=inet_pton(AF_INET6, mSettings->mSSMMulticastStr,&source->sin6_addr); - FAIL_errno(rc != 1, "mcast v6 join source group pton",mSettings); - source->sin6_port = 0; /* Ignored */ -#ifdef HAVE_STRUCT_SOCKADDR_IN6_SIN6_LEN - source->sin6_len = group->sin6_len; -#endif - rc = -1; -#if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP - rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_SOURCE_GROUP, reinterpret_cast<const char *>(&group_source_req), - sizeof(group_source_req)); -#endif - FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join source group",mSettings); - } else { - struct group_req group_req; - struct sockaddr_in6 *group; - - memset(&group_req, 0, sizeof(struct group_req)); - - group_req.gr_interface = iface; - group=reinterpret_cast<struct sockaddr_in6*>(&group_req.gr_group); - group->sin6_family = AF_INET6; - /* Set the group */ - rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); - FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group getsockname",mSettings); - group->sin6_port = 0; /* Ignored */ - rc = -1; -#if HAVE_DECL_MCAST_JOIN_GROUP - rc = setsockopt(ListenSocket,IPPROTO_IPV6,MCAST_JOIN_GROUP, reinterpret_cast<const char *>(&group_req), - sizeof(group_source_req)); -#endif - FAIL_errno(rc == SOCKET_ERROR, "mcast v6 join group",mSettings); - } -#else - fprintf(stderr, "Unfortunately, IPv6 multicast is not supported on this platform\n"); -#endif - } else { - if (mSettings->mSSMMulticastStr) { - struct sockaddr_in *group; - struct sockaddr_in *source; - - // Fill out both structures because we don't which one will succeed - // and both may need to be tried -#ifdef HAVE_STRUCT_IP_MREQ_SOURCE - struct ip_mreq_source imr; - memset (&imr, 0, sizeof (imr)); -#endif -#ifdef HAVE_STRUCT_GROUP_SOURCE_REQ - struct group_source_req group_source_req; - memset(&group_source_req, 0, sizeof(struct group_source_req)); - group_source_req.gsr_interface = iface; - group=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_group); - source=reinterpret_cast<struct sockaddr_in*>(&group_source_req.gsr_source); -#else - struct sockaddr_in imrgroup; - struct sockaddr_in imrsource; - group = &imrgroup; - source = &imrsource; -#endif - source->sin_family = AF_INET; - group->sin_family = AF_INET; - /* Set the group */ - rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); - FAIL_errno(rc == SOCKET_ERROR, "mcast join source group getsockname",mSettings); - group->sin_port = 0; /* Ignored */ - - /* Set the source, apply the S,G */ - rc=inet_pton(AF_INET,mSettings->mSSMMulticastStr,&source->sin_addr); - FAIL_errno(rc != 1, "mcast join source pton",mSettings); -#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN - source->sin_len = group->sin_len; -#endif - source->sin_port = 0; /* Ignored */ - rc = -1; - -#if HAVE_DECL_MCAST_JOIN_SOURCE_GROUP - rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_SOURCE_GROUP, reinterpret_cast<const char *>(&group_source_req), - sizeof(group_source_req)); -#endif - -#if HAVE_DECL_IP_ADD_SOURCE_MEMBERSHIP -#ifdef HAVE_STRUCT_IP_MREQ_SOURCE - // Some operating systems will have MCAST_JOIN_SOURCE_GROUP but still fail - // In those cases try the IP_ADD_SOURCE_MEMBERSHIP - if (rc < 0) { -#ifdef HAVE_STRUCT_IP_MREQ_SOURCE_IMR_MULTIADDR_S_ADDR - imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr; - imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr; -#else - // Some Android versions declare mreq_source without an s_addr - imr.imr_multiaddr = ((const struct sockaddr_in *)group)->sin_addr.s_addr; - imr.imr_sourceaddr = ((const struct sockaddr_in *)source)->sin_addr.s_addr; -#endif - rc = setsockopt (ListenSocket, IPPROTO_IP, IP_ADD_SOURCE_MEMBERSHIP, reinterpret_cast<char*>(&imr), sizeof (imr)); - } -#endif -#endif - FAIL_errno(rc == SOCKET_ERROR, "mcast join source group",mSettings); - } else { - struct group_req group_req; - struct sockaddr_in *group; - - memset(&group_req, 0, sizeof(struct group_req)); - - group_req.gr_interface = iface; - group=reinterpret_cast<struct sockaddr_in*>(&group_req.gr_group); - group->sin_family = AF_INET; - /* Set the group */ - rc=getsockname(ListenSocket,reinterpret_cast<struct sockaddr *>(group), &socklen); - FAIL_errno(rc == SOCKET_ERROR, "mcast join group getsockname",mSettings); - group->sin_port = 0; /* Ignored */ - rc = -1; -#if HAVE_DECL_MCAST_JOIN_GROUP - rc = setsockopt(ListenSocket,IPPROTO_IP,MCAST_JOIN_GROUP, reinterpret_cast<const char *>(&group_req), - sizeof(group_source_req)); -#endif - FAIL_errno(rc == SOCKET_ERROR, "mcast join group",mSettings); - } - } - -#else - fprintf(stderr, "Unfortunately, SSM is not supported on this platform\n"); - exit(-1); -#endif - } -} -// end my_multicast_join() bool Listener::L2_setup (thread_Settings *server, int sockfd) { #if defined(HAVE_LINUX_FILTER_H) && defined(HAVE_AF_PACKET) @@ -905,6 +671,7 @@ int Listener::udp_accept (thread_Settings *server) { // most likely not a new client thread requiring a new server thread, but remnants of an // old one that already ended. Hence, the Listener should ignore "first packets" when // they have negative seq numbers. + RETRYREAD: do { packetID = 0; nread = recvfrom(ListenSocket, server->mBuf, server->mBufLen, 0, \ @@ -925,22 +692,12 @@ int Listener::udp_accept (thread_Settings *server) { Timestamp now; server->accept_time.tv_sec = now.getSecs(); server->accept_time.tv_usec = now.getUsecs(); -#if HAVE_THREAD_DEBUG - { - char tmpaddr[200]; - size_t len=200; - unsigned short port = SockAddr_getPort(&server->peer); - SockAddr_getHostAddress(&server->peer, tmpaddr, len); - thread_debug("rcvfrom peer: %s port %d len=%d", tmpaddr, port, nread); - } -#endif - // Handle connection for UDP sockets - int gid = Iperf_push_host_port_conditional(server); -#if HAVE_THREAD_DEBUG - if (gid < 0) - thread_debug("rcvfrom peer: drop duplicate"); -#endif - if (gid > 0) { + // Drop duplicates, may need to use a BPF drop for better performance + // or ebfs + if (!Iperf_push_host(server)) { + packetID = 0; + goto RETRYREAD; + } else { int rc; // We have a new UDP flow (based upon key of quintuple) // so let's hand off this socket @@ -953,11 +710,28 @@ int Listener::udp_accept (thread_Settings *server) { // This connect() routing is only supported with AF_INET or AF_INET6 sockets, // e.g. AF_PACKET sockets can't do this. We'll handle packet sockets later // All UDP accepts here will use AF_INET. This is intentional and needed - rc = connect(server->mSock, reinterpret_cast<struct sockaddr*>(&server->peer), server->size_peer); - FAIL_errno(rc == SOCKET_ERROR, "connect UDP", mSettings); - server->size_local = sizeof(iperf_sockaddr); - getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local); - SockAddr_Ifrname(server); + if (!isMulticast(server)) { + rc = connect(server->mSock, reinterpret_cast<struct sockaddr*>(&server->peer), server->size_peer); + FAIL_errno(rc == SOCKET_ERROR, "connect UDP", mSettings); + server->size_local = sizeof(iperf_sockaddr); + getsockname(server->mSock, reinterpret_cast<sockaddr*>(&server->local), &server->size_local); + SockAddr_Ifrname(server); + } else { + server->size_multicast_group = sizeof(iperf_sockaddr); + iperf_sockaddr sent_dstaddr; + getsockname(server->mSock, reinterpret_cast<sockaddr*>(&sent_dstaddr), &server->size_multicast_group); + int join_send_match = SockAddr_Hostare_Equal(&sent_dstaddr, &server->multicast_group); +#if DEBUG_MCAST + char joinaddr[200]; + char pktaddr[200]; + size_t len=200; + SockAddr_getHostAddress(&sent_dstaddr, joinaddr, len); + SockAddr_getHostAddress(&server->multicast_group, pktaddr, len); + printf("mcast(%d): join addr %s pkt group addr %s\n", join_send_match, joinaddr, pktaddr); +#endif + WARN(!join_send_match, "mcast join and packet group addr"); + // RJM - use a cmesg to read the interface name + } server->firstreadbytes = nread; } } @@ -1046,8 +820,12 @@ bool Listener::apply_client_settings (thread_Settings *server) { bool rc = false; // Set the receive timeout for the very first read - int sorcvtimer = TESTEXCHANGETIMEOUT; // 4 sec in usecs - SetSocketOptionsReceiveTimeout(server, sorcvtimer); + if (mSettings->mListenerTimeout >= 0) { + SetSocketOptionsReceiveTimeout(server, (int) (mSettings->mListenerTimeout * 1000000)); + } else { + int sorcvtimer = DEFAULT_TESTEXCHANGETIMEOUT; // in usecs + SetSocketOptionsReceiveTimeout(server, sorcvtimer); + } server->peer_version_u = 0; server->peer_version_l = 0; server->mMode = kTest_Normal; @@ -1111,26 +889,39 @@ bool Listener::apply_client_settings_udp (thread_Settings *server) { #if HAVE_THREAD_DEBUG thread_debug("UDP small header"); #endif + struct client_udpsmall_testhdr *smallhdr = reinterpret_cast<struct client_udpsmall_testhdr *>(server->mBuf + server->l4payloadoffset); server->sent_time.tv_sec = ntohl(hdr->seqno_ts.tv_sec); server->sent_time.tv_usec = ntohl(hdr->seqno_ts.tv_usec); + server->txstart_epoch.tv_sec = ntohl(smallhdr->start_tv_sec); + server->txstart_epoch.tv_usec = ntohl(smallhdr->start_tv_usec); uint32_t seqno = ntohl(hdr->seqno_ts.id); + if (server->txstart_epoch.tv_sec > 0) { + setTxStartTime(server); + } if (seqno != 1) { fprintf(stderr, "WARN: first received packet (id=%d) was not first sent packet, report start time will be off\n", seqno); } Timestamp now; if (!isTxStartTime(server) && ((abs(now.getSecs() - server->sent_time.tv_sec)) > (MAXDIFFTIMESTAMPSECS + 1))) { - fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now\n", MAXDIFFTIMESTAMPSECS); + fprintf(stdout,"WARN: ignore --trip-times because client didn't provide valid start timestamp within %d seconds of now (now=%ld send=%ld)\n", MAXDIFFTIMESTAMPSECS, now.getSecs(), server->sent_time.tv_sec); unsetTripTime(server); } else { setTripTime(server); } setEnhanced(server); } else if ((flags & HEADER_VERSION1) || (flags & HEADER_VERSION2) || (flags & HEADER_EXTEND)) { - if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) { - if (flags & RUN_NOW) - server->mMode = kTest_DualTest; - else - server->mMode = kTest_TradeOff; + if (flags & HEADER_VERSION1) { + uint32_t tidthreads = ntohl(hdr->base.numThreads); + if (tidthreads & HEADER_HASTRANSFERID) { + tidthreads &= (~HEADER_HASTRANSFERID & HEADER_TRANSFERIDMASK); + server->mPeerTransferID = tidthreads >> HEADER_TRANSFERIDSHIFT; + setSyncTransferID(server); + } else if (!(flags & HEADER_VERSION2)) { + if (flags & RUN_NOW) + server->mMode = kTest_DualTest; + else + server->mMode = kTest_TradeOff; + } } if (flags & HEADER_EXTEND) { upperflags = htons(hdr->extend.upperflags); @@ -1234,6 +1025,18 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) { } readptr += nread; server->mBounceBackBytes = ntohl(bbhdr->bbsize); + if (server->mBounceBackBytes > server->mBufLen) { + if (isBuflenSet(server)) { + WARN(1, "Buffer length (-l) too small for bounceback request. Increase -l size or don't set (for auto-adjust)"); + rc = false; + goto DONE; + } else { + int read_offset = readptr - server->mBuf; + Settings_Grow_mBuf(server, server->mBounceBackBytes); + readptr = server->mBuf + read_offset; + bbhdr = reinterpret_cast<struct bounceback_hdr *>(server->mBuf); + } + } server->mBounceBackHold = ntohl(bbhdr->bbhold); uint16_t bbflags = ntohs(bbhdr->bbflags); if (bbflags & HEADER_BBCLOCKSYNCED) { @@ -1249,6 +1052,23 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) { setTcpQuickAck(server); } #endif + if (bbflags & HEADER_BBREPLYSIZE) { + server->mBounceBackReplyBytes = ntohl(bbhdr->bbreplysize); + } else { + server->mBounceBackReplyBytes = server->mBounceBackBytes; + } + if (server->mBounceBackReplyBytes > server->mBufLen) { + if (isBuflenSet(server)) { + WARN(1, "Buffer length (-l) too small for bounceback reply. Increase -l size or don't set (for auto-adjust)"); + rc = false; + goto DONE; + } else { + int read_offset = readptr - server->mBuf; + Settings_Grow_mBuf(server, server->mBounceBackReplyBytes); + readptr = server->mBuf + read_offset; + bbhdr = reinterpret_cast<struct bounceback_hdr *>(server->mBuf); + } + } int remaining = server->mBounceBackBytes - (sizeof(struct bounceback_hdr) + sizeof(uint32_t)); if (remaining < 0) { WARN(1, "bounce back bytes too small"); @@ -1267,9 +1087,15 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) { bbhdr->bbserverRx_ts.usec = htonl(now.getUsecs()); } else { uint16_t upperflags = 0; + uint16_t lowerflags = 0; int readlen; // figure out the length of the test header if ((readlen = Settings_ClientTestHdrLen(flags, server)) > 0) { + if (readlen > (server->mBufLen - nread)) { + WARN(1, "read tcp header too large"); + rc = false; + goto DONE; + } // read the test settings passed to the server by the client nread += recvn(server->mSock, readptr, (readlen - (int) sizeof(uint32_t)), 0); FAIL_errno((nread < readlen), "read tcp test info", server); @@ -1285,14 +1111,22 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) { } server->firstreadbytes = nread; struct client_tcp_testhdr *hdr = reinterpret_cast<struct client_tcp_testhdr*>(server->mBuf); - if ((flags & HEADER_VERSION1) && !(flags & HEADER_VERSION2)) { - if (flags & RUN_NOW) - server->mMode = kTest_DualTest; - else - server->mMode = kTest_TradeOff; + if (flags & HEADER_VERSION1) { + uint32_t tidthreads = ntohl(hdr->base.numThreads); + if (tidthreads & HEADER_HASTRANSFERID) { + tidthreads &= (~HEADER_HASTRANSFERID & HEADER_TRANSFERIDMASK); + server->mPeerTransferID = tidthreads >> HEADER_TRANSFERIDSHIFT; + setSyncTransferID(server); + } else if (!(flags & HEADER_VERSION2)) { + if (flags & RUN_NOW) + server->mMode = kTest_DualTest; + else + server->mMode = kTest_TradeOff; + } } if (flags & HEADER_EXTEND) { upperflags = htons(hdr->extend.upperflags); + lowerflags = htons(hdr->extend.lowerflags); server->mTOS = ntohs(hdr->extend.tos); server->peer_version_u = ntohl(hdr->extend.version_u); server->peer_version_l = ntohl(hdr->extend.version_l); @@ -1335,6 +1169,30 @@ bool Listener::apply_client_settings_tcp (thread_Settings *server) { server->mFPS = 1.0; } } + if ((lowerflags & HEADER_CCA) && !isCongestionControl(server)) { +#if HAVE_DECL_TCP_CONGESTION + int ccalen = ntohs(hdr->cca.cca_length); + setCongestionControl(server); + setEnhanced(server); + server->mCongestion = new char[ccalen+1]; + if (server->mCongestion) { + strncpy(server->mCongestion, hdr->cca.value, ccalen); + server->mCongestion[ccalen] = '\0'; + Socklen_t len = strlen(server->mCongestion) + 1; + int rc = setsockopt(server->mSock, IPPROTO_TCP, TCP_CONGESTION, + server->mCongestion, len); + if (rc == SOCKET_ERROR) { + fprintf(stderr, "Attempt to set '%s' congestion control failed: %s\n", + server->mCongestion, strerror(errno)); + unsetCongestionControl(server); + DELETE_ARRAY(server->mCongestion); + } + } +#endif + } + if (lowerflags & HEADER_BARRIER_TIME) { + server->barrier_time = ntohl(hdr->extend.barrier_usecs); + } if (flags & HEADER_VERSION2) { if (upperflags & HEADER_FULLDUPLEX) { setFullDuplex(server); |