diff options
Diffstat (limited to 'pendulum/tz/zoneinfo')
-rw-r--r-- | pendulum/tz/zoneinfo/__init__.py | 16 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/exceptions.py | 18 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/posix_timezone.py | 270 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/reader.py | 224 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/timezone.py | 128 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/transition.py | 77 | ||||
-rw-r--r-- | pendulum/tz/zoneinfo/transition_type.py | 35 |
7 files changed, 768 insertions, 0 deletions
class ZoneinfoError(Exception):
    """
    Base class of the errors defined by the zoneinfo package.
    """


class InvalidZoneinfoFile(ZoneinfoError):
    """
    A zoneinfo file is missing or cannot be parsed.
    """


class InvalidTimezone(ZoneinfoError):
    """
    A timezone name could not be resolved.

    :param name: The offending timezone name, kept as ``self.name``.
    """

    def __init__(self, name):
        # Keep the raw value: __reduce__ needs it to rebuild the error.
        self.name = name

        super(InvalidTimezone, self).__init__('Invalid timezone "{}"'.format(name))

    def __reduce__(self):
        # The default reduction replays ``self.args`` (the already formatted
        # message) into __init__, so a copied or unpickled error would have
        # its message wrapped a second time.
        return self.__class__, (self.name,)


class InvalidPosixSpec(ZoneinfoError):
    """
    A POSIX TZ specification could not be parsed.

    :param spec: The offending specification, kept as ``self.spec``.
    """

    def __init__(self, spec):
        # Keep the raw value: __reduce__ needs it to rebuild the error.
        self.spec = spec

        super(InvalidPosixSpec, self).__init__("Invalid POSIX spec: {}".format(spec))

    def __reduce__(self):
        # Same reason as InvalidTimezone.__reduce__: rebuild from the raw
        # spec rather than from the formatted message.
        return self.__class__, (self.spec,)
+""" +import re + +from typing import Optional + +from pendulum.constants import MONTHS_OFFSETS +from pendulum.constants import SECS_PER_DAY + +from .exceptions import InvalidPosixSpec + + +_spec = re.compile( + "^" + r"(?P<std_abbr><.*?>|[^-+,\d]{3,})" + r"(?P<std_offset>([+-])?(\d{1,2})(:\d{2}(:\d{2})?)?)" + r"(?P<dst_info>" + r" (?P<dst_abbr><.*?>|[^-+,\d]{3,})" + r" (?P<dst_offset>([+-])?(\d{1,2})(:\d{2}(:\d{2})?)?)?" + r")?" + r"(?:,(?P<rules>" + r" (?P<dst_start>" + r" (?:J\d+|\d+|M\d{1,2}.\d.[0-6])" + r" (?:/(?P<dst_start_offset>([+-])?(\d+)(:\d{2}(:\d{2})?)?))?" + " )" + " ," + r" (?P<dst_end>" + r" (?:J\d+|\d+|M\d{1,2}.\d.[0-6])" + r" (?:/(?P<dst_end_offset>([+-])?(\d+)(:\d{2}(:\d{2})?)?))?" + " )" + "))?" + "$", + re.VERBOSE, +) + + +def posix_spec(spec): # type: (str) -> PosixTimezone + try: + return _posix_spec(spec) + except ValueError: + raise InvalidPosixSpec(spec) + + +def _posix_spec(spec): # type: (str) -> PosixTimezone + m = _spec.match(spec) + if not m: + raise ValueError("Invalid posix spec") + + std_abbr = _parse_abbr(m.group("std_abbr")) + std_offset = _parse_offset(m.group("std_offset")) + + dst_abbr = None + dst_offset = None + if m.group("dst_info"): + dst_abbr = _parse_abbr(m.group("dst_abbr")) + if m.group("dst_offset"): + dst_offset = _parse_offset(m.group("dst_offset")) + else: + dst_offset = std_offset + 3600 + + dst_start = None + dst_end = None + if m.group("rules"): + dst_start = _parse_rule(m.group("dst_start")) + dst_end = _parse_rule(m.group("dst_end")) + + return PosixTimezone(std_abbr, std_offset, dst_abbr, dst_offset, dst_start, dst_end) + + +def _parse_abbr(text): # type: (str) -> str + return text.lstrip("<").rstrip(">") + + +def _parse_offset(text, sign=-1): # type: (str, int) -> int + if text.startswith(("+", "-")): + if text.startswith("-"): + sign *= -1 + + text = text[1:] + + minutes = 0 + seconds = 0 + + parts = text.split(":") + hours = int(parts[0]) + + if len(parts) > 1: + minutes = int(parts[1]) + + if len(parts) 
> 2: + seconds = int(parts[2]) + + return sign * ((((hours * 60) + minutes) * 60) + seconds) + + +def _parse_rule(rule): # type: (str) -> PosixTransition + klass = NPosixTransition + args = () + + if rule.startswith("M"): + rule = rule[1:] + parts = rule.split(".") + month = int(parts[0]) + week = int(parts[1]) + day = int(parts[2].split("/")[0]) + + args += (month, week, day) + klass = MPosixTransition + elif rule.startswith("J"): + rule = rule[1:] + args += (int(rule.split("/")[0]),) + klass = JPosixTransition + else: + args += (int(rule.split("/")[0]),) + + # Checking offset + parts = rule.split("/") + if len(parts) > 1: + offset = _parse_offset(parts[-1], sign=1) + else: + offset = 7200 + + args += (offset,) + + return klass(*args) + + +class PosixTransition(object): + def __init__(self, offset): # type: (int) -> None + self._offset = offset + + @property + def offset(self): # type: () -> int + return self._offset + + def trans_offset(self, is_leap, jan1_weekday): # type: (bool, int) -> int + raise NotImplementedError() + + +class JPosixTransition(PosixTransition): + def __init__(self, day, offset): # type: (int, int) -> None + self._day = day + + super(JPosixTransition, self).__init__(offset) + + @property + def day(self): # type: () -> int + """ + day of non-leap year [1:365] + """ + return self._day + + def trans_offset(self, is_leap, jan1_weekday): # type: (bool, int) -> int + days = self._day + if not is_leap or days < MONTHS_OFFSETS[1][3]: + days -= 1 + + return (days * SECS_PER_DAY) + self._offset + + +class NPosixTransition(PosixTransition): + def __init__(self, day, offset): # type: (int, int) -> None + self._day = day + + super(NPosixTransition, self).__init__(offset) + + @property + def day(self): # type: () -> int + """ + day of year [0:365] + """ + return self._day + + def trans_offset(self, is_leap, jan1_weekday): # type: (bool, int) -> int + days = self._day + + return (days * SECS_PER_DAY) + self._offset + + +class 
class PosixTimezone(object):
    """
    The entirety of a POSIX-string specified time-zone rule.

    The standard abbreviation and offset are always given. The DST
    abbreviation, DST offset and the two transition rules are ``None``
    when the specification does not provide them.
    """

    # Declared as a new-style class (like PosixTransition) so that the
    # read-only properties below behave the same on Python 2 and Python 3.

    def __init__(
        self,
        std_abbr,  # type: str
        std_offset,  # type: int
        dst_abbr,  # type: Optional[str]
        dst_offset,  # type: Optional[int]
        dst_start=None,  # type: Optional[PosixTransition]
        dst_end=None,  # type: Optional[PosixTransition]
    ):
        self._std_abbr = std_abbr
        self._std_offset = std_offset
        self._dst_abbr = dst_abbr
        self._dst_offset = dst_offset
        self._dst_start = dst_start
        self._dst_end = dst_end

    @property
    def std_abbr(self):  # type: () -> str
        """
        Abbreviation of the standard time.
        """
        return self._std_abbr

    @property
    def std_offset(self):  # type: () -> int
        """
        Offset of the standard time, in seconds.
        """
        return self._std_offset

    @property
    def dst_abbr(self):  # type: () -> Optional[str]
        """
        Abbreviation of the daylight saving time, if any.
        """
        return self._dst_abbr

    @property
    def dst_offset(self):  # type: () -> Optional[int]
        """
        Offset of the daylight saving time, in seconds, if any.
        """
        return self._dst_offset

    @property
    def dst_start(self):  # type: () -> Optional[PosixTransition]
        """
        Rule describing when the daylight saving time starts, if any.
        """
        return self._dst_start

    @property
    def dst_end(self):  # type: () -> Optional[PosixTransition]
        """
        Rule describing when the daylight saving time ends, if any.
        """
        return self._dst_end

    def __repr__(self):  # type: () -> str
        return "PosixTimezone({!r}, {!r}, {!r}, {!r}, {!r}, {!r})".format(
            self._std_abbr,
            self._std_offset,
            self._dst_abbr,
            self._dst_offset,
            self._dst_start,
            self._dst_end,
        )
def _parse(self, fd):  # type: (...) -> Timezone
    """
    Parse a zoneinfo file.

    Version 2 and 3 files are read from their second (64-bit) data block
    and their trailing POSIX rule is parsed. Other files are read from
    their only (32-bit) data block and have no POSIX rule.

    :param fd: A binary file object positioned at the start of the file.

    :raises InvalidZoneinfoFile: If a header is invalid, if the file is
        shorter than announced or if it defines no local time type.
    """
    hdr = self._parse_header(fd)

    if hdr.version in (2, 3):
        # We're skipping the entire v1 data block since
        # at least the same data will be found in the v2 one.
        fd.seek(
            hdr.transitions * 5  # 4-byte time + 1-byte type index
            + hdr.types * 6
            + hdr.abbr_size
            + hdr.leaps * 8  # v1 leap record: 4-byte time + 4-byte correction
            + hdr.stdwalls
            + hdr.utclocals,
            1,
        )

        # Parse the second header
        hdr = self._parse_header(fd)

        if hdr.version not in (2, 3):
            raise InvalidZoneinfoFile(
                "Header versions mismatch for file {}".format(fd.name)
            )

        # Parse the v2 data
        trans = self._parse_trans_64(fd, hdr.transitions)
        type_idx = self._parse_type_idx(fd, hdr.transitions)
        types = self._parse_types(fd, hdr.types)
        abbrs = self._parse_abbrs(fd, hdr.abbr_size, types)

        # v2 leap record: 8-byte time + 4-byte correction
        fd.seek(hdr.leaps * 12 + hdr.stdwalls + hdr.utclocals, 1)

        trule = self._parse_posix_tz(fd)
    else:
        # TZFile v1
        trans = self._parse_trans_32(fd, hdr.transitions)
        type_idx = self._parse_type_idx(fd, hdr.transitions)
        types = self._parse_types(fd, hdr.types)
        abbrs = self._parse_abbrs(fd, hdr.abbr_size, types)
        trule = None

    if not types:
        # Without any type, neither the transitions nor the default
        # transition below can be built.
        raise InvalidZoneinfoFile(
            'The file "{}" does not define any local time type.'.format(fd.name)
        )

    types = [
        TransitionType(off, is_dst, abbrs[abbr]) for off, is_dst, abbr in types
    ]

    transitions = []
    previous = None
    for at, idx in zip(trans, type_idx):
        transition = Transition(at, types[idx], previous)
        transitions.append(transition)

        previous = transition

    if not transitions:
        # No transition at all: use the first type as the only one.
        transitions.append(Transition(0, types[0], None))

    return Timezone(transitions, posix_rule=trule, extended=self._extend)
-> header + buff = self._check_read(fd, 44) + + if buff[:4] != b"TZif": + raise InvalidZoneinfoFile( + 'The file "{}" has an invalid header.'.format(fd.name) + ) + + version = {0x00: 1, 0x32: 2, 0x33: 3}.get(buff[4]) + + if version is None: + raise InvalidZoneinfoFile( + 'The file "{}" has an invalid version.'.format(fd.name) + ) + + hdr = header(version, *unpack(">6l", buff[20:44])) + + return hdr + + def _parse_trans_64(self, fd, n): # type: (IO[Any], int) -> List[int] + trans = [] + for _ in range(n): + buff = self._check_read(fd, 8) + trans.append(unpack(">q", buff)[0]) + + return trans + + def _parse_trans_32(self, fd, n): # type: (IO[Any], int) -> List[int] + trans = [] + for _ in range(n): + buff = self._check_read(fd, 4) + trans.append(unpack(">i", buff)[0]) + + return trans + + def _parse_type_idx(self, fd, n): # type: (IO[Any], int) -> List[int] + buff = self._check_read(fd, n) + + return list(unpack("{}B".format(n), buff)) + + def _parse_types( + self, fd, n + ): # type: (IO[Any], int) -> List[Tuple[Any, bool, int]] + types = [] + + for _ in range(n): + buff = self._check_read(fd, 6) + offset = unpack(">l", buff[:4])[0] + is_dst = buff[4] == 1 + types.append((offset, is_dst, buff[5])) + + return types + + def _parse_abbrs( + self, fd, n, types + ): # type: (IO[Any], int, List[Tuple[Any, bool, int]]) -> Dict[int, str] + abbrs = {} + buff = self._check_read(fd, n) + + for offset, is_dst, idx in types: + if idx not in abbrs: + abbr = buff[idx : buff.find(b"\0", idx)].decode("utf-8") + abbrs[idx] = abbr + + return abbrs + + def _parse_posix_tz(self, fd): # type: (...) 
def _extends(self):
    """
    Extend the transitions using the POSIX rule, if there is one.

    When the rule has no DST, the last transition is only checked against
    it. Otherwise, two transitions per year are appended for the 400 years
    following the year of the last known transition.

    :raises ValueError: If a standard-time-only rule does not match the last
        transition, or if there are fewer than two transitions to extend.
    """
    if not self._posix_rule:
        return

    posix = self._posix_rule

    if not posix.dst_abbr:
        # std only
        # The future specification should match the last/default transition
        ttype = self._transitions[-1].ttype
        if not self._check_ttype(ttype, posix.std_offset, False, posix.std_abbr):
            raise ValueError("Posix spec does not match last transition")

        return

    if len(self._transitions) < 2:
        raise ValueError("Too few transitions for POSIX spec")

    # Extend the transitions for an additional 400 years
    # using the future specification

    # The future specification should match the last two transitions,
    # and those transitions should have different is_dst flags.
    tr0 = self._transitions[-1]
    tr1 = self._transitions[-2]
    tt0 = tr0.ttype
    tt1 = tr1.ttype
    if tt0.is_dst():
        dst = tt0
        std = tt1
    else:
        dst = tt1
        std = tt0

    # NOTE(review): the results of these two checks are discarded, so a
    # mismatch is not an error here (unlike in the std-only branch above).
    # Confirm whether that is intended before enforcing them.
    self._check_ttype(dst, posix.dst_offset, True, posix.dst_abbr)
    self._check_ttype(std, posix.std_offset, False, posix.std_abbr)

    # Add the transitions to tr1 and back to tr0 for each extra year.
    last_year = local_time(tr0.local, 0, 0)[0]
    leap_year = is_leap(last_year)
    jan1 = datetime(last_year, 1, 1)
    jan1_time = timestamp(jan1)
    jan1_weekday = week_day(jan1.year, jan1.month, jan1.day) % 7

    if local_time(tr1.local, 0, 0)[0] != last_year:
        # Add a single extra transition to align to a calendar year.
        if tt0.is_dst():
            pt1 = posix.dst_end
        else:
            pt1 = posix.dst_start

        tr1_offset = pt1.trans_offset(leap_year, jan1_weekday)
        tr = Transition(jan1_time + tr1_offset - tt0.offset, tr1.ttype, tr0)

        # The extra transition is a real one: it must be part of the list,
        # not only the "previous" of the first generated transition.
        self._transitions.append(tr)

        # It becomes the last transition and the former last one becomes
        # the second to last. The order of these two assignments matters.
        tr1 = tr0
        tr0 = tr
        tt0 = tr0.ttype
        tt1 = tr1.ttype

    if tt0.is_dst():
        pt1 = posix.dst_end
        pt0 = posix.dst_start
    else:
        pt1 = posix.dst_start
        pt0 = posix.dst_end

    tr = tr0
    for year in range(last_year + 1, last_year + 401):
        # Move to January 1st of `year`; the length of the year that just
        # ended depends on whether it was a leap year.
        jan1_time += SECS_PER_YEAR[leap_year]
        jan1_weekday = (jan1_weekday + DAYS_PER_YEAR[leap_year]) % 7
        leap_year = not leap_year and is_leap(year)

        # Leave tt0 for tt1...
        tr1_offset = pt1.trans_offset(leap_year, jan1_weekday)
        tr = Transition(jan1_time + tr1_offset - tt0.offset, tt1, tr)
        self._transitions.append(tr)

        # ...and come back to tt0.
        tr0_offset = pt0.trans_offset(leap_year, jan1_weekday)
        tr = Transition(jan1_time + tr0_offset - tt1.offset, tt0, tr)
        self._transitions.append(tr)
class Transition(object):
    """
    A change from one transition type to another at a given instant.

    Besides the instant itself (``at``), the following values are derived:

    - ``local``: ``at`` shifted by the offset in effect before the
      transition (or by the offset of ``ttype`` when there is no previous
      transition).
    - ``fix``: difference between the new offset and the previous one
      (0 when there is no previous transition).
    - ``to``: ``local + fix``.
    - ``to_utc``: ``at + fix``.
    """

    # Declared as a new-style class so that the read-only properties
    # behave the same on Python 2 and Python 3.

    def __init__(
        self,
        at,  # type: int
        ttype,  # type: TransitionType
        previous,  # type: Optional[Transition]
    ):
        self._at = at
        self._ttype = ttype
        self._previous = previous

        if previous:
            self._local = at + previous.ttype.offset
            self._fix = ttype.offset - previous.ttype.offset
        else:
            self._local = at + ttype.offset
            self._fix = 0

        self._to = self._local + self._fix
        self._to_utc = self._at + self._fix
        self._utcoffset = timedelta(seconds=ttype.offset)

    @property
    def at(self):  # type: () -> int
        return self._at

    @property
    def local(self):  # type: () -> int
        return self._local

    @property
    def to(self):  # type: () -> int
        return self._to

    @property
    def to_utc(self):  # type: () -> int
        # This used to return ``_to``, leaving ``_to_utc`` unreachable.
        return self._to_utc

    @property
    def ttype(self):  # type: () -> TransitionType
        return self._ttype

    @property
    def previous(self):  # type: () -> Optional[Transition]
        return self._previous

    @property
    def fix(self):  # type: () -> int
        return self._fix

    def is_ambiguous(self, stamp):  # type: (int) -> bool
        """
        Tell whether ``stamp`` lies in ``[to, local)``.
        """
        return self._to <= stamp < self._local

    def is_missing(self, stamp):  # type: (int) -> bool
        """
        Tell whether ``stamp`` lies in ``[local, to)``.
        """
        return self._local <= stamp < self._to

    def utcoffset(self):  # type: () -> timedelta
        return self._utcoffset

    def __contains__(self, stamp):  # type: (int) -> bool
        """
        Tell whether ``stamp`` falls between the previous transition's
        ``local`` and this one's (anything before ``local`` when there is no
        previous transition).
        """
        if self.previous is None:
            return stamp < self.local

        return self.previous.local <= stamp < self.local

    def __repr__(self):  # type: () -> str
        return "Transition({} -> {}, {})".format(self._local, self._to, self._ttype)