diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/archive/ha_archive.cc | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/archive/ha_archive.cc')
-rw-r--r-- | storage/archive/ha_archive.cc | 1965 |
1 files changed, 1965 insertions, 0 deletions
diff --git a/storage/archive/ha_archive.cc b/storage/archive/ha_archive.cc new file mode 100644 index 00000000..19a0ffe0 --- /dev/null +++ b/storage/archive/ha_archive.cc @@ -0,0 +1,1965 @@ +/* + Copyright (c) 2004, 2014, Oracle and/or its affiliates + Copyright (c) 2010, 2014, SkySQL Ab. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA +*/ + +#ifdef USE_PRAGMA_IMPLEMENTATION +#pragma implementation // gcc: Class implementation +#endif + +#include <my_global.h> +#include "sql_class.h" // SSV +#include "sql_table.h" // build_table_filename +#include <myisam.h> // T_EXTEND + +#include "ha_archive.h" +#include "discover.h" +#include <my_dir.h> + +#include <mysql/plugin.h> + +/* + First, if you want to understand storage engines you should look at + ha_example.cc and ha_example.h. + + This example was written as a test case for a customer who needed + a storage engine without indexes that could compress data very well. + So, welcome to a completely compressed storage engine. This storage + engine only does inserts. No replace, deletes, or updates. All reads are + complete table scans. Compression is done through a combination of packing + and making use of the zlib library + + We keep a file pointer open for each instance of ha_archive for each read + but for writes we keep one open file handle just for that. We flush it + only if we have a read occur. azip handles compressing lots of records + at once much better then doing lots of little records between writes. + It is possible to not lock on writes but this would then mean we couldn't + handle bulk inserts as well (that is if someone was trying to read at + the same time since we would want to flush). + + A "meta" file is kept alongside the data file. This file serves two purpose. + The first purpose is to track the number of rows in the table. The second + purpose is to determine if the table was closed properly or not. When the + meta file is first opened it is marked as dirty. It is opened when the table + itself is opened for writing. When the table is closed the new count for rows + is written to the meta file and the file is marked as clean. If the meta file + is opened and it is marked as dirty, it is assumed that a crash occurred. At + this point an error occurs and the user is told to rebuild the file. + A rebuild scans the rows and rewrites the meta file. If corruption is found + in the data file then the meta file is not repaired. + + At some point a recovery method for such a drastic case needs to be divised. + + Locks are row level, and you will get a consistant read. + + For performance as far as table scans go it is quite fast. I don't have + good numbers but locally it has out performed both Innodb and MyISAM. For + Innodb the question will be if the table can be fit into the buffer + pool. For MyISAM its a question of how much the file system caches the + MyISAM file. With enough free memory MyISAM is faster. Its only when the OS + doesn't have enough memory to cache entire table that archive turns out + to be any faster. + + Examples between MyISAM (packed) and Archive. + + Table with 76695844 identical rows: + 29680807 a_archive.ARZ + 920350317 a.MYD + + + Table with 8991478 rows (all of Slashdot's comments): + 1922964506 comment_archive.ARZ + 2944970297 comment_text.MYD + + + TODO: + Allow users to set compression level. + Allow adjustable block size. + Implement versioning, should be easy. + Allow for errors, find a way to mark bad rows. + Add optional feature so that rows can be flushed at interval (which will cause less + compression but may speed up ordered searches). + Checkpoint the meta file to allow for faster rebuilds. + Option to allow for dirty reads, this would lower the sync calls, which would make + inserts a lot faster, but would mean highly arbitrary reads. + + -Brian + + Archive file format versions: + <5.1.5 - v.1 + 5.1.5-5.1.15 - v.2 + >5.1.15 - v.3 +*/ + +/* The file extension */ +#define ARZ ".ARZ" // The data file +#define ARN ".ARN" // Files used during an optimize call +#define ARM ".ARM" // Meta file (deprecated) + +/* 5.0 compatibility */ +#define META_V1_OFFSET_CHECK_HEADER 0 +#define META_V1_OFFSET_VERSION 1 +#define META_V1_OFFSET_ROWS_RECORDED 2 +#define META_V1_OFFSET_CHECK_POINT 10 +#define META_V1_OFFSET_CRASHED 18 +#define META_V1_LENGTH 19 + +/* + uchar + uchar +*/ +#define DATA_BUFFER_SIZE 2 // Size of the data used in the data file +#define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption + +#ifdef HAVE_PSI_INTERFACE +extern "C" PSI_file_key arch_key_file_data; +#endif + +/* Static declarations for handerton */ +static handler *archive_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root); +int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share); + +/* + Number of rows that will force a bulk insert. +*/ +#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2 + +/* + Size of header used for row +*/ +#define ARCHIVE_ROW_HEADER_SIZE 4 + +static handler *archive_create_handler(handlerton *hton, + TABLE_SHARE *table, + MEM_ROOT *mem_root) +{ + return new (mem_root) ha_archive(hton, table); +} + +#ifdef HAVE_PSI_INTERFACE +PSI_mutex_key az_key_mutex_Archive_share_mutex; + +static PSI_mutex_info all_archive_mutexes[]= +{ + { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0} +}; + +PSI_file_key arch_key_file_metadata, arch_key_file_data; +static PSI_file_info all_archive_files[]= +{ + { &arch_key_file_metadata, "metadata", 0}, + { &arch_key_file_data, "data", 0} +}; + +static void init_archive_psi_keys(void) +{ + const char* category= "archive"; + int count; + + if (!PSI_server) + return; + + count= array_elements(all_archive_mutexes); + mysql_mutex_register(category, all_archive_mutexes, count); + + count= array_elements(all_archive_files); + mysql_file_register(category, all_archive_files, count); +} + +#endif /* HAVE_PSI_INTERFACE */ + +/* + Initialize the archive handler. + + SYNOPSIS + archive_db_init() + void * + + RETURN + FALSE OK + TRUE Error +*/ + +/* + We just implement one additional file extension. + ARM is here just to properly drop 5.0 tables. +*/ +static const char *ha_archive_exts[] = { + ARZ, + ARM, + NullS +}; + +int archive_db_init(void *p) +{ + DBUG_ENTER("archive_db_init"); + handlerton *archive_hton; + +#ifdef HAVE_PSI_INTERFACE + init_archive_psi_keys(); +#endif + + archive_hton= (handlerton *)p; + archive_hton->db_type= DB_TYPE_ARCHIVE_DB; + archive_hton->create= archive_create_handler; + archive_hton->flags= HTON_NO_FLAGS; + archive_hton->discover_table= archive_discover; + archive_hton->tablefile_extensions= ha_archive_exts; + + DBUG_RETURN(0); +} + + +Archive_share::Archive_share() +{ + crashed= false; + in_optimize= false; + archive_write_open= false; + dirty= false; + DBUG_PRINT("ha_archive", ("Archive_share: %p", + this)); + thr_lock_init(&lock); + /* + We will use this lock for rows. + */ + mysql_mutex_init(az_key_mutex_Archive_share_mutex, + &mutex, MY_MUTEX_INIT_FAST); +} + + +Archive_share::~Archive_share() +{ + DBUG_PRINT("ha_archive", ("~Archive_share: %p", this)); + if (archive_write_open) + { + mysql_mutex_lock(&mutex); + (void) close_archive_writer(); // Will reset archive_write_open + mysql_mutex_unlock(&mutex); + } + thr_lock_delete(&lock); + mysql_mutex_destroy(&mutex); +} + + +ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg) + :handler(hton, table_arg), delayed_insert(0), bulk_insert(0) +{ + /* Set our original buffer from pre-allocated memory */ + buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); + + /* The size of the offset value we will use for position() */ + ref_length= sizeof(my_off_t); + archive_reader_open= FALSE; +} + +int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share) +{ + DBUG_ENTER("archive_discover"); + DBUG_PRINT("archive_discover", ("db: '%s' name: '%s'", share->db.str, + share->table_name.str)); + azio_stream frm_stream; + char az_file[FN_REFLEN]; + uchar *frm_ptr; + MY_STAT file_stat; + + strxmov(az_file, share->normalized_path.str, ARZ, NullS); + + if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0)))) + DBUG_RETURN(HA_ERR_NO_SUCH_TABLE); + + if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY))) + { + if (errno == EROFS || errno == EACCES) + DBUG_RETURN(my_errno= errno); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + if (frm_stream.frm_length == 0) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + frm_ptr= (uchar *)my_malloc(PSI_INSTRUMENT_ME, frm_stream.frm_length, + MYF(MY_THREAD_SPECIFIC | MY_WME)); + if (!frm_ptr) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + if (azread_frm(&frm_stream, frm_ptr)) + goto ret; + + azclose(&frm_stream); + + my_errno= share->init_from_binary_frm_image(thd, 1, + frm_ptr, frm_stream.frm_length); +ret: + my_free(frm_ptr); + DBUG_RETURN(my_errno); +} + +/** + @brief Read version 1 meta file (5.0 compatibility routine). + + @return Completion status + @retval 0 Success + @retval !0 Failure +*/ + +int Archive_share::read_v1_metafile() +{ + char file_name[FN_REFLEN]; + uchar buf[META_V1_LENGTH]; + File fd; + DBUG_ENTER("Archive_share::read_v1_metafile"); + + fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT); + if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1) + DBUG_RETURN(-1); + + if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf)) + { + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(-1); + } + + rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED); + crashed= buf[META_V1_OFFSET_CRASHED]; + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(0); +} + + +/** + @brief Write version 1 meta file (5.0 compatibility routine). + + @return Completion status + @retval 0 Success + @retval !0 Failure +*/ + +int Archive_share::write_v1_metafile() +{ + char file_name[FN_REFLEN]; + uchar buf[META_V1_LENGTH]; + File fd; + DBUG_ENTER("Archive_share::write_v1_metafile"); + + buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER; + buf[META_V1_OFFSET_VERSION]= 1; + int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded); + int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0); + buf[META_V1_OFFSET_CRASHED]= crashed; + + fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT); + if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1) + DBUG_RETURN(-1); + + if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf)) + { + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(-1); + } + + mysql_file_close(fd, MYF(0)); + DBUG_RETURN(0); +} + +/** + @brief Pack version 1 row (5.0 compatibility routine). + + @param[in] record the record to pack + + @return Length of packed row +*/ + +unsigned int ha_archive::pack_row_v1(const uchar *record) +{ + uint *blob, *end; + uchar *pos; + DBUG_ENTER("pack_row_v1"); + memcpy(record_buffer->buffer, record, table->s->reclength); + + /* + The end of VARCHAR fields are filled with garbage,so here + we explicitly set the end of the VARCHAR fields with zeroes + */ + + for (Field** field= table->field; (*field) ; field++) + { + Field *fld= *field; + if (fld->type() == MYSQL_TYPE_VARCHAR) + { + if (!(fld->is_real_null(record - table->record[0]))) + { + ptrdiff_t start= (fld->ptr - table->record[0]); + Field_varstring *const field_var= (Field_varstring *)fld; + uint offset= field_var->data_length() + field_var->length_size(); + memset(record_buffer->buffer + start + offset, 0, + fld->field_length - offset + 1); + } + } + } + pos= record_buffer->buffer + table->s->reclength; + for (blob= table->s->blob_field, end= blob + table->s->blob_fields; + blob != end; blob++) + { + uint32 length= ((Field_blob *) table->field[*blob])->get_length(); + if (length) + { + uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr(); + memcpy(pos, data_ptr, length); + pos+= length; + } + } + DBUG_RETURN((int)(pos - record_buffer->buffer)); +} + +/* + This method reads the header of a datafile and returns whether or not it was successful. +*/ +int ha_archive::read_data_header(azio_stream *file_to_read) +{ + int error; + unsigned long ret; + uchar data_buffer[DATA_BUFFER_SIZE]; + DBUG_ENTER("ha_archive::read_data_header"); + + if (azrewind(file_to_read) == -1) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (file_to_read->version >= 3) + DBUG_RETURN(0); + /* Everything below this is just legacy to version 2< */ + + DBUG_PRINT("ha_archive", ("Reading legacy data header")); + + ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error); + + if (ret != DATA_BUFFER_SIZE) + { + DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu", + DATA_BUFFER_SIZE, ret)); + DBUG_RETURN(1); + } + + if (error) + { + DBUG_PRINT("ha_archive", ("Compression error (%d)", error)); + DBUG_RETURN(1); + } + + DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0])); + DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1])); + + if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) && + (data_buffer[1] == 1 || data_buffer[1] == 2)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + DBUG_RETURN(0); +} + + +/* + We create the shared memory space that we will use for the open table. + No matter what we try to get or create a share. This is so that a repair + table operation can occur. + + See ha_example.cc for a longer description. +*/ +Archive_share *ha_archive::get_share(const char *table_name, int *rc) +{ + Archive_share *tmp_share; + + DBUG_ENTER("ha_archive::get_share"); + + lock_shared_ha_data(); + if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr()))) + { + azio_stream archive_tmp; + + tmp_share= new Archive_share; + + if (!tmp_share) + { + *rc= HA_ERR_OUT_OF_MEM; + goto err; + } + DBUG_PRINT("ha_archive", ("new Archive_share: %p", + tmp_share)); + + fn_format(tmp_share->data_file_name, table_name, "", + ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME); + strmov(tmp_share->table_name, table_name); + DBUG_PRINT("ha_archive", ("Data File %s", + tmp_share->data_file_name)); + + /* + We read the meta file, but do not mark it dirty. Since we are not + doing a write we won't mark it dirty (and we won't open it for + anything but reading... open it for write and we will generate null + compression writes). + */ + if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY))) + { + delete tmp_share; + *rc= my_errno ? my_errno : HA_ERR_CRASHED; + tmp_share= NULL; + goto err; + } + stats.auto_increment_value= archive_tmp.auto_increment + 1; + tmp_share->rows_recorded= (ha_rows)archive_tmp.rows; + tmp_share->crashed= archive_tmp.dirty; + share= tmp_share; + if (archive_tmp.version == 1) + share->read_v1_metafile(); + else if (frm_compare(&archive_tmp)) + *rc= HA_ERR_TABLE_DEF_CHANGED; + + azclose(&archive_tmp); + + set_ha_share_ptr(static_cast<Handler_share*>(tmp_share)); + } + if (tmp_share->crashed) + *rc= HA_ERR_CRASHED_ON_USAGE; +err: + unlock_shared_ha_data(); + + DBUG_ASSERT(tmp_share || *rc); + + DBUG_RETURN(tmp_share); +} + + +int Archive_share::init_archive_writer() +{ + DBUG_ENTER("Archive_share::init_archive_writer"); + /* + It is expensive to open and close the data files and since you can't have + a gzip file that can be both read and written we keep a writer open + that is shared amoung all open tables. + */ + if (!(azopen(&archive_write, data_file_name, + O_RDWR|O_BINARY))) + { + DBUG_PRINT("ha_archive", ("Could not open archive write file")); + crashed= true; + DBUG_RETURN(1); + } + archive_write_open= true; + + DBUG_RETURN(0); +} + + +void Archive_share::close_archive_writer() +{ + mysql_mutex_assert_owner(&mutex); + if (archive_write_open) + { + if (archive_write.version == 1) + (void) write_v1_metafile(); + azclose(&archive_write); + archive_write_open= false; + dirty= false; + } +} + + +/* + No locks are required because it is associated with just one handler instance +*/ +int ha_archive::init_archive_reader() +{ + DBUG_ENTER("ha_archive::init_archive_reader"); + /* + It is expensive to open and close the data files and since you can't have + a gzip file that can be both read and written we keep a writer open + that is shared amoung all open tables, but have one reader open for + each handler instance. + */ + if (!archive_reader_open) + { + if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))) + { + DBUG_PRINT("ha_archive", ("Could not open archive read file")); + share->crashed= TRUE; + DBUG_RETURN(1); + } + archive_reader_open= TRUE; + } + + DBUG_RETURN(0); +} + + +/* + When opening a file we: + Create/get our shared structure. + Init out lock. + We open the file we will read from. +*/ +int ha_archive::open(const char *name, int mode, uint open_options) +{ + int rc= 0; + DBUG_ENTER("ha_archive::open"); + + DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s", + (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no")); + share= get_share(name, &rc); + if (!share) + DBUG_RETURN(rc); + + /* Allow open on crashed table in repair mode only. */ + switch (rc) + { + case 0: + break; + case HA_ERR_TABLE_DEF_CHANGED: + case HA_ERR_CRASHED_ON_USAGE: + if (open_options & HA_OPEN_FOR_REPAIR) + { + rc= 0; + break; + } + /* fall through */ + default: + DBUG_RETURN(rc); + } + + DBUG_ASSERT(share); + + record_buffer= create_record_buffer(table->s->reclength + + ARCHIVE_ROW_HEADER_SIZE); + + if (!record_buffer) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + thr_lock_data_init(&share->lock, &lock, NULL); + + DBUG_PRINT("ha_archive", ("archive table was crashed %s", + rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no")); + if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR) + { + DBUG_RETURN(0); + } + + DBUG_RETURN(rc); +} + + +/* + Closes the file. + + SYNOPSIS + close(); + + IMPLEMENTATION: + + We first close this storage engines file handle to the archive and + then remove our reference count to the table (and possibly free it + as well). + + RETURN + 0 ok + 1 Error +*/ + +int ha_archive::close(void) +{ + int rc= 0; + DBUG_ENTER("ha_archive::close"); + + destroy_record_buffer(record_buffer); + + /* First close stream */ + if (archive_reader_open) + { + if (azclose(&archive)) + rc= 1; + } + DBUG_RETURN(rc); +} + + +/** + Copy a frm blob between streams. + + @param src The source stream. + @param dst The destination stream. + + @return Zero on success, non-zero otherwise. +*/ + +int ha_archive::frm_copy(azio_stream *src, azio_stream *dst) +{ + int rc= 0; + uchar *frm_ptr; + + if (!src->frm_length) + { + size_t frm_len; + if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len)) + { + azwrite_frm(dst, frm_ptr, frm_len); + table_share->free_frm_image(frm_ptr); + } + return 0; + } + + if (!(frm_ptr= (uchar *) my_malloc(PSI_INSTRUMENT_ME, src->frm_length, + MYF(MY_THREAD_SPECIFIC | MY_WME)))) + return HA_ERR_OUT_OF_MEM; + + /* Write file offset is set to the end of the file. */ + if (azread_frm(src, frm_ptr) || + azwrite_frm(dst, frm_ptr, src->frm_length)) + rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR; + + my_free(frm_ptr); + + return rc; +} + + +/** + Compare frm blob with the on-disk frm file + + @param s The azio stream. + + @return Zero if equal, non-zero otherwise. +*/ + +int ha_archive::frm_compare(azio_stream *s) +{ + if (!s->frmver_length) + return 0; // Old pre-10.0 archive table. Never rediscover. + + LEX_CUSTRING *ver= &table->s->tabledef_version; + return ver->length != s->frmver_length || + memcmp(ver->str, s->frmver, ver->length); +} + + +/* + We create our data file here. The format is pretty simple. + You can read about the format of the data file above. + Unlike other storage engines we do not "pack" our data. Since we + are about to do a general compression, packing would just be a waste of + CPU time. If the table has blobs they are written after the row in the order + of creation. +*/ + +int ha_archive::create(const char *name, TABLE *table_arg, + HA_CREATE_INFO *create_info) +{ + char name_buff[FN_REFLEN]; + char linkname[FN_REFLEN]; + int error; + azio_stream create_stream; /* Archive file we are working with */ + const uchar *frm_ptr; + size_t frm_len; + + DBUG_ENTER("ha_archive::create"); + + stats.auto_increment_value= create_info->auto_increment_value; + + for (uint key= 0; key < table_arg->s->keys; key++) + { + KEY *pos= table_arg->key_info+key; + KEY_PART_INFO *key_part= pos->key_part; + KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts; + + for (; key_part != key_part_end; key_part++) + { + Field *field= key_part->field; + + if (!(field->flags & AUTO_INCREMENT_FLAG) || + key_part->key_part_flag & HA_REVERSE_SORT) + + { + error= HA_WRONG_CREATE_OPTION; + DBUG_PRINT("ha_archive", ("Index error in creating archive table")); + goto error; + } + } + } + + /* + We reuse name_buff since it is available. + */ +#ifdef HAVE_READLINK + if (my_use_symdir && + create_info->data_file_name && + create_info->data_file_name[0] != '#') + { + DBUG_PRINT("ha_archive", ("archive will create stream file %s", + create_info->data_file_name)); + + fn_format(name_buff, create_info->data_file_name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + fn_format(linkname, name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + } + else +#endif /* HAVE_READLINK */ + { + if (create_info->data_file_name) + my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "DATA DIRECTORY"); + + fn_format(name_buff, name, "", ARZ, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + linkname[0]= 0; + } + + /* Archive engine never uses INDEX DIRECTORY. */ + if (create_info->index_file_name) + my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "INDEX DIRECTORY"); + + /* + There is a chance that the file was "discovered". In this case + just use whatever file is there. + */ + my_errno= 0; + if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY))) + { + error= errno; + goto error2; + } + + if (linkname[0]) + my_symlink(name_buff, linkname, MYF(0)); + + /* + Here is where we open up the frm and pass it to archive to store + */ + if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len)) + { + azwrite_frm(&create_stream, frm_ptr, frm_len); + table_arg->s->free_frm_image(frm_ptr); + } + + if (create_info->comment.str) + azwrite_comment(&create_stream, create_info->comment.str, + create_info->comment.length); + + /* + Yes you need to do this, because the starting value + for the autoincrement may not be zero. + */ + create_stream.auto_increment= stats.auto_increment_value ? + stats.auto_increment_value - 1 : 0; + if (azclose(&create_stream)) + { + error= errno; + goto error2; + } + + DBUG_PRINT("ha_archive", ("Creating File %s", name_buff)); + DBUG_PRINT("ha_archive", ("Creating Link %s", linkname)); + + + DBUG_RETURN(0); + +error2: + delete_table(name); +error: + /* Return error number, if we got one */ + DBUG_RETURN(error ? error : -1); +} + +/* + This is where the actual row is written out. +*/ +int ha_archive::real_write_row(const uchar *buf, azio_stream *writer) +{ + my_off_t written; + unsigned int r_pack_length; + DBUG_ENTER("ha_archive::real_write_row"); + + /* We pack the row for writing */ + r_pack_length= pack_row(buf, writer); + + written= azwrite(writer, record_buffer->buffer, r_pack_length); + if (written != r_pack_length) + { + DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d", + (uint32) written, + (uint32)r_pack_length)); + DBUG_RETURN(-1); + } + + if (!delayed_insert || !bulk_insert) + share->dirty= TRUE; + + DBUG_RETURN(0); +} + + +/* + Calculate max length needed for row. This includes + the bytes required for the length in the header. +*/ + +uint32 ha_archive::max_row_length(const uchar *record) +{ + uint32 length= (uint32)(table->s->reclength + table->s->fields*2); + length+= ARCHIVE_ROW_HEADER_SIZE; + my_ptrdiff_t const rec_offset= record - table->record[0]; + + uint *ptr, *end; + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + if (!table->field[*ptr]->is_null(rec_offset)) + length += 2 + ((Field_blob*)table->field[*ptr])->get_length(rec_offset); + } + + return length; +} + + +unsigned int ha_archive::pack_row(const uchar *record, azio_stream *writer) +{ + uchar *ptr; + my_ptrdiff_t const rec_offset= record - table->record[0]; + DBUG_ENTER("ha_archive::pack_row"); + + if (fix_rec_buff(max_row_length(record))) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */ + + if (writer->version == 1) + DBUG_RETURN(pack_row_v1(record)); + + /* Copy null bits */ + memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE, + record, table->s->null_bytes); + ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE; + + for (Field **field=table->field ; *field ; field++) + { + if (!((*field)->is_null(rec_offset))) + ptr= (*field)->pack(ptr, record + (*field)->offset(record)); + } + + int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer - + ARCHIVE_ROW_HEADER_SIZE)); + DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int) + (ptr - record_buffer->buffer - + ARCHIVE_ROW_HEADER_SIZE))); + + DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer)); +} + + +/* + Look at ha_archive::open() for an explanation of the row format. + Here we just write out the row. + + Wondering about start_bulk_insert()? We don't implement it for + archive since it optimizes for lots of writes. The only save + for implementing start_bulk_insert() is that we could skip + setting dirty to true each time. +*/ +int ha_archive::write_row(const uchar *buf) +{ + int rc; + uchar *read_buf= NULL; + ulonglong temp_auto; + uchar *record= table->record[0]; + DBUG_ENTER("ha_archive::write_row"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + mysql_mutex_lock(&share->mutex); + + if (!share->archive_write_open && share->init_archive_writer()) + { + rc= errno; + goto error; + } + + if (table->next_number_field && record == table->record[0]) + { + KEY *mkey= &table->key_info[0]; // We only support one key right now + update_auto_increment(); + temp_auto= table->next_number_field->val_int(); + + /* + We don't support decremening auto_increment. They make the performance + just cry. + */ + if (temp_auto <= share->archive_write.auto_increment && + mkey->flags & HA_NOSAME) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } +#ifdef DEAD_CODE + /* + Bad news, this will cause a search for the unique value which is very + expensive since we will have to do a table scan which will lock up + all other writers during this period. This could perhaps be optimized + in the future. + */ + { + /* + First we create a buffer that we can use for reading rows, and can pass + to get_row(). + */ + if (!(read_buf= (uchar*) my_malloc(table->s->reclength, + MYF(MY_THREAD_SPECIFIC | MY_WME)))) + { + rc= HA_ERR_OUT_OF_MEM; + goto error; + } + /* + All of the buffer must be written out or we won't see all of the + data + */ + azflush(&(share->archive_write), Z_SYNC_FLUSH); + /* + Set the position of the local read thread to the beginning position. + */ + if (read_data_header(&archive)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto error; + } + + Field *mfield= table->next_number_field; + + while (!(get_row(&archive, read_buf))) + { + if (!memcmp(read_buf + mfield->offset(record), + table->next_number_field->ptr, + mfield->max_display_length())) + { + rc= HA_ERR_FOUND_DUPP_KEY; + goto error; + } + } + } +#endif + else + { + if (temp_auto > share->archive_write.auto_increment) + stats.auto_increment_value= + (share->archive_write.auto_increment= temp_auto) + 1; + } + } + + /* + Notice that the global auto_increment has been increased. + In case of a failed row write, we will never try to reuse the value. + */ + share->rows_recorded++; + rc= real_write_row(buf, &(share->archive_write)); +error: + mysql_mutex_unlock(&share->mutex); + my_free(read_buf); + DBUG_RETURN(rc); +} + + +void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment, + ulonglong nb_desired_values, + ulonglong *first_value, + ulonglong *nb_reserved_values) +{ + *nb_reserved_values= ULONGLONG_MAX; + *first_value= share->archive_write.auto_increment + 1; +} + +/* Initialized at each key walk (called multiple times unlike rnd_init()) */ +int ha_archive::index_init(uint keynr, bool sorted) +{ + DBUG_ENTER("ha_archive::index_init"); + active_index= keynr; + DBUG_RETURN(0); +} + + +/* + No indexes, so if we get a request for an index search since we tell + the optimizer that we have unique indexes, we scan +*/ +int ha_archive::index_read(uchar *buf, const uchar *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc; + DBUG_ENTER("ha_archive::index_read"); + rc= index_read_idx(buf, active_index, key, key_len, find_flag); + DBUG_RETURN(rc); +} + + +int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key, + uint key_len, enum ha_rkey_function find_flag) +{ + int rc; + bool found= 0; + KEY *mkey= &table->key_info[index]; + current_k_offset= mkey->key_part->offset; + current_key= key; + current_key_len= key_len; + + + DBUG_ENTER("ha_archive::index_read_idx"); + + rc= rnd_init(TRUE); + + if (rc) + goto error; + + while (!(get_row(&archive, buf))) + { + if (!memcmp(current_key, buf + current_k_offset, current_key_len)) + { + found= 1; + break; + } + } + + if (found) + { + /* notify handler that a record has been found */ + table->status= 0; + DBUG_RETURN(0); + } + +error: + DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE); +} + + +int ha_archive::index_next(uchar * buf) +{ + bool found= 0; + int rc; + + DBUG_ENTER("ha_archive::index_next"); + + while (!(get_row(&archive, buf))) + { + if (!memcmp(current_key, buf+current_k_offset, current_key_len)) + { + found= 1; + break; + } + } + + rc= found ? 0 : HA_ERR_END_OF_FILE; + DBUG_RETURN(rc); +} + +/* + All calls that need to scan the table start with this method. If we are told + that it is a table scan we rewind the file to the beginning, otherwise + we assume the position will be set. +*/ + +int ha_archive::rnd_init(bool scan) +{ + DBUG_ENTER("ha_archive::rnd_init"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (init_archive_reader()) + DBUG_RETURN(errno); + + /* We rewind the file so that we can read from the beginning if scan */ + if (scan) + { + scan_rows= stats.records; + DBUG_PRINT("info", ("archive will retrieve %llu rows", + (unsigned long long) scan_rows)); + + if (read_data_header(&archive)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + DBUG_RETURN(0); +} + + +/* + This is the method that is used to read a row. It assumes that the row is + positioned where you want it. +*/ +int ha_archive::get_row(azio_stream *file_to_read, uchar *buf) +{ + int rc; + DBUG_ENTER("ha_archive::get_row"); + DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d", + (uchar)file_to_read->version, + ARCHIVE_VERSION)); + if (file_to_read->version == ARCHIVE_VERSION) + rc= get_row_version3(file_to_read, buf); + else + rc= get_row_version2(file_to_read, buf); + + DBUG_PRINT("ha_archive", ("Return %d\n", rc)); + + DBUG_RETURN(rc); +} + +/* Reallocate buffer if needed */ +bool ha_archive::fix_rec_buff(unsigned int length) +{ + DBUG_ENTER("ha_archive::fix_rec_buff"); + DBUG_PRINT("ha_archive", ("Fixing %u for %u", + length, record_buffer->length)); + DBUG_ASSERT(record_buffer->buffer); + + if (length > record_buffer->length) + { + uchar *newptr; + if (!(newptr=(uchar*) my_realloc(PSI_INSTRUMENT_ME, + (uchar*) record_buffer->buffer, length, + MYF(MY_ALLOW_ZERO_PTR)))) + DBUG_RETURN(1); + record_buffer->buffer= newptr; + record_buffer->length= length; + } + + DBUG_ASSERT(length <= record_buffer->length); + + DBUG_RETURN(0); +} + +int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record) +{ + DBUG_ENTER("ha_archive::unpack_row"); + + unsigned int read; + int error; + uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE]; + unsigned int row_len; + + /* First we grab the length stored */ + read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error); + + if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE)) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + /* If we read nothing we are at the end of the file */ + if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + row_len= uint4korr(size_buffer); + DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len, + (unsigned int)table->s->reclength)); + + if (fix_rec_buff(row_len)) + { + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + } + DBUG_ASSERT(row_len <= record_buffer->length); + + read= azread(file_to_read, record_buffer->buffer, row_len, &error); + + if (read != row_len || error) + { + DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD); + } + + /* Copy null bits */ + const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len; + memcpy(record, ptr, table->s->null_bytes); + ptr+= table->s->null_bytes; + if (ptr > end) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + for (Field **field=table->field ; *field ; field++) + { + if (!((*field)->is_null_in_record(record))) + { + if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), + ptr, end))) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + } + } + if (ptr != end) + DBUG_RETURN(HA_ERR_WRONG_IN_RECORD); + DBUG_RETURN(0); +} + + +int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf) +{ + DBUG_ENTER("ha_archive::get_row_version3"); + + int returnable= unpack_row(file_to_read, buf); + + DBUG_RETURN(returnable); +} + + +int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf) +{ + unsigned int read; + int error; + uint *ptr, *end; + char *last; + size_t total_blob_length= 0; + MY_BITMAP *read_set= table->read_set; + DBUG_ENTER("ha_archive::get_row_version2"); + + read= azread(file_to_read, (voidp)buf, table->s->reclength, &error); + + /* If we read nothing we are at the end of the file */ + if (read == 0) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + if (read != table->s->reclength) + { + DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u", + read, + (unsigned int)table->s->reclength)); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR ) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + /* + If the record is the wrong size, the file is probably damaged, unless + we are dealing with a delayed insert or a bulk insert. + */ + if ((ulong) read != table->s->reclength) + DBUG_RETURN(HA_ERR_END_OF_FILE); + + /* Calculate blob length, we use this for our buffer */ + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + if (bitmap_is_set(read_set, + (((Field_blob*) table->field[*ptr])->field_index))) + total_blob_length += ((Field_blob*) table->field[*ptr])->get_length(); + } + + /* Adjust our row buffer if we need be */ + buffer.alloc(total_blob_length); + last= (char *)buffer.ptr(); + + /* Loop through our blobs and read them */ + for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ; + ptr != end ; + ptr++) + { + size_t size= ((Field_blob*) table->field[*ptr])->get_length(); + if (size) + { + if (bitmap_is_set(read_set, + ((Field_blob*) table->field[*ptr])->field_index)) + { + read= azread(file_to_read, last, size, &error); + + if (error) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if ((size_t) read != size) + DBUG_RETURN(HA_ERR_END_OF_FILE); + ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last); + last += size; + } + else + { + (void)azseek(file_to_read, size, SEEK_CUR); + } + } + } + DBUG_RETURN(0); +} + + +/* + Called during ORDER BY. Its position is either from being called sequentially + or by having had ha_archive::rnd_pos() called before it is called. +*/ + +int ha_archive::rnd_next(uchar *buf) +{ + int rc; + DBUG_ENTER("ha_archive::rnd_next"); + + if (share->crashed) + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + + if (!scan_rows) + { + rc= HA_ERR_END_OF_FILE; + goto end; + } + scan_rows--; + + current_position= aztell(&archive); + rc= get_row(&archive, buf); + +end: + DBUG_RETURN(rc); +} + + +/* + Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after + each call to ha_archive::rnd_next() if an ordering of the rows is + needed. +*/ + +void ha_archive::position(const uchar *record) +{ + DBUG_ENTER("ha_archive::position"); + my_store_ptr(ref, ref_length, current_position); + DBUG_VOID_RETURN; +} + + +/* + This is called after a table scan for each row if the results of the + scan need to be ordered. It will take *pos and use it to move the + cursor in the file so that the next row that is called is the + correctly ordered row. +*/ + +int ha_archive::rnd_pos(uchar * buf, uchar *pos) +{ + int rc; + DBUG_ENTER("ha_archive::rnd_pos"); + current_position= (my_off_t)my_get_ptr(pos, ref_length); + if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L)) + { + rc= HA_ERR_CRASHED_ON_USAGE; + goto end; + } + rc= get_row(&archive, buf); +end: + DBUG_RETURN(rc); +} + + +/** + @brief Check for upgrade + + @param[in] check_opt check options + + @return Completion status + @retval HA_ADMIN_OK No upgrade required + @retval HA_ADMIN_CORRUPT Cannot read meta-data + @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required +*/ + +int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt) +{ + DBUG_ENTER("ha_archive::check_for_upgrade"); + if (init_archive_reader()) + DBUG_RETURN(HA_ADMIN_CORRUPT); + if (archive.version < ARCHIVE_VERSION) + DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE); + DBUG_RETURN(HA_ADMIN_OK); +} + + +/* + This method repairs the meta file. It does this by walking the datafile and + rewriting the meta file. If EXTENDED repair is requested, we attempt to + recover as much data as possible. +*/ +int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt) +{ + DBUG_ENTER("ha_archive::repair"); + int rc= optimize(thd, check_opt); + + if (rc) + DBUG_RETURN(HA_ADMIN_CORRUPT); + + share->crashed= FALSE; + DBUG_RETURN(0); +} + +/* + The table can become fragmented if data was inserted, read, and then + inserted again. What we do is open up the file and recompress it completely. +*/ +int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt) +{ + int rc= 0; + azio_stream writer; + char writer_filename[FN_REFLEN]; + DBUG_ENTER("ha_archive::optimize"); + + mysql_mutex_lock(&share->mutex); + + if (init_archive_reader()) + { + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(errno); + } + + // now we close both our writer and our reader for the rename + if (share->archive_write_open) + { + azclose(&(share->archive_write)); + share->archive_write_open= FALSE; + } + + /* Lets create a file to contain the new data */ + fn_format(writer_filename, share->table_name, "", ARN, + MY_REPLACE_EXT | MY_UNPACK_FILENAME); + + if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY))) + { + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); + } + + /* + Transfer the embedded FRM so that the file can be discoverable. + Write file offset is set to the end of the file. + */ + if ((rc= frm_copy(&archive, &writer))) + goto error; + + /* + An extended rebuild is a lot more effort. We open up each row and re-record it. + Any dead rows are removed (aka rows that may have been partially recorded). + + As of Archive format 3, this is the only type that is performed, before this + version it was just done on T_EXTEND + */ + if (1) + { + DBUG_PRINT("ha_archive", ("archive extended rebuild")); + + /* + Now we will rewind the archive file so that we are positioned at the + start of the file. + */ + rc= read_data_header(&archive); + + /* + On success of writing out the new header, we now fetch each row and + insert it into the new archive file. + */ + if (!rc) + { + share->rows_recorded= 0; + stats.auto_increment_value= 1; + share->archive_write.auto_increment= 0; + MY_BITMAP *org_bitmap= tmp_use_all_columns(table, &table->read_set); + + while (!(rc= get_row(&archive, table->record[0]))) + { + real_write_row(table->record[0], &writer); + /* + Long term it should be possible to optimize this so that + it is not called on each row. + */ + if (table->found_next_number_field) + { + Field *field= table->found_next_number_field; + ulonglong auto_value= + (ulonglong) field->val_int(table->record[0] + + field->offset(table->record[0])); + if (share->archive_write.auto_increment < auto_value) + stats.auto_increment_value= + (share->archive_write.auto_increment= auto_value) + 1; + } + } + + tmp_restore_column_map(&table->read_set, org_bitmap); + share->rows_recorded= (ha_rows)writer.rows; + } + + DBUG_PRINT("info", ("recovered %llu archive rows", + (unsigned long long)share->rows_recorded)); + + DBUG_PRINT("ha_archive", ("recovered %llu archive rows", + (unsigned long long)share->rows_recorded)); + + /* + If REPAIR ... EXTENDED is requested, try to recover as much data + from data file as possible. In this case if we failed to read a + record, we assume EOF. This allows massive data loss, but we can + hardly do more with broken zlib stream. And this is the only way + to restore at least what is still recoverable. + */ + if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND)) + goto error; + } + + azclose(&writer); + share->dirty= FALSE; + + azclose(&archive); + + // make the file we just wrote be our data file + rc= my_rename(writer_filename, share->data_file_name, MYF(0)); + + + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(rc); +error: + DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc)); + azclose(&writer); + mysql_mutex_unlock(&share->mutex); + + DBUG_RETURN(rc); +} + +/* + Below is an example of how to setup row level locking. +*/ +THR_LOCK_DATA **ha_archive::store_lock(THD *thd, + THR_LOCK_DATA **to, + enum thr_lock_type lock_type) +{ + if (lock_type == TL_WRITE_DELAYED) + delayed_insert= TRUE; + else + delayed_insert= FALSE; + + if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK) + { + /* + Here is where we get into the guts of a row level lock. + If TL_UNLOCK is set + If we are not doing a LOCK TABLE, DELAYED LOCK or DISCARD/IMPORT + TABLESPACE, then allow multiple writers + */ + + if ((lock_type >= TL_WRITE_CONCURRENT_INSERT && + lock_type <= TL_WRITE) && delayed_insert == FALSE && + !thd_in_lock_tables(thd) + && !thd_tablespace_op(thd)) + lock_type = TL_WRITE_ALLOW_WRITE; + + /* + In queries of type INSERT INTO t1 SELECT ... FROM t2 ... + MySQL would use the lock TL_READ_NO_INSERT on t2, and that + would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts + to t2. Convert the lock to a normal read lock to allow + concurrent inserts to t2. + */ + + if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd)) + lock_type = TL_READ; + + lock.type=lock_type; + } + + *to++= &lock; + + return to; +} + +void ha_archive::update_create_info(HA_CREATE_INFO *create_info) +{ + char tmp_real_path[FN_REFLEN]; + DBUG_ENTER("ha_archive::update_create_info"); + + ha_archive::info(HA_STATUS_AUTO); + if (!(create_info->used_fields & HA_CREATE_USED_AUTO)) + { + create_info->auto_increment_value= stats.auto_increment_value; + } + + if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0)))) + create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path); + + DBUG_VOID_RETURN; +} + +/* + Hints for optimizer, see ha_tina for more information +*/ +int ha_archive::info(uint flag) +{ + DBUG_ENTER("ha_archive::info"); + + flush_and_clear_pending_writes(); + stats.deleted= 0; + + DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records)); + /* Costs quite a bit more to get all information */ + if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE)) + { + MY_STAT file_stat; // Stat information for the data file + + (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME)); + + if (flag & HA_STATUS_TIME) + stats.update_time= (ulong) file_stat.st_mtime; + if (flag & HA_STATUS_CONST) + { + stats.max_data_file_length= MAX_FILE_SIZE; + stats.create_time= (ulong) file_stat.st_ctime; + } + if (flag & HA_STATUS_VARIABLE) + { + stats.delete_length= 0; + stats.data_file_length= file_stat.st_size; + stats.index_file_length=0; + stats.mean_rec_length= stats.records ? + ulong(stats.data_file_length / stats.records) : table->s->reclength; + } + } + + if (flag & HA_STATUS_AUTO) + { + if (init_archive_reader()) + DBUG_RETURN(errno); + + mysql_mutex_lock(&share->mutex); + azflush(&archive, Z_SYNC_FLUSH); + mysql_mutex_unlock(&share->mutex); + stats.auto_increment_value= archive.auto_increment + 1; + } + + DBUG_RETURN(0); +} + + +int ha_archive::external_lock(THD *thd, int lock_type) +{ + if (lock_type == F_RDLCK) + { + // We are going to read from the table. Flush any pending writes that we + // may have + flush_and_clear_pending_writes(); + } + return 0; +} + + +void ha_archive::flush_and_clear_pending_writes() +{ + mysql_mutex_lock(&share->mutex); + if (share->dirty) + { + DBUG_PRINT("ha_archive", ("archive flushing out rows for scan")); + DBUG_ASSERT(share->archive_write_open); + azflush(&(share->archive_write), Z_SYNC_FLUSH); + share->dirty= FALSE; + } + + /* + This should be an accurate number now, though bulk and delayed inserts can + cause the number to be inaccurate. + */ + stats.records= share->rows_recorded; + mysql_mutex_unlock(&share->mutex); +} + + +int ha_archive::extra(enum ha_extra_function operation) +{ + switch (operation) { + case HA_EXTRA_FLUSH: + mysql_mutex_lock(&share->mutex); + share->close_archive_writer(); + mysql_mutex_unlock(&share->mutex); + break; + default: + break; + } + return 0; +} + +/* + This method tells us that a bulk insert operation is about to occur. We set + a flag which will keep write_row from saying that its data is dirty. This in + turn will keep selects from causing a sync to occur. + Basically, yet another optimizations to keep compression working well. +*/ +void ha_archive::start_bulk_insert(ha_rows rows, uint flags) +{ + DBUG_ENTER("ha_archive::start_bulk_insert"); + if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT) + bulk_insert= TRUE; + DBUG_VOID_RETURN; +} + + +/* + Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert + flag, and set the share dirty so that the next select will call sync for us. +*/ +int ha_archive::end_bulk_insert() +{ + DBUG_ENTER("ha_archive::end_bulk_insert"); + bulk_insert= FALSE; + mysql_mutex_lock(&share->mutex); + if (share->archive_write_open) + share->dirty= true; + mysql_mutex_unlock(&share->mutex); + DBUG_RETURN(0); +} + +/* + We cancel a truncate command. The only way to delete an archive table is to drop it. + This is done for security reasons. In a later version we will enable this by + allowing the user to select a different row format. +*/ +int ha_archive::truncate() +{ + DBUG_ENTER("ha_archive::truncate"); + DBUG_RETURN(HA_ERR_WRONG_COMMAND); +} + +/* + We just return state if asked. +*/ +bool ha_archive::is_crashed() const +{ + DBUG_ENTER("ha_archive::is_crashed"); + DBUG_RETURN(share->crashed); +} + +/* + Simple scan of the tables to make sure everything is ok. +*/ + +int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt) +{ + int rc= 0; + const char *old_proc_info; + ha_rows count; + DBUG_ENTER("ha_archive::check"); + + old_proc_info= thd_proc_info(thd, "Checking table"); + mysql_mutex_lock(&share->mutex); + count= share->rows_recorded; + /* Flush any waiting data */ + if (share->archive_write_open) + azflush(&(share->archive_write), Z_SYNC_FLUSH); + mysql_mutex_unlock(&share->mutex); + + if (init_archive_reader()) + DBUG_RETURN(HA_ADMIN_CORRUPT); + /* + Now we will rewind the archive file so that we are positioned at the + start of the file. + */ + read_data_header(&archive); + for (ha_rows cur_count= count; cur_count; cur_count--) + { + if ((rc= get_row(&archive, table->record[0]))) + goto error; + } + /* + Now read records that may have been inserted concurrently. + Acquire share->mutex so tail of the table is not modified by + concurrent writers. + */ + mysql_mutex_lock(&share->mutex); + count= share->rows_recorded - count; + if (share->archive_write_open) + azflush(&(share->archive_write), Z_SYNC_FLUSH); + while (!(rc= get_row(&archive, table->record[0]))) + count--; + mysql_mutex_unlock(&share->mutex); + + if ((rc && rc != HA_ERR_END_OF_FILE) || count) + goto error; + + thd_proc_info(thd, old_proc_info); + DBUG_RETURN(HA_ADMIN_OK); + +error: + thd_proc_info(thd, old_proc_info); + share->crashed= FALSE; + DBUG_RETURN(HA_ADMIN_CORRUPT); +} + +/* + Check and repair the table if needed. +*/ +bool ha_archive::check_and_repair(THD *thd) +{ + HA_CHECK_OPT check_opt; + DBUG_ENTER("ha_archive::check_and_repair"); + + check_opt.init(); + + DBUG_RETURN(repair(thd, &check_opt)); +} + +archive_record_buffer *ha_archive::create_record_buffer(unsigned int length) +{ + DBUG_ENTER("ha_archive::create_record_buffer"); + archive_record_buffer *r; + if (!(r= (archive_record_buffer*) my_malloc(PSI_INSTRUMENT_ME, + sizeof(archive_record_buffer), MYF(MY_WME)))) + { + DBUG_RETURN(NULL); /* purecov: inspected */ + } + r->length= (int)length; + + if (!(r->buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, r->length, MYF(MY_WME)))) + { + my_free(r); + DBUG_RETURN(NULL); /* purecov: inspected */ + } + + DBUG_RETURN(r); +} + +void ha_archive::destroy_record_buffer(archive_record_buffer *r) +{ + DBUG_ENTER("ha_archive::destroy_record_buffer"); + my_free(r->buffer); + my_free(r); + DBUG_VOID_RETURN; +} + +/* + In archive *any* ALTER should cause a table to be rebuilt, + no ALTER can be frm-only. + Because after any change to the frm file archive must update the + frm image in the ARZ file. And this cannot be done in-place, it + requires ARZ file to be recreated from scratch +*/ +bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg, + uint table_changes) +{ + return COMPATIBLE_DATA_NO; +} + + +struct st_mysql_storage_engine archive_storage_engine= +{ MYSQL_HANDLERTON_INTERFACE_VERSION }; + +maria_declare_plugin(archive) +{ + MYSQL_STORAGE_ENGINE_PLUGIN, + &archive_storage_engine, + "ARCHIVE", + "Brian Aker, MySQL AB", + "gzip-compresses tables for a low storage footprint", + PLUGIN_LICENSE_GPL, + archive_db_init, /* Plugin Init */ + NULL, /* Plugin Deinit */ + 0x0300 /* 3.0 */, + NULL, /* status variables */ + NULL, /* system variables */ + "1.0", /* string version */ + MariaDB_PLUGIN_MATURITY_STABLE /* maturity */ +} +maria_declare_plugin_end; + |