diff options
Diffstat (limited to 'extra/mariabackup/ds_tmpfile.cc')
-rw-r--r-- | extra/mariabackup/ds_tmpfile.cc | 230 |
1 files changed, 230 insertions, 0 deletions
diff --git a/extra/mariabackup/ds_tmpfile.cc b/extra/mariabackup/ds_tmpfile.cc new file mode 100644 index 00000000..80b9d3bb --- /dev/null +++ b/extra/mariabackup/ds_tmpfile.cc @@ -0,0 +1,230 @@ +/****************************************************** +Copyright (c) 2012 Percona LLC and/or its affiliates. + +tmpfile datasink for XtraBackup. + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation; version 2 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA + +*******************************************************/ + +/* Do all writes to temporary files first, then pipe them to the specified +datasink in a serialized way in deinit(). */ + +#include <my_global.h> +#include <my_base.h> +#include "common.h" +#include "datasink.h" + +typedef struct { + pthread_mutex_t mutex; + LIST *file_list; +} ds_tmpfile_ctxt_t; + +typedef struct { + LIST list; + File fd; + char *orig_path; + MY_STAT mystat; + ds_file_t *file; +} ds_tmp_file_t; + +static ds_ctxt_t *tmpfile_init(const char *root); +static ds_file_t *tmpfile_open(ds_ctxt_t *ctxt, const char *path, + MY_STAT *mystat); +static int tmpfile_write(ds_file_t *file, const uchar *buf, size_t len); +static int tmpfile_close(ds_file_t *file); +static void tmpfile_deinit(ds_ctxt_t *ctxt); + +datasink_t datasink_tmpfile = { + &tmpfile_init, + &tmpfile_open, + &tmpfile_write, + &tmpfile_close, + &dummy_remove, + &tmpfile_deinit +}; + + +static ds_ctxt_t * +tmpfile_init(const char *root) +{ + ds_ctxt_t *ctxt; + ds_tmpfile_ctxt_t *tmpfile_ctxt; + + ctxt = (ds_ctxt_t *)my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(ds_ctxt_t) + sizeof(ds_tmpfile_ctxt_t), MYF(MY_FAE)); + tmpfile_ctxt = (ds_tmpfile_ctxt_t *) (ctxt + 1); + tmpfile_ctxt->file_list = NULL; + if (pthread_mutex_init(&tmpfile_ctxt->mutex, NULL)) { + + my_free(ctxt); + return NULL; + } + + ctxt->ptr = tmpfile_ctxt; + ctxt->root = my_strdup(PSI_NOT_INSTRUMENTED, root, MYF(MY_FAE)); + + return ctxt; +} + +static ds_file_t * +tmpfile_open(ds_ctxt_t *ctxt, const char *path, + MY_STAT *mystat) +{ + ds_tmpfile_ctxt_t *tmpfile_ctxt; + char tmp_path[FN_REFLEN]; + ds_tmp_file_t *tmp_file; + ds_file_t *file; + size_t path_len; + File fd; + + /* Create a temporary file in tmpdir. The file will be automatically + removed on close. Code copied from mysql_tmpfile(). */ + fd = create_temp_file(tmp_path,xtrabackup_tmpdir, + "xbtemp", O_BINARY | O_SEQUENTIAL, + MYF(MY_WME | MY_TEMPORARY)); + + if (fd < 0) { + return NULL; + } + + path_len = strlen(path) + 1; /* terminating '\0' */ + + file = (ds_file_t *) my_malloc(PSI_NOT_INSTRUMENTED, + sizeof(ds_file_t) + sizeof(ds_tmp_file_t) + path_len, MYF(MY_FAE)); + + tmp_file = (ds_tmp_file_t *) (file + 1); + tmp_file->file = file; + memcpy(&tmp_file->mystat, mystat, sizeof(MY_STAT)); + /* Save a copy of 'path', since it may not be accessible later */ + tmp_file->orig_path = (char *) tmp_file + sizeof(ds_tmp_file_t); + + tmp_file->fd = fd; + memcpy(tmp_file->orig_path, path, path_len); + + /* Store the real temporary file name in file->path */ + file->path = my_strdup(PSI_NOT_INSTRUMENTED, tmp_path, MYF(MY_FAE)); + file->ptr = tmp_file; + + /* Store the file object in the list to be piped later */ + tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr; + tmp_file->list.data = tmp_file; + + pthread_mutex_lock(&tmpfile_ctxt->mutex); + tmpfile_ctxt->file_list = list_add(tmpfile_ctxt->file_list, + &tmp_file->list); + pthread_mutex_unlock(&tmpfile_ctxt->mutex); + + return file; +} + +static int +tmpfile_write(ds_file_t *file, const uchar *buf, size_t len) +{ + File fd = ((ds_tmp_file_t *) file->ptr)->fd; + + if (!my_write(fd, buf, len, MYF(MY_WME | MY_NABP))) { + posix_fadvise(fd, 0, 0, POSIX_FADV_DONTNEED); + return 0; + } + + return 1; +} + +static int +tmpfile_close(ds_file_t *file) +{ + /* Do nothing -- we will close (and thus remove) the file after piping + it to the destination datasink in tmpfile_deinit(). */ + + my_free(file->path); + + return 0; +} + +static void +tmpfile_deinit(ds_ctxt_t *ctxt) +{ + LIST *list; + ds_tmpfile_ctxt_t *tmpfile_ctxt; + MY_STAT mystat; + ds_tmp_file_t *tmp_file; + ds_file_t *dst_file; + ds_ctxt_t *pipe_ctxt; + void *buf = NULL; + const size_t buf_size = 10 * 1024 * 1024; + size_t bytes; + size_t offset; + + pipe_ctxt = ctxt->pipe_ctxt; + xb_a(pipe_ctxt != NULL); + + buf = my_malloc(PSI_NOT_INSTRUMENTED, buf_size, MYF(MY_FAE)); + + tmpfile_ctxt = (ds_tmpfile_ctxt_t *) ctxt->ptr; + list = tmpfile_ctxt->file_list; + + /* Walk the files in the order they have been added */ + list = list_reverse(list); + while (list != NULL) { + tmp_file = (ds_tmp_file_t *)list->data; + /* Stat the file to replace size and mtime on the original + * mystat struct */ + if (my_fstat(tmp_file->fd, &mystat, MYF(0))) { + die("my_fstat() failed."); + } + tmp_file->mystat.st_size = mystat.st_size; + tmp_file->mystat.st_mtime = mystat.st_mtime; + + dst_file = ds_open(pipe_ctxt, tmp_file->orig_path, + &tmp_file->mystat); + if (dst_file == NULL) { + die("could not stream a temporary file to " + "'%s'", tmp_file->orig_path); + } + + /* copy to the destination datasink */ + posix_fadvise(tmp_file->fd, 0, 0, POSIX_FADV_SEQUENTIAL); + if (my_seek(tmp_file->fd, 0, SEEK_SET, MYF(0)) == + MY_FILEPOS_ERROR) { + die("my_seek() failed for '%s', errno = %d.", + tmp_file->file->path, my_errno); + } + offset = 0; + while ((bytes = my_read(tmp_file->fd, (unsigned char *)buf, buf_size, + MYF(MY_WME))) > 0) { + posix_fadvise(tmp_file->fd, offset, buf_size, POSIX_FADV_DONTNEED); + offset += buf_size; + if (ds_write(dst_file, buf, bytes)) { + die("cannot write to stream for '%s'.", + tmp_file->orig_path); + } + } + if (bytes == (size_t) -1) { + die("my_read failed for %s", tmp_file->orig_path); + } + + my_close(tmp_file->fd, MYF(MY_WME)); + ds_close(dst_file); + + list = list_rest(list); + my_free(tmp_file->file); + } + + pthread_mutex_destroy(&tmpfile_ctxt->mutex); + + my_free(buf); + my_free(ctxt->root); + my_free(ctxt); +} |