Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions include/exec/static_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,7 @@ namespace experimental::execution
STDEXEC::__manual_lifetime<item_operation_t>>;

std::vector<__manual_lifetime<item_operation_t>, item_allocator_t> items_;
std::size_t items_constructed_{};

public:
operation(Range range, _static_thread_pool& pool, Receiver rcvr)
Expand All @@ -1734,12 +1735,9 @@ namespace experimental::execution

~operation()
{
if (this->has_started_)
for (std::size_t i = 0; i < items_constructed_; ++i)
{
for (auto& item: items_)
{
item.__destroy();
}
items_[i].__destroy();
}
}

Expand All @@ -1753,13 +1751,26 @@ namespace experimental::execution
auto& remote_queue = *this->pool_.get_remote_queue();
auto it = std::ranges::begin(this->range_);
std::size_t i0 = 0;
while (i0 + chunk_size < size)
STDEXEC_TRY
{
for (std::size_t i = i0; i < i0 + chunk_size; ++i)
for (std::size_t i = 0; i < size; ++i)
{
items_[i].__construct_from(STDEXEC::connect,
set_next(this->rcvr_, item_sender_t{this, it + i}),
next_receiver_t{this});
++items_constructed_;
}
}
STDEXEC_CATCH_ALL
{
STDEXEC::set_error(static_cast<Receiver&&>(this->rcvr_), std::current_exception());
return;
}

while (i0 + chunk_size < size)
{
for (std::size_t i = i0; i < i0 + chunk_size; ++i)
{
STDEXEC::start(items_[i].__get());
}

Expand All @@ -1770,9 +1781,6 @@ namespace experimental::execution
}
for (std::size_t i = i0; i < size; ++i)
{
items_[i].__construct_from(STDEXEC::connect,
set_next(this->rcvr_, item_sender_t{this, it + i}),
next_receiver_t{this});
STDEXEC::start(items_[i].__get());
}
std::unique_lock lock{this->start_mutex_};
Expand Down
62 changes: 62 additions & 0 deletions test/exec/test_static_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,52 @@
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <exception>
#include <mutex>
#include <ranges>
#include <stdexcept>
#include <thread>
#include <unordered_set>
namespace ex = STDEXEC;

namespace
{
struct throwing_set_next_receiver
{
using receiver_concept = ex::receiver_tag;

bool& set_value_called_;
bool& set_stopped_called_;
std::exception_ptr& error_;

template <class Item>
auto set_next(Item&&) -> decltype(ex::just())
{
throw std::runtime_error{"set_next failed"};
}

void set_value() noexcept
{
set_value_called_ = true;
}

void set_stopped() noexcept
{
set_stopped_called_ = true;
}

void set_error(std::exception_ptr error) noexcept
{
error_ = error;
}

auto get_env() const noexcept -> ex::env<>
{
return {};
}
};
} // namespace

TEST_CASE("static_thread_pool::get_scheduler_on_thread Test start on a specific thread",
"[types][static_thread_pool]")
{
Expand Down Expand Up @@ -45,6 +86,27 @@ TEST_CASE("bulk on static_thread_pool executes on multiple threads", "[types][st
REQUIRE(thread_ids.size() == num_of_threads);
}

TEST_CASE("schedule_all on static_thread_pool sends errors from set_next",
"[types][static_thread_pool]")
{
exec::static_thread_pool pool{1};
bool set_value_called = false;
bool set_stopped_called = false;
std::exception_ptr error;

auto op = exec::subscribe(exec::schedule_all(pool, std::views::iota(0, 1)),
throwing_set_next_receiver{set_value_called,
set_stopped_called,
error});

ex::start(op);

CHECK_FALSE(set_value_called);
CHECK_FALSE(set_stopped_called);
REQUIRE(error);
CHECK_THROWS_AS(std::rethrow_exception(error), std::runtime_error);
}

TEST_CASE("bulk on static_thread_pool executes on multiple threads, take 2",
"[types][static_thread_pool]")
{
Expand Down