summaryrefslogtreecommitdiffstats
path: root/port_for/api.py
blob: c4e15e69b2a0bee1c3ef8de879c7367ab8e6020f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""main port-for functionality."""
import contextlib
import errno
import random
import socket
from itertools import chain
from typing import Iterable, List, Optional, Set, Tuple, Type, TypeVar, Union

from port_for import ephemeral, utils

from ._ranges import UNASSIGNED_RANGES
from .exceptions import PortForException

# Port range treated as belonging to the system. ``available_ports`` always
# adds it to the ranges it excludes, and uses its upper bound as the start of
# the range excluded below the caller-supplied ``low``.
SYSTEM_PORT_RANGE = (0, 1024)


def select_random(
    ports: Optional[Set[int]] = None,
    exclude_ports: Optional[Iterable[int]] = None,
) -> int:
    """Return random unused port number.

    :param ports: candidate ports to choose from; when ``None`` the result
        of :func:`available_good_ports` is used. The passed set is never
        modified.
    :param exclude_ports: ports that must not be selected.
    :returns: a port from the candidates for which :func:`port_is_used`
        returned ``False``.
    :raises PortForException: if none of the probed candidates is unused
        (at most 100 randomly sampled candidates are probed).
    """
    if ports is None:
        ports = available_good_ports()

    # Work on a private copy: calling ``difference_update`` on ``ports``
    # directly would silently shrink the set owned by the caller.
    candidates = set(ports)
    if exclude_ports is not None:
        candidates.difference_update(exclude_ports)

    # Probe a bounded random sample so a large candidate set does not
    # result in an unbounded number of socket checks.
    for port in random.sample(tuple(candidates), min(len(candidates), 100)):
        if not port_is_used(port):
            return port
    raise PortForException("Can't select a port")


def is_available(port: int) -> bool:
    """Tell whether ``port`` is good to choose.

    The port has to be a member of the set returned by
    :func:`available_ports` and must not be reported as used by
    :func:`port_is_used`.
    """
    if port not in available_ports():
        return False
    return not port_is_used(port)


def available_ports(
    low: int = 1024,
    high: int = 65535,
    exclude_ranges: Optional[List[Tuple[int, int]]] = None,
) -> Set[int]:
    """Build the set of ports that are candidates for selection.

    .. note::

        System, ephemeral and well-known ports are left out.

    The candidates start from ``UNASSIGNED_RANGES``. Removed from them are
    the ephemeral ranges, the caller's ``exclude_ranges``,
    ``SYSTEM_PORT_RANGE``, and the ranges ``(SYSTEM_PORT_RANGE[1], low)``
    and ``(high, 65536)``; how range endpoints are treated is decided by
    ``utils.ranges_to_set``.

    Use ``low`` and/or ``high`` to narrow the port range.
    """
    caller_ranges = [] if exclude_ranges is None else exclude_ranges
    candidates = utils.ranges_to_set(UNASSIGNED_RANGES)

    # Why ephemeral ranges are dropped: if an ephemeral local port were
    # "reserved" as a persistent port for some software, the OS would not
    # know about that reservation and could still hand the port out.
    # While the service is up and holding the port this is harmless (the OS
    # just picks the next one), but whenever the service is temporarily not
    # bound to it (a restart or any other reason) the OS may reuse the port
    # and thereby prevent the service from starting again.
    blocked_ranges = (
        ephemeral.port_ranges()
        + caller_ranges
        + [SYSTEM_PORT_RANGE, (SYSTEM_PORT_RANGE[1], low), (high, 65536)]
    )
    blocked = utils.ranges_to_set(blocked_ranges)
    return candidates.difference(blocked)


def good_port_ranges(
    ports: Optional[Set[int]] = None, min_range_len: int = 20, border: int = 3
) -> List[Tuple[int, int]]:
    """Return the 'good' port ranges, longest first.

    ``ports`` (by default the result of :func:`available_ports`) is turned
    into ranges with ``utils.to_ranges``. A range is kept only when its
    span ``high - low`` is at least ``min_range_len + 2 * border``, and
    every kept range is shrunk by ``border`` on both sides so the range
    borders themselves are excluded.
    """
    required_span = min_range_len + 2 * border
    source_ports = available_ports() if ports is None else ports

    # Descending by span; equal spans fall back to comparing the ranges
    # themselves, also in descending order.
    ordered = sorted(
        utils.to_ranges(list(source_ports)),
        key=lambda rng: (rng[1] - rng[0], rng),
        reverse=True,
    )
    return [
        (start + border, end - border)
        for start, end in ordered
        if end - start >= required_span
    ]


def available_good_ports(min_range_len: int = 20, border: int = 3) -> Set[int]:
    """Return the set of ports covered by the 'good' port ranges.

    ``min_range_len`` and ``border`` are forwarded unchanged to
    :func:`good_port_ranges`.
    """
    good_ranges = good_port_ranges(min_range_len=min_range_len, border=border)
    return utils.ranges_to_set(good_ranges)


def port_is_used(port: int, host: str = "127.0.0.1") -> bool:
    """Tell whether ``port`` on ``host`` is in use.

    A port counts as used when the current process cannot bind to it,
    or when a connection attempt to it is not refused.
    """
    # The connection probe is only attempted when binding succeeded.
    if not _can_bind(port, host):
        return True
    return not _refuses_connection(port, host)


def _can_bind(port: int, host: str) -> bool:
    """Check whether a fresh socket can be bound to ``(host, port)``.

    The probing socket is closed again before returning.
    """
    with socket.socket() as probe:
        try:
            probe.bind((host, port))
        except OSError:  # ``socket.error`` is an alias of ``OSError``
            return False
        else:
            return True


def _refuses_connection(port: int, host: str) -> bool:
    """Check whether connecting to ``(host, port)`` is refused.

    The attempt uses a one second socket timeout; only the
    ``ECONNREFUSED`` error code counts as a refusal.
    """
    with socket.socket() as probe:
        probe.settimeout(1)
        return probe.connect_ex((host, port)) == errno.ECONNREFUSED


T = TypeVar("T")


def filter_by_type(lst: Iterable, type_of: Type[T]) -> List[T]:
    """Collect, in their original order, the items that match a type.

    An item is kept when ``isinstance(item, type_of)`` is true.
    """
    return list(filter(lambda item: isinstance(item, type_of), lst))


# Every shape accepted for the ``ports`` argument of :func:`get_port`.
PortType = Union[
    str,
    int,
    Tuple[int, int],
    Set[int],
    List[str],
    List[int],
    List[Tuple[int, int]],
    List[Set[int]],
    List[Union[Set[int], Tuple[int, int]]],
    List[Union[str, int, Tuple[int, int], Set[int]]],
]


def get_port(
    ports: Optional[PortType],
    exclude_ports: Optional[Iterable[int]] = None,
) -> Optional[int]:
    """Return a random available port.

    If there's only one port passed (e.g. 5000 or '5000') function
    does not check if port is available.
    If there's -1 passed as an argument, function returns None.

    :param ports:
        exact port (e.g. '8000', 8000)
        randomly selected port (None) - any random available port
        [(2000,3000)] or (2000,3000) - random available port from a given range
        [{4002,4003}] or {4002,4003} - random of 4002 or 4003 ports
        [(2000,3000), {4002,4003}] - random of given range and set
    :param exclude_ports: A set of known ports that can not be selected.
    :returns: a random free port, the exact port that was passed,
        or ``None`` when ``ports`` is ``-1``
    :raises ValueError: if ``ports`` is a string that ``int()`` can not
        convert
    :raises PortForException: if a ``ValueError`` occurs while the port
        specification is being expanded
    """
    if ports == -1:
        return None
    elif not ports:
        # Any falsy value (None, 0, '', empty container) means "pick any".
        return select_random(None, exclude_ports)

    # A single exact port is returned as-is, without an availability check.
    # ``int()`` raises TypeError for tuples, sets and lists, which are
    # handled below.
    try:
        return int(ports)  # type: ignore[arg-type]
    except TypeError:
        pass

    ports_set: Set[int] = set()

    try:
        if not isinstance(ports, list):
            ports = [ports]
        # Tuples are expanded as ranges, sets are flattened and bare ints
        # are taken as they are; items of any other type are ignored.
        ranges: Set[int] = utils.ranges_to_set(
            filter_by_type(ports, tuple)  # type: ignore[arg-type]
        )
        nums: Set[int] = set(filter_by_type(ports, int))
        sets: Set[int] = set(
            chain(
                *filter_by_type(
                    ports, (set, frozenset)  # type: ignore[arg-type]
                )
            )
        )
        ports_set = ports_set.union(ranges, sets, nums)
    except ValueError as err:
        # Each fragment ends with a space so that the adjacent string
        # literals do not run together in the final message.
        raise PortForException(
            "Unknown format of ports: %s.\n"
            'You should provide a ports range "[(4000,5000)]" '
            'or "(4000,5000)" or a comma-separated ports set '
            '"[{4000,5000,6000}]" or list of ints "[400,5000,6000,8000]" '
            'or all of them "[(20000, 30000), {48889, 50121}, 4000, 4004]"'
            % (ports,)
        ) from err

    return select_random(ports_set, exclude_ports)