Diffstat (limited to 'src/pybind/mgr/rook')
-rw-r--r--  src/pybind/mgr/rook/ci/Dockerfile                                            |   3
-rw-r--r--  src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml            | 198
-rw-r--r--  src/pybind/mgr/rook/ci/requirements.txt                                      |   1
-rwxr-xr-x  src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh                                 |   3
-rwxr-xr-x  src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh                     | 124
-rw-r--r--  src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature  |  14
-rw-r--r--  src/pybind/mgr/rook/ci/tests/features/rook.feature                           |  59
-rw-r--r--  src/pybind/mgr/rook/ci/tests/features/steps/implementation.py                | 109
-rw-r--r--  src/pybind/mgr/rook/ci/tests/features/steps/utils.py                         |  23
-rw-r--r--  src/pybind/mgr/rook/module.py                                                | 220
-rw-r--r--  src/pybind/mgr/rook/rook_cluster.py                                          | 382
11 files changed, 581 insertions, 555 deletions
diff --git a/src/pybind/mgr/rook/ci/Dockerfile b/src/pybind/mgr/rook/ci/Dockerfile
index 30ebea574..62ff0ab89 100644
--- a/src/pybind/mgr/rook/ci/Dockerfile
+++ b/src/pybind/mgr/rook/ci/Dockerfile
@@ -1,3 +1,4 @@
-FROM quay.io/ceph/daemon-base:latest-main
+FROM quay.io/ceph/daemon-base:latest-reef
COPY ./tmp_build/orchestrator /usr/share/ceph/mgr/orchestrator
COPY ./tmp_build/rook /usr/share/ceph/mgr/rook
+COPY ./tmp_build/ceph/ /usr/lib/python3.6/site-packages/ceph/
diff --git a/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml b/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml
new file mode 100644
index 000000000..2732286ab
--- /dev/null
+++ b/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml
@@ -0,0 +1,198 @@
+#################################################################################################################
+# Define the settings for a rook-ceph cluster running on a single-node minikube cluster.
+
+# This example expects a single-node minikube cluster with four extra disks: vdb, vdc, vdd and vde. Please
+# modify it according to your environment. See the documentation for more details on the available storage settings.
+
+# For example, to create the cluster:
+# kubectl create -f crds.yaml -f common.yaml -f operator.yaml
+# kubectl create -f cluster-on-pvc-minikube.yaml
+#################################################################################################################
+kind: StorageClass
+apiVersion: storage.k8s.io/v1
+metadata:
+ name: local-storage
+provisioner: kubernetes.io/no-provisioner
+volumeBindingMode: WaitForFirstConsumer
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+ name: local0-0
+spec:
+ storageClassName: local-storage
+ capacity:
+ storage: 10Gi
+ accessModes:
+ - ReadWriteOnce
+ persistentVolumeReclaimPolicy: Retain
+ # PV for mon must be a filesystem volume.
+ volumeMode: Filesystem
+ local:
+    # To use dm devices such as logical volumes, replace the device path below with the dm device name, e.g. `/dev/vg-name/lv-name`.
+ path: /dev/vdb
+ nodeAffinity:
+ required:
+ nodeSelectorTerms:
+ - matchExpressions:
+ - key: kubernetes.io/hostname
+ operator: In
+ values:
+ - minikube
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+ name: local0-1
+spec:
+ storageClassName: local-storage
+ capacity:
+ storage: 20Gi
+ accessModes:
+ - ReadWriteOnce
+ persistentVolumeReclaimPolicy: Retain
+  # PV for OSD must be a block volume.
+ volumeMode: Block
+ local:
+    # To use dm devices such as logical volumes, replace the device path below with the dm device name, e.g. `/dev/vg-name/lv-name`.
+ path: /dev/vdc
+ nodeAffinity:
+ required:
+ nodeSelectorTerms:
+ - matchExpressions:
+ - key: kubernetes.io/hostname
+ operator: In
+ values:
+ - minikube
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+ name: local0-2
+spec:
+ storageClassName: local-storage
+ capacity:
+ storage: 20Gi
+ accessModes:
+ - ReadWriteOnce
+ persistentVolumeReclaimPolicy: Retain
+  # PV for OSD must be a block volume.
+ volumeMode: Block
+ local:
+    # To use dm devices such as logical volumes, replace the device path below with the dm device name, e.g. `/dev/vg-name/lv-name`.
+ path: /dev/vdd
+ nodeAffinity:
+ required:
+ nodeSelectorTerms:
+ - matchExpressions:
+ - key: kubernetes.io/hostname
+ operator: In
+ values:
+ - minikube
+---
+kind: PersistentVolume
+apiVersion: v1
+metadata:
+ name: local0-3
+spec:
+ storageClassName: local-storage
+ capacity:
+ storage: 20Gi
+ accessModes:
+ - ReadWriteOnce
+ persistentVolumeReclaimPolicy: Retain
+  # PV for OSD must be a block volume.
+ volumeMode: Block
+ local:
+    # To use dm devices such as logical volumes, replace the device path below with the dm device name, e.g. `/dev/vg-name/lv-name`.
+ path: /dev/vde
+ nodeAffinity:
+ required:
+ nodeSelectorTerms:
+ - matchExpressions:
+ - key: kubernetes.io/hostname
+ operator: In
+ values:
+ - minikube
+---
+apiVersion: ceph.rook.io/v1
+kind: CephCluster
+metadata:
+ name: my-cluster
+ namespace: rook-ceph # namespace:cluster
+spec:
+ dataDirHostPath: /var/lib/rook
+ mon:
+ count: 1
+ allowMultiplePerNode: true
+ volumeClaimTemplate:
+ spec:
+ storageClassName: local-storage
+ resources:
+ requests:
+ storage: 10Gi
+ mgr:
+ count: 1
+ modules:
+ - name: pg_autoscaler
+ enabled: true
+ dashboard:
+ enabled: true
+ ssl: false
+ crashCollector:
+ disable: false
+ cephVersion:
+ image: quay.io/ceph/daemon-base:latest-main
+ allowUnsupported: true
+ skipUpgradeChecks: false
+ continueUpgradeAfterChecksEvenIfNotHealthy: false
+ storage:
+ storageClassDeviceSets:
+ - name: set1
+ count: 3
+ portable: false
+ tuneDeviceClass: true
+ tuneFastDeviceClass: false
+ encrypted: false
+ placement:
+ preparePlacement:
+ volumeClaimTemplates:
+ - metadata:
+ name: data
+          # if you want to give your OSD a different CRUSH device class than the one detected by Ceph
+ # annotations:
+ # crushDeviceClass: hybrid
+ spec:
+ resources:
+ requests:
+ storage: 20Gi
+ # IMPORTANT: Change the storage class depending on your environment
+ storageClassName: local-storage
+ volumeMode: Block
+ accessModes:
+ - ReadWriteOnce
+    # when onlyApplyOSDPlacement is false, placement.All() and storageClassDeviceSets.Placement are merged
+ onlyApplyOSDPlacement: false
+ priorityClassNames:
+ mon: system-node-critical
+ osd: system-node-critical
+ mgr: system-cluster-critical
+ disruptionManagement:
+ managePodBudgets: true
+ osdMaintenanceTimeout: 30
+ pgHealthCheckTimeout: 0
+ cephConfig:
+ global:
+ mon_warn_on_pool_no_redundancy: "false"
+---
+apiVersion: ceph.rook.io/v1
+kind: CephBlockPool
+metadata:
+ name: builtin-mgr
+ namespace: rook-ceph # namespace:cluster
+spec:
+ name: .mgr
+ failureDomain: osd
+ replicated:
+ size: 1
+ requireSafeReplicaSize: false
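The cluster spec above is consumed directly by the bootstrap script. Below is a minimal pre-flight check, illustrative only and assuming PyYAML is installed and the file name used here, that verifies every PersistentVolume uses the local-storage StorageClass and that the CephCluster requests a single mon:

    import yaml

    def check_cluster_spec(path: str = "cluster-on-pvc-minikube.yaml") -> None:
        # the manifest is a multi-document YAML file; load every document
        with open(path) as f:
            docs = [d for d in yaml.safe_load_all(f) if d]
        pvs = [d for d in docs if d.get("kind") == "PersistentVolume"]
        clusters = [d for d in docs if d.get("kind") == "CephCluster"]
        assert all(pv["spec"]["storageClassName"] == "local-storage" for pv in pvs), \
            "every PV is expected to use the local-storage StorageClass"
        assert clusters and clusters[0]["spec"]["mon"]["count"] == 1, \
            "the minikube spec defines a single mon"
        print(f"spec OK: {len(pvs)} PVs, {len(clusters)} CephCluster object(s)")

    if __name__ == "__main__":
        check_cluster_spec()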
diff --git a/src/pybind/mgr/rook/ci/requirements.txt b/src/pybind/mgr/rook/ci/requirements.txt
new file mode 100644
index 000000000..9684f7742
--- /dev/null
+++ b/src/pybind/mgr/rook/ci/requirements.txt
@@ -0,0 +1 @@
+behave
diff --git a/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh b/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh
index a43e01a89..58d554757 100755
--- a/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh
+++ b/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh
@@ -2,8 +2,11 @@
set -ex
+export PATH=$PATH:~/.local/bin # behave is installed in this directory
+
# Execute tests
: ${CEPH_DEV_FOLDER:=${PWD}}
${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh
cd ${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/tests
+pip install --upgrade --force-reinstall -r ../requirements.txt
behave
diff --git a/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh b/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh
index 4b97df6ba..24a6a5da2 100755
--- a/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh
+++ b/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh
@@ -3,7 +3,10 @@
set -eEx
: ${CEPH_DEV_FOLDER:=${PWD}}
+CLUSTER_SPEC=${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml
+DEFAULT_NS="rook-ceph"
KUBECTL="minikube kubectl --"
+export ROOK_CLUSTER_NS="${ROOK_CLUSTER_NS:=$DEFAULT_NS}" ## CephCluster namespace
# We build a local ceph image that contains the latest code
# plus changes from the PR. This image will be used by the docker
@@ -15,14 +18,6 @@ on_error() {
minikube delete
}
-configure_libvirt(){
- sudo usermod -aG libvirt $(id -un)
- sudo su -l $USER # Avoid having to log out and log in for group addition to take effect.
- sudo systemctl enable --now libvirtd
- sudo systemctl restart libvirtd
- sleep 10 # wait some time for libvirtd service to restart
-}
-
setup_minikube_env() {
# Check if Minikube is running
@@ -35,20 +30,21 @@ setup_minikube_env() {
fi
rm -rf ~/.minikube
- minikube start --memory="4096" --cpus="2" --disk-size=10g --extra-disks=1 --driver kvm2
+ minikube start --memory="6144" --disk-size=20g --extra-disks=4 --driver kvm2
# point Docker env to use docker daemon running on minikube
eval $(minikube docker-env -p minikube)
}
build_ceph_image() {
- wget -q -O cluster-test.yaml https://raw.githubusercontent.com/rook/rook/master/deploy/examples/cluster-test.yaml
- CURR_CEPH_IMG=$(grep -E '^\s*image:\s+' cluster-test.yaml | sed 's/.*image: *\([^ ]*\)/\1/')
+
+ CURR_CEPH_IMG=$(grep -E '^\s*image:\s+' $CLUSTER_SPEC | sed 's/.*image: *\([^ ]*\)/\1/')
cd ${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci
mkdir -p tmp_build/rook
mkdir -p tmp_build/orchestrator
cp ./../../orchestrator/*.py tmp_build/orchestrator
cp ../*.py tmp_build/rook
+ cp -r ../../../../../src/python-common/ceph/ tmp_build/
# we use the following tag to trick the Docker
# running inside minikube so it uses this image instead
@@ -62,28 +58,39 @@ build_ceph_image() {
}
create_rook_cluster() {
- wget -q -O cluster-test.yaml https://raw.githubusercontent.com/rook/rook/master/deploy/examples/cluster-test.yaml
$KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/crds.yaml
$KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/common.yaml
$KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/operator.yaml
- $KUBECTL create -f cluster-test.yaml
- $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/dashboard-external-http.yaml
+ $KUBECTL create -f $CLUSTER_SPEC
$KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/toolbox.yaml
}
+is_operator_ready() {
+ local phase
+ phase=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}')
+ echo "PHASE: $phase"
+ [[ "$phase" == "Ready" ]]
+}
+
wait_for_rook_operator() {
local max_attempts=10
local sleep_interval=20
local attempts=0
+
$KUBECTL rollout status deployment rook-ceph-operator -n rook-ceph --timeout=180s
- PHASE=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}')
- echo "PHASE: $PHASE"
- while ! $KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}' | grep -q "Ready"; do
- echo "Waiting for cluster to be ready..."
- sleep $sleep_interval
- attempts=$((attempts+1))
+
+ while ! is_operator_ready; do
+ echo "Waiting for rook operator to be ready..."
+ sleep $sleep_interval
+
+ # log current cluster state and pods info for debugging
+ PHASE=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}')
+ $KUBECTL -n rook-ceph get pods
+
+ attempts=$((attempts + 1))
if [ $attempts -ge $max_attempts ]; then
echo "Maximum number of attempts ($max_attempts) reached. Exiting..."
+ $KUBECTL -n rook-ceph get pods | grep operator | awk '{print $1}' | xargs $KUBECTL -n rook-ceph logs
return 1
fi
done
@@ -93,9 +100,9 @@ wait_for_ceph_cluster() {
local max_attempts=10
local sleep_interval=20
local attempts=0
- $KUBECTL rollout status deployment rook-ceph-tools -n rook-ceph --timeout=30s
+ $KUBECTL rollout status deployment rook-ceph-tools -n rook-ceph --timeout=90s
while ! $KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.ceph.health}' | grep -q "HEALTH_OK"; do
- echo "Waiting for Ceph cluster installed"
+        echo "Waiting for Ceph cluster to enter HEALTH_OK state"
sleep $sleep_interval
attempts=$((attempts+1))
if [ $attempts -ge $max_attempts ]; then
@@ -104,18 +111,66 @@ wait_for_ceph_cluster() {
fi
done
echo "Ceph cluster installed and running"
+
+    # add an additional wait to cover any subtle change in the cluster state
+ sleep 20
+}
+
+configure_libvirt(){
+ if sudo usermod -aG libvirt $(id -un); then
+ echo "User added to libvirt group successfully."
+ sudo systemctl enable --now libvirtd
+ sudo systemctl restart libvirtd
+ sleep 30 # wait some time for libvirtd service to restart
+ newgrp libvirt
+ else
+ echo "Error adding user to libvirt group."
+ return 1
+ fi
+}
+
+recreate_default_network(){
+
+ # destroy any existing kvm default network
+ if sudo virsh net-destroy default; then
+ sudo virsh net-undefine default
+ fi
+
+ # let's create a new kvm default network
+ sudo virsh net-define /usr/share/libvirt/networks/default.xml
+ if sudo virsh net-start default; then
+ echo "Network 'default' started successfully."
+ else
+ # Optionally, handle the error
+ echo "Failed to start network 'default', but continuing..."
+ fi
+
+ # restart libvirtd service and wait a little bit for the service
+ sudo systemctl restart libvirtd
+ sleep 30
+
+ # Just some debugging information
+ all_networks=$(virsh net-list --all)
+ groups=$(groups)
+}
+
+enable_rook_orchestrator() {
+ echo "Enabling rook orchestrator"
+ $KUBECTL rollout status deployment rook-ceph-tools -n "$ROOK_CLUSTER_NS" --timeout=90s
+ $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph mgr module enable rook
+ $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph orch set backend rook
+ $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph orch status
}
-show_info() {
- DASHBOARD_PASSWORD=$($KUBECTL -n rook-ceph get secret rook-ceph-dashboard-password -o jsonpath="{['data']['password']}" | base64 --decode && echo)
- IP_ADDR=$($KUBECTL get po --selector="app=rook-ceph-mgr" -n rook-ceph --output jsonpath='{.items[*].status.hostIP}')
- PORT="$($KUBECTL -n rook-ceph -o=jsonpath='{.spec.ports[?(@.name == "dashboard")].nodePort}' get services rook-ceph-mgr-dashboard-external-http)"
- BASE_URL="http://$IP_ADDR:$PORT"
- echo "==========================="
- echo "Ceph Dashboard: "
- echo " IP_ADDRESS: $BASE_URL"
- echo " PASSWORD: $DASHBOARD_PASSWORD"
- echo "==========================="
+enable_monitoring() {
+ echo "Enabling monitoring"
+ $KUBECTL apply -f https://raw.githubusercontent.com/coreos/prometheus-operator/v0.40.0/bundle.yaml
+ $KUBECTL wait --for=condition=ready pod -l app.kubernetes.io/name=prometheus-operator --timeout=90s
+ $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/rbac.yaml
+ $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/service-monitor.yaml
+ $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/exporter-service-monitor.yaml
+ $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/prometheus.yaml
+ $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/prometheus-service.yaml
}
####################################################################
@@ -124,12 +179,15 @@ show_info() {
trap 'on_error $? $LINENO' ERR
configure_libvirt
+recreate_default_network
setup_minikube_env
build_ceph_image
create_rook_cluster
wait_for_rook_operator
wait_for_ceph_cluster
-show_info
+enable_rook_orchestrator
+enable_monitoring
+sleep 30 # wait for the metrics cache warmup
####################################################################
####################################################################
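The wait loops above poll the CephCluster resource through kubectl until the operator reports it ready. A rough Python equivalent of that polling, a sketch only — the kubectl invocation and namespace mirror the script, the function names are made up:

    import subprocess
    import time

    KUBECTL = ["minikube", "kubectl", "--"]

    def cephcluster_phase(namespace: str = "rook-ceph") -> str:
        # read the phase reported by the Rook operator on the CephCluster CR
        out = subprocess.run(
            KUBECTL + ["get", "cephclusters.ceph.rook.io", "-n", namespace,
                       "-o", "jsonpath={.items[0].status.phase}"],
            capture_output=True, text=True, check=True)
        return out.stdout.strip()

    def wait_for_ready(max_attempts: int = 10, sleep_interval: int = 20) -> bool:
        for attempt in range(max_attempts):
            phase = cephcluster_phase()
            print(f"attempt {attempt + 1}: phase={phase!r}")
            if phase == "Ready":
                return True
            time.sleep(sleep_interval)
        return False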
diff --git a/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature b/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature
new file mode 100644
index 000000000..5180c7293
--- /dev/null
+++ b/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature
@@ -0,0 +1,14 @@
+Feature: Testing Rook orchestrator commands
+  Ceph has been installed using the cluster CRD available in src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml
+
+ Scenario: Verify Prometheus metrics endpoint is working properly
+ Given I can get prometheus server configuration
+ Given the prometheus server is serving metrics
+
+ Scenario: Verify some basic metrics are working properly
+ Given I can get prometheus server configuration
+ Given the prometheus server is serving metrics
+ Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.0" and value equal to 1
+ Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.1" and value equal to 1
+ Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.2" and value equal to 1
+ Then the response contains the metric "ceph_mon_quorum_status" where "ceph_daemon" is "mon.a" and value equal to 1
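The scenarios above hit the Prometheus federate endpoint exposed by the monitoring stack. A minimal sketch of that check follows; the host IP parameter and the NodePort 30900 are assumptions carried over from the step implementation further below:

    import requests

    def fetch_federated_metrics(host_ip: str, port: int = 30900) -> str:
        # federate returns the current samples as plain text, one sample per line
        params = {"match[]": '{__name__!=""}'}
        resp = requests.get(f"http://{host_ip}:{port}/federate", params=params, timeout=10)
        resp.raise_for_status()
        return resp.text

    def osd_is_in(metrics_text: str, osd: str = "osd.0") -> bool:
        for line in metrics_text.splitlines():
            if line.startswith("ceph_osd_in{") and f'ceph_daemon="{osd}"' in line:
                # each line ends with "<value> <timestamp>"
                return float(line.split()[-2]) == 1.0
        return False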
diff --git a/src/pybind/mgr/rook/ci/tests/features/rook.feature b/src/pybind/mgr/rook/ci/tests/features/rook.feature
index ae0478f8b..acf733f55 100644
--- a/src/pybind/mgr/rook/ci/tests/features/rook.feature
+++ b/src/pybind/mgr/rook/ci/tests/features/rook.feature
@@ -1,8 +1,8 @@
Feature: Testing Rook orchestrator commands
- Ceph has been installed using the cluster CRD available in deploy/examples/cluster-test.yaml and
+  Ceph has been installed using the cluster CRD available in src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml
Scenario: Verify ceph cluster health
- When I run
+ When I run ceph command
"""
ceph health | grep HEALTH
"""
@@ -10,3 +10,58 @@ Feature: Testing Rook orchestrator commands
"""
HEALTH_OK
"""
+
+ Scenario: Verify rook orchestrator has been enabled correctly
+ When I run ceph command
+ """
+ ceph mgr module ls | grep rook
+ """
+ Then I get something like
+ """
+ rook +on
+ """
+
+ Scenario: Verify rook orchestrator lists services correctly
+ When I run ceph command
+ """
+ ceph orch ls
+ """
+ Then I get something like
+ """
+ NAME +PORTS +RUNNING +REFRESHED +AGE +PLACEMENT
+ crash +1/1 .+
+ mgr +1/1 .+
+ mon +1/1 .+
+ osd +3 .+
+ """
+
+ Scenario: Verify rook orchestrator lists daemons correctly
+ When I run ceph command
+ """
+ ceph orch ps
+ """
+ Then I get something like
+ """
+ NAME +HOST +PORTS +STATUS +REFRESHED +AGE +MEM +USE +MEM +LIM +VERSION +IMAGE +ID
+ ceph-exporter.exporter +minikube +running .+
+ crashcollector.crash +minikube +running .+
+ mgr.a +minikube +running .+
+ mon.a +minikube +running .+
+ osd.0 +minikube +running .+
+ osd.1 +minikube +running .+
+ osd.2 +minikube +running .+
+ """
+
+ Scenario: Verify rook orchestrator lists devices correctly
+ When I run ceph command
+ """
+ ceph orch device ls
+ """
+ Then I get something like
+ """
+ HOST +PATH +TYPE +DEVICE +ID +SIZE +AVAILABLE +REFRESHED +REJECT +REASONS
+ minikube +/dev/vdb +unknown +None +10.0G .+
+ minikube +/dev/vdc +unknown +None +20.0G .+
+ minikube +/dev/vdd +unknown +None +20.0G .+
+ minikube +/dev/vde +unknown +None +20.0G .+
+ """
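The "I get something like" tables above are not compared literally: each expected line is treated as a regular expression and matched against the corresponding line of the command output (see the step implementation in the next file). A tiny, hypothetical illustration:

    import re

    expected = r'osd.0 +minikube +running .+'
    actual = 'osd.0  minikube  running (2m)  60s ago  2m  40.6M  -  18.2.0  abc123'
    assert re.match(expected, actual)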
diff --git a/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py b/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py
index adde61afd..59cb117c8 100644
--- a/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py
+++ b/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py
@@ -1,15 +1,35 @@
+import requests
+from behave import given, when, then
from behave import *
from utils import *
+import subprocess
import re
-@when("I run")
+PROMETHEUS_SERVER_URL = None
+
+def get_prometheus_pod_host_ip():
+ try:
+ command = "minikube --profile minikube kubectl -- -n rook-ceph -o jsonpath='{.status.hostIP}' get pod prometheus-rook-prometheus-0"
+ result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True)
+ host_ip = result.stdout.strip()
+ return host_ip
+ except subprocess.CalledProcessError as e:
+ print(f"Error running command: {e}")
+ return None
+
+@when("I run ceph command")
+def run_step(context):
+ context.output = run_ceph_commands(context.text)
+
+@when("I run k8s command")
def run_step(context):
- context.output = run_commands(context.text)
+ context.output = run_k8s_commands(context.text)
@then("I get")
def verify_result_step(context):
- print(f"Output is:\n{context.output}\n--------------\n")
- assert context.text == context.output
+ if (context.text != context.output):
+ display_side_by_side(context.text, context.output)
+ assert context.text == context.output, ""
@then("I get something like")
def verify_fuzzy_result_step(context):
@@ -18,4 +38,83 @@ def verify_fuzzy_result_step(context):
num_lines = min(len(output_lines), len(expected_lines))
for n in range(num_lines):
if not re.match(expected_lines[n], output_lines[n]):
- raise
+ display_side_by_side(expected_lines[n], output_lines[n])
+ assert False, ""
+
+@given('I can get prometheus server configuration')
+def step_get_prometheus_server_ip(context):
+ global PROMETHEUS_SERVER_URL
+ try:
+ PROMETHEUS_SERVER_URL = f"http://{get_prometheus_pod_host_ip()}:30900"
+ except requests.exceptions.RequestException as e:
+ print(f"Error connecting to Prometheus server: {e}")
+ assert False, f"Error connecting to Prometheus server: {e}"
+
+@given('the prometheus server is serving metrics')
+def step_given_server_running(context):
+ try:
+ params = {'match[]': '{__name__!=""}'}
+ response = requests.get(f"{PROMETHEUS_SERVER_URL}/federate", params)
+ # Check if the response status code is successful (2xx)
+ response.raise_for_status()
+ # Store the response object in the context for later use
+ context.response = response
+ print(f"Prometheus server is running. Status code: {response.status_code}")
+ except requests.exceptions.RequestException as e:
+ print(f"Error connecting to Prometheus server: {e}")
+ assert False, f"Error connecting to Prometheus server: {e}"
+
+@when('I query the Prometheus metrics endpoint')
+def step_when_query_metrics_endpoint(context):
+ params = {'match[]': '{__name__!=""}'}
+ context.response = requests.get(f"{PROMETHEUS_SERVER_URL}/federate", params)
+ context.response.raise_for_status()
+
+@then('the response contains the metric "{metric_name}"')
+def step_then_check_metric_value(context, metric_name):
+ metric_value = parse_metric_value(context.response.text, metric_name)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+
+@then('the response contains the metric "{metric_name}" with value equal to {expected_value}')
+def step_then_check_metric_value(context, metric_name, expected_value):
+ metric_value = parse_metric_value(context.response.text, metric_name)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+ assert metric_value == float(expected_value), f"Metric '{metric_name}' value {metric_value} is not equal to {expected_value}"
+
+@then('the response contains the metric "{metric_name}" with value greater than {expected_value}')
+def step_then_check_metric_value(context, metric_name, expected_value):
+ metric_value = parse_metric_value(context.response.text, metric_name)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+ assert metric_value > float(expected_value), f"Metric '{metric_name}' value {metric_value} is not greater than {expected_value}"
+
+@then('the response contains the metric "{metric_name}" with value less than {expected_value}')
+def step_then_check_metric_value(context, metric_name, expected_value):
+ metric_value = parse_metric_value(context.response.text, metric_name)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+ assert metric_value < float(expected_value), f"Metric '{metric_name}' value {metric_value} is not less than {expected_value}"
+
+@then('the response contains the metric "{metric_name}" with value in the range {min_value}-{max_value}')
+def step_then_check_metric_value(context, metric_name, min_value, max_value):
+ metric_value = parse_metric_value(context.response.text, metric_name)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+ assert metric_value >= float(min_value) and metric_value <= float(max_value), f"Metric '{metric_name}' value {metric_value} is not in the range {min_value}-{max_value}"
+
+@then('the response contains the metric "{metric_name}" where "{filter_by_field}" is "{field_value}" and value equal to {expected_value}')
+def step_then_check_metric_value(context, metric_name, expected_value, filter_by_field, field_value):
+ metric_value = parse_metric_value(context.response.text, metric_name, filter_by_field, field_value)
+ assert metric_value is not None, f"Metric '{metric_name}' not found in the response"
+ assert metric_value == float(expected_value), f"Metric '{metric_name}' value {metric_value} is not equal to {expected_value}"
+
+
+def parse_metric_value(metrics_text, metric_name, filter_by_field=None, field_value=None):
+ filter_condition = f'{filter_by_field}="{field_value}"' if filter_by_field and field_value else ''
+ pattern_str = rf'^{metric_name}\{{[^}}]*{filter_condition}[^}}]*\}} (\d+) (\d+)'
+ pattern = re.compile(pattern_str, re.MULTILINE)
+ match = pattern.search(metrics_text)
+ if match:
+ # Extract the values and timestamp from the matched groups
+ metric_value, _ = match.groups()
+ return float(metric_value)
+ else:
+ # Metric not found
+ return None
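Illustrative usage of the parse_metric_value() helper above (assuming it is importable from the steps module) against a hand-written sample of federate output; the sample lines and timestamps are made up, the label layout mirrors what the exporter produces:

    sample = (
        'ceph_osd_in{ceph_daemon="osd.0",instance="exporter"} 1 1700000000000\n'
        'ceph_mon_quorum_status{ceph_daemon="mon.a"} 1 1700000000000\n'
    )

    # filter on the ceph_daemon label, exactly as the "where ... is ..." steps do
    assert parse_metric_value(sample, "ceph_osd_in", "ceph_daemon", "osd.0") == 1.0
    assert parse_metric_value(sample, "ceph_osd_in", "ceph_daemon", "osd.9") is None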
diff --git a/src/pybind/mgr/rook/ci/tests/features/steps/utils.py b/src/pybind/mgr/rook/ci/tests/features/steps/utils.py
index 41a71d0fb..f711ec3fe 100644
--- a/src/pybind/mgr/rook/ci/tests/features/steps/utils.py
+++ b/src/pybind/mgr/rook/ci/tests/features/steps/utils.py
@@ -1,4 +1,5 @@
import subprocess
+from difflib import unified_diff
ROOK_CEPH_COMMAND = "minikube kubectl -- -n rook-ceph exec -it deploy/rook-ceph-tools -- "
CLUSTER_COMMAND = "minikube kubectl -- "
@@ -27,3 +28,25 @@ def run_commands(commands: str) -> str:
output = execute_command(command)
return output.strip("\n")
+
+def run_k8s_commands(commands: str) -> str:
+ commands_list = commands.split("\n")
+ output = ""
+ for cmd in commands_list:
+ command = CLUSTER_COMMAND + cmd
+ output = execute_command(command)
+
+ return output.strip("\n")
+
+def run_ceph_commands(commands: str) -> str:
+ commands_list = commands.split("\n")
+ output = ""
+ for cmd in commands_list:
+ command = ROOK_CEPH_COMMAND + cmd
+ output = execute_command(command)
+
+ return output.strip("\n")
+
+def display_side_by_side(expected, got):
+ diff = unified_diff(expected.splitlines(), got.splitlines(), lineterm='')
+ print('\n'.join(diff))
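display_side_by_side() is a thin wrapper over difflib. A self-contained illustration of what it prints when one of the assertions in the steps above fails:

    from difflib import unified_diff

    expected = "HEALTH_OK\n3 osds: 3 up, 3 in"
    got = "HEALTH_WARN\n3 osds: 3 up, 3 in"

    # prints something like:
    #   @@ -1,2 +1,2 @@
    #   -HEALTH_OK
    #   +HEALTH_WARN
    #    3 osds: 3 up, 3 in
    print("\n".join(unified_diff(expected.splitlines(), got.splitlines(), lineterm="")))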
diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py
index fa75db2cf..34ed15bc6 100644
--- a/src/pybind/mgr/rook/module.py
+++ b/src/pybind/mgr/rook/module.py
@@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
default='local',
desc='storage class name for LSO-discovered PVs',
),
- Option(
- 'drive_group_interval',
- type='float',
- default=300.0,
- desc='interval in seconds between re-application of applied drive_groups',
- ),
]
@staticmethod
@@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.config_notify()
if TYPE_CHECKING:
self.storage_class = 'foo'
- self.drive_group_interval = 10.0
- self._load_drive_groups()
self._shutdown = threading.Event()
def config_notify(self) -> None:
@@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self.log.debug(' mgr option %s = %s',
opt['name'], getattr(self, opt['name'])) # type: ignore
assert isinstance(self.storage_class, str)
- assert isinstance(self.drive_group_interval, float)
if self._rook_cluster:
self._rook_cluster.storage_class_name = self.storage_class
@@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
self._initialized.set()
self.config_notify()
- while not self._shutdown.is_set():
- self._apply_drivegroups(list(self._drive_group_map.values()))
- self._shutdown.wait(self.drive_group_interval)
-
@handle_orch_error
def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]:
host_list = None
@@ -257,6 +244,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
image_name = cl['spec'].get('cephVersion', {}).get('image', None)
num_nodes = len(self.rook_cluster.get_node_names())
+ def sum_running_pods(service_type: str, service_name: Optional[str] = None) -> int:
+ all_pods = self.rook_cluster.describe_pods(None, None, None)
+ if service_name is None:
+ return sum(pod['phase'] == 'Running' for pod in all_pods if pod['labels']['app'] == f"rook-ceph-{service_type}")
+ else:
+ if service_type == 'mds':
+ key = 'rook_file_system'
+ elif service_type == 'rgw':
+ key = 'rook_object_store'
+ elif service_type == 'nfs':
+ key = 'ceph_nfs'
+ else:
+                self.log.error(f"Unknown service type {service_type}")
+ return 0
+
+ return sum(pod['phase'] == 'Running' \
+ for pod in all_pods \
+ if pod['labels']['app'] == f"rook-ceph-{service_type}" \
+ and service_name == pod['labels'][key])
+
spec = {}
if service_type == 'mon' or service_type is None:
spec['mon'] = orchestrator.ServiceDescription(
@@ -269,6 +276,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=cl['spec'].get('mon', {}).get('count', 1),
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mon')
)
if service_type == 'mgr' or service_type is None:
spec['mgr'] = orchestrator.ServiceDescription(
@@ -279,6 +287,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=1,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mgr')
)
if (
@@ -293,13 +302,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=num_nodes,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('crashcollector')
)
if service_type == 'mds' or service_type is None:
# CephFilesystems
all_fs = self.rook_cluster.get_resource("cephfilesystems")
for fs in all_fs:
- svc = 'mds.' + fs['metadata']['name']
+ fs_name = fs['metadata']['name']
+ svc = 'mds.' + fs_name
if svc in spec:
continue
# FIXME: we are conflating active (+ standby) with count
@@ -316,13 +327,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=total_mds,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('mds', fs_name)
)
if service_type == 'rgw' or service_type is None:
# CephObjectstores
all_zones = self.rook_cluster.get_resource("cephobjectstores")
for zone in all_zones:
- svc = 'rgw.' + zone['metadata']['name']
+ zone_name = zone['metadata']['name']
+ svc = 'rgw.' + zone_name
if svc in spec:
continue
active = zone['spec']['gateway']['instances'];
@@ -344,6 +357,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
size=active,
container_image_name=image_name,
last_refresh=now,
+ running=sum_running_pods('rgw', zone_name)
)
if service_type == 'nfs' or service_type is None:
@@ -368,7 +382,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=active,
last_refresh=now,
- running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]),
+ running=sum_running_pods('nfs', nfs_name),
created=creation_timestamp.astimezone(tz=datetime.timezone.utc)
)
if service_type == 'osd' or service_type is None:
@@ -385,18 +399,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=len(all_osds),
last_refresh=now,
- running=sum(osd.status.phase == 'Running' for osd in all_osds)
+ running=sum_running_pods('osd')
)
- # drivegroups
- for name, dg in self._drive_group_map.items():
- spec[f'osd.{name}'] = orchestrator.ServiceDescription(
- spec=dg,
- last_refresh=now,
- size=0,
- running=0,
- )
-
if service_type == 'rbd-mirror' or service_type is None:
# rbd-mirrors
all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors")
@@ -414,13 +419,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
),
size=1,
last_refresh=now,
+ running=sum_running_pods('rbd-mirror', mirror_name)
)
-
+
for dd in self._list_daemons():
if dd.service_name() not in spec:
continue
service = spec[dd.service_name()]
- service.running += 1
if not service.container_image_id:
service.container_image_id = dd.container_image_id
if not service.container_image_name:
@@ -451,11 +456,25 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
daemon_id: Optional[str] = None,
host: Optional[str] = None,
refresh: bool = False) -> List[orchestrator.DaemonDescription]:
+
+ def _pod_to_servicename(pod: Dict[str, Any]) -> Optional[str]:
+ if 'ceph_daemon_type' not in pod['labels']:
+ return None
+ daemon_type = pod['labels']['ceph_daemon_type']
+ if daemon_type in ['mds', 'rgw', 'nfs', 'rbd-mirror']:
+ if 'app.kubernetes.io/part-of' in pod['labels']:
+ service_name = f"{daemon_type}.{pod['labels']['app.kubernetes.io/part-of']}"
+ else:
+ service_name = f"{daemon_type}"
+ else:
+ service_name = f"{daemon_type}"
+ return service_name
+
pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host)
- self.log.debug('pods %s' % pods)
result = []
for p in pods:
- sd = orchestrator.DaemonDescription()
+ pod_svc_name = _pod_to_servicename(p)
+ sd = orchestrator.DaemonDescription(service_name=pod_svc_name)
sd.hostname = p['hostname']
# In Rook environments, the 'ceph-exporter' daemon is named 'exporter' whereas
@@ -535,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
elif service_type == 'rbd-mirror':
return self.rook_cluster.rm_service('cephrbdmirrors', service_id)
elif service_type == 'osd':
- if service_id in self._drive_group_map:
- del self._drive_group_map[service_id]
- self._save_drive_groups()
return f'Removed {service_name}'
elif service_type == 'ingress':
self.log.info("{0} service '{1}' does not exist".format('ingress', service_id))
@@ -593,135 +609,33 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator):
def remove_daemons(self, names: List[str]) -> List[str]:
return self.rook_cluster.remove_pods(names)
- def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]:
- for drive_group in specs:
- self._drive_group_map[str(drive_group.service_id)] = drive_group
- self._save_drive_groups()
- return OrchResult(self._apply_drivegroups(specs))
-
- def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]:
- all_hosts = raise_if_exception(self.get_hosts())
- result_list: List[str] = []
- for drive_group in ls:
- matching_hosts = drive_group.placement.filter_matching_hosts(
- lambda label=None, as_hostspec=None: all_hosts
- )
+ def add_host_label(self, host: str, label: str) -> OrchResult[str]:
+ return self.rook_cluster.add_host_label(host, label)
+
+ def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
+ return self.rook_cluster.remove_host_label(host, label)
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
- result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts))
- return result_list
-
- def _load_drive_groups(self) -> None:
- stored_drive_group = self.get_store("drive_group_map")
- self._drive_group_map: Dict[str, DriveGroupSpec] = {}
- if stored_drive_group:
- for name, dg in json.loads(stored_drive_group).items():
- try:
- self._drive_group_map[name] = DriveGroupSpec.from_json(dg)
- except ValueError as e:
- self.log.error(f'Failed to load drive group {name} ({dg}): {e}')
-
- def _save_drive_groups(self) -> None:
- json_drive_group_map = {
- name: dg.to_json() for name, dg in self._drive_group_map.items()
- }
- self.set_store("drive_group_map", json.dumps(json_drive_group_map))
+ @handle_orch_error
+ def create_osds(self, drive_group: DriveGroupSpec) -> str:
+        raise orchestrator.OrchestratorError('Creating OSDs is not supported by the rook orchestrator. Please use the Rook operator instead.')
+ @handle_orch_error
def remove_osds(self,
osd_ids: List[str],
replace: bool = False,
force: bool = False,
zap: bool = False,
- no_destroy: bool = False) -> OrchResult[str]:
- assert self._rook_cluster is not None
- if zap:
- raise RuntimeError("Rook does not support zapping devices during OSD removal.")
- res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command)
- return OrchResult(res)
+ no_destroy: bool = False) -> str:
+        raise orchestrator.OrchestratorError('Removing OSDs is not supported by the rook orchestrator. Please use the Rook operator instead.')
- def add_host_label(self, host: str, label: str) -> OrchResult[str]:
- return self.rook_cluster.add_host_label(host, label)
-
- def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]:
- return self.rook_cluster.remove_host_label(host, label)
- """
@handle_orch_error
- def create_osds(self, drive_group):
- # type: (DriveGroupSpec) -> str
- # Creates OSDs from a drive group specification.
-
- # $: ceph orch osd create -i <dg.file>
-
- # The drivegroup file must only contain one spec at a time.
- #
-
- targets = [] # type: List[str]
- if drive_group.data_devices and drive_group.data_devices.paths:
- targets += [d.path for d in drive_group.data_devices.paths]
- if drive_group.data_directories:
- targets += drive_group.data_directories
-
- all_hosts = raise_if_exception(self.get_hosts())
-
- matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts)
-
- assert len(matching_hosts) == 1
-
- if not self.rook_cluster.node_exists(matching_hosts[0]):
- raise RuntimeError("Node '{0}' is not in the Kubernetes "
- "cluster".format(matching_hosts))
-
- # Validate whether cluster CRD can accept individual OSD
- # creations (i.e. not useAllDevices)
- if not self.rook_cluster.can_create_osd():
- raise RuntimeError("Rook cluster configuration does not "
- "support OSD creation.")
-
- return self.rook_cluster.add_osds(drive_group, matching_hosts)
-
- # TODO: this was the code to update the progress reference:
-
- @handle_orch_error
- def has_osds(matching_hosts: List[str]) -> bool:
-
- # Find OSD pods on this host
- pod_osd_ids = set()
- pods = self.k8s.list_namespaced_pod(self._rook_env.namespace,
- label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name),
- field_selector="spec.nodeName={0}".format(
- matching_hosts[0]
- )).items
- for p in pods:
- pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id']))
-
- self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids))
-
- found = []
- osdmap = self.get("osd_map")
- for osd in osdmap['osds']:
- osd_id = osd['osd']
- if osd_id not in pod_osd_ids:
- continue
-
- metadata = self.get_metadata('osd', "%s" % osd_id)
- if metadata and metadata['devices'] in targets:
- found.append(osd_id)
- else:
- self.log.info("ignoring osd {0} {1}".format(
- osd_id, metadata['devices'] if metadata else 'DNE'
- ))
+ def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
+ return self.rook_cluster.blink_light(ident_fault, on, locs)
- return found is not None
- """
+ @handle_orch_error
+ def upgrade_status(self) -> orchestrator.UpgradeStatusSpec:
+ return orchestrator.UpgradeStatusSpec()
@handle_orch_error
- def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]:
- return self.rook_cluster.blink_light(ident_fault, on, locs)
+ def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]:
+ return {}
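A stripped-down sketch of the pod counting used by describe_service() above: given pod descriptions shaped like the ones returned by rook_cluster.describe_pods(), count the pods in the Running phase for a given rook app label. The pod dicts and the helper name here are hypothetical stand-ins for illustration.

    from typing import Any, Dict, List, Optional

    def count_running(pods: List[Dict[str, Any]],
                      service_type: str,
                      key: Optional[str] = None,
                      service_name: Optional[str] = None) -> int:
        # rook labels every daemon pod with app=rook-ceph-<service_type>
        app = f"rook-ceph-{service_type}"
        return sum(
            pod["phase"] == "Running"
            for pod in pods
            if pod["labels"].get("app") == app
            and (service_name is None or pod["labels"].get(key) == service_name)
        )

    pods = [
        {"phase": "Running", "labels": {"app": "rook-ceph-osd"}},
        {"phase": "Pending", "labels": {"app": "rook-ceph-osd"}},
        {"phase": "Running", "labels": {"app": "rook-ceph-mds", "rook_file_system": "myfs"}},
    ]
    assert count_running(pods, "osd") == 1
    assert count_running(pods, "mds", "rook_file_system", "myfs") == 1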
diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py
index 5c7c9fc04..16d498a70 100644
--- a/src/pybind/mgr/rook/rook_cluster.py
+++ b/src/pybind/mgr/rook/rook_cluster.py
@@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError
from ceph.deployment.inventory import Device
from ceph.deployment.drive_group import DriveGroupSpec
-from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec
+from ceph.deployment.service_spec import (
+ ServiceSpec,
+ NFSServiceSpec,
+ RGWSpec,
+ PlacementSpec,
+ HostPlacementSpec,
+ HostPattern,
+)
from ceph.utils import datetime_now
-from ceph.deployment.drive_selection.matchers import SizeMatcher
+from ceph.deployment.drive_selection.matchers import (
+ AllMatcher,
+ Matcher,
+ SizeMatcher,
+)
from nfs.cluster import create_ganesha_pool
from nfs.module import Module
from nfs.export import NFSRados
@@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource):
"{} doesn't contain a metadata.name. Unable to track changes".format(
self.api_func))
-class DefaultCreator():
- def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'):
- self.coreV1_api = coreV1_api
- self.storage_class_name = storage_class_name
- self.inventory = inventory
-
- def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem:
- device_set = ccl.StorageClassDeviceSetsItem(
- name=d.sys_api['pv_name'],
- volumeClaimTemplates= ccl.VolumeClaimTemplatesList(),
- count=1,
- encrypted=drive_group.encrypted,
- portable=False
- )
- device_set.volumeClaimTemplates.append(
- ccl.VolumeClaimTemplatesItem(
- metadata=ccl.Metadata(
- name="data"
- ),
- spec=ccl.Spec(
- storageClassName=self.storage_class_name,
- volumeMode="Block",
- accessModes=ccl.CrdObjectList(["ReadWriteOnce"]),
- resources={
- "requests":{
- "storage": 1
- }
- },
- volumeName=d.sys_api['pv_name']
- )
- )
- )
- return device_set
-
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher: Optional[SizeMatcher] = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- count = 0
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
- and ((sizematcher != None) or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or (device.path in paths)
- )
- )
- ):
- device_list.append(device)
- count += 1
-
- return device_list
-
- def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any:
- to_create = self.filter_devices(rook_pods, drive_group,matching_hosts)
- assert drive_group.data_devices is not None
- def _add_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage:
- new_cluster.spec.storage = ccl.Storage()
-
- if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets:
- new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList()
-
- existing_scds = [
- scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets
- ]
- for device in to_create:
- new_scds = self.device_to_device_set(drive_group, device)
- if new_scds.name not in existing_scds:
- new_cluster.spec.storage.storageClassDeviceSets.append(new_scds)
- return new_cluster
- return _add_osds
-
-class LSOCreator(DefaultCreator):
- def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]:
- device_list = []
- assert drive_group.data_devices is not None
- sizematcher = None
- if drive_group.data_devices.size:
- sizematcher = SizeMatcher('size', drive_group.data_devices.size)
- limit = getattr(drive_group.data_devices, 'limit', None)
- all = getattr(drive_group.data_devices, 'all', None)
- paths = [device.path for device in drive_group.data_devices.paths]
- vendor = getattr(drive_group.data_devices, 'vendor', None)
- model = getattr(drive_group.data_devices, 'model', None)
- count = 0
- osd_list = []
- for pod in rook_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and 'ceph.rook.io/DeviceSet' in pod.metadata.labels
- ):
- osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet'])
- for _, node in self.inventory.items():
- for device in node:
- if device.sys_api['pv_name'] in osd_list:
- count += 1
- for _, node in self.inventory.items():
- for device in node:
- if not limit or (count < limit):
- if device.available:
- if (
- all
- or (
- device.sys_api['node'] in matching_hosts
- and ((sizematcher != None) or sizematcher.compare(device))
- and (
- not drive_group.data_devices.paths
- or device.path in paths
- )
- and (
- not vendor
- or device.sys_api['vendor'] == vendor
- )
- and (
- not model
- or device.sys_api['model'].startsWith(model)
- )
- )
- ):
- device_list.append(device)
- count += 1
- return device_list
-
-class DefaultRemover():
- def __init__(
- self,
- coreV1_api: 'client.CoreV1Api',
- batchV1_api: 'client.BatchV1Api',
- appsV1_api: 'client.AppsV1Api',
- osd_ids: List[str],
- replace_flag: bool,
- force_flag: bool,
- mon_command: Callable,
- patch: Callable,
- rook_env: 'RookEnv',
- inventory: Dict[str, List[Device]]
- ):
- self.batchV1_api = batchV1_api
- self.appsV1_api = appsV1_api
- self.coreV1_api = coreV1_api
-
- self.osd_ids = osd_ids
- self.replace_flag = replace_flag
- self.force_flag = force_flag
-
- self.mon_command = mon_command
-
- self.patch = patch
- self.rook_env = rook_env
-
- self.inventory = inventory
- self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd')
- self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job,
- namespace=self.rook_env.namespace,
- label_selector='app=rook-ceph-osd-prepare')
- self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim,
- namespace=self.rook_env.namespace)
-
-
- def remove_device_sets(self) -> str:
- self.to_remove: Dict[str, int] = {}
- self.pvc_to_remove: List[str] = []
- for pod in self.osd_pods.items:
- if (
- hasattr(pod, 'metadata')
- and hasattr(pod.metadata, 'labels')
- and 'osd' in pod.metadata.labels
- and pod.metadata.labels['osd'] in self.osd_ids
- ):
- if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1
- else:
- self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1
- self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc'])
- def _remove_osds(current_cluster, new_cluster):
- # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster
- assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None
- for _set in new_cluster.spec.storage.storageClassDeviceSets:
- if _set.name in self.to_remove:
- if _set.count == self.to_remove[_set.name]:
- new_cluster.spec.storage.storageClassDeviceSets.remove(_set)
- else:
- _set.count = _set.count - self.to_remove[_set.name]
- return new_cluster
- return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds)
-
- def check_force(self) -> None:
- if not self.force_flag:
- safe_args = {'prefix': 'osd safe-to-destroy',
- 'ids': [str(x) for x in self.osd_ids]}
- ret, out, err = self.mon_command(safe_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def set_osds_down(self) -> None:
- down_flag_args = {
- 'prefix': 'osd down',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(down_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def scale_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0)))
-
- def set_osds_out(self) -> None:
- out_flag_args = {
- 'prefix': 'osd out',
- 'ids': [str(x) for x in self.osd_ids]
- }
- ret, out, err = self.mon_command(out_flag_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def delete_deployments(self) -> None:
- for osd_id in self.osd_ids:
- self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace,
- name='rook-ceph-osd-{}'.format(osd_id),
- propagation_policy='Foreground')
-
- def clean_up_prepare_jobs_and_pvc(self) -> None:
- for job in self.jobs.items:
- if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove:
- self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
- self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'],
- namespace=self.rook_env.namespace,
- propagation_policy='Foreground')
-
- def purge_osds(self) -> None:
- for id in self.osd_ids:
- purge_args = {
- 'prefix': 'osd purge-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(purge_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def destroy_osds(self) -> None:
- for id in self.osd_ids:
- destroy_args = {
- 'prefix': 'osd destroy-actual',
- 'id': int(id),
- 'yes_i_really_mean_it': True
- }
- ret, out, err = self.mon_command(destroy_args)
- if ret != 0:
- raise RuntimeError(err)
-
- def remove(self) -> str:
- try:
- self.check_force()
- except Exception as e:
- log.exception("Error checking if OSDs are safe to destroy")
- return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}"
- try:
- remove_result = self.remove_device_sets()
- except Exception as e:
- log.exception("Error patching ceph cluster CRD")
- return f"Not possible to modify Ceph cluster CRD: {e}"
- try:
- self.scale_deployments()
- self.delete_deployments()
- self.clean_up_prepare_jobs_and_pvc()
- except Exception as e:
- log.exception("Ceph cluster CRD patched, but error cleaning environment")
- return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}"
- try:
- self.set_osds_down()
- self.set_osds_out()
- if self.replace_flag:
- self.destroy_osds()
- else:
- self.purge_osds()
- except Exception as e:
- log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster")
- return f"Error removing OSDs from Ceph cluster: {e}"
-
- return remove_result
-
-
-
class RookCluster(object):
# import of client.CoreV1Api must be optional at import time.
# Instead allow mgr/rook to be imported anyway.
@@ -794,7 +487,12 @@ class RookCluster(object):
else:
fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env)
fetcher.fetch()
- discovered_devices.update(fetcher.devices())
+ nodename_to_devices = fetcher.devices()
+ for node, devices in nodename_to_devices.items():
+ if node in discovered_devices:
+ discovered_devices[node].extend(devices)
+ else:
+ discovered_devices[node] = devices
return discovered_devices
@@ -1096,7 +794,6 @@ class RookCluster(object):
name=spec.rgw_zone
)
return object_store
-
def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore:
if new.spec.gateway:
@@ -1188,48 +885,11 @@ class RookCluster(object):
return new
return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count)
- def add_osds(self, drive_group, matching_hosts):
- # type: (DriveGroupSpec, List[str]) -> str
- assert drive_group.objectstore in ("bluestore", "filestore")
- assert drive_group.service_id
- storage_class = self.get_storage_class()
- inventory = self.get_discovered_devices()
- creator: Optional[DefaultCreator] = None
- if (
- storage_class.metadata.labels
- and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels
- ):
- creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name)
- else:
- creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name)
- return self._patch(
- ccl.CephCluster,
- 'cephclusters',
- self.rook_env.cluster_name,
- creator.add_osds(self.rook_pods, drive_group, matching_hosts)
- )
-
- def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str:
- inventory = self.get_discovered_devices()
- self.remover = DefaultRemover(
- self.coreV1_api,
- self.batchV1_api,
- self.appsV1_api,
- osd_ids,
- replace,
- force,
- mon_command,
- self._patch,
- self.rook_env,
- inventory
- )
- return self.remover.remove()
-
def get_hosts(self) -> List[orchestrator.HostSpec]:
ret = []
for node in self.nodes.items:
spec = orchestrator.HostSpec(
- node.metadata.name,
+ node.metadata.name,
addr='/'.join([addr.address for addr in node.status.addresses]),
labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')],
)
@@ -1585,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) ->
res.label = expression.key.split('/')[1]
elif expression.key == "kubernetes.io/hostname":
if expression.operator == "Exists":
- res.host_pattern = "*"
+ res.host_pattern = HostPattern("*")
elif expression.operator == "In":
res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values]
return res
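A small standalone sketch of the device-merge logic introduced in get_discovered_devices() above: results from several fetchers are folded into a single node-to-devices mapping instead of letting a later fetcher overwrite an earlier one. Plain strings stand in for Device objects here.

    from typing import Dict, List

    def merge_devices(accum: Dict[str, List[str]],
                      fetched: Dict[str, List[str]]) -> Dict[str, List[str]]:
        # extend the per-node list when the node is already known, otherwise add it
        for node, devices in fetched.items():
            if node in accum:
                accum[node].extend(devices)
            else:
                accum[node] = list(devices)
        return accum

    discovered: Dict[str, List[str]] = {}
    merge_devices(discovered, {"minikube": ["/dev/vdb"]})
    merge_devices(discovered, {"minikube": ["/dev/vdc", "/dev/vdd"]})
    assert discovered == {"minikube": ["/dev/vdb", "/dev/vdc", "/dev/vdd"]}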