summaryrefslogtreecommitdiffstats
path: root/pendulum/tz/timezone.py
diff options
context:
space:
mode:
Diffstat (limited to 'pendulum/tz/timezone.py')
-rw-r--r--pendulum/tz/timezone.py377
1 files changed, 377 insertions, 0 deletions
diff --git a/pendulum/tz/timezone.py b/pendulum/tz/timezone.py
new file mode 100644
index 0000000..bc94d56
--- /dev/null
+++ b/pendulum/tz/timezone.py
@@ -0,0 +1,377 @@
+from datetime import datetime
+from datetime import timedelta
+from datetime import tzinfo
+from typing import Optional
+from typing import TypeVar
+from typing import overload
+
+import pendulum
+
+from pendulum.helpers import local_time
+from pendulum.helpers import timestamp
+from pendulum.utils._compat import _HAS_FOLD
+
+from .exceptions import AmbiguousTime
+from .exceptions import NonExistingTime
+from .zoneinfo import read
+from .zoneinfo import read_file
+from .zoneinfo.transition import Transition
+
+
+POST_TRANSITION = "post"
+PRE_TRANSITION = "pre"
+TRANSITION_ERROR = "error"
+
+_datetime = datetime
+_D = TypeVar("_D", bound=datetime)
+
+
+class Timezone(tzinfo):
+ """
+ Represents a named timezone.
+
+ The accepted names are those provided by the IANA time zone database.
+
+ >>> from pendulum.tz.timezone import Timezone
+ >>> tz = Timezone('Europe/Paris')
+ """
+
+ def __init__(self, name, extended=True): # type: (str, bool) -> None
+ tz = read(name, extend=extended)
+
+ self._name = name
+ self._transitions = tz.transitions
+ self._hint = {True: None, False: None}
+
+ @property
+ def name(self): # type: () -> str
+ return self._name
+
+ def convert(self, dt, dst_rule=None): # type: (_D, Optional[str]) -> _D
+ """
+ Converts a datetime in the current timezone.
+
+ If the datetime is naive, it will be normalized.
+
+ >>> from datetime import datetime
+ >>> from pendulum import timezone
+ >>> paris = timezone('Europe/Paris')
+ >>> dt = datetime(2013, 3, 31, 2, 30, fold=1)
+ >>> in_paris = paris.convert(dt)
+ >>> in_paris.isoformat()
+ '2013-03-31T03:30:00+02:00'
+
+ If the datetime is aware, it will be properly converted.
+
+ >>> new_york = timezone('America/New_York')
+ >>> in_new_york = new_york.convert(in_paris)
+ >>> in_new_york.isoformat()
+ '2013-03-30T21:30:00-04:00'
+ """
+ if dt.tzinfo is None:
+ return self._normalize(dt, dst_rule=dst_rule)
+
+ return self._convert(dt)
+
+ def datetime(
+ self, year, month, day, hour=0, minute=0, second=0, microsecond=0
+ ): # type: (int, int, int, int, int, int, int) -> _datetime
+ """
+ Return a normalized datetime for the current timezone.
+ """
+ if _HAS_FOLD:
+ return self.convert(
+ datetime(year, month, day, hour, minute, second, microsecond, fold=1)
+ )
+
+ return self.convert(
+ datetime(year, month, day, hour, minute, second, microsecond),
+ dst_rule=POST_TRANSITION,
+ )
+
+ def _normalize(self, dt, dst_rule=None): # type: (_D, Optional[str]) -> _D
+ sec = timestamp(dt)
+ fold = 0
+ transition = self._lookup_transition(sec)
+
+ if not _HAS_FOLD and dst_rule is None:
+ dst_rule = POST_TRANSITION
+
+ if dst_rule is None:
+ dst_rule = PRE_TRANSITION
+ if dt.fold == 1:
+ dst_rule = POST_TRANSITION
+
+ if sec < transition.local:
+ if transition.is_ambiguous(sec):
+ # Ambiguous time
+ if dst_rule == TRANSITION_ERROR:
+ raise AmbiguousTime(dt)
+
+ # We set the fold attribute for later
+ if dst_rule == POST_TRANSITION:
+ fold = 1
+ elif transition.previous is not None:
+ transition = transition.previous
+
+ if transition:
+ if transition.is_ambiguous(sec):
+ # Ambiguous time
+ if dst_rule == TRANSITION_ERROR:
+ raise AmbiguousTime(dt)
+
+ # We set the fold attribute for later
+ if dst_rule == POST_TRANSITION:
+ fold = 1
+ elif transition.is_missing(sec):
+ # Skipped time
+ if dst_rule == TRANSITION_ERROR:
+ raise NonExistingTime(dt)
+
+ # We adjust accordingly
+ if dst_rule == POST_TRANSITION:
+ sec += transition.fix
+ fold = 1
+ else:
+ sec -= transition.fix
+
+ kwargs = {"tzinfo": self}
+ if _HAS_FOLD or isinstance(dt, pendulum.DateTime):
+ kwargs["fold"] = fold
+
+ return dt.__class__(*local_time(sec, 0, dt.microsecond), **kwargs)
+
+ def _convert(self, dt): # type: (_D) -> _D
+ if dt.tzinfo is self:
+ return self._normalize(dt, dst_rule=POST_TRANSITION)
+
+ if not isinstance(dt.tzinfo, Timezone):
+ return dt.astimezone(self)
+
+ stamp = timestamp(dt)
+
+ if isinstance(dt.tzinfo, FixedTimezone):
+ offset = dt.tzinfo.offset
+ else:
+ transition = dt.tzinfo._lookup_transition(stamp)
+ offset = transition.ttype.offset
+
+ if stamp < transition.local and transition.previous is not None:
+ if (
+ transition.previous.is_ambiguous(stamp)
+ and getattr(dt, "fold", 1) == 0
+ ):
+ pass
+ else:
+ offset = transition.previous.ttype.offset
+
+ stamp -= offset
+
+ transition = self._lookup_transition(stamp, is_utc=True)
+ if stamp < transition.at and transition.previous is not None:
+ transition = transition.previous
+
+ offset = transition.ttype.offset
+ stamp += offset
+ fold = int(not transition.ttype.is_dst())
+
+ kwargs = {"tzinfo": self}
+
+ if _HAS_FOLD or isinstance(dt, pendulum.DateTime):
+ kwargs["fold"] = fold
+
+ return dt.__class__(*local_time(stamp, 0, dt.microsecond), **kwargs)
+
+ def _lookup_transition(
+ self, stamp, is_utc=False
+ ): # type: (int, bool) -> Transition
+ lo, hi = 0, len(self._transitions)
+ hint = self._hint[is_utc]
+ if hint:
+ if stamp == hint[0]:
+ return self._transitions[hint[1]]
+ elif stamp < hint[0]:
+ hi = hint[1]
+ else:
+ lo = hint[1]
+
+ if not is_utc:
+ while lo < hi:
+ mid = (lo + hi) // 2
+ if stamp < self._transitions[mid].to:
+ hi = mid
+ else:
+ lo = mid + 1
+ else:
+ while lo < hi:
+ mid = (lo + hi) // 2
+ if stamp < self._transitions[mid].at:
+ hi = mid
+ else:
+ lo = mid + 1
+
+ if lo >= len(self._transitions):
+ # Beyond last transition
+ lo = len(self._transitions) - 1
+
+ self._hint[is_utc] = (stamp, lo)
+
+ return self._transitions[lo]
+
+ @overload
+ def utcoffset(self, dt): # type: (None) -> None
+ pass
+
+ @overload
+ def utcoffset(self, dt): # type: (_datetime) -> timedelta
+ pass
+
+ def utcoffset(self, dt):
+ if dt is None:
+ return
+
+ transition = self._get_transition(dt)
+
+ return transition.utcoffset()
+
+ def dst(
+ self, dt # type: Optional[_datetime]
+ ): # type: (...) -> Optional[timedelta]
+ if dt is None:
+ return
+
+ transition = self._get_transition(dt)
+
+ if not transition.ttype.is_dst():
+ return timedelta()
+
+ return timedelta(seconds=transition.fix)
+
+ def tzname(self, dt): # type: (Optional[_datetime]) -> Optional[str]
+ if dt is None:
+ return
+
+ transition = self._get_transition(dt)
+
+ return transition.ttype.abbreviation
+
+ def _get_transition(self, dt): # type: (_datetime) -> Transition
+ if dt.tzinfo is not None and dt.tzinfo is not self:
+ dt = dt - dt.utcoffset()
+
+ stamp = timestamp(dt)
+
+ transition = self._lookup_transition(stamp, is_utc=True)
+ else:
+ stamp = timestamp(dt)
+
+ transition = self._lookup_transition(stamp)
+
+ if stamp < transition.local and transition.previous is not None:
+ fold = getattr(dt, "fold", 1)
+ if transition.is_ambiguous(stamp):
+ if fold == 0:
+ transition = transition.previous
+ elif transition.previous.is_ambiguous(stamp) and fold == 0:
+ pass
+ else:
+ transition = transition.previous
+
+ return transition
+
+ def fromutc(self, dt): # type: (_D) -> _D
+ stamp = timestamp(dt)
+
+ transition = self._lookup_transition(stamp, is_utc=True)
+ if stamp < transition.at and transition.previous is not None:
+ transition = transition.previous
+
+ stamp += transition.ttype.offset
+
+ return dt.__class__(*local_time(stamp, 0, dt.microsecond), tzinfo=self)
+
+ def __repr__(self): # type: () -> str
+ return "Timezone('{}')".format(self._name)
+
+ def __getinitargs__(self): # type: () -> tuple
+ return (self._name,)
+
+
+class FixedTimezone(Timezone):
+ def __init__(self, offset, name=None):
+ sign = "-" if offset < 0 else "+"
+
+ minutes = offset / 60
+ hour, minute = divmod(abs(int(minutes)), 60)
+
+ if not name:
+ name = "{0}{1:02d}:{2:02d}".format(sign, hour, minute)
+
+ self._name = name
+ self._offset = offset
+ self._utcoffset = timedelta(seconds=offset)
+
+ @property
+ def offset(self): # type: () -> int
+ return self._offset
+
+ def _normalize(self, dt, dst_rule=None): # type: (_D, Optional[str]) -> _D
+ if _HAS_FOLD:
+ dt = dt.__class__(
+ dt.year,
+ dt.month,
+ dt.day,
+ dt.hour,
+ dt.minute,
+ dt.second,
+ dt.microsecond,
+ tzinfo=self,
+ fold=0,
+ )
+ else:
+ dt = dt.__class__(
+ dt.year,
+ dt.month,
+ dt.day,
+ dt.hour,
+ dt.minute,
+ dt.second,
+ dt.microsecond,
+ tzinfo=self,
+ )
+
+ return dt
+
+ def _convert(self, dt): # type: (_D) -> _D
+ if dt.tzinfo is not self:
+ return dt.astimezone(self)
+
+ return dt
+
+ def utcoffset(self, dt): # type: (Optional[_datetime]) -> timedelta
+ return self._utcoffset
+
+ def dst(self, dt): # type: (Optional[_datetime]) -> timedelta
+ return timedelta()
+
+ def fromutc(self, dt): # type: (_D) -> _D
+ # Use the stdlib datetime's add method to avoid infinite recursion
+ return (datetime.__add__(dt, self._utcoffset)).replace(tzinfo=self)
+
+ def tzname(self, dt): # type: (Optional[_datetime]) -> Optional[str]
+ return self._name
+
+ def __getinitargs__(self): # type: () -> tuple
+ return self._offset, self._name
+
+
+class TimezoneFile(Timezone):
+ def __init__(self, path):
+ tz = read_file(path)
+
+ self._name = ""
+ self._transitions = tz.transitions
+ self._hint = {True: None, False: None}
+
+
+UTC = FixedTimezone(0, "UTC")