author    Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:25:50 +0000
committer Daniel Baumann <daniel.baumann@progress-linux.org> 2024-05-30 03:25:50 +0000
commit    11ea4fcf515dbc4f75be538b784635085dc10db2 (patch)
tree      af05377dc5f1495935a0aa3b43258c20cb8fb5b9 /flows
parent    Releasing progress-linux version 2.1.9+dfsg-1~progress7.99u1. (diff)
download  iperf-11ea4fcf515dbc4f75be538b784635085dc10db2.tar.xz
          iperf-11ea4fcf515dbc4f75be538b784635085dc10db2.zip
Merging upstream version 2.2.0+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'flows')
-rw-r--r--  flows/Makefile.am     1
-rw-r--r--  flows/Makefile.in   430
-rw-r--r--  flows/Readme.txt     36
-rw-r--r--  flows/flows.py     1300
-rwxr-xr-x  flows/ssh_nodes.py  499
5 files changed, 2266 insertions, 0 deletions
diff --git a/flows/Makefile.am b/flows/Makefile.am
new file mode 100644
index 0000000..053782b
--- /dev/null
+++ b/flows/Makefile.am
@@ -0,0 +1 @@
+EXTRA_DIST = flows.py ssh_nodes.py Readme.txt
diff --git a/flows/Makefile.in b/flows/Makefile.in
new file mode 100644
index 0000000..1324a26
--- /dev/null
+++ b/flows/Makefile.in
@@ -0,0 +1,430 @@
+# Makefile.in generated by automake 1.16.5 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994-2021 Free Software Foundation, Inc.
+
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+VPATH = @srcdir@
+am__is_gnu_make = { \
+ if test -z '$(MAKELEVEL)'; then \
+ false; \
+ elif test -n '$(MAKE_HOST)'; then \
+ true; \
+ elif test -n '$(MAKE_VERSION)' && test -n '$(CURDIR)'; then \
+ true; \
+ else \
+ false; \
+ fi; \
+}
+am__make_running_with_option = \
+ case $${target_option-} in \
+ ?) ;; \
+ *) echo "am__make_running_with_option: internal error: invalid" \
+ "target option '$${target_option-}' specified" >&2; \
+ exit 1;; \
+ esac; \
+ has_opt=no; \
+ sane_makeflags=$$MAKEFLAGS; \
+ if $(am__is_gnu_make); then \
+ sane_makeflags=$$MFLAGS; \
+ else \
+ case $$MAKEFLAGS in \
+ *\\[\ \ ]*) \
+ bs=\\; \
+ sane_makeflags=`printf '%s\n' "$$MAKEFLAGS" \
+ | sed "s/$$bs$$bs[$$bs $$bs ]*//g"`;; \
+ esac; \
+ fi; \
+ skip_next=no; \
+ strip_trailopt () \
+ { \
+ flg=`printf '%s\n' "$$flg" | sed "s/$$1.*$$//"`; \
+ }; \
+ for flg in $$sane_makeflags; do \
+ test $$skip_next = yes && { skip_next=no; continue; }; \
+ case $$flg in \
+ *=*|--*) continue;; \
+ -*I) strip_trailopt 'I'; skip_next=yes;; \
+ -*I?*) strip_trailopt 'I';; \
+ -*O) strip_trailopt 'O'; skip_next=yes;; \
+ -*O?*) strip_trailopt 'O';; \
+ -*l) strip_trailopt 'l'; skip_next=yes;; \
+ -*l?*) strip_trailopt 'l';; \
+ -[dEDm]) skip_next=yes;; \
+ -[JT]) skip_next=yes;; \
+ esac; \
+ case $$flg in \
+ *$$target_option*) has_opt=yes; break;; \
+ esac; \
+ done; \
+ test $$has_opt = yes
+am__make_dryrun = (target_option=n; $(am__make_running_with_option))
+am__make_keepgoing = (target_option=k; $(am__make_running_with_option))
+pkgdatadir = $(datadir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+subdir = flows
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/m4/ax_create_stdint_h.m4 \
+ $(top_srcdir)/m4/dast.m4 $(top_srcdir)/m4/ax_pthread.m4 \
+ $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+ $(ACLOCAL_M4)
+DIST_COMMON = $(srcdir)/Makefile.am $(am__DIST_COMMON)
+mkinstalldirs = $(install_sh) -d
+CONFIG_HEADER = $(top_builddir)/config.h
+CONFIG_CLEAN_FILES =
+CONFIG_CLEAN_VPATH_FILES =
+AM_V_P = $(am__v_P_@AM_V@)
+am__v_P_ = $(am__v_P_@AM_DEFAULT_V@)
+am__v_P_0 = false
+am__v_P_1 = :
+AM_V_GEN = $(am__v_GEN_@AM_V@)
+am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
+am__v_GEN_0 = @echo " GEN " $@;
+am__v_GEN_1 =
+AM_V_at = $(am__v_at_@AM_V@)
+am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
+am__v_at_0 = @
+am__v_at_1 =
+SOURCES =
+DIST_SOURCES =
+am__can_run_installinfo = \
+ case $$AM_UPDATE_INFO_DIR in \
+ n|no|NO) false;; \
+ *) (install-info --version) >/dev/null 2>&1;; \
+ esac
+am__tagged_files = $(HEADERS) $(SOURCES) $(TAGS_FILES) $(LISP)
+am__DIST_COMMON = $(srcdir)/Makefile.in
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMTAR = @AMTAR@
+AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CSCOPE = @CSCOPE@
+CTAGS = @CTAGS@
+CXX = @CXX@
+CXXDEPMODE = @CXXDEPMODE@
+CXXFLAGS = @CXXFLAGS@
+CYGPATH_W = @CYGPATH_W@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+ETAGS = @ETAGS@
+EXEEXT = @EXEEXT@
+GREP = @GREP@
+INSTALL = @INSTALL@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+LDFLAGS = @LDFLAGS@
+LIBOBJS = @LIBOBJS@
+LIBS = @LIBS@
+LTLIBOBJS = @LTLIBOBJS@
+MAINT = @MAINT@
+MAKEINFO = @MAKEINFO@
+MKDIR_P = @MKDIR_P@
+OBJEXT = @OBJEXT@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_URL = @PACKAGE_URL@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+PTHREAD_CC = @PTHREAD_CC@
+PTHREAD_CFLAGS = @PTHREAD_CFLAGS@
+PTHREAD_LIBS = @PTHREAD_LIBS@
+RANLIB = @RANLIB@
+SED = @SED@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+STRIP = @STRIP@
+STRIP_BEGIN = @STRIP_BEGIN@
+STRIP_DUMMY = @STRIP_DUMMY@
+STRIP_END = @STRIP_END@
+VERSION = @VERSION@
+WEB100_CFLAGS = @WEB100_CFLAGS@
+WEB100_CONFIG = @WEB100_CONFIG@
+WEB100_LIBS = @WEB100_LIBS@
+abs_builddir = @abs_builddir@
+abs_srcdir = @abs_srcdir@
+abs_top_builddir = @abs_top_builddir@
+abs_top_srcdir = @abs_top_srcdir@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_CXX = @ac_ct_CXX@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+ax_pthread_config = @ax_pthread_config@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_vendor = @build_vendor@
+builddir = @builddir@
+datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+htmldir = @htmldir@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localedir = @localedir@
+localstatedir = @localstatedir@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+psdir = @psdir@
+runstatedir = @runstatedir@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+srcdir = @srcdir@
+sysconfdir = @sysconfdir@
+target_alias = @target_alias@
+top_build_prefix = @top_build_prefix@
+top_builddir = @top_builddir@
+top_srcdir = @top_srcdir@
+EXTRA_DIST = flows.py ssh_nodes.py Readme.txt
+all: all-am
+
+.SUFFIXES:
+$(srcdir)/Makefile.in: @MAINTAINER_MODE_TRUE@ $(srcdir)/Makefile.am $(am__configure_deps)
+ @for dep in $?; do \
+ case '$(am__configure_deps)' in \
+ *$$dep*) \
+ ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
+ && { if test -f $@; then exit 0; else break; fi; }; \
+ exit 1;; \
+ esac; \
+ done; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu flows/Makefile'; \
+ $(am__cd) $(top_srcdir) && \
+ $(AUTOMAKE) --gnu flows/Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+ @case '$?' in \
+ *config.status*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+ *) \
+ echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles)'; \
+ cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__maybe_remake_depfiles);; \
+ esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: @MAINTAINER_MODE_TRUE@ $(am__configure_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): @MAINTAINER_MODE_TRUE@ $(am__aclocal_m4_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(am__aclocal_m4_deps):
+tags TAGS:
+
+ctags CTAGS:
+
+cscope cscopelist:
+
+distdir: $(BUILT_SOURCES)
+ $(MAKE) $(AM_MAKEFLAGS) distdir-am
+
+distdir-am: $(DISTFILES)
+ @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ list='$(DISTFILES)'; \
+ dist_files=`for file in $$list; do echo $$file; done | \
+ sed -e "s|^$$srcdirstrip/||;t" \
+ -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
+ case $$dist_files in \
+ */*) $(MKDIR_P) `echo "$$dist_files" | \
+ sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
+ sort -u` ;; \
+ esac; \
+ for file in $$dist_files; do \
+ if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+ if test -d $$d/$$file; then \
+ dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
+ if test -d "$(distdir)/$$file"; then \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+ cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
+ else \
+ test -f "$(distdir)/$$file" \
+ || cp -p $$d/$$file "$(distdir)/$$file" \
+ || exit 1; \
+ fi; \
+ done
+check-am: all-am
+check: check-am
+all-am: Makefile
+installdirs:
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+ @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+ if test -z '$(STRIP)'; then \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ install; \
+ else \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
+ fi
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+ -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+ -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
+
+maintainer-clean-generic:
+ @echo "This command is intended for maintainers to use"
+ @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-generic mostlyclean-am
+
+distclean: distclean-am
+ -rm -f Makefile
+distclean-am: clean-am distclean-generic
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+html-am:
+
+info: info-am
+
+info-am:
+
+install-data-am:
+
+install-dvi: install-dvi-am
+
+install-dvi-am:
+
+install-exec-am:
+
+install-html: install-html-am
+
+install-html-am:
+
+install-info: install-info-am
+
+install-info-am:
+
+install-man:
+
+install-pdf: install-pdf-am
+
+install-pdf-am:
+
+install-ps: install-ps-am
+
+install-ps-am:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+ -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-generic
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am:
+
+.MAKE: install-am install-strip
+
+.PHONY: all all-am check check-am clean clean-generic cscopelist-am \
+ ctags-am distclean distclean-generic distdir dvi dvi-am html \
+ html-am info info-am install install-am install-data \
+ install-data-am install-dvi install-dvi-am install-exec \
+ install-exec-am install-html install-html-am install-info \
+ install-info-am install-man install-pdf install-pdf-am \
+ install-ps install-ps-am install-strip installcheck \
+ installcheck-am installdirs maintainer-clean \
+ maintainer-clean-generic mostlyclean mostlyclean-generic pdf \
+ pdf-am ps ps-am tags-am uninstall uninstall-am
+
+.PRECIOUS: Makefile
+
+
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/flows/Readme.txt b/flows/Readme.txt
new file mode 100644
index 0000000..3ff19d0
--- /dev/null
+++ b/flows/Readme.txt
@@ -0,0 +1,36 @@
+The following steps need to be done before running a pyflows test:
+
+1) Install Python 3.10 or later
+
+2) Install matplotlib: 'sudo dnf install python3-matplotlib'
+
+3) Install gnuplot: 'sudo dnf install gnuplot'
+
+4) Update your ".bashrc" and add the location of your flows files "/your_local_dir/iperf2-code/flows" to PYTHONPATH:
+export PYTHONPATH=/your_local_dir/iperf2-code/flows:$PYTHONPATH
+echo $PYTHONPATH
+
+5) Byte-compile the following modules:
+cd /your_local_dir/iperf2-code/flows
+python3 -m py_compile flows.py ssh_nodes.py
+
+6) Configure the IP addresses and LAN addresses in router_latency.py
+
+7) Make sure passwordless ssh is configured for all the ssh DUTs, e.g.
+
+[bm932125@rjm-wifibt-ctrl:/ltf-local/Code/LTF/pyflows/scripts] $ ssh-copy-id -i ~/.ssh/id_rsa.pub root@10.19.85.40
+/usr/bin/ssh-copy-id: INFO: Source of key(s) to be installed: "/home/bm932125/.ssh/id_rsa.pub"
+/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
+/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
+root@10.19.85.40's password:
+
+Number of key(s) added: 1
+
+Now try logging into the machine, with: "ssh 'root@10.19.85.40'"
+and check to make sure that only the key(s) you wanted were added.
+
+8) Make sure all the wireless devices are loaded and connected to the SSID
+
+9) Run the test:
+cd /your_local_dir/iperf2-code/flows
+python3 router_latency.py
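
A minimal driver script in the spirit of router_latency.py might look like the sketch below. The host names, IP addresses, and the small "host" helper class are illustrative stand-ins; in practice ssh_nodes.py supplies the DUT objects that carry the .ipaddr and .device attributes iperf_flow reads.

from flows import iperf_flow

# Illustrative stand-in for an ssh_nodes DUT object: iperf_flow reads the
# .ipaddr and .device attributes off its server and client arguments.
class host:
    def __init__(self, ipaddr, device):
        self.ipaddr = ipaddr
        self.device = device

server = host('192.168.1.1', 'eth0')    # hypothetical wired DUT
client = host('192.168.1.15', 'wlan0')  # hypothetical wireless DUT

# One TCP flow from client to server, sampled every second
flow = iperf_flow(name='tcp-up', user='root', server=server, client=client,
                  dstip='192.168.1.1', proto='TCP', interval=1)

iperf_flow.run(time=10, flows='all')    # run every instantiated flow for 10s
flow.dump_stats(directory='./results')  # per-flow stats land in a csv
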
diff --git a/flows/flows.py b/flows/flows.py
new file mode 100644
index 0000000..5465b6b
--- /dev/null
+++ b/flows/flows.py
@@ -0,0 +1,1300 @@
+# ----------------------------------------------------------------
+# * Copyright (c) 2018-2023
+# * Broadcom Corporation
+# * All Rights Reserved.
+# *---------------------------------------------------------------
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# Redistributions of source code must retain the above copyright notice, this list of conditions
+# and the following disclaimer. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the documentation and/or other
+# materials provided with the distribution. Neither the name of Broadcom nor the names of
+# contributors may be used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
+# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
+# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Author Robert J. McMahon, Broadcom LTD
+# Date April 2016 - December 2023
+
+import re
+import subprocess
+import logging
+import asyncio, sys
+import time, datetime
+import locale
+import signal
+import weakref
+import os
+import getpass
+import math
+import scipy
+import scipy.spatial
+import numpy as np
+import tkinter
+import ctypes
+import ipaddress
+import collections
+import csv
+
+from datetime import datetime as datetime, timezone
+from scipy import stats
+from scipy.cluster import hierarchy
+from scipy.cluster.hierarchy import linkage
+import matplotlib.pyplot as plt
+from collections import defaultdict
+
+logger = logging.getLogger(__name__)
+
+class iperf_flow(object):
+ port = 61000
+ iperf = '/usr/bin/iperf'
+ instances = weakref.WeakSet()
+ _loop = None
+    flow_scope = ("flowstats",)
+ tasks = []
+ flowid2name = defaultdict(str)
+
+ @classmethod
+ def get_instances(cls):
+ return list(iperf_flow.instances)
+
+ @classmethod
+ @property
+ def loop(cls):
+ if not cls._loop :
+ try :
+ cls._loop = asyncio.get_running_loop()
+        except RuntimeError :
+ if os.name == 'nt':
+ # On Windows, the ProactorEventLoop is necessary to listen on pipes
+ cls._loop = asyncio.ProactorEventLoop()
+ else:
+ cls._loop = asyncio.new_event_loop()
+ return cls._loop
+
+
+ @classmethod
+ def close_loop(cls):
+ if iperf_flow.loop.is_running():
+            iperf_flow.loop.run_until_complete(iperf_flow.loop.shutdown_asyncgens())
+ iperf_flow.loop.close()
+
+ @classmethod
+ def sleep(cls, time=0, text=None, stoptext=None) :
+ if text :
+ logging.info('Sleep {} ({})'.format(time, text))
+ iperf_flow.loop.run_until_complete(asyncio.sleep(time))
+ if stoptext :
+ logging.info('Sleep done ({})'.format(stoptext))
+
+
+ @classmethod
+ def run(cls, time=None, amount=None, flows='all', sample_delay=None, io_timer=None, preclean=True, parallel=None, epoch_sync=False) :
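+        # Orchestration: optionally preclean stale iperf processes on every
+        # host, start all rx (server) sides first, then all tx (client)
+        # sides, optionally verify traffic is flowing, wait out the run,
+        # then signal the remote sessions to stop via SIGHUP.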
+ if flows == 'all' :
+ flows = iperf_flow.get_instances()
+ if not flows:
+            logging.warning('flow run method called with no flows instantiated')
+ return
+
+ if preclean:
+ hosts = [flow.server for flow in flows]
+ hosts.extend([flow.client for flow in flows])
+ hosts=list(set(hosts))
+ tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('preclean timeout')
+ raise
+
+ logging.info('flow run invoked')
+ tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow server start timeout')
+ raise
+ iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done")
+
+ if epoch_sync :
+ dt = (datetime.now()).timestamp()
+ tsec = str(dt).split('.')
+ epoch_sync_time = int(tsec[0]) + 2
+ else :
+ epoch_sync_time = None
+
+ tasks = [asyncio.ensure_future(flow.tx.start(time=time, amount=amount, parallel=parallel, epoch_sync_time=epoch_sync_time), loop=iperf_flow.loop) for flow in flows]
+
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow client start timeout')
+ raise
+ if sample_delay :
+ iperf_flow.sleep(time=0.3, text="ramp up", stoptext="ramp up done")
+ if io_timer :
+ tasks = [asyncio.ensure_future(flow.is_traffic(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow traffic check timeout')
+ raise
+ if time :
+ iperf_flow.sleep(time=time + 4, text="Running traffic start", stoptext="Stopping flows")
+ # Signal the remote iperf client sessions to stop them
+ tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3))
+ except asyncio.TimeoutError:
+ logging.error('flow tx stop timeout')
+ raise
+
+ elif amount:
+ tasks = [asyncio.ensure_future(flow.transmit_completed(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow tx completed timed out')
+ raise
+ logging.info('flow transmit completed')
+
+ # Now signal the remote iperf server sessions to stop them
+ tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3))
+ except asyncio.TimeoutError:
+ logging.error('flow tx stop timeout')
+ raise
+
+ # iperf_flow.loop.close()
+ logging.info('flow run finished')
+
+ @classmethod
+ def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) :
+ if flows == 'all' :
+ flows = iperf_flow.get_instances()
+ if not flows:
+            logging.warning('flow commence method called with no flows instantiated')
+ return
+
+ if preclean:
+ hosts = [flow.server for flow in flows]
+ hosts.extend([flow.client for flow in flows])
+ hosts=list(set(hosts))
+ tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('preclean timeout')
+ raise
+
+ logging.info('flow start invoked')
+ tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow server start timeout')
+ raise
+ iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done")
+ tasks = [asyncio.ensure_future(flow.tx.start(time=time), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow client start timeout')
+ raise
+
+ @classmethod
+ def plot(cls, flows='all', title='None', directory='None') :
+ if flows == 'all' :
+ flows = iperf_flow.get_instances()
+
+ tasks = []
+ for flow in flows :
+ for this_name in flow.histogram_names :
+ path = directory + '/' + this_name
+ os.makedirs(path, exist_ok=True)
+ i = 0
+ # group by name
+ histograms = [h for h in flow.histograms if h.name == this_name]
+ for histogram in histograms :
+                if histogram.ks_index is not None :
+                    histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(histogram.ks_index)
+                else :
+                    histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(i)
+
+ logging.info('scheduling task {}'.format(histogram.output_dir))
+ tasks.append(asyncio.ensure_future(histogram.async_plot(directory=histogram.output_dir, title=title), loop=iperf_flow.loop))
+ i += 1
+ try :
+            logging.info('running tasks')
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=600))
+ except asyncio.TimeoutError:
+ logging.error('plot timed out')
+ raise
+
+
+ @classmethod
+ def cease(cls, flows='all') :
+
+ if flows == 'all' :
+ flows = iperf_flow.get_instances()
+
+ # Signal the remote iperf client sessions to stop them
+ tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow tx stop timeout')
+
+ # Now signal the remote iperf server sessions to stop them
+ tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows]
+ try :
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10))
+ except asyncio.TimeoutError:
+ logging.error('flow rx stop timeout')
+
+ @classmethod
+ async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') :
+ if host:
+ logging.info('ssh {}@{} pkill iperf'.format(user, host))
+ childprocess = await asyncio.create_subprocess_exec(sshcmd, '{}@{}'.format(user, host), 'pkill', 'iperf', stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout, _ = await childprocess.communicate()
+ if stdout:
+ logging.info('cleanup: host({}) stdout={} '.format(host, stdout))
+
+ @classmethod
+ def tos_to_txt(cls, tos) :
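+        # Map an IP ToS byte to its WMM access class abbreviation
+        # (BE=best effort, BK=background, VO=voice, VI=video).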
+ switcher = {
+ int(0x0) : "BE",
+ int(0x02) : "BK",
+ int(0xC0) : "VO",
+ int(0x80) : "VI",
+ }
+ return switcher.get(int(tos), None)
+
+ @classmethod
+ def txt_to_tos(cls, txt) :
+ switcher = {
+ "BE" : "0x0",
+ "BESTEFFORT" : "0x0",
+ "0x0" : "0x0",
+ "BK" : "0x20",
+ "BACKGROUND" : "0x20",
+ "0x20" : "0x20",
+ "VO" : "0xC0",
+ "VOICE" : "0xC0",
+ "0xC0" : "0xC0",
+ "VI" : "0x80",
+ "VIDEO" : "0x80",
+ "0x80" : "0x80",
+ }
+ return switcher.get(txt.upper(), None)
+
+ def __init__(self, name='iperf', server=None, client=None, user=None, proto='TCP', dstip='127.0.0.1', interval=1, format='b', offered_load=None, tos='BE', window='4M', src=None, srcip=None, srcport=None, dstport=None, debug=False, length=None, ipg=0.0, amount=None, trip_times=True, prefetch=None, latency=False, bb=False, working_load=False, bb_period=None, bb_hold=None, txstart_delay_sec=None, burst_size=None, burst_period=None, fullduplex=False, cca=None, tcp_tx_delay=None):
+ iperf_flow.instances.add(self)
+ self.name = name
+ self.latency = latency
+ if not dstport :
+ iperf_flow.port += 1
+ self.dstport = iperf_flow.port
+ else:
+ self.dstport = dstport
+ self.dstip = dstip
+ self.srcip = srcip
+ self.srcport = srcport
+ try :
+ self.server = server.ipaddr
+ except AttributeError:
+ self.server = server
+ try :
+ self.client = client.ipaddr
+ except AttributeError:
+ self.client = client
+
+        try :
+            self.client_device = client.device
+        except AttributeError:
+            self.client_device = None
+        try :
+            self.server_device = server.device
+        except AttributeError:
+            self.server_device = None
+
+ if not user :
+ self.user = getpass.getuser()
+ else :
+ self.user = user
+ self.proto = proto
+ self.tcp_tx_delay = tcp_tx_delay
+ self.tos = tos
+ if length :
+ self.length = length
+
+ if amount :
+ self.amount = amount
+ if trip_times :
+ self.trip_times = trip_times
+ if burst_period :
+ self.burst_period = burst_period
+ if burst_size :
+ self.burst_size = burst_size
+
+ if txstart_delay_sec:
+ self.txstart_delay_sec = txstart_delay_sec
+
+ if cca:
+ self.cca = cca
+
+ self.interval = round(interval,3)
+ self.format = format
+ self.offered_load = offered_load
+ if self.offered_load :
+ if len(self.offered_load.split(':')) == 2 :
+ self.isoch = True
+ self.name += '-isoch'
+ else :
+ self.isoch = False
+ self.prefetch = prefetch
+ self.ipg = ipg
+ self.debug = debug
+ self.TRAFFIC_EVENT_TIMEOUT = round(self.interval * 4, 3)
+ self.bb = bb
+ self.working_load = working_load
+ self.bb_period = bb_period
+ self.bb_hold = bb_hold
+ self.fullduplex = fullduplex
+ # use python composition for the server and client
+ # i.e. a flow has a server and a client
+ self.rx = iperf_server(name='{}->RX({})'.format(name, str(self.server)), loop=iperf_flow.loop, host=self.server, flow=self, debug=self.debug)
+ self.tx = iperf_client(name='{}->TX({})'.format(name, str(self.client)), loop=iperf_flow.loop, host=self.client, flow=self, debug=self.debug)
+ self.rx.window=window
+ self.tx.window=window
+ self.ks_critical_p = 0.01
+ self.stats_reset()
+
+ #def __del__(self) :
+ # iperf_flow.instances.remove(self)
+
+ def destroy(self) :
+ iperf_flow.instances.remove(self)
+
+ def __getattr__(self, attr) :
+ if attr in self.flowstats :
+ return self.flowstats[attr]
+
+ def stats_reset(self) :
+ # Initialize the flow stats dictionary
+ self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'starttime' : None, 'flowid' : None, 'endtime' : None}
+ self.flowstats['txdatetime']=[]
+ self.flowstats['txbytes']=[]
+ self.flowstats['txthroughput']=[]
+ self.flowstats['writes']=[]
+ self.flowstats['errwrites']=[]
+ self.flowstats['retry']=[]
+ self.flowstats['cwnd']=[]
+ self.flowstats['rtt']=[]
+ self.flowstats['rxdatetime']=[]
+ self.flowstats['rxbytes']=[]
+ self.flowstats['rxthroughput']=[]
+ self.flowstats['reads']=[]
+ self.flowstats['histograms']=[]
+ self.flowstats['histogram_names'] = set()
+ self.flowstats['connect_time']=[]
+ self.flowstats['trip_time']=[]
+ self.flowstats['jitter']=[]
+ self.flowstats['rxlostpkts']=[]
+ self.flowstats['rxtotpkts']=[]
+ self.flowstats['meanlat']=[]
+ self.flowstats['minlat']=[]
+ self.flowstats['maxlat']=[]
+ self.flowstats['stdevlat']=[]
+ self.flowstats['rxpps']=[]
+ self.flowstats['inP']=[]
+ self.flowstats['inPvar']=[]
+ self.flowstats['rxpkts']=[]
+ self.flowstats['netPower']=[]
+
+ async def start(self):
+ self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'flowid' : None}
+ await self.rx.start()
+ await self.tx.start()
+
+ async def is_traffic(self) :
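+        # Block until both rx and tx pipe parsers have observed an interval
+        # report; the traffic asyncio events are set in pipe_data_received
+        # when the traffic regexes first match.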
+ if self.interval < 0.005 :
+            logging.warning('{} {}'.format(self.name, 'traffic check invoked without interval sampling'))
+ else :
+ self.rx.traffic_event.clear()
+ self.tx.traffic_event.clear()
+ logging.info('{} {}'.format(self.name, 'traffic check invoked'))
+ await self.rx.traffic_event.wait()
+ await self.tx.traffic_event.wait()
+
+ async def transmit_completed(self) :
+ logging.info('{} {}'.format(self.name, 'waiting for transmit to complete'))
+ await self.tx.txcompleted.wait()
+
+ async def stop(self):
+ self.tx.stop()
+ self.rx.stop()
+
+ def stats(self):
+ logging.info('stats')
+
+ def compute_ks_table(self, plot=True, directory='.', title=None) :
+
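+        # Pairwise two-sample Kolmogorov-Smirnov tests over all histograms
+        # sharing a name; the KS statistics fill a condensed distance matrix
+        # that feeds Ward-linkage hierarchical clustering and a dendrogram.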
+ if len(self.histogram_names) < 1 :
+            tmp = "***Failed. Expected at least one histogram name, but got {0}".format(len(self.histogram_names))
+ logging.info(tmp)
+ print(tmp)
+ #raise
+
+ for this_name in self.histogram_names :
+ # group by name
+ histograms = [h for h in self.histograms if h.name == this_name]
+ for index, h in enumerate(histograms) :
+ h.ks_index = index
+ tmp = "{} KS Table has {} entries".format(self.name, len(histograms))
+ logging.info(tmp)
+ print(tmp)
+
+ self.condensed_distance_matrix = ([])
+
+ tasks = []
+ for rowindex, h1 in enumerate(histograms) :
+ resultstr = rowindex * 'x'
+ maxp = None
+ minp = None
+ for h2 in histograms[rowindex:] :
+ d,p = stats.ks_2samp(h1.samples, h2.samples)
+ if h1 is not h2 :
+ self.condensed_distance_matrix = np.append(self.condensed_distance_matrix,d)
+ logging.debug('D,p={},{} cp={}'.format(str(d),str(p), str(self.ks_critical_p)))
+ if not minp or p < minp :
+ minp = p
+ if not maxp or (p != 1 and p > maxp) :
+ maxp = p
+ if p > self.ks_critical_p :
+ resultstr += '1'
+ else :
+ resultstr += '0'
+ if plot :
+ tasks.append(asyncio.ensure_future(flow_histogram.plot_two_sample_ks(h1=h1, h2=h2, flowname=self.name, title=title, directory=directory), loop=iperf_flow.loop))
+ print('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p)))
+ logging.info('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p)))
+ if tasks :
+ try :
+ logging.debug('running KS table plotting coroutines for {} row {}'.format(this_name,str(rowindex)))
+ iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=300))
+ except asyncio.TimeoutError:
+ logging.error('plot timed out')
+ raise
+ logging.info('{} {}(condensed distance matrix)\n{}'.format(self.name, this_name,self.condensed_distance_matrix))
+ self.linkage_matrix=linkage(self.condensed_distance_matrix, 'ward')
+ try :
+ plt.figure(figsize=(18,10))
+ dn = hierarchy.dendrogram(self.linkage_matrix)
+ plt.title("{} {}".format(self.name, this_name))
+ plt.savefig('{}/dn_{}_{}.png'.format(directory,self.name,this_name))
+ logging.info('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix)))
+ print('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix)))
+ print('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix))
+ logging.info('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix))
+ flattened=scipy.cluster.hierarchy.fcluster(self.linkage_matrix, 0.75*self.condensed_distance_matrix.max(), criterion='distance')
+ print('{} {} Clusters:{}'.format(self.name, this_name, flattened))
+ logging.info('{} {} Clusters:{}'.format(self.name, this_name, flattened))
+        except Exception:
+            pass
+
+ def dump_stats(self, directory='.') :
+ logging.info("\n********************** dump_stats for flow {} **********************".format(self.name))
+
+ #logging.info('This flow Name={} id={} items_cnt={}'.format(iperf_flow.flowid2name[self.flowstats['flowid']], str(self.flowstats['flowid']), len(self.flowstats)))
+ #logging.info('All flows Name and id: {}'.format(str(iperf_flow.flowid2name)))
+ #logging.info('This flow Name={} flowstats={}'.format(self.name, str(self.flowstats)))
+
+ csvfilename = os.path.join(directory, '{}.csv'.format(self.name))
+ if not os.path.exists(directory):
+ logging.debug('Making results directory {}'.format(directory))
+ os.makedirs(directory)
+
+ logging.info("Writing stats to '{}'".format(csvfilename))
+
+ for stat_name in [stat for stat in self.flowstats.keys() if stat != 'histograms'] :
+ logging.info("{}={}".format(stat_name, str(self.flowstats[stat_name])))
+
+ with open(csvfilename, 'w', newline='') as fd :
+ keynames = self.flowstats.keys()
+ writer = csv.writer(fd)
+ writer.writerow(keynames)
+ writer.writerow([self.flowstats[keyname] for keyname in keynames])
+ writer.writerow([h.samples for h in self.flowstats['histograms']])
+
+class iperf_server(object):
+
+ class IperfServerProtocol(asyncio.SubprocessProtocol):
+ def __init__(self, server, flow):
+ self.__dict__['flow'] = flow
+ self._exited = False
+ self._closed_stdout = False
+ self._closed_stderr = False
+ self._mypid = None
+ self._server = server
+ self._stdoutbuffer = ""
+ self._stderrbuffer = ""
+
+ def __setattr__(self, attr, value):
+ if attr in iperf_flow.flow_scope:
+                setattr(self.flow, attr, value)
+ else:
+ self.__dict__[attr] = value
+
+ # methods and attributes not here are handled by the flow object,
+ # aka, the flow object delegates to this object per composition
+ def __getattr__(self, attr):
+ if attr in iperf_flow.flow_scope:
+ return getattr(self.flow, attr)
+
+ @property
+ def finished(self):
+ return self._exited and self._closed_stdout and self._closed_stderr
+
+ def signal_exit(self):
+ if not self.finished:
+ return
+ self._server.closed.set()
+ self._server.opened.clear()
+
+ def connection_made(self, trans):
+ self._server.closed.clear()
+ self._mypid = trans.get_pid()
+ logging.debug('server connection made pid=({})'.format(self._mypid))
+
+ def pipe_data_received(self, fd, data):
+ if self.debug :
+ logging.debug('{} {}'.format(fd, data))
+ data = data.decode("utf-8")
+ if fd == 1:
+ self._stdoutbuffer += data
+ while "\n" in self._stdoutbuffer:
+ line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1)
+ self._server.adapter.info('{} (stdout,{})'.format(line, self._server.remotepid))
+ if not self._server.opened.is_set() :
+ m = self._server.regex_open_pid.match(line)
+ if m :
+ self._server.remotepid = m.group('pid')
+ self._server.opened.set()
+ logging.debug('{} pipe reading (stdout,{})'.format(self._server.name, self._server.remotepid))
+ else :
+ if self._server.proto == 'TCP' :
+ m = self._server.regex_traffic.match(line)
+ if m :
+ timestamp = datetime.now()
+ if not self._server.traffic_event.is_set() :
+ self._server.traffic_event.set()
+
+ bytes = float(m.group('bytes'))
+ if self.flowstats['current_txbytes'] :
+ flowrate = round((bytes / self.flowstats['current_txbytes']), 2)
+ # *consume* the current *txbytes* where the client pipe will repopulate on its next sample
+ # do this by setting the value to None
+ self.flowstats['current_txbytes'] = None
+ # logging.debug('{} flow ratio={:.2f}'.format(self._server.name, flowrate))
+ self.flowstats['flowrate'] = flowrate
+ else :
+ # *produce* the current *rxbytes* so the client pipe can know this event occurred
+ # indicate this by setting the value to value
+ self.flowstats['current_rxbytes'] = bytes
+ self.flowstats['rxdatetime'].append(timestamp)
+ self.flowstats['rxbytes'].append(m.group('bytes'))
+ self.flowstats['rxthroughput'].append(m.group('throughput'))
+ self.flowstats['reads'].append(m.group('reads'))
+ else :
+ m = self._server.regex_trip_time.match(line)
+ if m :
+ self.flowstats['trip_time'].append(float(m.group('trip_time')) * 1000)
+ else :
+ m = self._server.regex_traffic_udp.match(line)
+ if m :
+ timestamp = datetime.now()
+ if not self._server.traffic_event.is_set() :
+ self._server.traffic_event.set()
+ self.flowstats['rxbytes'].append(m.group('bytes'))
+ self.flowstats['rxthroughput'].append(m.group('throughput'))
+ self.flowstats['jitter'].append(m.group('jitter'))
+ self.flowstats['rxlostpkts'].append(m.group('lost_pkts'))
+ self.flowstats['rxtotpkts'].append(m.group('tot_pkts'))
+ self.flowstats['meanlat'].append(m.group('lat_mean'))
+ self.flowstats['minlat'].append(m.group('lat_min'))
+ self.flowstats['maxlat'].append(m.group('lat_max'))
+ self.flowstats['stdevlat'].append(m.group('lat_stdev'))
+ self.flowstats['rxpps'].append(m.group('pps'))
+ self.flowstats['inP'].append(m.group('inP'))
+ self.flowstats['inPvar'].append(m.group('inPvar'))
+ self.flowstats['rxpkts'].append(m.group('pkts'))
+ self.flowstats['netPower'].append(m.group('netPower'))
+ m = self._server.regex_final_histogram_traffic.match(line)
+ if m :
+ timestamp = datetime.now(timezone.utc).astimezone()
+ self.flowstats['endtime']= timestamp
+ self.flowstats['histogram_names'].add(m.group('pdfname'))
+ this_histogram = flow_histogram(name=m.group('pdfname'),values=m.group('pdf'), population=m.group('population'), binwidth=m.group('binwidth'), starttime=self.flowstats['starttime'], endtime=timestamp, outliers=m.group('outliers'), uci=m.group('uci'), uci_val=m.group('uci_val'), lci=m.group('lci'), lci_val=m.group('lci_val'))
+ self.flowstats['histograms'].append(this_histogram)
+ logging.info('pdf {} found with bin width={} us'.format(m.group('pdfname'), m.group('binwidth')))
+
+ elif fd == 2:
+ self._stderrbuffer += data
+ while "\n" in self._stderrbuffer:
+ line, self._stderrbuffer = self._stderrbuffer.split("\n", 1)
+ logging.info('{} {} (stderr)'.format(self._server.name, line))
+ m = self._server.regex_rx_bind_failed.match(line)
+ if m :
+ logging.error('RX Bind Failed. Check LAN / WLAN between server and client.')
+ iperf_flow.loop.stop()
+ raise
+
+ def pipe_connection_lost(self, fd, exc):
+ if fd == 1:
+ self._closed_stdout = True
+ logging.debug('stdout pipe to {} closed (exception={})'.format(self._server.name, exc))
+ elif fd == 2:
+ self._closed_stderr = True
+ logging.debug('stderr pipe to {} closed (exception={})'.format(self._server.name, exc))
+ if self._closed_stdout and self._closed_stderr :
+                self.remotepid = None
+ self.signal_exit()
+
+ def process_exited(self):
+ logging.debug('subprocess with pid={} closed'.format(self._mypid))
+ self._exited = True
+ self._mypid = None
+ self.signal_exit()
+
+ class CustomAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return '[%s] %s' % (self.extra['connid'], msg), kwargs
+
+ def __init__(self, name='Server', loop=None, host='localhost', flow=None, debug=False):
+ self.__dict__['flow'] = flow
+ self.name = name
+ self.iperf = '/usr/local/bin/iperf'
+ self.ssh = '/usr/bin/ssh'
+ self.host = host
+ self.flow = flow
+ self.debug = debug
+ self.opened = asyncio.Event()
+ self.closed = asyncio.Event()
+ self.closed.set()
+ self.traffic_event = asyncio.Event()
+ self._transport = None
+ self._protocol = None
+ self.time = time
+ conn_id = '{}'.format(self.name)
+ self.adapter = self.CustomAdapter(logger, {'connid': conn_id})
+
+ # ex. [ 4] 0.00-0.50 sec 657090 Bytes 10513440 bits/sec 449 449:0:0:0:0:0:0:0
+ self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<reads>[0-9]+)')
+ self.regex_traffic_udp = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<jitter>[0-9.]+)\sms\s(?P<lost_pkts>[0-9]+)/(?P<tot_pkts>[0-9]+).+(?P<lat_mean>[0-9.]+)/(?P<lat_min>[0-9.]+)/(?P<lat_max>[0-9.]+)/(?P<lat_stdev>[0-9.]+)\sms\s(?P<pps>[0-9]+)\spps\s+(?P<netPower>[0-9\.]+)\/(?P<inP>[0-9]+)\((?P<inPvar>[0-9]+)\)\spkts\s(?P<pkts>[0-9]+)')
+ self.regex_final_histogram_traffic = re.compile(r'\[\s*\d+\] (?P<timestamp>.*) sec\s+(?P<pdfname>[A-Za-z0-9\-]+)\(f\)-PDF: bin\(w=(?P<binwidth>[0-9]+)us\):cnt\((?P<population>[0-9]+)\)=(?P<pdf>.+)\s+\((?P<lci>[0-9\.]+)/(?P<uci>[0-9\.]+)/(?P<uci2>[0-9\.]+)%=(?P<lci_val>[0-9]+)/(?P<uci_val>[0-9]+)/(?P<uci_val2>[0-9]+),Outliers=(?P<outliers>[0-9]+),obl/obu=[0-9]+/[0-9]+\)')
+ # 0.0000-0.5259 trip-time (3WHS done->fin+finack) = 0.5597 sec
+ self.regex_trip_time = re.compile(r'.+trip\-time\s+\(3WHS\sdone\->fin\+finack\)\s=\s(?P<trip_time>\d+\.\d+)\ssec')
+ self.regex_rx_bind_failed = re.compile(r'listener bind failed: Cannot assign requested address')
+
+ def __getattr__(self, attr):
+ return getattr(self.flow, attr)
+
+ async def start(self, time=time):
+ if not self.closed.is_set() :
+ return
+
+ # ex. Server listening on TCP port 61003 with pid 2565
+ self.regex_open_pid = re.compile(r'^Server listening on {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport)))
+
+ self.opened.clear()
+ self.remotepid = None
+ if time :
+ iperftime = time + 30
+ self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-t ' + str(iperftime), '-f{}'.format(self.format), '-w' , self.window, '--realtime']
+ else :
+ self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-f{}'.format(self.format), '-w' , self.window, '--realtime']
+ if self.interval >= 0.005 :
+ self.sshcmd.extend(['-i ', str(self.interval)])
+ if self.server_device and self.srcip :
+ self.sshcmd.extend(['-B ', '{}%{}'.format(self.dstip, self.server_device)])
+ if self.proto == 'UDP' :
+ self.sshcmd.extend(['-u'])
+ if self.latency :
+ self.sshcmd.extend(['--histograms=100u,100000,50,95'])
+ self.sshcmd.extend(['--jitter-histograms'])
+
+ logging.info('{}'.format(str(self.sshcmd)))
+ self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfServerProtocol(self, self.flow), *self.sshcmd)
+ await self.opened.wait()
+
+ async def signal_stop(self):
+ if self.remotepid and not self.finished :
+ childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid))
+ stdout, _ = await childprocess.communicate()
+ if stdout:
+ logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout))
+ if not self.closed.is_set() :
+ await self.closed.wait()
+ logging.info('await kill completed remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout))
+ logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout))
+
+
+class iperf_client(object):
+
+ # Asyncio protocol for subprocess transport
+ class IperfClientProtocol(asyncio.SubprocessProtocol):
+ def __init__(self, client, flow):
+ self.__dict__['flow'] = flow
+ self._exited = False
+ self._closed_stdout = False
+ self._closed_stderr = False
+ self._mypid = None
+ self._client = client
+ self._stdoutbuffer = ""
+ self._stderrbuffer = ""
+
+ def __setattr__(self, attr, value):
+ if attr in iperf_flow.flow_scope:
+                setattr(self.flow, attr, value)
+ else:
+ self.__dict__[attr] = value
+
+ def __getattr__(self, attr):
+ if attr in iperf_flow.flow_scope:
+ return getattr(self.flow, attr)
+
+ @property
+ def finished(self):
+ return self._exited and self._closed_stdout and self._closed_stderr
+
+ def signal_exit(self):
+ if not self.finished:
+ return
+ self._client.closed.set()
+ self._client.opened.clear()
+ self._client.txcompleted.set()
+
+ def connection_made(self, trans):
+ self._client.closed.clear()
+ self._mypid = trans.get_pid()
+ logging.debug('client connection made pid=({})'.format(self._mypid))
+
+ def pipe_data_received(self, fd, data):
+ if self.debug :
+ logging.debug('{} {}'.format(fd, data))
+ data = data.decode("utf-8")
+ if fd == 1:
+ self._stdoutbuffer += data
+ while "\n" in self._stdoutbuffer:
+ line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1)
+ self._client.adapter.info('{} (stdout,{})'.format(line, self._client.remotepid))
+ if not self._client.opened.is_set() :
+ m = self._client.regex_open_pid.match(line)
+ if m :
+ self._client.opened.set()
+ self._client.remotepid = m.group('pid')
+ self.flowstats['starttime'] = datetime.now(timezone.utc).astimezone()
+ logging.debug('{} pipe reading at {} (stdout,{})'.format(self._client.name, self.flowstats['starttime'].isoformat(), self._client.remotepid))
+ else :
+ if self.flowstats['flowid'] is None :
+ m = self._client.regex_flowid.match(line)
+ if m :
+ # [ 1] local 192.168.1.15%enp1s0 port 7001 connected with 192.168.1.232 port 7001 (trip-times) (sock=3) on 2021-10-11 14:39:45 (PDT)
+ # self.regex_flowid = re.compile(r'local\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)')
+ #
+ # temp = htonl(config->src_ip);
+ # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32));
+ # temp = htonl(config->dst_ip);
+ # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32));
+ # temp = (hton16(config->dst_port) << 16) | hton16(config->src_port);
+ # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32));
+ # temp = config->proto;
+ # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32));
+ # return "%08x" % netip
+ # NOTE: the network or big endian byte order
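+                            # The code below reproduces that hash: XOR the two
+                            # 32-bit IP addresses, the packed (dstport<<16)|srcport
+                            # word, and the IP protocol number (0x11 UDP, 0x06 TCP).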
+ srcipaddr = ipaddress.ip_address(m.group('srcip'))
+ srcip32 = ctypes.c_uint32(int.from_bytes(srcipaddr.packed, byteorder='little', signed=False))
+ dstipaddr = ipaddress.ip_address(m.group('dstip'))
+ dstip32 = ctypes.c_uint32(int.from_bytes(dstipaddr.packed, byteorder='little', signed=False))
+ dstportbytestr = int(m.group('dstport')).to_bytes(2, byteorder='big', signed=False)
+ dstport16 = ctypes.c_uint16(int.from_bytes(dstportbytestr, byteorder='little', signed=False))
+ srcportbytestr = int(m.group('srcport')).to_bytes(2, byteorder='big', signed=False)
+ srcport16 = ctypes.c_uint16(int.from_bytes(srcportbytestr, byteorder='little', signed=False))
+ ports32 = ctypes.c_uint32((dstport16.value << 16) | srcport16.value)
+ if self._client.proto == 'UDP':
+ proto32 = ctypes.c_uint32(0x11)
+ else :
+ proto32 = ctypes.c_uint32(0x06)
+ quintuplehash = srcip32.value ^ dstip32.value ^ ports32.value ^ proto32.value
+ self.flowstats['flowid'] = '0x{:08x}'.format(quintuplehash)
+ if self._client.flow.name :
+ flowkey = self._client.flow.name
+ else :
+ flowkey = '0x{:08x}'.format(quintuplehash)
+ iperf_flow.flowid2name[self.flowstats['flowid']] = flowkey
+ logging.info('Flow quintuple hash of {} uses name {}'.format(self.flowstats['flowid'], flowkey))
+
+ if self._client.proto == 'TCP':
+ m = self._client.regex_traffic.match(line)
+ if m :
+ timestamp = datetime.now()
+ if not self._client.traffic_event.is_set() :
+ self._client.traffic_event.set()
+
+ bytes = float(m.group('bytes'))
+ if self.flowstats['current_rxbytes'] :
+ flowrate = round((self.flowstats['current_rxbytes'] / bytes), 2)
+ # *consume* the current *rxbytes* where the server pipe will repopulate on its next sample
+ # do this by setting the value to None
+ self.flowstats['current_rxbytes'] = None
+ # logging.debug('{} flow ratio={:.2f}'.format(self._client.name, flowrate))
+ self.flowstats['flowrate'] = flowrate
+ else :
+ # *produce* the current txbytes so the server pipe can know this event occurred
+ # indicate this by setting the value to value
+ self.flowstats['current_txbytes'] = bytes
+
+ self.flowstats['txdatetime'].append(timestamp)
+ self.flowstats['txbytes'].append(m.group('bytes'))
+ self.flowstats['txthroughput'].append(m.group('throughput'))
+ self.flowstats['writes'].append(m.group('writes'))
+ self.flowstats['errwrites'].append(m.group('errwrites'))
+ self.flowstats['retry'].append(m.group('retry'))
+ self.flowstats['cwnd'].append(m.group('cwnd'))
+ self.flowstats['rtt'].append(m.group('rtt'))
+ else :
+ m = self._client.regex_connect_time.match(line)
+ if m :
+ self.flowstats['connect_time'].append(float(m.group('connect_time')))
+ else :
+ pass
+
+ elif fd == 2:
+ self._stderrbuffer += data
+ while "\n" in self._stderrbuffer:
+ line, self._stderrbuffer = self._stderrbuffer.split("\n", 1)
+ logging.info('{} {} (stderr)'.format(self._client.name, line))
+ m = self._client.regex_tx_bind_failed.match(line)
+ if m :
+ logging.error('TX Bind Failed. Check LAN / WLAN between server and client.')
+ iperf_flow.loop.stop()
+ raise
+
+ def pipe_connection_lost(self, fd, exc):
+ if fd == 1:
+ logging.debug('stdout pipe to {} closed (exception={})'.format(self._client.name, exc))
+ self._closed_stdout = True
+ elif fd == 2:
+ logging.debug('stderr pipe to {} closed (exception={})'.format(self._client.name, exc))
+ self._closed_stderr = True
+ self.signal_exit()
+
+ def process_exited(self):
+ logging.debug('subprocess with pid={} closed'.format(self._mypid))
+ self._exited = True
+ self._mypid = None
+ self.signal_exit()
+
+ class CustomAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return '[%s] %s' % (self.extra['connid'], msg), kwargs
+
+ def __init__(self, name='Client', loop=None, host='localhost', flow = None, debug=False):
+ self.__dict__['flow'] = flow
+ self.opened = asyncio.Event()
+ self.closed = asyncio.Event()
+ self.txcompleted = asyncio.Event()
+ self.closed.set()
+ self.txcompleted.clear()
+ self.traffic_event = asyncio.Event()
+ self.name = name
+ self.iperf = '/usr/local/bin/iperf'
+ self.ssh = '/usr/bin/ssh'
+ self.host = host
+ self.debug = debug
+ self.flow = flow
+ self._transport = None
+ self._protocol = None
+ conn_id = '{}'.format(self.name)
+ self.adapter = self.CustomAdapter(logger, {'connid': conn_id})
+ # traffic ex: [ 3] 0.00-0.50 sec 655620 Bytes 10489920 bits/sec 14/211 446 446K/0 us
+ self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>\d+) Bytes\s+(?P<throughput>\d+) bits/sec\s+(?P<writes>\d+)/(?P<errwrites>\d+)\s+(?P<retry>\d+)\s+(?P<cwnd>\d+)K/(?P<rtt>\d+) us')
+ self.regex_connect_time = re.compile(r'\[\s+\d+]\slocal.*\(ct=(?P<connect_time>\d+\.\d+) ms\)')
+ # local 192.168.1.4 port 56949 connected with 192.168.1.1 port 61001
+ self.regex_flowid = re.compile(r'\[\s+\d+]\slocal\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)')
+ self.regex_tx_bind_failed = re.compile(r'bind failed: Cannot assign requested address')
+
+ def __getattr__(self, attr):
+ return getattr(self.flow, attr)
+
+ async def start(self, time=None, amount=None, parallel=None, epoch_sync_time=None):
+ if not self.closed.is_set() :
+ return
+
+ self.opened.clear()
+ self.txcompleted.clear()
+ self.remotepid = None
+ self.flowstats['flowid']=None
+
+ # Client connecting to 192.168.100.33, TCP port 61009 with pid 1903
+ self.regex_open_pid = re.compile(r'Client connecting to .*, {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport)))
+ if self.client_device :
+ client_dst = self.dstip + '%' + self.client_device
+ else :
+ client_dst = self.dstip
+ self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-c', client_dst, '-p ' + str(self.dstport), '-e', '-f{}'.format(self.format), '-w' , self.window ,'--realtime']
+ if self.tcp_tx_delay :
+ self.sshcmd.extend(['--tcp-tx-delay', self.tcp_tx_delay])
+ if self.tos :
+ self.sshcmd.extend(['-S ', self.tos])
+ if self.length :
+ self.sshcmd.extend(['-l ', str(self.length)])
+ if time:
+ self.sshcmd.extend(['-t ', str(time)])
+        elif amount:
+            self.sshcmd.extend(['-n ', amount])
+ if parallel :
+ self.sshcmd.extend(['-P', str(parallel)])
+ if self.trip_times :
+ self.sshcmd.extend(['--trip-times'])
+ if self.prefetch :
+ self.sshcmd.extend(['--tcp-write-prefetch', self.prefetch])
+ self.sshcmd.extend(['--histograms=1m,100000,5,95'])
+
+ if self.srcip :
+ if self.srcport :
+ self.sshcmd.extend(['-B ', '{}:{}'.format(self.srcip, self.srcport)])
+ else :
+ self.sshcmd.extend(['-B {}'.format(self.srcip)])
+
+ if self.cca :
+ self.sshcmd.extend(['-Z ', self.cca])
+ if self.interval >= 0.005 :
+ self.sshcmd.extend(['-i ', str(self.interval)])
+
+ if self.proto == 'UDP' :
+ self.sshcmd.extend(['-u '])
+ if self.isoch :
+ self.sshcmd.extend(['--isochronous=' + self.offered_load, ' --ipg ', str(self.ipg)])
+ elif self.offered_load :
+ self.sshcmd.extend(['-b', self.offered_load])
+ elif self.proto == 'TCP' and self.offered_load :
+ self.sshcmd.extend(['-b', self.offered_load])
+ elif self.proto == 'TCP' and self.burst_size and self.burst_period :
+ self.sshcmd.extend(['--burst-size', str(self.burst_size)])
+ self.sshcmd.extend(['--burst-period', str(self.burst_period)])
+ elif self.proto == 'TCP' and self.bb :
+ self.sshcmd.extend(['--bounceback'])
+ self.sshcmd.extend(['--bounceback-hold', str(self.bb_hold)])
+ self.sshcmd.extend(['--bounceback-period', str(self.bb_period)])
+ if not self.bb and self.fullduplex :
+            self.sshcmd.extend(['--full-duplex'])
+
+ if self.flow.bb :
+ self.sshcmd.extend(['--bounceback'])
+ if self.flow.working_load :
+ self.sshcmd.extend(['--working-load'])
+
+ if epoch_sync_time :
+ self.sshcmd.extend(['--txstart-time', str(epoch_sync_time)])
+
+ elif self.txstart_delay_sec :
+ # use incoming txstart_delay_sec and convert it to epoch_time_sec to use with '--txstart-time' iperf parameter
+ logging.info('{}'.format(str(datetime.now())))
+ epoch_time_sec = (datetime.now()).timestamp()
+ logging.info('Current epoch_time_sec = {}'.format(str(epoch_time_sec)))
+ new_txstart_time = epoch_time_sec + self.txstart_delay_sec
+ logging.info('new_txstart_time = {}'.format(str(new_txstart_time)))
+ self.sshcmd.extend(['--txstart-time', str(new_txstart_time)])
+
+ logging.info('{}'.format(str(self.sshcmd)))
+ try :
+ self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfClientProtocol(self, self.flow), *self.sshcmd)
+ await self.opened.wait()
+        except Exception:
+            logging.error('flow client start error per: {}'.format(str(self.sshcmd)))
+
+ async def signal_stop(self):
+ if self.remotepid and not self.finished :
+ childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid))
+ stdout, _ = await childprocess.communicate()
+ if stdout:
+ logging.info('{}({}) {}'.format(self.user, self.host, stdout))
+ if not self.closed.is_set():
+ await self.closed.wait()
+
+ async def signal_pause(self):
+ if self.remotepid :
+ childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-STOP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ logging.debug('({}) sending signal STOP to {} (pid={})'.format(self.user, self.host, self.remotepid))
+ stdout, _ = await childprocess.communicate()
+ if stdout:
+ logging.info('{}({}) {}'.format(self.user, self.host, stdout))
+ if not self.closed.is_set():
+ await self.closed.wait()
+
+ async def signal_resume(self):
+ if self.remotepid :
+ childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-CONT', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ logging.debug('({}) sending signal CONT to {} (pid={})'.format(self.user, self.host, self.remotepid))
+ stdout, _ = await childprocess.communicate()
+ if stdout:
+ logging.info('{}({}) {}'.format(self.user, self.host, stdout))
+ if not self.closed.is_set():
+ await self.closed.wait()
+
+class flow_histogram(object):
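+    # Latency histogram parsed from iperf's histogram output. Bin counts
+    # arrive as a comma-separated 'bin:count' string (expanded in __init__);
+    # bin values appear to be in units of binwidth microseconds, given the
+    # millisecond conversions used for plotting below.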
+
+ @classmethod
+ async def plot_two_sample_ks(cls, h1=None, h2=None, outputtype='png', directory='.', flowname=None, title=None):
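+        # Overlay two histograms (h1, h2) in one gnuplot: per-bin counts as
+        # impulses on y2 and the empirical CDFs on y1, so a two-sample
+        # Kolmogorov-Smirnov comparison can be inspected visually. Data files
+        # are written first for any histogram that has not been written yet.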
+
+ lci_val = int(h2.lci_val) * h2.binwidth
+ uci_val = int(h2.uci_val) * h2.binwidth
+ mytitle = '{} {} two sample KS({},{}) ({} samples) {}/{}%={}/{} us outliers={}\\n{}'.format(flowname, h1.name, h1.ks_index, h2.ks_index, h2.population, h2.lci, h2.uci, lci_val, uci_val, h2.outliers, title)
+ if h1.basefilename is None :
+ h1.output_dir = directory + '/' + flowname + h1.name + '/' + h1.name + '_' + str(h1.ks_index)
+ await h1.write(directory=h1.output_dir)
+
+ if h2.basefilename is None :
+ h2.output_dir = directory + '/' + flowname + h2.name + '/' + h2.name + '_' + str(h2.ks_index)
+ await h2.write(directory=h2.output_dir)
+
+ if (h1.basefilename is not None) and (h2.basefilename is not None) :
+ basefilename = '{}_{}_{}'.format(h1.basefilename, h1.ks_index, h2.ks_index)
+ gpcfilename = basefilename + '.gpc'
+ #write out the gnuplot control file
+ with open(gpcfilename, 'w') as fid :
+ if outputtype == 'canvas' :
+ fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html'))
+ fid.write('set terminal canvas standalone mousing size 1024,768\n')
+                elif outputtype == 'svg' :
+ fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html'))
+ fid.write('set terminal svg size 1024,768 dynamic mouse\n')
+ else :
+ fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png'))
+ fid.write('set terminal png size 1024,768\n')
+
+ fid.write('set key bottom\n')
+ fid.write('set title \"{}\" noenhanced\n'.format(mytitle))
+ if float(uci_val) < 400:
+ fid.write('set format x \"%.2f"\n')
+ else :
+ fid.write('set format x \"%.1f"\n')
+ fid.write('set format y \"%.1f"\n')
+ fid.write('set yrange [0:1.01]\n')
+ fid.write('set y2range [0:*]\n')
+ fid.write('set ytics add 0.1\n')
+ fid.write('set y2tics nomirror\n')
+ fid.write('set grid\n')
+ fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(h1.starttime, h2.endtime))
+ default_minx = -0.5
+ if float(uci_val) < 0.4:
+ fid.write('set xrange [{}:0.4]\n'.format(default_minx))
+ fid.write('set xtics auto\n')
+ elif h1.max < 2.0 and h2.max < 2.0 :
+ fid.write('set xrange [{}:2]\n'.format(default_minx))
+ fid.write('set xtics auto\n')
+ elif h1.max < 5.0 and h2.max < 5.0 :
+ fid.write('set xrange [{}:5]\n'.format(default_minx))
+ fid.write('set xtics auto\n')
+ elif h1.max < 10.0 and h2.max < 10.0:
+ fid.write('set xrange [{}:10]\n'.format(default_minx))
+ fid.write('set xtics add 1\n')
+ elif h1.max < 20.0 and h2.max < 20.0 :
+ fid.write('set xrange [{}:20]\n'.format(default_minx))
+ fid.write('set xtics add 1\n')
+ fid.write('set format x \"%.0f"\n')
+ elif h1.max < 40.0 and h2.max < 40.0:
+ fid.write('set xrange [{}:40]\n'.format(default_minx))
+ fid.write('set xtics add 5\n')
+ fid.write('set format x \"%.0f"\n')
+ elif h1.max < 50.0 and h2.max < 50.0:
+ fid.write('set xrange [{}:50]\n'.format(default_minx))
+ fid.write('set xtics add 5\n')
+ fid.write('set format x \"%.0f"\n')
+ elif h1.max < 75.0 and h2.max < 75.0:
+ fid.write('set xrange [{}:75]\n'.format(default_minx))
+ fid.write('set xtics add 5\n')
+ fid.write('set format x \"%.0f"\n')
+ elif h1.max < 100.0 and h2.max < 100.0 :
+ fid.write('set xrange [{}:100]\n'.format(default_minx))
+ fid.write('set xtics add 10\n')
+ fid.write('set format x \"%.0f"\n')
+ else :
+ fid.write('set xrange [{}:*]\n'.format(default_minx))
+ fid.write('set xtics auto\n')
+ fid.write('set format x \"%.0f"\n')
+ fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{1}\" using 1:2 index 0 axes x1y2 with impulses linetype 2 notitle, \"{1}\" using 1:3 index 0 axes x1y1 with lines linetype 1 linewidth 2 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(h1.datafilename, h2.datafilename))
+
+            childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot, gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+            stdout, _ = await childprocess.communicate()
+            # stderr is merged into stdout above, so use the exit status to detect failure
+            if childprocess.returncode :
+                logging.error('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename))
+            else :
+                logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename))
+
+ gnuplot = '/usr/bin/gnuplot'
+ def __init__(self, binwidth=None, name=None, values=None, population=None, starttime=None, endtime=None, title=None, outliers=None, lci = None, uci = None, lci_val = None, uci_val = None) :
+ self.raw = values
+ self._entropy = None
+        self._ks_1samp_dist = None
+        self._ampdu_rawdump = None # so the ampdu_dump getter is safe before the setter runs
+ self.bins = self.raw.split(',')
+ self.name = name
+ self.ks_index = None
+ self.population = int(population)
+ self.samples = np.zeros(int(self.population))
+ self.binwidth = int(binwidth)
+ self.createtime = datetime.now(timezone.utc).astimezone()
+ self.starttime=starttime
+ self.endtime=endtime
+ self.title=title
+ self.outliers=outliers
+ self.uci = uci
+ self.uci_val = uci_val
+ self.lci = lci
+ self.lci_val = lci_val
+ self.basefilename = None
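+        # expand the 'bin:count' pairs into individual samples so that
+        # scipy.stats routines (e.g. ks_1samp below) can consume them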
+ ix = 0
+ for bin in self.bins :
+ x,y = bin.split(':')
+ for i in range(int(y)) :
+ self.samples[ix] = x
+ ix += 1
+
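+    # Shannon entropy (bits) of the empirical bin distribution, cached after
+    # the first computation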
+ @property
+ def entropy(self) :
+ if not self._entropy :
+ self._entropy = 0
+ for bin in self.bins :
+ x,y = bin.split(':')
+ y1 = float(y) / float(self.population)
+ self._entropy -= y1 * math.log2(y1)
+ return self._entropy
+
+ @property
+ def ks_1samp_dist(self):
+ if not self._ks_1samp_dist :
+ self._ks_1samp_dist,p = stats.ks_1samp(self.samples, stats.norm.cdf)
+ return self._ks_1samp_dist
+
+ @property
+ def ampdu_dump(self) :
+ return self._ampdu_rawdump
+
+ @ampdu_dump.setter
+ def ampdu_dump(self, value):
+ self._ampdu_rawdump = value
+
+ async def __exec_gnuplot(self) :
+ logging.info('Plotting {} {}'.format(self.name, self.gpcfilename))
+        childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot, self.gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        stdout, _ = await childprocess.communicate()
+        # stderr is merged into stdout above, so use the exit status to detect failure
+        if childprocess.returncode :
+            logging.error('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename))
+        else :
+            logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename))
+
+ async def write(self, directory='.', filename=None) :
+ # write out the datafiles for the plotting tool, e.g. gnuplot
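+        # each data row is: bin value (ms), bin count, cumulative fraction of the population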
+ if filename is None:
+ filename = self.name
+
+ if not os.path.exists(directory):
+ logging.debug('Making results directory {}'.format(directory))
+ os.makedirs(directory)
+
+ logging.debug('Writing {} results to directory {}'.format(directory, filename))
+ basefilename = os.path.join(directory, filename)
+ datafilename = os.path.join(directory, filename + '.data')
+ self.max = None
+        with open(datafilename, 'w') as fid :
+            cumulative = 0.0
+            for bin in self.bins :
+                x,y = bin.split(':')
+                #logging.debug('bin={} x={} y={}'.format(bin, x, y))
+                if (float(y) > 1.0) or ((cumulative / float(self.population)) < 0.99) :
+                    cumulative += float(y)
+                    perc = cumulative / float(self.population)
+                    self.max = float(x) * float(self.binwidth) / 1000.0 # max is the last value written
+                    fid.write('{} {} {}\n'.format((float(x) * float(self.binwidth) / 1000.0), int(y), perc))
+
+ self.basefilename = basefilename
+ self.datafilename = datafilename
+
+ async def async_plot(self, title=None, directory='.', outputtype='png', filename=None) :
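+        # render this histogram with gnuplot: per-bin counts as impulses on y2
+        # and the cumulative distribution as a line on y1, writing the data
+        # file first when needed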
+ if self.basefilename is None :
+ await self.write(directory=directory, filename=filename)
+
+ if self.basefilename is not None :
+ self.gpcfilename = self.basefilename + '.gpc'
+ #write out the gnuplot control file
+ with open(self.gpcfilename, 'w') as fid :
+                if outputtype == 'canvas' :
+                    fid.write('set output \"{}.{}\"\n'.format(self.basefilename, 'html'))
+                    fid.write('set terminal canvas standalone mousing size 1024,768\n')
+                elif outputtype == 'svg' :
+                    fid.write('set output \"{}_svg.{}\"\n'.format(self.basefilename, 'html'))
+                    fid.write('set terminal svg size 1024,768 dynamic mouse\n')
+                else :
+                    fid.write('set output \"{}.{}\"\n'.format(self.basefilename, 'png'))
+                    fid.write('set terminal png size 1024,768\n')
+
+ if not title and self.title :
+ title = self.title
+
+ fid.write('set key bottom\n')
+ if self.ks_index is not None :
+ fid.write('set title \"{}({}) {}({}) E={}\" noenhanced\n'.format(self.name, str(self.ks_index), title, int(self.population), self.entropy))
+ else :
+ fid.write('set title \"{}{}({}) E={}\" noenhanced\n'.format(self.name, title, int(self.population), self.entropy))
+ fid.write('set format x \"%.0f"\n')
+ fid.write('set format y \"%.1f"\n')
+ fid.write('set yrange [0:1.01]\n')
+ fid.write('set y2range [0:*]\n')
+ fid.write('set ytics add 0.1\n')
+ fid.write('set y2tics nomirror\n')
+ fid.write('set grid\n')
+ fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(self.starttime, self.endtime))
+ if self.max < 5.0 :
+ fid.write('set xrange [0:5]\n')
+ fid.write('set xtics auto\n')
+ elif self.max < 10.0 :
+ fid.write('set xrange [0:10]\n')
+ fid.write('set xtics add 1\n')
+ elif self.max < 20.0 :
+ fid.write('set xrange [0:20]\n')
+ fid.write('set xtics add 1\n')
+ elif self.max < 40.0 :
+ fid.write('set xrange [0:40]\n')
+ fid.write('set xtics add 5\n')
+ elif self.max < 50.0 :
+ fid.write('set xrange [0:50]\n')
+ fid.write('set xtics add 5\n')
+ elif self.max < 75.0 :
+ fid.write('set xrange [0:75]\n')
+ fid.write('set xtics add 5\n')
+ else :
+ fid.write('set xrange [0:100]\n')
+ fid.write('set xtics add 10\n')
+ fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename))
+
+ if outputtype == 'png' :
+ # Create a thumbnail too
+ fid.write('unset output; unset xtics; unset ytics; unset key; unset xlabel; unset ylabel; unset border; unset grid; unset yzeroaxis; unset xzeroaxis; unset title; set lmargin 0; set rmargin 0; set tmargin 0; set bmargin 0\n')
+ fid.write('set output \"{}_thumb.{}\"\n'.format(basefilename, 'png'))
+ fid.write('set terminal png transparent size 64,32 crop\n')
+ fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename))
+
+ await self.__exec_gnuplot()
diff --git a/flows/ssh_nodes.py b/flows/ssh_nodes.py
new file mode 100755
index 0000000..73e69f3
--- /dev/null
+++ b/flows/ssh_nodes.py
@@ -0,0 +1,499 @@
+# ---------------------------------------------------------------
+# * Copyright (c) 2018-2023
+# * Broadcom Corporation
+# * All Rights Reserved.
+# *---------------------------------------------------------------
+# Redistribution and use in source and binary forms, with or without modification, are permitted
+# provided that the following conditions are met:
+#
+# Redistributions of source code must retain the above copyright notice, this list of conditions
+# and the following disclaimer. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the documentation and/or other
+# materials provided with the distribution. Neither the name of the Broadcom nor the names of
+# contributors may be used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR
+# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
+# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+#
+# Author Robert J. McMahon, Broadcom LTD
+#
+# Python object to support sending remote commands to a host
+#
+# Date April 2018 - December 2023
+
+import logging
+import asyncio, subprocess
+import time, datetime
+import weakref
+import os
+import re
+
+from datetime import datetime as datetime, timezone
+
+logger = logging.getLogger(__name__)
+
+class ssh_node:
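+    # One remote host reachable over ssh (or the alternate 'ush' transport);
+    # class-level helpers drive a shared asyncio event loop across all
+    # instances, which are tracked in the instances WeakSet below.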
+ DEFAULT_IO_TIMEOUT = 30.0
+ DEFAULT_CMD_TIMEOUT = 30
+ DEFAULT_CONNECT_TIMEOUT = 60.0
+ rexec_tasks = []
+ _loop = None
+ instances = weakref.WeakSet()
+ periodic_cmd_futures = []
+ periodic_cmd_running_event = asyncio.Event()
+ periodic_cmd_done_event = asyncio.Event()
+
+ @classmethod
+ @property
+ def loop(cls):
+ if not cls._loop :
+ try :
+ cls._loop = asyncio.get_running_loop()
+            except RuntimeError :
+ if os.name == 'nt':
+ # On Windows, the ProactorEventLoop is necessary to listen on pipes
+ cls._loop = asyncio.ProactorEventLoop()
+ else:
+ cls._loop = asyncio.new_event_loop()
+ return cls._loop
+
+ @classmethod
+ def sleep(cls, time=0, text=None, stoptext=None) :
+ if text :
+ logging.info('Sleep {} ({})'.format(time, text))
+ ssh_node.loop.run_until_complete(asyncio.sleep(time))
+ if stoptext :
+ logging.info('Sleep done ({})'.format(stoptext))
+
+ @classmethod
+ def get_instances(cls):
+ try :
+ return list(ssh_node.instances)
+ except NameError :
+ return []
+
+ @classmethod
+ def run_all_commands(cls, timeout=None, text=None, stoptext=None) :
+ if ssh_node.rexec_tasks :
+            if text :
+                logging.info('Run all tasks ({})'.format(text))
+ ssh_node.loop.run_until_complete(asyncio.wait(ssh_node.rexec_tasks, timeout=timeout))
+ if stoptext :
+ logging.info('Commands done ({})'.format(stoptext))
+ ssh_node.rexec_tasks = []
+
+ @classmethod
+ def open_consoles(cls, silent_mode=False) :
+ nodes = ssh_node.get_instances()
+ node_names = []
+ tasks = []
+ for node in nodes:
+ if node.sshtype.lower() == 'ssh' :
+ tasks.append(asyncio.ensure_future(node.clean(), loop=ssh_node.loop))
+ if tasks :
+ logging.info('Run consoles clean')
+            _, pending = ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=20))
+            # asyncio.wait() reports timeouts via pending tasks rather than raising
+            if pending :
+                logging.error('console cleanup timeout')
+
+ tasks = []
+ ipaddrs = []
+ for node in nodes :
+ #see if we need control master to be started
+ if node.ssh_speedups and not node.ssh_console_session and node.ipaddr not in ipaddrs:
+ logging.info('Run consoles speedup')
+ node.ssh_console_session = ssh_session(name=node.name, hostname=node.ipaddr, node=node, control_master=True, ssh_speedups=True, silent_mode=silent_mode)
+ node.console_task = asyncio.ensure_future(node.ssh_console_session.post_cmd(cmd='/usr/bin/dmesg -w', IO_TIMEOUT=None, CMD_TIMEOUT=None), loop=ssh_node.loop)
+ tasks.append(node.console_task)
+ ipaddrs.append(node.ipaddr)
+ node_names.append(node.name)
+
+ if tasks :
+ s = " "
+ logging.info('Opening consoles: {}'.format(s.join(node_names)))
+            _, pending = ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=60))
+            # asyncio.wait() reports timeouts via pending tasks rather than raising
+            if pending :
+                logging.error('open console timeout')
+                raise asyncio.TimeoutError('open console timeout')
+
+ if tasks :
+ # Sleep to let the control masters settle
+ ssh_node.loop.run_until_complete(asyncio.sleep(1))
+ logging.info('open_consoles done')
+
+ @classmethod
+ def close_consoles(cls) :
+ nodes = ssh_node.get_instances()
+ tasks = []
+ node_names = []
+ for node in nodes :
+ if node.ssh_console_session :
+ node.console_task = asyncio.ensure_future(node.ssh_console_session.close(), loop=ssh_node.loop)
+ tasks.append(node.console_task)
+ node_names.append(node.name)
+
+ if tasks :
+ s = " "
+ logging.info('Closing consoles: {}'.format(s.join(node_names)))
+ ssh_node.loop.run_until_complete(asyncio.wait(tasks, timeout=60))
+ logging.info('Closing consoles done: {}'.format(s.join(node_names)))
+
+ @classmethod
+ def periodic_cmds_stop(cls) :
+ logging.info("Stop periodic futures")
+ ssh_node.periodic_cmd_futures = []
+ ssh_node.periodic_cmd_running_event.clear()
+ while not ssh_node.periodic_cmd_done_event.is_set() :
+ ssh_node.loop.run_until_complete(asyncio.sleep(0.25))
+ logging.debug("Awaiting kill periodic futures")
+ logging.debug("Stop periodic futures done")
+
+ def __init__(self, name=None, ipaddr=None, devip=None, console=False, device=None, ssh_speedups=False, silent_mode=False, sshtype='ssh', relay=None):
+ self.ipaddr = ipaddr
+ self.name = name
+ self.my_futures = []
+ self.device = device
+ self.devip = devip
+ self.sshtype = sshtype.lower()
+ if self.sshtype.lower() == 'ssh' :
+ self.ssh_speedups = ssh_speedups
+ self.controlmasters = '/tmp/controlmasters_{}'.format(self.ipaddr)
+ else :
+ self.ssh_speedups = False
+ self.controlmasters = None
+ self.ssh_console_session = None
+
+ if relay :
+ self.relay = relay
+ self.ssh = ['/usr/bin/ssh', 'root@{}'.format(relay)]
+ else :
+ self.ssh = []
+ if self.sshtype.lower() == 'ush' :
+ self.ssh.extend(['/usr/local/bin/ush'])
+ elif self.sshtype.lower() == 'ssh' :
+ if not self.ssh :
+ logging.debug("node add /usr/bin/ssh")
+ self.ssh.extend(['/usr/bin/ssh'])
+ logging.debug("ssh={} ".format(self.ssh))
+ else :
+ raise ValueError("ssh type invalid")
+
+ ssh_node.instances.add(self)
+
+ def rexec(self, cmd='pwd', IO_TIMEOUT=DEFAULT_IO_TIMEOUT, CMD_TIMEOUT=DEFAULT_CMD_TIMEOUT, CONNECT_TIMEOUT=DEFAULT_CONNECT_TIMEOUT, run_now=True, repeat = None) :
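+        # post cmd to this host in a new ssh_session; with run_now=False the
+        # returned future is queued for a later run_all_commands() call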
+ io_timer = IO_TIMEOUT
+ cmd_timer = CMD_TIMEOUT
+ connect_timer = CONNECT_TIMEOUT
+ this_session = ssh_session(name=self.name, hostname=self.ipaddr, CONNECT_TIMEOUT=connect_timer, node=self, ssh_speedups=True)
+ this_future = asyncio.ensure_future(this_session.post_cmd(cmd=cmd, IO_TIMEOUT=io_timer, CMD_TIMEOUT=cmd_timer, repeat = repeat), loop=ssh_node.loop)
+ if run_now:
+ ssh_node.loop.run_until_complete(asyncio.wait([this_future], timeout=CMD_TIMEOUT))
+ else:
+ ssh_node.rexec_tasks.append(this_future)
+ self.my_futures.append(this_future)
+ return this_session
+
+ async def clean(self) :
+ childprocess = await asyncio.create_subprocess_exec('/usr/bin/ssh', 'root@{}'.format(self.ipaddr), 'pkill', 'dmesg', stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout, stderr = await childprocess.communicate()
+ if stdout :
+ logging.info('{}'.format(stdout))
+ if stderr :
+ logging.info('{}'.format(stderr))
+
+ def close_console(self) :
+ if self.ssh_console_session:
+ self.ssh_console_session.close()
+
+ async def repeat(self, interval, func, *args, **kwargs):
+ """
+ Run func every interval seconds.
+ If func has not finished before *interval*, will run again
+ immediately when the previous iteration finished.
+ *args and **kwargs are passed as the arguments to func.
+ """
+ logging.debug("repeat args={} kwargs={}".format(args, kwargs))
+
+ while ssh_node.periodic_cmd_running_event.is_set() :
+ await asyncio.gather(
+ func(*args, **kwargs),
+ asyncio.sleep(interval),
+ )
+ if interval == 0 :
+ break
+
+ logging.debug("Closing log_fh={}".format(kwargs['log_fh']))
+ kwargs['log_fh'].flush()
+ kwargs['log_fh'].close()
+ ssh_node.periodic_cmd_done_event.set()
+
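+    # Run cmd every time_period seconds over ssh, appending output to
+    # cmd_log_file, until periodic_cmds_stop() is called. A minimal calling
+    # sketch (the command and file names here are only illustrative):
+    #
+    #   node.periodic_cmd_enable(cmd='cat /proc/net/netstat', time_period=2.0,
+    #                            cmd_log_file='/tmp/netstat.log')
+    #   ... run traffic ...
+    #   ssh_node.periodic_cmds_stop()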
+ def periodic_cmd_enable(self, cmd='ls', time_period=None, cmd_log_file=None) :
+ log_file_handle = open(cmd_log_file, 'w', errors='ignore')
+
+ if ssh_node.loop :
+ future = asyncio.ensure_future(self.repeat(time_period, self.run_cmd, cmd=cmd, log_fh=log_file_handle), loop=ssh_node.loop)
+ ssh_node.periodic_cmd_futures.append(future)
+ ssh_node.periodic_cmd_running_event.set()
+ ssh_node.periodic_cmd_done_event.clear()
+        else :
+            raise RuntimeError('no event loop available for periodic commands')
+
+ async def run_cmd(self, *args, **kwargs) :
+ log_file_handle = kwargs['log_fh']
+ cmd = kwargs['cmd']
+
+ msg = "********************** Periodic Command '{}' Begins **********************".format(cmd)
+ logging.info(msg)
+ t = '%s' % datetime.now()
+ t = t[:-3] + " " + str(msg)
+ log_file_handle.write(t + '\n')
+ logging.debug("ssh={} ipaddr={} cmd={} ".format(self.ssh, self.ipaddr, cmd))
+ this_cmd = []
+ ush_flag = False
+
+ for item in self.ssh:
+ if 'ush' in item :
+ ush_flag = True
+
+ if ush_flag :
+ this_cmd.extend([*self.ssh, self.ipaddr, cmd])
+ else:
+ this_cmd.extend([*self.ssh, 'root@{}'.format(self.ipaddr), cmd])
+ logging.info("run cmd = {}".format(this_cmd))
+
+ childprocess = await asyncio.create_subprocess_exec(*this_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ logging.info("subprocess for periodic cmd = {}".format(this_cmd))
+        stdout, _ = await childprocess.communicate()
+        # stderr is merged into stdout above, so use the exit status to detect failure
+        if childprocess.returncode :
+            msg = 'Command {} failed with exit code {}'.format(cmd, childprocess.returncode)
+ logging.error(msg)
+ t = '%s' % datetime.now()
+ t = t[:-3] + " " + str(msg)
+ log_file_handle.write(t + '\n')
+ log_file_handle.flush()
+ if stdout:
+ stdout = stdout.decode("utf-8")
+ log_file_handle.write(stdout)
+ msg = "********************** Periodic Command Ends **********************"
+ logging.info(msg)
+ t = '%s' % datetime.now()
+ t = t[:-3] + " " + str(msg)
+ log_file_handle.write(t + '\n')
+ log_file_handle.flush()
+
+# Multiplexed sessions need a control master to connect to. The run time parameters -M and -S also correspond
+# to ControlMaster and ControlPath, respectively. So first an initial master connection is established using
+# -M when accompanied by the path to the control socket using -S.
+#
+# ssh -M -S /home/fred/.ssh/controlmasters/fred@server.example.org:22 server.example.org
+# Then subsequent multiplexed connections are made in other terminals. They use ControlPath or -S to point to the control socket.
+# ssh -O check -S ~/.ssh/controlmasters/%r@%h:%p server.example.org
+# ssh -S /home/fred/.ssh/controlmasters/fred@server.example.org:22 server.example.org
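+#
+# A minimal end-to-end sketch of these classes (the address below is
+# hypothetical; assumes password-less root ssh to the device under test):
+#
+#   node = ssh_node(name='dut', ipaddr='192.168.1.10', ssh_speedups=True)
+#   ssh_node.open_consoles(silent_mode=True)
+#   node.rexec(cmd='uname -a', run_now=False)
+#   ssh_node.run_all_commands()
+#   ssh_node.close_consoles()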
+class ssh_session:
+    sessionid = 1
+ class SSHReaderProtocol(asyncio.SubprocessProtocol):
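+        # SubprocessProtocol that feeds the ssh child's stdout/stderr lines into
+        # the session logger and arms the connect/command/IO watchdog timers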
+ def __init__(self, session, silent_mode):
+ self._exited = False
+ self._closed_stdout = False
+ self._closed_stderr = False
+ self._mypid = None
+ self._stdoutbuffer = ""
+ self._stderrbuffer = ""
+ self.debug = False
+ self._session = session
+ self._silent_mode = silent_mode
+ if self._session.CONNECT_TIMEOUT is not None :
+ self.watchdog = ssh_node.loop.call_later(self._session.CONNECT_TIMEOUT, self.wd_timer)
+ self._session.closed.clear()
+ self.timeout_occurred = asyncio.Event()
+ self.timeout_occurred.clear()
+
+ @property
+ def finished(self):
+ return self._exited and self._closed_stdout and self._closed_stderr
+
+ def signal_exit(self):
+ if not self.finished:
+ return
+ self._session.closed.set()
+
+ def connection_made(self, transport):
+ if self._session.CONNECT_TIMEOUT is not None :
+ self.watchdog.cancel()
+ self._mypid = transport.get_pid()
+ self._transport = transport
+ self._session.sshpipe = self._transport.get_extra_info('subprocess')
+ self._session.adapter.debug('{} ssh node connection made pid=({})'.format(self._session.name, self._mypid))
+ self._session.connected.set()
+ if self._session.IO_TIMEOUT is not None :
+ self.iowatchdog = ssh_node.loop.call_later(self._session.IO_TIMEOUT, self.io_timer)
+ if self._session.CMD_TIMEOUT is not None :
+ self.watchdog = ssh_node.loop.call_later(self._session.CMD_TIMEOUT, self.wd_timer)
+
+ def connection_lost(self, exc):
+ self._session.adapter.debug('{} node connection lost pid=({})'.format(self._session.name, self._mypid))
+ self._session.connected.clear()
+
+ def pipe_data_received(self, fd, data):
+ if self._session.IO_TIMEOUT is not None :
+ self.iowatchdog.cancel()
+ if self.debug :
+ logging.debug('{} {}'.format(fd, data))
+ self._session.results.extend(data)
+ data = data.decode("utf-8")
+ if fd == 1:
+ self._stdoutbuffer += data
+ while "\n" in self._stdoutbuffer:
+ line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1)
+ if not self._silent_mode :
+ self._session.adapter.info('{}'.format(line.replace("\r","")))
+
+ elif fd == 2:
+ self._stderrbuffer += data
+ while "\n" in self._stderrbuffer:
+ line, self._stderrbuffer = self._stderrbuffer.split("\n", 1)
+ self._session.adapter.warning('{} {}'.format(self._session.name, line.replace("\r","")))
+
+ if self._session.IO_TIMEOUT is not None :
+ self.iowatchdog = ssh_node.loop.call_later(self._session.IO_TIMEOUT, self.io_timer)
+
+ def pipe_connection_lost(self, fd, exc):
+ if self._session.IO_TIMEOUT is not None :
+ self.iowatchdog.cancel()
+ if fd == 1:
+ self._session.adapter.debug('{} stdout pipe closed (exception={})'.format(self._session.name, exc))
+ self._closed_stdout = True
+ elif fd == 2:
+ self._session.adapter.debug('{} stderr pipe closed (exception={})'.format(self._session.name, exc))
+ self._closed_stderr = True
+ self.signal_exit()
+
+ def process_exited(self):
+ if self._session.CMD_TIMEOUT is not None :
+ self.watchdog.cancel()
+ logging.debug('{} subprocess with pid={} closed'.format(self._session.name, self._mypid))
+ self._exited = True
+ self._mypid = None
+ self.signal_exit()
+
+ def wd_timer(self, type=None):
+ logging.error("{}: timeout: pid={}".format(self._session.name, self._mypid))
+ self.timeout_occurred.set()
+ if self._session.sshpipe :
+ self._session.sshpipe.terminate()
+
+ def io_timer(self, type=None):
+ logging.error("{} IO timeout: cmd='{}' host(pid)={}({})".format(self._session.name, self._session.cmd, self._session.hostname, self._mypid))
+ self.timeout_occurred.set()
+ self._session.sshpipe.terminate()
+
+ class CustomAdapter(logging.LoggerAdapter):
+ def process(self, msg, kwargs):
+ return '[%s] %s' % (self.extra['connid'], msg), kwargs
+
+ def __init__(self, user='root', name=None, hostname='localhost', CONNECT_TIMEOUT=None, control_master=False, node=None, silent_mode=False, ssh_speedups=True):
+ self.hostname = hostname
+ self.name = name
+ self.user = user
+ self.opened = asyncio.Event()
+ self.closed = asyncio.Event()
+ self.connected = asyncio.Event()
+ self.closed.set()
+ self.opened.clear()
+ self.connected.clear()
+ self.results = bytearray()
+ self.sshpipe = None
+ self.node = node
+ self.CONNECT_TIMEOUT = CONNECT_TIMEOUT
+ self.IO_TIMEOUT = None
+ self.CMD_TIMEOUT = None
+ self.control_master = control_master
+ self.ssh = node.ssh.copy()
+ self.silent_mode = silent_mode
+ self.ssh_speedups = ssh_speedups
+ logger = logging.getLogger(__name__)
+ if control_master :
+ conn_id = self.name + '(console)'
+ else :
+ conn_id = '{}({})'.format(self.name, ssh_session.sessionid)
+ ssh_session.sessionid += 1
+
+ self.adapter = self.CustomAdapter(logger, {'connid': conn_id})
+
+    def __getattr__(self, attr) :
+        if self.node :
+            return getattr(self.node, attr)
+        raise AttributeError(attr)
+
+    @property
+    def is_established(self):
+        # these flags live on the reader protocol, not on the session itself
+        return self._protocol.finished if hasattr(self, '_protocol') else False
+
+ async def close(self) :
+ if self.control_master :
+ logging.info('control master close called {}'.format(self.controlmasters))
+ childprocess = await asyncio.create_subprocess_exec('/usr/bin/ssh', 'root@{}'.format(self.ipaddr), 'pkill', 'dmesg', stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout, stderr = await childprocess.communicate()
+ if stdout :
+ logging.info('{}'.format(stdout))
+ if stderr :
+ logging.info('{}'.format(stderr))
+ self.sshpipe.terminate()
+ await self.closed.wait()
+ logging.info('control master exit called {}'.format(self.controlmasters))
+            childprocess = await asyncio.create_subprocess_exec(*self.ssh, '-o', 'ControlPath={}'.format(self.controlmasters), '-O', 'exit', self.hostname, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+ stdout, stderr = await childprocess.communicate()
+ if stdout :
+ logging.info('{}'.format(stdout))
+ if stderr :
+ logging.info('{}'.format(stderr))
+
+ elif self.sshpipe :
+ self.sshpipe.terminate()
+ await self.closed.wait()
+
+ async def post_cmd(self, cmd=None, IO_TIMEOUT=None, CMD_TIMEOUT=None, ssh_speedups=True, repeat=None) :
+ logging.debug("{} Post command {}".format(self.name, cmd))
+ self.opened.clear()
+ self.cmd = cmd
+ self.IO_TIMEOUT = IO_TIMEOUT
+ self.CMD_TIMEOUT = CMD_TIMEOUT
+ self.repeatcmd = None
+ sshcmd = self.ssh.copy()
+ if self.control_master :
+ try:
+ os.remove(str(self.controlmasters))
+ except OSError:
+ pass
+ sshcmd.extend(['-o ControlMaster=yes', '-o ControlPath={}'.format(self.controlmasters), '-o ControlPersist=1'])
+ elif self.node.sshtype == 'ssh' :
+ sshcmd.append('-o ControlPath={}'.format(self.controlmasters))
+ if self.node.ssh_speedups :
+ sshcmd.extend(['{}@{}'.format(self.user, self.hostname), cmd])
+ else :
+ sshcmd.extend(['{}'.format(self.hostname), cmd])
+ s = " "
+ logging.info('{} {}'.format(self.name, s.join(sshcmd)))
+ while True :
+ # self in the ReaderProtocol() is this ssh_session instance
+ self._transport, self._protocol = await ssh_node.loop.subprocess_exec(lambda: self.SSHReaderProtocol(self, self.silent_mode), *sshcmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=None)
+ # self.sshpipe = self._transport.get_extra_info('subprocess')
+ # Establish the remote command
+ await self.connected.wait()
+ logging.debug("post_cmd connected")
+ # u = '{}\n'.format(cmd)
+ # self.sshpipe.stdin.write(u.encode())
+ # Wait for the command to complete
+ if not self.control_master :
+ await self.closed.wait()
+ if not repeat :
+ break
+ return self.results