diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/maria/s3_func.c | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/maria/s3_func.c')
-rw-r--r-- | storage/maria/s3_func.c | 1625 |
1 files changed, 1625 insertions, 0 deletions
diff --git a/storage/maria/s3_func.c b/storage/maria/s3_func.c new file mode 100644 index 00000000..3d18ba88 --- /dev/null +++ b/storage/maria/s3_func.c @@ -0,0 +1,1625 @@ +/* Copyright (C) 2019 MariaDB Corporation Ab + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software Foundation, + Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA */ + +/* + Interface function used by S3 storage engine and aria_copy_for_s3 +*/ + +#include "maria_def.h" +#include "s3_func.h" +#include <aria_backup.h> +#include <mysqld_error.h> +#include <sql_const.h> +#include <mysys_err.h> +#include <mysql_com.h> +#include <zlib.h> + +/* number of '.' to print during a copy in verbose mode */ +#define DISPLAY_WITH 79 + +static void convert_index_to_s3_format(uchar *header, ulong block_size, + int compression); +static void convert_index_to_disk_format(uchar *header); +static void convert_frm_to_s3_format(uchar *header); +static void convert_frm_to_disk_format(uchar *header); +static int s3_read_file_from_disk(const char *filename, uchar **to, + size_t *to_size, my_bool print_error); + +/* Used by ha_s3.cc and tools to define different protocol options */ + +static const char *protocol_types[]= {"Auto", "Original", "Amazon", NullS}; +TYPELIB s3_protocol_typelib= {array_elements(protocol_types)-1,"", + protocol_types, NULL}; + +/****************************************************************************** + Allocations handler for libmarias3 + To be removed when we do the init allocation in mysqld.cc +******************************************************************************/ + +static void *s3_wrap_malloc(size_t size) +{ + return my_malloc(PSI_NOT_INSTRUMENTED, size, MYF(MY_WME)); +} + +static void *s3_wrap_calloc(size_t nmemb, size_t size) +{ + return my_malloc(PSI_NOT_INSTRUMENTED, nmemb * size, + MYF(MY_WME | MY_ZEROFILL)); +} + +static void *s3_wrap_realloc(void *ptr, size_t size) +{ + return my_realloc(PSI_NOT_INSTRUMENTED, ptr, size, + MYF(MY_WME | MY_ALLOW_ZERO_PTR)); +} + +static char *s3_wrap_strdup(const char *str) +{ + return my_strdup(PSI_NOT_INSTRUMENTED, str, MYF(MY_WME)); +} + +static void s3_wrap_free(void *ptr) +{ + if (ptr) /* Avoid tracing of null */ + my_free(ptr); +} + +void s3_init_library() +{ + ms3_library_init_malloc(s3_wrap_malloc, s3_wrap_free, s3_wrap_realloc, + s3_wrap_strdup, s3_wrap_calloc); +} + +void s3_deinit_library() +{ + ms3_library_deinit(); +} + +/****************************************************************************** + Functions on S3_INFO and S3_BLOCK +******************************************************************************/ + +/* + Free memory allocated by s3_get_object +*/ + +void s3_free(S3_BLOCK *data) +{ + my_free(data->alloc_ptr); + data->alloc_ptr= 0; +} + + +/* + Copy a S3_INFO structure +*/ + +S3_INFO *s3_info_copy(S3_INFO *old) +{ + S3_INFO *to, tmp; + + /* Copy lengths */ + memcpy(&tmp, old, sizeof(tmp)); + /* Allocate new buffers */ + if (!my_multi_malloc(PSI_NOT_INSTRUMENTED, MY_WME, &to, sizeof(S3_INFO), + &tmp.access_key.str, old->access_key.length+1, + &tmp.secret_key.str, old->secret_key.length+1, + &tmp.region.str, old->region.length+1, + &tmp.bucket.str, old->bucket.length+1, + &tmp.database.str, old->database.length+1, + &tmp.table.str, old->table.length+1, + &tmp.base_table.str, old->base_table.length+1, + NullS)) + return 0; + /* Copy lengths and new pointers to to */ + memcpy(to, &tmp, sizeof(tmp)); + /* Copy data */ + strmov((char*) to->access_key.str, old->access_key.str); + strmov((char*) to->secret_key.str, old->secret_key.str); + strmov((char*) to->region.str, old->region.str); + strmov((char*) to->bucket.str, old->bucket.str); + /* Database may not be null terminated */ + strmake((char*) to->database.str, old->database.str, old->database.length); + strmov((char*) to->table.str, old->table.str); + strmov((char*) to->base_table.str, old->base_table.str); + return to; +} + +/** + Open a connection to s3 +*/ + +ms3_st *s3_open_connection(S3_INFO *s3) +{ + ms3_st *s3_client; + if (!(s3_client= ms3_init(s3->access_key.str, + s3->secret_key.str, + s3->region.str, + s3->host_name.str))) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Can't open connection to S3, error: %d %s", MYF(0), + errno, ms3_error(errno)); + my_errno= HA_ERR_NO_SUCH_TABLE; + } + if (s3->protocol_version) + ms3_set_option(s3_client, MS3_OPT_FORCE_PROTOCOL_VERSION, + &s3->protocol_version); + if (s3->port) + ms3_set_option(s3_client, MS3_OPT_PORT_NUMBER, &s3->port); + + if (s3->use_http) + ms3_set_option(s3_client, MS3_OPT_USE_HTTP, NULL); + + return s3_client; +} + +/** + close a connection to s3 +*/ + +void s3_deinit(ms3_st *s3_client) +{ + DBUG_PUSH(""); /* Avoid tracing free calls */ + ms3_deinit(s3_client); + DBUG_POP(); +} + + +/****************************************************************************** + High level functions to copy tables to and from S3 +******************************************************************************/ + +/** + Create suffix for object name + @param to_end end of suffix (from previous call or 000000 at start) + + The suffix is a 6 length '0' prefixed number. If the number + gets longer than 6, then it's extended to 7 and more digits. +*/ + +static void fix_suffix(char *to_end, ulong nr) +{ + char buff[11]; + uint length= (uint) (int10_to_str(nr, buff, 10) - buff); + set_if_smaller(length, 6); + strmov(to_end - length, buff); +} + +/** + Copy file to 'aws_path' in blocks of block_size + + @return 0 ok + @return 1 error. Error message is printed to stderr + + Notes: + file is always closed before return +*/ + +static my_bool copy_from_file(ms3_st *s3_client, const char *aws_bucket, + char *aws_path, + File file, my_off_t start, my_off_t file_end, + uchar *block, size_t block_size, + my_bool compression, my_bool display) +{ + my_off_t pos; + char *path_end= strend(aws_path); + ulong bnr; + my_bool print_done= 0; + size_t length; + + for (pos= start, bnr=1 ; pos < file_end ; pos+= length, bnr++) + { + if ((length= my_pread(file, block, block_size, pos, MYF(MY_WME))) == + MY_FILE_ERROR) + goto err; + if (length == 0) + { + my_error(EE_EOFERR, MYF(0), my_filename(file), my_errno); + goto err; + } + + fix_suffix(path_end, bnr); + if (s3_put_object(s3_client, aws_bucket, aws_path, block, length, + compression)) + goto err; + + /* Write up to DISPLAY_WITH number of '.' during copy */ + if (display && + ((pos + block_size) * DISPLAY_WITH / file_end) > + (pos * DISPLAY_WITH/file_end)) + { + fputc('.', stdout); fflush(stdout); + print_done= 1; + } + } + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + my_close(file, MYF(MY_WME)); + return 0; + +err: + my_close(file, MYF(MY_WME)); + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + return 1; +} + + +/** + Copy an Aria table to S3 + @param s3_client connection to S3 + @param aws_bucket Aws bucket + @param path Path for Aria table (can be temp table) + @param database database name + @param table_name table name + @param block_size Block size in s3. If 0 then use block size + and compression as specified in the .MAI file as + specified as part of open. + @param compression Compression algorithm (0 = none, 1 = zip) + If block size is 0 then use .MAI file. + @return 0 ok + @return 1 error + + The table will be copied in S3 into the following locations: + + frm file (for discovery): + aws_bucket/database/table/frm + + First index block (contains description if the Aria file): + aws_bucket/database/table/aria + + Rest of the index file: + aws_bucket/database/table/index/block_number + + Data file: + aws_bucket/database/table/data/block_number + + block_number is 6 digits decimal number, prefixed with 0 + (Can be larger than 6 numbers, the prefix is just for nice output) + + frm and base blocks are small (just the needed data). + index and blocks are of size 's3_block_size' + + If compression is used, then original block size is s3_block_size + but the stored block will be the size of the compressed block. +*/ + +int aria_copy_to_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, + const char *database, const char *table_name, + ulong block_size, my_bool compression, + my_bool force, my_bool display, my_bool copy_frm) +{ + ARIA_TABLE_CAPABILITIES cap; + char aws_path[FN_REFLEN+100]; + char filename[FN_REFLEN]; + char *aws_path_end, *end; + uchar *alloc_block= 0, *block; + ms3_status_st status; + File file= -1; + my_off_t file_size; + size_t frm_length; + int error; + my_bool frm_created= 0; + DBUG_ENTER("aria_copy_to_s3"); + DBUG_PRINT("enter",("from: %s database: %s table: %s", + path, database, table_name)); + + aws_path_end= strxmov(aws_path, database, "/", table_name, NullS); + strmov(aws_path_end, "/aria"); + + if (!ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + if (!force) + { + my_printf_error(EE_CANTCREATEFILE, "File %s exists in s3", MYF(0), + aws_path); + DBUG_RETURN(EE_CANTCREATEFILE); + } + if ((error= aria_delete_from_s3(s3_client, aws_bucket, database, + table_name, display))) + DBUG_RETURN(error); + } + + if (copy_frm) + { + /* + Copy frm file if it exists + We do this first to ensure that .frm always exists. This is needed to + ensure that discovery of the table will work. + */ + fn_format(filename, path, "", ".frm", MY_REPLACE_EXT); + if (!s3_read_file_from_disk(filename, &alloc_block, &frm_length,0)) + { + if (display) + printf("Copying frm file %s\n", filename); + + strmov(aws_path_end,"/frm"); + convert_frm_to_s3_format(alloc_block); + + /* Note that frm is not compressed! */ + if (s3_put_object(s3_client, aws_bucket, aws_path, alloc_block, frm_length, + 0)) + goto err; + + frm_created= 1; + my_free(alloc_block); + alloc_block= 0; + } + } + + if (display) + printf("Copying aria table: %s.%s to s3\n", database, table_name); + + /* Index file name */ + fn_format(filename, path, "", ".MAI", MY_REPLACE_EXT); + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_WME))) < 0) + DBUG_RETURN(1); + if ((error= aria_get_capabilities(file, &cap))) + { + fprintf(stderr, "Got error %d when reading Aria header from %s\n", + error, path); + goto err; + } + if (cap.transactional || cap.data_file_type != BLOCK_RECORD || + cap.encrypted) + { + fprintf(stderr, + "Aria table %s doesn't match criteria to be copied to S3.\n" + "It should be non-transactional and should have row_format page\n", + path); + goto err; + } + /* + If block size is not specified, use the values specified as part of + create + */ + if (block_size == 0) + { + block_size= cap.s3_block_size; + compression= cap.compression; + } + + /* Align S3_BLOCK size with table block size */ + block_size= (block_size/cap.block_size)*cap.block_size; + + /* Allocate block for data + flag for compress header */ + if (!(alloc_block= (uchar*) my_malloc(PSI_NOT_INSTRUMENTED, + block_size+ALIGN_SIZE(1), + MYF(MY_WME)))) + goto err; + /* Read/write data here, but with prefix space for compression flag */ + block= alloc_block+ ALIGN_SIZE(1); + + if (my_pread(file, block, cap.header_size, 0, MYF(MY_WME | MY_FNABP))) + goto err; + + strmov(aws_path_end, "/aria"); + + if (display) + printf("Creating aria table information %s\n", aws_path); + + convert_index_to_s3_format(block, block_size, compression); + + /* + The first page is not compressed as we need it to know if the rest is + compressed + */ + if (s3_put_object(s3_client, aws_bucket, aws_path, block, cap.header_size, + 0 /* no compression */ )) + goto err; + + file_size= my_seek(file, 0L, MY_SEEK_END, MYF(0)); + + end= strmov(aws_path_end,"/index"); + + if (display) + printf("Copying index information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + end= strmov(end, "/000000"); + + error= copy_from_file(s3_client, aws_bucket, aws_path, file, cap.header_size, + file_size, block, block_size, compression, display); + file= -1; + if (error) + goto err; + + /* Copy data file */ + fn_format(filename, path, "", ".MAD", MY_REPLACE_EXT); + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(MY_WME))) < 0) + DBUG_RETURN(1); + + file_size= my_seek(file, 0L, MY_SEEK_END, MYF(0)); + + end= strmov(aws_path_end, "/data"); + + if (display) + printf("Copying data information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + end= strmov(end, "/000000"); + + error= copy_from_file(s3_client, aws_bucket, aws_path, file, 0, file_size, + block, block_size, compression, display); + file= -1; + if (error) + goto err; + + my_free(alloc_block); + DBUG_RETURN(0); + +err: + if (frm_created) + { + end= strmov(aws_path_end,"/frm"); + (void) s3_delete_object(s3_client, aws_bucket, aws_path, MYF(ME_NOTE)); + } + if (file >= 0) + my_close(file, MYF(0)); + my_free(alloc_block); + DBUG_RETURN(1); +} + + +/** + Copy file to 'aws_path' in blocks of block_size + + @return 0 ok + @return 1 error. Error message is printed to stderr + + Notes: + file is always closed before return +*/ + +static my_bool copy_to_file(ms3_st *s3_client, const char *aws_bucket, + char *aws_path, File file, my_off_t start, + my_off_t file_end, my_bool compression, + my_bool display) +{ + my_off_t pos; + char *path_end= strend(aws_path); + size_t error; + ulong bnr; + my_bool print_done= 0; + S3_BLOCK block; + DBUG_ENTER("copy_to_file"); + DBUG_PRINT("enter", ("path: %s start: %llu end: %llu", + aws_path, (ulonglong) start, (ulonglong) file_end)); + + for (pos= start, bnr=1 ; pos < file_end ; pos+= block.length, bnr++) + { + fix_suffix(path_end, bnr); + if (s3_get_object(s3_client, aws_bucket, aws_path, &block, compression, 1)) + goto err; + + error= my_write(file, block.str, block.length, MYF(MY_WME | MY_FNABP)); + s3_free(&block); + if (error == MY_FILE_ERROR) + goto err; + + /* Write up to DISPLAY_WITH number of '.' during copy */ + if (display && + ((pos + block.length) * DISPLAY_WITH /file_end) > + (pos * DISPLAY_WITH/file_end)) + { + fputc('.', stdout); fflush(stdout); + print_done= 1; + } + } + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + my_close(file, MYF(MY_WME)); + DBUG_RETURN(0); + +err: + my_close(file, MYF(MY_WME)); + if (print_done) + { + fputc('\n', stdout); fflush(stdout); + } + DBUG_RETURN(1); +} + + +/** + Copy a table from S3 to current directory +*/ + +int aria_copy_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, const char *database, + my_bool compression, my_bool force, my_bool display) + +{ + MARIA_STATE_INFO state; + MY_STAT stat_info; + char table_name[FN_REFLEN], aws_path[FN_REFLEN+100]; + char filename[FN_REFLEN]; + char *aws_path_end, *end; + File file= -1; + S3_BLOCK block; + my_off_t index_file_size, data_file_size; + uint offset; + int error; + DBUG_ENTER("aria_copy_from_s3"); + + /* Check if index file exists */ + fn_format(filename, path, "", ".MAI", MY_REPLACE_EXT); + if (!force && my_stat(filename, &stat_info, MYF(0))) + { + my_printf_error(EE_CANTCREATEFILE, "Table %s already exists on disk", + MYF(0), filename); + DBUG_RETURN(EE_CANTCREATEFILE); + } + + fn_format(table_name, path, "", "", MY_REPLACE_DIR | MY_REPLACE_EXT); + block.str= 0; + + aws_path_end= strxmov(aws_path, database, "/", table_name, NullS); + strmov(aws_path_end, "/aria"); + + if (s3_get_object(s3_client, aws_bucket, aws_path, &block, 0, 0)) + { + my_printf_error(EE_FILENOTFOUND, "File %s/%s doesn't exist in s3", MYF(0), + database,filename); + goto err; + } + if (block.length < MARIA_STATE_INFO_SIZE) + { + fprintf(stderr, "Wrong block length for first block: %lu\n", + (ulong) block.length); + goto err_with_free; + } + + if (display) + printf("Copying aria table: %s.%s from s3\n", database, table_name); + + /* For offset positions, check _ma_state_info_readlength() */ + offset= sizeof(state.header) + 4+ LSN_STORE_SIZE*3 + 8*5; + index_file_size= mi_sizekorr(block.str + offset); + data_file_size= mi_sizekorr(block.str + offset+8); + + if ((file= my_create(filename, 0, + O_WRONLY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) + goto err_with_free; + + convert_index_to_disk_format(block.str); + + if (my_write(file, block.str, block.length, MYF(MY_WME | MY_FNABP))) + goto err_with_free; + + if (display) + printf("Copying index information %s\n", aws_path); + + end= strmov(aws_path_end,"/index/000000"); + + error= copy_to_file(s3_client, aws_bucket, aws_path, file, block.length, + index_file_size, compression, display); + file= -1; + if (error) + goto err_with_free; + + /* Copy data file */ + fn_format(filename, path, "", ".MAD", MY_REPLACE_EXT); + if ((file= my_create(filename, 0, + O_WRONLY | O_TRUNC | O_NOFOLLOW, MYF(MY_WME))) < 0) + DBUG_RETURN(1); + + end= strmov(aws_path_end, "/data"); + + if (display) + printf("Copying data information %s\n", aws_path); + + /* The 000000 will be update with block number by fix_suffix() */ + strmov(end, "/000000"); + + error= copy_to_file(s3_client, aws_bucket, aws_path, file, 0, data_file_size, + compression, display); + file= -1; + s3_free(&block); + block.str= 0; + if (error) + goto err; + + /* Copy frm file if it exists */ + strmov(aws_path_end, "/frm"); + if (!s3_get_object(s3_client, aws_bucket, aws_path, &block, 0, 0)) + { + fn_format(filename, path, "", ".frm", MY_REPLACE_EXT); + if ((file= my_create(filename, 0, + O_WRONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(0))) >= 0) + { + if (display) + printf("Copying frm file %s\n", filename); + + convert_frm_to_disk_format(block.str); + + if (my_write(file, block.str, block.length, MYF(MY_WME | MY_FNABP))) + goto err_with_free; + } + s3_free(&block); + my_close(file, MYF(MY_WME)); + file= -1; + } + + DBUG_RETURN(0); + +err_with_free: + s3_free(&block); +err: + if (file >= 0) + my_close(file, MYF(0)); + DBUG_RETURN(1); +} + + +/** + Drop all files related to a table from S3 +*/ + +int aria_delete_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *database, const char *table, + my_bool display) +{ + ms3_status_st status; + char aws_path[FN_REFLEN+100]; + char *aws_path_end; + int error; + DBUG_ENTER("aria_delete_from_s3"); + + aws_path_end= strxmov(aws_path, database, "/", table, NullS); + strmov(aws_path_end, "/aria"); + + /* Check if either /aria or /frm exists */ + + if (ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + strmov(aws_path_end, "/frm"); + if (ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Table %s.%s doesn't exist in s3", MYF(0), + database, table); + my_errno= HA_ERR_NO_SUCH_TABLE; + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + } + + if (display) + printf("Delete of aria table: %s.%s\n", database, table); + + strmov(aws_path_end,"/index"); + + if (display) + printf("Delete of index information %s\n", aws_path); + + error= s3_delete_directory(s3_client, aws_bucket, aws_path); + + strmov(aws_path_end,"/data"); + if (display) + printf("Delete of data information %s\n", aws_path); + + error|= s3_delete_directory(s3_client, aws_bucket, aws_path); + + if (display) + printf("Delete of base information and frm\n"); + + strmov(aws_path_end,"/aria"); + if (s3_delete_object(s3_client, aws_bucket, aws_path, MYF(MY_WME))) + error= 1; + + /* + Delete .frm last as this is used by discovery to check if a s3 table + exists + */ + strmov(aws_path_end,"/frm"); + /* Ignore error if .frm file doesn't exist */ + s3_delete_object(s3_client, aws_bucket, aws_path, MYF(ME_NOTE)); + + DBUG_RETURN(error); +} + + +/** + Rename a table in s3 +*/ + +int aria_rename_s3(ms3_st *s3_client, const char *aws_bucket, + const char *from_database, const char *from_table, + const char *to_database, const char *to_table, + my_bool rename_frm) +{ + ms3_status_st status; + char to_aws_path[FN_REFLEN+100], from_aws_path[FN_REFLEN+100]; + char *to_aws_path_end, *from_aws_path_end; + int error; + DBUG_ENTER("aria_rename_s3"); + + from_aws_path_end= strxmov(from_aws_path, from_database, "/", from_table, + NullS); + to_aws_path_end= strxmov(to_aws_path, to_database, "/", to_table, NullS); + strmov(from_aws_path_end, "/aria"); + + if (ms3_status(s3_client, aws_bucket, from_aws_path, &status)) + { + my_printf_error(HA_ERR_NO_SUCH_TABLE, + "Table %s.%s doesn't exist in s3", MYF(0), from_database, + from_table); + my_errno= HA_ERR_NO_SUCH_TABLE; + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + } + + strmov(from_aws_path_end,"/index"); + strmov(to_aws_path_end,"/index"); + + error= s3_rename_directory(s3_client, aws_bucket, from_aws_path, to_aws_path, + MYF(MY_WME)); + + strmov(from_aws_path_end,"/data"); + strmov(to_aws_path_end,"/data"); + + error|= s3_rename_directory(s3_client, aws_bucket, from_aws_path, + to_aws_path, MYF(MY_WME)); + + if (rename_frm) { + strmov(from_aws_path_end, "/frm"); + strmov(to_aws_path_end, "/frm"); + + s3_rename_object(s3_client, aws_bucket, from_aws_path, to_aws_path, + MYF(MY_WME)); + } + + strmov(from_aws_path_end,"/aria"); + strmov(to_aws_path_end,"/aria"); + if (s3_rename_object(s3_client, aws_bucket, from_aws_path, to_aws_path, + MYF(MY_WME))) + error= 1; + DBUG_RETURN(error); +} + +/** + Copy all partition files related to a table from S3 (.frm and .par) + + @param s3_client s3 client connection + @param aws_bucket bucket to use + @param path The path to the partitioned table files (no extension) + @param old_path In some cases the partioned files are not yet renamed. + This points to the temporary files that will later + be renamed to the partioned table + @param database Database for the partitioned table + @param database table name for the partitioned table +*/ + +int partition_copy_to_s3(ms3_st *s3_client, const char *aws_bucket, + const char *path, const char *old_path, + const char *database, const char *table_name) +{ + char aws_path[FN_REFLEN+100]; + char filename[FN_REFLEN]; + char *aws_path_end; + uchar *alloc_block= 0; + ms3_status_st status; + size_t frm_length; + int error; + DBUG_ENTER("partition_copy_to_s3"); + DBUG_PRINT("enter",("from: %s database: %s table: %s", + path, database, table_name)); + + if (!old_path) + old_path= path; + + aws_path_end= strxmov(aws_path, database, "/", table_name, "/", NullS); + strmov(aws_path_end, "frm"); + fn_format(filename, old_path, "", ".frm", MY_REPLACE_EXT); + + /* Just to be safe, delete any conflicting object */ + if (!ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + if ((error= s3_delete_object(s3_client, aws_bucket, aws_path, + MYF(ME_FATAL)))) + DBUG_RETURN(error); + } + if ((error= s3_read_file_from_disk(filename, &alloc_block, &frm_length, 0))) + { + /* + In case of ADD PARTITION PARTITON the .frm file is already renamed. + Copy the renamed file if it exists. + */ + fn_format(filename, path, "", ".frm", MY_REPLACE_EXT); + if ((error= s3_read_file_from_disk(filename, &alloc_block, &frm_length, + 1))) + goto err; + } + if ((error= s3_put_object(s3_client, aws_bucket, aws_path, alloc_block, + frm_length, 0))) + goto err; + + /* + Note that because ha_partiton::rename_table() is called before + this function, the .par table already has it's final name! + */ + fn_format(filename, path, "", ".par", MY_REPLACE_EXT); + strmov(aws_path_end, "par"); + if (!ms3_status(s3_client, aws_bucket, aws_path, &status)) + { + if ((error= s3_delete_object(s3_client, aws_bucket, aws_path, + MYF(ME_FATAL)))) + goto err; + } + + my_free(alloc_block); + alloc_block= 0; + if ((error=s3_read_file_from_disk(filename, &alloc_block, &frm_length, 1))) + goto err; + if ((error= s3_put_object(s3_client, aws_bucket, aws_path, alloc_block, + frm_length, 0))) + { + /* Delete the .frm file created above */ + strmov(aws_path_end, "frm"); + (void) s3_delete_object(s3_client, aws_bucket, aws_path, + MYF(ME_FATAL)); + goto err; + } + error= 0; + +err: + my_free(alloc_block); + DBUG_RETURN(error); +} + + +/** + Drop all partition files related to a table from S3 +*/ + +int partition_delete_from_s3(ms3_st *s3_client, const char *aws_bucket, + const char *database, const char *table, + myf error_flags) +{ + char aws_path[FN_REFLEN+100]; + char *aws_path_end; + int error=0, res; + DBUG_ENTER("partition_delete_from_s3"); + + aws_path_end= strxmov(aws_path, database, "/", table, NullS); + strmov(aws_path_end, "/par"); + + if ((res= s3_delete_object(s3_client, aws_bucket, aws_path, error_flags))) + error= res; + /* + Delete .frm last as this is used by discovery to check if a s3 table + exists + */ + strmov(aws_path_end, "/frm"); + if ((res= s3_delete_object(s3_client, aws_bucket, aws_path, error_flags))) + error= res; + + DBUG_RETURN(error); +} + +/****************************************************************************** + Low level functions interfacing with libmarias3 +******************************************************************************/ + +/** + Create an object for index or data information + + Note that if compression is used, the data may be overwritten and + there must be COMPRESS_HEADER length of free space before the data! + +*/ + +int s3_put_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, uchar *data, size_t length, + my_bool compression) +{ + uint8_t error; + const char *errmsg; + DBUG_ENTER("s3_put_object"); + DBUG_PRINT("enter", ("name: %s", name)); + + if (compression) + { + size_t comp_len; + + data[-COMPRESS_HEADER]= 0; // No compression + if (!my_compress(data, &length, &comp_len)) + data[-COMPRESS_HEADER]= 1; // Compressed package + data-= COMPRESS_HEADER; + length+= COMPRESS_HEADER; + int3store(data+1, comp_len); // Original length or 0 + } + + if (likely(!(error= ms3_put(s3_client, aws_bucket, name, data, length)))) + DBUG_RETURN(0); + + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_WRITE, "Got error from put_object(%s): %d %s", MYF(0), + name, error, errmsg); + DBUG_RETURN(EE_WRITE); +} + + +/** + Read an object for index or data information + + @param print_error 0 Don't print error + @param print_error 1 Print error that object doesn't exists + @param print_error 2 Print error that table doesn't exists +*/ + +int s3_get_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, S3_BLOCK *block, + my_bool compression, int print_error) +{ + uint8_t error; + int result= 0; + uchar *data; + DBUG_ENTER("s3_get_object"); + DBUG_PRINT("enter", ("name: %s compression: %d", name, compression)); + + block->str= block->alloc_ptr= 0; + if (likely(!(error= ms3_get(s3_client, aws_bucket, name, + (uint8_t**) &block->alloc_ptr, + &block->length)))) + { + block->str= block->alloc_ptr; + if (compression) + { + ulong length; + + /* If not compressed */ + if (!block->str[0]) + { + block->length-= COMPRESS_HEADER; + block->str+= COMPRESS_HEADER; + + /* Simple check to ensure that it's a correct block */ + if (block->length % 1024) + { + s3_free(block); + my_printf_error(HA_ERR_NOT_A_TABLE, + "Block '%s' is not compressed", MYF(0), name); + DBUG_RETURN(HA_ERR_NOT_A_TABLE); + } + DBUG_RETURN(0); + } + + if (((uchar*)block->str)[0] > 1) + { + s3_free(block); + my_printf_error(HA_ERR_NOT_A_TABLE, + "Block '%s' is not compressed", MYF(0), name); + DBUG_RETURN(HA_ERR_NOT_A_TABLE); + } + + length= uint3korr(block->str+1); + + if (!(data= (uchar*) my_malloc(PSI_NOT_INSTRUMENTED, + length, MYF(MY_WME | MY_THREAD_SPECIFIC)))) + { + s3_free(block); + DBUG_RETURN(EE_OUTOFMEMORY); + } + if (uncompress(data, &length, block->str + COMPRESS_HEADER, + block->length - COMPRESS_HEADER)) + { + my_printf_error(ER_NET_UNCOMPRESS_ERROR, + "Got error uncompressing s3 packet", MYF(0)); + s3_free(block); + my_free(data); + DBUG_RETURN(ER_NET_UNCOMPRESS_ERROR); + } + s3_free(block); + block->str= block->alloc_ptr= data; + block->length= length; + } + DBUG_RETURN(0); + } + + if (error == 9) + { + result= my_errno= (print_error == 1 ? EE_FILENOTFOUND : + HA_ERR_NO_SUCH_TABLE); + if (print_error) + my_printf_error(my_errno, "Expected object '%s' didn't exist", + MYF(0), name); + } + else + { + result= my_errno= EE_READ; + if (print_error) + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_READ, "Got error from get_object(%s): %d %s", MYF(0), + name, error, errmsg); + } + } + s3_free(block); + DBUG_RETURN(result); +} + + +int s3_delete_object(ms3_st *s3_client, const char *aws_bucket, + const char *name, myf error_flags) +{ + uint8_t error; + int result= 0; + DBUG_ENTER("s3_delete_object"); + DBUG_PRINT("enter", ("name: %s", name)); + + if (likely(!(error= ms3_delete(s3_client, aws_bucket, name)))) + DBUG_RETURN(0); + + if (error_flags) + { + error_flags&= ~MY_WME; + if (error == 9) + my_printf_error(result= EE_FILENOTFOUND, + "Expected object '%s' didn't exist", + error_flags, name); + else + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(result= EE_READ, + "Got error from delete_object(%s): %d %s", + error_flags, name, error, errmsg); + } + } + DBUG_RETURN(result); +} + + +/* + Drop all files in a 'directory' in s3 +*/ + +int s3_delete_directory(ms3_st *s3_client, const char *aws_bucket, + const char *path) +{ + ms3_list_st *list, *org_list= 0; + my_bool error; + DBUG_ENTER("delete_directory"); + DBUG_PRINT("enter", ("path: %s", path)); + + if ((error= ms3_list(s3_client, aws_bucket, path, &org_list))) + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_FILENOTFOUND, + "Can't get list of files from %s. Error: %d %s", MYF(0), + path, error, errmsg); + DBUG_RETURN(EE_FILENOTFOUND); + } + + for (list= org_list ; list ; list= list->next) + if (s3_delete_object(s3_client, aws_bucket, list->key, MYF(MY_WME))) + error= 1; + if (org_list) + ms3_list_free(org_list); + DBUG_RETURN(error); +} + + +my_bool s3_rename_object(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + myf error_flags) +{ + uint8_t error; + DBUG_ENTER("s3_rename_object"); + DBUG_PRINT("enter", ("from: %s to: %s", from_name, to_name)); + + if (likely(!(error= ms3_move(s3_client, + aws_bucket, from_name, + aws_bucket, to_name)))) + DBUG_RETURN(FALSE); + + if (error_flags) + { + error_flags&= ~MY_WME; + if (error == 9) + { + my_printf_error(EE_FILENOTFOUND, "Expected object '%s' didn't exist", + error_flags, from_name); + } + else + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_READ, "Got error from move_object(%s -> %s): %d %", + error_flags, + from_name, to_name, error, errmsg); + } + } + DBUG_RETURN(TRUE); +} + + +int s3_rename_directory(ms3_st *s3_client, const char *aws_bucket, + const char *from_name, const char *to_name, + myf error_flags) +{ + ms3_list_st *list, *org_list= 0; + my_bool error= 0; + char name[AWS_PATH_LENGTH], *end; + DBUG_ENTER("s3_delete_directory"); + + if ((error= ms3_list(s3_client, aws_bucket, from_name, &org_list))) + { + const char *errmsg; + if (!(errmsg= ms3_server_error(s3_client))) + errmsg= ms3_error(error); + + my_printf_error(EE_FILENOTFOUND, + "Can't get list of files from %s. Error: %d %s", + MYF(error_flags & ~MY_WME), + from_name, error, errmsg); + DBUG_RETURN(EE_FILENOTFOUND); + } + + end= strmov(name, to_name); + for (list= org_list ; list ; list= list->next) + { + const char *sep= strrchr(list->key, '/'); + if (sep) /* Safety */ + { + strmake(end, sep, (sizeof(name) - (end-name) - 1)); + if (s3_rename_object(s3_client, aws_bucket, list->key, name, + error_flags)) + error= 1; + } + } + if (org_list) + ms3_list_free(org_list); + DBUG_RETURN(error); +} + + +/****************************************************************************** + Converting index and frm files to from S3 storage engine +******************************************************************************/ + +/** + Change index information to be of type s3 + + @param header Copy of header in index file + @param block_size S3 block size + @param compression Compression algorithm to use + + The position are from _ma_base_info_write() +*/ + +static void convert_index_to_s3_format(uchar *header, ulong block_size, + int compression) +{ + MARIA_STATE_INFO state; + uchar *base_pos; + uint base_offset; + + memcpy(&state.header, header, sizeof(state.header)); + base_offset= mi_uint2korr(state.header.base_pos); + base_pos= header + base_offset; + + base_pos[107]= (uchar) compression; + mi_int3store(base_pos+119, block_size); +} + + +/** + Change index information to be a normal disk based table +*/ + +static void convert_index_to_disk_format(uchar *header) +{ + MARIA_STATE_INFO state; + uchar *base_pos; + uint base_offset; + + memcpy(&state.header, header, sizeof(state.header)); + base_offset= mi_uint2korr(state.header.base_pos); + base_pos= header + base_offset; + + base_pos[107]= 0; + mi_int3store(base_pos+119, 0); +} + +/** + Change storage engine in the .frm file from Aria to s3 + + For information about engine types, see legacy_db_type +*/ + +static void convert_frm_to_s3_format(uchar *header) +{ + DBUG_ASSERT(header[3] == 42 || header[3] == 41); /* Aria or S3 */ + header[3]= 41; /* S3 */ +} + +/** + Change storage engine in the .frm file from S3 to Aria + + For information about engine types, see legacy_db_type +*/ + +static void convert_frm_to_disk_format(uchar *header) +{ + DBUG_ASSERT(header[3] == 41); /* S3 */ + header[3]= 42; /* Aria */ +} + + +/****************************************************************************** + Helper functions +******************************************************************************/ + +/** + Set database and table name from path + + s3->database and s3->table_name will be pointed into path + Note that s3->database will not be null terminated! +*/ + +my_bool set_database_and_table_from_path(S3_INFO *s3, const char *path) +{ + size_t org_length= dirname_length(path); + size_t length= 0; + + if (!org_length) + return 1; + + s3->table.str= path+org_length; + s3->table.length= strlen(s3->table.str); + for (length= --org_length; length > 0 ; length --) + { + if (path[length-1] == FN_LIBCHAR || path[length-1] == '/') + break; +#ifdef FN_DEVCHAR + if (path[length-1] == FN_DEVCHAR) + break; +#endif + } + if (length && + (path[length] != FN_CURLIB || org_length - length != 1)) + { + s3->database.str= path + length; + s3->database.length= org_length - length; + return 0; + } + return 1; /* Can't find database */ +} + + +/** + Read frm from the disk +*/ + +static int s3_read_file_from_disk(const char *filename, uchar **to, + size_t *to_size, my_bool print_error) +{ + File file; + uchar *alloc_block; + size_t file_size; + int error; + + *to= 0; + if ((file= my_open(filename, + O_RDONLY | O_SHARE | O_NOFOLLOW | O_CLOEXEC, + MYF(print_error ? MY_WME: 0))) < 0) + return(my_errno); + + file_size= (size_t) my_seek(file, 0L, MY_SEEK_END, MYF(0)); + if (!(alloc_block= my_malloc(PSI_NOT_INSTRUMENTED, file_size, MYF(MY_WME)))) + goto err; + + if (my_pread(file, alloc_block, file_size, 0, MYF(MY_WME | MY_FNABP))) + goto err; + + *to= alloc_block; + *to_size= file_size; + my_close(file, MYF(0)); + return 0; + +err: + error= my_errno; + my_free(alloc_block); + my_close(file, MYF(0)); + return error; +} + + +/** + Get .frm or par from S3 + + @return 0 ok + @return 1 error +*/ + +my_bool s3_get_def(ms3_st *s3_client, S3_INFO *s3_info, S3_BLOCK *block, + const char *ext) +{ + char aws_path[AWS_PATH_LENGTH]; + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->table.str, "/", ext, NullS); + + return s3_get_object(s3_client, s3_info->bucket.str, aws_path, block, + 0, 0); +} + +/** + Check if .frm exits in S3 + + @return 0 frm exists + @return 1 error +*/ + +my_bool s3_frm_exists(ms3_st *s3_client, S3_INFO *s3_info) +{ + char aws_path[AWS_PATH_LENGTH]; + ms3_status_st status; + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->table.str, "/frm", NullS); + + return ms3_status(s3_client, s3_info->bucket.str, aws_path, &status); +} + + +/** + Get version from frm file + + @param out Store the table_version_here. It's of size MY_UUID_SIZE + @param frm_image Frm image + @param frm_length size of image + + @return 0 Was able to read table version + @return 1 Wrong information in frm file +*/ + +#define FRM_HEADER_SIZE 64 +#define EXTRA2_TABLEDEF_VERSION 0 + +static inline my_bool is_binary_frm_header(const uchar *head) +{ + return head[0] == 254 + && head[1] == 1 + && head[2] >= FRM_VER + && head[2] <= FRM_VER_CURRENT; +} + +static my_bool get_tabledef_version_from_frm(char *out, const uchar *frm_image, + size_t frm_length) +{ + uint segment_len; + const uchar *extra, *extra_end; + if (!is_binary_frm_header(frm_image) || frm_length <= FRM_HEADER_SIZE) + return 1; + + /* Length of the MariaDB extra2 segment in the form file. */ + segment_len= uint2korr(frm_image + 4); + if (frm_length < FRM_HEADER_SIZE + segment_len) + return 1; + + extra= frm_image + FRM_HEADER_SIZE; + if (*extra == '/') // old frm had '/' there + return 1; + + extra_end= extra + segment_len; + while (extra + 4 < extra_end) + { + uchar type= *extra++; + size_t length= *extra++; + if (!length) + { + length= uint2korr(extra); + extra+= 2; + if (length < 256) + return 1; /* Something is wrong */ + } + if (extra + length > extra_end) + return 1; + if (type == EXTRA2_TABLEDEF_VERSION) + { + if (length != MY_UUID_SIZE) + return 1; + memcpy(out, extra, length); + return 0; /* Found it */ + } + extra+= length; + } + return 1; +} + + +/** + Check if version in frm file matches what the server expects + + @return 0 table definitions matches + @return 1 table definitions doesn't match + @return 2 Can't find the frm version + @return 3 Can't read the frm version +*/ + +int s3_check_frm_version(ms3_st *s3_client, S3_INFO *s3_info) +{ + my_bool res= 0; + char aws_path[AWS_PATH_LENGTH]; + char uuid[MY_UUID_SIZE]; + S3_BLOCK block; + DBUG_ENTER("s3_check_frm_version"); + + strxnmov(aws_path, sizeof(aws_path)-1, s3_info->database.str, "/", + s3_info->base_table.str, "/frm", NullS); + + if (s3_get_object(s3_client, s3_info->bucket.str, aws_path, &block, 0, 0)) + { + DBUG_PRINT("exit", ("No object found")); + DBUG_RETURN(2); /* Ignore check, use old frm */ + } + + if (get_tabledef_version_from_frm(uuid, (uchar*) block.str, block.length) || + s3_info->tabledef_version.length != MY_UUID_SIZE) + { + s3_free(&block); + DBUG_PRINT("error", ("Wrong definition")); + DBUG_RETURN(3); /* Wrong definition */ + } + /* res is set to 1 if versions numbers doesn't match */ + res= bcmp(s3_info->tabledef_version.str, uuid, MY_UUID_SIZE) != 0; + s3_free(&block); + if (res) + DBUG_PRINT("error", ("Wrong table version")); + else + DBUG_PRINT("exit", ("Version strings matches")); + DBUG_RETURN(res); +} + + +/****************************************************************************** + Reading blocks from index or data from S3 +******************************************************************************/ + +/* + Read the index header (first page) from the index file + + In case of error, my_error() is called +*/ + +my_bool read_index_header(ms3_st *client, S3_INFO *s3, S3_BLOCK *block) +{ + char aws_path[AWS_PATH_LENGTH]; + DBUG_ENTER("read_index_header"); + strxnmov(aws_path, sizeof(aws_path)-1, s3->database.str, "/", s3->table.str, + "/aria", NullS); + DBUG_RETURN(s3_get_object(client, s3->bucket.str, aws_path, block, 0, 2)); +} + + +#ifdef FOR_FUTURE_IF_NEEDED_FOR_DEBUGGING_WITHOUT_S3 +/** + Read a big block from disk +*/ + +my_bool s3_block_read(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + LEX_STRING *data) +{ + MARIA_SHARE *share= (MARIA_SHARE*) file->callback_data; + my_bool datafile= file != &share->kfile; + + DBUG_ASSERT(file->big_block_size > 0); + DBUG_ASSERT(((((my_off_t) args->pageno - file->head_blocks) << + pagecache->shift) % + file->big_block_size) == 0); + + if (!(data->str= (char *) my_malloc(file->big_block_size, MYF(MY_WME)))) + return TRUE; + + data->length= mysql_file_pread(file->file, + (unsigned char *)data->str, + file->big_block_size, + ((my_off_t) args->pageno << pagecache->shift), + MYF(MY_WME)); + if (data->length == 0 || data->length == MY_FILE_ERROR) + { + if (data->length == 0) + { + LEX_STRING *file_name= (datafile ? + &share->data_file_name : + &share->index_file_name); + my_error(EE_EOFERR, MYF(0), file_name->str, my_errno); + } + my_free(data->str); + data->length= 0; + data->str= 0; + return TRUE; + } + return FALSE; +} +#endif + + +/** + Read a block from S3 to page cache +*/ + +my_bool s3_block_read(struct st_pagecache *pagecache, + PAGECACHE_IO_HOOK_ARGS *args, + struct st_pagecache_file *file, + S3_BLOCK *block) +{ + char aws_path[AWS_PATH_LENGTH]; + MARIA_SHARE *share= (MARIA_SHARE*) file->callback_data; + my_bool datafile= file->file != share->kfile.file; + MARIA_HA *info= (MARIA_HA*) my_thread_var->keycache_file; + ms3_st *client= info->s3; + const char *path_suffix= datafile ? "/data/" : "/index/"; + char *end; + S3_INFO *s3= share->s3_path; + ulong block_number; + DBUG_ENTER("s3_block_read"); + + DBUG_ASSERT(file->big_block_size > 0); + DBUG_ASSERT(((((my_off_t) args->pageno - file->head_blocks) << + pagecache->shift) % + file->big_block_size) == 0); + + block_number= (((args->pageno - file->head_blocks) << pagecache->shift) / + file->big_block_size) + 1; + + end= strxnmov(aws_path, sizeof(aws_path)-12, s3->database.str, "/", + s3->table.str, path_suffix, "000000", NullS); + fix_suffix(end, block_number); + + DBUG_RETURN(s3_get_object(client, s3->bucket.str, aws_path, block, + share->base.compression_algorithm, 1)); +} + +/* + Start file numbers from 1000 to more easily find bugs when the file number + could be mistaken for a real file +*/ +static volatile int32 unique_file_number= 1000; + +int32 s3_unique_file_number() +{ + return my_atomic_add32_explicit(&unique_file_number, 1, + MY_MEMORY_ORDER_RELAXED); +} |