Adding upstream version 2.8.4.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
This commit is contained in:
parent
09b97de4ea
commit
52d7687863
381 changed files with 40831 additions and 0 deletions
245
tests/test_python/test_watcher.py
Normal file
245
tests/test_python/test_watcher.py
Normal file
|
@ -0,0 +1,245 @@
|
|||
# vim:fileencoding=utf-8:noet
|
||||
from __future__ import (unicode_literals, division, absolute_import, print_function)
|
||||
|
||||
import shutil
|
||||
import os
|
||||
|
||||
from time import sleep
|
||||
from functools import partial
|
||||
from errno import ENOENT
|
||||
|
||||
from powerline.lib.watcher import create_file_watcher, create_tree_watcher, INotifyError
|
||||
from powerline.lib.watcher.uv import UvNotFound
|
||||
from powerline import get_fallback_logger
|
||||
from powerline.lib.monotonic import monotonic
|
||||
|
||||
from tests.modules import TestCase, SkipTest
|
||||
|
||||
|
||||
INOTIFY_DIR = 'inotify' + os.path.basename(os.environ.get('PYTHON', ''))
|
||||
|
||||
|
||||
def clear_dir(dir):
|
||||
for root, dirs, files in list(os.walk(dir, topdown=False)):
|
||||
for f in files:
|
||||
os.remove(os.path.join(root, f))
|
||||
for d in dirs:
|
||||
os.rmdir(os.path.join(root, d))
|
||||
|
||||
|
||||
def set_watcher_tests(l):
|
||||
byte_tests = (('bytes', True), ('unicode', False))
|
||||
|
||||
for btn, use_bytes in byte_tests:
|
||||
def test_inotify_file_watcher(self, use_bytes=use_bytes):
|
||||
try:
|
||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
|
||||
except INotifyError:
|
||||
raise SkipTest('This test is not suitable for a stat based file watcher')
|
||||
self.do_test_file_watcher(w, use_bytes)
|
||||
|
||||
def test_uv_file_watcher(self, use_bytes=use_bytes):
|
||||
raise SkipTest('Uv watcher tests are not stable')
|
||||
try:
|
||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
|
||||
except UvNotFound:
|
||||
raise SkipTest('Pyuv is not available')
|
||||
self.do_test_file_watcher(w, use_bytes)
|
||||
|
||||
def test_inotify_tree_watcher(self, use_bytes=use_bytes):
|
||||
try:
|
||||
tw = create_tree_watcher(get_fallback_logger(), watcher_type='inotify')
|
||||
except INotifyError:
|
||||
raise SkipTest('INotify is not available')
|
||||
self.do_test_tree_watcher(tw, use_bytes)
|
||||
|
||||
def test_uv_tree_watcher(self, use_bytes=use_bytes):
|
||||
raise SkipTest('Uv watcher tests are not stable')
|
||||
try:
|
||||
tw = create_tree_watcher(get_fallback_logger(), 'uv')
|
||||
except UvNotFound:
|
||||
raise SkipTest('Pyuv is not available')
|
||||
self.do_test_tree_watcher(tw, use_bytes)
|
||||
|
||||
def test_inotify_file_watcher_is_watching(self, use_bytes=use_bytes):
|
||||
try:
|
||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='inotify')
|
||||
except INotifyError:
|
||||
raise SkipTest('INotify is not available')
|
||||
self.do_test_file_watcher_is_watching(w, use_bytes)
|
||||
|
||||
def test_stat_file_watcher_is_watching(self, use_bytes=use_bytes):
|
||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='stat')
|
||||
self.do_test_file_watcher_is_watching(w, use_bytes)
|
||||
|
||||
def test_uv_file_watcher_is_watching(self, use_bytes=use_bytes):
|
||||
try:
|
||||
w = create_file_watcher(pl=get_fallback_logger(), watcher_type='uv')
|
||||
except UvNotFound:
|
||||
raise SkipTest('Pyuv is not available')
|
||||
self.do_test_file_watcher_is_watching(w, use_bytes)
|
||||
|
||||
for wt in ('uv', 'inotify'):
|
||||
l['test_{0}_file_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_file_watcher'.format(wt)]
|
||||
l['test_{0}_tree_watcher_{1}'.format(wt, btn)] = locals()['test_{0}_tree_watcher'.format(wt)]
|
||||
l['test_{0}_file_watcher_is_watching_{1}'.format(wt, btn)] = (
|
||||
locals()['test_{0}_file_watcher_is_watching'.format(wt)])
|
||||
l['test_{0}_file_watcher_is_watching_{1}'.format('stat', btn)] = (
|
||||
locals()['test_{0}_file_watcher_is_watching'.format('stat')])
|
||||
|
||||
|
||||
class TestFilesystemWatchers(TestCase):
|
||||
def do_test_for_change(self, watcher, path):
|
||||
st = monotonic()
|
||||
while monotonic() - st < 1:
|
||||
if watcher(path):
|
||||
return
|
||||
sleep(0.1)
|
||||
self.fail('The change to {0} was not detected'.format(path))
|
||||
|
||||
def do_test_file_watcher(self, w, use_bytes=False):
|
||||
try:
|
||||
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
||||
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
||||
if use_bytes:
|
||||
f1 = f1.encode('utf-8')
|
||||
f2 = f2.encode('utf-8')
|
||||
f3 = f3.encode('utf-8')
|
||||
ne = ne.encode('utf-8')
|
||||
with open(f1, 'wb'):
|
||||
with open(f2, 'wb'):
|
||||
with open(f3, 'wb'):
|
||||
pass
|
||||
self.assertRaises(OSError, w, ne)
|
||||
self.assertTrue(w(f1))
|
||||
self.assertTrue(w(f2))
|
||||
os.utime(f1, None), os.utime(f2, None)
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Repeat once
|
||||
os.utime(f1, None), os.utime(f2, None)
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Check that no false changes are reported
|
||||
self.assertFalse(w(f1), 'Spurious change detected')
|
||||
self.assertFalse(w(f2), 'Spurious change detected')
|
||||
# Check that open the file with 'w' triggers a change
|
||||
with open(f1, 'wb'):
|
||||
with open(f2, 'wb'):
|
||||
pass
|
||||
self.do_test_for_change(w, f1)
|
||||
self.do_test_for_change(w, f2)
|
||||
# Check that writing to a file with 'a' triggers a change
|
||||
with open(f1, 'ab') as f:
|
||||
f.write(b'1')
|
||||
self.do_test_for_change(w, f1)
|
||||
# Check that deleting a file registers as a change
|
||||
os.unlink(f1)
|
||||
self.do_test_for_change(w, f1)
|
||||
# Test that changing the inode of a file does not cause it to stop
|
||||
# being watched
|
||||
os.rename(f3, f2)
|
||||
self.do_test_for_change(w, f2)
|
||||
self.assertFalse(w(f2), 'Spurious change detected')
|
||||
os.utime(f2, None)
|
||||
self.do_test_for_change(w, f2)
|
||||
finally:
|
||||
clear_dir(INOTIFY_DIR)
|
||||
|
||||
def do_test_tree_watcher(self, tw, use_bytes=False):
|
||||
try:
|
||||
inotify_dir = INOTIFY_DIR
|
||||
subdir = os.path.join(inotify_dir, 'subdir')
|
||||
t1 = os.path.join(inotify_dir, 'tree1')
|
||||
ts1 = os.path.join(subdir, 'tree1')
|
||||
suffix = '1'
|
||||
f = os.path.join(subdir, 'f')
|
||||
if use_bytes:
|
||||
inotify_dir = inotify_dir.encode('utf-8')
|
||||
subdir = subdir.encode('utf-8')
|
||||
t1 = t1.encode('utf-8')
|
||||
ts1 = ts1.encode('utf-8')
|
||||
suffix = suffix.encode('utf-8')
|
||||
f = f.encode('utf-8')
|
||||
os.mkdir(subdir)
|
||||
try:
|
||||
if tw.watch(inotify_dir).is_dummy:
|
||||
raise SkipTest('No tree watcher available')
|
||||
except UvNotFound:
|
||||
raise SkipTest('Pyuv is not available')
|
||||
except INotifyError:
|
||||
raise SkipTest('INotify is not available')
|
||||
self.assertTrue(tw(inotify_dir))
|
||||
self.assertFalse(tw(inotify_dir))
|
||||
changed = partial(self.do_test_for_change, tw, inotify_dir)
|
||||
open(t1, 'w').close()
|
||||
changed()
|
||||
open(ts1, 'w').close()
|
||||
changed()
|
||||
os.unlink(ts1)
|
||||
changed()
|
||||
os.rmdir(subdir)
|
||||
changed()
|
||||
os.mkdir(subdir)
|
||||
changed()
|
||||
os.rename(subdir, subdir + suffix)
|
||||
changed()
|
||||
shutil.rmtree(subdir + suffix)
|
||||
changed()
|
||||
os.mkdir(subdir)
|
||||
open(f, 'w').close()
|
||||
changed()
|
||||
with open(f, 'a') as s:
|
||||
s.write(' ')
|
||||
changed()
|
||||
os.rename(f, f + suffix)
|
||||
changed()
|
||||
finally:
|
||||
clear_dir(inotify_dir)
|
||||
|
||||
def do_test_file_watcher_is_watching(self, w, use_bytes=False):
|
||||
try:
|
||||
f1, f2, f3 = map(lambda x: os.path.join(INOTIFY_DIR, 'file%d' % x), (1, 2, 3))
|
||||
ne = os.path.join(INOTIFY_DIR, 'notexists')
|
||||
if use_bytes:
|
||||
f1 = f1.encode('utf-8')
|
||||
f2 = f2.encode('utf-8')
|
||||
f3 = f3.encode('utf-8')
|
||||
ne = ne.encode('utf-8')
|
||||
with open(f1, 'wb'):
|
||||
with open(f2, 'wb'):
|
||||
with open(f3, 'wb'):
|
||||
pass
|
||||
self.assertRaises(OSError, w, ne)
|
||||
try:
|
||||
w(ne)
|
||||
except OSError as e:
|
||||
self.assertEqual(e.errno, ENOENT)
|
||||
self.assertTrue(w(f1))
|
||||
self.assertFalse(w.is_watching(ne))
|
||||
self.assertTrue(w.is_watching(f1))
|
||||
self.assertFalse(w.is_watching(f2))
|
||||
finally:
|
||||
clear_dir(INOTIFY_DIR)
|
||||
|
||||
set_watcher_tests(locals())
|
||||
|
||||
|
||||
old_cwd = None
|
||||
|
||||
|
||||
def setUpModule():
|
||||
global old_cwd
|
||||
old_cwd = os.getcwd()
|
||||
os.chdir(os.path.dirname(os.path.dirname(__file__)))
|
||||
os.mkdir(INOTIFY_DIR)
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
shutil.rmtree(INOTIFY_DIR)
|
||||
os.chdir(old_cwd)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from tests.modules import main
|
||||
main()
|
Loading…
Add table
Add a link
Reference in a new issue