summaryrefslogtreecommitdiffstats
path: root/powerline/lib/watcher
diff options
context:
space:
mode:
Diffstat (limited to 'powerline/lib/watcher')
-rw-r--r--powerline/lib/watcher/__init__.py76
-rw-r--r--powerline/lib/watcher/inotify.py268
-rw-r--r--powerline/lib/watcher/stat.py44
-rw-r--r--powerline/lib/watcher/tree.py90
-rw-r--r--powerline/lib/watcher/uv.py207
5 files changed, 685 insertions, 0 deletions
diff --git a/powerline/lib/watcher/__init__.py b/powerline/lib/watcher/__init__.py
new file mode 100644
index 0000000..4fe9896
--- /dev/null
+++ b/powerline/lib/watcher/__init__.py
@@ -0,0 +1,76 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import sys
+
+from powerline.lib.watcher.stat import StatFileWatcher
+from powerline.lib.watcher.inotify import INotifyFileWatcher
+from powerline.lib.watcher.tree import TreeWatcher
+from powerline.lib.watcher.uv import UvFileWatcher, UvNotFound
+from powerline.lib.inotify import INotifyError
+
+
+def create_file_watcher(pl, watcher_type='auto', expire_time=10):
+ '''Create an object that can watch for changes to specified files
+
+ Use ``.__call__()`` method of the returned object to start watching the file
+ or check whether file has changed since last call.
+
+ Use ``.unwatch()`` method of the returned object to stop watching the file.
+
+ Uses inotify if available, then pyuv, otherwise tracks mtimes. expire_time
+ is the number of minutes after the last query for a given path for the
+ inotify watch for that path to be automatically removed. This conserves
+ kernel resources.
+
+ :param PowerlineLogger pl:
+ Logger.
+ :param str watcher_type
+ One of ``inotify`` (linux only), ``uv``, ``stat``, ``auto``. Determines
+ what watcher will be used. ``auto`` will use ``inotify`` if available,
+ then ``libuv`` and then fall back to ``stat``.
+ :param int expire_time:
+ Number of minutes since last ``.__call__()`` before inotify watcher will
+ stop watching given file.
+ '''
+ if watcher_type == 'stat':
+ pl.debug('Using requested stat-based watcher', prefix='watcher')
+ return StatFileWatcher()
+ if watcher_type == 'inotify':
+ # Explicitly selected inotify watcher: do not catch INotifyError then.
+ pl.debug('Using requested inotify watcher', prefix='watcher')
+ return INotifyFileWatcher(expire_time=expire_time)
+ elif watcher_type == 'uv':
+ pl.debug('Using requested uv watcher', prefix='watcher')
+ return UvFileWatcher()
+
+ if sys.platform.startswith('linux'):
+ try:
+ pl.debug('Trying to use inotify watcher', prefix='watcher')
+ return INotifyFileWatcher(expire_time=expire_time)
+ except INotifyError:
+ pl.info('Failed to create inotify watcher', prefix='watcher')
+
+ try:
+ pl.debug('Using libuv-based watcher')
+ return UvFileWatcher()
+ except UvNotFound:
+ pl.debug('Failed to import pyuv')
+
+ pl.debug('Using stat-based watcher')
+ return StatFileWatcher()
+
+
+def create_tree_watcher(pl, watcher_type='auto', expire_time=10):
+ '''Create an object that can watch for changes in specified directories
+
+ :param PowerlineLogger pl:
+ Logger.
+ :param str watcher_type:
+ Watcher type. Currently the only supported types are ``inotify`` (linux
+ only), ``uv``, ``dummy`` and ``auto``.
+ :param int expire_time:
+ Number of minutes since last ``.__call__()`` before inotify watcher will
+ stop watching given file.
+ '''
+ return TreeWatcher(pl, watcher_type, expire_time)
diff --git a/powerline/lib/watcher/inotify.py b/powerline/lib/watcher/inotify.py
new file mode 100644
index 0000000..c4f1200
--- /dev/null
+++ b/powerline/lib/watcher/inotify.py
@@ -0,0 +1,268 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import errno
+import os
+import ctypes
+
+from threading import RLock
+
+from powerline.lib.inotify import INotify
+from powerline.lib.monotonic import monotonic
+from powerline.lib.path import realpath
+
+
+class INotifyFileWatcher(INotify):
+ def __init__(self, expire_time=10):
+ super(INotifyFileWatcher, self).__init__()
+ self.watches = {}
+ self.modified = {}
+ self.last_query = {}
+ self.lock = RLock()
+ self.expire_time = expire_time * 60
+
+ def expire_watches(self):
+ now = monotonic()
+ for path, last_query in tuple(self.last_query.items()):
+ if last_query - now > self.expire_time:
+ self.unwatch(path)
+
+ def process_event(self, wd, mask, cookie, name):
+ if wd == -1 and (mask & self.Q_OVERFLOW):
+ # We missed some INOTIFY events, so we don't
+ # know the state of any tracked files.
+ for path in tuple(self.modified):
+ if os.path.exists(path):
+ self.modified[path] = True
+ else:
+ self.watches.pop(path, None)
+ self.modified.pop(path, None)
+ self.last_query.pop(path, None)
+ return
+
+ for path, num in tuple(self.watches.items()):
+ if num == wd:
+ if mask & self.IGNORED:
+ self.watches.pop(path, None)
+ self.modified.pop(path, None)
+ self.last_query.pop(path, None)
+ else:
+ if mask & self.ATTRIB:
+ # The watched file could have had its inode changed, in
+ # which case we will not get any more events for this
+ # file, so re-register the watch. For example by some
+ # other file being renamed as this file.
+ try:
+ self.unwatch(path)
+ except OSError:
+ pass
+ try:
+ self.watch(path)
+ except OSError as e:
+ if getattr(e, 'errno', None) != errno.ENOENT:
+ raise
+ else:
+ self.modified[path] = True
+ else:
+ self.modified[path] = True
+
+ def unwatch(self, path):
+ ''' Remove the watch for path. Raises an OSError if removing the watch
+ fails for some reason. '''
+ path = realpath(path)
+ with self.lock:
+ self.modified.pop(path, None)
+ self.last_query.pop(path, None)
+ wd = self.watches.pop(path, None)
+ if wd is not None:
+ if self._rm_watch(self._inotify_fd, wd) != 0:
+ self.handle_error()
+
+ def watch(self, path):
+ ''' Register a watch for the file/directory named path. Raises an OSError if path
+ does not exist. '''
+ path = realpath(path)
+ with self.lock:
+ if path not in self.watches:
+ bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
+ flags = self.MOVE_SELF | self.DELETE_SELF
+ buf = ctypes.c_char_p(bpath)
+ # Try watching path as a directory
+ wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR)
+ if wd == -1:
+ eno = ctypes.get_errno()
+ if eno != errno.ENOTDIR:
+ self.handle_error()
+ # Try watching path as a file
+ flags |= (self.MODIFY | self.ATTRIB)
+ wd = self._add_watch(self._inotify_fd, buf, flags)
+ if wd == -1:
+ self.handle_error()
+ self.watches[path] = wd
+ self.modified[path] = False
+
+ def is_watching(self, path):
+ with self.lock:
+ return realpath(path) in self.watches
+
+ def __call__(self, path):
+ ''' Return True if path has been modified since the last call. Can
+ raise OSError if the path does not exist. '''
+ path = realpath(path)
+ with self.lock:
+ self.last_query[path] = monotonic()
+ self.expire_watches()
+ if path not in self.watches:
+ # Try to re-add the watch, it will fail if the file does not
+ # exist/you don't have permission
+ self.watch(path)
+ return True
+ self.read(get_name=False)
+ if path not in self.modified:
+ # An ignored event was received which means the path has been
+ # automatically unwatched
+ return True
+ ans = self.modified[path]
+ if ans:
+ self.modified[path] = False
+ return ans
+
+ def close(self):
+ with self.lock:
+ for path in tuple(self.watches):
+ try:
+ self.unwatch(path)
+ except OSError:
+ pass
+ super(INotifyFileWatcher, self).close()
+
+
+class NoSuchDir(ValueError):
+ pass
+
+
+class BaseDirChanged(ValueError):
+ pass
+
+
+class DirTooLarge(ValueError):
+ def __init__(self, bdir):
+ ValueError.__init__(self, 'The directory {0} is too large to monitor. Try increasing the value in /proc/sys/fs/inotify/max_user_watches'.format(bdir))
+
+
+class INotifyTreeWatcher(INotify):
+ is_dummy = False
+
+ def __init__(self, basedir, ignore_event=None):
+ super(INotifyTreeWatcher, self).__init__()
+ self.basedir = realpath(basedir)
+ self.watch_tree()
+ self.modified = True
+ self.ignore_event = (lambda path, name: False) if ignore_event is None else ignore_event
+
+ def watch_tree(self):
+ self.watched_dirs = {}
+ self.watched_rmap = {}
+ try:
+ self.add_watches(self.basedir)
+ except OSError as e:
+ if e.errno == errno.ENOSPC:
+ raise DirTooLarge(self.basedir)
+
+ def add_watches(self, base, top_level=True):
+ ''' Add watches for this directory and all its descendant directories,
+ recursively. '''
+ base = realpath(base)
+ # There may exist a link which leads to an endless
+ # add_watches loop or to maximum recursion depth exceeded
+ if not top_level and base in self.watched_dirs:
+ return
+ try:
+ is_dir = self.add_watch(base)
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ # The entry could have been deleted between listdir() and
+ # add_watch().
+ if top_level:
+ raise NoSuchDir('The dir {0} does not exist'.format(base))
+ return
+ if e.errno == errno.EACCES:
+ # We silently ignore entries for which we don't have permission,
+ # unless they are the top level dir
+ if top_level:
+ raise NoSuchDir('You do not have permission to monitor {0}'.format(base))
+ return
+ raise
+ else:
+ if is_dir:
+ try:
+ files = os.listdir(base)
+ except OSError as e:
+ if e.errno in (errno.ENOTDIR, errno.ENOENT):
+ # The dir was deleted/replaced between the add_watch()
+ # and listdir()
+ if top_level:
+ raise NoSuchDir('The dir {0} does not exist'.format(base))
+ return
+ raise
+ for x in files:
+ self.add_watches(os.path.join(base, x), top_level=False)
+ elif top_level:
+ # The top level dir is a file, not good.
+ raise NoSuchDir('The dir {0} does not exist'.format(base))
+
+ def add_watch(self, path):
+ bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
+ wd = self._add_watch(
+ self._inotify_fd,
+ ctypes.c_char_p(bpath),
+
+ # Ignore symlinks and watch only directories
+ self.DONT_FOLLOW | self.ONLYDIR |
+
+ self.MODIFY | self.CREATE | self.DELETE |
+ self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO |
+ self.ATTRIB | self.DELETE_SELF
+ )
+ if wd == -1:
+ eno = ctypes.get_errno()
+ if eno == errno.ENOTDIR:
+ return False
+ raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno)))
+ self.watched_dirs[path] = wd
+ self.watched_rmap[wd] = path
+ return True
+
+ def process_event(self, wd, mask, cookie, name):
+ if wd == -1 and (mask & self.Q_OVERFLOW):
+ # We missed some INOTIFY events, so we don't
+ # know the state of any tracked dirs.
+ self.watch_tree()
+ self.modified = True
+ return
+ path = self.watched_rmap.get(wd, None)
+ if path is not None:
+ if not self.ignore_event(path, name):
+ self.modified = True
+ if mask & self.CREATE:
+ # A new sub-directory might have been created, monitor it.
+ try:
+ if not isinstance(path, bytes):
+ name = name.decode(self.fenc)
+ self.add_watch(os.path.join(path, name))
+ except OSError as e:
+ if e.errno == errno.ENOENT:
+ # Deleted before add_watch()
+ pass
+ elif e.errno == errno.ENOSPC:
+ raise DirTooLarge(self.basedir)
+ else:
+ raise
+ if (mask & self.DELETE_SELF or mask & self.MOVE_SELF) and path == self.basedir:
+ raise BaseDirChanged('The directory %s was moved/deleted' % path)
+
+ def __call__(self):
+ self.read()
+ ret = self.modified
+ self.modified = False
+ return ret
diff --git a/powerline/lib/watcher/stat.py b/powerline/lib/watcher/stat.py
new file mode 100644
index 0000000..0c08971
--- /dev/null
+++ b/powerline/lib/watcher/stat.py
@@ -0,0 +1,44 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+from threading import RLock
+
+from powerline.lib.path import realpath
+
+
+class StatFileWatcher(object):
+ def __init__(self):
+ self.watches = {}
+ self.lock = RLock()
+
+ def watch(self, path):
+ path = realpath(path)
+ with self.lock:
+ self.watches[path] = os.path.getmtime(path)
+
+ def unwatch(self, path):
+ path = realpath(path)
+ with self.lock:
+ self.watches.pop(path, None)
+
+ def is_watching(self, path):
+ with self.lock:
+ return realpath(path) in self.watches
+
+ def __call__(self, path):
+ path = realpath(path)
+ with self.lock:
+ if path not in self.watches:
+ self.watches[path] = os.path.getmtime(path)
+ return True
+ mtime = os.path.getmtime(path)
+ if mtime != self.watches[path]:
+ self.watches[path] = mtime
+ return True
+ return False
+
+ def close(self):
+ with self.lock:
+ self.watches.clear()
diff --git a/powerline/lib/watcher/tree.py b/powerline/lib/watcher/tree.py
new file mode 100644
index 0000000..7d2b83f
--- /dev/null
+++ b/powerline/lib/watcher/tree.py
@@ -0,0 +1,90 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import sys
+
+from powerline.lib.monotonic import monotonic
+from powerline.lib.inotify import INotifyError
+from powerline.lib.path import realpath
+from powerline.lib.watcher.inotify import INotifyTreeWatcher, DirTooLarge, NoSuchDir, BaseDirChanged
+from powerline.lib.watcher.uv import UvTreeWatcher, UvNotFound
+
+
+class DummyTreeWatcher(object):
+ is_dummy = True
+
+ def __init__(self, basedir):
+ self.basedir = realpath(basedir)
+
+ def __call__(self):
+ return False
+
+
+class TreeWatcher(object):
+ def __init__(self, pl, watcher_type, expire_time):
+ self.watches = {}
+ self.last_query_times = {}
+ self.expire_time = expire_time * 60
+ self.pl = pl
+ self.watcher_type = watcher_type
+
+ def get_watcher(self, path, ignore_event):
+ if self.watcher_type == 'inotify':
+ return INotifyTreeWatcher(path, ignore_event=ignore_event)
+ if self.watcher_type == 'uv':
+ return UvTreeWatcher(path, ignore_event=ignore_event)
+ if self.watcher_type == 'dummy':
+ return DummyTreeWatcher(path)
+ # FIXME
+ if self.watcher_type == 'stat':
+ return DummyTreeWatcher(path)
+ if self.watcher_type == 'auto':
+ if sys.platform.startswith('linux'):
+ try:
+ return INotifyTreeWatcher(path, ignore_event=ignore_event)
+ except (INotifyError, DirTooLarge) as e:
+ if not isinstance(e, INotifyError):
+ self.pl.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
+ try:
+ return UvTreeWatcher(path, ignore_event=ignore_event)
+ except UvNotFound:
+ pass
+ return DummyTreeWatcher(path)
+ else:
+ raise ValueError('Unknown watcher type: {0}'.format(self.watcher_type))
+
+ def watch(self, path, ignore_event=None):
+ path = realpath(path)
+ w = self.get_watcher(path, ignore_event)
+ self.watches[path] = w
+ return w
+
+ def expire_old_queries(self):
+ pop = []
+ now = monotonic()
+ for path, lt in self.last_query_times.items():
+ if now - lt > self.expire_time:
+ pop.append(path)
+ for path in pop:
+ del self.last_query_times[path]
+
+ def __call__(self, path, ignore_event=None):
+ path = realpath(path)
+ self.expire_old_queries()
+ self.last_query_times[path] = monotonic()
+ w = self.watches.get(path, None)
+ if w is None:
+ try:
+ self.watch(path, ignore_event=ignore_event)
+ except NoSuchDir:
+ pass
+ return True
+ try:
+ return w()
+ except BaseDirChanged:
+ self.watches.pop(path, None)
+ return True
+ except DirTooLarge as e:
+ self.pl.warn(str(e))
+ self.watches[path] = DummyTreeWatcher(path)
+ return False
diff --git a/powerline/lib/watcher/uv.py b/powerline/lib/watcher/uv.py
new file mode 100644
index 0000000..272db0f
--- /dev/null
+++ b/powerline/lib/watcher/uv.py
@@ -0,0 +1,207 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import os
+
+from collections import defaultdict
+from threading import RLock
+from functools import partial
+from threading import Thread
+from errno import ENOENT
+
+from powerline.lib.path import realpath
+from powerline.lib.encoding import get_preferred_file_name_encoding
+
+
+class UvNotFound(NotImplementedError):
+ pass
+
+
+pyuv = None
+pyuv_version_info = None
+
+
+def import_pyuv():
+ global pyuv
+ global pyuv_version_info
+ if not pyuv:
+ try:
+ pyuv = __import__('pyuv')
+ except ImportError:
+ raise UvNotFound
+ else:
+ pyuv_version_info = tuple((int(c) for c in pyuv.__version__.split('.')))
+
+
+class UvThread(Thread):
+ daemon = True
+
+ def __init__(self, loop):
+ self.uv_loop = loop
+ self.async_handle = pyuv.Async(loop, self._async_cb)
+ super(UvThread, self).__init__()
+
+ def _async_cb(self, handle):
+ self.uv_loop.stop()
+ self.async_handle.close()
+
+ def run(self):
+ self.uv_loop.run()
+
+ def join(self):
+ self.async_handle.send()
+ return super(UvThread, self).join()
+
+
+_uv_thread = None
+
+
+def start_uv_thread():
+ global _uv_thread
+ if _uv_thread is None:
+ loop = pyuv.Loop()
+ _uv_thread = UvThread(loop)
+ _uv_thread.start()
+ return _uv_thread.uv_loop
+
+
+def normpath(path, fenc):
+ path = realpath(path)
+ if isinstance(path, bytes):
+ return path.decode(fenc)
+ else:
+ return path
+
+
+class UvWatcher(object):
+ def __init__(self):
+ import_pyuv()
+ self.watches = {}
+ self.lock = RLock()
+ self.loop = start_uv_thread()
+ self.fenc = get_preferred_file_name_encoding()
+ if pyuv_version_info >= (1, 0):
+ self._start_watch = self._start_watch_1_x
+ else:
+ self._start_watch = self._start_watch_0_x
+
+ def _start_watch_1_x(self, path):
+ handle = pyuv.fs.FSEvent(self.loop)
+ handle.start(path, 0, partial(self._record_event, path))
+ self.watches[path] = handle
+
+ def _start_watch_0_x(self, path):
+ self.watches[path] = pyuv.fs.FSEvent(
+ self.loop,
+ path,
+ partial(self._record_event, path),
+ pyuv.fs.UV_CHANGE | pyuv.fs.UV_RENAME
+ )
+
+ def watch(self, path):
+ path = normpath(path, self.fenc)
+ with self.lock:
+ if path not in self.watches:
+ try:
+ self._start_watch(path)
+ except pyuv.error.FSEventError as e:
+ code = e.args[0]
+ if code == pyuv.errno.UV_ENOENT:
+ raise OSError(ENOENT, 'No such file or directory: ' + path)
+ else:
+ raise
+
+ def unwatch(self, path):
+ path = normpath(path, self.fenc)
+ with self.lock:
+ try:
+ watch = self.watches.pop(path)
+ except KeyError:
+ return
+ watch.close(partial(self._stopped_watching, path))
+
+ def is_watching(self, path):
+ with self.lock:
+ return normpath(path, self.fenc) in self.watches
+
+ def __del__(self):
+ try:
+ lock = self.lock
+ except AttributeError:
+ pass
+ else:
+ with lock:
+ while self.watches:
+ path, watch = self.watches.popitem()
+ watch.close(partial(self._stopped_watching, path))
+
+
+class UvFileWatcher(UvWatcher):
+ def __init__(self):
+ super(UvFileWatcher, self).__init__()
+ self.events = defaultdict(list)
+
+ def _record_event(self, path, fsevent_handle, filename, events, error):
+ with self.lock:
+ self.events[path].append(events)
+ if events | pyuv.fs.UV_RENAME:
+ if not os.path.exists(path):
+ self.watches.pop(path).close()
+
+ def _stopped_watching(self, path, *args):
+ self.events.pop(path, None)
+
+ def __call__(self, path):
+ path = normpath(path, self.fenc)
+ with self.lock:
+ events = self.events.pop(path, None)
+ if events:
+ return True
+ if path not in self.watches:
+ self.watch(path)
+ return True
+ return False
+
+
+class UvTreeWatcher(UvWatcher):
+ is_dummy = False
+
+ def __init__(self, basedir, ignore_event=None):
+ super(UvTreeWatcher, self).__init__()
+ self.ignore_event = ignore_event or (lambda path, name: False)
+ self.basedir = normpath(basedir, self.fenc)
+ self.modified = True
+ self.watch_directory(self.basedir)
+
+ def watch_directory(self, path):
+ for root, dirs, files in os.walk(normpath(path, self.fenc)):
+ self.watch_one_directory(root)
+
+ def watch_one_directory(self, dirname):
+ try:
+ self.watch(dirname)
+ except OSError:
+ pass
+
+ def _stopped_watching(self, path, *args):
+ pass
+
+ def _record_event(self, path, fsevent_handle, filename, events, error):
+ if not self.ignore_event(path, filename):
+ self.modified = True
+ if events == pyuv.fs.UV_CHANGE | pyuv.fs.UV_RENAME:
+ # Stat changes to watched directory are UV_CHANGE|UV_RENAME. It
+ # is weird.
+ pass
+ elif events | pyuv.fs.UV_RENAME:
+ if not os.path.isdir(path):
+ self.unwatch(path)
+ else:
+ full_name = os.path.join(path, filename)
+ if os.path.isdir(full_name):
+ # For some reason mkdir and rmdir both fall into this
+ # category
+ self.watch_directory(full_name)
+
+ def __call__(self):
+ return self.__dict__.pop('modified', False)