summaryrefslogtreecommitdiffstats
path: root/src/seastar/tests/unit/fstream_test.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/seastar/tests/unit/fstream_test.cc')
-rw-r--r--src/seastar/tests/unit/fstream_test.cc510
1 files changed, 510 insertions, 0 deletions
diff --git a/src/seastar/tests/unit/fstream_test.cc b/src/seastar/tests/unit/fstream_test.cc
new file mode 100644
index 00000000..f3b4c352
--- /dev/null
+++ b/src/seastar/tests/unit/fstream_test.cc
@@ -0,0 +1,510 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2015 Cloudius Systems, Ltd.
+ */
+
+#include <algorithm>
+#include <iostream>
+#include <numeric>
+#include <seastar/core/reactor.hh>
+#include <seastar/core/fstream.hh>
+#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/app-template.hh>
+#include <seastar/core/do_with.hh>
+#include <seastar/core/seastar.hh>
+#include <seastar/testing/test_case.hh>
+#include <seastar/core/thread.hh>
+#include <seastar/util/defer.hh>
+#include <random>
+#include <boost/range/adaptor/transformed.hpp>
+#include <boost/algorithm/cxx11/any_of.hpp>
+#include "mock_file.hh"
+
+using namespace seastar;
+
+struct writer {
+ output_stream<char> out;
+ writer(file f) : out(make_file_output_stream(std::move(f))) {}
+};
+
+struct reader {
+ input_stream<char> in;
+ reader(file f) : in(make_file_input_stream(std::move(f))) {}
+ reader(file f, file_input_stream_options options) : in(make_file_input_stream(std::move(f), std::move(options))) {}
+};
+
+SEASTAR_TEST_CASE(test_fstream) {
+ auto sem = make_lw_shared<semaphore>(0);
+
+ open_file_dma("testfile.tmp",
+ open_flags::rw | open_flags::create | open_flags::truncate).then([sem] (file f) {
+ auto w = make_shared<writer>(std::move(f));
+ auto buf = static_cast<char*>(::malloc(4096));
+ memset(buf, 0, 4096);
+ buf[0] = '[';
+ buf[1] = 'A';
+ buf[4095] = ']';
+ w->out.write(buf, 4096).then([buf, w] {
+ ::free(buf);
+ return make_ready_future<>();
+ }).then([w] {
+ auto buf = static_cast<char*>(::malloc(8192));
+ memset(buf, 0, 8192);
+ buf[0] = '[';
+ buf[1] = 'B';
+ buf[8191] = ']';
+ return w->out.write(buf, 8192).then([buf, w] {
+ ::free(buf);
+ return w->out.close().then([w] {});
+ });
+ }).then([] {
+ return open_file_dma("testfile.tmp", open_flags::ro);
+ }).then([] (file f) {
+ /* file content after running the above:
+ * 00000000 5b 41 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[A..............|
+ * 00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
+ * *
+ * 00000ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
+ * 00001000 5b 42 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[B..............|
+ * 00001010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
+ * *
+ * 00002ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
+ * 00003000
+ */
+ auto r = make_shared<reader>(std::move(f));
+ return r->in.read_exactly(4096 + 8192).then([r] (temporary_buffer<char> buf) {
+ auto p = buf.get();
+ BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[4095] == ']');
+ BOOST_REQUIRE(p[4096] == '[' && p[4096 + 1] == 'B' && p[4096 + 8191] == ']');
+ return make_ready_future<>();
+ }).then([r] {
+ return r->in.close();
+ }).finally([r] {});
+ }).finally([sem] () {
+ sem->signal();
+ });
+ });
+
+ return sem->wait();
+}
+
+SEASTAR_TEST_CASE(test_consume_skip_bytes) {
+ return seastar::async([] {
+ auto f = open_file_dma("testfile.tmp",
+ open_flags::rw | open_flags::create | open_flags::truncate).get0();
+ auto w = make_lw_shared<writer>(std::move(f));
+ auto write_block = [w] (char c, size_t size) {
+ std::vector<char> vec(size, c);
+ w->out.write(&vec.front(), vec.size()).get();
+ };
+ write_block('a', 8192);
+ write_block('b', 8192);
+ w->out.close().get();
+ /* file content after running the above:
+ * 00000000 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
+ * *
+ * 00002000 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbbbbbbb|
+ * *
+ * 00004000
+ */
+ f = open_file_dma("testfile.tmp", open_flags::ro).get0();
+ auto r = make_lw_shared<reader>(std::move(f), file_input_stream_options{512});
+ struct consumer {
+ uint64_t _count = 0;
+ using consumption_result_type = typename input_stream<char>::consumption_result_type;
+ using stop_consuming_type = typename consumption_result_type::stop_consuming_type;
+ using tmp_buf = stop_consuming_type::tmp_buf;
+
+ /*
+ * Consumer reads the file as follows:
+ * - first 8000 bytes are read in 512-byte chunks and checked
+ * - next 2000 bytes are skipped (jumping over both read buffer size and DMA block)
+ * - the remaining 6384 bytes are read and checked
+ */
+ future<consumption_result_type> operator()(tmp_buf buf) {
+ if (_count < 8000) {
+ auto delta = std::min(buf.size(), 8000 - _count);
+ for (auto c : buf.share(0, delta)) {
+ BOOST_REQUIRE_EQUAL(c, 'a');
+ }
+ buf.trim_front(delta);
+ _count += delta;
+
+ if (_count == 8000) {
+ return make_ready_future<consumption_result_type>(skip_bytes{2000 - buf.size()});
+ } else {
+ assert(buf.empty());
+ return make_ready_future<consumption_result_type>(continue_consuming{});
+ }
+ return make_ready_future<consumption_result_type>(continue_consuming{});
+ } else {
+ for (auto c : buf) {
+ BOOST_REQUIRE_EQUAL(c, 'b');
+ }
+ _count += buf.size();
+ if (_count < 14384) {
+ return make_ready_future<consumption_result_type>(continue_consuming{});
+ } else if (_count > 14384) {
+ BOOST_FAIL("Read more than expected");
+ }
+ return make_ready_future<consumption_result_type>(stop_consuming_type({}));
+ }
+ }
+ };
+ r->in.consume(consumer{}).get();
+ r->in.close().get();
+ });
+}
+
+SEASTAR_TEST_CASE(test_fstream_unaligned) {
+ auto sem = make_lw_shared<semaphore>(0);
+
+ open_file_dma("testfile.tmp",
+ open_flags::rw | open_flags::create | open_flags::truncate).then([sem] (file f) {
+ auto w = make_shared<writer>(std::move(f));
+ auto buf = static_cast<char*>(::malloc(40));
+ memset(buf, 0, 40);
+ buf[0] = '[';
+ buf[1] = 'A';
+ buf[39] = ']';
+ w->out.write(buf, 40).then([buf, w] {
+ ::free(buf);
+ return w->out.close().then([w] {});
+ }).then([] {
+ return open_file_dma("testfile.tmp", open_flags::ro);
+ }).then([] (file f) {
+ return do_with(std::move(f), [] (file& f) {
+ return f.size().then([] (size_t size) {
+ // assert that file was indeed truncated to the amount of bytes written.
+ BOOST_REQUIRE(size == 40);
+ return make_ready_future<>();
+ });
+ });
+ }).then([] {
+ return open_file_dma("testfile.tmp", open_flags::ro);
+ }).then([] (file f) {
+ auto r = make_shared<reader>(std::move(f));
+ return r->in.read_exactly(40).then([r] (temporary_buffer<char> buf) {
+ auto p = buf.get();
+ BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[39] == ']');
+ return make_ready_future<>();
+ }).then([r] {
+ return r->in.close();
+ }).finally([r] {});
+ }).finally([sem] () {
+ sem->signal();
+ });
+ });
+
+ return sem->wait();
+}
+
+future<> test_consume_until_end(uint64_t size) {
+ return open_file_dma("testfile.tmp",
+ open_flags::rw | open_flags::create | open_flags::truncate).then([size] (file f) {
+ return do_with(make_file_output_stream(f), [size] (output_stream<char>& out) {
+ std::vector<char> buf(size);
+ std::iota(buf.begin(), buf.end(), 0);
+ return out.write(buf.data(), buf.size()).then([&out] {
+ return out.flush();
+ });
+ }).then([f] {
+ return f.size();
+ }).then([size, f] (size_t real_size) {
+ BOOST_REQUIRE_EQUAL(size, real_size);
+ }).then([size, f] {
+ auto consumer = [offset = uint64_t(0), size] (temporary_buffer<char> buf) mutable -> future<input_stream<char>::unconsumed_remainder> {
+ if (!buf) {
+ return make_ready_future<input_stream<char>::unconsumed_remainder>(temporary_buffer<char>());
+ }
+ BOOST_REQUIRE(offset + buf.size() <= size);
+ std::vector<char> expected(buf.size());
+ std::iota(expected.begin(), expected.end(), offset);
+ offset += buf.size();
+ BOOST_REQUIRE(std::equal(buf.begin(), buf.end(), expected.begin()));
+ return make_ready_future<input_stream<char>::unconsumed_remainder>(compat::nullopt);
+ };
+ return do_with(make_file_input_stream(f), std::move(consumer), [] (input_stream<char>& in, auto& consumer) {
+ return in.consume(consumer).then([&in] {
+ return in.close();
+ });
+ });
+ });
+ });
+}
+
+
+SEASTAR_TEST_CASE(test_consume_aligned_file) {
+ return test_consume_until_end(4096);
+}
+
+SEASTAR_TEST_CASE(test_consume_empty_file) {
+ return test_consume_until_end(0);
+}
+
+SEASTAR_TEST_CASE(test_consume_unaligned_file) {
+ return test_consume_until_end(1);
+}
+
+SEASTAR_TEST_CASE(test_consume_unaligned_file_large) {
+ return test_consume_until_end((1 << 20) + 1);
+}
+
+SEASTAR_TEST_CASE(test_input_stream_esp_around_eof) {
+ return seastar::async([] {
+ auto flen = uint64_t(5341);
+ auto rdist = std::uniform_int_distribution<char>();
+ auto reng = std::default_random_engine();
+ auto data = boost::copy_range<std::vector<uint8_t>>(
+ boost::irange<uint64_t>(0, flen)
+ | boost::adaptors::transformed([&] (int x) { return rdist(reng); }));
+ auto f = open_file_dma("file.tmp",
+ open_flags::rw | open_flags::create | open_flags::truncate).get0();
+ auto out = make_file_output_stream(f);
+ out.write(reinterpret_cast<const char*>(data.data()), data.size()).get();
+ out.flush().get();
+ //out.close().get(); // FIXME: closes underlying stream:?!
+ struct range { uint64_t start; uint64_t end; };
+ auto ranges = std::vector<range>{{
+ range{0, flen},
+ range{0, flen * 2},
+ range{0, flen + 1},
+ range{0, flen - 1},
+ range{0, 1},
+ range{1, 2},
+ range{flen - 1, flen},
+ range{flen - 1, flen + 1},
+ range{flen, flen + 1},
+ range{flen + 1, flen + 2},
+ range{1023, flen-1},
+ range{1023, flen},
+ range{1023, flen + 2},
+ range{8193, 8194},
+ range{1023, 1025},
+ range{1023, 1024},
+ range{1024, 1025},
+ range{1023, 4097},
+ }};
+ auto opt = file_input_stream_options();
+ opt.buffer_size = 512;
+ for (auto&& r : ranges) {
+ auto start = r.start;
+ auto end = r.end;
+ auto len = end - start;
+ auto in = make_file_input_stream(f, start, len, opt);
+ std::vector<uint8_t> readback;
+ auto more = true;
+ while (more) {
+ auto rdata = in.read().get0();
+ for (size_t i = 0; i < rdata.size(); ++i) {
+ readback.push_back(rdata.get()[i]);
+ }
+ more = !rdata.empty();
+ }
+ //in.close().get();
+ auto xlen = std::min(end, flen) - std::min(flen, start);
+ if (xlen != readback.size()) {
+ BOOST_FAIL(format("Expected {:d} bytes but got {:d}, start={:d}, end={:d}", xlen, readback.size(), start, end));
+ }
+ BOOST_REQUIRE(std::equal(readback.begin(), readback.end(), data.begin() + std::min(start, flen)));
+ }
+ f.close().get();
+ });
+}
+
+SEASTAR_TEST_CASE(file_handle_test) {
+ return seastar::async([] {
+ auto f = open_file_dma("testfile.tmp", open_flags::create | open_flags::truncate | open_flags::rw).get0();
+ auto buf = static_cast<char*>(aligned_alloc(4096, 4096));
+ auto del = defer([&] { ::free(buf); });
+ for (unsigned i = 0; i < 4096; ++i) {
+ buf[i] = i;
+ }
+ f.dma_write(0, buf, 4096).get();
+ auto bad = std::vector<unsigned>(smp::count); // std::vector<bool> is special and unsuitable because it uses bitfields
+ smp::invoke_on_all([fh = f.dup(), &bad] {
+ return seastar::async([fh, &bad] {
+ auto f = fh.to_file();
+ auto buf = static_cast<char*>(aligned_alloc(4096, 4096));
+ auto del = defer([&] { ::free(buf); });
+ f.dma_read(0, buf, 4096).get();
+ for (unsigned i = 0; i < 4096; ++i) {
+ bad[engine().cpu_id()] |= buf[i] != char(i);
+ }
+ });
+ }).get();
+ BOOST_REQUIRE(!boost::algorithm::any_of_equal(bad, 1u));
+ f.close().get();
+ });
+}
+
+SEASTAR_TEST_CASE(test_fstream_slow_start) {
+ return seastar::async([] {
+ static constexpr size_t file_size = 128 * 1024 * 1024;
+ static constexpr size_t buffer_size = 260 * 1024;
+ static constexpr size_t read_ahead = 1;
+
+ auto mock_file = make_shared<mock_read_only_file>(file_size);
+
+ auto history = make_lw_shared<file_input_stream_history>();
+
+ file_input_stream_options options{};
+ options.buffer_size = buffer_size;
+ options.read_ahead = read_ahead;
+ options.dynamic_adjustments = history;
+
+ static constexpr size_t requests_at_slow_start = 2; // 1 request + 1 read-ahead
+ static constexpr size_t requests_at_full_speed = read_ahead + 1; // 1 request + read_ahead
+
+ compat::optional<size_t> initial_read_size;
+
+ auto read_whole_file_with_slow_start = [&] (auto fstr) {
+ uint64_t total_read = 0;
+ size_t previous_buffer_length = 0;
+
+ // We don't want to assume too much about fstream internals, but with
+ // no history we should start with a buffer sizes somewhere in
+ // (0, buffer_size) range.
+ mock_file->set_read_size_verifier([&] (size_t length) {
+ BOOST_CHECK_LE(length, initial_read_size.value_or(buffer_size - 1));
+ BOOST_CHECK_GE(length, initial_read_size.value_or(1));
+ previous_buffer_length = length;
+ if (!initial_read_size) {
+ initial_read_size = length;
+ }
+ });
+
+ // Slow start phase
+ while (true) {
+ // We should leave slow start before reading the whole file.
+ BOOST_CHECK_LT(total_read, file_size);
+
+ mock_file->set_allowed_read_requests(requests_at_slow_start);
+ auto buf = fstr.read().get0();
+ BOOST_CHECK_GT(buf.size(), 0u);
+
+ mock_file->set_read_size_verifier([&] (size_t length) {
+ // There is no reason to reduce buffer size.
+ BOOST_CHECK_LE(length, std::min(previous_buffer_length * 2, buffer_size));
+ BOOST_CHECK_GE(length, previous_buffer_length);
+ previous_buffer_length = length;
+ });
+
+ BOOST_TEST_MESSAGE(format("Size {:d}", buf.size()));
+ total_read += buf.size();
+ if (buf.size() == buffer_size) {
+ BOOST_TEST_MESSAGE("Leaving slow start phase.");
+ break;
+ }
+ }
+
+ // Reading at full speed now
+ mock_file->set_expected_read_size(buffer_size);
+ while (total_read != file_size) {
+ mock_file->set_allowed_read_requests(requests_at_full_speed);
+ auto buf = fstr.read().get0();
+ total_read += buf.size();
+ }
+
+ mock_file->set_allowed_read_requests(requests_at_full_speed);
+ auto buf = fstr.read().get0();
+ BOOST_CHECK_EQUAL(buf.size(), 0u);
+ assert(buf.size() == 0);
+ };
+
+ auto read_while_file_at_full_speed = [&] (auto fstr) {
+ uint64_t total_read = 0;
+
+ mock_file->set_expected_read_size(buffer_size);
+ while (total_read != file_size) {
+ mock_file->set_allowed_read_requests(requests_at_full_speed);
+ auto buf = fstr.read().get0();
+ total_read += buf.size();
+ }
+
+ mock_file->set_allowed_read_requests(requests_at_full_speed);
+ auto buf = fstr.read().get0();
+ BOOST_CHECK_EQUAL(buf.size(), 0u);
+ };
+
+ auto read_and_skip_a_lot = [&] (auto fstr) {
+ uint64_t total_read = 0;
+ size_t previous_buffer_size = buffer_size;
+
+ mock_file->set_allowed_read_requests(std::numeric_limits<size_t>::max());
+ mock_file->set_read_size_verifier([&] (size_t length) {
+ // There is no reason to reduce buffer size.
+ BOOST_CHECK_LE(length, previous_buffer_size);
+ BOOST_CHECK_GE(length, initial_read_size.value_or(1));
+ previous_buffer_size = length;
+ });
+ while (total_read != file_size) {
+ auto buf = fstr.read().get0();
+ total_read += buf.size();
+
+ buf = fstr.read().get0();
+ total_read += buf.size();
+
+ auto skip_by = std::min(file_size - total_read, buffer_size * 2);
+ fstr.skip(skip_by).get();
+ total_read += skip_by;
+ }
+
+ // We should be back at slow start at this stage.
+ BOOST_CHECK_LT(previous_buffer_size, buffer_size);
+ if (initial_read_size) {
+ BOOST_CHECK_EQUAL(previous_buffer_size, *initial_read_size);
+ }
+
+ mock_file->set_allowed_read_requests(requests_at_full_speed);
+ auto buf = fstr.read().get0();
+ BOOST_CHECK_EQUAL(buf.size(), 0u);
+
+ };
+
+ auto make_fstream = [&] {
+ struct fstream_wrapper {
+ input_stream<char> s;
+ fstream_wrapper(fstream_wrapper&&) = default;
+ fstream_wrapper& operator=(fstream_wrapper&&) = default;
+ future<temporary_buffer<char>> read() {
+ return s.read();
+ }
+ future<> skip(uint64_t n) {
+ return s.skip(n);
+ }
+ ~fstream_wrapper() {
+ s.close().get();
+ }
+ };
+ return fstream_wrapper{make_file_input_stream(file(mock_file), 0, file_size, options)};
+ };
+
+ BOOST_TEST_MESSAGE("Reading file, no history, expectiong a slow start");
+ read_whole_file_with_slow_start(make_fstream());
+ BOOST_TEST_MESSAGE("Reading file again, everything good so far, read at full speed");
+ read_while_file_at_full_speed(make_fstream());
+ BOOST_TEST_MESSAGE("Reading and skipping a lot");
+ read_and_skip_a_lot(make_fstream());
+ BOOST_TEST_MESSAGE("Reading file, bad history, we are back at slow start...");
+ read_whole_file_with_slow_start(make_fstream());
+ BOOST_TEST_MESSAGE("Reading file yet again, should've recovered by now");
+ read_while_file_at_full_speed(make_fstream());
+ });
+}