#!/usr/bin/env bash
'''':;
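# The line above makes this file a bash/python polyglot: bash collapses the
# four quotes into an empty prefix to the no-op ':' and runs the block below,
# while python sees ''' opening a string literal that swallows the whole bash
# block (closed by the ''' at the end of the exec line).
# A '-p<path>' argument selects a specific python binary; it is stripped from
# the arguments handed over to the python part of this file.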
pybinary=$(which python || which python3 || which python2)
filtered=()
for arg in "$@"
do
    case $arg in
        -p*) pybinary=${arg:2}   ;;
        *)  filtered+=("$arg") ;;
    esac
done
if [ "$pybinary" = "" ]
then
    echo "ERROR: python is not available on this system"
    exit 1
fi
exec "$pybinary"  "$0" "${filtered[@]}" # '''

# -*- coding: utf-8 -*-
# Description: python modules orchestrator for the netdata agent
# Author: Pawel Krupa (paulfantom)
# Author: Ilya Mashchenko (l2isbad)
# SPDX-License-Identifier: GPL-3.0-or-later

import collections
import copy
import gc
import json
import os
import pprint
import re
import sys
import time
import threading
import types

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

PY_VERSION = sys.version_info[:2]  # (major, minor), e.g. (3, 7)

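# SourceFileLoader: importlib-based loading on Python 3, a shim around the
# deprecated imp.load_source otherwise (see load_module below).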
if PY_VERSION > (3, 1):
    from importlib.machinery import SourceFileLoader
else:
    from imp import load_source as SourceFileLoader

ENV_NETDATA_USER_CONFIG_DIR = 'NETDATA_USER_CONFIG_DIR'
ENV_NETDATA_STOCK_CONFIG_DIR = 'NETDATA_STOCK_CONFIG_DIR'
ENV_NETDATA_PLUGINS_DIR = 'NETDATA_PLUGINS_DIR'
ENV_NETDATA_LIB_DIR = 'NETDATA_LIB_DIR'
ENV_NETDATA_UPDATE_EVERY = 'NETDATA_UPDATE_EVERY'
ENV_NETDATA_LOCK_DIR = 'NETDATA_LOCK_DIR'


def add_pythond_packages():
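    # make the bundled packages (bases, third_party, ...) importable by
    # appending python.d/python_modules to sys.path before the imports below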
    pluginsd = os.getenv(ENV_NETDATA_PLUGINS_DIR, os.path.dirname(__file__))
    pythond = os.path.abspath(pluginsd + '/../python.d')
    packages = os.path.join(pythond, 'python_modules')
    sys.path.append(packages)


add_pythond_packages()

from bases.collection import safe_print
from bases.loggers import PythonDLogger
from bases.loaders import load_config
from third_party import filelock

try:
    from collections import OrderedDict
except ImportError:
    from third_party.ordereddict import OrderedDict


def dirs():
    var_lib = os.getenv(
        ENV_NETDATA_LIB_DIR,
        '@varlibdir_POST@',
    )
    plugin_user_config = os.getenv(
        ENV_NETDATA_USER_CONFIG_DIR,
        '@configdir_POST@',
    )
    plugin_stock_config = os.getenv(
        ENV_NETDATA_STOCK_CONFIG_DIR,
        '@libconfigdir_POST@',
    )
    pluginsd = os.getenv(
        ENV_NETDATA_PLUGINS_DIR,
        os.path.dirname(__file__),
    )
    locks = os.getenv(
        ENV_NETDATA_LOCK_DIR,
        os.path.join('@varlibdir_POST@', 'lock')
    )
    modules_user_config = os.path.join(plugin_user_config, 'python.d')
    modules_stock_config = os.path.join(plugin_stock_config, 'python.d')
    modules = os.path.abspath(pluginsd + '/../python.d')

    Dirs = collections.namedtuple(
        'Dirs',
        [
            'plugin_user_config',
            'plugin_stock_config',
            'modules_user_config',
            'modules_stock_config',
            'modules',
            'var_lib',
            'locks',
        ]
    )
    return Dirs(
        plugin_user_config,
        plugin_stock_config,
        modules_user_config,
        modules_stock_config,
        modules,
        var_lib,
        locks,
    )


DIRS = dirs()

IS_ATTY = sys.stdout.isatty()  # stdout is a terminal: run by hand, not by the netdata agent

MODULE_SUFFIX = '.chart.py'


def available_modules():
    obsolete = (
        'apache_cache',  # replaced by web_log
        'cpuidle',  # rewritten in C
        'cpufreq',  # rewritten in C
        'gunicorn_log',  # replaced by web_log
        'linux_power_supply',  # rewritten in C
        'nginx_log',  # replaced by web_log
        'mdstat',  # rewritten in C
        'sslcheck',  # rewritten in Go, memory leak bug https://github.com/netdata/netdata/issues/5624
        'unbound',  # rewritten in Go
    )

    files = sorted(os.listdir(DIRS.modules))
    modules = [m[:-len(MODULE_SUFFIX)] for m in files if m.endswith(MODULE_SUFFIX)]
    avail = [m for m in modules if m not in obsolete]
    return tuple(avail)


AVAILABLE_MODULES = available_modules()

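# defaults applied to every job config; modules and their config files can override them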
JOB_BASE_CONF = {
    'update_every': int(os.getenv(ENV_NETDATA_UPDATE_EVERY, 1)),
    'priority': 60000,
    'autodetection_retry': 0,
    'chart_cleanup': 10,
    'penalty': True,
    'name': str(),
}

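# plugin-level defaults, overridable via python.d.conf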
PLUGIN_BASE_CONF = {
    'enabled': True,
    'default_run': True,
    'gc_run': True,
    'gc_interval': 300,
}


def multi_path_find(name, *paths):
    for path in paths:
        abs_name = os.path.join(path, name)
        if os.path.isfile(abs_name):
            return abs_name
    return str()


def load_module(name):
    abs_path = os.path.join(DIRS.modules, '{0}{1}'.format(name, MODULE_SUFFIX))
    module = SourceFileLoader('pythond_' + name, abs_path)
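    # Python 2's imp.load_source returns the module itself, while Python 3's
    # SourceFileLoader returns a loader that still has to load it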
    if isinstance(module, types.ModuleType):
        return module
    return module.load_module()


class ModuleConfig:
    def __init__(self, name, config=None):
        self.name = name
        self.config = config or OrderedDict()

    def load(self, abs_path):
        self.config.update(load_config(abs_path) or dict())

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.config[k]) for k in keys if k in self.config)

    def create_job(self, job_name, job_config=None):
        job_config = job_config or dict()

        config = OrderedDict()
        config.update(job_config)
        config['job_name'] = job_name
        for k, v in self.defaults().items():
            config.setdefault(k, v)

        return config

    def job_names(self):
        return [k for k in self.config if isinstance(self.config.get(k), dict)]

    def single_job(self):
        return [self.create_job(self.name, self.config)]

    def multi_job(self):
        return [self.create_job(n, self.config[n]) for n in self.job_names()]

    def create_jobs(self):
        return self.multi_job() or self.single_job()


class JobsConfigsBuilder:
    def __init__(self, config_dirs):
        self.config_dirs = config_dirs
        self.log = PythonDLogger()
        self.job_defaults = None
        self.module_defaults = None
        self.min_update_every = None

    def load_module_config(self, module_name):
        name = '{0}.conf'.format(module_name)
        self.log.debug("[{0}] looking for '{1}' in {2}".format(module_name, name, self.config_dirs))
        config = ModuleConfig(module_name)

        abs_path = multi_path_find(name, *self.config_dirs)
        if not abs_path:
            self.log.warning("[{0}] '{1}' was not found".format(module_name, name))
            return config

        self.log.debug("[{0}] loading '{1}'".format(module_name, abs_path))
        try:
            config.load(abs_path)
        except Exception as error:
            self.log.error("[{0}] error on loading '{1}' : {2}".format(module_name, abs_path, repr(error)))
            return None

        self.log.debug("[{0}] '{1}' is loaded".format(module_name, abs_path))
        return config

    @staticmethod
    def apply_defaults(jobs, defaults):
        if defaults is None:
            return
        for k, v in defaults.items():
            for job in jobs:
                job.setdefault(k, v)

    @staticmethod
    def set_min_update_every(jobs, min_update_every):
        if min_update_every is None:
            return
        for job in jobs:
            if 'update_every' in job and job['update_every'] < min_update_every:
                job['update_every'] = min_update_every

    def build(self, module_name):
        config = self.load_module_config(module_name)
        if config is None:
            return None

        configs = config.create_jobs()
        self.log.info("[{0}] built {1} job(s) configs".format(module_name, len(configs)))

        self.apply_defaults(configs, self.module_defaults)
        self.apply_defaults(configs, self.job_defaults)
        self.set_min_update_every(configs, self.min_update_every)

        return configs


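# Job lifecycle: jobs start as 'initial', become 'active' after a successful
# check, 'recovering' when a failed check will be retried, and 'dropped' when
# they are abandoned (check failed for good, duplicate, or unhandled error).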
JOB_STATUS_ACTIVE = 'active'
JOB_STATUS_RECOVERING = 'recovering'
JOB_STATUS_DROPPED = 'dropped'
JOB_STATUS_INIT = 'initial'


class Job(threading.Thread):
    inf = -1  # sentinel: unlimited check attempts

    def __init__(self, service, module_name, config):
        threading.Thread.__init__(self)
        self.daemon = True
        self.service = service
        self.module_name = module_name
        self.config = config
        self.real_name = config['job_name']
        self.actual_name = config['override_name'] or self.real_name
        self.autodetection_retry = config['autodetection_retry']
        self.checks = self.inf
        self.job = None
        self.status = JOB_STATUS_INIT

    def is_inited(self):
        return self.job is not None

    def init(self):
        self.job = self.service(configuration=copy.deepcopy(self.config))

    def full_name(self):
        return self.job.name

    def check(self):
        ok = self.job.check()
        # spend one attempt on every failed check, unless attempts are unlimited
        if self.checks != self.inf and not ok:
            self.checks -= 1
        return ok

    def create(self):
        self.job.create()

    def need_to_recheck(self):
        return self.autodetection_retry != 0 and self.checks != 0

    def run(self):
        self.job.run()


class ModuleSrc:
    def __init__(self, name):
        self.name = name
        self.src = None

    def load(self):
        self.src = load_module(self.name)

    def get(self, key):
        return getattr(self.src, key, None)

    def service(self):
        return self.get('Service')

    def defaults(self):
        keys = (
            'update_every',
            'priority',
            'autodetection_retry',
            'chart_cleanup',
            'penalty',
        )
        return dict((k, self.get(k)) for k in keys if self.get(k) is not None)

    def is_disabled_by_default(self):
        return bool(self.get('disabled_by_default'))


class JobsStatuses:
    def __init__(self):
        self.items = OrderedDict()

    def dump(self):
        return json.dumps(self.items, indent=2)

    def get(self, module_name, job_name):
        if module_name not in self.items:
            return None
        return self.items[module_name].get(job_name)

    def has(self, module_name, job_name):
        return self.get(module_name, job_name) is not None

    def from_file(self, path):
        with open(path) as f:
            data = json.load(f)
            return self.from_json(data)

    @staticmethod
    def from_json(items):
        if not isinstance(items, dict):
            raise TypeError('expected items to be a dict, got {0}'.format(type(items)))
        if not items:
            return JobsStatuses()

        v = OrderedDict()
        for mod_name in sorted(items):
            if not items[mod_name]:
                continue
            v[mod_name] = OrderedDict()
            for job_name in sorted(items[mod_name]):
                v[mod_name][job_name] = items[mod_name][job_name]

        rv = JobsStatuses()
        rv.items = v
        return rv

    @staticmethod
    def from_jobs(jobs):
        v = OrderedDict()
        for job in jobs:
            status = job.status
            if status not in (JOB_STATUS_ACTIVE, JOB_STATUS_RECOVERING):
                continue
            if job.module_name not in v:
                v[job.module_name] = OrderedDict()
            v[job.module_name][job.real_name] = status

        rv = JobsStatuses()
        rv.items = v
        return rv


class StdoutSaver:
    @staticmethod
    def save(dump):
        print(dump)


class CachedFileSaver:
    def __init__(self, path):
        self.last_save_success = False
        self.last_saved_dump = str()
        self.path = path

    def save(self, dump):
        if self.last_save_success and self.last_saved_dump == dump:
            return
        try:
            with open(self.path, 'w') as out:
                out.write(dump)
        except Exception:
            self.last_save_success = False
            raise
        self.last_saved_dump = dump
        self.last_save_success = True


class PluginConfig(dict):
    def __init__(self, *args):
        dict.__init__(self, *args)

    def is_module_explicitly_enabled(self, module_name):
        return self._is_module_enabled(module_name, True)

    def is_module_enabled(self, module_name):
        return self._is_module_enabled(module_name, False)

    def _is_module_enabled(self, module_name, explicit):
        if module_name in self:
            return self[module_name]
        if explicit:
            return False
        return self['default_run']


class FileLockRegistry:
    def __init__(self, path):
        self.path = path
        self.locks = dict()

    def register(self, name):
        if name in self.locks:
            return
        file = os.path.join(self.path, '{0}.collector.lock'.format(name))
        lock = filelock.FileLock(file)
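        # non-blocking acquire: raises filelock.Timeout right away when another
        # process already holds this collector's lock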
        lock.acquire(timeout=0)
        self.locks[name] = lock

    def unregister(self, name):
        if name not in self.locks:
            return
        lock = self.locks[name]
        lock.release()
        del self.locks[name]


class DummyRegistry:
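    """No-op registry used when locking is disabled ('nolock') or no lock dir is set."""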
    def register(self, name):
        pass

    def unregister(self, name):
        pass


class Plugin:
    config_name = 'python.d.conf'
    jobs_status_dump_name = 'pythond-jobs-statuses.json'

    def __init__(self, modules_to_run, min_update_every, registry):
        self.modules_to_run = modules_to_run
        self.min_update_every = min_update_every
        self.config = PluginConfig(PLUGIN_BASE_CONF)
        self.log = PythonDLogger()
        self.registry = registry
        self.started_jobs = collections.defaultdict(dict)
        self.jobs = list()
        self.saver = None
        self.runs = 0

    def load_config(self):
        paths = [
            DIRS.plugin_user_config,
            DIRS.plugin_stock_config,
        ]
        self.log.debug("looking for '{0}' in {1}".format(self.config_name, paths))
        abs_path = multi_path_find(self.config_name, *paths)
        if not abs_path:
            self.log.warning("'{0}' was not found, using defaults".format(self.config_name))
            return True

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            config = load_config(abs_path)
        except Exception as error:
            self.log.error("error on loading '{0}' : {1}".format(abs_path, repr(error)))
            return False

        self.log.debug("'{0}' is loaded".format(abs_path))
        self.config.update(config)
        return True

    def load_job_statuses(self):
        self.log.debug("looking for '{0}' in {1}".format(self.jobs_status_dump_name, DIRS.var_lib))
        abs_path = multi_path_find(self.jobs_status_dump_name, DIRS.var_lib)
        if not abs_path:
            self.log.warning("'{0}' was not found".format(self.jobs_status_dump_name))
            return

        self.log.debug("loading '{0}'".format(abs_path))
        try:
            statuses = JobsStatuses().from_file(abs_path)
        except Exception as error:
            self.log.warning("error on loading '{0}' : {1}".format(abs_path, repr(error)))
            return None
        self.log.debug("'{0}' is loaded".format(abs_path))
        return statuses

    def create_jobs(self, job_statuses=None):
        paths = [
            DIRS.modules_user_config,
            DIRS.modules_stock_config,
        ]

        builder = JobsConfigsBuilder(paths)
        builder.job_defaults = JOB_BASE_CONF
        builder.min_update_every = self.min_update_every

        jobs = list()
        for mod_name in self.modules_to_run:
            if not self.config.is_module_enabled(mod_name):
                self.log.info("[{0}] is disabled in the configuration file, skipping it".format(mod_name))
                continue

            src = ModuleSrc(mod_name)
            try:
                src.load()
            except Exception as error:
                self.log.warning("[{0}] error on loading source : {1}, skipping it".format(mod_name, repr(error)))
                continue

            if not (src.service() and callable(src.service())):
                self.log.warning("[{0}] has no callable Service object, skipping it".format(mod_name))
                continue

            if src.is_disabled_by_default() and not self.config.is_module_explicitly_enabled(mod_name):
                self.log.info("[{0}] is disabled by default, skipping it".format(mod_name))
                continue

            builder.module_defaults = src.defaults()
            configs = builder.build(mod_name)
            if not configs:
                self.log.info("[{0}] has no job configs, skipping it".format(mod_name))
                continue

            for config in configs:
                config['job_name'] = re.sub(r'\s+', '_', config['job_name'])
                config['override_name'] = re.sub(r'\s+', '_', config.pop('name'))

                job = Job(src.service(), mod_name, config)

                was_previously_active = job_statuses and job_statuses.has(job.module_name, job.real_name)
                if was_previously_active and job.autodetection_retry == 0:
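                    # the job was active before the plugin restarted: give it
                    # time to come back (up to 11 rechecks, one every 30 seconds)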
                    self.log.debug('{0}[{1}] was previously active, applying recovering settings'.format(
                        job.module_name, job.real_name))
                    job.checks = 11
                    job.autodetection_retry = 30

                jobs.append(job)

        return jobs

    def setup(self):
        if not self.load_config():
            return False

        if not self.config['enabled']:
            self.log.info('disabled in the configuration file')
            return False

        statuses = self.load_job_statuses()

        self.jobs = self.create_jobs(statuses)
        if not self.jobs:
            self.log.info('no jobs to run')
            return False

        if not IS_ATTY:
            abs_path = os.path.join(DIRS.var_lib, self.jobs_status_dump_name)
            self.saver = CachedFileSaver(abs_path)
        return True

    def start_jobs(self, *jobs):
        for job in jobs:
            if job.status not in (JOB_STATUS_INIT, JOB_STATUS_RECOVERING):
                continue

            if job.actual_name in self.started_jobs[job.module_name]:
                self.log.info('{0}[{1}] : already served by another job, skipping it'.format(
                    job.module_name, job.real_name))
                job.status = JOB_STATUS_DROPPED
                continue

            if not job.is_inited():
                try:
                    job.init()
                except Exception as error:
                    self.log.warning("{0}[{1}] : unhandled exception on init : {2}, skipping the job".format(
                        job.module_name, job.real_name, repr(error)))
                    job.status = JOB_STATUS_DROPPED
                    continue

            try:
                ok = job.check()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on check : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                continue
            if not ok:
                self.log.info('{0}[{1}] : check failed'.format(job.module_name, job.real_name))
                job.status = JOB_STATUS_RECOVERING if job.need_to_recheck() else JOB_STATUS_DROPPED
                continue
            self.log.info('{0}[{1}] : check success'.format(job.module_name, job.real_name))

            try:
                self.registry.register(job.full_name())
            except filelock.Timeout as error:
                self.log.info('{0}[{1}] : already registered by another process, skipping the job ({2})'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue
            except Exception as error:
                self.log.warning('{0}[{1}] : registration failed: {2}, skipping the job'.format(
                    job.module_name, job.real_name, error))
                job.status = JOB_STATUS_DROPPED
                continue

            try:
                job.create()
            except Exception as error:
                self.log.warning("{0}[{1}] : unhandled exception on create : {2}, skipping the job".format(
                    job.module_name, job.real_name, repr(error)))
                job.status = JOB_STATUS_DROPPED
                try:
                    self.registry.unregister(job.full_name())
                except Exception as error:
                    self.log.warning('{0}[{1}] : deregistration failed: {2}'.format(
                        job.module_name, job.real_name, error))
                continue

            # remember the served name so later duplicates within this module are skipped
            self.started_jobs[job.module_name][job.actual_name] = True
            job.status = JOB_STATUS_ACTIVE
            job.start()

    @staticmethod
    def keep_alive():
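        # when running under the agent (not a tty), print an empty line so
        # netdata knows the plugin is alive even when no job produces data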
        if not IS_ATTY:
            safe_print('\n')

    def garbage_collection(self):
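        # gc_interval is counted in runs; at roughly one run per second the
        # default of 300 means a collection about every five minutes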
        if self.config['gc_run'] and self.runs % self.config['gc_interval'] == 0:
            v = gc.collect()
            self.log.debug('GC collection run result: {0}'.format(v))

    def restart_recovering_jobs(self):
        for job in self.jobs:
            if job.status != JOB_STATUS_RECOVERING:
                continue
            if self.runs % job.autodetection_retry != 0:
                continue
            self.start_jobs(job)

    def cleanup_jobs(self):
        self.jobs = [j for j in self.jobs if j.status != JOB_STATUS_DROPPED]

    def have_alive_jobs(self):
        return any(
            job.status in (JOB_STATUS_RECOVERING, JOB_STATUS_ACTIVE)
            for job in self.jobs
        )

    def save_job_statuses(self):
        if self.saver is None:
            return
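        # persist at most once every 10 runs (roughly every 10 seconds)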
        if self.runs % 10 != 0:
            return
        dump = JobsStatuses().from_jobs(self.jobs).dump()
        try:
            self.saver.save(dump)
        except Exception as error:
            self.log.error("error on saving jobs statuses dump : {0}".format(repr(error)))

    def serve_once(self):
        if not self.have_alive_jobs():
            self.log.info('no jobs to serve')
            return False

        time.sleep(1)
        self.runs += 1

        self.keep_alive()
        self.garbage_collection()
        self.cleanup_jobs()
        self.restart_recovering_jobs()
        self.save_job_statuses()
        return True

    def serve(self):
        while self.serve_once():
            pass

    def run(self):
        self.start_jobs(*self.jobs)
        self.serve()


def parse_command_line():
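    # positional arguments, in any order: an integer update_every, the flags
    # 'debug', 'trace' and 'nolock', and names of modules to run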
    opts = sys.argv[1:]

    debug = False
    trace = False
    nolock = False
    update_every = 1
    modules_to_run = list()

    def find_first_positive_int(values):
        return next((v for v in values if v.isdigit() and int(v) >= 1), None)

    u = find_first_positive_int(opts)
    if u is not None:
        update_every = int(u)
        opts.remove(u)
    if 'debug' in opts:
        debug = True
        opts.remove('debug')
    if 'trace' in opts:
        trace = True
        opts.remove('trace')
    if 'nolock' in opts:
        nolock = True
        opts.remove('nolock')
    if opts:
        modules_to_run = list(opts)

    cmd = collections.namedtuple(
        'CMD',
        [
            'update_every',
            'debug',
            'trace',
            'nolock',
            'modules_to_run',
        ])
    return cmd(
        update_every,
        debug,
        trace,
        nolock,
        modules_to_run,
    )


def guess_module(modules, *names):
    def guess(n):
        # return the modules matching the longest prefix of n that matches anything
        found = None
        for i, _ in enumerate(n):
            cur = [x for x in modules if x.startswith(n[:i + 1])]
            if not cur:
                return found
            found = cur
        return found

    guessed = list()
    for name in names:
        name = name.lower()
        m = guess(name)
        if m:
            guessed.extend(m)
    return sorted(set(guessed))


def disable():
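    # printing DISABLE tells the netdata agent to disable this plugin
    # instead of restarting it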
    if not IS_ATTY:
        safe_print('DISABLE')
    sys.exit(0)


def main():
    cmd = parse_command_line()
    log = PythonDLogger()

    if cmd.debug:
        log.logger.severity = 'DEBUG'
    if cmd.trace:
        log.log_traceback = True

    log.info('using python v{0}.{1}'.format(PY_VERSION[0], PY_VERSION[1]))

    unknown = set(cmd.modules_to_run) - set(AVAILABLE_MODULES)
    if unknown:
        log.error('unknown modules : {0}'.format(sorted(unknown)))
        guessed = guess_module(AVAILABLE_MODULES, *cmd.modules_to_run)
        if guessed:
            log.info('probably you meant : \n{0}'.format(pprint.pformat(guessed, width=1)))
        return

    if DIRS.locks and not cmd.nolock:
        registry = FileLockRegistry(DIRS.locks)
    else:
        registry = DummyRegistry()

    p = Plugin(
        cmd.modules_to_run or AVAILABLE_MODULES,
        cmd.update_every,
        registry,
    )

    try:
        if not p.setup():
            return
        p.run()
    except KeyboardInterrupt:
        pass
    log.info('exiting from main...')


if __name__ == "__main__":
    main()
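    # main() returning means there is nothing (left) to run: tell the agent
    # not to restart the plugin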
    disable()