summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/dummy.session5
-rw-r--r--tests/dummyapp.desktop5
-rw-r--r--tests/gsdtestcase.py311
-rw-r--r--tests/output_checker.py196
-rw-r--r--tests/x11session.py120
5 files changed, 637 insertions, 0 deletions
diff --git a/tests/dummy.session b/tests/dummy.session
new file mode 100644
index 0000000..96a1aa8
--- /dev/null
+++ b/tests/dummy.session
@@ -0,0 +1,5 @@
+[GNOME Session]
+Name=dummy
+RequiredComponents=dummyapp
+DesktopName=dummy
+
diff --git a/tests/dummyapp.desktop b/tests/dummyapp.desktop
new file mode 100644
index 0000000..3bb53b2
--- /dev/null
+++ b/tests/dummyapp.desktop
@@ -0,0 +1,5 @@
+[Desktop Entry]
+Name=dummyapp
+Type=Application
+Exec=sleep 3600
+
diff --git a/tests/gsdtestcase.py b/tests/gsdtestcase.py
new file mode 100644
index 0000000..51f1825
--- /dev/null
+++ b/tests/gsdtestcase.py
@@ -0,0 +1,311 @@
+'''GNOME settings daemon test base class'''
+
+__author__ = 'Martin Pitt <martin.pitt@ubuntu.com>'
+__copyright__ = '(C) 2013 Canonical Ltd.'
+__license__ = 'GPL v2 or later'
+
+import subprocess
+import time
+import os
+import os.path
+import tempfile
+import fcntl
+import shutil
+import sys
+from glob import glob
+import signal
+
+from output_checker import OutputChecker
+
+from gi.repository import GLib
+
+try:
+ import dbusmock
+except ImportError:
+ sys.stderr.write('You need python-dbusmock (http://pypi.python.org/pypi/python-dbusmock) for this test suite.\n')
+ sys.exit(77)
+
+from x11session import X11SessionTestCase
+
+try:
+ from gi.repository import Gio
+except ImportError:
+ sys.stderr.write('You need pygobject and the Gio GIR for this test suite.\n')
+ sys.exit(77)
+
+if subprocess.call(['which', 'gnome-session'], stdout=subprocess.DEVNULL) != 0:
+ sys.stderr.write('You need gnome-session for this test suite.\n')
+ sys.exit(77)
+
+
+top_builddir = os.environ.get('TOP_BUILDDIR',
+ os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+def set_nonblock(fd):
+ '''Set a file object to non-blocking'''
+
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
+
+class GSDTestCase(X11SessionTestCase):
+ '''Base class for settings daemon tests
+
+ This redirects the XDG directories to temporary directories, and runs local
+ session and system D-BUSes with a minimal GNOME session and a mock
+ notification daemon. It also provides common functionality for plugin
+ tests.
+ '''
+ @classmethod
+ def setUpClass(klass):
+ os.environ['GIO_USE_VFS'] = 'local'
+ os.environ['GVFS_DISABLE_FUSE'] = '1'
+ # we do some string checks, disable translations
+ os.environ['LC_MESSAGES'] = 'C'
+ klass.workdir = tempfile.mkdtemp(prefix='gsd-plugin-test')
+ klass.addClassCleanup(shutil.rmtree, klass.workdir)
+
+ # Prevent applications from accessing an outside session manager
+ os.environ['SESSION_MANAGER'] = ''
+
+ # Signal to mutter and gnome-session that we are using X11
+ os.environ['XDG_SESSION_TYPE'] = 'x11'
+
+ # tell dconf and friends to use our config/runtime directories
+ os.environ['XDG_CONFIG_HOME'] = os.path.join(klass.workdir, 'config')
+ os.environ['XDG_DATA_HOME'] = os.path.join(klass.workdir, 'data')
+ os.environ['XDG_RUNTIME_DIR'] = os.path.join(klass.workdir, 'runtime')
+
+ # Make dconf discoverable (requires newer dbusmock API, is not needed otherwise)
+ if hasattr(klass, 'enable_service'):
+ klass.enable_service('ca.desrt.dconf')
+
+ # Copy gschema file into XDG_DATA_HOME
+ gschema_dir = os.path.join(os.environ['XDG_DATA_HOME'], 'glib-2.0', 'schemas')
+ os.makedirs(gschema_dir)
+ shutil.copy(os.path.join(top_builddir, 'data', 'gschemas.compiled'), gschema_dir)
+
+ # work around https://bugzilla.gnome.org/show_bug.cgi?id=689136
+ os.makedirs(os.path.join(os.environ['XDG_CONFIG_HOME'], 'dconf'))
+ os.makedirs(os.environ['XDG_RUNTIME_DIR'], mode=0o700)
+
+ # Starts Xvfb and dbus busses
+ X11SessionTestCase.setUpClass()
+
+ klass.system_bus_con = klass.get_dbus(True)
+ klass.session_bus_con = klass.get_dbus(False)
+ klass.addClassCleanup(klass.system_bus_con.close)
+ klass.addClassCleanup(klass.session_bus_con.close)
+
+ # we never want to cause notifications on the actual GUI
+ klass.p_notify_log = OutputChecker()
+ klass.p_notify = klass.spawn_server_template(
+ 'notification_daemon', {}, stdout=klass.p_notify_log.fd)[0]
+ klass.p_notify_log.writer_attached()
+ klass.addClassCleanup(klass.stop_process, klass.p_notify)
+
+ klass.configure_session()
+ klass.start_monitor()
+ klass.addClassCleanup(klass.stop_monitor)
+
+ # Reset between tests in tearDown
+ klass.settings_session = Gio.Settings(schema_id='org.gnome.desktop.session')
+
+ # Make sure we get a backtrace when meson kills after a timeout
+ def r(*args):
+ raise KeyboardInterrupt()
+ signal.signal(signal.SIGTERM, r)
+
+ def setUp(self):
+ self.daemon_death_expected = False
+
+ def tearDown(self):
+ # we check this at the end so that the other cleanup always happens
+ daemon_running = self.daemon.poll() == None
+ self.assertTrue(daemon_running or self.daemon_death_expected, 'daemon died during the test')
+
+ self.reset_settings(self.settings_session)
+
+ def run(self, result=None):
+ '''Show log files on failed tests
+
+ If the environment variable $SHELL_ON_FAIL is set, this runs bash in
+ the work directory; exit the shell to continue the tests. Otherwise it
+ shows all log files.
+ '''
+ if result:
+ orig_err_fail = len(result.errors) + len(result.failures)
+ super(GSDTestCase, self).run(result)
+ if result and len(result.errors) + len(result.failures) > orig_err_fail:
+ if 'SHELL_ON_FAIL' in os.environ:
+ subprocess.call(['bash', '-i'], cwd=self.workdir)
+ else:
+ for log_file in glob(os.path.join(self.workdir, '*.log')):
+ with open(log_file) as f:
+ print('\n----- %s -----\n%s\n------\n'
+ % (log_file, f.read()))
+
+ @classmethod
+ def configure_session(klass):
+ '''Configure minimal GNOME session'''
+
+ # create dummy session type and component
+ d = os.path.join(klass.workdir, 'config', 'gnome-session', 'sessions')
+ if not os.path.isdir(d):
+ os.makedirs(d)
+ shutil.copy(os.path.join(os.path.dirname(__file__), 'dummy.session'), d)
+
+ d = os.path.join(klass.workdir, 'data', 'applications')
+ if not os.path.isdir(d):
+ os.makedirs(d)
+ shutil.copy(os.path.join(os.path.dirname(__file__), 'dummyapp.desktop'), d)
+
+ def start_session(self):
+ self.session_log = OutputChecker()
+ self.session = subprocess.Popen(['gnome-session', '-f',
+ '-a', os.path.join(self.workdir, 'autostart'),
+ '--session=dummy', '--debug'],
+ stdout=self.session_log.fd,
+ stderr=subprocess.STDOUT)
+ self.session_log.writer_attached()
+
+ # wait until the daemon is on the bus
+ self.wait_for_bus_object('org.gnome.SessionManager',
+ '/org/gnome/SessionManager',
+ timeout=100)
+
+ self.session_log.check_line(b'fill: *** Done adding required components')
+
+ def stop_session(self):
+ '''Stop GNOME session'''
+
+ assert self.session
+ self.stop_process(self.session)
+ # dummyapp.desktop survives the session. This keeps the FD open in the
+ # CI environment when gnome-session fails to redirect the child output
+ # to journald.
+ # Though, gnome-session should probably kill the child anyway.
+ #self.session_log.assert_closed()
+ self.session_log.force_close()
+
+ @classmethod
+ def start_monitor(klass):
+ '''Start dbus-monitor'''
+
+ # You can rename the log file to *.log if you want to see it on test
+ # case failures
+ klass.monitor_log = open(os.path.join(klass.workdir, 'dbus-monitor.out'), 'wb', buffering=0)
+ klass.monitor = subprocess.Popen(['dbus-monitor', '--monitor'],
+ stdout=klass.monitor_log,
+ stderr=subprocess.STDOUT)
+
+ @classmethod
+ def stop_monitor(klass):
+ '''Stop dbus-monitor'''
+
+ assert klass.monitor
+ klass.stop_process(klass.monitor)
+
+ klass.monitor_log.flush()
+ klass.monitor_log.close()
+
+ def start_logind(self, parameters=None):
+ '''start mock logind'''
+
+ if parameters is None:
+ parameters = {}
+ self.logind_log = OutputChecker()
+ self.logind, self.logind_obj = self.spawn_server_template('logind',
+ parameters,
+ stdout=self.logind_log.fd)
+ self.logind_log.writer_attached()
+
+ # Monkey patch SuspendThenHibernate functions in for dbusmock <= 0.17.2
+ # This should be removed once we can depend on dbusmock 0.17.3
+ self.logind_obj.AddMethod('org.freedesktop.login1.Manager', 'SuspendThenHibernate', 'b', '', '')
+ self.logind_obj.AddMethod('org.freedesktop.login1.Manager', 'CanSuspendThenHibernate', '', 's', 'ret = "%s"' % parameters.get('CanSuspendThenHibernate', 'yes'))
+
+ self.logind_obj.AddMethod('org.freedesktop.login1.Session', 'SetBrightness', 'ssu', '', '')
+
+ def stop_logind(self):
+ '''stop mock logind'''
+
+ self.stop_process(self.logind)
+ self.logind_log.assert_closed()
+
+ @classmethod
+ def start_mutter(klass):
+ ''' start mutter '''
+
+ os.environ['MUTTER_DEBUG_RESET_IDLETIME']='1'
+ # See https://gitlab.gnome.org/GNOME/mutter/merge_requests/15
+ klass.mutter = subprocess.Popen(['mutter', '--x11'])
+ klass.wait_for_bus_object('org.gnome.Mutter.IdleMonitor',
+ '/org/gnome/Mutter/IdleMonitor/Core',
+ timeout=100)
+
+ @classmethod
+ def stop_mutter(klass):
+ '''stop mutter'''
+
+ assert klass.monitor
+ klass.stop_process(klass.mutter, timeout=2)
+
+ def start_plugin(self, env):
+ self.plugin_death_expected = False
+
+ # We need to redirect stdout to grab the debug messages.
+ # stderr is not needed by the testing infrastructure but is useful to
+ # see warnings and errors.
+ self.plugin_log = OutputChecker()
+ self.daemon = subprocess.Popen(
+ [os.path.join(top_builddir, 'plugins', self.gsd_plugin, 'gsd-' + self.gsd_plugin), '--verbose'],
+ stdout=self.plugin_log.fd,
+ stderr=subprocess.STDOUT,
+ env=env)
+ self.plugin_log.writer_attached()
+
+
+ bus = self.get_dbus(False)
+
+ timeout = 100
+ while timeout > 0:
+ if bus.name_has_owner('org.gnome.SettingsDaemon.' + self.gsd_plugin_case):
+ break
+
+ timeout -= 1
+ time.sleep(0.1)
+ if timeout <= 0:
+ assert timeout > 0, 'timed out waiting for plugin startup: %s' % (self.gsd_plugin_case)
+
+ def stop_plugin(self):
+ daemon_running = self.daemon.poll() == None
+ if daemon_running:
+ self.stop_process(self.daemon)
+ self.plugin_log.assert_closed()
+
+ def reset_settings(self, schema):
+ # reset all changed gsettings, so that tests are independent from each
+ # other
+ for k in schema.list_keys():
+ schema.reset(k)
+ Gio.Settings.sync()
+
+ @classmethod
+ def stop_process(cls, proc, timeout=1):
+ proc.terminate()
+ try:
+ proc.wait(timeout)
+ except:
+ print("Killing %d (%s) after timeout of %f seconds" % (proc.pid, proc.args[0], timeout))
+ proc.kill()
+ proc.wait()
+
+ @classmethod
+ def reset_idle_timer(klass):
+ '''trigger activity to reset idle timer'''
+
+ obj_mutter_idlemonitor = klass.session_bus_con.get_object(
+ 'org.gnome.Mutter.IdleMonitor', '/org/gnome/Mutter/IdleMonitor/Core')
+
+ obj_mutter_idlemonitor.ResetIdletime(dbus_interface='org.gnome.Mutter.IdleMonitor')
diff --git a/tests/output_checker.py b/tests/output_checker.py
new file mode 100644
index 0000000..df378cd
--- /dev/null
+++ b/tests/output_checker.py
@@ -0,0 +1,196 @@
+#! /usr/bin/env python3
+# Copyright © 2020, RedHat Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+# Authors:
+# Benjamin Berg <bberg@redhat.com>
+
+import os
+import sys
+import fcntl
+import io
+import re
+import time
+import threading
+import select
+import errno
+
+class OutputChecker(object):
+
+ def __init__(self, out=sys.stdout):
+ self._output = out
+ self._pipe_fd_r, self._pipe_fd_w = os.pipe()
+ self._eof = False
+ self._partial_buf = b''
+ self._lines_sem = threading.Semaphore()
+ self._lines = []
+ self._reader_io = io.StringIO()
+
+ # Just to be sure, shouldn't be a problem even if we didn't set it
+ fcntl.fcntl(self._pipe_fd_r, fcntl.F_SETFL,
+ fcntl.fcntl(self._pipe_fd_r, fcntl.F_GETFL) | os.O_CLOEXEC | os.O_NONBLOCK)
+ fcntl.fcntl(self._pipe_fd_w, fcntl.F_SETFL,
+ fcntl.fcntl(self._pipe_fd_w, fcntl.F_GETFL) | os.O_CLOEXEC)
+
+ # Start copier thread
+ self._thread = threading.Thread(target=self._copy, daemon=True)
+ self._thread.start()
+
+ def _copy(self):
+ p = select.poll()
+ p.register(self._pipe_fd_r)
+ while True:
+ try:
+ # Be lazy and wake up occasionally in case _pipe_fd_r became invalid
+ # The reason to do this is because os.read() will *not* return if the
+ # FD is forcefully closed.
+ p.poll(0.1)
+
+ r = os.read(self._pipe_fd_r, 1024)
+ if not r:
+ self._eof = True
+ self._lines_sem.release()
+ return
+ except OSError as e:
+ if e.errno == errno.EWOULDBLOCK:
+ continue
+
+ # We get a bad file descriptor error when the outside closes the FD
+ self._lines_sem.release()
+ return
+
+ l = r.split(b'\n')
+ l[0] = self._partial_buf + l[0]
+ self._lines.extend(l[:-1])
+ self._partial_buf = l[-1]
+
+ self._lines_sem.release()
+
+ os.write(self._output.fileno(), r)
+
+ def check_line_re(self, needle_re, timeout=0, failmsg=None):
+ deadline = time.time() + timeout
+
+ if isinstance(needle_re, str):
+ needle_re = needle_re.encode('ascii')
+
+ r = re.compile(needle_re)
+ ret = []
+
+ while True:
+ try:
+ l = self._lines.pop(0)
+ except IndexError:
+ # EOF, throw error
+ if self._eof:
+ if failmsg:
+ raise AssertionError("No further messages: " % failmsg)
+ else:
+ raise AssertionError('No client waiting for needle %s' % (str(needle_re)))
+
+ # Check if should wake up
+ if not self._lines_sem.acquire(timeout = deadline - time.time()):
+ if failmsg:
+ raise AssertionError(failmsg)
+ else:
+ raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))
+ continue
+
+ ret.append(l)
+ if r.search(l):
+ return ret
+
+ def check_line(self, needle, timeout=0, failmsg=None):
+ if isinstance(needle, str):
+ needle = needle.encode('ascii')
+
+ needle_re = re.escape(needle)
+
+ return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)
+
+ def check_no_line_re(self, needle_re, wait=0, failmsg=None):
+ deadline = time.time() + wait
+
+ if isinstance(needle_re, str):
+ needle_re = needle_re.encode('ascii')
+
+ r = re.compile(needle_re)
+ ret = []
+
+ while True:
+ try:
+ l = self._lines.pop(0)
+ except IndexError:
+ # EOF, so everything good
+ if self._pipe_fd_r == -1:
+ break
+
+ # Check if should wake up
+ if not self._lines_sem.acquire(timeout = deadline - time.time()):
+ # Timed out, so everything is good
+ break
+ continue
+
+ ret.append(l)
+ if r.search(l):
+ if failmsg:
+ raise AssertionError(failmsg)
+ else:
+ raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))
+
+ return ret
+
+ def check_no_line(self, needle, wait=0, failmsg=None):
+ if isinstance(needle, str):
+ needle = needle.encode('ascii')
+
+ needle_re = re.escape(needle)
+
+ return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)
+
+ def clear(self):
+ ret = self._lines
+ self._lines = []
+ return ret
+
+ def assert_closed(self, timeout=1):
+ self._thread.join(timeout)
+
+ if self._thread.is_alive() != False:
+ raise AssertionError("OutputCheck: Write side has not been closed yet!")
+
+ def force_close(self):
+
+ fd = self._pipe_fd_r
+ self._pipe_fd_r = -1
+ if fd >= 0:
+ os.close(fd)
+
+ self._thread.join()
+
+ @property
+ def fd(self):
+ return self._pipe_fd_w
+
+ def writer_attached(self):
+ os.close(self._pipe_fd_w)
+ self._pipe_fd_w = -1
+
+ def __del__(self):
+ if self._pipe_fd_r >= 0:
+ os.close(self._pipe_fd_r)
+ self._pipe_fd_r = -1
+ if self._pipe_fd_w >= 0:
+ os.close(self._pipe_fd_w)
+ self._pipe_fd_w = -1
diff --git a/tests/x11session.py b/tests/x11session.py
new file mode 100644
index 0000000..5678073
--- /dev/null
+++ b/tests/x11session.py
@@ -0,0 +1,120 @@
+import os
+import sys
+import subprocess
+from dbusmock import DBusTestCase
+
+# This file has been submitted for inclusion in python-dbusmock
+# https://github.com/martinpitt/python-dbusmock/pull/44
+
+if sys.version_info[0] < 3:
+ PIPE_DEVNULL = open(os.devnull, 'wb')
+
+ def Popen(*args, **kwargs):
+ if 'pass_fds' in kwargs:
+ pass_fds = kwargs['pass_fds']
+ del kwargs['pass_fds']
+ else:
+ pass_fds = []
+
+ orig_preexec_fn = kwargs.get('preexec_fn', None)
+
+ def _setup():
+ for fd in range(3, subprocess.MAXFD):
+ if fd in pass_fds:
+ continue
+
+ try:
+ os.close(fd)
+ except OSError:
+ pass
+
+ if orig_preexec_fn:
+ orig_preexec_fn()
+
+ # Don't let subprocess close FDs for us
+ kwargs['close_fds'] = False
+ kwargs['preexec_fn'] = _setup
+
+ return subprocess.Popen(*args, **kwargs)
+
+else:
+ PIPE_DEVNULL = subprocess.DEVNULL
+ Popen = subprocess.Popen
+
+
+class X11SessionTestCase(DBusTestCase):
+ #: The display the X server is running on
+ X_display = -1
+ #: The X server to start
+ Xserver = 'Xvfb'
+ #: Further parameters for the X server
+ Xserver_args = ['-screen', '0', '1280x1024x24', '+extension', 'GLX']
+ #: Where to redirect the X stdout and stderr to. Set to None for debugging
+ #: purposes if the X server is failing for some reason.
+ Xserver_output = PIPE_DEVNULL
+
+ @classmethod
+ def setUpClass(klass):
+ klass.start_xorg()
+ klass.addClassCleanup(klass.stop_xorg)
+
+ klass.start_system_bus()
+ klass.start_session_bus()
+
+ @classmethod
+ def start_xorg(klass):
+ r, w = os.pipe()
+
+ klass.xorg = Popen([klass.Xserver, '-displayfd', "%d" % w, '-noreset'] + klass.Xserver_args,
+ pass_fds=(w,),
+ stdout=klass.Xserver_output,
+ stderr=subprocess.STDOUT)
+ os.close(w)
+
+ # The X server will write "%d\n", we need to make sure to read the "\n".
+ # If the server dies we get zero bytes reads as EOF is reached.
+ display = b''
+ while True:
+ tmp = os.read(r, 1024)
+ display += tmp
+
+ # Break out if the read was empty or the line feed was read
+ if not tmp or tmp[-1] == b'\n':
+ break
+
+ os.close(r)
+
+ try:
+ display = int(display.strip())
+ except ValueError:
+ # This should never happen, the X server didn't return a proper integer.
+ # Most likely it died for some odd reason.
+ # Note: Set Xserver_output to None to debug Xvfb startup issues.
+ klass.stop_xorg()
+ raise AssertionError('X server reported back no or an invalid display number (%s)' % (display))
+
+ klass.X_display = display
+ # Export information into our environment for tests to use
+ os.environ['DISPLAY'] = ":%d" % klass.X_display
+ if 'GNOME_SETUP_DISPLAY' in os.environ:
+ del os.environ['GNOME_SETUP_DISPLAY']
+ if 'WAYLAND_DISPLAY' in os.environ:
+ del os.environ['WAYLAND_DISPLAY']
+
+ # Server should still be up and running at this point
+ assert klass.xorg.poll() is None
+
+ return klass.X_display
+
+ @classmethod
+ def stop_xorg(klass):
+ if hasattr(klass, 'xorg'):
+ klass.X_display = -1
+ klass.xorg.terminate()
+ try:
+ klass.xorg.wait(1)
+ except:
+ klass.xorg.kill()
+ klass.xorg.wait()
+ del klass.xorg
+