summaryrefslogtreecommitdiffstats
path: root/heartbeat/azure-events.in
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 07:52:36 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 07:52:36 +0000
commit7de03e4e519705301265c0415b3c0af85263a7ac (patch)
tree29d819c5227e3619d18a67d2a5dde963b3229dbe /heartbeat/azure-events.in
parentInitial commit. (diff)
downloadresource-agents-7de03e4e519705301265c0415b3c0af85263a7ac.tar.xz
resource-agents-7de03e4e519705301265c0415b3c0af85263a7ac.zip
Adding upstream version 1:4.13.0.upstream/1%4.13.0
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'heartbeat/azure-events.in')
-rw-r--r--heartbeat/azure-events.in847
1 files changed, 847 insertions, 0 deletions
diff --git a/heartbeat/azure-events.in b/heartbeat/azure-events.in
new file mode 100644
index 0000000..90acaba
--- /dev/null
+++ b/heartbeat/azure-events.in
@@ -0,0 +1,847 @@
+#!@PYTHON@ -tt
+#
+# Resource agent for monitoring Azure Scheduled Events
+#
+# License: GNU General Public License (GPL)
+# (c) 2018 Tobias Niekamp, Microsoft Corp.
+# and Linux-HA contributors
+
+import os
+import sys
+import time
+import subprocess
+import json
+try:
+ import urllib2
+ from urllib2 import URLError
+except ImportError:
+ import urllib.request as urllib2
+ from urllib.error import URLError
+import socket
+from collections import defaultdict
+
+OCF_FUNCTIONS_DIR = os.environ.get("OCF_FUNCTIONS_DIR", "%s/lib/heartbeat" % os.environ.get("OCF_ROOT"))
+sys.path.append(OCF_FUNCTIONS_DIR)
+import ocf
+
+##############################################################################
+
+
+VERSION = "0.10"
+USER_AGENT = "Pacemaker-ResourceAgent/%s %s" % (VERSION, ocf.distro())
+
+attr_globalPullState = "azure-events_globalPullState"
+attr_lastDocVersion = "azure-events_lastDocVersion"
+attr_curNodeState = "azure-events_curNodeState"
+attr_pendingEventIDs = "azure-events_pendingEventIDs"
+
+default_loglevel = ocf.logging.INFO
+default_relevantEventTypes = set(["Reboot", "Redeploy"])
+
+global_pullMaxAttempts = 3
+global_pullDelaySecs = 1
+
+##############################################################################
+
+class attrDict(defaultdict):
+ """
+ A wrapper for accessing dict keys like an attribute
+ """
+ def __init__(self, data):
+ super(attrDict, self).__init__(attrDict)
+ for d in data.keys():
+ self.__setattr__(d, data[d])
+
+ def __getattr__(self, key):
+ try:
+ return self[key]
+ except KeyError:
+ raise AttributeError(key)
+
+ def __setattr__(self, key, value):
+ self[key] = value
+
+##############################################################################
+
+class azHelper:
+ """
+ Helper class for Azure's metadata API (including Scheduled Events)
+ """
+ metadata_host = "http://169.254.169.254/metadata"
+ instance_api = "instance"
+ events_api = "scheduledevents"
+ api_version = "2019-08-01"
+
+ @staticmethod
+ def _sendMetadataRequest(endpoint, postData=None):
+ """
+ Send a request to Azure's Azure Metadata Service API
+ """
+ url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
+ data = ""
+ ocf.logger.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
+ ocf.logger.debug("_sendMetadataRequest: url = %s" % url)
+
+ if postData and type(postData) != bytes:
+ postData = postData.encode()
+
+ req = urllib2.Request(url, postData)
+ req.add_header("Metadata", "true")
+ req.add_header("User-Agent", USER_AGENT)
+ try:
+ resp = urllib2.urlopen(req)
+ except URLError as e:
+ if hasattr(e, 'reason'):
+ ocf.logger.warning("Failed to reach the server: %s" % e.reason)
+ clusterHelper.setAttr(attr_globalPullState, "IDLE")
+ elif hasattr(e, 'code'):
+ ocf.logger.warning("The server couldn\'t fulfill the request. Error code: %s" % e.code)
+ clusterHelper.setAttr(attr_globalPullState, "IDLE")
+ else:
+ data = resp.read()
+ ocf.logger.debug("_sendMetadataRequest: response = %s" % data)
+
+ if data:
+ data = json.loads(data)
+
+ ocf.logger.debug("_sendMetadataRequest: finished")
+ return data
+
+ @staticmethod
+ def getInstanceInfo():
+ """
+ Fetch details about the current VM from Azure's Azure Metadata Service API
+ """
+ ocf.logger.debug("getInstanceInfo: begin")
+
+ jsondata = azHelper._sendMetadataRequest(azHelper.instance_api)
+ ocf.logger.debug("getInstanceInfo: json = %s" % jsondata)
+
+ if jsondata:
+ ocf.logger.debug("getInstanceInfo: finished, returning {}".format(jsondata["compute"]))
+ return attrDict(jsondata["compute"])
+ else:
+ ocf.ocf_exit_reason("getInstanceInfo: Unable to get instance info")
+ sys.exit(ocf.OCF_ERR_GENERIC)
+
+ @staticmethod
+ def pullScheduledEvents():
+ """
+ Retrieve all currently scheduled events via Azure Metadata Service API
+ """
+ ocf.logger.debug("pullScheduledEvents: begin")
+
+ jsondata = azHelper._sendMetadataRequest(azHelper.events_api)
+ ocf.logger.debug("pullScheduledEvents: json = %s" % jsondata)
+
+ ocf.logger.debug("pullScheduledEvents: finished")
+ return attrDict(jsondata)
+
+ @staticmethod
+ def forceEvents(eventIDs):
+ """
+ Force a set of events to start immediately
+ """
+ ocf.logger.debug("forceEvents: begin")
+
+ events = []
+ for e in eventIDs:
+ events.append({
+ "EventId": e,
+ })
+ postData = {
+ "StartRequests" : events
+ }
+ ocf.logger.info("forceEvents: postData = %s" % postData)
+ resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))
+
+ ocf.logger.debug("forceEvents: finished")
+ return
+
+##############################################################################
+
+class clusterHelper:
+ """
+ Helper functions for Pacemaker control via crm
+ """
+ @staticmethod
+ def _getLocation(node):
+ """
+ Helper function to retrieve local/global attributes
+ """
+ if node:
+ return ["--node", node]
+ else:
+ return ["--type", "crm_config"]
+
+ @staticmethod
+ def _exec(command, *args):
+ """
+ Helper function to execute a UNIX command
+ """
+ args = list(args)
+ ocf.logger.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))
+
+ def flatten(*n):
+ return (str(e) for a in n
+ for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
+ command = list(flatten([command] + args))
+ ocf.logger.debug("_exec: cmd = %s" % " ".join(command))
+ try:
+ ret = subprocess.check_output(command)
+ if type(ret) != str:
+ ret = ret.decode()
+ ocf.logger.debug("_exec: return = %s" % ret)
+ return ret.rstrip()
+ except Exception as err:
+ ocf.logger.exception(err)
+ return None
+
+ @staticmethod
+ def setAttr(key, value, node=None):
+ """
+ Set the value of a specific global/local attribute in the Pacemaker cluster
+ """
+ ocf.logger.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
+
+ if value:
+ ret = clusterHelper._exec("crm_attribute",
+ "--name", key,
+ "--update", value,
+ clusterHelper._getLocation(node))
+ else:
+ ret = clusterHelper._exec("crm_attribute",
+ "--name", key,
+ "--delete",
+ clusterHelper._getLocation(node))
+
+ ocf.logger.debug("setAttr: finished")
+ return len(ret) == 0
+
+ @staticmethod
+ def getAttr(key, node=None):
+ """
+ Retrieve a global/local attribute from the Pacemaker cluster
+ """
+ ocf.logger.debug("getAttr: begin; key = %s, node = %s" % (key, node))
+
+ val = clusterHelper._exec("crm_attribute",
+ "--name", key,
+ "--query", "--quiet",
+ "--default", "",
+ clusterHelper._getLocation(node))
+ ocf.logger.debug("getAttr: finished")
+ if not val:
+ return None
+ return val if not val.isdigit() else int(val)
+
+ @staticmethod
+ def getAllNodes():
+ """
+ Get a list of hostnames for all nodes in the Pacemaker cluster
+ """
+ ocf.logger.debug("getAllNodes: begin")
+
+ nodes = []
+ nodeList = clusterHelper._exec("crm_node", "--list")
+ for n in nodeList.split("\n"):
+ nodes.append(n.split()[1])
+ ocf.logger.debug("getAllNodes: finished; return %s" % str(nodes))
+
+ return nodes
+
+ @staticmethod
+ def getHostNameFromAzName(azName):
+ """
+ Helper function to get the actual host name from an Azure node name
+ """
+ return clusterHelper.getAttr("hostName_%s" % azName)
+
+ @staticmethod
+ def removeHoldFromNodes():
+ """
+ Remove the ON_HOLD state from all nodes in the Pacemaker cluster
+ """
+ ocf.logger.debug("removeHoldFromNodes: begin")
+
+ for n in clusterHelper.getAllNodes():
+ if clusterHelper.getAttr(attr_curNodeState, node=n) == "ON_HOLD":
+ clusterHelper.setAttr(attr_curNodeState, "AVAILABLE", node=n)
+ ocf.logger.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)
+
+ ocf.logger.debug("removeHoldFromNodes: finished")
+ return False
+
+ @staticmethod
+ def otherNodesAvailable(exceptNode):
+ """
+ Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE
+ """
+ ocf.logger.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)
+
+ for n in clusterHelper.getAllNodes():
+ state = clusterHelper.getAttr(attr_curNodeState, node=n)
+ state = stringToNodeState(state) if state else AVAILABLE
+ if state == AVAILABLE and n != exceptNode.hostName:
+ ocf.logger.info("otherNodesAvailable: at least %s is available" % n)
+ ocf.logger.debug("otherNodesAvailable: finished")
+ return True
+ ocf.logger.info("otherNodesAvailable: no other nodes are available")
+ ocf.logger.debug("otherNodesAvailable: finished")
+
+ return False
+
+ @staticmethod
+ def transitionSummary():
+ """
+ Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby)
+ """
+ # <tniek> Is a global crm_simulate "too much"? Or would it be sufficient it there are no planned transitions for a particular node?
+ # # crm_simulate -LS
+ # Transition Summary:
+ # * Promote rsc_SAPHana_HN1_HDB03:0 (Slave -> Master hsr3-db1)
+ # * Stop rsc_SAPHana_HN1_HDB03:1 (hsr3-db0)
+ # * Move rsc_ip_HN1_HDB03 (Started hsr3-db0 -> hsr3-db1)
+ # * Start rsc_nc_HN1_HDB03 (hsr3-db1)
+ # # Excepted result when there are no pending actions:
+ # Transition Summary:
+ ocf.logger.debug("transitionSummary: begin")
+
+ summary = clusterHelper._exec("crm_simulate", "-LS")
+ if not summary:
+ ocf.logger.warning("transitionSummary: could not load transition summary")
+ return ""
+ if summary.find("Transition Summary:") < 0:
+ ocf.logger.debug("transitionSummary: no transactions: %s" % summary)
+ return ""
+ j=summary.find('Transition Summary:') + len('Transition Summary:')
+ l=summary.lower().find('executing cluster transition:')
+ ret = list(filter(str.strip, summary[j:l].split("\n")))
+
+ ocf.logger.debug("transitionSummary: finished; return = %s" % str(ret))
+ return ret
+
+ @staticmethod
+ def listOperationsOnNode(node):
+ """
+ Get a list of all current operations for a given node (used to check if any resources are pending)
+ """
+ # hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
+ # rsc_azure-events (ocf::heartbeat:azure-events): Started: rsc_azure-events_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun 8 22:37:46 2018, exec=115ms): complete
+ # rsc_azure-events (ocf::heartbeat:azure-events): Started: rsc_azure-events_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun 8 22:37:47 2018, exec=197ms): complete
+ # rsc_SAPHana_HN1_HDB03 (ocf::suse:SAPHana): Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun 8 22:37:46 2018, exec=0ms): pending
+ # rsc_SAPHanaTopology_HN1_HDB03 (ocf::suse:SAPHanaTopology): Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun 8 22:37:46 2018, exec=3214ms): complete
+ ocf.logger.debug("listOperationsOnNode: begin; node = %s" % node)
+
+ resources = clusterHelper._exec("crm_resource", "--list-operations", "-N", node)
+ if len(resources) == 0:
+ ret = []
+ else:
+ ret = resources.split("\n")
+
+ ocf.logger.debug("listOperationsOnNode: finished; return = %s" % str(ret))
+ return ret
+
+ @staticmethod
+ def noPendingResourcesOnNode(node):
+ """
+ Check that there are no pending resources on a given node
+ """
+ ocf.logger.debug("noPendingResourcesOnNode: begin; node = %s" % node)
+
+ for r in clusterHelper.listOperationsOnNode(node):
+ ocf.logger.debug("noPendingResourcesOnNode: * %s" % r)
+ resource = r.split()[-1]
+ if resource == "pending":
+ ocf.logger.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
+ ocf.logger.debug("noPendingResourcesOnNode: finished; return = False")
+ return False
+ ocf.logger.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
+ ocf.logger.debug("noPendingResourcesOnNode: finished; return = True")
+
+ return True
+
+ @staticmethod
+ def allResourcesStoppedOnNode(node):
+ """
+ Check that all resources on a given node are stopped
+ """
+ ocf.logger.debug("allResourcesStoppedOnNode: begin; node = %s" % node)
+
+ if clusterHelper.noPendingResourcesOnNode(node):
+ if len(clusterHelper.transitionSummary()) == 0:
+ ocf.logger.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
+ ocf.logger.debug("allResourcesStoppedOnNode: finished; return = True")
+ return True
+ ocf.logger.info("allResourcesStoppedOnNode: transition summary is not empty")
+ ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
+ return False
+
+ ocf.logger.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
+ ocf.logger.debug("allResourcesStoppedOnNode: finished; return = False")
+ return False
+
+##############################################################################
+
+AVAILABLE = 0 # Node is online and ready to handle events
+STOPPING = 1 # Standby has been triggered, but some resources are still running
+IN_EVENT = 2 # All resources are stopped, and event has been initiated via Azure Metadata Service
+ON_HOLD = 3 # Node has a pending event that cannot be started there are no other nodes available
+
+def stringToNodeState(name):
+ if type(name) == int: return name
+ if name == "STOPPING": return STOPPING
+ if name == "IN_EVENT": return IN_EVENT
+ if name == "ON_HOLD": return ON_HOLD
+ return AVAILABLE
+
+def nodeStateToString(state):
+ if state == STOPPING: return "STOPPING"
+ if state == IN_EVENT: return "IN_EVENT"
+ if state == ON_HOLD: return "ON_HOLD"
+ return "AVAILABLE"
+
+##############################################################################
+
+class Node:
+ """
+ Core class implementing logic for a cluster node
+ """
+ def __init__(self, ra):
+ self.raOwner = ra
+ self.azInfo = azHelper.getInstanceInfo()
+ self.azName = self.azInfo.name
+ self.hostName = socket.gethostname()
+ self.setAttr("azName", self.azName)
+ clusterHelper.setAttr("hostName_%s" % self.azName, self.hostName)
+
+ def getAttr(self, key):
+ """
+ Get a local attribute
+ """
+ return clusterHelper.getAttr(key, node=self.hostName)
+
+ def setAttr(self, key, value):
+ """
+ Set a local attribute
+ """
+ return clusterHelper.setAttr(key, value, node=self.hostName)
+
+ def selfOrOtherNode(self, node):
+ """
+ Helper function to distinguish self/other node
+ """
+ return node if node else self.hostName
+
+ def setState(self, state, node=None):
+ """
+ Set the state for a given node (or self)
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("setState: begin; node = %s, state = %s" % (node, nodeStateToString(state)))
+
+ clusterHelper.setAttr(attr_curNodeState, nodeStateToString(state), node=node)
+
+ ocf.logger.debug("setState: finished")
+
+ def getState(self, node=None):
+ """
+ Get the state for a given node (or self)
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("getState: begin; node = %s" % node)
+
+ state = clusterHelper.getAttr(attr_curNodeState, node=node)
+ ocf.logger.debug("getState: state = %s" % state)
+ ocf.logger.debug("getState: finished")
+ if not state:
+ return AVAILABLE
+ return stringToNodeState(state)
+
+ def setEventIDs(self, eventIDs, node=None):
+ """
+ Set pending EventIDs for a given node (or self)
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))
+
+ if eventIDs:
+ eventIDStr = ",".join(eventIDs)
+ else:
+ eventIDStr = None
+ clusterHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)
+
+ ocf.logger.debug("setEventIDs: finished")
+ return
+
+ def getEventIDs(self, node=None):
+ """
+ Get pending EventIDs for a given node (or self)
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("getEventIDs: begin; node = %s" % node)
+
+ eventIDStr = clusterHelper.getAttr(attr_pendingEventIDs, node=node)
+ if eventIDStr:
+ eventIDs = eventIDStr.split(",")
+ else:
+ eventIDs = None
+
+ ocf.logger.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
+ return eventIDs
+
+ def updateNodeStateAndEvents(self, state, eventIDs, node=None):
+ """
+ Set the state and pending EventIDs for a given node (or self)
+ """
+ ocf.logger.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, nodeStateToString(state), str(eventIDs)))
+
+ self.setState(state, node=node)
+ self.setEventIDs(eventIDs, node=node)
+
+ ocf.logger.debug("updateNodeStateAndEvents: finished")
+ return state
+
+ def putNodeStandby(self, node=None):
+ """
+ Put self to standby
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("putNodeStandby: begin; node = %s" % node)
+
+ clusterHelper._exec("crm_attribute",
+ "-t", "nodes",
+ "-N", node,
+ "-n", "standby",
+ "-v", "on",
+ "--lifetime=forever")
+
+ ocf.logger.debug("putNodeStandby: finished")
+
+ def putNodeOnline(self, node=None):
+ """
+ Put self back online
+ """
+ node = self.selfOrOtherNode(node)
+ ocf.logger.debug("putNodeOnline: begin; node = %s" % node)
+
+ clusterHelper._exec("crm_attribute",
+ "-t", "nodes",
+ "-N", node,
+ "-n", "standby",
+ "-v", "off",
+ "--lifetime=forever")
+
+ ocf.logger.debug("putNodeOnline: finished")
+
+ def separateEvents(self, events):
+ """
+ Split own/other nodes' events
+ """
+ ocf.logger.debug("separateEvents: begin; events = %s" % str(events))
+
+ localEvents = []
+ remoteEvents = []
+ for e in events:
+ e = attrDict(e)
+ if e.EventType not in self.raOwner.relevantEventTypes:
+ continue
+ if self.azName in e.Resources:
+ localEvents.append(e)
+ else:
+ remoteEvents.append(e)
+ ocf.logger.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
+ return (localEvents, remoteEvents)
+
+ def removeOrphanedEvents(self, azEvents):
+ """
+ Remove remote events that are already finished
+ """
+ ocf.logger.debug("removeOrphanedEvents: begin; azEvents = %s" % str(azEvents))
+
+ azEventIDs = set()
+ for e in azEvents:
+ azEventIDs.add(e.EventId)
+ # for all nodes except self ...
+ for n in clusterHelper.getAllNodes():
+ if n == self.hostName:
+ continue
+ curState = self.getState(node=n)
+ # ... that still show in an event or shutting down resources ...
+ if curState in (STOPPING, IN_EVENT):
+ ocf.logger.info("removeOrphanedEvents: node %s has state %s" % (n, curState))
+ clusterEventIDs = self.getEventIDs(node=n)
+ stillActive = False
+ # ... but don't have any more events running according to Azure, ...
+ for p in clusterEventIDs:
+ if p in azEventIDs:
+ ocf.logger.info("removeOrphanedEvents: (at least) event %s on node %s has not yet finished" % (str(p), n))
+ stillActive = True
+ break
+ if not stillActive:
+ # ... put them back online.
+ ocf.logger.info("removeOrphanedEvents: clusterEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(clusterEventIDs), n, str(azEventIDs)))
+ self.putNodeOnline(node=n)
+
+ ocf.logger.debug("removeOrphanedEvents: finished")
+
+ def handleRemoteEvents(self, azEvents):
+ """
+ Handle a list of events (as provided by Azure Metadata Service) for other nodes
+ """
+ ocf.logger.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents)))
+
+ if len(azEvents) == 0:
+ ocf.logger.debug("handleRemoteEvents: no remote events to handle")
+ ocf.logger.debug("handleRemoteEvents: finished")
+ return
+ eventIDsForNode = {}
+
+ # iterate through all current events as per Azure
+ for e in azEvents:
+ ocf.logger.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
+ # before we can force an event to start, we need to ensure all nodes involved have stopped their resources
+ if e.EventStatus == "Scheduled":
+ allNodesStopped = True
+ for azName in e.Resources:
+ hostName = clusterHelper.getHostNameFromAzName(azName)
+ state = self.getState(node=hostName)
+ if state == STOPPING:
+ # the only way we can continue is when node state is STOPPING, but all resources have been stopped
+ if not clusterHelper.allResourcesStoppedOnNode(hostName):
+ ocf.logger.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName)
+ allNodesStopped = False
+ break
+ elif state in (AVAILABLE, IN_EVENT, ON_HOLD):
+ ocf.logger.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, nodeStateToString(state)))
+ allNodesStopped = False
+ break
+ if allNodesStopped:
+ ocf.logger.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
+ for n in e.Resources:
+ hostName = clusterHelper.getHostNameFromAzName(n)
+ if hostName in eventIDsForNode:
+ eventIDsForNode[hostName].append(e.EventId)
+ else:
+ eventIDsForNode[hostName] = [e.EventId]
+ elif e.EventStatus == "Started":
+ ocf.logger.info("handleRemoteEvents: remote event already started")
+
+ # force the start of all events whose nodes are ready (i.e. have no more resources running)
+ if len(eventIDsForNode.keys()) > 0:
+ eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
+ ocf.logger.info("handleRemoteEvents: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
+ for node, eventId in eventIDsForNode.items():
+ self.updateNodeStateAndEvents(IN_EVENT, eventId, node=node)
+ azHelper.forceEvents(eventIDsToForce)
+
+ ocf.logger.debug("handleRemoteEvents: finished")
+
+ def handleLocalEvents(self, azEvents):
+ """
+ Handle a list of own events (as provided by Azure Metadata Service)
+ """
+ ocf.logger.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))
+
+ azEventIDs = set()
+ for e in azEvents:
+ azEventIDs.add(e.EventId)
+
+ curState = self.getState()
+ clusterEventIDs = self.getEventIDs()
+ mayUpdateDocVersion = False
+ ocf.logger.info("handleLocalEvents: current state = %s; pending local clusterEvents = %s" % (nodeStateToString(curState), str(clusterEventIDs)))
+
+ # check if there are currently/still events set for the node
+ if clusterEventIDs:
+ # there are pending events set, so our state must be STOPPING or IN_EVENT
+ i = 0; touchedEventIDs = False
+ while i < len(clusterEventIDs):
+ # clean up pending events that are already finished according to AZ
+ if clusterEventIDs[i] not in azEventIDs:
+ ocf.logger.info("handleLocalEvents: remove finished local clusterEvent %s" % (clusterEventIDs[i]))
+ clusterEventIDs.pop(i)
+ touchedEventIDs = True
+ else:
+ i += 1
+ if len(clusterEventIDs) > 0:
+ # there are still pending events (either because we're still stopping, or because the event is still in place)
+ # either way, we need to wait
+ if touchedEventIDs:
+ ocf.logger.info("handleLocalEvents: added new local clusterEvent %s" % str(clusterEventIDs))
+ self.setEventIDs(clusterEventIDs)
+ else:
+ ocf.logger.info("handleLocalEvents: no local clusterEvents were updated")
+ else:
+ # there are no more pending events left after cleanup
+ if clusterHelper.noPendingResourcesOnNode(self.hostName):
+ # and no pending resources on the node -> set it back online
+ ocf.logger.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
+ curState = self.updateNodeStateAndEvents(AVAILABLE, None)
+ self.putNodeOnline()
+ clusterHelper.removeHoldFromNodes()
+ # repeat handleLocalEvents() since we changed status to AVAILABLE
+ else:
+ ocf.logger.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
+ else:
+ # there are no pending events set for us (yet)
+ if curState == AVAILABLE:
+ if len(azEventIDs) > 0:
+ if clusterHelper.otherNodesAvailable(self):
+ ocf.logger.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
+ # this will also set mayUpdateDocVersion = True
+ curState = self.updateNodeStateAndEvents(STOPPING, azEventIDs)
+ else:
+ ocf.logger.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))
+ self.setState(ON_HOLD)
+ else:
+ ocf.logger.debug("handleLocalEvents: no local azEvents to handle")
+ if curState == STOPPING:
+ if clusterHelper.noPendingResourcesOnNode(self.hostName):
+ ocf.logger.info("handleLocalEvents: all local resources are started properly -> put node standby")
+ self.putNodeStandby()
+ mayUpdateDocVersion = True
+ else:
+ ocf.logger.info("handleLocalEvents: some local resources are not clean yet -> wait")
+
+ ocf.logger.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
+ return mayUpdateDocVersion
+
+##############################################################################
+
+class raAzEvents:
+ """
+ Main class for resource agent
+ """
+ def __init__(self, relevantEventTypes):
+ self.node = Node(self)
+ self.relevantEventTypes = relevantEventTypes
+
+ def monitor(self):
+ ocf.logger.debug("monitor: begin")
+
+ pullFailedAttemps = 0
+ while True:
+ # check if another node is pulling at the same time;
+ # this should only be a concern for the first pull, as setting up Scheduled Events may take up to 2 minutes.
+ if clusterHelper.getAttr(attr_globalPullState) == "PULLING":
+ pullFailedAttemps += 1
+ if pullFailedAttemps == global_pullMaxAttempts:
+ ocf.logger.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
+ ocf.logger.debug("monitor: finished")
+ return ocf.OCF_SUCCESS
+ else:
+ ocf.logger.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
+ time.sleep(global_pullDelaySecs)
+ continue
+
+ # we can pull safely from Azure Metadata Service
+ clusterHelper.setAttr(attr_globalPullState, "PULLING")
+ events = azHelper.pullScheduledEvents()
+ clusterHelper.setAttr(attr_globalPullState, "IDLE")
+
+ # get current document version
+ curDocVersion = events.DocumentIncarnation
+ lastDocVersion = self.node.getAttr(attr_lastDocVersion)
+ ocf.logger.debug("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))
+
+ # split events local/remote
+ (localEvents, remoteEvents) = self.node.separateEvents(events.Events)
+
+ # ensure local events are only executing once
+ if curDocVersion != lastDocVersion:
+ ocf.logger.debug("monitor: curDocVersion has not been handled yet")
+ # handleLocalEvents() returns True if mayUpdateDocVersion is True;
+ # this is only the case if we can ensure there are no pending events
+ if self.node.handleLocalEvents(localEvents):
+ ocf.logger.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
+ self.node.setAttr(attr_lastDocVersion, curDocVersion)
+ else:
+ ocf.logger.debug("monitor: handleLocalEvents still waiting -> keep curDocVersion")
+ else:
+ ocf.logger.info("monitor: already handled curDocVersion, skip")
+
+ # remove orphaned remote events and then handle the remaining remote events
+ self.node.removeOrphanedEvents(remoteEvents)
+ self.node.handleRemoteEvents(remoteEvents)
+ break
+
+ ocf.logger.debug("monitor: finished")
+ return ocf.OCF_SUCCESS
+
+##############################################################################
+
+def setLoglevel(verbose):
+ # set up writing into syslog
+ loglevel = default_loglevel
+ if verbose:
+ opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel=1))
+ urllib2.install_opener(opener)
+ loglevel = ocf.logging.DEBUG
+ ocf.log.setLevel(loglevel)
+
+description = (
+ "Microsoft Azure Scheduled Events monitoring agent",
+ """This resource agent implements a monitor for scheduled
+(maintenance) events for a Microsoft Azure VM.
+
+If any relevant events are found, it moves all Pacemaker resources
+away from the affected node to allow for a graceful shutdown.
+
+ Usage:
+ [OCF_RESKEY_eventTypes=VAL] [OCF_RESKEY_verbose=VAL] azure-events ACTION
+
+ action (required): Supported values: monitor, help, meta-data
+ eventTypes (optional): List of event types to be considered
+ relevant by the resource agent (comma-separated).
+ Supported values: Freeze,Reboot,Redeploy
+ Default = Reboot,Redeploy
+/ verbose (optional): If set to true, displays debug info.
+ Default = false
+
+ Deployment:
+ crm configure primitive rsc_azure-events ocf:heartbeat:azure-events \
+ op monitor interval=10s
+ crm configure clone cln_azure-events rsc_azure-events
+
+For further information on Microsoft Azure Scheduled Events, please
+refer to the following documentation:
+https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events
+""")
+
+def monitor_action(eventTypes):
+ relevantEventTypes = set(eventTypes.split(",") if eventTypes else [])
+ ra = raAzEvents(relevantEventTypes)
+ return ra.monitor()
+
+def validate_action(eventTypes):
+ if eventTypes:
+ for event in eventTypes.split(","):
+ if event not in ("Freeze", "Reboot", "Redeploy"):
+ ocf.ocf_exit_reason("Event type not one of Freeze, Reboot, Redeploy: " + eventTypes)
+ return ocf.OCF_ERR_CONFIGURED
+ return ocf.OCF_SUCCESS
+
+def main():
+ agent = ocf.Agent("azure-events", shortdesc=description[0], longdesc=description[1])
+ agent.add_parameter(
+ "eventTypes",
+ shortdesc="List of resources to be considered",
+ longdesc="A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy)",
+ content_type="string",
+ default="Reboot,Redeploy")
+ agent.add_parameter(
+ "verbose",
+ shortdesc="Enable verbose agent logging",
+ longdesc="Set to true to enable verbose logging",
+ content_type="boolean",
+ default="false")
+ agent.add_action("start", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
+ agent.add_action("stop", timeout=10, handler=lambda: ocf.OCF_SUCCESS)
+ agent.add_action("validate-all", timeout=20, handler=validate_action)
+ agent.add_action("monitor", timeout=240, interval=10, handler=monitor_action)
+ setLoglevel(ocf.is_true(ocf.get_parameter("verbose", "false")))
+ agent.run()
+
+if __name__ == '__main__':
+ main()