From 009d0b0f17cc82919a683a1ecb6a334f5354090d Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Mon, 8 Mar 2021 07:40:40 +0100 Subject: Merging upstream version 1.24.1. Signed-off-by: Daniel Baumann --- mycli/config.py | 104 +++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 81 insertions(+), 23 deletions(-) (limited to 'mycli/config.py') diff --git a/mycli/config.py b/mycli/config.py index e0f2d1f..5d71109 100644 --- a/mycli/config.py +++ b/mycli/config.py @@ -1,5 +1,3 @@ -import io -import shutil from copy import copy from io import BytesIO, TextIOWrapper import logging @@ -7,11 +5,16 @@ import os from os.path import exists import struct import sys -from typing import Union +from typing import Union, IO from configobj import ConfigObj, ConfigObjError -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend +import pyaes + +try: + import importlib.resources as resources +except ImportError: + # Python < 3.7 + import importlib_resources as resources try: basestring @@ -49,9 +52,9 @@ def read_config_file(f, list_values=True): config = ConfigObj(f, interpolation=False, encoding='utf8', list_values=list_values) except ConfigObjError as e: - log(logger, logging.ERROR, "Unable to parse line {0} of config file " + log(logger, logging.WARNING, "Unable to parse line {0} of config file " "'{1}'.".format(e.line_number, f)) - log(logger, logging.ERROR, "Using successfully parsed config values.") + log(logger, logging.WARNING, "Using successfully parsed config values.") return e.config except (IOError, OSError) as e: log(logger, logging.WARNING, "You don't have permission to read " @@ -61,7 +64,7 @@ def read_config_file(f, list_values=True): return config -def get_included_configs(config_file: Union[str, io.TextIOWrapper]) -> list: +def get_included_configs(config_file: Union[str, TextIOWrapper]) -> list: """Get a list of configuration files that are included into config_path with !includedir directive. @@ -95,7 +98,7 @@ def get_included_configs(config_file: Union[str, io.TextIOWrapper]) -> list: def read_config_files(files, list_values=True): """Read and merge a list of config files.""" - config = ConfigObj(list_values=list_values) + config = create_default_config(list_values=list_values) _files = copy(files) while _files: _file = _files.pop(0) @@ -112,12 +115,21 @@ def read_config_files(files, list_values=True): return config -def write_default_config(source, destination, overwrite=False): +def create_default_config(list_values=True): + import mycli + default_config_file = resources.open_text(mycli, 'myclirc') + return read_config_file(default_config_file, list_values=list_values) + + +def write_default_config(destination, overwrite=False): + import mycli + default_config = resources.read_text(mycli, 'myclirc') destination = os.path.expanduser(destination) if not overwrite and exists(destination): return - shutil.copyfile(source, destination) + with open(destination, 'w') as f: + f.write(default_config) def get_mylogin_cnf_path(): @@ -160,6 +172,58 @@ def open_mylogin_cnf(name): return TextIOWrapper(plaintext) +# TODO reuse code between encryption an decryption +def encrypt_mylogin_cnf(plaintext: IO[str]): + """Encryption of .mylogin.cnf file, analogous to calling + mysql_config_editor. + + Code is based on the python implementation by Kristian Koehntopp + https://github.com/isotopp/mysql-config-coder + + """ + def realkey(key): + """Create the AES key from the login key.""" + rkey = bytearray(16) + for i in range(len(key)): + rkey[i % 16] ^= key[i] + return bytes(rkey) + + def encode_line(plaintext, real_key, buf_len): + aes = pyaes.AESModeOfOperationECB(real_key) + text_len = len(plaintext) + pad_len = buf_len - text_len + pad_chr = bytes(chr(pad_len), "utf8") + plaintext = plaintext.encode() + pad_chr * pad_len + encrypted_text = b''.join( + [aes.encrypt(plaintext[i: i + 16]) + for i in range(0, len(plaintext), 16)] + ) + return encrypted_text + + LOGIN_KEY_LENGTH = 20 + key = os.urandom(LOGIN_KEY_LENGTH) + real_key = realkey(key) + + outfile = BytesIO() + + outfile.write(struct.pack("i", 0)) + outfile.write(key) + + while True: + line = plaintext.readline() + if not line: + break + real_len = len(line) + pad_len = (int(real_len / 16) + 1) * 16 + + outfile.write(struct.pack("i", pad_len)) + x = encode_line(line, real_key, pad_len) + outfile.write(x) + + outfile.seek(0) + return outfile + + def read_and_decrypt_mylogin_cnf(f): """Read and decrypt the contents of .mylogin.cnf. @@ -201,11 +265,9 @@ def read_and_decrypt_mylogin_cnf(f): return None rkey = struct.pack('16B', *rkey) - # Create a decryptor object using the key. - decryptor = _get_decryptor(rkey) - # Create a bytes buffer to hold the plaintext. plaintext = BytesIO() + aes = pyaes.AESModeOfOperationECB(rkey) while True: # Read the length of the ciphertext. @@ -216,7 +278,10 @@ def read_and_decrypt_mylogin_cnf(f): # Read cipher_len bytes from the file and decrypt. cipher = f.read(cipher_len) - plain = _remove_pad(decryptor.update(cipher)) + plain = _remove_pad( + b''.join([aes.decrypt(cipher[i: i + 16]) + for i in range(0, cipher_len, 16)]) + ) if plain is False: continue plaintext.write(plain) @@ -244,7 +309,7 @@ def str_to_bool(s): elif s.lower() in false_values: return False else: - raise ValueError('not a recognized boolean value: %s'.format(s)) + raise ValueError('not a recognized boolean value: {0}'.format(s)) def strip_matching_quotes(s): @@ -260,15 +325,8 @@ def strip_matching_quotes(s): return s -def _get_decryptor(key): - """Get the AES decryptor.""" - c = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend()) - return c.decryptor() - - def _remove_pad(line): """Remove the pad from the *line*.""" - pad_length = ord(line[-1:]) try: # Determine pad length. pad_length = ord(line[-1:]) -- cgit v1.2.3