diff options
Diffstat (limited to 'src/seastar/tests/unit/futures_test.cc')
-rw-r--r-- | src/seastar/tests/unit/futures_test.cc | 1617 |
1 files changed, 1617 insertions, 0 deletions
diff --git a/src/seastar/tests/unit/futures_test.cc b/src/seastar/tests/unit/futures_test.cc new file mode 100644 index 000000000..200e33ac9 --- /dev/null +++ b/src/seastar/tests/unit/futures_test.cc @@ -0,0 +1,1617 @@ +/* + * This file is open source software, licensed to you under the terms + * of the Apache License, Version 2.0 (the "License"). See the NOTICE file + * distributed with this work for additional information regarding copyright + * ownership. You may not use this file except in compliance with the License. + * + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Copyright (C) 2014 Cloudius Systems, Ltd. + */ + +#include <seastar/testing/test_case.hh> + +#include <seastar/core/reactor.hh> +#include <seastar/core/shared_ptr.hh> +#include <seastar/core/future-util.hh> +#include <seastar/core/sleep.hh> +#include <seastar/core/stream.hh> +#include <seastar/util/backtrace.hh> +#include <seastar/core/do_with.hh> +#include <seastar/core/shared_future.hh> +#include <seastar/core/manual_clock.hh> +#include <seastar/core/thread.hh> +#include <seastar/core/print.hh> +#include <seastar/core/gate.hh> +#include <seastar/util/log.hh> +#include <boost/iterator/counting_iterator.hpp> +#include <seastar/testing/thread_test_case.hh> + +#include <boost/range/iterator_range.hpp> +#include <boost/range/irange.hpp> + +#include <seastar/core/internal/api-level.hh> + +using namespace seastar; +using namespace std::chrono_literals; + +static_assert(std::is_nothrow_default_constructible_v<gate>, + "seastar::gate constructor must not throw"); +static_assert(std::is_nothrow_move_constructible_v<gate>, + "seastar::gate move constructor must not throw"); + +static_assert(std::is_nothrow_default_constructible_v<shared_future<>>); +static_assert(std::is_nothrow_copy_constructible_v<shared_future<>>); +static_assert(std::is_nothrow_move_constructible_v<shared_future<>>); + +static_assert(std::is_nothrow_move_constructible_v<shared_promise<>>); + +class expected_exception : public std::runtime_error { +public: + expected_exception() : runtime_error("expected") {} +}; + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wself-move" +#endif +SEASTAR_TEST_CASE(test_self_move) { + future_state<std::tuple<std::unique_ptr<int>>> s1; + s1.set(std::make_unique<int>(42)); + s1 = std::move(s1); // no crash, but the value of s1 is not defined. + +#if SEASTAR_API_LEVEL < 5 + future_state<std::tuple<std::unique_ptr<int>>> s2; +#else + future_state<std::unique_ptr<int>> s2; +#endif + s2.set(std::make_unique<int>(42)); + std::swap(s2, s2); + BOOST_REQUIRE_EQUAL(*std::move(s2).get0(), 42); + + promise<std::unique_ptr<int>> p1; + p1.set_value(std::make_unique<int>(42)); + p1 = std::move(p1); // no crash, but the value of p1 is not defined. + + promise<std::unique_ptr<int>> p2; + p2.set_value(std::make_unique<int>(42)); + std::swap(p2, p2); + BOOST_REQUIRE_EQUAL(*p2.get_future().get0(), 42); + + auto f1 = make_ready_future<std::unique_ptr<int>>(std::make_unique<int>(42)); + f1 = std::move(f1); // no crash, but the value of f1 is not defined. + + auto f2 = make_ready_future<std::unique_ptr<int>>(std::make_unique<int>(42)); + std::swap(f2, f2); + BOOST_REQUIRE_EQUAL(*f2.get0(), 42); + + return make_ready_future<>(); +} +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + +static subscription<int> get_empty_subscription(std::function<future<> (int)> func) { + stream<int> s; + auto ret = s.listen(func); + s.close(); + return ret; +} + +SEASTAR_TEST_CASE(test_stream) { + auto sub = get_empty_subscription([](int x) { + return make_ready_future<>(); + }); + return sub.done(); +} + +SEASTAR_TEST_CASE(test_stream_drop_sub) { + auto s = make_lw_shared<stream<int>>(); + std::optional<future<>> ret; + { + auto sub = s->listen([](int x) { + return make_ready_future<>(); + }); + ret = sub.done(); + // It is ok to drop the subscription when we only want the competition future. + } + return s->produce(42).then([ret = std::move(*ret), s] () mutable { + s->close(); + return std::move(ret); + }); +} + +SEASTAR_TEST_CASE(test_reference) { + int a = 42; + future<int&> orig = make_ready_future<int&>(a); + future<int&> fut = std::move(orig); + int& r = fut.get0(); + r = 43; + BOOST_REQUIRE_EQUAL(a, 43); + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_set_future_state_with_tuple) { + future_state<std::tuple<int>> s1; + promise<int> p1; + const std::tuple<int> v1(42); + s1.set(v1); + p1.set_value(v1); + + return make_ready_future<>(); +} + +SEASTAR_THREAD_TEST_CASE(test_set_value_make_exception_in_copy) { + struct throw_in_copy { + throw_in_copy() noexcept = default; + throw_in_copy(throw_in_copy&& x) noexcept { + } + throw_in_copy(const throw_in_copy& x) { + throw 42; + } + }; + promise<throw_in_copy> p1; + throw_in_copy v; + p1.set_value(v); + BOOST_REQUIRE_THROW(p1.get_future().get(), int); +} + +SEASTAR_THREAD_TEST_CASE(test_set_exception_in_constructor) { + struct throw_in_constructor { + throw_in_constructor() { + throw 42; + } + }; + future<throw_in_constructor> f = make_ready_future<throw_in_constructor>(); + BOOST_REQUIRE(f.failed()); + BOOST_REQUIRE_THROW(f.get(), int); +} + +SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure) { + auto finally1 = make_shared<bool>(); + auto finally2 = make_shared<bool>(); + + return make_ready_future().then([] { + }).finally([=] { + *finally1 = true; + }).then([] { + throw std::runtime_error(""); + }).finally([=] { + *finally2 = true; + }).then_wrapped([=] (auto&& f) { + BOOST_REQUIRE(*finally1); + BOOST_REQUIRE(*finally2); + + // Should be failed. + try { + f.get(); + BOOST_REQUIRE(false); + } catch (...) {} + }); +} + +SEASTAR_TEST_CASE(test_get_on_promise) { + auto p = promise<uint32_t>(); + p.set_value(10); + BOOST_REQUIRE_EQUAL(10u, p.get_future().get0()); + return make_ready_future(); +} + +// An exception class with a controlled what() overload +class test_exception : public std::exception { + sstring _what; +public: + explicit test_exception(sstring what) : _what(std::move(what)) {} + virtual const char* what() const noexcept override { + return _what.c_str(); + } +}; + +static void check_finally_exception(std::exception_ptr ex) { + BOOST_REQUIRE_EQUAL(fmt::format("{}", ex), + "seastar::nested_exception: test_exception (bar) (while cleaning up after test_exception (foo))"); + try { + // convert to the concrete type nested_exception + std::rethrow_exception(ex); + } catch (seastar::nested_exception& ex) { + try { + std::rethrow_exception(ex.inner); + } catch (test_exception& inner) { + BOOST_REQUIRE_EQUAL(inner.what(), "bar"); + } + try { + ex.rethrow_nested(); + } catch (test_exception& outer) { + BOOST_REQUIRE_EQUAL(outer.what(), "foo"); + } + } +} + +SEASTAR_TEST_CASE(test_finally_exception) { + return make_ready_future<>().then([] { + throw test_exception("foo"); + }).finally([] { + throw test_exception("bar"); + }).handle_exception(check_finally_exception); +} + +SEASTAR_TEST_CASE(test_finally_exceptional_future) { + return make_ready_future<>().then([] { + throw test_exception("foo"); + }).finally([] { + return make_exception_future<>(test_exception("bar")); + }).handle_exception(check_finally_exception); +} + +SEASTAR_TEST_CASE(test_finally_waits_for_inner) { + auto finally = make_shared<bool>(); + auto p = make_shared<promise<>>(); + + auto f = make_ready_future().then([] { + }).finally([=] { + return p->get_future().then([=] { + *finally = true; + }); + }).then([=] { + BOOST_REQUIRE(*finally); + }); + BOOST_REQUIRE(!*finally); + p->set_value(); + return f; +} + +SEASTAR_TEST_CASE(test_finally_is_called_on_success_and_failure__not_ready_to_armed) { + auto finally1 = make_shared<bool>(); + auto finally2 = make_shared<bool>(); + + promise<> p; + auto f = p.get_future().finally([=] { + *finally1 = true; + }).then([] { + throw std::runtime_error(""); + }).finally([=] { + *finally2 = true; + }).then_wrapped([=] (auto &&f) { + BOOST_REQUIRE(*finally1); + BOOST_REQUIRE(*finally2); + try { + f.get(); + } catch (...) {} // silence exceptional future ignored messages + }); + + p.set_value(); + return f; +} + +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target) { + promise<> pr; + auto f = pr.get_future().finally([=] { + throw std::runtime_error(""); + }).then([] { + BOOST_REQUIRE(false); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (...) {} // silence exceptional future ignored messages + }); + + pr.set_value(); + return f; +} + +SEASTAR_TEST_CASE(test_exception_from_finally_fails_the_target_on_already_resolved) { + return make_ready_future().finally([=] { + throw std::runtime_error(""); + }).then([] { + BOOST_REQUIRE(false); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + } catch (...) {} // silence exceptional future ignored messages + }); +} + +SEASTAR_TEST_CASE(test_exception_thrown_from_then_wrapped_causes_future_to_fail) { + return make_ready_future().then_wrapped([] (auto&& f) { + throw std::runtime_error(""); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + BOOST_REQUIRE(false); + } catch (...) {} + }); +} + +SEASTAR_TEST_CASE(test_exception_thrown_from_then_wrapped_causes_future_to_fail__async_case) { + promise<> p; + + auto f = p.get_future().then_wrapped([] (auto&& f) { + throw std::runtime_error(""); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + BOOST_REQUIRE(false); + } catch (...) {} + }); + + p.set_value(); + + return f; +} + +SEASTAR_TEST_CASE(test_failing_intermediate_promise_should_fail_the_master_future) { + promise<> p1; + promise<> p2; + + auto f = p1.get_future().then([f = p2.get_future()] () mutable { + return std::move(f); + }).then([] { + BOOST_REQUIRE(false); + }); + + p1.set_value(); + p2.set_exception(std::runtime_error("boom")); + + return std::move(f).then_wrapped([](auto&& f) { + try { + f.get(); + BOOST_REQUIRE(false); + } catch (...) {} + }); +} + +SEASTAR_TEST_CASE(test_future_forwarding__not_ready_to_unarmed) { + promise<> p1; + promise<> p2; + + auto f1 = p1.get_future(); + auto f2 = p2.get_future(); + + f1.forward_to(std::move(p2)); + + BOOST_REQUIRE(!f2.available()); + + auto called = f2.then([] {}); + + p1.set_value(); + return called; +} + +SEASTAR_TEST_CASE(test_future_forwarding__not_ready_to_armed) { + promise<> p1; + promise<> p2; + + auto f1 = p1.get_future(); + auto f2 = p2.get_future(); + + auto called = f2.then([] {}); + + f1.forward_to(std::move(p2)); + + BOOST_REQUIRE(!f2.available()); + + p1.set_value(); + + return called; +} + +SEASTAR_TEST_CASE(test_future_forwarding__ready_to_unarmed) { + promise<> p2; + + auto f1 = make_ready_future<>(); + auto f2 = p2.get_future(); + + std::move(f1).forward_to(std::move(p2)); + BOOST_REQUIRE(f2.available()); + + return std::move(f2).then_wrapped([] (future<> f) { + BOOST_REQUIRE(!f.failed()); + }); +} + +SEASTAR_TEST_CASE(test_future_forwarding__ready_to_armed) { + promise<> p2; + + auto f1 = make_ready_future<>(); + auto f2 = p2.get_future(); + + auto called = std::move(f2).then([] {}); + + BOOST_REQUIRE(f1.available()); + + f1.forward_to(std::move(p2)); + return called; +} + +static void forward_dead_unarmed_promise_with_dead_future_to(promise<>& p) { + promise<> p2; + p.get_future().forward_to(std::move(p2)); +} + +SEASTAR_TEST_CASE(test_future_forwarding__ready_to_unarmed_soon_to_be_dead) { + promise<> p1; + forward_dead_unarmed_promise_with_dead_future_to(p1); + make_ready_future<>().forward_to(std::move(p1)); + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_exception_can_be_thrown_from_do_until_body) { + return do_until([] { return false; }, [] { + throw expected_exception(); + return now(); + }).then_wrapped([] (auto&& f) { + try { + f.get(); + BOOST_FAIL("should have failed"); + } catch (const expected_exception& e) { + // expected + } + }); +} + +SEASTAR_TEST_CASE(test_bare_value_can_be_returned_from_callback) { + return now().then([] { + return 3; + }).then([] (int x) { + BOOST_REQUIRE(x == 3); + }); +} + +SEASTAR_TEST_CASE(test_when_all_iterator_range) { + std::vector<future<size_t>> futures; + for (size_t i = 0; i != 1000000; ++i) { + // Use a mix of available and unavailable futures to exercise + // both paths in when_all(). + auto fut = (i % 2) == 0 ? make_ready_future<>() : later(); + futures.push_back(fut.then([i] { return i; })); + } + // Verify the above statement is correct + BOOST_REQUIRE(!std::all_of(futures.begin(), futures.end(), + [] (auto& f) { return f.available(); })); + auto p = make_shared(std::move(futures)); + return when_all(p->begin(), p->end()).then([p] (std::vector<future<size_t>> ret) { + BOOST_REQUIRE(std::all_of(ret.begin(), ret.end(), [] (auto& f) { return f.available(); })); + BOOST_REQUIRE(std::all_of(ret.begin(), ret.end(), [&ret] (auto& f) { return f.get0() == size_t(&f - ret.data()); })); + }); +} + +SEASTAR_TEST_CASE(test_map_reduce) { + auto square = [] (long x) { return make_ready_future<long>(x*x); }; + long n = 1000; + return map_reduce(boost::make_counting_iterator<long>(0), boost::make_counting_iterator<long>(n), + square, long(0), std::plus<long>()).then([n] (auto result) { + auto m = n - 1; // counting does not include upper bound + BOOST_REQUIRE_EQUAL(result, (m * (m + 1) * (2*m + 1)) / 6); + }); +} + +SEASTAR_TEST_CASE(test_map_reduce_simple) { + return do_with(0L, [] (auto& res) { + long n = 10; + return map_reduce(boost::make_counting_iterator<long>(0), boost::make_counting_iterator<long>(n), + [] (long x) { return x; }, + [&res] (long x) { res += x; }).then([n, &res] { + long expected = (n * (n - 1)) / 2; + BOOST_REQUIRE_EQUAL(res, expected); + }); + }); +} + +SEASTAR_TEST_CASE(test_map_reduce_tuple) { + return do_with(0L, 0L, [] (auto& res0, auto& res1) { + long n = 10; + return map_reduce(boost::make_counting_iterator<long>(0), boost::make_counting_iterator<long>(n), + [] (long x) { return std::tuple<long, long>(x, -x); }, + [&res0, &res1] (std::tuple<long, long> t) { res0 += std::get<0>(t); res1 += std::get<1>(t); }).then([n, &res0, &res1] { + long expected = (n * (n - 1)) / 2; + BOOST_REQUIRE_EQUAL(res0, expected); + BOOST_REQUIRE_EQUAL(res1, -expected); + }); + }); +} + +// This test doesn't actually test anything - it just waits for the future +// returned by sleep to complete. However, a bug we had in sleep() caused +// this test to fail the sanitizer in the debug build, so this is a useful +// regression test. +SEASTAR_TEST_CASE(test_sleep) { + return sleep(std::chrono::milliseconds(100)); +} + +SEASTAR_TEST_CASE(test_do_with_1) { + return do_with(1, [] (int& one) { + BOOST_REQUIRE_EQUAL(one, 1); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_2) { + return do_with(1, 2L, [] (int& one, long two) { + BOOST_REQUIRE_EQUAL(one, 1); + BOOST_REQUIRE_EQUAL(two, 2); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_3) { + return do_with(1, 2L, 3, [] (int& one, long two, int three) { + BOOST_REQUIRE_EQUAL(one, 1); + BOOST_REQUIRE_EQUAL(two, 2); + BOOST_REQUIRE_EQUAL(three, 3); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_4) { + return do_with(1, 2L, 3, 4, [] (int& one, long two, int three, int four) { + BOOST_REQUIRE_EQUAL(one, 1); + BOOST_REQUIRE_EQUAL(two, 2); + BOOST_REQUIRE_EQUAL(three, 3); + BOOST_REQUIRE_EQUAL(four, 4); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_5) { + using func = noncopyable_function<void()>; + return do_with(func([] {}), [] (func&) { + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_6) { + const int x = 42; + return do_with(int(42), x, [](int&, int&) { + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_with_7) { + const int x = 42; + return do_with(x, [](int&) { + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_do_while_stopping_immediately) { + return do_with(int(0), [] (int& count) { + return repeat([&count] { + ++count; + return stop_iteration::yes; + }).then([&count] { + BOOST_REQUIRE(count == 1); + }); + }); +} + +SEASTAR_TEST_CASE(test_do_while_stopping_after_two_iterations) { + return do_with(int(0), [] (int& count) { + return repeat([&count] { + ++count; + return count == 2 ? stop_iteration::yes : stop_iteration::no; + }).then([&count] { + BOOST_REQUIRE(count == 2); + }); + }); +} + +SEASTAR_TEST_CASE(test_do_while_failing_in_the_first_step) { + return repeat([] { + throw expected_exception(); + return stop_iteration::no; + }).then_wrapped([](auto&& f) { + try { + f.get(); + BOOST_FAIL("should not happen"); + } catch (const expected_exception&) { + // expected + } + }); +} + +SEASTAR_TEST_CASE(test_do_while_failing_in_the_second_step) { + return do_with(int(0), [] (int& count) { + return repeat([&count] { + ++count; + if (count > 1) { + throw expected_exception(); + } + return later().then([] { return stop_iteration::no; }); + }).then_wrapped([&count](auto&& f) { + try { + f.get(); + BOOST_FAIL("should not happen"); + } catch (const expected_exception&) { + BOOST_REQUIRE(count == 2); + } + }); + }); +} + +SEASTAR_TEST_CASE(test_parallel_for_each) { + return async([] { + // empty + parallel_for_each(std::vector<int>(), [] (int) -> future<> { + BOOST_FAIL("should not reach"); + abort(); + }).get(); + + // immediate result + auto range = boost::copy_range<std::vector<int>>(boost::irange(1, 6)); + auto sum = 0; + parallel_for_each(range, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 15); + + // all suspend + sum = 0; + parallel_for_each(range, [&sum] (int v) { + return later().then([&sum, v] { + sum += v; + }); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 15); + + // throws immediately + BOOST_CHECK_EXCEPTION(parallel_for_each(range, [] (int) -> future<> { + throw 5; + }).get(), int, [] (int v) { return v == 5; }); + + // throws after suspension + BOOST_CHECK_EXCEPTION(parallel_for_each(range, [] (int) { + return later().then([] { + throw 5; + }); + }).get(), int, [] (int v) { return v == 5; }); + }); +} + +SEASTAR_TEST_CASE(test_parallel_for_each_early_failure) { + return do_with(0, [] (int& counter) { + return parallel_for_each(boost::irange(0, 11000), [&counter] (int i) { + using namespace std::chrono_literals; + // force scheduling + return sleep((i % 31 + 1) * 1ms).then([&counter, i] { + ++counter; + if (i % 1777 == 1337) { + return make_exception_future<>(i); + } + return make_ready_future<>(); + }); + }).then_wrapped([&counter] (future<> f) { + BOOST_REQUIRE_EQUAL(counter, 11000); + BOOST_REQUIRE(f.failed()); + try { + f.get(); + BOOST_FAIL("wanted an exception"); + } catch (int i) { + BOOST_REQUIRE(i % 1777 == 1337); + } catch (...) { + BOOST_FAIL("bad exception type"); + } + }); + }); +} + +SEASTAR_TEST_CASE(test_parallel_for_each_waits_for_all_fibers_even_if_one_of_them_failed) { + auto can_exit = make_lw_shared<bool>(false); + return parallel_for_each(boost::irange(0, 2), [can_exit] (int i) { + return later().then([i, can_exit] { + if (i == 1) { + throw expected_exception(); + } else { + using namespace std::chrono_literals; + return sleep(300ms).then([can_exit] { + *can_exit = true; + }); + } + }); + }).then_wrapped([can_exit] (auto&& f) { + try { + f.get(); + } catch (...) { + // expected + } + BOOST_REQUIRE(*can_exit); + }); +} + +SEASTAR_THREAD_TEST_CASE(test_parallel_for_each_broken_promise) { + auto fut = [] { + std::vector<promise<>> v(2); + return parallel_for_each(v, [] (promise<>& p) { + return p.get_future(); + }); + }(); + BOOST_CHECK_THROW(fut.get(), broken_promise); +} + +SEASTAR_THREAD_TEST_CASE(test_repeat_broken_promise) { + auto get_fut = [] { + promise<stop_iteration> pr; + return pr.get_future(); + }; + + future<> r = repeat([fut = get_fut()] () mutable { + return std::move(fut); + }); + + BOOST_CHECK_THROW(r.get(), broken_promise); +} + +#ifndef SEASTAR_SHUFFLE_TASK_QUEUE +SEASTAR_TEST_CASE(test_high_priority_task_runs_in_the_middle_of_loops) { + auto counter = make_lw_shared<int>(0); + auto flag = make_lw_shared<bool>(false); + return repeat([counter, flag] { + if (*counter == 1) { + BOOST_REQUIRE(*flag); + return stop_iteration::yes; + } + engine().add_high_priority_task(make_task([flag] { + *flag = true; + })); + ++(*counter); + return stop_iteration::no; + }); +} +#endif + +SEASTAR_TEST_CASE(futurize_invoke_val_exception) { + return futurize_invoke([] (int arg) { throw expected_exception(); return arg; }, 1).then_wrapped([] (future<int> f) { + try { + f.get(); + BOOST_FAIL("should have thrown"); + } catch (expected_exception& e) {} + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_val_ok) { + return futurize_invoke([] (int arg) { return arg * 2; }, 2).then_wrapped([] (future<int> f) { + try { + auto x = f.get0(); + BOOST_REQUIRE_EQUAL(x, 4); + } catch (expected_exception& e) { + BOOST_FAIL("should not have thrown"); + } + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_val_future_exception) { + return futurize_invoke([] (int a) { + return sleep(std::chrono::milliseconds(100)).then([] { + throw expected_exception(); + return make_ready_future<int>(0); + }); + }, 0).then_wrapped([] (future<int> f) { + try { + f.get(); + BOOST_FAIL("should have thrown"); + } catch (expected_exception& e) { } + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_val_future_ok) { + return futurize_invoke([] (int a) { + return sleep(std::chrono::milliseconds(100)).then([a] { + return make_ready_future<int>(a * 100); + }); + }, 2).then_wrapped([] (future<int> f) { + try { + auto x = f.get0(); + BOOST_REQUIRE_EQUAL(x, 200); + } catch (expected_exception& e) { + BOOST_FAIL("should not have thrown"); + } + }); +} +SEASTAR_TEST_CASE(futurize_invoke_void_exception) { + return futurize_invoke([] (auto arg) { throw expected_exception(); }, 0).then_wrapped([] (future<> f) { + try { + f.get(); + BOOST_FAIL("should have thrown"); + } catch (expected_exception& e) {} + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_void_ok) { + return futurize_invoke([] (auto arg) { }, 0).then_wrapped([] (future<> f) { + try { + f.get(); + } catch (expected_exception& e) { + BOOST_FAIL("should not have thrown"); + } + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_void_future_exception) { + return futurize_invoke([] (auto a) { + return sleep(std::chrono::milliseconds(100)).then([] { + throw expected_exception(); + }); + }, 0).then_wrapped([] (future<> f) { + try { + f.get(); + BOOST_FAIL("should have thrown"); + } catch (expected_exception& e) { } + }); +} + +SEASTAR_TEST_CASE(futurize_invoke_void_future_ok) { + auto a = make_lw_shared<int>(1); + return futurize_invoke([] (int& a) { + return sleep(std::chrono::milliseconds(100)).then([&a] { + a *= 100; + }); + }, *a).then_wrapped([a] (future<> f) { + try { + f.get(); + BOOST_REQUIRE_EQUAL(*a, 100); + } catch (expected_exception& e) { + BOOST_FAIL("should not have thrown"); + } + }); +} + +SEASTAR_TEST_CASE(test_unused_shared_future_is_not_a_broken_future) { + promise<> p; + shared_future<> s(p.get_future()); + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_shared_future_propagates_value_to_all) { + return seastar::async([] { + promise<shared_ptr<int>> p; // shared_ptr<> to check it deals with emptyable types + shared_future<shared_ptr<int>> f(p.get_future()); + + auto f1 = f.get_future(); + auto f2 = f.get_future(); + + p.set_value(make_shared<int>(1)); + BOOST_REQUIRE(*f1.get0() == 1); + BOOST_REQUIRE(*f2.get0() == 1); + }); +} + +template<typename... T> +void check_fails_with_expected(future<T...> f) { + try { + f.get(); + BOOST_FAIL("Should have failed"); + } catch (expected_exception&) { + // expected + } +} + +SEASTAR_TEST_CASE(test_shared_future_propagates_value_to_copies) { + return seastar::async([] { + promise<int> p; + auto sf1 = shared_future<int>(p.get_future()); + auto sf2 = sf1; + + auto f1 = sf1.get_future(); + auto f2 = sf2.get_future(); + + p.set_value(1); + + BOOST_REQUIRE(f1.get0() == 1); + BOOST_REQUIRE(f2.get0() == 1); + }); +} + +SEASTAR_TEST_CASE(test_obtaining_future_from_shared_future_after_it_is_resolved) { + promise<int> p1; + promise<int> p2; + auto sf1 = shared_future<int>(p1.get_future()); + auto sf2 = shared_future<int>(p2.get_future()); + p1.set_value(1); + p2.set_exception(expected_exception()); + return sf2.get_future().then_wrapped([f1 = sf1.get_future()] (auto&& f) mutable { + check_fails_with_expected(std::move(f)); + return std::move(f1); + }).then_wrapped([] (auto&& f) { + BOOST_REQUIRE(f.get0() == 1); + }); +} + +SEASTAR_TEST_CASE(test_valueless_shared_future) { + return seastar::async([] { + promise<> p; + shared_future<> f(p.get_future()); + + auto f1 = f.get_future(); + auto f2 = f.get_future(); + + p.set_value(); + + f1.get(); + f2.get(); + }); +} + +SEASTAR_TEST_CASE(test_shared_future_propagates_errors_to_all) { + promise<int> p; + shared_future<int> f(p.get_future()); + + auto f1 = f.get_future(); + auto f2 = f.get_future(); + + p.set_exception(expected_exception()); + + return f1.then_wrapped([f2 = std::move(f2)] (auto&& f) mutable { + check_fails_with_expected(std::move(f)); + return std::move(f2); + }).then_wrapped([] (auto&& f) mutable { + check_fails_with_expected(std::move(f)); + }); +} + +SEASTAR_TEST_CASE(test_ignored_future_warning) { + // This doesn't warn: + promise<> p; + p.set_exception(expected_exception()); + future<> f = p.get_future(); + f.ignore_ready_future(); + + // And by analogy, neither should this + shared_promise<> p2; + p2.set_exception(expected_exception()); + future<> f2 = p2.get_shared_future(); + f2.ignore_ready_future(); + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_futurize_from_tuple) { + std::tuple<int> v1 = std::make_tuple(3); + std::tuple<> v2 = {}; + future<int> fut1 = futurize<int>::from_tuple(v1); + future<> fut2 = futurize<void>::from_tuple(v2); + BOOST_REQUIRE(fut1.get0() == std::get<0>(v1)); +#if SEASTAR_API_LEVEL < 5 + BOOST_REQUIRE(fut2.get() == v2); +#endif + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_repeat_until_value) { + return do_with(int(), [] (int& counter) { + return repeat_until_value([&counter] () -> future<std::optional<int>> { + if (counter == 10000) { + return make_ready_future<std::optional<int>>(counter); + } else { + ++counter; + return make_ready_future<std::optional<int>>(std::nullopt); + } + }).then([&counter] (int result) { + BOOST_REQUIRE(counter == 10000); + BOOST_REQUIRE(result == counter); + }); + }); +} + +SEASTAR_TEST_CASE(test_repeat_until_value_implicit_future) { + // Same as above, but returning std::optional<int> instead of future<std::optional<int>> + return do_with(int(), [] (int& counter) { + return repeat_until_value([&counter] { + if (counter == 10000) { + return std::optional<int>(counter); + } else { + ++counter; + return std::optional<int>(std::nullopt); + } + }).then([&counter] (int result) { + BOOST_REQUIRE(counter == 10000); + BOOST_REQUIRE(result == counter); + }); + }); +} + +SEASTAR_TEST_CASE(test_repeat_until_value_exception) { + return repeat_until_value([] { + throw expected_exception(); + return std::optional<int>(43); + }).then_wrapped([] (future<int> f) { + check_fails_with_expected(std::move(f)); + }); +} + +SEASTAR_TEST_CASE(test_when_allx) { + return when_all(later(), later(), make_ready_future()).discard_result(); +} + +// A noncopyable and nonmovable struct +struct non_copy_non_move { + non_copy_non_move() = default; + non_copy_non_move(non_copy_non_move&&) = delete; + non_copy_non_move(const non_copy_non_move&) = delete; +}; + +SEASTAR_TEST_CASE(test_when_all_functions) { + auto f = [x = non_copy_non_move()] { + (void)x; + return make_ready_future<int>(42); + }; + return when_all(f, [] { + throw 42; + return make_ready_future<>(); + }, later()).then([] (std::tuple<future<int>, future<>, future<>> res) { + BOOST_REQUIRE_EQUAL(std::get<0>(res).get0(), 42); + + BOOST_REQUIRE(std::get<1>(res).available()); + BOOST_REQUIRE(std::get<1>(res).failed()); + std::get<1>(res).ignore_ready_future(); + + BOOST_REQUIRE(std::get<2>(res).available()); + BOOST_REQUIRE(!std::get<2>(res).failed()); + return make_ready_future<>(); + }); +} + +SEASTAR_TEST_CASE(test_when_all_succeed_functions) { + auto f = [x = non_copy_non_move()] { + (void)x; + return make_ready_future<int>(42); + }; + return when_all_succeed(f, [] { + throw 42; + return make_ready_future<>(); + }, later()).then_wrapped([] (auto res) { // type of `res` changes when SESTAR_API_LEVEL < 3 + BOOST_REQUIRE(res.available()); + BOOST_REQUIRE(res.failed()); + res.ignore_ready_future(); + return make_ready_future<>(); + }); +} + +template<typename E, typename... T> +static void check_failed_with(future<T...>&& f) { + BOOST_REQUIRE(f.failed()); + try { + f.get(); + BOOST_FAIL("exception expected"); + } catch (const E& e) { + // expected + } catch (...) { + BOOST_FAIL(format("wrong exception: {}", std::current_exception())); + } +} + +template<typename... T> +static void check_timed_out(future<T...>&& f) { + check_failed_with<timed_out_error>(std::move(f)); +} + +SEASTAR_TEST_CASE(test_with_timeout_when_it_times_out) { + return seastar::async([] { + promise<> pr; + auto f = with_timeout(manual_clock::now() + 2s, pr.get_future()); + + BOOST_REQUIRE(!f.available()); + + manual_clock::advance(1s); + later().get(); + + BOOST_REQUIRE(!f.available()); + + manual_clock::advance(1s); + later().get(); + + check_timed_out(std::move(f)); + + pr.set_value(); + }); +} + +SEASTAR_THREAD_TEST_CASE(test_shared_future_get_future_after_timeout) { + // This used to crash because shared_future checked if the list of + // pending futures was empty to decide if it had already called + // then_wrapped. If all pending futures timed out, it would call + // it again. + promise<> pr; + shared_future<with_clock<manual_clock>> sfut(pr.get_future()); + future<> fut1 = sfut.get_future(manual_clock::now() + 1s); + + manual_clock::advance(1s); + + check_timed_out(std::move(fut1)); + + future<> fut2 = sfut.get_future(manual_clock::now() + 1s); + manual_clock::advance(1s); + check_timed_out(std::move(fut2)); + + future<> fut3 = sfut.get_future(manual_clock::now() + 1s); + pr.set_value(); + fut3.get(); +} + +SEASTAR_TEST_CASE(test_custom_exception_factory_in_with_timeout) { + return seastar::async([] { + class custom_error : public std::exception { + public: + virtual const char* what() const noexcept { + return "timedout"; + } + }; + struct my_exception_factory { + static auto timeout() { + return custom_error(); + } + }; + promise<> pr; + auto f = with_timeout<my_exception_factory>(manual_clock::now() + 1s, pr.get_future()); + + manual_clock::advance(1s); + later().get(); + + check_failed_with<custom_error>(std::move(f)); + }); +} + +SEASTAR_TEST_CASE(test_with_timeout_when_it_does_not_time_out) { + return seastar::async([] { + { + promise<int> pr; + auto f = with_timeout(manual_clock::now() + 1s, pr.get_future()); + + pr.set_value(42); + + BOOST_REQUIRE_EQUAL(f.get0(), 42); + } + + // Check that timer was indeed cancelled + manual_clock::advance(1s); + later().get(); + }); +} + +SEASTAR_TEST_CASE(test_shared_future_with_timeout) { + return seastar::async([] { + shared_promise<with_clock<manual_clock>, int> pr; + auto f1 = pr.get_shared_future(manual_clock::now() + 1s); + auto f2 = pr.get_shared_future(manual_clock::now() + 2s); + auto f3 = pr.get_shared_future(); + + BOOST_REQUIRE(!f1.available()); + BOOST_REQUIRE(!f2.available()); + BOOST_REQUIRE(!f3.available()); + + manual_clock::advance(1s); + later().get(); + + check_timed_out(std::move(f1)); + BOOST_REQUIRE(!f2.available()); + BOOST_REQUIRE(!f3.available()); + + manual_clock::advance(1s); + later().get(); + + check_timed_out(std::move(f2)); + BOOST_REQUIRE(!f3.available()); + + pr.set_value(42); + + BOOST_REQUIRE_EQUAL(42, f3.get0()); + }); +} + +#if SEASTAR_API_LEVEL < 4 +#define THEN_UNPACK then +#else +#define THEN_UNPACK then_unpack +#endif + +SEASTAR_TEST_CASE(test_when_all_succeed_tuples) { + return seastar::when_all_succeed( + make_ready_future<>(), + make_ready_future<sstring>("hello world"), + make_ready_future<int>(42), + make_ready_future<>(), + make_ready_future<std::tuple<int, sstring>>(std::tuple(84, "hi")), + make_ready_future<bool>(true) + ).THEN_UNPACK([] (sstring msg, int v, std::tuple<int, sstring> t, bool b) { + BOOST_REQUIRE_EQUAL(msg, "hello world"); + BOOST_REQUIRE_EQUAL(v, 42); + BOOST_REQUIRE_EQUAL(std::get<0>(t), 84); + BOOST_REQUIRE_EQUAL(std::get<1>(t), "hi"); + BOOST_REQUIRE_EQUAL(b, true); + + return seastar::when_all_succeed( + make_exception_future<>(42), + make_ready_future<sstring>("hello world"), + make_exception_future<int>(43), + make_ready_future<>() + ).THEN_UNPACK([] (sstring, int) { + BOOST_FAIL("shouldn't reach"); + return false; + }).handle_exception([] (auto excp) { + try { + std::rethrow_exception(excp); + } catch (int v) { + BOOST_REQUIRE(v == 42 || v == 43); + return true; + } catch (...) { } + return false; + }).then([] (auto ret) { + BOOST_REQUIRE(ret); + }); + }); +} + +SEASTAR_TEST_CASE(test_when_all_succeed_vector) { + std::vector<future<>> vecs; + vecs.emplace_back(make_ready_future<>()); + vecs.emplace_back(make_ready_future<>()); + vecs.emplace_back(make_ready_future<>()); + vecs.emplace_back(make_ready_future<>()); + return seastar::when_all_succeed(vecs.begin(), vecs.end()).then([] { + std::vector<future<>> vecs; + vecs.emplace_back(make_ready_future<>()); + vecs.emplace_back(make_ready_future<>()); + vecs.emplace_back(make_exception_future<>(42)); + vecs.emplace_back(make_exception_future<>(43)); + return seastar::when_all_succeed(vecs.begin(), vecs.end()); + }).then([] { + BOOST_FAIL("shouldn't reach"); + return false; + }).handle_exception([] (auto excp) { + try { + std::rethrow_exception(excp); + } catch (int v) { + BOOST_REQUIRE(v == 42 || v == 43); + return true; + } catch (...) { } + return false; + }).then([] (auto ret) { + BOOST_REQUIRE(ret); + + std::vector<future<int>> vecs; + vecs.emplace_back(make_ready_future<int>(1)); + vecs.emplace_back(make_ready_future<int>(2)); + vecs.emplace_back(make_ready_future<int>(3)); + return seastar::when_all_succeed(vecs.begin(), vecs.end()); + }).then([] (std::vector<int> vals) { + BOOST_REQUIRE_EQUAL(vals.size(), 3u); + BOOST_REQUIRE_EQUAL(vals[0], 1); + BOOST_REQUIRE_EQUAL(vals[1], 2); + BOOST_REQUIRE_EQUAL(vals[2], 3); + + std::vector<future<int>> vecs; + vecs.emplace_back(make_ready_future<int>(1)); + vecs.emplace_back(make_ready_future<int>(2)); + vecs.emplace_back(make_exception_future<int>(42)); + vecs.emplace_back(make_exception_future<int>(43)); + return seastar::when_all_succeed(vecs.begin(), vecs.end()); + }).then([] (std::vector<int>) { + BOOST_FAIL("shouldn't reach"); + return false; + }).handle_exception([] (auto excp) { + try { + std::rethrow_exception(excp); + } catch (int v) { + BOOST_REQUIRE(v == 42 || v == 43); + return true; + } catch (...) { } + return false; + }).then([] (auto ret) { + BOOST_REQUIRE(ret); + }); +} + +SEASTAR_TEST_CASE(test_futurize_mutable) { + int count = 0; + return seastar::repeat([count]() mutable { + ++count; + if (count == 3) { + return seastar::stop_iteration::yes; + } + return seastar::stop_iteration::no; + }); +} + +SEASTAR_THREAD_TEST_CASE(test_broken_promises) { + std::optional<future<>> f; + std::optional<future<>> f2; + { // Broken after attaching a continuation + auto p = promise<>(); + f = p.get_future(); + f2 = f->then_wrapped([&] (future<> f3) { + BOOST_CHECK(f3.failed()); + BOOST_CHECK_THROW(f3.get(), broken_promise); + f = { }; + }); + } + f2->get(); + BOOST_CHECK(!f); + + { // Broken before attaching a continuation + auto p = promise<>(); + f = p.get_future(); + } + f->then_wrapped([&] (future<> f3) { + BOOST_CHECK(f3.failed()); + BOOST_CHECK_THROW(f3.get(), broken_promise); + f = { }; + }).get(); + BOOST_CHECK(!f); + + { // Broken before suspending a thread + auto p = promise<>(); + f = p.get_future(); + } + BOOST_CHECK_THROW(f->get(), broken_promise); +} + +SEASTAR_TEST_CASE(test_warn_on_broken_promise_with_no_future) { + // Example code where we expect a "Exceptional future ignored" + // warning. + promise<> p; + // Intentionally destroy the future + (void)p.get_future(); + + with_allow_abandoned_failed_futures(1, [&] { + p.set_exception(std::runtime_error("foo")); + }); + + return make_ready_future<>(); +} + +SEASTAR_THREAD_TEST_CASE(test_exception_future_with_backtrace) { + int counter = 0; + auto inner = [&] (bool return_exception) mutable { + if (!return_exception) { + return make_ready_future<int>(++counter); + } else { + return make_exception_future_with_backtrace<int>(expected_exception()); + } + }; + auto outer = [&] (bool return_exception) { + return inner(return_exception).then([] (int i) { + return make_ready_future<int>(-i); + }); + }; + + BOOST_REQUIRE_EQUAL(outer(false).get0(), -1); + BOOST_REQUIRE_EQUAL(counter, 1); + + BOOST_CHECK_THROW(outer(true).get0(), expected_exception); + BOOST_REQUIRE_EQUAL(counter, 1); + + // Example code where we expect a "Exceptional future ignored" + // warning. + (void)outer(true).then_wrapped([](future<int> fut) { + with_allow_abandoned_failed_futures(1, [fut = std::move(fut)]() mutable { + auto foo = std::move(fut); + }); + }); +} + +class throw_on_move { + int _i; +public: + throw_on_move(int i = 0) noexcept { + _i = i; + } + throw_on_move(const throw_on_move&) = delete; + throw_on_move(throw_on_move&&) { + _i = -1; + throw expected_exception(); + } + + int value() const { + return _i; + } +}; + +SEASTAR_TEST_CASE(test_async_throw_on_move) { + return async([] (throw_on_move t) { + BOOST_CHECK(false); + }, throw_on_move()).handle_exception_type([] (const expected_exception&) { + return make_ready_future<>(); + }); +} + +future<> func4() { + return later().then([] { + seastar_logger.info("backtrace: {}", current_backtrace()); + }); +} + +void func3() { + seastar::async([] { + func4().get(); + }).get(); +} + +future<> func2() { + return seastar::async([] { + func3(); + }); +} + +future<> func1() { + return later().then([] { + return func2(); + }); +} + +SEASTAR_THREAD_TEST_CASE(test_backtracing) { + func1().get(); +} + +SEASTAR_THREAD_TEST_CASE(test_then_unpack) { + make_ready_future<std::tuple<>>().then_unpack([] () { + BOOST_REQUIRE(true); + }).get(); + make_ready_future<std::tuple<int>>(std::tuple<int>(1)).then_unpack([] (int x) { + BOOST_REQUIRE(x == 1); + }).get(); + make_ready_future<std::tuple<int, long>>(std::tuple<int, long>(1, 2)).then_unpack([] (int x, long y) { + BOOST_REQUIRE(x == 1 && y == 2); + }).get(); + make_ready_future<std::tuple<std::unique_ptr<int>>>(std::tuple(std::make_unique<int>(42))).then_unpack([] (std::unique_ptr<int> p1) { + BOOST_REQUIRE(*p1 == 42); + }).get(); +} + +future<> test_then_function_f() { + return make_ready_future<>(); +} + +SEASTAR_TEST_CASE(test_then_function) { + return make_ready_future<>().then(test_then_function_f); +} + +SEASTAR_THREAD_TEST_CASE(test_with_gate) { + gate g; + int counter = 0; + int gate_closed_errors = 0; + int other_errors = 0; + + // test normal operation when gate is opened + BOOST_CHECK_NO_THROW(with_gate(g, [&] { counter++; }).get()); + BOOST_REQUIRE_EQUAL(counter, 1); + + // test that an exception returned by the calling func + // is propagated to with_gate future + counter = gate_closed_errors = other_errors = 0; + BOOST_CHECK_NO_THROW(with_gate(g, [&] { + counter++; + return make_exception_future<>(expected_exception()); + }).handle_exception_type([&] (gate_closed_exception& e) { + gate_closed_errors++; + }).handle_exception([&] (std::exception_ptr) { + other_errors++; + }).get()); + BOOST_REQUIRE(counter); + BOOST_REQUIRE(!gate_closed_errors); + BOOST_REQUIRE(other_errors); + + g.close().get(); + + // test that with_gate.get() throws when the gate is closed + counter = gate_closed_errors = other_errors = 0; + BOOST_CHECK_THROW(with_gate(g, [&] { counter++; }).get(), gate_closed_exception); + BOOST_REQUIRE(!counter); + + // test that with_gate throws when the gate is closed + counter = gate_closed_errors = other_errors = 0; + BOOST_CHECK_THROW(with_gate(g, [&] { + counter++; + }).then_wrapped([&] (future<> f) { + auto eptr = f.get_exception(); + try { + std::rethrow_exception(eptr); + } catch (gate_closed_exception& e) { + gate_closed_errors++; + } catch (...) { + other_errors++; + } + }).get(), gate_closed_exception); + BOOST_REQUIRE(!counter); + BOOST_REQUIRE(!gate_closed_errors); + BOOST_REQUIRE(!other_errors); + + // test that try_with_gate returns gate_closed_exception when the gate is closed + counter = gate_closed_errors = other_errors = 0; + try_with_gate(g, [&] { counter++; }).handle_exception_type([&] (gate_closed_exception& e) { + gate_closed_errors++; + }).handle_exception([&] (std::exception_ptr) { + other_errors++; + }).get(); + BOOST_REQUIRE(!counter); + BOOST_REQUIRE(gate_closed_errors); + BOOST_REQUIRE(!other_errors); +} + +SEASTAR_THREAD_TEST_CASE(test_max_concurrent_for_each) { + BOOST_TEST_MESSAGE("empty range"); + max_concurrent_for_each(std::vector<int>(), 3, [] (int) { + BOOST_FAIL("should not reach"); + return make_exception_future<>(std::bad_function_call()); + }).get(); + + auto range = boost::copy_range<std::vector<int>>(boost::irange(1, 8)); + + BOOST_TEST_MESSAGE("iterator"); + auto sum = 0; + max_concurrent_for_each(range.begin(), range.end(), 3, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("const iterator"); + sum = 0; + max_concurrent_for_each(range.cbegin(), range.cend(), 3, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("reverse iterator"); + sum = 0; + max_concurrent_for_each(range.rbegin(), range.rend(), 3, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("immediate result"); + sum = 0; + max_concurrent_for_each(range, 3, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("suspend"); + sum = 0; + max_concurrent_for_each(range, 3, [&sum] (int v) { + return later().then([&sum, v] { + sum += v; + }); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("throw immediately"); + sum = 0; + BOOST_CHECK_EXCEPTION(max_concurrent_for_each(range, 3, [&sum] (int v) { + sum += v; + if (v == 1) { + throw 5; + } + return make_ready_future<>(); + }).get(), int, [] (int v) { return v == 5; }); + BOOST_REQUIRE_EQUAL(sum, 28); + + BOOST_TEST_MESSAGE("throw after suspension"); + sum = 0; + BOOST_CHECK_EXCEPTION(max_concurrent_for_each(range, 3, [&sum] (int v) { + return later().then([&sum, v] { + sum += v; + if (v == 2) { + throw 5; + } + }); + }).get(), int, [] (int v) { return v == 5; }); + + BOOST_TEST_MESSAGE("concurrency higher than vector length"); + sum = 0; + max_concurrent_for_each(range, range.size() + 3, [&sum] (int v) { + sum += v; + return make_ready_future<>(); + }).get(); + BOOST_REQUIRE_EQUAL(sum, 28); +} |