summaryrefslogtreecommitdiffstats
path: root/tests/api/fuzzer.py
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-07 11:49:00 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2021-02-07 12:42:05 +0000
commit2e85f9325a797977eea9dfea0a925775ddd211d9 (patch)
tree452c7f30d62fca5755f659b99e4e53c7b03afc21 /tests/api/fuzzer.py
parentReleasing debian version 1.19.0-4. (diff)
downloadnetdata-2e85f9325a797977eea9dfea0a925775ddd211d9.tar.xz
netdata-2e85f9325a797977eea9dfea0a925775ddd211d9.zip
Merging upstream version 1.29.0.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'tests/api/fuzzer.py')
-rw-r--r--tests/api/fuzzer.py378
1 files changed, 378 insertions, 0 deletions
diff --git a/tests/api/fuzzer.py b/tests/api/fuzzer.py
new file mode 100644
index 000000000..ee12a028a
--- /dev/null
+++ b/tests/api/fuzzer.py
@@ -0,0 +1,378 @@
+import argparse
+import json
+import logging
+import posixpath
+import random
+import re
+import requests
+import string
+import sys
+import urllib.parse
+
+#######################################################################################################################
+# Utilities
+
+
+def some(s):
+ return random.choice(sorted(s))
+
+
+def not_some(s):
+ test_set = random.choice([string.ascii_uppercase + string.ascii_lowercase,
+ string.digits,
+ string.digits + ".E-",
+ '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJK'
+ 'LMNOPQRSTUVWXYZ!"#$%\'()*+,-./:;<=>?@[\\]^_`{|}~ '])
+ test_len = random.choice([1, 2, 3, 37, 61, 121])
+ while True:
+ x = ''.join([random.choice(test_set) for _ in range(test_len)])
+ if x not in s:
+ return x
+
+
+def build_url(host_maybe_scheme, base_path):
+ try:
+ if '//' not in host_maybe_scheme:
+ host_maybe_scheme = '//' + host_maybe_scheme
+ url_tuple = urllib.parse.urlparse(host_maybe_scheme)
+ if base_path[0] == '/':
+ base_path = base_path[1:]
+ return url_tuple.netloc, posixpath.join(url_tuple.path, base_path)
+ except Exception as e:
+ L.error(f"Critical failure decoding arguments -> {e}")
+ sys.exit(-1)
+
+
+#######################################################################################################################
+# Data-model and processing
+
+
+class Param(object):
+ def __init__(self, name, location, kind):
+ self.location = location
+ self.kind = kind
+ self.name = name
+ self.values = set()
+
+ def dump(self):
+ print(f"{self.name} in {self.location} is {self.kind} : {{{self.values}}}")
+
+
+def does_response_fit_schema(schema_path, schema, resp):
+ '''The schema_path argument tells us where we are (globally) in the schema. The schema argument is the
+ sub-tree within the schema json that we are validating against. The resp is the json subtree from the
+ target host's response.
+
+ The basic idea is this: swagger defines a model of valid json trees. In this sense it is a formal
+ language and we can validate a given server response by checking if the language accepts a particular
+ server response. This is basically a parser, but instead of strings we are operating on languages
+ of trees.
+
+ This could probably be extended to arbitrary swagger definitions - but the amount of work increases
+ rapidly as we attempt to cover the full semantics of languages of trees defined in swagger. Instead
+ we have some special cases that describe the parts of the semantics that we've used to describe the
+ netdata API.
+
+ If we hit an error (in the schema) that prevents further checks then we return early, otherwise we
+ try to collect as many errors as possible.
+ '''
+ success = True
+ if "type" not in schema:
+ L.error(f"Cannot progress past {schema_path} -> no type specified in dictionary")
+ print(json.dumps(schema, indent=2))
+ return False
+ if schema["type"] == "object":
+ if isinstance(resp, dict) and "properties" in schema and isinstance(schema["properties"], dict):
+ L.debug(f"Validate properties against dictionary at {schema_path}")
+ for k, v in schema["properties"].items():
+ L.debug(f"Validate {k} received with {v}")
+ if v.get("required", False) and k not in resp:
+ L.error(f"Missing {k} in response at {schema_path}")
+ print(json.dumps(resp, indent=2))
+ return False
+ if k in resp:
+ if not does_response_fit_schema(posixpath.join(schema_path, k), v, resp[k]):
+ success = False
+ elif isinstance(resp, dict) and "additionalProperties" in schema \
+ and isinstance(schema["additionalProperties"], dict):
+ kv_schema = schema["additionalProperties"]
+ L.debug(f"Validate additionalProperties against every value in dictionary at {schema_path}")
+ if "type" in kv_schema and kv_schema["type"] == "object":
+ for k, v in resp.items():
+ if not does_response_fit_schema(posixpath.join(schema_path, k), kv_schema, v):
+ success = False
+ else:
+ L.error("Don't understand what the additionalProperties means (it has no type?)")
+ return False
+ else:
+ L.error(f"Can't understand schema at {schema_path}")
+ print(json.dumps(schema, indent=2))
+ return False
+ elif schema["type"] == "string":
+ if isinstance(resp, str):
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path}")
+ return True
+ L.error(f"{repr(resp)} does not match schema {repr(schema)} at {schema_path}")
+ return False
+ elif schema["type"] == "boolean":
+ if isinstance(resp, bool):
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path}")
+ return True
+ L.error(f"{repr(resp)} does not match schema {repr(schema)} at {schema_path}")
+ return False
+ elif schema["type"] == "number":
+ if 'nullable' in schema and resp is None:
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path} (because nullable)")
+ return True
+ if isinstance(resp, int) or isinstance(resp, float):
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path}")
+ return True
+ L.error(f"{repr(resp)} does not match schema {repr(schema)} at {schema_path}")
+ return False
+ elif schema["type"] == "integer":
+ if 'nullable' in schema and resp is None:
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path} (because nullable)")
+ return True
+ if isinstance(resp, int):
+ L.debug(f"{repr(resp)} matches {repr(schema)} at {schema_path}")
+ return True
+ L.error(f"{repr(resp)} does not match schema {repr(schema)} at {schema_path}")
+ return False
+ elif schema["type"] == "array":
+ if "items" not in schema:
+ L.error(f"Schema for array at {schema_path} does not specify items!")
+ return False
+ item_schema = schema["items"]
+ if not isinstance(resp, list):
+ L.error(f"Server did not return a list for {schema_path} (typed as array in schema)")
+ return False
+ for i, item in enumerate(resp):
+ if not does_response_fit_schema(posixpath.join(schema_path, str(i)), item_schema, item):
+ success = False
+ else:
+ L.error(f"Invalid swagger type {schema['type']} for {type(resp)} at {schema_path}")
+ print(json.dumps(schema, indent=2))
+ return False
+ return success
+
+
+class GetPath(object):
+ def __init__(self, url, spec):
+ self.url = url
+ self.req_params = {}
+ self.opt_params = {}
+ self.success = None
+ self.failures = {}
+ if 'parameters' in spec.keys():
+ for p in spec['parameters']:
+ name = p['name']
+ req = p.get('required', False)
+ target = self.req_params if req else self.opt_params
+ target[name] = Param(name, p['in'], p['type'])
+ if 'default' in p:
+ defs = p['default']
+ if isinstance(defs, list):
+ for d in defs:
+ target[name].values.add(d)
+ else:
+ target[name].values.add(defs)
+ if 'enum' in p:
+ for v in p['enum']:
+ target[name].values.add(v)
+ if req and len(target[name].values) == 0:
+ print(f"FAIL: No default values in swagger for required parameter {name} in {self.url}")
+ for code, schema in spec['responses'].items():
+ if code[0] == "2" and 'schema' in schema:
+ self.success = schema['schema']
+ elif code[0] == "2":
+ L.error(f"2xx response with no schema in {self.url}")
+ else:
+ self.failures[code] = schema
+
+ def generate_success(self, host):
+ url_args = "&".join([f"{p.name}={some(p.values)}" for p in self.req_params.values()])
+ base_url = urllib.parse.urljoin(host, self.url)
+ test_url = f"{base_url}?{url_args}"
+ if url_filter.match(test_url):
+ try:
+ resp = requests.get(url=test_url, verify=(not args.tls_no_verify))
+ self.validate(test_url, resp, True)
+ except Exception as e:
+ L.error(f"Network failure in test {e}")
+ else:
+ L.debug(f"url_filter skips {test_url}")
+
+ def generate_failure(self, host):
+ all_params = list(self.req_params.values()) + list(self.opt_params.values())
+ bad_param = ''.join([random.choice(string.ascii_lowercase) for _ in range(5)])
+ while bad_param in all_params:
+ bad_param = ''.join([random.choice(string.ascii_lowercase) for _ in range(5)])
+ all_params.append(Param(bad_param, "query", "string"))
+ url_args = "&".join([f"{p.name}={not_some(p.values)}" for p in all_params])
+ base_url = urllib.parse.urljoin(host, self.url)
+ test_url = f"{base_url}?{url_args}"
+ if url_filter.match(test_url):
+ try:
+ resp = requests.get(url=test_url, verify=(not args.tls_no_verify))
+ self.validate(test_url, resp, False)
+ except Exception as e:
+ L.error(f"Network failure in test {e}")
+
+ def validate(self, test_url, resp, expect_success):
+ try:
+ resp_json = json.loads(resp.text)
+ except json.decoder.JSONDecodeError as e:
+ L.error(f"Non-json response from {test_url}")
+ return
+ success_code = resp.status_code >= 200 and resp.status_code < 300
+ if success_code and expect_success:
+ if self.success is not None:
+ if does_response_fit_schema(posixpath.join(self.url, str(resp.status_code)), self.success, resp_json):
+ L.info(f"tested {test_url}")
+ else:
+ L.error(f"tested {test_url}")
+ else:
+ L.error(f"Missing schema {test_url}")
+ elif not success_code and not expect_success:
+ schema = self.failures.get(str(resp.status_code), None)
+ if schema is not None:
+ if does_response_fit_schema(posixpath.join(self.url, str(resp.status_code)), schema, resp_json):
+ L.info(f"tested {test_url}")
+ else:
+ L.error(f"tested {test_url}")
+ else:
+ L.error("Missing schema for {resp.status_code} from {test_url}")
+ else:
+ L.error(f"Received incorrect status code {resp.status_code} against {test_url}")
+
+
+def get_the_spec(url):
+ if url[:7] == "file://":
+ with open(url[7:]) as f:
+ return f.read()
+ return requests.get(url=url).text
+
+
+# Swagger paths look absolute but they are relative to the base.
+def not_absolute(path):
+ return path[1:] if path[0] == '/' else path
+
+
+def find_ref(spec, path):
+ if len(path) > 0 and path[0] == '#':
+ return find_ref(spec, path[1:])
+ if len(path) == 1:
+ return spec[path[0]]
+ return find_ref(spec[path[0]], path[1:])
+
+
+def resolve_refs(spec, spec_root=None):
+ '''Find all "$ref" keys in the swagger spec and inline their target schemas.
+
+ As with all inliners this will break if a definition recursively links to itself, but this should not
+ happen in swagger as embedding a structure inside itself would produce a record of infinite size.'''
+ if spec_root is None:
+ spec_root = spec
+ newspec = {}
+ for k, v in spec.items():
+ if k == "$ref":
+ path = v.split('/')
+ target = find_ref(spec_root, path)
+ # Unfold one level of the tree and erase the $ref if possible.
+ if isinstance(target, dict):
+ for kk, vv in resolve_refs(target, spec_root).items():
+ newspec[kk] = vv
+ else:
+ newspec[k] = target
+ elif isinstance(v, dict):
+ newspec[k] = resolve_refs(v, spec_root)
+ else:
+ newspec[k] = v
+ # This is an artifact of inline the $refs when they are inside a properties key as their children should be
+ # pushed up into the parent dictionary. They must be merged (union) rather than replace as we use this to
+ # implement polymorphism in the data-model.
+ if 'properties' in newspec and isinstance(newspec['properties'], dict) and \
+ 'properties' in newspec['properties']:
+ sub = newspec['properties']['properties']
+ del newspec['properties']['properties']
+ if 'type' in newspec['properties']:
+ del newspec['properties']['type']
+ for k, v in sub.items():
+ newspec['properties'][k] = v
+ return newspec
+
+
+#######################################################################################################################
+# Initialization
+
+random.seed(7) # Default is reproducible sequences
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--url', type=str,
+ default='https://raw.githubusercontent.com/netdata/netdata/master/web/api/netdata-swagger.json',
+ help='The URL of the API definition in swagger. The default will pull the latest version '
+ 'from the main branch.')
+parser.add_argument('--host', type=str,
+ help='The URL of the target host to fuzz. The default will read the host from the swagger '
+ 'definition.')
+parser.add_argument('--reseed', action='store_true',
+ help="Pick a random seed for the PRNG. The default uses a constant seed for reproducibility.")
+parser.add_argument('--passes', action='store_true',
+ help="Log information about tests that pass")
+parser.add_argument('--detail', action='store_true',
+ help="Log information about the response/schema comparisons during each test")
+parser.add_argument('--filter', type=str,
+ default=".*",
+ help="Supply a regex used to filter the testing URLs generated")
+parser.add_argument('--tls-no-verify', action='store_true',
+ help="Disable TLS certification verification to allow connection to hosts that use"
+ "self-signed certificates")
+parser.add_argument('--dump-inlined', action='store_true',
+ help='Dump the inlined swagger spec instead of fuzzing. For "reasons".')
+
+args = parser.parse_args()
+if args.reseed:
+ random.seed()
+
+spec = json.loads(get_the_spec(args.url))
+inlined_spec = resolve_refs(spec)
+if args.dump_inlined:
+ print(json.dumps(inlined_spec, indent=2))
+ sys.exit(-1)
+
+logging.addLevelName(40, "FAIL")
+logging.addLevelName(20, "PASS")
+logging.addLevelName(10, "DETAIL")
+L = logging.getLogger()
+handler = logging.StreamHandler(sys.stdout)
+if not args.passes and not args.detail:
+ L.setLevel(logging.ERROR)
+elif args.passes and not args.detail:
+ L.setLevel(logging.INFO)
+elif args.detail:
+ L.setLevel(logging.DEBUG)
+handler.setFormatter(logging.Formatter(fmt="%(levelname)s %(message)s"))
+L.addHandler(handler)
+
+url_filter = re.compile(args.filter)
+
+if spec['swagger'] != '2.0':
+ L.error(f"Unexpected swagger version")
+ sys.exit(-1)
+L.info(f"Fuzzing {spec['info']['title']} / {spec['info']['version']}")
+
+host, base_url = build_url(args.host or spec['host'], inlined_spec['basePath'])
+
+L.info(f"Target host is {base_url}")
+paths = []
+for name, p in inlined_spec['paths'].items():
+ if 'get' in p:
+ name = not_absolute(name)
+ paths.append(GetPath(posixpath.join(base_url, name), p['get']))
+ elif 'put' in p:
+ L.error(f"Generation of PUT methods (for {name} is unimplemented")
+
+for s in inlined_spec['schemes']:
+ for p in paths:
+ resp = p.generate_success(s + "://" + host)
+ resp = p.generate_failure(s+"://"+host)