diff options
Diffstat (limited to 'src/pybind/mgr/dashboard/controllers/docs.py')
-rw-r--r-- | src/pybind/mgr/dashboard/controllers/docs.py | 429 |
1 files changed, 429 insertions, 0 deletions
diff --git a/src/pybind/mgr/dashboard/controllers/docs.py b/src/pybind/mgr/dashboard/controllers/docs.py new file mode 100644 index 000000000..ecbe45979 --- /dev/null +++ b/src/pybind/mgr/dashboard/controllers/docs.py @@ -0,0 +1,429 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import + +import logging +from typing import Any, Dict, List, Optional, Union + +import cherrypy + +from .. import mgr +from ..api.doc import Schema, SchemaInput, SchemaType +from . import ENDPOINT_MAP, BaseController, Endpoint, Router +from ._version import APIVersion + +NO_DESCRIPTION_AVAILABLE = "*No description available*" + +logger = logging.getLogger('controllers.docs') + + +@Router('/docs', secure=False) +class Docs(BaseController): + + @classmethod + def _gen_tags(cls, all_endpoints): + """ Generates a list of all tags and corresponding descriptions. """ + # Scenarios to consider: + # * Intentionally make up a new tag name at controller => New tag name displayed. + # * Misspell or make up a new tag name at endpoint => Neither tag or endpoint displayed. + # * Misspell tag name at controller (when referring to another controller) => + # Tag displayed but no endpoints assigned + # * Description for a tag added at multiple locations => Only one description displayed. + list_of_ctrl = set() + for endpoints in ENDPOINT_MAP.values(): + for endpoint in endpoints: + if endpoint.is_api or all_endpoints: + list_of_ctrl.add(endpoint.ctrl) + + tag_map: Dict[str, str] = {} + for ctrl in sorted(list_of_ctrl, key=lambda ctrl: ctrl.__name__): + tag_name = ctrl.__name__ + tag_descr = "" + if hasattr(ctrl, 'doc_info'): + if ctrl.doc_info['tag']: + tag_name = ctrl.doc_info['tag'] + tag_descr = ctrl.doc_info['tag_descr'] + if tag_name not in tag_map or not tag_map[tag_name]: + tag_map[tag_name] = tag_descr + + tags = [{'name': k, 'description': v if v else NO_DESCRIPTION_AVAILABLE} + for k, v in tag_map.items()] + tags.sort(key=lambda e: e['name']) + return tags + + @classmethod + def _get_tag(cls, endpoint): + """ Returns the name of a tag to assign to a path. """ + ctrl = endpoint.ctrl + func = endpoint.func + tag = ctrl.__name__ + if hasattr(func, 'doc_info') and func.doc_info['tag']: + tag = func.doc_info['tag'] + elif hasattr(ctrl, 'doc_info') and ctrl.doc_info['tag']: + tag = ctrl.doc_info['tag'] + return tag + + @classmethod + def _gen_type(cls, param): + # pylint: disable=too-many-return-statements + """ + Generates the type of parameter based on its name and default value, + using very simple heuristics. + Used if type is not explicitly defined. + """ + param_name = param['name'] + def_value = param['default'] if 'default' in param else None + if param_name.startswith("is_"): + return str(SchemaType.BOOLEAN) + if "size" in param_name: + return str(SchemaType.INTEGER) + if "count" in param_name: + return str(SchemaType.INTEGER) + if "num" in param_name: + return str(SchemaType.INTEGER) + if isinstance(def_value, bool): + return str(SchemaType.BOOLEAN) + if isinstance(def_value, int): + return str(SchemaType.INTEGER) + return str(SchemaType.STRING) + + @classmethod + # isinstance doesn't work: input is always <type 'type'>. + def _type_to_str(cls, type_as_type): + """ Used if type is explicitly defined. """ + if type_as_type is str: + type_as_str = str(SchemaType.STRING) + elif type_as_type is int: + type_as_str = str(SchemaType.INTEGER) + elif type_as_type is bool: + type_as_str = str(SchemaType.BOOLEAN) + elif type_as_type is list or type_as_type is tuple: + type_as_str = str(SchemaType.ARRAY) + elif type_as_type is float: + type_as_str = str(SchemaType.NUMBER) + else: + type_as_str = str(SchemaType.OBJECT) + return type_as_str + + @classmethod + def _add_param_info(cls, parameters, p_info): + # Cases to consider: + # * Parameter name (if not nested) misspelt in decorator => parameter not displayed + # * Sometimes a parameter is used for several endpoints (e.g. fs_id in CephFS). + # Currently, there is no possibility of reuse. Should there be? + # But what if there are two parameters with same name but different functionality? + """ + Adds explicitly described information for parameters of an endpoint. + + There are two cases: + * Either the parameter in p_info corresponds to an endpoint parameter. Implicit information + has higher priority, so only information that doesn't already exist is added. + * Or the parameter in p_info describes a nested parameter inside an endpoint parameter. + In that case there is no implicit information at all so all explicitly described info needs + to be added. + """ + for p in p_info: + if not p['nested']: + for parameter in parameters: + if p['name'] == parameter['name']: + parameter['type'] = p['type'] + parameter['description'] = p['description'] + if 'nested_params' in p: + parameter['nested_params'] = cls._add_param_info([], p['nested_params']) + else: + nested_p = { + 'name': p['name'], + 'type': p['type'], + 'description': p['description'], + 'required': p['required'], + } + if 'default' in p: + nested_p['default'] = p['default'] + if 'nested_params' in p: + nested_p['nested_params'] = cls._add_param_info([], p['nested_params']) + parameters.append(nested_p) + + return parameters + + @classmethod + def _gen_schema_for_content(cls, params: List[Any]) -> Dict[str, Any]: + """ + Generates information to the content-object in OpenAPI Spec. + Used to for request body and responses. + """ + required_params = [] + properties = {} + schema_type = SchemaType.OBJECT + if isinstance(params, SchemaInput): + schema_type = params.type + params = params.params + + for param in params: + if param['required']: + required_params.append(param['name']) + + props = {} + if 'type' in param: + props['type'] = cls._type_to_str(param['type']) + if 'nested_params' in param: + if props['type'] == str(SchemaType.ARRAY): # dict in array + props['items'] = cls._gen_schema_for_content(param['nested_params']) + else: # dict in dict + props = cls._gen_schema_for_content(param['nested_params']) + elif props['type'] == str(SchemaType.OBJECT): # e.g. [int] + props['type'] = str(SchemaType.ARRAY) + props['items'] = {'type': cls._type_to_str(param['type'][0])} + else: + props['type'] = cls._gen_type(param) + if 'description' in param: + props['description'] = param['description'] + if 'default' in param: + props['default'] = param['default'] + properties[param['name']] = props + + schema = Schema(schema_type=schema_type, properties=properties, + required=required_params) + + return schema.as_dict() + + @classmethod + def _gen_responses(cls, method, resp_object=None, + version: Optional[APIVersion] = None): + resp: Dict[str, Dict[str, Union[str, Any]]] = { + '400': { + "description": "Operation exception. Please check the " + "response body for details." + }, + '401': { + "description": "Unauthenticated access. Please login first." + }, + '403': { + "description": "Unauthorized access. Please check your " + "permissions." + }, + '500': { + "description": "Unexpected error. Please check the " + "response body for the stack trace." + } + } + + if not version: + version = APIVersion.DEFAULT + + if method.lower() == 'get': + resp['200'] = {'description': "OK", + 'content': {version.to_mime_type(): + {'type': 'object'}}} + if method.lower() == 'post': + resp['201'] = {'description': "Resource created.", + 'content': {version.to_mime_type(): + {'type': 'object'}}} + if method.lower() == 'put': + resp['200'] = {'description': "Resource updated.", + 'content': {version.to_mime_type(): + {'type': 'object'}}} + if method.lower() == 'delete': + resp['204'] = {'description': "Resource deleted.", + 'content': {version.to_mime_type(): + {'type': 'object'}}} + if method.lower() in ['post', 'put', 'delete']: + resp['202'] = {'description': "Operation is still executing." + " Please check the task queue.", + 'content': {version.to_mime_type(): + {'type': 'object'}}} + + if resp_object: + for status_code, response_body in resp_object.items(): + if status_code in resp: + resp[status_code].update( + {'content': + {version.to_mime_type(): + {'schema': cls._gen_schema_for_content(response_body)} + }}) + + return resp + + @classmethod + def _gen_params(cls, params, location): + parameters = [] + for param in params: + if 'type' in param: + _type = cls._type_to_str(param['type']) + else: + _type = cls._gen_type(param) + res = { + 'name': param['name'], + 'in': location, + 'schema': { + 'type': _type + }, + } + if param.get('description'): + res['description'] = param['description'] + if param['required']: + res['required'] = True + elif param['default'] is None: + res['allowEmptyValue'] = True + else: + res['default'] = param['default'] + parameters.append(res) + + return parameters + + @classmethod + def gen_paths(cls, all_endpoints): + # pylint: disable=R0912 + method_order = ['get', 'post', 'put', 'delete'] + paths = {} + for path, endpoints in sorted(list(ENDPOINT_MAP.items()), + key=lambda p: p[0]): + methods = {} + skip = False + + endpoint_list = sorted(endpoints, key=lambda e: + method_order.index(e.method.lower())) + for endpoint in endpoint_list: + if not endpoint.is_api and not all_endpoints: + skip = True + break + + method = endpoint.method + func = endpoint.func + + summary = '' + version = None + resp = {} + p_info = [] + + if hasattr(func, '__method_map_method__'): + version = func.__method_map_method__['version'] + + elif hasattr(func, '__resource_method__'): + version = func.__resource_method__['version'] + + elif hasattr(func, '__collection_method__'): + version = func.__collection_method__['version'] + + if hasattr(func, 'doc_info'): + if func.doc_info['summary']: + summary = func.doc_info['summary'] + resp = func.doc_info['response'] + p_info = func.doc_info['parameters'] + params = [] + if endpoint.path_params: + params.extend( + cls._gen_params( + cls._add_param_info(endpoint.path_params, p_info), 'path')) + if endpoint.query_params: + params.extend( + cls._gen_params( + cls._add_param_info(endpoint.query_params, p_info), 'query')) + + methods[method.lower()] = { + 'tags': [cls._get_tag(endpoint)], + 'description': func.__doc__, + 'parameters': params, + 'responses': cls._gen_responses(method, resp, version) + } + if summary: + methods[method.lower()]['summary'] = summary + + if method.lower() in ['post', 'put']: + if endpoint.body_params: + body_params = cls._add_param_info(endpoint.body_params, p_info) + methods[method.lower()]['requestBody'] = { + 'content': { + 'application/json': { + 'schema': cls._gen_schema_for_content(body_params)}}} + + if endpoint.query_params: + query_params = cls._add_param_info(endpoint.query_params, p_info) + methods[method.lower()]['requestBody'] = { + 'content': { + 'application/json': { + 'schema': cls._gen_schema_for_content(query_params)}}} + + if endpoint.is_secure: + methods[method.lower()]['security'] = [{'jwt': []}] + + if not skip: + paths[path] = methods + + return paths + + @classmethod + def _gen_spec(cls, all_endpoints=False, base_url="", offline=False): + if all_endpoints: + base_url = "" + + host = cherrypy.request.base.split('://', 1)[1] if not offline else 'example.com' + logger.debug("Host: %s", host) + + paths = cls.gen_paths(all_endpoints) + + if not base_url: + base_url = "/" + + scheme = 'https' if offline or mgr.get_localized_module_option('ssl') else 'http' + + spec = { + 'openapi': "3.0.0", + 'info': { + 'description': "This is the official Ceph REST API", + 'version': "v1", + 'title': "Ceph REST API" + }, + 'host': host, + 'basePath': base_url, + 'servers': [{'url': "{}{}".format( + cherrypy.request.base if not offline else '', + base_url)}], + 'tags': cls._gen_tags(all_endpoints), + 'schemes': [scheme], + 'paths': paths, + 'components': { + 'securitySchemes': { + 'jwt': { + 'type': 'http', + 'scheme': 'bearer', + 'bearerFormat': 'JWT' + } + } + } + } + + return spec + + @Endpoint(path="openapi.json", version=None) + def open_api_json(self): + return self._gen_spec(False, "/") + + @Endpoint(path="api-all.json", version=None) + def api_all_json(self): + return self._gen_spec(True, "/") + + +if __name__ == "__main__": + import sys + + import yaml + + def fix_null_descr(obj): + """ + A hot fix for errors caused by null description values when generating + static documentation: better fix would be default values in source + to be 'None' strings: however, decorator changes didn't resolve + """ + return {k: fix_null_descr(v) for k, v in obj.items() if v is not None} \ + if isinstance(obj, dict) else obj + + Router.generate_routes("/api") + try: + with open(sys.argv[1], 'w') as f: + # pylint: disable=protected-access + yaml.dump( + fix_null_descr(Docs._gen_spec(all_endpoints=False, base_url="/", offline=True)), + f) + except IndexError: + sys.exit("Output file name missing; correct syntax is: `cmd <file.yml>`") + except IsADirectoryError: + sys.exit("Specified output is a directory; correct syntax is: `cmd <file.yml>`") |