From 6676ddc187724509d1e7f2acc9737e82fcc173db Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Fri, 19 Jun 2026 23:54:31 +0200 Subject: [PATCH] Report schedule_all start errors --- include/exec/static_thread_pool.hpp | 28 +++++++----- test/exec/test_static_thread_pool.cpp | 62 +++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 10 deletions(-) diff --git a/include/exec/static_thread_pool.hpp b/include/exec/static_thread_pool.hpp index 5f18060f7..6c5605dc9 100644 --- a/include/exec/static_thread_pool.hpp +++ b/include/exec/static_thread_pool.hpp @@ -1723,6 +1723,7 @@ namespace experimental::execution STDEXEC::__manual_lifetime>; std::vector<__manual_lifetime, item_allocator_t> items_; + std::size_t items_constructed_{}; public: operation(Range range, _static_thread_pool& pool, Receiver rcvr) @@ -1734,12 +1735,9 @@ namespace experimental::execution ~operation() { - if (this->has_started_) + for (std::size_t i = 0; i < items_constructed_; ++i) { - for (auto& item: items_) - { - item.__destroy(); - } + items_[i].__destroy(); } } @@ -1753,13 +1751,26 @@ namespace experimental::execution auto& remote_queue = *this->pool_.get_remote_queue(); auto it = std::ranges::begin(this->range_); std::size_t i0 = 0; - while (i0 + chunk_size < size) + STDEXEC_TRY { - for (std::size_t i = i0; i < i0 + chunk_size; ++i) + for (std::size_t i = 0; i < size; ++i) { items_[i].__construct_from(STDEXEC::connect, set_next(this->rcvr_, item_sender_t{this, it + i}), next_receiver_t{this}); + ++items_constructed_; + } + } + STDEXEC_CATCH_ALL + { + STDEXEC::set_error(static_cast(this->rcvr_), std::current_exception()); + return; + } + + while (i0 + chunk_size < size) + { + for (std::size_t i = i0; i < i0 + chunk_size; ++i) + { STDEXEC::start(items_[i].__get()); } @@ -1770,9 +1781,6 @@ namespace experimental::execution } for (std::size_t i = i0; i < size; ++i) { - items_[i].__construct_from(STDEXEC::connect, - set_next(this->rcvr_, item_sender_t{this, it + i}), - next_receiver_t{this}); STDEXEC::start(items_[i].__get()); } std::unique_lock lock{this->start_mutex_}; diff --git a/test/exec/test_static_thread_pool.cpp b/test/exec/test_static_thread_pool.cpp index 4d38f825e..305285e56 100644 --- a/test/exec/test_static_thread_pool.cpp +++ b/test/exec/test_static_thread_pool.cpp @@ -2,11 +2,52 @@ #include #include +#include #include +#include +#include #include #include namespace ex = STDEXEC; +namespace +{ + struct throwing_set_next_receiver + { + using receiver_concept = ex::receiver_tag; + + bool& set_value_called_; + bool& set_stopped_called_; + std::exception_ptr& error_; + + template + auto set_next(Item&&) -> decltype(ex::just()) + { + throw std::runtime_error{"set_next failed"}; + } + + void set_value() noexcept + { + set_value_called_ = true; + } + + void set_stopped() noexcept + { + set_stopped_called_ = true; + } + + void set_error(std::exception_ptr error) noexcept + { + error_ = error; + } + + auto get_env() const noexcept -> ex::env<> + { + return {}; + } + }; +} // namespace + TEST_CASE("static_thread_pool::get_scheduler_on_thread Test start on a specific thread", "[types][static_thread_pool]") { @@ -45,6 +86,27 @@ TEST_CASE("bulk on static_thread_pool executes on multiple threads", "[types][st REQUIRE(thread_ids.size() == num_of_threads); } +TEST_CASE("schedule_all on static_thread_pool sends errors from set_next", + "[types][static_thread_pool]") +{ + exec::static_thread_pool pool{1}; + bool set_value_called = false; + bool set_stopped_called = false; + std::exception_ptr error; + + auto op = exec::subscribe(exec::schedule_all(pool, std::views::iota(0, 1)), + throwing_set_next_receiver{set_value_called, + set_stopped_called, + error}); + + ex::start(op); + + CHECK_FALSE(set_value_called); + CHECK_FALSE(set_stopped_called); + REQUIRE(error); + CHECK_THROWS_AS(std::rethrow_exception(error), std::runtime_error); +} + TEST_CASE("bulk on static_thread_pool executes on multiple threads, take 2", "[types][static_thread_pool]") {