summaryrefslogtreecommitdiffstats
path: root/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'plugins.d')
-rw-r--r--plugins.d/Makefile.am1
-rw-r--r--plugins.d/Makefile.in1
-rwxr-xr-xplugins.d/alarm-notify.sh200
-rwxr-xr-xplugins.d/cgroup-name.sh5
-rwxr-xr-xplugins.d/cgroup-network-helper.sh256
-rwxr-xr-xplugins.d/python.d.plugin869
6 files changed, 771 insertions, 561 deletions
diff --git a/plugins.d/Makefile.am b/plugins.d/Makefile.am
index 7d3bc44f7..41e6d5366 100644
--- a/plugins.d/Makefile.am
+++ b/plugins.d/Makefile.am
@@ -12,6 +12,7 @@ dist_plugins_SCRIPTS = \
alarm-notify.sh \
alarm-test.sh \
cgroup-name.sh \
+ cgroup-network-helper.sh \
charts.d.dryrun-helper.sh \
charts.d.plugin \
fping.plugin \
diff --git a/plugins.d/Makefile.in b/plugins.d/Makefile.in
index 256605f5d..059d68f6a 100644
--- a/plugins.d/Makefile.in
+++ b/plugins.d/Makefile.in
@@ -306,6 +306,7 @@ dist_plugins_SCRIPTS = \
alarm-notify.sh \
alarm-test.sh \
cgroup-name.sh \
+ cgroup-network-helper.sh \
charts.d.dryrun-helper.sh \
charts.d.plugin \
fping.plugin \
diff --git a/plugins.d/alarm-notify.sh b/plugins.d/alarm-notify.sh
index 9b7f6c8dd..0af98095d 100755
--- a/plugins.d/alarm-notify.sh
+++ b/plugins.d/alarm-notify.sh
@@ -219,11 +219,13 @@ sendmail=
# enable / disable features
SEND_SLACK="YES"
+SEND_FLOCK="YES"
SEND_DISCORD="YES"
SEND_PUSHOVER="YES"
SEND_TWILIO="YES"
SEND_HIPCHAT="YES"
SEND_MESSAGEBIRD="YES"
+SEND_KAVENEGAR="YES"
SEND_TELEGRAM="YES"
SEND_EMAIL="YES"
SEND_PUSHBULLET="YES"
@@ -236,6 +238,11 @@ SLACK_WEBHOOK_URL=
DEFAULT_RECIPIENT_SLACK=
declare -A role_recipients_slack=()
+# flock configs
+FLOCK_WEBHOOK_URL=
+DEFAULT_RECIPIENT_FLOCK=
+declare -A role_recipients_flock=()
+
# discord configs
DISCORD_WEBHOOK_URL=
DEFAULT_RECIPIENT_DISCORD=
@@ -248,6 +255,7 @@ declare -A role_recipients_pushover=()
# pushbullet configs
PUSHBULLET_ACCESS_TOKEN=
+PUSHBULLET_SOURCE_DEVICE=
DEFAULT_RECIPIENT_PUSHBULLET=
declare -A role_recipients_pushbullet=()
@@ -270,6 +278,12 @@ MESSAGEBIRD_NUMBER=
DEFAULT_RECIPIENT_MESSAGEBIRD=
declare -A role_recipients_messagebird=()
+# kavenegar configs
+KAVENEGAR_API_KEY=""
+KAVENEGAR_SENDER=""
+DEFAULT_RECIPIENT_KAVENEGAR=()
+declare -A role_recipients_kavenegar=""
+
# telegram configs
TELEGRAM_BOT_TOKEN=
DEFAULT_RECIPIENT_TELEGRAM=
@@ -372,6 +386,7 @@ filter_recipient_by_criticality() {
# find the recipients' addresses per method
declare -A arr_slack=()
+declare -A arr_flock=()
declare -A arr_discord=()
declare -A arr_pushover=()
declare -A arr_pushbullet=()
@@ -382,6 +397,7 @@ declare -A arr_pd=()
declare -A arr_email=()
declare -A arr_custom=()
declare -A arr_messagebird=()
+declare -A arr_kavenegar=()
# netdata may call us with multiple roles, and roles may have multiple but
# overlapping recipients - so, here we find the unique recipients.
@@ -439,6 +455,14 @@ do
[ "${r}" != "disabled" ] && filter_recipient_by_criticality messagebird "${r}" && arr_messagebird[${r/|*/}]="1"
done
+ # kavenegar
+ a="${role_recipients_kavenegar[${x}]}"
+ [ -z "${a}" ] && a="${DEFAULT_RECIPIENT_KAVENEGAR}"
+ for r in ${a//,/ }
+ do
+ [ "${r}" != "disabled" ] && filter_recipient_by_criticality kavenegar "${r}" && arr_kavenegar[${r/|*/}]="1"
+ done
+
# telegram
a="${role_recipients_telegram[${x}]}"
[ -z "${a}" ] && a="${DEFAULT_RECIPIENT_TELEGRAM}"
@@ -455,6 +479,14 @@ do
[ "${r}" != "disabled" ] && filter_recipient_by_criticality slack "${r}" && arr_slack[${r/|*/}]="1"
done
+ # flock
+ a="${role_recipients_flock[${x}]}"
+ [ -z "${a}" ] && a="${DEFAULT_RECIPIENT_FLOCK}"
+ for r in ${a//,/ }
+ do
+ [ "${r}" != "disabled" ] && filter_recipient_by_criticality flock "${r}" && arr_flock[${r/|*/}]="1"
+ done
+
# discord
a="${role_recipients_discord[${x}]}"
[ -z "${a}" ] && a="${DEFAULT_RECIPIENT_DISCORD}"
@@ -485,6 +517,10 @@ done
to_slack="${!arr_slack[*]}"
[ -z "${to_slack}" ] && SEND_SLACK="NO"
+# build the list of flock recipients (channels)
+to_flock="${!arr_flock[*]}"
+[ -z "${to_flock}" ] && SEND_FLOCK="NO"
+
# build the list of discord recipients (channels)
to_discord="${!arr_discord[*]}"
[ -z "${to_discord}" ] && SEND_DISCORD="NO"
@@ -509,6 +545,10 @@ to_hipchat="${!arr_hipchat[*]}"
to_messagebird="${!arr_messagebird[*]}"
[ -z "${to_messagebird}" ] && SEND_MESSAGEBIRD="NO"
+# build the list of kavenegar recipients (phone numbers)
+to_kavenegar="${!arr_kavenegar[*]}"
+[ -z "${to_kavenegar}" ] && SEND_KAVENEGAR="NO"
+
# check array of telegram recipients (chat ids)
to_telegram="${!arr_telegram[*]}"
[ -z "${to_telegram}" ] && SEND_TELEGRAM="NO"
@@ -537,6 +577,9 @@ done
# check slack
[ -z "${SLACK_WEBHOOK_URL}" ] && SEND_SLACK="NO"
+# check flock
+[ -z "${FLOCK_WEBHOOK_URL}" ] && SEND_FLOCK="NO"
+
# check discord
[ -z "${DISCORD_WEBHOOK_URL}" ] && SEND_DISCORD="NO"
@@ -555,6 +598,9 @@ done
# check messagebird
[ -z "${MESSAGEBIRD_ACCESS_KEY}" -o -z "${MESSAGEBIRD_NUMBER}" ] && SEND_MESSAGEBIRD="NO"
+# check kavenegar
+[ -z "${KAVENEGAR_API_KEY}" -o -z "${KAVENEGAR_SENDER}" ] && SEND_KAVENEGAR="NO"
+
# check telegram
[ -z "${TELEGRAM_BOT_TOKEN}" ] && SEND_TELEGRAM="NO"
@@ -578,13 +624,16 @@ fi
if [ \( \
"${SEND_PUSHOVER}" = "YES" \
-o "${SEND_SLACK}" = "YES" \
+ -o "${SEND_FLOCK}" = "YES" \
-o "${SEND_DISCORD}" = "YES" \
-o "${SEND_HIPCHAT}" = "YES" \
-o "${SEND_TWILIO}" = "YES" \
-o "${SEND_MESSAGEBIRD}" = "YES" \
+ -o "${SEND_KAVENEGAR}" = "YES" \
-o "${SEND_TELEGRAM}" = "YES" \
-o "${SEND_PUSHBULLET}" = "YES" \
-o "${SEND_KAFKA}" = "YES" \
+ -o "${SEND_CUSTOM}" = "YES" \
\) -a -z "${curl}" ]
then
curl="$(which curl 2>/dev/null || command -v curl 2>/dev/null)"
@@ -595,11 +644,14 @@ if [ \( \
SEND_PUSHBULLET="NO"
SEND_TELEGRAM="NO"
SEND_SLACK="NO"
+ SEND_FLOCK="NO"
SEND_DISCORD="NO"
SEND_TWILIO="NO"
SEND_HIPCHAT="NO"
SEND_MESSAGEBIRD="NO"
+ SEND_KAVENEGAR="NO"
SEND_KAFKA="NO"
+ SEND_CUSTOM="NO"
fi
fi
@@ -619,10 +671,12 @@ if [ "${SEND_EMAIL}" != "YES" \
-a "${SEND_PUSHOVER}" != "YES" \
-a "${SEND_TELEGRAM}" != "YES" \
-a "${SEND_SLACK}" != "YES" \
+ -a "${SEND_FLOCK}" != "YES" \
-a "${SEND_DISCORD}" != "YES" \
-a "${SEND_TWILIO}" != "YES" \
-a "${SEND_HIPCHAT}" != "YES" \
-a "${SEND_MESSAGEBIRD}" != "YES" \
+ -a "${SEND_KAVENEGAR}" != "YES" \
-a "${SEND_PUSHBULLET}" != "YES" \
-a "${SEND_KAFKA}" != "YES" \
-a "${SEND_PD}" != "YES" \
@@ -783,7 +837,7 @@ send_pushover() {
priority=-2
case "${status}" in
CLEAR) priority=-1;; # low priority: no sound or vibration
- WARNING) priotity=0;; # normal priority: respect quiet hours
+ WARNING) priority=0;; # normal priority: respect quiet hours
CRITICAL) priority=1;; # high priority: bypass quiet hours
*) priority=-2;; # lowest priority: no notification at all
esac
@@ -802,7 +856,7 @@ send_pushover() {
--form-string "priority=${priority}" \
https://api.pushover.net/1/messages.json)
- if [ "${httpcode}" == "200" ]
+ if [ "${httpcode}" = "200" ]
then
info "sent pushover notification for: ${host} ${chart}.${name} is ${status} to '${user}'"
sent=$((sent + 1))
@@ -821,7 +875,7 @@ send_pushover() {
# pushbullet sender
send_pushbullet() {
- local userapikey="${1}" recipients="${2}" title="${3}" message="${4}" httpcode sent=0 user
+ local userapikey="${1}" source_device="${2}" recipients="${3}" url="${4}" title="${5}" message="${6}" httpcode sent=0 user
if [ "${SEND_PUSHBULLET}" = "YES" -a ! -z "${userapikey}" -a ! -z "${recipients}" -a ! -z "${message}" -a ! -z "${title}" ]
then
#https://docs.pushbullet.com/#create-push
@@ -832,13 +886,15 @@ send_pushbullet() {
--header 'Content-Type: application/json' \
--data-binary @<(cat <<EOF
{"title": "${title}",
- "type": "note",
+ "type": "link",
"email": "${user}",
- "body": "$( echo -n ${message})"}
+ "body": "$( echo -n ${message})",
+ "url": "${url}",
+ "source_device_iden": "${source_device}"}
EOF
) "https://api.pushbullet.com/v2/pushes" -X POST)
- if [ "${httpcode}" == "200" ]
+ if [ "${httpcode}" = "200" ]
then
info "sent pushbullet notification for: ${host} ${chart}.${name} is ${status} to '${user}'"
sent=$((sent + 1))
@@ -864,7 +920,7 @@ send_kafka() {
--data "{host_ip:\"${KAFKA_SENDER_IP}\",when:${when},name:\"${name}\",chart:\"${chart}\",family:\"${family}\",status:\"${status}\",old_status:\"${old_status}\",value:${value},old_value:${old_value},duration:${duration},non_clear_duration:${non_clear_duration},units:\"${units}\",info:\"${info}\"}" \
"${KAFKA_URL}")
- if [ "${httpcode}" == "204" ]
+ if [ "${httpcode}" = "204" ]
then
info "sent kafka data for: ${host} ${chart}.${name} is ${status} and ip '${KAFKA_SENDER_IP}'"
sent=$((sent + 1))
@@ -951,7 +1007,7 @@ send_twilio() {
-u "${accountsid}:${accounttoken}" \
"https://api.twilio.com/2010-04-01/Accounts/${accountsid}/Messages.json")
- if [ "${httpcode}" == "201" ]
+ if [ "${httpcode}" = "201" ]
then
info "sent Twilio SMS for: ${host} ${chart}.${name} is ${status} to '${user}'"
sent=$((sent + 1))
@@ -1004,7 +1060,7 @@ send_hipchat() {
-d "{\"color\": \"${color}\", \"from\": \"${host}\", \"message_format\": \"${msg_format}\", \"message\": \"${message}\", \"notify\": \"${notify}\"}" \
"https://${HIPCHAT_SERVER}/v2/room/${room}/notification")
- if [ "${httpcode}" == "204" ]
+ if [ "${httpcode}" = "204" ]
then
info "sent HipChat notification for: ${host} ${chart}.${name} is ${status} to '${room}'"
sent=$((sent + 1))
@@ -1038,7 +1094,7 @@ send_messagebird() {
-H "Authorization: AccessKey ${accesskey}" \
"https://rest.messagebird.com/messages")
- if [ "${httpcode}" == "201" ]
+ if [ "${httpcode}" = "201" ]
then
info "sent Messagebird SMS for: ${host} ${chart}.${name} is ${status} to '${user}'"
sent=$((sent + 1))
@@ -1054,6 +1110,36 @@ send_messagebird() {
}
# -----------------------------------------------------------------------------
+# kavenegar sender
+
+send_kavenegar() {
+ local API_KEY="${1}" kavenegarsender="${2}" recipients="${3}" title="${4}" message="${5}" httpcode sent=0 user
+ if [ "${SEND_KAVENEGAR}" = "YES" -a ! -z "${API_KEY}" -a ! -z "${kavenegarsender}" -a ! -z "${recipients}" -a ! -z "${message}" -a ! -z "${title}" ]
+ then
+ # http://api.kavenegar.com/v1/{API-KEY}/sms/send.json
+ for user in ${recipients}
+ do
+ httpcode=$(docurl -X POST http://api.kavenegar.com/v1/${API_KEY}/sms/send.json \
+ --data-urlencode "sender=${kavenegarsender}" \
+ --data-urlencode "receptor=${user}" \
+ --data-urlencode "message=${title} ${message}")
+
+ if [ "${httpcode}" = "201" ]
+ then
+ info "sent Kavenegar SMS for: ${host} ${chart}.${name} is ${status} to '${user}'"
+ sent=$((sent + 1))
+ else
+ error "failed to send Kavenegar SMS for: ${host} ${chart}.${name} is ${status} to '${user}' with HTTP error code ${httpcode}."
+ fi
+ done
+
+ [ ${sent} -gt 0 ] && return 0
+ fi
+
+ return 1
+}
+
+# -----------------------------------------------------------------------------
# telegram sender
send_telegram() {
@@ -1079,11 +1165,11 @@ send_telegram() {
--data-urlencode "text=${emoji} ${message}" \
"https://api.telegram.org/bot${bottoken}/sendMessage?chat_id=${chatid}")
- if [ "${httpcode}" == "200" ]
+ if [ "${httpcode}" = "200" ]
then
info "sent telegram notification for: ${host} ${chart}.${name} is ${status} to '${chatid}'"
sent=$((sent + 1))
- elif [ "${httpcode}" == "401" ]
+ elif [ "${httpcode}" = "401" ]
then
error "failed to send telegram notification for: ${host} ${chart}.${name} is ${status} to '${chatid}': Wrong bot token."
else
@@ -1147,7 +1233,7 @@ EOF
)"
httpcode=$(docurl -X POST --data-urlencode "payload=${payload}" "${webhook}")
- if [ "${httpcode}" == "200" ]
+ if [ "${httpcode}" = "200" ]
then
info "sent slack notification for: ${host} ${chart}.${name} is ${status} to '${channel}'"
sent=$((sent + 1))
@@ -1162,6 +1248,61 @@ EOF
}
# -----------------------------------------------------------------------------
+# flock sender
+
+send_flock() {
+ local webhook="${1}" channels="${2}" httpcode sent=0 channel color payload
+
+ [ "${SEND_FLOCK}" != "YES" ] && return 1
+
+ case "${status}" in
+ WARNING) color="warning" ;;
+ CRITICAL) color="danger" ;;
+ CLEAR) color="good" ;;
+ *) color="#777777" ;;
+ esac
+
+ for channel in ${channels}
+ do
+ httpcode=$(docurl -X POST "${webhook}" -H "Content-Type: application/json" -d "{
+ \"sendAs\": {
+ \"name\" : \"netdata on ${host}\",
+ \"profileImage\" : \"${images_base_url}/images/seo-performance-128.png\"
+ },
+ \"text\": \"${host} *${status_message}*\",
+ \"timestamp\": \"${when}\",
+ \"attachments\": [
+ {
+ \"description\": \"${chart} (${family}) - ${info}\",
+ \"color\": \"${color}\",
+ \"title\": \"${alarm}\",
+ \"url\": \"${goto_url}\",
+ \"text\": \"${info}\",
+ \"views\": {
+ \"image\": {
+ \"original\": { \"src\": \"${image}\", \"width\": 400, \"height\": 400 },
+ \"thumbnail\": { \"src\": \"${image}\", \"width\": 50, \"height\": 50 },
+ \"filename\": \"${image}\"
+ }
+ }
+ }
+ ]
+ }" )
+ if [ "${httpcode}" = "200" ]
+ then
+ info "sent flock notification for: ${host} ${chart}.${name} is ${status} to '${channel}'"
+ sent=$((sent + 1))
+ else
+ error "failed to send flock notification for: ${host} ${chart}.${name} is ${status} to '${channel}', with HTTP error code ${httpcode}."
+ fi
+ done
+
+ [ ${sent} -gt 0 ] && return 0
+
+ return 1
+}
+
+# -----------------------------------------------------------------------------
# discord sender
send_discord() {
@@ -1210,7 +1351,7 @@ EOF
)"
httpcode=$(docurl -X POST --data-urlencode "payload=${payload}" "${webhook}")
- if [ "${httpcode}" == "200" ]
+ if [ "${httpcode}" = "200" ]
then
info "sent discord notification for: ${host} ${chart}.${name} is ${status} to '${channel}'"
sent=$((sent + 1))
@@ -1266,7 +1407,7 @@ case "${status}" in
WARNING)
image="${images_base_url}/images/alert-128-orange.png"
status_message="needs attention"
- color="#caca4b"
+ color="#ffc107"
;;
CLEAR)
@@ -1325,6 +1466,15 @@ send_slack "${SLACK_WEBHOOK_URL}" "${to_slack}"
SENT_SLACK=$?
# -----------------------------------------------------------------------------
+# send the flock notification
+
+# flock aggregates posts from the same username
+# so we use "${host} ${status}" as the bot username, to make them diff
+
+send_flock "${FLOCK_WEBHOOK_URL}" "${to_flock}"
+SENT_FLOCK=$?
+
+# -----------------------------------------------------------------------------
# send the discord notification
# discord aggregates posts from the same username
@@ -1351,11 +1501,11 @@ SENT_PUSHOVER=$?
# -----------------------------------------------------------------------------
# send the pushbullet notification
-send_pushbullet "${PUSHBULLET_ACCESS_TOKEN}" "${to_pushbullet}" "${host} ${status_message} - ${name//_/ } - ${chart}" "${alarm}\n
+send_pushbullet "${PUSHBULLET_ACCESS_TOKEN}" "${PUSHBULLET_SOURCE_DEVICE}" "${to_pushbullet}" "${goto_url}" "${host} ${status_message} - ${name//_/ } - ${chart}" "${alarm}\n
Severity: ${severity}\n
Chart: ${chart}\n
Family: ${family}\n
-To View Netdata go to: ${goto_url}\n
+$(date -d @${when})\n
The source of this alarm is line ${src}"
SENT_PUSHBULLET=$?
@@ -1384,6 +1534,18 @@ SENT_MESSAGEBIRD=$?
# -----------------------------------------------------------------------------
+# send the kavenegar SMS
+
+send_kavenegar "${KAVENEGAR_API_KEY}" "${KAVENEGAR_SENDER}" "${to_kavenegar}" "${host} ${status_message} - ${name//_/ } - ${chart}" "${alarm}
+Severity: ${severity}
+Chart: ${chart}
+Family: ${family}
+${info}"
+
+SENT_KAVENEGAR=$?
+
+
+# -----------------------------------------------------------------------------
# send the telegram.org message
# https://core.telegram.org/bots/api#formatting-options
@@ -1559,6 +1721,7 @@ Content-Transfer-Encoding: 8bit
</table>
</body>
</html>
+--multipart-boundary--
EOF
SENT_EMAIL=$?
@@ -1570,10 +1733,12 @@ if [ ${SENT_EMAIL} -eq 0 \
-o ${SENT_PUSHOVER} -eq 0 \
-o ${SENT_TELEGRAM} -eq 0 \
-o ${SENT_SLACK} -eq 0 \
+ -o ${SENT_FLOCK} -eq 0 \
-o ${SENT_DISCORD} -eq 0 \
-o ${SENT_TWILIO} -eq 0 \
-o ${SENT_HIPCHAT} -eq 0 \
-o ${SENT_MESSAGEBIRD} -eq 0 \
+ -o ${SENT_KAVENEGAR} -eq 0 \
-o ${SENT_PUSHBULLET} -eq 0 \
-o ${SENT_KAFKA} -eq 0 \
-o ${SENT_PD} -eq 0 \
@@ -1586,4 +1751,3 @@ fi
# we did not send anything
exit 1
-
diff --git a/plugins.d/cgroup-name.sh b/plugins.d/cgroup-name.sh
index dc0bf7553..acdd6f4f9 100755
--- a/plugins.d/cgroup-name.sh
+++ b/plugins.d/cgroup-name.sh
@@ -126,6 +126,11 @@ if [ -z "${NAME}" ]
# NAME="$(echo ${CGROUP} | sed 's/machine.slice_machine.*-qemu//; s/\/x2d//; s/\/x2d/\-/g; s/\.scope//g')"
NAME="qemu_$(echo ${CGROUP} | sed 's/machine.slice_machine.*-qemu//; s/\/x2d[[:digit:]]*//; s/\/x2d//g; s/\.scope//g')"
+ elif [[ "${CGROUP}" =~ machine_.*\.libvirt-qemu ]]
+ then
+ # libvirtd / qemu virtual machines
+ NAME="qemu_$(echo ${CGROUP} | sed 's/^machine_//; s/\.libvirt-qemu$//; s/-/_/;')"
+
elif [[ "${CGROUP}" =~ qemu.slice_([0-9]+).scope && -d /etc/pve ]]
then
# Proxmox VMs
diff --git a/plugins.d/cgroup-network-helper.sh b/plugins.d/cgroup-network-helper.sh
new file mode 100755
index 000000000..d93fe356a
--- /dev/null
+++ b/plugins.d/cgroup-network-helper.sh
@@ -0,0 +1,256 @@
+#!/usr/bin/env bash
+
+# cgroup-network-helper.sh
+# detect container and virtual machine interfaces
+#
+# (C) 2017 Costa Tsaousis
+# GPL v3+
+#
+# This script is called as root (by cgroup-network), with either a pid, or a cgroup path.
+# It tries to find all the network interfaces that belong to the same cgroup.
+#
+# It supports several method for this detection:
+#
+# 1. cgroup-network (the binary father of this script) detects veth network interfaces,
+# by examining iflink and ifindex IDs and switching namespaces
+# (it also detects the interface name as it is used by the container).
+#
+# 2. this script, uses /proc/PID/fdinfo to find tun/tap network interfaces.
+#
+# 3. this script, calls virsh to find libvirt network interfaces.
+#
+
+# -----------------------------------------------------------------------------
+
+export PATH="${PATH}:/sbin:/usr/sbin:/usr/local/sbin"
+export LC_ALL=C
+
+PROGRAM_NAME="$(basename "${0}")"
+
+logdate() {
+ date "+%Y-%m-%d %H:%M:%S"
+}
+
+log() {
+ local status="${1}"
+ shift
+
+ echo >&2 "$(logdate): ${PROGRAM_NAME}: ${status}: ${*}"
+
+}
+
+warning() {
+ log WARNING "${@}"
+}
+
+error() {
+ log ERROR "${@}"
+}
+
+info() {
+ log INFO "${@}"
+}
+
+fatal() {
+ log FATAL "${@}"
+ exit 1
+}
+
+debug=0
+debug() {
+ [ "${debug}" = "1" ] && log DEBUG "${@}"
+}
+
+# -----------------------------------------------------------------------------
+# check for BASH v4+ (required for associative arrays)
+
+[ $(( ${BASH_VERSINFO[0]} )) -lt 4 ] && \
+ fatal "BASH version 4 or later is required (this is ${BASH_VERSION})."
+
+# -----------------------------------------------------------------------------
+# defaults to allow running this script by hand
+
+[ -z "${NETDATA_PLUGINS_DIR}" ] && NETDATA_PLUGINS_DIR="$(dirname "${0}")"
+[ -z "${NETDATA_CONFIG_DIR}" ] && NETDATA_CONFIG_DIR="$(dirname "${0}")/../../../../etc/netdata"
+[ -z "${NETDATA_CACHE_DIR}" ] && NETDATA_CACHE_DIR="$(dirname "${0}")/../../../../var/cache/netdata"
+
+# -----------------------------------------------------------------------------
+# parse the arguments
+
+pid=
+cgroup=
+while [ ! -z "${1}" ]
+do
+ case "${1}" in
+ --cgroup) cgroup="${2}"; shift 1;;
+ --pid|-p) pid="${2}"; shift 1;;
+ --debug|debug) debug=1;;
+ *) fatal "Cannot understand argument '${1}'";;
+ esac
+
+ shift
+done
+
+if [ -z "${pid}" -a -z "${cgroup}" ]
+then
+ fatal "Either --pid or --cgroup is required"
+fi
+
+# -----------------------------------------------------------------------------
+
+set_source() {
+ [ ${debug} -eq 1 ] && echo "SRC ${*}"
+}
+
+
+# -----------------------------------------------------------------------------
+# veth interfaces via cgroup
+
+# cgroup-network can detect veth interfaces by itself (written in C).
+# If you seek for a shell version of what it does, check this:
+# https://github.com/firehol/netdata/issues/474#issuecomment-317866709
+
+
+# -----------------------------------------------------------------------------
+# tun/tap interfaces via /proc/PID/fdinfo
+
+# find any tun/tap devices linked to a pid
+proc_pid_fdinfo_iff() {
+ local p="${1}" # the pid
+
+ debug "Searching for tun/tap interfaces for pid ${p}..."
+ set_source "fdinfo"
+ grep ^iff:.* "${NETDATA_HOST_PREFIX}/proc/${p}/fdinfo"/* 2>/dev/null | cut -f 2
+}
+
+find_tun_tap_interfaces_for_cgroup() {
+ local c="${1}" # the cgroup path
+
+ # for each pid of the cgroup
+ # find any tun/tap devices linked to the pid
+ if [ -f "${c}/emulator/cgroup.procs" ]
+ then
+ local p
+ for p in $(< "${c}/emulator/cgroup.procs" )
+ do
+ proc_pid_fdinfo_iff ${p}
+ done
+ fi
+}
+
+
+# -----------------------------------------------------------------------------
+# virsh domain network interfaces
+
+virsh_cgroup_to_domain_name() {
+ local c="${1}" # the cgroup path
+
+ debug "extracting a possible virsh domain from cgroup ${c}..."
+
+ # extract for the cgroup path
+ sed -n -e "s|.*/machine-qemu\\\\x2d[0-9]\+\\\\x2d\(.*\)\.scope$|\1|p" \
+ -e "s|.*/machine/\(.*\)\.libvirt-qemu$|\1|p" \
+ <<EOF
+${c}
+EOF
+}
+
+virsh_find_all_interfaces_for_cgroup() {
+ local c="${1}" # the cgroup path
+
+ # the virsh command
+ local virsh="$(which virsh 2>/dev/null || command -v virsh 2>/dev/null)"
+
+ if [ ! -z "${virsh}" ]
+ then
+ local d="$(virsh_cgroup_to_domain_name "${c}")"
+
+ if [ ! -z "${d}" ]
+ then
+ debug "running: virsh domiflist ${d}; to find the network interfaces"
+
+ # match only 'network' interfaces from virsh output
+
+ set_source "virsh"
+ "${virsh}" domiflist ${d} |\
+ sed -n \
+ -e "s|^\([^[:space:]]\+\)[[:space:]]\+network[[:space:]]\+\([^[:space:]]\+\)[[:space:]]\+[^[:space:]]\+[[:space:]]\+[^[:space:]]\+$|\1 \1_\2|p" \
+ -e "s|^\([^[:space:]]\+\)[[:space:]]\+bridge[[:space:]]\+\([^[:space:]]\+\)[[:space:]]\+[^[:space:]]\+[[:space:]]\+[^[:space:]]\+$|\1 \1_\2|p"
+ else
+ debug "no virsh domain extracted from cgroup ${c}"
+ fi
+ else
+ debug "virsh command is not available"
+ fi
+}
+
+# -----------------------------------------------------------------------------
+
+find_all_interfaces_of_pid_or_cgroup() {
+ local p="${1}" c="${2}" # the pid and the cgroup path
+
+ if [ ! -z "${pid}" ]
+ then
+ # we have been called with a pid
+
+ proc_pid_fdinfo_iff ${p}
+
+ elif [ ! -z "${c}" ]
+ then
+ # we have been called with a cgroup
+
+ info "searching for network interfaces of cgroup '${c}'"
+
+ find_tun_tap_interfaces_for_cgroup "${c}"
+ virsh_find_all_interfaces_for_cgroup "${c}"
+
+ else
+
+ error "Either a pid or a cgroup path is needed"
+ return 1
+
+ fi
+
+ return 0
+}
+
+# -----------------------------------------------------------------------------
+
+# an associative array to store the interfaces
+# the index is the interface name as seen by the host
+# the value is the interface name as seen by the guest / container
+declare -A devs=()
+
+# store all interfaces found in the associative array
+# this will also give the unique devices, as seen by the host
+last_src=
+while read host_device guest_device
+do
+ [ -z "${host_device}" ] && continue
+
+ [ "${host_device}" = "SRC" ] && last_src="${guest_device}" && continue
+
+ # the default guest_device is the host_device
+ [ -z "${guest_device}" ] && guest_device="${host_device}"
+
+ # when we run in debug, show the source
+ debug "Found host device '${host_device}', guest device '${guest_device}', detected via '${last_src}'"
+
+ [ -z "${devs[${host_device}]}" -o "${devs[${host_device}]}" = "${host_device}" ] && \
+ devs[${host_device}]="${guest_device}"
+
+done < <( find_all_interfaces_of_pid_or_cgroup "${pid}" "${cgroup}" )
+
+# print the interfaces found, in the format netdata expects them
+found=0
+for x in "${!devs[@]}"
+do
+ found=$((found + 1))
+ echo "${x} ${devs[${x}]}"
+done
+
+debug "found ${found} network interfaces for pid '${pid}', cgroup '${cgroup}', run as ${USER}, ${UID}"
+
+# let netdata know if we found any
+[ ${found} -eq 0 ] && exit 1
+exit 0
diff --git a/plugins.d/python.d.plugin b/plugins.d/python.d.plugin
index 03c156f41..855080e81 100755
--- a/plugins.d/python.d.plugin
+++ b/plugins.d/python.d.plugin
@@ -1,599 +1,382 @@
#!/usr/bin/env bash
-'''':; exec "$(command -v python || command -v python3 || command -v python2 || echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
-# -*- coding: utf-8 -*-
+'''':; exec "$(command -v python || command -v python3 || command -v python2 ||
+echo "ERROR python IS NOT AVAILABLE IN THIS SYSTEM")" "$0" "$@" # '''
-# Description: netdata python modules supervisor
+# -*- coding: utf-8 -*-
+# Description:
# Author: Pawel Krupa (paulfantom)
+# Author: Ilya Mashchenko (l2isbad)
import os
import sys
-import time
import threading
+
from re import sub
+from sys import version_info, argv
+from time import sleep
+
+try:
+ from time import monotonic as time
+except ImportError:
+ from time import time
+
+PY_VERSION = version_info[:2]
+PLUGIN_CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR', os.path.dirname(__file__) + '/../../../../etc/netdata') + '/'
+CHARTS_PY_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR', os.path.dirname(__file__)) + '/../python.d') + '/'
+CHARTS_PY_CONFIG_DIR = PLUGIN_CONFIG_DIR + 'python.d/'
+PYTHON_MODULES_DIR = CHARTS_PY_DIR + 'python_modules'
+
+sys.path.append(PYTHON_MODULES_DIR)
+
+from bases.loaders import ModuleAndConfigLoader
+from bases.loggers import PythonDLogger
+from bases.collection import setdefault_values, run_and_exit
+
+try:
+ from collections import OrderedDict
+except ImportError:
+ from third_party.ordereddict import OrderedDict
-# -----------------------------------------------------------------------------
-# globals & environment setup
-# https://github.com/firehol/netdata/wiki/External-Plugins#environment-variables
-MODULE_EXTENSION = ".chart.py"
BASE_CONFIG = {'update_every': os.getenv('NETDATA_UPDATE_EVERY', 1),
- 'priority': 90000,
- 'retries': 10}
+ 'retries': 60,
+ 'priority': 60000,
+ 'autodetection_retry': 0,
+ 'chart_cleanup': 10,
+ 'name': str()}
-MODULES_DIR = os.path.abspath(os.getenv('NETDATA_PLUGINS_DIR',
- os.path.dirname(__file__)) + "/../python.d") + "/"
-CONFIG_DIR = os.getenv('NETDATA_CONFIG_DIR',
- os.path.dirname(__file__) + "/../../../../etc/netdata")
+MODULE_EXTENSION = '.chart.py'
+OBSOLETE_MODULES = ['apache_cache', 'gunicorn_log', 'nginx_log']
-# directories should end with '/'
-if CONFIG_DIR[-1] != "/":
- CONFIG_DIR += "/"
-sys.path.append(MODULES_DIR + "python_modules")
-PROGRAM = os.path.basename(__file__).replace(".plugin", "")
-DEBUG_FLAG = False
-TRACE_FLAG = False
-OVERRIDE_UPDATE_EVERY = False
+def module_ok(m):
+ return m.endswith(MODULE_EXTENSION) and m[:-len(MODULE_EXTENSION)] not in OBSOLETE_MODULES
-# -----------------------------------------------------------------------------
-# custom, third party and version specific python modules management
-import msg
-try:
- assert sys.version_info >= (3, 1)
- import importlib.machinery
- PY_VERSION = 3
- # change this hack below if we want PY_VERSION to be used in modules
- # import builtins
- # builtins.PY_VERSION = 3
- msg.info('Using python v3')
-except (AssertionError, ImportError):
- try:
- import imp
-
- # change this hack below if we want PY_VERSION to be used in modules
- # import __builtin__
- # __builtin__.PY_VERSION = 2
- PY_VERSION = 2
- msg.info('Using python v2')
- except ImportError:
- msg.fatal('Cannot start. No importlib.machinery on python3 or lack of imp on python2')
-# try:
-# import yaml
-# except ImportError:
-# msg.fatal('Cannot find yaml library')
-try:
- if PY_VERSION == 3:
- import pyyaml3 as yaml
- else:
- import pyyaml2 as yaml
-except ImportError:
- msg.fatal('Cannot find yaml library')
+ALL_MODULES = [m for m in sorted(os.listdir(CHARTS_PY_DIR)) if module_ok(m)]
-try:
- from collections import OrderedDict
- ORDERED = True
- DICT = OrderedDict
- msg.info('YAML output is ordered')
-except ImportError:
- try:
- from ordereddict import OrderedDict
- ORDERED = True
- DICT = OrderedDict
- msg.info('YAML output is ordered')
- except ImportError:
- ORDERED = False
- DICT = dict
- msg.info('YAML output is unordered')
-if ORDERED:
- def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict):
- class OrderedLoader(Loader):
- pass
-
- def construct_mapping(loader, node):
- loader.flatten_mapping(node)
- return object_pairs_hook(loader.construct_pairs(node))
- OrderedLoader.add_constructor(
- yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG,
- construct_mapping)
- return yaml.load(stream, OrderedLoader)
-
-
-class PythonCharts(object):
- """
- Main class used to control every python module.
- """
-
- def __init__(self,
- modules=None,
- modules_path='../python.d/',
- modules_configs='../conf.d/',
- modules_disabled=None,
- modules_enabled=None,
- default_run=None):
+
+def parse_cmd():
+ debug = 'debug' in argv[1:]
+ trace = 'trace' in argv[1:]
+ override_update_every = next((arg for arg in argv[1:] if arg.isdigit() and int(arg) > 1), False)
+ modules = [''.join([m, MODULE_EXTENSION]) for m in argv[1:] if ''.join([m, MODULE_EXTENSION]) in ALL_MODULES]
+ return debug, trace, override_update_every, modules or ALL_MODULES
+
+
+def multi_job_check(config):
+ return next((True for key in config if isinstance(config[key], dict)), False)
+
+
+class Job(object):
+ def __init__(self, initialized_job, job_id):
"""
- :param modules: list
- :param modules_path: str
- :param modules_configs: str
- :param modules_disabled: list
- :param modules_enabled: list
- :param default_run: bool
+ :param initialized_job: instance of <Class Service>
+ :param job_id: <str>
"""
+ self.job = initialized_job
+ self.id = job_id # key in Modules.jobs()
+ self.module_name = self.job.__module__ # used in Plugin.delete_job()
+ self.recheck_every = self.job.configuration.pop('autodetection_retry')
+ self.checked = False # used in Plugin.check_job()
+ self.created = False # used in Plugin.create_job_charts()
+ if OVERRIDE_UPDATE_EVERY:
+ self.job.update_every = int(OVERRIDE_UPDATE_EVERY)
- if modules is None:
- modules = []
- if modules_disabled is None:
- modules_disabled = []
+ def __getattr__(self, item):
+ return getattr(self.job, item)
- self.first_run = True
- # set configuration directory
- self.configs = modules_configs
+ def __repr__(self):
+ return self.job.__repr__()
- # load modules
- loaded_modules = self._load_modules(modules_path, modules, modules_disabled, modules_enabled, default_run)
+ def is_dead(self):
+ return bool(self.ident) and not self.is_alive()
- # load configuration files
- configured_modules = self._load_configs(loaded_modules)
+ def not_launched(self):
+ return not bool(self.ident)
- # good economy and prosperity:
- self.jobs = self._create_jobs(configured_modules) # type <list>
+ def is_autodetect(self):
+ return self.recheck_every
- # enable timetable override like `python.d.plugin mysql debug 1`
- if DEBUG_FLAG and OVERRIDE_UPDATE_EVERY:
- for job in self.jobs:
- job.create_timetable(BASE_CONFIG['update_every'])
- @staticmethod
- def _import_module(path, name=None):
+class Module(object):
+ def __init__(self, service, config):
"""
- Try to import module using only its path.
- :param path: str
- :param name: str
- :return: object
+ :param service: <Module>
+ :param config: <dict>
"""
+ self.service = service
+ self.name = service.__name__
+ self.config = self.jobs_configurations_builder(config)
+ self.jobs = OrderedDict()
+ self.counter = 1
- if name is None:
- name = path.split('/')[-1]
- if name[-len(MODULE_EXTENSION):] != MODULE_EXTENSION:
- return None
- name = name[:-len(MODULE_EXTENSION)]
- try:
- if PY_VERSION == 3:
- return importlib.machinery.SourceFileLoader(name, path).load_module()
- else:
- return imp.load_source(name, path)
- except Exception as e:
- msg.error("Problem loading", name, str(e))
- return None
+ self.initialize_jobs()
+
+ def __repr__(self):
+ return "<Class Module '{name}'>".format(name=self.name)
+
+ def __iter__(self):
+ return iter(OrderedDict(self.jobs).values())
+
+ def __getitem__(self, item):
+ return self.jobs[item]
+
+ def __delitem__(self, key):
+ del self.jobs[key]
+
+ def __len__(self):
+ return len(self.jobs)
+
+ def __bool__(self):
+ return bool(self.jobs)
- def _load_modules(self, path, modules, disabled, enabled, default_run):
+ def __nonzero__(self):
+ return self.__bool__()
+
+ def jobs_configurations_builder(self, config):
"""
- Load modules from 'modules' list or dynamically every file from 'path' (only .chart.py files)
- :param path: str
- :param modules: list
- :param disabled: list
- :return: list
+ :param config: <dict>
+ :return:
"""
+ counter = 0
+ job_base_config = dict()
- # check if plugin directory exists
- if not os.path.isdir(path):
- msg.fatal("cannot find charts directory ", path)
-
- # load modules
- loaded = []
- if len(modules) > 0:
- for m in modules:
- if m in disabled:
- continue
- mod = self._import_module(path + m + MODULE_EXTENSION)
- if mod is not None:
- loaded.append(mod)
- else: # exit if plugin is not found
- msg.fatal('no modules found.')
- else:
- # scan directory specified in path and load all modules from there
- if default_run is False:
- names = [module for module in os.listdir(path) if module[:-9] in enabled]
- else:
- names = os.listdir(path)
- for mod in names:
- if mod.replace(MODULE_EXTENSION, "") in disabled:
- msg.error(mod + ": disabled module ", mod.replace(MODULE_EXTENSION, ""))
- continue
- m = self._import_module(path + mod)
- if m is not None:
- msg.debug(mod + ": loading module '" + path + mod + "'")
- loaded.append(m)
- return loaded
-
- def _load_configs(self, modules):
+ for attr in BASE_CONFIG:
+ job_base_config[attr] = config.pop(attr, getattr(self.service, attr, BASE_CONFIG[attr]))
+
+ if not config:
+ config = {str(): dict()}
+ elif not multi_job_check(config):
+ config = {str(): config}
+
+ for job_name in config:
+ if not isinstance(config[job_name], dict):
+ continue
+
+ job_config = setdefault_values(config[job_name], base_dict=job_base_config)
+ job_name = sub(r'\s+', '_', job_name)
+ config[job_name]['name'] = sub(r'\s+', '_', config[job_name]['name'])
+ counter += 1
+ job_id = 'job' + str(counter).zfill(3)
+
+ yield job_id, job_name, job_config
+
+ def initialize_jobs(self):
"""
- Append configuration in list named `config` to every module.
- For multi-job modules `config` list is created in _parse_config,
- otherwise it is created here based on BASE_CONFIG prototype with None as identifier.
- :param modules: list
- :return: list
+ :return:
"""
- for mod in modules:
- configfile = self.configs + mod.__name__ + ".conf"
- if os.path.isfile(configfile):
- msg.debug(mod.__name__ + ": loading module configuration: '" + configfile + "'")
- try:
- if not hasattr(mod, 'config'):
- mod.config = {}
- setattr(mod,
- 'config',
- self._parse_config(mod, read_config(configfile)))
- except Exception as e:
- msg.error(mod.__name__ + ": cannot parse configuration file '" + configfile + "':", str(e))
+ for job_id, job_name, job_config in self.config:
+ job_config['job_name'] = job_name
+ job_config['override_name'] = job_config.pop('name')
+
+ try:
+ initialized_job = self.service.Service(configuration=job_config)
+ except Exception as error:
+ Logger.error("job initialization: '{module_name} {job_name}' "
+ "=> ['FAILED'] ({error})".format(module_name=self.name,
+ job_name=job_name,
+ error=error))
+ continue
+ else:
+ Logger.debug("job initialization: '{module_name} {job_name}' "
+ "=> ['OK']".format(module_name=self.name,
+ job_name=job_name or self.name))
+ self.jobs[job_id] = Job(initialized_job=initialized_job,
+ job_id=job_id)
+ del self.config
+ del self.service
+
+
+class Plugin(object):
+ def __init__(self):
+ self.loader = ModuleAndConfigLoader()
+ self.modules = OrderedDict()
+ self.sleep_time = 1
+ self.runs_counter = 0
+ self.config, error = self.loader.load_config_from_file(PLUGIN_CONFIG_DIR + 'python.d.conf')
+ if error:
+ run_and_exit(Logger.error)(error)
+
+ if not self.config.get('enabled', True):
+ run_and_exit(Logger.info)('DISABLED in configuration file.')
+
+ self.load_and_initialize_modules()
+ if not self.modules:
+ run_and_exit(Logger.info)('No modules to run. Exit...')
+
+ def __iter__(self):
+ return iter(OrderedDict(self.modules).values())
+
+ @property
+ def jobs(self):
+ return (job for mod in self for job in mod)
+
+ @property
+ def dead_jobs(self):
+ return (job for job in self.jobs if job.is_dead())
+
+ @property
+ def autodetect_jobs(self):
+ return [job for job in self.jobs if job.not_launched()]
+
+ def enabled_modules(self):
+ for mod in MODULES_TO_RUN:
+ mod_name = mod[:-len(MODULE_EXTENSION)]
+ mod_path = CHARTS_PY_DIR + mod
+ conf_path = ''.join([CHARTS_PY_CONFIG_DIR, mod_name, '.conf'])
+
+ if DEBUG:
+ yield mod, mod_name, mod_path, conf_path
else:
- msg.error(mod.__name__ + ": configuration file '" + configfile + "' not found. Using defaults.")
- # set config if not found
- if not hasattr(mod, 'config'):
- msg.debug(mod.__name__ + ": setting configuration for only one job")
- mod.config = {None: {}}
- for var in BASE_CONFIG:
- try:
- mod.config[None][var] = getattr(mod, var)
- except AttributeError:
- mod.config[None][var] = BASE_CONFIG[var]
- return modules
+ if all([self.config.get('default_run', True),
+ self.config.get(mod_name, True)]):
+ yield mod, mod_name, mod_path, conf_path
+
+ elif all([not self.config.get('default_run'),
+ self.config.get(mod_name)]):
+ yield mod, mod_name, mod_path, conf_path
+
+ def load_and_initialize_modules(self):
+ for mod, mod_name, mod_path, conf_path in self.enabled_modules():
+
+ # Load module from file ------------------------------------------------------------
+ loaded_module, error = self.loader.load_module_from_file(mod_name, mod_path)
+ log = Logger.error if error else Logger.debug
+ log("module load source: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
+ module_name=mod_name))
+ if error:
+ Logger.error("load source error : {0}".format(error))
+ continue
+
+ # Load module config from file ------------------------------------------------------
+ loaded_config, error = self.loader.load_config_from_file(conf_path)
+ log = Logger.error if error else Logger.debug
+ log("module load config: '{module_name}' => [{status}]".format(status='FAILED' if error else 'OK',
+ module_name=mod_name))
+ if error:
+ Logger.error('load config error : {0}'.format(error))
+
+ # Service instance initialization ---------------------------------------------------
+ initialized_module = Module(service=loaded_module, config=loaded_config)
+ Logger.debug("module status: '{module_name}' => [{status}] "
+ "(jobs: {jobs_number})".format(status='OK' if initialized_module else 'FAILED',
+ module_name=initialized_module.name,
+ jobs_number=len(initialized_module)))
+
+ if initialized_module:
+ self.modules[initialized_module.name] = initialized_module
@staticmethod
- def _parse_config(module, config):
+ def check_job(job):
"""
- Parse configuration file or extract configuration from module file.
- Example of returned dictionary:
- config = {'name': {
- 'update_every': 2,
- 'retries': 3,
- 'priority': 30000
- 'other_val': 123}}
- :param module: object
- :param config: dict
- :return: dict
+ :param job: <Job>
+ :return:
"""
- if config is None:
- config = {}
- # get default values
- defaults = {}
- msg.debug(module.__name__ + ": reading configuration")
- for key in BASE_CONFIG:
- try:
- # get defaults from module config
- defaults[key] = int(config.pop(key))
- except (KeyError, ValueError):
- try:
- # get defaults from module source code
- defaults[key] = getattr(module, key)
- except (KeyError, ValueError, AttributeError):
- # if above failed, get defaults from global dict
- defaults[key] = BASE_CONFIG[key]
-
- # check if there are dict in config dict
- many_jobs = False
- for name in config:
- if isinstance(config[name], DICT):
- many_jobs = True
- break
-
- # assign variables needed by supervisor to every job configuration
- if many_jobs:
- for name in config:
- for key in defaults:
- if key not in config[name]:
- config[name][key] = defaults[key]
- # if only one job is needed, values doesn't have to be in dict (in YAML)
+ try:
+ check_ok = bool(job.check())
+ except Exception as error:
+ job.error('check() unhandled exception: {error}'.format(error=error))
+ return None
else:
- config = {None: config.copy()}
- config[None].update(defaults)
-
- # return dictionary of jobs where every job has BASE_CONFIG variables
- return config
+ return check_ok
@staticmethod
- def _create_jobs(modules):
- """
- Create jobs based on module.config dictionary and module.Service class definition.
- :param modules: list
- :return: list
+ def create_job_charts(job):
"""
- jobs = []
- for module in modules:
- for name in module.config:
- # register a new job
- conf = module.config[name]
- try:
- job = module.Service(configuration=conf, name=name)
- except Exception as e:
- msg.error(module.__name__ +
- ("/" + str(name) if name is not None else "") +
- ": cannot start job: '" +
- str(e))
- continue
- else:
- # set chart_name (needed to plot run time graphs)
- job.chart_name = module.__name__
- if name is not None:
- job.chart_name += "_" + name
- jobs.append(job)
- msg.debug(module.__name__ + ("/" + str(name) if name is not None else "") + ": job added")
-
- return [j for j in jobs if j is not None]
-
- def _stop(self, job, reason=None):
+ :param job: <Job>
+ :return:
"""
- Stop specified job and remove it from self.jobs list
- Also notifies user about job failure if DEBUG_FLAG is set
- :param job: object
- :param reason: str
- """
- prefix = job.__module__
- if job.name is not None and len(job.name) != 0:
- prefix += "/" + job.name
try:
- msg.error("DISABLED:", prefix)
- self.jobs.remove(job)
- except Exception as e:
- msg.debug("This shouldn't happen. NO " + prefix + " IN LIST:" + str(self.jobs) + " ERROR: " + str(e))
-
- # TODO remove section below and remove `reason`.
- prefix += ": "
- if reason is None:
- return
- elif reason[:3] == "no ":
- msg.error(prefix +
- "does not seem to have " +
- reason[3:] +
- "() function. Disabling it.")
- elif reason[:7] == "failed ":
- msg.error(prefix +
- reason[7:] +
- "() function reports failure.")
- elif reason[:13] == "configuration":
- msg.error(prefix +
- "configuration file '" +
- self.configs +
- job.__module__ +
- ".conf' not found. Using defaults.")
- elif reason[:11] == "misbehaving":
- msg.error(prefix + "is " + reason)
-
- def check(self):
- """
- Tries to execute check() on every job.
- This cannot fail thus it is catching every exception
- If job.check() fails job is stopped
- """
- i = 0
- overridden = []
- msg.debug("all job objects", str(self.jobs))
- while i < len(self.jobs):
- job = self.jobs[i]
- try:
- if not job.check():
- msg.error(job.chart_name, "check() failed - disabling job")
- self._stop(job)
- else:
- msg.info("CHECKED OK:", job.chart_name)
- i += 1
- try:
- if job.override_name is not None:
- new_name = job.__module__ + '_' + sub(r'\s+', '_', job.override_name)
- if new_name in overridden:
- msg.info("DROPPED:", job.name, ", job '" + job.override_name +
- "' is already served by another job.")
- self._stop(job)
- i -= 1
- else:
- job.name = job.override_name
- msg.info("RENAMED:", new_name, ", from " + job.chart_name)
- job.chart_name = new_name
- overridden.append(job.chart_name)
- except Exception:
- pass
- except AttributeError as e:
- self._stop(job)
- msg.error(job.chart_name, "cannot find check() function or it thrown unhandled exception.")
- msg.debug(str(e))
- except (UnboundLocalError, Exception) as e:
- msg.error(job.chart_name, str(e))
- self._stop(job)
- msg.debug("overridden job names:", str(overridden))
- msg.debug("all remaining job objects:", str(self.jobs))
-
- def create(self):
- """
- Tries to execute create() on every job.
- This cannot fail thus it is catching every exception.
- If job.create() fails job is stopped.
- This is also creating job run time chart.
- """
- i = 0
- while i < len(self.jobs):
- job = self.jobs[i]
- try:
- if not job.create():
- msg.error(job.chart_name, "create function failed.")
- self._stop(job)
- else:
- chart = job.chart_name
- sys.stdout.write(
- "CHART netdata.plugin_pythond_" +
- chart +
- " '' 'Execution time for " +
- chart +
- " plugin' 'milliseconds / run' python.d netdata.plugin_python area 145000 " +
- str(job.timetable['freq']) +
- '\n')
- sys.stdout.write("DIMENSION run_time 'run time' absolute 1 1\n\n")
- msg.debug("created charts for", job.chart_name)
- # sys.stdout.flush()
- i += 1
- except AttributeError:
- msg.error(job.chart_name, "cannot find create() function or it thrown unhandled exception.")
- self._stop(job)
- except (UnboundLocalError, Exception) as e:
- msg.error(job.chart_name, str(e))
- self._stop(job)
-
- def update(self):
+ create_ok = job.create()
+ except Exception as error:
+ job.error('create() unhandled exception: {error}'.format(error=error))
+ return False
+ else:
+ return create_ok
+
+ def delete_job(self, job):
"""
- Creates and supervises every job thread.
- This will stay forever and ever and ever forever and ever it'll be the one...
+ :param job: <Job>
+ :return:
"""
- for job in self.jobs:
- job.start()
+ del self.modules[job.module_name][job.id]
- while True:
- if threading.active_count() <= 1:
- msg.fatal("no more jobs")
- time.sleep(1)
-
-
-def read_config(path):
- """
- Read YAML configuration from specified file
- :param path: str
- :return: dict
- """
- try:
- with open(path, 'r') as stream:
- if ORDERED:
- config = ordered_load(stream, yaml.SafeLoader)
+ def run_check(self):
+ checked = list()
+ for job in self.jobs:
+ if job.name in checked:
+ job.info('check() => [DROPPED] (already served by another job)')
+ self.delete_job(job)
+ continue
+ ok = self.check_job(job)
+ if ok:
+ job.info('check() => [OK]')
+ checked.append(job.name)
+ job.checked = True
+ continue
+ if not job.is_autodetect() or ok is None:
+ job.error('check() => [FAILED]')
+ self.delete_job(job)
else:
- config = yaml.load(stream)
- except (OSError, IOError) as error:
- msg.error(str(path), 'reading error:', str(error))
- return None
- except yaml.YAMLError as error:
- msg.error(str(path), "is malformed:", str(error))
- return None
- return config
-
-
-def parse_cmdline(directory, *commands):
- """
- Parse parameters from command line.
- :param directory: str
- :param commands: list of str
- :return: dict
- """
- global DEBUG_FLAG, TRACE_FLAG
- global OVERRIDE_UPDATE_EVERY
- global BASE_CONFIG
-
- changed_update = False
- mods = []
- for cmd in commands[1:]:
- if cmd == "check":
- pass
- elif cmd == "debug" or cmd == "all":
- DEBUG_FLAG = True
- # redirect stderr to stdout?
- elif cmd == "trace" or cmd == "all":
- TRACE_FLAG = True
- elif os.path.isfile(directory + cmd + ".chart.py") or os.path.isfile(directory + cmd):
- # DEBUG_FLAG = True
- mods.append(cmd.replace(".chart.py", ""))
- else:
- try:
- BASE_CONFIG['update_every'] = int(cmd)
- changed_update = True
- except ValueError:
- pass
- if changed_update and DEBUG_FLAG:
- OVERRIDE_UPDATE_EVERY = True
- msg.debug(PROGRAM, "overriding update interval to", str(BASE_CONFIG['update_every']))
-
- msg.debug("started from", commands[0], "with options:", *commands[1:])
-
- return mods
-
-
-# if __name__ == '__main__':
-def run():
- """
- Main program.
- """
- global DEBUG_FLAG, TRACE_FLAG, BASE_CONFIG
-
- # read configuration file
- disabled = ['nginx_log', 'gunicorn_log', 'apache_cache']
- enabled = list()
- default_run = True
- configfile = CONFIG_DIR + "python.d.conf"
- msg.PROGRAM = PROGRAM
- msg.info("reading configuration file:", configfile)
- log_throttle = 200
- log_interval = 3600
-
- conf = read_config(configfile)
- if conf is not None:
- try:
- # exit the whole plugin when 'enabled: no' is set in 'python.d.conf'
- if conf['enabled'] is False:
- msg.fatal('disabled in configuration file.\n')
- except (KeyError, TypeError):
- pass
-
- try:
- for param in BASE_CONFIG:
- BASE_CONFIG[param] = conf[param]
- except (KeyError, TypeError):
- pass # use default update_every from NETDATA_UPDATE_EVERY
-
- try:
- DEBUG_FLAG = conf['debug']
- except (KeyError, TypeError):
- pass
-
- try:
- TRACE_FLAG = conf['trace']
- except (KeyError, TypeError):
- pass
-
- try:
- log_throttle = conf['logs_per_interval']
- except (KeyError, TypeError):
- pass
+ job.error('check() => [RECHECK] (autodetection_retry: {0})'.format(job.recheck_every))
- try:
- log_interval = conf['log_interval']
- except (KeyError, TypeError):
- pass
+ def run_create(self):
+ for job in self.jobs:
+ if not job.checked:
+ # skip autodetection_retry jobs
+ continue
+ ok = self.create_job_charts(job)
+ if ok:
+ job.debug('create() => [OK] (charts: {0})'.format(len(job.charts)))
+ job.created = True
+ continue
+ job.error('create() => [FAILED] (charts: {0})'.format(len(job.charts)))
+ self.delete_job(job)
- default_run = True if ('default_run' not in conf or conf.get('default_run')) else False
+ def start(self):
+ self.run_check()
+ self.run_create()
+ for job in self.jobs:
+ if job.created:
+ job.start()
- for k, v in conf.items():
- if k in ("update_every", "debug", "enabled", "default_run"):
- continue
- if default_run:
- if v is False:
- disabled.append(k)
- else:
- if v is True:
- enabled.append(k)
- # parse passed command line arguments
- modules = parse_cmdline(MODULES_DIR, *sys.argv)
- msg.DEBUG_FLAG = DEBUG_FLAG
- msg.TRACE_FLAG = TRACE_FLAG
- msg.LOG_THROTTLE = log_throttle
- msg.LOG_INTERVAL = log_interval
- msg.LOG_COUNTER = 0
- msg.LOG_NEXT_CHECK = 0
- msg.info("MODULES_DIR='" + MODULES_DIR +
- "', CONFIG_DIR='" + CONFIG_DIR +
- "', UPDATE_EVERY=" + str(BASE_CONFIG['update_every']) +
- ", ONLY_MODULES=" + str(modules))
-
- # run plugins
- charts = PythonCharts(modules, MODULES_DIR, CONFIG_DIR + "python.d/", disabled, enabled, default_run)
- charts.check()
- charts.create()
- charts.update()
- msg.fatal("finished")
+ while True:
+ if threading.active_count() <= 1 and not self.autodetect_jobs:
+ run_and_exit(Logger.info)('FINISHED')
+
+ sleep(self.sleep_time)
+ self.cleanup()
+ self.autodetect_retry()
+
+ def cleanup(self):
+ for job in self.dead_jobs:
+ self.delete_job(job)
+ for mod in self:
+ if not mod:
+ del self.modules[mod.name]
+
+ def autodetect_retry(self):
+ self.runs_counter += self.sleep_time
+ for job in self.autodetect_jobs:
+ if self.runs_counter % job.recheck_every == 0:
+ checked = self.check_job(job)
+ if checked:
+ created = self.create_job_charts(job)
+ if not created:
+ self.delete_job(job)
+ continue
+ job.start()
if __name__ == '__main__':
- run()
+ DEBUG, TRACE, OVERRIDE_UPDATE_EVERY, MODULES_TO_RUN = parse_cmd()
+ Logger = PythonDLogger()
+ if DEBUG:
+ Logger.logger.severity = 'DEBUG'
+ if TRACE:
+ Logger.log_traceback = True
+ Logger.info('Using python {version}'.format(version=PY_VERSION[0]))
+
+ plugin = Plugin()
+ plugin.start()