summaryrefslogtreecommitdiffstats
path: root/storage/maria/ma_create.c
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-04 18:00:34 +0000
commit3f619478f796eddbba6e39502fe941b285dd97b1 (patch)
treee2c7b5777f728320e5b5542b6213fd3591ba51e2 /storage/maria/ma_create.c
parentInitial commit. (diff)
downloadmariadb-upstream.tar.xz
mariadb-upstream.zip
Adding upstream version 1:10.11.6.upstream/1%10.11.6upstream
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'storage/maria/ma_create.c')
-rw-r--r--storage/maria/ma_create.c1526
1 files changed, 1526 insertions, 0 deletions
diff --git a/storage/maria/ma_create.c b/storage/maria/ma_create.c
new file mode 100644
index 00000000..7fd739d1
--- /dev/null
+++ b/storage/maria/ma_create.c
@@ -0,0 +1,1526 @@
+/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
+
+/* Create a MARIA table */
+
+#include "ma_ftdefs.h"
+#include "ma_sp_defs.h"
+#include <my_bit.h>
+#include "ma_blockrec.h"
+#include "trnman_public.h"
+#include "trnman.h"
+#include "ma_crypt.h"
+
+#ifdef _WIN32
+#include <fcntl.h>
+#endif
+#include <m_ctype.h>
+
+static int compare_columns(MARIA_COLUMNDEF **a, MARIA_COLUMNDEF **b);
+
+
+static ulonglong update_tot_length(ulonglong tot_length, ulonglong max_rows, uint length)
+{
+ ulonglong tot_length_part;
+
+ if (tot_length == ULONGLONG_MAX)
+ return ULONGLONG_MAX;
+
+ tot_length_part= (max_rows/(ulong) ((maria_block_size -
+ MAX_KEYPAGE_HEADER_SIZE - KEYPAGE_CHECKSUM_SIZE)/
+ (length*2)));
+ if (tot_length_part >= ULONGLONG_MAX / maria_block_size)
+ return ULONGLONG_MAX;
+
+ if (tot_length > ULONGLONG_MAX - tot_length_part * maria_block_size)
+ return ULONGLONG_MAX;
+
+ return tot_length + tot_length_part * maria_block_size;
+}
+
+
+/*
+ Old options is used when recreating database, from maria_chk
+*/
+
+int maria_create(const char *name, enum data_file_type datafile_type,
+ uint keys,MARIA_KEYDEF *keydefs,
+ uint columns, MARIA_COLUMNDEF *columndef,
+ uint uniques, MARIA_UNIQUEDEF *uniquedefs,
+ MARIA_CREATE_INFO *ci,uint flags)
+{
+ uint i,j;
+ File UNINIT_VAR(dfile), UNINIT_VAR(file);
+ int errpos,save_errno, create_mode= O_RDWR | O_TRUNC, res;
+ myf create_flag, common_flag= MY_WME, sync_dir= 0;
+ uint length,max_key_length,packed,pack_bytes,pointer,real_length_diff,
+ key_length,info_length,key_segs,options,min_key_length,
+ base_pos,long_varchar_count,
+ unique_key_parts,fulltext_keys,offset, not_block_record_extra_length;
+ uint max_field_lengths, extra_header_size, column_nr;
+ uint internal_table= flags & HA_CREATE_INTERNAL_TABLE;
+ ulong reclength, real_reclength,min_pack_length;
+ char kfilename[FN_REFLEN], klinkname[FN_REFLEN], *klinkname_ptr= 0;
+ char dfilename[FN_REFLEN], dlinkname[FN_REFLEN], *dlinkname_ptr= 0;
+ ulong pack_reclength;
+ ulonglong tot_length,max_rows, tmp;
+ enum en_fieldtype type;
+ enum data_file_type org_datafile_type= datafile_type;
+ MARIA_SHARE share;
+ TRN tmp_transaction_object;
+ MARIA_KEYDEF *keydef,tmp_keydef;
+ MARIA_UNIQUEDEF *uniquedef;
+ HA_KEYSEG *keyseg,tmp_keyseg;
+ MARIA_COLUMNDEF *column, *end_column;
+ double *rec_per_key_part;
+ ulong *nulls_per_key_part;
+ uint16 *column_array;
+ my_off_t key_root[HA_MAX_POSSIBLE_KEY], kfile_size_before_extension;
+ MARIA_CREATE_INFO tmp_create_info;
+ my_bool tmp_table= FALSE; /* cache for presence of HA_OPTION_TMP_TABLE */
+ my_bool forced_packed;
+ uchar *log_data= NULL;
+ my_bool encrypted= ci->encrypted && datafile_type == BLOCK_RECORD;
+ my_bool insert_order= MY_TEST(flags & HA_PRESERVE_INSERT_ORDER);
+ uint crypt_page_header_space= 0;
+ DBUG_ENTER("maria_create");
+ DBUG_PRINT("enter", ("keys: %u columns: %u uniques: %u flags: %u",
+ keys, columns, uniques, flags));
+
+ DBUG_ASSERT(maria_inited);
+
+ if (flags & HA_CREATE_TMP_TABLE)
+ common_flag|= MY_THREAD_SPECIFIC;
+
+ if (!ci)
+ {
+ bzero((char*) &tmp_create_info,sizeof(tmp_create_info));
+ ci=&tmp_create_info;
+ }
+
+ if (keys + uniques > MARIA_MAX_KEY)
+ {
+ DBUG_RETURN(my_errno=HA_WRONG_CREATE_OPTION);
+ }
+ errpos=0;
+ options=0;
+ bzero((uchar*) &share,sizeof(share));
+
+ if (flags & HA_DONT_TOUCH_DATA)
+ {
+ /* We come here from recreate table */
+ org_datafile_type= ci->org_data_file_type;
+ if (!(ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD))
+ options= (ci->old_options &
+ (HA_OPTION_COMPRESS_RECORD | HA_OPTION_PACK_RECORD |
+ HA_OPTION_READ_ONLY_DATA | HA_OPTION_CHECKSUM |
+ HA_OPTION_TMP_TABLE | HA_OPTION_DELAY_KEY_WRITE |
+ HA_OPTION_LONG_BLOB_PTR | HA_OPTION_PAGE_CHECKSUM));
+ else
+ {
+ /* Uncompressing rows */
+ options= (ci->old_options &
+ (HA_OPTION_CHECKSUM | HA_OPTION_TMP_TABLE |
+ HA_OPTION_DELAY_KEY_WRITE | HA_OPTION_LONG_BLOB_PTR |
+ HA_OPTION_PAGE_CHECKSUM));
+ }
+ }
+ else
+ {
+ /* Transactional tables must be of type BLOCK_RECORD */
+ if (ci->transactional)
+ datafile_type= BLOCK_RECORD;
+ }
+
+ if (!(rec_per_key_part=
+ (double*) my_malloc(PSI_INSTRUMENT_ME,
+ (keys + uniques)*HA_MAX_KEY_SEG*sizeof(double) +
+ (keys + uniques)*HA_MAX_KEY_SEG*sizeof(ulong) +
+ sizeof(uint16) * columns,
+ MYF(common_flag | MY_ZEROFILL))))
+ DBUG_RETURN(my_errno);
+ nulls_per_key_part= (ulong*) (rec_per_key_part +
+ (keys + uniques) * HA_MAX_KEY_SEG);
+ column_array= (uint16*) (nulls_per_key_part +
+ (keys + uniques) * HA_MAX_KEY_SEG);
+
+
+ /* Start by checking fields and field-types used */
+ long_varchar_count=packed= not_block_record_extra_length=
+ pack_reclength= max_field_lengths= 0;
+ reclength= min_pack_length= ci->null_bytes;
+ forced_packed= 0;
+ column_nr= 0;
+
+ if (encrypted)
+ {
+ DBUG_ASSERT(datafile_type == BLOCK_RECORD);
+ crypt_page_header_space= ma_crypt_get_data_page_header_space();
+ }
+
+ for (column= columndef, end_column= column + columns ;
+ column != end_column ;
+ column++)
+ {
+ /* Fill in not used struct parts */
+ column->column_nr= column_nr++;
+ column->offset= reclength;
+ column->empty_pos= 0;
+ column->empty_bit= 0;
+ column->fill_length= column->length;
+ if (column->null_bit)
+ options|= HA_OPTION_NULL_FIELDS;
+
+ reclength+= column->length;
+ type= column->type;
+ if (datafile_type == BLOCK_RECORD)
+ {
+ if (type == FIELD_SKIP_PRESPACE)
+ type= column->type= FIELD_NORMAL; /* SKIP_PRESPACE not supported */
+ if (type == FIELD_NORMAL &&
+ column->length > FULL_PAGE_SIZE2(maria_block_size,
+ crypt_page_header_space))
+ {
+ /* FIELD_NORMAL can't be split over many blocks, convert to a CHAR */
+ type= column->type= FIELD_SKIP_ENDSPACE;
+ }
+ }
+
+ if (type != FIELD_NORMAL && type != FIELD_CHECK)
+ {
+ column->empty_pos= packed/8;
+ column->empty_bit= (1 << (packed & 7));
+ if (type == FIELD_BLOB)
+ {
+ forced_packed= 1;
+ packed++;
+ share.base.blobs++;
+ if (pack_reclength != INT_MAX32)
+ {
+ if (column->length == 4+portable_sizeof_char_ptr)
+ pack_reclength= INT_MAX32;
+ else
+ {
+ /* Add max possible blob length */
+ pack_reclength+= (1 << ((column->length-
+ portable_sizeof_char_ptr)*8));
+ }
+ }
+ max_field_lengths+= (column->length - portable_sizeof_char_ptr);
+ }
+ else if (type == FIELD_SKIP_PRESPACE ||
+ type == FIELD_SKIP_ENDSPACE)
+ {
+ forced_packed= 1;
+ max_field_lengths+= column->length > 255 ? 2 : 1;
+ not_block_record_extra_length++;
+ packed++;
+ }
+ else if (type == FIELD_VARCHAR)
+ {
+ pack_reclength++;
+ not_block_record_extra_length++;
+ max_field_lengths++;
+ if (datafile_type != DYNAMIC_RECORD)
+ packed++;
+ column->fill_length= 1;
+ options|= HA_OPTION_NULL_FIELDS; /* Use ma_checksum() */
+
+ /* We must test for 257 as length includes pack-length */
+ if (MY_TEST(column->length >= 257))
+ {
+ long_varchar_count++;
+ max_field_lengths++;
+ column->fill_length= 2;
+ }
+ }
+ else if (type == FIELD_SKIP_ZERO)
+ packed++;
+ else
+ {
+ if (!column->null_bit)
+ min_pack_length+= column->length;
+ else
+ {
+ /* Only BLOCK_RECORD skips NULL fields for all field values */
+ not_block_record_extra_length+= column->length;
+ }
+ column->empty_pos= 0;
+ column->empty_bit= 0;
+ }
+ }
+ else /* FIELD_NORMAL */
+ {
+ if (!column->null_bit)
+ {
+ min_pack_length+= column->length;
+ share.base.fixed_not_null_fields++;
+ share.base.fixed_not_null_fields_length+= column->length;
+ }
+ else
+ not_block_record_extra_length+= column->length;
+ }
+ }
+
+ if (datafile_type == STATIC_RECORD && forced_packed)
+ {
+ /* Can't use fixed length records, revert to block records */
+ datafile_type= BLOCK_RECORD;
+ }
+
+ if (datafile_type == NO_RECORD && uniques)
+ {
+ /* Can't do unique without data, revert to block records */
+ datafile_type= BLOCK_RECORD;
+ }
+
+ if (encrypted)
+ {
+ /*
+ datafile_type is set (finally?)
+ update encryption that is only supported for BLOCK_RECORD
+ */
+ if (datafile_type != BLOCK_RECORD)
+ {
+ encrypted= FALSE;
+ crypt_page_header_space= 0;
+ }
+ }
+
+ if (datafile_type == DYNAMIC_RECORD)
+ options|= HA_OPTION_PACK_RECORD; /* Must use packed records */
+
+ if (datafile_type == STATIC_RECORD || datafile_type == NO_RECORD)
+ {
+ /* We can't use checksum with static length rows */
+ flags&= ~HA_CREATE_CHECKSUM;
+ options&= ~HA_OPTION_CHECKSUM;
+ min_pack_length= reclength;
+ packed= 0;
+ }
+ else if (datafile_type != BLOCK_RECORD)
+ min_pack_length+= not_block_record_extra_length;
+ else
+ min_pack_length+= 5; /* Min row overhead */
+
+ if (flags & HA_CREATE_TMP_TABLE)
+ {
+ options|= HA_OPTION_TMP_TABLE;
+ tmp_table= TRUE;
+ create_mode|= O_NOFOLLOW | (internal_table ? 0 : O_EXCL);
+ /* "CREATE TEMPORARY" tables are not crash-safe (dropped at restart) */
+ ci->transactional= FALSE;
+ flags&= ~HA_CREATE_PAGE_CHECKSUM;
+ }
+ share.base.null_bytes= ci->null_bytes;
+ share.base.original_null_bytes= ci->null_bytes;
+ share.base.born_transactional= ci->transactional;
+ share.base.max_field_lengths= max_field_lengths;
+ share.base.field_offsets= 0; /* for future */
+ share.base.compression_algorithm= ci->compression_algorithm;
+ share.base.s3_block_size= ci->s3_block_size;
+
+ if (flags & HA_CREATE_CHECKSUM || (options & HA_OPTION_CHECKSUM))
+ {
+ options|= HA_OPTION_CHECKSUM;
+ min_pack_length++;
+ pack_reclength++;
+ }
+ if (pack_reclength < INT_MAX32)
+ pack_reclength+= max_field_lengths + long_varchar_count;
+ else
+ pack_reclength= INT_MAX32;
+
+ if (flags & HA_CREATE_DELAY_KEY_WRITE)
+ options|= HA_OPTION_DELAY_KEY_WRITE;
+ if (flags & HA_CREATE_RELIES_ON_SQL_LAYER)
+ options|= HA_OPTION_RELIES_ON_SQL_LAYER;
+ if (flags & HA_CREATE_PAGE_CHECKSUM)
+ options|= HA_OPTION_PAGE_CHECKSUM;
+
+ pack_bytes= (packed + 7) / 8;
+ if (pack_reclength != INT_MAX32)
+ pack_reclength+= reclength+pack_bytes +
+ MY_TEST(test_all_bits(options, HA_OPTION_CHECKSUM |
+ HA_OPTION_PACK_RECORD));
+ min_pack_length+= pack_bytes;
+ /* Calculate min possible row length for rows-in-block */
+ extra_header_size= MAX_FIXED_HEADER_SIZE;
+ if (ci->transactional)
+ {
+ extra_header_size= TRANS_MAX_FIXED_HEADER_SIZE;
+ DBUG_PRINT("info",("creating a transactional table"));
+ }
+ share.base.min_block_length= (extra_header_size + share.base.null_bytes +
+ pack_bytes);
+ if (!ci->data_file_length && ci->max_rows)
+ {
+ set_if_bigger(ci->max_rows, ci->reloc_rows);
+ if (pack_reclength == INT_MAX32 ||
+ (~(ulonglong) 0)/ci->max_rows < (ulonglong) pack_reclength)
+ ci->data_file_length= ~(ulonglong) 0;
+ else
+ {
+ ci->data_file_length= _ma_safe_mul(ci->max_rows, pack_reclength);
+ if (datafile_type == BLOCK_RECORD)
+ {
+ /* Assume that blocks are only half full (very pessimistic!) */
+ ci->data_file_length= _ma_safe_mul(ci->data_file_length, 2);
+ set_if_bigger(ci->data_file_length, maria_block_size*2);
+ }
+ }
+ }
+ else if (!ci->max_rows)
+ {
+ if (datafile_type == BLOCK_RECORD)
+ {
+ uint rows_per_page=
+ ((maria_block_size - PAGE_OVERHEAD_SIZE_RAW - crypt_page_header_space)
+ / (min_pack_length + extra_header_size + DIR_ENTRY_SIZE));
+ ulonglong data_file_length= ci->data_file_length;
+ if (!data_file_length)
+ data_file_length= ((((ulonglong) 1 << ((BLOCK_RECORD_POINTER_SIZE-1) *
+ 8))/2 -1) * maria_block_size);
+ if (rows_per_page > 0)
+ {
+ set_if_smaller(rows_per_page, MAX_ROWS_PER_PAGE);
+ ci->max_rows= (data_file_length / maria_block_size+1) * rows_per_page;
+ }
+ else
+ ci->max_rows= data_file_length / (min_pack_length +
+ extra_header_size +
+ DIR_ENTRY_SIZE);
+ }
+ else
+ ci->max_rows=(ha_rows) (ci->data_file_length/(min_pack_length +
+ ((options &
+ HA_OPTION_PACK_RECORD) ?
+ 3 : 0)));
+ set_if_smaller(ci->reloc_rows, ci->max_rows);
+ }
+ max_rows= (ulonglong) ci->max_rows;
+ if (datafile_type == BLOCK_RECORD)
+ {
+ /*
+ The + 1 is for record position withing page
+ The * 2 is because we need one bit for knowing if there is transid's
+ after the row pointer
+ */
+ pointer= maria_get_pointer_length((ci->data_file_length /
+ maria_block_size) * 2, 4) + 1;
+ set_if_smaller(pointer, BLOCK_RECORD_POINTER_SIZE);
+
+ if (!max_rows)
+ max_rows= (((((ulonglong) 1 << ((pointer-1)*8)) -1) * maria_block_size) /
+ min_pack_length / 2);
+ }
+ else
+ {
+ if (datafile_type == NO_RECORD)
+ pointer= 0;
+ else if (datafile_type != STATIC_RECORD)
+ pointer= maria_get_pointer_length(ci->data_file_length,
+ maria_data_pointer_size);
+ else
+ pointer= maria_get_pointer_length(ci->max_rows, maria_data_pointer_size);
+ if (!max_rows)
+ max_rows= ((((ulonglong) 1 << (pointer*8)) -1) / min_pack_length);
+ }
+
+ real_reclength=reclength;
+ if (datafile_type == STATIC_RECORD)
+ {
+ if (reclength <= pointer)
+ reclength=pointer+1; /* reserve place for delete link */
+ }
+ else
+ reclength+= long_varchar_count; /* We need space for varchar! */
+
+ max_key_length=0; tot_length=0 ; key_segs=0;
+ fulltext_keys=0;
+ share.state.rec_per_key_part= rec_per_key_part;
+ share.state.nulls_per_key_part= nulls_per_key_part;
+ share.state.key_root=key_root;
+ share.state.key_del= HA_OFFSET_ERROR;
+ if (uniques)
+ max_key_length= MARIA_UNIQUE_HASH_LENGTH + pointer;
+
+ for (i=0, keydef=keydefs ; i < keys ; i++ , keydef++)
+ {
+ share.state.key_root[i]= HA_OFFSET_ERROR;
+ length= real_length_diff= 0;
+ min_key_length= key_length= pointer;
+
+ if (keydef->key_alg == HA_KEY_ALG_RTREE)
+ keydef->flag|= HA_RTREE_INDEX; /* For easier tests */
+
+ if (keydef->flag & HA_SPATIAL)
+ {
+#ifdef HAVE_SPATIAL
+ /* BAR TODO to support 3D and more dimensions in the future */
+ uint sp_segs=SPDIMS*2;
+ keydef->flag=HA_SPATIAL;
+
+ if (flags & HA_DONT_TOUCH_DATA)
+ {
+ /*
+ Called by maria_chk - i.e. table structure was taken from
+ MYI file and SPATIAL key *does have* additional sp_segs keysegs.
+ keydef->seg here points right at the GEOMETRY segment,
+ so we only need to decrease keydef->keysegs.
+ (see maria_recreate_table() in _ma_check.c)
+ */
+ keydef->keysegs-=sp_segs-1;
+ }
+
+ for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
+ j++, keyseg++)
+ {
+ if (keyseg->type != HA_KEYTYPE_BINARY &&
+ keyseg->type != HA_KEYTYPE_VARBINARY1 &&
+ keyseg->type != HA_KEYTYPE_VARBINARY2)
+ {
+ my_errno=HA_WRONG_CREATE_OPTION;
+ goto err_no_lock;
+ }
+ }
+ keydef->keysegs+=sp_segs;
+ key_length+=SPLEN*sp_segs;
+ length++; /* At least one length uchar */
+ min_key_length++;
+#else
+ my_errno= HA_ERR_UNSUPPORTED;
+ goto err_no_lock;
+#endif /*HAVE_SPATIAL*/
+ }
+ else if (keydef->flag & HA_FULLTEXT)
+ {
+ keydef->flag=HA_FULLTEXT | HA_PACK_KEY | HA_VAR_LENGTH_KEY;
+ options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
+
+ for (j=0, keyseg=keydef->seg ; (int) j < keydef->keysegs ;
+ j++, keyseg++)
+ {
+ if (keyseg->type != HA_KEYTYPE_TEXT &&
+ keyseg->type != HA_KEYTYPE_VARTEXT1 &&
+ keyseg->type != HA_KEYTYPE_VARTEXT2)
+ {
+ my_errno=HA_WRONG_CREATE_OPTION;
+ goto err_no_lock;
+ }
+ if (!(keyseg->flag & HA_BLOB_PART) &&
+ (keyseg->type == HA_KEYTYPE_VARTEXT1 ||
+ keyseg->type == HA_KEYTYPE_VARTEXT2))
+ {
+ /* Make a flag that this is a VARCHAR */
+ keyseg->flag|= HA_VAR_LENGTH_PART;
+ /* Store in bit_start number of bytes used to pack the length */
+ keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1)?
+ 1 : 2);
+ }
+ }
+
+ fulltext_keys++;
+ key_length+= HA_FT_MAXBYTELEN+HA_FT_WLEN;
+ length++; /* At least one length uchar */
+ min_key_length+= 1 + HA_FT_WLEN;
+ real_length_diff=HA_FT_MAXBYTELEN-FT_MAX_WORD_LEN_FOR_SORT;
+ }
+ else
+ {
+ /* Test if prefix compression */
+ if (keydef->flag & HA_PACK_KEY)
+ {
+ /* Can't use space_compression on number keys */
+ if ((keydef->seg[0].flag & HA_SPACE_PACK) &&
+ keydef->seg[0].type == (int) HA_KEYTYPE_NUM)
+ keydef->seg[0].flag&= ~HA_SPACE_PACK;
+
+ /* Only use HA_PACK_KEY when first segment is a variable length key */
+ if (!(keydef->seg[0].flag & (HA_SPACE_PACK | HA_BLOB_PART |
+ HA_VAR_LENGTH_PART)))
+ {
+ /* pack relative to previous key */
+ keydef->flag&= ~HA_PACK_KEY;
+ keydef->flag|= HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY;
+ }
+ else
+ {
+ keydef->seg[0].flag|=HA_PACK_KEY; /* for easyer intern test */
+ keydef->flag|=HA_VAR_LENGTH_KEY;
+ options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
+ }
+ }
+ if (keydef->flag & HA_BINARY_PACK_KEY)
+ options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
+
+ if (keydef->flag & HA_AUTO_KEY && ci->with_auto_increment)
+ share.base.auto_key=i+1;
+ for (j=0, keyseg=keydef->seg ; j < keydef->keysegs ; j++, keyseg++)
+ {
+ /* numbers are stored with high by first to make compression easier */
+ switch (keyseg->type) {
+ case HA_KEYTYPE_SHORT_INT:
+ case HA_KEYTYPE_LONG_INT:
+ case HA_KEYTYPE_FLOAT:
+ case HA_KEYTYPE_DOUBLE:
+ case HA_KEYTYPE_USHORT_INT:
+ case HA_KEYTYPE_ULONG_INT:
+ case HA_KEYTYPE_LONGLONG:
+ case HA_KEYTYPE_ULONGLONG:
+ case HA_KEYTYPE_INT24:
+ case HA_KEYTYPE_UINT24:
+ case HA_KEYTYPE_INT8:
+ keyseg->flag|= HA_SWAP_KEY;
+ break;
+ case HA_KEYTYPE_VARTEXT1:
+ case HA_KEYTYPE_VARTEXT2:
+ case HA_KEYTYPE_VARBINARY1:
+ case HA_KEYTYPE_VARBINARY2:
+ if (!(keyseg->flag & HA_BLOB_PART))
+ {
+ /* Make a flag that this is a VARCHAR */
+ keyseg->flag|= HA_VAR_LENGTH_PART;
+ /* Store in bit_start number of bytes used to pack the length */
+ keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
+ keyseg->type == HA_KEYTYPE_VARBINARY1) ?
+ 1 : 2);
+ }
+ break;
+ default:
+ break;
+ }
+ if (keyseg->flag & HA_SPACE_PACK)
+ {
+ DBUG_ASSERT(!(keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
+ keydef->flag |= HA_SPACE_PACK_USED | HA_VAR_LENGTH_KEY;
+ options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
+ length++; /* At least one length uchar */
+ if (!keyseg->null_bit)
+ min_key_length++;
+ key_length+= keyseg->length;
+ if (keyseg->length >= 255)
+ {
+ /* prefix may be 3 bytes */
+ length+= 2;
+ }
+ }
+ else if (keyseg->flag & (HA_VAR_LENGTH_PART | HA_BLOB_PART))
+ {
+ DBUG_ASSERT(!test_all_bits(keyseg->flag,
+ (HA_VAR_LENGTH_PART | HA_BLOB_PART)));
+ keydef->flag|=HA_VAR_LENGTH_KEY;
+ length++; /* At least one length uchar */
+ if (!keyseg->null_bit)
+ min_key_length++;
+ options|=HA_OPTION_PACK_KEYS; /* Using packed keys */
+ key_length+= keyseg->length;
+ if (keyseg->length >= 255)
+ {
+ /* prefix may be 3 bytes */
+ length+= 2;
+ }
+ }
+ else
+ {
+ key_length+= keyseg->length;
+ if (!keyseg->null_bit)
+ min_key_length+= keyseg->length;
+ }
+ if (keyseg->null_bit)
+ {
+ key_length++;
+ /* min key part is 1 byte */
+ min_key_length++;
+ options|=HA_OPTION_PACK_KEYS;
+ keyseg->flag|=HA_NULL_PART;
+ keydef->flag|=HA_VAR_LENGTH_KEY | HA_NULL_PART_KEY;
+ }
+ }
+ } /* if HA_FULLTEXT */
+ key_segs+=keydef->keysegs;
+ if (keydef->keysegs > HA_MAX_KEY_SEG)
+ {
+ my_errno=HA_WRONG_CREATE_OPTION;
+ goto err_no_lock;
+ }
+ /*
+ key_segs may be 0 in the case when we only want to be able to
+ add on row into the table. This can happen with some DISTINCT queries
+ in MySQL
+ */
+ if ((keydef->flag & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME &&
+ key_segs)
+ share.state.rec_per_key_part[key_segs-1]=1L;
+ length+=key_length;
+ /*
+ A key can't be longer than than half a index block (as we have
+ to be able to put at least 2 keys on an index block for the key
+ algorithms to work).
+ */
+ if (length > _ma_max_key_length())
+ {
+ my_errno=HA_WRONG_CREATE_OPTION;
+ goto err_no_lock;
+ }
+ keydef->block_length= (uint16) maria_block_size;
+ keydef->keylength= (uint16) key_length;
+ keydef->minlength= (uint16) min_key_length;
+ keydef->maxlength= (uint16) length;
+
+ if (length > max_key_length)
+ max_key_length= length;
+
+ tot_length= update_tot_length(tot_length, max_rows, length);
+ }
+
+ unique_key_parts=0;
+ for (i=0, uniquedef=uniquedefs ; i < uniques ; i++ , uniquedef++)
+ {
+ uniquedef->key=keys+i;
+ unique_key_parts+=uniquedef->keysegs;
+ share.state.key_root[keys+i]= HA_OFFSET_ERROR;
+
+ tot_length= update_tot_length(tot_length, max_rows, MARIA_UNIQUE_HASH_LENGTH + pointer);
+ }
+ keys+=uniques; /* Each unique has 1 key */
+ key_segs+=uniques; /* Each unique has 1 key seg */
+
+ base_pos=(MARIA_STATE_INFO_SIZE + keys * MARIA_STATE_KEY_SIZE +
+ key_segs * MARIA_STATE_KEYSEG_SIZE);
+ info_length= base_pos+(uint) (MARIA_BASE_INFO_SIZE+
+ keys * MARIA_KEYDEF_SIZE+
+ uniques * MARIA_UNIQUEDEF_SIZE +
+ (key_segs + unique_key_parts)*HA_KEYSEG_SIZE+
+ columns*(MARIA_COLUMNDEF_SIZE + 2));
+
+ if (encrypted)
+ {
+ share.base.extra_options|= MA_EXTRA_OPTIONS_ENCRYPTED;
+
+ /* store crypt data in info */
+ info_length+= ma_crypt_get_file_length();
+ }
+
+ if (insert_order)
+ {
+ share.base.extra_options|= MA_EXTRA_OPTIONS_INSERT_ORDER;
+ }
+
+ share.state.state.key_file_length= MY_ALIGN(info_length, maria_block_size);
+ DBUG_PRINT("info", ("info_length: %u", info_length));
+ /* There are only 16 bits for the total header length. */
+ if (share.state.state.key_file_length > 65535)
+ {
+ my_printf_error(HA_WRONG_CREATE_OPTION,
+ "Aria table '%s' has too many columns and/or "
+ "indexes and/or unique constraints.",
+ MYF(0), name + dirname_length(name));
+ my_errno= HA_WRONG_CREATE_OPTION;
+ goto err_no_lock;
+ }
+
+ bmove(share.state.header.file_version, maria_file_magic, 4);
+ ci->old_options=options | (ci->old_options & HA_OPTION_TEMP_COMPRESS_RECORD ?
+ HA_OPTION_COMPRESS_RECORD |
+ HA_OPTION_TEMP_COMPRESS_RECORD: 0);
+ mi_int2store(share.state.header.options,ci->old_options);
+ mi_int2store(share.state.header.header_length,info_length);
+ mi_int2store(share.state.header.state_info_length,MARIA_STATE_INFO_SIZE);
+ mi_int2store(share.state.header.base_info_length,MARIA_BASE_INFO_SIZE);
+ mi_int2store(share.state.header.base_pos,base_pos);
+ share.state.header.data_file_type= share.data_file_type= datafile_type;
+ share.state.header.org_data_file_type= org_datafile_type;
+ share.state.header.not_used= 0;
+
+ share.state.dellink = HA_OFFSET_ERROR;
+ share.state.first_bitmap_with_space= 0;
+#ifdef MARIA_EXTERNAL_LOCKING
+ share.state.process= (ulong) getpid();
+#endif
+ share.state.version= (ulong) time((time_t*) 0);
+ share.state.sortkey= (ushort) ~0;
+ share.state.auto_increment=ci->auto_increment;
+ share.options=options;
+ share.base.rec_reflength=pointer;
+ share.base.block_size= maria_block_size;
+ share.base.language= (ci->language ? ci->language :
+ default_charset_info->number);
+
+ /*
+ Get estimate for index file length (this may be wrong for FT keys)
+ This is used for pointers to other key pages.
+ */
+ tmp= (tot_length / maria_block_size + keys * MARIA_INDEX_BLOCK_MARGIN);
+
+ /*
+ use maximum of key_file_length we calculated and key_file_length value we
+ got from MAI file header (see also mariapack.c:save_state)
+ */
+ share.base.key_reflength=
+ maria_get_pointer_length(MY_MAX(ci->key_file_length,tmp),3);
+ share.base.keys= share.state.header.keys= keys;
+ share.state.header.uniques= uniques;
+ share.state.header.fulltext_keys= fulltext_keys;
+ mi_int2store(share.state.header.key_parts,key_segs);
+ mi_int2store(share.state.header.unique_key_parts,unique_key_parts);
+
+ maria_set_all_keys_active(share.state.key_map, keys);
+
+ share.base.keystart = share.state.state.key_file_length;
+ share.base.max_key_block_length= maria_block_size;
+ share.base.max_key_length=ALIGN_SIZE(max_key_length+4);
+ share.base.records=ci->max_rows;
+ share.base.reloc= ci->reloc_rows;
+ share.base.reclength=real_reclength;
+ share.base.pack_reclength= reclength + MY_TEST(options & HA_OPTION_CHECKSUM);
+ share.base.max_pack_length=pack_reclength;
+ share.base.min_pack_length=min_pack_length;
+ share.base.pack_bytes= pack_bytes;
+ share.base.fields= columns;
+ share.base.pack_fields= packed;
+
+ if (share.data_file_type == BLOCK_RECORD)
+ {
+ /*
+ we are going to create a first bitmap page, set data_file_length
+ to reflect this, before the state goes to disk
+ */
+ share.state.state.data_file_length= maria_block_size;
+ /* Add length of packed fields + length */
+ share.base.pack_reclength+= share.base.max_field_lengths+3;
+ share.base.max_pack_length= share.base.pack_reclength;
+
+ /* Adjust max_pack_length, to be used if we have short rows */
+ if (share.base.max_pack_length < maria_block_size)
+ {
+ share.base.max_pack_length+= FLAG_SIZE;
+ if (ci->transactional)
+ share.base.max_pack_length+= TRANSID_SIZE * 2;
+ }
+ }
+
+ /* max_data_file_length and max_key_file_length are recalculated on open */
+ if (tmp_table)
+ share.base.max_data_file_length= (my_off_t) ci->data_file_length;
+ else if (ci->transactional && translog_status == TRANSLOG_OK &&
+ !maria_in_recovery)
+ {
+ /*
+ we have checked translog_inited above, because maria_chk may call us
+ (via maria_recreate_table()) and it does not have a log.
+ */
+ sync_dir= MY_SYNC_DIR;
+ /*
+ If crash between _ma_state_info_write_sub() and
+ _ma_update_state__lsns_sub(), table should be ignored by Recovery (or
+ old REDOs would fail), so we cannot let LSNs be 0:
+ */
+ share.state.skip_redo_lsn= share.state.is_of_horizon=
+ share.state.create_rename_lsn= LSN_MAX;
+ /*
+ We have to mark the table as not movable as the table will contain the
+ maria_uuid and create_rename_lsn
+ */
+ share.state.changed|= STATE_NOT_MOVABLE;
+ }
+
+ if (datafile_type == DYNAMIC_RECORD)
+ {
+ share.base.min_block_length=
+ (share.base.pack_reclength+3 < MARIA_EXTEND_BLOCK_LENGTH &&
+ ! share.base.blobs) ?
+ MY_MAX(share.base.pack_reclength,MARIA_MIN_BLOCK_LENGTH) :
+ MARIA_EXTEND_BLOCK_LENGTH;
+ }
+ else if (datafile_type == STATIC_RECORD)
+ share.base.min_block_length= share.base.pack_reclength;
+
+ if (! (flags & HA_DONT_TOUCH_DATA))
+ share.state.create_time= time((time_t*) 0);
+
+ if (!internal_table)
+ mysql_mutex_lock(&THR_LOCK_maria);
+
+ /*
+ NOTE: For test_if_reopen() we need a real path name. Hence we need
+ MY_RETURN_REAL_PATH for every fn_format(filename, ...).
+ */
+ if (ci->index_file_name)
+ {
+ char *iext= strrchr(ci->index_file_name, '.');
+ int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
+ if (tmp_table)
+ {
+ char *path;
+ /* chop off the table name, tempory tables use generated name */
+ if ((path= strrchr(ci->index_file_name, FN_LIBCHAR)))
+ *path= '\0';
+ fn_format(kfilename, name, ci->index_file_name, MARIA_NAME_IEXT,
+ MY_REPLACE_DIR | MY_UNPACK_FILENAME |
+ MY_RETURN_REAL_PATH | MY_APPEND_EXT);
+ }
+ else
+ {
+ fn_format(kfilename, ci->index_file_name, "", MARIA_NAME_IEXT,
+ MY_UNPACK_FILENAME | MY_RETURN_REAL_PATH |
+ (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
+ }
+ fn_format(klinkname, name, "", MARIA_NAME_IEXT,
+ MY_UNPACK_FILENAME|MY_APPEND_EXT);
+ klinkname_ptr= klinkname;
+ /*
+ Don't create the table if the link or file exists to ensure that one
+ doesn't accidently destroy another table.
+ Don't sync dir now if the data file has the same path.
+ */
+ create_flag=
+ (ci->data_file_name &&
+ !strcmp(ci->index_file_name, ci->data_file_name)) ? 0 : sync_dir;
+ }
+ else
+ {
+ char *iext= strrchr(name, '.');
+ int have_iext= iext && !strcmp(iext, MARIA_NAME_IEXT);
+ fn_format(kfilename, name, "", MARIA_NAME_IEXT, MY_UNPACK_FILENAME |
+ (internal_table ? 0 : MY_RETURN_REAL_PATH) |
+ (have_iext ? MY_REPLACE_EXT : MY_APPEND_EXT));
+ klinkname_ptr= NullS;
+ /*
+ Replace the current file.
+ Don't sync dir now if the data file has the same path.
+ */
+ create_flag= (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
+ create_flag|= (!ci->data_file_name ? 0 : sync_dir);
+ }
+
+ /*
+ If a MRG_MARIA table is in use, the mapped MARIA tables are open,
+ but no entry is made in the table cache for them.
+ A TRUNCATE command checks for the table in the cache only and could
+ be fooled to believe, the table is not open.
+ Pull the emergency brake in this situation. (Bug #8306)
+
+
+ NOTE: The filename is compared against unique_file_name of every
+ open table. Hence we need a real path here.
+ */
+ if (!internal_table && _ma_test_if_reopen(kfilename))
+ {
+ my_printf_error(HA_ERR_TABLE_EXIST, "Aria table '%s' is in use "
+ "(most likely by a MERGE table). Try FLUSH TABLES.",
+ MYF(0), name + dirname_length(name));
+ my_errno= HA_ERR_TABLE_EXIST;
+ goto err;
+ }
+
+ if ((file= mysql_file_create_with_symlink(key_file_kfile, klinkname_ptr,
+ kfilename, 0, create_mode,
+ MYF(common_flag|create_flag))) < 0)
+ goto err;
+ errpos=1;
+
+ DBUG_PRINT("info", ("write state info and base info"));
+ if (_ma_state_info_write_sub(file, &share.state,
+ MA_STATE_INFO_WRITE_FULL_INFO) ||
+ _ma_base_info_write(file, &share.base))
+ goto err;
+ DBUG_PRINT("info", ("base_pos: %d base_info_size: %d",
+ base_pos, MARIA_BASE_INFO_SIZE));
+ DBUG_ASSERT(mysql_file_tell(file,MYF(0)) == base_pos+ MARIA_BASE_INFO_SIZE);
+
+ /* Write key and keyseg definitions */
+ DBUG_PRINT("info", ("write key and keyseg definitions"));
+ for (i=0 ; i < share.base.keys - uniques; i++)
+ {
+ uint sp_segs=(keydefs[i].flag & HA_SPATIAL) ? 2*SPDIMS : 0;
+
+ if (_ma_keydef_write(file, &keydefs[i]))
+ goto err;
+ for (j=0 ; j < keydefs[i].keysegs-sp_segs ; j++)
+ if (_ma_keyseg_write(file, &keydefs[i].seg[j]))
+ goto err;
+#ifdef HAVE_SPATIAL
+ for (j=0 ; j < sp_segs ; j++)
+ {
+ HA_KEYSEG sseg;
+ sseg.type=SPTYPE;
+ sseg.language= 7; /* Binary */
+ sseg.null_bit=0;
+ sseg.bit_start=0;
+ sseg.bit_length= 0;
+ sseg.bit_pos= 0;
+ sseg.length=SPLEN;
+ sseg.null_pos=0;
+ sseg.start=j*SPLEN;
+ sseg.flag= HA_SWAP_KEY;
+ if (_ma_keyseg_write(file, &sseg))
+ goto err;
+ }
+#endif
+ }
+ /* Create extra keys for unique definitions */
+ offset= real_reclength - uniques*MARIA_UNIQUE_HASH_LENGTH;
+ bzero((char*) &tmp_keydef,sizeof(tmp_keydef));
+ bzero((char*) &tmp_keyseg,sizeof(tmp_keyseg));
+ for (i=0; i < uniques ; i++)
+ {
+ tmp_keydef.keysegs=1;
+ tmp_keydef.block_length= (uint16) maria_block_size;
+ tmp_keydef.keylength= MARIA_UNIQUE_HASH_LENGTH + pointer;
+ tmp_keydef.minlength=tmp_keydef.maxlength=tmp_keydef.keylength;
+ tmp_keyseg.type= MARIA_UNIQUE_HASH_TYPE;
+ tmp_keyseg.length= MARIA_UNIQUE_HASH_LENGTH;
+ tmp_keyseg.start= offset;
+ offset+= MARIA_UNIQUE_HASH_LENGTH;
+ if (_ma_keydef_write(file,&tmp_keydef) ||
+ _ma_keyseg_write(file,(&tmp_keyseg)))
+ goto err;
+ }
+
+ /* Save unique definition */
+ DBUG_PRINT("info", ("write unique definitions"));
+ for (i=0 ; i < share.state.header.uniques ; i++)
+ {
+ HA_KEYSEG *keyseg_end;
+ keyseg= uniquedefs[i].seg;
+ if (_ma_uniquedef_write(file, &uniquedefs[i]))
+ goto err;
+ for (keyseg= uniquedefs[i].seg, keyseg_end= keyseg+ uniquedefs[i].keysegs;
+ keyseg < keyseg_end;
+ keyseg++)
+ {
+ switch (keyseg->type) {
+ case HA_KEYTYPE_VARTEXT1:
+ case HA_KEYTYPE_VARTEXT2:
+ case HA_KEYTYPE_VARBINARY1:
+ case HA_KEYTYPE_VARBINARY2:
+ if (!(keyseg->flag & HA_BLOB_PART))
+ {
+ keyseg->flag|= HA_VAR_LENGTH_PART;
+ keyseg->bit_start= ((keyseg->type == HA_KEYTYPE_VARTEXT1 ||
+ keyseg->type == HA_KEYTYPE_VARBINARY1) ?
+ 1 : 2);
+ }
+ break;
+ default:
+ DBUG_ASSERT((keyseg->flag & HA_VAR_LENGTH_PART) == 0);
+ break;
+ }
+ if (_ma_keyseg_write(file, keyseg))
+ goto err;
+ }
+ }
+ DBUG_PRINT("info", ("write field definitions"));
+ if (datafile_type == BLOCK_RECORD)
+ {
+ /* Store columns in a more efficent order */
+ MARIA_COLUMNDEF **col_order, **pos;
+ if (!(col_order= (MARIA_COLUMNDEF**) my_malloc(PSI_INSTRUMENT_ME,
+ share.base.fields *
+ sizeof(MARIA_COLUMNDEF*),
+ common_flag)))
+ goto err;
+ for (column= columndef, pos= col_order ;
+ column != end_column ;
+ column++, pos++)
+ *pos= column;
+ qsort(col_order, share.base.fields, sizeof(*col_order),
+ (qsort_cmp) compare_columns);
+ for (i=0 ; i < share.base.fields ; i++)
+ {
+ column_array[col_order[i]->column_nr]= i;
+ if (_ma_columndef_write(file, col_order[i]))
+ {
+ my_free(col_order);
+ goto err;
+ }
+ }
+ my_free(col_order);
+ }
+ else
+ {
+ for (i=0 ; i < share.base.fields ; i++)
+ {
+ column_array[i]= (uint16) i;
+ if (_ma_columndef_write(file, &columndef[i]))
+ goto err;
+ }
+ }
+ if (_ma_column_nr_write(file, column_array, columns))
+ goto err;
+
+ if (encrypted)
+ {
+ DBUG_ASSERT(share.data_file_name.length == 0);
+ share.data_file_name.str= (char*) name; /* For error reporting */
+ if (ma_crypt_create(&share) ||
+ ma_crypt_write(&share, file))
+ goto err;
+ }
+
+ if ((kfile_size_before_extension= mysql_file_tell(file,MYF(0))) == MY_FILEPOS_ERROR)
+ goto err;
+#ifndef DBUG_OFF
+ if (kfile_size_before_extension != info_length)
+ DBUG_PRINT("warning",("info_length: %u != used_length: %u",
+ info_length, (uint)kfile_size_before_extension));
+#endif
+
+ if (sync_dir)
+ {
+ /*
+ we log the first bytes and then the size to which we extend; this is
+ a log of about 1 KB of mostly zeroes if this is a small table.
+ */
+ char empty_string[]= "";
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 4];
+ translog_size_t total_rec_length= 0;
+ uint k;
+ LSN lsn;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].length= 1 + 2 + 2 +
+ (uint) kfile_size_before_extension;
+ /* we are needing maybe 64 kB, so don't use the stack */
+ log_data= my_malloc(PSI_INSTRUMENT_ME,
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].length, MYF(0));
+ if ((log_data == NULL) ||
+ mysql_file_pread(file, 1 + 2 + 2 + log_data,
+ (size_t) kfile_size_before_extension, 0, MYF(MY_NABP)))
+ goto err;
+ /*
+ remember if the data file was created or not, to know if Recovery can
+ do it or not, in the future
+ */
+ log_data[0]= MY_TEST(flags & HA_DONT_TOUCH_DATA);
+ int2store(log_data + 1, kfile_size_before_extension);
+ int2store(log_data + 1 + 2, share.base.keystart);
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (uchar *)name;
+ /* we store the end-zero, for Recovery to just pass it to my_create() */
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length= strlen(name) + 1;
+ log_array[TRANSLOG_INTERNAL_PARTS + 1].str= log_data;
+ /* symlink description is also needed for re-creation by Recovery: */
+ {
+ const char *s= ci->data_file_name ? ci->data_file_name : empty_string;
+ log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (uchar*)s;
+ log_array[TRANSLOG_INTERNAL_PARTS + 2].length= strlen(s) + 1;
+ s= ci->index_file_name ? ci->index_file_name : empty_string;
+ log_array[TRANSLOG_INTERNAL_PARTS + 3].str= (uchar*)s;
+ log_array[TRANSLOG_INTERNAL_PARTS + 3].length= strlen(s) + 1;
+ }
+ for (k= TRANSLOG_INTERNAL_PARTS;
+ k < (sizeof(log_array)/sizeof(log_array[0])); k++)
+ total_rec_length+= (translog_size_t) log_array[k].length;
+ /**
+ For this record to be of any use for Recovery, we need the upper
+ MySQL layer to be crash-safe, which it is not now (that would require
+ work using the ddl_log of sql/sql_table.cc); when it is, we should
+ reconsider the moment of writing this log record (before or after op,
+ under THR_LOCK_maria or not...), how to use it in Recovery.
+ For now this record can serve when we apply logs to a backup,
+ so we sync it. This happens before the data file is created. If the
+ data file was created before, and we crashed before writing the log
+ record, at restart the table may be used, so we would not have a
+ trustable history in the log (impossible to apply this log to a
+ backup). The way we do it, if we crash before writing the log record
+ then there is no data file and the table cannot be used.
+ @todo Note that in case of TRUNCATE TABLE we also come here; for
+ Recovery to be able to finish TRUNCATE TABLE, instead of leaving a
+ half-truncated table, we should log the record at start of
+ maria_create(); for that we shouldn't write to the index file but to a
+ buffer (DYNAMIC_STRING), put the buffer into the record, then put the
+ buffer into the index file (so, change _ma_keydef_write() etc). That
+ would also enable Recovery to finish a CREATE TABLE. The final result
+ would be that we would be able to finish what the SQL layer has asked
+ for: it would be atomic.
+ When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not
+ called external_lock(), so have no TRN. It does not matter, as all
+ these operations are non-transactional and sync their files.
+ */
+ trnman_init_tmp_trn_for_logging_trid(&tmp_transaction_object);
+ if (unlikely(translog_write_record(&lsn,
+ LOGREC_REDO_CREATE_TABLE,
+ &tmp_transaction_object, NULL,
+ total_rec_length,
+ sizeof(log_array)/sizeof(log_array[0]),
+ log_array, NULL, NULL) ||
+ translog_flush(lsn)))
+ goto err;
+ share.kfile.file= file;
+ DBUG_EXECUTE_IF("maria_flush_whole_log",
+ {
+ DBUG_PRINT("maria_flush_whole_log", ("now"));
+ translog_flush(translog_get_horizon());
+ });
+ DBUG_EXECUTE_IF("maria_crash_create_table",
+ {
+ DBUG_PRINT("maria_crash_create_table", ("now"));
+ DBUG_SUICIDE();
+ });
+ /*
+ store LSN into file, needed for Recovery to not be confused if a
+ DROP+CREATE happened (applying REDOs to the wrong table).
+ */
+ if (_ma_update_state_lsns_sub(&share, lsn, tmp_transaction_object.trid,
+ FALSE, TRUE))
+ goto err;
+ my_free(log_data);
+ log_data= 0;
+ }
+
+ if (!(flags & HA_DONT_TOUCH_DATA))
+ {
+ if (ci->data_file_name)
+ {
+ char *dext= strrchr(ci->data_file_name, '.');
+ int have_dext= dext && !strcmp(dext, MARIA_NAME_DEXT);
+
+ if (tmp_table)
+ {
+ char *path;
+ /* chop off the table name, tempory tables use generated name */
+ if ((path= strrchr(ci->data_file_name, FN_LIBCHAR)))
+ *path= '\0';
+ fn_format(dfilename, name, ci->data_file_name, MARIA_NAME_DEXT,
+ MY_REPLACE_DIR | MY_UNPACK_FILENAME | MY_APPEND_EXT);
+ }
+ else
+ {
+ fn_format(dfilename, ci->data_file_name, "", MARIA_NAME_DEXT,
+ MY_UNPACK_FILENAME |
+ (have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
+ }
+ fn_format(dlinkname, name, "",MARIA_NAME_DEXT,
+ MY_UNPACK_FILENAME | MY_APPEND_EXT);
+ dlinkname_ptr= dlinkname;
+ create_flag=0;
+ }
+ else
+ {
+ fn_format(dfilename,name,"", MARIA_NAME_DEXT,
+ MY_UNPACK_FILENAME | MY_APPEND_EXT);
+ create_flag= (flags & HA_CREATE_KEEP_FILES) ? 0 : MY_DELETE_OLD;
+ }
+ ma_debug_crash_here("storage_engine_middle_of_create");
+ if ((dfile=
+ mysql_file_create_with_symlink(key_file_dfile, dlinkname_ptr,
+ dfilename, 0, create_mode,
+ MYF(common_flag | create_flag | sync_dir))) < 0)
+ goto err;
+ errpos=3;
+
+ if (_ma_initialize_data_file(&share, dfile))
+ goto err;
+ }
+
+ /* Enlarge files */
+ DBUG_PRINT("info", ("enlarge to keystart: %lu",
+ (ulong) share.base.keystart));
+ if (mysql_file_chsize(file,(ulong) share.base.keystart,0,MYF(0)))
+ goto err;
+
+ if (!internal_table && sync_dir && mysql_file_sync(file, MYF(0)))
+ goto err;
+
+ if (! (flags & HA_DONT_TOUCH_DATA))
+ {
+#ifdef USE_RELOC
+ if (mysql_file_chsize(key_file_dfile, dfile,
+ share.base.min_pack_length*ci->reloc_rows,0,MYF(0)))
+ goto err;
+#endif
+ if (!internal_table && sync_dir && mysql_file_sync(dfile, MYF(0)))
+ goto err;
+ if (mysql_file_close(dfile,MYF(0)))
+ goto err;
+ }
+ if (!internal_table)
+ mysql_mutex_unlock(&THR_LOCK_maria);
+ res= 0;
+ my_free((char*) rec_per_key_part);
+ ma_crypt_free(&share);
+ errpos=0;
+ if (mysql_file_close(file,MYF(0)))
+ res= my_errno;
+ DBUG_RETURN(res);
+
+err:
+ if (!internal_table)
+ mysql_mutex_unlock(&THR_LOCK_maria);
+
+err_no_lock:
+ save_errno=my_errno;
+ switch (errpos) {
+ case 3:
+ mysql_file_close(dfile, MYF(0));
+ if (! (flags & HA_DONT_TOUCH_DATA))
+ {
+ mysql_file_delete(key_file_dfile, dfilename, MYF(sync_dir));
+ if (dlinkname_ptr)
+ mysql_file_delete(key_file_dfile, dlinkname_ptr, MYF(sync_dir));
+ }
+ /* fall through */
+ case 1:
+ mysql_file_close(file, MYF(0));
+ if (! (flags & HA_DONT_TOUCH_DATA))
+ {
+ mysql_file_delete(key_file_kfile, kfilename, MYF(sync_dir));
+ if (klinkname_ptr)
+ mysql_file_delete(key_file_kfile, klinkname_ptr, MYF(sync_dir));
+ }
+ }
+ ma_crypt_free(&share);
+ my_free(log_data);
+ my_free(rec_per_key_part);
+ DBUG_RETURN(my_errno=save_errno); /* return the fatal errno */
+}
+
+
+uint maria_get_pointer_length(ulonglong file_length, uint def)
+{
+ DBUG_ASSERT(def >= 2 && def <= 7);
+ if (file_length) /* If not default */
+ {
+#ifdef NOT_YET_READY_FOR_8_BYTE_POINTERS
+ if (file_length >= (1ULL << 56))
+ def=8;
+ else
+#endif
+ if (file_length >= (1ULL << 48))
+ def=7;
+ else if (file_length >= (1ULL << 40))
+ def=6;
+ else if (file_length >= (1ULL << 32))
+ def=5;
+ else if (file_length >= (1ULL << 24))
+ def=4;
+ else if (file_length >= (1ULL << 16))
+ def=3;
+ else
+ def=2;
+ }
+ return def;
+}
+
+
+/*
+ Sort columns for records-in-block
+
+ IMPLEMENTATION
+ Sort columns in following order:
+
+ Fixed size, not null columns
+ Fixed length, null fields
+ Numbers (zero fill fields)
+ Variable length fields (CHAR, VARCHAR) according to length
+ Blobs
+
+ For same kind of fields, keep fields in original order
+*/
+
+static inline int sign(long a)
+{
+ return a < 0 ? -1 : (a > 0 ? 1 : 0);
+}
+
+
+static int compare_columns(MARIA_COLUMNDEF **a_ptr, MARIA_COLUMNDEF **b_ptr)
+{
+ MARIA_COLUMNDEF *a= *a_ptr, *b= *b_ptr;
+ enum en_fieldtype a_type, b_type;
+
+ a_type= (a->type == FIELD_CHECK) ? FIELD_NORMAL : a->type;
+ b_type= (b->type == FIELD_CHECK) ? FIELD_NORMAL : b->type;
+
+ if (a_type == FIELD_NORMAL && !a->null_bit)
+ {
+ if (b_type != FIELD_NORMAL || b->null_bit)
+ return -1;
+ return sign((long) a->offset - (long) b->offset);
+ }
+ if (b_type == FIELD_NORMAL && !b->null_bit)
+ return 1;
+ if (a_type == b_type)
+ return sign((long) a->offset - (long) b->offset);
+ if (a_type == FIELD_NORMAL)
+ return -1;
+ if (b_type == FIELD_NORMAL)
+ return 1;
+ if (a_type == FIELD_SKIP_ZERO)
+ return -1;
+ if (b_type == FIELD_SKIP_ZERO)
+ return 1;
+ if (a->type != FIELD_BLOB && b->type != FIELD_BLOB)
+ if (a->length != b->length)
+ return sign((long) a->length - (long) b->length);
+ if (a_type == FIELD_BLOB)
+ return 1;
+ if (b_type == FIELD_BLOB)
+ return -1;
+ return sign((long) a->offset - (long) b->offset);
+}
+
+
+/**
+ @brief Initialize data file
+
+ @note
+ In BLOCK_RECORD, a freshly created datafile is one page long; while in
+ other formats it is 0-byte long.
+ */
+
+int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
+{
+ if (share->data_file_type == BLOCK_RECORD)
+ {
+ share->bitmap.block_size= share->base.block_size;
+ share->bitmap.file.file = dfile;
+ return _ma_bitmap_create_first(share);
+ }
+ return 0;
+}
+
+
+/**
+ @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
+ can force.
+
+ This is for special cases where:
+ - we don't want to write the full state to disk (so, not call
+ _ma_state_info_write()) because some parts of the state may be
+ currently inconsistent, or because it would be overkill
+ - we must sync these LSNs immediately for correctness.
+ It acquires intern_lock to protect the LSNs and state write.
+
+ @param share table's share
+ @param lsn LSN to write to log files
+ @param create_trid Trid to be used as state.create_trid
+ @param do_sync if the write should be forced to disk
+ @param update_create_rename_lsn if this LSN should be updated or not
+
+ @return Operation status
+ @retval 0 ok
+ @retval 1 error (disk problem)
+*/
+
+int _ma_update_state_lsns(MARIA_SHARE *share, LSN lsn, TrID create_trid,
+ my_bool do_sync, my_bool update_create_rename_lsn)
+{
+ int res;
+ DBUG_ENTER("_ma_update_state_lsns");
+ mysql_mutex_lock(&share->intern_lock);
+ res= _ma_update_state_lsns_sub(share, lsn, create_trid, do_sync,
+ update_create_rename_lsn);
+ mysql_mutex_unlock(&share->intern_lock);
+ DBUG_RETURN(res);
+}
+
+
+/**
+ @brief Writes create_rename_lsn, skip_redo_lsn and is_of_horizon to disk,
+ can force.
+
+ Shortcut of _ma_update_state_lsns() when we know that intern_lock is not
+ needed (when creating a table or opening it for the first time).
+
+ @param share table's share
+ @param lsn LSN to write to state; if LSN_IMPOSSIBLE, write
+ a LOGREC_IMPORTED_TABLE and use its LSN as lsn.
+ @param create_trid Trid to be used as state.create_trid
+ @param do_sync if the write should be forced to disk
+ @param update_create_rename_lsn if this LSN should be updated or not
+
+ @return Operation status
+ @retval 0 ok
+ @retval 1 error (disk problem)
+*/
+
+#if defined(_MSC_VER) && (_MSC_VER == 1310)
+/*
+ Visual Studio 2003 compiler produces internal compiler error
+ in this function. Disable optimizations to workaround.
+*/
+#pragma optimize("",off)
+#endif
+int _ma_update_state_lsns_sub(MARIA_SHARE *share, LSN lsn, TrID create_trid,
+ my_bool do_sync,
+ my_bool update_create_rename_lsn)
+{
+ uchar buf[LSN_STORE_SIZE * 3], *ptr;
+ uchar trid_buff[8];
+ File file= share->kfile.file;
+ DBUG_ENTER("_ma_update_state_lsns_sub");
+ DBUG_ASSERT(file >= 0);
+ CRASH_IF_S3_TABLE(share);
+
+ if (lsn == LSN_IMPOSSIBLE)
+ {
+ int res;
+ LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
+ /* table name is logged only for information */
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].str=
+ (uchar *)(share->open_file_name.str);
+ log_array[TRANSLOG_INTERNAL_PARTS + 0].length=
+ share->open_file_name.length + 1;
+ if ((res= translog_write_record(&lsn, LOGREC_IMPORTED_TABLE,
+ &dummy_transaction_object, NULL,
+ (translog_size_t)
+ log_array[TRANSLOG_INTERNAL_PARTS +
+ 0].length,
+ sizeof(log_array)/sizeof(log_array[0]),
+ log_array, NULL, NULL)))
+ DBUG_RETURN(res);
+ }
+
+ for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE)
+ lsn_store(ptr, lsn);
+ share->state.skip_redo_lsn= share->state.is_of_horizon= lsn;
+ share->state.create_trid= create_trid;
+ mi_int8store(trid_buff, create_trid);
+
+ /*
+ Update create_rename_lsn if update was requested or if the old one had an
+ impossible value.
+ */
+ if (update_create_rename_lsn ||
+ (share->state.create_rename_lsn > lsn && lsn != LSN_IMPOSSIBLE))
+ {
+ share->state.create_rename_lsn= lsn;
+ if (share->id != 0)
+ {
+ /*
+ If OP is the operation which is calling us, if table is later written,
+ we could see in the log:
+ FILE_ID ... REDO_OP ... REDO_INSERT.
+ (that can happen in real life at least with OP=REPAIR).
+ As FILE_ID will be ignored by Recovery because it is <
+ create_rename_lsn, REDO_INSERT would be ignored too, wrongly.
+ To avoid that, we force a LOGREC_FILE_ID to be logged at next write:
+ */
+ translog_deassign_id_from_share(share);
+ }
+ }
+ else
+ lsn_store(buf, share->state.create_rename_lsn);
+ DBUG_RETURN(my_pwrite(file, buf, sizeof(buf),
+ sizeof(share->state.header) +
+ MARIA_FILE_CREATE_RENAME_LSN_OFFSET, MYF(MY_NABP)) ||
+ my_pwrite(file, trid_buff, sizeof(trid_buff),
+ sizeof(share->state.header) +
+ MARIA_FILE_CREATE_TRID_OFFSET, MYF(MY_NABP)) ||
+ (do_sync && mysql_file_sync(file, MYF(0))));
+}
+#if defined(_MSC_VER) && (_MSC_VER == 1310)
+#pragma optimize("",on)
+#endif /*VS2003 compiler bug workaround*/