summaryrefslogtreecommitdiffstats
path: root/comm/third_party/botan/src/scripts/test_cli_crypt.py
diff options
context:
space:
mode:
Diffstat (limited to 'comm/third_party/botan/src/scripts/test_cli_crypt.py')
-rwxr-xr-xcomm/third_party/botan/src/scripts/test_cli_crypt.py220
1 files changed, 220 insertions, 0 deletions
diff --git a/comm/third_party/botan/src/scripts/test_cli_crypt.py b/comm/third_party/botan/src/scripts/test_cli_crypt.py
new file mode 100755
index 0000000000..6160d03690
--- /dev/null
+++ b/comm/third_party/botan/src/scripts/test_cli_crypt.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+
+import binascii
+import argparse
+import re
+import subprocess
+import sys
+import os.path
+import logging
+import time
+from collections import OrderedDict
+import multiprocessing
+from multiprocessing.pool import ThreadPool
+
+SUPPORTED_ALGORITHMS = {
+ "AES-128/CFB": "aes-128-cfb",
+ "AES-192/CFB": "aes-192-cfb",
+ "AES-256/CFB": "aes-256-cfb",
+ "AES-128/GCM": "aes-128-gcm",
+ "AES-192/GCM": "aes-192-gcm",
+ "AES-256/GCM": "aes-256-gcm",
+ "AES-128/OCB": "aes-128-ocb",
+ "AES-128/XTS": "aes-128-xts",
+ "AES-256/XTS": "aes-256-xts",
+ "ChaCha20Poly1305": "chacha20poly1305",
+}
+
+class VecDocument:
+ def __init__(self, filepath):
+ self.data = OrderedDict()
+ last_testcase_number = 1
+ current_testcase_number = 1
+ current_group_name = ""
+ last_group_name = ""
+ current_testcase = {}
+
+ PATTERN_GROUPHEADER = "^\[(.+)\]$"
+ PATTERN_KEYVALUE = "^\s*([a-zA-Z]+)\s*=(.*)$"
+
+ with open(filepath, 'r') as f:
+ # Append one empty line to simplify parsing
+ lines = f.read().splitlines() + ["\n"]
+
+ for line in lines:
+ line = line.strip()
+ if line.startswith("#"):
+ pass # Skip
+ elif line == "":
+ current_testcase_number += 1
+ elif re.match(PATTERN_GROUPHEADER, line):
+ match = re.match(PATTERN_GROUPHEADER, line)
+ current_group_name = match.group(1)
+ elif re.match(PATTERN_KEYVALUE, line):
+ match = re.match(PATTERN_KEYVALUE, line)
+ key = match.group(1)
+ value = match.group(2).strip()
+ current_testcase[key] = value
+
+ if current_testcase_number != last_testcase_number:
+ if not current_group_name in self.data:
+ self.data[current_group_name] = []
+ if len(current_testcase) != 0:
+ self.data[current_group_name].append(current_testcase)
+ current_testcase = {}
+ last_testcase_number = current_testcase_number
+
+ if current_group_name != last_group_name:
+ last_group_name = current_group_name
+ # Reset testcase number
+ last_testcase_number = 1
+ current_testcase_number = 1
+
+ def get_data(self):
+ return self.data
+
+TESTS_RUN = 0
+TESTS_FAILED = 0
+
+class TestLogHandler(logging.StreamHandler, object):
+ def emit(self, record):
+ # Do the default stuff first
+ super(TestLogHandler, self).emit(record)
+ if record.levelno >= logging.ERROR:
+ global TESTS_FAILED
+ TESTS_FAILED += 1
+
+def setup_logging(options):
+ if options.verbose:
+ log_level = logging.DEBUG
+ elif options.quiet:
+ log_level = logging.WARNING
+ else:
+ log_level = logging.INFO
+
+ lh = TestLogHandler(sys.stdout)
+ lh.setFormatter(logging.Formatter('%(levelname) 7s: %(message)s'))
+ logging.getLogger().addHandler(lh)
+ logging.getLogger().setLevel(log_level)
+
+def test_cipher_kat(cli_binary, data):
+ iv = data['Nonce']
+ key = data['Key']
+ ad = data['AD'] if 'AD' in data else ""
+ plaintext = data['In'].lower()
+ ciphertext = data['Out'].lower()
+ algorithm = data['Algorithm']
+ direction = data['Direction']
+
+ mode = SUPPORTED_ALGORITHMS.get(algorithm)
+ if mode is None:
+ raise Exception("Unknown algorithm: '" + algorithm + "'")
+
+ cmd = [
+ cli_binary,
+ "encryption",
+ "--mode=%s" % mode,
+ "--iv=%s" % iv,
+ "--ad=%s" % ad,
+ "--key=%s" % key]
+ if direction == "decrypt":
+ cmd += ['--decrypt']
+
+ if direction == "decrypt":
+ invalue = ciphertext
+ else:
+ invalue = plaintext
+
+ p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
+ out_raw = p.communicate(input=binascii.unhexlify(invalue))[0]
+ output = binascii.hexlify(out_raw).decode("UTF-8").lower()
+
+ expected = plaintext if direction == "decrypt" else ciphertext
+ if expected != output:
+ logging.error("For test %s got %s expected %s" % (data['testname'], output, expected))
+
+def get_testdata(document, max_tests):
+ out = []
+ for algorithm in document:
+ if algorithm in SUPPORTED_ALGORITHMS:
+ testcase_number = 0
+ for testcase in document[algorithm]:
+ testcase_number += 1
+ for direction in ['encrypt', 'decrypt']:
+ testname = "{} no {:0>3} ({})".format(
+ algorithm.lower(), testcase_number, direction)
+ testname = re.sub("[^a-z0-9-]", "_", testname)
+ testname = re.sub("_+", "_", testname)
+ testname = testname.strip("_")
+ test = {'testname': testname}
+ for key in testcase:
+ value = testcase[key]
+ test[key] = value
+ test['Algorithm'] = algorithm
+ test['Direction'] = direction
+
+ out.append(test)
+
+ if max_tests > 0 and testcase_number > max_tests:
+ break
+ return out
+
+def main(args=None):
+ if args is None:
+ args = sys.argv
+
+ parser = argparse.ArgumentParser(description="")
+ parser.add_argument('cli_binary', help='path to the botan cli binary')
+ parser.add_argument('--max-tests', type=int, default=50, metavar="M")
+ parser.add_argument('--threads', type=int, default=0, metavar="T")
+ parser.add_argument('--verbose', action='store_true', default=False)
+ parser.add_argument('--quiet', action='store_true', default=False)
+ args = parser.parse_args()
+
+ setup_logging(args)
+
+ cli_binary = args.cli_binary
+ max_tests = args.max_tests
+ threads = args.threads
+
+ if threads == 0:
+ threads = multiprocessing.cpu_count()
+
+ test_data_dir = os.path.join('src', 'tests', 'data')
+
+ mode_test_data = [os.path.join(test_data_dir, 'modes', 'cfb.vec'),
+ os.path.join(test_data_dir, 'aead', 'gcm.vec'),
+ os.path.join(test_data_dir, 'aead', 'ocb.vec'),
+ os.path.join(test_data_dir, 'modes', 'xts.vec'),
+ os.path.join(test_data_dir, 'aead', 'chacha20poly1305.vec')]
+
+ kats = []
+ for f in mode_test_data:
+ vecfile = VecDocument(f)
+ kats += get_testdata(vecfile.get_data(), max_tests)
+
+ start_time = time.time()
+
+ if threads > 1:
+ pool = ThreadPool(processes=threads)
+ results = []
+ for test in kats:
+ results.append(pool.apply_async(test_cipher_kat, (cli_binary, test)))
+
+ for result in results:
+ result.get()
+ else:
+ for test in kats:
+ test_cipher_kat(test)
+
+ end_time = time.time()
+
+ print("Ran %d tests with %d failures in %.02f seconds" % (
+ len(kats), TESTS_FAILED, end_time - start_time))
+
+ if TESTS_FAILED > 0:
+ return 1
+ return 0
+
+if __name__ == '__main__':
+ sys.exit(main())