/* * This file is open source software, licensed to you under the terms * of the Apache License, Version 2.0 (the "License"). See the NOTICE file * distributed with this work for additional information regarding copyright * ownership. You may not use this file except in compliance with the License. * * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ /* * Copyright (C) 2015 Cloudius Systems, Ltd. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "mock_file.hh" #include #include #include using namespace seastar; namespace fs = std::filesystem; struct writer { output_stream out; static future> make(file f) { return api_v3::and_newer::make_file_output_stream(std::move(f)).then([] (output_stream&& os) { return make_shared(writer{std::move(os)}); }); } }; struct reader { input_stream in; reader(file f) : in(make_file_input_stream(std::move(f))) {} reader(file f, file_input_stream_options options) : in(make_file_input_stream(std::move(f), std::move(options))) {} }; SEASTAR_TEST_CASE(test_fstream) { return tmp_dir::do_with([] (tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); return open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).then([filename] (file f) { return writer::make(std::move(f)).then([filename] (shared_ptr w) { auto buf = static_cast(::malloc(4096)); memset(buf, 0, 4096); buf[0] = '['; buf[1] = 'A'; buf[4095] = ']'; return w->out.write(buf, 4096).then([buf, w] { ::free(buf); return make_ready_future<>(); }).then([w] { auto buf = static_cast(::malloc(8192)); memset(buf, 0, 8192); buf[0] = '['; buf[1] = 'B'; buf[8191] = ']'; return w->out.write(buf, 8192).then([buf, w] { ::free(buf); return w->out.close().then([w] {}); }); }).then([filename] { return open_file_dma(filename, open_flags::ro); }).then([] (file f) { /* file content after running the above: * 00000000 5b 41 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[A..............| * 00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| * * * 00000ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]| * 00001000 5b 42 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[B..............| * 00001010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................| * * * 00002ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]| * 00003000 */ auto r = make_shared(std::move(f)); return r->in.read_exactly(4096 + 8192).then([r] (temporary_buffer buf) { auto p = buf.get(); BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[4095] == ']'); BOOST_REQUIRE(p[4096] == '[' && p[4096 + 1] == 'B' && p[4096 + 8191] == ']'); return make_ready_future<>(); }).then([r] { return r->in.close(); }).finally([r] {}); }); }); }); }); } SEASTAR_TEST_CASE(test_consume_skip_bytes) { return tmp_dir::do_with_thread([] (tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); auto f = open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).get0(); auto w = writer::make(std::move(f)).get0(); auto write_block = [w] (char c, size_t size) { std::vector vec(size, c); w->out.write(&vec.front(), vec.size()).get(); }; write_block('a', 8192); write_block('b', 8192); w->out.close().get(); /* file content after running the above: * 00000000 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa| * * * 00002000 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbbbbbbb| * * * 00004000 */ f = open_file_dma(filename, open_flags::ro).get0(); auto r = make_lw_shared(std::move(f), file_input_stream_options{512}); auto close_r_in = deferred_close(r->in); struct consumer { uint64_t _count = 0; using consumption_result_type = typename input_stream::consumption_result_type; using stop_consuming_type = typename consumption_result_type::stop_consuming_type; using tmp_buf = stop_consuming_type::tmp_buf; /* * Consumer reads the file as follows: * - first 8000 bytes are read in 512-byte chunks and checked * - next 2000 bytes are skipped (jumping over both read buffer size and DMA block) * - the remaining 6384 bytes are read and checked */ future operator()(tmp_buf buf) { if (_count < 8000) { auto delta = std::min(buf.size(), 8000 - _count); for (auto c : buf.share(0, delta)) { BOOST_REQUIRE_EQUAL(c, 'a'); } buf.trim_front(delta); _count += delta; if (_count == 8000) { return make_ready_future(skip_bytes{2000 - buf.size()}); } else { assert(buf.empty()); return make_ready_future(continue_consuming{}); } return make_ready_future(continue_consuming{}); } else { for (auto c : buf) { BOOST_REQUIRE_EQUAL(c, 'b'); } _count += buf.size(); if (_count < 14384) { return make_ready_future(continue_consuming{}); } else if (_count > 14384) { BOOST_FAIL("Read more than expected"); } return make_ready_future(stop_consuming_type({})); } } }; r->in.consume(consumer{}).get(); }); } SEASTAR_TEST_CASE(test_fstream_unaligned) { return tmp_dir::do_with([] (tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); return open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).then([filename] (file f) { return writer::make(std::move(f)).then([filename] (shared_ptr w) { auto buf = static_cast(::malloc(40)); memset(buf, 0, 40); buf[0] = '['; buf[1] = 'A'; buf[39] = ']'; return w->out.write(buf, 40).then([buf, w] { ::free(buf); return w->out.close().then([w] {}); }).then([filename] { return open_file_dma(filename, open_flags::ro); }).then([] (file f) { return do_with(std::move(f), [] (file& f) { return f.size().then([] (size_t size) { // assert that file was indeed truncated to the amount of bytes written. BOOST_REQUIRE(size == 40); return make_ready_future<>(); }); }); }).then([filename] { return open_file_dma(filename, open_flags::ro); }).then([] (file f) { auto r = make_shared(std::move(f)); return r->in.read_exactly(40).then([r] (temporary_buffer buf) { auto p = buf.get(); BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[39] == ']'); return make_ready_future<>(); }).then([r] { return r->in.close(); }).finally([r] {}); }); }); }); }); } future<> test_consume_until_end(uint64_t size) { return tmp_dir::do_with([size] (tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); return open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).then([size] (file f) { return api_v3::and_newer::make_file_output_stream(f).then([size] (output_stream&& os) { return do_with(std::move(os), [size] (output_stream& out) { std::vector buf(size); std::iota(buf.begin(), buf.end(), 0); return out.write(buf.data(), buf.size()).then([&out] { return out.flush(); }); }); }).then([f] { return f.size(); }).then([size, f] (size_t real_size) { BOOST_REQUIRE_EQUAL(size, real_size); }).then([size, f] { auto consumer = [offset = uint64_t(0), size] (temporary_buffer buf) mutable -> future::unconsumed_remainder> { if (!buf) { return make_ready_future::unconsumed_remainder>(temporary_buffer()); } BOOST_REQUIRE(offset + buf.size() <= size); std::vector expected(buf.size()); std::iota(expected.begin(), expected.end(), offset); offset += buf.size(); BOOST_REQUIRE(std::equal(buf.begin(), buf.end(), expected.begin())); return make_ready_future::unconsumed_remainder>(std::nullopt); }; return do_with(make_file_input_stream(f), std::move(consumer), [] (input_stream& in, auto& consumer) { return in.consume(consumer).then([&in] { return in.close(); }); }); }).finally([f] () mutable { return f.close(); }); }); }); } SEASTAR_TEST_CASE(test_consume_aligned_file) { return test_consume_until_end(4096); } SEASTAR_TEST_CASE(test_consume_empty_file) { return test_consume_until_end(0); } SEASTAR_TEST_CASE(test_consume_unaligned_file) { return test_consume_until_end(1); } SEASTAR_TEST_CASE(test_consume_unaligned_file_large) { return test_consume_until_end((1 << 20) + 1); } SEASTAR_TEST_CASE(test_input_stream_esp_around_eof) { return tmp_dir::do_with_thread([] (tmp_dir& t) { auto flen = uint64_t(5341); auto rdist = std::uniform_int_distribution(0, std::numeric_limits::max()); auto reng = testing::local_random_engine; auto data = boost::copy_range>( boost::irange(0, flen) | boost::adaptors::transformed([&] (int x) { return rdist(reng); })); auto filename = (t.get_path() / "testfile.tmp").native(); auto f = open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).get0(); auto close_f = deferred_close(f); auto out = api_v3::and_newer::make_file_output_stream(f).get0(); out.write(reinterpret_cast(data.data()), data.size()).get(); out.flush().get(); //out.close().get(); // FIXME: closes underlying stream:?! struct range { uint64_t start; uint64_t end; }; auto ranges = std::vector{{ range{0, flen}, range{0, flen * 2}, range{0, flen + 1}, range{0, flen - 1}, range{0, 1}, range{1, 2}, range{flen - 1, flen}, range{flen - 1, flen + 1}, range{flen, flen + 1}, range{flen + 1, flen + 2}, range{1023, flen-1}, range{1023, flen}, range{1023, flen + 2}, range{8193, 8194}, range{1023, 1025}, range{1023, 1024}, range{1024, 1025}, range{1023, 4097}, }}; auto opt = file_input_stream_options(); opt.buffer_size = 512; for (auto&& r : ranges) { auto start = r.start; auto end = r.end; auto len = end - start; auto in = make_file_input_stream(f, start, len, opt); std::vector readback; auto more = true; while (more) { auto rdata = in.read().get0(); for (size_t i = 0; i < rdata.size(); ++i) { readback.push_back(rdata.get()[i]); } more = !rdata.empty(); } //in.close().get(); auto xlen = std::min(end, flen) - std::min(flen, start); if (xlen != readback.size()) { BOOST_FAIL(format("Expected {:d} bytes but got {:d}, start={:d}, end={:d}", xlen, readback.size(), start, end)); } BOOST_REQUIRE(std::equal(readback.begin(), readback.end(), data.begin() + std::min(start, flen))); } }); } #if SEASTAR_API_LEVEL >= 3 SEASTAR_TEST_CASE(without_api_prefix) { return tmp_dir::do_with_thread([](tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); auto f = open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).get0(); output_stream out = make_file_output_stream(f).get0(); out.close().get(); }); } #endif SEASTAR_TEST_CASE(file_handle_test) { return tmp_dir::do_with_thread([] (tmp_dir& t) { auto filename = (t.get_path() / "testfile.tmp").native(); auto f = open_file_dma(filename, open_flags::create | open_flags::truncate | open_flags::rw).get0(); auto close_f = deferred_close(f); auto buf = static_cast(aligned_alloc(4096, 4096)); auto del = defer([&] () noexcept { ::free(buf); }); for (unsigned i = 0; i < 4096; ++i) { buf[i] = i; } f.dma_write(0, buf, 4096).get(); auto bad = std::vector(smp::count); // std::vector is special and unsuitable because it uses bitfields smp::invoke_on_all([fh = f.dup(), &bad] { return seastar::async([fh, &bad] { auto f = fh.to_file(); auto buf = static_cast(aligned_alloc(4096, 4096)); auto del = defer([&] () noexcept { ::free(buf); }); f.dma_read(0, buf, 4096).get(); for (unsigned i = 0; i < 4096; ++i) { bad[this_shard_id()] |= buf[i] != char(i); } }); }).get(); BOOST_REQUIRE(!boost::algorithm::any_of_equal(bad, 1u)); }); } SEASTAR_TEST_CASE(test_fstream_slow_start) { return seastar::async([] { static constexpr size_t file_size = 128 * 1024 * 1024; static constexpr size_t buffer_size = 260 * 1024; static constexpr size_t read_ahead = 1; auto mock_file = make_shared(file_size); auto history = make_lw_shared(); file_input_stream_options options{}; options.buffer_size = buffer_size; options.read_ahead = read_ahead; options.dynamic_adjustments = history; static constexpr size_t requests_at_slow_start = 2; // 1 request + 1 read-ahead static constexpr size_t requests_at_full_speed = read_ahead + 1; // 1 request + read_ahead std::optional initial_read_size; auto read_whole_file_with_slow_start = [&] (auto fstr) { uint64_t total_read = 0; size_t previous_buffer_length = 0; // We don't want to assume too much about fstream internals, but with // no history we should start with a buffer sizes somewhere in // (0, buffer_size) range. mock_file->set_read_size_verifier([&] (size_t length) { BOOST_CHECK_LE(length, initial_read_size.value_or(buffer_size - 1)); BOOST_CHECK_GE(length, initial_read_size.value_or(1)); previous_buffer_length = length; if (!initial_read_size) { initial_read_size = length; } }); // Slow start phase while (true) { // We should leave slow start before reading the whole file. BOOST_CHECK_LT(total_read, file_size); mock_file->set_allowed_read_requests(requests_at_slow_start); auto buf = fstr.read().get0(); BOOST_CHECK_GT(buf.size(), 0u); mock_file->set_read_size_verifier([&] (size_t length) { // There is no reason to reduce buffer size. BOOST_CHECK_LE(length, std::min(previous_buffer_length * 2, buffer_size)); BOOST_CHECK_GE(length, previous_buffer_length); previous_buffer_length = length; }); BOOST_TEST_MESSAGE(format("Size {:d}", buf.size())); total_read += buf.size(); if (buf.size() == buffer_size) { BOOST_TEST_MESSAGE("Leaving slow start phase."); break; } } // Reading at full speed now mock_file->set_expected_read_size(buffer_size); while (total_read != file_size) { mock_file->set_allowed_read_requests(requests_at_full_speed); auto buf = fstr.read().get0(); total_read += buf.size(); } mock_file->set_allowed_read_requests(requests_at_full_speed); auto buf = fstr.read().get0(); BOOST_CHECK_EQUAL(buf.size(), 0u); assert(buf.size() == 0); }; auto read_while_file_at_full_speed = [&] (auto fstr) { uint64_t total_read = 0; mock_file->set_expected_read_size(buffer_size); while (total_read != file_size) { mock_file->set_allowed_read_requests(requests_at_full_speed); auto buf = fstr.read().get0(); total_read += buf.size(); } mock_file->set_allowed_read_requests(requests_at_full_speed); auto buf = fstr.read().get0(); BOOST_CHECK_EQUAL(buf.size(), 0u); }; auto read_and_skip_a_lot = [&] (auto fstr) { uint64_t total_read = 0; size_t previous_buffer_size = buffer_size; mock_file->set_allowed_read_requests(std::numeric_limits::max()); mock_file->set_read_size_verifier([&] (size_t length) { // There is no reason to reduce buffer size. BOOST_CHECK_LE(length, previous_buffer_size); BOOST_CHECK_GE(length, initial_read_size.value_or(1)); previous_buffer_size = length; }); while (total_read != file_size) { auto buf = fstr.read().get0(); total_read += buf.size(); buf = fstr.read().get0(); total_read += buf.size(); auto skip_by = std::min(file_size - total_read, buffer_size * 2); fstr.skip(skip_by).get(); total_read += skip_by; } // We should be back at slow start at this stage. BOOST_CHECK_LT(previous_buffer_size, buffer_size); if (initial_read_size) { BOOST_CHECK_EQUAL(previous_buffer_size, *initial_read_size); } mock_file->set_allowed_read_requests(requests_at_full_speed); auto buf = fstr.read().get0(); BOOST_CHECK_EQUAL(buf.size(), 0u); }; auto make_fstream = [&] { struct fstream_wrapper { input_stream s; explicit fstream_wrapper(input_stream&& s) : s(std::move(s)) {} fstream_wrapper(fstream_wrapper&&) = default; fstream_wrapper& operator=(fstream_wrapper&&) = default; future> read() { return s.read(); } future<> skip(uint64_t n) { return s.skip(n); } ~fstream_wrapper() { s.close().get(); } }; return fstream_wrapper(make_file_input_stream(file(mock_file), 0, file_size, options)); }; BOOST_TEST_MESSAGE("Reading file, no history, expectiong a slow start"); read_whole_file_with_slow_start(make_fstream()); BOOST_TEST_MESSAGE("Reading file again, everything good so far, read at full speed"); read_while_file_at_full_speed(make_fstream()); BOOST_TEST_MESSAGE("Reading and skipping a lot"); read_and_skip_a_lot(make_fstream()); BOOST_TEST_MESSAGE("Reading file, bad history, we are back at slow start..."); read_whole_file_with_slow_start(make_fstream()); BOOST_TEST_MESSAGE("Reading file yet again, should've recovered by now"); read_while_file_at_full_speed(make_fstream()); }); } #ifdef SEASTAR_ENABLE_ALLOC_FAILURE_INJECTION SEASTAR_TEST_CASE(test_close_error) { using namespace seastar::memory; return tmp_dir::do_with_thread([] (tmp_dir& t) { bool done = false; for (size_t i = 0; !done; i++) { bool got_close_error = false; sstring filename = (t.get_path() / format("testfile-{}.tmp", i).c_str()).native(); file f = open_file_dma(filename, open_flags::rw | open_flags::create | open_flags::truncate).get0(); auto opts = file_output_stream_options{}; opts.write_behind = 16; std::unique_ptr> out = std::make_unique>(make_file_output_stream(std::move(f), opts).get0()); size_t size = 4096; std::vector buf(size); std::iota(buf.begin(), buf.end(), 0); size_t file_length = 1 * 1024 * 1024; auto fut = make_ready_future<>(); for (size_t len = 0; len < file_length; len += size) { fut = fut.finally([&] { return out->write(buf.data(), size); }); } fut.get(); try { local_failure_injector().fail_after(i); out->close().get(); done = true; local_failure_injector().cancel(); } catch (const std::bad_alloc&) { got_close_error = true; } BOOST_REQUIRE(got_close_error || done); out.reset(); remove_file(filename).get(); } }); } #endif // SEASTAR_ENABLE_ALLOC_FAILURE_INJECTION