#!/usr/bin/env python3 # ############################################################################# # Copyright (c) 2018-present lzutao # All rights reserved. # # This source code is licensed under both the BSD-style license (found in the # LICENSE file in the root directory of this source tree) and the GPLv2 (found # in the COPYING file in the root directory of this source tree). # ############################################################################# import os import subprocess import tempfile def valgrindTest(valgrind, datagen, fuzzer, zstd, fullbench): VALGRIND_ARGS = [valgrind, '--leak-check=full', '--show-leak-kinds=all', '--error-exitcode=1'] print('\n ---- valgrind tests : memory analyzer ----') subprocess.check_call([*VALGRIND_ARGS, datagen, '-g50M'], stdout=subprocess.DEVNULL) if subprocess.call([*VALGRIND_ARGS, zstd], stdout=subprocess.DEVNULL) == 0: raise subprocess.CalledProcessError('zstd without argument should have failed') with subprocess.Popen([datagen, '-g80'], stdout=subprocess.PIPE) as p1, \ subprocess.Popen([*VALGRIND_ARGS, zstd, '-', '-c'], stdin=p1.stdout, stdout=subprocess.DEVNULL) as p2: p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. p2.communicate() if p2.returncode != 0: raise subprocess.CalledProcessError() with subprocess.Popen([datagen, '-g16KB'], stdout=subprocess.PIPE) as p1, \ subprocess.Popen([*VALGRIND_ARGS, zstd, '-vf', '-', '-c'], stdin=p1.stdout, stdout=subprocess.DEVNULL) as p2: p1.stdout.close() p2.communicate() if p2.returncode != 0: raise subprocess.CalledProcessError() with tempfile.NamedTemporaryFile() as tmp_fd: with subprocess.Popen([datagen, '-g2930KB'], stdout=subprocess.PIPE) as p1, \ subprocess.Popen([*VALGRIND_ARGS, zstd, '-5', '-vf', '-', '-o', tmp_fd.name], stdin=p1.stdout) as p2: p1.stdout.close() p2.communicate() if p2.returncode != 0: raise subprocess.CalledProcessError() subprocess.check_call([*VALGRIND_ARGS, zstd, '-vdf', tmp_fd.name, '-c'], stdout=subprocess.DEVNULL) with subprocess.Popen([datagen, '-g64MB'], stdout=subprocess.PIPE) as p1, \ subprocess.Popen([*VALGRIND_ARGS, zstd, '-vf', '-', '-c'], stdin=p1.stdout, stdout=subprocess.DEVNULL) as p2: p1.stdout.close() p2.communicate() if p2.returncode != 0: raise subprocess.CalledProcessError() subprocess.check_call([*VALGRIND_ARGS, fuzzer, '-T1mn', '-t1']) subprocess.check_call([*VALGRIND_ARGS, fullbench, '-i1']) def main(): import argparse parser = argparse.ArgumentParser(description='Valgrind tests : memory analyzer') parser.add_argument('valgrind', help='valgrind path') parser.add_argument('zstd', help='zstd path') parser.add_argument('datagen', help='datagen path') parser.add_argument('fuzzer', help='fuzzer path') parser.add_argument('fullbench', help='fullbench path') args = parser.parse_args() valgrind = args.valgrind zstd = args.zstd datagen = args.datagen fuzzer = args.fuzzer fullbench = args.fullbench valgrindTest(valgrind, datagen, fuzzer, zstd, fullbench) if __name__ == '__main__': main()