#!/usr/bin/env python import sys import tempfile import shutil import inspect import os import logging from timeit import default_timer as perf_timer from argparse import ArgumentParser from cli_common import ( find_utility, run_proc, run_proc_fast, pswd_pipe, rnp_file_path, size_to_readable, raise_err ) RNP = '' RNPK = '' GPG = '' WORKDIR = '' RNPDIR = '' GPGDIR = '' RMWORKDIR = False SMALL_ITERATIONS = 100 LARGE_ITERATIONS = 5 LARGESIZE = 1024*1024*100 SMALLSIZE = 0 SMALLFILE = 'smalltest.txt' LARGEFILE = 'largetest.txt' PASSWORD = 'password' def setup(workdir): # Searching for rnp and gnupg global RNP, GPG, RNPK, WORKDIR, RNPDIR, GPGDIR, SMALLSIZE, RMWORKDIR logging.basicConfig(stream=sys.stdout, format="%(message)s") logging.getLogger().setLevel(logging.INFO) RNP = rnp_file_path('src/rnp/rnp') RNPK = rnp_file_path('src/rnpkeys/rnpkeys') GPG = find_utility('gpg') if workdir: WORKDIR = workdir else: WORKDIR = tempfile.mkdtemp(prefix = 'rnpptmp') RMWORKDIR = True logging.debug('Setting up test in {} ...'.format(WORKDIR)) # Creating working directory and populating it with test files RNPDIR = os.path.join(WORKDIR, '.rnp') GPGDIR = os.path.join(WORKDIR, '.gpg') os.mkdir(RNPDIR, 0o700) os.mkdir(GPGDIR, 0o700) # Generating key pipe = pswd_pipe(PASSWORD) params = ['--homedir', RNPDIR, '--pass-fd', str(pipe), '--userid', 'performance@rnp', '--generate-key'] # Run key generation run_proc(RNPK, params) os.close(pipe) # Importing keys to GnuPG so it can build trustdb and so on run_proc(GPG, ['--batch', '--passphrase', '', '--homedir', GPGDIR, '--import', os.path.join(RNPDIR, 'pubring.gpg'), os.path.join(RNPDIR, 'secring.gpg')]) # Generating small file for tests SMALLSIZE = 3312 st = 'lorem ipsum dol ' * (SMALLSIZE//16+1) with open(os.path.join(WORKDIR, SMALLFILE), 'w+') as small_file: small_file.write(st) # Generating large file for tests print('Generating large file of size {}'.format(size_to_readable(LARGESIZE))) st = '0123456789ABCDEF' * (1024//16) with open(os.path.join(WORKDIR, LARGEFILE), 'w') as fd: for i in range(0, LARGESIZE // 1024): fd.write(st) def run_iterated(iterations, func, src, dst, *args): runtime = 0 for i in range(0, iterations): tstart = perf_timer() func(src, dst, *args) runtime += perf_timer() - tstart os.remove(dst) res = runtime / iterations #print '{} average run time: {}'.format(func.__name__, res) return res def rnp_symencrypt_file(src, dst, cipher, zlevel = 6, zalgo = 'zip', armor = False): params = ['--homedir', RNPDIR, '--password', PASSWORD, '--cipher', cipher, '-z', str(zlevel), '--' + zalgo, '-c', src, '--output', dst] if armor: params += ['--armor'] ret = run_proc_fast(RNP, params) if ret != 0: raise_err('rnp symmetric encryption failed') def rnp_decrypt_file(src, dst): ret = run_proc_fast(RNP, ['--homedir', RNPDIR, '--password', PASSWORD, '--decrypt', src, '--output', dst]) if ret != 0: raise_err('rnp decryption failed') def gpg_symencrypt_file(src, dst, cipher = 'AES', zlevel = 6, zalgo = 1, armor = False): params = ['--homedir', GPGDIR, '-c', '-z', str(zlevel), '--s2k-count', '524288', '--compress-algo', str(zalgo), '--batch', '--passphrase', PASSWORD, '--cipher-algo', cipher, '--output', dst, src] if armor: params.insert(2, '--armor') ret = run_proc_fast(GPG, params) if ret != 0: raise_err('gpg symmetric encryption failed for cipher ' + cipher) def gpg_decrypt_file(src, dst, keypass): ret = run_proc_fast(GPG, ['--homedir', GPGDIR, '--pinentry-mode=loopback', '--batch', '--yes', '--passphrase', keypass, '--trust-model', 'always', '-o', dst, '-d', src]) if ret != 0: raise_err('gpg decryption failed') def print_test_results(fsize, rnptime, gpgtime, operation): if not rnptime or not gpgtime: logging.info('{}:TEST FAILED'.format(operation)) if fsize == SMALLSIZE: rnpruns = 1.0 / rnptime gpgruns = 1.0 / gpgtime runstr = '{:.2f} runs/sec vs {:.2f} runs/sec'.format(rnpruns, gpgruns) if rnpruns >= gpgruns: percents = (rnpruns - gpgruns) / gpgruns * 100 logging.info('{:<30}: RNP is {:>3.0f}% FASTER then GnuPG ({})'.format( operation, percents, runstr)) else: percents = (gpgruns - rnpruns) / gpgruns * 100 logging.info('{:<30}: RNP is {:>3.0f}% SLOWER then GnuPG ({})'.format( operation, percents, runstr)) else: rnpspeed = fsize / 1024.0 / 1024.0 / rnptime gpgspeed = fsize / 1024.0 / 1024.0 / gpgtime spdstr = '{:.2f} MB/sec vs {:.2f} MB/sec'.format(rnpspeed, gpgspeed) if rnpspeed >= gpgspeed: percents = (rnpspeed - gpgspeed) / gpgspeed * 100 logging.info('{:<30}: RNP is {:>3.0f}% FASTER then GnuPG ({})'.format( operation, percents, spdstr)) else: percents = (gpgspeed - rnpspeed) / gpgspeed * 100 logging.info('{:<30}: RNP is {:>3.0f}% SLOWER then GnuPG ({})'.format( operation, percents, spdstr)) def get_file_params(filetype): if filetype == 'small': infile, outfile, iterations, fsize = (SMALLFILE, SMALLFILE + '.gpg', SMALL_ITERATIONS, SMALLSIZE) else: infile, outfile, iterations, fsize = (LARGEFILE, LARGEFILE + '.gpg', LARGE_ITERATIONS, LARGESIZE) infile = os.path.join(WORKDIR, infile) rnpout = os.path.join(WORKDIR, outfile + '.rnp') gpgout = os.path.join(WORKDIR, outfile + '.gpg') return (infile, rnpout, gpgout, iterations, fsize) class Benchmark(object): rnphome = ['--homedir', RNPDIR] gpghome = ['--homedir', GPGDIR] def small_file_symmetric_encryption(self): # Running each operation iteratively for a small and large file(s), calculating the average # 1. Encryption ''' Small file symmetric encryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('small') for armor in [False, True]: tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout, 'AES128', 0, 'zip', armor) tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout, 'AES128', 0, 1, armor) testname = 'ENCRYPT-SMALL-{}'.format('ARMOR' if armor else 'BINARY') print_test_results(fsize, tmrnp, tmgpg, testname) def large_file_symmetric_encryption(self): ''' Large file symmetric encryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('large') for cipher in ['AES128', 'AES192', 'AES256', 'TWOFISH', 'BLOWFISH', 'CAST5', 'CAMELLIA128', 'CAMELLIA192', 'CAMELLIA256']: tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout, cipher, 0, 'zip', False) tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout, cipher, 0, 1, False) testname = 'ENCRYPT-{}-BINARY'.format(cipher) print_test_results(fsize, tmrnp, tmgpg, testname) def large_file_armored_encryption(self): ''' Large file armored encryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('large') tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout, 'AES128', 0, 'zip', True) tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout, 'AES128', 0, 1, True) print_test_results(fsize, tmrnp, tmgpg, 'ENCRYPT-LARGE-ARMOR') def small_file_symmetric_decryption(self): ''' Small file symmetric decryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('small') inenc = infile + '.enc' for armor in [False, True]: gpg_symencrypt_file(infile, inenc, 'AES', 0, 1, armor) tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout) tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD) testname = 'DECRYPT-SMALL-{}'.format('ARMOR' if armor else 'BINARY') print_test_results(fsize, tmrnp, tmgpg, testname) os.remove(inenc) def large_file_symmetric_decryption(self): ''' Large file symmetric decryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('large') inenc = infile + '.enc' for cipher in ['AES128', 'AES192', 'AES256', 'TWOFISH', 'BLOWFISH', 'CAST5', 'CAMELLIA128', 'CAMELLIA192', 'CAMELLIA256']: gpg_symencrypt_file(infile, inenc, cipher, 0, 1, False) tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout) tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD) testname = 'DECRYPT-{}-BINARY'.format(cipher) print_test_results(fsize, tmrnp, tmgpg, testname) os.remove(inenc) def large_file_armored_decryption(self): ''' Large file armored decryption ''' infile, rnpout, gpgout, iterations, fsize = get_file_params('large') inenc = infile + '.enc' gpg_symencrypt_file(infile, inenc, 'AES128', 0, 1, True) tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout) tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD) print_test_results(fsize, tmrnp, tmgpg, 'DECRYPT-LARGE-ARMOR') os.remove(inenc) # 3. Signing #print '\n#3. Signing\n' # 4. Verification #print '\n#4. Verification\n' # 5. Cleartext signing #print '\n#5. Cleartext signing and verification\n' # 6. Detached signature #print '\n#6. Detached signing and verification\n' # Usage ./cli_perf.py [working_directory] # # It's better to use RAMDISK to perform tests # in order to speed up disk reads/writes # # On linux: # mkdir -p /tmp/working # sudo mount -t tmpfs -o size=512m tmpfs /tmp/working # ./cli_perf.py -w /tmp/working # sudo umount /tmp/working if __name__ == '__main__': # parse options parser = ArgumentParser(description="RNP benchmarking") parser.add_argument("-b", "--bench", dest="benchmarks", help="Name of the comma-separated benchmarks to run", metavar="benchmarks") parser.add_argument("-w", "--workdir", dest="workdir", help="Working directory to use", metavar="workdir") parser.add_argument("-l", "--list", help="Print list of available benchmarks and exit", action="store_true") args = parser.parse_args() # get list of benchamrks to run bench_methods = [ x[0] for x in inspect.getmembers(Benchmark, predicate=lambda x: inspect.ismethod(x) or inspect.isfunction(x))] print(bench_methods) if args.list: for name in bench_methods: logging.info(("\t " + name)) sys.exit(0) if args.benchmarks: bench_methods = filter(lambda x: x in args.benchmarks.split(","), bench_methods) # setup operations setup(args.workdir) for name in bench_methods: method = getattr(Benchmark, name) logging.info(("\n" + name + "(): " + inspect.getdoc(method))) method(Benchmark()) try: shutil.rmtree(WORKDIR) except Exception: logging.info(("Cleanup failed"))