Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
35225a6
Add parallel_scheduler scaffold
JorgeV92 May 1, 2026
7997189
added receiver proxy
JorgeV92 May 4, 2026
fb72e92
Add receiver_proxy module export
JorgeV92 May 4, 2026
a38059e
added correct import
JorgeV92 May 4, 2026
14fd292
added correct import
JorgeV92 May 4, 2026
ffda8ba
fixed syntax for system_context_replaceability
JorgeV92 May 4, 2026
9c686ef
added bulk_item_receiver_proxy
JorgeV92 May 4, 2026
089d2a7
added parallel_scheduler_backend
JorgeV92 May 4, 2026
9cf288f
added parallel_scheduler
JorgeV92 May 4, 2026
eb7709a
fixed format
JorgeV92 May 4, 2026
51afa7a
Merge remote-tracking branch 'upstream/main' into parallel_scheduler
JorgeV92 May 14, 2026
f2a887e
fixed modified imports with scheduler_tag
JorgeV92 May 14, 2026
f761d3c
added parallel_scheduler::sender class
JorgeV92 May 14, 2026
7363e37
can y
JorgeV92 May 21, 2026
5b36756
add import for schedule_result_t
JorgeV92 May 21, 2026
dcfa4b7
fixed typo
JorgeV92 May 21, 2026
c2c7762
format fix
JorgeV92 May 21, 2026
71073e6
Address issue #83
Cra3z Jun 26, 2026
61fdc9a
Re-enable the remaining test cases disabled for MSVC
Cra3z Jun 26, 2026
0a1b062
Format code
Cra3z Jun 26, 2026
f5e8099
Revert exec-spawn-future.test.cpp
Cra3z Jun 26, 2026
0f78112
Merge branch 'main' into parallel_scheduler
Cra3z Jul 3, 2026
ec7f354
Rename namespace `system_context_replaceability` to `parallel_schedul…
Cra3z Jul 3, 2026
bc19472
Format code
Cra3z Jul 3, 2026
5dd01d9
Implement bulk customization for parallel-scheduler
Cra3z Jul 3, 2026
622181a
Fix parallel scheduler domain query
Cra3z Jul 4, 2026
2fd2264
Add definition of `get_parallel_scheduler`
Cra3z Jul 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions include/beman/execution/detail/get_completion_domain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ struct get_completion_domain_t : ::beman::execution::forwarding_query_t {
::beman::execution::detail::try_query(
::beman::execution::get_completion_scheduler<Tag>(q, e...),
get_completion_domain_t<::beman::execution::set_value_t>{},
::std::forward<Q>(q),
::std::forward<E>(e)...);
}) {
return ::beman::execution::detail::try_query(::beman::execution::get_completion_scheduler<Tag>(q, e...),
get_completion_domain_t<::beman::execution::set_value_t>{},
::std::forward<Q>(q),
::std::forward<E>(e)...);
} else if constexpr (::beman::execution::scheduler<Q> && 0u != sizeof...(E)) {
return ::beman::execution::default_domain{};
Expand Down
420 changes: 420 additions & 0 deletions include/beman/execution/detail/parallel_scheduler.hpp

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions include/beman/execution/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import beman.execution.detail.let;
import beman.execution.detail.matching_sig;
import beman.execution.detail.on;
import beman.execution.detail.operation_state;
import beman.execution.detail.parallel_scheduler;
import beman.execution.detail.prop;
import beman.execution.detail.read_env;
import beman.execution.detail.receiver;
Expand Down Expand Up @@ -107,6 +108,7 @@ import beman.execution.detail.write_env;
#include <beman/execution/detail/matching_sig.hpp>
#include <beman/execution/detail/on.hpp>
#include <beman/execution/detail/operation_state.hpp>
#include <beman/execution/detail/parallel_scheduler.hpp>
#include <beman/execution/detail/prop.hpp>
#include <beman/execution/detail/read_env.hpp>
#include <beman/execution/detail/receiver.hpp>
Expand Down
9 changes: 9 additions & 0 deletions include/beman/execution26/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ using ::beman::execution::get_domain;
using ::beman::execution::get_domain_t;
using ::beman::execution::get_env;
using ::beman::execution::get_env_t;
using ::beman::execution::get_parallel_scheduler;
using ::beman::execution::get_scheduler;
using ::beman::execution::get_scheduler_t;
using ::beman::execution::get_stop_token;
Expand All @@ -62,6 +63,7 @@ using ::beman::execution::let_value;
using ::beman::execution::let_value_t;
using ::beman::execution::operation_state;
using ::beman::execution::operation_state_tag;
using ::beman::execution::parallel_scheduler;
using ::beman::execution::read_env;
using ::beman::execution::receiver;
using ::beman::execution::receiver_of;
Expand Down Expand Up @@ -107,6 +109,13 @@ using ::beman::execution::when_all_with_variant;
using ::beman::execution::when_all_with_variant_t;
using ::beman::execution::with_awaitable_senders;

namespace parallel_scheduler_replacement {
using ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy;
using ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend;
using ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend;
using ::beman::execution::parallel_scheduler_replacement::receiver_proxy;
} // namespace parallel_scheduler_replacement

} // namespace beman::execution26

// ----------------------------------------------------------------------------
Expand Down
2 changes: 2 additions & 0 deletions src/beman/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ target_sources(
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/on_stop_request.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/operation_state.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/operation_state_task.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/parallel_scheduler.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/product_type.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/poly.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution/detail/promise_env.hpp
Expand Down Expand Up @@ -350,6 +351,7 @@ if(BEMAN_USE_MODULES)
on.cppm
operation_state_task.cppm
operation_state.cppm
parallel_scheduler.cppm
product_type.cppm
prop.cppm
query_with_default.cppm
Expand Down
1 change: 1 addition & 0 deletions src/beman/execution/execution-detail.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export import beman.execution.detail.non_assignable;
export import beman.execution.detail.nothrow_callable;
export import beman.execution.detail.notify;
export import beman.execution.detail.operation_state_task;
export import beman.execution.detail.parallel_scheduler;
export import beman.execution.detail.product_type;
export import beman.execution.detail.query_with_default;
export import beman.execution.detail.queryable;
Expand Down
12 changes: 12 additions & 0 deletions src/beman/execution/execution.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import beman.execution.detail.never_stop_token;
import beman.execution.detail.nostopstate;
import beman.execution.detail.on;
export import beman.execution.detail.operation_state; // [exec.opstate], operation states
import beman.execution.detail.parallel_scheduler;
import beman.execution.detail.prop;
import beman.execution.detail.read_env;
import beman.execution.detail.run_loop;
Expand Down Expand Up @@ -242,6 +243,17 @@ export using ::beman::execution::stopped_as_error;
// [exec.run.loop], run_loop
export using ::beman::execution::run_loop;

// [exec.parallel.scheduler], parallel scheduler
export using ::beman::execution::parallel_scheduler;
export using ::beman::execution::get_parallel_scheduler;

namespace parallel_scheduler_replacement {
export using ::beman::execution::parallel_scheduler_replacement::receiver_proxy;
export using ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy;
export using ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend;
export using ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend;
} // namespace parallel_scheduler_replacement

// [exec.consumers], consumers
export using ::beman::execution::sync_wait_t;
export using ::beman::execution::sync_wait_with_variant_t;
Expand Down
19 changes: 19 additions & 0 deletions src/beman/execution/parallel_scheduler.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module;
// src/beman/execution/parallel_scheduler.cppm -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution/detail/parallel_scheduler.hpp>

export module beman.execution.detail.parallel_scheduler;

namespace beman::execution {
export using beman::execution::parallel_scheduler;
export using beman::execution::get_parallel_scheduler;

namespace parallel_scheduler_replacement {
export using beman::execution::parallel_scheduler_replacement::receiver_proxy;
export using beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy;
export using beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend;
export using beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend;
} // namespace parallel_scheduler_replacement
} // namespace beman::execution
1 change: 1 addition & 0 deletions tests/beman/execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ list(
exec-just.test
exec-let.test
exec-on.test
exec-parallel-scheduler.test
exec-opstate-start.test
exec-opstate.test
exec-prop.test
Expand Down
4 changes: 0 additions & 4 deletions tests/beman/execution/exec-associate.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ TEST(exec_associate) {
static_assert(std::same_as<test_std::completion_signatures<test_std::set_value_t()>,
test_std::completion_signatures_of_t<snd0_t, test_std::env<>>>);

#ifndef _MSC_VER //-dk:TODO MSVC++ struggles with more than one of these test
using snd1_t = decltype(test_std::associate(test_std::just(std::string{}), null_token{}));
static_assert(std::same_as<test_std::completion_signatures<test_std::set_value_t(std::string)>,
test_std::completion_signatures_of_t<snd1_t, test_std::env<>>>);
Expand All @@ -136,10 +135,8 @@ TEST(exec_associate) {
using snd4_t = decltype(test_std::associate(test_std::just(std::ref(i)), null_token{}));
static_assert(std::same_as<test_std::completion_signatures<test_std::set_value_t(std::reference_wrapper<int>)>,
test_std::completion_signatures_of_t<snd4_t, test_std::env<>>>);
#endif
}

#ifndef _MSC_VER //-dk:TODO MSVC++ struggles with more than one of these test
// Identity behavior with null_token for value path + piping works.
{
{
Expand Down Expand Up @@ -241,5 +238,4 @@ TEST(exec_associate) {
ASSERT(completes_with_value(test_std::just(1) | test_std::associate(null_token{})));
ASSERT(!completes_with_value(test_std::just(1) | test_std::associate(expired_token{})));
}
#endif
}
9 changes: 5 additions & 4 deletions tests/beman/execution/exec-bulk.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,10 +391,10 @@ struct pstl_for_each_sender {
template <typename... Args>
auto set_value(Args&&... args) noexcept -> void {
try {
auto iota = std::views::iota(Shape(0), shape);
std::for_each(policy, std::ranges::begin(iota), std::ranges::end(iota), [&](Shape i) {
auto indices = std::views::iota(Shape(0), shape);
std::for_each(policy, std::ranges::begin(indices), std::ranges::end(indices), [&](Shape i) {
if constexpr (IsChunked) {
std::invoke(fn, i, i + 1, args...);
std::invoke(fn, i, i + Shape(1), args...);
} else {
std::invoke(fn, i, args...);
};
Expand Down Expand Up @@ -430,7 +430,8 @@ struct pstl_for_each_sender {

template <typename Rcvr>
auto connect(Rcvr rcvr) && noexcept {
return test_std::connect(child, receiver{std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)});
return test_std::connect(std::move(child),
receiver{std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)});
}

auto get_env() const noexcept { return test_std::get_env(child); }
Expand Down
10 changes: 4 additions & 6 deletions tests/beman/execution/exec-just.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,17 @@ auto test_just_allocator() -> void {
ASSERT(resource.count == 0u);
auto copy(std::make_obj_using_allocator<std::pmr::string>(std::pmr::polymorphic_allocator<>(&resource), str));
test::use(copy);
ASSERT(resource.count == 1u);
auto old_count = resource.count;
ASSERT(old_count > 0u);

auto env{test_std::get_env(receiver)};
auto alloc{test_std::get_allocator(env)};
test::use(alloc);

ASSERT(resource.count == 1u);
ASSERT(resource.count == old_count);
auto state{test_std::connect(std::move(sender), memory_receiver{&resource})};
test::use(state);
ASSERT(resource.count == 2u);
ASSERT(resource.count > old_count);
}

auto test_completion_signatures() -> void {
Expand All @@ -220,10 +221,7 @@ TEST(exec_just) {
test_just_constraints();
test_just();
test_completion_signatures();
#ifndef _MSC_VER
//-dk:TODO re-enable allocator test for MSVC++
test_just_allocator();
#endif
} catch (...) {
// NOLINTNEXTLINE(cert-dcl03-c,hicpp-static-assert,misc-static-assert)
ASSERT(nullptr == "the just tests shouldn't throw");
Expand Down
83 changes: 83 additions & 0 deletions tests/beman/execution/exec-parallel-scheduler.test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// src/beman/execution/tests/exec-parallel-scheduler.test.cpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <concepts>
#include <cstddef>
#include <exception>
#include <memory>
#include <optional>
#include <span>
#include <type_traits>
#include <utility>
#include <test/execution.hpp>
#ifdef BEMAN_HAS_MODULES
import beman.execution;
import beman.execution.detail.schedule_result_t;
#else
#include <beman/execution/execution.hpp>
#endif

namespace {
namespace replaceability = test_std::parallel_scheduler_replacement;

struct proxy : replaceability::receiver_proxy {
auto set_value() noexcept -> void override {}
auto set_error(::std::exception_ptr) noexcept -> void override {}
auto set_stopped() noexcept -> void override {}
};

struct bulk_proxy : replaceability::bulk_item_receiver_proxy {
auto set_value() noexcept -> void override {}
auto set_error(::std::exception_ptr) noexcept -> void override {}
auto set_stopped() noexcept -> void override {}
auto execute(::std::size_t, ::std::size_t) noexcept -> void override {}
};

struct backend : replaceability::parallel_scheduler_backend {
auto schedule(replaceability::receiver_proxy&, ::std::span<::std::byte>) noexcept -> void override {}
auto schedule_bulk_chunked(::std::size_t,
replaceability::bulk_item_receiver_proxy&,
::std::span<::std::byte>) noexcept -> void override {}
auto schedule_bulk_unchunked(::std::size_t,
replaceability::bulk_item_receiver_proxy&,
::std::span<::std::byte>) noexcept -> void override {}
};

auto test_parallel_scheduler_synopsis() -> void {
static_assert(!::std::default_initializable<test_std::parallel_scheduler>);
static_assert(::std::copy_constructible<test_std::parallel_scheduler>);
static_assert(::std::move_constructible<test_std::parallel_scheduler>);
static_assert(test_std::scheduler<test_std::parallel_scheduler>);

static_assert(::std::same_as<decltype(test_std::get_parallel_scheduler()), test_std::parallel_scheduler>);
static_assert(::std::same_as<test_std::schedule_result_t<test_std::parallel_scheduler>,
test_std::parallel_scheduler::sender>);
static_assert(test_std::sender<test_std::parallel_scheduler::sender>);
static_assert(::std::same_as<decltype(test_std::get_completion_signatures<test_std::parallel_scheduler::sender>()),
test_std::completion_signatures<test_std::set_value_t(),
test_std::set_error_t(::std::exception_ptr),
test_std::set_stopped_t()>>);

static_assert(
noexcept(test_std::get_forward_progress_guarantee(::std::declval<const test_std::parallel_scheduler&>())));
static_assert(::std::same_as<decltype(test_std::get_forward_progress_guarantee(
::std::declval<const test_std::parallel_scheduler&>())),
test_std::forward_progress_guarantee>);
}

auto test_replaceability_synopsis() -> void {
static_assert(::std::is_abstract_v<replaceability::receiver_proxy>);
static_assert(::std::is_abstract_v<replaceability::bulk_item_receiver_proxy>);
static_assert(::std::is_abstract_v<replaceability::parallel_scheduler_backend>);
static_assert(::std::derived_from<bulk_proxy, replaceability::receiver_proxy>);
static_assert(::std::derived_from<backend, replaceability::parallel_scheduler_backend>);
static_assert(::std::same_as<decltype(::std::declval<proxy&>().template try_query<int>(0)), ::std::optional<int>>);
static_assert(::std::same_as<decltype(replaceability::query_parallel_scheduler_backend()),
::std::shared_ptr<replaceability::parallel_scheduler_backend>>);
}
} // namespace

TEST(exec_parallel_scheduler) {
test_parallel_scheduler_synopsis();
test_replaceability_synopsis();
}
9 changes: 3 additions & 6 deletions tests/beman/execution/exec-then.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ struct sender {
}
};

constexpr auto consume_all = [](auto&&...) {};

template <bool Expect>
auto test_has(auto cpo, auto in_sender, auto fun) -> void {
static_assert(test_std::receiver<receiver>);
Expand All @@ -70,14 +72,9 @@ auto test_has(auto cpo, auto in_sender, auto fun) -> void {
static_assert(requires {
{ in_sender | cpo(fun) } -> test_std::sender;
});
#ifndef _MSC_VER
//-dk:TODO re-enable this test
static_assert(requires {
{
in_sender | cpo(fun) | cpo([](auto&&...) {})
} -> test_std::sender;
{ in_sender | cpo(fun) | cpo(consume_all) } -> test_std::sender;
});
#endif
auto sender{cpo(in_sender, fun)};
auto op{test_std::connect(::std::move(sender), receiver{})};
test_std::start(op);
Expand Down
Loading