summaryrefslogtreecommitdiffstats
path: root/src/tests/cli_perf.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/tests/cli_perf.py')
-rwxr-xr-xsrc/tests/cli_perf.py316
1 files changed, 316 insertions, 0 deletions
diff --git a/src/tests/cli_perf.py b/src/tests/cli_perf.py
new file mode 100755
index 0000000..9015292
--- /dev/null
+++ b/src/tests/cli_perf.py
@@ -0,0 +1,316 @@
+#!/usr/bin/env python
+
+import sys
+import tempfile
+import shutil
+import inspect
+import os
+import logging
+from timeit import default_timer as perf_timer
+from argparse import ArgumentParser
+from cli_common import (
+ find_utility,
+ run_proc,
+ run_proc_fast,
+ pswd_pipe,
+ rnp_file_path,
+ size_to_readable,
+ raise_err
+)
+
+RNP = ''
+RNPK = ''
+GPG = ''
+WORKDIR = ''
+RNPDIR = ''
+GPGDIR = ''
+RMWORKDIR = False
+SMALL_ITERATIONS = 100
+LARGE_ITERATIONS = 5
+LARGESIZE = 1024*1024*100
+SMALLSIZE = 0
+SMALLFILE = 'smalltest.txt'
+LARGEFILE = 'largetest.txt'
+PASSWORD = 'password'
+
+def setup(workdir):
+ # Searching for rnp and gnupg
+ global RNP, GPG, RNPK, WORKDIR, RNPDIR, GPGDIR, SMALLSIZE, RMWORKDIR
+ logging.basicConfig(stream=sys.stdout, format="%(message)s")
+ logging.getLogger().setLevel(logging.INFO)
+
+ RNP = rnp_file_path('src/rnp/rnp')
+ RNPK = rnp_file_path('src/rnpkeys/rnpkeys')
+ GPG = find_utility('gpg')
+ if workdir:
+ WORKDIR = workdir
+ else:
+ WORKDIR = tempfile.mkdtemp(prefix = 'rnpptmp')
+ RMWORKDIR = True
+
+ logging.debug('Setting up test in {} ...'.format(WORKDIR))
+
+ # Creating working directory and populating it with test files
+ RNPDIR = os.path.join(WORKDIR, '.rnp')
+ GPGDIR = os.path.join(WORKDIR, '.gpg')
+ os.mkdir(RNPDIR, 0o700)
+ os.mkdir(GPGDIR, 0o700)
+
+ # Generating key
+ pipe = pswd_pipe(PASSWORD)
+ params = ['--homedir', RNPDIR, '--pass-fd', str(pipe), '--userid', 'performance@rnp',
+ '--generate-key']
+ # Run key generation
+ run_proc(RNPK, params)
+ os.close(pipe)
+
+
+ # Importing keys to GnuPG so it can build trustdb and so on
+ run_proc(GPG, ['--batch', '--passphrase', '', '--homedir', GPGDIR, '--import',
+ os.path.join(RNPDIR, 'pubring.gpg'), os.path.join(RNPDIR, 'secring.gpg')])
+
+ # Generating small file for tests
+ SMALLSIZE = 3312
+ st = 'lorem ipsum dol ' * (SMALLSIZE//16+1)
+ with open(os.path.join(WORKDIR, SMALLFILE), 'w+') as small_file:
+ small_file.write(st)
+
+ # Generating large file for tests
+ print('Generating large file of size {}'.format(size_to_readable(LARGESIZE)))
+
+ st = '0123456789ABCDEF' * (1024//16)
+ with open(os.path.join(WORKDIR, LARGEFILE), 'w') as fd:
+ for i in range(0, LARGESIZE // 1024):
+ fd.write(st)
+
+def run_iterated(iterations, func, src, dst, *args):
+ runtime = 0
+
+ for i in range(0, iterations):
+ tstart = perf_timer()
+ func(src, dst, *args)
+ runtime += perf_timer() - tstart
+ os.remove(dst)
+
+ res = runtime / iterations
+ #print '{} average run time: {}'.format(func.__name__, res)
+ return res
+
+def rnp_symencrypt_file(src, dst, cipher, zlevel = 6, zalgo = 'zip', armor = False):
+ params = ['--homedir', RNPDIR, '--password', PASSWORD, '--cipher', cipher,
+ '-z', str(zlevel), '--' + zalgo, '-c', src, '--output', dst]
+ if armor:
+ params += ['--armor']
+ ret = run_proc_fast(RNP, params)
+ if ret != 0:
+ raise_err('rnp symmetric encryption failed')
+
+def rnp_decrypt_file(src, dst):
+ ret = run_proc_fast(RNP, ['--homedir', RNPDIR, '--password', PASSWORD, '--decrypt', src,
+ '--output', dst])
+ if ret != 0:
+ raise_err('rnp decryption failed')
+
+def gpg_symencrypt_file(src, dst, cipher = 'AES', zlevel = 6, zalgo = 1, armor = False):
+ params = ['--homedir', GPGDIR, '-c', '-z', str(zlevel), '--s2k-count', '524288',
+ '--compress-algo', str(zalgo), '--batch', '--passphrase', PASSWORD,
+ '--cipher-algo', cipher, '--output', dst, src]
+ if armor:
+ params.insert(2, '--armor')
+ ret = run_proc_fast(GPG, params)
+ if ret != 0:
+ raise_err('gpg symmetric encryption failed for cipher ' + cipher)
+
+def gpg_decrypt_file(src, dst, keypass):
+ ret = run_proc_fast(GPG, ['--homedir', GPGDIR, '--pinentry-mode=loopback', '--batch',
+ '--yes', '--passphrase', keypass, '--trust-model', 'always',
+ '-o', dst, '-d', src])
+ if ret != 0:
+ raise_err('gpg decryption failed')
+
+def print_test_results(fsize, rnptime, gpgtime, operation):
+ if not rnptime or not gpgtime:
+ logging.info('{}:TEST FAILED'.format(operation))
+
+ if fsize == SMALLSIZE:
+ rnpruns = 1.0 / rnptime
+ gpgruns = 1.0 / gpgtime
+ runstr = '{:.2f} runs/sec vs {:.2f} runs/sec'.format(rnpruns, gpgruns)
+
+ if rnpruns >= gpgruns:
+ percents = (rnpruns - gpgruns) / gpgruns * 100
+ logging.info('{:<30}: RNP is {:>3.0f}% FASTER then GnuPG ({})'.format(
+ operation, percents, runstr))
+ else:
+ percents = (gpgruns - rnpruns) / gpgruns * 100
+ logging.info('{:<30}: RNP is {:>3.0f}% SLOWER then GnuPG ({})'.format(
+ operation, percents, runstr))
+ else:
+ rnpspeed = fsize / 1024.0 / 1024.0 / rnptime
+ gpgspeed = fsize / 1024.0 / 1024.0 / gpgtime
+ spdstr = '{:.2f} MB/sec vs {:.2f} MB/sec'.format(rnpspeed, gpgspeed)
+
+ if rnpspeed >= gpgspeed:
+ percents = (rnpspeed - gpgspeed) / gpgspeed * 100
+ logging.info('{:<30}: RNP is {:>3.0f}% FASTER then GnuPG ({})'.format(
+ operation, percents, spdstr))
+ else:
+ percents = (gpgspeed - rnpspeed) / gpgspeed * 100
+ logging.info('{:<30}: RNP is {:>3.0f}% SLOWER then GnuPG ({})'.format(
+ operation, percents, spdstr))
+
+def get_file_params(filetype):
+ if filetype == 'small':
+ infile, outfile, iterations, fsize = (SMALLFILE, SMALLFILE + '.gpg',
+ SMALL_ITERATIONS, SMALLSIZE)
+ else:
+ infile, outfile, iterations, fsize = (LARGEFILE, LARGEFILE + '.gpg',
+ LARGE_ITERATIONS, LARGESIZE)
+
+ infile = os.path.join(WORKDIR, infile)
+ rnpout = os.path.join(WORKDIR, outfile + '.rnp')
+ gpgout = os.path.join(WORKDIR, outfile + '.gpg')
+ return (infile, rnpout, gpgout, iterations, fsize)
+
+
+class Benchmark(object):
+ rnphome = ['--homedir', RNPDIR]
+ gpghome = ['--homedir', GPGDIR]
+
+ def small_file_symmetric_encryption(self):
+ # Running each operation iteratively for a small and large file(s), calculating the average
+ # 1. Encryption
+ '''
+ Small file symmetric encryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('small')
+ for armor in [False, True]:
+ tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout,
+ 'AES128', 0, 'zip', armor)
+ tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout,
+ 'AES128', 0, 1, armor)
+ testname = 'ENCRYPT-SMALL-{}'.format('ARMOR' if armor else 'BINARY')
+ print_test_results(fsize, tmrnp, tmgpg, testname)
+
+ def large_file_symmetric_encryption(self):
+ '''
+ Large file symmetric encryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('large')
+ for cipher in ['AES128', 'AES192', 'AES256', 'TWOFISH', 'BLOWFISH', 'CAST5', 'CAMELLIA128', 'CAMELLIA192', 'CAMELLIA256']:
+ tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout,
+ cipher, 0, 'zip', False)
+ tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout,
+ cipher, 0, 1, False)
+ testname = 'ENCRYPT-{}-BINARY'.format(cipher)
+ print_test_results(fsize, tmrnp, tmgpg, testname)
+
+ def large_file_armored_encryption(self):
+ '''
+ Large file armored encryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('large')
+ tmrnp = run_iterated(iterations, rnp_symencrypt_file, infile, rnpout,
+ 'AES128', 0, 'zip', True)
+ tmgpg = run_iterated(iterations, gpg_symencrypt_file, infile, gpgout, 'AES128', 0, 1, True)
+ print_test_results(fsize, tmrnp, tmgpg, 'ENCRYPT-LARGE-ARMOR')
+
+ def small_file_symmetric_decryption(self):
+ '''
+ Small file symmetric decryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('small')
+ inenc = infile + '.enc'
+ for armor in [False, True]:
+ gpg_symencrypt_file(infile, inenc, 'AES', 0, 1, armor)
+ tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout)
+ tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD)
+ testname = 'DECRYPT-SMALL-{}'.format('ARMOR' if armor else 'BINARY')
+ print_test_results(fsize, tmrnp, tmgpg, testname)
+ os.remove(inenc)
+
+ def large_file_symmetric_decryption(self):
+ '''
+ Large file symmetric decryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('large')
+ inenc = infile + '.enc'
+ for cipher in ['AES128', 'AES192', 'AES256', 'TWOFISH', 'BLOWFISH', 'CAST5',
+ 'CAMELLIA128', 'CAMELLIA192', 'CAMELLIA256']:
+ gpg_symencrypt_file(infile, inenc, cipher, 0, 1, False)
+ tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout)
+ tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD)
+ testname = 'DECRYPT-{}-BINARY'.format(cipher)
+ print_test_results(fsize, tmrnp, tmgpg, testname)
+ os.remove(inenc)
+
+ def large_file_armored_decryption(self):
+ '''
+ Large file armored decryption
+ '''
+ infile, rnpout, gpgout, iterations, fsize = get_file_params('large')
+ inenc = infile + '.enc'
+ gpg_symencrypt_file(infile, inenc, 'AES128', 0, 1, True)
+ tmrnp = run_iterated(iterations, rnp_decrypt_file, inenc, rnpout)
+ tmgpg = run_iterated(iterations, gpg_decrypt_file, inenc, gpgout, PASSWORD)
+ print_test_results(fsize, tmrnp, tmgpg, 'DECRYPT-LARGE-ARMOR')
+ os.remove(inenc)
+
+ # 3. Signing
+ #print '\n#3. Signing\n'
+ # 4. Verification
+ #print '\n#4. Verification\n'
+ # 5. Cleartext signing
+ #print '\n#5. Cleartext signing and verification\n'
+ # 6. Detached signature
+ #print '\n#6. Detached signing and verification\n'
+
+# Usage ./cli_perf.py [working_directory]
+#
+# It's better to use RAMDISK to perform tests
+# in order to speed up disk reads/writes
+#
+# On linux:
+# mkdir -p /tmp/working
+# sudo mount -t tmpfs -o size=512m tmpfs /tmp/working
+# ./cli_perf.py -w /tmp/working
+# sudo umount /tmp/working
+
+
+if __name__ == '__main__':
+
+ # parse options
+ parser = ArgumentParser(description="RNP benchmarking")
+ parser.add_argument("-b", "--bench", dest="benchmarks",
+ help="Name of the comma-separated benchmarks to run", metavar="benchmarks")
+ parser.add_argument("-w", "--workdir", dest="workdir",
+ help="Working directory to use", metavar="workdir")
+ parser.add_argument("-l", "--list", help="Print list of available benchmarks and exit",
+ action="store_true")
+ args = parser.parse_args()
+
+ # get list of benchamrks to run
+ bench_methods = [ x[0] for x in inspect.getmembers(Benchmark,
+ predicate=lambda x: inspect.ismethod(x) or inspect.isfunction(x))]
+ print(bench_methods)
+
+ if args.list:
+ for name in bench_methods:
+ logging.info(("\t " + name))
+ sys.exit(0)
+
+ if args.benchmarks:
+ bench_methods = filter(lambda x: x in args.benchmarks.split(","), bench_methods)
+
+ # setup operations
+ setup(args.workdir)
+
+ for name in bench_methods:
+ method = getattr(Benchmark, name)
+ logging.info(("\n" + name + "(): " + inspect.getdoc(method)))
+ method(Benchmark())
+
+ try:
+ shutil.rmtree(WORKDIR)
+ except Exception:
+ logging.info(("Cleanup failed"))