diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-04 18:00:34 +0000 |
commit | 3f619478f796eddbba6e39502fe941b285dd97b1 (patch) | |
tree | e2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/maria/ma_bitmap.c | |
parent | Initial commit. (diff) | |
download | mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.tar.xz mariadb-3f619478f796eddbba6e39502fe941b285dd97b1.zip |
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/maria/ma_bitmap.c')
-rw-r--r-- | storage/maria/ma_bitmap.c | 3386 |
1 files changed, 3386 insertions, 0 deletions
diff --git a/storage/maria/ma_bitmap.c b/storage/maria/ma_bitmap.c new file mode 100644 index 00000000..61fe4f9d --- /dev/null +++ b/storage/maria/ma_bitmap.c @@ -0,0 +1,3386 @@ +/* Copyright (C) 2007 Michael Widenius + Copyright (c) 2010, 2013, Monty Program Ab. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +/* + Bitmap handling (for records in block) + + The data file starts with a bitmap page, followed by as many data + pages as the bitmap can cover. After this there is a new bitmap page + and more data pages etc. + + The bitmap code assumes there is always an active bitmap page and thus + that there is at least one bitmap page in the file + + Structure of bitmap page: + + Fixed size records (to be implemented later): + + 2 bits are used to indicate: + + 0 Empty + 1 0-75 % full (at least room for 2 records) + 2 75-100 % full (at least room for one record) + 3 100 % full (no more room for records) + + Assuming 8K pages, this will allow us to map: + 8192 (bytes per page) * 4 (pages mapped per byte) * 8192 (page size)= 256M + + (For Maria this will be 7*4 * 8192 = 224K smaller because of LSN) + + Note that for fixed size rows, we can't add more columns without doing + a full reorganization of the table. The user can always force a dynamic + size row format by specifying ROW_FORMAT=dynamic. + + + Dynamic size records: + + 3 bits are used to indicate Bytes free in 8K page + + 0 Empty page 8176 (head or tail) + 1 0-30 % full (at least room for 3 records) 5724 + 2 30-60 % full (at least room for 2 records) 3271 + 3 60-90 % full (at least room for one record) 818 + 4 100 % full (no more room for records) 0 + 5 Tail page, 0-40 % full 4906 + 6 Tail page, 40-80 % full 1636 + 7 Full tail page or full blob page 0 + + Assuming 8K pages, this will allow us to map: + 8192 (bytes per page) * 8 bits/byte / 3 bits/page * 8192 (page size)= 170.7M + + Note that values 1-3 may be adjust for each individual table based on + 'min record length'. Tail pages are for overflow data which can be of + any size and thus doesn't have to be adjusted for different tables. + If we add more columns to the table, some of the originally calculated + 'cut off' points may not be optimal, but they shouldn't be 'drasticly + wrong'. + + When allocating data from the bitmap, we are trying to do it in a + 'best fit' manner. Blobs and varchar blocks are given out in large + continuous extents to allow fast access to these. Before allowing a + row to 'flow over' to other blocks, we will compact the page and use + all space on it. If there is many rows in the page, we will ensure + there is *LEFT_TO_GROW_ON_SPLIT* bytes left on the page to allow other + rows to grow. + + The bitmap format allows us to extend the row file in big chunks, if needed. + + When calculating the size for a packed row, we will calculate the following + things separately: + - Row header + null_bits + empty_bits fixed size segments etc. + - Size of all char/varchar fields + - Size of each blob field + + The bitmap handler will get all the above information and return + either one page or a set of pages to put the different parts. + + Bitmaps are read on demand in response to insert/delete/update operations. + The following bitmap pointers will be cached and stored on disk on close: + - Current insert_bitmap; When inserting new data we will first try to + fill this one. + - First bitmap which is not completely full. This is updated when we + free data with an update or delete. + + While flushing out bitmaps, we will cache the status of the bitmap in memory + to avoid having to read a bitmap for insert of new data that will not + be of any use + - Total empty space + - Largest number of continuous pages + + Bitmap ONLY goes to disk in the following scenarios + - The file is closed (and we flush all changes to disk) + - On checkpoint + (Ie: When we do a checkpoint, we have to ensure that all bitmaps are + put on disk even if they are not in the page cache). + - When explicitly requested (for example on backup or after recovery, + to simplify things) + + The flow of writing a row is that: + - Mark the bitmap not flushable (_ma_bitmap_flushable(X, 1)) + - Lock the bitmap + - Decide which data pages we will write to + - Mark them full in the bitmap page so that other threads do not try to + use the same data pages as us + - We unlock the bitmap + - Write the data pages + - Lock the bitmap + - Correct the bitmap page with the true final occupation of the data + pages (that is, we marked pages full but when we are done we realize + we didn't fill them) + - Unlock the bitmap. + - Mark the bitmap flushable (_ma_bitmap_flushable(X, -1)) +*/ + +#include "maria_def.h" +#include "ma_blockrec.h" + +#define FULL_HEAD_PAGE 4 +#define FULL_TAIL_PAGE 7 + +const char *bits_to_txt[]= +{ + "empty", "00-30% full", "30-60% full", "60-90% full", "full", + "tail 00-40 % full", "tail 40-80 % full", "tail/blob full" +}; + +#define WRONG_BITMAP_FLUSH 0 /*define to 1 only for provoking bugs*/ + +static my_bool _ma_read_bitmap_page(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page); +static my_bool _ma_bitmap_create_missing(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page); +static void _ma_bitmap_unpin_all(MARIA_SHARE *share); +#ifndef DBUG_OFF +static void _ma_check_bitmap(MARIA_FILE_BITMAP *bitmap); +#else +#define _ma_check_bitmap(A) do { } while(0) +#endif + + +/* Write bitmap page to key cache */ + +static inline my_bool write_changed_bitmap(MARIA_SHARE *share, + MARIA_FILE_BITMAP *bitmap) +{ + my_bool res; + DBUG_ENTER("write_changed_bitmap"); + DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); + DBUG_ASSERT(bitmap->file.pre_write_hook != 0); + DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable)); + + /* + Mark that a bitmap page has been written to page cache and we have + to flush it during checkpoint. + */ + bitmap->changed_not_flushed= 1; + + if ((bitmap->non_flushable == 0) || WRONG_BITMAP_FLUSH) + { + res= pagecache_write(share->pagecache, + &bitmap->file, bitmap->page, 0, + bitmap->map, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE); + DBUG_ASSERT(!res); + DBUG_RETURN(res); + } + else + { + /* + bitmap->non_flushable means that someone has changed the bitmap, + but it's not yet complete so it can't yet be written to disk. + In this case we write the changed bitmap to the disk cache, + but keep it pinned until the change is completed. The page will + be unpinned later by _ma_bitmap_unpin_all() as soon as non_flushable + is set back to 0. + */ + MARIA_PINNED_PAGE page_link; + DBUG_PRINT("info", ("Writing pinned bitmap page")); + res= pagecache_write(share->pagecache, + &bitmap->file, bitmap->page, 0, + bitmap->map, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_PIN, + PAGECACHE_WRITE_DELAY, &page_link.link, + LSN_IMPOSSIBLE); + page_link.unlock= PAGECACHE_LOCK_LEFT_UNLOCKED; + page_link.changed= 1; + push_dynamic(&bitmap->pinned_pages, (const uchar*) (void*) &page_link); + DBUG_ASSERT(!res); + DBUG_RETURN(res); + } +} + +/* + Initialize bitmap variables in share + + SYNOPSIS + _ma_bitmap_init() + share Share handler + file Data file handler + last_page Pointer to last page (max_file_size) that needs to be + mapped by the bitmap. This is adjusted to bitmap + alignment. + + NOTES + This is called the first time a file is opened. + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_bitmap_init(MARIA_SHARE *share, File file, + pgcache_page_no_t *last_page) +{ + uint aligned_bit_blocks; + uint max_page_size; + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + uint size= share->block_size; + myf flag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0); + pgcache_page_no_t first_bitmap_with_space; +#ifndef DBUG_OFF + /* We want to have a copy of the bitmap to be able to print differences */ + size*= 2; +#endif + + if (!((bitmap->map= (uchar*) my_malloc(PSI_INSTRUMENT_ME, size, flag))) || + my_init_dynamic_array(PSI_INSTRUMENT_ME, &bitmap->pinned_pages, + sizeof(MARIA_PINNED_PAGE), 1, 1, flag)) + return 1; + + bitmap->share= share; + bitmap->block_size= share->block_size; + bitmap->file.file= file; + _ma_bitmap_set_pagecache_callbacks(&bitmap->file, share); + + /* Size needs to be aligned on 6 */ + aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6; + bitmap->max_total_size= bitmap->total_size= aligned_bit_blocks * 6; + /* + In each 6 bytes, we have 6*8/3 = 16 pages covered + The +1 is to add the bitmap page, as this doesn't have to be covered + */ + bitmap->pages_covered= aligned_bit_blocks * 16 + 1; + bitmap->flush_all_requested= bitmap->waiting_for_flush_all_requested= + bitmap->waiting_for_non_flushable= 0; + bitmap->non_flushable= 0; + + /* Update size for bits */ + /* TODO; Make this dependent of the row size */ + max_page_size= share->block_size - PAGE_OVERHEAD_SIZE(share) + DIR_ENTRY_SIZE; + bitmap->sizes[0]= max_page_size; /* Empty page */ + bitmap->sizes[1]= max_page_size - max_page_size * 30 / 100; + bitmap->sizes[2]= max_page_size - max_page_size * 60 / 100; + bitmap->sizes[3]= max_page_size - max_page_size * 90 / 100; + bitmap->sizes[4]= 0; /* Full page */ + bitmap->sizes[5]= max_page_size - max_page_size * 40 / 100; + bitmap->sizes[6]= max_page_size - max_page_size * 80 / 100; + bitmap->sizes[7]= 0; + + /* + If a record size will fit into the smallest empty page, return first + found page in find_head() + */ + if (bitmap->sizes[3] >= share->base.max_pack_length) + bitmap->return_first_match= 1; + + mysql_mutex_init(key_SHARE_BITMAP_lock, + &share->bitmap.bitmap_lock, MY_MUTEX_INIT_SLOW); + mysql_cond_init(key_SHARE_BITMAP_cond, + &share->bitmap.bitmap_cond, 0); + + first_bitmap_with_space= share->state.first_bitmap_with_space; + _ma_bitmap_reset_cache(share); + + /* + The bitmap used to map the file are aligned on 6 bytes. We now + calculate the max file size that can be used by the bitmap. This + is needed to get ma_info() give a true file size so that the user can + estimate if there is still space free for records in the file. + */ + { + pgcache_page_no_t last_bitmap_page; + ulong blocks, bytes; + + last_bitmap_page= *last_page - *last_page % bitmap->pages_covered; + blocks= (ulong) (*last_page - last_bitmap_page); + bytes= (blocks * 3) / 8; /* 3 bit per page / 8 bits per byte */ + /* Size needs to be aligned on 6 */ + bytes/= 6; + bytes*= 6; + bitmap->last_bitmap_page= last_bitmap_page; + bitmap->last_total_size= (uint)bytes; + *last_page= ((last_bitmap_page + bytes*8/3)); + } + + /* Restore first_bitmap_with_space if it's resonable */ + if (first_bitmap_with_space <= (share->state.state.data_file_length / + share->block_size)) + share->state.first_bitmap_with_space= first_bitmap_with_space; + + return 0; +} + + +/* + Free data allocated by _ma_bitmap_init + + SYNOPSIS + _ma_bitmap_end() + share Share handler +*/ + +my_bool _ma_bitmap_end(MARIA_SHARE *share) +{ + my_bool res; + +#ifndef DBUG_OFF + if (! share->internal_table) + mysql_mutex_assert_owner(&share->close_lock); +#endif + DBUG_ASSERT(share->bitmap.non_flushable == 0); + DBUG_ASSERT(share->bitmap.flush_all_requested == 0); + DBUG_ASSERT(share->bitmap.waiting_for_non_flushable == 0 && + share->bitmap.waiting_for_flush_all_requested == 0); + DBUG_ASSERT(share->bitmap.pinned_pages.elements == 0); + + res= _ma_bitmap_flush(share); + mysql_mutex_destroy(&share->bitmap.bitmap_lock); + mysql_cond_destroy(&share->bitmap.bitmap_cond); + delete_dynamic(&share->bitmap.pinned_pages); + my_free(share->bitmap.map); + share->bitmap.map= 0; + /* + This is to not get an assert in checkpoint. The bitmap will be flushed + at once by _ma_once_end_block_record() as part of the normal flush + of the kfile. + */ + share->bitmap.changed_not_flushed= 0; + return res; +} + +/* + Ensure that we have incremented open count before we try to read/write + a page while we have the bitmap lock. + This is needed to ensure that we don't call _ma_mark_file_changed() as + part of flushing a page to disk, as this locks share->internal_lock + and then mutex lock would happen in the wrong order. +*/ + +static inline void _ma_bitmap_mark_file_changed(MARIA_SHARE *share, + my_bool flush_translog) +{ + /* + It's extremely unlikely that the following test is true as it + only happens once if the table has changed. + */ + if (unlikely(!share->global_changed && + (share->state.changed & STATE_CHANGED))) + { + /* purecov: begin inspected */ + /* unlock mutex as it can't be hold during _ma_mark_file_changed() */ + mysql_mutex_unlock(&share->bitmap.bitmap_lock); + + /* + We have to flush the translog to ensure we have registered that the + table is open. + */ + if (flush_translog && share->now_transactional) + (void) translog_flush(share->state.logrec_file_id); + + _ma_mark_file_changed_now(share); + mysql_mutex_lock(&share->bitmap.bitmap_lock); + /* purecov: end */ + } +} + +/* + Send updated bitmap to the page cache + + SYNOPSIS + _ma_bitmap_flush() + share Share handler + + NOTES + In the future, _ma_bitmap_flush() will be called to flush changes don't + by this thread (ie, checking the changed flag is ok). The reason we + check it again in the mutex is that if someone else did a flush at the + same time, we don't have to do the write. + This is also ok for _ma_scan_init_block_record() which does not want to + miss rows: it cares only for committed rows, that is, rows for which there + was a commit before our transaction started; as commit and transaction's + start are protected by the same LOCK_trn_list mutex, we see memory at + least as new as at other transaction's commit time, so if the committed + rows caused bitmap->changed to be true, we see it; if we see 0 it really + means a flush happened since then. So, it's ok to read without bitmap's + mutex. + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_bitmap_flush(MARIA_SHARE *share) +{ + my_bool res= 0; + DBUG_ENTER("_ma_bitmap_flush"); + if (share->bitmap.changed) + { + mysql_mutex_lock(&share->bitmap.bitmap_lock); + if (share->bitmap.changed) + { + /* + We have to mark the file changed here, as otherwise the following + write to pagecache may force a page out from this file, which would + cause _ma_mark_file_changed() to be called with bitmaplock hold! + */ + _ma_bitmap_mark_file_changed(share, 1); + res= write_changed_bitmap(share, &share->bitmap); + share->bitmap.changed= 0; + } + mysql_mutex_unlock(&share->bitmap.bitmap_lock); + } + DBUG_RETURN(res); +} + + +/** + Dirty-page filtering criteria for bitmap pages + + @param type Page's type + @param pageno Page's number + @param rec_lsn Page's rec_lsn + @param arg pages_covered of bitmap +*/ + +static enum pagecache_flush_filter_result +filter_flush_bitmap_pages(enum pagecache_page_type type + __attribute__ ((unused)), + pgcache_page_no_t pageno, + LSN rec_lsn __attribute__ ((unused)), + void *arg) +{ + return ((pageno % (*(ulong*)arg)) == 0); +} + + +/** + Flushes current bitmap page to the pagecache, and then all bitmap pages + from pagecache to the file. Used by Checkpoint. + + @param share Table's share +*/ + +my_bool _ma_bitmap_flush_all(MARIA_SHARE *share) +{ + my_bool res= 0; + uint send_signal= 0; + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + DBUG_ENTER("_ma_bitmap_flush_all"); + +#ifdef EXTRA_DEBUG_BITMAP + { + char buff[160]; + uint len= my_sprintf(buff, + (buff, "bitmap_flush: fd: %d id: %u " + "changed: %d changed_not_flushed: %d " + "flush_all_requested: %d", + share->bitmap.file.file, + share->id, + bitmap->changed, + bitmap->changed_not_flushed, + bitmap->flush_all_requested)); + (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, + (uchar*) buff, len); + } +#endif + + mysql_mutex_lock(&bitmap->bitmap_lock); + if (!bitmap->changed && !bitmap->changed_not_flushed) + { + mysql_mutex_unlock(&bitmap->bitmap_lock); + DBUG_RETURN(0); + } + + _ma_bitmap_mark_file_changed(share, 0); + + /* + The following should be true as it was tested above. We have to test + this again as _ma_bitmap_mark_file_changed() did temporarly release + the bitmap mutex. + */ + if (bitmap->changed || bitmap->changed_not_flushed) + { + bitmap->flush_all_requested++; + bitmap->waiting_for_non_flushable++; +#if !WRONG_BITMAP_FLUSH + while (bitmap->non_flushable > 0) + { + DBUG_PRINT("info", ("waiting for bitmap to be flushable")); + mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock); + } +#endif + bitmap->waiting_for_non_flushable--; +#ifdef EXTRA_DEBUG_BITMAP + { + char tmp[MAX_BITMAP_INFO_LENGTH]; + size_t len; + len= _ma_get_bitmap_description(bitmap, bitmap->map, bitmap->page, tmp); + (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY, + (uchar*) tmp, len); + } +#endif + + DBUG_ASSERT(bitmap->flush_all_requested == 1); + /* + Bitmap is in a flushable state: its contents in memory are reflected by + log records (complete REDO-UNDO groups) and all bitmap pages are + unpinned. We keep the mutex to preserve this situation, and flush to the + file. + */ + if (bitmap->changed) + { + bitmap->changed= FALSE; + res= write_changed_bitmap(share, bitmap); + } + /* + We do NOT use FLUSH_KEEP_LAZY because we must be sure that bitmap + pages have been flushed. That's a condition of correctness of + Recovery: data pages may have been all flushed, if we write the + checkpoint record Recovery will start from after their REDOs. If + bitmap page was not flushed, as the REDOs about it will be skipped, it + will wrongly not be recovered. If bitmap pages had a rec_lsn it would + be different. + There should be no pinned pages as bitmap->non_flushable==0. + */ + if (flush_pagecache_blocks_with_filter(share->pagecache, + &bitmap->file, FLUSH_KEEP, + filter_flush_bitmap_pages, + &bitmap->pages_covered) & + PCFLUSH_PINNED_AND_ERROR) + res= TRUE; + bitmap->changed_not_flushed= FALSE; + bitmap->flush_all_requested--; + /* + Some well-behaved threads may be waiting for flush_all_requested to + become false, wake them up. + */ + DBUG_PRINT("info", ("bitmap flusher waking up others")); + send_signal= (bitmap->waiting_for_flush_all_requested | + bitmap->waiting_for_non_flushable); + } + mysql_mutex_unlock(&bitmap->bitmap_lock); + if (send_signal) + mysql_cond_broadcast(&bitmap->bitmap_cond); + DBUG_RETURN(res); +} + + +/** + @brief Lock bitmap from being used by another thread + + @fn _ma_bitmap_lock() + @param share Table's share + + @notes + This is a temporary solution for allowing someone to delete an inserted + duplicate-key row while someone else is doing concurrent inserts. + This is ok for now as duplicate key errors are not that common. + + In the future we will add locks for row-pages to ensure two threads doesn't + work at the same time on the same page. +*/ + +void _ma_bitmap_lock(MARIA_SHARE *share) +{ + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + DBUG_ENTER("_ma_bitmap_lock"); + + if (!share->now_transactional) + DBUG_VOID_RETURN; + + mysql_mutex_lock(&bitmap->bitmap_lock); + bitmap->flush_all_requested++; + bitmap->waiting_for_non_flushable++; + while (bitmap->non_flushable) + { + DBUG_PRINT("info", ("waiting for bitmap to be flushable")); + mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock); + } + bitmap->waiting_for_non_flushable--; + /* + Ensure that _ma_bitmap_flush_all() and _ma_bitmap_lock() are blocked. + ma_bitmap_flushable() is blocked thanks to 'flush_all_requested'. + */ + bitmap->non_flushable= 1; + mysql_mutex_unlock(&bitmap->bitmap_lock); + DBUG_VOID_RETURN; +} + +/** + @brief Unlock bitmap after _ma_bitmap_lock() + + @fn _ma_bitmap_unlock() + @param share Table's share +*/ + +void _ma_bitmap_unlock(MARIA_SHARE *share) +{ + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + uint send_signal; + DBUG_ENTER("_ma_bitmap_unlock"); + + if (!share->now_transactional) + DBUG_VOID_RETURN; + DBUG_ASSERT(bitmap->flush_all_requested > 0 && bitmap->non_flushable == 1); + + mysql_mutex_lock(&bitmap->bitmap_lock); + bitmap->non_flushable= 0; + _ma_bitmap_unpin_all(share); + send_signal= bitmap->waiting_for_non_flushable; + if (!--bitmap->flush_all_requested) + send_signal|= bitmap->waiting_for_flush_all_requested; + mysql_mutex_unlock(&bitmap->bitmap_lock); + if (send_signal) + mysql_cond_broadcast(&bitmap->bitmap_cond); + DBUG_VOID_RETURN; +} + + +/** + @brief Unpin all pinned bitmap pages + + @param share Table's share + + @return Operation status + @retval 0 ok + + @note This unpins pages pinned by other threads. +*/ + +static void _ma_bitmap_unpin_all(MARIA_SHARE *share) +{ + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + MARIA_PINNED_PAGE *page_link= ((MARIA_PINNED_PAGE*) + dynamic_array_ptr(&bitmap->pinned_pages, 0)); + MARIA_PINNED_PAGE *pinned_page= page_link + bitmap->pinned_pages.elements; + DBUG_ENTER("_ma_bitmap_unpin_all"); + DBUG_PRINT("info", ("pinned: %zu", bitmap->pinned_pages.elements)); + while (pinned_page-- != page_link) + pagecache_unlock_by_link(share->pagecache, pinned_page->link, + pinned_page->unlock, PAGECACHE_UNPIN, + LSN_IMPOSSIBLE, LSN_IMPOSSIBLE, FALSE, TRUE); + bitmap->pinned_pages.elements= 0; + DBUG_VOID_RETURN; +} + + +/* + Intialize bitmap in memory to a zero bitmap + + SYNOPSIS + _ma_bitmap_delete_all() + share Share handler + + NOTES + This is called on maria_delete_all_rows (truncate data file). +*/ + +void _ma_bitmap_delete_all(MARIA_SHARE *share) +{ + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + DBUG_ENTER("_ma_bitmap_delete_all"); + if (bitmap->map) /* Not in create */ + { + bzero(bitmap->map, bitmap->block_size); + bitmap->changed= 1; + bitmap->page= 0; + bitmap->used_size= bitmap->full_tail_size= bitmap->full_head_size= 0; + bitmap->total_size= bitmap->max_total_size; + } + DBUG_VOID_RETURN; +} + + +/** + @brief Reset bitmap caches + + @fn _ma_bitmap_reset_cache() + @param share Maria share + + @notes + This is called after we have swapped file descriptors and we want + bitmap to forget all cached information. + It's also called directly after we have opened a file. +*/ + +void _ma_bitmap_reset_cache(MARIA_SHARE *share) +{ + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + + if (bitmap->map) /* If using bitmap */ + { + /* Forget changes in current bitmap page */ + bitmap->changed= 0; + + /* + We can't read a page yet, as in some case we don't have an active + page cache yet. + Pretend we have a dummy, full and not changed bitmap page in memory. + + We set bitmap->page to a value so that if we use it in + move_to_next_bitmap() it will point to page 0. + (This can only happen if writing to a bitmap page fails) + */ + bitmap->page= ((pgcache_page_no_t) 0) - bitmap->pages_covered; + bitmap->used_size= bitmap->total_size= bitmap->max_total_size; + bitmap->full_head_size= bitmap->full_tail_size= bitmap->max_total_size; + bfill(bitmap->map, share->block_size, 255); +#ifndef DBUG_OFF + memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); +#endif + + /* Start scanning for free space from start of file */ + share->state.first_bitmap_with_space = 0; + } +} + + +/* + Return bitmap pattern for the smallest head block that can hold 'size' + + SYNOPSIS + size_to_head_pattern() + bitmap Bitmap + size Requested size + + RETURN + 0-3 For a description of the bitmap sizes, see the header +*/ + +static uint size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size) +{ + if (size <= bitmap->sizes[3]) + return 3; + if (size <= bitmap->sizes[2]) + return 2; + if (size <= bitmap->sizes[1]) + return 1; + DBUG_ASSERT(size <= bitmap->sizes[0]); + return 0; +} + + +/* + Return bitmap pattern for head block where there is size bytes free + + SYNOPSIS + _ma_free_size_to_head_pattern() + bitmap Bitmap + size Requested size + + RETURN + 0-4 (Possible bitmap patterns for head block) +*/ + +uint _ma_free_size_to_head_pattern(MARIA_FILE_BITMAP *bitmap, uint size) +{ + if (size < bitmap->sizes[3]) + return 4; + if (size < bitmap->sizes[2]) + return 3; + if (size < bitmap->sizes[1]) + return 2; + return (size < bitmap->sizes[0]) ? 1 : 0; +} + + +/* + Return bitmap pattern for the smallest tail block that can hold 'size' + + SYNOPSIS + size_to_tail_pattern() + bitmap Bitmap + size Requested size + + RETURN + 0, 5 or 6 For a description of the bitmap sizes, see the header +*/ + +static uint size_to_tail_pattern(MARIA_FILE_BITMAP *bitmap, uint size) +{ + if (size <= bitmap->sizes[6]) + return 6; + if (size <= bitmap->sizes[5]) + return 5; + DBUG_ASSERT(size <= bitmap->sizes[0]); + return 0; +} + + +/* + Return bitmap pattern for tail block where there is size bytes free + + SYNOPSIS + free_size_to_tail_pattern() + bitmap Bitmap + size Requested size + + RETURN + 0, 5, 6, 7 For a description of the bitmap sizes, see the header +*/ + +static uint free_size_to_tail_pattern(MARIA_FILE_BITMAP *bitmap, uint size) +{ + if (size >= bitmap->sizes[0]) + return 0; /* Revert to empty page */ + if (size < bitmap->sizes[6]) + return 7; + if (size < bitmap->sizes[5]) + return 6; + return 5; +} + + +/* + Return size guranteed to be available on a page + + SYNOPSIS + pattern_to_head_size() + bitmap Bitmap + pattern Pattern (0-7) + + RETURN + 0 - block_size +*/ + +static inline uint pattern_to_size(MARIA_FILE_BITMAP *bitmap, uint pattern) +{ + DBUG_ASSERT(pattern <= 7); + return bitmap->sizes[pattern]; +} + + +/* + Print bitmap for debugging + + SYNOPSIS + _ma_print_bitmap_changes() + bitmap Bitmap to print + + IMPLEMENTATION + Prints all changed bits since last call to _ma_print_bitmap(). + This is done by having a copy of the last bitmap in + bitmap->map+bitmap->block_size. +*/ + +#ifndef DBUG_OFF + +static void _ma_print_bitmap_changes(MARIA_FILE_BITMAP *bitmap) +{ + uchar *pos, *end, *org_pos; + ulong page; + DBUG_ENTER("_ma_print_bitmap_changes"); + + end= bitmap->map + bitmap->used_size; + DBUG_LOCK_FILE; + fprintf(DBUG_FILE,"\nBitmap page changes at page: %lu bitmap: %p\n", + (ulong) bitmap->page, bitmap->map); + + page= (ulong) bitmap->page+1; + for (pos= bitmap->map, org_pos= bitmap->map + bitmap->block_size ; + pos < end ; + pos+= 6, org_pos+= 6) + { + ulonglong bits= uint6korr(pos); /* 6 bytes = 6*8/3= 16 patterns */ + ulonglong org_bits= uint6korr(org_pos); + uint i; + + /* + Test if there is any changes in the next 16 bitmaps (to not have to + loop through all bits if we know they are the same) + */ + if (bits != org_bits) + { + for (i= 0; i < 16 ; i++, bits>>= 3, org_bits>>= 3) + { + if ((bits & 7) != (org_bits & 7)) + fprintf(DBUG_FILE, "Page: %8lu %s -> %s\n", page+i, + bits_to_txt[org_bits & 7], bits_to_txt[bits & 7]); + } + } + page+= 16; + } + fputc('\n', DBUG_FILE); + DBUG_UNLOCK_FILE; + memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); + DBUG_VOID_RETURN; +} + + +/* Print content of bitmap for debugging */ + +void _ma_print_bitmap(MARIA_FILE_BITMAP *bitmap, uchar *data, + pgcache_page_no_t page) +{ + uchar *pos, *end; + char llbuff[22]; + + DBUG_LOCK_FILE; + fprintf(DBUG_FILE,"\nDump of bitmap page at %s\n", llstr(page, llbuff)); + + page++; /* Skip bitmap page */ + for (pos= data, end= pos + bitmap->max_total_size; + pos < end ; + pos+= 6) + { + ulonglong bits= uint6korr(pos); /* 6 bytes = 6*8/3= 16 patterns */ + + /* + Test if there is any changes in the next 16 bitmaps (to not have to + loop through all bits if we know they are the same) + */ + if (bits) + { + uint i; + for (i= 0; i < 16 ; i++, bits>>= 3) + { + if (bits & 7) + fprintf(DBUG_FILE, "Page: %8s %s\n", llstr(page+i, llbuff), + bits_to_txt[bits & 7]); + } + } + page+= 16; + } + fputc('\n', DBUG_FILE); + DBUG_UNLOCK_FILE; +} + +#endif /* DBUG_OFF */ + + +/* + Return content of bitmap as a printable string +*/ + +size_t _ma_get_bitmap_description(MARIA_FILE_BITMAP *bitmap, + uchar *bitmap_data, + pgcache_page_no_t page, + char *out) +{ + uchar *pos, *end; + size_t count=0, dot_printed= 0, len; + char buff[80], last[80]; + + page++; + last[0]=0; + for (pos= bitmap_data, end= pos+ bitmap->used_size ; pos < end ; pos+= 6) + { + ulonglong bits= uint6korr(pos); /* 6 bytes = 6*8/3= 16 patterns */ + uint i; + + for (i= 0; i < 16 ; i++, bits>>= 3) + { + if (count > 60) + { + if (memcmp(buff, last, count)) + { + memcpy(last, buff, count); + len= sprintf(out, "%8lu: ", (ulong) (page - count)); + memcpy(out+len, buff, count); + out+= len + count + 1; + out[-1]= '\n'; + dot_printed= 0; + } + else if (!(dot_printed++)) + { + out= strmov(out, "...\n"); + } + count= 0; + } + buff[count++]= '0' + (uint) (bits & 7); + page++; + } + } + len= sprintf(out, "%8lu: ", (ulong) (page - count)); + memcpy(out+len, buff, count); + out[len + count]= '\n'; + out[len + count + 1]= 0; + return len + count + 1; +} + + +/* + Adjust bitmap->total_size to not go over max_data_file_size +*/ + +static void adjust_total_size(MARIA_HA *info, pgcache_page_no_t page) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + + if (page < bitmap->last_bitmap_page) + bitmap->total_size= bitmap->max_total_size; /* Use all bits in bitmap */ + else + bitmap->total_size= bitmap->last_total_size; +} + +/*************************************************************************** + Reading & writing bitmap pages +***************************************************************************/ + +/* + Read a given bitmap page + + SYNOPSIS + _ma_read_bitmap_page() + info Maria handler + bitmap Bitmap handler + page Page to read + + NOTE + We don't always have share->bitmap.bitmap_lock here + (when called from_ma_check_bitmap_data() for example). + + RETURN + 0 ok + 1 error (Error writing old bitmap or reading bitmap page) +*/ + +static my_bool _ma_read_bitmap_page(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page) +{ + MARIA_SHARE *share= info->s; + my_bool res; + DBUG_ENTER("_ma_read_bitmap_page"); + DBUG_PRINT("enter", ("page: %lld data_file_length: %lld", + (longlong) page, + (longlong) share->state.state.data_file_length)); + DBUG_ASSERT(page % bitmap->pages_covered == 0); + DBUG_ASSERT(!bitmap->changed); + + bitmap->page= page; + if ((page + 1) * bitmap->block_size > share->state.state.data_file_length) + { + /* Inexistent or half-created page */ + res= _ma_bitmap_create_missing(info, bitmap, page); + if (!res) + adjust_total_size(info, page); + DBUG_RETURN(res); + } + + adjust_total_size(info, page); + bitmap->full_head_size= bitmap->full_tail_size= 0; + DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); + res= pagecache_read(share->pagecache, + &bitmap->file, page, 0, + bitmap->map, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL; + + if (!res) + { + /* Calculate used_size */ + const uchar *data, *end= bitmap->map; + for (data= bitmap->map + bitmap->total_size; --data >= end && *data == 0; ) + {} + bitmap->used_size= (uint) ((data + 1) - end); + DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); + } + else + { + _ma_set_fatal_error(info, my_errno); + } + /* + We can't check maria_bitmap_marker here as if the bitmap page + previously had a true checksum and the user switched mode to not checksum + this may have any value, except maria_normal_page_marker. + + Using maria_normal_page_marker gives us a protection against bugs + when running without any checksums. + */ + +#ifndef DBUG_OFF + if (!res) + { + memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); + _ma_check_bitmap(bitmap); + } +#endif + DBUG_RETURN(res); +} + + +/* + Change to another bitmap page + + SYNOPSIS + _ma_change_bitmap_page() + info Maria handler + bitmap Bitmap handler + page Bitmap page to read + + NOTES + If old bitmap was changed, write it out before reading new one + We return empty bitmap if page is outside of file size + + RETURN + 0 ok + 1 error (Error writing old bitmap or reading bitmap page) +*/ + +static my_bool _ma_change_bitmap_page(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page) +{ + DBUG_ENTER("_ma_change_bitmap_page"); + + _ma_check_bitmap(bitmap); + + /* + We have to mark the file changed here, as otherwise the following + read/write to pagecache may force a page out from this file, which would + cause _ma_mark_file_changed() to be called with bitmaplock hold! + */ + _ma_bitmap_mark_file_changed(info->s, 1); + + if (bitmap->changed) + { + if (write_changed_bitmap(info->s, bitmap)) + DBUG_RETURN(1); + bitmap->changed= 0; + } + DBUG_RETURN(_ma_read_bitmap_page(info, bitmap, page)); +} + + +/* + Read next suitable bitmap + + SYNOPSIS + move_to_next_bitmap() + bitmap Bitmap handle + + NOTES + The found bitmap may be full, so calling function may need to call this + repeatedly until it finds enough space. + + TODO + Add cache of bitmaps to not read something that is not usable + + RETURN + 0 ok + 1 error (either couldn't save old bitmap or read new one) +*/ + +static my_bool move_to_next_bitmap(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap) +{ + pgcache_page_no_t page= bitmap->page; + MARIA_STATE_INFO *state= &info->s->state; + DBUG_ENTER("move_to_next_bitmap"); + + if (state->first_bitmap_with_space != ~(pgcache_page_no_t) 0 && + state->first_bitmap_with_space != page) + { + page= state->first_bitmap_with_space; + state->first_bitmap_with_space= ~(pgcache_page_no_t) 0; + DBUG_ASSERT(page % bitmap->pages_covered == 0); + } + else + { + page+= bitmap->pages_covered; + DBUG_ASSERT(page % bitmap->pages_covered == 0); + } + DBUG_RETURN(_ma_change_bitmap_page(info, bitmap, page)); +} + + +/**************************************************************************** + Allocate data in bitmaps +****************************************************************************/ + +/* + Store data in 'block' and mark the place used in the bitmap + + SYNOPSIS + fill_block() + bitmap Bitmap handle + block Store data about what we found + best_data Pointer to best 6 uchar aligned area in bitmap->map + best_pos Which bit in *best_data the area starts + 0 = first bit pattern, 1 second bit pattern etc + best_bits The original value of the bits at best_pos + fill_pattern Bitmap pattern to store in best_data[best_pos] + + NOTES + We mark all pages to be 'TAIL's, which means that + block->page_count is really a row position inside the page. +*/ + +static void fill_block(MARIA_FILE_BITMAP *bitmap, + MARIA_BITMAP_BLOCK *block, + uchar *best_data, uint best_pos, uint best_bits, + uint fill_pattern) +{ + uint page, offset, tmp; + uchar *data; + DBUG_ENTER("fill_block"); + + /* For each 6 bytes we have 6*8/3= 16 patterns */ + page= ((uint) (best_data - bitmap->map)) / 6 * 16 + best_pos; + DBUG_ASSERT(page + 1 < bitmap->pages_covered); + block->page= bitmap->page + 1 + page; + block->page_count= TAIL_PAGE_COUNT_MARKER; + block->empty_space= pattern_to_size(bitmap, best_bits); + block->sub_blocks= 0; + block->org_bitmap_value= best_bits; + block->used= BLOCKUSED_TAIL; /* See _ma_bitmap_release_unused() */ + + /* + Mark place used by reading/writing 2 bytes at a time to handle + bitmaps in overlapping bytes + */ + best_pos*= 3; + data= best_data+ best_pos / 8; + offset= best_pos & 7; + tmp= uint2korr(data); + + /* we turn off the 3 bits and replace them with fill_pattern */ + tmp= (tmp & ~(7 << offset)) | (fill_pattern << offset); + int2store(data, tmp); + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + DBUG_VOID_RETURN; +} + + +/* + Allocate data for head block + + SYNOPSIS + allocate_head() + bitmap bitmap + size Size of data region we need to store + block Store found information here + + IMPLEMENTATION + Find the best-fit page to put a region of 'size' + This is defined as the first page of the set of pages + with the smallest free space that can hold 'size'. + + NOTES + Updates bitmap->full_head_size while scanning data + + RETURN + 0 ok (block is updated) + 1 error (no space in bitmap; block is not touched) +*/ + + +static my_bool allocate_head(MARIA_FILE_BITMAP *bitmap, uint size, + MARIA_BITMAP_BLOCK *block) +{ + uint min_bits= size_to_head_pattern(bitmap, size); + uchar *data, *end; + uchar *best_data= 0; + uint best_bits= (uint) -1, UNINIT_VAR(best_pos); + my_bool first_pattern= 0; /* if doing insert_order */ + my_bool first_found= 1; + MARIA_SHARE *share= bitmap->share; + my_bool insert_order= + MY_TEST(share->base.extra_options & MA_EXTRA_OPTIONS_INSERT_ORDER); + DBUG_ENTER("allocate_head"); + + DBUG_ASSERT(size <= FULL_PAGE_SIZE(share)); + + end= bitmap->map + bitmap->used_size; + if (insert_order && bitmap->page == share->last_insert_bitmap) + { + uint last_insert_page= share->last_insert_page; + uint byte= 6 * (last_insert_page / 16); + first_pattern= last_insert_page % 16; + data= bitmap->map+byte; + first_found= 0; /* Don't update full_head_size */ + DBUG_ASSERT(data <= end); + } + else + data= bitmap->map + (bitmap->full_head_size/6)*6; + + for (; data < end; data+= 6, first_pattern= 0) + { + ulonglong bits= uint6korr(data); /* 6 bytes = 6*8/3= 16 patterns */ + uint i; + + /* + Skip common patterns + We can skip empty pages (if we already found a match) or + anything matching the following pattern as this will be either + a full page or a tail page + */ + if ((!bits && best_data) || + ((bits & 04444444444444444LL) == 04444444444444444LL)) + continue; + + for (i= first_pattern, bits >>= (3 * first_pattern); i < 16 ; + i++, bits >>= 3) + { + uint pattern= (uint) (bits & 7); + + if (pattern <= 3) /* Room for more data */ + { + if (first_found) + { + first_found= 0; + bitmap->full_head_size= (uint)(data - bitmap->map); + } + } + if (pattern <= min_bits) + { + /* There is enough space here, check if we have found better */ + if ((int) pattern > (int) best_bits) + { + /* + There is more than enough space here and it's better than what + we have found so far. Remember it, as we will choose it if we + don't find anything in this bitmap page. + */ + best_bits= pattern; + best_data= data; + best_pos= i; + if (pattern == min_bits || bitmap->return_first_match) + goto found; /* Best possible match */ + } + } + } + } + if (!best_data) /* Found no place */ + { + if (data >= bitmap->map + bitmap->total_size) + DBUG_RETURN(1); /* No space in bitmap */ + DBUG_ASSERT(uint6korr(data) == 0); + /* Allocate data at end of bitmap */ + bitmap->used_size= (uint) (data - bitmap->map) + 6; + best_data= data; + best_pos= best_bits= 0; + } + else + { + /* + This is not stricly needed as used_size should be alligned on 6, + but for easier debugging lets try to keep it more accurate + */ + uint position= (uint) (best_data - bitmap->map) + 6; + set_if_bigger(bitmap->used_size, position); + } + DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); + +found: + if (insert_order) + { + share->last_insert_page= + ((uint) (best_data - bitmap->map)) / 6 * 16 + best_pos; + share->last_insert_bitmap= bitmap->page; + } + fill_block(bitmap, block, best_data, best_pos, best_bits, FULL_HEAD_PAGE); + DBUG_RETURN(0); +} + + +/* + Allocate data for tail block + + SYNOPSIS + allocate_tail() + bitmap bitmap + size Size of block we need to find + block Store found information here + + RETURN + 0 ok (block is updated) + 1 error (no space in bitmap; block is not touched) +*/ + + +static my_bool allocate_tail(MARIA_FILE_BITMAP *bitmap, uint size, + MARIA_BITMAP_BLOCK *block) +{ + uint min_bits= size_to_tail_pattern(bitmap, size); + uchar *data, *end, *best_data= 0; + my_bool first_found= 1; + uint best_bits= (uint) -1, UNINIT_VAR(best_pos); + DBUG_ENTER("allocate_tail"); + DBUG_PRINT("enter", ("size: %u", size)); + + data= bitmap->map + (bitmap->full_tail_size/6)*6; + end= bitmap->map + bitmap->used_size; + + /* + We have to add DIR_ENTRY_SIZE here as this is not part of the data size + See call to allocate_tail() in find_tail(). + */ + DBUG_ASSERT(size <= MAX_TAIL_SIZE(bitmap->block_size) + DIR_ENTRY_SIZE); + + for (; data < end; data += 6) + { + ulonglong bits= uint6korr(data); /* 6 bytes = 6*8/3= 16 patterns */ + uint i; + + /* + Skip common patterns + We can skip empty pages (if we already found a match) or + the following patterns: 1-4 (head pages, not suitable for tail) or + 7 (full tail page). See 'Dynamic size records' comment at start of file. + + At the moment we only skip full head and tail pages (ie, all bits are + set) as this is easy to detect with one simple test and is a + quite common case if we have blobs. + */ + + if ((!bits && best_data) || bits == 0xffffffffffffLL || + bits == 04444444444444444LL) + continue; + for (i= 0; i < 16; i++, bits >>= 3) + { + uint pattern= (uint) (bits & 7); + + if (pattern == 0 || + (pattern > FULL_HEAD_PAGE && pattern < FULL_TAIL_PAGE)) + { + /* There is room for tail data */ + if (first_found) + { + first_found= 0; + bitmap->full_tail_size= (uint)(data - bitmap->map); + } + } + + if (pattern <= min_bits && (!pattern || pattern > FULL_HEAD_PAGE)) + { + if ((int) pattern > (int) best_bits) + { + best_bits= pattern; + best_data= data; + best_pos= i; + if (pattern == min_bits) + goto found; /* Can't be better */ + } + } + } + } + if (!best_data) + { + if (data >= bitmap->map + bitmap->total_size) + DBUG_RETURN(1); + DBUG_ASSERT(uint6korr(data) == 0); + /* Allocate data at end of bitmap */ + best_data= data; + bitmap->used_size= (uint) (data - bitmap->map) + 6; + DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); + best_pos= best_bits= 0; + } + +found: + fill_block(bitmap, block, best_data, best_pos, best_bits, FULL_TAIL_PAGE); + DBUG_RETURN(0); +} + + +/* + Allocate data for full blocks + + SYNOPSIS + allocate_full_pages() + bitmap bitmap + pages_needed Total size in pages (bitmap->total_size) we would like to have + block Store found information here + full_page 1 if we are not allowed to split extent + + IMPLEMENTATION + We will return the smallest area >= size. If there is no such + block, we will return the biggest area that satisfies + area_size >= MY_MIN(BLOB_SEGMENT_MIN_SIZE*full_page_size, size) + + To speed up searches, we will only consider areas that has at least 16 free + pages starting on an even boundary. When finding such an area, we will + extend it with all previous and following free pages. This will ensure + we don't get holes between areas + + RETURN + # Blocks used + 0 error (no space in bitmap; block is not touched) +*/ + +static ulong allocate_full_pages(MARIA_FILE_BITMAP *bitmap, + ulong pages_needed, + MARIA_BITMAP_BLOCK *block, my_bool full_page) +{ + uchar *data, *data_end, *page_end; + uchar *best_data= 0; + uint min_size; + uint best_area_size, UNINIT_VAR(best_prefix_area_size); + uint page, size; + ulonglong UNINIT_VAR(best_prefix_bits); + DBUG_ENTER("allocate_full_pages"); + DBUG_PRINT("enter", ("pages_needed: %lu", pages_needed)); + + min_size= pages_needed; + if (!full_page && min_size > BLOB_SEGMENT_MIN_SIZE) + min_size= BLOB_SEGMENT_MIN_SIZE; + best_area_size= ~(uint) 0; + + data= bitmap->map + (bitmap->full_head_size/6)*6; + data_end= bitmap->map + bitmap->used_size; + page_end= bitmap->map + bitmap->total_size; + + for (; data < page_end; data+= 6) + { + ulonglong bits= uint6korr(data); /* 6 bytes = 6*8/3= 16 patterns */ + uchar *data_start; + ulonglong prefix_bits= 0; + uint area_size, prefix_area_size, suffix_area_size; + + /* Find area with at least 16 free pages */ + if (bits) + continue; + data_start= data; + /* Find size of area */ + for (data+=6 ; data < data_end ; data+= 6) + { + if ((bits= uint6korr(data))) + break; + } + /* + Check if we are end of bitmap. In this case we know that + the rest of the bitmap is usable + */ + if (data >= data_end) + data= page_end; + area_size= (uint) (data - data_start) / 6 * 16; + if (area_size >= best_area_size) + continue; + prefix_area_size= suffix_area_size= 0; + if (!bits) + { + /* + End of page; All the rest of the bits on page are part of area + This is needed because bitmap->used_size only covers the set bits + in the bitmap. + */ + area_size+= (uint) (page_end - data) / 6 * 16; + if (area_size >= best_area_size) + break; + data= page_end; + } + else + { + /* Add bits at end of page */ + for (; !(bits & 7); bits >>= 3) + suffix_area_size++; + area_size+= suffix_area_size; + } + if (data_start != bitmap->map) + { + /* Add bits before page */ + bits= prefix_bits= uint6korr(data_start - 6); + DBUG_ASSERT(bits != 0); + /* 111 000 000 000 000 000 000 000 000 000 000 000 000 000 000 000 */ + if (!(bits & 07000000000000000LL)) + { + data_start-= 6; + do + { + prefix_area_size++; + bits<<= 3; + } while (!(bits & 07000000000000000LL)); + area_size+= prefix_area_size; + /* Calculate offset to page from data_start */ + prefix_area_size= 16 - prefix_area_size; + } + } + if (area_size >= min_size && area_size <= best_area_size) + { + best_data= data_start; + best_area_size= area_size; + best_prefix_bits= prefix_bits; + best_prefix_area_size= prefix_area_size; + + /* Prefer to put data in biggest possible area */ + if (area_size <= pages_needed) + min_size= area_size; + else + min_size= pages_needed; + } + } + if (!best_data) + DBUG_RETURN(0); /* No room on page */ + + /* + Now allocate MY_MIN(pages_needed, area_size), starting from + best_start + best_prefix_area_size + */ + if (best_area_size > pages_needed) + best_area_size= pages_needed; + + /* For each 6 bytes we have 6*8/3= 16 patterns */ + page= ((uint) (best_data - bitmap->map) * 8) / 3 + best_prefix_area_size; + block->page= bitmap->page + 1 + page; + block->page_count= best_area_size; + block->empty_space= 0; + block->sub_blocks= 0; + block->org_bitmap_value= 0; + block->used= 0; + DBUG_ASSERT(page + best_area_size < bitmap->pages_covered); + DBUG_PRINT("info", ("page: %lu page_count: %u", + (ulong) block->page, block->page_count)); + + if (best_prefix_area_size) + { + ulonglong tmp; + /* Convert offset back to bits */ + best_prefix_area_size= 16 - best_prefix_area_size; + if (best_area_size < best_prefix_area_size) + { + tmp= (1LL << best_area_size*3) - 1; + best_area_size= best_prefix_area_size; /* for easy end test */ + } + else + tmp= (1LL << best_prefix_area_size*3) - 1; + tmp<<= (16 - best_prefix_area_size) * 3; + DBUG_ASSERT((best_prefix_bits & tmp) == 0); + best_prefix_bits|= tmp; + int6store(best_data, best_prefix_bits); + if (!(best_area_size-= best_prefix_area_size)) + goto end; + best_data+= 6; + } + best_area_size*= 3; /* Bits to set */ + size= best_area_size/8; /* Bytes to set */ + bfill(best_data, size, 255); + best_data+= size; + if ((best_area_size-= size * 8)) + { + /* fill last uchar */ + *best_data|= (uchar) ((1 << best_area_size) -1); + best_data++; + } + if (data_end < best_data) + { + bitmap->used_size= (uint) (best_data - bitmap->map); + DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); + } +end: + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + DBUG_RETURN(block->page_count); +} + + +/**************************************************************************** + Find right bitmaps where to store data +****************************************************************************/ + +/* + Find right bitmap and position for head block + + SYNOPSIS + find_head() + info Maria handler + length Size of data region we need store + position Position in bitmap_blocks where to store the + information for the head block. + + RETURN + 0 ok + 1 error +*/ + +static my_bool find_head(MARIA_HA *info, uint length, uint position) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + MARIA_BITMAP_BLOCK *block; + /* + There is always place for the head block in bitmap_blocks as these are + preallocated at _ma_init_block_record(). + */ + block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *); + + if (info->s->base.extra_options & MA_EXTRA_OPTIONS_INSERT_ORDER) + { + if (bitmap->page != info->s->last_insert_bitmap && + _ma_change_bitmap_page(info, bitmap, + info->s->last_insert_bitmap)) + return 1; + /* Don't allocate any blocks from earlier pages */ + info->s->state.first_bitmap_with_space= info->s->last_insert_bitmap; + } + + /* + We need to have DIRENTRY_SIZE here to take into account that we may + need an extra directory entry for the row + */ + while (allocate_head(bitmap, length + DIR_ENTRY_SIZE, block)) + if (move_to_next_bitmap(info, bitmap)) + return 1; + return 0; +} + + +/* + Find right bitmap and position for tail + + SYNOPSIS + find_tail() + info Maria handler + length Size of data region we need store + position Position in bitmap_blocks where to store the + information for the head block. + + RETURN + 0 ok + 1 error +*/ + +static my_bool find_tail(MARIA_HA *info, uint length, size_t position) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + MARIA_BITMAP_BLOCK *block; + DBUG_ENTER("find_tail"); + DBUG_ASSERT(length <= info->s->block_size - PAGE_OVERHEAD_SIZE(info->s)); + + /* Needed, as there is no error checking in dynamic_element */ + if (allocate_dynamic(&info->bitmap_blocks, position)) + DBUG_RETURN(1); + block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *); + + /* + We have to add DIR_ENTRY_SIZE to ensure we have space for the tail and + it's directroy entry on the page + */ + while (allocate_tail(bitmap, length + DIR_ENTRY_SIZE, block)) + if (move_to_next_bitmap(info, bitmap)) + DBUG_RETURN(1); + DBUG_RETURN(0); +} + + +/* + Find right bitmap and position for full blocks in one extent + + SYNOPSIS + find_mid() + info Maria handler. + pages How many pages to allocate. + position Position in bitmap_blocks where to store the + information for the head block. + NOTES + This is used to allocate the main extent after the 'head' block + (Ie, the middle part of the head-middle-tail entry) + + RETURN + 0 ok + 1 error +*/ + +static my_bool find_mid(MARIA_HA *info, ulong pages, uint position) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + MARIA_BITMAP_BLOCK *block; + block= dynamic_element(&info->bitmap_blocks, position, MARIA_BITMAP_BLOCK *); + + while (!allocate_full_pages(bitmap, pages, block, 1)) + { + if (move_to_next_bitmap(info, bitmap)) + return 1; + } + return 0; +} + + +/* + Find right bitmap and position for putting a blob + + SYNOPSIS + find_blob() + info Maria handler. + length Length of the blob + + NOTES + The extents are stored last in info->bitmap_blocks + + IMPLEMENTATION + Allocate all full pages for the block + optionally one tail + + RETURN + 0 ok + 1 error +*/ + +static my_bool find_blob(MARIA_HA *info, ulong length) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + uint full_page_size= FULL_PAGE_SIZE(info->s); + ulong pages; + uint rest_length, used; + size_t UNINIT_VAR(first_block_pos); + MARIA_BITMAP_BLOCK *first_block= 0; + DBUG_ENTER("find_blob"); + DBUG_PRINT("enter", ("length: %lu", length)); + + pages= length / full_page_size; + rest_length= (uint) (length - pages * full_page_size); + if (rest_length >= MAX_TAIL_SIZE(info->s->block_size)) + { + pages++; + rest_length= 0; + } + + first_block_pos= info->bitmap_blocks.elements; + if (pages) + { + MARIA_BITMAP_BLOCK *block; + if (allocate_dynamic(&info->bitmap_blocks, + info->bitmap_blocks.elements + + pages / BLOB_SEGMENT_MIN_SIZE + 2)) + DBUG_RETURN(1); + block= dynamic_element(&info->bitmap_blocks, info->bitmap_blocks.elements, + MARIA_BITMAP_BLOCK*); + do + { + /* + We use 0x3fff here as the two upmost bits are reserved for + TAIL_BIT and START_EXTENT_BIT + */ + used= allocate_full_pages(bitmap, + (pages >= 0x3fff ? 0x3fff : (uint) pages), + block, 0); + if (!used) + { + if (move_to_next_bitmap(info, bitmap)) + DBUG_RETURN(1); + } + else + { + pages-= used; + info->bitmap_blocks.elements++; + block++; + } + } while (pages != 0); + } + if (rest_length && find_tail(info, rest_length, + info->bitmap_blocks.elements++)) + DBUG_RETURN(1); + first_block= dynamic_element(&info->bitmap_blocks, first_block_pos, + MARIA_BITMAP_BLOCK*); + first_block->sub_blocks= (uint)(info->bitmap_blocks.elements + - first_block_pos); + DBUG_RETURN(0); +} + + +/* + Find pages to put ALL blobs + + SYNOPSIS + allocate_blobs() + info Maria handler + row Information of what is in the row (from calc_record_size()) + + RETURN + 0 ok + 1 error +*/ + +static my_bool allocate_blobs(MARIA_HA *info, MARIA_ROW *row) +{ + ulong *length, *end; + size_t elements; + /* + Reserve size for: + head block + one extent + tail block + */ + elements= info->bitmap_blocks.elements; + for (length= row->blob_lengths, end= length + info->s->base.blobs; + length < end; length++) + { + if (*length && find_blob(info, *length)) + return 1; + } + row->extents_count= (uint)(info->bitmap_blocks.elements - elements); + return 0; +} + + +/* + Reserve the current head page + + SYNOPSIS + use_head() + info Maria handler + page Page number to update + (Note that caller guarantees this is in the active + bitmap) + size How much free space is left on the page + block_position In which info->bitmap_block we have the + information about the head block. + + NOTES + This is used on update where we are updating an existing head page +*/ + +static void use_head(MARIA_HA *info, pgcache_page_no_t page, uint size, + uint block_position) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + MARIA_BITMAP_BLOCK *block; + uchar *data; + uint offset, tmp, offset_page; + DBUG_ENTER("use_head"); + + DBUG_ASSERT(page % bitmap->pages_covered); + + block= dynamic_element(&info->bitmap_blocks, block_position, + MARIA_BITMAP_BLOCK*); + block->page= page; + block->page_count= 1 + TAIL_BIT; + block->empty_space= size; + block->used= BLOCKUSED_TAIL; + + /* + Mark place used by reading/writing 2 bytes at a time to handle + bitmaps in overlapping bytes + */ + offset_page= (uint) (page - bitmap->page - 1) * 3; + offset= offset_page & 7; + data= bitmap->map + offset_page / 8; + tmp= uint2korr(data); + block->org_bitmap_value= (tmp >> offset) & 7; + tmp= (tmp & ~(7 << offset)) | (FULL_HEAD_PAGE << offset); + int2store(data, tmp); + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + DBUG_VOID_RETURN; +} + + +/* + Find out where to split the row (ie, what goes in head, middle, tail etc) + + SYNOPSIS + find_where_to_split_row() + share Maria share + row Information of what is in the row (from calc_record_size()) + extents Max number of extents we have to store in header + split_size Free size on the page (The head length must be less + than this) + + RETURN + row_length for the head block. +*/ + +static uint find_where_to_split_row(MARIA_SHARE *share, MARIA_ROW *row, + uint extents, uint split_size) +{ + uint *lengths, *lengths_end; + /* + Ensure we have the minimum required space on head page: + - Header + length of field lengths (row->min_length) + - Number of extents + - One extent + */ + uint row_length= (row->min_length + + size_to_store_key_length(extents) + + ROW_EXTENT_SIZE); + DBUG_ASSERT(row_length <= split_size); + + /* + Store first in all_field_lengths the different parts that are written + to the row. This needs to be in same order as in + ma_block_rec.c::write_block_record() + */ + row->null_field_lengths[-3]= extents * ROW_EXTENT_SIZE; + row->null_field_lengths[-2]= share->base.fixed_not_null_fields_length; + row->null_field_lengths[-1]= row->field_lengths_length; + for (lengths= row->null_field_lengths - EXTRA_LENGTH_FIELDS, + lengths_end= (lengths + share->base.fields - share->base.blobs + + EXTRA_LENGTH_FIELDS); lengths < lengths_end; lengths++) + { + if (row_length + *lengths > split_size) + break; + row_length+= *lengths; + } + return row_length; +} + + +/* + Find where to write the middle parts of the row and the tail + + SYNOPSIS + write_rest_of_head() + info Maria handler + position Position in bitmap_blocks. Is 0 for rows that needs + full blocks (ie, has a head, middle part and optional tail) + rest_length How much left of the head block to write. + + RETURN + 0 ok + 1 error +*/ + +static my_bool write_rest_of_head(MARIA_HA *info, uint position, + ulong rest_length) +{ + MARIA_SHARE *share= info->s; + uint full_page_size= FULL_PAGE_SIZE(share); + MARIA_BITMAP_BLOCK *block; + DBUG_ENTER("write_rest_of_head"); + DBUG_PRINT("enter", ("position: %u rest_length: %lu", position, + rest_length)); + + if (position == 0) + { + /* Write out full pages */ + uint pages= rest_length / full_page_size; + + rest_length%= full_page_size; + if (rest_length >= MAX_TAIL_SIZE(share->block_size)) + { + /* Put tail on a full page */ + pages++; + rest_length= 0; + } + if (find_mid(info, pages, 1)) + DBUG_RETURN(1); + /* + Insert empty block after full pages, to allow write_block_record() to + split segment into used + free page + */ + block= dynamic_element(&info->bitmap_blocks, 2, MARIA_BITMAP_BLOCK*); + block->page_count= 0; + block->used= 0; + } + if (rest_length) + { + if (find_tail(info, rest_length, ELEMENTS_RESERVED_FOR_MAIN_PART - 1)) + DBUG_RETURN(1); + } + else + { + /* Empty tail block */ + block= dynamic_element(&info->bitmap_blocks, + ELEMENTS_RESERVED_FOR_MAIN_PART - 1, + MARIA_BITMAP_BLOCK *); + block->page_count= 0; + block->used= 0; + } + DBUG_RETURN(0); +} + + +/* + Find where to store one row + + SYNPOSIS + _ma_bitmap_find_place() + info Maria handler + row Information about row to write + blocks Store data about allocated places here + + RETURN + 0 ok + row->space_on_head_page contains minimum number of bytes we + expect to put on the head page. + 1 error + my_errno is set to error +*/ + +my_bool _ma_bitmap_find_place(MARIA_HA *info, MARIA_ROW *row, + MARIA_BITMAP_BLOCKS *blocks) +{ + MARIA_SHARE *share= info->s; + my_bool res= 1; + uint full_page_size, position, max_page_size; + uint head_length, row_length, rest_length, extents_length; + DBUG_ENTER("_ma_bitmap_find_place"); + + blocks->count= 0; + blocks->tail_page_skipped= blocks->page_skipped= 0; + row->extents_count= 0; + + /* + Reserve place for the following blocks: + - Head block + - Full page block + - Marker block to allow write_block_record() to split full page blocks + into full and free part + - Tail block + */ + + info->bitmap_blocks.elements= ELEMENTS_RESERVED_FOR_MAIN_PART; + max_page_size= (share->block_size - PAGE_OVERHEAD_SIZE(share)); + + mysql_mutex_lock(&share->bitmap.bitmap_lock); + + if (row->total_length <= max_page_size) + { + /* Row fits in one page */ + position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1; + if (find_head(info, (uint) row->total_length, position)) + goto abort; + row->space_on_head_page= row->total_length; + goto end; + } + + /* + First allocate all blobs so that we can find out the needed size for + the main block. + */ + if (row->blob_length && allocate_blobs(info, row)) + goto abort; + + extents_length= row->extents_count * ROW_EXTENT_SIZE; + /* + The + 3 is reserved for storing the number of segments in the row header. + */ + if ((head_length= (row->head_length + extents_length + 3)) <= + max_page_size) + { + /* Main row part fits into one page */ + position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1; + if (find_head(info, head_length, position)) + goto abort; + row->space_on_head_page= head_length; + goto end; + } + + /* Allocate enough space */ + head_length+= ELEMENTS_RESERVED_FOR_MAIN_PART * ROW_EXTENT_SIZE; + + /* The first segment size is stored in 'row_length' */ + row_length= find_where_to_split_row(share, row, row->extents_count + + ELEMENTS_RESERVED_FOR_MAIN_PART-1, + max_page_size); + + full_page_size= MAX_TAIL_SIZE(share->block_size); + position= 0; + rest_length= head_length - row_length; + if (rest_length <= full_page_size) + position= ELEMENTS_RESERVED_FOR_MAIN_PART -2; /* Only head and tail */ + if (find_head(info, row_length, position)) + goto abort; + row->space_on_head_page= row_length; + + if (write_rest_of_head(info, position, rest_length)) + goto abort; + +end: + blocks->block= dynamic_element(&info->bitmap_blocks, position, + MARIA_BITMAP_BLOCK*); + blocks->block->sub_blocks= ELEMENTS_RESERVED_FOR_MAIN_PART - position; + /* First block's page_count is for all blocks */ + blocks->count= (uint)(info->bitmap_blocks.elements - position); + res= 0; + +abort: + mysql_mutex_unlock(&share->bitmap.bitmap_lock); + DBUG_RETURN(res); +} + + +/* + Find where to put row on update (when head page is already defined) + + SYNPOSIS + _ma_bitmap_find_new_place() + info Maria handler + row Information about row to write + page On which page original row was stored + free_size Free size on head page + blocks Store data about allocated places here + + NOTES + This function is only called when the new row can't fit in the space of + the old row in the head page. + + This is essently same as _ma_bitmap_find_place() except that + we don't call find_head() to search in bitmaps where to put the page. + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_bitmap_find_new_place(MARIA_HA *info, MARIA_ROW *row, + pgcache_page_no_t page, uint free_size, + MARIA_BITMAP_BLOCKS *blocks) +{ + MARIA_SHARE *share= info->s; + my_bool res= 1; + uint position; + uint head_length, row_length, rest_length, extents_length; + ulonglong bitmap_page; + DBUG_ENTER("_ma_bitmap_find_new_place"); + + blocks->count= 0; + blocks->tail_page_skipped= blocks->page_skipped= 0; + row->extents_count= 0; + info->bitmap_blocks.elements= ELEMENTS_RESERVED_FOR_MAIN_PART; + + mysql_mutex_lock(&share->bitmap.bitmap_lock); + + /* + First allocate all blobs (so that we can find out the needed size for + the main block. + */ + if (row->blob_length && allocate_blobs(info, row)) + goto abort; + + /* Switch bitmap to current head page */ + bitmap_page= page - page % share->bitmap.pages_covered; + + if (share->bitmap.page != bitmap_page && + _ma_change_bitmap_page(info, &share->bitmap, bitmap_page)) + goto abort; + + extents_length= row->extents_count * ROW_EXTENT_SIZE; + if ((head_length= (row->head_length + extents_length + 3)) <= free_size) + { + /* Main row part fits into one page */ + position= ELEMENTS_RESERVED_FOR_MAIN_PART - 1; + use_head(info, page, head_length, position); + row->space_on_head_page= head_length; + goto end; + } + + /* Allocate enough space */ + head_length+= ELEMENTS_RESERVED_FOR_MAIN_PART * ROW_EXTENT_SIZE; + + /* + The first segment size is stored in 'row_length' + We have to add ELEMENTS_RESERVED_FOR_MAIN_PART here as the extent + information may be up to this size when the header splits. + */ + row_length= find_where_to_split_row(share, row, row->extents_count + + ELEMENTS_RESERVED_FOR_MAIN_PART-1, + free_size); + + position= 0; + rest_length= head_length - row_length; + if (rest_length <= MAX_TAIL_SIZE(share->block_size)) + position= ELEMENTS_RESERVED_FOR_MAIN_PART -2; /* Only head and tail */ + use_head(info, page, row_length, position); + row->space_on_head_page= row_length; + + if (write_rest_of_head(info, position, rest_length)) + goto abort; + +end: + blocks->block= dynamic_element(&info->bitmap_blocks, position, + MARIA_BITMAP_BLOCK*); + blocks->block->sub_blocks= ELEMENTS_RESERVED_FOR_MAIN_PART - position; + /* First block's page_count is for all blocks */ + blocks->count= (uint)(info->bitmap_blocks.elements - position); + res= 0; + +abort: + mysql_mutex_unlock(&share->bitmap.bitmap_lock); + DBUG_RETURN(res); +} + + +/**************************************************************************** + Clear and reset bits +****************************************************************************/ + +/* + Set fill pattern for a page + + set_page_bits() + info Maria handler + bitmap Bitmap handler + page Adress to page + fill_pattern Pattern (not size) for page + + NOTES + Page may not be part of active bitmap + + RETURN + 0 ok + 1 error +*/ + +static my_bool set_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page, uint fill_pattern) +{ + pgcache_page_no_t bitmap_page; + uint offset_page, offset, tmp, org_tmp, used_offset; + uchar *data; + DBUG_ENTER("set_page_bits"); + DBUG_ASSERT(fill_pattern <= 7); + + bitmap_page= page - page % bitmap->pages_covered; + if (bitmap_page != bitmap->page && + _ma_change_bitmap_page(info, bitmap, bitmap_page)) + DBUG_RETURN(1); + + /* Find page number from start of bitmap */ + offset_page= (uint) (page - bitmap->page - 1); + + /* + Mark place used by reading/writing 2 bytes at a time to handle + bitmaps in overlapping bytes + */ + offset_page*= 3; + offset= offset_page & 7; + data= bitmap->map + offset_page / 8; + org_tmp= tmp= uint2korr(data); + tmp= (tmp & ~(7 << offset)) | (fill_pattern << offset); + if (tmp == org_tmp) + DBUG_RETURN(0); /* No changes */ + + /* + Take care to not write bytes outside of bitmap. + fill_pattern is 3 bits, so we need to write two bytes + if bit position we write to is > (8-3) + */ + if (offset > 5) + int2store(data, tmp); + else + data[0]= tmp; + + /* + Reset full_head_size or full_tail_size if we are releasing data before + it. Increase used_size if we are allocating data. + */ + used_offset= (uint) (data - bitmap->map); + if (fill_pattern < 4) + set_if_smaller(bitmap->full_head_size, used_offset); + if (fill_pattern == 0 || (fill_pattern > 4 && fill_pattern < 7)) + set_if_smaller(bitmap->full_tail_size, used_offset); + if (fill_pattern != 0) + { + /* Calulcate which was the last changed byte */ + used_offset+= offset > 5 ? 2 : 1; + set_if_bigger(bitmap->used_size, used_offset); + } + + _ma_check_bitmap(bitmap); + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + if (fill_pattern != FULL_HEAD_PAGE && fill_pattern != FULL_TAIL_PAGE) + set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page); + /* + Note that if the condition above is false (page is full), and all pages of + this bitmap are now full, and that bitmap page was + first_bitmap_with_space, we don't modify first_bitmap_with_space, indeed + its value still tells us where to start our search for a bitmap with space + (which is for sure after this full one). + That does mean that first_bitmap_with_space is only a lower bound. + */ + DBUG_RETURN(0); +} + + +/* + Get bitmap pattern for a given page + + SYNOPSIS + bitmap_get_page_bits() + info Maria handler + bitmap Bitmap handler + page Page number + + RETURN + 0-7 Bitmap pattern + ~0 Error (couldn't read page) +*/ + +static uint bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page) +{ + pgcache_page_no_t bitmap_page; + uint offset_page, offset, tmp; + uchar *data; + DBUG_ENTER("_ma_bitmap_get_page_bits"); + + bitmap_page= page - page % bitmap->pages_covered; + if (bitmap_page != bitmap->page && + _ma_change_bitmap_page(info, bitmap, bitmap_page)) + DBUG_RETURN(~ (uint) 0); + + /* Find page number from start of bitmap */ + offset_page= (uint) (page - bitmap->page - 1); + /* + Mark place used by reading/writing 2 bytes at a time to handle + bitmaps in overlapping bytes + */ + offset_page*= 3; + offset= offset_page & 7; + data= bitmap->map + offset_page / 8; + tmp= uint2korr(data); + DBUG_RETURN((tmp >> offset) & 7); +} + + +/* As above, but take a lock while getting the data */ + +uint _ma_bitmap_get_page_bits(MARIA_HA *info, MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page) +{ + uint tmp; + mysql_mutex_lock(&bitmap->bitmap_lock); + tmp= bitmap_get_page_bits(info, bitmap, page); + mysql_mutex_unlock(&bitmap->bitmap_lock); + return tmp; +} + + +/* + Mark all pages in a region as free + + SYNOPSIS + _ma_bitmap_reset_full_page_bits() + info Maria handler + bitmap Bitmap handler + page Start page + page_count Number of pages + + NOTES + We assume that all pages in region is covered by same bitmap + One must have a lock on info->s->bitmap.bitmap_lock + + RETURN + 0 ok + 1 Error (when reading bitmap) +*/ + +my_bool _ma_bitmap_reset_full_page_bits(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page, + uint page_count) +{ + ulonglong bitmap_page; + uint offset, bit_start, bit_count, tmp, byte_offset; + uchar *data; + DBUG_ENTER("_ma_bitmap_reset_full_page_bits"); + DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count)); + mysql_mutex_assert_owner(&info->s->bitmap.bitmap_lock); + + bitmap_page= page - page % bitmap->pages_covered; + DBUG_ASSERT(page != bitmap_page); + + if (bitmap_page != bitmap->page && + _ma_change_bitmap_page(info, bitmap, bitmap_page)) + DBUG_RETURN(1); + + /* Find page number from start of bitmap */ + offset= (uint) (page - bitmap->page - 1); + + /* Clear bits from 'page * 3' -> '(page + page_count) * 3' */ + bit_start= offset * 3; + bit_count= page_count * 3; + + byte_offset= bit_start/8; + data= bitmap->map + byte_offset; + offset= bit_start & 7; + + tmp= (255 << offset); /* Bits to keep */ + if (bit_count + offset < 8) + { + /* Only clear bits between 'offset' and 'offset+bit_count-1' */ + tmp^= (255 << (offset + bit_count)); + } + *data&= ~tmp; + + set_if_smaller(bitmap->full_head_size, byte_offset); + set_if_smaller(bitmap->full_tail_size, byte_offset); + + if ((int) (bit_count-= (8 - offset)) > 0) + { + uint fill; + data++; + /* + -1 is here to avoid one 'if' statement and to let the following code + handle the last byte + */ + if ((fill= (bit_count - 1) / 8)) + { + bzero(data, fill); + data+= fill; + } + bit_count-= fill * 8; /* Bits left to clear */ + tmp= (1 << bit_count) - 1; + *data&= ~tmp; + } + set_if_smaller(info->s->state.first_bitmap_with_space, bitmap_page); + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + DBUG_RETURN(0); +} + + +/* + Set all pages in a region as used + + SYNOPSIS + _ma_bitmap_set_full_page_bits() + info Maria handler + bitmap Bitmap handler + page Start page + page_count Number of pages + + NOTES + We assume that all pages in region is covered by same bitmap + One must have a lock on info->s->bitmap.bitmap_lock + + RETURN + 0 ok + 1 Error (when reading bitmap) +*/ + +my_bool _ma_bitmap_set_full_page_bits(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page, uint page_count) +{ + ulonglong bitmap_page; + uint offset, bit_start, bit_count, tmp; + uchar *data; + DBUG_ENTER("_ma_bitmap_set_full_page_bits"); + DBUG_PRINT("enter", ("page: %lu page_count: %u", (ulong) page, page_count)); + mysql_mutex_assert_owner(&info->s->bitmap.bitmap_lock); + + bitmap_page= page - page % bitmap->pages_covered; + if (page == bitmap_page || + page + page_count > bitmap_page + bitmap->pages_covered) + { + DBUG_ASSERT(0); /* Wrong in data */ + DBUG_RETURN(1); + } + + if (bitmap_page != bitmap->page && + _ma_change_bitmap_page(info, bitmap, bitmap_page)) + DBUG_RETURN(1); + + /* Find page number from start of bitmap */ + offset= (uint) (page - bitmap->page - 1); + + /* Set bits from 'page * 3' -> '(page + page_count) * 3' */ + bit_start= offset * 3; + bit_count= page_count * 3; + + data= bitmap->map + bit_start / 8; + offset= bit_start & 7; + + tmp= (255 << offset); /* Bits to keep */ + if (bit_count + offset < 8) + { + /* Only set bits between 'offset' and 'offset+bit_count-1' */ + tmp^= (255 << (offset + bit_count)); + } + *data|= tmp; + + if ((int) (bit_count-= (8 - offset)) > 0) + { + uint fill; + data++; + /* + -1 is here to avoid one 'if' statement and to let the following code + handle the last byte + */ + if ((fill= (bit_count - 1) / 8)) + { + bfill(data, fill, 255); + data+= fill; + } + bit_count-= fill * 8; /* Bits left to set */ + tmp= (1 << bit_count) - 1; + *data|= tmp; + } + set_if_bigger(bitmap->used_size, (uint) (data - bitmap->map) + 1); + _ma_check_bitmap(bitmap); + bitmap->changed= 1; + DBUG_EXECUTE("bitmap", _ma_print_bitmap_changes(bitmap);); + DBUG_RETURN(0); +} + + +/** + @brief + Make a transition of MARIA_FILE_BITMAP::non_flushable. + If the bitmap becomes flushable, which requires that REDO-UNDO has been + logged and all bitmap pages touched by the thread have a correct + allocation, it unpins all bitmap pages, and if _ma_bitmap_flush_all() is + waiting (in practice it is a checkpoint), it wakes it up. + If the bitmap becomes or stays unflushable, the function merely records it + unless a concurrent _ma_bitmap_flush_all() is happening, in which case the + function first waits for the flush to be done. + + @note + this sets info->non_flushable_state to 1 if we have incremented + bitmap->non_flushable and not yet decremented it. + + @param share Table's share + @param non_flushable_inc Increment of MARIA_FILE_BITMAP::non_flushable + (-1 or +1). +*/ + +void _ma_bitmap_flushable(MARIA_HA *info, int non_flushable_inc) +{ + MARIA_SHARE *share= info->s; + MARIA_FILE_BITMAP *bitmap; + DBUG_ENTER("_ma_bitmap_flushable"); + + /* + Not transactional tables are never automaticly flushed and needs no + protection + */ + if (!share->now_transactional) + DBUG_VOID_RETURN; + + bitmap= &share->bitmap; + mysql_mutex_lock(&bitmap->bitmap_lock); + + if (non_flushable_inc == -1) + { + DBUG_ASSERT((int) bitmap->non_flushable > 0); + DBUG_ASSERT(info->non_flushable_state == 1); + if (--bitmap->non_flushable == 0) + { + /* + We unlock and unpin pages locked and pinned by other threads. It does + not seem to be an issue as all bitmap changes are serialized with + the bitmap's mutex. + */ + _ma_bitmap_unpin_all(share); + if (unlikely(bitmap->waiting_for_non_flushable)) + { + DBUG_PRINT("info", ("bitmap flushable waking up flusher")); + mysql_cond_broadcast(&bitmap->bitmap_cond); + } + } + DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable)); + mysql_mutex_unlock(&bitmap->bitmap_lock); + info->non_flushable_state= 0; + DBUG_VOID_RETURN; + } + DBUG_ASSERT(non_flushable_inc == 1); + DBUG_ASSERT(info->non_flushable_state == 0); + + bitmap->waiting_for_flush_all_requested++; + while (unlikely(bitmap->flush_all_requested)) + { + /* + Some other thread is waiting for the bitmap to become + flushable. Not the moment to make the bitmap unflushable or more + unflushable; let's rather back off and wait. If we didn't do this, with + multiple writers, there may always be one thread causing the bitmap to + be unflushable and _ma_bitmap_flush_all() would wait for long. + There should not be a deadlock because if our thread increased + non_flushable (and thus _ma_bitmap_flush_all() is waiting for at least + our thread), it is not going to increase it more so is not going to come + here. + */ + DBUG_PRINT("info", ("waiting for bitmap flusher")); + mysql_cond_wait(&bitmap->bitmap_cond, &bitmap->bitmap_lock); + } + bitmap->waiting_for_flush_all_requested--; + bitmap->non_flushable++; + DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable)); + mysql_mutex_unlock(&bitmap->bitmap_lock); + info->non_flushable_state= 1; + DBUG_VOID_RETURN; +} + + +/* + Correct bitmap pages to reflect the true allocation + + SYNOPSIS + _ma_bitmap_release_unused() + info Maria handle + blocks Bitmap blocks + + IMPLEMENTATION + If block->used & BLOCKUSED_TAIL is set: + If block->used & BLOCKUSED_USED is set, then the bits for the + corresponding page is set according to block->empty_space + If block->used & BLOCKUSED_USED is not set, then the bits for + the corresponding page is set to org_bitmap_value; + + If block->used & BLOCKUSED_TAIL is not set: + if block->used is not set, the bits for the corresponding page are + cleared + + For the first block (head block) the logic is same as for a tail block + + Note that we may have 'filler blocks' that are used to split a block + in half; These can be recognized by that they have page_count == 0. + + This code also reverse the effect of ma_bitmap_flushable(.., 1); + + RETURN + 0 ok + 1 error (Couldn't write or read bitmap page) +*/ + +my_bool _ma_bitmap_release_unused(MARIA_HA *info, MARIA_BITMAP_BLOCKS *blocks) +{ + MARIA_BITMAP_BLOCK *block= blocks->block, *end= block + blocks->count; + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + uint bits, current_bitmap_value; + DBUG_ENTER("_ma_bitmap_release_unused"); + + /* + We can skip FULL_HEAD_PAGE (4) as the page was marked as 'full' + when we allocated space in the page + */ + current_bitmap_value= FULL_HEAD_PAGE; + + mysql_mutex_lock(&bitmap->bitmap_lock); + + /* First handle head block */ + if (block->used & BLOCKUSED_USED) + { + DBUG_PRINT("info", ("head page: %lu empty_space: %u", + (ulong) block->page, block->empty_space)); + bits= _ma_free_size_to_head_pattern(bitmap, block->empty_space); + if (block->used & BLOCKUSED_USE_ORG_BITMAP) + current_bitmap_value= block->org_bitmap_value; + } + else + bits= block->org_bitmap_value; + if (bits != current_bitmap_value) + { + if (set_page_bits(info, bitmap, block->page, bits)) + goto err; + } + else + { + DBUG_ASSERT(current_bitmap_value == + bitmap_get_page_bits(info, bitmap, block->page)); + } + + /* Handle all full pages and tail pages (for head page and blob) */ + for (block++; block < end; block++) + { + uint page_count; + if (!block->page_count) + continue; /* Skip 'filler blocks' */ + + page_count= block->page_count; + if (block->used & BLOCKUSED_TAIL) + { + current_bitmap_value= FULL_TAIL_PAGE; + /* The bitmap page is only one page */ + page_count= 1; + if (block->used & BLOCKUSED_USED) + { + DBUG_PRINT("info", ("tail page: %lu empty_space: %u", + (ulong) block->page, block->empty_space)); + bits= free_size_to_tail_pattern(bitmap, block->empty_space); + if (block->used & BLOCKUSED_USE_ORG_BITMAP) + current_bitmap_value= block->org_bitmap_value; + } + else + bits= block->org_bitmap_value; + + /* + The page has all bits set; The following test is an optimization + to not set the bits to the same value as before. + */ + DBUG_ASSERT(current_bitmap_value == + bitmap_get_page_bits(info, bitmap, block->page)); + + if (bits != current_bitmap_value) + { + if (set_page_bits(info, bitmap, block->page, bits)) + goto err; + } + } + else if (!(block->used & BLOCKUSED_USED) && + _ma_bitmap_reset_full_page_bits(info, bitmap, + block->page, page_count)) + goto err; + } + + /* This duplicates ma_bitmap_flushable(-1) except it already has mutex */ + if (info->non_flushable_state) + { + DBUG_ASSERT(((int) (bitmap->non_flushable)) > 0); + info->non_flushable_state= 0; + if (--bitmap->non_flushable == 0) + { + _ma_bitmap_unpin_all(info->s); + if (unlikely(bitmap->waiting_for_non_flushable)) + { + DBUG_PRINT("info", ("bitmap flushable waking up flusher")); + mysql_cond_broadcast(&bitmap->bitmap_cond); + } + } + } + DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable)); + + mysql_mutex_unlock(&bitmap->bitmap_lock); + DBUG_RETURN(0); + +err: + mysql_mutex_unlock(&bitmap->bitmap_lock); + DBUG_RETURN(1); +} + + +/* + Free full pages from bitmap and pagecache + + SYNOPSIS + _ma_bitmap_free_full_pages() + info Maria handle + extents Extents (as stored on disk) + count Number of extents + + IMPLEMENTATION + Mark all full pages (not tails) from extents as free, both in bitmap + and page cache. + + RETURN + 0 ok + 1 error (Couldn't write or read bitmap page) +*/ + +my_bool _ma_bitmap_free_full_pages(MARIA_HA *info, const uchar *extents, + uint count) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + my_bool res; + DBUG_ENTER("_ma_bitmap_free_full_pages"); + + for (; count--; extents+= ROW_EXTENT_SIZE) + { + pgcache_page_no_t page= uint5korr(extents); + uint page_count= (uint2korr(extents + ROW_EXTENT_PAGE_SIZE) & + ~START_EXTENT_BIT); + if (!(page_count & TAIL_BIT)) + { + if (page == 0 && page_count == 0) + continue; /* Not used extent */ + if (pagecache_delete_pages(info->s->pagecache, &info->dfile, page, + page_count, PAGECACHE_LOCK_WRITE, 1)) + DBUG_RETURN(1); + mysql_mutex_lock(&bitmap->bitmap_lock); + res= _ma_bitmap_reset_full_page_bits(info, bitmap, page, page_count); + mysql_mutex_unlock(&bitmap->bitmap_lock); + if (res) + DBUG_RETURN(1); + } + } + DBUG_RETURN(0); +} + + +/* + Mark in the bitmap how much free space there is on a page + + SYNOPSIS + _ma_bitmap_set() + info Maria handler + page Adress to page + head 1 if page is a head page, 0 if tail page + empty_space How much empty space there is on page + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_bitmap_set(MARIA_HA *info, pgcache_page_no_t page, my_bool head, + uint empty_space) +{ + MARIA_FILE_BITMAP *bitmap= &info->s->bitmap; + uint bits; + my_bool res; + DBUG_ENTER("_ma_bitmap_set"); + DBUG_PRINT("enter", ("page: %lu head: %d empty_space: %u", + (ulong) page, head, empty_space)); + + mysql_mutex_lock(&info->s->bitmap.bitmap_lock); + bits= (head ? + _ma_free_size_to_head_pattern(bitmap, empty_space) : + free_size_to_tail_pattern(bitmap, empty_space)); + res= set_page_bits(info, bitmap, page, bits); + mysql_mutex_unlock(&info->s->bitmap.bitmap_lock); + DBUG_RETURN(res); +} + + +/* + Check that bitmap pattern is correct for a page + + NOTES + Used in maria_chk + + SYNOPSIS + _ma_check_bitmap_data() + info Maria handler + page_type What kind of page this is + page Adress to page + empty_space Empty space on page + bitmap_pattern Bitmap pattern for page (from bitmap) + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_check_bitmap_data(MARIA_HA *info, enum en_page_type page_type, + uint empty_space, uint bitmap_pattern) +{ + uint bits; + switch (page_type) { + case UNALLOCATED_PAGE: + case MAX_PAGE_TYPE: + bits= 0; + break; + case HEAD_PAGE: + bits= _ma_free_size_to_head_pattern(&info->s->bitmap, empty_space); + break; + case TAIL_PAGE: + bits= free_size_to_tail_pattern(&info->s->bitmap, empty_space); + break; + case BLOB_PAGE: + bits= FULL_TAIL_PAGE; + break; + default: + bits= 0; /* to satisfy compiler */ + DBUG_ASSERT(0); + } + return (bitmap_pattern != bits); +} + +/** + Check that bitmap looks correct + + - All data before full_head_size and full_tail_size are allocated + - There is no allocated data after used_size + All of the above need to be correct only according to 6 byte + alignment as all loops reads 6 bytes at a time and we check both + start and end position according to the current 6 byte position. +*/ + +#ifndef DBUG_OFF +static void _ma_check_bitmap(MARIA_FILE_BITMAP *bitmap) +{ + uchar *data= bitmap->map; + uchar *end= bitmap->map + bitmap->total_size; + uchar *full_head_end=0, *full_tail_end=0, *first_empty= bitmap->map; + + for (; data < end; data+= 6) + { + ulonglong bits= uint6korr(data); /* 6 bytes = 6*8/3= 16 patterns */ + uint i; + + if (bits == 04444444444444444LL || bits == 0xffffffffffffLL) + { + first_empty= data + 6; + continue; /* block fully used */ + } + if (bits == 0) + { + if (!full_head_end) + full_head_end= data; + if (!full_tail_end) + full_tail_end= data; + continue; + } + + first_empty= data + 6; + if (!full_head_end || !full_tail_end) + { + for (i= 0, bits >>= 0; i < 16 ; i++, bits >>= 3) + { + uint pattern= (uint) (bits & 7); + if (pattern == FULL_HEAD_PAGE || pattern == FULL_TAIL_PAGE) + continue; + + if (pattern < 4 && !full_head_end) + full_head_end= data; + if ((pattern == 0 || (pattern > 4 && pattern < 7)) && !full_tail_end) + full_tail_end= data; + } + } + } + if (!full_head_end) + full_head_end= data; + if (!full_tail_end) + full_tail_end= data; + + /* used_size must point after the last byte that had some data) */ + DBUG_ASSERT(bitmap->used_size <= bitmap->total_size); + DBUG_ASSERT((bitmap->map + (bitmap->used_size+5)/6*6) >= first_empty); + /* full_xxxx_size can't point after the first block that has free data */ + DBUG_ASSERT((bitmap->map + (bitmap->full_head_size/6*6)) <= full_head_end); + DBUG_ASSERT((bitmap->map + (bitmap->full_tail_size/6*6)) <= full_tail_end); +} +#endif + + +/* + Check if the page type matches the one that we have in the bitmap + + SYNOPSIS + _ma_check_if_right_bitmap_type() + info Maria handler + page_type What kind of page this is + page Adress to page + bitmap_pattern Store here the pattern that was in the bitmap for the + page. This is always updated. + + NOTES + Used in maria_chk + + RETURN + 0 ok + 1 error +*/ + +my_bool _ma_check_if_right_bitmap_type(MARIA_HA *info, + enum en_page_type page_type, + pgcache_page_no_t page, + uint *bitmap_pattern) +{ + if ((*bitmap_pattern= _ma_bitmap_get_page_bits(info, &info->s->bitmap, + page)) > 7) + return 1; /* Couldn't read page */ + switch (page_type) { + case HEAD_PAGE: + return *bitmap_pattern < 1 || *bitmap_pattern > 4; + case TAIL_PAGE: + return *bitmap_pattern < 5; + case BLOB_PAGE: + return *bitmap_pattern != 7; + default: + break; + } + DBUG_ASSERT(0); + return 1; +} + + +/** + @brief create the first bitmap page of a freshly created data file + + @param share table's share + + @return Operation status + @retval 0 OK + @retval !=0 Error +*/ + +int _ma_bitmap_create_first(MARIA_SHARE *share) +{ + uint block_size= share->bitmap.block_size; + File file= share->bitmap.file.file; + uchar marker[CRC_SIZE]; + + /* + Next write operation of the page will write correct CRC + if it is needed + */ + int4store(marker, MARIA_NO_CRC_BITMAP_PAGE); + + if (mysql_file_chsize(file, block_size - sizeof(marker), + 0, MYF(MY_WME)) || + my_pwrite(file, marker, sizeof(marker), + block_size - sizeof(marker), + MYF(MY_NABP | MY_WME))) + return 1; + share->state.state.data_file_length= block_size; + _ma_bitmap_delete_all(share); + return 0; +} + + +/** + @brief Pagecache callback to get the TRANSLOG_ADDRESS to flush up to, when a + bitmap page needs to be flushed. + + @param page Page's content + @param page_no Page's number (<offset>/<page length>) + @param data_ptr Callback data pointer (pointer to MARIA_SHARE) + + @retval TRANSLOG_ADDRESS to flush up to. +*/ + +static my_bool +flush_log_for_bitmap(PAGECACHE_IO_HOOK_ARGS *args __attribute__ ((unused))) +{ +#ifdef DBUG_ASSERT_EXISTS + const MARIA_SHARE *share= (MARIA_SHARE*)args->data; +#endif + DBUG_ENTER("flush_log_for_bitmap"); + DBUG_ASSERT(share->now_transactional); + /* + WAL imposes that UNDOs reach disk before bitmap is flushed. We don't know + the LSN of the last UNDO about this bitmap page, so we flush whole log. + */ + DBUG_RETURN(translog_flush(translog_get_horizon())); +} + + +/** + @brief Set callbacks for bitmap pages + + @note + We don't use pagecache_file_init here, as we want to keep the + code readable +*/ + +void _ma_bitmap_set_pagecache_callbacks(PAGECACHE_FILE *file, + MARIA_SHARE *share) +{ + pagecache_file_set_null_hooks(file); + file->callback_data= (uchar*) share; + file->flush_log_callback= maria_flush_log_for_page_none; + file->post_write_hook= maria_page_write_failure; + + if (share->temporary) + { + file->post_read_hook= &maria_page_crc_check_none; + file->pre_write_hook= &maria_page_filler_set_none; + } + else + { + file->post_read_hook= &maria_page_crc_check_bitmap; + if (share->options & HA_OPTION_PAGE_CHECKSUM) + file->pre_write_hook= &maria_page_crc_set_normal; + else + file->pre_write_hook= &maria_page_filler_set_bitmap; + if (share->now_transactional) + file->flush_log_callback= flush_log_for_bitmap; + } +} + + +/** + Extends data file with zeroes and creates new bitmap pages into page cache. + + Writes all bitmap pages in [from, to]. + + Non-bitmap pages of zeroes are correct as they are marked empty in + bitmaps. Bitmap pages will not be zeroes: they will get their CRC fixed when + flushed. And if there is a crash before flush (so they are zeroes at + restart), a REDO will re-create them in page cache. +*/ + +static my_bool +_ma_bitmap_create_missing_into_pagecache(MARIA_SHARE *share, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t from, + pgcache_page_no_t to, + uchar *zeroes) +{ + pgcache_page_no_t i; + /* + We do not use my_chsize() because there can be a race between when it + reads the physical size and when it writes (assume data_file_length is 10, + physical length is 8 and two data pages are in cache, and here we do a + my_chsize: my_chsize sees physical length is 8, then the two data pages go + to disk then my_chsize writes from page 8 and so overwrites the two data + pages, wrongly). + We instead rely on the filesystem filling gaps with zeroes. + */ + for (i= from; i <= to; i+= bitmap->pages_covered) + { + /** + No need to keep them pinned, they are new so flushable. + @todo but we may want to keep them pinned, as an optimization: if they + are not pinned they may go to disk before the data pages go (so, the + physical pages would be in non-ascending "sparse" order on disk), or the + filesystem may fill gaps with zeroes physically which is a waste of + time. + */ + if (pagecache_write(share->pagecache, + &bitmap->file, i, 0, + zeroes, PAGECACHE_PLAIN_PAGE, + PAGECACHE_LOCK_LEFT_UNLOCKED, + PAGECACHE_PIN_LEFT_UNPINNED, + PAGECACHE_WRITE_DELAY, 0, LSN_IMPOSSIBLE)) + goto err; + } + /* + Data pages after data_file_length are full of zeroes but that is allowed + as they are marked empty in the bitmap. + */ + return FALSE; +err: + _ma_set_fatal_error_with_share(share, my_errno); + return TRUE; +} + + +/** + Creates missing bitmaps when we extend the data file. + + At run-time, when we need a new bitmap page we come here; and only one bitmap + page at a time is created. + + In some recovery cases we insert at a large offset in the data file, way + beyond state.data_file_length, so can need to create more than one bitmap + page in one go. Known case is: + Start a transaction in Maria; + delete last row of very large table (with delete_row) + do a bulk insert + crash + Then UNDO_BULK_INSERT will truncate table files, and + UNDO_ROW_DELETE will want to put the row back to its original position, + extending the data file a lot: bitmap page*s* in the hole must be created, + or he table would look corrupted. + + We need to log REDOs for bitmap creation, consider: we apply a REDO for a + data page, which creates the first data page covered by a new bitmap + not yet created. If the data page is flushed but the bitmap page is not and + there is a crash, re-execution of the REDO will complain about the zeroed + bitmap page (see it as corruption). Thus a REDO is needed to re-create the + bitmap. + + @param info Maria handler + @param bitmap Bitmap handler + @param page Last bitmap page to create + + @note When this function is called this must be true: + ((page + 1) * bitmap->block_size > info->s->state.state.data_file_length) + +*/ + +static my_bool _ma_bitmap_create_missing(MARIA_HA *info, + MARIA_FILE_BITMAP *bitmap, + pgcache_page_no_t page) +{ + MARIA_SHARE *share= info->s; + uint block_size= bitmap->block_size; + pgcache_page_no_t from, to; + my_off_t data_file_length= share->state.state.data_file_length; + DBUG_ENTER("_ma_bitmap_create_missing"); + DBUG_PRINT("enter", ("page: %lld", (longlong) page)); + + /* First (in offset order) bitmap page to create */ + if (data_file_length < block_size) + goto err; /* corrupted, should have first bitmap page */ + if (page * block_size >= share->base.max_data_file_length) + { + my_errno= HA_ERR_RECORD_FILE_FULL; + goto err; + } + + from= (data_file_length / block_size - 1) / bitmap->pages_covered + 1; + from*= bitmap->pages_covered; + /* + page>=from because: + (page + 1) * bs > dfl, and page == k * pc so: + (k * pc + 1) * bs > dfl; k * pc + 1 > dfl / bs; k * pc > dfl / bs - 1 + k > (dfl / bs - 1) / pc; k >= (dfl / bs - 1) / pc + 1 + k * pc >= ((dfl / bs - 1) / pc + 1) * pc == from. + */ + DBUG_ASSERT(page >= from); + + if (share->now_transactional) + { + LSN lsn; + uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE * 2]; + LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; + page_store(log_data + FILEID_STORE_SIZE, from); + page_store(log_data + FILEID_STORE_SIZE + PAGE_STORE_SIZE, page); + log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; + log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); + /* + We don't use info->trn so that this REDO is always executed even though + the UNDO does not reach disk due to crash. This is also consistent with + the fact that the new bitmap pages are not pinned. + */ + if (translog_write_record(&lsn, LOGREC_REDO_BITMAP_NEW_PAGE, + &dummy_transaction_object, info, + (translog_size_t)sizeof(log_data), + TRANSLOG_INTERNAL_PARTS + 1, log_array, + log_data, NULL)) + goto err; + /* + No need to flush the log: the bitmap pages we are going to create will + flush it when they go to disk. + */ + } + + /* + Last bitmap page. It has special creation: will go to the page cache + only later as we are going to modify it very soon. + */ + bzero(bitmap->map, bitmap->block_size); + bitmap->used_size= bitmap->full_head_size= bitmap->full_tail_size= 0; + bitmap->changed=1; +#ifndef DBUG_OFF + /* + Make a copy of the page to be able to print out bitmap changes during + debugging + */ + memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); +#endif + + /* Last bitmap page to create before 'page' */ + DBUG_ASSERT(page >= bitmap->pages_covered); + to= page - bitmap->pages_covered; + /* + In run-time situations, from>=to is always false, i.e. we always create + one bitmap at a time ('page'). + */ + if ((from <= to) && + _ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to, + bitmap->map)) + goto err; + + share->state.state.data_file_length= (page + 1) * bitmap->block_size; + + DBUG_RETURN(FALSE); +err: + DBUG_RETURN(TRUE); +} + + +my_bool _ma_apply_redo_bitmap_new_page(MARIA_HA *info, + LSN lsn __attribute__ ((unused)), + const uchar *header) +{ + MARIA_SHARE *share= info->s; + MARIA_FILE_BITMAP *bitmap= &share->bitmap; + my_bool error; + pgcache_page_no_t from, to, min_from; + DBUG_ENTER("_ma_apply_redo_bitmap_new_page"); + + from= page_korr(header); + to= page_korr(header + PAGE_STORE_SIZE); + DBUG_PRINT("info", ("from: %lu to: %lu", (ulong)from, (ulong)to)); + if ((from > to) || + (from % bitmap->pages_covered) != 0 || + (to % bitmap->pages_covered) != 0) + { + error= TRUE; /* corrupted log record */ + goto err; + } + + min_from= (share->state.state.data_file_length / bitmap->block_size - 1) / + bitmap->pages_covered + 1; + min_from*= bitmap->pages_covered; + if (from < min_from) + { + DBUG_PRINT("info", ("overwrite bitmap pages from %lu", (ulong)min_from)); + /* + We have to overwrite. It could be that there was a bitmap page in + memory, covering a data page which went to disk, then crash: the + bitmap page is now full of zeros and is ==min_from, we have to overwrite + it with correct checksum. + */ + } + share->state.changed|= STATE_CHANGED; + bzero(info->buff, bitmap->block_size); + if (!(error= + _ma_bitmap_create_missing_into_pagecache(share, bitmap, from, to, + info->buff))) + share->state.state.data_file_length= (to + 1) * bitmap->block_size; + +err: + DBUG_RETURN(error); +} |