diff options
Diffstat (limited to 'powerline/lib/watcher')
-rw-r--r-- | powerline/lib/watcher/__init__.py | 76 | ||||
-rw-r--r-- | powerline/lib/watcher/inotify.py | 268 | ||||
-rw-r--r-- | powerline/lib/watcher/stat.py | 44 | ||||
-rw-r--r-- | powerline/lib/watcher/tree.py | 90 | ||||
-rw-r--r-- | powerline/lib/watcher/uv.py | 207 |
5 files changed, 685 insertions, 0 deletions
def create_file_watcher(pl, watcher_type='auto', expire_time=10):
    '''Create an object that can watch for changes to specified files

    Use ``.__call__()`` method of the returned object to start watching the file
    or check whether file has changed since last call.

    Use ``.unwatch()`` method of the returned object to stop watching the file.

    Uses inotify if available, then pyuv, otherwise tracks mtimes. expire_time
    is the number of minutes after the last query for a given path for the
    inotify watch for that path to be automatically removed. This conserves
    kernel resources.

    :param PowerlineLogger pl:
        Logger.
    :param str watcher_type:
        One of ``inotify`` (linux only), ``uv``, ``stat``, ``auto``. Determines
        what watcher will be used. ``auto`` will use ``inotify`` if available,
        then ``libuv`` and then fall back to ``stat``. Any value other than
        the first three is handled the same way as ``auto``.
    :param int expire_time:
        Number of minutes since last ``.__call__()`` before inotify watcher will
        stop watching given file.

    :return:
        A ``StatFileWatcher``, ``INotifyFileWatcher`` or ``UvFileWatcher``
        instance.
    '''
    if watcher_type == 'stat':
        pl.debug('Using requested stat-based watcher', prefix='watcher')
        return StatFileWatcher()
    elif watcher_type == 'inotify':
        # Explicitly selected inotify watcher: do not catch INotifyError then.
        pl.debug('Using requested inotify watcher', prefix='watcher')
        return INotifyFileWatcher(expire_time=expire_time)
    elif watcher_type == 'uv':
        # Explicitly selected uv watcher: UvNotFound is not caught either.
        pl.debug('Using requested uv watcher', prefix='watcher')
        return UvFileWatcher()

    # Automatic selection: inotify (linux only), then libuv, then stat.
    if sys.platform.startswith('linux'):
        try:
            pl.debug('Trying to use inotify watcher', prefix='watcher')
            return INotifyFileWatcher(expire_time=expire_time)
        except INotifyError:
            pl.info('Failed to create inotify watcher', prefix='watcher')

    try:
        pl.debug('Using libuv-based watcher', prefix='watcher')
        return UvFileWatcher()
    except UvNotFound:
        pl.debug('Failed to import pyuv', prefix='watcher')

    pl.debug('Using stat-based watcher', prefix='watcher')
    return StatFileWatcher()
class INotifyFileWatcher(INotify):
    '''Watch individual files or directories for changes using inotify

    State is kept in three dictionaries keyed by the real path:

    ``watches``
        Maps path to the inotify watch descriptor.
    ``modified``
        Maps path to a flag that is set when an event for it was received.
    ``last_query``
        Maps path to the monotonic time of the last ``.__call__()`` for it.

    :param int expire_time:
        Number of minutes since the last ``.__call__()`` for a path before the
        watch for that path is removed. Stored internally in seconds.
    '''

    def __init__(self, expire_time=10):
        super(INotifyFileWatcher, self).__init__()
        self.watches = {}
        self.modified = {}
        self.last_query = {}
        self.lock = RLock()
        self.expire_time = expire_time * 60

    def expire_watches(self):
        '''Remove watches for paths not queried within ``expire_time``
        '''
        now = monotonic()
        # Iterate over a snapshot: unwatch() mutates self.last_query.
        for path, last_query in tuple(self.last_query.items()):
            # Elapsed time is ``now - last_query``. The reversed subtraction
            # is never positive, which would keep every watch forever.
            if now - last_query > self.expire_time:
                try:
                    self.unwatch(path)
                except OSError:
                    # Best effort, same as in close(): unwatch() has already
                    # dropped the bookkeeping before it reports the failure.
                    pass

    def process_event(self, wd, mask, cookie, name):
        '''Update the bookkeeping for one inotify event

        :param wd: Watch descriptor the event belongs to (-1 on overflow).
        :param mask: Bit mask of inotify event flags.
        :param cookie: Unused here.
        :param name: Unused here.
        '''
        if wd == -1 and (mask & self.Q_OVERFLOW):
            # We missed some INOTIFY events, so we don't
            # know the state of any tracked files.
            for path in tuple(self.modified):
                if os.path.exists(path):
                    self.modified[path] = True
                else:
                    self.watches.pop(path, None)
                    self.modified.pop(path, None)
                    self.last_query.pop(path, None)
            return

        for path, num in tuple(self.watches.items()):
            if num != wd:
                continue
            if mask & self.IGNORED:
                # The watch is gone: forget everything about the path.
                self.watches.pop(path, None)
                self.modified.pop(path, None)
                self.last_query.pop(path, None)
            elif mask & self.ATTRIB:
                # The watched file could have had its inode changed, in
                # which case we will not get any more events for this
                # file, so re-register the watch. For example by some
                # other file being renamed as this file.
                try:
                    self.unwatch(path)
                except OSError:
                    pass
                try:
                    self.watch(path)
                except OSError as e:
                    if getattr(e, 'errno', None) != errno.ENOENT:
                        raise
                else:
                    self.modified[path] = True
            else:
                self.modified[path] = True

    def unwatch(self, path):
        ''' Remove the watch for path. Raises an OSError if removing the watch
        fails for some reason. '''
        path = realpath(path)
        with self.lock:
            self.modified.pop(path, None)
            self.last_query.pop(path, None)
            wd = self.watches.pop(path, None)
            if wd is not None:
                if self._rm_watch(self._inotify_fd, wd) != 0:
                    self.handle_error()

    def watch(self, path):
        ''' Register a watch for the file/directory named path. Raises an OSError if path
        does not exist. '''
        path = realpath(path)
        with self.lock:
            if path not in self.watches:
                bpath = path if isinstance(path, bytes) else path.encode(self.fenc)
                flags = self.MOVE_SELF | self.DELETE_SELF
                buf = ctypes.c_char_p(bpath)
                # Try watching path as a directory
                wd = self._add_watch(self._inotify_fd, buf, flags | self.ONLYDIR)
                if wd == -1:
                    eno = ctypes.get_errno()
                    if eno != errno.ENOTDIR:
                        self.handle_error()
                    # Try watching path as a file
                    flags |= (self.MODIFY | self.ATTRIB)
                    wd = self._add_watch(self._inotify_fd, buf, flags)
                    if wd == -1:
                        self.handle_error()
                self.watches[path] = wd
                self.modified[path] = False

    def is_watching(self, path):
        '''Return True if there is a registered watch for path
        '''
        with self.lock:
            return realpath(path) in self.watches

    def __call__(self, path):
        ''' Return True if path has been modified since the last call. Can
        raise OSError if the path does not exist. '''
        path = realpath(path)
        with self.lock:
            # Stamp the current path first so that it never expires itself.
            self.last_query[path] = monotonic()
            self.expire_watches()
            if path not in self.watches:
                # Try to re-add the watch, it will fail if the file does not
                # exist/you don't have permission
                self.watch(path)
                return True
            self.read(get_name=False)
            if path not in self.modified:
                # An ignored event was received which means the path has been
                # automatically unwatched
                return True
            ans = self.modified[path]
            if ans:
                self.modified[path] = False
            return ans

    def close(self):
        '''Remove all watches, ignoring failures, then close the base object
        '''
        with self.lock:
            for path in tuple(self.watches):
                try:
                    self.unwatch(path)
                except OSError:
                    pass
            super(INotifyFileWatcher, self).close()
''' + base = realpath(base) + # There may exist a link which leads to an endless + # add_watches loop or to maximum recursion depth exceeded + if not top_level and base in self.watched_dirs: + return + try: + is_dir = self.add_watch(base) + except OSError as e: + if e.errno == errno.ENOENT: + # The entry could have been deleted between listdir() and + # add_watch(). + if top_level: + raise NoSuchDir('The dir {0} does not exist'.format(base)) + return + if e.errno == errno.EACCES: + # We silently ignore entries for which we don't have permission, + # unless they are the top level dir + if top_level: + raise NoSuchDir('You do not have permission to monitor {0}'.format(base)) + return + raise + else: + if is_dir: + try: + files = os.listdir(base) + except OSError as e: + if e.errno in (errno.ENOTDIR, errno.ENOENT): + # The dir was deleted/replaced between the add_watch() + # and listdir() + if top_level: + raise NoSuchDir('The dir {0} does not exist'.format(base)) + return + raise + for x in files: + self.add_watches(os.path.join(base, x), top_level=False) + elif top_level: + # The top level dir is a file, not good. 
+ raise NoSuchDir('The dir {0} does not exist'.format(base)) + + def add_watch(self, path): + bpath = path if isinstance(path, bytes) else path.encode(self.fenc) + wd = self._add_watch( + self._inotify_fd, + ctypes.c_char_p(bpath), + + # Ignore symlinks and watch only directories + self.DONT_FOLLOW | self.ONLYDIR | + + self.MODIFY | self.CREATE | self.DELETE | + self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO | + self.ATTRIB | self.DELETE_SELF + ) + if wd == -1: + eno = ctypes.get_errno() + if eno == errno.ENOTDIR: + return False + raise OSError(eno, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(eno))) + self.watched_dirs[path] = wd + self.watched_rmap[wd] = path + return True + + def process_event(self, wd, mask, cookie, name): + if wd == -1 and (mask & self.Q_OVERFLOW): + # We missed some INOTIFY events, so we don't + # know the state of any tracked dirs. + self.watch_tree() + self.modified = True + return + path = self.watched_rmap.get(wd, None) + if path is not None: + if not self.ignore_event(path, name): + self.modified = True + if mask & self.CREATE: + # A new sub-directory might have been created, monitor it. 
class StatFileWatcher(object):
    '''File watcher that detects changes by comparing modification times

    ``watches`` maps the real path of each watched file to the mtime seen at
    the most recent check. All access to it is guarded by ``lock``.
    '''

    def __init__(self):
        self.watches = {}
        self.lock = RLock()

    def watch(self, path):
        '''Start watching path, recording its current mtime
        '''
        key = realpath(path)
        with self.lock:
            self.watches[key] = os.path.getmtime(key)

    def unwatch(self, path):
        '''Stop watching path. Does nothing if it was not watched.
        '''
        key = realpath(path)
        with self.lock:
            self.watches.pop(key, None)

    def is_watching(self, path):
        '''Return True if path is currently being watched
        '''
        with self.lock:
            return realpath(path) in self.watches

    def __call__(self, path):
        '''Return True if path is new to the watcher or its mtime has changed
        '''
        key = realpath(path)
        with self.lock:
            current = os.path.getmtime(key)
            if key in self.watches and self.watches[key] == current:
                return False
            # Either the first query for this path or the mtime moved:
            # remember the new value and report a change.
            self.watches[key] = current
            return True

    def close(self):
        '''Forget all watched paths
        '''
        with self.lock:
            self.watches.clear()
class TreeWatcher(object):
    '''Keep one directory tree watcher per queried base directory

    :param PowerlineLogger pl:
        Logger.
    :param str watcher_type:
        One of ``inotify``, ``uv``, ``dummy``, ``stat`` or ``auto``.
    :param int expire_time:
        Number of minutes since the last ``.__call__()`` for a directory
        before its watcher is dropped. Stored internally in seconds.
    '''

    def __init__(self, pl, watcher_type, expire_time):
        self.watches = {}
        self.last_query_times = {}
        self.expire_time = expire_time * 60
        self.pl = pl
        self.watcher_type = watcher_type

    def get_watcher(self, path, ignore_event):
        '''Create a watcher object for path according to ``watcher_type``

        Raises ValueError if ``watcher_type`` is not a known type.
        '''
        if self.watcher_type == 'inotify':
            return INotifyTreeWatcher(path, ignore_event=ignore_event)
        if self.watcher_type == 'uv':
            return UvTreeWatcher(path, ignore_event=ignore_event)
        if self.watcher_type == 'dummy':
            return DummyTreeWatcher(path)
        # FIXME
        if self.watcher_type == 'stat':
            return DummyTreeWatcher(path)
        if self.watcher_type == 'auto':
            if sys.platform.startswith('linux'):
                try:
                    return INotifyTreeWatcher(path, ignore_event=ignore_event)
                except (INotifyError, DirTooLarge) as e:
                    # INotifyError is skipped silently; only DirTooLarge is
                    # reported.
                    if not isinstance(e, INotifyError):
                        self.pl.warn('Failed to watch path: {0} with error: {1}'.format(path, e))
            try:
                return UvTreeWatcher(path, ignore_event=ignore_event)
            except UvNotFound:
                pass
            return DummyTreeWatcher(path)
        else:
            raise ValueError('Unknown watcher type: {0}'.format(self.watcher_type))

    def watch(self, path, ignore_event=None):
        '''Create, remember and return a watcher for path
        '''
        path = realpath(path)
        w = self.get_watcher(path, ignore_event)
        self.watches[path] = w
        return w

    def expire_old_queries(self):
        '''Drop watchers for directories not queried within ``expire_time``
        '''
        now = monotonic()
        expired = [
            path
            for path, lt in self.last_query_times.items()
            if now - lt > self.expire_time
        ]
        for path in expired:
            del self.last_query_times[path]
            # Forgetting only the timestamp would keep the watcher (and
            # whatever it holds) referenced forever, so drop it as well.
            self.watches.pop(path, None)

    def __call__(self, path, ignore_event=None):
        '''Return True if the tree at path may have changed since last call

        Also returns True on the first query for a path and when the base
        directory was moved or deleted.
        '''
        path = realpath(path)
        # Stamp the current path before expiring, so that the watcher being
        # asked about is never the one that gets dropped.
        self.last_query_times[path] = monotonic()
        self.expire_old_queries()
        w = self.watches.get(path, None)
        if w is None:
            try:
                self.watch(path, ignore_event=ignore_event)
            except NoSuchDir:
                pass
            return True
        try:
            return w()
        except BaseDirChanged:
            self.watches.pop(path, None)
            return True
        except DirTooLarge as e:
            self.pl.warn(str(e))
            self.watches[path] = DummyTreeWatcher(path)
            return False
class UvFileWatcher(UvWatcher):
    '''Watch individual files for changes using pyuv filesystem events

    ``events`` maps a watched path to the list of event masks recorded for
    it since the last ``.__call__()``.
    '''

    def __init__(self):
        super(UvFileWatcher, self).__init__()
        self.events = defaultdict(list)

    def _record_event(self, path, fsevent_handle, filename, events, error):
        '''Callback for a filesystem event on path: remember the event mask
        '''
        with self.lock:
            self.events[path].append(events)
            # Test the flag with ``&``: ``events | UV_RENAME`` is non-zero
            # for every event.
            if events & pyuv.fs.UV_RENAME and not os.path.exists(path):
                # The path is gone. The watch may already have been removed
                # by unwatch(), so do not fail on a missing entry.
                watch = self.watches.pop(path, None)
                if watch is not None:
                    watch.close()

    def _stopped_watching(self, path, *args):
        '''Close callback: discard events recorded for path
        '''
        self.events.pop(path, None)

    def __call__(self, path):
        '''Return True if path has changed since the last call

        Also returns True when path was not watched yet, in which case a
        watch is started.
        '''
        path = normpath(path, self.fenc)
        with self.lock:
            events = self.events.pop(path, None)
            if events:
                return True
            if path not in self.watches:
                self.watch(path)
                return True
            return False