summaryrefslogtreecommitdiffstats
path: root/powerline/segments/common/net.py
blob: b5d9062d64fc8b8c68a23005c9f4301fd8184d0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)

import re
import os
import socket

from powerline.lib.url import urllib_read
from powerline.lib.threaded import ThreadedSegment, KwThreadedSegment
from powerline.lib.monotonic import monotonic
from powerline.lib.humanize_bytes import humanize_bytes
from powerline.segments import with_docstring
from powerline.theme import requires_segment_info


@requires_segment_info
def hostname(pl, segment_info, only_if_ssh=False, exclude_domain=False):
	'''Return the name of the host powerline is running on.

	:param bool only_if_ssh:
		return the hostname only when the ``SSH_CLIENT`` environment variable
		is set, i.e. when inside an SSH session
	:param bool exclude_domain:
		return only the part of the hostname that precedes the first dot
	'''
	environ = segment_info['environ']
	# Magic value presumably exported by the shell test suite: a fixed name is
	# returned in that case regardless of the other options.
	running_shell_tests = (
		environ.get('_POWERLINE_RUNNING_SHELL_TESTS')
		== 'ee5bcdc6-b749-11e7-9456-50465d597777'
	)
	if running_shell_tests:
		return 'hostname'
	if only_if_ssh and not environ.get('SSH_CLIENT'):
		return None
	name = socket.gethostname()
	return name.split('.')[0] if exclude_domain else name


def _external_ip(query_url='http://ipv4.icanhazip.com/'):
	'''Fetch ``query_url`` and return its response with surrounding whitespace removed.

	:param str query_url:
		URL that is read through ``urllib_read``; its body is expected to be
		nothing but the IP address.
	'''
	response_text = urllib_read(query_url)
	return response_text.strip()


class ExternalIpSegment(ThreadedSegment):
	'''Threaded segment showing the address reported by an IP lookup URL.'''

	# Update period used by the ThreadedSegment machinery (presumably seconds).
	interval = 300

	def set_state(self, query_url='http://ipv4.icanhazip.com/', **kwargs):
		'''Store ``query_url`` and hand the remaining options to the base class.'''
		self.query_url = query_url
		super(ExternalIpSegment, self).set_state(**kwargs)

	def update(self, old_ip):
		'''Query the configured URL again; the previous value is ignored.'''
		return _external_ip(query_url=self.query_url)

	def render(self, ip, **kwargs):
		'''Return a single segment holding ``ip``, or ``None`` when it is empty.'''
		if ip:
			segment = {'contents': ip, 'divider_highlight_group': 'background:divider'}
			return [segment]
		return None


# An ExternalIpSegment instance is passed through with_docstring() together
# with the text below and the result is bound to the module-level name
# ``external_ip`` (presumably the name themes use to refer to this segment).
external_ip = with_docstring(ExternalIpSegment(),
'''Return external IP address.

:param str query_url:
	URI to query for IP address, should return only the IP address as a text string

	Suggested URIs:

	* http://ipv4.icanhazip.com/
	* http://ipv6.icanhazip.com/
	* http://icanhazip.com/ (returns IPv6 address if available, else IPv4)

Divider highlight group used: ``background:divider``.
''')


try:
	import netifaces
except ImportError:
	def internal_ip(pl, interface='auto', ipv=4):
		# Without netifaces there is nothing to query: the segment stays empty.
		return None
else:
	# Priority of an interface, looked up by the alphabetic prefix of its name.
	# The ``auto`` mode of internal_ip() prefers higher values; prefixes that
	# are not listed here get priority 5 (see _interface_key()).
	_interface_starts = {
		'eth':      10,  # Regular ethernet adapters         : eth1
		'enp':      10,  # Regular ethernet adapters, Gentoo : enp2s0
		'en':       10,  # OS X                              : en0
		'ath':       9,  # Atheros WiFi adapters             : ath0
		'wlan':      9,  # Other WiFi adapters               : wlan1
		'wlp':       9,  # Other WiFi adapters, Gentoo       : wlp5s0
		'teredo':    1,  # miredo interface                  : teredo
		'lo':      -10,  # Loopback interface                : lo
		'docker':   -5,  # Docker bridge interface           : docker0
		'vmnet':    -5,  # VMWare bridge interface           : vmnet1
		'vboxnet':  -5,  # VirtualBox bridge interface       : vboxnet0
	}

	# Shortest lowercase prefix followed either by the whole interface number
	# (``\d+``, so that ``eth10`` yields 10 and not 1) or by the end of the name.
	_interface_start_re = re.compile(r'^([a-z]+?)(\d+|$)')

	def _interface_key(interface):
		'''Return the sort key of ``interface``; greater keys are preferred.

		The key is the prefix priority multiplied by 100 minus the interface
		number, so that among interfaces sharing a prefix the lowest number
		wins. Names that do not match ``_interface_start_re`` get key 0.
		'''
		match = _interface_start_re.match(interface)
		if not match:
			return 0
		prefix, number = match.groups()
		base = _interface_starts.get(prefix, 5) * 100
		if not number:
			return base
		# Clamp the number so that a key never leaves the 100-wide band that
		# belongs to its prefix priority.
		return base - min(int(number), 99)

	def internal_ip(pl, interface='auto', ipv=4):
		# Documentation is attached later by with_docstring(). Returns the first
		# address of the requested family on the selected interface or None
		# (after logging the reason through ``pl.info``) when there is none.
		family = netifaces.AF_INET6 if ipv == 6 else netifaces.AF_INET
		if interface == 'auto':
			candidates = netifaces.interfaces()
			if not candidates:
				pl.info('No network interfaces found')
				return None
			# max() returns the first of equally ranked names, which is the
			# same pick as the head of a stable descending sort.
			interface = max(candidates, key=_interface_key)
		elif interface == 'default_gateway':
			try:
				interface = netifaces.gateways()['default'][family][1]
			except KeyError:
				pl.info('No default gateway found for IPv{0}', ipv)
				return None
		try:
			addrs = netifaces.ifaddresses(interface)
		except ValueError:
			# netifaces rejects names of interfaces that do not exist, e.g.
			# a configured adapter that is currently unplugged.
			pl.info('Interface {0} was not found', interface)
			return None
		try:
			return addrs[family][0]['addr']
		except (KeyError, IndexError):
			pl.info("No IPv{0} address found for interface {1}", ipv, interface)
			return None


# The text below is attached to internal_ip through with_docstring(). The
# preference list mirrors the priorities used by the ``auto`` interface
# selection in this module.
internal_ip = with_docstring(internal_ip,
'''Return internal IP address

Requires ``netifaces`` module to work properly.

:param str interface:
	Interface on which IP will be checked. Use ``auto`` to automatically
	detect interface. In this case interfaces with lower numbers will be
	preferred over interfaces with similar names. Order of preference based on
	names:

	#. ``eth``, ``enp`` and ``en`` followed by number or the end of string.
	#. ``ath``, ``wlan`` and ``wlp`` followed by number or the end of string.
	#. Any other name made of lowercase letters followed by number or the end
	   of string.
	#. ``teredo`` followed by number or the end of string.
	#. Names that do not follow the above pattern.
	#. ``docker``, ``vmnet`` and ``vboxnet`` followed by number or the end of
	   string.
	#. ``lo`` followed by number or the end of string.

	Use ``default_gateway`` to detect the interface based on the machine's
	`default gateway <https://en.wikipedia.org/wiki/Default_gateway>`_ (i.e.,
	the router to which it is connected).

:param int ipv:
	4 or 6 for ipv4 and ipv6 respectively, depending on which IP address you
	need exactly.
''')


try:
	import psutil

	def _net_io_counters():
		'''Return psutil per-interface I/O counters as a dictionary.

		Falls back to ``network_io_counters`` when ``net_io_counters`` raises
		:py:class:`AttributeError` (presumably the name used by older psutil).
		'''
		try:
			return psutil.net_io_counters(pernic=True)
		except AttributeError:
			return psutil.network_io_counters(pernic=True)

	def _get_bytes(interface):
		'''Return ``(received, sent)`` byte counters of ``interface`` or ``None``.

		``None`` is returned when psutil reports nothing for that interface.
		'''
		if_io = _net_io_counters().get(interface)
		if not if_io:
			return None
		return if_io.bytes_recv, if_io.bytes_sent

	def _get_interfaces():
		'''Yield ``(name, received, sent)`` for every interface known to psutil.'''
		for interface, data in _net_io_counters().items():
			if data:
				yield interface, data.bytes_recv, data.bytes_sent
except ImportError:
	def _read_counter(interface, counter):
		'''Read one integer counter of ``interface`` from sysfs statistics.'''
		path = '/sys/class/net/{interface}/statistics/{counter}'.format(
			interface=interface, counter=counter)
		with open(path, 'rb') as file_obj:
			return int(file_obj.read())

	def _get_bytes(interface):
		'''Return ``(received, sent)`` byte counters of ``interface`` or ``None``.

		``None`` is returned when the counters cannot be read or parsed (no
		such interface, not an interface directory, no sysfs at all), which
		matches what the psutil based variant does for unknown interfaces.
		'''
		try:
			rx = _read_counter(interface, 'rx_bytes')
			tx = _read_counter(interface, 'tx_bytes')
		except (IOError, OSError, ValueError):
			return None
		return (rx, tx)

	def _get_interfaces():
		'''Yield ``(name, received, sent)`` for every readable sysfs interface.

		Yields nothing when :file:`/sys/class/net` cannot be listed.
		'''
		try:
			names = os.listdir('/sys/class/net')
		except OSError:
			return
		for interface in names:
			x = _get_bytes(interface)
			# Entries without readable counters (e.g. plain files) are skipped
			if x is not None:
				yield interface, x[0], x[1]


class NetworkLoadSegment(KwThreadedSegment):
	'''Threaded segment reporting receive and send rates of a network interface.'''

	# Samples per interface name. Every value is a dictionary whose ``last``
	# and, once available, ``prev`` items hold ``(monotonic(), _get_bytes())``
	# pairs. The dictionary is a class attribute.
	interfaces = {}
	# Matches the leading run of ASCII letters in an interface name
	replace_num_pat = re.compile(r'[a-zA-Z]+')

	@staticmethod
	def key(interface='auto', **kwargs):
		'''Return the ``interface`` argument; other arguments are ignored.'''
		return interface

	def compute_state(self, interface):
		'''Record a new byte-counter sample and return a copy of the samples.

		``auto`` is resolved first: the interface owning the all-zero
		destination in :file:`/proc/net/route` is used if that file exists,
		otherwise (or when no such route is listed) the interface with the
		largest byte total, ``eth0`` being the last resort.
		'''
		if interface == 'auto':
			# Existence of the routing table is checked once and remembered
			if getattr(self, 'proc_exists', None) is None:
				self.proc_exists = os.path.exists('/proc/net/route')
			if self.proc_exists:
				# Look for default interface in routing table
				with open('/proc/net/route', 'rb') as route_table:
					for row in route_table:
						fields = row.split()
						if len(fields) < 2:
							continue
						# Destination consisting of zeroes only: default route
						if fields[1].strip(b'0'):
							continue
						interface = fields[0].decode('utf-8')
						break
			if interface == 'auto':
				# Choose interface with most total activity, excluding some
				# well known interface names
				interface, busiest = 'eth0', -1
				for name, rx, tx in _get_interfaces():
					prefix = self.replace_num_pat.match(name)
					if prefix is None or rx is None or tx is None:
						continue
					if prefix.group() in ('lo', 'vmnet', 'sit'):
						continue
					activity = rx + tx
					if activity > busiest:
						busiest, interface = activity, name

		if interface in self.interfaces:
			idata = self.interfaces[interface]
			if 'last' in idata:
				idata['prev'] = idata['last']
		else:
			idata = {}
			if self.run_once:
				# A rate needs two samples: take one now, wait for one
				# interval and let the code below take the second one.
				idata['prev'] = (monotonic(), _get_bytes(interface))
				self.shutdown_event.wait(self.interval)
			self.interfaces[interface] = idata

		idata['last'] = (monotonic(), _get_bytes(interface))
		return idata.copy()

	def render_one(self, idata, recv_format='DL {value:>8}', sent_format='UL {value:>8}', suffix='B/s', si_prefix=False, **kwargs):
		'''Turn two samples into a "received" and a "sent" segment.

		Returns ``None`` when there is no previous sample yet or when byte
		counters are missing from either sample. ``recv_max``/``sent_max``
		found in ``kwargs`` enable gradient highlighting for that direction.
		'''
		if not idata or 'prev' not in idata:
			return None

		prev_time, prev_bytes = idata['prev']
		last_time, last_bytes = idata['last']
		measure_interval = last_time - prev_time

		if prev_bytes is None or last_bytes is None:
			return None

		directions = (('recv', recv_format), ('sent', sent_format))
		segments = []
		for index, (direction, template) in enumerate(directions):
			try:
				value = (last_bytes[index] - prev_bytes[index]) / measure_interval
			except ZeroDivisionError:
				self.warn('Measure interval zero.')
				value = 0
			limit_key = direction + '_max'
			with_gradient = limit_key in kwargs
			groups = ['network_load_' + direction, 'network_load']
			if with_gradient:
				# Gradient groups go first, plain groups stay as fallbacks
				groups = [group + '_gradient' for group in groups] + groups
			segment = {
				'contents': template.format(value=humanize_bytes(value, suffix, si_prefix)),
				'divider_highlight_group': 'network_load:divider',
				'highlight_groups': groups,
			}
			segments.append(segment)
			if with_gradient:
				limit = kwargs[limit_key]
				if value >= limit:
					segment['gradient_level'] = 100
				else:
					segment['gradient_level'] = value * 100.0 / limit

		return segments


# A NetworkLoadSegment instance is passed through with_docstring() together
# with the text below and the result is bound to the module-level name
# ``network_load``. The documented parameters correspond to the keyword
# arguments of NetworkLoadSegment.key() and NetworkLoadSegment.render_one().
network_load = with_docstring(NetworkLoadSegment(),
'''Return the network load.

Uses the ``psutil`` module if available for multi-platform compatibility,
falls back to reading
:file:`/sys/class/net/{interface}/statistics/{rx,tx}_bytes`.

:param str interface:
	Network interface to measure (use the special value "auto" to have powerline 
	try to auto-detect the network interface).
:param str suffix:
	String appended to each load string.
:param bool si_prefix:
	Use SI prefix, e.g. MB instead of MiB.
:param str recv_format:
	Format string that determines how download speed should look like. Receives 
	``value`` as argument.
:param str sent_format:
	Format string that determines how upload speed should look like. Receives 
	``value`` as argument.
:param float recv_max:
	Maximum number of received bytes per second. Is only used to compute
	gradient level.
:param float sent_max:
	Maximum number of sent bytes per second. Is only used to compute gradient
	level.

Divider highlight group used: ``network_load:divider``.

Highlight groups used: ``network_load_sent_gradient`` (gradient) or ``network_load_recv_gradient`` (gradient) or ``network_load_gradient`` (gradient), ``network_load_sent`` or ``network_load_recv`` or ``network_load``.
''')