diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/archive | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6. (refs: upstream/1%10.11.6, upstream)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/archive')
-rw-r--r-- | storage/archive/CMakeLists.txt | 18 | ||||
-rw-r--r-- | storage/archive/archive_reader.c | 423 | ||||
-rw-r--r-- | storage/archive/archive_test.c | 292 | ||||
-rw-r--r-- | storage/archive/azio.c | 937 | ||||
-rw-r--r-- | storage/archive/azlib.h | 345 | ||||
-rw-r--r-- | storage/archive/ha_archive.cc | 1965 | ||||
-rw-r--r-- | storage/archive/ha_archive.h | 163 |
7 files changed, 4143 insertions, 0 deletions
diff --git a/storage/archive/CMakeLists.txt b/storage/archive/CMakeLists.txt new file mode 100644 index 00000000..5b6818fc --- /dev/null +++ b/storage/archive/CMakeLists.txt @@ -0,0 +1,18 @@ +# Copyright (c) 2006, 2010, Oracle and/or its affiliates. All rights reserved. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 2 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA + +SET(ARCHIVE_SOURCES azio.c ha_archive.cc ha_archive.h) +MYSQL_ADD_PLUGIN(archive ${ARCHIVE_SOURCES} STORAGE_ENGINE LINK_LIBRARIES ${ZLIB_LIBRARY}) + diff --git a/storage/archive/archive_reader.c b/storage/archive/archive_reader.c new file mode 100644 index 00000000..0e02127e --- /dev/null +++ b/storage/archive/archive_reader.c @@ -0,0 +1,423 @@ +/* Copyright (c) 2007, 2010, Oracle and/or its affiliates + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "azlib.h" +#include <string.h> +#include <assert.h> +#include <stdio.h> +#include <stdarg.h> +#include <m_ctype.h> +#include <m_string.h> +#include <my_getopt.h> +#include <mysql_version.h> + +#define BUFFER_LEN 1024 +#define ARCHIVE_ROW_HEADER_SIZE 4 + +#define SHOW_VERSION "0.1" + +static void get_options(int *argc,char * * *argv); +static void print_version(void); +static void usage(void); +static const char *opt_tmpdir; +static const char *new_auto_increment; +unsigned long long new_auto_increment_value; +static const char *load_default_groups[]= { "archive_reader", 0 }; +static char **default_argv; +int opt_check, opt_force, opt_quiet, opt_backup= 0, opt_extract_frm; +int opt_autoincrement; + +int main(int argc, char *argv[]) +{ + unsigned int ret; + azio_stream reader_handle; + + MY_INIT(argv[0]); + get_options(&argc, &argv); + + if (argc < 1) + { + printf("No file specified. 
\n"); + return 0; + } + + if (!(ret= azopen(&reader_handle, argv[0], O_RDONLY|O_BINARY))) + { + printf("Could not open Archive file\n"); + return 0; + } + + if (opt_autoincrement) + { + azio_stream writer_handle; + + if (new_auto_increment_value) + { + if (reader_handle.auto_increment >= new_auto_increment_value) + { + printf("Value is lower then current value\n"); + goto end; + } + } + else + { + new_auto_increment_value= reader_handle.auto_increment + 1; + } + + if (!(ret= azopen(&writer_handle, argv[0], O_CREAT|O_RDWR|O_BINARY))) + { + printf("Could not open file for update: %s\n", argv[0]); + goto end; + } + + writer_handle.auto_increment= new_auto_increment_value; + + azclose(&writer_handle); + azflush(&reader_handle, Z_SYNC_FLUSH); + } + + printf("Version %u\n", reader_handle.version); + if (reader_handle.version > 2) + { + printf("\tMinor version %u\n", reader_handle.minor_version); + printf("\tStart position %llu\n", (unsigned long long)reader_handle.start); + printf("\tBlock size %u\n", reader_handle.block_size); + printf("\tRows %llu\n", reader_handle.rows); + printf("\tAutoincrement %llu\n", reader_handle.auto_increment); + printf("\tCheck Point %llu\n", reader_handle.check_point); + printf("\tForced Flushes %llu\n", reader_handle.forced_flushes); + printf("\tLongest Row %u\n", reader_handle.longest_row); + printf("\tShortest Row %u\n", reader_handle.shortest_row); + printf("\tState %s\n", ( reader_handle.dirty ? 
"dirty" : "clean")); + printf("\tFRM stored at %u\n", reader_handle.frm_start_pos); + printf("\tComment stored at %u\n", reader_handle.comment_start_pos); + printf("\tData starts at %u\n", (unsigned int)reader_handle.start); + if (reader_handle.frm_start_pos) + printf("\tFRM length %u\n", reader_handle.frm_length); + if (reader_handle.comment_start_pos) + { + char *comment = (char *) my_malloc(reader_handle.comment_length, + MYF(MY_WME)); + if (comment) + { + azread_comment(&reader_handle, comment); + printf("\tComment length %u\n\t\t%.*s\n", + reader_handle.comment_length, + reader_handle.comment_length, comment); + my_free(comment,MYF(0)); + } + } + } + else + { + goto end; + } + + printf("\n"); + + if (opt_check) + { + uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE]; + int error; + unsigned int x; + unsigned int read; + unsigned int row_len; + unsigned long long row_count= 0; + char buffer; + + while ((read= azread(&reader_handle, (uchar *)size_buffer, + ARCHIVE_ROW_HEADER_SIZE, &error))) + { + if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE)) + { + printf("Table is damaged\n"); + goto end; + } + + /* If we read nothing we are at the end of the file */ + if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE) + break; + + row_len= uint4korr(size_buffer); + row_count++; + + if (row_len > reader_handle.longest_row) + { + printf("Table is damaged, row %llu is invalid\n", + row_count); + goto end; + } + + + for (read= x= 0; x < row_len ; x++) + { + read+= (unsigned int)azread(&reader_handle, &buffer, sizeof(char), &error); + if (!read) + break; + } + + + if (row_len != read) + { + printf("Row length did not match row (at %llu). 
%u != %u \n", + row_count, row_len, read); + goto end; + } + } + + if (0) + { + printf("Table is damaged\n"); + goto end; + } + else + { + printf("Found %llu rows\n", row_count); + } + } + + if (opt_backup) + { + uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE]; + int error; + unsigned int read; + unsigned int row_len; + unsigned long long row_count= 0; + char *buffer; + + azio_stream writer_handle; + + buffer= (char *) my_malloc(reader_handle.longest_row, MYF(0)); + if (buffer == NULL) + { + printf("Could not allocate memory for row %llu\n", row_count); + goto end; + } + + + if (!(ret= azopen(&writer_handle, argv[1], O_CREAT|O_RDWR|O_BINARY))) + { + printf("Could not open file for backup: %s\n", argv[1]); + goto end; + } + + writer_handle.auto_increment= reader_handle.auto_increment; + if (reader_handle.frm_length) + { + char *ptr; + ptr= (char *)my_malloc(sizeof(char) * reader_handle.frm_length, MYF(0)); + azread_frm(&reader_handle, ptr); + azwrite_frm(&writer_handle, ptr, reader_handle.frm_length); + my_free(ptr); + } + + if (reader_handle.comment_length) + { + char *ptr; + ptr= (char *)my_malloc(sizeof(char) * reader_handle.comment_length, MYF(0)); + azread_comment(&reader_handle, ptr); + azwrite_comment(&writer_handle, ptr, reader_handle.comment_length); + my_free(ptr); + } + + while ((read= azread(&reader_handle, (uchar *)size_buffer, + ARCHIVE_ROW_HEADER_SIZE, &error))) + { + if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE)) + { + printf("Table is damaged\n"); + goto end; + } + + /* If we read nothing we are at the end of the file */ + if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE) + break; + + row_len= uint4korr(size_buffer); + + row_count++; + + memcpy(buffer, size_buffer, ARCHIVE_ROW_HEADER_SIZE); + + read= (unsigned int)azread(&reader_handle, buffer + ARCHIVE_ROW_HEADER_SIZE, + row_len, &error); + + DBUG_ASSERT(read == row_len); + + azwrite(&writer_handle, buffer, row_len + ARCHIVE_ROW_HEADER_SIZE); + + + if (row_len != read) + { + 
printf("Row length did not match row (at %llu). %u != %u \n", + row_count, row_len, read); + goto end; + } + + if (reader_handle.rows == writer_handle.rows) + break; + } + + my_free(buffer, MYF(0)); + + azclose(&writer_handle); + } + + if (opt_extract_frm) + { + File frm_file; + char *ptr; + frm_file= my_open(argv[1], O_CREAT|O_RDWR|O_BINARY, MYF(0)); + ptr= (char *)my_malloc(sizeof(char) * reader_handle.frm_length, MYF(0)); + azread_frm(&reader_handle, ptr); + my_write(frm_file, (uchar*) ptr, reader_handle.frm_length, MYF(0)); + my_close(frm_file, MYF(0)); + my_free(ptr); + } + +end: + printf("\n"); + azclose(&reader_handle); + + return 0; +} + +static my_bool +get_one_option(int optid, + const struct my_option *opt __attribute__((unused)), + char *argument) +{ + switch (optid) { + case 'b': + opt_backup= 1; + break; + case 'c': + opt_check= 1; + break; + case 'e': + opt_extract_frm= 1; + break; + case 'f': + opt_force= 1; + printf("Not implemented yet\n"); + break; + case 'q': + opt_quiet= 1; + printf("Not implemented yet\n"); + break; + case 'V': + print_version(); + exit(0); + case 't': + printf("Not implemented yet\n"); + break; + case 'A': + opt_autoincrement= 1; + if (argument) + new_auto_increment_value= strtoull(argument, NULL, 0); + else + new_auto_increment_value= 0; + break; + case '?': + usage(); + exit(0); + case '#': + if (argument == disabled_my_option) + { + DBUG_POP(); + } + else + { + DBUG_PUSH(argument ? argument : "d:t:o,/tmp/archive_reader.trace"); + } + break; + } + return 0; +} + +static struct my_option my_long_options[] = +{ + {"backup", 'b', + "Make a backup of an archive table.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"check", 'c', "Check table for errors.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, +#ifndef DBUG_OFF + {"debug", '#', + "Output debug log. 
Often this is 'd:t:o,filename'.", + 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, +#endif + {"extract-frm", 'e', + "Extract the frm file.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"force", 'f', + "Restart with -r if there are any errors in the table.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"help", '?', + "Display this help and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"quick", 'q', "Faster repair by not modifying the data file.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"repair", 'r', "Repair a damaged Archive version 3 or above file.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"set-auto-increment", 'A', + "Force auto_increment to start at this or higher value. If no value is given, then sets the next auto_increment value to the highest used value for the auto key + 1.", + &new_auto_increment, &new_auto_increment, + 0, GET_ULL, OPT_ARG, 0, 0, 0, 0, 0, 0}, + {"silent", 's', + "Only print errors. One can use two -s to make archive_reader very silent.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + {"tmpdir", 't', + "Path for temporary files.", + &opt_tmpdir, + 0, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, + {"version", 'V', + "Print version and exit.", + 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}, + { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} +}; + +static void usage(void) +{ + print_version(); + puts("Copyright 2007-2008 MySQL AB, 2008 Sun Microsystems, Inc."); + puts("This software comes with ABSOLUTELY NO WARRANTY. 
This is free software,\nand you are welcome to modify and redistribute it under the GPL license\n"); + puts("Read and modify Archive files directly\n"); + printf("Usage: %s [OPTIONS] file_to_be_looked_at [file_for_backup]\n", my_progname); + print_defaults("my", load_default_groups); + my_print_help(my_long_options); +} + +static void print_version(void) +{ + printf("%s Ver %s Distrib %s, for %s (%s)\n", my_progname, SHOW_VERSION, + MYSQL_SERVER_VERSION, SYSTEM_TYPE, MACHINE_TYPE); +} + +static void get_options(int *argc, char ***argv) +{ + load_defaults_or_exit("my", load_default_groups, argc, argv); + default_argv= *argv; + + handle_options(argc, argv, my_long_options, get_one_option); + + if (*argc == 0) + { + usage(); + exit(-1); + } + + return; +} /* get options */ diff --git a/storage/archive/archive_test.c b/storage/archive/archive_test.c new file mode 100644 index 00000000..72f6d05e --- /dev/null +++ b/storage/archive/archive_test.c @@ -0,0 +1,292 @@ +/* Copyright (C) 2006 MySQL AB + Use is subject to license terms + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "azlib.h" +#include <string.h> +#include <assert.h> +#include <stdio.h> +#include <string.h> +#include <my_getopt.h> +#include <mysql_version.h> + +#define ARCHIVE_ROW_HEADER_SIZE 4 + +#define COMMENT_STRING "Your bases" +#define FRM_STRING "My bases" +#define TEST_FILENAME "test.az" +#define TEST_STRING_INIT "YOU don't know about me without you have read a book by the name of The Adventures of Tom Sawyer; but that ain't no matter. That book was made by Mr. Mark Twain, and he told the truth, mainly. There was things which he stretched, but mainly he told the truth. That is nothing. I never seen anybody but lied one time or another, without it was Aunt Polly, or the widow, or maybe Mary. Aunt Polly--Tom's Aunt Polly, she is--and Mary, and the Widow Douglas is all told about in that book, which is mostly a true book, with some stretchers, as I said before. Now the way that the book winds up is this: Tom and me found the money that the robbers hid in the cave, and it made us rich. We got six thousand dollars apiece--all gold. It was an awful sight of money when it was piled up. Well, Judge Thatcher he took it and put it out at interest, and it fetched us a dollar a day apiece all the year round --more than a body could tell what to do with. The Widow Douglas she took me for her son, and allowed she would..." 
+#define TEST_LOOP_NUM 100 + + +#define ARCHIVE_ROW_HEADER_SIZE 4 + +#define BUFFER_LEN (1024 + ARCHIVE_ROW_HEADER_SIZE) + +char test_string[BUFFER_LEN]; + +#define TWOGIG LL(2147483648) +#define FOURGIG LL(4294967296) +#define EIGHTGIG LL(8589934592) + +/* prototypes */ +int size_test(unsigned long long length, unsigned long long rows_to_test_for); + + +int main(int argc, char *argv[]) +{ + unsigned int ret; + char comment_str[10]; + + int error; + unsigned int x; + int written_rows= 0; + azio_stream writer_handle, reader_handle; + char buffer[BUFFER_LEN]; + + int4store(test_string, 1024); + memcpy(test_string+sizeof(unsigned int), TEST_STRING_INIT, 1024); + + unlink(TEST_FILENAME); + + if (argc > 1) + return 0; + + MY_INIT(argv[0]); + + if (!(ret= azopen(&writer_handle, TEST_FILENAME, O_CREAT|O_RDWR|O_BINARY))) + { + printf("Could not create test file\n"); + return 0; + } + + azwrite_comment(&writer_handle, (char *)COMMENT_STRING, + (unsigned int)strlen(COMMENT_STRING)); + azread_comment(&writer_handle, comment_str); + assert(!memcmp(COMMENT_STRING, comment_str, + strlen(COMMENT_STRING))); + + azwrite_frm(&writer_handle, (char *)FRM_STRING, + (unsigned int)strlen(FRM_STRING)); + azread_frm(&writer_handle, comment_str); + assert(!memcmp(FRM_STRING, comment_str, + strlen(FRM_STRING))); + + + if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY))) + { + printf("Could not open test file\n"); + return 0; + } + + assert(reader_handle.rows == 0); + assert(reader_handle.auto_increment == 0); + assert(reader_handle.check_point == 0); + assert(reader_handle.forced_flushes == 0); + assert(reader_handle.dirty == AZ_STATE_DIRTY); + + for (x= 0; x < TEST_LOOP_NUM; x++) + { + ret= azwrite(&writer_handle, test_string, BUFFER_LEN); + assert(ret == BUFFER_LEN); + written_rows++; + } + azflush(&writer_handle, Z_SYNC_FLUSH); + + azread_comment(&writer_handle, comment_str); + assert(!memcmp(COMMENT_STRING, comment_str, + strlen(COMMENT_STRING))); + + /* Lets test that 
our internal stats are good */ + assert(writer_handle.rows == TEST_LOOP_NUM); + + /* Reader needs to be flushed to make sure it is up to date */ + azflush(&reader_handle, Z_SYNC_FLUSH); + assert(reader_handle.rows == TEST_LOOP_NUM); + assert(reader_handle.auto_increment == 0); + assert(reader_handle.check_point == 96); + assert(reader_handle.forced_flushes == 1); + assert(reader_handle.comment_length == 10); + assert(reader_handle.dirty == AZ_STATE_SAVED); + + writer_handle.auto_increment= 4; + azflush(&writer_handle, Z_SYNC_FLUSH); + assert(writer_handle.rows == TEST_LOOP_NUM); + assert(writer_handle.auto_increment == 4); + assert(writer_handle.check_point == 96); + assert(writer_handle.forced_flushes == 2); + assert(writer_handle.dirty == AZ_STATE_SAVED); + + if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY))) + { + printf("Could not open test file\n"); + return 0; + } + + /* Read the original data */ + for (x= 0; x < writer_handle.rows; x++) + { + ret= azread(&reader_handle, buffer, BUFFER_LEN, &error); + assert(!error); + assert(ret == BUFFER_LEN); + assert(!memcmp(buffer, test_string, ret)); + } + assert(writer_handle.rows == TEST_LOOP_NUM); + + /* Test here for falling off the planet */ + + /* Final Write before closing */ + ret= azwrite(&writer_handle, test_string, BUFFER_LEN); + assert(ret == BUFFER_LEN); + + /* We don't use FINISH, but I want to have it tested */ + azflush(&writer_handle, Z_FINISH); + + assert(writer_handle.rows == TEST_LOOP_NUM+1); + + /* Read final write */ + azrewind(&reader_handle); + for (x= 0; x < writer_handle.rows; x++) + { + ret= azread(&reader_handle, buffer, BUFFER_LEN, &error); + assert(ret == BUFFER_LEN); + assert(!error); + assert(!memcmp(buffer, test_string, ret)); + } + + + azclose(&writer_handle); + + /* Rewind and full test */ + azrewind(&reader_handle); + for (x= 0; x < writer_handle.rows; x++) + { + ret= azread(&reader_handle, buffer, BUFFER_LEN, &error); + assert(ret == BUFFER_LEN); + assert(!error); 
+ assert(!memcmp(buffer, test_string, ret)); + } + + printf("Finished reading\n"); + + if (!(ret= azopen(&writer_handle, TEST_FILENAME, O_RDWR|O_BINARY))) + { + printf("Could not open file (%s) for appending\n", TEST_FILENAME); + return 0; + } + ret= azwrite(&writer_handle, test_string, BUFFER_LEN); + assert(ret == BUFFER_LEN); + azflush(&writer_handle, Z_SYNC_FLUSH); + + /* Rewind and full test */ + azrewind(&reader_handle); + for (x= 0; x < writer_handle.rows; x++) + { + ret= azread(&reader_handle, buffer, BUFFER_LEN, &error); + assert(!error); + assert(ret == BUFFER_LEN); + assert(!memcmp(buffer, test_string, ret)); + } + + /* Reader needs to be flushed to make sure it is up to date */ + azflush(&reader_handle, Z_SYNC_FLUSH); + assert(reader_handle.rows == 102); + assert(reader_handle.auto_increment == 4); + assert(reader_handle.check_point == 1290); + assert(reader_handle.forced_flushes == 4); + assert(reader_handle.dirty == AZ_STATE_SAVED); + + azflush(&writer_handle, Z_SYNC_FLUSH); + assert(writer_handle.rows == reader_handle.rows); + assert(writer_handle.auto_increment == reader_handle.auto_increment); + assert(writer_handle.check_point == reader_handle.check_point); + /* This is +1 because we do a flush right before we read */ + assert(writer_handle.forced_flushes == reader_handle.forced_flushes + 1); + assert(writer_handle.dirty == reader_handle.dirty); + + azclose(&writer_handle); + azclose(&reader_handle); + unlink(TEST_FILENAME); + + /* Start size tests */ + printf("About to run 2/4/8 gig tests now, you may want to hit CTRL-C\n"); + size_test(TWOGIG, 2088992L); + size_test(FOURGIG, 4177984L); + size_test(EIGHTGIG, 8355968L); + + return 0; +} + +int size_test(unsigned long long length, unsigned long long rows_to_test_for) +{ + azio_stream writer_handle, reader_handle; + unsigned long long write_length; + unsigned long long read_length= 0; + unsigned long long count; + unsigned int ret; + char buffer[BUFFER_LEN]; + int error; + + if (!(ret= 
azopen(&writer_handle, TEST_FILENAME, O_CREAT|O_RDWR|O_TRUNC|O_BINARY))) + { + printf("Could not create test file\n"); + return 0; + } + + for (count= 0, write_length= 0; write_length < length ; + write_length+= ret) + { + count++; + ret= azwrite(&writer_handle, test_string, BUFFER_LEN); + if (ret != BUFFER_LEN) + { + printf("Size %u\n", ret); + assert(ret != BUFFER_LEN); + } + if ((write_length % 14031) == 0) + { + azflush(&writer_handle, Z_SYNC_FLUSH); + } + } + assert(write_length != count * BUFFER_LEN); /* Number of rows time BUFFER_LEN */ + azflush(&writer_handle, Z_SYNC_FLUSH); + + printf("Reading back data\n"); + + if (!(ret= azopen(&reader_handle, TEST_FILENAME, O_RDONLY|O_BINARY))) + { + printf("Could not open test file\n"); + return 0; + } + + while ((ret= azread(&reader_handle, buffer, BUFFER_LEN, &error))) + { + read_length+= ret; + assert(!memcmp(buffer, test_string, ret)); + if (ret != BUFFER_LEN) + { + printf("Size %u\n", ret); + assert(ret != BUFFER_LEN); + } + } + + assert(read_length == write_length); + assert(writer_handle.rows == rows_to_test_for); + azclose(&writer_handle); + azclose(&reader_handle); + unlink(TEST_FILENAME); + + return 0; +} diff --git a/storage/archive/azio.c b/storage/archive/azio.c new file mode 100644 index 00000000..01911b4b --- /dev/null +++ b/storage/archive/azio.c @@ -0,0 +1,937 @@ +/* + azio is a modified version of gzio. It makes use of mysys and removes mallocs. + -Brian Aker +*/ + +/* gzio.c -- IO on .gz files + * Copyright (C) 1995-2005 Jean-loup Gailly. 
+ * For conditions of distribution and use, see copyright notice in zlib.h + * + */ + +/* @(#) $Id$ */ + +#include "azlib.h" + +#include <stdio.h> +#include <string.h> + +#include "my_sys.h" + +static int const gz_magic[2] = {0x1f, 0x8b}; /* gzip magic header */ +static int const az_magic[3] = {0xfe, 0x03, 0x01}; /* az magic header */ + +/* gzip flag uchar */ +#define ASCII_FLAG 0x01 /* bit 0 set: file probably ascii text */ +#define HEAD_CRC 0x02 /* bit 1 set: header CRC present */ +#define EXTRA_FIELD 0x04 /* bit 2 set: extra field present */ +#define ORIG_NAME 0x08 /* bit 3 set: original file name present */ +#define COMMENT 0x10 /* bit 4 set: file comment present */ +#define RESERVED 0xE0 /* bits 5..7: reserved */ + +int az_open(azio_stream *s, const char *path, int Flags, File fd); +int do_flush(azio_stream *file, int flush); +int get_byte(azio_stream *s); +void check_header(azio_stream *s); +int write_header(azio_stream *s); +int destroy(azio_stream *s); +void putLong(File file, uLong x); +uLong getLong(azio_stream *s); +void read_header(azio_stream *s, unsigned char *buffer); + +#ifdef HAVE_PSI_INTERFACE +extern PSI_file_key arch_key_file_data; +#endif + +/* =========================================================================== + Opens a gzip (.gz) file for reading or writing. The mode parameter + is as in fopen ("rb" or "wb"). The file is given either by file descriptor + or path name (if fd == -1). + az_open returns NULL if the file could not be opened or if there was + insufficient memory to allocate the (de)compression state; errno + can be checked to distinguish the two cases (if errno is zero, the + zlib error is Z_MEM_ERROR). 
+*/ +int az_open (azio_stream *s, const char *path, int Flags, File fd) +{ + int err; + int level = Z_DEFAULT_COMPRESSION; /* compression level */ + int strategy = Z_DEFAULT_STRATEGY; /* compression strategy */ + + s->stream.zalloc = my_az_allocator; + s->stream.zfree = my_az_free; + s->stream.opaque = (voidpf)0; + memset(s->inbuf, 0, AZ_BUFSIZE_READ); + memset(s->outbuf, 0, AZ_BUFSIZE_WRITE); + s->stream.next_in = s->inbuf; + s->stream.next_out = s->outbuf; + s->stream.avail_in = s->stream.avail_out = 0; + s->z_err = Z_OK; + s->z_eof = 0; + s->in = 0; + s->out = 0; + s->back = EOF; + s->crc = 0; + s->transparent = 0; + s->mode = 'r'; + s->version = (unsigned char)az_magic[1]; /* this needs to be a define to version */ + s->minor_version= (unsigned char) az_magic[2]; /* minor version */ + s->dirty= AZ_STATE_CLEAN; + s->start= 0; + + /* + We do our own version of append by nature. + We must always have write access to take card of the header. + */ + DBUG_ASSERT(Flags | O_APPEND); + DBUG_ASSERT(Flags | O_WRONLY); + + if (Flags & O_RDWR) + s->mode = 'w'; + + if (s->mode == 'w') + { + err = deflateInit2(&(s->stream), level, + Z_DEFLATED, -MAX_WBITS, 8, strategy); + /* windowBits is passed < 0 to suppress zlib header */ + + s->stream.next_out = s->outbuf; + if (err != Z_OK) + { + destroy(s); + return Z_NULL; + } + } else { + s->stream.next_in = s->inbuf; + + err = inflateInit2(&(s->stream), -MAX_WBITS); + /* windowBits is passed < 0 to tell that there is no zlib header. + * Note that in this case inflate *requires* an extra "dummy" byte + * after the compressed stream in order to complete decompression and + * return Z_STREAM_END. Here the gzip CRC32 ensures that 4 bytes are + * present after the compressed stream. + */ + if (err != Z_OK) + { + destroy(s); + return Z_NULL; + } + } + s->stream.avail_out = AZ_BUFSIZE_WRITE; + + errno = 0; + s->file = fd < 0 ? 
mysql_file_open(arch_key_file_data, path, Flags, MYF(0)) : fd; + DBUG_EXECUTE_IF("simulate_archive_open_failure", + { + if (s->file >= 0) + { + my_close(s->file, MYF(0)); + s->file= -1; + my_errno= EMFILE; + } + }); + + if (s->file < 0 ) + { + destroy(s); + return Z_NULL; + } + + if (Flags & O_CREAT || Flags & O_TRUNC) + { + s->rows= 0; + s->forced_flushes= 0; + s->shortest_row= 0; + s->longest_row= 0; + s->auto_increment= 0; + s->check_point= 0; + s->comment_start_pos= 0; + s->comment_length= 0; + s->frm_start_pos= 0; + s->frm_length= 0; + s->dirty= 1; /* We create the file dirty */ + s->start = AZHEADER_SIZE + AZMETA_BUFFER_SIZE; + write_header(s); + my_seek(s->file, 0, MY_SEEK_END, MYF(0)); + } + else if (s->mode == 'w') + { + uchar buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE]; + my_pread(s->file, buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0, + MYF(0)); + read_header(s, buffer); /* skip the .az header */ + my_seek(s->file, 0, MY_SEEK_END, MYF(0)); + } + else + { + /* Reset values in case of old version of archive file */ + s->rows= 0; + s->forced_flushes= 0; + s->shortest_row= 0; + s->longest_row= 0; + s->auto_increment= 0; + s->check_point= 0; + s->comment_start_pos= 0; + s->comment_length= 0; + s->frm_start_pos= 0; + s->frm_length= 0; + check_header(s); /* skip the .az header */ + } + + return 1; +} + + +int write_header(azio_stream *s) +{ + char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE]; + char *ptr= buffer; + + if (s->version == 1) + return 0; + + s->block_size= AZ_BUFSIZE_WRITE; + s->version = (unsigned char)az_magic[1]; + s->minor_version = (unsigned char)az_magic[2]; + + + /* Write a very simple .az header: */ + memset(buffer, 0, AZHEADER_SIZE + AZMETA_BUFFER_SIZE); + *(ptr + AZ_MAGIC_POS)= (char) az_magic[0]; + *(ptr + AZ_VERSION_POS)= (unsigned char)s->version; + *(ptr + AZ_MINOR_VERSION_POS)= (unsigned char)s->minor_version; + *(ptr + AZ_BLOCK_POS)= (unsigned char)(s->block_size/1024); /* Reserved for block size */ + *(ptr + AZ_STRATEGY_POS)= (unsigned 
char)Z_DEFAULT_STRATEGY; /* Compression Type */ + + int4store(ptr + AZ_FRM_POS, s->frm_start_pos); /* FRM Block */ + int4store(ptr + AZ_FRM_LENGTH_POS, s->frm_length); /* FRM Block */ + int4store(ptr + AZ_COMMENT_POS, s->comment_start_pos); /* COMMENT Block */ + int4store(ptr + AZ_COMMENT_LENGTH_POS, s->comment_length); /* COMMENT Block */ + int4store(ptr + AZ_META_POS, 0); /* Meta Block */ + int4store(ptr + AZ_META_LENGTH_POS, 0); /* Meta Block */ + int8store(ptr + AZ_START_POS, (unsigned long long)s->start); /* Start of Data Block Index Block */ + int8store(ptr + AZ_ROW_POS, (unsigned long long)s->rows); /* Start of Data Block Index Block */ + int8store(ptr + AZ_FLUSH_POS, (unsigned long long)s->forced_flushes); /* Start of Data Block Index Block */ + int8store(ptr + AZ_CHECK_POS, (unsigned long long)s->check_point); /* Start of Data Block Index Block */ + int8store(ptr + AZ_AUTOINCREMENT_POS, (unsigned long long)s->auto_increment); /* Start of Data Block Index Block */ + int4store(ptr+ AZ_LONGEST_POS , s->longest_row); /* Longest row */ + int4store(ptr+ AZ_SHORTEST_POS, s->shortest_row); /* Shorest row */ + int4store(ptr+ AZ_FRM_POS, + AZHEADER_SIZE + AZMETA_BUFFER_SIZE); /* FRM position */ + *(ptr + AZ_DIRTY_POS)= (unsigned char)s->dirty; /* Start of Data Block Index Block */ + + /* Always begin at the beginning, and end there as well */ + return my_pwrite(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, + 0, MYF(MY_NABP)) ? 1 : 0; +} + +/* =========================================================================== + Opens a gzip (.gz) file for reading or writing. +*/ +int azopen(azio_stream *s, const char *path, int Flags) +{ + return az_open(s, path, Flags, -1); +} + +/* =========================================================================== + Associate a gzFile with the file descriptor fd. fd is not dup'ed here + to mimic the behavio(u)r of fdopen. 
+*/ +int azdopen(azio_stream *s, File fd, int Flags) +{ + if (fd < 0) return 0; + + return az_open (s, NULL, Flags, fd); +} + +/* =========================================================================== + Read a byte from a azio_stream; update next_in and avail_in. Return EOF + for end of file. + IN assertion: the stream s has been sucessfully opened for reading. +*/ +int get_byte(azio_stream *s) +{ + if (s->z_eof) return EOF; + if (s->stream.avail_in == 0) + { + errno = 0; + s->stream.avail_in= (uInt) mysql_file_read(s->file, (uchar *)s->inbuf, + AZ_BUFSIZE_READ, MYF(0)); + if (s->stream.avail_in == 0) + { + s->z_eof = 1; + return EOF; + } + else if (s->stream.avail_in == (uInt) -1) + { + s->z_eof= 1; + s->z_err= Z_ERRNO; + return EOF; + } + s->stream.next_in = s->inbuf; + } + s->stream.avail_in--; + return *(s->stream.next_in)++; +} + +/* =========================================================================== + Check the gzip header of a azio_stream opened for reading. Set the stream + mode to transparent if the gzip magic header is not present; set s->err + to Z_DATA_ERROR if the magic header is present but the rest of the header + is incorrect. + IN assertion: the stream s has already been created sucessfully; + s->stream.avail_in is zero for the first time, but may be non-zero + for concatenated .gz files. 
+*/ +void check_header(azio_stream *s) +{ + int method; /* method uchar */ + int flags; /* flags uchar */ + uInt len; + int c; + + /* Assure two bytes in the buffer so we can peek ahead -- handle case + where first byte of header is at the end of the buffer after the last + gzip segment */ + len = s->stream.avail_in; + if (len < 2) { + if (len) s->inbuf[0] = s->stream.next_in[0]; + errno = 0; + len = (uInt)mysql_file_read(s->file, (uchar *)s->inbuf + len, + AZ_BUFSIZE_READ >> len, MYF(0)); + if (len == (uInt)-1) s->z_err = Z_ERRNO; + s->stream.avail_in += len; + s->stream.next_in = s->inbuf; + if (s->stream.avail_in < 2) { + s->transparent = s->stream.avail_in; + return; + } + } + + /* Peek ahead to check the gzip magic header */ + if ( s->stream.next_in[0] == gz_magic[0] && s->stream.next_in[1] == gz_magic[1]) + { + read_header(s, s->stream.next_in); + s->stream.avail_in -= 2; + s->stream.next_in += 2; + + /* Check the rest of the gzip header */ + method = get_byte(s); + flags = get_byte(s); + if (method != Z_DEFLATED || (flags & RESERVED) != 0) { + s->z_err = Z_DATA_ERROR; + return; + } + + /* Discard time, xflags and OS code: */ + for (len = 0; len < 6; len++) (void)get_byte(s); + + if ((flags & EXTRA_FIELD) != 0) { /* skip the extra field */ + len = (uInt)get_byte(s); + len += ((uInt)get_byte(s))<<8; + /* len is garbage if EOF but the loop below will quit anyway */ + while (len-- != 0 && get_byte(s) != EOF) ; + } + if ((flags & ORIG_NAME) != 0) { /* skip the original file name */ + while ((c = get_byte(s)) != 0 && c != EOF) ; + } + if ((flags & COMMENT) != 0) { /* skip the .gz file comment */ + while ((c = get_byte(s)) != 0 && c != EOF) ; + } + if ((flags & HEAD_CRC) != 0) { /* skip the header crc */ + for (len = 0; len < 2; len++) (void)get_byte(s); + } + s->z_err = s->z_eof ? 
Z_DATA_ERROR : Z_OK; + if (!s->start) + s->start= my_tell(s->file, MYF(0)) - s->stream.avail_in; + } + else if ( s->stream.next_in[0] == az_magic[0] && s->stream.next_in[1] == az_magic[1]) + { + unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE]; + + for (len = 0; len < (AZHEADER_SIZE + AZMETA_BUFFER_SIZE); len++) + buffer[len]= get_byte(s); + s->z_err = s->z_eof ? Z_DATA_ERROR : Z_OK; + read_header(s, buffer); + for (; len < s->start; len++) + get_byte(s); + } + else + { + s->z_err = Z_OK; + + return; + } +} + +void read_header(azio_stream *s, unsigned char *buffer) +{ + if (buffer[0] == az_magic[0] && buffer[1] == az_magic[1]) + { + uchar tmp[AZ_FRMVER_LEN + 2]; + + s->version= (unsigned int)buffer[AZ_VERSION_POS]; + s->minor_version= (unsigned int)buffer[AZ_MINOR_VERSION_POS]; + s->block_size= 1024 * buffer[AZ_BLOCK_POS]; + s->start= (unsigned long long)uint8korr(buffer + AZ_START_POS); + s->rows= (unsigned long long)uint8korr(buffer + AZ_ROW_POS); + s->check_point= (unsigned long long)uint8korr(buffer + AZ_CHECK_POS); + s->forced_flushes= (unsigned long long)uint8korr(buffer + AZ_FLUSH_POS); + s->auto_increment= (unsigned long long)uint8korr(buffer + AZ_AUTOINCREMENT_POS); + s->longest_row= (unsigned int)uint4korr(buffer + AZ_LONGEST_POS); + s->shortest_row= (unsigned int)uint4korr(buffer + AZ_SHORTEST_POS); + s->frm_start_pos= (unsigned int)uint4korr(buffer + AZ_FRM_POS); + s->frm_length= (unsigned int)uint4korr(buffer + AZ_FRM_LENGTH_POS); + s->comment_start_pos= (unsigned int)uint4korr(buffer + AZ_COMMENT_POS); + s->comment_length= (unsigned int)uint4korr(buffer + AZ_COMMENT_LENGTH_POS); + s->dirty= (unsigned int)buffer[AZ_DIRTY_POS]; + + /* + we'll hard-code the current frm format for now, to avoid + changing archive table versions. 
+ */ + if (s->frm_length == 0 || + my_pread(s->file, tmp, sizeof(tmp), s->frm_start_pos + 64, MYF(MY_NABP)) || + tmp[0] != 0 || tmp[1] != AZ_FRMVER_LEN) + { + s->frmver_length= 0; + } + else + { + s->frmver_length= tmp[1]; + memcpy(s->frmver, tmp+2, s->frmver_length); + } + } + else if (buffer[0] == gz_magic[0] && buffer[1] == gz_magic[1]) + { + /* + Set version number to previous version (1). + */ + s->version= 1; + s->auto_increment= 0; + s->frm_length= 0; + s->longest_row= 0; + s->shortest_row= 0; + } else { + /* + Unknown version. + Most probably due to a corrupt archive. + */ + s->dirty= AZ_STATE_DIRTY; + s->z_err= Z_VERSION_ERROR; + } +} + +/* =========================================================================== + * Cleanup then free the given azio_stream. Return a zlib error code. + Try freeing in the reverse order of allocations. + */ +int destroy (azio_stream *s) +{ + int err = Z_OK; + + if (s->stream.state != NULL) + { + if (s->mode == 'w') + err = deflateEnd(&(s->stream)); + else if (s->mode == 'r') + err = inflateEnd(&(s->stream)); + } + + if (s->file > 0 && my_close(s->file, MYF(0))) + err = Z_ERRNO; + + s->file= -1; + + if (s->z_err < 0) err = s->z_err; + + return err; +} + +/* =========================================================================== + Reads the given number of uncompressed bytes from the compressed file. + azread returns the number of bytes actually read (0 for end of file). 
+*/ +unsigned int ZEXPORT azread ( azio_stream *s, voidp buf, size_t len, int *error) +{ + Bytef *start = (Bytef*)buf; /* starting point for crc computation */ + Byte *next_out; /* == stream.next_out but not forced far (for MSDOS) */ + *error= 0; + + if (s->mode != 'r') + { + *error= Z_STREAM_ERROR; + return 0; + } + + if (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO) + { + *error= s->z_err; + return 0; + } + + if (s->z_err == Z_STREAM_END) /* EOF */ + { + return 0; + } + + next_out = (Byte*)buf; + s->stream.next_out = (Bytef*)buf; + s->stream.avail_out = (uInt)len; + + if (s->stream.avail_out && s->back != EOF) { + *next_out++ = s->back; + s->stream.next_out++; + s->stream.avail_out--; + s->back = EOF; + s->out++; + start++; + if (s->last) { + s->z_err = Z_STREAM_END; + { + return 1; + } + } + } + + while (s->stream.avail_out != 0) { + + if (s->transparent) { + /* Copy first the lookahead bytes: */ + uInt n = s->stream.avail_in; + if (n > s->stream.avail_out) n = s->stream.avail_out; + if (n > 0) { + memcpy(s->stream.next_out, s->stream.next_in, n); + next_out += n; + s->stream.next_out = (Bytef *)next_out; + s->stream.next_in += n; + s->stream.avail_out -= n; + s->stream.avail_in -= n; + } + if (s->stream.avail_out > 0) + { + s->stream.avail_out -= + (uInt)mysql_file_read(s->file, (uchar *)next_out, + s->stream.avail_out, MYF(0)); + } + len -= s->stream.avail_out; + s->in += len; + s->out += len; + if (len == 0) s->z_eof = 1; + { + return (uint)len; + } + } + if (s->stream.avail_in == 0 && !s->z_eof) { + + errno = 0; + s->stream.avail_in = (uInt)mysql_file_read(s->file, (uchar *)s->inbuf, + AZ_BUFSIZE_READ, MYF(0)); + if (s->stream.avail_in == 0) + { + s->z_eof = 1; + } + s->stream.next_in = (Bytef *)s->inbuf; + } + s->in += s->stream.avail_in; + s->out += s->stream.avail_out; + s->z_err = inflate(&(s->stream), Z_NO_FLUSH); + s->in -= s->stream.avail_in; + s->out -= s->stream.avail_out; + + if (s->z_err == Z_STREAM_END) { + /* Check CRC and original size */ + 
s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start)); + start = s->stream.next_out; + + if (getLong(s) != s->crc) { + s->z_err = Z_DATA_ERROR; + } else { + (void)getLong(s); + /* The uncompressed length returned by above getlong() may be + * different from s->out in case of concatenated .gz files. + * Check for such files: + */ + check_header(s); + if (s->z_err == Z_OK) + { + inflateReset(&(s->stream)); + s->crc = crc32(0L, Z_NULL, 0); + } + } + } + if (s->z_err != Z_OK || s->z_eof) break; + } + s->crc = crc32(s->crc, start, (uInt)(s->stream.next_out - start)); + + if (len == s->stream.avail_out && + (s->z_err == Z_DATA_ERROR || s->z_err == Z_ERRNO)) + { + *error= s->z_err; + + return 0; + } + + return (uint)(len - s->stream.avail_out); +} + + +/* =========================================================================== + Writes the given number of uncompressed bytes into the compressed file. + azwrite returns the number of bytes actually written (0 in case of error). +*/ +unsigned int azwrite (azio_stream *s, const voidp buf, unsigned int len) +{ + s->stream.next_in = (Bytef*)buf; + s->stream.avail_in = len; + + s->rows++; + + while (s->stream.avail_in != 0) + { + if (s->stream.avail_out == 0) + { + + s->stream.next_out = s->outbuf; + if (mysql_file_write(s->file, (uchar *)s->outbuf, AZ_BUFSIZE_WRITE, + MYF(0)) != AZ_BUFSIZE_WRITE) + { + s->z_err = Z_ERRNO; + break; + } + s->stream.avail_out = AZ_BUFSIZE_WRITE; + } + s->in += s->stream.avail_in; + s->out += s->stream.avail_out; + s->z_err = deflate(&(s->stream), Z_NO_FLUSH); + s->in -= s->stream.avail_in; + s->out -= s->stream.avail_out; + if (s->z_err != Z_OK) break; + } + s->crc = crc32(s->crc, (const Bytef *)buf, len); + + if (len > s->longest_row) + s->longest_row= len; + + if (len < s->shortest_row || !(s->shortest_row)) + s->shortest_row= len; + + return (unsigned int)(len - s->stream.avail_in); +} + + +/* =========================================================================== + Flushes 
all pending output into the compressed file. The parameter + flush is as in the deflate() function. +*/ +int do_flush (azio_stream *s, int flush) +{ + uInt len; + int done = 0; + my_off_t afterwrite_pos; + + if (s == NULL || s->mode != 'w') return Z_STREAM_ERROR; + + s->stream.avail_in = 0; /* should be zero already anyway */ + + for (;;) + { + len = AZ_BUFSIZE_WRITE - s->stream.avail_out; + + if (len != 0) + { + s->check_point= my_tell(s->file, MYF(0)); + if ((uInt)mysql_file_write(s->file, (uchar *)s->outbuf, len, MYF(0)) != len) + { + s->z_err = Z_ERRNO; + return Z_ERRNO; + } + s->stream.next_out = s->outbuf; + s->stream.avail_out = AZ_BUFSIZE_WRITE; + } + if (done) break; + s->out += s->stream.avail_out; + s->z_err = deflate(&(s->stream), flush); + s->out -= s->stream.avail_out; + + /* Ignore the second of two consecutive flushes: */ + if (len == 0 && s->z_err == Z_BUF_ERROR) s->z_err = Z_OK; + + /* deflate has finished flushing only when it hasn't used up + * all the available space in the output buffer: + */ + done = (s->stream.avail_out != 0 || s->z_err == Z_STREAM_END); + + if (s->z_err != Z_OK && s->z_err != Z_STREAM_END) break; + } + + if (flush == Z_FINISH) + s->dirty= AZ_STATE_CLEAN; /* Mark it clean, we should be good now */ + else + s->dirty= AZ_STATE_SAVED; /* Mark it clean, we should be good now */ + + afterwrite_pos= my_tell(s->file, MYF(0)); + write_header(s); + my_seek(s->file, afterwrite_pos, SEEK_SET, MYF(0)); + + return s->z_err == Z_STREAM_END ? Z_OK : s->z_err; +} + +int ZEXPORT azflush (azio_stream *s, int flush) +{ + int err; + + if (s->mode == 'r') + { + unsigned char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE]; + my_pread(s->file, (uchar*) buffer, AZHEADER_SIZE + AZMETA_BUFFER_SIZE, 0, + MYF(0)); + read_header(s, buffer); /* skip the .az header */ + + return Z_OK; + } + else + { + s->forced_flushes++; + err= do_flush(s, flush); + + if (err) return err; + my_sync(s->file, MYF(0)); + return s->z_err == Z_STREAM_END ? 
Z_OK : s->z_err; + } +} + +/* =========================================================================== + Rewinds input file. +*/ +int azrewind (azio_stream *s) +{ + if (s == NULL || s->mode != 'r') return -1; + + s->z_err = Z_OK; + s->z_eof = 0; + s->back = EOF; + s->stream.avail_in = 0; + s->stream.next_in = (Bytef *)s->inbuf; + s->crc = crc32(0L, Z_NULL, 0); + if (!s->transparent) (void)inflateReset(&s->stream); + s->in = 0; + s->out = 0; + return my_seek(s->file, (int)s->start, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR; +} + +/* =========================================================================== + Sets the starting position for the next azread or azwrite on the given + compressed file. The offset represents a number of bytes in the + azseek returns the resulting offset location as measured in bytes from + the beginning of the uncompressed stream, or -1 in case of error. + SEEK_END is not implemented, returns error. + In this version of the library, azseek can be extremely slow. +*/ +my_off_t azseek (azio_stream *s, my_off_t offset, int whence) +{ + + if (s == NULL || whence == SEEK_END || + s->z_err == Z_ERRNO || s->z_err == Z_DATA_ERROR) { + return -1L; + } + + if (s->mode == 'w') + { + if (whence == SEEK_SET) + offset -= s->in; + + /* At this point, offset is the number of zero bytes to write. 
*/ + /* There was a zmemzero here if inbuf was null -Brian */ + while (offset > 0) + { + uInt size = AZ_BUFSIZE_READ; + if (offset < AZ_BUFSIZE_READ) size = (uInt)offset; + + size = azwrite(s, s->inbuf, size); + if (size == 0) return -1L; + + offset -= size; + } + return s->in; + } + /* Rest of function is for reading only */ + + /* compute absolute position */ + if (whence == SEEK_CUR) { + offset += s->out; + } + + if (s->transparent) { + /* map to my_seek */ + s->back = EOF; + s->stream.avail_in = 0; + s->stream.next_in = (Bytef *)s->inbuf; + if (my_seek(s->file, offset, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR) return -1L; + + s->in = s->out = offset; + return offset; + } + + /* For a negative seek, rewind and use positive seek */ + if (offset >= s->out) { + offset -= s->out; + } else if (azrewind(s)) { + return -1L; + } + /* offset is now the number of bytes to skip. */ + + if (offset && s->back != EOF) { + s->back = EOF; + s->out++; + offset--; + if (s->last) s->z_err = Z_STREAM_END; + } + while (offset > 0) { + int error; + unsigned int size = AZ_BUFSIZE_WRITE; + if (offset < AZ_BUFSIZE_WRITE) size = (int)offset; + + size = azread(s, s->outbuf, size, &error); + if (error < 0) return -1L; + offset -= size; + } + return s->out; +} + +/* =========================================================================== + Returns the starting position for the next azread or azwrite on the + given compressed file. This position represents a number of bytes in the + uncompressed data stream. 
+*/ +my_off_t ZEXPORT aztell (azio_stream *file) +{ + return azseek(file, 0L, SEEK_CUR); +} + + +/* =========================================================================== + Outputs a long in LSB order to the given file +*/ +void putLong (File file, uLong x) +{ + int n; + uchar buffer[1]; + + for (n = 0; n < 4; n++) + { + buffer[0]= (int)(x & 0xff); + mysql_file_write(file, buffer, 1, MYF(0)); + x >>= 8; + } +} + +/* =========================================================================== + Reads a long in LSB order from the given azio_stream. Sets z_err in case + of error. +*/ +uLong getLong (azio_stream *s) +{ + uLong x = (uLong)get_byte(s); + int c; + + x += ((uLong)get_byte(s))<<8; + x += ((uLong)get_byte(s))<<16; + c = get_byte(s); + if (c == EOF) s->z_err = Z_DATA_ERROR; + x += ((uLong)c)<<24; + return x; +} + +/* =========================================================================== + Flushes all pending output if necessary, closes the compressed file + and deallocates all the (de)compression state. +*/ +int azclose (azio_stream *s) +{ + + if (s == NULL) return Z_STREAM_ERROR; + + if (s->file < 1) return Z_OK; + + if (s->mode == 'w') + { + if (do_flush(s, Z_FINISH) != Z_OK) + { + destroy(s); + return Z_ERRNO; + } + + putLong(s->file, s->crc); + putLong(s->file, (uLong)(s->in & 0xffffffff)); + s->dirty= AZ_STATE_CLEAN; + s->check_point= my_tell(s->file, MYF(0)); + write_header(s); + } + + return destroy(s); +} + +/* + Though this was added to support MySQL's FRM file, anything can be + stored in this location. 
+*/ +int azwrite_frm(azio_stream *s, const uchar *blob, size_t length) +{ + if (s->mode == 'r') + return 1; + + if (s->rows > 0) + return 1; + + s->frm_start_pos= (uint) s->start; + s->frm_length= (uint)length; + s->start+= length; + + if (my_pwrite(s->file, blob, s->frm_length, + s->frm_start_pos, MYF(MY_NABP)) || + write_header(s) || + (my_seek(s->file, 0, MY_SEEK_END, MYF(0)) == MY_FILEPOS_ERROR)) + return 1; + + return 0; +} + +int azread_frm(azio_stream *s, uchar *blob) +{ + return my_pread(s->file, blob, s->frm_length, + s->frm_start_pos, MYF(MY_NABP)) ? 1 : 0; +} + + +/* + Simple comment field +*/ +int azwrite_comment(azio_stream *s, const char *blob, size_t length) +{ + if (s->mode == 'r') + return 1; + + if (s->rows > 0) + return 1; + + s->comment_start_pos= (uint) s->start; + s->comment_length= (uint)length; + s->start+= length; + + my_pwrite(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos, + MYF(0)); + + write_header(s); + my_seek(s->file, 0, MY_SEEK_END, MYF(0)); + + return 0; +} + +int azread_comment(azio_stream *s, char *blob) +{ + my_pread(s->file, (uchar*) blob, s->comment_length, s->comment_start_pos, + MYF(0)); + + return 0; +} diff --git a/storage/archive/azlib.h b/storage/archive/azlib.h new file mode 100644 index 00000000..20725aac --- /dev/null +++ b/storage/archive/azlib.h @@ -0,0 +1,345 @@ +/* + This libary has been modified for use by the MySQL Archive Engine. + -Brian Aker +*/ + +/* zlib.h -- interface of the 'zlib' general purpose compression library + version 1.2.3, July 18th, 2005 + + Copyright (C) 1995-2005 Jean-loup Gailly and Mark Adler + + This software is provided 'as-is', without any express or implied + warranty. In no event will the authors be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. 
The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution. + + Jean-loup Gailly Mark Adler + jloup@gzip.org madler@alumni.caltech.edu + + + The data format used by the zlib library is described by RFCs (Request for + Comments) 1950 to 1952 in the files http://www.ietf.org/rfc/rfc1950.txt + (zlib format), rfc1951.txt (deflate format) and rfc1952.txt (gzip format). +*/ + +#include "../../mysys/mysys_priv.h" +#include <my_dir.h> +#include <zlib.h> + +#ifdef __cplusplus +extern "C" { +#endif +/* Start of MySQL Specific Information */ + +/* + Size of the meta block that follows the fixed header: + four unsigned long long, four unsigned int and one unsigned char. + The expansion is not parenthesized, so only use it in additive contexts. +*/ +#define AZMETA_BUFFER_SIZE sizeof(unsigned long long) \ + + sizeof(unsigned long long) + sizeof(unsigned long long) + sizeof(unsigned long long) \ + + sizeof(unsigned int) + sizeof(unsigned int) \ + + sizeof(unsigned int) + sizeof(unsigned int) \ + + sizeof(unsigned char) + +#define AZHEADER_SIZE 29 + +/* Byte offsets of the individual fields inside the on-disk header */ +#define AZ_MAGIC_POS 0 +#define AZ_VERSION_POS 1 +#define AZ_MINOR_VERSION_POS 2 +#define AZ_BLOCK_POS 3 +#define AZ_STRATEGY_POS 4 +#define AZ_FRM_POS 5 +#define AZ_FRM_LENGTH_POS 9 +#define AZ_META_POS 13 +#define AZ_META_LENGTH_POS 17 +#define AZ_START_POS 21 +#define AZ_ROW_POS 29 +#define AZ_FLUSH_POS 37 +#define AZ_CHECK_POS 45 +#define AZ_AUTOINCREMENT_POS 53 +#define AZ_LONGEST_POS 61 +#define AZ_SHORTEST_POS 65 +#define AZ_COMMENT_POS 69 +#define AZ_COMMENT_LENGTH_POS 73 +#define AZ_DIRTY_POS 77 + + +/* + Flags for state (values of the "dirty" byte stored at AZ_DIRTY_POS) +*/ +#define AZ_STATE_CLEAN 0 +#define AZ_STATE_DIRTY 1 +#define AZ_STATE_SAVED 2 +#define AZ_STATE_CRASHED 3 + +/* + The 'zlib' compression library provides 
in-memory compression and + decompression functions, including integrity checks of the uncompressed + data. This version of the library supports only one compression method + (deflation) but other algorithms will be added later and will have the same + stream interface. + + Compression can be done in a single step if the buffers are large + enough (for example if an input file is mmap'ed), or can be done by + repeated calls of the compression function. In the latter case, the + application must provide more input and/or consume the output + (providing more output space) before each call. + + The compressed data format used by default by the in-memory functions is + the zlib format, which is a zlib wrapper documented in RFC 1950, wrapped + around a deflate stream, which is itself documented in RFC 1951. + + The library also supports reading and writing files in gzip (.gz) format + with an interface similar to that of stdio using the functions that start + with "gz". The gzip format is different from the zlib format. gzip is a + gzip wrapper, documented in RFC 1952, wrapped around a deflate stream. + + This library can optionally read and write gzip streams in memory as well. + + The zlib format was designed to be compact and fast for use in memory + and on communications channels. The gzip format was designed for single- + file compression on file systems, has a larger header than zlib to maintain + directory information, and uses a different, slower check method than zlib. + + The library does not install any signal handler. The decoder checks + the consistency of the compressed data, so the library should never + crash even in case of corrupted input. +*/ + + +/* + The application must update next_in and avail_in when avail_in has + dropped to zero. It must update next_out and avail_out when avail_out + has dropped to zero. The application must initialize zalloc, zfree and + opaque before calling the init function. 
All other fields are set by the + compression library and must not be updated by the application. + + The opaque value provided by the application will be passed as the first + parameter for calls of zalloc and zfree. This can be useful for custom + memory management. The compression library attaches no meaning to the + opaque value. + + zalloc must return Z_NULL if there is not enough memory for the object. + If zlib is used in a multi-threaded application, zalloc and zfree must be + thread safe. + + On 16-bit systems, the functions zalloc and zfree must be able to allocate + exactly 65536 bytes, but will not be required to allocate more than this + if the symbol MAXSEG_64K is defined (see zconf.h). WARNING: On MSDOS, + pointers returned by zalloc for objects of exactly 65536 bytes *must* + have their offset normalized to zero. The default allocation function + provided by this library ensures this (see zutil.c). To reduce memory + requirements and avoid any allocation of 64K objects, at the expense of + compression ratio, compile the library with -DMAX_WBITS=14 (see zconf.h). + + The fields total_in and total_out can be used for statistics or + progress reports. After compression, total_in holds the total size of + the uncompressed data and may be saved for use in the decompressor + (particularly if the decompressor wants to decompress everything in + a single step). +*/ + + /* constants */ + /* NOTE(review): these Z_* names presumably repeat definitions that also come from <zlib.h> -- confirm the values stay identical */ + +#define Z_NO_FLUSH 0 +#define Z_PARTIAL_FLUSH 1 /* will be removed, use Z_SYNC_FLUSH instead */ +#define Z_SYNC_FLUSH 2 +#define Z_FULL_FLUSH 3 +#define Z_FINISH 4 +#define Z_BLOCK 5 +/* Allowed flush values; see deflate() and inflate() below for details */ + +#define Z_OK 0 +#define Z_STREAM_END 1 +#define Z_NEED_DICT 2 +#define Z_ERRNO (-1) +#define Z_STREAM_ERROR (-2) +#define Z_DATA_ERROR (-3) +#define Z_MEM_ERROR (-4) +#define Z_BUF_ERROR (-5) +#define Z_VERSION_ERROR (-6) +/* Return codes for the compression/decompression functions. 
Negative + * values are errors, positive values are used for special but normal events. + */ + +#define Z_NO_COMPRESSION 0 +#define Z_BEST_SPEED 1 +#define Z_BEST_COMPRESSION 9 +#define Z_DEFAULT_COMPRESSION (-1) +/* compression levels */ + +#define Z_FILTERED 1 +#define Z_HUFFMAN_ONLY 2 +#define Z_RLE 3 +#define Z_FIXED 4 +#define Z_DEFAULT_STRATEGY 0 +/* compression strategy; see deflateInit2() below for details */ + +#define Z_BINARY 0 +#define Z_TEXT 1 +#define Z_ASCII Z_TEXT /* for compatibility with 1.2.2 and earlier */ +#define Z_UNKNOWN 2 +/* Possible values of the data_type field (though see inflate()) */ + +#define Z_DEFLATED 8 +/* The deflate compression method (the only one supported in this version) */ + +#define Z_NULL 0 /* for initializing zalloc, zfree, opaque */ +#define AZ_BUFSIZE_READ 32768 +#define AZ_BUFSIZE_WRITE 16384 + +#define AZ_FRMVER_LEN 16 /* same as MY_UUID_SIZE in 10.0.2 */ + +typedef struct azio_stream { + z_stream stream; + int z_err; /* error code for last stream operation */ + int z_eof; /* set if end of input file */ + File file; /* .gz file */ + Byte inbuf[AZ_BUFSIZE_READ]; /* input buffer */ + Byte outbuf[AZ_BUFSIZE_WRITE]; /* output buffer */ + uLong crc; /* crc32 of uncompressed data */ + char *msg; /* error message */ + int transparent; /* 1 if input file is not a .gz file */ + char mode; /* 'w' or 'r' */ + my_off_t start; /* start of compressed data in file (header skipped) */ + my_off_t in; /* bytes into deflate or inflate */ + my_off_t out; /* bytes out of deflate or inflate */ + int back; /* one character push-back */ + int last; /* true if push-back is last character */ + unsigned char version; /* Major version */ + unsigned char minor_version; /* Minor version */ + unsigned int block_size; /* Block Size */ + unsigned long long check_point; /* File position recorded before the last flush write */ + unsigned long long forced_flushes; /* Forced Flushes */ + unsigned long long rows; /* rows */ + unsigned long long auto_increment; /* auto increment field */ + 
unsigned int longest_row; /* Longest row */ + unsigned int shortest_row; /* Shortest row */ + unsigned char dirty; /* State of file (one of the AZ_STATE_* flags) */ + unsigned int frm_start_pos; /* Position for start of FRM */ + unsigned int frm_length; /* Length of FRM */ + unsigned char frmver[AZ_FRMVER_LEN]; /* FRM version bytes */ + unsigned int frmver_length; /* Number of valid bytes in frmver, 0 if none */ + unsigned int comment_start_pos; /* Position for start of comment */ + unsigned int comment_length; /* Length of comment */ +} azio_stream; + + /* basic functions */ + +extern int azopen(azio_stream *s, const char *path, int Flags); +/* + Opens a gzip (.gz) file for reading or writing. The mode parameter + is as in fopen ("rb" or "wb") but can also include a compression level + ("wb9") or a strategy: 'f' for filtered data as in "wb6f", 'h' for + Huffman only compression as in "wb1h", or 'R' for run-length encoding + as in "wb1R". (See the description of deflateInit2 for more information + about the strategy parameter.) + + azopen can be used to read a file which is not in gzip format; in this + case gzread will directly read from the file without decompression. + + azopen returns NULL if the file could not be opened or if there was + insufficient memory to allocate the (de)compression state; errno + can be checked to distinguish the two cases (if errno is zero, the + zlib error is Z_MEM_ERROR). + + NOTE(review): the text above is inherited from gzopen(); this function + takes integer Flags and returns int -- confirm the actual contract. */ + +int azdopen(azio_stream *s,File fd, int Flags); +/* + azdopen() associates a azio_stream with the file descriptor fd. File + descriptors are obtained from calls like open, dup, creat, pipe or + fileno (if the file has been previously opened with fopen). + The Flags parameter is as in azopen. + The next call of azclose on the azio_stream will also close the + file descriptor fd, just like fclose(fdopen(fd), mode) closes the file + descriptor fd. If you want to keep fd open, use azdopen(dup(fd), Flags). + azdopen returns 0 if fd is negative; NOTE(review): the remaining return + values come from the open routine -- presumably 0 also means failure to + allocate the (de)compression state. 
+*/ + + +extern unsigned int azread ( azio_stream *s, voidp buf, size_t len, int *error); +/* + Reads the given number of uncompressed bytes from the compressed file. + If the input file was not in gzip format, azread copies the given number + of bytes into the buffer. + azread returns the number of uncompressed bytes actually read (0 for + end of file or when nothing could be read because of an error). *error is + 0 unless such an error occurred, in which case it holds the zlib error code. */ + +extern unsigned int azwrite (azio_stream *s, const voidp buf, unsigned int len); +/* + Writes the given number of uncompressed bytes into the compressed file. + azwrite returns the number of uncompressed bytes actually written + (0 in case of error). +*/ + + +extern int azflush(azio_stream *file, int flush); +/* + Flushes all pending output into the compressed file. The parameter + flush is as in the deflate() function. The return value is the zlib + error number (see function gzerror below). gzflush returns Z_OK if + the flush parameter is Z_FINISH and all output could be flushed. + gzflush should be called only when strictly necessary because it can + degrade compression. +*/ + +extern my_off_t azseek (azio_stream *file, + my_off_t offset, int whence); +/* + Sets the starting position for the next gzread or gzwrite on the + given compressed file. The offset represents a number of bytes in the + uncompressed data stream. The whence parameter is defined as in lseek(2); + the value SEEK_END is not supported. + If the file is opened for reading, this function is emulated but can be + extremely slow. If the file is opened for writing, only forward seeks are + supported; gzseek then compresses a sequence of zeroes up to the new + starting position. + + gzseek returns the resulting offset location as measured in bytes from + the beginning of the uncompressed stream, or -1 in case of error, in + particular if the file is opened for writing and the new starting position + would be before the current position. +*/ + +extern int azrewind(azio_stream *file); +/* + Rewinds the given file. 
This function is supported only for reading; + -1 is returned for a NULL stream or a stream that is not open for reading. + + gzrewind(file) is equivalent to (int)gzseek(file, 0L, SEEK_SET) +*/ + +extern my_off_t aztell(azio_stream *file); +/* + Returns the starting position for the next azread or azwrite on the + given compressed file. This position represents a number of bytes in the + uncompressed data stream. + + aztell(file) is equivalent to azseek(file, 0L, SEEK_CUR) +*/ + +extern int azclose(azio_stream *file); +/* + Flushes all pending output if necessary, closes the compressed file + and deallocates all the (de)compression state. The return value is the zlib + error number (see function gzerror below). +*/ + +/* + Storage for an FRM image and a comment next to the header. The azwrite_* + functions return 1 without writing when the stream is open for reading or + already holds rows. +*/ +extern int azwrite_frm (azio_stream *s, const uchar *blob,size_t length); +extern int azread_frm (azio_stream *s, uchar *blob); +extern int azwrite_comment (azio_stream *s, const char *blob, + size_t length); +extern int azread_comment (azio_stream *s, char *blob); + +#ifdef __cplusplus +} +#endif diff --git a/storage/archive/ha_archive.cc b/storage/archive/ha_archive.cc new file mode 100644 index 00000000..19a0ffe0 --- /dev/null +++ b/storage/archive/ha_archive.cc @@ -0,0 +1,1965 @@ +/* + Copyright (c) 2004, 2014, Oracle and/or its affiliates + Copyright (c) 2010, 2014, SkySQL Ab. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. 
+ + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#include <my_global.h> +#include "sql_class.h" // SSV +#include "sql_table.h" // build_table_filename +#include <myisam.h> // T_EXTEND + +#include "ha_archive.h" +#include "discover.h" +#include <my_dir.h> + +#include <mysql/plugin.h> + +/* + First, if you want to understand storage engines you should look at + ha_example.cc and ha_example.h. + + This example was written as a test case for a customer who needed + a storage engine without indexes that could compress data very well. + So, welcome to a completely compressed storage engine. This storage + engine only does inserts. No replace, deletes, or updates. All reads are + complete table scans. Compression is done through a combination of packing + and making use of the zlib library + + We keep a file pointer open for each instance of ha_archive for each read + but for writes we keep one open file handle just for that. We flush it + only if we have a read occur. azip handles compressing lots of records + at once much better than doing lots of little records between writes. + It is possible to not lock on writes but this would then mean we couldn't + handle bulk inserts as well (that is if someone was trying to read at + the same time since we would want to flush). + + A "meta" file is kept alongside the data file. This file serves two purposes. + The first purpose is to track the number of rows in the table. The second + purpose is to determine if the table was closed properly or not. When the + meta file is first opened it is marked as dirty. It is opened when the table + itself is opened for writing. 
When the table is closed the new count for rows + is written to the meta file and the file is marked as clean. If the meta file + is opened and it is marked as dirty, it is assumed that a crash occurred. At + this point an error occurs and the user is told to rebuild the file. + A rebuild scans the rows and rewrites the meta file. If corruption is found + in the data file then the meta file is not repaired. + + At some point a recovery method for such a drastic case needs to be devised. + + Locks are row level, and you will get a consistent read. + + For performance as far as table scans go it is quite fast. I don't have + good numbers but locally it has outperformed both Innodb and MyISAM. For + Innodb the question will be if the table can be fit into the buffer + pool. For MyISAM it's a question of how much the file system caches the + MyISAM file. With enough free memory MyISAM is faster. It's only when the OS + doesn't have enough memory to cache the entire table that archive turns out + to be any faster. + + Examples between MyISAM (packed) and Archive. + + Table with 76695844 identical rows: + 29680807 a_archive.ARZ + 920350317 a.MYD + + + Table with 8991478 rows (all of Slashdot's comments): + 1922964506 comment_archive.ARZ + 2944970297 comment_text.MYD + + + TODO: + Allow users to set compression level. + Allow adjustable block size. + Implement versioning, should be easy. + Allow for errors, find a way to mark bad rows. + Add optional feature so that rows can be flushed at interval (which will cause less + compression but may speed up ordered searches). + Checkpoint the meta file to allow for faster rebuilds. + Option to allow for dirty reads, this would lower the sync calls, which would make + inserts a lot faster, but would mean highly arbitrary reads. 
+ + -Brian + + Archive file format versions: + <5.1.5 - v.1 + 5.1.5-5.1.15 - v.2 + >5.1.15 - v.3 +*/ + +/* The file extension */ +#define ARZ ".ARZ" // The data file +#define ARN ".ARN" // Files used during an optimize call +#define ARM ".ARM" // Meta file (deprecated) + +/* 5.0 compatibility */ +#define META_V1_OFFSET_CHECK_HEADER 0 +#define META_V1_OFFSET_VERSION 1 +#define META_V1_OFFSET_ROWS_RECORDED 2 +#define META_V1_OFFSET_CHECK_POINT 10 +#define META_V1_OFFSET_CRASHED 18 +#define META_V1_LENGTH 19 + +/* + uchar + uchar +*/ +#define DATA_BUFFER_SIZE 2 // Size of the data used in the data file +#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption + +#ifdef HAVE_PSI_INTERFACE +extern "C" PSI_file_key arch_key_file_data; +#endif + +/* Static declarations for handerton */ +static handler *archive_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root); +int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share); + +/* + Number of rows that will force a bulk insert. 
+*/ +#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2 + +/* + Size of header used for row +*/ +#define ARCHIVE_ROW_HEADER_SIZE 4 + +static handler *archive_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_archive(hton, table); +} + +#ifdef HAVE_PSI_INTERFACE +PSI_mutex_key az_key_mutex_Archive_share_mutex; + +static PSI_mutex_info all_archive_mutexes[]= +{ + { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0} +}; + +PSI_file_key arch_key_file_metadata, arch_key_file_data; +static PSI_file_info all_archive_files[]= +{ + { &arch_key_file_metadata, "metadata", 0}, + { &arch_key_file_data, "data", 0} +}; + +static void init_archive_psi_keys(void) +{ + const char* category= "archive"; + int count; + + if (!PSI_server) + return; + + count= array_elements(all_archive_mutexes); + mysql_mutex_register(category, all_archive_mutexes, count); + + count= array_elements(all_archive_files); + mysql_file_register(category, all_archive_files, count); +} + +#endif /* HAVE_PSI_INTERFACE */ + +/* + Initialize the archive handler. + + SYNOPSIS + archive_db_init() + void * + + RETURN + FALSE OK + TRUE Error +*/ + +/* + We just implement one additional file extension. + ARM is here just to properly drop 5.0 tables. 
+*/ +static const char *ha_archive_exts[] = { + ARZ, + ARM, + NullS +}; + +int archive_db_init(void *p) +{ + DBUG_ENTER("archive_db_init"); + handlerton *archive_hton; + +#ifdef HAVE_PSI_INTERFACE + init_archive_psi_keys(); +#endif + + archive_hton= (handlerton *)p; + archive_hton->db_type= DB_TYPE_ARCHIVE_DB; + archive_hton->create= archive_create_handler; + archive_hton->flags= HTON_NO_FLAGS; + archive_hton->discover_table= archive_discover; + archive_hton->tablefile_extensions= ha_archive_exts; + + DBUG_RETURN(0); +} + + +Archive_share::Archive_share() +{ + crashed= false; + in_optimize= false; + archive_write_open= false; + dirty= false; + DBUG_PRINT("ha_archive", ("Archive_share: %p", + this)); + thr_lock_init(&lock); + /* + We will use this lock for rows. + */ + mysql_mutex_init(az_key_mutex_Archive_share_mutex, + &mutex, MY_MUTEX_INIT_FAST); +} + + +Archive_share::~Archive_share() +{ + DBUG_PRINT("ha_archive", ("~Archive_share: %p", this)); + if (archive_write_open) + { + mysql_mutex_lock(&mutex); + (void) close_archive_writer(); // Will reset archive_write_open + mysql_mutex_unlock(&mutex); + } + thr_lock_delete(&lock); + mysql_mutex_destroy(&mutex); +} + + +ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg) + :handler(hton, table_arg), delayed_insert(0), bulk_insert(0) +{ + /* Set our original buffer from pre-allocated memory */ + buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); + + /* The size of the offset value we will use for position() */ + ref_length= sizeof(my_off_t); + archive_reader_open= FALSE; +} + +int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share) +{ + DBUG_ENTER("archive_discover"); + DBUG_PRINT("archive_discover", ("db: '%s' name: '%s'", share->db.str, + share->table_name.str)); + azio_stream frm_stream; + char az_file[FN_REFLEN]; + uchar *frm_ptr; + MY_STAT file_stat; + + strxmov(az_file, share->normalized_path.str, ARZ, NullS); + + if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, 
&file_stat, MYF(0)))) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + + if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY))) + { + if (errno == EROFS || errno == EACCES) + DBUG_RETURN(my_errno= errno); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + if (frm_stream.frm_length == 0) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + frm_ptr= (uchar *)my_malloc(PSI_INSTRUMENT_ME, frm_stream.frm_length, + MYF(MY_THREAD_SPECIFIC | MY_WME)); + if (!frm_ptr) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + if (azread_frm(&frm_stream, frm_ptr)) + goto ret; + + azclose(&frm_stream); + + my_errno= share->init_from_binary_frm_image(thd, 1, + frm_ptr, frm_stream.frm_length); +ret: + my_free(frm_ptr); + DBUG_RETURN(my_errno); +} + +/** + @brief Read version 1 meta file (5.0 compatibility routine). + + @return Completion status + @retval 0 Success + @retval !0 Failure +*/ + +int Archive_share::read_v1_metafile() +{ + char file_name[FN_REFLEN]; + uchar buf[META_V1_LENGTH]; + File fd; + DBUG_ENTER("Archive_share::read_v1_metafile"); + + fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT); + if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1) + DBUG_RETURN(-1); + + if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf)) + { + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(-1); + } + + rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED); + crashed= buf[META_V1_OFFSET_CRASHED]; + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(0); +} + + +/** + @brief Write version 1 meta file (5.0 compatibility routine). 
+ + @return Completion status + @retval 0 Success + @retval !0 Failure +*/ + +int Archive_share::write_v1_metafile() +{ + char file_name[FN_REFLEN]; + uchar buf[META_V1_LENGTH]; + File fd; + DBUG_ENTER("Archive_share::write_v1_metafile"); + + buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER; + buf[META_V1_OFFSET_VERSION]= 1; + int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded); + int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0); + buf[META_V1_OFFSET_CRASHED]= crashed; + + fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT); + if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1) + DBUG_RETURN(-1); + + if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf)) + { + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(-1); + } + + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(0); +} + +/** + @brief Pack version 1 row (5.0 compatibility routine). + + @param[in] record the record to pack + + @return Length of packed row +*/ + +unsigned int ha_archive::pack_row_v1(const uchar *record) +{ + uint *blob, *end; + uchar *pos; + DBUG_ENTER("pack_row_v1"); + memcpy(record_buffer->buffer, record, table->s->reclength); + + /* + The end of VARCHAR fields are filled with garbage,so here + we explicitly set the end of the VARCHAR fields with zeroes + */ + + for (Field** field= table->field; (*field) ; field++) + { + Field *fld= *field; + if (fld->type() == MYSQL_TYPE_VARCHAR) + { + if (!(fld->is_real_null(record - table->record[0]))) + { + ptrdiff_t start= (fld->ptr - table->record[0]); + Field_varstring *const field_var= (Field_varstring *)fld; + uint offset= field_var->data_length() + field_var->length_size(); + memset(record_buffer->buffer + start + offset, 0, + fld->field_length - offset + 1); + } + } + } + pos= record_buffer->buffer + table->s->reclength; + for (blob= table->s->blob_field, end= blob + table->s->blob_fields; + blob != end; blob++) + { + uint32 length= ((Field_blob *) 
table->field[*blob])->get_length(); + if (length) + { + uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr(); + memcpy(pos, data_ptr, length); + pos+= length; + } + } + DBUG_RETURN((int)(pos - record_buffer->buffer)); +} + +/* + This method reads the header of a datafile and returns whether or not it was successful. +*/ +int ha_archive::read_data_header(azio_stream *file_to_read) +{ + int error; + unsigned long ret; + uchar data_buffer[DATA_BUFFER_SIZE]; + DBUG_ENTER("ha_archive::read_data_header"); + + if (azrewind(file_to_read) == -1) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (file_to_read->version >= 3) + DBUG_RETURN(0); + /* Everything below this is just legacy to version 2< */ + + DBUG_PRINT("ha_archive", ("Reading legacy data header")); + + ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error); + + if (ret != DATA_BUFFER_SIZE) + { + DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu", + DATA_BUFFER_SIZE, ret)); + DBUG_RETURN(1); + } + + if (error) + { + DBUG_PRINT("ha_archive", ("Compression error (%d)", error)); + DBUG_RETURN(1); + } + + DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0])); + DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1])); + + if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) && + (data_buffer[1] == 1 || data_buffer[1] == 2)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + DBUG_RETURN(0); +} + + +/* + We create the shared memory space that we will use for the open table. + No matter what we try to get or create a share. This is so that a repair + table operation can occur. + + See ha_example.cc for a longer description. 
+*/ +Archive_share *ha_archive::get_share(const char *table_name, int *rc) +{ + Archive_share *tmp_share; + + DBUG_ENTER("ha_archive::get_share"); + + lock_shared_ha_data(); + if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr()))) + { + azio_stream archive_tmp; + + tmp_share= new Archive_share; + + if (!tmp_share) + { + *rc= HA_ERR_OUT_OF_MEM; + goto err; + } + DBUG_PRINT("ha_archive", ("new Archive_share: %p", + tmp_share)); + + fn_format(tmp_share->data_file_name, table_name, "", + ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME); + strmov(tmp_share->table_name, table_name); + DBUG_PRINT("ha_archive", ("Data File %s", + tmp_share->data_file_name)); + + /* + We read the meta file, but do not mark it dirty. Since we are not + doing a write we won't mark it dirty (and we won't open it for + anything but reading... open it for write and we will generate null + compression writes). + */ + if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY))) + { + delete tmp_share; + *rc= my_errno ? my_errno : HA_ERR_CRASHED; + tmp_share= NULL; + goto err; + } + stats.auto_increment_value= archive_tmp.auto_increment + 1; + tmp_share->rows_recorded= (ha_rows)archive_tmp.rows; + tmp_share->crashed= archive_tmp.dirty; + share= tmp_share; + if (archive_tmp.version == 1) + share->read_v1_metafile(); + else if (frm_compare(&archive_tmp)) + *rc= HA_ERR_TABLE_DEF_CHANGED; + + azclose(&archive_tmp); + + set_ha_share_ptr(static_cast<Handler_share*>(tmp_share)); + } + if (tmp_share->crashed) + *rc= HA_ERR_CRASHED_ON_USAGE; +err: + unlock_shared_ha_data(); + + DBUG_ASSERT(tmp_share || *rc); + + DBUG_RETURN(tmp_share); +} + + +int Archive_share::init_archive_writer() +{ + DBUG_ENTER("Archive_share::init_archive_writer"); + /* + It is expensive to open and close the data files and since you can't have + a gzip file that can be both read and written we keep a writer open + that is shared amoung all open tables. 
+ */ + if (!(azopen(&archive_write, data_file_name, + O_RDWR|O_BINARY))) + { + DBUG_PRINT("ha_archive", ("Could not open archive write file")); + crashed= true; + DBUG_RETURN(1); + } + archive_write_open= true; + + DBUG_RETURN(0); +} + + +void Archive_share::close_archive_writer() +{ + mysql_mutex_assert_owner(&mutex); + if (archive_write_open) + { + if (archive_write.version == 1) + (void) write_v1_metafile(); + azclose(&archive_write); + archive_write_open= false; + dirty= false; + } +} + + +/* + No locks are required because it is associated with just one handler instance +*/ +int ha_archive::init_archive_reader() +{ + DBUG_ENTER("ha_archive::init_archive_reader"); + /* + It is expensive to open and close the data files and since you can't have + a gzip file that can be both read and written we keep a writer open + that is shared amoung all open tables, but have one reader open for + each handler instance. + */ + if (!archive_reader_open) + { + if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))) + { + DBUG_PRINT("ha_archive", ("Could not open archive read file")); + share->crashed= TRUE; + DBUG_RETURN(1); + } + archive_reader_open= TRUE; + } + + DBUG_RETURN(0); +} + + +/* + When opening a file we: + Create/get our shared structure. + Init out lock. + We open the file we will read from. +*/ +int ha_archive::open(const char *name, int mode, uint open_options) +{ + int rc= 0; + DBUG_ENTER("ha_archive::open"); + + DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", + (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no")); + share= get_share(name, &rc); + if (!share) + DBUG_RETURN(rc); + + /* Allow open on crashed table in repair mode only. 
*/ + switch (rc) + { + case 0: + break; + case HA_ERR_TABLE_DEF_CHANGED: + case HA_ERR_CRASHED_ON_USAGE: + if (open_options & HA_OPEN_FOR_REPAIR) + { + rc= 0; + break; + } + /* fall through */ + default: + DBUG_RETURN(rc); + } + + DBUG_ASSERT(share); + + record_buffer= create_record_buffer(table->s->reclength + + ARCHIVE_ROW_HEADER_SIZE); + + if (!record_buffer) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + thr_lock_data_init(&share->lock, &lock, NULL); + + DBUG_PRINT("ha_archive", ("archive table was crashed %s", + rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no")); + if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR) + { + DBUG_RETURN(0); + } + + DBUG_RETURN(rc); +} + + +/* + Closes the file. + + SYNOPSIS + close(); + + IMPLEMENTATION: + + We first close this storage engines file handle to the archive and + then remove our reference count to the table (and possibly free it + as well). + + RETURN + 0 ok + 1 Error +*/ + +int ha_archive::close(void) +{ + int rc= 0; + DBUG_ENTER("ha_archive::close"); + + destroy_record_buffer(record_buffer); + + /* First close stream */ + if (archive_reader_open) + { + if (azclose(&archive)) + rc= 1; + } + DBUG_RETURN(rc); +} + + +/** + Copy a frm blob between streams. + + @param src The source stream. + @param dst The destination stream. + + @return Zero on success, non-zero otherwise. +*/ + +int ha_archive::frm_copy(azio_stream *src, azio_stream *dst) +{ + int rc= 0; + uchar *frm_ptr; + + if (!src->frm_length) + { + size_t frm_len; + if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len)) + { + azwrite_frm(dst, frm_ptr, frm_len); + table_share->free_frm_image(frm_ptr); + } + return 0; + } + + if (!(frm_ptr= (uchar *) my_malloc(PSI_INSTRUMENT_ME, src->frm_length, + MYF(MY_THREAD_SPECIFIC | MY_WME)))) + return HA_ERR_OUT_OF_MEM; + + /* Write file offset is set to the end of the file. */ + if (azread_frm(src, frm_ptr) || + azwrite_frm(dst, frm_ptr, src->frm_length)) + rc= my_errno ? 
my_errno : HA_ERR_INTERNAL_ERROR; + + my_free(frm_ptr); + + return rc; +} + + +/** + Compare frm blob with the on-disk frm file + + @param s The azio stream. + + @return Zero if equal, non-zero otherwise. +*/ + +int ha_archive::frm_compare(azio_stream *s) +{ + if (!s->frmver_length) + return 0; // Old pre-10.0 archive table. Never rediscover. + + LEX_CUSTRING *ver= &table->s->tabledef_version; + return ver->length != s->frmver_length || + memcmp(ver->str, s->frmver, ver->length); +} + + +/* + We create our data file here. The format is pretty simple. + You can read about the format of the data file above. + Unlike other storage engines we do not "pack" our data. Since we + are about to do a general compression, packing would just be a waste of + CPU time. If the table has blobs they are written after the row in the order + of creation. +*/ + +int ha_archive::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *create_info) +{ + char name_buff[FN_REFLEN]; + char linkname[FN_REFLEN]; + int error; + azio_stream create_stream; /* Archive file we are working with */ + const uchar *frm_ptr; + size_t frm_len; + + DBUG_ENTER("ha_archive::create"); + + stats.auto_increment_value= create_info->auto_increment_value; + + for (uint key= 0; key < table_arg->s->keys; key++) + { + KEY *pos= table_arg->key_info+key; + KEY_PART_INFO *key_part= pos->key_part; + KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts; + + for (; key_part != key_part_end; key_part++) + { + Field *field= key_part->field; + + if (!(field->flags & AUTO_INCREMENT_FLAG) || + key_part->key_part_flag & HA_REVERSE_SORT) + + { + error= HA_WRONG_CREATE_OPTION; + DBUG_PRINT("ha_archive", ("Index error in creating archive table")); + goto error; + } + } + } + + /* + We reuse name_buff since it is available. 
+ */ +#ifdef HAVE_READLINK + if (my_use_symdir && + create_info->data_file_name && + create_info->data_file_name[0] != '#') + { + DBUG_PRINT("ha_archive", ("archive will create stream file %s", + create_info->data_file_name)); + + fn_format(name_buff, create_info->data_file_name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + fn_format(linkname, name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + } + else +#endif /* HAVE_READLINK */ + { + if (create_info->data_file_name) + my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "DATA DIRECTORY"); + + fn_format(name_buff, name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + linkname[0]= 0; + } + + /* Archive engine never uses INDEX DIRECTORY. */ + if (create_info->index_file_name) + my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "INDEX DIRECTORY"); + + /* + There is a chance that the file was "discovered". In this case + just use whatever file is there. + */ + my_errno= 0; + if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY))) + { + error= errno; + goto error2; + } + + if (linkname[0]) + my_symlink(name_buff, linkname, MYF(0)); + + /* + Here is where we open up the frm and pass it to archive to store + */ + if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len)) + { + azwrite_frm(&create_stream, frm_ptr, frm_len); + table_arg->s->free_frm_image(frm_ptr); + } + + if (create_info->comment.str) + azwrite_comment(&create_stream, create_info->comment.str, + create_info->comment.length); + + /* + Yes you need to do this, because the starting value + for the autoincrement may not be zero. + */ + create_stream.auto_increment= stats.auto_increment_value ? + stats.auto_increment_value - 1 : 0; + if (azclose(&create_stream)) + { + error= errno; + goto error2; + } + + DBUG_PRINT("ha_archive", ("Creating File %s", name_buff)); + DBUG_PRINT("ha_archive", ("Creating Link %s", linkname)); + + + DBUG_RETURN(0); + +error2: + delete_table(name); +error: + /* Return error number, if we got one */ + DBUG_RETURN(error ? 
error : -1); +} + +/* + This is where the actual row is written out. +*/ +int ha_archive::real_write_row(const uchar *buf, azio_stream *writer) +{ + my_off_t written; + unsigned int r_pack_length; + DBUG_ENTER("ha_archive::real_write_row"); + + /* We pack the row for writing */ + r_pack_length= pack_row(buf, writer); + + written= azwrite(writer, record_buffer->buffer, r_pack_length); + if (written != r_pack_length) + { + DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", + (uint32) written, + (uint32)r_pack_length)); + DBUG_RETURN(-1); + } + + if (!delayed_insert || !bulk_insert) + share->dirty= TRUE; + + DBUG_RETURN(0); +} + + +/* + Calculate max length needed for row. This includes + the bytes required for the length in the header. +*/ + +uint32 ha_archive::max_row_length(const uchar *record) +{ + uint32 length= (uint32)(table->s->reclength + table->s->fields*2); + length+= ARCHIVE_ROW_HEADER_SIZE; + my_ptrdiff_t const rec_offset= record - table->record[0]; + + uint *ptr, *end; + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + if (!table->field[*ptr]->is_null(rec_offset)) + length += 2 + ((Field_blob*)table->field[*ptr])->get_length(rec_offset); + } + + return length; +} + + +unsigned int ha_archive::pack_row(const uchar *record, azio_stream *writer) +{ + uchar *ptr; + my_ptrdiff_t const rec_offset= record - table->record[0]; + DBUG_ENTER("ha_archive::pack_row"); + + if (fix_rec_buff(max_row_length(record))) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */ + + if (writer->version == 1) + DBUG_RETURN(pack_row_v1(record)); + + /* Copy null bits */ + memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE, + record, table->s->null_bytes); + ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE; + + for (Field **field=table->field ; *field ; field++) + { + if (!((*field)->is_null(rec_offset))) + ptr= (*field)->pack(ptr, record + (*field)->offset(record)); + } + + 
int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer - + ARCHIVE_ROW_HEADER_SIZE)); + DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int) + (ptr - record_buffer->buffer - + ARCHIVE_ROW_HEADER_SIZE))); + + DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer)); +} + + +/* + Look at ha_archive::open() for an explanation of the row format. + Here we just write out the row. + + Wondering about start_bulk_insert()? We don't implement it for + archive since it optimizes for lots of writes. The only save + for implementing start_bulk_insert() is that we could skip + setting dirty to true each time. +*/ +int ha_archive::write_row(const uchar *buf) +{ + int rc; + uchar *read_buf= NULL; + ulonglong temp_auto; + uchar *record= table->record[0]; + DBUG_ENTER("ha_archive::write_row"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + mysql_mutex_lock(&share->mutex); + + if (!share->archive_write_open && share->init_archive_writer()) + { + rc= errno; + goto error; + } + + if (table->next_number_field && record == table->record[0]) + { + KEY *mkey= &table->key_info[0]; // We only support one key right now + update_auto_increment(); + temp_auto= table->next_number_field->val_int(); + + /* + We don't support decremening auto_increment. They make the performance + just cry. + */ + if (temp_auto <= share->archive_write.auto_increment && + mkey->flags & HA_NOSAME) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } +#ifdef DEAD_CODE + /* + Bad news, this will cause a search for the unique value which is very + expensive since we will have to do a table scan which will lock up + all other writers during this period. This could perhaps be optimized + in the future. + */ + { + /* + First we create a buffer that we can use for reading rows, and can pass + to get_row(). 
+ */ + if (!(read_buf= (uchar*) my_malloc(table->s->reclength, + MYF(MY_THREAD_SPECIFIC | MY_WME)))) + { + rc= HA_ERR_OUT_OF_MEM; + goto error; + } + /* + All of the buffer must be written out or we won't see all of the + data + */ + azflush(&(share->archive_write), Z_SYNC_FLUSH); + /* + Set the position of the local read thread to the beginning position. + */ + if (read_data_header(&archive)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto error; + } + + Field *mfield= table->next_number_field; + + while (!(get_row(&archive, read_buf))) + { + if (!memcmp(read_buf + mfield->offset(record), + table->next_number_field->ptr, + mfield->max_display_length())) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } + } + } +#endif + else + { + if (temp_auto > share->archive_write.auto_increment) + stats.auto_increment_value= + (share->archive_write.auto_increment= temp_auto) + 1; + } + } + + /* + Notice that the global auto_increment has been increased. + In case of a failed row write, we will never try to reuse the value. 
+ */ + share->rows_recorded++; + rc= real_write_row(buf, &(share->archive_write)); +error: + mysql_mutex_unlock(&share->mutex); + my_free(read_buf); + DBUG_RETURN(rc); +} + + +void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment, + ulonglong nb_desired_values, + ulonglong *first_value, + ulonglong *nb_reserved_values) +{ + *nb_reserved_values= ULONGLONG_MAX; + *first_value= share->archive_write.auto_increment + 1; +} + +/* Initialized at each key walk (called multiple times unlike rnd_init()) */ +int ha_archive::index_init(uint keynr, bool sorted) +{ + DBUG_ENTER("ha_archive::index_init"); + active_index= keynr; + DBUG_RETURN(0); +} + + +/* + No indexes, so if we get a request for an index search since we tell + the optimizer that we have unique indexes, we scan +*/ +int ha_archive::index_read(uchar *buf, const uchar *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc; + DBUG_ENTER("ha_archive::index_read"); + rc= index_read_idx(buf, active_index, key, key_len, find_flag); + DBUG_RETURN(rc); +} + + +int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc; + bool found= 0; + KEY *mkey= &table->key_info[index]; + current_k_offset= mkey->key_part->offset; + current_key= key; + current_key_len= key_len; + + + DBUG_ENTER("ha_archive::index_read_idx"); + + rc= rnd_init(TRUE); + + if (rc) + goto error; + + while (!(get_row(&archive, buf))) + { + if (!memcmp(current_key, buf + current_k_offset, current_key_len)) + { + found= 1; + break; + } + } + + if (found) + { + /* notify handler that a record has been found */ + table->status= 0; + DBUG_RETURN(0); + } + +error: + DBUG_RETURN(rc ? 
rc : HA_ERR_END_OF_FILE); +} + + +int ha_archive::index_next(uchar * buf) +{ + bool found= 0; + int rc; + + DBUG_ENTER("ha_archive::index_next"); + + while (!(get_row(&archive, buf))) + { + if (!memcmp(current_key, buf+current_k_offset, current_key_len)) + { + found= 1; + break; + } + } + + rc= found ? 0 : HA_ERR_END_OF_FILE; + DBUG_RETURN(rc); +} + +/* + All calls that need to scan the table start with this method. If we are told + that it is a table scan we rewind the file to the beginning, otherwise + we assume the position will be set. +*/ + +int ha_archive::rnd_init(bool scan) +{ + DBUG_ENTER("ha_archive::rnd_init"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (init_archive_reader()) + DBUG_RETURN(errno); + + /* We rewind the file so that we can read from the beginning if scan */ + if (scan) + { + scan_rows= stats.records; + DBUG_PRINT("info", ("archive will retrieve %llu rows", + (unsigned long long) scan_rows)); + + if (read_data_header(&archive)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + DBUG_RETURN(0); +} + + +/* + This is the method that is used to read a row. It assumes that the row is + positioned where you want it. 
+*/ +int ha_archive::get_row(azio_stream *file_to_read, uchar *buf) +{ + int rc; + DBUG_ENTER("ha_archive::get_row"); + DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", + (uchar)file_to_read->version, + ARCHIVE_VERSION)); + if (file_to_read->version == ARCHIVE_VERSION) + rc= get_row_version3(file_to_read, buf); + else + rc= get_row_version2(file_to_read, buf); + + DBUG_PRINT("ha_archive", ("Return %d\n", rc)); + + DBUG_RETURN(rc); +} + +/* Reallocate buffer if needed */ +bool ha_archive::fix_rec_buff(unsigned int length) +{ + DBUG_ENTER("ha_archive::fix_rec_buff"); + DBUG_PRINT("ha_archive", ("Fixing %u for %u", + length, record_buffer->length)); + DBUG_ASSERT(record_buffer->buffer); + + if (length > record_buffer->length) + { + uchar *newptr; + if (!(newptr=(uchar*) my_realloc(PSI_INSTRUMENT_ME, + (uchar*) record_buffer->buffer, length, + MYF(MY_ALLOW_ZERO_PTR)))) + DBUG_RETURN(1); + record_buffer->buffer= newptr; + record_buffer->length= length; + } + + DBUG_ASSERT(length <= record_buffer->length); + + DBUG_RETURN(0); +} + +int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record) +{ + DBUG_ENTER("ha_archive::unpack_row"); + + unsigned int read; + int error; + uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE]; + unsigned int row_len; + + /* First we grab the length stored */ + read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error); + + if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + /* If we read nothing we are at the end of the file */ + if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + row_len= uint4korr(size_buffer); + DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len, + (unsigned int)table->s->reclength)); + + if (fix_rec_buff(row_len)) + { + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + } + DBUG_ASSERT(row_len <= record_buffer->length); + + read= azread(file_to_read, record_buffer->buffer, row_len, &error); + + 
if (read != row_len || error) + { + DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD); + } + + /* Copy null bits */ + const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len; + memcpy(record, ptr, table->s->null_bytes); + ptr+= table->s->null_bytes; + if (ptr > end) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + for (Field **field=table->field ; *field ; field++) + { + if (!((*field)->is_null_in_record(record))) + { + if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), + ptr, end))) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + } + } + if (ptr != end) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + DBUG_RETURN(0); +} + + +int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf) +{ + DBUG_ENTER("ha_archive::get_row_version3"); + + int returnable= unpack_row(file_to_read, buf); + + DBUG_RETURN(returnable); +} + + +int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf) +{ + unsigned int read; + int error; + uint *ptr, *end; + char *last; + size_t total_blob_length= 0; + MY_BITMAP *read_set= table->read_set; + DBUG_ENTER("ha_archive::get_row_version2"); + + read= azread(file_to_read, (voidp)buf, table->s->reclength, &error); + + /* If we read nothing we are at the end of the file */ + if (read == 0) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + if (read != table->s->reclength) + { + DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u", + read, + (unsigned int)table->s->reclength)); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR ) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + /* + If the record is the wrong size, the file is probably damaged, unless + we are dealing with a delayed insert or a bulk insert. 
+ */ + if ((ulong) read != table->s->reclength) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* Calculate blob length, we use this for our buffer */ + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + if (bitmap_is_set(read_set, + (((Field_blob*) table->field[*ptr])->field_index))) + total_blob_length += ((Field_blob*) table->field[*ptr])->get_length(); + } + + /* Adjust our row buffer if we need be */ + buffer.alloc(total_blob_length); + last= (char *)buffer.ptr(); + + /* Loop through our blobs and read them */ + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + size_t size= ((Field_blob*) table->field[*ptr])->get_length(); + if (size) + { + if (bitmap_is_set(read_set, + ((Field_blob*) table->field[*ptr])->field_index)) + { + read= azread(file_to_read, last, size, &error); + + if (error) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if ((size_t) read != size) + DBUG_RETURN(HA_ERR_END_OF_FILE); + ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last); + last += size; + } + else + { + (void)azseek(file_to_read, size, SEEK_CUR); + } + } + } + DBUG_RETURN(0); +} + + +/* + Called during ORDER BY. Its position is either from being called sequentially + or by having had ha_archive::rnd_pos() called before it is called. +*/ + +int ha_archive::rnd_next(uchar *buf) +{ + int rc; + DBUG_ENTER("ha_archive::rnd_next"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (!scan_rows) + { + rc= HA_ERR_END_OF_FILE; + goto end; + } + scan_rows--; + + current_position= aztell(&archive); + rc= get_row(&archive, buf); + +end: + DBUG_RETURN(rc); +} + + +/* + Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after + each call to ha_archive::rnd_next() if an ordering of the rows is + needed. 
+*/ + +void ha_archive::position(const uchar *record) +{ + DBUG_ENTER("ha_archive::position"); + my_store_ptr(ref, ref_length, current_position); + DBUG_VOID_RETURN; +} + + +/* + This is called after a table scan for each row if the results of the + scan need to be ordered. It will take *pos and use it to move the + cursor in the file so that the next row that is called is the + correctly ordered row. +*/ + +int ha_archive::rnd_pos(uchar * buf, uchar *pos) +{ + int rc; + DBUG_ENTER("ha_archive::rnd_pos"); + current_position= (my_off_t)my_get_ptr(pos, ref_length); + if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto end; + } + rc= get_row(&archive, buf); +end: + DBUG_RETURN(rc); +} + + +/** + @brief Check for upgrade + + @param[in] check_opt check options + + @return Completion status + @retval HA_ADMIN_OK No upgrade required + @retval HA_ADMIN_CORRUPT Cannot read meta-data + @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required +*/ + +int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt) +{ + DBUG_ENTER("ha_archive::check_for_upgrade"); + if (init_archive_reader()) + DBUG_RETURN(HA_ADMIN_CORRUPT); + if (archive.version < ARCHIVE_VERSION) + DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE); + DBUG_RETURN(HA_ADMIN_OK); +} + + +/* + This method repairs the meta file. It does this by walking the datafile and + rewriting the meta file. If EXTENDED repair is requested, we attempt to + recover as much data as possible. +*/ +int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt) +{ + DBUG_ENTER("ha_archive::repair"); + int rc= optimize(thd, check_opt); + + if (rc) + DBUG_RETURN(HA_ADMIN_CORRUPT); + + share->crashed= FALSE; + DBUG_RETURN(0); +} + +/* + The table can become fragmented if data was inserted, read, and then + inserted again. What we do is open up the file and recompress it completely. 
+*/ +int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) +{ + int rc= 0; + azio_stream writer; + char writer_filename[FN_REFLEN]; + DBUG_ENTER("ha_archive::optimize"); + + mysql_mutex_lock(&share->mutex); + + if (init_archive_reader()) + { + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(errno); + } + + // now we close both our writer and our reader for the rename + if (share->archive_write_open) + { + azclose(&(share->archive_write)); + share->archive_write_open= FALSE; + } + + /* Lets create a file to contain the new data */ + fn_format(writer_filename, share->table_name, "", ARN, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + + if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY))) + { + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + /* + Transfer the embedded FRM so that the file can be discoverable. + Write file offset is set to the end of the file. + */ + if ((rc= frm_copy(&archive, &writer))) + goto error; + + /* + An extended rebuild is a lot more effort. We open up each row and re-record it. + Any dead rows are removed (aka rows that may have been partially recorded). + + As of Archive format 3, this is the only type that is performed, before this + version it was just done on T_EXTEND + */ + if (1) + { + DBUG_PRINT("ha_archive", ("archive extended rebuild")); + + /* + Now we will rewind the archive file so that we are positioned at the + start of the file. + */ + rc= read_data_header(&archive); + + /* + On success of writing out the new header, we now fetch each row and + insert it into the new archive file. + */ + if (!rc) + { + share->rows_recorded= 0; + stats.auto_increment_value= 1; + share->archive_write.auto_increment= 0; + MY_BITMAP *org_bitmap= tmp_use_all_columns(table, &table->read_set); + + while (!(rc= get_row(&archive, table->record[0]))) + { + real_write_row(table->record[0], &writer); + /* + Long term it should be possible to optimize this so that + it is not called on each row. 
+ */ + if (table->found_next_number_field) + { + Field *field= table->found_next_number_field; + ulonglong auto_value= + (ulonglong) field->val_int(table->record[0] + + field->offset(table->record[0])); + if (share->archive_write.auto_increment < auto_value) + stats.auto_increment_value= + (share->archive_write.auto_increment= auto_value) + 1; + } + } + + tmp_restore_column_map(&table->read_set, org_bitmap); + share->rows_recorded= (ha_rows)writer.rows; + } + + DBUG_PRINT("info", ("recovered %llu archive rows", + (unsigned long long)share->rows_recorded)); + + DBUG_PRINT("ha_archive", ("recovered %llu archive rows", + (unsigned long long)share->rows_recorded)); + + /* + If REPAIR ... EXTENDED is requested, try to recover as much data + from data file as possible. In this case if we failed to read a + record, we assume EOF. This allows massive data loss, but we can + hardly do more with broken zlib stream. And this is the only way + to restore at least what is still recoverable. + */ + if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND)) + goto error; + } + + azclose(&writer); + share->dirty= FALSE; + + azclose(&archive); + + // make the file we just wrote be our data file + rc= my_rename(writer_filename, share->data_file_name, MYF(0)); + + + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(rc); +error: + DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc)); + azclose(&writer); + mysql_mutex_unlock(&share->mutex); + + DBUG_RETURN(rc); +} + +/* + Below is an example of how to setup row level locking. +*/ +THR_LOCK_DATA **ha_archive::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + if (lock_type == TL_WRITE_DELAYED) + delayed_insert= TRUE; + else + delayed_insert= FALSE; + + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* + Here is where we get into the guts of a row level lock. 
+ If TL_UNLOCK is set + If we are not doing a LOCK TABLE, DELAYED LOCK or DISCARD/IMPORT + TABLESPACE, then allow multiple writers + */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && delayed_insert == FALSE && + !thd_in_lock_tables(thd) + && !thd_tablespace_op(thd)) + lock_type = TL_WRITE_ALLOW_WRITE; + + /* + In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. + */ + + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) + lock_type = TL_READ; + + lock.type=lock_type; + } + + *to++= &lock; + + return to; +} + +void ha_archive::update_create_info(HA_CREATE_INFO *create_info) +{ + char tmp_real_path[FN_REFLEN]; + DBUG_ENTER("ha_archive::update_create_info"); + + ha_archive::info(HA_STATUS_AUTO); + if (!(create_info->used_fields & HA_CREATE_USED_AUTO)) + { + create_info->auto_increment_value= stats.auto_increment_value; + } + + if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0)))) + create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path); + + DBUG_VOID_RETURN; +} + +/* + Hints for optimizer, see ha_tina for more information +*/ +int ha_archive::info(uint flag) +{ + DBUG_ENTER("ha_archive::info"); + + flush_and_clear_pending_writes(); + stats.deleted= 0; + + DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records)); + /* Costs quite a bit more to get all information */ + if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE)) + { + MY_STAT file_stat; // Stat information for the data file + + (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME)); + + if (flag & HA_STATUS_TIME) + stats.update_time= (ulong) file_stat.st_mtime; + if (flag & HA_STATUS_CONST) + { + stats.max_data_file_length= MAX_FILE_SIZE; + stats.create_time= 
(ulong) file_stat.st_ctime; + } + if (flag & HA_STATUS_VARIABLE) + { + stats.delete_length= 0; + stats.data_file_length= file_stat.st_size; + stats.index_file_length=0; + stats.mean_rec_length= stats.records ? + ulong(stats.data_file_length / stats.records) : table->s->reclength; + } + } + + if (flag & HA_STATUS_AUTO) + { + if (init_archive_reader()) + DBUG_RETURN(errno); + + mysql_mutex_lock(&share->mutex); + azflush(&archive, Z_SYNC_FLUSH); + mysql_mutex_unlock(&share->mutex); + stats.auto_increment_value= archive.auto_increment + 1; + } + + DBUG_RETURN(0); +} + + +int ha_archive::external_lock(THD *thd, int lock_type) +{ + if (lock_type == F_RDLCK) + { + // We are going to read from the table. Flush any pending writes that we + // may have + flush_and_clear_pending_writes(); + } + return 0; +} + + +void ha_archive::flush_and_clear_pending_writes() +{ + mysql_mutex_lock(&share->mutex); + if (share->dirty) + { + DBUG_PRINT("ha_archive", ("archive flushing out rows for scan")); + DBUG_ASSERT(share->archive_write_open); + azflush(&(share->archive_write), Z_SYNC_FLUSH); + share->dirty= FALSE; + } + + /* + This should be an accurate number now, though bulk and delayed inserts can + cause the number to be inaccurate. + */ + stats.records= share->rows_recorded; + mysql_mutex_unlock(&share->mutex); +} + + +int ha_archive::extra(enum ha_extra_function operation) +{ + switch (operation) { + case HA_EXTRA_FLUSH: + mysql_mutex_lock(&share->mutex); + share->close_archive_writer(); + mysql_mutex_unlock(&share->mutex); + break; + default: + break; + } + return 0; +} + +/* + This method tells us that a bulk insert operation is about to occur. We set + a flag which will keep write_row from saying that its data is dirty. This in + turn will keep selects from causing a sync to occur. + Basically, yet another optimizations to keep compression working well. 
+*/ +void ha_archive::start_bulk_insert(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_archive::start_bulk_insert"); + if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT) + bulk_insert= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert + flag, and set the share dirty so that the next select will call sync for us. +*/ +int ha_archive::end_bulk_insert() +{ + DBUG_ENTER("ha_archive::end_bulk_insert"); + bulk_insert= FALSE; + mysql_mutex_lock(&share->mutex); + if (share->archive_write_open) + share->dirty= true; + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(0); +} + +/* + We cancel a truncate command. The only way to delete an archive table is to drop it. + This is done for security reasons. In a later version we will enable this by + allowing the user to select a different row format. +*/ +int ha_archive::truncate() +{ + DBUG_ENTER("ha_archive::truncate"); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); +} + +/* + We just return state if asked. +*/ +bool ha_archive::is_crashed() const +{ + DBUG_ENTER("ha_archive::is_crashed"); + DBUG_RETURN(share->crashed); +} + +/* + Simple scan of the tables to make sure everything is ok. +*/ + +int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt) +{ + int rc= 0; + const char *old_proc_info; + ha_rows count; + DBUG_ENTER("ha_archive::check"); + + old_proc_info= thd_proc_info(thd, "Checking table"); + mysql_mutex_lock(&share->mutex); + count= share->rows_recorded; + /* Flush any waiting data */ + if (share->archive_write_open) + azflush(&(share->archive_write), Z_SYNC_FLUSH); + mysql_mutex_unlock(&share->mutex); + + if (init_archive_reader()) + DBUG_RETURN(HA_ADMIN_CORRUPT); + /* + Now we will rewind the archive file so that we are positioned at the + start of the file. 
+ */ + read_data_header(&archive); + for (ha_rows cur_count= count; cur_count; cur_count--) + { + if ((rc= get_row(&archive, table->record[0]))) + goto error; + } + /* + Now read records that may have been inserted concurrently. + Acquire share->mutex so tail of the table is not modified by + concurrent writers. + */ + mysql_mutex_lock(&share->mutex); + count= share->rows_recorded - count; + if (share->archive_write_open) + azflush(&(share->archive_write), Z_SYNC_FLUSH); + while (!(rc= get_row(&archive, table->record[0]))) + count--; + mysql_mutex_unlock(&share->mutex); + + if ((rc && rc != HA_ERR_END_OF_FILE) || count) + goto error; + + thd_proc_info(thd, old_proc_info); + DBUG_RETURN(HA_ADMIN_OK); + +error: + thd_proc_info(thd, old_proc_info); + share->crashed= FALSE; + DBUG_RETURN(HA_ADMIN_CORRUPT); +} + +/* + Check and repair the table if needed. +*/ +bool ha_archive::check_and_repair(THD *thd) +{ + HA_CHECK_OPT check_opt; + DBUG_ENTER("ha_archive::check_and_repair"); + + check_opt.init(); + + DBUG_RETURN(repair(thd, &check_opt)); +} + +archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) +{ + DBUG_ENTER("ha_archive::create_record_buffer"); + archive_record_buffer *r; + if (!(r= (archive_record_buffer*) my_malloc(PSI_INSTRUMENT_ME, + sizeof(archive_record_buffer), MYF(MY_WME)))) + { + DBUG_RETURN(NULL); /* purecov: inspected */ + } + r->length= (int)length; + + if (!(r->buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, r->length, MYF(MY_WME)))) + { + my_free(r); + DBUG_RETURN(NULL); /* purecov: inspected */ + } + + DBUG_RETURN(r); +} + +void ha_archive::destroy_record_buffer(archive_record_buffer *r) +{ + DBUG_ENTER("ha_archive::destroy_record_buffer"); + my_free(r->buffer); + my_free(r); + DBUG_VOID_RETURN; +} + +/* + In archive *any* ALTER should cause a table to be rebuilt, + no ALTER can be frm-only. + Because after any change to the frm file archive must update the + frm image in the ARZ file. 
And this cannot be done in-place, it
  requires ARZ file to be recreated from scratch
*/
/* Both arguments are ignored: the answer is always COMPATIBLE_DATA_NO. */
bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg,
                                            uint table_changes)
{
  return COMPATIBLE_DATA_NO;
}


/* Storage engine descriptor referenced by the plugin declaration below */
struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

/* Plugin declaration for the ARCHIVE engine; the entries are positional */
maria_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "gzip-compresses tables for a low storage footprint",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  NULL, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  "1.0",                      /* string version */
  MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;

diff --git a/storage/archive/ha_archive.h b/storage/archive/ha_archive.h
new file mode 100644
index 00000000..2e03ac63
--- /dev/null
+++ b/storage/archive/ha_archive.h
@@ -0,0 +1,163 @@
/* Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights reserved.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License as published by
  the Free Software Foundation; version 2 of the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */

#ifdef USE_PRAGMA_INTERFACE
#pragma interface			/* gcc class implementation */
#endif

#include <zlib.h>
#include "azlib.h"

/*
  Please read ha_archive.cc first. If you are looking for more general
  answers on how storage engines work, look at ha_example.cc and
  ha_example.h.
*/

/* A heap buffer together with its size; used for packed rows */
typedef struct st_archive_record_buffer {
  uchar *buffer;   /* Start of the data area */
  uint32 length;   /* Size of the data area in bytes */
} archive_record_buffer;


/*
  State shared by all handler instances of one table: the mutex, the
  single writer stream and the bookkeeping flags that go with it.
*/
class Archive_share : public Handler_share
{
public:
  mysql_mutex_t mutex;      /* Held by the handler around access to the fields below */
  THR_LOCK lock;
  azio_stream archive_write;     /* Archive file we are working with */
  ha_rows rows_recorded;    /* Number of rows in tables */
  char table_name[FN_REFLEN];
  char data_file_name[FN_REFLEN];
  bool in_optimize;
  bool archive_write_open;  /* Set while archive_write is an open stream */
  bool dirty;               /* Flag for if a flush should occur */
  bool crashed;             /* Meta file is crashed */
  Archive_share();
  virtual ~Archive_share();
  int init_archive_writer();
  void close_archive_writer();
  int write_v1_metafile();
  int read_v1_metafile();
};

/*
  Version for file format.
  1 - Initial Version (Never Released)
  2 - Stream Compression, separate blobs, no packing
  3 - One stream (row and blobs), with packing
*/
#define ARCHIVE_VERSION 3

/*
  Handler for the ARCHIVE engine. Each instance owns a reader stream
  (archive); writing goes through the stream kept in the share.
*/
class ha_archive final : public handler
{
  THR_LOCK_DATA lock;        /* MySQL lock */
  Archive_share *share;      /* Shared lock info */

  azio_stream archive;            /* Archive file we are working with */
  my_off_t current_position;  /* The position of the row we just read */
  uchar byte_buffer[IO_SIZE]; /* Initial buffer for our string */
  String buffer;             /* Buffer used for blob storage */
  ha_rows scan_rows;         /* Number of rows left in scan */
  bool delayed_insert;       /* If the insert is delayed */
  bool bulk_insert;          /* If we are performing a bulk insert */
  const uchar *current_key;
  uint current_key_len;
  uint current_k_offset;
  archive_record_buffer *record_buffer;
  /* NOTE(review): presumably set while `archive` is open for reading - confirm */
  bool archive_reader_open;

  archive_record_buffer *create_record_buffer(unsigned int length);
  void destroy_record_buffer(archive_record_buffer *r);
  int frm_copy(azio_stream *src, azio_stream *dst);
  int frm_compare(azio_stream *src);
  unsigned int pack_row_v1(const uchar *record);

public:
  ha_archive(handlerton *hton, TABLE_SHARE *table_arg);
  ~ha_archive() = default;
  const char *index_type(uint inx) { return "NONE"; }
  /* Capability flags reported to the server */
  ulonglong table_flags() const
  {
    return (HA_NO_TRANSACTIONS | HA_REC_NOT_IN_SEQ | HA_CAN_BIT_FIELD |
            HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
            HA_STATS_RECORDS_IS_EXACT | HA_CAN_EXPORT |
            HA_HAS_RECORDS | HA_CAN_REPAIR | HA_SLOW_RND_POS |
            HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
  }
  ulong index_flags(uint idx, uint part, bool all_parts) const
  {
    return HA_ONLY_WHOLE_INDEX;
  }
  virtual void get_auto_increment(ulonglong offset, ulonglong increment,
                                  ulonglong nb_desired_values,
                                  ulonglong *first_value,
                                  ulonglong *nb_reserved_values);
  /* One key at most, no longer than a ulonglong */
  uint max_supported_keys()          const { return 1; }
  uint max_supported_key_length()    const { return sizeof(ulonglong); }
  uint max_supported_key_part_length() const { return sizeof(ulonglong); }
  /* Row count as tracked in the share (read without taking the mutex) */
  ha_rows records() { return share->rows_recorded; }
  int index_init(uint keynr, bool sorted);
  virtual int index_read(uchar * buf, const uchar * key,
                         uint key_len, enum ha_rkey_function find_flag);
  virtual int index_read_idx(uchar * buf, uint index, const uchar * key,
                             uint key_len, enum ha_rkey_function find_flag);
  int index_next(uchar * buf);
  int open(const char *name, int mode, uint test_if_locked);
  int close(void);
  int write_row(const uchar * buf);
  int real_write_row(const uchar *buf, azio_stream *writer);
  int truncate();
  int rnd_init(bool scan=1);
  int rnd_next(uchar *buf);
  int rnd_pos(uchar * buf, uchar *pos);
  int get_row(azio_stream *file_to_read, uchar *buf);
  int get_row_version2(azio_stream *file_to_read, uchar *buf);
  int get_row_version3(azio_stream *file_to_read, uchar *buf);
  Archive_share *get_share(const char *table_name, int *rc);
  int init_archive_reader();
  // Always try auto_repair in case of HA_ERR_CRASHED_ON_USAGE
  bool auto_repair(int error) const
  { return error == HA_ERR_CRASHED_ON_USAGE; }
  int read_data_header(azio_stream *file_to_read);
  void position(const uchar *record);
  int info(uint);
  int extra(enum ha_extra_function operation);
  void update_create_info(HA_CREATE_INFO *create_info);
  int create(const char *name, TABLE *form, HA_CREATE_INFO *create_info);
  int optimize(THD* thd, HA_CHECK_OPT* check_opt);
  int repair(THD* thd, HA_CHECK_OPT* check_opt);
  int check_for_upgrade(HA_CHECK_OPT *check_opt);
  void start_bulk_insert(ha_rows rows, uint flags);
  int end_bulk_insert();
  /* The row type is always reported as compressed */
  enum row_type get_row_type() const
  {
    return ROW_TYPE_COMPRESSED;
  }
  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
                             enum thr_lock_type lock_type);
  bool is_crashed() const;
  int check(THD* thd, HA_CHECK_OPT* check_opt);
  bool check_and_repair(THD *thd);
  uint32 max_row_length(const uchar *buf);
  bool fix_rec_buff(unsigned int length);
  int unpack_row(azio_stream *file_to_read, uchar *record);
  unsigned int pack_row(const uchar *record, azio_stream *writer);
  bool check_if_incompatible_data(HA_CREATE_INFO *info, uint table_changes);
  int external_lock(THD *thd, int lock_type);
private:
  void flush_and_clear_pending_writes();
};