summaryrefslogtreecommitdiffstats
path: root/tests/test_python/test_watcher.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_python/test_watcher.py')
-rw-r--r--tests/test_python/test_watcher.py245
1 files changed, 245 insertions, 0 deletions
diff --git a/tests/test_python/test_watcher.py b/tests/test_python/test_watcher.py
new file mode 100644
index 0000000..a246d0b
--- /dev/null
+++ b/tests/test_python/test_watcher.py
@@ -0,0 +1,245 @@
+# vim:fileencoding=utf-8:noet
+from __future__ import (unicode_literals, division, absolute_import, print_function)
+
+import shutil
+import os
+
+from time import sleep
+from functools import partial
+from errno import ENOENT
+
+from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
+from powerline.lib.watcher.uv import UvNotFound
+from powerline import get_fallback_logger
+from powerline.lib.monotonic import monotonic
+
+from tests.modules import TestCase, SkipTest
+
+
+INOTIFY_DIR = 'inotify' + os.path.basename(os.environ.get('PYTHON', ''))
+
+
+def clear_dir(dir):
+ for root, dirs, files in list(os.walk(dir, topdown=False)):
+ for f in files:
+ os.remove(os.path.join(root, f))
+ for d in dirs:
+ os.rmdir(os.path.join(root, d))
+
+
+def set_watcher_tests(l):
+ byte_tests = (('bytes', True), ('unicode', False))
+
+ for btn, use_bytes in byte_tests:
+ def test_inotify_file_watcher(self, use_bytes=use_bytes):
+ try:
+ w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
+ except INotifyError:
+ raise SkipTest('This test is not suitable for a stat based file watcher')
+ self.do_test_file_watcher(w, use_bytes)
+
+ def test_uv_file_watcher(self, use_bytes=use_bytes):
+ raise SkipTest('Uv watcher tests are not stable')
+ try:
+ w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
+ except UvNotFound:
+ raise SkipTest('Pyuv is not available')
+ self.do_test_file_watcher(w, use_bytes)
+
+ def test_inotify_tree_watcher(self, use_bytes=use_bytes):
+ try:
+ tw = create_tree_watcher(get_fallback_logger(), watcher_type='inotify')
+ except INotifyError:
+ raise SkipTest('INotify is not available')
+ self.do_test_tree_watcher(tw, use_bytes)
+
+ def test_uv_tree_watcher(self, use_bytes=use_bytes):
+ raise SkipTest('Uv watcher tests are not stable')
+ try:
+ tw = create_tree_watcher(get_fallback_logger(), 'uv')
+ except UvNotFound:
+ raise SkipTest('Pyuv is not available')
+ self.do_test_tree_watcher(tw, use_bytes)
+
+ def test_inotify_file_watcher_is_watching(self, use_bytes=use_bytes):
+ try:
+ w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
+ except INotifyError:
+ raise SkipTest('INotify is not available')
+ self.do_test_file_watcher_is_watching(w, use_bytes)
+
+ def test_stat_file_watcher_is_watching(self, use_bytes=use_bytes):
+ w = create_file_watcher(pl=get_fallback_logger(), watcher_type='stat')
+ self.do_test_file_watcher_is_watching(w, use_bytes)
+
+ def test_uv_file_watcher_is_watching(self, use_bytes=use_bytes):
+ try:
+ w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
+ except UvNotFound:
+ raise SkipTest('Pyuv is not available')
+ self.do_test_file_watcher_is_watching(w, use_bytes)
+
+ for wt in ('uv', 'inotify'):
+ l['test_{0}_file_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_file_watcher'.format(wt)]
+ l['test_{0}_tree_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_tree_watcher'.format(wt)]
+ l['test_{0}_file_watcher_is_watching_{1}'.format(wt, btn)] = (
+ locals()['test_{0}_file_watcher_is_watching'.format(wt)])
+ l['test_{0}_file_watcher_is_watching_{1}'.format('stat', btn)] = (
+ locals()['test_{0}_file_watcher_is_watching'.format('stat')])
+
+
+class TestFilesystemWatchers(TestCase):
+ def do_test_for_change(self, watcher, path):
+ st = monotonic()
+ while monotonic() - st < 1:
+ if watcher(path):
+ return
+ sleep(0.1)
+ self.fail('The change to {0} was not detected'.format(path))
+
+ def do_test_file_watcher(self, w, use_bytes=False):
+ try:
+ f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
+ ne = os.path.join(INOTIFY_DIR, 'notexists')
+ if use_bytes:
+ f1 = f1.encode('utf-8')
+ f2 = f2.encode('utf-8')
+ f3 = f3.encode('utf-8')
+ ne = ne.encode('utf-8')
+ with open(f1, 'wb'):
+ with open(f2, 'wb'):
+ with open(f3, 'wb'):
+ pass
+ self.assertRaises(OSError, w, ne)
+ self.assertTrue(w(f1))
+ self.assertTrue(w(f2))
+ os.utime(f1, None), os.utime(f2, None)
+ self.do_test_for_change(w, f1)
+ self.do_test_for_change(w, f2)
+ # Repeat once
+ os.utime(f1, None), os.utime(f2, None)
+ self.do_test_for_change(w, f1)
+ self.do_test_for_change(w, f2)
+ # Check that no false changes are reported
+ self.assertFalse(w(f1), 'Spurious change detected')
+ self.assertFalse(w(f2), 'Spurious change detected')
+ # Check that open the file with 'w' triggers a change
+ with open(f1, 'wb'):
+ with open(f2, 'wb'):
+ pass
+ self.do_test_for_change(w, f1)
+ self.do_test_for_change(w, f2)
+ # Check that writing to a file with 'a' triggers a change
+ with open(f1, 'ab') as f:
+ f.write(b'1')
+ self.do_test_for_change(w, f1)
+ # Check that deleting a file registers as a change
+ os.unlink(f1)
+ self.do_test_for_change(w, f1)
+ # Test that changing the inode of a file does not cause it to stop
+ # being watched
+ os.rename(f3, f2)
+ self.do_test_for_change(w, f2)
+ self.assertFalse(w(f2), 'Spurious change detected')
+ os.utime(f2, None)
+ self.do_test_for_change(w, f2)
+ finally:
+ clear_dir(INOTIFY_DIR)
+
+ def do_test_tree_watcher(self, tw, use_bytes=False):
+ try:
+ inotify_dir = INOTIFY_DIR
+ subdir = os.path.join(inotify_dir, 'subdir')
+ t1 = os.path.join(inotify_dir, 'tree1')
+ ts1 = os.path.join(subdir, 'tree1')
+ suffix = '1'
+ f = os.path.join(subdir, 'f')
+ if use_bytes:
+ inotify_dir = inotify_dir.encode('utf-8')
+ subdir = subdir.encode('utf-8')
+ t1 = t1.encode('utf-8')
+ ts1 = ts1.encode('utf-8')
+ suffix = suffix.encode('utf-8')
+ f = f.encode('utf-8')
+ os.mkdir(subdir)
+ try:
+ if tw.watch(inotify_dir).is_dummy:
+ raise SkipTest('No tree watcher available')
+ except UvNotFound:
+ raise SkipTest('Pyuv is not available')
+ except INotifyError:
+ raise SkipTest('INotify is not available')
+ self.assertTrue(tw(inotify_dir))
+ self.assertFalse(tw(inotify_dir))
+ changed = partial(self.do_test_for_change, tw, inotify_dir)
+ open(t1, 'w').close()
+ changed()
+ open(ts1, 'w').close()
+ changed()
+ os.unlink(ts1)
+ changed()
+ os.rmdir(subdir)
+ changed()
+ os.mkdir(subdir)
+ changed()
+ os.rename(subdir, subdir + suffix)
+ changed()
+ shutil.rmtree(subdir + suffix)
+ changed()
+ os.mkdir(subdir)
+ open(f, 'w').close()
+ changed()
+ with open(f, 'a') as s:
+ s.write(' ')
+ changed()
+ os.rename(f, f + suffix)
+ changed()
+ finally:
+ clear_dir(inotify_dir)
+
+ def do_test_file_watcher_is_watching(self, w, use_bytes=False):
+ try:
+ f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
+ ne = os.path.join(INOTIFY_DIR, 'notexists')
+ if use_bytes:
+ f1 = f1.encode('utf-8')
+ f2 = f2.encode('utf-8')
+ f3 = f3.encode('utf-8')
+ ne = ne.encode('utf-8')
+ with open(f1, 'wb'):
+ with open(f2, 'wb'):
+ with open(f3, 'wb'):
+ pass
+ self.assertRaises(OSError, w, ne)
+ try:
+ w(ne)
+ except OSError as e:
+ self.assertEqual(e.errno, ENOENT)
+ self.assertTrue(w(f1))
+ self.assertFalse(w.is_watching(ne))
+ self.assertTrue(w.is_watching(f1))
+ self.assertFalse(w.is_watching(f2))
+ finally:
+ clear_dir(INOTIFY_DIR)
+
+ set_watcher_tests(locals())
+
+
+old_cwd = None
+
+
+def setUpModule():
+ global old_cwd
+ old_cwd = os.getcwd()
+ os.chdir(os.path.dirname(os.path.dirname(__file__)))
+ os.mkdir(INOTIFY_DIR)
+
+
+def tearDownModule():
+ shutil.rmtree(INOTIFY_DIR)
+ os.chdir(old_cwd)
+
+
+if __name__ == '__main__':
+ from tests.modules import main
+ main()