summaryrefslogtreecommitdiffstats
path: root/powerline/segments/common/net.py
diff options
context:
space:
mode:
Diffstat (limited to 'powerline/segments/common/net.py')
-rw-r--r--powerline/segments/common/net.py315
1 files changed, 315 insertions, 0 deletions
diff --git a/powerline/segments/common/net.py b/powerline/segments/common/net.py
new file mode 100644
index 0000000..b5d9062
--- /dev/null
+++ b/powerline/segments/common/net.py
@@ -0,0 +1,315 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import re
+import os
+import socket
+
+from powerline.lib.url import urllib_read
+from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
+from powerline.lib.monotonic import monotonic
+from powerline.lib.humanize_bytes import humanize_bytes
+from powerline.segments import with_docstring
+from powerline.theme import requires_segment_info
+
+
+@requires_segment_info
+def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
+ '''Return the current hostname.
+
+ :param bool only_if_ssh:
+ only return the hostname if currently in an SSH session
+ :param bool exclude_domain:
+ return the hostname without domain if there is one
+ '''
+ if (
+ segment_info['environ'].get('_POWERLINE_RUNNING_SHELL_TESTS')
+ == 'ee5bcdc6-b749-11e7-9456-50465d597777'
+ ):
+ return 'hostname'
+ if only_if_ssh and not segment_info['environ'].get('SSH_CLIENT'):
+ return None
+ if exclude_domain:
+ return socket.gethostname().split('.')[0]
+ return socket.gethostname()
+
+
+def _external_ip(query_url='http://ipv4.icanhazip.com/'):
+ return urllib_read(query_url).strip()
+
+
+class ExternalIpSegment(ThreadedSegment):
+ interval = 300
+
+ def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
+ self.query_url = query_url
+ super(ExternalIpSegment, self).set_state(**kwargs)
+
+ def update(self, old_ip):
+ return _external_ip(query_url=self.query_url)
+
+ def render(self, ip, **kwargs):
+ if not ip:
+ return None
+ return [{'contents': ip, 'divider_highlight_group': 'background:divider'}]
+
+
+external_ip = with_docstring(ExternalIpSegment(),
+'''Return external IP address.
+
+:param str query_url:
+ URI to query for IP address, should return only the IP address as a text string
+
+ Suggested URIs:
+
+ * http://ipv4.icanhazip.com/
+ * http://ipv6.icanhazip.com/
+ * http://icanhazip.com/ (returns IPv6 address if available, else IPv4)
+
+Divider highlight group used: ``background:divider``.
+''')
+
+
+try:
+ import netifaces
+except ImportError:
+ def internal_ip(pl, interface='auto', ipv=4):
+ return None
+else:
+ _interface_starts = {
+ 'eth': 10, # Regular ethernet adapters : eth1
+ 'enp': 10, # Regular ethernet adapters, Gentoo : enp2s0
+ 'en': 10, # OS X : en0
+ 'ath': 9, # Atheros WiFi adapters : ath0
+ 'wlan': 9, # Other WiFi adapters : wlan1
+ 'wlp': 9, # Other WiFi adapters, Gentoo : wlp5s0
+ 'teredo': 1, # miredo interface : teredo
+ 'lo': -10, # Loopback interface : lo
+ 'docker': -5, # Docker bridge interface : docker0
+ 'vmnet': -5, # VMWare bridge interface : vmnet1
+ 'vboxnet': -5, # VirtualBox bridge interface : vboxnet0
+ }
+
+ _interface_start_re = re.compile(r'^([a-z]+?)(\d|$)')
+
+ def _interface_key(interface):
+ match = _interface_start_re.match(interface)
+ if match:
+ try:
+ base = _interface_starts[match.group(1)] * 100
+ except KeyError:
+ base = 500
+ if match.group(2):
+ return base - int(match.group(2))
+ else:
+ return base
+ else:
+ return 0
+
+ def internal_ip(pl, interface='auto', ipv=4):
+ family = netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET
+ if interface == 'auto':
+ try:
+ interface = next(iter(sorted(netifaces.interfaces(), key=_interface_key, reverse=True)))
+ except StopIteration:
+ pl.info('No network interfaces found')
+ return None
+ elif interface == 'default_gateway':
+ try:
+ interface = netifaces.gateways()['default'][family][1]
+ except KeyError:
+ pl.info('No default gateway found for IPv{0}', ipv)
+ return None
+ addrs = netifaces.ifaddresses(interface)
+ try:
+ return addrs[family][0]['addr']
+ except (KeyError, IndexError):
+ pl.info("No IPv{0} address found for interface {1}", ipv, interface)
+ return None
+
+
+internal_ip = with_docstring(internal_ip,
+'''Return internal IP address
+
+Requires ``netifaces`` module to work properly.
+
+:param str interface:
+ Interface on which IP will be checked. Use ``auto`` to automatically
+ detect interface. In this case interfaces with lower numbers will be
+ preferred over interfaces with similar names. Order of preference based on
+ names:
+
+ #. ``eth`` and ``enp`` followed by number or the end of string.
+ #. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string.
+ #. ``teredo`` followed by number or the end of string.
+ #. Any other interface that is not ``lo*``.
+ #. ``lo`` followed by number or the end of string.
+
+ Use ``default_gateway`` to detect the interface based on the machine's
+ `default gateway <https://en.wikipedia.org/wiki/Default_gateway>`_ (i.e.,
+ the router to which it is connected).
+
+:param int ipv:
+ 4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you
+ need exactly.
+''')
+
+
+try:
+ import psutil
+
+ def _get_bytes(interface):
+ try:
+ io_counters = psutil.net_io_counters(pernic=True)
+ except AttributeError:
+ io_counters = psutil.network_io_counters(pernic=True)
+ if_io = io_counters.get(interface)
+ if not if_io:
+ return None
+ return if_io.bytes_recv, if_io.bytes_sent
+
+ def _get_interfaces():
+ try:
+ io_counters = psutil.net_io_counters(pernic=True)
+ except AttributeError:
+ io_counters = psutil.network_io_counters(pernic=True)
+ for interface, data in io_counters.items():
+ if data:
+ yield interface, data.bytes_recv, data.bytes_sent
+except ImportError:
+ def _get_bytes(interface):
+ with open('/sys/class/net/{interface}/statistics/rx_bytes'.format(interface=interface), 'rb') as file_obj:
+ rx = int(file_obj.read())
+ with open('/sys/class/net/{interface}/statistics/tx_bytes'.format(interface=interface), 'rb') as file_obj:
+ tx = int(file_obj.read())
+ return (rx, tx)
+
+ def _get_interfaces():
+ for interface in os.listdir('/sys/class/net'):
+ x = _get_bytes(interface)
+ if x is not None:
+ yield interface, x[0], x[1]
+
+
+class NetworkLoadSegment(KwThreadedSegment):
+ interfaces = {}
+ replace_num_pat = re.compile(r'[a-zA-Z]+')
+
+ @staticmethod
+ def key(interface='auto', **kwargs):
+ return interface
+
+ def compute_state(self, interface):
+ if interface == 'auto':
+ proc_exists = getattr(self, 'proc_exists', None)
+ if proc_exists is None:
+ proc_exists = self.proc_exists = os.path.exists('/proc/net/route')
+ if proc_exists:
+ # Look for default interface in routing table
+ with open('/proc/net/route', 'rb') as f:
+ for line in f.readlines():
+ parts = line.split()
+ if len(parts) > 1:
+ iface, destination = parts[:2]
+ if not destination.replace(b'0', b''):
+ interface = iface.decode('utf-8')
+ break
+ if interface == 'auto':
+ # Choose interface with most total activity, excluding some
+ # well known interface names
+ interface, total = 'eth0', -1
+ for name, rx, tx in _get_interfaces():
+ base = self.replace_num_pat.match(name)
+ if None in (base, rx, tx) or base.group() in ('lo', 'vmnet', 'sit'):
+ continue
+ activity = rx + tx
+ if activity > total:
+ total = activity
+ interface = name
+
+ try:
+ idata = self.interfaces[interface]
+ try:
+ idata['prev'] = idata['last']
+ except KeyError:
+ pass
+ except KeyError:
+ idata = {}
+ if self.run_once:
+ idata['prev'] = (monotonic(), _get_bytes(interface))
+ self.shutdown_event.wait(self.interval)
+ self.interfaces[interface] = idata
+
+ idata['last'] = (monotonic(), _get_bytes(interface))
+ return idata.copy()
+
+ def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs):
+ if not idata or 'prev' not in idata:
+ return None
+
+ t1, b1 = idata['prev']
+ t2, b2 = idata['last']
+ measure_interval = t2 - t1
+
+ if None in (b1, b2):
+ return None
+
+ r = []
+ for i, key in zip((0, 1), ('recv', 'sent')):
+ format = locals()[key + '_format']
+ try:
+ value = (b2[i] - b1[i]) / measure_interval
+ except ZeroDivisionError:
+ self.warn('Measure interval zero.')
+ value = 0
+ max_key = key + '_max'
+ is_gradient = max_key in kwargs
+ hl_groups = ['network_load_' + key, 'network_load']
+ if is_gradient:
+ hl_groups[:0] = (group + '_gradient' for group in hl_groups)
+ r.append({
+ 'contents': format.format(value=humanize_bytes(value, suffix, si_prefix)),
+ 'divider_highlight_group': 'network_load:divider',
+ 'highlight_groups': hl_groups,
+ })
+ if is_gradient:
+ max = kwargs[max_key]
+ if value >= max:
+ r[-1]['gradient_level'] = 100
+ else:
+ r[-1]['gradient_level'] = value * 100.0 / max
+
+ return r
+
+
+network_load = with_docstring(NetworkLoadSegment(),
+'''Return the network load.
+
+Uses the ``psutil`` module if available for multi-platform compatibility,
+falls back to reading
+:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`.
+
+:param str interface:
+ Network interface to measure (use the special value "auto" to have powerline
+ try to auto-detect the network interface).
+:param str suffix:
+ String appended to each load string.
+:param bool si_prefix:
+ Use SI prefix, e.g. MB instead of MiB.
+:param str recv_format:
+ Format string that determines how download speed should look like. Receives
+ ``value`` as argument.
+:param str sent_format:
+ Format string that determines how upload speed should look like. Receives
+ ``value`` as argument.
+:param float recv_max:
+ Maximum number of received bytes per second. Is only used to compute
+ gradient level.
+:param float sent_max:
+ Maximum number of sent bytes per second. Is only used to compute gradient
+ level.
+
+Divider highlight group used: ``network_load:divider``.
+
+Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``.
+''')