diff options
Diffstat (limited to 'storage/tokudb/PerconaFT/ft/loader/dbufio.cc')
-rw-r--r-- | storage/tokudb/PerconaFT/ft/loader/dbufio.cc | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/storage/tokudb/PerconaFT/ft/loader/dbufio.cc b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc new file mode 100644 index 00000000..90f76cec --- /dev/null +++ b/storage/tokudb/PerconaFT/ft/loader/dbufio.cc @@ -0,0 +1,598 @@ +/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4: +#ident "$Id$" +/*====== +This file is part of PerconaFT. + + +Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved. + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License, version 2, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. + +---------------------------------------- + + PerconaFT is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License, version 3, + as published by the Free Software Foundation. + + PerconaFT is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with PerconaFT. If not, see <http://www.gnu.org/licenses/>. +======= */ + +#ident "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved." + +#include <my_global.h> +#include <errno.h> +#include <string.h> +#include <unistd.h> + +#include "portability/toku_assert.h" +#include "portability/memory.h" + +#include "ft/ft-internal.h" +#include "ft/serialize/ft_node-serialize.h" +#include "loader/dbufio.h" +#include "loader/loader-internal.h" + +toku_instr_key *bfs_mutex_key; +toku_instr_key *bfs_cond_key; +toku_instr_key *io_thread_key; + +struct dbufio_file { + // i/o thread owns these + int fd; + + // consumers own these + size_t offset_in_buf; + toku_off_t offset_in_uncompressed_file; + + // need the mutex to modify these + struct dbufio_file *next; + bool second_buf_ready; // if true, the i/o thread is not touching anything. + + // consumers own [0], i/o thread owns [1], they are swapped by the consumer only when the condition mutex is held and second_buf_ready is true. + char *buf[2]; + size_t n_in_buf[2]; + int error_code[2]; // includes errno or eof. [0] is the error code associated with buf[0], [1] is the code for buf[1] + + bool io_done; +}; + + +/* A dbufio_fileset */ +struct dbufio_fileset { + // The mutex/condition variables protect + // the singly-linked list of files that need I/O (head/tail in the fileset, and next in each file) + // in each file: + // the second_buf_ready boolean (which says the second buffer is full of data). + // the swapping of the buf[], n_in_buf[], and error_code[] values. + toku_mutex_t mutex; + toku_cond_t cond; + int N; // How many files. This is constant once established. + int n_not_done; // how many of the files require more I/O? Owned by the i/o thread. + struct dbufio_file *files; // an array of length N. + struct dbufio_file *head, *tail; // must have the mutex to fiddle with these. + size_t bufsize; // the bufsize is the constant (the same for all buffers). + + bool panic; + bool compressed; + int panic_errno; + toku_pthread_t iothread; +}; + + +static void enq (DBUFIO_FILESET bfs, struct dbufio_file *f) { + if (bfs->tail==NULL) { + bfs->head = f; + } else { + bfs->tail->next = f; + } + bfs->tail = f; + f->next = NULL; +} + +static void panic (DBUFIO_FILESET bfs, int r) { + if (bfs->panic) return; + bfs->panic_errno = r; // Don't really care about a race on this variable... Writes to it are atomic, so at least one good panic reason will be stored. + bfs->panic = true; + return; +} + +static bool paniced (DBUFIO_FILESET bfs) { + return bfs->panic; +} + +static ssize_t dbf_read_some_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { + ssize_t ret; + invariant(bufsize >= MAX_UNCOMPRESSED_BUF); + unsigned char *raw_block = NULL; + + // deserialize the sub block header + + // total_size + // num_sub_blocks + // compressed_size,uncompressed_size,xsum (repeated num_sub_blocks times) + ssize_t readcode; + const uint32_t header_size = sizeof(uint32_t); + char header[header_size]; + + readcode = toku_os_read(dbf->fd, &header, header_size); + if (readcode < 0) { + ret = -1; + goto exit; + } + if (readcode == 0) { + ret = 0; + goto exit; + } + if (readcode < (ssize_t) header_size) { + errno = TOKUDB_NO_DATA; + ret = -1; + goto exit; + } + uint32_t total_size; + { + uint32_t *p = (uint32_t *) &header[0]; + total_size = toku_dtoh32(p[0]); + } + if (total_size == 0 || total_size > (1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + //Cannot use XMALLOC + MALLOC_N(total_size, raw_block); + if (raw_block == nullptr) { + errno = ENOMEM; + ret = -1; + goto exit; + } + readcode = toku_os_read(dbf->fd, raw_block, total_size); + if (readcode < 0) { + ret = -1; + goto exit; + } + if (readcode < (ssize_t) total_size) { + errno = TOKUDB_NO_DATA; + ret = -1; + goto exit; + } + + struct sub_block sub_block[max_sub_blocks]; + uint32_t *sub_block_header; + sub_block_header = (uint32_t *) &raw_block[0]; + int32_t n_sub_blocks; + n_sub_blocks = toku_dtoh32(sub_block_header[0]); + sub_block_header++; + size_t size_subblock_header; + size_subblock_header = sub_block_header_size(n_sub_blocks); + if (n_sub_blocks == 0 || n_sub_blocks > max_sub_blocks || size_subblock_header > total_size) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + for (int i = 0; i < n_sub_blocks; i++) { + sub_block_init(&sub_block[i]); + sub_block[i].compressed_size = toku_dtoh32(sub_block_header[0]); + sub_block[i].uncompressed_size = toku_dtoh32(sub_block_header[1]); + sub_block[i].xsum = toku_dtoh32(sub_block_header[2]); + sub_block_header += 3; + } + + // verify sub block sizes + size_t total_compressed_size; + total_compressed_size = 0; + for (int i = 0; i < n_sub_blocks; i++) { + uint32_t compressed_size = sub_block[i].compressed_size; + if (compressed_size<=0 || compressed_size>(1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + uint32_t uncompressed_size = sub_block[i].uncompressed_size; + if (uncompressed_size<=0 || uncompressed_size>(1<<30)) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + total_compressed_size += compressed_size; + } + if (total_size != total_compressed_size + size_subblock_header) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + // sum up the uncompressed size of the sub blocks + size_t uncompressed_size; + uncompressed_size = get_sum_uncompressed_size(n_sub_blocks, sub_block); + if (uncompressed_size > bufsize || uncompressed_size > MAX_UNCOMPRESSED_BUF) { + errno = toku_db_badformat(); + ret = -1; + goto exit; + } + + unsigned char *uncompressed_data; + uncompressed_data = (unsigned char *)buf; + + // point at the start of the compressed data (past the node header, the sub block header, and the header checksum) + unsigned char *compressed_data; + compressed_data = raw_block + size_subblock_header; + + // decompress all the compressed sub blocks into the uncompressed buffer + { + int r; + r = decompress_all_sub_blocks(n_sub_blocks, sub_block, compressed_data, uncompressed_data, get_num_cores(), get_ft_pool()); + if (r != 0) { + fprintf(stderr, "%s:%d loader failed %d at %p size %" PRIu32"\n", __FUNCTION__, __LINE__, r, raw_block, total_size); + dump_bad_block(raw_block, total_size); + errno = r; + ret = -1; + goto exit; + } + } + ret = uncompressed_size; +exit: + if (raw_block) { + toku_free(raw_block); + } + return ret; +} + +static ssize_t dbf_read_compressed(struct dbufio_file *dbf, char *buf, size_t bufsize) { + invariant(bufsize >= MAX_UNCOMPRESSED_BUF); + size_t count = 0; + + while (count + MAX_UNCOMPRESSED_BUF <= bufsize) { + ssize_t readcode = dbf_read_some_compressed(dbf, buf + count, bufsize - count); + if (readcode < 0) { + return readcode; + } + count += readcode; + if (readcode == 0) { + break; + } + } + return count; +} + +static void* io_thread (void *v) +// The dbuf_thread does all the asynchronous I/O. +{ + DBUFIO_FILESET bfs = (DBUFIO_FILESET)v; + toku_mutex_lock(&bfs->mutex); + //printf("%s:%d Locked\n", __FILE__, __LINE__); + while (1) { + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // printf("n_not_done=%d\n", bfs->n_not_done); + if (bfs->n_not_done == 0) { + // all done (meaning we stored EOF (or another error) in + // error_code[0] for the file. + // printf("unlocked\n"); + toku_mutex_unlock(&bfs->mutex); + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + + struct dbufio_file *dbf = bfs->head; + if (dbf == NULL) { + // No I/O needs to be done yet. + // Wait until something happens that will wake us up. + toku_cond_wait(&bfs->cond, &bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + // Have the lock so go around. + } else { + // Some I/O needs to be done. + // printf("%s:%d Need I/O\n", __FILE__, __LINE__); + assert(dbf->second_buf_ready == false); + assert(!dbf->io_done); + bfs->head = dbf->next; + if (bfs->head == NULL) + bfs->tail = NULL; + + // Unlock the mutex now that we have ownership of dbf to allow + // consumers to get the mutex and perform swaps. They won't swap + // this buffer because second_buf_ready is false. + toku_mutex_unlock(&bfs->mutex); + //printf("%s:%d Doing read fd=%d\n", __FILE__, __LINE__, dbf->fd); + { + ssize_t readcode; + if (bfs->compressed) { + readcode = dbf_read_compressed(dbf, dbf->buf[1], bfs->bufsize); + } + else { + readcode = toku_os_read(dbf->fd, dbf->buf[1], bfs->bufsize); + } + //printf("%s:%d readcode=%ld\n", __FILE__, __LINE__, readcode); + if (readcode==-1) { + // a real error. Save the real error. + int the_errno = get_error_errno(); + fprintf(stderr, "%s:%d dbf=%p fd=%d errno=%d\n", __FILE__, __LINE__, dbf, dbf->fd, the_errno); + dbf->error_code[1] = the_errno; + dbf->n_in_buf[1] = 0; + } else if (readcode==0) { + // End of file. Save it. + dbf->error_code[1] = EOF; + dbf->n_in_buf[1] = 0; + dbf->io_done = true; + + } else { + dbf->error_code[1] = 0; + dbf->n_in_buf[1] = readcode; + } + + //printf("%s:%d locking mutex again=%ld\n", __FILE__, __LINE__, readcode); + { + toku_mutex_lock(&bfs->mutex); + if (paniced(bfs)) { + toku_mutex_unlock(&bfs->mutex); // ignore any error + toku_instr_delete_current_thread(); + return toku_pthread_done(nullptr); + } + } + // Now that we have the mutex, we can decrement n_not_done (if + // applicable) and set second_buf_ready + if (readcode<=0) { + bfs->n_not_done--; + } + //printf("%s:%d n_not_done=%d\n", __FILE__, __LINE__, bfs->n_not_done); + dbf->second_buf_ready = true; + toku_cond_broadcast(&bfs->cond); + //printf("%s:%d did broadcast=%d\n", __FILE__, __LINE__, bfs->n_not_done); + // Still have the lock so go around the loop + } + } + } +} + +int create_dbufio_fileset (DBUFIO_FILESET *bfsp, int N, int fds[/*N*/], size_t bufsize, bool compressed) { + //printf("%s:%d here\n", __FILE__, __LINE__); + int result = 0; + DBUFIO_FILESET CALLOC(bfs); + if (bfs==0) { result = get_error_errno(); } + + bfs->compressed = compressed; + + bool mutex_inited = false, cond_inited = false; + if (result==0) { + CALLOC_N(N, bfs->files); + if (bfs->files==NULL) { result = get_error_errno(); } + else { + for (int i=0; i<N; i++) { + bfs->files[i].buf[0] = bfs->files[i].buf[1] = NULL; + } + } + } + // printf("%s:%d here\n", __FILE__, __LINE__); + if (result == 0) { + toku_mutex_init(*bfs_mutex_key, &bfs->mutex, nullptr); + mutex_inited = true; + } + if (result == 0) { + toku_cond_init(*bfs_cond_key, &bfs->cond, nullptr); + cond_inited = true; + } + if (result == 0) { + bfs->N = N; + bfs->n_not_done = N; + bfs->head = bfs->tail = NULL; + for (int i = 0; i < N; i++) { + bfs->files[i].fd = fds[i]; + bfs->files[i].offset_in_buf = 0; + bfs->files[i].offset_in_uncompressed_file = 0; + bfs->files[i].next = NULL; + bfs->files[i].second_buf_ready = false; + for (int j = 0; j < 2; j++) { + if (result == 0) { + MALLOC_N(bufsize, bfs->files[i].buf[j]); + if (bfs->files[i].buf[j] == NULL) { + result = get_error_errno(); + } + } + bfs->files[i].n_in_buf[j] = 0; + bfs->files[i].error_code[j] = 0; + } + bfs->files[i].io_done = false; + ssize_t r; + if (bfs->compressed) { + r = dbf_read_compressed(&bfs->files[i], bfs->files[i].buf[0], bufsize); + } else { + r = toku_os_read(bfs->files[i].fd, bfs->files[i].buf[0], bufsize); + } + { + if (r<0) { + result=get_error_errno(); + break; + } else if (r==0) { + // it's EOF + bfs->files[i].io_done = true; + bfs->n_not_done--; + bfs->files[i].error_code[0] = EOF; + } else { + bfs->files[i].n_in_buf[0] = r; + //printf("%s:%d enq [%d]\n", __FILE__, __LINE__, i); + enq(bfs, &bfs->files[i]); + } + } + } + bfs->bufsize = bufsize; + bfs->panic = false; + bfs->panic_errno = 0; + } + // printf("Creating IO thread\n"); + if (result == 0) { + result = toku_pthread_create(*io_thread_key, + &bfs->iothread, + nullptr, + io_thread, + static_cast<void *>(bfs)); + } + if (result == 0) { + *bfsp = bfs; + return 0; + } + // Now undo everything. + // If we got here, there is no thread (either result was zero before the + // thread was created, or else the thread creation itself failed. + if (bfs) { + if (bfs->files) { + // the files were allocated, so we have to free all the bufs. + for (int i=0; i<N; i++) { + for (int j=0; j<2; j++) { + if (bfs->files[i].buf[j]) + toku_free(bfs->files[i].buf[j]); + bfs->files[i].buf[j]=NULL; + } + } + toku_free(bfs->files); + bfs->files=NULL; + } + if (cond_inited) { + toku_cond_destroy(&bfs->cond); // don't check error status + } + if (mutex_inited) { + toku_mutex_destroy(&bfs->mutex); // don't check error status + } + toku_free(bfs); + } + return result; +} + +int panic_dbufio_fileset(DBUFIO_FILESET bfs, int error) { + toku_mutex_lock(&bfs->mutex); + panic(bfs, error); + toku_cond_broadcast(&bfs->cond); + toku_mutex_unlock(&bfs->mutex); + return 0; +} + +int destroy_dbufio_fileset (DBUFIO_FILESET bfs) { + int result = 0; + { + void *retval; + int r = toku_pthread_join(bfs->iothread, &retval); + assert(r==0); + assert(retval==NULL); + } + { + toku_mutex_destroy(&bfs->mutex); + } + { + toku_cond_destroy(&bfs->cond); + } + if (bfs->files) { + for (int i=0; i<bfs->N; i++) { + for (int j=0; j<2; j++) { + //printf("%s:%d free([%d][%d]=%p\n", __FILE__, __LINE__, i,j, bfs->files[i].buf[j]); + toku_free(bfs->files[i].buf[j]); + } + } + toku_free(bfs->files); + } + toku_free(bfs); + return result; +} + +int dbufio_fileset_read (DBUFIO_FILESET bfs, int filenum, void *buf_v, size_t count, size_t *n_read) { + char *buf = (char*)buf_v; + struct dbufio_file *dbf = &bfs->files[filenum]; + if (dbf->error_code[0]!=0) return dbf->error_code[0]; + if (dbf->offset_in_buf + count <= dbf->n_in_buf[0]) { + // Enough data is present to do it all now + memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, count); + dbf->offset_in_buf += count; + dbf->offset_in_uncompressed_file += count; + *n_read = count; + return 0; + } else if (dbf->n_in_buf[0] > dbf->offset_in_buf) { + // There is something in buf[0] + size_t this_count = dbf->n_in_buf[0]-dbf->offset_in_buf; + assert(dbf->offset_in_buf + this_count <= bfs->bufsize); + memcpy(buf, dbf->buf[0]+dbf->offset_in_buf, this_count); + dbf->offset_in_buf += this_count; + dbf->offset_in_uncompressed_file += this_count; + size_t sub_n_read; + int r = dbufio_fileset_read(bfs, filenum, buf+this_count, count-this_count, &sub_n_read); + if (r==0) { + *n_read = this_count + sub_n_read; + return 0; + } else { + // The error code will have been saved. We got some data so return that + *n_read = this_count; + return 0; + } + } else { + // There is nothing in buf[0]. So we need to swap buffers + toku_mutex_lock(&bfs->mutex); + while (1) { + if (dbf->second_buf_ready) { + dbf->n_in_buf[0] = dbf->n_in_buf[1]; + { + char *tmp = dbf->buf[0]; + dbf->buf[0] = dbf->buf[1]; + dbf->buf[1] = tmp; + } + dbf->error_code[0] = dbf->error_code[1]; + dbf->second_buf_ready = false; + dbf->offset_in_buf = 0; + if (!dbf->io_done) { + // Don't enqueue it if the I/O is all done. + //printf("%s:%d enq [%ld]\n", __FILE__, __LINE__, dbf-&bfs->files[0]); + enq(bfs, dbf); + } + toku_cond_broadcast(&bfs->cond); + toku_mutex_unlock(&bfs->mutex); + if (dbf->error_code[0]==0) { + assert(dbf->n_in_buf[0]>0); + return dbufio_fileset_read(bfs, filenum, buf_v, count, n_read); + } else { + *n_read = 0; + return dbf->error_code[0]; + } + } else { + toku_cond_wait(&bfs->cond, &bfs->mutex); + } + } + assert(0); // cannot get here. + } +} + +void +dbufio_print(DBUFIO_FILESET bfs) { + fprintf(stderr, "%s:%d bfs=%p", __FILE__, __LINE__, bfs); + if (bfs->panic) + fprintf(stderr, " panic=%d", bfs->panic_errno); + fprintf(stderr, " N=%d %d %" PRIuMAX, bfs->N, bfs->n_not_done, (uintmax_t) bfs->bufsize); + for (int i = 0; i < bfs->N; i++) { + struct dbufio_file *dbf = &bfs->files[i]; + if (dbf->error_code[0] || dbf->error_code[1]) + fprintf(stderr, " %d=[%d,%d]", i, dbf->error_code[0], dbf->error_code[1]); + } + fprintf(stderr, "\n"); + +} |