diff options
Diffstat (limited to 'storage/maria/unittest/ma_test_loghandler_multithread-t.c')
-rw-r--r-- | storage/maria/unittest/ma_test_loghandler_multithread-t.c | 557 |
1 files changed, 557 insertions, 0 deletions
diff --git a/storage/maria/unittest/ma_test_loghandler_multithread-t.c b/storage/maria/unittest/ma_test_loghandler_multithread-t.c new file mode 100644 index 00000000..cb4d2bc7 --- /dev/null +++ b/storage/maria/unittest/ma_test_loghandler_multithread-t.c @@ -0,0 +1,557 @@ +/* Copyright (C) 2006-2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */ + +#include "../maria_def.h" +#include <stdio.h> +#include <errno.h> +#include <tap.h> +#include "../trnman.h" + +extern my_bool maria_log_remove(const char *testdir); +extern char *create_tmpdir(const char *progname); + +#ifndef DBUG_OFF +static const char *default_dbug_option; +#endif + +#define PCACHE_SIZE (1024*1024*10) + +#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512) +/*#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC */ +#define LOG_FLAGS 0 +/*#define LONG_BUFFER_SIZE (1024L*1024L*1024L + 1024L*1024L*512)*/ + +#ifdef MULTIFLUSH_TEST + +#define LONG_BUFFER_SZ (16384L) +#define MIN_REC_LENGTH 10 +#define SHOW_DIVIDER 20 +#define ITERATIONS 10000 +#define FLUSH_ITERATIONS 1000 +#define WRITERS 2 +#define FLUSHERS 10 + +#else + +#define LONG_BUFFER_SZ (512L*1024L*1024L) +#define MIN_REC_LENGTH 30 +#define SHOW_DIVIDER 10 +#define ITERATIONS 3 +#define FLUSH_ITERATIONS 0 +#define WRITERS 3 +#define FLUSHERS 0 + +#endif + +#define LONG_BUFFER_SIZE (LONG_BUFFER_SZ >> (skip_big_tests ? 4 : 0)) + +static uint number_of_writers= WRITERS; +static uint number_of_flushers= FLUSHERS; + +static pthread_cond_t COND_thread_count; +static pthread_mutex_t LOCK_thread_count; +static uint thread_count; + +static ulong lens[WRITERS][ITERATIONS]; +static LSN lsns1[WRITERS][ITERATIONS]; +static LSN lsns2[WRITERS][ITERATIONS]; +static uchar *long_buffer; + + +static LSN last_lsn; /* For test purposes the variable allow dirty read/write */ + +/* + Get pseudo-random length of the field in + limits [MIN_REC_LENGTH..LONG_BUFFER_SIZE] + + SYNOPSIS + get_len() + + RETURN + length - length >= 0 length <= LONG_BUFFER_SIZE +*/ + +static uint32 get_len() +{ + return MIN_REC_LENGTH + + (uint32)(((ulonglong)rand())* + (LONG_BUFFER_SIZE - MIN_REC_LENGTH - 1)/RAND_MAX); +} + + +/* + Check that the buffer filled correctly + + SYNOPSIS + check_content() + ptr Pointer to the buffer + length length of the buffer + + RETURN + 0 - OK + 1 - Error +*/ + +static my_bool check_content(uchar *ptr, ulong length) +{ + ulong i; + for (i= 0; i < length; i++) + { + if (((uchar)ptr[i]) != (i & 0xFF)) + { + fprintf(stderr, "Byte # %lu is %x instead of %x", + i, (uint) ptr[i], (uint) (i & 0xFF)); + return 1; + } + } + return 0; +} + + +/* + Read whole record content, and check content (put with offset) + + SYNOPSIS + read_and_check_content() + rec The record header buffer + buffer The buffer to read the record in + skip Skip this number of bytes ot the record content + + RETURN + 0 - OK + 1 - Error +*/ + + +static my_bool read_and_check_content(TRANSLOG_HEADER_BUFFER *rec, + uchar *buffer, uint skip) +{ + int res= 0; + translog_size_t len; + + if ((len= translog_read_record(rec->lsn, 0, rec->record_length, + buffer, NULL)) != rec->record_length) + { + fprintf(stderr, "Requested %lu byte, read %lu\n", + (ulong) rec->record_length, (ulong) len); + res= 1; + } + res|= check_content(buffer + skip, rec->record_length - skip); + return(res); +} + +void writer(int num) +{ + LSN lsn; + TRN trn; + uchar long_tr_id[6]; + uint i; + + trn.short_id= num; + trn.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID; + for (i= 0; i < ITERATIONS; i++) + { + uint len= get_len(); + LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 1]; + lens[num][i]= len; + + int2store(long_tr_id, num); + int4store(long_tr_id + 2, i); + parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; + if (translog_write_record(&lsn, + LOGREC_FIXED_RECORD_0LSN_EXAMPLE, + &trn, NULL, 6, TRANSLOG_INTERNAL_PARTS + 1, + parts, NULL, NULL)) + { + fprintf(stderr, "Can't write LOGREC_FIXED_RECORD_0LSN_EXAMPLE record #%lu " + "thread %i\n", (ulong) i, num); + translog_destroy(); + pthread_mutex_lock(&LOCK_thread_count); + ok(0, "write records"); + pthread_mutex_unlock(&LOCK_thread_count); + return; + } + lsns1[num][i]= lsn; + parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_buffer; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= len; + if (translog_write_record(&lsn, + LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE, + &trn, NULL, + len, TRANSLOG_INTERNAL_PARTS + 1, + parts, NULL, NULL)) + { + fprintf(stderr, "Can't write variable record #%lu\n", (ulong) i); + translog_destroy(); + pthread_mutex_lock(&LOCK_thread_count); + ok(0, "write records"); + pthread_mutex_unlock(&LOCK_thread_count); + return; + } + lsns2[num][i]= lsn; + last_lsn= lsn; + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "write records"); + pthread_mutex_unlock(&LOCK_thread_count); + } + return; +} + + +static void *test_thread_writer(void *arg) +{ + int param= *((int*) arg); + + my_thread_init(); + + writer(param); + + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + ok(1, "writer finished"); /* just to show progress */ + pthread_cond_signal(&COND_thread_count); /* Tell main we are + ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + return(0); +} + + +static void *test_thread_flusher(void *arg) +{ + int param= *((int*) arg); + int i; + + my_thread_init(); + + for(i= 0; i < FLUSH_ITERATIONS; i++) + { + translog_flush(last_lsn); + pthread_mutex_lock(&LOCK_thread_count); + ok(1, "-- flush %d", param); + pthread_mutex_unlock(&LOCK_thread_count); + } + + pthread_mutex_lock(&LOCK_thread_count); + thread_count--; + ok(1, "flusher finished"); /* just to show progress */ + pthread_cond_signal(&COND_thread_count); /* Tell main we are + ready */ + pthread_mutex_unlock(&LOCK_thread_count); + free((uchar*) arg); + my_thread_end(); + return(0); +} + + +int main(int argc __attribute__((unused)), + char **argv __attribute__ ((unused))) +{ + uint32 i; + PAGECACHE pagecache; + LSN first_lsn; + TRANSLOG_HEADER_BUFFER rec; + struct st_translog_scanner_data scanner; + pthread_t tid; + pthread_attr_t thr_attr; + int *param, error; + int rc; + MY_INIT(argv[0]); + + // plan read MYTAP_CONFIG so skip_big_tests will be set before using + plan(WRITERS + FLUSHERS + + ITERATIONS * WRITERS * 3 + FLUSH_ITERATIONS * FLUSHERS ); + /* We don't need to do physical syncs in this test */ + my_disable_sync= 1; + + bzero(&pagecache, sizeof(pagecache)); + maria_data_root= create_tmpdir(argv[0]); + if (maria_log_remove(0)) + exit(1); + + long_buffer= malloc(LONG_BUFFER_SIZE + 7 * 2 + 2); + if (long_buffer == 0) + { + fprintf(stderr, "End of memory\n"); + exit(1); + } + for (i= 0; i < (uint32)(LONG_BUFFER_SIZE + 7 * 2 + 2); i++) + long_buffer[i]= (i & 0xFF); + +#ifndef DBUG_OFF +#if defined(__WIN__) + default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace"; +#else + default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace"; +#endif + if (argc > 1) + { + DBUG_SET(default_dbug_option); + DBUG_SET_INITIAL(default_dbug_option); + } +#endif + + + if ((error= pthread_cond_init(&COND_thread_count, NULL))) + { + fprintf(stderr, "COND_thread_count: %d from pthread_cond_init " + "(errno: %d)\n", error, errno); + exit(1); + } + if ((error= pthread_mutex_init(&LOCK_thread_count, MY_MUTEX_INIT_FAST))) + { + fprintf(stderr, "LOCK_thread_count: %d from pthread_cond_init " + "(errno: %d)\n", error, errno); + exit(1); + } + if ((error= pthread_attr_init(&thr_attr))) + { + fprintf(stderr, "Got error: %d from pthread_attr_init " + "(errno: %d)\n", error, errno); + exit(1); + } + if ((error= pthread_attr_setdetachstate(&thr_attr, PTHREAD_CREATE_DETACHED))) + { + fprintf(stderr, + "Got error: %d from pthread_attr_setdetachstate (errno: %d)\n", + error, errno); + exit(1); + } + +#ifdef HAVE_THR_SETCONCURRENCY + thr_setconcurrency(2); +#endif + + if (ma_control_file_open(TRUE, TRUE, TRUE)) + { + fprintf(stderr, "Can't init control file (%d)\n", errno); + exit(1); + } + if (init_pagecache(&pagecache, PCACHE_SIZE, 0, 0, + TRANSLOG_PAGE_SIZE, 0, 0) == 0) + { + fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno); + exit(1); + } + if (translog_init_with_table(maria_data_root, LOG_FILE_SIZE, 50112, 0, &pagecache, + LOG_FLAGS, 0, &translog_example_table_init, + 0)) + { + fprintf(stderr, "Can't init loghandler (%d)\n", errno); + exit(1); + } + /* Suppressing of automatic record writing */ + dummy_transaction_object.first_undo_lsn|= TRANSACTION_LOGGED_LONG_ID; + + srand(122334817L); + { + LEX_CUSTRING parts[TRANSLOG_INTERNAL_PARTS + 1]; + uchar long_tr_id[6]= + { + 0x11, 0x22, 0x33, 0x44, 0x55, 0x66 + }; + + parts[TRANSLOG_INTERNAL_PARTS + 0].str= long_tr_id; + parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6; + dummy_transaction_object.first_undo_lsn= TRANSACTION_LOGGED_LONG_ID; + if (translog_write_record(&first_lsn, + LOGREC_FIXED_RECORD_0LSN_EXAMPLE, + &dummy_transaction_object, NULL, 6, + TRANSLOG_INTERNAL_PARTS + 1, + parts, NULL, NULL)) + { + fprintf(stderr, "Can't write the first record\n"); + translog_destroy(); + exit(1); + } + } + + + pthread_mutex_lock(&LOCK_thread_count); + while (number_of_writers != 0 || number_of_flushers != 0) + { + if (number_of_writers) + { + param= (int*) malloc(sizeof(int)); + *param= number_of_writers - 1; + if ((error= pthread_create(&tid, &thr_attr, test_thread_writer, + (void*) param))) + { + fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n", + error, errno); + exit(1); + } + thread_count++; + number_of_writers--; + } + if (number_of_flushers) + { + param= (int*) malloc(sizeof(int)); + *param= number_of_flushers - 1; + if ((error= pthread_create(&tid, &thr_attr, test_thread_flusher, + (void*) param))) + { + fprintf(stderr, "Got error: %d from pthread_create (errno: %d)\n", + error, errno); + exit(1); + } + thread_count++; + number_of_flushers--; + } + } + pthread_mutex_unlock(&LOCK_thread_count); + + pthread_attr_destroy(&thr_attr); + + /* wait finishing */ + pthread_mutex_lock(&LOCK_thread_count); + while (thread_count) + { + if ((error= pthread_cond_wait(&COND_thread_count, &LOCK_thread_count))) + fprintf(stderr, "COND_thread_count: %d from pthread_cond_wait\n", error); + } + pthread_mutex_unlock(&LOCK_thread_count); + + /* Find last LSN and flush up to it (all our log) */ + { + LSN max= 0; + for (i= 0; i < WRITERS; i++) + { + if (cmp_translog_addr(lsns2[i][ITERATIONS - 1], max) > 0) + max= lsns2[i][ITERATIONS - 1]; + } + translog_flush(max); + } + + rc= 1; + + { + uint indeces[WRITERS]; + uint index, stage; + int len; + bzero(indeces, sizeof(uint) * WRITERS); + + bzero(indeces, sizeof(indeces)); + + if (translog_scanner_init(first_lsn, 1, &scanner, 0)) + { + fprintf(stderr, "scanner init failed\n"); + goto err; + } + for (i= 0;; i++) + { + len= translog_read_next_record_header(&scanner, &rec); + + if (len == RECHEADER_READ_ERROR) + { + fprintf(stderr, "1-%d translog_read_next_record_header failed (%d)\n", + i, errno); + translog_free_record_header(&rec); + goto err; + } + if (len == RECHEADER_READ_EOF) + { + if (i != WRITERS * ITERATIONS * 2) + { + fprintf(stderr, "EOL met at iteration %u instead of %u\n", + i, ITERATIONS * WRITERS * 2); + translog_free_record_header(&rec); + goto err; + } + break; + } + index= indeces[rec.short_trid] / 2; + stage= indeces[rec.short_trid] % 2; + if (stage == 0) + { + if (rec.type !=LOGREC_FIXED_RECORD_0LSN_EXAMPLE || + rec.record_length != 6 || + uint2korr(rec.header) != rec.short_trid || + index != uint4korr(rec.header + 2) || + cmp_translog_addr(lsns1[rec.short_trid][index], rec.lsn) != 0) + { + fprintf(stderr, "Incorrect LOGREC_FIXED_RECORD_0LSN_EXAMPLE " + "data read(%d)\n" + "type %u, strid %u %u, len %u, i: %u %u, " + "lsn" LSN_FMT " " LSN_FMT "\n", + i, (uint) rec.type, + (uint) rec.short_trid, (uint) uint2korr(rec.header), + (uint) rec.record_length, + (uint) index, (uint) uint4korr(rec.header + 2), + LSN_IN_PARTS(rec.lsn), + LSN_IN_PARTS(lsns1[rec.short_trid][index])); + translog_free_record_header(&rec); + goto err; + } + } + else + { + if (rec.type != LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE || + len != 9 || + rec.record_length != lens[rec.short_trid][index] || + cmp_translog_addr(lsns2[rec.short_trid][index], rec.lsn) != 0 || + check_content(rec.header, (uint)len)) + { + fprintf(stderr, + "Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE " + "data read(%d) " + "thread: %d, iteration %d, stage %d\n" + "type %u (%d), len %d, length %lu %lu (%d) " + "lsn" LSN_FMT " " LSN_FMT "\n", + i, (uint) rec.short_trid, index, stage, + (uint) rec.type, (rec.type != + LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE), + len, + (ulong) rec.record_length, lens[rec.short_trid][index], + (rec.record_length != lens[rec.short_trid][index]), + LSN_IN_PARTS(rec.lsn), + LSN_IN_PARTS(lsns2[rec.short_trid][index])); + translog_free_record_header(&rec); + goto err; + } + if (read_and_check_content(&rec, long_buffer, 0)) + { + fprintf(stderr, + "Incorrect LOGREC_VARIABLE_RECORD_0LSN_EXAMPLE " + "in whole rec read lsn" LSN_FMT "\n", + LSN_IN_PARTS(rec.lsn)); + translog_free_record_header(&rec); + goto err; + } + } + ok(1, "record read"); + translog_free_record_header(&rec); + indeces[rec.short_trid]++; + } + } + + rc= 0; +err: + if (rc) + ok(0, "record read"); + translog_destroy(); + end_pagecache(&pagecache, 1); + ma_control_file_end(); + if (maria_log_remove(maria_data_root)) + exit(1); + + my_uuid_end(); + my_free_open_file_info(); + my_end(0); + return(exit_status()); +} + +#include "../ma_check_standalone.h" |