diff options
Diffstat (limited to 'src/pendulum/parsing')
-rw-r--r-- | src/pendulum/parsing/__init__.py | 235 | ||||
-rw-r--r-- | src/pendulum/parsing/exceptions/__init__.py | 5 | ||||
-rw-r--r-- | src/pendulum/parsing/iso8601.py | 453 |
3 files changed, 693 insertions, 0 deletions
diff --git a/src/pendulum/parsing/__init__.py b/src/pendulum/parsing/__init__.py new file mode 100644 index 0000000..761f52c --- /dev/null +++ b/src/pendulum/parsing/__init__.py @@ -0,0 +1,235 @@ +from __future__ import annotations + +import contextlib +import copy +import os +import re +import struct + +from datetime import date +from datetime import datetime +from datetime import time +from typing import Any +from typing import Optional +from typing import cast + +from dateutil import parser + +from pendulum.parsing.exceptions import ParserError + + +with_extensions = os.getenv("PENDULUM_EXTENSIONS", "1") == "1" + +try: + if not with_extensions or struct.calcsize("P") == 4: + raise ImportError() + + from pendulum._pendulum import Duration + from pendulum._pendulum import parse_iso8601 +except ImportError: + from pendulum.duration import Duration # type: ignore[assignment] + from pendulum.parsing.iso8601 import parse_iso8601 # type: ignore[assignment] + + +COMMON = re.compile( + # Date (optional) # noqa: ERA001 + "^" + "(?P<date>" + " (?P<classic>" # Classic date (YYYY-MM-DD) + r" (?P<year>\d{4})" # Year + " (?P<monthday>" + r" (?P<monthsep>[/:])?(?P<month>\d{2})" # Month (optional) + r" ((?P<daysep>[/:])?(?P<day>\d{2}))" # Day (optional) + " )?" + " )" + ")?" + # Time (optional) # noqa: ERA001 + "(?P<time>" r" (?P<timesep>\ )?" # Separator (space) + # HH:mm:ss (optional mm and ss) + r" (?P<hour>\d{1,2}):(?P<minute>\d{1,2})?(?::(?P<second>\d{1,2}))?" + # Subsecond part (optional) + " (?P<subsecondsection>" + " (?:[.|,])" # Subsecond separator (optional) + r" (?P<subsecond>\d{1,9})" # Subsecond + " )?" + ")?" + "$", + re.VERBOSE, +) + +DEFAULT_OPTIONS = { + "day_first": False, + "year_first": True, + "strict": True, + "exact": False, + "now": None, +} + + +def parse(text: str, **options: Any) -> datetime | date | time | _Interval | Duration: + """ + Parses a string with the given options. + + :param text: The string to parse. + """ + _options: dict[str, Any] = copy.copy(DEFAULT_OPTIONS) + _options.update(options) + + return _normalize(_parse(text, **_options), **_options) + + +def _normalize( + parsed: datetime | date | time | _Interval | Duration, **options: Any +) -> datetime | date | time | _Interval | Duration: + """ + Normalizes the parsed element. + + :param parsed: The parsed elements. + """ + if options.get("exact"): + return parsed + + if isinstance(parsed, time): + now = cast(Optional[datetime], options["now"]) or datetime.now() + + return datetime( + now.year, + now.month, + now.day, + parsed.hour, + parsed.minute, + parsed.second, + parsed.microsecond, + ) + elif isinstance(parsed, date) and not isinstance(parsed, datetime): + return datetime(parsed.year, parsed.month, parsed.day) + + return parsed + + +def _parse(text: str, **options: Any) -> datetime | date | time | _Interval | Duration: + # Trying to parse ISO8601 + with contextlib.suppress(ValueError): + return parse_iso8601(text) + + with contextlib.suppress(ValueError): + return _parse_iso8601_interval(text) + + with contextlib.suppress(ParserError): + return _parse_common(text, **options) + + # We couldn't parse the string + # so we fallback on the dateutil parser + # If not strict + if options.get("strict", True): + raise ParserError(f"Unable to parse string [{text}]") + + try: + dt = parser.parse( + text, dayfirst=options["day_first"], yearfirst=options["year_first"] + ) + except ValueError: + raise ParserError(f"Invalid date string: {text}") + + return dt + + +def _parse_common(text: str, **options: Any) -> datetime | date | time: + """ + Tries to parse the string as a common datetime format. + + :param text: The string to parse. + """ + m = COMMON.match(text) + has_date = False + year = 0 + month = 1 + day = 1 + + if not m: + raise ParserError("Invalid datetime string") + + if m.group("date"): + # A date has been specified + has_date = True + + year = int(m.group("year")) + + if not m.group("monthday"): + # No month and day + month = 1 + day = 1 + else: + if options["day_first"]: + month = int(m.group("day")) + day = int(m.group("month")) + else: + month = int(m.group("month")) + day = int(m.group("day")) + + if not m.group("time"): + return date(year, month, day) + + # Grabbing hh:mm:ss + hour = int(m.group("hour")) + + minute = int(m.group("minute")) + + second = int(m.group("second")) if m.group("second") else 0 + + # Grabbing subseconds, if any + microsecond = 0 + if m.group("subsecondsection"): + # Limiting to 6 chars + subsecond = m.group("subsecond")[:6] + + microsecond = int(f"{subsecond:0<6}") + + if has_date: + return datetime(year, month, day, hour, minute, second, microsecond) + + return time(hour, minute, second, microsecond) + + +class _Interval: + """ + Special class to handle ISO 8601 intervals + """ + + def __init__( + self, + start: datetime | None = None, + end: datetime | None = None, + duration: Duration | None = None, + ) -> None: + self.start = start + self.end = end + self.duration = duration + + +def _parse_iso8601_interval(text: str) -> _Interval: + if "/" not in text: + raise ParserError("Invalid interval") + + first, last = text.split("/") + start = end = duration = None + + if first[0] == "P": + # duration/end + duration = parse_iso8601(first) + end = parse_iso8601(last) + elif last[0] == "P": + # start/duration + start = parse_iso8601(first) + duration = parse_iso8601(last) + else: + # start/end + start = parse_iso8601(first) + end = parse_iso8601(last) + + return _Interval( + cast(datetime, start), cast(datetime, end), cast(Duration, duration) + ) + + +__all__ = ["parse", "parse_iso8601"] diff --git a/src/pendulum/parsing/exceptions/__init__.py b/src/pendulum/parsing/exceptions/__init__.py new file mode 100644 index 0000000..9f2d809 --- /dev/null +++ b/src/pendulum/parsing/exceptions/__init__.py @@ -0,0 +1,5 @@ +from __future__ import annotations + + +class ParserError(ValueError): + pass diff --git a/src/pendulum/parsing/iso8601.py b/src/pendulum/parsing/iso8601.py new file mode 100644 index 0000000..cc4dd7a --- /dev/null +++ b/src/pendulum/parsing/iso8601.py @@ -0,0 +1,453 @@ +from __future__ import annotations + +import datetime +import re + +from typing import cast + +from pendulum.constants import HOURS_PER_DAY +from pendulum.constants import MINUTES_PER_HOUR +from pendulum.constants import MONTHS_OFFSETS +from pendulum.constants import SECONDS_PER_MINUTE +from pendulum.duration import Duration +from pendulum.helpers import days_in_year +from pendulum.helpers import is_leap +from pendulum.helpers import is_long_year +from pendulum.helpers import week_day +from pendulum.parsing.exceptions import ParserError +from pendulum.tz.timezone import UTC +from pendulum.tz.timezone import FixedTimezone +from pendulum.tz.timezone import Timezone + + +ISO8601_DT = re.compile( + # Date (optional) # noqa: ERA001 + "^" + "(?P<date>" + " (?P<classic>" # Classic date (YYYY-MM-DD) or ordinal (YYYY-DDD) + r" (?P<year>\d{4})" # Year + " (?P<monthday>" + r" (?P<monthsep>-)?(?P<month>\d{2})" # Month (optional) + r" ((?P<daysep>-)?(?P<day>\d{1,2}))?" # Day (optional) + " )?" + " )" + " |" + " (?P<isocalendar>" # Calendar date (2016-W05 or 2016-W05-5) + r" (?P<isoyear>\d{4})" # Year + " (?P<weeksep>-)?" # Separator (optional) + " W" # W separator + r" (?P<isoweek>\d{2})" # Week number + " (?P<weekdaysep>-)?" # Separator (optional) + r" (?P<isoweekday>\d)?" # Weekday (optional) + " )" + ")?" + # Time (optional) # noqa: ERA001 + "(?P<time>" r" (?P<timesep>[T\ ])?" # Separator (T or space) + # HH:mm:ss (optional mm and ss) + r" (?P<hour>\d{1,2})(?P<minsep>:)?(?P<minute>\d{1,2})?(?P<secsep>:)?(?P<second>\d{1,2})?" # noqa: E501 + # Subsecond part (optional) + " (?P<subsecondsection>" + " (?:[.,])" # Subsecond separator (optional) + r" (?P<subsecond>\d{1,9})" # Subsecond + " )?" + # Timezone offset + " (?P<tz>" + r" (?:[-+])\d{2}:?(?:\d{2})?|Z" # Offset (+HH:mm or +HHmm or +HH or Z) + " )?" + ")?" + "$", + re.VERBOSE, +) + +ISO8601_DURATION = re.compile( + "^P" # Duration P indicator + # Years, months and days (optional) # noqa: ERA001 + "(?P<w>" + r" (?P<weeks>\d+(?:[.,]\d+)?W)" + ")?" + "(?P<ymd>" + r" (?P<years>\d+(?:[.,]\d+)?Y)?" + r" (?P<months>\d+(?:[.,]\d+)?M)?" + r" (?P<days>\d+(?:[.,]\d+)?D)?" + ")?" + "(?P<hms>" + " (?P<timesep>T)" # Separator (T) + r" (?P<hours>\d+(?:[.,]\d+)?H)?" + r" (?P<minutes>\d+(?:[.,]\d+)?M)?" + r" (?P<seconds>\d+(?:[.,]\d+)?S)?" + ")?" + "$", + re.VERBOSE, +) + + +def parse_iso8601( + text: str, +) -> datetime.datetime | datetime.date | datetime.time | Duration: + """ + ISO 8601 compliant parser. + + :param text: The string to parse + :type text: str + + :rtype: datetime.datetime or datetime.time or datetime.date + """ + parsed = _parse_iso8601_duration(text) + if parsed is not None: + return parsed + + m = ISO8601_DT.match(text) + if not m: + raise ParserError("Invalid ISO 8601 string") + + ambiguous_date = False + is_date = False + is_time = False + year = 0 + month = 1 + day = 1 + minute = 0 + second = 0 + microsecond = 0 + tzinfo: FixedTimezone | Timezone | None = None + + if m.group("date"): + # A date has been specified + is_date = True + + if m.group("isocalendar"): + # We have a ISO 8601 string defined + # by week number + if ( + m.group("weeksep") + and not m.group("weekdaysep") + and m.group("isoweekday") + ): + raise ParserError(f"Invalid date string: {text}") + + if not m.group("weeksep") and m.group("weekdaysep"): + raise ParserError(f"Invalid date string: {text}") + + try: + date = _get_iso_8601_week( + m.group("isoyear"), m.group("isoweek"), m.group("isoweekday") + ) + except ParserError: + raise + except ValueError: + raise ParserError(f"Invalid date string: {text}") + + year = date["year"] + month = date["month"] + day = date["day"] + else: + # We have a classic date representation + year = int(m.group("year")) + + if not m.group("monthday"): + # No month and day + month = 1 + day = 1 + else: + if m.group("month") and m.group("day"): + # Month and day + if not m.group("daysep") and len(m.group("day")) == 1: + # Ordinal day + ordinal = int(m.group("month") + m.group("day")) + leap = is_leap(year) + months_offsets = MONTHS_OFFSETS[leap] + + if ordinal > months_offsets[13]: + raise ParserError("Ordinal day is out of range") + + for i in range(1, 14): + if ordinal <= months_offsets[i]: + day = ordinal - months_offsets[i - 1] + month = i - 1 + + break + else: + month = int(m.group("month")) + day = int(m.group("day")) + else: + # Only month + if not m.group("monthsep"): + # The date looks like 201207 + # which is invalid for a date + # But it might be a time in the form hhmmss + ambiguous_date = True + + month = int(m.group("month")) + day = 1 + + if not m.group("time"): + # No time has been specified + if ambiguous_date: + # We can "safely" assume that the ambiguous date + # was actually a time in the form hhmmss + hhmmss = f"{year!s}{month!s:0>2}" + + return datetime.time(int(hhmmss[:2]), int(hhmmss[2:4]), int(hhmmss[4:])) + + return datetime.date(year, month, day) + + if ambiguous_date: + raise ParserError(f"Invalid date string: {text}") + + if is_date and not m.group("timesep"): + raise ParserError(f"Invalid date string: {text}") + + if not is_date: + is_time = True + + # Grabbing hh:mm:ss + hour = int(m.group("hour")) + minsep = m.group("minsep") + + if m.group("minute"): + minute = int(m.group("minute")) + elif minsep: + raise ParserError("Invalid ISO 8601 time part") + + secsep = m.group("secsep") + if secsep and not minsep and m.group("minute"): + # minute/second separator but no hour/minute separator + raise ParserError("Invalid ISO 8601 time part") + + if m.group("second"): + if not secsep and minsep: + # No minute/second separator but hour/minute separator + raise ParserError("Invalid ISO 8601 time part") + + second = int(m.group("second")) + elif secsep: + raise ParserError("Invalid ISO 8601 time part") + + # Grabbing subseconds, if any + if m.group("subsecondsection"): + # Limiting to 6 chars + subsecond = m.group("subsecond")[:6] + + microsecond = int(f"{subsecond:0<6}") + + # Grabbing timezone, if any + tz = m.group("tz") + if tz: + if tz == "Z": + tzinfo = UTC + else: + negative = bool(tz.startswith("-")) + tz = tz[1:] + if ":" not in tz: + if len(tz) == 2: + tz = f"{tz}00" + + off_hour = tz[0:2] + off_minute = tz[2:4] + else: + off_hour, off_minute = tz.split(":") + + offset = ((int(off_hour) * 60) + int(off_minute)) * 60 + + if negative: + offset = -1 * offset + + tzinfo = FixedTimezone(offset) + + if is_time: + return datetime.time(hour, minute, second, microsecond, tzinfo=tzinfo) + + return datetime.datetime( + year, month, day, hour, minute, second, microsecond, tzinfo=tzinfo + ) + + +def _parse_iso8601_duration(text: str, **options: str) -> Duration | None: + m = ISO8601_DURATION.match(text) + if not m: + return None + + years = 0 + months = 0 + weeks = 0 + days: int | float = 0 + hours: int | float = 0 + minutes: int | float = 0 + seconds: int | float = 0 + microseconds: int | float = 0 + fractional = False + + _days: str | float + _hour: str | int | None + _minutes: str | int | None + _seconds: str | int | None + if m.group("w"): + # Weeks + if m.group("ymd") or m.group("hms"): + # Specifying anything more than weeks is not supported + raise ParserError("Invalid duration string") + + _weeks = m.group("weeks") + if not _weeks: + raise ParserError("Invalid duration string") + + _weeks = _weeks.replace(",", ".").replace("W", "") + if "." in _weeks: + _weeks, portion = _weeks.split(".") + weeks = int(_weeks) + _days = int(portion) / 10 * 7 + days, hours = int(_days // 1), int(_days % 1 * HOURS_PER_DAY) + else: + weeks = int(_weeks) + + if m.group("ymd"): + # Years, months and/or days + _years = m.group("years") + _months = m.group("months") + _days = m.group("days") + + # Checking order + years_start = m.start("years") if _years else -3 + months_start = m.start("months") if _months else years_start + 1 + days_start = m.start("days") if _days else months_start + 1 + + # Check correct order + if not (years_start < months_start < days_start): + raise ParserError("Invalid duration") + + if _years: + _years = _years.replace(",", ".").replace("Y", "") + if "." in _years: + raise ParserError("Float years in duration are not supported") + else: + years = int(_years) + + if _months: + if fractional: + raise ParserError("Invalid duration") + + _months = _months.replace(",", ".").replace("M", "") + if "." in _months: + raise ParserError("Float months in duration are not supported") + else: + months = int(_months) + + if _days: + if fractional: + raise ParserError("Invalid duration") + + _days = _days.replace(",", ".").replace("D", "") + + if "." in _days: + fractional = True + + _days, _hours = _days.split(".") + days = int(_days) + hours = int(_hours) / 10 * HOURS_PER_DAY + else: + days = int(_days) + + if m.group("hms"): + # Hours, minutes and/or seconds + _hours = m.group("hours") or 0 + _minutes = m.group("minutes") or 0 + _seconds = m.group("seconds") or 0 + + # Checking order + hours_start = m.start("hours") if _hours else -3 + minutes_start = m.start("minutes") if _minutes else hours_start + 1 + seconds_start = m.start("seconds") if _seconds else minutes_start + 1 + + # Check correct order + if not (hours_start < minutes_start < seconds_start): + raise ParserError("Invalid duration") + + if _hours: + if fractional: + raise ParserError("Invalid duration") + + _hours = cast(str, _hours).replace(",", ".").replace("H", "") + + if "." in _hours: + fractional = True + + _hours, _mins = _hours.split(".") + hours += int(_hours) + minutes += int(_mins) / 10 * MINUTES_PER_HOUR + else: + hours += int(_hours) + + if _minutes: + if fractional: + raise ParserError("Invalid duration") + + _minutes = cast(str, _minutes).replace(",", ".").replace("M", "") + + if "." in _minutes: + fractional = True + + _minutes, _secs = _minutes.split(".") + minutes += int(_minutes) + seconds += int(_secs) / 10 * SECONDS_PER_MINUTE + else: + minutes += int(_minutes) + + if _seconds: + if fractional: + raise ParserError("Invalid duration") + + _seconds = cast(str, _seconds).replace(",", ".").replace("S", "") + + if "." in _seconds: + _seconds, _microseconds = _seconds.split(".") + seconds += int(_seconds) + microseconds += int(f"{_microseconds[:6]:0<6}") + else: + seconds += int(_seconds) + + return Duration( + years=years, + months=months, + weeks=weeks, + days=days, + hours=hours, + minutes=minutes, + seconds=seconds, + microseconds=microseconds, + ) + + +def _get_iso_8601_week( + year: int | str, week: int | str, weekday: int | str +) -> dict[str, int]: + weekday = 1 if not weekday else int(weekday) + + year = int(year) + week = int(week) + + if week > 53 or week > 52 and not is_long_year(year): + raise ParserError("Invalid week for week date") + + if weekday > 7: + raise ParserError("Invalid weekday for week date") + + # We can't rely on strptime directly here since + # it does not support ISO week date + ordinal = week * 7 + weekday - (week_day(year, 1, 4) + 3) + + if ordinal < 1: + # Previous year + ordinal += days_in_year(year - 1) + year -= 1 + + if ordinal > days_in_year(year): + # Next year + ordinal -= days_in_year(year) + year += 1 + + fmt = "%Y-%j" + string = f"{year}-{ordinal}" + + dt = datetime.datetime.strptime(string, fmt) + + return {"year": dt.year, "month": dt.month, "day": dt.day} |