diff options
Diffstat (limited to '')
-rw-r--r-- | storage/maria/ma_write.c | 2503 |
1 files changed, 2503 insertions, 0 deletions
diff --git a/storage/maria/ma_write.c b/storage/maria/ma_write.c new file mode 100644 index 00000000..95cc1203 --- /dev/null +++ b/storage/maria/ma_write.c @@ -0,0 +1,2503 @@ +/* Copyright (C) 2004-2008 MySQL AB & MySQL Finland AB & TCX DataKonsult AB + Copyright (C) 2008-2009 Sun Microsystems, Inc. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +/* Write a row to a MARIA table */ + +#include "ma_fulltext.h" +#include "ma_rt_index.h" +#include "trnman.h" +#include "ma_key_recover.h" +#include "ma_blockrec.h" + + /* Functions declared in this file */ + +static int w_search(MARIA_HA *info, uint32 comp_flag, + MARIA_KEY *key, my_off_t page, + MARIA_PAGE *father_page, uchar *father_keypos, + my_bool insert_last); +static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo, + MARIA_KEY *key, MARIA_PAGE *curr_page, + MARIA_PAGE *father_page, + uchar *father_key_pos, MARIA_KEY_PARAM *s_temp); +static uchar *_ma_find_last_pos(MARIA_KEY *int_key, + MARIA_PAGE *page, uchar **after_key); +static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key); +static my_bool _ma_ck_write_btree(register MARIA_HA *info, MARIA_KEY *key); +static my_bool _ma_ck_write_btree_with_log(MARIA_HA *, MARIA_KEY *, my_off_t *, + uint32); +static my_bool _ma_log_split(MARIA_PAGE *page, uint org_length, + uint new_length, + const uchar *key_pos, + uint key_length, int 
move_length, + enum en_key_op prefix_or_suffix, + const uchar *data, uint data_length, + uint changed_length); +static my_bool _ma_log_del_prefix(MARIA_PAGE *page, + uint org_length, uint new_length, + const uchar *key_pos, uint key_length, + int move_length); +static my_bool _ma_log_key_middle(MARIA_PAGE *page, + uint new_length, + uint data_added_first, + uint data_changed_first, + uint data_deleted_last, + const uchar *key_pos, + uint key_length, int move_length); + +/* + @brief Default handler for returing position to new row + + @note + This is only called for non transactional tables and not for block format + which is why we use info->state here. +*/ + +MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, + const uchar *record + __attribute__((unused))) +{ + return ((info->s->state.dellink != HA_OFFSET_ERROR && + !info->append_insert_at_end) ? + info->s->state.dellink : + info->state->data_file_length); +} + +my_bool _ma_write_abort_default(MARIA_HA *info __attribute__((unused))) +{ + return 0; +} + + +/* Write new record to a table */ + +int maria_write(MARIA_HA *info, const uchar *record) +{ + MARIA_SHARE *share= info->s; + uint i; + int save_errno; + MARIA_RECORD_POS filepos, oldpos= info->cur_row.lastpos; + uchar *buff; + my_bool lock_tree= share->lock_key_trees; + my_bool fatal_error; + MARIA_KEYDEF *keyinfo; + DBUG_ENTER("maria_write"); + DBUG_PRINT("enter",("index_file: %d data_file: %d", + share->kfile.file, info->dfile.file)); + + DBUG_EXECUTE_IF("maria_pretend_crashed_table_on_usage", + _ma_print_error(info, HA_ERR_CRASHED, 0); + DBUG_RETURN(my_errno= HA_ERR_CRASHED);); + if (share->options & HA_OPTION_READ_ONLY_DATA) + { + DBUG_RETURN(my_errno=EACCES); + } + if (_ma_readinfo(info,F_WRLCK,1)) + DBUG_RETURN(my_errno); + + if ((share->state.changed & STATE_DATA_FILE_FULL) || + (share->base.reloc == (ha_rows) 1 && + share->base.records == (ha_rows) 1 && + share->state.state.records == (ha_rows) 1)) + { /* System file */ + 
my_errno=HA_ERR_RECORD_FILE_FULL; + goto err2; + } + if (share->state.state.key_file_length >= share->base.margin_key_file_length) + { + my_errno=HA_ERR_INDEX_FILE_FULL; + goto err2; + } + if (_ma_mark_file_changed(share)) + goto err2; + + /* Calculate and check all unique constraints */ + + if (share->state.header.uniques) + { + for (i=0 ; i < share->state.header.uniques ; i++) + { + MARIA_UNIQUEDEF *def= share->uniqueinfo + i; + ha_checksum unique_hash= _ma_unique_hash(share->uniqueinfo+i,record); + if (maria_is_key_active(share->state.key_map, def->key)) + { + if (_ma_check_unique(info, def, record, + unique_hash, HA_OFFSET_ERROR)) + goto err2; + } + else + maria_unique_store(record+ share->keyinfo[def->key].seg->start, + unique_hash); + } + } + + /* Ensure we don't try to restore auto_increment if it doesn't change */ + info->last_auto_increment= ~(ulonglong) 0; + + if ((info->opt_flag & OPT_NO_ROWS)) + filepos= HA_OFFSET_ERROR; + else + { + /* + This may either calculate a record or, or write the record and return + the record id + */ + if ((filepos= (*share->write_record_init)(info, record)) == + HA_OFFSET_ERROR) + goto err2; + } + + /* Write all keys to indextree */ + buff= info->lastkey_buff2; + for (i=0, keyinfo= share->keyinfo ; i < share->base.keys ; i++, keyinfo++) + { + MARIA_KEY int_key; + if (maria_is_key_active(share->state.key_map, i)) + { + my_bool local_lock_tree= (lock_tree && + !(info->bulk_insert && + is_tree_inited(&info->bulk_insert[i]))); + if (local_lock_tree) + { + mysql_rwlock_wrlock(&keyinfo->root_lock); + keyinfo->version++; + } + if (keyinfo->flag & HA_FULLTEXT ) + { + if (_ma_ft_add(info,i, buff,record,filepos)) + { + if (local_lock_tree) + mysql_rwlock_unlock(&keyinfo->root_lock); + DBUG_PRINT("error",("Got error: %d on write",my_errno)); + goto err; + } + } + else + { + while (keyinfo->ck_insert(info, + (*keyinfo->make_key)(info, &int_key, i, + buff, record, filepos, + info->trn->trid))) + { + TRN *blocker; + 
DBUG_PRINT("error",("Got error: %d on write",my_errno)); + /* + explicit check to filter out temp tables, they aren't + transactional and don't have a proper TRN so the code + below doesn't work for them. + Also, filter out non-thread maria use, and table modified in + the same transaction. + At last, filter out non-dup-unique errors. + */ + if (!local_lock_tree) + goto err; + if (info->dup_key_trid == info->trn->trid || + my_errno != HA_ERR_FOUND_DUPP_KEY) + { + mysql_rwlock_unlock(&keyinfo->root_lock); + goto err; + } + /* Different TrIDs: table must be transactional */ + DBUG_ASSERT(share->base.born_transactional); + /* + If transactions are disabled, and dup_key_trid is different from + our TrID, it must be ALTER TABLE with dup_key_trid==0 (no + transaction). ALTER TABLE does have MARIA_HA::TRN not dummy but + puts TrID=0 in rows/keys. + */ + DBUG_ASSERT(share->now_transactional || + (info->dup_key_trid == 0)); + blocker= trnman_trid_to_trn(info->trn, info->dup_key_trid); + /* + if blocker TRN was not found, it means that the conflicting + transaction was committed long time ago. It could not be + aborted, as it would have to wait on the key tree lock + to remove the conflicting key it has inserted. + */ + if (!blocker || blocker->commit_trid != ~(TrID)0) + { /* committed */ + if (blocker) + mysql_mutex_unlock(& blocker->state_lock); + mysql_rwlock_unlock(&keyinfo->root_lock); + goto err; + } + mysql_rwlock_unlock(&keyinfo->root_lock); + { + /* running. 
now we wait */ + WT_RESOURCE_ID rc; + int res; + PSI_stage_info old_stage_info; + + rc.type= &ma_rc_dup_unique; + /* TODO savepoint id when we'll have them */ + rc.value= (intptr)blocker; + res= wt_thd_will_wait_for(info->trn->wt, blocker->wt, & rc); + if (res != WT_OK) + { + mysql_mutex_unlock(& blocker->state_lock); + my_errno= HA_ERR_LOCK_DEADLOCK; + goto err; + } + proc_info_hook(0, &stage_waiting_for_a_resource, &old_stage_info, + __func__, __FILE__, __LINE__); + res= wt_thd_cond_timedwait(info->trn->wt, & blocker->state_lock); + proc_info_hook(0, &old_stage_info, 0, __func__, __FILE__, __LINE__); + + mysql_mutex_unlock(& blocker->state_lock); + if (res != WT_OK) + { + my_errno= res == WT_TIMEOUT ? HA_ERR_LOCK_WAIT_TIMEOUT + : HA_ERR_LOCK_DEADLOCK; + goto err; + } + } + mysql_rwlock_wrlock(&keyinfo->root_lock); +#ifndef MARIA_CANNOT_ROLLBACK + keyinfo->version++; +#endif + } + } + + /* The above changed info->lastkey2. Inform maria_rnext_same(). */ + info->update&= ~HA_STATE_RNEXT_SAME; + + if (local_lock_tree) + mysql_rwlock_unlock(&keyinfo->root_lock); + } + } + if (share->calc_write_checksum) + info->cur_row.checksum= (*share->calc_write_checksum)(info,record); + if (filepos != HA_OFFSET_ERROR) + { + if ((*share->write_record)(info,record)) + goto err; + info->state->checksum+= info->cur_row.checksum; + } + if (!share->now_transactional) + { + if (share->base.auto_key != 0) + { + const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg; + const uchar *key= record + keyseg->start; + set_if_bigger(share->state.auto_increment, + ma_retrieve_auto_increment(key, keyseg->type)); + } + } + info->state->records++; + info->update= (HA_STATE_CHANGED | HA_STATE_AKTIV | HA_STATE_WRITTEN | + HA_STATE_ROW_CHANGED); + info->row_changes++; + share->state.changed|= STATE_NOT_MOVABLE | STATE_NOT_ZEROFILLED; + info->state->changed= 1; + + info->cur_row.lastpos= oldpos; + _ma_writeinfo(info, WRITEINFO_UPDATE_KEYFILE); + if (info->invalidator != 0) + { + 
DBUG_PRINT("info", ("invalidator... '%s' (update)", + share->open_file_name.str)); + (*info->invalidator)(share->open_file_name.str); + info->invalidator=0; + } + + /* + Update status of the table. We need to do so after each row write + for the log tables, as we want the new row to become visible to + other threads as soon as possible. We don't lock mutex here + (as it is required by pthread memory visibility rules) as (1) it's + not critical to use outdated share->is_log_table value (2) locking + mutex here for every write is too expensive. + */ + if (share->is_log_table) + _ma_update_status((void*) info); + + DBUG_RETURN(0); + +err: + save_errno= my_errno; + fatal_error= 0; + if (my_errno == HA_ERR_FOUND_DUPP_KEY || + my_errno == HA_ERR_RECORD_FILE_FULL || + my_errno == HA_ERR_LOCK_DEADLOCK || + my_errno == HA_ERR_LOCK_WAIT_TIMEOUT || + my_errno == HA_ERR_NULL_IN_SPATIAL || + my_errno == HA_ERR_OUT_OF_MEM) + { + info->errkey= i < share->base.keys ? (int) i : -1; + /* + We delete keys in the reverse order of insertion. This is the order that + a rollback would do and is important for CLR_ENDs generated by + _ma_ft|ck_delete() and write_record_abort() to work (with any other + order they would cause wrong jumps in the chain). 
+ */ + while ( i-- > 0) + { + if (maria_is_key_active(share->state.key_map, i)) + { + my_bool local_lock_tree= (lock_tree && + !(info->bulk_insert && + is_tree_inited(&info->bulk_insert[i]))); + keyinfo= share->keyinfo + i; + if (local_lock_tree) + mysql_rwlock_wrlock(&keyinfo->root_lock); + /** + @todo RECOVERY BUG + The key deletes below should generate CLR_ENDs + */ + if (keyinfo->flag & HA_FULLTEXT) + { + if (_ma_ft_del(info,i,buff,record,filepos)) + { + fatal_error= 1; + if (local_lock_tree) + mysql_rwlock_unlock(&keyinfo->root_lock); + break; + } + } + else + { + MARIA_KEY key; + if (keyinfo->ck_delete(info, + (*keyinfo->make_key)(info, &key, i, buff, + record, + filepos, + info->trn->trid))) + { + fatal_error= 1; + if (local_lock_tree) + mysql_rwlock_unlock(&keyinfo->root_lock); + break; + } + } + if (local_lock_tree) + mysql_rwlock_unlock(&keyinfo->root_lock); + } + } + } + else + fatal_error= 1; + + if (filepos != HA_OFFSET_ERROR) + { + if ((*share->write_record_abort)(info)) + fatal_error= 1; + } + + if (info->bulk_insert) + { + uint j; + for (j=0 ; j < share->base.keys ; j++) + maria_flush_bulk_insert(info, j); + } + + if (fatal_error) + _ma_set_fatal_error(info, HA_ERR_CRASHED); + + info->update= (HA_STATE_CHANGED | HA_STATE_WRITTEN | HA_STATE_ROW_CHANGED); + my_errno=save_errno; +err2: + save_errno=my_errno; + DBUG_ASSERT(save_errno); + if (!save_errno) + save_errno= HA_ERR_INTERNAL_ERROR; /* Should never happen */ + DBUG_PRINT("error", ("got error: %d", save_errno)); + _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE); + DBUG_RETURN(my_errno=save_errno); +} /* maria_write */ + + +/* + Write one key to btree + + TODO + Remove this function and have bulk insert change keyinfo->ck_insert + to point to the right function +*/ + +my_bool _ma_ck_write(MARIA_HA *info, MARIA_KEY *key) +{ + DBUG_ENTER("_ma_ck_write"); + + if (info->bulk_insert && + is_tree_inited(&info->bulk_insert[key->keyinfo->key_nr])) + { + DBUG_RETURN(_ma_ck_write_tree(info, key)); + } + 
DBUG_RETURN(_ma_ck_write_btree(info, key)); +} /* _ma_ck_write */ + + +/********************************************************************** + Insert key into btree (normal case) +**********************************************************************/ + +static my_bool _ma_ck_write_btree(MARIA_HA *info, MARIA_KEY *key) +{ + my_bool error; + MARIA_KEYDEF *keyinfo= key->keyinfo; + my_off_t *root= &info->s->state.key_root[keyinfo->key_nr]; + DBUG_ENTER("_ma_ck_write_btree"); + + error= _ma_ck_write_btree_with_log(info, key, root, + keyinfo->write_comp_flag | key->flag); + if (info->ft1_to_ft2) + { + if (!error) + error= _ma_ft_convert_to_ft2(info, key); + delete_dynamic(info->ft1_to_ft2); + my_free(info->ft1_to_ft2); + info->ft1_to_ft2=0; + } + DBUG_RETURN(error); +} /* _ma_ck_write_btree */ + + +/** + @brief Write a key to the b-tree + + @retval 1 error + @retval 0 ok +*/ + +static my_bool _ma_ck_write_btree_with_log(MARIA_HA *info, MARIA_KEY *key, + my_off_t *root, uint32 comp_flag) +{ + MARIA_SHARE *share= info->s; + LSN lsn= LSN_IMPOSSIBLE; + int error; + my_off_t new_root= *root; + uchar key_buff[MARIA_MAX_KEY_BUFF]; + MARIA_KEY org_key; /* Set/used when now_transactional=TRUE */ + my_bool transactional= share->now_transactional; + DBUG_ENTER("_ma_ck_write_btree_with_log"); + + LINT_INIT_STRUCT(org_key); + + if (transactional) + { + /* Save original value as the key may change */ + org_key= *key; + memcpy(key_buff, key->data, key->data_length + key->ref_length); + } + + error= _ma_ck_real_write_btree(info, key, &new_root, comp_flag); + if (!error && transactional) + { + /* Log the original value */ + *key= org_key; + key->data= key_buff; + error= _ma_write_undo_key_insert(info, key, root, new_root, &lsn); + } + else + { + *root= new_root; + _ma_fast_unlock_key_del(info); + } + _ma_unpin_all_pages_and_finalize_row(info, lsn); + + DBUG_RETURN(error != 0); +} /* _ma_ck_write_btree_with_log */ + + +/** + @brief Write a key to the b-tree + + @retval 1 error + 
@retval 0 ok
+*/
+
+my_bool _ma_ck_real_write_btree(MARIA_HA *info, MARIA_KEY *key, my_off_t *root,
+                                uint32 comp_flag)
+{
+  int error;
+  DBUG_ENTER("_ma_ck_real_write_btree");
+
+  /*
+    key_length parameter is used only if comp_flag is SEARCH_FIND
+    NOTE(review): this signature has no 'key_length' parameter; the remark
+    above looks like a leftover from an older interface - confirm.
+
+    An empty tree (*root == HA_OFFSET_ERROR) skips the search and goes
+    straight to _ma_enlarge_root(). Otherwise w_search() is run from the
+    current root with father_page == 0, father_keypos == 0 and
+    insert_last == 1; a result > 0 ("key should be stored in higher tree")
+    also ends in _ma_enlarge_root(), which puts 'key' on a new root page
+    and updates *root.
+
+    Any non-zero outcome, including a negative result from w_search(),
+    is reported to the caller as 1.
+  */
+  if (*root == HA_OFFSET_ERROR ||
+      (error= w_search(info, comp_flag, key, *root, (MARIA_PAGE *) 0,
+                       (uchar*) 0, 1)) > 0)
+    error= _ma_enlarge_root(info, key, root);
+  DBUG_RETURN(error != 0);
+} /* _ma_ck_real_write_btree */
+
+
+/**
+  @brief Make a new root with key as only pointer
+
+  @param info  Maria handler. info->buff is used as the image of the new
+               page, and info->keyread_buff_used and info->page_changed
+               are set to 1 to mark that.
+  @param key   Key that becomes the only key on the new root page.
+  @param root  In:  position of the current root page, or HA_OFFSET_ERROR
+                    when there is none.
+               Out: position of the newly allocated root page.
+
+  @note
+    When a previous root exists, the page is flagged KEYPAGE_FLAG_ISNOD and
+    the pointer to the previous root is placed right after the key page
+    header, in front of the key.
+
+  @note
+    *root is assigned as soon as _ma_new() has returned a page position.
+    It therefore already points to the new page when a later step
+    (_ma_log_new() or _ma_write_keypage()) fails and 1 is returned.
+    If _ma_new() itself fails, 1 is returned and *root is left unchanged.
+
+  @retval 1 error
+  @retval 0 ok
+*/
+
+my_bool _ma_enlarge_root(MARIA_HA *info, MARIA_KEY *key, my_off_t *root)
+{
+  uint t_length, nod_flag;
+  MARIA_KEY_PARAM s_temp;
+  MARIA_SHARE *share= info->s;
+  MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
+  MARIA_KEYDEF *keyinfo= key->keyinfo;
+  MARIA_PAGE page;
+  my_bool res= 0;
+  DBUG_ENTER("_ma_enlarge_root");
+
+  page.info= info;
+  page.keyinfo= keyinfo;
+  page.buff= info->buff;
+  page.flag= 0;
+
+  nod_flag= (*root != HA_OFFSET_ERROR) ? share->base.key_reflength : 0;
+  /*
+    Store pointer to prev page if nod.
+    The call is made unconditionally; when there is no previous root,
+    nod_flag is 0 and the key is stored below at this same offset.
+  */
+  _ma_kpointer(info, page.buff + share->keypage_header, *root);
+  t_length= (*keyinfo->pack_key)(key, nod_flag, (uchar*) 0,
+                                 (uchar*) 0, (uchar*) 0, &s_temp);
+  page.size= share->keypage_header + t_length + nod_flag;
+
+  bzero(page.buff, share->keypage_header);      /* start from a zeroed page header */
+  _ma_store_keynr(share, page.buff, keyinfo->key_nr);
+  if (nod_flag)
+    page.flag|= KEYPAGE_FLAG_ISNOD;
+  if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID))
+    page.flag|= KEYPAGE_FLAG_HAS_TRANSID;
+  (*keyinfo->store_key)(keyinfo, page.buff + share->keypage_header +
+                        nod_flag, &s_temp);
+
+  /* Mark that info->buff was used */
+  info->keyread_buff_used= info->page_changed= 1;
+  if ((page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) ==
+      HA_OFFSET_ERROR)
+    DBUG_RETURN(1);
+  *root= page.pos;
+
+  page_store_info(share, &page);
+
+  /*
+    Clear uninitialized part of page to avoid valgrind/purify warnings
+    and to get a clean page that is easier to compress and compare with
+    pages generated with redo
+  */
+  bzero(page.buff + page.size, share->block_size - page.size);
+
+  /* A failed log write does not stop the page write below; it only sets res */
+  if (share->now_transactional && _ma_log_new(&page, 1))
+    res= 1;
+
+  if (_ma_write_keypage(&page, page_link->write_lock,
+                        PAGECACHE_PRIORITY_HIGH))
+    res= 1;
+
+  DBUG_RETURN(res);
+} /* _ma_enlarge_root */
+
+
+/*
+  Search after a position for a key and store it there
+
+  TODO:
+  Change this to use pagecache directly instead of creating a copy
+  of the page. To do this, we must however change write-key-on-page
+  algorithm to not overwrite the buffer but instead store any overflow
+  key in a separate buffer.
+ + @return + @retval -1 error + @retval 0 ok + @retval > 0 Key should be stored in higher tree +*/ + +static int w_search(register MARIA_HA *info, uint32 comp_flag, MARIA_KEY *key, + my_off_t page_pos, + MARIA_PAGE *father_page, uchar *father_keypos, + my_bool insert_last) +{ + int error,flag; + uchar *temp_buff,*keypos,*keybuff; + my_bool was_last_key, buff_alloced; + my_off_t next_page, dup_key_pos; + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *keyinfo= key->keyinfo; + MARIA_PAGE page; + DBUG_ENTER("w_search"); + DBUG_PRINT("enter", ("page: %lu", (ulong) (page_pos/keyinfo->block_length))); + + alloc_on_stack(*info->stack_end_ptr, temp_buff, buff_alloced, + (keyinfo->block_length + keyinfo->max_store_length*3)); + if (!temp_buff) + DBUG_RETURN(1); + + keybuff= temp_buff + (keyinfo->block_length + keyinfo->max_store_length*2); + + if (_ma_fetch_keypage(&page, info, keyinfo, page_pos, PAGECACHE_LOCK_WRITE, + DFLT_INIT_HITS, temp_buff, 0)) + goto err; + + flag= (*keyinfo->bin_search)(key, &page, comp_flag, &keypos, + keybuff, &was_last_key); + if (flag == 0) + { + MARIA_KEY tmp_key; + /* get position to record with duplicated key */ + + tmp_key.keyinfo= keyinfo; + tmp_key.data= keybuff; + + if ((*keyinfo->get_key)(&tmp_key, page.flag, page.node, &keypos)) + dup_key_pos= _ma_row_pos_from_key(&tmp_key); + else + dup_key_pos= HA_OFFSET_ERROR; + + if (keyinfo->flag & HA_FULLTEXT) + { + uint off; + int subkeys; + + get_key_full_length_rdonly(off, keybuff); + subkeys=ft_sintXkorr(keybuff+off); + comp_flag=SEARCH_SAME; + if (subkeys >= 0) + { + /* normal word, one-level tree structure */ + flag=(*keyinfo->bin_search)(key, &page, comp_flag, + &keypos, keybuff, &was_last_key); + } + else + { + /* popular word. two-level tree. 
going down */ + my_off_t root= dup_key_pos; + MARIA_KEY subkey; + get_key_full_length_rdonly(off, key->data); + subkey.keyinfo= keyinfo= &share->ft2_keyinfo; + subkey.data= key->data + off; + subkey.data_length= key->data_length - off; + subkey.ref_length= key->ref_length; + subkey.flag= key->flag; + + /* we'll modify key entry 'in vivo' */ + keypos-= keyinfo->keylength + page.node; + error= _ma_ck_real_write_btree(info, &subkey, &root, comp_flag); + _ma_dpointer(share, keypos+HA_FT_WLEN, root); + subkeys--; /* should there be underflow protection ? */ + DBUG_ASSERT(subkeys < 0); + ft_intXstore(keypos, subkeys); + if (!error) + { + page_mark_changed(info, &page); + if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED, + DFLT_INIT_HITS)) + goto err; + } + stack_alloc_free(temp_buff, buff_alloced); + DBUG_RETURN(error); + } + } + else /* not HA_FULLTEXT, normal HA_NOSAME key */ + { + /* + TODO + When the index will support true versioning - with multiple + identical values in the UNIQUE index, invisible to each other - + the following should be changed to "continue inserting keys, at the + end (of the row or statement) wait". We need to wait on *all* + unique conflicts at once, not one-at-a-time, because we need to + know all blockers in advance, otherwise we'll have incomplete wait-for + graph. + */ + /* + transaction that has inserted the conflicting key may be in progress. + the caller will wait for it to be committed or aborted. + */ + info->dup_key_trid= _ma_trid_from_key(&tmp_key); + info->dup_key_pos= dup_key_pos; + my_errno= HA_ERR_FOUND_DUPP_KEY; + DBUG_PRINT("warning", + ("Duplicate key. dup_key_trid: %lu pos %lu visible: %d", + (ulong) info->dup_key_trid, + (ulong) info->dup_key_pos, + info->trn ? 
trnman_can_read_from(info->trn, + info->dup_key_trid) : 2)); + goto err; + } + } + if (flag == MARIA_FOUND_WRONG_KEY) + { + my_errno= HA_ERR_CRASHED; + goto err; + } + if (!was_last_key) + insert_last=0; + next_page= _ma_kpos(page.node, keypos); + if (next_page == HA_OFFSET_ERROR || + (error= w_search(info, comp_flag, key, next_page, + &page, keypos, insert_last)) > 0) + { + error= _ma_insert(info, key, &page, keypos, keybuff, + father_page, father_keypos, insert_last); + if (error < 0) + goto err; + page_mark_changed(info, &page); + if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_WRITELOCKED, + DFLT_INIT_HITS)) + goto err; + } + stack_alloc_free(temp_buff, buff_alloced); + DBUG_RETURN(error); +err: + stack_alloc_free(temp_buff, buff_alloced); + DBUG_PRINT("exit",("Error: %d",my_errno)); + DBUG_RETURN(-1); +} /* w_search */ + + +/* + Insert new key. + + SYNOPSIS + _ma_insert() + info Open table information. + keyinfo Key definition information. + key New key + anc_page Key page (beginning) + key_pos Position in key page where to insert. + key_buff Copy of previous key if keys where packed. + father_page position of parent key page in file. + father_key_pos position in parent key page for balancing. + insert_last If to append at end of page. + + DESCRIPTION + Insert new key at right of key_pos. + Note that caller must save anc_buff + + This function writes log records for all changed pages + (Including anc_buff and father page) + + RETURN + < 0 Error. 
+ 0 OK + 1 If key contains key to upper level (from balance page) + 2 If key contains key to upper level (from split space) +*/ + +int _ma_insert(register MARIA_HA *info, MARIA_KEY *key, + MARIA_PAGE *anc_page, uchar *key_pos, uchar *key_buff, + MARIA_PAGE *father_page, uchar *father_key_pos, + my_bool insert_last) +{ + uint a_length, nod_flag, org_anc_length; + int t_length; + uchar *endpos, *prev_key, *anc_buff; + MARIA_KEY_PARAM s_temp; + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *keyinfo= key->keyinfo; + DBUG_ENTER("_ma_insert"); + DBUG_PRINT("enter",("key_pos:%p", key_pos)); + DBUG_EXECUTE("key", _ma_print_key(DBUG_FILE, key);); + + /* + Note that anc_page->size can be bigger then block_size in case of + delete key that caused increase of page length + */ + org_anc_length= a_length= anc_page->size; + nod_flag= anc_page->node; + + anc_buff= anc_page->buff; + endpos= anc_buff+ a_length; + prev_key= (key_pos == anc_buff + share->keypage_header + nod_flag ? + (uchar*) 0 : key_buff); + t_length= (*keyinfo->pack_key)(key, nod_flag, + (key_pos == endpos ? 
(uchar*) 0 : key_pos), + prev_key, prev_key, &s_temp); +#ifndef DBUG_OFF + if (prev_key && (keyinfo->flag & (HA_BINARY_PACK_KEY | HA_PACK_KEY))) + { + DBUG_DUMP("prev_key", prev_key, _ma_keylength(keyinfo,prev_key)); + } + if (keyinfo->flag & HA_PACK_KEY) + { + DBUG_PRINT("test",("t_length: %d ref_len: %d", + t_length,s_temp.ref_length)); + DBUG_PRINT("test",("n_ref_len: %d n_length: %d key_pos: %p", + s_temp.n_ref_length, s_temp.n_length, s_temp.key)); + } +#endif + if (t_length > 0) + { + if (t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE) + { + _ma_set_fatal_error(info, HA_ERR_CRASHED); + DBUG_RETURN(-1); + } + bmove_upp(endpos+t_length, endpos, (uint) (endpos-key_pos)); + } + else + { + if (-t_length >= keyinfo->maxlength*2+MARIA_INDEX_OVERHEAD_SIZE) + { + _ma_set_fatal_error(info, HA_ERR_CRASHED); + DBUG_RETURN(-1); + } + bmove(key_pos,key_pos-t_length,(uint) (endpos-key_pos)+t_length); + } + (*keyinfo->store_key)(keyinfo,key_pos,&s_temp); + a_length+=t_length; + + if (key->flag & (SEARCH_USER_KEY_HAS_TRANSID | SEARCH_PAGE_KEY_HAS_TRANSID)) + _ma_mark_page_with_transid(share, anc_page); + + anc_page->size= a_length; + page_store_size(share, anc_page); + + /* + Check if the new key fits totally into the the page + (anc_buff is big enough to contain a full page + one key) + */ + if (a_length <= share->max_index_block_size) + { + if (share->max_index_block_size - a_length < 32 && + (keyinfo->flag & HA_FULLTEXT) && key_pos == endpos && + share->base.key_reflength <= share->rec_reflength && + share->options & (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD)) + { + /* + Normal word. One-level tree. Page is almost full. + Let's consider converting. 
+ We'll compare 'key' and the first key at anc_buff + */ + const uchar *a= key->data; + const uchar *b= anc_buff + share->keypage_header + nod_flag; + uint alen, blen, ft2len= share->ft2_keyinfo.keylength; + /* the very first key on the page is always unpacked */ + DBUG_ASSERT((*b & 128) == 0); +#if HA_FT_MAXLEN >= 127 + blen= mi_uint2korr(b); b+=2; + When you enable this code, as part of the MyISAM->Maria merge of +ChangeSet@1.2562, 2008-04-09 07:41:40+02:00, serg@janus.mylan +9 -0 + restore ft2 functionality, fix bugs. + Then this will enable two-level fulltext index, which is not totally + recoverable yet. + So remove this text and inform Guilhem so that he fixes the issue. +#else + blen= *b++; +#endif + get_key_length(alen,a); + DBUG_ASSERT(info->ft1_to_ft2==0); + if (alen == blen && + ha_compare_char_varying(keyinfo->seg->charset, + a, alen, + b, blen, + FALSE/*b_is_prefix*/) == 0) + { + /* Yup. converting */ + info->ft1_to_ft2=(DYNAMIC_ARRAY *) + my_malloc(PSI_INSTRUMENT_ME, sizeof(DYNAMIC_ARRAY), MYF(MY_WME)); + my_init_dynamic_array(PSI_INSTRUMENT_ME, info->ft1_to_ft2, ft2len, 300, + 50, MYF(0)); + + /* + Now, adding all keys from the page to dynarray + if the page is a leaf (if not keys will be deleted later) + */ + if (!nod_flag) + { + /* + Let's leave the first key on the page, though, because + we cannot easily dispatch an empty page here + */ + b+=blen+ft2len+2; + for (a=anc_buff+a_length ; b < a ; b+=ft2len+2) + insert_dynamic(info->ft1_to_ft2, b); + + /* fixing the page's length - it contains only one key now */ + anc_page->size= share->keypage_header + blen + ft2len + 2; + page_store_size(share, anc_page); + } + /* the rest will be done when we're back from recursion */ + } + } + else + { + if (share->now_transactional && + _ma_log_add(anc_page, org_anc_length, + key_pos, s_temp.changed_length, t_length, 1, + KEY_OP_DEBUG_LOG_ADD_1)) + DBUG_RETURN(-1); + } + DBUG_RETURN(0); /* There is room on page */ + } + /* Page is full */ + if (nod_flag) + 
insert_last=0; + /* + TODO: + Remove 'born_transactional' here. + The only reason for having it here is that the current + _ma_balance_page_ can't handle variable length keys. + */ + if (!(keyinfo->flag & (HA_VAR_LENGTH_KEY | HA_BINARY_PACK_KEY)) && + father_page && !insert_last && !info->quick_mode && + !info->s->base.born_transactional) + { + s_temp.key_pos= key_pos; + page_mark_changed(info, father_page); + DBUG_RETURN(_ma_balance_page(info, keyinfo, key, anc_page, + father_page, father_key_pos, + &s_temp)); + } + DBUG_RETURN(_ma_split_page(info, key, anc_page, + MY_MIN(org_anc_length, + info->s->max_index_block_size), + key_pos, s_temp.changed_length, t_length, + key_buff, insert_last)); +} /* _ma_insert */ + + +/** + @brief split a full page in two and assign emerging item to key + + @fn _ma_split_page() + info Maria handler + keyinfo Key handler + key Buffer for middle key + split_page Page that should be split + org_split_length Original length of split_page before key was inserted + inserted_key_pos Address in buffer where key was inserted + changed_length Number of bytes changed at 'inserted_key_pos' + move_length Number of bytes buffer was moved when key was inserted + key_buff Key buffer to use for temporary storage of key + insert_last_key If we are insert key on rightmost key page + + @note + split_buff is not stored on disk (caller has to do this) + + @return + @retval 2 ok (Middle key up from _ma_insert()) + @retval -1 error +*/ + +int _ma_split_page(MARIA_HA *info, MARIA_KEY *key, MARIA_PAGE *split_page, + uint org_split_length, + uchar *inserted_key_pos, uint changed_length, + int move_length, + uchar *key_buff, my_bool insert_last_key) +{ + uint keynr; + uint length,a_length,key_ref_length,t_length,nod_flag,key_length; + uint page_length, split_length, page_flag; + uchar *key_pos, *pos, *UNINIT_VAR(after_key); + MARIA_KEY_PARAM s_temp; + MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link; + MARIA_SHARE *share= info->s; + MARIA_KEYDEF 
*keyinfo= key->keyinfo; + MARIA_KEY tmp_key; + MARIA_PAGE new_page; + int res; + DBUG_ENTER("_ma_split_page"); + + DBUG_DUMP("buff", split_page->buff, split_page->size); + + info->page_changed=1; /* Info->buff is used */ + info->keyread_buff_used=1; + page_flag= split_page->flag; + nod_flag= split_page->node; + key_ref_length= share->keypage_header + nod_flag; + + new_page.info= info; + new_page.buff= info->buff; + new_page.keyinfo= keyinfo; + + tmp_key.data= key_buff; + tmp_key.keyinfo= keyinfo; + if (insert_last_key) + key_pos= _ma_find_last_pos(&tmp_key, split_page, &after_key); + else + key_pos= _ma_find_half_pos(&tmp_key, split_page, &after_key); + if (!key_pos) + DBUG_RETURN(-1); + + key_length= tmp_key.data_length + tmp_key.ref_length; + split_length= (uint) (key_pos - split_page->buff); + a_length= split_page->size; + split_page->size= split_length; + page_store_size(share, split_page); + + key_pos=after_key; + if (nod_flag) + { + DBUG_PRINT("test",("Splitting nod")); + pos=key_pos-nod_flag; + memcpy(new_page.buff + share->keypage_header, pos, (size_t) nod_flag); + } + + /* Move middle item to key and pointer to new page */ + if ((new_page.pos= _ma_new(info, PAGECACHE_PRIORITY_HIGH, &page_link)) == + HA_OFFSET_ERROR) + DBUG_RETURN(-1); + + _ma_copy_key(key, &tmp_key); + _ma_kpointer(info, key->data + key_length, new_page.pos); + + /* Store new page */ + if (!(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &key_pos)) + DBUG_RETURN(-1); + + t_length=(*keyinfo->pack_key)(&tmp_key, nod_flag, (uchar *) 0, + (uchar*) 0, (uchar*) 0, &s_temp); + length=(uint) ((split_page->buff + a_length) - key_pos); + memcpy(new_page.buff + key_ref_length + t_length, key_pos, + (size_t) length); + (*keyinfo->store_key)(keyinfo,new_page.buff+key_ref_length,&s_temp); + page_length= length + t_length + key_ref_length; + + bzero(new_page.buff, share->keypage_header); + /* Copy KEYFLAG_FLAG_ISNODE and KEYPAGE_FLAG_HAS_TRANSID from parent page */ + new_page.flag= page_flag; + 
new_page.size= page_length; + page_store_info(share, &new_page); + + /* Copy key number */ + keynr= _ma_get_keynr(share, split_page->buff); + _ma_store_keynr(share, new_page.buff, keynr); + + res= 2; /* Middle key up */ + if (share->now_transactional && _ma_log_new(&new_page, 0)) + res= -1; + + /* + Clear unitialized part of page to avoid valgrind/purify warnings + and to get a clean page that is easier to compress and compare with + pages generated with redo + */ + bzero(new_page.buff + page_length, share->block_size - page_length); + + if (_ma_write_keypage(&new_page, page_link->write_lock, + DFLT_INIT_HITS)) + res= -1; + + /* Save changes to split pages */ + if (share->now_transactional && + _ma_log_split(split_page, org_split_length, split_length, + inserted_key_pos, changed_length, move_length, + KEY_OP_NONE, (uchar*) 0, 0, 0)) + res= -1; + + DBUG_DUMP_KEY("middle_key", key); + DBUG_RETURN(res); +} /* _ma_split_page */ + + +/* + Calculate how to much to move to split a page in two + + Returns pointer to start of key. + key will contain the key. 
+ after_key will contain the position to where the next key starts +*/ + +uchar *_ma_find_half_pos(MARIA_KEY *key, MARIA_PAGE *ma_page, + uchar **after_key) +{ + uint keys, length, key_ref_length, page_flag, nod_flag; + uchar *page, *end, *lastpos; + MARIA_HA *info= ma_page->info; + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *keyinfo= key->keyinfo; + DBUG_ENTER("_ma_find_half_pos"); + + nod_flag= ma_page->node; + key_ref_length= share->keypage_header + nod_flag; + page_flag= ma_page->flag; + length= ma_page->size - key_ref_length; + page= ma_page->buff+ key_ref_length; /* Point to first key */ + + if (!(keyinfo->flag & + (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY | + HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID)) + { + key_ref_length= keyinfo->keylength+nod_flag; + key->data_length= keyinfo->keylength - info->s->rec_reflength; + key->ref_length= info->s->rec_reflength; + key->flag= 0; + keys=length/(key_ref_length*2); + end=page+keys*key_ref_length; + *after_key=end+key_ref_length; + memcpy(key->data, end, key_ref_length); + DBUG_RETURN(end); + } + + end=page+length/2-key_ref_length; /* This is aprox. half */ + key->data[0]= 0; /* Safety */ + do + { + lastpos=page; + if (!(length= (*keyinfo->get_key)(key, page_flag, nod_flag, &page))) + DBUG_RETURN(0); + } while (page < end); + *after_key= page; + DBUG_PRINT("exit",("returns: %p page: %p half: %p", + lastpos, page, end)); + DBUG_RETURN(lastpos); +} /* _ma_find_half_pos */ + + +/** + Find second to last key on leaf page + + @notes + Used to split buffer at last key. In this case the next to last + key will be moved to parent page and last key will be on it's own page. 
+ + @TODO + Add one argument for 'last key value' to get_key so that one can + do the loop without having to copy the found key the whole time + + @return + @retval Pointer to the start of the key before the last key + @retval int_key will contain the last key +*/ + +static uchar *_ma_find_last_pos(MARIA_KEY *int_key, MARIA_PAGE *ma_page, + uchar **after_key) +{ + uint keys, length, key_ref_length, page_flag; + uchar *page, *end, *lastpos, *prevpos; + uchar key_buff[MARIA_MAX_KEY_BUFF]; + MARIA_HA *info= ma_page->info; + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *keyinfo= int_key->keyinfo; + MARIA_KEY tmp_key; + DBUG_ENTER("_ma_find_last_pos"); + + key_ref_length= share->keypage_header; + page_flag= ma_page->flag; + length= ma_page->size - key_ref_length; + page= ma_page->buff + key_ref_length; + + if (!(keyinfo->flag & + (HA_PACK_KEY | HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY | + HA_BINARY_PACK_KEY)) && !(page_flag & KEYPAGE_FLAG_HAS_TRANSID)) + { + keys= length / keyinfo->keylength - 2; + length= keyinfo->keylength; + int_key->data_length= length - info->s->rec_reflength; + int_key->ref_length= info->s->rec_reflength; + int_key->flag= 0; + end=page+keys*length; + *after_key=end+length; + memcpy(int_key->data, end, length); + DBUG_RETURN(end); + } + + end=page+length-key_ref_length; + lastpos=page; + tmp_key.data= key_buff; + tmp_key.keyinfo= int_key->keyinfo; + key_buff[0]= 0; /* Safety */ + + /* We know that there are at least 2 keys on the page */ + + if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page))) + { + _ma_set_fatal_error(info, HA_ERR_CRASHED); + DBUG_RETURN(0); + } + + do + { + prevpos=lastpos; lastpos=page; + int_key->data_length= tmp_key.data_length; + int_key->ref_length= tmp_key.ref_length; + int_key->flag= tmp_key.flag; + memcpy(int_key->data, key_buff, length); /* previous key */ + if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page))) + { + _ma_set_fatal_error(info, HA_ERR_CRASHED); + DBUG_RETURN(0); + } + } while (page < 
end); + + *after_key=lastpos; + DBUG_PRINT("exit",("returns: %p page: %p end: %p", + prevpos,page,end)); + DBUG_RETURN(prevpos); +} /* _ma_find_last_pos */ + + +/** + @brief Balance page with static size keys with page on right/left + + @param key Middle key will be stored here + + @notes + Father_buff will always be changed + Caller must handle saving of curr_buff + + @return + @retval 0 Balance was done (father buff is saved) + @retval 1 Middle key up (father buff is not saved) + @retval -1 Error +*/ + +static int _ma_balance_page(MARIA_HA *info, MARIA_KEYDEF *keyinfo, + MARIA_KEY *key, MARIA_PAGE *curr_page, + MARIA_PAGE *father_page, + uchar *father_key_pos, MARIA_KEY_PARAM *s_temp) +{ + MARIA_PINNED_PAGE tmp_page_link, *new_page_link= &tmp_page_link; + MARIA_SHARE *share= info->s; + my_bool right, buff_alloced; + uint k_length,father_length,father_keylength,nod_flag,curr_keylength; + uint right_length,left_length,new_right_length,new_left_length,extra_length; + uint keys, tmp_length, extra_buff_length; + uchar *pos, *extra_buff, *parting_key; + uchar *tmp_part_key; + MARIA_PAGE next_page, extra_page, *left_page, *right_page; + DBUG_ENTER("_ma_balance_page"); + + alloc_on_stack(*info->stack_end_ptr, tmp_part_key, buff_alloced, + keyinfo->max_store_length); + if (!tmp_part_key) + DBUG_RETURN(-1); + + k_length= keyinfo->keylength; + father_length= father_page->size; + father_keylength= k_length + share->base.key_reflength; + nod_flag= curr_page->node; + curr_keylength= k_length+nod_flag; + info->page_changed=1; + + if ((father_key_pos != father_page->buff+father_length && + (info->state->records & 1)) || + father_key_pos == father_page->buff+ share->keypage_header + + share->base.key_reflength) + { + right=1; + next_page.pos= _ma_kpos(share->base.key_reflength, + father_key_pos+father_keylength); + left_page= curr_page; + right_page= &next_page; + DBUG_PRINT("info", ("use right page: %lu", + (ulong) (next_page.pos / keyinfo->block_length))); + } + else + { + 
right=0; + father_key_pos-=father_keylength; + next_page.pos= _ma_kpos(share->base.key_reflength,father_key_pos); + left_page= &next_page; + right_page= curr_page; + DBUG_PRINT("info", ("use left page: %lu", + (ulong) (next_page.pos / keyinfo->block_length))); + } /* father_key_pos ptr to parting key */ + + if (_ma_fetch_keypage(&next_page, info, keyinfo, next_page.pos, + PAGECACHE_LOCK_WRITE, + DFLT_INIT_HITS, info->buff, 0)) + goto err; + page_mark_changed(info, &next_page); + DBUG_DUMP("next", next_page.buff, next_page.size); + + /* Test if there is room to share keys */ + left_length= left_page->size; + right_length= right_page->size; + keys= ((left_length+right_length-share->keypage_header*2-nod_flag*2)/ + curr_keylength); + + if ((right ? right_length : left_length) + curr_keylength <= + share->max_index_block_size) + { + /* Enough space to hold all keys in the two buffers ; Balance bufferts */ + new_left_length= share->keypage_header+nod_flag+(keys/2)*curr_keylength; + new_right_length=share->keypage_header+nod_flag+(((keys+1)/2)* + curr_keylength); + left_page->size= new_left_length; + page_store_size(share, left_page); + right_page->size= new_right_length; + page_store_size(share, right_page); + + DBUG_PRINT("info", ("left_length: %u -> %u right_length: %u -> %u", + left_length, new_left_length, + right_length, new_right_length)); + if (left_length < new_left_length) + { + uint length; + DBUG_PRINT("info", ("move keys to end of buff")); + + /* Move keys right_page -> left_page */ + pos= left_page->buff+left_length; + memcpy(pos,father_key_pos, (size_t) k_length); + memcpy(pos+k_length, right_page->buff + share->keypage_header, + (size_t) (length=new_left_length - left_length - k_length)); + pos= right_page->buff + share->keypage_header + length; + memcpy(father_key_pos, pos, (size_t) k_length); + bmove(right_page->buff + share->keypage_header, + pos + k_length, new_right_length - share->keypage_header); + + if (share->now_transactional) + { + if (right) + 
{ + /* + Log changes to page on left + The original page is on the left and stored in left_page->buff + We have on the page the newly inserted key and data + from buff added last on the page + */ + if (_ma_log_split(curr_page, + left_length - s_temp->move_length, + new_left_length, + s_temp->key_pos, s_temp->changed_length, + s_temp->move_length, + KEY_OP_ADD_SUFFIX, + curr_page->buff + left_length, + new_left_length - left_length, + new_left_length - left_length+ k_length)) + goto err; + /* + Log changes to page on right + This contains the original data with some keys deleted from + start of page + */ + if (_ma_log_prefix(&next_page, 0, + ((int) new_right_length - (int) right_length), + KEY_OP_DEBUG_LOG_PREFIX_3)) + goto err; + } + else + { + /* + Log changes to page on right (the original page) which is in buff + Data is removed from start of page + The inserted key may be in buff or moved to curr_buff + */ + if (_ma_log_del_prefix(curr_page, + right_length - s_temp->changed_length, + new_right_length, + s_temp->key_pos, s_temp->changed_length, + s_temp->move_length)) + goto err; + /* + Log changes to page on left, which has new data added last + */ + if (_ma_log_suffix(&next_page, left_length, new_left_length)) + goto err; + } + } + } + else + { + uint length; + DBUG_PRINT("info", ("move keys to start of right_page")); + + bmove_upp(right_page->buff + new_right_length, + right_page->buff + right_length, + right_length - share->keypage_header); + length= new_right_length -right_length - k_length; + memcpy(right_page->buff + share->keypage_header + length, father_key_pos, + (size_t) k_length); + pos= left_page->buff + new_left_length; + memcpy(father_key_pos, pos, (size_t) k_length); + memcpy(right_page->buff + share->keypage_header, pos+k_length, + (size_t) length); + + if (share->now_transactional) + { + if (right) + { + /* + Log changes to page on left + The original page is on the left and stored in curr_buff + The page is shortened from end and the key may 
be on the page + */ + if (_ma_log_split(curr_page, + left_length - s_temp->move_length, + new_left_length, + s_temp->key_pos, s_temp->changed_length, + s_temp->move_length, + KEY_OP_NONE, (uchar*) 0, 0, 0)) + goto err; + /* + Log changes to page on right + This contains the original data, with some data from cur_buff + added first + */ + if (_ma_log_prefix(&next_page, + (uint) (new_right_length - right_length), + (int) (new_right_length - right_length), + KEY_OP_DEBUG_LOG_PREFIX_4)) + goto err; + } + else + { + /* + Log changes to page on right (the original page) which is in buff + We have on the page the newly inserted key and data + from buff added first on the page + */ + uint diff_length= new_right_length - right_length; + if (_ma_log_split(curr_page, + left_length - s_temp->move_length, + new_right_length, + s_temp->key_pos + diff_length, + s_temp->changed_length, + s_temp->move_length, + KEY_OP_ADD_PREFIX, + curr_page->buff + share->keypage_header, + diff_length, diff_length + k_length)) + goto err; + /* + Log changes to page on left, which is shortened from end + */ + if (_ma_log_suffix(&next_page, left_length, new_left_length)) + goto err; + } + } + } + + /* Log changes to father (one level up) page */ + + if (share->now_transactional && + _ma_log_change(father_page, father_key_pos, k_length, + KEY_OP_DEBUG_FATHER_CHANGED_1)) + goto err; + + /* + next_page_link->changed is marked as true above and fathers + page_link->changed is marked as true in caller + */ + if (_ma_write_keypage(&next_page, PAGECACHE_LOCK_LEFT_WRITELOCKED, + DFLT_INIT_HITS) || + _ma_write_keypage(father_page, + PAGECACHE_LOCK_LEFT_WRITELOCKED, DFLT_INIT_HITS)) + goto err; + stack_alloc_free(tmp_part_key, buff_alloced); + DBUG_RETURN(0); + } + + /* left_page and right_page are full, lets split and make new nod */ + + extra_buff= info->buff+share->base.max_key_block_length; + new_left_length= new_right_length= (share->keypage_header + nod_flag + + (keys+1) / 3 * curr_keylength); + 
extra_page.info= info; + extra_page.keyinfo= keyinfo; + extra_page.buff= extra_buff; + + /* + 5 is the minum number of keys we can have here. This comes from + the fact that each full page can store at least 2 keys and in this case + we have a 'split' key, ie 2+2+1 = 5 + */ + if (keys == 5) /* Too few keys to balance */ + new_left_length-=curr_keylength; + extra_length= (nod_flag + left_length + right_length - + new_left_length - new_right_length - curr_keylength); + extra_buff_length= extra_length + share->keypage_header; + DBUG_PRINT("info",("left_length: %d right_length: %d new_left_length: %d new_right_length: %d extra_length: %d", + left_length, right_length, + new_left_length, new_right_length, + extra_length)); + + left_page->size= new_left_length; + page_store_size(share, left_page); + right_page->size= new_right_length; + page_store_size(share, right_page); + + bzero(extra_buff, share->keypage_header); + extra_page.flag= nod_flag ? KEYPAGE_FLAG_ISNOD : 0; + extra_page.size= extra_buff_length; + page_store_info(share, &extra_page); + + /* Copy key number */ + _ma_store_keynr(share, extra_buff, keyinfo->key_nr); + + /* move first largest keys to new page */ + pos= right_page->buff + right_length-extra_length; + memcpy(extra_buff + share->keypage_header, pos, extra_length); + /* Zero old data from buffer */ + bzero(extra_buff + extra_buff_length, + share->block_size - extra_buff_length); + + /* Save new parting key between buff and extra_buff */ + memcpy(tmp_part_key, pos-k_length,k_length); + /* Make place for new keys */ + bmove_upp(right_page->buff + new_right_length, pos - k_length, + right_length - extra_length - k_length - share->keypage_header); + /* Copy keys from left page */ + pos= left_page->buff + new_left_length; + memcpy(right_page->buff + share->keypage_header, pos + k_length, + (size_t) (tmp_length= left_length - new_left_length - k_length)); + /* Copy old parting key */ + parting_key= right_page->buff + share->keypage_header + tmp_length; + 
memcpy(parting_key, father_key_pos, (size_t) k_length); + + /* Move new parting keys up to caller */ + memcpy((right ? key->data : father_key_pos),pos,(size_t) k_length); + memcpy((right ? father_key_pos : key->data),tmp_part_key, k_length); + + if ((extra_page.pos= _ma_new(info, DFLT_INIT_HITS, &new_page_link)) + == HA_OFFSET_ERROR) + goto err; + _ma_kpointer(info,key->data+k_length, extra_page.pos); + /* This is safe as long we are using not keys with transid */ + key->data_length= k_length - info->s->rec_reflength; + key->ref_length= info->s->rec_reflength; + + if (right) + { + /* + Page order according to key values: + orignal_page (curr_page = left_page), next_page (buff), extra_buff + + Move page positions so that we store data in extra_page where + next_page was and next_page will be stored at the new position + */ + swap_variables(my_off_t, extra_page.pos, next_page.pos); + } + + if (share->now_transactional) + { + if (right) + { + /* + left_page is shortened, + right_page is getting new keys at start and shortened from end. 
+ extra_page is new page + + Note that extra_page (largest key parts) will be stored at the + place of the original 'right' page (next_page) and right page + will be stored at the new page position + + This makes the log entries smaller as right_page contains all + data to generate the data extra_buff + */ + + /* + Log changes to page on left (page shortened page at end) + */ + if (_ma_log_split(curr_page, + left_length - s_temp->move_length, new_left_length, + s_temp->key_pos, s_temp->changed_length, + s_temp->move_length, + KEY_OP_NONE, (uchar*) 0, 0, 0)) + goto err; + /* + Log changes to right page (stored at next page) + This contains the last 'extra_buff' from 'buff' + */ + if (_ma_log_prefix(&extra_page, + 0, (int) (extra_buff_length - right_length), + KEY_OP_DEBUG_LOG_PREFIX_5)) + goto err; + + /* + Log changes to middle page, which is stored at the new page + position + */ + if (_ma_log_new(&next_page, 0)) + goto err; + } + else + { + /* + Log changes to page on right (the original page) which is in buff + This contains the original data, with some data from curr_buff + added first and shortened at end + */ + int data_added_first= left_length - new_left_length; + if (_ma_log_key_middle(right_page, + new_right_length, + data_added_first, + data_added_first, + extra_length, + s_temp->key_pos, + s_temp->changed_length, + s_temp->move_length)) + goto err; + + /* Log changes to page on left, which is shortened from end */ + if (_ma_log_suffix(left_page, left_length, new_left_length)) + goto err; + + /* Log change to rightmost (new) page */ + if (_ma_log_new(&extra_page, 0)) + goto err; + } + + /* Log changes to father (one level up) page */ + if (share->now_transactional && + _ma_log_change(father_page, father_key_pos, k_length, + KEY_OP_DEBUG_FATHER_CHANGED_2)) + goto err; + } + + if (_ma_write_keypage(&next_page, + (right ? new_page_link->write_lock : + PAGECACHE_LOCK_LEFT_WRITELOCKED), + DFLT_INIT_HITS) || + _ma_write_keypage(&extra_page, + (!right ? 
new_page_link->write_lock : + PAGECACHE_LOCK_LEFT_WRITELOCKED), + DFLT_INIT_HITS)) + goto err; + + stack_alloc_free(tmp_part_key, buff_alloced); + DBUG_RETURN(1); /* Middle key up */ + +err: + stack_alloc_free(tmp_part_key, buff_alloced); + DBUG_RETURN(-1); +} /* _ma_balance_page */ + + +/********************************************************************** + * Bulk insert code * + **********************************************************************/ + +typedef struct { + MARIA_HA *info; + uint keynr; +} bulk_insert_param; + + +static my_bool _ma_ck_write_tree(register MARIA_HA *info, MARIA_KEY *key) +{ + my_bool error; + uint keynr= key->keyinfo->key_nr; + DBUG_ENTER("_ma_ck_write_tree"); + + /* Store ref_length as this is always constant */ + info->bulk_insert_ref_length= key->ref_length; + error= tree_insert(&info->bulk_insert[keynr], key->data, + key->data_length + key->ref_length, + info->bulk_insert[keynr].custom_arg) == 0; + DBUG_RETURN(error); +} /* _ma_ck_write_tree */ + + +/* typeof(_ma_keys_compare)=qsort_cmp2 */ + +static int keys_compare(bulk_insert_param *param, uchar *key1, uchar *key2) +{ + uint not_used[2]; + return ha_key_cmp(param->info->s->keyinfo[param->keynr].seg, + key1, key2, USE_WHOLE_KEY, SEARCH_SAME, + not_used); +} + + +static int keys_free(void* key_arg, TREE_FREE mode, void *param_arg) +{ + /* + Probably I can use info->lastkey here, but I'm not sure, + and to be safe I'd better use local lastkey. 
+ */ + bulk_insert_param *param= (bulk_insert_param*)param_arg; + MARIA_SHARE *share= param->info->s; + uchar lastkey[MARIA_MAX_KEY_BUFF], *key= (uchar*)key_arg; + uint keylen; + MARIA_KEYDEF *keyinfo= share->keyinfo + param->keynr; + MARIA_KEY tmp_key; + + switch (mode) { + case free_init: + if (share->lock_key_trees) + { + mysql_rwlock_wrlock(&keyinfo->root_lock); + keyinfo->version++; + } + return 0; + case free_free: + /* Note: keylen doesn't contain transid lengths */ + keylen= _ma_keylength(keyinfo, key); + tmp_key.data= lastkey; + tmp_key.keyinfo= keyinfo; + tmp_key.data_length= keylen - share->rec_reflength; + tmp_key.ref_length= param->info->bulk_insert_ref_length; + tmp_key.flag= (param->info->bulk_insert_ref_length == + share->rec_reflength ? 0 : SEARCH_USER_KEY_HAS_TRANSID); + /* + We have to copy key as ma_ck_write_btree may need the buffer for + copying middle key up if tree is growing + */ + memcpy(lastkey, key, tmp_key.data_length + tmp_key.ref_length); + _ma_ck_write_btree(param->info, &tmp_key); + return 0; + case free_end: + if (share->lock_key_trees) + mysql_rwlock_unlock(&keyinfo->root_lock); + return 0; + } + return 0; +} + + +int maria_init_bulk_insert(MARIA_HA *info, size_t cache_size, ha_rows rows) +{ + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *key=share->keyinfo; + bulk_insert_param *params; + uint i, num_keys, total_keylength; + ulonglong key_map; + DBUG_ENTER("_ma_init_bulk_insert"); + DBUG_PRINT("enter",("cache_size: %lu", (ulong) cache_size)); + + DBUG_ASSERT(!info->bulk_insert && + (!rows || rows >= MARIA_MIN_ROWS_TO_USE_BULK_INSERT)); + + maria_clear_all_keys_active(key_map); + for (i=total_keylength=num_keys=0 ; i < share->base.keys ; i++) + { + if (! 
(key[i].flag & HA_NOSAME) && (share->base.auto_key != i + 1) && + maria_is_key_active(share->state.key_map, i)) + { + num_keys++; + maria_set_key_active(key_map, i); + total_keylength+=key[i].maxlength+TREE_ELEMENT_EXTRA_SIZE; + } + } + + if (num_keys==0 || + num_keys * (size_t) MARIA_MIN_SIZE_BULK_INSERT_TREE > cache_size) + DBUG_RETURN(0); + + if (rows && rows*total_keylength < cache_size) + cache_size= (size_t)rows; + else + cache_size/=total_keylength*16; + + info->bulk_insert=(TREE *) + my_malloc(PSI_INSTRUMENT_ME, (sizeof(TREE)*share->base.keys+ + sizeof(bulk_insert_param)*num_keys),MYF(0)); + + if (!info->bulk_insert) + DBUG_RETURN(HA_ERR_OUT_OF_MEM); + + params=(bulk_insert_param *)(info->bulk_insert+share->base.keys); + for (i=0 ; i < share->base.keys ; i++) + { + if (maria_is_key_active(key_map, i)) + { + params->info=info; + params->keynr=i; + /* Only allocate a 16'th of the buffer at a time */ + init_tree(&info->bulk_insert[i], + cache_size * key[i].maxlength, + cache_size * key[i].maxlength, 0, + (qsort_cmp2) keys_compare, keys_free, (void *)params++, MYF(0)); + } + else + info->bulk_insert[i].root=0; + } + + DBUG_RETURN(0); +} + +void maria_flush_bulk_insert(MARIA_HA *info, uint inx) +{ + if (info->bulk_insert) + { + if (is_tree_inited(&info->bulk_insert[inx])) + reset_tree(&info->bulk_insert[inx]); + } +} + + +int maria_end_bulk_insert(MARIA_HA *info, my_bool abort) +{ + int first_error= 0; + DBUG_ENTER("maria_end_bulk_insert"); + if (info->bulk_insert) + { + uint i; + for (i=0 ; i < info->s->base.keys ; i++) + { + if (is_tree_inited(&info->bulk_insert[i])) + { + int error; + if (info->s->deleting) + reset_free_element(&info->bulk_insert[i]); + if ((error= delete_tree(&info->bulk_insert[i], abort))) + { + first_error= first_error ? 
first_error : error; + abort= 1; + } + } + } + my_free(info->bulk_insert); + info->bulk_insert= 0; + } + DBUG_RETURN(first_error); +} + + +/**************************************************************************** + Dedicated functions that generate log entries +****************************************************************************/ + + +int _ma_write_undo_key_insert(MARIA_HA *info, const MARIA_KEY *key, + my_off_t *root, my_off_t new_root, LSN *res_lsn) +{ + MARIA_SHARE *share= info->s; + MARIA_KEYDEF *keyinfo= key->keyinfo; + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + + KEY_NR_STORE_SIZE]; + const uchar *key_value; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; + struct st_msg_to_write_hook_for_undo_key msg; + uint key_length; + + /* Save if we need to write a clr record */ + lsn_store(log_data, info->trn->undo_lsn); + key_nr_store(log_data + LSN_STORE_SIZE + FILEID_STORE_SIZE, + keyinfo->key_nr); + key_length= key->data_length + key->ref_length; + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key->data; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length; + + msg.root= root; + msg.value= new_root; + msg.auto_increment= 0; + key_value= key->data; + if (share->base.auto_key == ((uint) keyinfo->key_nr + 1)) + { + const HA_KEYSEG *keyseg= keyinfo->seg; + uchar reversed[MARIA_MAX_KEY_BUFF]; + if (keyseg->flag & HA_SWAP_KEY) + { + /* We put key from log record to "data record" packing format... */ + const uchar *key_ptr= key->data, *key_end= key->data + keyseg->length; + uchar *to= reversed + keyseg->length; + do + { + *--to= *key_ptr++; + } while (key_ptr != key_end); + key_value= to; + } + /* ... so that we can read it with: */ + msg.auto_increment= + ma_retrieve_auto_increment(key_value, keyseg->type); + /* and write_hook_for_undo_key_insert() will pick this. 
*/ + } + + return translog_write_record(res_lsn, LOGREC_UNDO_KEY_INSERT, + info->trn, info, + (translog_size_t) + log_array[TRANSLOG_INTERNAL_PARTS + 0].length + + key_length, + TRANSLOG_INTERNAL_PARTS + 2, log_array, + log_data + LSN_STORE_SIZE, &msg) ? -1 : 0; +} + + +/** + @brief Log creation of new page + + @note + We don't have to store the page_length into the log entry as we can + calculate this from the length of the log entry + + @retval 1 error + @retval 0 ok +*/ + +my_bool _ma_log_new(MARIA_PAGE *ma_page, my_bool root_page) +{ + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2 + KEY_NR_STORE_SIZE + +1]; + uint page_length; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; + MARIA_HA *info= ma_page->info; + MARIA_SHARE *share= info->s; + my_off_t page= ma_page->pos / share->block_size; + DBUG_ENTER("_ma_log_new"); + DBUG_PRINT("enter", ("page: %lu", (ulong) page)); + + DBUG_ASSERT(share->now_transactional); + + /* Store address of new root page */ + page_store(log_data + FILEID_STORE_SIZE, page); + + /* Store link to next unused page */ + if (info->key_del_used == 2) + page= 0; /* key_del not changed */ + else + page= ((share->key_del_current == HA_OFFSET_ERROR) ? 
IMPOSSIBLE_PAGE_NO : + share->key_del_current / share->block_size); + + page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page); + key_nr_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE*2, + ma_page->keyinfo->key_nr); + log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE*2 + KEY_NR_STORE_SIZE]= + (uchar) root_page; + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + + page_length= ma_page->size - LSN_STORE_SIZE; + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ma_page->buff + LSN_STORE_SIZE; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= page_length; + + /* Remember new page length for future log entires for same page */ + ma_page->org_size= ma_page->size; + + if (translog_write_record(&lsn, LOGREC_REDO_INDEX_NEW_PAGE, + info->trn, info, + (translog_size_t) + (sizeof(log_data) + page_length), + TRANSLOG_INTERNAL_PARTS + 2, log_array, + log_data, NULL)) + DBUG_RETURN(1); + DBUG_RETURN(0); +} + + +/** + @brief + Log when some part of the key page changes +*/ + +my_bool _ma_log_change(MARIA_PAGE *ma_page, const uchar *key_pos, uint length, + enum en_key_debug debug_marker __attribute__((unused))) +{ + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 6 + 7], *log_pos; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4]; + uint offset= (uint) (key_pos - ma_page->buff), translog_parts; + MARIA_HA *info= ma_page->info; + my_off_t page= ma_page->pos / info->s->block_size; + DBUG_ENTER("_ma_log_change"); + DBUG_PRINT("enter", ("page: %lu length: %u", (ulong) page, length)); + + DBUG_ASSERT(info->s->now_transactional); + DBUG_ASSERT(offset + length <= ma_page->size); + DBUG_ASSERT(ma_page->org_size == ma_page->size); + + /* Store address of new root page */ + page= ma_page->pos / info->s->block_size; + page_store(log_data + FILEID_STORE_SIZE, page); + log_pos= log_data+ FILEID_STORE_SIZE + PAGE_STORE_SIZE; + +#ifdef EXTRA_DEBUG_KEY_CHANGES + (*log_pos++)= KEY_OP_DEBUG; + 
(*log_pos++)= debug_marker; +#endif + + log_pos[0]= KEY_OP_OFFSET; + int2store(log_pos+1, offset); + log_pos[3]= KEY_OP_CHANGE; + int2store(log_pos+4, length); + log_pos+= 6; + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (log_pos - log_data); + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= length; + translog_parts= 2; + + _ma_log_key_changes(ma_page, + log_array + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_pos, &length, &translog_parts); + + if (translog_write_record(&lsn, LOGREC_REDO_INDEX, + info->trn, info, + (translog_size_t) (log_pos - log_data) + length, + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_array, log_data, NULL)) + DBUG_RETURN(1); + DBUG_RETURN(0); +} + + +/** + @brief Write log entry for page splitting + + @fn _ma_log_split() + @param + ma_page Page that is changed + org_length Original length of page. Can be bigger than block_size + for block that overflowed + new_length New length of page + key_pos Where key is inserted on page (may be 0 if no key) + key_length Number of bytes changed at key_pos + move_length Number of bytes moved at key_pos to make room for key + prefix_or_suffix KEY_OP_NONE Ignored + KEY_OP_ADD_PREFIX Add data to start of page + KEY_OP_ADD_SUFFIX Add data to end of page + data What data was added + data_length Number of bytes added first or last + changed_length Number of bytes changed first or last. 
+ + @note + Write log entry for page that has got a key added to the page under + one and only one of the following senarios: + - Page is shortened from end + - Data is added to end of page + - Data added at front of page +*/ + +static my_bool _ma_log_split(MARIA_PAGE *ma_page, + uint org_length, uint new_length, + const uchar *key_pos, uint key_length, + int move_length, enum en_key_op prefix_or_suffix, + const uchar *data, uint data_length, + uint changed_length) +{ + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+3+3+3+3+2 +7]; + uchar *log_pos; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6]; + uint offset= (uint) (key_pos - ma_page->buff); + uint translog_parts, extra_length; + MARIA_HA *info= ma_page->info; + my_off_t page= ma_page->pos / info->s->block_size; + DBUG_ENTER("_ma_log_split"); + DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u", + (ulong) page, org_length, new_length)); + + DBUG_ASSERT(changed_length >= data_length); + DBUG_ASSERT(org_length <= info->s->max_index_block_size); + DBUG_ASSERT(new_length == ma_page->size); + DBUG_ASSERT(org_length == ma_page->org_size); + + log_pos= log_data + FILEID_STORE_SIZE; + page_store(log_pos, page); + log_pos+= PAGE_STORE_SIZE; + +#ifdef EXTRA_DEBUG_KEY_CHANGES + (*log_pos++)= KEY_OP_DEBUG; + (*log_pos++)= KEY_OP_DEBUG_LOG_SPLIT; +#endif + + /* Store keypage_flag */ + *log_pos++= KEY_OP_SET_PAGEFLAG; + *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff); + + if (new_length <= offset || !key_pos) + { + /* + Page was split before inserted key. 
Write redo entry where + we just cut current page at page_length + */ + uint length_offset= org_length - new_length; + log_pos[0]= KEY_OP_DEL_SUFFIX; + int2store(log_pos+1, length_offset); + log_pos+= 3; + translog_parts= 1; + extra_length= 0; + DBUG_ASSERT(data_length == 0); + } + else + { + /* Key was added to page which was split after the inserted key */ + uint max_key_length; + + /* + Handle case when split happened directly after the newly inserted key. + */ + max_key_length= new_length - offset; + extra_length= MY_MIN(key_length, max_key_length); + if (offset + move_length > new_length) + { + /* This is true when move_length includes changes for next packed key */ + move_length= new_length - offset; + } + + if ((int) new_length < (int) (org_length + move_length + data_length)) + { + /* Shorten page */ + uint diff= org_length + move_length + data_length - new_length; + log_pos[0]= KEY_OP_DEL_SUFFIX; + int2store(log_pos + 1, diff); + log_pos+= 3; + DBUG_ASSERT(data_length == 0); /* Page is shortened */ + DBUG_ASSERT(offset <= org_length - diff); + } + else + { + DBUG_ASSERT(new_length == org_length + move_length + data_length); + DBUG_ASSERT(offset <= org_length); + } + + log_pos[0]= KEY_OP_OFFSET; + int2store(log_pos+1, offset); + log_pos+= 3; + + if (move_length) + { + log_pos[0]= KEY_OP_SHIFT; + int2store(log_pos+1, move_length); + log_pos+= 3; + } + + log_pos[0]= KEY_OP_CHANGE; + int2store(log_pos+1, extra_length); + log_pos+= 3; + + /* Point to original inserted key data */ + if (prefix_or_suffix == KEY_OP_ADD_PREFIX) + key_pos+= data_length; + + translog_parts= 2; + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= extra_length; + } + + if (data_length) + { + /* Add prefix or suffix */ + log_pos[0]= prefix_or_suffix; + int2store(log_pos+1, data_length); + log_pos+= 3; + if (prefix_or_suffix == KEY_OP_ADD_PREFIX) + { + int2store(log_pos+1, changed_length); + log_pos+= 2; + data_length= changed_length; + 
} + log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].str= data; + log_array[TRANSLOG_INTERNAL_PARTS + translog_parts].length= data_length; + translog_parts++; + extra_length+= data_length; + } + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - + log_data); + + _ma_log_key_changes(ma_page, + log_array + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_pos, &extra_length, &translog_parts); + /* Remember new page length for future log entires for same page */ + ma_page->org_size= ma_page->size; + + DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX, + info->trn, info, + (translog_size_t) + log_array[TRANSLOG_INTERNAL_PARTS + + 0].length + extra_length, + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_array, log_data, NULL)); +} + + +/** + @brief + Write log entry for page that has got a key added to the page + and page is shortened from start of page + + @fn _ma_log_del_prefix() + @param info Maria handler + @param page Page number + @param buff Page buffer + @param org_length Length of buffer when read + @param new_length Final length + @param key_pos Where on page buffer key was added. 
This is position + before prefix was removed + @param key_length How many bytes was changed at 'key_pos' + @param move_length How many bytes was moved up when key was added + + @return + @retval 0 ok + @retval 1 error +*/ + +static my_bool _ma_log_del_prefix(MARIA_PAGE *ma_page, + uint org_length, uint new_length, + const uchar *key_pos, uint key_length, + int move_length) +{ + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 12 + 7]; + uchar *log_pos; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4]; + uint offset= (uint) (key_pos - ma_page->buff); + uint diff_length= org_length + move_length - new_length; + uint translog_parts, extra_length; + MARIA_HA *info= ma_page->info; + my_off_t page= ma_page->pos / info->s->block_size; + DBUG_ENTER("_ma_log_del_prefix"); + DBUG_PRINT("enter", ("page: %lu org_length: %u new_length: %u", + (ulong) page, org_length, new_length)); + + DBUG_ASSERT((int) diff_length > 0); + DBUG_ASSERT(ma_page->org_size == org_length); + DBUG_ASSERT(ma_page->size == new_length); + + log_pos= log_data + FILEID_STORE_SIZE; + page_store(log_pos, page); + log_pos+= PAGE_STORE_SIZE; + + translog_parts= 1; + extra_length= 0; + +#ifdef EXTRA_DEBUG_KEY_CHANGES + *log_pos++= KEY_OP_DEBUG; + *log_pos++= KEY_OP_DEBUG_LOG_DEL_PREFIX; +#endif + + /* Store keypage_flag */ + *log_pos++= KEY_OP_SET_PAGEFLAG; + *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff); + + if (offset < diff_length + info->s->keypage_header) + { + /* + Key is not anymore on page. 
Move data down, but take into account that + the original page had grown with 'move_length bytes' + */ + DBUG_ASSERT(offset + key_length <= diff_length + info->s->keypage_header); + + log_pos[0]= KEY_OP_DEL_PREFIX; + int2store(log_pos+1, diff_length - move_length); + log_pos+= 3; + } + else + { + /* + Correct position to key, as data before key has been delete and key + has thus been moved down + */ + offset-= diff_length; + key_pos-= diff_length; + + /* Move data down */ + log_pos[0]= KEY_OP_DEL_PREFIX; + int2store(log_pos+1, diff_length); + log_pos+= 3; + + log_pos[0]= KEY_OP_OFFSET; + int2store(log_pos+1, offset); + log_pos+= 3; + + if (move_length) + { + log_pos[0]= KEY_OP_SHIFT; + int2store(log_pos+1, move_length); + log_pos+= 3; + } + log_pos[0]= KEY_OP_CHANGE; + int2store(log_pos+1, key_length); + log_pos+= 3; + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= key_pos; + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= key_length; + translog_parts= 2; + extra_length= key_length; + } + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - + log_data); + _ma_log_key_changes(ma_page, + log_array + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_pos, &extra_length, &translog_parts); + /* Remember new page length for future log entires for same page */ + ma_page->org_size= ma_page->size; + + DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX, + info->trn, info, + (translog_size_t) + log_array[TRANSLOG_INTERNAL_PARTS + + 0].length + extra_length, + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_array, log_data, NULL)); +} + + +/** + @brief + Write log entry for page that has got data added first and + data deleted last. 
Old changed key may be part of page +*/ + +static my_bool _ma_log_key_middle(MARIA_PAGE *ma_page, + uint new_length, + uint data_added_first, + uint data_changed_first, + uint data_deleted_last, + const uchar *key_pos, + uint key_length, int move_length) +{ + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 2 + 2 + 3+5+3+3+3 + 7]; + uchar *log_pos; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 6]; + uint key_offset; + uint translog_parts, extra_length; + MARIA_HA *info= ma_page->info; + my_off_t page= ma_page->pos / info->s->block_size; + DBUG_ENTER("_ma_log_key_middle"); + DBUG_PRINT("enter", ("page: %lu", (ulong) page)); + + DBUG_ASSERT(ma_page->size == new_length); + + /* new place of key after changes */ + key_pos+= data_added_first; + key_offset= (uint) (key_pos - ma_page->buff); + if (key_offset < new_length) + { + /* key is on page; Calculate how much of the key is there */ + uint max_key_length= new_length - key_offset; + if (max_key_length < key_length) + { + /* Key is last on page */ + key_length= max_key_length; + move_length= 0; + } + /* + Take into account that new data was added as part of original key + that also needs to be removed from page + */ + data_deleted_last+= move_length; + } + + /* First log changes to page */ + log_pos= log_data + FILEID_STORE_SIZE; + page_store(log_pos, page); + log_pos+= PAGE_STORE_SIZE; + +#ifdef EXTRA_DEBUG_KEY_CHANGES + *log_pos++= KEY_OP_DEBUG; + *log_pos++= KEY_OP_DEBUG_LOG_MIDDLE; +#endif + + /* Store keypage_flag */ + *log_pos++= KEY_OP_SET_PAGEFLAG; + *log_pos++= _ma_get_keypage_flag(info->s, ma_page->buff); + + log_pos[0]= KEY_OP_DEL_SUFFIX; + int2store(log_pos+1, data_deleted_last); + log_pos+= 3; + + log_pos[0]= KEY_OP_ADD_PREFIX; + int2store(log_pos+1, data_added_first); + int2store(log_pos+3, data_changed_first); + log_pos+= 5; + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - + log_data); + 
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (ma_page->buff + + info->s->keypage_header); + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first; + translog_parts= 2; + extra_length= data_changed_first; + + /* If changed key is on page, log those changes too */ + + if (key_offset < new_length) + { + uchar *start_log_pos= log_pos; + + log_pos[0]= KEY_OP_OFFSET; + int2store(log_pos+1, key_offset); + log_pos+= 3; + if (move_length) + { + log_pos[0]= KEY_OP_SHIFT; + int2store(log_pos+1, move_length); + log_pos+= 3; + } + log_pos[0]= KEY_OP_CHANGE; + int2store(log_pos+1, key_length); + log_pos+= 3; + + log_array[TRANSLOG_INTERNAL_PARTS + 2].str= start_log_pos; + log_array[TRANSLOG_INTERNAL_PARTS + 2].length= (uint) (log_pos - + start_log_pos); + + log_array[TRANSLOG_INTERNAL_PARTS + 3].str= key_pos; + log_array[TRANSLOG_INTERNAL_PARTS + 3].length= key_length; + translog_parts+=2; + extra_length+= (uint) (log_array[TRANSLOG_INTERNAL_PARTS + 2].length + + key_length); + } + + _ma_log_key_changes(ma_page, + log_array + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_pos, &extra_length, &translog_parts); + /* Remember new page length for future log entires for same page */ + ma_page->org_size= ma_page->size; + + DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX, + info->trn, info, + (translog_size_t) + (log_array[TRANSLOG_INTERNAL_PARTS + + 0].length + extra_length), + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_array, log_data, NULL)); +} + + +#ifdef NOT_NEEDED + +/** + @brief + Write log entry for page that has got data added first and + data deleted last +*/ + +static my_bool _ma_log_middle(MARIA_PAGE *ma_page, + uint data_added_first, uint data_changed_first, + uint data_deleted_last) +{ + LSN lsn; + LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 4]; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + 3 + 5 + 7], *log_pos; + MARIA_HA *info= ma_page->info; + my_off_t page= ma_page->page / info->s->block_size; + uint translog_parts, 
extra_length; + DBUG_ENTER("_ma_log_middle"); + DBUG_PRINT("enter", ("page: %lu", (ulong) page)); + + DBUG_ASSERT(ma_page->org_size + data_added_first - data_deleted_last == + ma_page->size); + + log_pos= log_data + FILEID_STORE_SIZE; + page_store(log_pos, page); + log_pos+= PAGE_STORE_SIZE; + + log_pos[0]= KEY_OP_DEL_PREFIX; + int2store(log_pos+1, data_deleted_last); + log_pos+= 3; + + log_pos[0]= KEY_OP_ADD_PREFIX; + int2store(log_pos+1, data_added_first); + int2store(log_pos+3, data_changed_first); + log_pos+= 5; + + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= (uint) (log_pos - + log_data); + + log_array[TRANSLOG_INTERNAL_PARTS + 1].str= ((char*) buff + + info->s->keypage_header); + log_array[TRANSLOG_INTERNAL_PARTS + 1].length= data_changed_first; + translog_parts= 2; + extra_length= data_changed_first; + + _ma_log_key_changes(ma_page, + log_array + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_pos, &extra_length, &translog_parts); + /* Remember new page length for future log entires for same page */ + ma_page->org_size= ma_page->size; + + DBUG_RETURN(translog_write_record(&lsn, LOGREC_REDO_INDEX, + info->trn, info, + (translog_size_t) + log_array[TRANSLOG_INTERNAL_PARTS + + 0].length + extra_length, + TRANSLOG_INTERNAL_PARTS + translog_parts, + log_array, log_data, NULL)); +} +#endif |