summaryrefslogtreecommitdiffstats
path: root/test/test-avahi.py
blob: 3529104c06c8bf33d3958f98536f1e874ed03899 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/python3
import shutil
import logging
import unittest
from staslib import avahi
import dasbus.connection
import subprocess

SYSTEMCTL = shutil.which('systemctl')


class Test(unittest.TestCase):
    '''Unit tests for class Avahi'''

    def test_new(self):
        sysbus = dasbus.connection.SystemMessageBus()
        srv = avahi.Avahi(sysbus, lambda: "ok")
        self.assertEqual(srv.info(), {'avahi wake up timer': '60.0s [off]', 'service types': [], 'services': {}})
        self.assertEqual(srv.get_controllers(), [])

        try:
            # Check that the Avahi daemon is running
            subprocess.run([SYSTEMCTL, 'is-active', 'avahi-daemon.service'], check=True)
            self.assertFalse(srv._on_kick_avahi())
        except subprocess.CalledProcessError:
            self.assertTrue(srv._on_kick_avahi())

        with self.assertLogs(logger=logging.getLogger(), level='INFO') as captured:
            srv._avahi_available(None)
        self.assertEqual(len(captured.records), 1)
        self.assertEqual(captured.records[0].getMessage(), "avahi-daemon service available, zeroconf supported.")
        with self.assertLogs(logger=logging.getLogger(), level='WARN') as captured:
            srv._avahi_unavailable(None)
        self.assertEqual(len(captured.records), 1)
        self.assertEqual(captured.records[0].getMessage(), "avahi-daemon not available, zeroconf not supported.")
        srv.kill()
        self.assertEqual(srv.info(), {'avahi wake up timer': 'None', 'service types': [], 'services': {}})


if __name__ == '__main__':
    unittest.main()