#!@PYTHON_SHEBANG@ from __future__ import division from optparse import OptionParser import collections import signal import os import stat import sys import re import subprocess import logging import logging.handlers import time import datetime import shutil import traceback import tempfile import MySQLdb import MySQLdb.connections from MySQLdb import OperationalError, ProgrammingError logger = None opts = None rocksdb_files = ['MANIFEST', 'CURRENT', 'OPTIONS'] rocksdb_data_suffix = '.sst' rocksdb_wal_suffix = '.log' exclude_files = ['master.info', 'relay-log.info', 'worker-relay-log.info', 'auto.cnf', 'gaplock.log', 'ibdata', 'ib_logfile', '.trash'] wdt_bin = 'wdt' def is_manifest(fname): for m in rocksdb_files: if fname.startswith(m): return True return False class Writer(object): a = None def __init__(self): a = None class StreamWriter(Writer): stream_cmd= '' def __init__(self, stream_option, direct = 0): super(StreamWriter, self).__init__() if stream_option == 'tar': self.stream_cmd= 'tar chf -' elif stream_option == 'xbstream': self.stream_cmd= 'xbstream -c' if direct: self.stream_cmd = self.stream_cmd + ' -d' else: raise Exception("Only tar or xbstream is supported as streaming option.") def write(self, file_name): rc= os.system(self.stream_cmd + " " + file_name) if (rc != 0): raise Exception("Got error on stream write: " + str(rc) + " " + file_name) class MiscFilesProcessor(): datadir = None wildcard = r'.*\.[frm|MYD|MYI|MAD|MAI|MRG|TRG|TRN|ARM|ARZ|CSM|CSV|opt|par]' regex = None start_backup_time = None skip_check_frm_timestamp = None def __init__(self, datadir, skip_check_frm_timestamp, start_backup_time): self.datadir = datadir self.regex = re.compile(self.wildcard) self.skip_check_frm_timestamp = skip_check_frm_timestamp self.start_backup_time = start_backup_time def process_db(self, db): # do nothing pass def process_file(self, path): # do nothing pass def check_frm_timestamp(self, fname, path): if not self.skip_check_frm_timestamp and fname.endswith('.frm'): if os.path.getmtime(path) > self.start_backup_time: logger.error('FRM file %s was updated after starting backups. ' 'Schema could have changed and the resulting copy may ' 'not be valid. Aborting. ' '(backup time: %s, file modifled time: %s)', path, datetime.datetime.fromtimestamp(self.start_backup_time).strftime('%Y-%m-%d %H:%M:%S'), datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime('%Y-%m-%d %H:%M:%S')) raise Exception("Inconsistent frm file timestamp"); def process(self): os.chdir(self.datadir) for db in self.get_databases(): logger.info("Starting MySQL misc file traversal from database %s..", db) self.process_db(db) for f in self.get_files(db): if self.match(f): rel_path = os.path.join(db, f) self.check_frm_timestamp(f, rel_path) self.process_file(rel_path) logger.info("Traversing misc files from data directory..") for f in self.get_files(""): should_skip = False for e in exclude_files: if f.startswith(e) or f.endswith(e): logger.info("Skipping %s", f) should_skip = True break if not should_skip: self.process_file(f) def match(self, filename): if self.regex.match(filename): return True else: return False def get_databases(self): dbs = [] dirs = [ d for d in os.listdir(self.datadir) \ if not os.path.isfile(os.path.join(self.datadir,d))] for db in dirs: if not db.startswith('.') and not self._is_socket(db) and not db == "#rocksdb": dbs.append(db) return dbs def get_files(self, db): dbdir = self.datadir + "/" + db return [ f for f in os.listdir(dbdir) \ if os.path.isfile(os.path.join(dbdir,f))] def _is_socket(self, item): mode = os.stat(os.path.join(self.datadir, item)).st_mode if stat.S_ISSOCK(mode): return True return False class MySQLBackup(MiscFilesProcessor): writer = None def __init__(self, datadir, writer, skip_check_frm_timestamp, start_backup_time): MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time) self.writer = writer def process_file(self, fname): # overriding base class self.writer.write(fname) class MiscFilesLinkCreator(MiscFilesProcessor): snapshot_dir = None def __init__(self, datadir, snapshot_dir, skip_check_frm_timestamp, start_backup_time): MiscFilesProcessor.__init__(self, datadir, skip_check_frm_timestamp, start_backup_time) self.snapshot_dir = snapshot_dir def process_db(self, db): snapshot_sub_dir = os.path.join(self.snapshot_dir, db) os.makedirs(snapshot_sub_dir) def process_file(self, path): dst_path = os.path.join(self.snapshot_dir, path) os.link(path, dst_path) # RocksDB backup class RocksDBBackup(): source_dir = None writer = None # sst files sent in this backup round sent_sst = {} # target sst files in this backup round target_sst = {} # sst files sent in all backup rounds total_sent_sst= {} # sum of sst file size sent in this backup round sent_sst_size = 0 # sum of target sst file size in this backup round # if sent_sst_size becomes equal to target_sst_size, # it means the backup round finished backing up all sst files target_sst_size = 0 # sum of all sst file size sent all backup rounds total_sent_sst_size= 0 # sum of all target sst file size from all backup rounds total_target_sst_size = 0 show_progress_size_interval= 1073741824 # 1GB wal_files= [] manifest_files= [] finished= False def __init__(self, source_dir, writer, prev): self.source_dir = source_dir self.writer = writer os.chdir(self.source_dir) self.init_target_files(prev) def init_target_files(self, prev): sst = {} self.sent_sst = {} self.target_sst= {} self.total_sent_sst = {} self.sent_sst_size = 0 self.target_sst_size = 0 self.total_sent_sst_size= 0 self.total_target_sst_size= 0 self.wal_files= [] self.manifest_files= [] for f in os.listdir(self.source_dir): if f.endswith(rocksdb_data_suffix): # exactly the same file (same size) was sent in previous backup rounds if prev is not None and f in prev.total_sent_sst and int(os.stat(f).st_size) == prev.total_sent_sst[f]: continue sst[f]= int(os.stat(f).st_size) self.target_sst_size = self.target_sst_size + os.stat(f).st_size elif is_manifest(f): self.manifest_files.append(f) elif f.endswith(rocksdb_wal_suffix): self.wal_files.append(f) self.target_sst= collections.OrderedDict(sorted(sst.items())) if prev is not None: self.total_sent_sst = prev.total_sent_sst self.total_sent_sst_size = prev.total_sent_sst_size self.total_target_sst_size = self.target_sst_size + prev.total_sent_sst_size else: self.total_target_sst_size = self.target_sst_size def do_backup_single(self, fname): self.writer.write(fname) os.remove(fname) def do_backup_sst(self, fname, size): self.do_backup_single(fname) self.sent_sst[fname]= size self.total_sent_sst[fname]= size self.sent_sst_size = self.sent_sst_size + size self.total_sent_sst_size = self.total_sent_sst_size + size def do_backup_manifest(self): for f in self.manifest_files: self.do_backup_single(f) def do_backup_wal(self): for f in self.wal_files: self.do_backup_single(f) # this is the last snapshot round. backing up all the rest files def do_backup_final(self): logger.info("Backup WAL..") self.do_backup_wal() logger.info("Backup Manifest..") self.do_backup_manifest() self.do_cleanup() self.finished= True def do_cleanup(self): shutil.rmtree(self.source_dir) logger.info("Cleaned up checkpoint from %s", self.source_dir) def do_backup_until(self, time_limit): logger.info("Starting backup from snapshot: target files %d", len(self.target_sst)) start_time= time.time() last_progress_time= start_time progress_size= 0 for fname, size in self.target_sst.iteritems(): self.do_backup_sst(fname, size) progress_size= progress_size + size elapsed_seconds = time.time() - start_time progress_seconds = time.time() - last_progress_time if self.should_show_progress(size): self.show_progress(progress_size, progress_seconds) progress_size=0 last_progress_time= time.time() if elapsed_seconds > time_limit and self.has_sent_all_sst() is False: logger.info("Snapshot round finished. Elapsed Time: %5.2f. Remaining sst files: %d", elapsed_seconds, len(self.target_sst) - len(self.sent_sst)) self.do_cleanup() break; if self.has_sent_all_sst(): self.do_backup_final() return self def should_show_progress(self, size): if int(self.total_sent_sst_size/self.show_progress_size_interval) > int((self.total_sent_sst_size-size)/self.show_progress_size_interval): return True else: return False def show_progress(self, size, seconds): logger.info("Backup Progress: %5.2f%% Sent %6.2f GB of %6.2f GB data, Transfer Speed: %6.2f MB/s", self.total_sent_sst_size*100/self.total_target_sst_size, self.total_sent_sst_size/1024/1024/1024, self.total_target_sst_size/1024/1024/1024, size/seconds/1024/1024) def print_backup_report(self): logger.info("Sent %6.2f GB of sst files, %d files in total.", self.total_sent_sst_size/1024/1024/1024, len(self.total_sent_sst)) def has_sent_all_sst(self): if self.sent_sst_size == self.target_sst_size: return True return False class MySQLUtil: @staticmethod def connect(user, password, port, socket=None): if socket: dbh = MySQLdb.Connect(user=user, passwd=password, unix_socket=socket) else: dbh = MySQLdb.Connect(user=user, passwd=password, port=port, host="127.0.0.1") return dbh @staticmethod def create_checkpoint(dbh, checkpoint_dir): sql = ("SET GLOBAL rocksdb_create_checkpoint='{0}'" .format(checkpoint_dir)) cur= dbh.cursor() cur.execute(sql) cur.close() @staticmethod def get_datadir(dbh): sql = "SELECT @@datadir" cur = dbh.cursor() cur.execute(sql) row = cur.fetchone() return row[0] @staticmethod def is_directio_enabled(dbh): sql = "SELECT @@global.rocksdb_use_direct_reads" cur = dbh.cursor() cur.execute(sql) row = cur.fetchone() return row[0] class BackupRunner: datadir = None start_backup_time = None def __init__(self, datadir): self.datadir = datadir self.start_backup_time = time.time() def start_backup_round(self, backup_round, prev_backup): def signal_handler(*args): logger.info("Got signal. Exit") if b is not None: logger.info("Cleaning up snapshot directory..") b.do_cleanup() sys.exit(1) b = None try: signal.signal(signal.SIGINT, signal_handler) w = None if not opts.output_stream: raise Exception("Currently only streaming backup is supported.") snapshot_dir = opts.checkpoint_directory + '/' + str(backup_round) dbh = MySQLUtil.connect(opts.mysql_user, opts.mysql_password, opts.mysql_port, opts.mysql_socket) direct = MySQLUtil.is_directio_enabled(dbh) logger.info("Direct I/O: %d", direct) w = StreamWriter(opts.output_stream, direct) if not self.datadir: self.datadir = MySQLUtil.get_datadir(dbh) logger.info("Set datadir: %s", self.datadir) logger.info("Creating checkpoint at %s", snapshot_dir) MySQLUtil.create_checkpoint(dbh, snapshot_dir) logger.info("Created checkpoint at %s", snapshot_dir) b = RocksDBBackup(snapshot_dir, w, prev_backup) return b.do_backup_until(opts.checkpoint_interval) except Exception as e: logger.error(e) logger.error(traceback.format_exc()) if b is not None: logger.info("Cleaning up snapshot directory.") b.do_cleanup() sys.exit(1) def backup_mysql(self): try: w = None if opts.output_stream: w = StreamWriter(opts.output_stream) else: raise Exception("Currently only streaming backup is supported.") b = MySQLBackup(self.datadir, w, opts.skip_check_frm_timestamp, self.start_backup_time) logger.info("Taking MySQL misc backups..") b.process() logger.info("MySQL misc backups done.") except Exception as e: logger.error(e) logger.error(traceback.format_exc()) sys.exit(1) class WDTBackup: datadir = None start_backup_time = None def __init__(self, datadir): self.datadir = datadir self.start_backup_time = time.time() def cleanup(self, snapshot_dir, server_log): if server_log: server_log.seek(0) logger.info("WDT server log:") logger.info(server_log.read()) server_log.close() if snapshot_dir: logger.info("Cleaning up snapshot dir %s", snapshot_dir) shutil.rmtree(snapshot_dir) def backup_with_timeout(self, backup_round): def signal_handler(*args): logger.info("Got signal. Exit") self.cleanup(snapshot_dir, server_log) sys.exit(1) logger.info("Starting backup round %d", backup_round) snapshot_dir = None server_log = None try: signal.signal(signal.SIGINT, signal_handler) # create rocksdb snapshot snapshot_dir = os.path.join(opts.checkpoint_directory, str(backup_round)) dbh = MySQLUtil.connect(opts.mysql_user, opts.mysql_password, opts.mysql_port, opts.mysql_socket) logger.info("Creating checkpoint at %s", snapshot_dir) MySQLUtil.create_checkpoint(dbh, snapshot_dir) logger.info("Created checkpoint at %s", snapshot_dir) # get datadir if not provided if not self.datadir: self.datadir = MySQLUtil.get_datadir(dbh) logger.info("Set datadir: %s", self.datadir) # create links for misc files link_creator = MiscFilesLinkCreator(self.datadir, snapshot_dir, opts.skip_check_frm_timestamp, self.start_backup_time) link_creator.process() current_path = os.path.join(opts.backupdir, "CURRENT") # construct receiver cmd, using the data directory as recovery-id. # we delete the current file because it is not append-only, therefore not # resumable. remote_cmd = ( "ssh {0} rm -f {1}; " "{2} -directory {3} -enable_download_resumption " "-recovery_id {4} -start_port 0 -abort_after_seconds {5} {6}" ).format(opts.destination, current_path, wdt_bin, opts.backupdir, self.datadir, opts.checkpoint_interval, opts.extra_wdt_receiver_options) logger.info("WDT remote cmd %s", remote_cmd) server_log = tempfile.TemporaryFile() remote_process = subprocess.Popen(remote_cmd.split(), stdout=subprocess.PIPE, stderr=server_log) wdt_url = remote_process.stdout.readline().strip() if not wdt_url: raise Exception("Unable to get connection url from wdt receiver") sender_cmd = ( "{0} -connection_url \'{1}\' -directory {2} -app_name=myrocks " "-avg_mbytes_per_sec {3} " "-enable_download_resumption -abort_after_seconds {4} {5}" ).format(wdt_bin, wdt_url, snapshot_dir, opts.avg_mbytes_per_sec, opts.checkpoint_interval, opts.extra_wdt_sender_options) sender_status = os.system(sender_cmd) >> 8 remote_status = remote_process.wait() self.cleanup(snapshot_dir, server_log) # TODO: handle retryable and non-retyable errors differently return (sender_status == 0 and remote_status == 0) except Exception as e: logger.error(e) logger.error(traceback.format_exc()) self.cleanup(snapshot_dir, server_log) sys.exit(1) def backup_using_wdt(): if not opts.destination: logger.error("Must provide remote destination when using WDT") sys.exit(1) # TODO: detect whether WDT is installed logger.info("Backing up myrocks to %s using WDT", opts.destination) wdt_backup = WDTBackup(opts.datadir) finished = False backup_round = 1 while not finished: start_time = time.time() finished = wdt_backup.backup_with_timeout(backup_round) end_time = time.time() duration_seconds = end_time - start_time if (not finished) and (duration_seconds < opts.checkpoint_interval): # round finished before timeout sleep_duration = (opts.checkpoint_interval - duration_seconds) logger.info("Sleeping for %f seconds", sleep_duration) time.sleep(sleep_duration) backup_round = backup_round + 1 logger.info("Finished myrocks backup using WDT") def init_logger(): global logger logger = logging.getLogger('myrocks_hotbackup') logger.setLevel(logging.INFO) h1= logging.StreamHandler(sys.stderr) f = logging.Formatter("%(asctime)s.%(msecs)03d %(levelname)s %(message)s", "%Y-%m-%d %H:%M:%S") h1.setFormatter(f) logger.addHandler(h1) backup_wdt_usage = ("Backup using WDT: myrocks_hotbackup " "--user=root --password=pw --stream=wdt " "--checkpoint_dir= --destination= --backup_dir=" ". This has to be executed at the src " "host.") backup_usage= "Backup: set -o pipefail; myrocks_hotbackup --user=root --password=pw --port=3306 --checkpoint_dir= | ssh -o NoneEnabled=yes remote_server 'tar -xi -C ' . You need to execute backup command on a server where you take backups." move_back_usage= "Move-Back: myrocks_hotbackup --move_back --datadir= --rocksdb_datadir= --rocksdb_waldir= --backup_dir= . You need to execute move-back command on a server where backup files are sent." def parse_options(): global opts parser = OptionParser(usage = "\n\n" + backup_usage + "\n\n" + \ backup_wdt_usage + "\n\n" + move_back_usage) parser.add_option('-i', '--interval', type='int', dest='checkpoint_interval', default=300, help='Number of seconds to renew checkpoint') parser.add_option('-c', '--checkpoint_dir', type='string', dest='checkpoint_directory', default='/data/mysql/backup/snapshot', help='Local directory name where checkpoints will be created.') parser.add_option('-d', '--datadir', type='string', dest='datadir', default=None, help='backup mode: src MySQL datadir. move_back mode: dest MySQL datadir') parser.add_option('-s', '--stream', type='string', dest='output_stream', default='tar', help='Setting streaming backup options. Currently tar, WDT ' 'and xbstream are supported. Default is tar') parser.add_option('--destination', type='string', dest='destination', default='', help='Remote server name. Only used for WDT mode so far.') parser.add_option('--avg_mbytes_per_sec', type='int', dest='avg_mbytes_per_sec', default=500, help='Average backup rate in MBytes/sec. WDT only.') parser.add_option('--extra_wdt_sender_options', type='string', dest='extra_wdt_sender_options', default='', help='Extra options for WDT sender') parser.add_option('--extra_wdt_receiver_options', type='string', dest='extra_wdt_receiver_options', default='', help='Extra options for WDT receiver') parser.add_option('-u', '--user', type='string', dest='mysql_user', default='root', help='MySQL user name') parser.add_option('-p', '--password', type='string', dest='mysql_password', default='', help='MySQL password name') parser.add_option('-P', '--port', type='int', dest='mysql_port', default=3306, help='MySQL port number') parser.add_option('-S', '--socket', type='string', dest='mysql_socket', default=None, help='MySQL socket path. Takes precedence over --port.') parser.add_option('-m', '--move_back', action='store_true', dest='move_back', default=False, help='Moving MyRocks backup files to proper locations.') parser.add_option('-r', '--rocksdb_datadir', type='string', dest='rocksdb_datadir', default=None, help='RocksDB target data directory where backup data files will be moved. Must be empty.') parser.add_option('-w', '--rocksdb_waldir', type='string', dest='rocksdb_waldir', default=None, help='RocksDB target data directory where backup wal files will be moved. Must be empty.') parser.add_option('-b', '--backup_dir', type='string', dest='backupdir', default=None, help='backup mode for WDT: Remote directory to store ' 'backup. move_back mode: Locations where backup ' 'files are stored.') parser.add_option('-f', '--skip_check_frm_timestamp', dest='skip_check_frm_timestamp', action='store_true', default=False, help='skipping to check if frm files are updated after starting backup.') parser.add_option('-D', '--debug_signal_file', type='string', dest='debug_signal_file', default=None, help='debugging purpose: waiting until the specified file is created') opts, args = parser.parse_args() def create_moveback_dir(directory): if not os.path.exists(directory): os.makedirs(directory) else: for f in os.listdir(directory): logger.error("Directory %s has file or directory %s!", directory, f) raise def print_move_back_usage(): logger.warning(move_back_usage) def move_back(): if opts.rocksdb_datadir is None or opts.rocksdb_waldir is None or opts.backupdir is None or opts.datadir is None: print_move_back_usage() sys.exit() create_moveback_dir(opts.datadir) create_moveback_dir(opts.rocksdb_datadir) create_moveback_dir(opts.rocksdb_waldir) os.chdir(opts.backupdir) for f in os.listdir(opts.backupdir): if os.path.isfile(os.path.join(opts.backupdir,f)): if f.endswith(rocksdb_wal_suffix): shutil.move(f, opts.rocksdb_waldir) elif f.endswith(rocksdb_data_suffix) or is_manifest(f): shutil.move(f, opts.rocksdb_datadir) else: shutil.move(f, opts.datadir) else: #directory if f.endswith('.rocksdb'): continue shutil.move(f, opts.datadir) def start_backup(): logger.info("Starting backup.") runner = BackupRunner(opts.datadir) b = None backup_round= 1 while True: b = runner.start_backup_round(backup_round, b) backup_round = backup_round + 1 if b.finished is True: b.print_backup_report() logger.info("RocksDB Backup Done.") break if opts.debug_signal_file: while not os.path.exists(opts.debug_signal_file): logger.info("Waiting until %s is created..", opts.debug_signal_file) time.sleep(1) runner.backup_mysql() logger.info("All Backups Done.") def main(): parse_options() init_logger() if opts.move_back is True: move_back() elif opts.output_stream == 'wdt': backup_using_wdt() else: start_backup() if __name__ == "__main__": main()