# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

from datetime import datetime, timedelta, timezone
from time import sleep
import os

# ISO datetime format without msec
fmt = "%Y-%m-%dT%H:%M:%SZ"

# The constants were taken from BIND 9 source code (lib/dns/zone.c)
max_refresh = timedelta(seconds=2419200)  # 4 weeks
max_expires = timedelta(seconds=14515200)  # 24 weeks

# The Unix epoch as a naive datetime expressed in UTC.  Built from a
# timezone-aware value because datetime.utcfromtimestamp() is deprecated;
# the tzinfo is stripped again so the result still compares against the
# other naive datetimes used in this module.
dayzero = datetime.fromtimestamp(0, timezone.utc).replace(tzinfo=None, microsecond=0)

# Wait for the secondary zone files to appear to extract their mtime
max_secondary_zone_waittime_sec = 5


# Generic helper functions
def check_expires(expires, min_time, max_time):
    """Assert that min_time <= expires <= max_time (bounds inclusive)."""
    assert min_time <= expires <= max_time, "expires {} not within [{}, {}]".format(
        expires, min_time, max_time
    )


def check_refresh(refresh, min_time, max_time):
    """Assert that min_time <= refresh <= max_time (bounds inclusive)."""
    assert min_time <= refresh <= max_time, "refresh {} not within [{}, {}]".format(
        refresh, min_time, max_time
    )


def check_loaded(loaded, expected, now):
    """Sanity check the zone's "loaded" timestamp.

    Asserts that *loaded* is less than max_secondary_zone_waittime_sec
    seconds later than *expected*, and that *loaded* is not later than
    *now*.  Only the upper bound of the difference is checked.
    """
    delta_sec = (loaded - expected).total_seconds()
    assert (
        delta_sec < max_secondary_zone_waittime_sec
    ), "loaded {} is {}s after expected {}".format(loaded, delta_sec, expected)
    assert loaded <= now, "loaded {} is later than now {}".format(loaded, now)


def check_zone_timers(loaded, expires, refresh, loaded_exp):
    """Sanity check the timer values of a single zone.

    *expires* and *refresh* may each be None, in which case the
    corresponding check is skipped.  When given, each must lie between the
    current time and the current time plus max_expires / max_refresh
    respectively.  *loaded* is checked against *loaded_exp* and the current
    time by check_loaded().

    All arguments are compared against a naive datetime holding the
    current UTC time truncated to whole seconds.
    """
    # Naive UTC "now"; datetime.utcnow() is deprecated, so derive it from
    # an aware value and drop the tzinfo.
    now = datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)

    # Sanity check the zone timer values
    if expires is not None:
        check_expires(expires, now, now + max_expires)
    if refresh is not None:
        check_refresh(refresh, now, now + max_refresh)
    check_loaded(loaded, loaded_exp, now)


#
# The output is gibberish, but at least make sure it does not crash.
#
def check_manykeys(name, zone=None):
    """Assert that *name* is "manykeys"; *zone* is accepted but not used."""
    # pylint: disable=unused-argument
    assert name == "manykeys"


def zone_mtime(zonedir, name):
    """Return the modification time of the zone file "<name>.db" in *zonedir*.

    The result is a naive datetime expressed in UTC and truncated to whole
    seconds.  If the file does not exist, dayzero is returned instead.
    """
    # Function-scope import: datetime.utcfromtimestamp() is deprecated, and
    # the replacement needs timezone.utc.
    from datetime import timezone  # pylint: disable=import-outside-toplevel

    try:
        si = os.stat(os.path.join(zonedir, "{}.db".format(name)))
    except FileNotFoundError:
        return dayzero

    # Convert as UTC, then drop tzinfo so the value stays comparable with
    # the naive dayzero sentinel.
    aware = datetime.fromtimestamp(si.st_mtime, timezone.utc)
    return aware.replace(tzinfo=None, microsecond=0)


def test_zone_timers_primary(fetch_zones, load_timers, **kwargs):
    """Check the timers of every zone returned by *fetch_zones*.

    Required keyword arguments: "statsip", "statsport" and "zonedir".
    *load_timers* is called as load_timers(zone, True) and must return a
    (name, loaded, expires, refresh) tuple.  The expected load time of each
    zone is the mtime of its zone file.
    """
    statsip = kwargs["statsip"]
    statsport = kwargs["statsport"]
    zonedir = kwargs["zonedir"]

    for zone in fetch_zones(statsip, statsport):
        name, loaded, expires, refresh = load_timers(zone, True)
        mtime = zone_mtime(zonedir, name)
        check_zone_timers(loaded, expires, refresh, mtime)


def test_zone_timers_secondary(fetch_zones, load_timers, **kwargs):
    """Check the timers of every zone, retrying while zone files are missing.

    Required keyword arguments: "statsip", "statsport" and "zonedir".
    *load_timers* is called as load_timers(zone, False) and must return a
    (name, loaded, expires, refresh) tuple.

    If any zone file is missing (zone_mtime() returns dayzero), the whole
    pass is abandoned, the function sleeps one second, and the zones are
    fetched again.  At most max_secondary_zone_waittime_sec retries are
    made; once they are used up the checks run even for missing files.
    """
    statsip = kwargs["statsip"]
    statsport = kwargs["statsport"]
    zonedir = kwargs["zonedir"]

    # If any one of the zone files isn't ready, then retry until timeout.
    tries = max_secondary_zone_waittime_sec
    while tries >= 0:
        for zone in fetch_zones(statsip, statsport):
            name, loaded, expires, refresh = load_timers(zone, False)
            mtime = zone_mtime(zonedir, name)
            if mtime == dayzero and tries > 0:
                # Zone file not there yet and retries remain: give up on
                # this pass and start over after a short wait.
                tries -= 1
                break
            # mtime was either retrieved successfully or no tries were
            # left, run the check anyway.
            check_zone_timers(loaded, expires, refresh, mtime)
        else:
            # Every zone was checked without needing a retry.
            return
        sleep(1)


def test_zone_with_many_keys(fetch_zones, load_zone, **kwargs):
    """Run check_manykeys() on the zone named "manykeys", if present.

    Required keyword arguments: "statsip" and "statsport".  *load_zone* is
    called on every fetched zone and its return value is used as the name.
    """
    statsip = kwargs["statsip"]
    statsport = kwargs["statsport"]

    for zone in fetch_zones(statsip, statsport):
        name = load_zone(zone)
        if name == "manykeys":
            check_manykeys(name)