summaryrefslogtreecommitdiffstats
path: root/src/pendulum/parsing/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/pendulum/parsing/__init__.py')
-rw-r--r--src/pendulum/parsing/__init__.py235
1 files changed, 235 insertions, 0 deletions
diff --git a/src/pendulum/parsing/__init__.py b/src/pendulum/parsing/__init__.py
new file mode 100644
index 0000000..761f52c
--- /dev/null
+++ b/src/pendulum/parsing/__init__.py
@@ -0,0 +1,235 @@
+from __future__ import annotations
+
+import contextlib
+import copy
+import os
+import re
+import struct
+
+from datetime import date
+from datetime import datetime
+from datetime import time
+from typing import Any
+from typing import Optional
+from typing import cast
+
+from dateutil import parser
+
+from pendulum.parsing.exceptions import ParserError
+
+
+with_extensions = os.getenv("PENDULUM_EXTENSIONS", "1") == "1"
+
+try:
+ if not with_extensions or struct.calcsize("P") == 4:
+ raise ImportError()
+
+ from pendulum._pendulum import Duration
+ from pendulum._pendulum import parse_iso8601
+except ImportError:
+ from pendulum.duration import Duration # type: ignore[assignment]
+ from pendulum.parsing.iso8601 import parse_iso8601 # type: ignore[assignment]
+
+
+COMMON = re.compile(
+ # Date (optional) # noqa: ERA001
+ "^"
+ "(?P<date>"
+ " (?P<classic>" # Classic date (YYYY-MM-DD)
+ r" (?P<year>\d{4})" # Year
+ " (?P<monthday>"
+ r" (?P<monthsep>[/:])?(?P<month>\d{2})" # Month (optional)
+ r" ((?P<daysep>[/:])?(?P<day>\d{2}))" # Day (optional)
+ " )?"
+ " )"
+ ")?"
+ # Time (optional) # noqa: ERA001
+ "(?P<time>" r" (?P<timesep>\ )?" # Separator (space)
+ # HH:mm:ss (optional mm and ss)
+ r" (?P<hour>\d{1,2}):(?P<minute>\d{1,2})?(?::(?P<second>\d{1,2}))?"
+ # Subsecond part (optional)
+ " (?P<subsecondsection>"
+ " (?:[.|,])" # Subsecond separator (optional)
+ r" (?P<subsecond>\d{1,9})" # Subsecond
+ " )?"
+ ")?"
+ "$",
+ re.VERBOSE,
+)
+
+DEFAULT_OPTIONS = {
+ "day_first": False,
+ "year_first": True,
+ "strict": True,
+ "exact": False,
+ "now": None,
+}
+
+
+def parse(text: str, **options: Any) -> datetime | date | time | _Interval | Duration:
+ """
+ Parses a string with the given options.
+
+ :param text: The string to parse.
+ """
+ _options: dict[str, Any] = copy.copy(DEFAULT_OPTIONS)
+ _options.update(options)
+
+ return _normalize(_parse(text, **_options), **_options)
+
+
+def _normalize(
+ parsed: datetime | date | time | _Interval | Duration, **options: Any
+) -> datetime | date | time | _Interval | Duration:
+ """
+ Normalizes the parsed element.
+
+ :param parsed: The parsed elements.
+ """
+ if options.get("exact"):
+ return parsed
+
+ if isinstance(parsed, time):
+ now = cast(Optional[datetime], options["now"]) or datetime.now()
+
+ return datetime(
+ now.year,
+ now.month,
+ now.day,
+ parsed.hour,
+ parsed.minute,
+ parsed.second,
+ parsed.microsecond,
+ )
+ elif isinstance(parsed, date) and not isinstance(parsed, datetime):
+ return datetime(parsed.year, parsed.month, parsed.day)
+
+ return parsed
+
+
+def _parse(text: str, **options: Any) -> datetime | date | time | _Interval | Duration:
+ # Trying to parse ISO8601
+ with contextlib.suppress(ValueError):
+ return parse_iso8601(text)
+
+ with contextlib.suppress(ValueError):
+ return _parse_iso8601_interval(text)
+
+ with contextlib.suppress(ParserError):
+ return _parse_common(text, **options)
+
+ # We couldn't parse the string
+ # so we fallback on the dateutil parser
+ # If not strict
+ if options.get("strict", True):
+ raise ParserError(f"Unable to parse string [{text}]")
+
+ try:
+ dt = parser.parse(
+ text, dayfirst=options["day_first"], yearfirst=options["year_first"]
+ )
+ except ValueError:
+ raise ParserError(f"Invalid date string: {text}")
+
+ return dt
+
+
+def _parse_common(text: str, **options: Any) -> datetime | date | time:
+ """
+ Tries to parse the string as a common datetime format.
+
+ :param text: The string to parse.
+ """
+ m = COMMON.match(text)
+ has_date = False
+ year = 0
+ month = 1
+ day = 1
+
+ if not m:
+ raise ParserError("Invalid datetime string")
+
+ if m.group("date"):
+ # A date has been specified
+ has_date = True
+
+ year = int(m.group("year"))
+
+ if not m.group("monthday"):
+ # No month and day
+ month = 1
+ day = 1
+ else:
+ if options["day_first"]:
+ month = int(m.group("day"))
+ day = int(m.group("month"))
+ else:
+ month = int(m.group("month"))
+ day = int(m.group("day"))
+
+ if not m.group("time"):
+ return date(year, month, day)
+
+ # Grabbing hh:mm:ss
+ hour = int(m.group("hour"))
+
+ minute = int(m.group("minute"))
+
+ second = int(m.group("second")) if m.group("second") else 0
+
+ # Grabbing subseconds, if any
+ microsecond = 0
+ if m.group("subsecondsection"):
+ # Limiting to 6 chars
+ subsecond = m.group("subsecond")[:6]
+
+ microsecond = int(f"{subsecond:0<6}")
+
+ if has_date:
+ return datetime(year, month, day, hour, minute, second, microsecond)
+
+ return time(hour, minute, second, microsecond)
+
+
+class _Interval:
+ """
+ Special class to handle ISO 8601 intervals
+ """
+
+ def __init__(
+ self,
+ start: datetime | None = None,
+ end: datetime | None = None,
+ duration: Duration | None = None,
+ ) -> None:
+ self.start = start
+ self.end = end
+ self.duration = duration
+
+
+def _parse_iso8601_interval(text: str) -> _Interval:
+ if "/" not in text:
+ raise ParserError("Invalid interval")
+
+ first, last = text.split("/")
+ start = end = duration = None
+
+ if first[0] == "P":
+ # duration/end
+ duration = parse_iso8601(first)
+ end = parse_iso8601(last)
+ elif last[0] == "P":
+ # start/duration
+ start = parse_iso8601(first)
+ duration = parse_iso8601(last)
+ else:
+ # start/end
+ start = parse_iso8601(first)
+ end = parse_iso8601(last)
+
+ return _Interval(
+ cast(datetime, start), cast(datetime, end), cast(Duration, duration)
+ )
+
+
+__all__ = ["parse", "parse_iso8601"]