From 06eaf7232e9a920468c0f8d74dcf2fe8b555501c Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 13 Apr 2024 14:24:36 +0200 Subject: Adding upstream version 1:10.11.6. Signed-off-by: Daniel Baumann --- storage/rocksdb/myrocks_hotbackup.py | 697 +++++++++++++++++++++++++++++++++++ 1 file changed, 697 insertions(+) create mode 100755 storage/rocksdb/myrocks_hotbackup.py (limited to 'storage/rocksdb/myrocks_hotbackup.py') diff --git a/storage/rocksdb/myrocks_hotbackup.py b/storage/rocksdb/myrocks_hotbackup.py new file mode 100755 index 00000000..42c25c95 --- /dev/null +++ b/storage/rocksdb/myrocks_hotbackup.py @@ -0,0 +1,697 @@ +#!@PYTHON_SHEBANG@ + +from __future__ import division +from optparse import OptionParser +import collections +import signal +import os +import stat +import sys +import re +import subprocess +import logging +import logging.handlers +import time +import datetime +import shutil +import traceback +import tempfile + +import MySQLdb +import MySQLdb.connections +from MySQLdb import OperationalError, ProgrammingError + +logger = None +opts = None +rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS'] +rocksdb_data_suffix = '.sst' +rocksdb_wal_suffix = '.log' +exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info', + 'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash'] +wdt_bin = 'wdt' + +def is_manifest(fname): + for m in rocksdb_files: + if fname.startswith(m): + return True + return False + +class Writer(object): + a = None + def __init__(self): + a = None + +class StreamWriter(Writer): + stream_cmd= '' + + def __init__(self, stream_option, direct = 0): + super(StreamWriter, self).__init__() + if stream_option == 'tar': + self.stream_cmd= 'tar chf -' + elif stream_option == 'xbstream': + self.stream_cmd= 'xbstream -c' + if direct: + self.stream_cmd = self.stream_cmd + ' -d' + else: + raise Exception("Only tar or xbstream is supported as streaming option.") + + def write(self, file_name): + rc= os.system(self.stream_cmd + " " + file_name) + if (rc != 0): + raise Exception("Got error on stream write: " + str(rc) + " " + file_name) + + +class MiscFilesProcessor(): + datadir = None + wildcard = r'.*\.[frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par]' + regex = None + start_backup_time = None + skip_check_frm_timestamp = None + + def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time): + self.datadir = datadir + self.regex = re.compile(self.wildcard) + self.skip_check_frm_timestamp = skip_check_frm_timestamp + self.start_backup_time = start_backup_time + + def process_db(self, db): + # do nothing + pass + + def process_file(self, path): + # do nothing + pass + + def check_frm_timestamp(self, fname, path): + if not self.skip_check_frm_timestamp and fname.endswith('.frm'): + if os.path.getmtime(path) > self.start_backup_time: + logger.error('FRM file %s was updated after starting backups. ' + 'Schema could have changed and the resulting copy may ' + 'not be valid. Aborting. ' + '(backup time: %s, file modifled time: %s)', + path, datetime.datetime.fromtimestamp(self.start_backup_time).strftime('%Y-%m-%d %H:%M:%S'), + datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime('%Y-%m-%d %H:%M:%S')) + raise Exception("Inconsistent frm file timestamp"); + + def process(self): + os.chdir(self.datadir) + for db in self.get_databases(): + logger.info("Starting MySQL misc file traversal from database %s..", db) + self.process_db(db) + for f in self.get_files(db): + if self.match(f): + rel_path = os.path.join(db, f) + self.check_frm_timestamp(f, rel_path) + self.process_file(rel_path) + logger.info("Traversing misc files from data directory..") + for f in self.get_files(""): + should_skip = False + for e in exclude_files: + if f.startswith(e) or f.endswith(e): + logger.info("Skipping %s", f) + should_skip = True + break + if not should_skip: + self.process_file(f) + + def match(self, filename): + if self.regex.match(filename): + return True + else: + return False + + def get_databases(self): + dbs = [] + dirs = [ d for d in os.listdir(self.datadir) \ + if not os.path.isfile(os.path.join(self.datadir,d))] + for db in dirs: + if not db.startswith('.') and not self._is_socket(db) and not db == "#rocksdb": + dbs.append(db) + return dbs + + def get_files(self, db): + dbdir = self.datadir + "/" + db + return [ f for f in os.listdir(dbdir) \ + if os.path.isfile(os.path.join(dbdir,f))] + + def _is_socket(self, item): + mode = os.stat(os.path.join(self.datadir, item)).st_mode + if stat.S_ISSOCK(mode): + return True + return False + + +class MySQLBackup(MiscFilesProcessor): + writer = None + + def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time): + MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time) + self.writer = writer + + def process_file(self, fname): # overriding base class + self.writer.write(fname) + + +class MiscFilesLinkCreator(MiscFilesProcessor): + snapshot_dir = None + + def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time): + MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time) + self.snapshot_dir = snapshot_dir + + def process_db(self, db): + snapshot_sub_dir = os.path.join(self.snapshot_dir, db) + os.makedirs(snapshot_sub_dir) + + def process_file(self, path): + dst_path = os.path.join(self.snapshot_dir, path) + os.link(path, dst_path) + + +# RocksDB backup +class RocksDBBackup(): + source_dir = None + writer = None + # sst files sent in this backup round + sent_sst = {} + # target sst files in this backup round + target_sst = {} + # sst files sent in all backup rounds + total_sent_sst= {} + # sum of sst file size sent in this backup round + sent_sst_size = 0 + # sum of target sst file size in this backup round + # if sent_sst_size becomes equal to target_sst_size, + # it means the backup round finished backing up all sst files + target_sst_size = 0 + # sum of all sst file size sent all backup rounds + total_sent_sst_size= 0 + # sum of all target sst file size from all backup rounds + total_target_sst_size = 0 + show_progress_size_interval= 1073741824 # 1GB + wal_files= [] + manifest_files= [] + finished= False + + def __init__(self, source_dir, writer, prev): + self.source_dir = source_dir + self.writer = writer + os.chdir(self.source_dir) + self.init_target_files(prev) + + def init_target_files(self, prev): + sst = {} + self.sent_sst = {} + self.target_sst= {} + self.total_sent_sst = {} + self.sent_sst_size = 0 + self.target_sst_size = 0 + self.total_sent_sst_size= 0 + self.total_target_sst_size= 0 + self.wal_files= [] + self.manifest_files= [] + + for f in os.listdir(self.source_dir): + if f.endswith(rocksdb_data_suffix): + # exactly the same file (same size) was sent in previous backup rounds + if prev is not None and f in prev.total_sent_sst and int(os.stat(f).st_size) == prev.total_sent_sst[f]: + continue + sst[f]= int(os.stat(f).st_size) + self.target_sst_size = self.target_sst_size + os.stat(f).st_size + elif is_manifest(f): + self.manifest_files.append(f) + elif f.endswith(rocksdb_wal_suffix): + self.wal_files.append(f) + self.target_sst= collections.OrderedDict(sorted(sst.items())) + + if prev is not None: + self.total_sent_sst = prev.total_sent_sst + self.total_sent_sst_size = prev.total_sent_sst_size + self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size + else: + self.total_target_sst_size = self.target_sst_size + + def do_backup_single(self, fname): + self.writer.write(fname) + os.remove(fname) + + def do_backup_sst(self, fname, size): + self.do_backup_single(fname) + self.sent_sst[fname]= size + self.total_sent_sst[fname]= size + self.sent_sst_size = self.sent_sst_size + size + self.total_sent_sst_size = self.total_sent_sst_size + size + + def do_backup_manifest(self): + for f in self.manifest_files: + self.do_backup_single(f) + + def do_backup_wal(self): + for f in self.wal_files: + self.do_backup_single(f) + + # this is the last snapshot round. backing up all the rest files + def do_backup_final(self): + logger.info("Backup WAL..") + self.do_backup_wal() + logger.info("Backup Manifest..") + self.do_backup_manifest() + self.do_cleanup() + self.finished= True + + def do_cleanup(self): + shutil.rmtree(self.source_dir) + logger.info("Cleaned up checkpoint from %s", self.source_dir) + + def do_backup_until(self, time_limit): + logger.info("Starting backup from snapshot: target files %d", len(self.target_sst)) + start_time= time.time() + last_progress_time= start_time + progress_size= 0 + for fname, size in self.target_sst.iteritems(): + self.do_backup_sst(fname, size) + progress_size= progress_size + size + elapsed_seconds = time.time() - start_time + progress_seconds = time.time() - last_progress_time + + if self.should_show_progress(size): + self.show_progress(progress_size, progress_seconds) + progress_size=0 + last_progress_time= time.time() + + if elapsed_seconds > time_limit and self.has_sent_all_sst() is False: + logger.info("Snapshot round finished. Elapsed Time: %5.2f. Remaining sst files: %d", + elapsed_seconds, len(self.target_sst) - len(self.sent_sst)) + self.do_cleanup() + break; + if self.has_sent_all_sst(): + self.do_backup_final() + + return self + + def should_show_progress(self, size): + if int(self.total_sent_sst_size/self.show_progress_size_interval) > int((self.total_sent_sst_size-size)/self.show_progress_size_interval): + return True + else: + return False + + def show_progress(self, size, seconds): + logger.info("Backup Progress: %5.2f%% Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s", + self.total_sent_sst_size*100/self.total_target_sst_size, + self.total_sent_sst_size/1024/1024/1024, + self.total_target_sst_size/1024/1024/1024, + size/seconds/1024/1024) + + def print_backup_report(self): + logger.info("Sent %6.2f GB of sst files, %d files in total.", + self.total_sent_sst_size/1024/1024/1024, + len(self.total_sent_sst)) + + def has_sent_all_sst(self): + if self.sent_sst_size == self.target_sst_size: + return True + return False + + +class MySQLUtil: + @staticmethod + def connect(user, password, port, socket=None): + if socket: + dbh = MySQLdb.Connect(user=user, + passwd=password, + unix_socket=socket) + else: + dbh = MySQLdb.Connect(user=user, + passwd=password, + port=port, + host="127.0.0.1") + return dbh + + @staticmethod + def create_checkpoint(dbh, checkpoint_dir): + sql = ("SET GLOBAL rocksdb_create_checkpoint='{0}'" + .format(checkpoint_dir)) + cur= dbh.cursor() + cur.execute(sql) + cur.close() + + @staticmethod + def get_datadir(dbh): + sql = "SELECT @@datadir" + cur = dbh.cursor() + cur.execute(sql) + row = cur.fetchone() + return row[0] + + @staticmethod + def is_directio_enabled(dbh): + sql = "SELECT @@global.rocksdb_use_direct_reads" + cur = dbh.cursor() + cur.execute(sql) + row = cur.fetchone() + return row[0] + +class BackupRunner: + datadir = None + start_backup_time = None + + def __init__(self, datadir): + self.datadir = datadir + self.start_backup_time = time.time() + + def start_backup_round(self, backup_round, prev_backup): + def signal_handler(*args): + logger.info("Got signal. Exit") + if b is not None: + logger.info("Cleaning up snapshot directory..") + b.do_cleanup() + sys.exit(1) + + b = None + try: + signal.signal(signal.SIGINT, signal_handler) + w = None + if not opts.output_stream: + raise Exception("Currently only streaming backup is supported.") + + snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round) + dbh = MySQLUtil.connect(opts.mysql_user, + opts.mysql_password, + opts.mysql_port, + opts.mysql_socket) + direct = MySQLUtil.is_directio_enabled(dbh) + logger.info("Direct I/O: %d", direct) + + w = StreamWriter(opts.output_stream, direct) + + if not self.datadir: + self.datadir = MySQLUtil.get_datadir(dbh) + logger.info("Set datadir: %s", self.datadir) + logger.info("Creating checkpoint at %s", snapshot_dir) + MySQLUtil.create_checkpoint(dbh, snapshot_dir) + logger.info("Created checkpoint at %s", snapshot_dir) + b = RocksDBBackup(snapshot_dir, w, prev_backup) + return b.do_backup_until(opts.checkpoint_interval) + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + if b is not None: + logger.info("Cleaning up snapshot directory.") + b.do_cleanup() + sys.exit(1) + + def backup_mysql(self): + try: + w = None + if opts.output_stream: + w = StreamWriter(opts.output_stream) + else: + raise Exception("Currently only streaming backup is supported.") + b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp, + self.start_backup_time) + logger.info("Taking MySQL misc backups..") + b.process() + logger.info("MySQL misc backups done.") + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + sys.exit(1) + + +class WDTBackup: + datadir = None + start_backup_time = None + + def __init__(self, datadir): + self.datadir = datadir + self.start_backup_time = time.time() + + def cleanup(self, snapshot_dir, server_log): + if server_log: + server_log.seek(0) + logger.info("WDT server log:") + logger.info(server_log.read()) + server_log.close() + if snapshot_dir: + logger.info("Cleaning up snapshot dir %s", snapshot_dir) + shutil.rmtree(snapshot_dir) + + def backup_with_timeout(self, backup_round): + def signal_handler(*args): + logger.info("Got signal. Exit") + self.cleanup(snapshot_dir, server_log) + sys.exit(1) + + logger.info("Starting backup round %d", backup_round) + snapshot_dir = None + server_log = None + try: + signal.signal(signal.SIGINT, signal_handler) + # create rocksdb snapshot + snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round)) + dbh = MySQLUtil.connect(opts.mysql_user, + opts.mysql_password, + opts.mysql_port, + opts.mysql_socket) + logger.info("Creating checkpoint at %s", snapshot_dir) + MySQLUtil.create_checkpoint(dbh, snapshot_dir) + logger.info("Created checkpoint at %s", snapshot_dir) + + # get datadir if not provided + if not self.datadir: + self.datadir = MySQLUtil.get_datadir(dbh) + logger.info("Set datadir: %s", self.datadir) + + # create links for misc files + link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir, + opts.skip_check_frm_timestamp, + self.start_backup_time) + link_creator.process() + + current_path = os.path.join(opts.backupdir, "CURRENT") + + # construct receiver cmd, using the data directory as recovery-id. + # we delete the current file because it is not append-only, therefore not + # resumable. + remote_cmd = ( + "ssh {0} rm -f {1}; " + "{2} -directory {3} -enable_download_resumption " + "-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}" + ).format(opts.destination, + current_path, + wdt_bin, + opts.backupdir, + self.datadir, + opts.checkpoint_interval, + opts.extra_wdt_receiver_options) + logger.info("WDT remote cmd %s", remote_cmd) + server_log = tempfile.TemporaryFile() + remote_process = subprocess.Popen(remote_cmd.split(), + stdout=subprocess.PIPE, + stderr=server_log) + wdt_url = remote_process.stdout.readline().strip() + if not wdt_url: + raise Exception("Unable to get connection url from wdt receiver") + sender_cmd = ( + "{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks " + "-avg_mbytes_per_sec {3} " + "-enable_download_resumption -abort_after_seconds {4} {5}" + ).format(wdt_bin, + wdt_url, + snapshot_dir, + opts.avg_mbytes_per_sec, + opts.checkpoint_interval, + opts.extra_wdt_sender_options) + sender_status = os.system(sender_cmd) >> 8 + remote_status = remote_process.wait() + self.cleanup(snapshot_dir, server_log) + # TODO: handle retryable and non-retyable errors differently + return (sender_status == 0 and remote_status == 0) + + except Exception as e: + logger.error(e) + logger.error(traceback.format_exc()) + self.cleanup(snapshot_dir, server_log) + sys.exit(1) + + +def backup_using_wdt(): + if not opts.destination: + logger.error("Must provide remote destination when using WDT") + sys.exit(1) + + # TODO: detect whether WDT is installed + logger.info("Backing up myrocks to %s using WDT", opts.destination) + wdt_backup = WDTBackup(opts.datadir) + finished = False + backup_round = 1 + while not finished: + start_time = time.time() + finished = wdt_backup.backup_with_timeout(backup_round) + end_time = time.time() + duration_seconds = end_time - start_time + if (not finished) and (duration_seconds < opts.checkpoint_interval): + # round finished before timeout + sleep_duration = (opts.checkpoint_interval - duration_seconds) + logger.info("Sleeping for %f seconds", sleep_duration) + time.sleep(sleep_duration) + + backup_round = backup_round + 1 + logger.info("Finished myrocks backup using WDT") + + +def init_logger(): + global logger + logger = logging.getLogger('myrocks_hotbackup') + logger.setLevel(logging.INFO) + h1= logging.StreamHandler(sys.stderr) + f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s", + "%Y-%m-%d %H:%M:%S") + h1.setFormatter(f) + logger.addHandler(h1) + +backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup " + "--user=root --password=pw --stream=wdt " + "--checkpoint_dir= --destination= --backup_dir=" + ". This has to be executed at the src " + "host.") +backup_usage= "Backup: set -o pipefail; myrocks_hotbackup --user=root --password=pw --port=3306 --checkpoint_dir= | ssh -o NoneEnabled=yes remote_server 'tar -xi -C ' . You need to execute backup command on a server where you take backups." +move_back_usage= "Move-Back: myrocks_hotbackup --move_back --datadir= --rocksdb_datadir= --rocksdb_waldir= --backup_dir= . You need to execute move-back command on a server where backup files are sent." + + +def parse_options(): + global opts + parser = OptionParser(usage = "\n\n" + backup_usage + "\n\n" + \ + backup_wdt_usage + "\n\n" + move_back_usage) + parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval', + default=300, + help='Number of seconds to renew checkpoint') + parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory', + default='/data/mysql/backup/snapshot', + help='Local directory name where checkpoints will be created.') + parser.add_option('-d', '--datadir', type='string', dest='datadir', + default=None, + help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir') + parser.add_option('-s', '--stream', type='string', dest='output_stream', + default='tar', + help='Setting streaming backup options. Currently tar, WDT ' + 'and xbstream are supported. Default is tar') + parser.add_option('--destination', type='string', dest='destination', + default='', + help='Remote server name. Only used for WDT mode so far.') + parser.add_option('--avg_mbytes_per_sec', type='int', + dest='avg_mbytes_per_sec', + default=500, + help='Average backup rate in MBytes/sec. WDT only.') + parser.add_option('--extra_wdt_sender_options', type='string', + dest='extra_wdt_sender_options', + default='', + help='Extra options for WDT sender') + parser.add_option('--extra_wdt_receiver_options', type='string', + dest='extra_wdt_receiver_options', + default='', + help='Extra options for WDT receiver') + parser.add_option('-u', '--user', type='string', dest='mysql_user', + default='root', + help='MySQL user name') + parser.add_option('-p', '--password', type='string', dest='mysql_password', + default='', + help='MySQL password name') + parser.add_option('-P', '--port', type='int', dest='mysql_port', + default=3306, + help='MySQL port number') + parser.add_option('-S', '--socket', type='string', dest='mysql_socket', + default=None, + help='MySQL socket path. Takes precedence over --port.') + parser.add_option('-m', '--move_back', action='store_true', dest='move_back', + default=False, + help='Moving MyRocks backup files to proper locations.') + parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir', + default=None, + help='RocksDB target data directory where backup data files will be moved. Must be empty.') + parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir', + default=None, + help='RocksDB target data directory where backup wal files will be moved. Must be empty.') + parser.add_option('-b', '--backup_dir', type='string', dest='backupdir', + default=None, + help='backup mode for WDT: Remote directory to store ' + 'backup. move_back mode: Locations where backup ' + 'files are stored.') + parser.add_option('-f', '--skip_check_frm_timestamp', + dest='skip_check_frm_timestamp', + action='store_true', default=False, + help='skipping to check if frm files are updated after starting backup.') + parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file', + default=None, + help='debugging purpose: waiting until the specified file is created') + + opts, args = parser.parse_args() + + +def create_moveback_dir(directory): + if not os.path.exists(directory): + os.makedirs(directory) + else: + for f in os.listdir(directory): + logger.error("Directory %s has file or directory %s!", directory, f) + raise + +def print_move_back_usage(): + logger.warning(move_back_usage) + +def move_back(): + if opts.rocksdb_datadir is None or opts.rocksdb_waldir is None or opts.backupdir is None or opts.datadir is None: + print_move_back_usage() + sys.exit() + create_moveback_dir(opts.datadir) + create_moveback_dir(opts.rocksdb_datadir) + create_moveback_dir(opts.rocksdb_waldir) + + os.chdir(opts.backupdir) + for f in os.listdir(opts.backupdir): + if os.path.isfile(os.path.join(opts.backupdir,f)): + if f.endswith(rocksdb_wal_suffix): + shutil.move(f, opts.rocksdb_waldir) + elif f.endswith(rocksdb_data_suffix) or is_manifest(f): + shutil.move(f, opts.rocksdb_datadir) + else: + shutil.move(f, opts.datadir) + else: #directory + if f.endswith('.rocksdb'): + continue + shutil.move(f, opts.datadir) + +def start_backup(): + logger.info("Starting backup.") + runner = BackupRunner(opts.datadir) + b = None + backup_round= 1 + while True: + b = runner.start_backup_round(backup_round, b) + backup_round = backup_round + 1 + if b.finished is True: + b.print_backup_report() + logger.info("RocksDB Backup Done.") + break + if opts.debug_signal_file: + while not os.path.exists(opts.debug_signal_file): + logger.info("Waiting until %s is created..", opts.debug_signal_file) + time.sleep(1) + runner.backup_mysql() + logger.info("All Backups Done.") + + +def main(): + parse_options() + init_logger() + + if opts.move_back is True: + move_back() + elif opts.output_stream == 'wdt': + backup_using_wdt() + else: + start_backup() + +if __name__ == "__main__": + main() -- cgit v1.2.3