diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:25:50 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-30 03:25:50 +0000 |
commit | 11ea4fcf515dbc4f75be538b784635085dc10db2 (patch) | |
tree | af05377dc5f1495935a0aa3b43258c20cb8fb5b9 /flows | |
parent | Releasing progress-linux version 2.1.9+dfsg-1~progress7.99u1. (diff) | |
download | iperf-11ea4fcf515dbc4f75be538b784635085dc10db2.tar.xz iperf-11ea4fcf515dbc4f75be538b784635085dc10db2.zip |
Merging upstream version 2.2.0+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | flows/Makefile.am | 1 | ||||
-rw-r--r-- | flows/Makefile.in | 430 | ||||
-rw-r--r-- | flows/Readme.txt | 36 | ||||
-rw-r--r-- | flows/flows.py | 1300 | ||||
-rwxr-xr-x | flows/ssh_nodes.py | 499 |
5 files changed, 2266 insertions, 0 deletions
diff --git a/flows/Makefile.am b/flows/Makefile.am new file mode 100644 index 0000000..053782b --- /dev/null +++ b/flows/Makefile.am @@ -0,0 +1 @@ +EXTRA_DIST = flows.py ssh_nodes.py Readme.txt diff --git a/flows/Makefile.in b/flows/Makefile.in new file mode 100644 index 0000000..1324a26 --- /dev/null +++ b/flows/Makefile.in @@ -0,0 +1,430 @@ +# Makefile.in generated by automake 1.16.5 from Makefile.am. +# @configure_input@ + +# Copyright (C) 1994-2021 Free Software Foundation, Inc. + +# This Makefile.in is free software; the Free Software Foundation +# gives unlimited permission to copy and/or distribute it, +# with or without modifications, as long as this notice is preserved. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY, to the extent permitted by law; without +# even the implied warranty of MERCHANTABILITY or FITNESS FOR A +# PARTICULAR PURPOSE. + +@SET_MAKE@ +VPATH = @srcdir@ +am__is_gnu_make = { \ + if test -z '$(MAKELEVEL)'; then \ + false; \ + elif test -n '$(MAKE_HOST)'; then \ + true; \ + elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \ + true; \ + else \ + false; \ + fi; \ +} +am__make_running_with_option = \ + case $${target_option-} in \ + ?) ;; \ + *) echo "am__make_running_with_option: internal error: invalid" \ + "target option '$${target_option-}' specified" >&2; \ + exit 1;; \ + esac; \ + has_opt=no; \ + sane_makeflags=$$MAKEFLAGS; \ + if $(am__is_gnu_make); then \ + sane_makeflags=$$MFLAGS; \ + else \ + case $$MAKEFLAGS in \ + *\\[\ \ ]*) \ + bs=\\; \ + sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \ + | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \ + esac; \ + fi; \ + skip_next=no; \ + strip_trailopt () \ + { \ + flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \ + }; \ + for flg in $$sane_makeflags; do \ + test $$skip_next = yes && { skip_next=no; continue; }; \ + case $$flg in \ + *=*|--*) continue;; \ + -*I) strip_trailopt 'I'; skip_next=yes;; \ + -*I?*) strip_trailopt 'I';; \ + -*O) strip_trailopt 'O'; skip_next=yes;; \ + -*O?*) strip_trailopt 'O';; \ + -*l) strip_trailopt 'l'; skip_next=yes;; \ + -*l?*) strip_trailopt 'l';; \ + -[dEDm]) skip_next=yes;; \ + -[JT]) skip_next=yes;; \ + esac; \ + case $$flg in \ + *$$target_option*) has_opt=yes; break;; \ + esac; \ + done; \ + test $$has_opt = yes +am__make_dryrun = (target_option=n; $(am__make_running_with_option)) +am__make_keepgoing = (target_option=k; $(am__make_running_with_option)) +pkgdatadir = $(datadir)/@PACKAGE@ +pkgincludedir = $(includedir)/@PACKAGE@ +pkglibdir = $(libdir)/@PACKAGE@ +pkglibexecdir = $(libexecdir)/@PACKAGE@ +am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd +install_sh_DATA = $(install_sh) -c -m 644 +install_sh_PROGRAM = $(install_sh) -c +install_sh_SCRIPT = $(install_sh) -c +INSTALL_HEADER = $(INSTALL_DATA) +transform = $(program_transform_name) +NORMAL_INSTALL = : +PRE_INSTALL = : +POST_INSTALL = : +NORMAL_UNINSTALL = : +PRE_UNINSTALL = : +POST_UNINSTALL = : +build_triplet = @build@ +host_triplet = @host@ +subdir = flows +ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 +am__aclocal_m4_deps = $(top_srcdir)/m4/ax_create_stdint_h.m4 \ + $(top_srcdir)/m4/dast.m4 $(top_srcdir)/m4/ax_pthread.m4 \ + $(top_srcdir)/configure.ac +am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \ + $(ACLOCAL_M4) +DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON) +mkinstalldirs = $(install_sh) -d +CONFIG_HEADER = $(top_builddir)/config.h +CONFIG_CLEAN_FILES = +CONFIG_CLEAN_VPATH_FILES = +AM_V_P = $(am__v_P_@AM_V@) +am__v_P_ = $(am__v_P_@AM_DEFAULT_V@) +am__v_P_0 = false +am__v_P_1 = : +AM_V_GEN = $(am__v_GEN_@AM_V@) +am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@) +am__v_GEN_0 = @echo " GEN " $@; +am__v_GEN_1 = +AM_V_at = $(am__v_at_@AM_V@) +am__v_at_ = $(am__v_at_@AM_DEFAULT_V@) +am__v_at_0 = @ +am__v_at_1 = +SOURCES = +DIST_SOURCES = +am__can_run_installinfo = \ + case $$AM_UPDATE_INFO_DIR in \ + n|no|NO) false;; \ + *) (install-info --version) >/dev/null 2>&1;; \ + esac +am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP) +am__DIST_COMMON = $(srcdir)/Makefile.in +DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST) +ACLOCAL = @ACLOCAL@ +AMTAR = @AMTAR@ +AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@ +AUTOCONF = @AUTOCONF@ +AUTOHEADER = @AUTOHEADER@ +AUTOMAKE = @AUTOMAKE@ +AWK = @AWK@ +CC = @CC@ +CCDEPMODE = @CCDEPMODE@ +CFLAGS = @CFLAGS@ +CPP = @CPP@ +CPPFLAGS = @CPPFLAGS@ +CSCOPE = @CSCOPE@ +CTAGS = @CTAGS@ +CXX = @CXX@ +CXXDEPMODE = @CXXDEPMODE@ +CXXFLAGS = @CXXFLAGS@ +CYGPATH_W = @CYGPATH_W@ +DEFS = @DEFS@ +DEPDIR = @DEPDIR@ +ECHO_C = @ECHO_C@ +ECHO_N = @ECHO_N@ +ECHO_T = @ECHO_T@ +EGREP = @EGREP@ +ETAGS = @ETAGS@ +EXEEXT = @EXEEXT@ +GREP = @GREP@ +INSTALL = @INSTALL@ +INSTALL_DATA = @INSTALL_DATA@ +INSTALL_PROGRAM = @INSTALL_PROGRAM@ +INSTALL_SCRIPT = @INSTALL_SCRIPT@ +INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ +LDFLAGS = @LDFLAGS@ +LIBOBJS = @LIBOBJS@ +LIBS = @LIBS@ +LTLIBOBJS = @LTLIBOBJS@ +MAINT = @MAINT@ +MAKEINFO = @MAKEINFO@ +MKDIR_P = @MKDIR_P@ +OBJEXT = @OBJEXT@ +PACKAGE = @PACKAGE@ +PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@ +PACKAGE_NAME = @PACKAGE_NAME@ +PACKAGE_STRING = @PACKAGE_STRING@ +PACKAGE_TARNAME = @PACKAGE_TARNAME@ +PACKAGE_URL = @PACKAGE_URL@ +PACKAGE_VERSION = @PACKAGE_VERSION@ +PATH_SEPARATOR = @PATH_SEPARATOR@ +PTHREAD_CC = @PTHREAD_CC@ +PTHREAD_CFLAGS = @PTHREAD_CFLAGS@ +PTHREAD_LIBS = @PTHREAD_LIBS@ +RANLIB = @RANLIB@ +SED = @SED@ +SET_MAKE = @SET_MAKE@ +SHELL = @SHELL@ +STRIP = @STRIP@ +STRIP_BEGIN = @STRIP_BEGIN@ +STRIP_DUMMY = @STRIP_DUMMY@ +STRIP_END = @STRIP_END@ +VERSION = @VERSION@ +WEB100_CFLAGS = @WEB100_CFLAGS@ +WEB100_CONFIG = @WEB100_CONFIG@ +WEB100_LIBS = @WEB100_LIBS@ +abs_builddir = @abs_builddir@ +abs_srcdir = @abs_srcdir@ +abs_top_builddir = @abs_top_builddir@ +abs_top_srcdir = @abs_top_srcdir@ +ac_ct_CC = @ac_ct_CC@ +ac_ct_CXX = @ac_ct_CXX@ +am__include = @am__include@ +am__leading_dot = @am__leading_dot@ +am__quote = @am__quote@ +am__tar = @am__tar@ +am__untar = @am__untar@ +ax_pthread_config = @ax_pthread_config@ +bindir = @bindir@ +build = @build@ +build_alias = @build_alias@ +build_cpu = @build_cpu@ +build_os = @build_os@ +build_vendor = @build_vendor@ +builddir = @builddir@ +datadir = @datadir@ +datarootdir = @datarootdir@ +docdir = @docdir@ +dvidir = @dvidir@ +exec_prefix = @exec_prefix@ +host = @host@ +host_alias = @host_alias@ +host_cpu = @host_cpu@ +host_os = @host_os@ +host_vendor = @host_vendor@ +htmldir = @htmldir@ +includedir = @includedir@ +infodir = @infodir@ +install_sh = @install_sh@ +libdir = @libdir@ +libexecdir = @libexecdir@ +localedir = @localedir@ +localstatedir = @localstatedir@ +mandir = @mandir@ +mkdir_p = @mkdir_p@ +oldincludedir = @oldincludedir@ +pdfdir = @pdfdir@ +prefix = @prefix@ +program_transform_name = @program_transform_name@ +psdir = @psdir@ +runstatedir = @runstatedir@ +sbindir = @sbindir@ +sharedstatedir = @sharedstatedir@ +srcdir = @srcdir@ +sysconfdir = @sysconfdir@ +target_alias = @target_alias@ +top_build_prefix = @top_build_prefix@ +top_builddir = @top_builddir@ +top_srcdir = @top_srcdir@ +EXTRA_DIST = flows.py ssh_nodes.py Readme.txt +all: all-am + +.SUFFIXES: +$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps) + @for dep in $?; do \ + case '$(am__configure_deps)' in \ + *$$dep*) \ + ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \ + && { if test -f $@; then exit 0; else break; fi; }; \ + exit 1;; \ + esac; \ + done; \ + echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu flows/Makefile'; \ + $(am__cd) $(top_srcdir) && \ + $(AUTOMAKE) --gnu flows/Makefile +Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status + @case '$?' in \ + *config.status*) \ + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \ + *) \ + echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \ + cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \ + esac; + +$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh + +$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps) + cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh +$(am__aclocal_m4_deps): +tags TAGS: + +ctags CTAGS: + +cscope cscopelist: + +distdir: $(BUILT_SOURCES) + $(MAKE) $(AM_MAKEFLAGS) distdir-am + +distdir-am: $(DISTFILES) + @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \ + list='$(DISTFILES)'; \ + dist_files=`for file in $$list; do echo $$file; done | \ + sed -e "s|^$$srcdirstrip/||;t" \ + -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \ + case $$dist_files in \ + */*) $(MKDIR_P) `echo "$$dist_files" | \ + sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \ + sort -u` ;; \ + esac; \ + for file in $$dist_files; do \ + if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \ + if test -d $$d/$$file; then \ + dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \ + if test -d "$(distdir)/$$file"; then \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \ + cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \ + find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \ + fi; \ + cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \ + else \ + test -f "$(distdir)/$$file" \ + || cp -p $$d/$$file "$(distdir)/$$file" \ + || exit 1; \ + fi; \ + done +check-am: all-am +check: check-am +all-am: Makefile +installdirs: +install: install-am +install-exec: install-exec-am +install-data: install-data-am +uninstall: uninstall-am + +install-am: all-am + @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am + +installcheck: installcheck-am +install-strip: + if test -z '$(STRIP)'; then \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + install; \ + else \ + $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \ + install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \ + "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \ + fi +mostlyclean-generic: + +clean-generic: + +distclean-generic: + -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES) + -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES) + +maintainer-clean-generic: + @echo "This command is intended for maintainers to use" + @echo "it deletes files that may require special tools to rebuild." +clean: clean-am + +clean-am: clean-generic mostlyclean-am + +distclean: distclean-am + -rm -f Makefile +distclean-am: clean-am distclean-generic + +dvi: dvi-am + +dvi-am: + +html: html-am + +html-am: + +info: info-am + +info-am: + +install-data-am: + +install-dvi: install-dvi-am + +install-dvi-am: + +install-exec-am: + +install-html: install-html-am + +install-html-am: + +install-info: install-info-am + +install-info-am: + +install-man: + +install-pdf: install-pdf-am + +install-pdf-am: + +install-ps: install-ps-am + +install-ps-am: + +installcheck-am: + +maintainer-clean: maintainer-clean-am + -rm -f Makefile +maintainer-clean-am: distclean-am maintainer-clean-generic + +mostlyclean: mostlyclean-am + +mostlyclean-am: mostlyclean-generic + +pdf: pdf-am + +pdf-am: + +ps: ps-am + +ps-am: + +uninstall-am: + +.MAKE: install-am install-strip + +.PHONY: all all-am check check-am clean clean-generic cscopelist-am \ + ctags-am distclean distclean-generic distdir dvi dvi-am html \ + html-am info info-am install install-am install-data \ + install-data-am install-dvi install-dvi-am install-exec \ + install-exec-am install-html install-html-am install-info \ + install-info-am install-man install-pdf install-pdf-am \ + install-ps install-ps-am install-strip installcheck \ + installcheck-am installdirs maintainer-clean \ + maintainer-clean-generic mostlyclean mostlyclean-generic pdf \ + pdf-am ps ps-am tags-am uninstall uninstall-am + +.PRECIOUS: Makefile + + +# Tell versions [3.59,3.63) of GNU make to not export all variables. +# Otherwise a system limit (for SysV at least) may be exceeded. +.NOEXPORT: diff --git a/flows/Readme.txt b/flows/Readme.txt new file mode 100644 index 0000000..3ff19d0 --- /dev/null +++ b/flows/Readme.txt @@ -0,0 +1,36 @@ +The following steps needs to be done before running pyflows test: + +1) Install Python 3.10 and above + +2) sudo dnf install python3-matplotlib + +3) install gnuplot 'sudo dnf install gnuplot' + +4) Update your ".bashrc" and add the location of your flows files "/your_local_dir/iperf2-code/flows" to PYTHONPATH: +export PYTHONPATH=/your_local_dir/iperf2-code/flows:$PYTHONPATH +echo $PYTHONPATH + +5) compile the following modules: +cd /your_local_dir/iperf2-code/flows +python3 -m py_compile flows.py ssh_nodes.py + +6) Configure the IP addresses and LAN addresses in the router_latency.py + +7) Make sure passwordless ssh is configured for all the ssh DUTs, e.g. + +[bm932125@rjm-wifibt-ctrl:/ltf-local/Code/LTF/pyflows/scripts] $ ssh-copy-id -i ~/.ssh/id_rsa.pub root@10.19.85.40 +/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/bm932125/.ssh/id_rsa.pub" +/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed +/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys +root@10.19.85.40's password: + +Number of key(s) added: 1 + +Now try logging into the machine, with: "ssh 'root@10.19.85.40'" +and check to make sure that only the key(s) you wanted were added. + +8) Make sure all the wireless devices are loaded and connected to the SSID + +9) Run the test: +cd /your_local_dir/iperf2-code/flows +python3 router_latency.py diff --git a/flows/flows.py b/flows/flows.py new file mode 100644 index 0000000..5465b6b --- /dev/null +++ b/flows/flows.py @@ -0,0 +1,1300 @@ +# ---------------------------------------------------------------- +# * Copyright (c) 2018-2023 +# * Broadcom Corporation +# * All Rights Reserved. +# *--------------------------------------------------------------- +# Redistribution and use in source and binary forms, with or without modification, are permitted +# provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, this list of conditions +# and the following disclaimer. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. Neither the name of the Broadcom nor the names of +# contributors may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USEn, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Author Robert J. McMahon, Broadcom LTD +# Date April 2016 - December 2023 + +import re +import subprocess +import logging +import asyncio, sys +import time, datetime +import locale +import signal +import weakref +import os +import getpass +import math +import scipy +import scipy.spatial +import numpy as np +import tkinter +import ctypes +import ipaddress +import collections +import csv + +from datetime import datetime as datetime, timezone +from scipy import stats +from scipy.cluster import hierarchy +from scipy.cluster.hierarchy import linkage +import matplotlib.pyplot as plt +from collections import defaultdict + +logger = logging.getLogger(__name__) + +class iperf_flow(object): + port = 61000 + iperf = '/usr/bin/iperf' + instances = weakref.WeakSet() + _loop = None + flow_scope = ("flowstats") + tasks = [] + flowid2name = defaultdict(str) + + @classmethod + def get_instances(cls): + return list(iperf_flow.instances) + + @classmethod + @property + def loop(cls): + if not cls._loop : + try : + cls._loop = asyncio.get_running_loop() + except : + if os.name == 'nt': + # On Windows, the ProactorEventLoop is necessary to listen on pipes + cls._loop = asyncio.ProactorEventLoop() + else: + cls._loop = asyncio.new_event_loop() + return cls._loop + + + @classmethod + def close_loop(cls): + if iperf_flow.loop.is_running(): + iperf_flow.loop.run_until_complete(loop.shutdown_asyncgens()) + iperf_flow.loop.close() + + @classmethod + def sleep(cls, time=0, text=None, stoptext=None) : + if text : + logging.info('Sleep {} ({})'.format(time, text)) + iperf_flow.loop.run_until_complete(asyncio.sleep(time)) + if stoptext : + logging.info('Sleep done ({})'.format(stoptext)) + + + @classmethod + def run(cls, time=None, amount=None, flows='all', sample_delay=None, io_timer=None, preclean=True, parallel=None, epoch_sync=False) : + if flows == 'all' : + flows = iperf_flow.get_instances() + if not flows: + logging.warn('flow run method called with no flows instantiated') + return + + if preclean: + hosts = [flow.server for flow in flows] + hosts.extend([flow.client for flow in flows]) + hosts=list(set(hosts)) + tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('preclean timeout') + raise + + logging.info('flow run invoked') + tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow server start timeout') + raise + iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") + + if epoch_sync : + dt = (datetime.now()).timestamp() + tsec = str(dt).split('.') + epoch_sync_time = int(tsec[0]) + 2 + else : + epoch_sync_time = None + + tasks = [asyncio.ensure_future(flow.tx.start(time=time, amount=amount, parallel=parallel, epoch_sync_time=epoch_sync_time), loop=iperf_flow.loop) for flow in flows] + + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow client start timeout') + raise + if sample_delay : + iperf_flow.sleep(time=0.3, text="ramp up", stoptext="ramp up done") + if io_timer : + tasks = [asyncio.ensure_future(flow.is_traffic(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow traffic check timeout') + raise + if time : + iperf_flow.sleep(time=time + 4, text="Running traffic start", stoptext="Stopping flows") + # Signal the remote iperf client sessions to stop them + tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + raise + + elif amount: + tasks = [asyncio.ensure_future(flow.transmit_completed(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow tx completed timed out') + raise + logging.info('flow transmit completed') + + # Now signal the remote iperf server sessions to stop them + tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + raise + + # iperf_flow.loop.close() + logging.info('flow run finished') + + @classmethod + def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) : + if flows == 'all' : + flows = iperf_flow.get_instances() + if not flows: + logging.warn('flow run method called with no flows instantiated') + return + + if preclean: + hosts = [flow.server for flow in flows] + hosts.extend([flow.client for flow in flows]) + hosts=list(set(hosts)) + tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('preclean timeout') + raise + + logging.info('flow start invoked') + tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow server start timeout') + raise + iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") + tasks = [asyncio.ensure_future(flow.tx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow client start timeout') + raise + + @classmethod + def plot(cls, flows='all', title='None', directory='None') : + if flows == 'all' : + flows = iperf_flow.get_instances() + + tasks = [] + for flow in flows : + for this_name in flow.histogram_names : + path = directory + '/' + this_name + os.makedirs(path, exist_ok=True) + i = 0 + # group by name + histograms = [h for h in flow.histograms if h.name == this_name] + for histogram in histograms : + if histogram.ks_index is not None : + histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(i) + else : + histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(histogram.ks_index) + + logging.info('scheduling task {}'.format(histogram.output_dir)) + tasks.append(asyncio.ensure_future(histogram.async_plot(directory=histogram.output_dir, title=title), loop=iperf_flow.loop)) + i += 1 + try : + logging.info('runnings tasks') + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=600)) + except asyncio.TimeoutError: + logging.error('plot timed out') + raise + + + @classmethod + def cease(cls, flows='all') : + + if flows == 'all' : + flows = iperf_flow.get_instances() + + # Signal the remote iperf client sessions to stop them + tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + + # Now signal the remote iperf server sessions to stop them + tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow rx stop timeout') + + @classmethod + async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') : + if host: + logging.info('ssh {}@{} pkill iperf'.format(user, host)) + childprocess = await asyncio.create_subprocess_exec(sshcmd, '{}@{}'.format(user, host), 'pkill', 'iperf', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('cleanup: host({}) stdout={} '.format(host, stdout)) + + @classmethod + def tos_to_txt(cls, tos) : + switcher = { + int(0x0) : "BE", + int(0x02) : "BK", + int(0xC0) : "VO", + int(0x80) : "VI", + } + return switcher.get(int(tos), None) + + @classmethod + def txt_to_tos(cls, txt) : + switcher = { + "BE" : "0x0", + "BESTEFFORT" : "0x0", + "0x0" : "0x0", + "BK" : "0x20", + "BACKGROUND" : "0x20", + "0x20" : "0x20", + "VO" : "0xC0", + "VOICE" : "0xC0", + "0xC0" : "0xC0", + "VI" : "0x80", + "VIDEO" : "0x80", + "0x80" : "0x80", + } + return switcher.get(txt.upper(), None) + + def __init__(self, name='iperf', server=None, client=None, user=None, proto='TCP', dstip='127.0.0.1', interval=1, format='b', offered_load=None, tos='BE', window='4M', src=None, srcip=None, srcport=None, dstport=None, debug=False, length=None, ipg=0.0, amount=None, trip_times=True, prefetch=None, latency=False, bb=False, working_load=False, bb_period=None, bb_hold=None, txstart_delay_sec=None, burst_size=None, burst_period=None, fullduplex=False, cca=None, tcp_tx_delay=None): + iperf_flow.instances.add(self) + self.name = name + self.latency = latency + if not dstport : + iperf_flow.port += 1 + self.dstport = iperf_flow.port + else: + self.dstport = dstport + self.dstip = dstip + self.srcip = srcip + self.srcport = srcport + try : + self.server = server.ipaddr + except AttributeError: + self.server = server + try : + self.client = client.ipaddr + except AttributeError: + self.client = client + + self.client_device = client.device + self.server_device = server.device + + if not user : + self.user = getpass.getuser() + else : + self.user = user + self.proto = proto + self.tcp_tx_delay = tcp_tx_delay + self.tos = tos + if length : + self.length = length + + if amount : + self.amount = amount + if trip_times : + self.trip_times = trip_times + if burst_period : + self.burst_period = burst_period + if burst_size : + self.burst_size = burst_size + + if txstart_delay_sec: + self.txstart_delay_sec = txstart_delay_sec + + if cca: + self.cca = cca + + self.interval = round(interval,3) + self.format = format + self.offered_load = offered_load + if self.offered_load : + if len(self.offered_load.split(':')) == 2 : + self.isoch = True + self.name += '-isoch' + else : + self.isoch = False + self.prefetch = prefetch + self.ipg = ipg + self.debug = debug + self.TRAFFIC_EVENT_TIMEOUT = round(self.interval * 4, 3) + self.bb = bb + self.working_load = working_load + self.bb_period = bb_period + self.bb_hold = bb_hold + self.fullduplex = fullduplex + # use python composition for the server and client + # i.e. a flow has a server and a client + self.rx = iperf_server(name='{}->RX({})'.format(name, str(self.server)), loop=iperf_flow.loop, host=self.server, flow=self, debug=self.debug) + self.tx = iperf_client(name='{}->TX({})'.format(name, str(self.client)), loop=iperf_flow.loop, host=self.client, flow=self, debug=self.debug) + self.rx.window=window + self.tx.window=window + self.ks_critical_p = 0.01 + self.stats_reset() + + #def __del__(self) : + # iperf_flow.instances.remove(self) + + def destroy(self) : + iperf_flow.instances.remove(self) + + def __getattr__(self, attr) : + if attr in self.flowstats : + return self.flowstats[attr] + + def stats_reset(self) : + # Initialize the flow stats dictionary + self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'starttime' : None, 'flowid' : None, 'endtime' : None} + self.flowstats['txdatetime']=[] + self.flowstats['txbytes']=[] + self.flowstats['txthroughput']=[] + self.flowstats['writes']=[] + self.flowstats['errwrites']=[] + self.flowstats['retry']=[] + self.flowstats['cwnd']=[] + self.flowstats['rtt']=[] + self.flowstats['rxdatetime']=[] + self.flowstats['rxbytes']=[] + self.flowstats['rxthroughput']=[] + self.flowstats['reads']=[] + self.flowstats['histograms']=[] + self.flowstats['histogram_names'] = set() + self.flowstats['connect_time']=[] + self.flowstats['trip_time']=[] + self.flowstats['jitter']=[] + self.flowstats['rxlostpkts']=[] + self.flowstats['rxtotpkts']=[] + self.flowstats['meanlat']=[] + self.flowstats['minlat']=[] + self.flowstats['maxlat']=[] + self.flowstats['stdevlat']=[] + self.flowstats['rxpps']=[] + self.flowstats['inP']=[] + self.flowstats['inPvar']=[] + self.flowstats['rxpkts']=[] + self.flowstats['netPower']=[] + + async def start(self): + self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'flowid' : None} + await self.rx.start() + await self.tx.start() + + async def is_traffic(self) : + if self.interval < 0.005 : + logging.warn('{} {}'.format(self.name, 'traffic check invoked without interval sampling')) + else : + self.rx.traffic_event.clear() + self.tx.traffic_event.clear() + logging.info('{} {}'.format(self.name, 'traffic check invoked')) + await self.rx.traffic_event.wait() + await self.tx.traffic_event.wait() + + async def transmit_completed(self) : + logging.info('{} {}'.format(self.name, 'waiting for transmit to complete')) + await self.tx.txcompleted.wait() + + async def stop(self): + self.tx.stop() + self.rx.stop() + + def stats(self): + logging.info('stats') + + def compute_ks_table(self, plot=True, directory='.', title=None) : + + if len(self.histogram_names) < 1 : + tmp = "***Failed. Expected 1 histogram_names, but instead got {0}".format(len(self.histogram_names)) + logging.info(tmp) + print(tmp) + #raise + + for this_name in self.histogram_names : + # group by name + histograms = [h for h in self.histograms if h.name == this_name] + for index, h in enumerate(histograms) : + h.ks_index = index + tmp = "{} KS Table has {} entries".format(self.name, len(histograms)) + logging.info(tmp) + print(tmp) + + self.condensed_distance_matrix = ([]) + + tasks = [] + for rowindex, h1 in enumerate(histograms) : + resultstr = rowindex * 'x' + maxp = None + minp = None + for h2 in histograms[rowindex:] : + d,p = stats.ks_2samp(h1.samples, h2.samples) + if h1 is not h2 : + self.condensed_distance_matrix = np.append(self.condensed_distance_matrix,d) + logging.debug('D,p={},{} cp={}'.format(str(d),str(p), str(self.ks_critical_p))) + if not minp or p < minp : + minp = p + if not maxp or (p != 1 and p > maxp) : + maxp = p + if p > self.ks_critical_p : + resultstr += '1' + else : + resultstr += '0' + if plot : + tasks.append(asyncio.ensure_future(flow_histogram.plot_two_sample_ks(h1=h1, h2=h2, flowname=self.name, title=title, directory=directory), loop=iperf_flow.loop)) + print('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) + logging.info('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) + if tasks : + try : + logging.debug('running KS table plotting coroutines for {} row {}'.format(this_name,str(rowindex))) + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=300)) + except asyncio.TimeoutError: + logging.error('plot timed out') + raise + logging.info('{} {}(condensed distance matrix)\n{}'.format(self.name, this_name,self.condensed_distance_matrix)) + self.linkage_matrix=linkage(self.condensed_distance_matrix, 'ward') + try : + plt.figure(figsize=(18,10)) + dn = hierarchy.dendrogram(self.linkage_matrix) + plt.title("{} {}".format(self.name, this_name)) + plt.savefig('{}/dn_{}_{}.png'.format(directory,self.name,this_name)) + logging.info('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) + print('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) + print('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) + logging.info('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) + flattened=scipy.cluster.hierarchy.fcluster(self.linkage_matrix, 0.75*self.condensed_distance_matrix.max(), criterion='distance') + print('{} {} Clusters:{}'.format(self.name, this_name, flattened)) + logging.info('{} {} Clusters:{}'.format(self.name, this_name, flattened)) + except: + pass + + def dump_stats(self, directory='.') : + logging.info("\n********************** dump_stats for flow {} **********************".format(self.name)) + + #logging.info('This flow Name={} id={} items_cnt={}'.format(iperf_flow.flowid2name[self.flowstats['flowid']], str(self.flowstats['flowid']), len(self.flowstats))) + #logging.info('All flows Name and id: {}'.format(str(iperf_flow.flowid2name))) + #logging.info('This flow Name={} flowstats={}'.format(self.name, str(self.flowstats))) + + csvfilename = os.path.join(directory, '{}.csv'.format(self.name)) + if not os.path.exists(directory): + logging.debug('Making results directory {}'.format(directory)) + os.makedirs(directory) + + logging.info("Writing stats to '{}'".format(csvfilename)) + + for stat_name in [stat for stat in self.flowstats.keys() if stat != 'histograms'] : + logging.info("{}={}".format(stat_name, str(self.flowstats[stat_name]))) + + with open(csvfilename, 'w', newline='') as fd : + keynames = self.flowstats.keys() + writer = csv.writer(fd) + writer.writerow(keynames) + writer.writerow([self.flowstats[keyname] for keyname in keynames]) + writer.writerow([h.samples for h in self.flowstats['histograms']]) + +class iperf_server(object): + + class IperfServerProtocol(asyncio.SubprocessProtocol): + def __init__(self, server, flow): + self.__dict__['flow'] = flow + self._exited = False + self._closed_stdout = False + self._closed_stderr = False + self._mypid = None + self._server = server + self._stdoutbuffer = "" + self._stderrbuffer = "" + + def __setattr__(self, attr, value): + if attr in iperf_flow.flow_scope: + self.flow.__setattr__(self.flow, attr, value) + else: + self.__dict__[attr] = value + + # methods and attributes not here are handled by the flow object, + # aka, the flow object delegates to this object per composition + def __getattr__(self, attr): + if attr in iperf_flow.flow_scope: + return getattr(self.flow, attr) + + @property + def finished(self): + return self._exited and self._closed_stdout and self._closed_stderr + + def signal_exit(self): + if not self.finished: + return + self._server.closed.set() + self._server.opened.clear() + + def connection_made(self, trans): + self._server.closed.clear() + self._mypid = trans.get_pid() + logging.debug('server connection made pid=({})'.format(self._mypid)) + + def pipe_data_received(self, fd, data): + if self.debug : + logging.debug('{} {}'.format(fd, data)) + data = data.decode("utf-8") + if fd == 1: + self._stdoutbuffer += data + while "\n" in self._stdoutbuffer: + line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) + self._server.adapter.info('{} (stdout,{})'.format(line, self._server.remotepid)) + if not self._server.opened.is_set() : + m = self._server.regex_open_pid.match(line) + if m : + self._server.remotepid = m.group('pid') + self._server.opened.set() + logging.debug('{} pipe reading (stdout,{})'.format(self._server.name, self._server.remotepid)) + else : + if self._server.proto == 'TCP' : + m = self._server.regex_traffic.match(line) + if m : + timestamp = datetime.now() + if not self._server.traffic_event.is_set() : + self._server.traffic_event.set() + + bytes = float(m.group('bytes')) + if self.flowstats['current_txbytes'] : + flowrate = round((bytes / self.flowstats['current_txbytes']), 2) + # *consume* the current *txbytes* where the client pipe will repopulate on its next sample + # do this by setting the value to None + self.flowstats['current_txbytes'] = None + # logging.debug('{} flow ratio={:.2f}'.format(self._server.name, flowrate)) + self.flowstats['flowrate'] = flowrate + else : + # *produce* the current *rxbytes* so the client pipe can know this event occurred + # indicate this by setting the value to value + self.flowstats['current_rxbytes'] = bytes + self.flowstats['rxdatetime'].append(timestamp) + self.flowstats['rxbytes'].append(m.group('bytes')) + self.flowstats['rxthroughput'].append(m.group('throughput')) + self.flowstats['reads'].append(m.group('reads')) + else : + m = self._server.regex_trip_time.match(line) + if m : + self.flowstats['trip_time'].append(float(m.group('trip_time')) * 1000) + else : + m = self._server.regex_traffic_udp.match(line) + if m : + timestamp = datetime.now() + if not self._server.traffic_event.is_set() : + self._server.traffic_event.set() + self.flowstats['rxbytes'].append(m.group('bytes')) + self.flowstats['rxthroughput'].append(m.group('throughput')) + self.flowstats['jitter'].append(m.group('jitter')) + self.flowstats['rxlostpkts'].append(m.group('lost_pkts')) + self.flowstats['rxtotpkts'].append(m.group('tot_pkts')) + self.flowstats['meanlat'].append(m.group('lat_mean')) + self.flowstats['minlat'].append(m.group('lat_min')) + self.flowstats['maxlat'].append(m.group('lat_max')) + self.flowstats['stdevlat'].append(m.group('lat_stdev')) + self.flowstats['rxpps'].append(m.group('pps')) + self.flowstats['inP'].append(m.group('inP')) + self.flowstats['inPvar'].append(m.group('inPvar')) + self.flowstats['rxpkts'].append(m.group('pkts')) + self.flowstats['netPower'].append(m.group('netPower')) + m = self._server.regex_final_histogram_traffic.match(line) + if m : + timestamp = datetime.now(timezone.utc).astimezone() + self.flowstats['endtime']= timestamp + self.flowstats['histogram_names'].add(m.group('pdfname')) + this_histogram = flow_histogram(name=m.group('pdfname'),values=m.group('pdf'), population=m.group('population'), binwidth=m.group('binwidth'), starttime=self.flowstats['starttime'], endtime=timestamp, outliers=m.group('outliers'), uci=m.group('uci'), uci_val=m.group('uci_val'), lci=m.group('lci'), lci_val=m.group('lci_val')) + self.flowstats['histograms'].append(this_histogram) + logging.info('pdf {} found with bin width={} us'.format(m.group('pdfname'), m.group('binwidth'))) + + elif fd == 2: + self._stderrbuffer += data + while "\n" in self._stderrbuffer: + line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) + logging.info('{} {} (stderr)'.format(self._server.name, line)) + m = self._server.regex_rx_bind_failed.match(line) + if m : + logging.error('RX Bind Failed. Check LAN / WLAN between server and client.') + iperf_flow.loop.stop() + raise + + def pipe_connection_lost(self, fd, exc): + if fd == 1: + self._closed_stdout = True + logging.debug('stdout pipe to {} closed (exception={})'.format(self._server.name, exc)) + elif fd == 2: + self._closed_stderr = True + logging.debug('stderr pipe to {} closed (exception={})'.format(self._server.name, exc)) + if self._closed_stdout and self._closed_stderr : + self.remotepid = None; + self.signal_exit() + + def process_exited(self): + logging.debug('subprocess with pid={} closed'.format(self._mypid)) + self._exited = True + self._mypid = None + self.signal_exit() + + class CustomAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + + def __init__(self, name='Server', loop=None, host='localhost', flow=None, debug=False): + self.__dict__['flow'] = flow + self.name = name + self.iperf = '/usr/local/bin/iperf' + self.ssh = '/usr/bin/ssh' + self.host = host + self.flow = flow + self.debug = debug + self.opened = asyncio.Event() + self.closed = asyncio.Event() + self.closed.set() + self.traffic_event = asyncio.Event() + self._transport = None + self._protocol = None + self.time = time + conn_id = '{}'.format(self.name) + self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) + + # ex. [ 4] 0.00-0.50 sec 657090 Bytes 10513440 bits/sec 449 449:0:0:0:0:0:0:0 + self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<reads>[0-9]+)') + self.regex_traffic_udp = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<jitter>[0-9.]+)\sms\s(?P<lost_pkts>[0-9]+)/(?P<tot_pkts>[0-9]+).+(?P<lat_mean>[0-9.]+)/(?P<lat_min>[0-9.]+)/(?P<lat_max>[0-9.]+)/(?P<lat_stdev>[0-9.]+)\sms\s(?P<pps>[0-9]+)\spps\s+(?P<netPower>[0-9\.]+)\/(?P<inP>[0-9]+)\((?P<inPvar>[0-9]+)\)\spkts\s(?P<pkts>[0-9]+)') + self.regex_final_histogram_traffic = re.compile(r'\[\s*\d+\] (?P<timestamp>.*) sec\s+(?P<pdfname>[A-Za-z0-9\-]+)\(f\)-PDF: bin\(w=(?P<binwidth>[0-9]+)us\):cnt\((?P<population>[0-9]+)\)=(?P<pdf>.+)\s+\((?P<lci>[0-9\.]+)/(?P<uci>[0-9\.]+)/(?P<uci2>[0-9\.]+)%=(?P<lci_val>[0-9]+)/(?P<uci_val>[0-9]+)/(?P<uci_val2>[0-9]+),Outliers=(?P<outliers>[0-9]+),obl/obu=[0-9]+/[0-9]+\)') + # 0.0000-0.5259 trip-time (3WHS done->fin+finack) = 0.5597 sec + self.regex_trip_time = re.compile(r'.+trip\-time\s+\(3WHS\sdone\->fin\+finack\)\s=\s(?P<trip_time>\d+\.\d+)\ssec') + self.regex_rx_bind_failed = re.compile(r'listener bind failed: Cannot assign requested address') + + def __getattr__(self, attr): + return getattr(self.flow, attr) + + async def start(self, time=time): + if not self.closed.is_set() : + return + + # ex. Server listening on TCP port 61003 with pid 2565 + self.regex_open_pid = re.compile(r'^Server listening on {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport))) + + self.opened.clear() + self.remotepid = None + if time : + iperftime = time + 30 + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-t ' + str(iperftime), '-f{}'.format(self.format), '-w' , self.window, '--realtime'] + else : + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-f{}'.format(self.format), '-w' , self.window, '--realtime'] + if self.interval >= 0.005 : + self.sshcmd.extend(['-i ', str(self.interval)]) + if self.server_device and self.srcip : + self.sshcmd.extend(['-B ', '{}%{}'.format(self.dstip, self.server_device)]) + if self.proto == 'UDP' : + self.sshcmd.extend(['-u']) + if self.latency : + self.sshcmd.extend(['--histograms=100u,100000,50,95']) + self.sshcmd.extend(['--jitter-histograms']) + + logging.info('{}'.format(str(self.sshcmd))) + self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfServerProtocol(self, self.flow), *self.sshcmd) + await self.opened.wait() + + async def signal_stop(self): + if self.remotepid and not self.finished : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + if not self.closed.is_set() : + await self.closed.wait() + logging.info('await kill completed remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + + +class iperf_client(object): + + # Asyncio protocol for subprocess transport + class IperfClientProtocol(asyncio.SubprocessProtocol): + def __init__(self, client, flow): + self.__dict__['flow'] = flow + self._exited = False + self._closed_stdout = False + self._closed_stderr = False + self._mypid = None + self._client = client + self._stdoutbuffer = "" + self._stderrbuffer = "" + + def __setattr__(self, attr, value): + if attr in iperf_flow.flow_scope: + self.flow.__setattr__(self.flow, attr, value) + else: + self.__dict__[attr] = value + + def __getattr__(self, attr): + if attr in iperf_flow.flow_scope: + return getattr(self.flow, attr) + + @property + def finished(self): + return self._exited and self._closed_stdout and self._closed_stderr + + def signal_exit(self): + if not self.finished: + return + self._client.closed.set() + self._client.opened.clear() + self._client.txcompleted.set() + + def connection_made(self, trans): + self._client.closed.clear() + self._mypid = trans.get_pid() + logging.debug('client connection made pid=({})'.format(self._mypid)) + + def pipe_data_received(self, fd, data): + if self.debug : + logging.debug('{} {}'.format(fd, data)) + data = data.decode("utf-8") + if fd == 1: + self._stdoutbuffer += data + while "\n" in self._stdoutbuffer: + line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) + self._client.adapter.info('{} (stdout,{})'.format(line, self._client.remotepid)) + if not self._client.opened.is_set() : + m = self._client.regex_open_pid.match(line) + if m : + self._client.opened.set() + self._client.remotepid = m.group('pid') + self.flowstats['starttime'] = datetime.now(timezone.utc).astimezone() + logging.debug('{} pipe reading at {} (stdout,{})'.format(self._client.name, self.flowstats['starttime'].isoformat(), self._client.remotepid)) + else : + if self.flowstats['flowid'] is None : + m = self._client.regex_flowid.match(line) + if m : + # [ 1] local 192.168.1.15%enp1s0 port 7001 connected with 192.168.1.232 port 7001 (trip-times) (sock=3) on 2021-10-11 14:39:45 (PDT) + # self.regex_flowid = re.compile(r'local\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)') + # + # temp = htonl(config->src_ip); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = htonl(config->dst_ip); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = (hton16(config->dst_port) << 16) | hton16(config->src_port); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = config->proto; + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # return "%08x" % netip + # NOTE: the network or big endian byte order + srcipaddr = ipaddress.ip_address(m.group('srcip')) + srcip32 = ctypes.c_uint32(int.from_bytes(srcipaddr.packed, byteorder='little', signed=False)) + dstipaddr = ipaddress.ip_address(m.group('dstip')) + dstip32 = ctypes.c_uint32(int.from_bytes(dstipaddr.packed, byteorder='little', signed=False)) + dstportbytestr = int(m.group('dstport')).to_bytes(2, byteorder='big', signed=False) + dstport16 = ctypes.c_uint16(int.from_bytes(dstportbytestr, byteorder='little', signed=False)) + srcportbytestr = int(m.group('srcport')).to_bytes(2, byteorder='big', signed=False) + srcport16 = ctypes.c_uint16(int.from_bytes(srcportbytestr, byteorder='little', signed=False)) + ports32 = ctypes.c_uint32((dstport16.value << 16) | srcport16.value) + if self._client.proto == 'UDP': + proto32 = ctypes.c_uint32(0x11) + else : + proto32 = ctypes.c_uint32(0x06) + quintuplehash = srcip32.value ^ dstip32.value ^ ports32.value ^ proto32.value + self.flowstats['flowid'] = '0x{:08x}'.format(quintuplehash) + if self._client.flow.name : + flowkey = self._client.flow.name + else : + flowkey = '0x{:08x}'.format(quintuplehash) + iperf_flow.flowid2name[self.flowstats['flowid']] = flowkey + logging.info('Flow quintuple hash of {} uses name {}'.format(self.flowstats['flowid'], flowkey)) + + if self._client.proto == 'TCP': + m = self._client.regex_traffic.match(line) + if m : + timestamp = datetime.now() + if not self._client.traffic_event.is_set() : + self._client.traffic_event.set() + + bytes = float(m.group('bytes')) + if self.flowstats['current_rxbytes'] : + flowrate = round((self.flowstats['current_rxbytes'] / bytes), 2) + # *consume* the current *rxbytes* where the server pipe will repopulate on its next sample + # do this by setting the value to None + self.flowstats['current_rxbytes'] = None + # logging.debug('{} flow ratio={:.2f}'.format(self._client.name, flowrate)) + self.flowstats['flowrate'] = flowrate + else : + # *produce* the current txbytes so the server pipe can know this event occurred + # indicate this by setting the value to value + self.flowstats['current_txbytes'] = bytes + + self.flowstats['txdatetime'].append(timestamp) + self.flowstats['txbytes'].append(m.group('bytes')) + self.flowstats['txthroughput'].append(m.group('throughput')) + self.flowstats['writes'].append(m.group('writes')) + self.flowstats['errwrites'].append(m.group('errwrites')) + self.flowstats['retry'].append(m.group('retry')) + self.flowstats['cwnd'].append(m.group('cwnd')) + self.flowstats['rtt'].append(m.group('rtt')) + else : + m = self._client.regex_connect_time.match(line) + if m : + self.flowstats['connect_time'].append(float(m.group('connect_time'))) + else : + pass + + elif fd == 2: + self._stderrbuffer += data + while "\n" in self._stderrbuffer: + line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) + logging.info('{} {} (stderr)'.format(self._client.name, line)) + m = self._client.regex_tx_bind_failed.match(line) + if m : + logging.error('TX Bind Failed. Check LAN / WLAN between server and client.') + iperf_flow.loop.stop() + raise + + def pipe_connection_lost(self, fd, exc): + if fd == 1: + logging.debug('stdout pipe to {} closed (exception={})'.format(self._client.name, exc)) + self._closed_stdout = True + elif fd == 2: + logging.debug('stderr pipe to {} closed (exception={})'.format(self._client.name, exc)) + self._closed_stderr = True + self.signal_exit() + + def process_exited(self): + logging.debug('subprocess with pid={} closed'.format(self._mypid)) + self._exited = True + self._mypid = None + self.signal_exit() + + class CustomAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + + def __init__(self, name='Client', loop=None, host='localhost', flow = None, debug=False): + self.__dict__['flow'] = flow + self.opened = asyncio.Event() + self.closed = asyncio.Event() + self.txcompleted = asyncio.Event() + self.closed.set() + self.txcompleted.clear() + self.traffic_event = asyncio.Event() + self.name = name + self.iperf = '/usr/local/bin/iperf' + self.ssh = '/usr/bin/ssh' + self.host = host + self.debug = debug + self.flow = flow + self._transport = None + self._protocol = None + conn_id = '{}'.format(self.name) + self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) + # traffic ex: [ 3] 0.00-0.50 sec 655620 Bytes 10489920 bits/sec 14/211 446 446K/0 us + self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>\d+) Bytes\s+(?P<throughput>\d+) bits/sec\s+(?P<writes>\d+)/(?P<errwrites>\d+)\s+(?P<retry>\d+)\s+(?P<cwnd>\d+)K/(?P<rtt>\d+) us') + self.regex_connect_time = re.compile(r'\[\s+\d+]\slocal.*\(ct=(?P<connect_time>\d+\.\d+) ms\)') + # local 192.168.1.4 port 56949 connected with 192.168.1.1 port 61001 + self.regex_flowid = re.compile(r'\[\s+\d+]\slocal\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)') + self.regex_tx_bind_failed = re.compile(r'bind failed: Cannot assign requested address') + + def __getattr__(self, attr): + return getattr(self.flow, attr) + + async def start(self, time=None, amount=None, parallel=None, epoch_sync_time=None): + if not self.closed.is_set() : + return + + self.opened.clear() + self.txcompleted.clear() + self.remotepid = None + self.flowstats['flowid']=None + + # Client connecting to 192.168.100.33, TCP port 61009 with pid 1903 + self.regex_open_pid = re.compile(r'Client connecting to .*, {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport))) + if self.client_device : + client_dst = self.dstip + '%' + self.client_device + else : + client_dst = self.dstip + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-c', client_dst, '-p ' + str(self.dstport), '-e', '-f{}'.format(self.format), '-w' , self.window ,'--realtime'] + if self.tcp_tx_delay : + self.sshcmd.extend(['--tcp-tx-delay', self.tcp_tx_delay]) + if self.tos : + self.sshcmd.extend(['-S ', self.tos]) + if self.length : + self.sshcmd.extend(['-l ', str(self.length)]) + if time: + self.sshcmd.extend(['-t ', str(time)]) + elif amount: + iperftime = time + self.sshcmd.extend(['-n ', amount]) + if parallel : + self.sshcmd.extend(['-P', str(parallel)]) + if self.trip_times : + self.sshcmd.extend(['--trip-times']) + if self.prefetch : + self.sshcmd.extend(['--tcp-write-prefetch', self.prefetch]) + self.sshcmd.extend(['--histograms=1m,100000,5,95']) + + if self.srcip : + if self.srcport : + self.sshcmd.extend(['-B ', '{}:{}'.format(self.srcip, self.srcport)]) + else : + self.sshcmd.extend(['-B {}'.format(self.srcip)]) + + if self.cca : + self.sshcmd.extend(['-Z ', self.cca]) + if self.interval >= 0.005 : + self.sshcmd.extend(['-i ', str(self.interval)]) + + if self.proto == 'UDP' : + self.sshcmd.extend(['-u ']) + if self.isoch : + self.sshcmd.extend(['--isochronous=' + self.offered_load, ' --ipg ', str(self.ipg)]) + elif self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + elif self.proto == 'TCP' and self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + elif self.proto == 'TCP' and self.burst_size and self.burst_period : + self.sshcmd.extend(['--burst-size', str(self.burst_size)]) + self.sshcmd.extend(['--burst-period', str(self.burst_period)]) + elif self.proto == 'TCP' and self.bb : + self.sshcmd.extend(['--bounceback']) + self.sshcmd.extend(['--bounceback-hold', str(self.bb_hold)]) + self.sshcmd.extend(['--bounceback-period', str(self.bb_period)]) + elif self.proto == 'TCP' and self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + if not self.bb and self.fullduplex : + self.sshcmd.extend(['--full-duplex', str(" ")]) + + if self.flow.bb : + self.sshcmd.extend(['--bounceback']) + if self.flow.working_load : + self.sshcmd.extend(['--working-load']) + + if epoch_sync_time : + self.sshcmd.extend(['--txstart-time', str(epoch_sync_time)]) + + elif self.txstart_delay_sec : + # use incoming txstart_delay_sec and convert it to epoch_time_sec to use with '--txstart-time' iperf parameter + logging.info('{}'.format(str(datetime.now()))) + epoch_time_sec = (datetime.now()).timestamp() + logging.info('Current epoch_time_sec = {}'.format(str(epoch_time_sec))) + new_txstart_time = epoch_time_sec + self.txstart_delay_sec + logging.info('new_txstart_time = {}'.format(str(new_txstart_time))) + self.sshcmd.extend(['--txstart-time', str(new_txstart_time)]) + + logging.info('{}'.format(str(self.sshcmd))) + try : + self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfClientProtocol(self, self.flow), *self.sshcmd) + await self.opened.wait() + except: + logging.error('flow client start error per: {}'.format(str(self.sshcmd))) + pass + + async def signal_stop(self): + if self.remotepid and not self.finished : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + + async def signal_pause(self): + if self.remotepid : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-STOP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal STOP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + + async def signal_resume(self): + if self.remotepid : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-CONT', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal CONT to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + +class flow_histogram(object): + + @classmethod + async def plot_two_sample_ks(cls, h1=None, h2=None, outputtype='png', directory='.', flowname=None, title=None): + + lci_val = int(h2.lci_val) * h2.binwidth + uci_val = int(h2.uci_val) * h2.binwidth + mytitle = '{} {} two sample KS({},{}) ({} samples) {}/{}%={}/{} us outliers={}\\n{}'.format(flowname, h1.name, h1.ks_index, h2.ks_index, h2.population, h2.lci, h2.uci, lci_val, uci_val, h2.outliers, title) + if h1.basefilename is None : + h1.output_dir = directory + '/' + flowname + h1.name + '/' + h1.name + '_' + str(h1.ks_index) + await h1.write(directory=h1.output_dir) + + if h2.basefilename is None : + h2.output_dir = directory + '/' + flowname + h2.name + '/' + h2.name + '_' + str(h2.ks_index) + await h2.write(directory=h2.output_dir) + + if (h1.basefilename is not None) and (h2.basefilename is not None) : + basefilename = '{}_{}_{}'.format(h1.basefilename, h1.ks_index, h2.ks_index) + gpcfilename = basefilename + '.gpc' + #write out the gnuplot control file + with open(gpcfilename, 'w') as fid : + if outputtype == 'canvas' : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal canvas standalone mousing size 1024,768\n') + if outputtype == 'svg' : + fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal svg size 1024,768 dynamic mouse\n') + else : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png size 1024,768\n') + + fid.write('set key bottom\n') + fid.write('set title \"{}\" noenhanced\n'.format(mytitle)) + if float(uci_val) < 400: + fid.write('set format x \"%.2f"\n') + else : + fid.write('set format x \"%.1f"\n') + fid.write('set format y \"%.1f"\n') + fid.write('set yrange [0:1.01]\n') + fid.write('set y2range [0:*]\n') + fid.write('set ytics add 0.1\n') + fid.write('set y2tics nomirror\n') + fid.write('set grid\n') + fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(h1.starttime, h2.endtime)) + default_minx = -0.5 + if float(uci_val) < 0.4: + fid.write('set xrange [{}:0.4]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 2.0 and h2.max < 2.0 : + fid.write('set xrange [{}:2]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 5.0 and h2.max < 5.0 : + fid.write('set xrange [{}:5]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 10.0 and h2.max < 10.0: + fid.write('set xrange [{}:10]\n'.format(default_minx)) + fid.write('set xtics add 1\n') + elif h1.max < 20.0 and h2.max < 20.0 : + fid.write('set xrange [{}:20]\n'.format(default_minx)) + fid.write('set xtics add 1\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 40.0 and h2.max < 40.0: + fid.write('set xrange [{}:40]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 50.0 and h2.max < 50.0: + fid.write('set xrange [{}:50]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 75.0 and h2.max < 75.0: + fid.write('set xrange [{}:75]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 100.0 and h2.max < 100.0 : + fid.write('set xrange [{}:100]\n'.format(default_minx)) + fid.write('set xtics add 10\n') + fid.write('set format x \"%.0f"\n') + else : + fid.write('set xrange [{}:*]\n'.format(default_minx)) + fid.write('set xtics auto\n') + fid.write('set format x \"%.0f"\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{1}\" using 1:2 index 0 axes x1y2 with impulses linetype 2 notitle, \"{1}\" using 1:3 index 0 axes x1y1 with lines linetype 1 linewidth 2 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(h1.datafilename, h2.datafilename)) + + childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot,gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stderr : + logging.error('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename)) + else : + logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename)) + + gnuplot = '/usr/bin/gnuplot' + def __init__(self, binwidth=None, name=None, values=None, population=None, starttime=None, endtime=None, title=None, outliers=None, lci = None, uci = None, lci_val = None, uci_val = None) : + self.raw = values + self._entropy = None + self._ks_1samp_dist = None + self.bins = self.raw.split(',') + self.name = name + self.ks_index = None + self.population = int(population) + self.samples = np.zeros(int(self.population)) + self.binwidth = int(binwidth) + self.createtime = datetime.now(timezone.utc).astimezone() + self.starttime=starttime + self.endtime=endtime + self.title=title + self.outliers=outliers + self.uci = uci + self.uci_val = uci_val + self.lci = lci + self.lci_val = lci_val + self.basefilename = None + ix = 0 + for bin in self.bins : + x,y = bin.split(':') + for i in range(int(y)) : + self.samples[ix] = x + ix += 1 + + @property + def entropy(self) : + if not self._entropy : + self._entropy = 0 + for bin in self.bins : + x,y = bin.split(':') + y1 = float(y) / float(self.population) + self._entropy -= y1 * math.log2(y1) + return self._entropy + + @property + def ks_1samp_dist(self): + if not self._ks_1samp_dist : + self._ks_1samp_dist,p = stats.ks_1samp(self.samples, stats.norm.cdf) + return self._ks_1samp_dist + + @property + def ampdu_dump(self) : + return self._ampdu_rawdump + + @ampdu_dump.setter + def ampdu_dump(self, value): + self._ampdu_rawdump = value + + async def __exec_gnuplot(self) : + logging.info('Plotting {} {}'.format(self.name, self.gpcfilename)) + childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot, self.gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stderr : + logging.error('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename)) + else : + logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename)) + + async def write(self, directory='.', filename=None) : + # write out the datafiles for the plotting tool, e.g. gnuplot + if filename is None: + filename = self.name + + if not os.path.exists(directory): + logging.debug('Making results directory {}'.format(directory)) + os.makedirs(directory) + + logging.debug('Writing {} results to directory {}'.format(directory, filename)) + basefilename = os.path.join(directory, filename) + datafilename = os.path.join(directory, filename + '.data') + self.max = None + with open(datafilename, 'w') as fid : + cummulative = 0.0 + for bin in self.bins : + x,y = bin.split(':') + #logging.debug('bin={} x={} y={}'.format(bin, x, y)) + if (float(y) > 1.0) or ((cummulative / float(self.population)) < 0.99) : + cummulative += float(y) + perc = cummulative / float(self.population) + self.max = float(x) * float(self.binwidth) / 1000.0 # max is the last value + fid.write('{} {} {}\n'.format((float(x) * float(self.binwidth) / 1000.0), int(y), perc)) + + self.basefilename = basefilename + self.datafilename = datafilename + + async def async_plot(self, title=None, directory='.', outputtype='png', filename=None) : + if self.basefilename is None : + await self.write(directory=directory, filename=filename) + + if self.basefilename is not None : + self.gpcfilename = self.basefilename + '.gpc' + #write out the gnuplot control file + with open(self.gpcfilename, 'w') as fid : + if outputtype == 'canvas' : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal canvas standalone mousing size 1024,768\n') + if outputtype == 'svg' : + fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal svg size 1024,768 dynamic mouse\n') + else : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png size 1024,768\n') + + if not title and self.title : + title = self.title + + fid.write('set key bottom\n') + if self.ks_index is not None : + fid.write('set title \"{}({}) {}({}) E={}\" noenhanced\n'.format(self.name, str(self.ks_index), title, int(self.population), self.entropy)) + else : + fid.write('set title \"{}{}({}) E={}\" noenhanced\n'.format(self.name, title, int(self.population), self.entropy)) + fid.write('set format x \"%.0f"\n') + fid.write('set format y \"%.1f"\n') + fid.write('set yrange [0:1.01]\n') + fid.write('set y2range [0:*]\n') + fid.write('set ytics add 0.1\n') + fid.write('set y2tics nomirror\n') + fid.write('set grid\n') + fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(self.starttime, self.endtime)) + if self.max < 5.0 : + fid.write('set xrange [0:5]\n') + fid.write('set xtics auto\n') + elif self.max < 10.0 : + fid.write('set xrange [0:10]\n') + fid.write('set xtics add 1\n') + elif self.max < 20.0 : + fid.write('set xrange [0:20]\n') + fid.write('set xtics add 1\n') + elif self.max < 40.0 : + fid.write('set xrange [0:40]\n') + fid.write('set xtics add 5\n') + elif self.max < 50.0 : + fid.write('set xrange [0:50]\n') + fid.write('set xtics add 5\n') + elif self.max < 75.0 : + fid.write('set xrange [0:75]\n') + fid.write('set xtics add 5\n') + else : + fid.write('set xrange [0:100]\n') + fid.write('set xtics add 10\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename)) + + if outputtype == 'png' : + # Create a thumbnail too + fid.write('unset output; unset xtics; unset ytics; unset key; unset xlabel; unset ylabel; unset border; unset grid; unset yzeroaxis; unset xzeroaxis; unset title; set lmargin 0; set rmargin 0; set tmargin 0; set bmargin 0\n') + fid.write('set output \"{}_thumb.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png transparent size 64,32 crop\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename)) + + await self.__exec_gnuplot() diff --git a/flows/ssh_nodes.py b/flows/ssh_nodes.py new file mode 100755 index 0000000..73e69f3 --- /dev/null +++ b/flows/ssh_nodes.py @@ -0,0 +1,499 @@ +# --------------------------------------------------------------- +# * Copyright (c) 2018-2023 +# * Broadcom Corporation +# * All Rights Reserved. +# *--------------------------------------------------------------- +# Redistribution and use in source and binary forms, with or without modification, are permitted +# provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, this list of conditions +# and the following disclaimer. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. Neither the name of the Broadcom nor the names of +# contributors may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Author Robert J. McMahon, Broadcom LTD +# +# Python object to support sending remote commands to a host +# +# Date April 2018 - December 2023 + +import logging +import asyncio, subprocess +import time, datetime +import weakref +import os +import re + +from datetime import datetime as datetime, timezone + +logger = logging.getLogger(__name__) + +class ssh_node: + DEFAULT_IO_TIMEOUT = 30.0 + DEFAULT_CMD_TIMEOUT = 30 + DEFAULT_CONNECT_TIMEOUT = 60.0 + rexec_tasks = [] + _loop = None + instances = weakref.WeakSet() + periodic_cmd_futures = [] + periodic_cmd_running_event = asyncio.Event() + periodic_cmd_done_event = asyncio.Event() + + @classmethod + @property + def loop(cls): + if not cls._loop : + try : + cls._loop = asyncio.get_running_loop() + except : + if os.name == 'nt': + # On Windows, the ProactorEventLoop is necessary to listen on pipes + cls._loop = asyncio.ProactorEventLoop() + else: + cls._loop = asyncio.new_event_loop() + return cls._loop + + @classmethod + def sleep(cls, time=0, text=None, stoptext=None) : + if text : + logging.info('Sleep {} ({})'.format(time, text)) + ssh_node.loop.run_until_complete(asyncio.sleep(time)) + if stoptext : + logging.info('Sleep done ({})'.format(stoptext)) + + @classmethod + def get_instances(cls): + try : + return list(ssh_node.instances) + except NameError : + return [] + + @classmethod + def run_all_commands(cls, timeout=None, text=None, stoptext=None) : + if ssh_node.rexec_tasks : + if text : + logging.info('Run all tasks: {})'.format(time, text)) + ssh_node.loop.run_until_complete(asyncio.wait(ssh_node.rexec_tasks, timeout=timeout)) + if stoptext : + logging.info('Commands done ({})'.format(stoptext)) + ssh_node.rexec_tasks = [] + + @classmethod + def open_consoles(cls, silent_mode=False) : + nodes = ssh_node.get_instances() + node_names = [] + tasks = [] + for node in nodes: + if node.sshtype.lower() == 'ssh' : + tasks.append(asyncio.ensure_future(node.clean(), loop=ssh_node.loop)) + if tasks : + logging.info('Run consoles clean') + try : + ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=20)) + except asyncio.TimeoutError: + logging.error('console cleanup timeout') + + tasks = [] + ipaddrs = [] + for node in nodes : + #see if we need control master to be started + if node.ssh_speedups and not node.ssh_console_session and node.ipaddr not in ipaddrs: + logging.info('Run consoles speedup') + node.ssh_console_session = ssh_session(name=node.name, hostname=node.ipaddr, node=node, control_master=True, ssh_speedups=True, silent_mode=silent_mode) + node.console_task = asyncio.ensure_future(node.ssh_console_session.post_cmd(cmd='/usr/bin/dmesg -w', IO_TIMEOUT=None, CMD_TIMEOUT=None), loop=ssh_node.loop) + tasks.append(node.console_task) + ipaddrs.append(node.ipaddr) + node_names.append(node.name) + + if tasks : + s = " " + logging.info('Opening consoles: {}'.format(s.join(node_names))) + try : + ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=60)) + except asyncio.TimeoutError: + logging.error('open console timeout') + raise + + if tasks : + # Sleep to let the control masters settle + ssh_node.loop.run_until_complete(asyncio.sleep(1)) + logging.info('open_consoles done') + + @classmethod + def close_consoles(cls) : + nodes = ssh_node.get_instances() + tasks = [] + node_names = [] + for node in nodes : + if node.ssh_console_session : + node.console_task = asyncio.ensure_future(node.ssh_console_session.close(), loop=ssh_node.loop) + tasks.append(node.console_task) + node_names.append(node.name) + + if tasks : + s = " " + logging.info('Closing consoles: {}'.format(s.join(node_names))) + ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=60)) + logging.info('Closing consoles done: {}'.format(s.join(node_names))) + + @classmethod + def periodic_cmds_stop(cls) : + logging.info("Stop periodic futures") + ssh_node.periodic_cmd_futures = [] + ssh_node.periodic_cmd_running_event.clear() + while not ssh_node.periodic_cmd_done_event.is_set() : + ssh_node.loop.run_until_complete(asyncio.sleep(0.25)) + logging.debug("Awaiting kill periodic futures") + logging.debug("Stop periodic futures done") + + def __init__(self, name=None, ipaddr=None, devip=None, console=False, device=None, ssh_speedups=False, silent_mode=False, sshtype='ssh', relay=None): + self.ipaddr = ipaddr + self.name = name + self.my_futures = [] + self.device = device + self.devip = devip + self.sshtype = sshtype.lower() + if self.sshtype.lower() == 'ssh' : + self.ssh_speedups = ssh_speedups + self.controlmasters = '/tmp/controlmasters_{}'.format(self.ipaddr) + else : + self.ssh_speedups = False + self.controlmasters = None + self.ssh_console_session = None + + if relay : + self.relay = relay + self.ssh = ['/usr/bin/ssh', 'root@{}'.format(relay)] + else : + self.ssh = [] + if self.sshtype.lower() == 'ush' : + self.ssh.extend(['/usr/local/bin/ush']) + elif self.sshtype.lower() == 'ssh' : + if not self.ssh : + logging.debug("node add /usr/bin/ssh") + self.ssh.extend(['/usr/bin/ssh']) + logging.debug("ssh={} ".format(self.ssh)) + else : + raise ValueError("ssh type invalid") + + ssh_node.instances.add(self) + + def rexec(self, cmd='pwd', IO_TIMEOUT=DEFAULT_IO_TIMEOUT, CMD_TIMEOUT=DEFAULT_CMD_TIMEOUT, CONNECT_TIMEOUT=DEFAULT_CONNECT_TIMEOUT, run_now=True, repeat = None) : + io_timer = IO_TIMEOUT + cmd_timer = CMD_TIMEOUT + connect_timer = CONNECT_TIMEOUT + this_session = ssh_session(name=self.name, hostname=self.ipaddr, CONNECT_TIMEOUT=connect_timer, node=self, ssh_speedups=True) + this_future = asyncio.ensure_future(this_session.post_cmd(cmd=cmd, IO_TIMEOUT=io_timer, CMD_TIMEOUT=cmd_timer, repeat = repeat), loop=ssh_node.loop) + if run_now: + ssh_node.loop.run_until_complete(asyncio.wait([this_future], timeout=CMD_TIMEOUT)) + else: + ssh_node.rexec_tasks.append(this_future) + self.my_futures.append(this_future) + return this_session + + async def clean(self) : + childprocess = await asyncio.create_subprocess_exec('/usr/bin/ssh', 'root@{}'.format(self.ipaddr), 'pkill', 'dmesg', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stdout : + logging.info('{}'.format(stdout)) + if stderr : + logging.info('{}'.format(stderr)) + + def close_console(self) : + if self.ssh_console_session: + self.ssh_console_session.close() + + async def repeat(self, interval, func, *args, **kwargs): + """ + Run func every interval seconds. + If func has not finished before *interval*, will run again + immediately when the previous iteration finished. + *args and **kwargs are passed as the arguments to func. + """ + logging.debug("repeat args={} kwargs={}".format(args, kwargs)) + + while ssh_node.periodic_cmd_running_event.is_set() : + await asyncio.gather( + func(*args, **kwargs), + asyncio.sleep(interval), + ) + if interval == 0 : + break + + logging.debug("Closing log_fh={}".format(kwargs['log_fh'])) + kwargs['log_fh'].flush() + kwargs['log_fh'].close() + ssh_node.periodic_cmd_done_event.set() + + def periodic_cmd_enable(self, cmd='ls', time_period=None, cmd_log_file=None) : + log_file_handle = open(cmd_log_file, 'w', errors='ignore') + + if ssh_node.loop : + future = asyncio.ensure_future(self.repeat(time_period, self.run_cmd, cmd=cmd, log_fh=log_file_handle), loop=ssh_node.loop) + ssh_node.periodic_cmd_futures.append(future) + ssh_node.periodic_cmd_running_event.set() + ssh_node.periodic_cmd_done_event.clear() + else : + raise + + async def run_cmd(self, *args, **kwargs) : + log_file_handle = kwargs['log_fh'] + cmd = kwargs['cmd'] + + msg = "********************** Periodic Command '{}' Begins **********************".format(cmd) + logging.info(msg) + t = '%s' % datetime.now() + t = t[:-3] + " " + str(msg) + log_file_handle.write(t + '\n') + logging.debug("ssh={} ipaddr={} cmd={} ".format(self.ssh, self.ipaddr, cmd)) + this_cmd = [] + ush_flag = False + + for item in self.ssh: + if 'ush' in item : + ush_flag = True + + if ush_flag : + this_cmd.extend([*self.ssh, self.ipaddr, cmd]) + else: + this_cmd.extend([*self.ssh, 'root@{}'.format(self.ipaddr), cmd]) + logging.info("run cmd = {}".format(this_cmd)) + + childprocess = await asyncio.create_subprocess_exec(*this_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.info("subprocess for periodic cmd = {}".format(this_cmd)) + stdout, stderr = await childprocess.communicate() + if stderr: + msg = 'Command {} failed with {}'.format(cmd, stderr) + logging.error(msg) + t = '%s' % datetime.now() + t = t[:-3] + " " + str(msg) + log_file_handle.write(t + '\n') + log_file_handle.flush() + if stdout: + stdout = stdout.decode("utf-8") + log_file_handle.write(stdout) + msg = "********************** Periodic Command Ends **********************" + logging.info(msg) + t = '%s' % datetime.now() + t = t[:-3] + " " + str(msg) + log_file_handle.write(t + '\n') + log_file_handle.flush() + +# Multiplexed sessions need a control master to connect to. The run time parameters -M and -S also correspond +# to ControlMaster and ControlPath, respectively. So first an initial master connection is established using +# -M when accompanied by the path to the control socket using -S. +# +# ssh -M -S /home/fred/.ssh/controlmasters/fred@server.example.org:22 server.example.org +# Then subsequent multiplexed connections are made in other terminals. They use ControlPath or -S to point to the control socket. +# ssh -O check -S ~/.ssh/controlmasters/%r@%h:%p server.example.org +# ssh -S /home/fred/.ssh/controlmasters/fred@server.example.org:22 server.example.org +class ssh_session: + sessionid = 1; + class SSHReaderProtocol(asyncio.SubprocessProtocol): + def __init__(self, session, silent_mode): + self._exited = False + self._closed_stdout = False + self._closed_stderr = False + self._mypid = None + self._stdoutbuffer = "" + self._stderrbuffer = "" + self.debug = False + self._session = session + self._silent_mode = silent_mode + if self._session.CONNECT_TIMEOUT is not None : + self.watchdog = ssh_node.loop.call_later(self._session.CONNECT_TIMEOUT, self.wd_timer) + self._session.closed.clear() + self.timeout_occurred = asyncio.Event() + self.timeout_occurred.clear() + + @property + def finished(self): + return self._exited and self._closed_stdout and self._closed_stderr + + def signal_exit(self): + if not self.finished: + return + self._session.closed.set() + + def connection_made(self, transport): + if self._session.CONNECT_TIMEOUT is not None : + self.watchdog.cancel() + self._mypid = transport.get_pid() + self._transport = transport + self._session.sshpipe = self._transport.get_extra_info('subprocess') + self._session.adapter.debug('{} ssh node connection made pid=({})'.format(self._session.name, self._mypid)) + self._session.connected.set() + if self._session.IO_TIMEOUT is not None : + self.iowatchdog = ssh_node.loop.call_later(self._session.IO_TIMEOUT, self.io_timer) + if self._session.CMD_TIMEOUT is not None : + self.watchdog = ssh_node.loop.call_later(self._session.CMD_TIMEOUT, self.wd_timer) + + def connection_lost(self, exc): + self._session.adapter.debug('{} node connection lost pid=({})'.format(self._session.name, self._mypid)) + self._session.connected.clear() + + def pipe_data_received(self, fd, data): + if self._session.IO_TIMEOUT is not None : + self.iowatchdog.cancel() + if self.debug : + logging.debug('{} {}'.format(fd, data)) + self._session.results.extend(data) + data = data.decode("utf-8") + if fd == 1: + self._stdoutbuffer += data + while "\n" in self._stdoutbuffer: + line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) + if not self._silent_mode : + self._session.adapter.info('{}'.format(line.replace("\r",""))) + + elif fd == 2: + self._stderrbuffer += data + while "\n" in self._stderrbuffer: + line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) + self._session.adapter.warning('{} {}'.format(self._session.name, line.replace("\r",""))) + + if self._session.IO_TIMEOUT is not None : + self.iowatchdog = ssh_node.loop.call_later(self._session.IO_TIMEOUT, self.io_timer) + + def pipe_connection_lost(self, fd, exc): + if self._session.IO_TIMEOUT is not None : + self.iowatchdog.cancel() + if fd == 1: + self._session.adapter.debug('{} stdout pipe closed (exception={})'.format(self._session.name, exc)) + self._closed_stdout = True + elif fd == 2: + self._session.adapter.debug('{} stderr pipe closed (exception={})'.format(self._session.name, exc)) + self._closed_stderr = True + self.signal_exit() + + def process_exited(self): + if self._session.CMD_TIMEOUT is not None : + self.watchdog.cancel() + logging.debug('{} subprocess with pid={} closed'.format(self._session.name, self._mypid)) + self._exited = True + self._mypid = None + self.signal_exit() + + def wd_timer(self, type=None): + logging.error("{}: timeout: pid={}".format(self._session.name, self._mypid)) + self.timeout_occurred.set() + if self._session.sshpipe : + self._session.sshpipe.terminate() + + def io_timer(self, type=None): + logging.error("{} IO timeout: cmd='{}' host(pid)={}({})".format(self._session.name, self._session.cmd, self._session.hostname, self._mypid)) + self.timeout_occurred.set() + self._session.sshpipe.terminate() + + class CustomAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + + def __init__(self, user='root', name=None, hostname='localhost', CONNECT_TIMEOUT=None, control_master=False, node=None, silent_mode=False, ssh_speedups=True): + self.hostname = hostname + self.name = name + self.user = user + self.opened = asyncio.Event() + self.closed = asyncio.Event() + self.connected = asyncio.Event() + self.closed.set() + self.opened.clear() + self.connected.clear() + self.results = bytearray() + self.sshpipe = None + self.node = node + self.CONNECT_TIMEOUT = CONNECT_TIMEOUT + self.IO_TIMEOUT = None + self.CMD_TIMEOUT = None + self.control_master = control_master + self.ssh = node.ssh.copy() + self.silent_mode = silent_mode + self.ssh_speedups = ssh_speedups + logger = logging.getLogger(__name__) + if control_master : + conn_id = self.name + '(console)' + else : + conn_id = '{}({})'.format(self.name, ssh_session.sessionid) + ssh_session.sessionid += 1 + + self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) + + def __getattr__(self, attr) : + if self.node : + return getattr(self.node, attr) + + @property + def is_established(self): + return self._exited and self._closed_stdout and self._closed_stderr + + async def close(self) : + if self.control_master : + logging.info('control master close called {}'.format(self.controlmasters)) + childprocess = await asyncio.create_subprocess_exec('/usr/bin/ssh', 'root@{}'.format(self.ipaddr), 'pkill', 'dmesg', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stdout : + logging.info('{}'.format(stdout)) + if stderr : + logging.info('{}'.format(stderr)) + self.sshpipe.terminate() + await self.closed.wait() + logging.info('control master exit called {}'.format(self.controlmasters)) + childprocess = await asyncio.create_subprocess_exec(self.ssh, '-o ControlPath={}'.format(self.controlmasters), '-O exit dummy-arg-why-needed', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stdout : + logging.info('{}'.format(stdout)) + if stderr : + logging.info('{}'.format(stderr)) + + elif self.sshpipe : + self.sshpipe.terminate() + await self.closed.wait() + + async def post_cmd(self, cmd=None, IO_TIMEOUT=None, CMD_TIMEOUT=None, ssh_speedups=True, repeat=None) : + logging.debug("{} Post command {}".format(self.name, cmd)) + self.opened.clear() + self.cmd = cmd + self.IO_TIMEOUT = IO_TIMEOUT + self.CMD_TIMEOUT = CMD_TIMEOUT + self.repeatcmd = None + sshcmd = self.ssh.copy() + if self.control_master : + try: + os.remove(str(self.controlmasters)) + except OSError: + pass + sshcmd.extend(['-o ControlMaster=yes', '-o ControlPath={}'.format(self.controlmasters), '-o ControlPersist=1']) + elif self.node.sshtype == 'ssh' : + sshcmd.append('-o ControlPath={}'.format(self.controlmasters)) + if self.node.ssh_speedups : + sshcmd.extend(['{}@{}'.format(self.user, self.hostname), cmd]) + else : + sshcmd.extend(['{}'.format(self.hostname), cmd]) + s = " " + logging.info('{} {}'.format(self.name, s.join(sshcmd))) + while True : + # self in the ReaderProtocol() is this ssh_session instance + self._transport, self._protocol = await ssh_node.loop.subprocess_exec(lambda: self.SSHReaderProtocol(self, self.silent_mode), *sshcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=None) + # self.sshpipe = self._transport.get_extra_info('subprocess') + # Establish the remote command + await self.connected.wait() + logging.debug("post_cmd connected") + # u = '{}\n'.format(cmd) + # self.sshpipe.stdin.write(u.encode()) + # Wait for the command to complete + if not self.control_master : + await self.closed.wait() + if not repeat : + break + return self.results |