Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions include/exec/static_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1743,11 +1743,15 @@ namespace experimental::execution

void start() & noexcept
{
std::size_t size = items_.size();
std::size_t nthreads = this->pool_.available_parallelism();
std::size_t size = items_.size();
std::size_t nthreads = this->pool_.available_parallelism();
STDEXEC_ASSERT(nthreads > 0);
bwos_params params = this->pool_.params();
std::size_t local_size = params.blockSize * params.numBlocks;
std::size_t chunk_size = __umin({size / nthreads, local_size * nthreads});
std::size_t chunk_size = size == 0
? 0
: __umax({std::size_t{1},
__umin({size / nthreads, local_size * nthreads})});
auto& remote_queue = *this->pool_.get_remote_queue();
auto it = std::ranges::begin(this->range_);
std::size_t i0 = 0;
Expand Down Expand Up @@ -1776,6 +1780,7 @@ namespace experimental::execution

std::unique_lock lock{this->start_mutex_};
this->pool_.bulk_enqueue(remote_queue, std::move(this->tasks_), this->tasks_size_);
this->tasks_size_ = 0;
lock.unlock();
i0 += chunk_size;
}
Expand Down
28 changes: 28 additions & 0 deletions test/exec/test_static_thread_pool.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#include "catch2/catch_all.hpp"
#include "exec/sequence/ignore_all_values.hpp"
#include "exec/sequence/transform_each.hpp"
#include <exec/static_thread_pool.hpp>
#include <stdexec/execution.hpp>

#include <atomic>
#include <exception>
#include <mutex>
#include <ranges>
Expand Down Expand Up @@ -86,6 +89,31 @@ TEST_CASE("bulk on static_thread_pool executes on multiple threads", "[types][st
REQUIRE(thread_ids.size() == num_of_threads);
}

TEST_CASE("schedule_all on static_thread_pool handles ranges smaller than available parallelism",
"[types][static_thread_pool]")
{
constexpr size_t const num_of_threads = 5;
constexpr int const range_size = 3;

exec::static_thread_pool pool{num_of_threads};
REQUIRE(range_size < pool.available_parallelism());

std::atomic<int> count{0};
std::atomic<int> sum{0};
auto sender = exec::schedule_all(pool, std::views::iota(0, range_size))
| exec::transform_each(ex::then(
[&](int x) noexcept
{
count.fetch_add(1, std::memory_order_relaxed);
sum.fetch_add(x, std::memory_order_relaxed);
}))
| exec::ignore_all_values();

CHECK(ex::sync_wait(std::move(sender)));
CHECK(count.load(std::memory_order_relaxed) == range_size);
CHECK(sum.load(std::memory_order_relaxed) == 3);
}

TEST_CASE("schedule_all on static_thread_pool sends errors from set_next",
"[types][static_thread_pool]")
{
Expand Down
Loading