diff options
Diffstat (limited to '')
-rw-r--r-- | flows/flows.py | 1300 |
1 files changed, 1300 insertions, 0 deletions
diff --git a/flows/flows.py b/flows/flows.py new file mode 100644 index 0000000..5465b6b --- /dev/null +++ b/flows/flows.py @@ -0,0 +1,1300 @@ +# ---------------------------------------------------------------- +# * Copyright (c) 2018-2023 +# * Broadcom Corporation +# * All Rights Reserved. +# *--------------------------------------------------------------- +# Redistribution and use in source and binary forms, with or without modification, are permitted +# provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, this list of conditions +# and the following disclaimer. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. Neither the name of the Broadcom nor the names of +# contributors may be used to endorse or promote products derived from this software without +# specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND +# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USEn, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER +# IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Author Robert J. McMahon, Broadcom LTD +# Date April 2016 - December 2023 + +import re +import subprocess +import logging +import asyncio, sys +import time, datetime +import locale +import signal +import weakref +import os +import getpass +import math +import scipy +import scipy.spatial +import numpy as np +import tkinter +import ctypes +import ipaddress +import collections +import csv + +from datetime import datetime as datetime, timezone +from scipy import stats +from scipy.cluster import hierarchy +from scipy.cluster.hierarchy import linkage +import matplotlib.pyplot as plt +from collections import defaultdict + +logger = logging.getLogger(__name__) + +class iperf_flow(object): + port = 61000 + iperf = '/usr/bin/iperf' + instances = weakref.WeakSet() + _loop = None + flow_scope = ("flowstats") + tasks = [] + flowid2name = defaultdict(str) + + @classmethod + def get_instances(cls): + return list(iperf_flow.instances) + + @classmethod + @property + def loop(cls): + if not cls._loop : + try : + cls._loop = asyncio.get_running_loop() + except : + if os.name == 'nt': + # On Windows, the ProactorEventLoop is necessary to listen on pipes + cls._loop = asyncio.ProactorEventLoop() + else: + cls._loop = asyncio.new_event_loop() + return cls._loop + + + @classmethod + def close_loop(cls): + if iperf_flow.loop.is_running(): + iperf_flow.loop.run_until_complete(loop.shutdown_asyncgens()) + iperf_flow.loop.close() + + @classmethod + def sleep(cls, time=0, text=None, stoptext=None) : + if text : + logging.info('Sleep {} ({})'.format(time, text)) + iperf_flow.loop.run_until_complete(asyncio.sleep(time)) + if stoptext : + logging.info('Sleep done ({})'.format(stoptext)) + + + @classmethod + def run(cls, time=None, amount=None, flows='all', sample_delay=None, io_timer=None, preclean=True, parallel=None, epoch_sync=False) : + if flows == 'all' : + flows = iperf_flow.get_instances() + if not flows: + logging.warn('flow run method called with no flows instantiated') + return + + if preclean: + hosts = [flow.server for flow in flows] + hosts.extend([flow.client for flow in flows]) + hosts=list(set(hosts)) + tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('preclean timeout') + raise + + logging.info('flow run invoked') + tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow server start timeout') + raise + iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") + + if epoch_sync : + dt = (datetime.now()).timestamp() + tsec = str(dt).split('.') + epoch_sync_time = int(tsec[0]) + 2 + else : + epoch_sync_time = None + + tasks = [asyncio.ensure_future(flow.tx.start(time=time, amount=amount, parallel=parallel, epoch_sync_time=epoch_sync_time), loop=iperf_flow.loop) for flow in flows] + + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow client start timeout') + raise + if sample_delay : + iperf_flow.sleep(time=0.3, text="ramp up", stoptext="ramp up done") + if io_timer : + tasks = [asyncio.ensure_future(flow.is_traffic(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow traffic check timeout') + raise + if time : + iperf_flow.sleep(time=time + 4, text="Running traffic start", stoptext="Stopping flows") + # Signal the remote iperf client sessions to stop them + tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + raise + + elif amount: + tasks = [asyncio.ensure_future(flow.transmit_completed(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow tx completed timed out') + raise + logging.info('flow transmit completed') + + # Now signal the remote iperf server sessions to stop them + tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=3)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + raise + + # iperf_flow.loop.close() + logging.info('flow run finished') + + @classmethod + def commence(cls, time=None, flows='all', sample_delay=None, io_timer=None, preclean=True) : + if flows == 'all' : + flows = iperf_flow.get_instances() + if not flows: + logging.warn('flow run method called with no flows instantiated') + return + + if preclean: + hosts = [flow.server for flow in flows] + hosts.extend([flow.client for flow in flows]) + hosts=list(set(hosts)) + tasks = [asyncio.ensure_future(iperf_flow.cleanup(user='root', host=host), loop=iperf_flow.loop) for host in hosts] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('preclean timeout') + raise + + logging.info('flow start invoked') + tasks = [asyncio.ensure_future(flow.rx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow server start timeout') + raise + iperf_flow.sleep(time=0.3, text="wait for rx up", stoptext="rx up done") + tasks = [asyncio.ensure_future(flow.tx.start(time=time), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow client start timeout') + raise + + @classmethod + def plot(cls, flows='all', title='None', directory='None') : + if flows == 'all' : + flows = iperf_flow.get_instances() + + tasks = [] + for flow in flows : + for this_name in flow.histogram_names : + path = directory + '/' + this_name + os.makedirs(path, exist_ok=True) + i = 0 + # group by name + histograms = [h for h in flow.histograms if h.name == this_name] + for histogram in histograms : + if histogram.ks_index is not None : + histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(i) + else : + histogram.output_dir = directory + '/' + this_name + '/' + this_name + str(histogram.ks_index) + + logging.info('scheduling task {}'.format(histogram.output_dir)) + tasks.append(asyncio.ensure_future(histogram.async_plot(directory=histogram.output_dir, title=title), loop=iperf_flow.loop)) + i += 1 + try : + logging.info('runnings tasks') + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=600)) + except asyncio.TimeoutError: + logging.error('plot timed out') + raise + + + @classmethod + def cease(cls, flows='all') : + + if flows == 'all' : + flows = iperf_flow.get_instances() + + # Signal the remote iperf client sessions to stop them + tasks = [asyncio.ensure_future(flow.tx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow tx stop timeout') + + # Now signal the remote iperf server sessions to stop them + tasks = [asyncio.ensure_future(flow.rx.signal_stop(), loop=iperf_flow.loop) for flow in flows] + try : + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=10)) + except asyncio.TimeoutError: + logging.error('flow rx stop timeout') + + @classmethod + async def cleanup(cls, host=None, sshcmd='/usr/bin/ssh', user='root') : + if host: + logging.info('ssh {}@{} pkill iperf'.format(user, host)) + childprocess = await asyncio.create_subprocess_exec(sshcmd, '{}@{}'.format(user, host), 'pkill', 'iperf', stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('cleanup: host({}) stdout={} '.format(host, stdout)) + + @classmethod + def tos_to_txt(cls, tos) : + switcher = { + int(0x0) : "BE", + int(0x02) : "BK", + int(0xC0) : "VO", + int(0x80) : "VI", + } + return switcher.get(int(tos), None) + + @classmethod + def txt_to_tos(cls, txt) : + switcher = { + "BE" : "0x0", + "BESTEFFORT" : "0x0", + "0x0" : "0x0", + "BK" : "0x20", + "BACKGROUND" : "0x20", + "0x20" : "0x20", + "VO" : "0xC0", + "VOICE" : "0xC0", + "0xC0" : "0xC0", + "VI" : "0x80", + "VIDEO" : "0x80", + "0x80" : "0x80", + } + return switcher.get(txt.upper(), None) + + def __init__(self, name='iperf', server=None, client=None, user=None, proto='TCP', dstip='127.0.0.1', interval=1, format='b', offered_load=None, tos='BE', window='4M', src=None, srcip=None, srcport=None, dstport=None, debug=False, length=None, ipg=0.0, amount=None, trip_times=True, prefetch=None, latency=False, bb=False, working_load=False, bb_period=None, bb_hold=None, txstart_delay_sec=None, burst_size=None, burst_period=None, fullduplex=False, cca=None, tcp_tx_delay=None): + iperf_flow.instances.add(self) + self.name = name + self.latency = latency + if not dstport : + iperf_flow.port += 1 + self.dstport = iperf_flow.port + else: + self.dstport = dstport + self.dstip = dstip + self.srcip = srcip + self.srcport = srcport + try : + self.server = server.ipaddr + except AttributeError: + self.server = server + try : + self.client = client.ipaddr + except AttributeError: + self.client = client + + self.client_device = client.device + self.server_device = server.device + + if not user : + self.user = getpass.getuser() + else : + self.user = user + self.proto = proto + self.tcp_tx_delay = tcp_tx_delay + self.tos = tos + if length : + self.length = length + + if amount : + self.amount = amount + if trip_times : + self.trip_times = trip_times + if burst_period : + self.burst_period = burst_period + if burst_size : + self.burst_size = burst_size + + if txstart_delay_sec: + self.txstart_delay_sec = txstart_delay_sec + + if cca: + self.cca = cca + + self.interval = round(interval,3) + self.format = format + self.offered_load = offered_load + if self.offered_load : + if len(self.offered_load.split(':')) == 2 : + self.isoch = True + self.name += '-isoch' + else : + self.isoch = False + self.prefetch = prefetch + self.ipg = ipg + self.debug = debug + self.TRAFFIC_EVENT_TIMEOUT = round(self.interval * 4, 3) + self.bb = bb + self.working_load = working_load + self.bb_period = bb_period + self.bb_hold = bb_hold + self.fullduplex = fullduplex + # use python composition for the server and client + # i.e. a flow has a server and a client + self.rx = iperf_server(name='{}->RX({})'.format(name, str(self.server)), loop=iperf_flow.loop, host=self.server, flow=self, debug=self.debug) + self.tx = iperf_client(name='{}->TX({})'.format(name, str(self.client)), loop=iperf_flow.loop, host=self.client, flow=self, debug=self.debug) + self.rx.window=window + self.tx.window=window + self.ks_critical_p = 0.01 + self.stats_reset() + + #def __del__(self) : + # iperf_flow.instances.remove(self) + + def destroy(self) : + iperf_flow.instances.remove(self) + + def __getattr__(self, attr) : + if attr in self.flowstats : + return self.flowstats[attr] + + def stats_reset(self) : + # Initialize the flow stats dictionary + self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'starttime' : None, 'flowid' : None, 'endtime' : None} + self.flowstats['txdatetime']=[] + self.flowstats['txbytes']=[] + self.flowstats['txthroughput']=[] + self.flowstats['writes']=[] + self.flowstats['errwrites']=[] + self.flowstats['retry']=[] + self.flowstats['cwnd']=[] + self.flowstats['rtt']=[] + self.flowstats['rxdatetime']=[] + self.flowstats['rxbytes']=[] + self.flowstats['rxthroughput']=[] + self.flowstats['reads']=[] + self.flowstats['histograms']=[] + self.flowstats['histogram_names'] = set() + self.flowstats['connect_time']=[] + self.flowstats['trip_time']=[] + self.flowstats['jitter']=[] + self.flowstats['rxlostpkts']=[] + self.flowstats['rxtotpkts']=[] + self.flowstats['meanlat']=[] + self.flowstats['minlat']=[] + self.flowstats['maxlat']=[] + self.flowstats['stdevlat']=[] + self.flowstats['rxpps']=[] + self.flowstats['inP']=[] + self.flowstats['inPvar']=[] + self.flowstats['rxpkts']=[] + self.flowstats['netPower']=[] + + async def start(self): + self.flowstats = {'current_rxbytes' : None , 'current_txbytes' : None , 'flowrate' : None, 'flowid' : None} + await self.rx.start() + await self.tx.start() + + async def is_traffic(self) : + if self.interval < 0.005 : + logging.warn('{} {}'.format(self.name, 'traffic check invoked without interval sampling')) + else : + self.rx.traffic_event.clear() + self.tx.traffic_event.clear() + logging.info('{} {}'.format(self.name, 'traffic check invoked')) + await self.rx.traffic_event.wait() + await self.tx.traffic_event.wait() + + async def transmit_completed(self) : + logging.info('{} {}'.format(self.name, 'waiting for transmit to complete')) + await self.tx.txcompleted.wait() + + async def stop(self): + self.tx.stop() + self.rx.stop() + + def stats(self): + logging.info('stats') + + def compute_ks_table(self, plot=True, directory='.', title=None) : + + if len(self.histogram_names) < 1 : + tmp = "***Failed. Expected 1 histogram_names, but instead got {0}".format(len(self.histogram_names)) + logging.info(tmp) + print(tmp) + #raise + + for this_name in self.histogram_names : + # group by name + histograms = [h for h in self.histograms if h.name == this_name] + for index, h in enumerate(histograms) : + h.ks_index = index + tmp = "{} KS Table has {} entries".format(self.name, len(histograms)) + logging.info(tmp) + print(tmp) + + self.condensed_distance_matrix = ([]) + + tasks = [] + for rowindex, h1 in enumerate(histograms) : + resultstr = rowindex * 'x' + maxp = None + minp = None + for h2 in histograms[rowindex:] : + d,p = stats.ks_2samp(h1.samples, h2.samples) + if h1 is not h2 : + self.condensed_distance_matrix = np.append(self.condensed_distance_matrix,d) + logging.debug('D,p={},{} cp={}'.format(str(d),str(p), str(self.ks_critical_p))) + if not minp or p < minp : + minp = p + if not maxp or (p != 1 and p > maxp) : + maxp = p + if p > self.ks_critical_p : + resultstr += '1' + else : + resultstr += '0' + if plot : + tasks.append(asyncio.ensure_future(flow_histogram.plot_two_sample_ks(h1=h1, h2=h2, flowname=self.name, title=title, directory=directory), loop=iperf_flow.loop)) + print('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) + logging.info('KS: {0}({1:3d}):{2} minp={3} ptest={4}'.format(this_name, rowindex, resultstr, str(minp), str(self.ks_critical_p))) + if tasks : + try : + logging.debug('running KS table plotting coroutines for {} row {}'.format(this_name,str(rowindex))) + iperf_flow.loop.run_until_complete(asyncio.wait(tasks, timeout=300)) + except asyncio.TimeoutError: + logging.error('plot timed out') + raise + logging.info('{} {}(condensed distance matrix)\n{}'.format(self.name, this_name,self.condensed_distance_matrix)) + self.linkage_matrix=linkage(self.condensed_distance_matrix, 'ward') + try : + plt.figure(figsize=(18,10)) + dn = hierarchy.dendrogram(self.linkage_matrix) + plt.title("{} {}".format(self.name, this_name)) + plt.savefig('{}/dn_{}_{}.png'.format(directory,self.name,this_name)) + logging.info('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) + print('{} {}(distance matrix)\n{}'.format(self.name, this_name,scipy.spatial.distance.squareform(self.condensed_distance_matrix))) + print('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) + logging.info('{} {}(cluster linkage)\n{}'.format(self.name,this_name,self.linkage_matrix)) + flattened=scipy.cluster.hierarchy.fcluster(self.linkage_matrix, 0.75*self.condensed_distance_matrix.max(), criterion='distance') + print('{} {} Clusters:{}'.format(self.name, this_name, flattened)) + logging.info('{} {} Clusters:{}'.format(self.name, this_name, flattened)) + except: + pass + + def dump_stats(self, directory='.') : + logging.info("\n********************** dump_stats for flow {} **********************".format(self.name)) + + #logging.info('This flow Name={} id={} items_cnt={}'.format(iperf_flow.flowid2name[self.flowstats['flowid']], str(self.flowstats['flowid']), len(self.flowstats))) + #logging.info('All flows Name and id: {}'.format(str(iperf_flow.flowid2name))) + #logging.info('This flow Name={} flowstats={}'.format(self.name, str(self.flowstats))) + + csvfilename = os.path.join(directory, '{}.csv'.format(self.name)) + if not os.path.exists(directory): + logging.debug('Making results directory {}'.format(directory)) + os.makedirs(directory) + + logging.info("Writing stats to '{}'".format(csvfilename)) + + for stat_name in [stat for stat in self.flowstats.keys() if stat != 'histograms'] : + logging.info("{}={}".format(stat_name, str(self.flowstats[stat_name]))) + + with open(csvfilename, 'w', newline='') as fd : + keynames = self.flowstats.keys() + writer = csv.writer(fd) + writer.writerow(keynames) + writer.writerow([self.flowstats[keyname] for keyname in keynames]) + writer.writerow([h.samples for h in self.flowstats['histograms']]) + +class iperf_server(object): + + class IperfServerProtocol(asyncio.SubprocessProtocol): + def __init__(self, server, flow): + self.__dict__['flow'] = flow + self._exited = False + self._closed_stdout = False + self._closed_stderr = False + self._mypid = None + self._server = server + self._stdoutbuffer = "" + self._stderrbuffer = "" + + def __setattr__(self, attr, value): + if attr in iperf_flow.flow_scope: + self.flow.__setattr__(self.flow, attr, value) + else: + self.__dict__[attr] = value + + # methods and attributes not here are handled by the flow object, + # aka, the flow object delegates to this object per composition + def __getattr__(self, attr): + if attr in iperf_flow.flow_scope: + return getattr(self.flow, attr) + + @property + def finished(self): + return self._exited and self._closed_stdout and self._closed_stderr + + def signal_exit(self): + if not self.finished: + return + self._server.closed.set() + self._server.opened.clear() + + def connection_made(self, trans): + self._server.closed.clear() + self._mypid = trans.get_pid() + logging.debug('server connection made pid=({})'.format(self._mypid)) + + def pipe_data_received(self, fd, data): + if self.debug : + logging.debug('{} {}'.format(fd, data)) + data = data.decode("utf-8") + if fd == 1: + self._stdoutbuffer += data + while "\n" in self._stdoutbuffer: + line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) + self._server.adapter.info('{} (stdout,{})'.format(line, self._server.remotepid)) + if not self._server.opened.is_set() : + m = self._server.regex_open_pid.match(line) + if m : + self._server.remotepid = m.group('pid') + self._server.opened.set() + logging.debug('{} pipe reading (stdout,{})'.format(self._server.name, self._server.remotepid)) + else : + if self._server.proto == 'TCP' : + m = self._server.regex_traffic.match(line) + if m : + timestamp = datetime.now() + if not self._server.traffic_event.is_set() : + self._server.traffic_event.set() + + bytes = float(m.group('bytes')) + if self.flowstats['current_txbytes'] : + flowrate = round((bytes / self.flowstats['current_txbytes']), 2) + # *consume* the current *txbytes* where the client pipe will repopulate on its next sample + # do this by setting the value to None + self.flowstats['current_txbytes'] = None + # logging.debug('{} flow ratio={:.2f}'.format(self._server.name, flowrate)) + self.flowstats['flowrate'] = flowrate + else : + # *produce* the current *rxbytes* so the client pipe can know this event occurred + # indicate this by setting the value to value + self.flowstats['current_rxbytes'] = bytes + self.flowstats['rxdatetime'].append(timestamp) + self.flowstats['rxbytes'].append(m.group('bytes')) + self.flowstats['rxthroughput'].append(m.group('throughput')) + self.flowstats['reads'].append(m.group('reads')) + else : + m = self._server.regex_trip_time.match(line) + if m : + self.flowstats['trip_time'].append(float(m.group('trip_time')) * 1000) + else : + m = self._server.regex_traffic_udp.match(line) + if m : + timestamp = datetime.now() + if not self._server.traffic_event.is_set() : + self._server.traffic_event.set() + self.flowstats['rxbytes'].append(m.group('bytes')) + self.flowstats['rxthroughput'].append(m.group('throughput')) + self.flowstats['jitter'].append(m.group('jitter')) + self.flowstats['rxlostpkts'].append(m.group('lost_pkts')) + self.flowstats['rxtotpkts'].append(m.group('tot_pkts')) + self.flowstats['meanlat'].append(m.group('lat_mean')) + self.flowstats['minlat'].append(m.group('lat_min')) + self.flowstats['maxlat'].append(m.group('lat_max')) + self.flowstats['stdevlat'].append(m.group('lat_stdev')) + self.flowstats['rxpps'].append(m.group('pps')) + self.flowstats['inP'].append(m.group('inP')) + self.flowstats['inPvar'].append(m.group('inPvar')) + self.flowstats['rxpkts'].append(m.group('pkts')) + self.flowstats['netPower'].append(m.group('netPower')) + m = self._server.regex_final_histogram_traffic.match(line) + if m : + timestamp = datetime.now(timezone.utc).astimezone() + self.flowstats['endtime']= timestamp + self.flowstats['histogram_names'].add(m.group('pdfname')) + this_histogram = flow_histogram(name=m.group('pdfname'),values=m.group('pdf'), population=m.group('population'), binwidth=m.group('binwidth'), starttime=self.flowstats['starttime'], endtime=timestamp, outliers=m.group('outliers'), uci=m.group('uci'), uci_val=m.group('uci_val'), lci=m.group('lci'), lci_val=m.group('lci_val')) + self.flowstats['histograms'].append(this_histogram) + logging.info('pdf {} found with bin width={} us'.format(m.group('pdfname'), m.group('binwidth'))) + + elif fd == 2: + self._stderrbuffer += data + while "\n" in self._stderrbuffer: + line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) + logging.info('{} {} (stderr)'.format(self._server.name, line)) + m = self._server.regex_rx_bind_failed.match(line) + if m : + logging.error('RX Bind Failed. Check LAN / WLAN between server and client.') + iperf_flow.loop.stop() + raise + + def pipe_connection_lost(self, fd, exc): + if fd == 1: + self._closed_stdout = True + logging.debug('stdout pipe to {} closed (exception={})'.format(self._server.name, exc)) + elif fd == 2: + self._closed_stderr = True + logging.debug('stderr pipe to {} closed (exception={})'.format(self._server.name, exc)) + if self._closed_stdout and self._closed_stderr : + self.remotepid = None; + self.signal_exit() + + def process_exited(self): + logging.debug('subprocess with pid={} closed'.format(self._mypid)) + self._exited = True + self._mypid = None + self.signal_exit() + + class CustomAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + + def __init__(self, name='Server', loop=None, host='localhost', flow=None, debug=False): + self.__dict__['flow'] = flow + self.name = name + self.iperf = '/usr/local/bin/iperf' + self.ssh = '/usr/bin/ssh' + self.host = host + self.flow = flow + self.debug = debug + self.opened = asyncio.Event() + self.closed = asyncio.Event() + self.closed.set() + self.traffic_event = asyncio.Event() + self._transport = None + self._protocol = None + self.time = time + conn_id = '{}'.format(self.name) + self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) + + # ex. [ 4] 0.00-0.50 sec 657090 Bytes 10513440 bits/sec 449 449:0:0:0:0:0:0:0 + self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<reads>[0-9]+)') + self.regex_traffic_udp = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>[0-9]+) Bytes\s+(?P<throughput>[0-9]+) bits/sec\s+(?P<jitter>[0-9.]+)\sms\s(?P<lost_pkts>[0-9]+)/(?P<tot_pkts>[0-9]+).+(?P<lat_mean>[0-9.]+)/(?P<lat_min>[0-9.]+)/(?P<lat_max>[0-9.]+)/(?P<lat_stdev>[0-9.]+)\sms\s(?P<pps>[0-9]+)\spps\s+(?P<netPower>[0-9\.]+)\/(?P<inP>[0-9]+)\((?P<inPvar>[0-9]+)\)\spkts\s(?P<pkts>[0-9]+)') + self.regex_final_histogram_traffic = re.compile(r'\[\s*\d+\] (?P<timestamp>.*) sec\s+(?P<pdfname>[A-Za-z0-9\-]+)\(f\)-PDF: bin\(w=(?P<binwidth>[0-9]+)us\):cnt\((?P<population>[0-9]+)\)=(?P<pdf>.+)\s+\((?P<lci>[0-9\.]+)/(?P<uci>[0-9\.]+)/(?P<uci2>[0-9\.]+)%=(?P<lci_val>[0-9]+)/(?P<uci_val>[0-9]+)/(?P<uci_val2>[0-9]+),Outliers=(?P<outliers>[0-9]+),obl/obu=[0-9]+/[0-9]+\)') + # 0.0000-0.5259 trip-time (3WHS done->fin+finack) = 0.5597 sec + self.regex_trip_time = re.compile(r'.+trip\-time\s+\(3WHS\sdone\->fin\+finack\)\s=\s(?P<trip_time>\d+\.\d+)\ssec') + self.regex_rx_bind_failed = re.compile(r'listener bind failed: Cannot assign requested address') + + def __getattr__(self, attr): + return getattr(self.flow, attr) + + async def start(self, time=time): + if not self.closed.is_set() : + return + + # ex. Server listening on TCP port 61003 with pid 2565 + self.regex_open_pid = re.compile(r'^Server listening on {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport))) + + self.opened.clear() + self.remotepid = None + if time : + iperftime = time + 30 + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-t ' + str(iperftime), '-f{}'.format(self.format), '-w' , self.window, '--realtime'] + else : + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-s', '-p ' + str(self.dstport), '-P 1', '-e', '-f{}'.format(self.format), '-w' , self.window, '--realtime'] + if self.interval >= 0.005 : + self.sshcmd.extend(['-i ', str(self.interval)]) + if self.server_device and self.srcip : + self.sshcmd.extend(['-B ', '{}%{}'.format(self.dstip, self.server_device)]) + if self.proto == 'UDP' : + self.sshcmd.extend(['-u']) + if self.latency : + self.sshcmd.extend(['--histograms=100u,100000,50,95']) + self.sshcmd.extend(['--jitter-histograms']) + + logging.info('{}'.format(str(self.sshcmd))) + self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfServerProtocol(self, self.flow), *self.sshcmd) + await self.opened.wait() + + async def signal_stop(self): + if self.remotepid and not self.finished : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + if not self.closed.is_set() : + await self.closed.wait() + logging.info('await kill completed remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + logging.info('kill remote pid {} {}({}) {}'.format(self.remotepid, self.user, self.host, stdout)) + + +class iperf_client(object): + + # Asyncio protocol for subprocess transport + class IperfClientProtocol(asyncio.SubprocessProtocol): + def __init__(self, client, flow): + self.__dict__['flow'] = flow + self._exited = False + self._closed_stdout = False + self._closed_stderr = False + self._mypid = None + self._client = client + self._stdoutbuffer = "" + self._stderrbuffer = "" + + def __setattr__(self, attr, value): + if attr in iperf_flow.flow_scope: + self.flow.__setattr__(self.flow, attr, value) + else: + self.__dict__[attr] = value + + def __getattr__(self, attr): + if attr in iperf_flow.flow_scope: + return getattr(self.flow, attr) + + @property + def finished(self): + return self._exited and self._closed_stdout and self._closed_stderr + + def signal_exit(self): + if not self.finished: + return + self._client.closed.set() + self._client.opened.clear() + self._client.txcompleted.set() + + def connection_made(self, trans): + self._client.closed.clear() + self._mypid = trans.get_pid() + logging.debug('client connection made pid=({})'.format(self._mypid)) + + def pipe_data_received(self, fd, data): + if self.debug : + logging.debug('{} {}'.format(fd, data)) + data = data.decode("utf-8") + if fd == 1: + self._stdoutbuffer += data + while "\n" in self._stdoutbuffer: + line, self._stdoutbuffer = self._stdoutbuffer.split("\n", 1) + self._client.adapter.info('{} (stdout,{})'.format(line, self._client.remotepid)) + if not self._client.opened.is_set() : + m = self._client.regex_open_pid.match(line) + if m : + self._client.opened.set() + self._client.remotepid = m.group('pid') + self.flowstats['starttime'] = datetime.now(timezone.utc).astimezone() + logging.debug('{} pipe reading at {} (stdout,{})'.format(self._client.name, self.flowstats['starttime'].isoformat(), self._client.remotepid)) + else : + if self.flowstats['flowid'] is None : + m = self._client.regex_flowid.match(line) + if m : + # [ 1] local 192.168.1.15%enp1s0 port 7001 connected with 192.168.1.232 port 7001 (trip-times) (sock=3) on 2021-10-11 14:39:45 (PDT) + # self.regex_flowid = re.compile(r'local\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)') + # + # temp = htonl(config->src_ip); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = htonl(config->dst_ip); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = (hton16(config->dst_port) << 16) | hton16(config->src_port); + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # temp = config->proto; + # checksum ^= bcm_compute_xor32((volatile uint32 *)&temp, sizeof(temp) / sizeof(uint32)); + # return "%08x" % netip + # NOTE: the network or big endian byte order + srcipaddr = ipaddress.ip_address(m.group('srcip')) + srcip32 = ctypes.c_uint32(int.from_bytes(srcipaddr.packed, byteorder='little', signed=False)) + dstipaddr = ipaddress.ip_address(m.group('dstip')) + dstip32 = ctypes.c_uint32(int.from_bytes(dstipaddr.packed, byteorder='little', signed=False)) + dstportbytestr = int(m.group('dstport')).to_bytes(2, byteorder='big', signed=False) + dstport16 = ctypes.c_uint16(int.from_bytes(dstportbytestr, byteorder='little', signed=False)) + srcportbytestr = int(m.group('srcport')).to_bytes(2, byteorder='big', signed=False) + srcport16 = ctypes.c_uint16(int.from_bytes(srcportbytestr, byteorder='little', signed=False)) + ports32 = ctypes.c_uint32((dstport16.value << 16) | srcport16.value) + if self._client.proto == 'UDP': + proto32 = ctypes.c_uint32(0x11) + else : + proto32 = ctypes.c_uint32(0x06) + quintuplehash = srcip32.value ^ dstip32.value ^ ports32.value ^ proto32.value + self.flowstats['flowid'] = '0x{:08x}'.format(quintuplehash) + if self._client.flow.name : + flowkey = self._client.flow.name + else : + flowkey = '0x{:08x}'.format(quintuplehash) + iperf_flow.flowid2name[self.flowstats['flowid']] = flowkey + logging.info('Flow quintuple hash of {} uses name {}'.format(self.flowstats['flowid'], flowkey)) + + if self._client.proto == 'TCP': + m = self._client.regex_traffic.match(line) + if m : + timestamp = datetime.now() + if not self._client.traffic_event.is_set() : + self._client.traffic_event.set() + + bytes = float(m.group('bytes')) + if self.flowstats['current_rxbytes'] : + flowrate = round((self.flowstats['current_rxbytes'] / bytes), 2) + # *consume* the current *rxbytes* where the server pipe will repopulate on its next sample + # do this by setting the value to None + self.flowstats['current_rxbytes'] = None + # logging.debug('{} flow ratio={:.2f}'.format(self._client.name, flowrate)) + self.flowstats['flowrate'] = flowrate + else : + # *produce* the current txbytes so the server pipe can know this event occurred + # indicate this by setting the value to value + self.flowstats['current_txbytes'] = bytes + + self.flowstats['txdatetime'].append(timestamp) + self.flowstats['txbytes'].append(m.group('bytes')) + self.flowstats['txthroughput'].append(m.group('throughput')) + self.flowstats['writes'].append(m.group('writes')) + self.flowstats['errwrites'].append(m.group('errwrites')) + self.flowstats['retry'].append(m.group('retry')) + self.flowstats['cwnd'].append(m.group('cwnd')) + self.flowstats['rtt'].append(m.group('rtt')) + else : + m = self._client.regex_connect_time.match(line) + if m : + self.flowstats['connect_time'].append(float(m.group('connect_time'))) + else : + pass + + elif fd == 2: + self._stderrbuffer += data + while "\n" in self._stderrbuffer: + line, self._stderrbuffer = self._stderrbuffer.split("\n", 1) + logging.info('{} {} (stderr)'.format(self._client.name, line)) + m = self._client.regex_tx_bind_failed.match(line) + if m : + logging.error('TX Bind Failed. Check LAN / WLAN between server and client.') + iperf_flow.loop.stop() + raise + + def pipe_connection_lost(self, fd, exc): + if fd == 1: + logging.debug('stdout pipe to {} closed (exception={})'.format(self._client.name, exc)) + self._closed_stdout = True + elif fd == 2: + logging.debug('stderr pipe to {} closed (exception={})'.format(self._client.name, exc)) + self._closed_stderr = True + self.signal_exit() + + def process_exited(self): + logging.debug('subprocess with pid={} closed'.format(self._mypid)) + self._exited = True + self._mypid = None + self.signal_exit() + + class CustomAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[%s] %s' % (self.extra['connid'], msg), kwargs + + def __init__(self, name='Client', loop=None, host='localhost', flow = None, debug=False): + self.__dict__['flow'] = flow + self.opened = asyncio.Event() + self.closed = asyncio.Event() + self.txcompleted = asyncio.Event() + self.closed.set() + self.txcompleted.clear() + self.traffic_event = asyncio.Event() + self.name = name + self.iperf = '/usr/local/bin/iperf' + self.ssh = '/usr/bin/ssh' + self.host = host + self.debug = debug + self.flow = flow + self._transport = None + self._protocol = None + conn_id = '{}'.format(self.name) + self.adapter = self.CustomAdapter(logger, {'connid': conn_id}) + # traffic ex: [ 3] 0.00-0.50 sec 655620 Bytes 10489920 bits/sec 14/211 446 446K/0 us + self.regex_traffic = re.compile(r'\[\s+\d+] (?P<timestamp>.*) sec\s+(?P<bytes>\d+) Bytes\s+(?P<throughput>\d+) bits/sec\s+(?P<writes>\d+)/(?P<errwrites>\d+)\s+(?P<retry>\d+)\s+(?P<cwnd>\d+)K/(?P<rtt>\d+) us') + self.regex_connect_time = re.compile(r'\[\s+\d+]\slocal.*\(ct=(?P<connect_time>\d+\.\d+) ms\)') + # local 192.168.1.4 port 56949 connected with 192.168.1.1 port 61001 + self.regex_flowid = re.compile(r'\[\s+\d+]\slocal\s(?P<srcip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}).*\sport\s(?P<srcport>[0-9]+)\sconnected with\s(?P<dstip>[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3}\.[0-9]{0,3})\sport\s(?P<dstport>[0-9]+)') + self.regex_tx_bind_failed = re.compile(r'bind failed: Cannot assign requested address') + + def __getattr__(self, attr): + return getattr(self.flow, attr) + + async def start(self, time=None, amount=None, parallel=None, epoch_sync_time=None): + if not self.closed.is_set() : + return + + self.opened.clear() + self.txcompleted.clear() + self.remotepid = None + self.flowstats['flowid']=None + + # Client connecting to 192.168.100.33, TCP port 61009 with pid 1903 + self.regex_open_pid = re.compile(r'Client connecting to .*, {} port {} with pid (?P<pid>\d+)'.format(self.proto, str(self.dstport))) + if self.client_device : + client_dst = self.dstip + '%' + self.client_device + else : + client_dst = self.dstip + self.sshcmd=[self.ssh, self.user + '@' + self.host, self.iperf, '-c', client_dst, '-p ' + str(self.dstport), '-e', '-f{}'.format(self.format), '-w' , self.window ,'--realtime'] + if self.tcp_tx_delay : + self.sshcmd.extend(['--tcp-tx-delay', self.tcp_tx_delay]) + if self.tos : + self.sshcmd.extend(['-S ', self.tos]) + if self.length : + self.sshcmd.extend(['-l ', str(self.length)]) + if time: + self.sshcmd.extend(['-t ', str(time)]) + elif amount: + iperftime = time + self.sshcmd.extend(['-n ', amount]) + if parallel : + self.sshcmd.extend(['-P', str(parallel)]) + if self.trip_times : + self.sshcmd.extend(['--trip-times']) + if self.prefetch : + self.sshcmd.extend(['--tcp-write-prefetch', self.prefetch]) + self.sshcmd.extend(['--histograms=1m,100000,5,95']) + + if self.srcip : + if self.srcport : + self.sshcmd.extend(['-B ', '{}:{}'.format(self.srcip, self.srcport)]) + else : + self.sshcmd.extend(['-B {}'.format(self.srcip)]) + + if self.cca : + self.sshcmd.extend(['-Z ', self.cca]) + if self.interval >= 0.005 : + self.sshcmd.extend(['-i ', str(self.interval)]) + + if self.proto == 'UDP' : + self.sshcmd.extend(['-u ']) + if self.isoch : + self.sshcmd.extend(['--isochronous=' + self.offered_load, ' --ipg ', str(self.ipg)]) + elif self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + elif self.proto == 'TCP' and self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + elif self.proto == 'TCP' and self.burst_size and self.burst_period : + self.sshcmd.extend(['--burst-size', str(self.burst_size)]) + self.sshcmd.extend(['--burst-period', str(self.burst_period)]) + elif self.proto == 'TCP' and self.bb : + self.sshcmd.extend(['--bounceback']) + self.sshcmd.extend(['--bounceback-hold', str(self.bb_hold)]) + self.sshcmd.extend(['--bounceback-period', str(self.bb_period)]) + elif self.proto == 'TCP' and self.offered_load : + self.sshcmd.extend(['-b', self.offered_load]) + if not self.bb and self.fullduplex : + self.sshcmd.extend(['--full-duplex', str(" ")]) + + if self.flow.bb : + self.sshcmd.extend(['--bounceback']) + if self.flow.working_load : + self.sshcmd.extend(['--working-load']) + + if epoch_sync_time : + self.sshcmd.extend(['--txstart-time', str(epoch_sync_time)]) + + elif self.txstart_delay_sec : + # use incoming txstart_delay_sec and convert it to epoch_time_sec to use with '--txstart-time' iperf parameter + logging.info('{}'.format(str(datetime.now()))) + epoch_time_sec = (datetime.now()).timestamp() + logging.info('Current epoch_time_sec = {}'.format(str(epoch_time_sec))) + new_txstart_time = epoch_time_sec + self.txstart_delay_sec + logging.info('new_txstart_time = {}'.format(str(new_txstart_time))) + self.sshcmd.extend(['--txstart-time', str(new_txstart_time)]) + + logging.info('{}'.format(str(self.sshcmd))) + try : + self._transport, self._protocol = await iperf_flow.loop.subprocess_exec(lambda: self.IperfClientProtocol(self, self.flow), *self.sshcmd) + await self.opened.wait() + except: + logging.error('flow client start error per: {}'.format(str(self.sshcmd))) + pass + + async def signal_stop(self): + if self.remotepid and not self.finished : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-HUP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal HUP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + + async def signal_pause(self): + if self.remotepid : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-STOP', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal STOP to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + + async def signal_resume(self): + if self.remotepid : + childprocess = await asyncio.create_subprocess_exec(self.ssh, '{}@{}'.format(self.user, self.host), 'kill', '-CONT', '{}'.format(self.remotepid), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + logging.debug('({}) sending signal CONT to {} (pid={})'.format(self.user, self.host, self.remotepid)) + stdout, _ = await childprocess.communicate() + if stdout: + logging.info('{}({}) {}'.format(self.user, self.host, stdout)) + if not self.closed.is_set(): + await self.closed.wait() + +class flow_histogram(object): + + @classmethod + async def plot_two_sample_ks(cls, h1=None, h2=None, outputtype='png', directory='.', flowname=None, title=None): + + lci_val = int(h2.lci_val) * h2.binwidth + uci_val = int(h2.uci_val) * h2.binwidth + mytitle = '{} {} two sample KS({},{}) ({} samples) {}/{}%={}/{} us outliers={}\\n{}'.format(flowname, h1.name, h1.ks_index, h2.ks_index, h2.population, h2.lci, h2.uci, lci_val, uci_val, h2.outliers, title) + if h1.basefilename is None : + h1.output_dir = directory + '/' + flowname + h1.name + '/' + h1.name + '_' + str(h1.ks_index) + await h1.write(directory=h1.output_dir) + + if h2.basefilename is None : + h2.output_dir = directory + '/' + flowname + h2.name + '/' + h2.name + '_' + str(h2.ks_index) + await h2.write(directory=h2.output_dir) + + if (h1.basefilename is not None) and (h2.basefilename is not None) : + basefilename = '{}_{}_{}'.format(h1.basefilename, h1.ks_index, h2.ks_index) + gpcfilename = basefilename + '.gpc' + #write out the gnuplot control file + with open(gpcfilename, 'w') as fid : + if outputtype == 'canvas' : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal canvas standalone mousing size 1024,768\n') + if outputtype == 'svg' : + fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal svg size 1024,768 dynamic mouse\n') + else : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png size 1024,768\n') + + fid.write('set key bottom\n') + fid.write('set title \"{}\" noenhanced\n'.format(mytitle)) + if float(uci_val) < 400: + fid.write('set format x \"%.2f"\n') + else : + fid.write('set format x \"%.1f"\n') + fid.write('set format y \"%.1f"\n') + fid.write('set yrange [0:1.01]\n') + fid.write('set y2range [0:*]\n') + fid.write('set ytics add 0.1\n') + fid.write('set y2tics nomirror\n') + fid.write('set grid\n') + fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(h1.starttime, h2.endtime)) + default_minx = -0.5 + if float(uci_val) < 0.4: + fid.write('set xrange [{}:0.4]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 2.0 and h2.max < 2.0 : + fid.write('set xrange [{}:2]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 5.0 and h2.max < 5.0 : + fid.write('set xrange [{}:5]\n'.format(default_minx)) + fid.write('set xtics auto\n') + elif h1.max < 10.0 and h2.max < 10.0: + fid.write('set xrange [{}:10]\n'.format(default_minx)) + fid.write('set xtics add 1\n') + elif h1.max < 20.0 and h2.max < 20.0 : + fid.write('set xrange [{}:20]\n'.format(default_minx)) + fid.write('set xtics add 1\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 40.0 and h2.max < 40.0: + fid.write('set xrange [{}:40]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 50.0 and h2.max < 50.0: + fid.write('set xrange [{}:50]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 75.0 and h2.max < 75.0: + fid.write('set xrange [{}:75]\n'.format(default_minx)) + fid.write('set xtics add 5\n') + fid.write('set format x \"%.0f"\n') + elif h1.max < 100.0 and h2.max < 100.0 : + fid.write('set xrange [{}:100]\n'.format(default_minx)) + fid.write('set xtics add 10\n') + fid.write('set format x \"%.0f"\n') + else : + fid.write('set xrange [{}:*]\n'.format(default_minx)) + fid.write('set xtics auto\n') + fid.write('set format x \"%.0f"\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{1}\" using 1:2 index 0 axes x1y2 with impulses linetype 2 notitle, \"{1}\" using 1:3 index 0 axes x1y1 with lines linetype 1 linewidth 2 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(h1.datafilename, h2.datafilename)) + + childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot,gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stderr : + logging.error('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename)) + else : + logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, gpcfilename)) + + gnuplot = '/usr/bin/gnuplot' + def __init__(self, binwidth=None, name=None, values=None, population=None, starttime=None, endtime=None, title=None, outliers=None, lci = None, uci = None, lci_val = None, uci_val = None) : + self.raw = values + self._entropy = None + self._ks_1samp_dist = None + self.bins = self.raw.split(',') + self.name = name + self.ks_index = None + self.population = int(population) + self.samples = np.zeros(int(self.population)) + self.binwidth = int(binwidth) + self.createtime = datetime.now(timezone.utc).astimezone() + self.starttime=starttime + self.endtime=endtime + self.title=title + self.outliers=outliers + self.uci = uci + self.uci_val = uci_val + self.lci = lci + self.lci_val = lci_val + self.basefilename = None + ix = 0 + for bin in self.bins : + x,y = bin.split(':') + for i in range(int(y)) : + self.samples[ix] = x + ix += 1 + + @property + def entropy(self) : + if not self._entropy : + self._entropy = 0 + for bin in self.bins : + x,y = bin.split(':') + y1 = float(y) / float(self.population) + self._entropy -= y1 * math.log2(y1) + return self._entropy + + @property + def ks_1samp_dist(self): + if not self._ks_1samp_dist : + self._ks_1samp_dist,p = stats.ks_1samp(self.samples, stats.norm.cdf) + return self._ks_1samp_dist + + @property + def ampdu_dump(self) : + return self._ampdu_rawdump + + @ampdu_dump.setter + def ampdu_dump(self, value): + self._ampdu_rawdump = value + + async def __exec_gnuplot(self) : + logging.info('Plotting {} {}'.format(self.name, self.gpcfilename)) + childprocess = await asyncio.create_subprocess_exec(flow_histogram.gnuplot, self.gpcfilename, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + stdout, stderr = await childprocess.communicate() + if stderr : + logging.error('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename)) + else : + logging.debug('Exec {} {}'.format(flow_histogram.gnuplot, self.gpcfilename)) + + async def write(self, directory='.', filename=None) : + # write out the datafiles for the plotting tool, e.g. gnuplot + if filename is None: + filename = self.name + + if not os.path.exists(directory): + logging.debug('Making results directory {}'.format(directory)) + os.makedirs(directory) + + logging.debug('Writing {} results to directory {}'.format(directory, filename)) + basefilename = os.path.join(directory, filename) + datafilename = os.path.join(directory, filename + '.data') + self.max = None + with open(datafilename, 'w') as fid : + cummulative = 0.0 + for bin in self.bins : + x,y = bin.split(':') + #logging.debug('bin={} x={} y={}'.format(bin, x, y)) + if (float(y) > 1.0) or ((cummulative / float(self.population)) < 0.99) : + cummulative += float(y) + perc = cummulative / float(self.population) + self.max = float(x) * float(self.binwidth) / 1000.0 # max is the last value + fid.write('{} {} {}\n'.format((float(x) * float(self.binwidth) / 1000.0), int(y), perc)) + + self.basefilename = basefilename + self.datafilename = datafilename + + async def async_plot(self, title=None, directory='.', outputtype='png', filename=None) : + if self.basefilename is None : + await self.write(directory=directory, filename=filename) + + if self.basefilename is not None : + self.gpcfilename = self.basefilename + '.gpc' + #write out the gnuplot control file + with open(self.gpcfilename, 'w') as fid : + if outputtype == 'canvas' : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal canvas standalone mousing size 1024,768\n') + if outputtype == 'svg' : + fid.write('set output \"{}_svg.{}\"\n'.format(basefilename, 'html')) + fid.write('set terminal svg size 1024,768 dynamic mouse\n') + else : + fid.write('set output \"{}.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png size 1024,768\n') + + if not title and self.title : + title = self.title + + fid.write('set key bottom\n') + if self.ks_index is not None : + fid.write('set title \"{}({}) {}({}) E={}\" noenhanced\n'.format(self.name, str(self.ks_index), title, int(self.population), self.entropy)) + else : + fid.write('set title \"{}{}({}) E={}\" noenhanced\n'.format(self.name, title, int(self.population), self.entropy)) + fid.write('set format x \"%.0f"\n') + fid.write('set format y \"%.1f"\n') + fid.write('set yrange [0:1.01]\n') + fid.write('set y2range [0:*]\n') + fid.write('set ytics add 0.1\n') + fid.write('set y2tics nomirror\n') + fid.write('set grid\n') + fid.write('set xlabel \"time (ms)\\n{} - {}\"\n'.format(self.starttime, self.endtime)) + if self.max < 5.0 : + fid.write('set xrange [0:5]\n') + fid.write('set xtics auto\n') + elif self.max < 10.0 : + fid.write('set xrange [0:10]\n') + fid.write('set xtics add 1\n') + elif self.max < 20.0 : + fid.write('set xrange [0:20]\n') + fid.write('set xtics add 1\n') + elif self.max < 40.0 : + fid.write('set xrange [0:40]\n') + fid.write('set xtics add 5\n') + elif self.max < 50.0 : + fid.write('set xrange [0:50]\n') + fid.write('set xtics add 5\n') + elif self.max < 75.0 : + fid.write('set xrange [0:75]\n') + fid.write('set xtics add 5\n') + else : + fid.write('set xrange [0:100]\n') + fid.write('set xtics add 10\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename)) + + if outputtype == 'png' : + # Create a thumbnail too + fid.write('unset output; unset xtics; unset ytics; unset key; unset xlabel; unset ylabel; unset border; unset grid; unset yzeroaxis; unset xzeroaxis; unset title; set lmargin 0; set rmargin 0; set tmargin 0; set bmargin 0\n') + fid.write('set output \"{}_thumb.{}\"\n'.format(basefilename, 'png')) + fid.write('set terminal png transparent size 64,32 crop\n') + fid.write('plot \"{0}\" using 1:2 index 0 axes x1y2 with impulses linetype 3 notitle, \"{0}\" using 1:3 index 0 axes x1y1 with lines linetype -1 linewidth 2 notitle\n'.format(datafilename)) + + await self.__exec_gnuplot() |