summaryrefslogtreecommitdiffstats
path: root/port_for/store.py
blob: 685753c9990ba2a2b4fde3df2451a56030431ed5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""PortStore implementation."""
import os
from configparser import DEFAULTSECT, ConfigParser
from typing import List, Optional, Tuple, Union

from .api import select_random
from .exceptions import PortForException

DEFAULT_CONFIG_PATH = "/etc/port-for.conf"


class PortStore(object):
    """PortStore binds, reads and stores bound ports in config."""

    def __init__(self, config_filename: str = DEFAULT_CONFIG_PATH):
        """Initialize PortStore."""
        self._config = config_filename

    def bind_port(
        self, app: str, port: Optional[Union[int, str]] = None
    ) -> int:
        """Binds port to app in the config."""
        if "=" in app or ":" in app:
            raise Exception('invalid app name: "%s"' % app)

        requested_port: Optional[str] = None
        if port is not None:
            requested_port = str(port)

        parser = self._get_parser()

        # this app already use some port; return it
        if parser.has_option(DEFAULTSECT, app):
            actual_port = parser.get(DEFAULTSECT, app)
            if requested_port is not None and requested_port != actual_port:
                msg = (
                    "Can't bind to port %s: %s is already associated "
                    "with port %s" % (requested_port, app, actual_port)
                )
                raise PortForException(msg)
            return int(actual_port)

        # port is already used by an another app
        app_by_port = dict((v, k) for k, v in parser.items(DEFAULTSECT))
        bound_port_numbers = map(int, app_by_port.keys())

        if requested_port is None:
            requested_port = str(
                select_random(exclude_ports=bound_port_numbers)
            )

        if requested_port in app_by_port:
            binding_app = app_by_port[requested_port]
            if binding_app != app:
                raise PortForException(
                    "Port %s is already used by %s!"
                    % (requested_port, binding_app)
                )

        # new app & new port
        parser.set(DEFAULTSECT, app, requested_port)
        self._save(parser)

        return int(requested_port)

    def unbind_port(self, app: str) -> None:
        """Remove port assignement to application."""
        parser = self._get_parser()
        parser.remove_option(DEFAULTSECT, app)
        self._save(parser)

    def bound_ports(self) -> List[Tuple[str, int]]:
        """List all bound ports."""
        return [
            (app, int(port))
            for app, port in self._get_parser().items(DEFAULTSECT)
        ]

    def _ensure_config_exists(self) -> None:
        if not os.path.exists(self._config):
            with open(self._config, "wb"):
                pass

    def _get_parser(self) -> ConfigParser:
        self._ensure_config_exists()
        parser = ConfigParser()
        parser.read(self._config)
        return parser

    def _save(self, parser: ConfigParser) -> None:
        with open(self._config, "wt") as f:
            parser.write(f)