diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /qa/workunits/rgw/test-keystone-service-token.py | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rwxr-xr-x | qa/workunits/rgw/test-keystone-service-token.py | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/qa/workunits/rgw/test-keystone-service-token.py b/qa/workunits/rgw/test-keystone-service-token.py new file mode 100755 index 000000000..2c7f21e93 --- /dev/null +++ b/qa/workunits/rgw/test-keystone-service-token.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python3 +# +# Copyright (C) 2022 Binero +# +# Author: Tobias Urdin <tobias.urdin@binero.com> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU Library Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Library Public License for more details. + +import sys +import requests +import time + + +# b4221c214dd64ee6a464g2153fae3813 is ID of deadbeef project +SWIFT_URL = 'http://localhost:8000/swift/v1/AUTH_b4221c214dd64ee6a464g2153fae3813' +KEYSTONE_URL = 'http://localhost:5000' + + +def get_stats(): + stats_url = '%s/stats' % KEYSTONE_URL + return requests.get(stats_url) + + +def test_list_containers(): + # Loop five list container requests with same token + for i in range(0, 5): + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-1'}) + if r.status_code != 204: + print('FAILED, status code is %d not 204' % r.status_code) + sys.exit(1) + + # Get stats from fake Keystone server + r = get_stats() + if r.status_code != 200: + print('FAILED, status code is %d not 200' % r.status_code) + sys.exit(1) + stats = r.json() + + # Verify admin token was cached + if stats['post_total'] != 1: + print('FAILED, post_total stat is %d not 1' % stats['post_total']) + sys.exit(1) + + # Verify user token was cached + if stats['get_total'] != 1: + print('FAILED, get_total stat is %d not 1' % stats['get_total']) + sys.exit(1) + + print('Wait for cache to be invalid') + time.sleep(11) + + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-1'}) + if r.status_code != 204: + print('FAILED, status code is %d not 204' % r.status_code) + sys.exit(1) + + # Get stats from fake Keystone server + r = get_stats() + if r.status_code != 200: + print('FAILED, status code is %d not 200' % r.status_code) + sys.exit(1) + stats = r.json() + + if stats['post_total'] != 2: + print('FAILED, post_total stat is %d not 2' % stats['post_total']) + sys.exit(1) + + if stats['get_total'] != 2: + print('FAILED, get_total stat is %d not 2' % stats['get_total']) + sys.exit(1) + + +def test_expired_token(): + # Try listing containers with an expired token + for i in range(0, 3): + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-2'}) + if r.status_code != 401: + print('FAILED, status code is %d not 401' % r.status_code) + sys.exit(1) + + # Get stats from fake Keystone server + r = get_stats() + if r.status_code != 200: + print('FAILED, status code is %d not 200' % r.status_code) + sys.exit(1) + stats = r.json() + + # Verify admin token was cached + if stats['post_total'] != 2: + print('FAILED, post_total stat is %d not 2' % stats['post_total']) + sys.exit(1) + + # Verify we got to fake Keystone server since expired tokens is not cached + if stats['get_total'] != 5: + print('FAILED, get_total stat is %d not 5' % stats['get_total']) + sys.exit(1) + + +def test_expired_token_with_service_token(): + # Try listing containers with an expired token but with a service token + for i in range(0, 3): + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-2', 'X-Service-Token': 'admin-token-1'}) + if r.status_code != 204: + print('FAILED, status code is %d not 204' % r.status_code) + sys.exit(1) + + # Get stats from fake Keystone server + r = get_stats() + if r.status_code != 200: + print('FAILED, status code is %d not 200' % r.status_code) + sys.exit(1) + stats = r.json() + + # Verify admin token was cached + if stats['post_total'] != 2: + print('FAILED, post_total stat is %d not 2' % stats['post_total']) + sys.exit(1) + + # Verify we got to fake Keystone server since expired tokens is not cached + if stats['get_total'] != 7: + print('FAILED, get_total stat is %d not 7' % stats['get_total']) + sys.exit(1) + + print('Wait for cache to be invalid') + time.sleep(11) + + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-2', 'X-Service-Token': 'admin-token-1'}) + if r.status_code != 204: + print('FAILED, status code is %d not 204' % r.status_code) + sys.exit(1) + + # Get stats from fake Keystone server + r = get_stats() + if r.status_code != 200: + print('FAILED, status code is %d not 200' % r.status_code) + sys.exit(1) + stats = r.json() + + if stats['post_total'] != 3: + print('FAILED, post_total stat is %d not 3' % stats['post_total']) + sys.exit(1) + + if stats['get_total'] != 9: + print('FAILED, get_total stat is %d not 9' % stats['get_total']) + sys.exit(1) + + +def test_expired_token_with_invalid_service_token(): + print('Wait for cache to be invalid') + time.sleep(11) + + # Test with a token that doesn't have allowed role as service token + for i in range(0, 3): + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-2', 'X-Service-Token': 'user-token-1'}) + if r.status_code != 401: + print('FAILED, status code is %d not 401' % r.status_code) + sys.exit(1) + + # Make sure we get user-token-1 cached + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-1'}) + if r.status_code != 204: + print('FAILED, status code is %d not 204' % r.status_code) + sys.exit(1) + + # Test that a cached token (that is invalid as service token) cannot be used as service token + for i in range(0, 3): + r = requests.get(SWIFT_URL, headers={'X-Auth-Token': 'user-token-2', 'X-Service-Token': 'user-token-1'}) + if r.status_code != 401: + print('FAILED, status code is %d not 401' % r.status_code) + sys.exit(1) + + +def main(): + test_list_containers() + test_expired_token() + test_expired_token_with_service_token() + test_expired_token_with_invalid_service_token() + + +if __name__ == '__main__': + main() |