summaryrefslogtreecommitdiffstats
path: root/mycli/config.py
diff options
context:
space:
mode:
Diffstat (limited to 'mycli/config.py')
-rw-r--r--mycli/config.py104
1 files changed, 81 insertions, 23 deletions
diff --git a/mycli/config.py b/mycli/config.py
index e0f2d1f..5d71109 100644
--- a/mycli/config.py
+++ b/mycli/config.py
@@ -1,5 +1,3 @@
-import io
-import shutil
from copy import copy
from io import BytesIO, TextIOWrapper
import logging
@@ -7,11 +5,16 @@ import os
from os.path import exists
import struct
import sys
-from typing import Union
+from typing import Union, IO
from configobj import ConfigObj, ConfigObjError
-from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
-from cryptography.hazmat.backends import default_backend
+import pyaes
+
+try:
+ import importlib.resources as resources
+except ImportError:
+ # Python < 3.7
+ import importlib_resources as resources
try:
basestring
@@ -49,9 +52,9 @@ def read_config_file(f, list_values=True):
config = ConfigObj(f, interpolation=False, encoding='utf8',
list_values=list_values)
except ConfigObjError as e:
- log(logger, logging.ERROR, "Unable to parse line {0} of config file "
+ log(logger, logging.WARNING, "Unable to parse line {0} of config file "
"'{1}'.".format(e.line_number, f))
- log(logger, logging.ERROR, "Using successfully parsed config values.")
+ log(logger, logging.WARNING, "Using successfully parsed config values.")
return e.config
except (IOError, OSError) as e:
log(logger, logging.WARNING, "You don't have permission to read "
@@ -61,7 +64,7 @@ def read_config_file(f, list_values=True):
return config
-def get_included_configs(config_file: Union[str, io.TextIOWrapper]) -> list:
+def get_included_configs(config_file: Union[str, TextIOWrapper]) -> list:
"""Get a list of configuration files that are included into config_path
with !includedir directive.
@@ -95,7 +98,7 @@ def get_included_configs(config_file: Union[str, io.TextIOWrapper]) -> list:
def read_config_files(files, list_values=True):
"""Read and merge a list of config files."""
- config = ConfigObj(list_values=list_values)
+ config = create_default_config(list_values=list_values)
_files = copy(files)
while _files:
_file = _files.pop(0)
@@ -112,12 +115,21 @@ def read_config_files(files, list_values=True):
return config
-def write_default_config(source, destination, overwrite=False):
+def create_default_config(list_values=True):
+ import mycli
+ default_config_file = resources.open_text(mycli, 'myclirc')
+ return read_config_file(default_config_file, list_values=list_values)
+
+
+def write_default_config(destination, overwrite=False):
+ import mycli
+ default_config = resources.read_text(mycli, 'myclirc')
destination = os.path.expanduser(destination)
if not overwrite and exists(destination):
return
- shutil.copyfile(source, destination)
+ with open(destination, 'w') as f:
+ f.write(default_config)
def get_mylogin_cnf_path():
@@ -160,6 +172,58 @@ def open_mylogin_cnf(name):
return TextIOWrapper(plaintext)
+# TODO reuse code between encryption an decryption
+def encrypt_mylogin_cnf(plaintext: IO[str]):
+ """Encryption of .mylogin.cnf file, analogous to calling
+ mysql_config_editor.
+
+ Code is based on the python implementation by Kristian Koehntopp
+ https://github.com/isotopp/mysql-config-coder
+
+ """
+ def realkey(key):
+ """Create the AES key from the login key."""
+ rkey = bytearray(16)
+ for i in range(len(key)):
+ rkey[i % 16] ^= key[i]
+ return bytes(rkey)
+
+ def encode_line(plaintext, real_key, buf_len):
+ aes = pyaes.AESModeOfOperationECB(real_key)
+ text_len = len(plaintext)
+ pad_len = buf_len - text_len
+ pad_chr = bytes(chr(pad_len), "utf8")
+ plaintext = plaintext.encode() + pad_chr * pad_len
+ encrypted_text = b''.join(
+ [aes.encrypt(plaintext[i: i + 16])
+ for i in range(0, len(plaintext), 16)]
+ )
+ return encrypted_text
+
+ LOGIN_KEY_LENGTH = 20
+ key = os.urandom(LOGIN_KEY_LENGTH)
+ real_key = realkey(key)
+
+ outfile = BytesIO()
+
+ outfile.write(struct.pack("i", 0))
+ outfile.write(key)
+
+ while True:
+ line = plaintext.readline()
+ if not line:
+ break
+ real_len = len(line)
+ pad_len = (int(real_len / 16) + 1) * 16
+
+ outfile.write(struct.pack("i", pad_len))
+ x = encode_line(line, real_key, pad_len)
+ outfile.write(x)
+
+ outfile.seek(0)
+ return outfile
+
+
def read_and_decrypt_mylogin_cnf(f):
"""Read and decrypt the contents of .mylogin.cnf.
@@ -201,11 +265,9 @@ def read_and_decrypt_mylogin_cnf(f):
return None
rkey = struct.pack('16B', *rkey)
- # Create a decryptor object using the key.
- decryptor = _get_decryptor(rkey)
-
# Create a bytes buffer to hold the plaintext.
plaintext = BytesIO()
+ aes = pyaes.AESModeOfOperationECB(rkey)
while True:
# Read the length of the ciphertext.
@@ -216,7 +278,10 @@ def read_and_decrypt_mylogin_cnf(f):
# Read cipher_len bytes from the file and decrypt.
cipher = f.read(cipher_len)
- plain = _remove_pad(decryptor.update(cipher))
+ plain = _remove_pad(
+ b''.join([aes.decrypt(cipher[i: i + 16])
+ for i in range(0, cipher_len, 16)])
+ )
if plain is False:
continue
plaintext.write(plain)
@@ -244,7 +309,7 @@ def str_to_bool(s):
elif s.lower() in false_values:
return False
else:
- raise ValueError('not a recognized boolean value: %s'.format(s))
+ raise ValueError('not a recognized boolean value: {0}'.format(s))
def strip_matching_quotes(s):
@@ -260,15 +325,8 @@ def strip_matching_quotes(s):
return s
-def _get_decryptor(key):
- """Get the AES decryptor."""
- c = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
- return c.decryptor()
-
-
def _remove_pad(line):
"""Remove the pad from the *line*."""
- pad_length = ord(line[-1:])
try:
# Determine pad length.
pad_length = ord(line[-1:])