From 17d6a993fc17d533460c5f40f3908c708e057c18 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Thu, 23 May 2024 18:45:17 +0200 Subject: Merging upstream version 18.2.3. Signed-off-by: Daniel Baumann --- src/pybind/mgr/rook/ci/Dockerfile | 3 +- .../ci/cluster-specs/cluster-on-pvc-minikube.yaml | 198 +++++++++++ src/pybind/mgr/rook/ci/requirements.txt | 1 + src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh | 3 + .../mgr/rook/ci/scripts/bootstrap-rook-cluster.sh | 124 +++++-- .../features/cluster-prometheus-monitoring.feature | 14 + src/pybind/mgr/rook/ci/tests/features/rook.feature | 59 +++- .../rook/ci/tests/features/steps/implementation.py | 109 +++++- .../mgr/rook/ci/tests/features/steps/utils.py | 23 ++ src/pybind/mgr/rook/module.py | 220 ++++-------- src/pybind/mgr/rook/rook_cluster.py | 382 ++------------------- 11 files changed, 581 insertions(+), 555 deletions(-) create mode 100644 src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml create mode 100644 src/pybind/mgr/rook/ci/requirements.txt create mode 100644 src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature (limited to 'src/pybind/mgr/rook') diff --git a/src/pybind/mgr/rook/ci/Dockerfile b/src/pybind/mgr/rook/ci/Dockerfile index 30ebea574..62ff0ab89 100644 --- a/src/pybind/mgr/rook/ci/Dockerfile +++ b/src/pybind/mgr/rook/ci/Dockerfile @@ -1,3 +1,4 @@ -FROM quay.io/ceph/daemon-base:latest-main +FROM quay.io/ceph/daemon-base:latest-reef COPY ./tmp_build/orchestrator /usr/share/ceph/mgr/orchestrator COPY ./tmp_build/rook /usr/share/ceph/mgr/rook +COPY ./tmp_build/ceph/ /usr/lib/python3.6/site-packages/ceph/ diff --git a/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml b/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml new file mode 100644 index 000000000..2732286ab --- /dev/null +++ b/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml @@ -0,0 +1,198 @@ 
+################################################################################################################# +# Define the settings for the rook-ceph cluster with settings for a minikube cluster with a single node + +# This example expects a single node minikube cluster with three extra disks: vdb, vdc and vdd. Please modify +# it according to your environment. See the documentation for more details on storage settings available. + +# For example, to create the cluster: +# kubectl create -f crds.yaml -f common.yaml -f operator.yaml +# kubectl create -f cluster-on-pvc-minikube.yaml +################################################################################################################# +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: local-storage +provisioner: kubernetes.io/no-provisioner +volumeBindingMode: WaitForFirstConsumer +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: local0-0 +spec: + storageClassName: local-storage + capacity: + storage: 10Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain + # PV for mon must be a filesystem volume. + volumeMode: Filesystem + local: + # To use dm devices like logical volume, please replace `/dev/sdb` with their device names like `/dev/vg-name/lv-name`. + path: /dev/vdb + nodeAffinity: + required: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: In + values: + - minikube +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: local0-1 +spec: + storageClassName: local-storage + capacity: + storage: 20Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain + # PV for mon must be a filesystem volume. + volumeMode: Block + local: + # To use dm devices like logical volume, please replace `/dev/sdb` with their device names like `/dev/vg-name/lv-name`. 
+ path: /dev/vdc + nodeAffinity: + required: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: In + values: + - minikube +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: local0-2 +spec: + storageClassName: local-storage + capacity: + storage: 20Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain + # PV for mon must be a filesystem volume. + volumeMode: Block + local: + # To use dm devices like logical volume, please replace `/dev/sdb` with their device names like `/dev/vg-name/lv-name`. + path: /dev/vdd + nodeAffinity: + required: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: In + values: + - minikube +--- +kind: PersistentVolume +apiVersion: v1 +metadata: + name: local0-3 +spec: + storageClassName: local-storage + capacity: + storage: 20Gi + accessModes: + - ReadWriteOnce + persistentVolumeReclaimPolicy: Retain + # PV for mon must be a filesystem volume. + volumeMode: Block + local: + # To use dm devices like logical volume, please replace `/dev/sdb` with their device names like `/dev/vg-name/lv-name`. 
+ path: /dev/vde + nodeAffinity: + required: + nodeSelectorTerms: + - matchExpressions: + - key: kubernetes.io/hostname + operator: In + values: + - minikube +--- +apiVersion: ceph.rook.io/v1 +kind: CephCluster +metadata: + name: my-cluster + namespace: rook-ceph # namespace:cluster +spec: + dataDirHostPath: /var/lib/rook + mon: + count: 1 + allowMultiplePerNode: true + volumeClaimTemplate: + spec: + storageClassName: local-storage + resources: + requests: + storage: 10Gi + mgr: + count: 1 + modules: + - name: pg_autoscaler + enabled: true + dashboard: + enabled: true + ssl: false + crashCollector: + disable: false + cephVersion: + image: quay.io/ceph/daemon-base:latest-main + allowUnsupported: true + skipUpgradeChecks: false + continueUpgradeAfterChecksEvenIfNotHealthy: false + storage: + storageClassDeviceSets: + - name: set1 + count: 3 + portable: false + tuneDeviceClass: true + tuneFastDeviceClass: false + encrypted: false + placement: + preparePlacement: + volumeClaimTemplates: + - metadata: + name: data + # if you are looking at giving your OSD a different CRUSH device class than the one detected by Ceph + # annotations: + # crushDeviceClass: hybrid + spec: + resources: + requests: + storage: 20Gi + # IMPORTANT: Change the storage class depending on your environment + storageClassName: local-storage + volumeMode: Block + accessModes: + - ReadWriteOnce + # when onlyApplyOSDPlacement is false, will merge both placement.All() and storageClassDeviceSets.Placement + onlyApplyOSDPlacement: false + priorityClassNames: + mon: system-node-critical + osd: system-node-critical + mgr: system-cluster-critical + disruptionManagement: + managePodBudgets: true + osdMaintenanceTimeout: 30 + pgHealthCheckTimeout: 0 + cephConfig: + global: + mon_warn_on_pool_no_redundancy: "false" +--- +apiVersion: ceph.rook.io/v1 +kind: CephBlockPool +metadata: + name: builtin-mgr + namespace: rook-ceph # namespace:cluster +spec: + name: .mgr + failureDomain: osd + replicated: + size: 1 + 
requireSafeReplicaSize: false diff --git a/src/pybind/mgr/rook/ci/requirements.txt b/src/pybind/mgr/rook/ci/requirements.txt new file mode 100644 index 000000000..9684f7742 --- /dev/null +++ b/src/pybind/mgr/rook/ci/requirements.txt @@ -0,0 +1 @@ +behave diff --git a/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh b/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh index a43e01a89..58d554757 100755 --- a/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh +++ b/src/pybind/mgr/rook/ci/run-rook-e2e-tests.sh @@ -2,8 +2,11 @@ set -ex +export PATH=$PATH:~/.local/bin # behave is installed on this directory + # Execute tests : ${CEPH_DEV_FOLDER:=${PWD}} ${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh cd ${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/tests +pip install --upgrade --force-reinstall -r ../requirements.txt behave diff --git a/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh b/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh index 4b97df6ba..24a6a5da2 100755 --- a/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh +++ b/src/pybind/mgr/rook/ci/scripts/bootstrap-rook-cluster.sh @@ -3,7 +3,10 @@ set -eEx : ${CEPH_DEV_FOLDER:=${PWD}} +CLUSTER_SPEC=${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci/cluster-specs/cluster-on-pvc-minikube.yaml +DEFAULT_NS="rook-ceph" KUBECTL="minikube kubectl --" +export ROOK_CLUSTER_NS="${ROOK_CLUSTER_NS:=$DEFAULT_NS}" ## CephCluster namespace # We build a local ceph image that contains the latest code # plus changes from the PR. This image will be used by the docker @@ -15,14 +18,6 @@ on_error() { minikube delete } -configure_libvirt(){ - sudo usermod -aG libvirt $(id -un) - sudo su -l $USER # Avoid having to log out and log in for group addition to take effect. 
- sudo systemctl enable --now libvirtd - sudo systemctl restart libvirtd - sleep 10 # wait some time for libvirtd service to restart -} - setup_minikube_env() { # Check if Minikube is running @@ -35,20 +30,21 @@ setup_minikube_env() { fi rm -rf ~/.minikube - minikube start --memory="4096" --cpus="2" --disk-size=10g --extra-disks=1 --driver kvm2 + minikube start --memory="6144" --disk-size=20g --extra-disks=4 --driver kvm2 # point Docker env to use docker daemon running on minikube eval $(minikube docker-env -p minikube) } build_ceph_image() { - wget -q -O cluster-test.yaml https://raw.githubusercontent.com/rook/rook/master/deploy/examples/cluster-test.yaml - CURR_CEPH_IMG=$(grep -E '^\s*image:\s+' cluster-test.yaml | sed 's/.*image: *\([^ ]*\)/\1/') + + CURR_CEPH_IMG=$(grep -E '^\s*image:\s+' $CLUSTER_SPEC | sed 's/.*image: *\([^ ]*\)/\1/') cd ${CEPH_DEV_FOLDER}/src/pybind/mgr/rook/ci mkdir -p tmp_build/rook mkdir -p tmp_build/orchestrator cp ./../../orchestrator/*.py tmp_build/orchestrator cp ../*.py tmp_build/rook + cp -r ../../../../../src/python-common/ceph/ tmp_build/ # we use the following tag to trick the Docker # running inside minikube so it uses this image instead @@ -62,28 +58,39 @@ build_ceph_image() { } create_rook_cluster() { - wget -q -O cluster-test.yaml https://raw.githubusercontent.com/rook/rook/master/deploy/examples/cluster-test.yaml $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/crds.yaml $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/common.yaml $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/operator.yaml - $KUBECTL create -f cluster-test.yaml - $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/dashboard-external-http.yaml + $KUBECTL create -f $CLUSTER_SPEC $KUBECTL create -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/toolbox.yaml } +is_operator_ready() { + local phase 
+ phase=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}') + echo "PHASE: $phase" + [[ "$phase" == "Ready" ]] +} + wait_for_rook_operator() { local max_attempts=10 local sleep_interval=20 local attempts=0 + $KUBECTL rollout status deployment rook-ceph-operator -n rook-ceph --timeout=180s - PHASE=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}') - echo "PHASE: $PHASE" - while ! $KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}' | grep -q "Ready"; do - echo "Waiting for cluster to be ready..." - sleep $sleep_interval - attempts=$((attempts+1)) + + while ! is_operator_ready; do + echo "Waiting for rook operator to be ready..." + sleep $sleep_interval + + # log current cluster state and pods info for debugging + PHASE=$($KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.phase}') + $KUBECTL -n rook-ceph get pods + + attempts=$((attempts + 1)) if [ $attempts -ge $max_attempts ]; then echo "Maximum number of attempts ($max_attempts) reached. Exiting..." + $KUBECTL -n rook-ceph get pods | grep operator | awk '{print $1}' | xargs $KUBECTL -n rook-ceph logs return 1 fi done @@ -93,9 +100,9 @@ wait_for_ceph_cluster() { local max_attempts=10 local sleep_interval=20 local attempts=0 - $KUBECTL rollout status deployment rook-ceph-tools -n rook-ceph --timeout=30s + $KUBECTL rollout status deployment rook-ceph-tools -n rook-ceph --timeout=90s while ! 
$KUBECTL get cephclusters.ceph.rook.io -n rook-ceph -o jsonpath='{.items[?(@.kind == "CephCluster")].status.ceph.health}' | grep -q "HEALTH_OK"; do -        echo "Waiting for Ceph cluster installed" +        echo "Waiting for Ceph cluster to enter HEALTH_OK state"         sleep $sleep_interval         attempts=$((attempts+1))         if [ $attempts -ge $max_attempts ]; then @@ -104,18 +111,66 @@ fi done echo "Ceph cluster installed and running" + + # add an additional wait to cover any subtle change in the state + sleep 20 +} + +configure_libvirt(){ + if sudo usermod -aG libvirt $(id -un); then + echo "User added to libvirt group successfully." + sudo systemctl enable --now libvirtd + sudo systemctl restart libvirtd + sleep 30 # wait some time for libvirtd service to restart + newgrp libvirt + else + echo "Error adding user to libvirt group." + return 1 + fi +} + +recreate_default_network(){ + + # destroy any existing kvm default network + if sudo virsh net-destroy default; then + sudo virsh net-undefine default + fi + + # let's create a new kvm default network + sudo virsh net-define /usr/share/libvirt/networks/default.xml + if sudo virsh net-start default; then + echo "Network 'default' started successfully." + else + # Optionally, handle the error + echo "Failed to start network 'default', but continuing..." 
+ fi + + # restart libvirtd service and wait a little bit for the service + sudo systemctl restart libvirtd + sleep 30 + + # Just some debugging information + all_networks=$(virsh net-list --all) + groups=$(groups) +} + +enable_rook_orchestrator() { + echo "Enabling rook orchestrator" + $KUBECTL rollout status deployment rook-ceph-tools -n "$ROOK_CLUSTER_NS" --timeout=90s + $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph mgr module enable rook + $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph orch set backend rook + $KUBECTL -n "$ROOK_CLUSTER_NS" exec -it deploy/rook-ceph-tools -- ceph orch status } -show_info() { - DASHBOARD_PASSWORD=$($KUBECTL -n rook-ceph get secret rook-ceph-dashboard-password -o jsonpath="{['data']['password']}" | base64 --decode && echo) - IP_ADDR=$($KUBECTL get po --selector="app=rook-ceph-mgr" -n rook-ceph --output jsonpath='{.items[*].status.hostIP}') - PORT="$($KUBECTL -n rook-ceph -o=jsonpath='{.spec.ports[?(@.name == "dashboard")].nodePort}' get services rook-ceph-mgr-dashboard-external-http)" - BASE_URL="http://$IP_ADDR:$PORT" - echo "===========================" - echo "Ceph Dashboard: " - echo " IP_ADDRESS: $BASE_URL" - echo " PASSWORD: $DASHBOARD_PASSWORD" - echo "===========================" +enable_monitoring() { + echo "Enabling monitoring" + $KUBECTL apply -f https://raw.githubusercontent.com/coreos/prometheus-operator/v0.40.0/bundle.yaml + $KUBECTL wait --for=condition=ready pod -l app.kubernetes.io/name=prometheus-operator --timeout=90s + $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/rbac.yaml + $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/service-monitor.yaml + $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/exporter-service-monitor.yaml + $KUBECTL apply -f 
https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/prometheus.yaml + $KUBECTL apply -f https://raw.githubusercontent.com/rook/rook/master/deploy/examples/monitoring/prometheus-service.yaml } #################################################################### @@ -124,12 +179,15 @@ show_info() { trap 'on_error $? $LINENO' ERR configure_libvirt +recreate_default_network setup_minikube_env build_ceph_image create_rook_cluster wait_for_rook_operator wait_for_ceph_cluster -show_info +enable_rook_orchestrator +enable_monitoring +sleep 30 # wait for the metrics cache warmup #################################################################### #################################################################### diff --git a/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature b/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature new file mode 100644 index 000000000..5180c7293 --- /dev/null +++ b/src/pybind/mgr/rook/ci/tests/features/cluster-prometheus-monitoring.feature @@ -0,0 +1,14 @@ +Feature: Testing Rook orchestrator commands + Ceph has been installed using the cluster CRD available in deploy/examples/cluster-test.yaml + + Scenario: Verify Prometheus metrics endpoint is working properly + Given I can get prometheus server configuration + Given the prometheus server is serving metrics + + Scenario: Verify some basic metrics are working properly + Given I can get prometheus server configuration + Given the prometheus server is serving metrics + Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.0" and value equal to 1 + Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.1" and value equal to 1 + Then the response contains the metric "ceph_osd_in" where "ceph_daemon" is "osd.2" and value equal to 1 + Then the response contains the metric "ceph_mon_quorum_status" where "ceph_daemon" is "mon.a" and value equal to 1 diff --git 
a/src/pybind/mgr/rook/ci/tests/features/rook.feature b/src/pybind/mgr/rook/ci/tests/features/rook.feature index ae0478f8b..acf733f55 100644 --- a/src/pybind/mgr/rook/ci/tests/features/rook.feature +++ b/src/pybind/mgr/rook/ci/tests/features/rook.feature @@ -1,8 +1,8 @@ Feature: Testing Rook orchestrator commands - Ceph has been installed using the cluster CRD available in deploy/examples/cluster-test.yaml and + Ceph has been installed using the cluster CRD available in deploy/examples/cluster-test.yaml Scenario: Verify ceph cluster health - When I run + When I run ceph command """ ceph health | grep HEALTH """ @@ -10,3 +10,58 @@ Feature: Testing Rook orchestrator commands """ HEALTH_OK """ + + Scenario: Verify rook orchestrator has been enabled correctly + When I run ceph command + """ + ceph mgr module ls | grep rook + """ + Then I get something like + """ + rook +on + """ + + Scenario: Verify rook orchestrator lists services correctly + When I run ceph command + """ + ceph orch ls + """ + Then I get something like + """ + NAME +PORTS +RUNNING +REFRESHED +AGE +PLACEMENT + crash +1/1 .+ + mgr +1/1 .+ + mon +1/1 .+ + osd +3 .+ + """ + + Scenario: Verify rook orchestrator lists daemons correctly + When I run ceph command + """ + ceph orch ps + """ + Then I get something like + """ + NAME +HOST +PORTS +STATUS +REFRESHED +AGE +MEM +USE +MEM +LIM +VERSION +IMAGE +ID + ceph-exporter.exporter +minikube +running .+ + crashcollector.crash +minikube +running .+ + mgr.a +minikube +running .+ + mon.a +minikube +running .+ + osd.0 +minikube +running .+ + osd.1 +minikube +running .+ + osd.2 +minikube +running .+ + """ + + Scenario: Verify rook orchestrator lists devices correctly + When I run ceph command + """ + ceph orch device ls + """ + Then I get something like + """ + HOST +PATH +TYPE +DEVICE +ID +SIZE +AVAILABLE +REFRESHED +REJECT +REASONS + minikube +/dev/vdb +unknown +None +10.0G .+ + minikube +/dev/vdc +unknown +None +20.0G .+ + minikube +/dev/vdd +unknown +None +20.0G 
.+ + minikube +/dev/vde +unknown +None +20.0G .+ + """ diff --git a/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py b/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py index adde61afd..59cb117c8 100644 --- a/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py +++ b/src/pybind/mgr/rook/ci/tests/features/steps/implementation.py @@ -1,15 +1,35 @@ +import requests +from behave import given, when, then from behave import * from utils import * +import subprocess import re -@when("I run") +PROMETHEUS_SERVER_URL = None + +def get_prometheus_pod_host_ip(): + try: + command = "minikube --profile minikube kubectl -- -n rook-ceph -o jsonpath='{.status.hostIP}' get pod prometheus-rook-prometheus-0" + result = subprocess.run(command, shell=True, capture_output=True, text=True, check=True) + host_ip = result.stdout.strip() + return host_ip + except subprocess.CalledProcessError as e: + print(f"Error running command: {e}") + return None + +@when("I run ceph command") +def run_step(context): + context.output = run_ceph_commands(context.text) + +@when("I run k8s command") def run_step(context): - context.output = run_commands(context.text) + context.output = run_k8s_commands(context.text) @then("I get") def verify_result_step(context): - print(f"Output is:\n{context.output}\n--------------\n") - assert context.text == context.output + if (context.text != context.output): + display_side_by_side(context.text, context.output) + assert context.text == context.output, "" @then("I get something like") def verify_fuzzy_result_step(context): @@ -18,4 +38,83 @@ def verify_fuzzy_result_step(context): num_lines = min(len(output_lines), len(expected_lines)) for n in range(num_lines): if not re.match(expected_lines[n], output_lines[n]): - raise + display_side_by_side(expected_lines[n], output_lines[n]) + assert False, "" + +@given('I can get prometheus server configuration') +def step_get_prometheus_server_ip(context): + global PROMETHEUS_SERVER_URL + try: + 
PROMETHEUS_SERVER_URL = f"http://{get_prometheus_pod_host_ip()}:30900" + except requests.exceptions.RequestException as e: + print(f"Error connecting to Prometheus server: {e}") + assert False, f"Error connecting to Prometheus server: {e}" + +@given('the prometheus server is serving metrics') +def step_given_server_running(context): + try: + params = {'match[]': '{__name__!=""}'} + response = requests.get(f"{PROMETHEUS_SERVER_URL}/federate", params) + # Check if the response status code is successful (2xx) + response.raise_for_status() + # Store the response object in the context for later use + context.response = response + print(f"Prometheus server is running. Status code: {response.status_code}") + except requests.exceptions.RequestException as e: + print(f"Error connecting to Prometheus server: {e}") + assert False, f"Error connecting to Prometheus server: {e}" + +@when('I query the Prometheus metrics endpoint') +def step_when_query_metrics_endpoint(context): + params = {'match[]': '{__name__!=""}'} + context.response = requests.get(f"{PROMETHEUS_SERVER_URL}/federate", params) + context.response.raise_for_status() + +@then('the response contains the metric "{metric_name}"') +def step_then_check_metric_value(context, metric_name): + metric_value = parse_metric_value(context.response.text, metric_name) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + +@then('the response contains the metric "{metric_name}" with value equal to {expected_value}') +def step_then_check_metric_value(context, metric_name, expected_value): + metric_value = parse_metric_value(context.response.text, metric_name) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + assert metric_value == float(expected_value), f"Metric '{metric_name}' value {metric_value} is not equal to {expected_value}" + +@then('the response contains the metric "{metric_name}" with value greater than {expected_value}') +def 
step_then_check_metric_value(context, metric_name, expected_value): + metric_value = parse_metric_value(context.response.text, metric_name) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + assert metric_value > float(expected_value), f"Metric '{metric_name}' value {metric_value} is not greater than {expected_value}" + +@then('the response contains the metric "{metric_name}" with value less than {expected_value}') +def step_then_check_metric_value(context, metric_name, expected_value): + metric_value = parse_metric_value(context.response.text, metric_name) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + assert metric_value < float(expected_value), f"Metric '{metric_name}' value {metric_value} is not less than {expected_value}" + +@then('the response contains the metric "{metric_name}" with value in the range {min_value}-{max_value}') +def step_then_check_metric_value(context, metric_name, min_value, max_value): + metric_value = parse_metric_value(context.response.text, metric_name) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + assert metric_value >= float(min_value) and metric_value <= float(max_value), f"Metric '{metric_name}' value {metric_value} is not in the range {min_value}-{max_value}" + +@then('the response contains the metric "{metric_name}" where "{filter_by_field}" is "{field_value}" and value equal to {expected_value}') +def step_then_check_metric_value(context, metric_name, expected_value, filter_by_field, field_value): + metric_value = parse_metric_value(context.response.text, metric_name, filter_by_field, field_value) + assert metric_value is not None, f"Metric '{metric_name}' not found in the response" + assert metric_value == float(expected_value), f"Metric '{metric_name}' value {metric_value} is not equal to {expected_value}" + + +def parse_metric_value(metrics_text, metric_name, filter_by_field=None, field_value=None): + 
filter_condition = f'{filter_by_field}="{field_value}"' if filter_by_field and field_value else '' + pattern_str = rf'^{metric_name}\{{[^}}]*{filter_condition}[^}}]*\}} (\d+) (\d+)' + pattern = re.compile(pattern_str, re.MULTILINE) + match = pattern.search(metrics_text) + if match: + # Extract the values and timestamp from the matched groups + metric_value, _ = match.groups() + return float(metric_value) + else: + # Metric not found + return None diff --git a/src/pybind/mgr/rook/ci/tests/features/steps/utils.py b/src/pybind/mgr/rook/ci/tests/features/steps/utils.py index 41a71d0fb..f711ec3fe 100644 --- a/src/pybind/mgr/rook/ci/tests/features/steps/utils.py +++ b/src/pybind/mgr/rook/ci/tests/features/steps/utils.py @@ -1,4 +1,5 @@ import subprocess +from difflib import unified_diff ROOK_CEPH_COMMAND = "minikube kubectl -- -n rook-ceph exec -it deploy/rook-ceph-tools -- " CLUSTER_COMMAND = "minikube kubectl -- " @@ -27,3 +28,25 @@ def run_commands(commands: str) -> str: output = execute_command(command) return output.strip("\n") + +def run_k8s_commands(commands: str) -> str: + commands_list = commands.split("\n") + output = "" + for cmd in commands_list: + command = CLUSTER_COMMAND + cmd + output = execute_command(command) + + return output.strip("\n") + +def run_ceph_commands(commands: str) -> str: + commands_list = commands.split("\n") + output = "" + for cmd in commands_list: + command = ROOK_CEPH_COMMAND + cmd + output = execute_command(command) + + return output.strip("\n") + +def display_side_by_side(expected, got): + diff = unified_diff(expected.splitlines(), got.splitlines(), lineterm='') + print('\n'.join(diff)) diff --git a/src/pybind/mgr/rook/module.py b/src/pybind/mgr/rook/module.py index fa75db2cf..34ed15bc6 100644 --- a/src/pybind/mgr/rook/module.py +++ b/src/pybind/mgr/rook/module.py @@ -82,12 +82,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): default='local', desc='storage class name for LSO-discovered PVs', ), - Option( - 
'drive_group_interval', - type='float', - default=300.0, - desc='interval in seconds between re-application of applied drive_groups', - ), ] @staticmethod @@ -126,9 +120,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.config_notify() if TYPE_CHECKING: self.storage_class = 'foo' - self.drive_group_interval = 10.0 - self._load_drive_groups() self._shutdown = threading.Event() def config_notify(self) -> None: @@ -144,7 +136,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self.log.debug(' mgr option %s = %s', opt['name'], getattr(self, opt['name'])) # type: ignore assert isinstance(self.storage_class, str) - assert isinstance(self.drive_group_interval, float) if self._rook_cluster: self._rook_cluster.storage_class_name = self.storage_class @@ -211,10 +202,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): self._initialized.set() self.config_notify() - while not self._shutdown.is_set(): - self._apply_drivegroups(list(self._drive_group_map.values())) - self._shutdown.wait(self.drive_group_interval) - @handle_orch_error def get_inventory(self, host_filter: Optional[orchestrator.InventoryFilter] = None, refresh: bool = False) -> List[orchestrator.InventoryHost]: host_list = None @@ -257,6 +244,26 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): image_name = cl['spec'].get('cephVersion', {}).get('image', None) num_nodes = len(self.rook_cluster.get_node_names()) + def sum_running_pods(service_type: str, service_name: Optional[str] = None) -> int: + all_pods = self.rook_cluster.describe_pods(None, None, None) + if service_name is None: + return sum(pod['phase'] == 'Running' for pod in all_pods if pod['labels']['app'] == f"rook-ceph-{service_type}") + else: + if service_type == 'mds': + key = 'rook_file_system' + elif service_type == 'rgw': + key = 'rook_object_store' + elif service_type == 'nfs': + key = 'ceph_nfs' + else: + self.log.error(f"Unknown service type {service_type}") + return 0 + + return 
sum(pod['phase'] == 'Running' \ + for pod in all_pods \ + if pod['labels']['app'] == f"rook-ceph-{service_type}" \ + and service_name == pod['labels'][key]) + spec = {} if service_type == 'mon' or service_type is None: spec['mon'] = orchestrator.ServiceDescription( @@ -269,6 +276,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=cl['spec'].get('mon', {}).get('count', 1), container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mon') ) if service_type == 'mgr' or service_type is None: spec['mgr'] = orchestrator.ServiceDescription( @@ -279,6 +287,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=1, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mgr') ) if ( @@ -293,13 +302,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=num_nodes, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('crashcollector') ) if service_type == 'mds' or service_type is None: # CephFilesystems all_fs = self.rook_cluster.get_resource("cephfilesystems") for fs in all_fs: - svc = 'mds.' + fs['metadata']['name'] + fs_name = fs['metadata']['name'] + svc = 'mds.' + fs_name if svc in spec: continue # FIXME: we are conflating active (+ standby) with count @@ -316,13 +327,15 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=total_mds, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('mds', fs_name) ) if service_type == 'rgw' or service_type is None: # CephObjectstores all_zones = self.rook_cluster.get_resource("cephobjectstores") for zone in all_zones: - svc = 'rgw.' + zone['metadata']['name'] + zone_name = zone['metadata']['name'] + svc = 'rgw.' 
+ zone_name if svc in spec: continue active = zone['spec']['gateway']['instances']; @@ -344,6 +357,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): size=active, container_image_name=image_name, last_refresh=now, + running=sum_running_pods('rgw', zone_name) ) if service_type == 'nfs' or service_type is None: @@ -368,7 +382,7 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=active, last_refresh=now, - running=len([1 for pod in nfs_pods if pod['labels']['ceph_nfs'] == nfs_name]), + running=sum_running_pods('nfs', nfs_name), created=creation_timestamp.astimezone(tz=datetime.timezone.utc) ) if service_type == 'osd' or service_type is None: @@ -385,18 +399,9 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=len(all_osds), last_refresh=now, - running=sum(osd.status.phase == 'Running' for osd in all_osds) + running=sum_running_pods('osd') ) - # drivegroups - for name, dg in self._drive_group_map.items(): - spec[f'osd.{name}'] = orchestrator.ServiceDescription( - spec=dg, - last_refresh=now, - size=0, - running=0, - ) - if service_type == 'rbd-mirror' or service_type is None: # rbd-mirrors all_mirrors = self.rook_cluster.get_resource("cephrbdmirrors") @@ -414,13 +419,13 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): ), size=1, last_refresh=now, + running=sum_running_pods('rbd-mirror', mirror_name) ) - + for dd in self._list_daemons(): if dd.service_name() not in spec: continue service = spec[dd.service_name()] - service.running += 1 if not service.container_image_id: service.container_image_id = dd.container_image_id if not service.container_image_name: @@ -451,11 +456,25 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): daemon_id: Optional[str] = None, host: Optional[str] = None, refresh: bool = False) -> List[orchestrator.DaemonDescription]: + + def _pod_to_servicename(pod: Dict[str, Any]) -> Optional[str]: + if 'ceph_daemon_type' not in pod['labels']: + return None + 
daemon_type = pod['labels']['ceph_daemon_type'] + if daemon_type in ['mds', 'rgw', 'nfs', 'rbd-mirror']: + if 'app.kubernetes.io/part-of' in pod['labels']: + service_name = f"{daemon_type}.{pod['labels']['app.kubernetes.io/part-of']}" + else: + service_name = f"{daemon_type}" + else: + service_name = f"{daemon_type}" + return service_name + pods = self.rook_cluster.describe_pods(daemon_type, daemon_id, host) - self.log.debug('pods %s' % pods) result = [] for p in pods: - sd = orchestrator.DaemonDescription() + pod_svc_name = _pod_to_servicename(p) + sd = orchestrator.DaemonDescription(service_name=pod_svc_name) sd.hostname = p['hostname'] # In Rook environments, the 'ceph-exporter' daemon is named 'exporter' whereas @@ -535,9 +554,6 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): elif service_type == 'rbd-mirror': return self.rook_cluster.rm_service('cephrbdmirrors', service_id) elif service_type == 'osd': - if service_id in self._drive_group_map: - del self._drive_group_map[service_id] - self._save_drive_groups() return f'Removed {service_name}' elif service_type == 'ingress': self.log.info("{0} service '{1}' does not exist".format('ingress', service_id)) @@ -593,135 +609,33 @@ class RookOrchestrator(MgrModule, orchestrator.Orchestrator): def remove_daemons(self, names: List[str]) -> List[str]: return self.rook_cluster.remove_pods(names) - def apply_drivegroups(self, specs: List[DriveGroupSpec]) -> OrchResult[List[str]]: - for drive_group in specs: - self._drive_group_map[str(drive_group.service_id)] = drive_group - self._save_drive_groups() - return OrchResult(self._apply_drivegroups(specs)) - - def _apply_drivegroups(self, ls: List[DriveGroupSpec]) -> List[str]: - all_hosts = raise_if_exception(self.get_hosts()) - result_list: List[str] = [] - for drive_group in ls: - matching_hosts = drive_group.placement.filter_matching_hosts( - lambda label=None, as_hostspec=None: all_hosts - ) + def add_host_label(self, host: str, label: str) -> 
OrchResult[str]: + return self.rook_cluster.add_host_label(host, label) + + def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: + return self.rook_cluster.remove_host_label(host, label) - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - result_list.append(self.rook_cluster.add_osds(drive_group, matching_hosts)) - return result_list - - def _load_drive_groups(self) -> None: - stored_drive_group = self.get_store("drive_group_map") - self._drive_group_map: Dict[str, DriveGroupSpec] = {} - if stored_drive_group: - for name, dg in json.loads(stored_drive_group).items(): - try: - self._drive_group_map[name] = DriveGroupSpec.from_json(dg) - except ValueError as e: - self.log.error(f'Failed to load drive group {name} ({dg}): {e}') - - def _save_drive_groups(self) -> None: - json_drive_group_map = { - name: dg.to_json() for name, dg in self._drive_group_map.items() - } - self.set_store("drive_group_map", json.dumps(json_drive_group_map)) + @handle_orch_error + def create_osds(self, drive_group: DriveGroupSpec) -> str: + raise orchestrator.OrchestratorError('Creating OSDs is not supported by rook orchestrator. 
Please, use Rook operator.') + @handle_orch_error def remove_osds(self, osd_ids: List[str], replace: bool = False, force: bool = False, zap: bool = False, - no_destroy: bool = False) -> OrchResult[str]: - assert self._rook_cluster is not None - if zap: - raise RuntimeError("Rook does not support zapping devices during OSD removal.") - res = self._rook_cluster.remove_osds(osd_ids, replace, force, self.mon_command) - return OrchResult(res) + no_destroy: bool = False) -> str: + raise orchestrator.OrchestratorError('Removing OSDs is not supported by rook orchestrator. Please, use Rook operator.') - def add_host_label(self, host: str, label: str) -> OrchResult[str]: - return self.rook_cluster.add_host_label(host, label) - - def remove_host_label(self, host: str, label: str, force: bool = False) -> OrchResult[str]: - return self.rook_cluster.remove_host_label(host, label) - """ @handle_orch_error - def create_osds(self, drive_group): - # type: (DriveGroupSpec) -> str - # Creates OSDs from a drive group specification. - - # $: ceph orch osd create -i - - # The drivegroup file must only contain one spec at a time. - # - - targets = [] # type: List[str] - if drive_group.data_devices and drive_group.data_devices.paths: - targets += [d.path for d in drive_group.data_devices.paths] - if drive_group.data_directories: - targets += drive_group.data_directories - - all_hosts = raise_if_exception(self.get_hosts()) - - matching_hosts = drive_group.placement.filter_matching_hosts(lambda label=None, as_hostspec=None: all_hosts) - - assert len(matching_hosts) == 1 - - if not self.rook_cluster.node_exists(matching_hosts[0]): - raise RuntimeError("Node '{0}' is not in the Kubernetes " - "cluster".format(matching_hosts)) - - # Validate whether cluster CRD can accept individual OSD - # creations (i.e. 
not useAllDevices) - if not self.rook_cluster.can_create_osd(): - raise RuntimeError("Rook cluster configuration does not " - "support OSD creation.") - - return self.rook_cluster.add_osds(drive_group, matching_hosts) - - # TODO: this was the code to update the progress reference: - - @handle_orch_error - def has_osds(matching_hosts: List[str]) -> bool: - - # Find OSD pods on this host - pod_osd_ids = set() - pods = self.k8s.list_namespaced_pod(self._rook_env.namespace, - label_selector="rook_cluster={},app=rook-ceph-osd".format(self._rook_env.cluster_name), - field_selector="spec.nodeName={0}".format( - matching_hosts[0] - )).items - for p in pods: - pod_osd_ids.add(int(p.metadata.labels['ceph-osd-id'])) - - self.log.debug('pod_osd_ids={0}'.format(pod_osd_ids)) - - found = [] - osdmap = self.get("osd_map") - for osd in osdmap['osds']: - osd_id = osd['osd'] - if osd_id not in pod_osd_ids: - continue - - metadata = self.get_metadata('osd', "%s" % osd_id) - if metadata and metadata['devices'] in targets: - found.append(osd_id) - else: - self.log.info("ignoring osd {0} {1}".format( - osd_id, metadata['devices'] if metadata else 'DNE' - )) + def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: + return self.rook_cluster.blink_light(ident_fault, on, locs) - return found is not None - """ + @handle_orch_error + def upgrade_status(self) -> orchestrator.UpgradeStatusSpec: + return orchestrator.UpgradeStatusSpec() @handle_orch_error - def blink_device_light(self, ident_fault: str, on: bool, locs: List[orchestrator.DeviceLightLoc]) -> List[str]: - return self.rook_cluster.blink_light(ident_fault, on, locs) + def upgrade_ls(self, image: Optional[str], tags: bool, show_all_versions: Optional[bool]) -> Dict[Any, Any]: + return {} diff --git a/src/pybind/mgr/rook/rook_cluster.py b/src/pybind/mgr/rook/rook_cluster.py index 5c7c9fc04..16d498a70 100644 --- a/src/pybind/mgr/rook/rook_cluster.py +++ 
b/src/pybind/mgr/rook/rook_cluster.py @@ -24,9 +24,20 @@ from urllib3.exceptions import ProtocolError from ceph.deployment.inventory import Device from ceph.deployment.drive_group import DriveGroupSpec -from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, PlacementSpec, HostPlacementSpec +from ceph.deployment.service_spec import ( + ServiceSpec, + NFSServiceSpec, + RGWSpec, + PlacementSpec, + HostPlacementSpec, + HostPattern, +) from ceph.utils import datetime_now -from ceph.deployment.drive_selection.matchers import SizeMatcher +from ceph.deployment.drive_selection.matchers import ( + AllMatcher, + Matcher, + SizeMatcher, +) from nfs.cluster import create_ganesha_pool from nfs.module import Module from nfs.export import NFSRados @@ -372,324 +383,6 @@ class KubernetesCustomResource(KubernetesResource): "{} doesn't contain a metadata.name. Unable to track changes".format( self.api_func)) -class DefaultCreator(): - def __init__(self, inventory: 'Dict[str, List[Device]]', coreV1_api: 'client.CoreV1Api', storage_class_name: 'str'): - self.coreV1_api = coreV1_api - self.storage_class_name = storage_class_name - self.inventory = inventory - - def device_to_device_set(self, drive_group: DriveGroupSpec, d: Device) -> ccl.StorageClassDeviceSetsItem: - device_set = ccl.StorageClassDeviceSetsItem( - name=d.sys_api['pv_name'], - volumeClaimTemplates= ccl.VolumeClaimTemplatesList(), - count=1, - encrypted=drive_group.encrypted, - portable=False - ) - device_set.volumeClaimTemplates.append( - ccl.VolumeClaimTemplatesItem( - metadata=ccl.Metadata( - name="data" - ), - spec=ccl.Spec( - storageClassName=self.storage_class_name, - volumeMode="Block", - accessModes=ccl.CrdObjectList(["ReadWriteOnce"]), - resources={ - "requests":{ - "storage": 1 - } - }, - volumeName=d.sys_api['pv_name'] - ) - ) - ) - return device_set - - def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]: - 
device_list = [] - assert drive_group.data_devices is not None - sizematcher: Optional[SizeMatcher] = None - if drive_group.data_devices.size: - sizematcher = SizeMatcher('size', drive_group.data_devices.size) - limit = getattr(drive_group.data_devices, 'limit', None) - count = 0 - all = getattr(drive_group.data_devices, 'all', None) - paths = [device.path for device in drive_group.data_devices.paths] - osd_list = [] - for pod in rook_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and 'ceph.rook.io/DeviceSet' in pod.metadata.labels - ): - osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet']) - for _, node in self.inventory.items(): - for device in node: - if device.sys_api['pv_name'] in osd_list: - count += 1 - for _, node in self.inventory.items(): - for device in node: - if not limit or (count < limit): - if device.available: - if ( - all - or ( - device.sys_api['node'] in matching_hosts - and ((sizematcher != None) or sizematcher.compare(device)) - and ( - not drive_group.data_devices.paths - or (device.path in paths) - ) - ) - ): - device_list.append(device) - count += 1 - - return device_list - - def add_osds(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> Any: - to_create = self.filter_devices(rook_pods, drive_group,matching_hosts) - assert drive_group.data_devices is not None - def _add_osds(current_cluster, new_cluster): - # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster - if not hasattr(new_cluster.spec, 'storage') or not new_cluster.spec.storage: - new_cluster.spec.storage = ccl.Storage() - - if not hasattr(new_cluster.spec.storage, 'storageClassDeviceSets') or not new_cluster.spec.storage.storageClassDeviceSets: - new_cluster.spec.storage.storageClassDeviceSets = ccl.StorageClassDeviceSetsList() - - existing_scds = [ - scds.name for scds in new_cluster.spec.storage.storageClassDeviceSets - ] - for device in 
to_create: - new_scds = self.device_to_device_set(drive_group, device) - if new_scds.name not in existing_scds: - new_cluster.spec.storage.storageClassDeviceSets.append(new_scds) - return new_cluster - return _add_osds - -class LSOCreator(DefaultCreator): - def filter_devices(self, rook_pods: KubernetesResource, drive_group: DriveGroupSpec, matching_hosts: List[str]) -> List[Device]: - device_list = [] - assert drive_group.data_devices is not None - sizematcher = None - if drive_group.data_devices.size: - sizematcher = SizeMatcher('size', drive_group.data_devices.size) - limit = getattr(drive_group.data_devices, 'limit', None) - all = getattr(drive_group.data_devices, 'all', None) - paths = [device.path for device in drive_group.data_devices.paths] - vendor = getattr(drive_group.data_devices, 'vendor', None) - model = getattr(drive_group.data_devices, 'model', None) - count = 0 - osd_list = [] - for pod in rook_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and 'ceph.rook.io/DeviceSet' in pod.metadata.labels - ): - osd_list.append(pod.metadata.labels['ceph.rook.io/DeviceSet']) - for _, node in self.inventory.items(): - for device in node: - if device.sys_api['pv_name'] in osd_list: - count += 1 - for _, node in self.inventory.items(): - for device in node: - if not limit or (count < limit): - if device.available: - if ( - all - or ( - device.sys_api['node'] in matching_hosts - and ((sizematcher != None) or sizematcher.compare(device)) - and ( - not drive_group.data_devices.paths - or device.path in paths - ) - and ( - not vendor - or device.sys_api['vendor'] == vendor - ) - and ( - not model - or device.sys_api['model'].startsWith(model) - ) - ) - ): - device_list.append(device) - count += 1 - return device_list - -class DefaultRemover(): - def __init__( - self, - coreV1_api: 'client.CoreV1Api', - batchV1_api: 'client.BatchV1Api', - appsV1_api: 'client.AppsV1Api', - osd_ids: List[str], - 
replace_flag: bool, - force_flag: bool, - mon_command: Callable, - patch: Callable, - rook_env: 'RookEnv', - inventory: Dict[str, List[Device]] - ): - self.batchV1_api = batchV1_api - self.appsV1_api = appsV1_api - self.coreV1_api = coreV1_api - - self.osd_ids = osd_ids - self.replace_flag = replace_flag - self.force_flag = force_flag - - self.mon_command = mon_command - - self.patch = patch - self.rook_env = rook_env - - self.inventory = inventory - self.osd_pods: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_pod, - namespace=self.rook_env.namespace, - label_selector='app=rook-ceph-osd') - self.jobs: KubernetesResource = KubernetesResource(self.batchV1_api.list_namespaced_job, - namespace=self.rook_env.namespace, - label_selector='app=rook-ceph-osd-prepare') - self.pvcs: KubernetesResource = KubernetesResource(self.coreV1_api.list_namespaced_persistent_volume_claim, - namespace=self.rook_env.namespace) - - - def remove_device_sets(self) -> str: - self.to_remove: Dict[str, int] = {} - self.pvc_to_remove: List[str] = [] - for pod in self.osd_pods.items: - if ( - hasattr(pod, 'metadata') - and hasattr(pod.metadata, 'labels') - and 'osd' in pod.metadata.labels - and pod.metadata.labels['osd'] in self.osd_ids - ): - if pod.metadata.labels['ceph.rook.io/DeviceSet'] in self.to_remove: - self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] + 1 - else: - self.to_remove[pod.metadata.labels['ceph.rook.io/DeviceSet']] = 1 - self.pvc_to_remove.append(pod.metadata.labels['ceph.rook.io/pvc']) - def _remove_osds(current_cluster, new_cluster): - # type: (ccl.CephCluster, ccl.CephCluster) -> ccl.CephCluster - assert new_cluster.spec.storage is not None and new_cluster.spec.storage.storageClassDeviceSets is not None - for _set in new_cluster.spec.storage.storageClassDeviceSets: - if _set.name in self.to_remove: - if _set.count == self.to_remove[_set.name]: - 
new_cluster.spec.storage.storageClassDeviceSets.remove(_set) - else: - _set.count = _set.count - self.to_remove[_set.name] - return new_cluster - return self.patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _remove_osds) - - def check_force(self) -> None: - if not self.force_flag: - safe_args = {'prefix': 'osd safe-to-destroy', - 'ids': [str(x) for x in self.osd_ids]} - ret, out, err = self.mon_command(safe_args) - if ret != 0: - raise RuntimeError(err) - - def set_osds_down(self) -> None: - down_flag_args = { - 'prefix': 'osd down', - 'ids': [str(x) for x in self.osd_ids] - } - ret, out, err = self.mon_command(down_flag_args) - if ret != 0: - raise RuntimeError(err) - - def scale_deployments(self) -> None: - for osd_id in self.osd_ids: - self.appsV1_api.patch_namespaced_deployment_scale(namespace=self.rook_env.namespace, - name='rook-ceph-osd-{}'.format(osd_id), - body=client.V1Scale(spec=client.V1ScaleSpec(replicas=0))) - - def set_osds_out(self) -> None: - out_flag_args = { - 'prefix': 'osd out', - 'ids': [str(x) for x in self.osd_ids] - } - ret, out, err = self.mon_command(out_flag_args) - if ret != 0: - raise RuntimeError(err) - - def delete_deployments(self) -> None: - for osd_id in self.osd_ids: - self.appsV1_api.delete_namespaced_deployment(namespace=self.rook_env.namespace, - name='rook-ceph-osd-{}'.format(osd_id), - propagation_policy='Foreground') - - def clean_up_prepare_jobs_and_pvc(self) -> None: - for job in self.jobs.items: - if job.metadata.labels['ceph.rook.io/pvc'] in self.pvc_to_remove: - self.batchV1_api.delete_namespaced_job(name=job.metadata.name, namespace=self.rook_env.namespace, - propagation_policy='Foreground') - self.coreV1_api.delete_namespaced_persistent_volume_claim(name=job.metadata.labels['ceph.rook.io/pvc'], - namespace=self.rook_env.namespace, - propagation_policy='Foreground') - - def purge_osds(self) -> None: - for id in self.osd_ids: - purge_args = { - 'prefix': 'osd purge-actual', - 'id': int(id), - 
'yes_i_really_mean_it': True - } - ret, out, err = self.mon_command(purge_args) - if ret != 0: - raise RuntimeError(err) - - def destroy_osds(self) -> None: - for id in self.osd_ids: - destroy_args = { - 'prefix': 'osd destroy-actual', - 'id': int(id), - 'yes_i_really_mean_it': True - } - ret, out, err = self.mon_command(destroy_args) - if ret != 0: - raise RuntimeError(err) - - def remove(self) -> str: - try: - self.check_force() - except Exception as e: - log.exception("Error checking if OSDs are safe to destroy") - return f"OSDs not safe to destroy or unable to check if they are safe to destroy: {e}" - try: - remove_result = self.remove_device_sets() - except Exception as e: - log.exception("Error patching ceph cluster CRD") - return f"Not possible to modify Ceph cluster CRD: {e}" - try: - self.scale_deployments() - self.delete_deployments() - self.clean_up_prepare_jobs_and_pvc() - except Exception as e: - log.exception("Ceph cluster CRD patched, but error cleaning environment") - return f"Error cleaning environment after removing OSDs from Ceph cluster CRD: {e}" - try: - self.set_osds_down() - self.set_osds_out() - if self.replace_flag: - self.destroy_osds() - else: - self.purge_osds() - except Exception as e: - log.exception("OSDs removed from environment, but not able to remove OSDs from Ceph cluster") - return f"Error removing OSDs from Ceph cluster: {e}" - - return remove_result - - - class RookCluster(object): # import of client.CoreV1Api must be optional at import time. # Instead allow mgr/rook to be imported anyway. 
@@ -794,7 +487,12 @@ class RookCluster(object): else: fetcher = DefaultFetcher(sc.metadata.name, self.coreV1_api, self.rook_env) fetcher.fetch() - discovered_devices.update(fetcher.devices()) + nodename_to_devices = fetcher.devices() + for node, devices in nodename_to_devices.items(): + if node in discovered_devices: + discovered_devices[node].extend(devices) + else: + discovered_devices[node] = devices return discovered_devices @@ -1096,7 +794,6 @@ class RookCluster(object): name=spec.rgw_zone ) return object_store - def _update_zone(new: cos.CephObjectStore) -> cos.CephObjectStore: if new.spec.gateway: @@ -1188,48 +885,11 @@ class RookCluster(object): return new return self._patch(ccl.CephCluster, 'cephclusters', self.rook_env.cluster_name, _update_mon_count) - def add_osds(self, drive_group, matching_hosts): - # type: (DriveGroupSpec, List[str]) -> str - assert drive_group.objectstore in ("bluestore", "filestore") - assert drive_group.service_id - storage_class = self.get_storage_class() - inventory = self.get_discovered_devices() - creator: Optional[DefaultCreator] = None - if ( - storage_class.metadata.labels - and 'local.storage.openshift.io/owner-name' in storage_class.metadata.labels - ): - creator = LSOCreator(inventory, self.coreV1_api, self.storage_class_name) - else: - creator = DefaultCreator(inventory, self.coreV1_api, self.storage_class_name) - return self._patch( - ccl.CephCluster, - 'cephclusters', - self.rook_env.cluster_name, - creator.add_osds(self.rook_pods, drive_group, matching_hosts) - ) - - def remove_osds(self, osd_ids: List[str], replace: bool, force: bool, mon_command: Callable) -> str: - inventory = self.get_discovered_devices() - self.remover = DefaultRemover( - self.coreV1_api, - self.batchV1_api, - self.appsV1_api, - osd_ids, - replace, - force, - mon_command, - self._patch, - self.rook_env, - inventory - ) - return self.remover.remove() - def get_hosts(self) -> List[orchestrator.HostSpec]: ret = [] for node in self.nodes.items: spec 
= orchestrator.HostSpec( - node.metadata.name, + node.metadata.name, addr='/'.join([addr.address for addr in node.status.addresses]), labels=[label.split('/')[1] for label in node.metadata.labels if label.startswith('ceph-label')], ) @@ -1585,7 +1245,7 @@ def node_selector_to_placement_spec(node_selector: ccl.NodeSelectorTermsItem) -> res.label = expression.key.split('/')[1] elif expression.key == "kubernetes.io/hostname": if expression.operator == "Exists": - res.host_pattern = "*" + res.host_pattern = HostPattern("*") elif expression.operator == "In": res.hosts = [HostPlacementSpec(hostname=value, network='', name='')for value in expression.values] return res -- cgit v1.2.3