summaryrefslogtreecommitdiffstats
path: root/pendulum/testing/traveller.py
blob: 3c1d885fb1ff730296ba4133e96e7d9c7a934977 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import cast

from pendulum.datetime import DateTime
from pendulum.utils._compat import PYPY

if TYPE_CHECKING:
    from types import TracebackType


class BaseTraveller:
    """Common interface for time travellers.

    Every travelling operation raises ``NotImplementedError``; concrete
    subclasses override them. The signatures mirror the ones used by the
    ``time_machine``-backed ``Traveller`` (including the keyword-only
    ``freeze`` flag) so that calling an unimplemented operation always
    fails with ``NotImplementedError`` rather than with a ``TypeError``
    about an unexpected keyword argument.
    """

    def __init__(self, datetime_class: type[DateTime] = DateTime) -> None:
        # Class used by subclasses to obtain the current time via ``.now()``.
        self._datetime_class: type[DateTime] = datetime_class

    def freeze(self) -> BaseTraveller:
        """Freeze time. Not implemented in the base class."""
        raise NotImplementedError()

    def travel_back(self) -> BaseTraveller:
        """Return to the present. Not implemented in the base class."""
        raise NotImplementedError()

    def travel(
        self,
        years: int = 0,
        months: int = 0,
        weeks: int = 0,
        days: int = 0,
        hours: int = 0,
        minutes: int = 0,
        seconds: int = 0,
        microseconds: int = 0,
        *,
        freeze: bool = False,
    ) -> BaseTraveller:
        """Travel relatively to now. Not implemented in the base class."""
        raise NotImplementedError()

    def travel_to(self, dt: DateTime, *, freeze: bool = False) -> BaseTraveller:
        """Travel to ``dt``. Not implemented in the base class."""
        raise NotImplementedError()

    def __enter__(self) -> BaseTraveller:
        """Enter a ``with`` block; the base traveller has nothing to start."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Leave a ``with`` block; nothing to undo, exceptions propagate."""
        return None


if not PYPY:
    import time_machine

    class Traveller(BaseTraveller):
        """Time traveller implemented on top of ``time_machine``.

        The traveller lazily starts a ``time_machine.travel`` patcher the
        first time it is asked to move or freeze, and stops it again in
        ``travel_back`` (also called when leaving a ``with`` block).
        """

        def __init__(self, datetime_class: type[DateTime] = DateTime) -> None:
            super().__init__(datetime_class)

            # True while a patcher started by this traveller is active.
            self._started: bool = False
            # The active ``time_machine.travel`` patcher, None when stopped.
            self._traveller: time_machine.travel | None = None
            # Value returned by ``travel.start()``; used to move in time.
            self._coordinates: time_machine.Coordinates | None = None

        def freeze(self) -> Traveller:
            """Freeze time at the current instant and return ``self``."""
            if self._started:
                cast(time_machine.Coordinates, self._coordinates).move_to(
                    self._datetime_class.now(), tick=False
                )
            else:
                self._start(freeze=True)

            return self

        def travel_back(self) -> Traveller:
            """Stop the patcher (if any) and reset the traveller's state."""
            if not self._started:
                return self

            cast(time_machine.travel, self._traveller).stop()
            self._coordinates = None
            self._traveller = None
            self._started = False

            return self

        def travel(
            self,
            years: int = 0,
            months: int = 0,
            weeks: int = 0,
            days: int = 0,
            hours: int = 0,
            minutes: int = 0,
            seconds: int = 0,
            microseconds: int = 0,
            *,
            freeze: bool = False,
        ) -> Traveller:
            """Move by the given offsets relative to the current time.

            When ``freeze`` is true the move is made with ``tick=False``,
            including when the traveller had already been started.
            """
            self._start(freeze=freeze)

            destination = self._datetime_class.now().add(
                years=years,
                months=months,
                weeks=weeks,
                days=days,
                hours=hours,
                minutes=minutes,
                seconds=seconds,
                microseconds=microseconds,
            )
            self._move_to(destination, freeze=freeze)

            return self

        def travel_to(self, dt: DateTime, *, freeze: bool = False) -> Traveller:
            """Move to ``dt``; with ``freeze`` the move uses ``tick=False``."""
            self._start(freeze=freeze)

            self._move_to(dt, freeze=freeze)

            return self

        def _move_to(self, destination: DateTime, *, freeze: bool) -> None:
            """Move the active coordinates to ``destination``.

            ``_start`` is a no-op on an already-started traveller, so a
            ``freeze=True`` request would otherwise be silently dropped;
            it is forwarded here as ``tick=False``. Without ``freeze`` the
            call is made exactly as before, leaving the tick setting alone.
            """
            coordinates = cast(time_machine.Coordinates, self._coordinates)
            if freeze:
                coordinates.move_to(destination, tick=False)
            else:
                coordinates.move_to(destination)

        def _start(self, freeze: bool = False) -> None:
            """Start the patcher at the current time unless already started."""
            if self._started:
                return

            if not self._traveller:
                self._traveller = time_machine.travel(
                    self._datetime_class.now(), tick=not freeze
                )

            self._coordinates = self._traveller.start()

            self._started = True

        def __enter__(self) -> Traveller:
            """Start the traveller (if needed) and return it."""
            self._start()

            return self

        def __exit__(
            self,
            exc_type: type[BaseException] | None,
            exc_val: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            """Travel back on leaving the block; exceptions propagate."""
            self.travel_back()

else:

    class Traveller(BaseTraveller):  # type: ignore[no-redef]
        """Fallback used when ``PYPY`` is set: adds nothing to the base."""