From 35225a659618bb294fdd8585a0a31ca3e13a7875 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Thu, 30 Apr 2026 20:18:22 -0500 Subject: [PATCH 01/25] Add parallel_scheduler scaffold --- .../execution/detail/parallel_scheduler.hpp | 61 +++++++++++++++++++ include/beman/execution/execution.hpp | 2 + src/beman/execution/CMakeLists.txt | 2 + src/beman/execution/execution-detail.cppm | 1 + src/beman/execution/execution.cppm | 5 ++ src/beman/execution/parallel_scheduler.cppm | 12 ++++ 6 files changed, 83 insertions(+) create mode 100644 include/beman/execution/detail/parallel_scheduler.hpp create mode 100644 src/beman/execution/parallel_scheduler.cppm diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp new file mode 100644 index 00000000..0394734f --- /dev/null +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -0,0 +1,61 @@ +// include/beman/execution/detail/parallel_scheduler.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_PARALLEL_SCHEDULER +#define INCLUDED_BEMAN_EXECUTION_DETAIL_PARALLEL_SCHEDULER + +#include +#ifdef BEMAN_HAS_IMPORT_STD +import std; +#else +#include +#include +#include +#include +#include +#include +#include +#include +#endif +#ifdef BEMAN_HAS_MODULES +import beman.execution.detail.completion_signatures; +import beman.execution.detail.get_completion_scheduler; +import beman.execution.detail.get_forward_progress_guarantee; +import beman.execution.detail.operation_state; +import beman.execution.detail.receiver; +import beman.execution.detail.scheduler; +import beman.execution.detail.scheduler_t; +import beman.execution.detail.sender; +import beman.execution.detail.set_error; +import beman.execution.detail.set_stopped; +import beman.execution.detail.set_value; +#else +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#endif + +// ---------------------------------------------------------------------------- + +namespace beman::execution { + +class parallel_scheduler { + // TODO(P2079R10): add scheduler state and operations. +}; + +// TODO(P2079R10): implement using system_context_replaceability::query_parallel_scheduler_backend(). +auto get_parallel_scheduler() -> parallel_scheduler; + +} // namespace beman::execution + +// ---------------------------------------------------------------------------- + +#endif // INCLUDED_BEMAN_EXECUTION_DETAIL_PARALLEL_SCHEDULER diff --git a/include/beman/execution/execution.hpp b/include/beman/execution/execution.hpp index 5f526386..914a5115 100644 --- a/include/beman/execution/execution.hpp +++ b/include/beman/execution/execution.hpp @@ -39,6 +39,7 @@ import beman.execution.detail.let; import beman.execution.detail.matching_sig; import beman.execution.detail.on; import beman.execution.detail.operation_state; +import beman.execution.detail.parallel_scheduler; import beman.execution.detail.prop; import beman.execution.detail.read_env; import beman.execution.detail.receiver; @@ -101,6 +102,7 @@ import beman.execution.detail.write_env; #include #include #include +#include #include #include #include diff --git a/src/beman/execution/CMakeLists.txt b/src/beman/execution/CMakeLists.txt index 674d52c5..e663edb8 100644 --- a/src/beman/execution/CMakeLists.txt +++ b/src/beman/execution/CMakeLists.txt @@ -133,6 +133,7 @@ target_sources( ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/on_stop_request.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/operation_state.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/operation_state_task.hpp + ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/parallel_scheduler.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/product_type.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/prop.hpp ${PROJECT_SOURCE_DIR}/include/beman/execution/detail/query_with_default.hpp @@ -327,6 +328,7 @@ if(BEMAN_USE_MODULES) on.cppm operation_state_task.cppm operation_state.cppm + parallel_scheduler.cppm product_type.cppm prop.cppm query_with_default.cppm diff --git a/src/beman/execution/execution-detail.cppm b/src/beman/execution/execution-detail.cppm index 568be1bb..6d8a0ad4 100644 --- a/src/beman/execution/execution-detail.cppm +++ b/src/beman/execution/execution-detail.cppm @@ -59,6 +59,7 @@ export import beman.execution.detail.non_assignable; export import beman.execution.detail.nothrow_callable; export import beman.execution.detail.notify; export import beman.execution.detail.operation_state_task; +export import beman.execution.detail.parallel_scheduler; export import beman.execution.detail.product_type; export import beman.execution.detail.query_with_default; export import beman.execution.detail.queryable; diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index 3de60d3c..c6e4b9d4 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -43,6 +43,7 @@ import beman.execution.detail.never_stop_token; import beman.execution.detail.nostopstate; import beman.execution.detail.on; export import beman.execution.detail.operation_state; // [exec.opstate], operation states +import beman.execution.detail.parallel_scheduler; import beman.execution.detail.prop; import beman.execution.detail.read_env; import beman.execution.detail.run_loop; @@ -237,6 +238,10 @@ export using ::beman::execution::stopped_as_error; // [exec.run.loop], run_loop export using ::beman::execution::run_loop; +// [exec.parallel.scheduler], parallel scheduler +export using ::beman::execution::parallel_scheduler; +export using ::beman::execution::get_parallel_scheduler; + // [exec.consumers], consumers export using ::beman::execution::sync_wait_t; export using ::beman::execution::sync_wait_with_variant_t; diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm new file mode 100644 index 00000000..14076f84 --- /dev/null +++ b/src/beman/execution/parallel_scheduler.cppm @@ -0,0 +1,12 @@ +module; +// src/beman/execution/parallel_scheduler.cppm -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include + +export module beman.execution.detail.parallel_scheduler; + +namespace beman::execution { +export using beman::execution::parallel_scheduler; +export using beman::execution::get_parallel_scheduler; +} // namespace beman::execution From 799718972a3d8aa26214cd5e6d238f0a664166ee Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 19:15:10 -0500 Subject: [PATCH 02/25] added receiver proxy --- .../execution/detail/parallel_scheduler.hpp | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 0394734f..d30b226f 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -45,6 +45,24 @@ import beman.execution.detail.set_value; // ---------------------------------------------------------------------------- +namespace beman::execution::system_context_replaceability { + +struct receiver_proxy { + virtual ~receiver_proxy() = default; + + virtual auto set_value() noexcept -> void = 0; + virtual auto set_error(::std::exception_ptr) noexcept -> void = 0; + virtual auto set_stopped() noexcept -> void = 0; + + template + requires(::std::same_as> && ::std::is_object_v

&& !::std::is_array_v

) + auto try_query(Query) noexcept -> ::std::optional

{ + // TODO(P2079R10): forward supported receiver environment queries + // through this proxy, especially get_stop_token_t -> inplace_stop_token + return ::std::nullopt; + } +}; + namespace beman::execution { class parallel_scheduler { From fb72e92b3dd8dfe5672e9f59f6d284d3c4c25a36 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 19:44:41 -0500 Subject: [PATCH 03/25] Add receiver_proxy module export --- src/beman/execution/parallel_scheduler.cppm | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index 14076f84..234cecbf 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -9,4 +9,8 @@ export module beman.execution.detail.parallel_scheduler; namespace beman::execution { export using beman::execution::parallel_scheduler; export using beman::execution::get_parallel_scheduler; + +namespace system_context_replaceability { +export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; +} // namespace system_context_replaceability } // namespace beman::execution From a38059efd834d3d7cf32d44ee9f52313a618b603 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 19:46:45 -0500 Subject: [PATCH 04/25] added correct import --- src/beman/execution/parallel_scheduler.cppm | 1 + 1 file changed, 1 insertion(+) diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index 234cecbf..760cdc45 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -11,6 +11,7 @@ export using beman::execution::parallel_scheduler; export using beman::execution::get_parallel_scheduler; namespace system_context_replaceability { +export using beman::execution::system_context_replaceability::receiver_proxy; export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; } // namespace system_context_replaceability } // namespace beman::execution From 14fd292b4531f51a12b7741f22c6bf919269ae25 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 19:47:38 -0500 Subject: [PATCH 05/25] added correct import --- src/beman/execution/parallel_scheduler.cppm | 1 - 1 file changed, 1 deletion(-) diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index 760cdc45..ee434f01 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -12,6 +12,5 @@ export using beman::execution::get_parallel_scheduler; namespace system_context_replaceability { export using beman::execution::system_context_replaceability::receiver_proxy; -export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; } // namespace system_context_replaceability } // namespace beman::execution From ffda8baf9589585365f1fb79c7e4da590a31ce33 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 19:56:40 -0500 Subject: [PATCH 06/25] fixed syntax for system_context_replaceability --- include/beman/execution/detail/parallel_scheduler.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index d30b226f..1e22109b 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -62,6 +62,7 @@ struct receiver_proxy { return ::std::nullopt; } }; +}// namespace beman::execution::system_context_replaceability namespace beman::execution { From 9c686efe61be1c60ace7123f31b057e4c5847bb7 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 20:18:11 -0500 Subject: [PATCH 07/25] added bulk_item_receiver_proxy --- include/beman/execution/detail/parallel_scheduler.hpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 1e22109b..ce5ab8e9 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -50,9 +50,9 @@ namespace beman::execution::system_context_replaceability { struct receiver_proxy { virtual ~receiver_proxy() = default; - virtual auto set_value() noexcept -> void = 0; + virtual auto set_value() noexcept -> void = 0; virtual auto set_error(::std::exception_ptr) noexcept -> void = 0; - virtual auto set_stopped() noexcept -> void = 0; + virtual auto set_stopped() noexcept -> void = 0; template requires(::std::same_as> && ::std::is_object_v

&& !::std::is_array_v

) @@ -62,7 +62,12 @@ struct receiver_proxy { return ::std::nullopt; } }; -}// namespace beman::execution::system_context_replaceability + +struct bulk_item_receiver_proxy : receiver_proxy { + virtual auto execute(::std::size_t, ::std::size_t) noexcept -> void = 0; +}; + +} // namespace beman::execution::system_context_replaceability namespace beman::execution { From 089d2a7b88ca939cbd8cde4949bb7b5be809e0a9 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 21:34:02 -0500 Subject: [PATCH 08/25] added parallel_scheduler_backend --- .../beman/execution/detail/parallel_scheduler.hpp | 15 +++++++++++++++ src/beman/execution/parallel_scheduler.cppm | 1 + 2 files changed, 16 insertions(+) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index ce5ab8e9..092f49bb 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -67,6 +67,21 @@ struct bulk_item_receiver_proxy : receiver_proxy { virtual auto execute(::std::size_t, ::std::size_t) noexcept -> void = 0; }; +struct parallel_scheduler_backend { + virtual ~parallel_scheduler_backend() = default; + + virtual auto schedule(receiver_proxy&, ::std::span<::std::byte>) noexcept -> void = 0; + virtual auto schedule_bulk_chunked(::std::size_t, + bulk_item_receiver_proxy&, + ::std::span<::std::byte>) noexcept -> void = 0; + virtual auto schedule_bulk_unchunked(::std::size_t, + bulk_item_receiver_proxy&, + ::std::span<::std::byte>) noexcept -> void = 0; +}; + +// TODO(P2079R10): provide the project-supported link-time replaceability hook. +auto query_parallel_scheduler_backend() -> ::std::shared_ptr; + } // namespace beman::execution::system_context_replaceability namespace beman::execution { diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index ee434f01..760cdc45 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -12,5 +12,6 @@ export using beman::execution::get_parallel_scheduler; namespace system_context_replaceability { export using beman::execution::system_context_replaceability::receiver_proxy; +export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; } // namespace system_context_replaceability } // namespace beman::execution From 9cf288ff2505d0f5adce777e6961429450e79ba8 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 22:13:36 -0500 Subject: [PATCH 09/25] added parallel_scheduler --- .../execution/detail/parallel_scheduler.hpp | 34 ++++++++++++++++++- src/beman/execution/parallel_scheduler.cppm | 2 ++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 092f49bb..38a7ed62 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -87,7 +87,39 @@ auto query_parallel_scheduler_backend() -> ::std::shared_ptr parallel_scheduler& = default; + auto operator=(parallel_scheduler&&) noexcept -> parallel_scheduler& = default; + + auto operator==(const parallel_scheduler& other) const noexcept -> bool { + return this->backend_ == other.backend_; + } + + static constexpr auto query(::beman::execution::get_forward_progress_guarantee_t) noexcept + -> ::beman::execution::forward_progress_guarantee { + return ::beman::execution::forward_progress_guarantee::parallel; + } + + auto schedule() const noexcept -> sender; + // TODO(P2079R10): customize bulk_chunked and bulk_unchunked for this scheduler. + + private: + explicit parallel_scheduler(::std::shared_ptr backend) noexcept : backend_(::std::move(backend)) {} + + ::std::shared_ptr backend_; + + friend auto get_parallel_scheduler() -> parallel_scheduler; }; // TODO(P2079R10): implement using system_context_replaceability::query_parallel_scheduler_backend(). diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index 760cdc45..16138923 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -12,6 +12,8 @@ export using beman::execution::get_parallel_scheduler; namespace system_context_replaceability { export using beman::execution::system_context_replaceability::receiver_proxy; +export using beman::execution::system_context_replaceability::bulk_item_receiver_proxy; +export using beman::execution::system_context_replaceability::parallel_scheduler_backend; export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; } // namespace system_context_replaceability } // namespace beman::execution From eb7709afefbcb728fc01b8e282126e6a4c43b0fd Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Sun, 3 May 2026 22:41:39 -0500 Subject: [PATCH 10/25] fixed format --- .../beman/execution/detail/parallel_scheduler.hpp | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 38a7ed62..923702b2 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -71,12 +71,10 @@ struct parallel_scheduler_backend { virtual ~parallel_scheduler_backend() = default; virtual auto schedule(receiver_proxy&, ::std::span<::std::byte>) noexcept -> void = 0; - virtual auto schedule_bulk_chunked(::std::size_t, - bulk_item_receiver_proxy&, - ::std::span<::std::byte>) noexcept -> void = 0; - virtual auto schedule_bulk_unchunked(::std::size_t, - bulk_item_receiver_proxy&, - ::std::span<::std::byte>) noexcept -> void = 0; + virtual auto schedule_bulk_chunked(::std::size_t, bulk_item_receiver_proxy&, ::std::span<::std::byte>) noexcept + -> void = 0; + virtual auto schedule_bulk_unchunked(::std::size_t, bulk_item_receiver_proxy&, ::std::span<::std::byte>) noexcept + -> void = 0; }; // TODO(P2079R10): provide the project-supported link-time replaceability hook. @@ -94,7 +92,7 @@ class parallel_scheduler { class sender; - parallel_scheduler() = delete; + parallel_scheduler() = delete; ~parallel_scheduler() = default; parallel_scheduler(const parallel_scheduler&) noexcept = default; From f2a887ef2dac1ce9460bae1a5a6ed0a5b80ce2ac Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 13 May 2026 21:21:04 -0500 Subject: [PATCH 11/25] fixed modified imports with scheduler_tag --- include/beman/execution/detail/parallel_scheduler.hpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 923702b2..6b97ba74 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -24,7 +24,7 @@ import beman.execution.detail.get_forward_progress_guarantee; import beman.execution.detail.operation_state; import beman.execution.detail.receiver; import beman.execution.detail.scheduler; -import beman.execution.detail.scheduler_t; +import beman.execution.detail.scheduler_tag; import beman.execution.detail.sender; import beman.execution.detail.set_error; import beman.execution.detail.set_stopped; @@ -36,7 +36,7 @@ import beman.execution.detail.set_value; #include #include #include -#include +#include #include #include #include @@ -88,7 +88,7 @@ class parallel_scheduler { using backend_type = ::beman::execution::system_context_replaceability::parallel_scheduler_backend; public: - using scheduler_concept = ::beman::execution::scheduler_t; + using scheduler_concept = ::beman::execution::scheduler_tag; class sender; From f761d3cc801b21571d65ced28f72555aa82ca933 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 13 May 2026 22:48:13 -0500 Subject: [PATCH 12/25] added parallel_scheduler::sender class --- .../execution/detail/parallel_scheduler.hpp | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 6b97ba74..3da33831 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -120,6 +120,85 @@ class parallel_scheduler { friend auto get_parallel_scheduler() -> parallel_scheduler; }; +class parallel_scheduler::sender { + using backend_type = ::beman::execution::system_context_replaceability::parallel_scheduler_backend; + + public: + using sender_concept = ::beman::execution::sender_tag; + using completion_signatures = + ::beman::execution::completion_signatures<::beman::execution::set_value_t(), + ::beman::execution::set_error_t(::std::exception_ptr), + ::beman::execution::set_stopped_t()>; + + class env { + public: + explicit env(::std::shared_ptr backend) noexcept : backend_(::std::move(backend)) {} + + auto + query(const ::beman::execution::get_completion_scheduler_t<::beman::execution::set_value_t>&) const noexcept + -> ::beman::execution::parallel_scheduler { + return ::beman::execution::parallel_scheduler{this->backend_}; + } + + private: + ::std::shared_ptr backend_; + }; + + template <::beman::execution::receiver Rcvr> + class operation : public ::beman::execution::system_context_replaceability::receiver_proxy { + public: + using operation_state_concept = ::beman::execution::operation_state_tag; + + operation(::std::shared_ptr backend, + Rcvr&& rcvr) noexcept(::std::is_nothrow_constructible_v<::std::remove_cvref_t, Rcvr>) + : backend_(::std::move(backend)), rcvr_(::std::forward(rcvr)) {} + + auto start() & noexcept -> void { + // TODO(P2079R10): define backend storage sizing and stopped-before-start handling. + this->backend_->schedule(*this, ::std::span<::std::byte>{this->storage_}); + } + + private: + auto set_value() noexcept -> void override { ::beman::execution::set_value(::std::move(this->rcvr_)); } + + auto set_error(::std::exception_ptr error) noexcept -> void override { + ::beman::execution::set_error(::std::move(this->rcvr_), ::std::move(error)); + } + + auto set_stopped() noexcept -> void override { ::beman::execution::set_stopped(::std::move(this->rcvr_)); } + + ::std::shared_ptr backend_; + ::std::remove_cvref_t rcvr_; + alignas(void*)::std::byte storage_[sizeof(void*) * 4]{}; + }; + + explicit sender(::std::shared_ptr backend) noexcept : backend_(::std::move(backend)) {} + + template + static consteval auto get_completion_signatures() noexcept -> completion_signatures { + return {}; + } + + auto get_env() const noexcept -> env { return env{this->backend_}; } + + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) & noexcept(::std::is_nothrow_constructible_v<::std::remove_cvref_t, Rcvr>) + -> operation { + return operation{this->backend_, ::std::forward(rcvr)}; + } + + template <::beman::execution::receiver Rcvr> + auto connect(Rcvr&& rcvr) && noexcept(::std::is_nothrow_constructible_v<::std::remove_cvref_t, Rcvr>) + -> operation { + return operation{::std::move(this->backend_), ::std::forward(rcvr)}; + } + + private: + ::std::shared_ptr backend_; +}; + +inline auto parallel_scheduler::schedule() const noexcept -> sender { return sender{this->backend_}; } + // TODO(P2079R10): implement using system_context_replaceability::query_parallel_scheduler_backend(). auto get_parallel_scheduler() -> parallel_scheduler; From 7363e379b657b07ee398be3b6a8c491f77ceb650 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 20 May 2026 21:07:25 -0500 Subject: [PATCH 13/25] can y --- include/beman/execution26/execution.hpp | 9 ++ src/beman/execution/execution.cppm | 7 ++ tests/beman/execution/CMakeLists.txt | 1 + .../exec-parallel-scheduler.test.cpp | 84 +++++++++++++++++++ 4 files changed, 101 insertions(+) create mode 100644 tests/beman/execution/exec-parallel-scheduler.test.cpp diff --git a/include/beman/execution26/execution.hpp b/include/beman/execution26/execution.hpp index d414719b..65095bb8 100644 --- a/include/beman/execution26/execution.hpp +++ b/include/beman/execution26/execution.hpp @@ -42,6 +42,7 @@ using ::beman::execution::get_domain; using ::beman::execution::get_domain_t; using ::beman::execution::get_env; using ::beman::execution::get_env_t; +using ::beman::execution::get_parallel_scheduler; using ::beman::execution::get_scheduler; using ::beman::execution::get_scheduler_t; using ::beman::execution::get_stop_token; @@ -62,6 +63,7 @@ using ::beman::execution::let_value; using ::beman::execution::let_value_t; using ::beman::execution::operation_state; using ::beman::execution::operation_state_tag; +using ::beman::execution::parallel_scheduler; using ::beman::execution::read_env; using ::beman::execution::receiver; using ::beman::execution::receiver_of; @@ -107,6 +109,13 @@ using ::beman::execution::when_all_with_variant; using ::beman::execution::when_all_with_variant_t; using ::beman::execution::with_awaitable_senders; +namespace system_context_replaceability { +using ::beman::execution::system_context_replaceability::bulk_item_receiver_proxy; +using ::beman::execution::system_context_replaceability::parallel_scheduler_backend; +using ::beman::execution::system_context_replaceability::query_parallel_scheduler_backend; +using ::beman::execution::system_context_replaceability::receiver_proxy; +} // namespace system_context_replaceability + } // namespace beman::execution26 // ---------------------------------------------------------------------------- diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index 1d93dc90..f5672614 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -242,6 +242,13 @@ export using ::beman::execution::run_loop; export using ::beman::execution::parallel_scheduler; export using ::beman::execution::get_parallel_scheduler; +namespace system_context_replaceability { +export using ::beman::execution::system_context_replaceability::receiver_proxy; +export using ::beman::execution::system_context_replaceability::bulk_item_receiver_proxy; +export using ::beman::execution::system_context_replaceability::parallel_scheduler_backend; +export using ::beman::execution::system_context_replaceability::query_parallel_scheduler_backend; +} // namespace system_context_replaceability + // [exec.consumers], consumers export using ::beman::execution::sync_wait_t; export using ::beman::execution::sync_wait_with_variant_t; diff --git a/tests/beman/execution/CMakeLists.txt b/tests/beman/execution/CMakeLists.txt index 08530378..9474ad3c 100644 --- a/tests/beman/execution/CMakeLists.txt +++ b/tests/beman/execution/CMakeLists.txt @@ -32,6 +32,7 @@ list( exec-just.test exec-let.test exec-on.test + exec-parallel-scheduler.test exec-opstate-start.test exec-opstate.test exec-prop.test diff --git a/tests/beman/execution/exec-parallel-scheduler.test.cpp b/tests/beman/execution/exec-parallel-scheduler.test.cpp new file mode 100644 index 00000000..4006d3f0 --- /dev/null +++ b/tests/beman/execution/exec-parallel-scheduler.test.cpp @@ -0,0 +1,84 @@ +// src/beman/execution/tests/exec-parallel-scheduler.test.cpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef BEMAN_HAS_MODULESSo +import beman.execution; +#else +#include +#endif + +namespace { +namespace replaceability = test_std::system_context_replaceability; + +struct proxy : replaceability::receiver_proxy { + auto set_value() noexcept -> void override {} + auto set_error(::std::exception_ptr) noexcept -> void override {} + auto set_stopped() noexcept -> void override {} +}; + +struct bulk_proxy : replaceability::bulk_item_receiver_proxy { + auto set_value() noexcept -> void override {} + auto set_error(::std::exception_ptr) noexcept -> void override {} + auto set_stopped() noexcept -> void override {} + auto execute(::std::size_t, ::std::size_t) noexcept -> void override {} +}; + +struct backend : replaceability::parallel_scheduler_backend { + auto schedule(replaceability::receiver_proxy&, ::std::span<::std::byte>) noexcept -> void override {} + auto schedule_bulk_chunked(::std::size_t, + replaceability::bulk_item_receiver_proxy&, + ::std::span<::std::byte>) noexcept -> void override {} + auto schedule_bulk_unchunked(::std::size_t, + replaceability::bulk_item_receiver_proxy&, + ::std::span<::std::byte>) noexcept -> void override {} +}; + +auto test_parallel_scheduler_synopsis() -> void { + static_assert(!::std::default_initializable); + static_assert(::std::copy_constructible); + static_assert(::std::move_constructible); + static_assert(test_std::scheduler); + + static_assert(::std::same_as); + static_assert(::std::same_as, + test_std::parallel_scheduler::sender>); + static_assert(test_std::sender); + static_assert(::std::same_as< + decltype(test_std::get_completion_signatures()), + test_std::completion_signatures>); + + static_assert(noexcept(test_std::get_forward_progress_guarantee( + ::std::declval()))); + static_assert(::std::same_as())), + test_std::forward_progress_guarantee>); +} + +auto test_replaceability_synopsis() -> void { + static_assert(::std::is_abstract_v); + static_assert(::std::is_abstract_v); + static_assert(::std::is_abstract_v); + static_assert(::std::derived_from); + static_assert(::std::derived_from); + static_assert(::std::same_as().template try_query(0)), + ::std::optional>); + static_assert(::std::same_as>); +} +} // namespace + +TEST(exec_parallel_scheduler) { + test_parallel_scheduler_synopsis(); + test_replaceability_synopsis(); +} From 5b36756f5b26497ba3a016d87ec46ba1b6a982b4 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 20 May 2026 22:23:57 -0500 Subject: [PATCH 14/25] add import for schedule_result_t --- tests/beman/execution/exec-parallel-scheduler.test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/beman/execution/exec-parallel-scheduler.test.cpp b/tests/beman/execution/exec-parallel-scheduler.test.cpp index 4006d3f0..fc2f258c 100644 --- a/tests/beman/execution/exec-parallel-scheduler.test.cpp +++ b/tests/beman/execution/exec-parallel-scheduler.test.cpp @@ -12,6 +12,7 @@ #include #ifdef BEMAN_HAS_MODULESSo import beman.execution; +import beman.execution.detail.schedule_result_t; #else #include #endif From dcfa4b7195adf3752e5d2088304ab32de86e2ca7 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 20 May 2026 22:46:10 -0500 Subject: [PATCH 15/25] fixed typo --- tests/beman/execution/exec-parallel-scheduler.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/execution/exec-parallel-scheduler.test.cpp b/tests/beman/execution/exec-parallel-scheduler.test.cpp index fc2f258c..6727dbb2 100644 --- a/tests/beman/execution/exec-parallel-scheduler.test.cpp +++ b/tests/beman/execution/exec-parallel-scheduler.test.cpp @@ -10,7 +10,7 @@ #include #include #include -#ifdef BEMAN_HAS_MODULESSo +#ifdef BEMAN_HAS_MODULES import beman.execution; import beman.execution.detail.schedule_result_t; #else From c2c77625969c7e27cc8bfef5602fa06b7a97e3a0 Mon Sep 17 00:00:00 2001 From: JorgeV92 Date: Wed, 20 May 2026 22:57:39 -0500 Subject: [PATCH 16/25] format fix --- .../exec-parallel-scheduler.test.cpp | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/tests/beman/execution/exec-parallel-scheduler.test.cpp b/tests/beman/execution/exec-parallel-scheduler.test.cpp index 6727dbb2..bc90feff 100644 --- a/tests/beman/execution/exec-parallel-scheduler.test.cpp +++ b/tests/beman/execution/exec-parallel-scheduler.test.cpp @@ -10,7 +10,7 @@ #include #include #include -#ifdef BEMAN_HAS_MODULES +#ifdef BEMAN_HAS_MODULES import beman.execution; import beman.execution.detail.schedule_result_t; #else @@ -53,16 +53,15 @@ auto test_parallel_scheduler_synopsis() -> void { static_assert(::std::same_as, test_std::parallel_scheduler::sender>); static_assert(test_std::sender); - static_assert(::std::same_as< - decltype(test_std::get_completion_signatures()), - test_std::completion_signatures>); + static_assert(::std::same_as()), + test_std::completion_signatures>); - static_assert(noexcept(test_std::get_forward_progress_guarantee( - ::std::declval()))); + static_assert( + noexcept(test_std::get_forward_progress_guarantee(::std::declval()))); static_assert(::std::same_as())), + ::std::declval())), test_std::forward_progress_guarantee>); } @@ -72,8 +71,7 @@ auto test_replaceability_synopsis() -> void { static_assert(::std::is_abstract_v); static_assert(::std::derived_from); static_assert(::std::derived_from); - static_assert(::std::same_as().template try_query(0)), - ::std::optional>); + static_assert(::std::same_as().template try_query(0)), ::std::optional>); static_assert(::std::same_as>); } From 71073e6a6e094faa7e7990b1a62553d8b08555f0 Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 26 Jun 2026 13:50:47 +0800 Subject: [PATCH 17/25] Address issue #83 --- tests/beman/execution/exec-bulk.test.cpp | 9 +++++---- tests/beman/execution/exec-just.test.cpp | 10 ++++------ 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/tests/beman/execution/exec-bulk.test.cpp b/tests/beman/execution/exec-bulk.test.cpp index 0febe816..01a8b0a7 100644 --- a/tests/beman/execution/exec-bulk.test.cpp +++ b/tests/beman/execution/exec-bulk.test.cpp @@ -391,10 +391,10 @@ struct pstl_for_each_sender { template auto set_value(Args&&... args) noexcept -> void { try { - auto iota = std::views::iota(Shape(0), shape); - std::for_each(policy, std::ranges::begin(iota), std::ranges::end(iota), [&](Shape i) { + auto indices = std::views::iota(Shape(0), shape); + std::for_each(policy, std::ranges::begin(indices), std::ranges::end(indices), [&](Shape i) { if constexpr (IsChunked) { - std::invoke(fn, i, i + 1, args...); + std::invoke(fn, i, i + Shape(1), args...); } else { std::invoke(fn, i, args...); }; @@ -430,7 +430,8 @@ struct pstl_for_each_sender { template auto connect(Rcvr rcvr) && noexcept { - return test_std::connect(child, receiver{std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)}); + return test_std::connect(std::move(child), + receiver{std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)}); } auto get_env() const noexcept { return test_std::get_env(child); } diff --git a/tests/beman/execution/exec-just.test.cpp b/tests/beman/execution/exec-just.test.cpp index 8feab9f9..3ce04652 100644 --- a/tests/beman/execution/exec-just.test.cpp +++ b/tests/beman/execution/exec-just.test.cpp @@ -191,16 +191,17 @@ auto test_just_allocator() -> void { ASSERT(resource.count == 0u); auto copy(std::make_obj_using_allocator(std::pmr::polymorphic_allocator<>(&resource), str)); test::use(copy); - ASSERT(resource.count == 1u); + auto old_count = resource.count; + ASSERT(old_count > 0u); auto env{test_std::get_env(receiver)}; auto alloc{test_std::get_allocator(env)}; test::use(alloc); - ASSERT(resource.count == 1u); + ASSERT(resource.count == old_count); auto state{test_std::connect(std::move(sender), memory_receiver{&resource})}; test::use(state); - ASSERT(resource.count == 2u); + ASSERT(resource.count > old_count); } auto test_completion_signatures() -> void { @@ -220,10 +221,7 @@ TEST(exec_just) { test_just_constraints(); test_just(); test_completion_signatures(); -#ifndef _MSC_VER - //-dk:TODO re-enable allocator test for MSVC++ test_just_allocator(); -#endif } catch (...) { // NOLINTNEXTLINE(cert-dcl03-c,hicpp-static-assert,misc-static-assert) ASSERT(nullptr == "the just tests shouldn't throw"); From 61fdc9a71e704714825361da880790b3b3e9305d Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 26 Jun 2026 14:23:23 +0800 Subject: [PATCH 18/25] Re-enable the remaining test cases disabled for MSVC --- tests/beman/execution/exec-associate.test.cpp | 6 +----- tests/beman/execution/exec-spawn-future.test.cpp | 6 ------ tests/beman/execution/exec-then.test.cpp | 9 +++------ 3 files changed, 4 insertions(+), 17 deletions(-) diff --git a/tests/beman/execution/exec-associate.test.cpp b/tests/beman/execution/exec-associate.test.cpp index 9d2987ed..949890cd 100644 --- a/tests/beman/execution/exec-associate.test.cpp +++ b/tests/beman/execution/exec-associate.test.cpp @@ -119,7 +119,6 @@ TEST(exec_associate) { static_assert(std::same_as, test_std::completion_signatures_of_t>>); -#ifndef _MSC_VER //-dk:TODO MSVC++ struggles with more than one of these test using snd1_t = decltype(test_std::associate(test_std::just(std::string{}), null_token{})); static_assert(std::same_as, test_std::completion_signatures_of_t>>); @@ -136,10 +135,8 @@ TEST(exec_associate) { using snd4_t = decltype(test_std::associate(test_std::just(std::ref(i)), null_token{})); static_assert(std::same_as)>, test_std::completion_signatures_of_t>>); -#endif } -#ifndef _MSC_VER //-dk:TODO MSVC++ struggles with more than one of these test // Identity behavior with null_token for value path + piping works. { { @@ -191,7 +188,7 @@ TEST(exec_associate) { { auto snd = test_std::just(true) | test_std::associate(expired_token{}) | test_std::upon_stopped([]() noexcept { return false; }); - auto r = test_std::sync_wait(std::move(snd)); + auto r = test_std::sync_wait(std::move(snd)); ASSERT(r.has_value()); ASSERT((std::get<0>(r.value()) == false)); } @@ -241,5 +238,4 @@ TEST(exec_associate) { ASSERT(completes_with_value(test_std::just(1) | test_std::associate(null_token{}))); ASSERT(!completes_with_value(test_std::just(1) | test_std::associate(expired_token{}))); } -#endif } diff --git a/tests/beman/execution/exec-spawn-future.test.cpp b/tests/beman/execution/exec-spawn-future.test.cpp index 36aaf96d..5f50cb13 100644 --- a/tests/beman/execution/exec-spawn-future.test.cpp +++ b/tests/beman/execution/exec-spawn-future.test.cpp @@ -221,9 +221,7 @@ auto test_receiver() { std::move(r0).set_error(17); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); -#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ ASSERT((std::get<1>(std::get>(state.result)) == 17)); -#endif } { @@ -238,11 +236,9 @@ auto test_receiver() { std::move(r0).set_value(17, true, 'x'); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); -#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ ASSERT((std::get<1>(std::get>(state.result)) == 17)); ASSERT((std::get<2>(std::get>(state.result)) == true)); ASSERT((std::get<3>(std::get>(state.result)) == 'x')); -#endif } { @@ -257,7 +253,6 @@ auto test_receiver() { std::move(r0).set_value(17, throws(), 'x'); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); -#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ try { std::rethrow_exception( std::get<1>(std::get>(state.result))); @@ -267,7 +262,6 @@ auto test_receiver() { } catch (...) { ASSERT(nullptr == "not reached"); } -#endif } } diff --git a/tests/beman/execution/exec-then.test.cpp b/tests/beman/execution/exec-then.test.cpp index ec0abef7..101fde6a 100644 --- a/tests/beman/execution/exec-then.test.cpp +++ b/tests/beman/execution/exec-then.test.cpp @@ -58,6 +58,8 @@ struct sender { } }; +constexpr auto consume_all = [](auto&&...) {}; + template auto test_has(auto cpo, auto in_sender, auto fun) -> void { static_assert(test_std::receiver); @@ -70,14 +72,9 @@ auto test_has(auto cpo, auto in_sender, auto fun) -> void { static_assert(requires { { in_sender | cpo(fun) } -> test_std::sender; }); -#ifndef _MSC_VER - //-dk:TODO re-enable this test static_assert(requires { - { - in_sender | cpo(fun) | cpo([](auto&&...) {}) - } -> test_std::sender; + { in_sender | cpo(fun) | cpo(consume_all) } -> test_std::sender; }); -#endif auto sender{cpo(in_sender, fun)}; auto op{test_std::connect(::std::move(sender), receiver{})}; test_std::start(op); From 0a1b062ad88e73076f6f20a34d872281904ad293 Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 26 Jun 2026 14:32:15 +0800 Subject: [PATCH 19/25] Format code --- tests/beman/execution/exec-associate.test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/beman/execution/exec-associate.test.cpp b/tests/beman/execution/exec-associate.test.cpp index 949890cd..07144f32 100644 --- a/tests/beman/execution/exec-associate.test.cpp +++ b/tests/beman/execution/exec-associate.test.cpp @@ -188,7 +188,7 @@ TEST(exec_associate) { { auto snd = test_std::just(true) | test_std::associate(expired_token{}) | test_std::upon_stopped([]() noexcept { return false; }); - auto r = test_std::sync_wait(std::move(snd)); + auto r = test_std::sync_wait(std::move(snd)); ASSERT(r.has_value()); ASSERT((std::get<0>(r.value()) == false)); } From f5e80999eb55fbdecc09358b4c93a46e7727de66 Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 26 Jun 2026 14:37:29 +0800 Subject: [PATCH 20/25] Revert exec-spawn-future.test.cpp --- tests/beman/execution/exec-spawn-future.test.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/beman/execution/exec-spawn-future.test.cpp b/tests/beman/execution/exec-spawn-future.test.cpp index 5f50cb13..36aaf96d 100644 --- a/tests/beman/execution/exec-spawn-future.test.cpp +++ b/tests/beman/execution/exec-spawn-future.test.cpp @@ -221,7 +221,9 @@ auto test_receiver() { std::move(r0).set_error(17); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); +#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ ASSERT((std::get<1>(std::get>(state.result)) == 17)); +#endif } { @@ -236,9 +238,11 @@ auto test_receiver() { std::move(r0).set_value(17, true, 'x'); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); +#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ ASSERT((std::get<1>(std::get>(state.result)) == 17)); ASSERT((std::get<2>(std::get>(state.result)) == true)); ASSERT((std::get<3>(std::get>(state.result)) == 'x')); +#endif } { @@ -253,6 +257,7 @@ auto test_receiver() { std::move(r0).set_value(17, throws(), 'x'); ASSERT(state.called == true); ASSERT((std::holds_alternative>(state.result))); +#ifndef _MSC_VER //-dk:TODO enable test for MSVC++ try { std::rethrow_exception( std::get<1>(std::get>(state.result))); @@ -262,6 +267,7 @@ auto test_receiver() { } catch (...) { ASSERT(nullptr == "not reached"); } +#endif } } From ec7f354b11e0e04677a7b7116b31800fd0df4997 Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 3 Jul 2026 14:02:26 +0800 Subject: [PATCH 21/25] Rename namespace `system_context_replaceability` to `parallel_scheduler_replacement` --- .../beman/execution/detail/parallel_scheduler.hpp | 10 +++++----- include/beman/execution26/execution.hpp | 12 ++++++------ src/beman/execution/execution.cppm | 10 +++++----- src/beman/execution/parallel_scheduler.cppm | 12 ++++++------ .../beman/execution/exec-parallel-scheduler.test.cpp | 2 +- 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 3da33831..fd12ea30 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -45,7 +45,7 @@ import beman.execution.detail.set_value; // ---------------------------------------------------------------------------- -namespace beman::execution::system_context_replaceability { +namespace beman::execution::parallel_scheduler_replacement { struct receiver_proxy { virtual ~receiver_proxy() = default; @@ -80,12 +80,12 @@ struct parallel_scheduler_backend { // TODO(P2079R10): provide the project-supported link-time replaceability hook. auto query_parallel_scheduler_backend() -> ::std::shared_ptr; -} // namespace beman::execution::system_context_replaceability +} // namespace beman::execution::parallel_scheduler_replacement namespace beman::execution { class parallel_scheduler { - using backend_type = ::beman::execution::system_context_replaceability::parallel_scheduler_backend; + using backend_type = ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; public: using scheduler_concept = ::beman::execution::scheduler_tag; @@ -121,7 +121,7 @@ class parallel_scheduler { }; class parallel_scheduler::sender { - using backend_type = ::beman::execution::system_context_replaceability::parallel_scheduler_backend; + using backend_type = ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; public: using sender_concept = ::beman::execution::sender_tag; @@ -145,7 +145,7 @@ class parallel_scheduler::sender { }; template <::beman::execution::receiver Rcvr> - class operation : public ::beman::execution::system_context_replaceability::receiver_proxy { + class operation : public ::beman::execution::parallel_scheduler_replacement::receiver_proxy { public: using operation_state_concept = ::beman::execution::operation_state_tag; diff --git a/include/beman/execution26/execution.hpp b/include/beman/execution26/execution.hpp index 65095bb8..5b685670 100644 --- a/include/beman/execution26/execution.hpp +++ b/include/beman/execution26/execution.hpp @@ -109,12 +109,12 @@ using ::beman::execution::when_all_with_variant; using ::beman::execution::when_all_with_variant_t; using ::beman::execution::with_awaitable_senders; -namespace system_context_replaceability { -using ::beman::execution::system_context_replaceability::bulk_item_receiver_proxy; -using ::beman::execution::system_context_replaceability::parallel_scheduler_backend; -using ::beman::execution::system_context_replaceability::query_parallel_scheduler_backend; -using ::beman::execution::system_context_replaceability::receiver_proxy; -} // namespace system_context_replaceability +namespace parallel_scheduler_replacement { +using ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy; +using ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; +using ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend; +using ::beman::execution::parallel_scheduler_replacement::receiver_proxy; +} // namespace parallel_scheduler_replacement } // namespace beman::execution26 diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index fe052496..871d9412 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -247,11 +247,11 @@ export using ::beman::execution::run_loop; export using ::beman::execution::parallel_scheduler; export using ::beman::execution::get_parallel_scheduler; -namespace system_context_replaceability { -export using ::beman::execution::system_context_replaceability::receiver_proxy; -export using ::beman::execution::system_context_replaceability::bulk_item_receiver_proxy; -export using ::beman::execution::system_context_replaceability::parallel_scheduler_backend; -export using ::beman::execution::system_context_replaceability::query_parallel_scheduler_backend; +namespace parallel_scheduler_replacement { +export using ::beman::execution::parallel_scheduler_replacement::receiver_proxy; +export using ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy; +export using ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; +export using ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend; } // namespace system_context_replaceability // [exec.consumers], consumers diff --git a/src/beman/execution/parallel_scheduler.cppm b/src/beman/execution/parallel_scheduler.cppm index 16138923..8fcf75e4 100644 --- a/src/beman/execution/parallel_scheduler.cppm +++ b/src/beman/execution/parallel_scheduler.cppm @@ -10,10 +10,10 @@ namespace beman::execution { export using beman::execution::parallel_scheduler; export using beman::execution::get_parallel_scheduler; -namespace system_context_replaceability { -export using beman::execution::system_context_replaceability::receiver_proxy; -export using beman::execution::system_context_replaceability::bulk_item_receiver_proxy; -export using beman::execution::system_context_replaceability::parallel_scheduler_backend; -export using beman::execution::system_context_replaceability::query_parallel_scheduler_backend; -} // namespace system_context_replaceability +namespace parallel_scheduler_replacement { +export using beman::execution::parallel_scheduler_replacement::receiver_proxy; +export using beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy; +export using beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; +export using beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend; +} // namespace parallel_scheduler_replacement } // namespace beman::execution diff --git a/tests/beman/execution/exec-parallel-scheduler.test.cpp b/tests/beman/execution/exec-parallel-scheduler.test.cpp index bc90feff..7758ebb3 100644 --- a/tests/beman/execution/exec-parallel-scheduler.test.cpp +++ b/tests/beman/execution/exec-parallel-scheduler.test.cpp @@ -18,7 +18,7 @@ import beman.execution.detail.schedule_result_t; #endif namespace { -namespace replaceability = test_std::system_context_replaceability; +namespace replaceability = test_std::parallel_scheduler_replacement; struct proxy : replaceability::receiver_proxy { auto set_value() noexcept -> void override {} From bc194728150b5ac3e9aacefc34d68d25877d428f Mon Sep 17 00:00:00 2001 From: Cra3z Date: Fri, 3 Jul 2026 14:19:21 +0800 Subject: [PATCH 22/25] Format code --- src/beman/execution/execution.cppm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/beman/execution/execution.cppm b/src/beman/execution/execution.cppm index 871d9412..a43dcf2e 100644 --- a/src/beman/execution/execution.cppm +++ b/src/beman/execution/execution.cppm @@ -252,7 +252,7 @@ export using ::beman::execution::parallel_scheduler_replacement::receiver_proxy; export using ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy; export using ::beman::execution::parallel_scheduler_replacement::parallel_scheduler_backend; export using ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend; -} // namespace system_context_replaceability +} // namespace parallel_scheduler_replacement // [exec.consumers], consumers export using ::beman::execution::sync_wait_t; From 5dd01d98a1dea2a741f62937e3b8f1b6fd117e5d Mon Sep 17 00:00:00 2001 From: Cra3z <3324654761@qq.com> Date: Fri, 3 Jul 2026 23:14:49 +0800 Subject: [PATCH 23/25] Implement bulk customization for parallel-scheduler --- .../detail/get_completion_domain.hpp | 2 - .../execution/detail/parallel_scheduler.hpp | 200 +++++++++++++++++- 2 files changed, 199 insertions(+), 3 deletions(-) diff --git a/include/beman/execution/detail/get_completion_domain.hpp b/include/beman/execution/detail/get_completion_domain.hpp index a150ba98..743c53aa 100644 --- a/include/beman/execution/detail/get_completion_domain.hpp +++ b/include/beman/execution/detail/get_completion_domain.hpp @@ -55,12 +55,10 @@ struct get_completion_domain_t : ::beman::execution::forwarding_query_t { ::beman::execution::detail::try_query( ::beman::execution::get_completion_scheduler(q, e...), get_completion_domain_t<::beman::execution::set_value_t>{}, - ::std::forward(q), ::std::forward(e)...); }) { return ::beman::execution::detail::try_query(::beman::execution::get_completion_scheduler(q, e...), get_completion_domain_t<::beman::execution::set_value_t>{}, - ::std::forward(q), ::std::forward(e)...); } else if constexpr (::beman::execution::scheduler && 0u != sizeof...(E)) { return ::beman::execution::default_domain{}; diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index fd12ea30..3c083b4e 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -16,23 +16,52 @@ import std; #include #include #include +#include #endif #ifdef BEMAN_HAS_MODULES +import beman.execution.detail.bulk; +import beman.execution.detail.connect; +import beman.execution.detail.connect_result_t; import beman.execution.detail.completion_signatures; +import beman.execution.detail.env_of_t; +import beman.execution.detail.execution_policy; +import beman.execution.detail.forward_like; import beman.execution.detail.get_completion_scheduler; +import beman.execution.detail.get_completion_signatures; +import beman.execution.detail.get_domain; +import beman.execution.detail.get_env; import beman.execution.detail.get_forward_progress_guarantee; +import beman.execution.detail.meta.combine; +import beman.execution.detail.meta.prepend; +import beman.execution.detail.meta.unique; import beman.execution.detail.operation_state; import beman.execution.detail.receiver; import beman.execution.detail.scheduler; import beman.execution.detail.scheduler_tag; import beman.execution.detail.sender; +import beman.execution.detail.start; +import beman.execution.detail.tag_of_t; import beman.execution.detail.set_error; import beman.execution.detail.set_stopped; import beman.execution.detail.set_value; +import beman.execution.detail.unreachable; +import beman.execution.detail.value_types_of_t; #else +#include +#include +#include #include +#include +#include +#include #include +#include +#include +#include #include +#include +#include +#include #include #include #include @@ -41,6 +70,10 @@ import beman.execution.detail.set_value; #include #include #include +#include +#include +#include +#include #endif // ---------------------------------------------------------------------------- @@ -82,6 +115,167 @@ auto query_parallel_scheduler_backend() -> ::std::shared_ptr +struct psched_bulk_sender { + using sender_concept = ::beman::execution::sender_tag; + + static constexpr bool is_parallel_policy = ::std::same_as || + ::std::same_as; + template + struct rcvr_proxy : ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy { + using receiver_concept = ::beman::execution::receiver_tag; + using result_type = ::beman::execution::detail::meta:: + prepend<::std::monostate, ::beman::execution::value_types_of_t>>; + + rcvr_proxy(Rcvr rcvr, Policy policy, Shape shape, Fn fn) noexcept + : rcvr(::std::move(rcvr)), policy(::std::move(policy)), shape(::std::move(shape)), fn(::std::move(fn)) {} + + auto get_env() const noexcept { return ::beman::execution::get_env(rcvr); } + + auto execute(::std::size_t begin, ::std::size_t end) noexcept -> void final { + const Shape first = is_parallel_policy ? static_cast(begin) : Shape(0); + const Shape last = is_parallel_policy ? static_cast(end) : shape; + const auto call_fn = [=, this](const Args&... args) { + if constexpr (IsChunked) { + fn(first, last, args...); + } else { + for (Shape i = first; i < last; ++i) { + fn(i, args...); + } + } + }; + std::visit( + [&](T& tpl) { + if constexpr (::std::same_as) { + ::beman::execution::detail::unreachable(); + } else { + ::std::apply(call_fn, ::std::move(tpl)); + } + }, + result); + } + + auto set_value() noexcept -> void final { + const auto call_set_value = [this](Args&&... args) { + ::beman::execution::set_value(::std::move(rcvr), ::std::forward(args)...); + }; + try { + std::visit( + [&](T tpl) { + if constexpr (::std::same_as) { + ::beman::execution::detail::unreachable(); + } else { + ::std::apply(call_set_value, ::std::move(tpl)); + } + }, + ::std::move(result)); + } catch (...) { + this->set_error(::std::current_exception()); + } + } + + template + auto set_error(E e) noexcept -> void { + ::beman::execution::set_error(::std::move(rcvr), ::std::move(e)); + } + + auto set_error(std::exception_ptr e) noexcept -> void final { this->set_error<>(::std::move(e)); } + + auto set_stopped() noexcept -> void final { ::beman::execution::set_stopped(::std::move(rcvr)); } + + result_type result; + Rcvr rcvr; + Policy policy; + Shape shape; + Fn fn; + }; + + template + struct state { + using operation_concept = ::beman::execution::operation_state_tag; + using result_type = ::beman::execution::detail::meta:: + prepend<::std::monostate, ::beman::execution::value_types_of_t>>; + + struct receiver_ref { + using receiver_concept = ::beman::execution::receiver_tag; + + template + auto set_value(Args&&... args) noexcept -> void { + proxy->result.emplace(::std::forward(args)...); + auto backend = ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend(); + const ::std::size_t s = is_parallel_policy ? proxy->shape : 1uz; + alignas(void*)::std::byte storage[sizeof(void*) * 4uz]; + if constexpr (IsChunked) { + backend->schedule_bulk_chunked(s, *proxy, storage); + } else { + backend->schedule_bulk_unchunked(s, *proxy, storage); + } + } + + template + auto set_error(E e) noexcept -> void { + proxy->set_error(std::move(e)); + } + + auto set_stopped() noexcept -> void { proxy->set_stopped(); } + + auto get_env() const noexcept { return proxy->get_env(); } + + rcvr_proxy* proxy; + }; + using sub_state_t = ::beman::execution::connect_result_t; + + state(Child child, Rcvr rcvr, Policy policy, Shape shape, Fn fn) + : proxy(::std::move(rcvr), ::std::move(policy), ::std::move(shape), ::std::move(fn)), + sub_state(::beman::execution::connect(::std::move(child), proxy)) {} + + auto start() & noexcept -> void { ::beman::execution::start(sub_state); } + + rcvr_proxy proxy; + sub_state_t sub_state; + }; + + template + static consteval auto get_completion_signatures() { + constexpr auto compl_sigs = ::beman::execution::get_completion_signatures(); + return ::beman::execution::detail::meta::unique<::beman::execution::detail::meta::combine< + ::std::remove_cvref_t, + ::beman::execution::completion_signatures<::beman::execution::set_error_t(::std::exception_ptr)>>>{}; + } + + template + auto connect(Rcvr rcvr) && noexcept { + return state{std::move(child), std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)}; + } + + auto get_env() const noexcept { return ::beman::execution::get_env(child); } + + [[no_unique_address]] std::bool_constant _; + Policy policy; + Shape shape; + Fn fn; + Child child; +}; + +struct parallel_scheduler_domain { + template + requires ::std::same_as<::beman::execution::tag_of_t, ::beman::execution::bulk_chunked_t> || + ::std::same_as<::beman::execution::tag_of_t, ::beman::execution::bulk_unchunked_t> + static auto transform_sender(::beman::execution::set_value_t, Sndr&& sndr, const auto&) noexcept { + auto&& [_, data, child] = ::std::forward(sndr); + auto [policy, shape, fn] = ::beman::execution::detail::forward_like(data); + return ::beman::execution::detail::psched_bulk_sender{ + ::std::bool_constant< + ::std::same_as<::beman::execution::tag_of_t, ::beman::execution::bulk_chunked_t>>{}, + ::std::move(policy), + ::std::move(shape), + ::std::move(fn), + ::beman::execution::detail::forward_like(child)}; + } +}; +} // namespace beman::execution::detail + namespace beman::execution { class parallel_scheduler { @@ -109,6 +303,10 @@ class parallel_scheduler { return ::beman::execution::forward_progress_guarantee::parallel; } + static constexpr auto query(::beman::execution::get_domain_t) noexcept { + return ::beman::execution::detail::parallel_scheduler_domain{}; + } + auto schedule() const noexcept -> sender; // TODO(P2079R10): customize bulk_chunked and bulk_unchunked for this scheduler. @@ -199,7 +397,7 @@ class parallel_scheduler::sender { inline auto parallel_scheduler::schedule() const noexcept -> sender { return sender{this->backend_}; } -// TODO(P2079R10): implement using system_context_replaceability::query_parallel_scheduler_backend(). +// TODO(P2079R10): implement using parallel_scheduler_replacement::query_parallel_scheduler_backend(). auto get_parallel_scheduler() -> parallel_scheduler; } // namespace beman::execution From 622181a7e264b9c8df9a85397339b5d9abb10c32 Mon Sep 17 00:00:00 2001 From: Cra3z <3324654761@qq.com> Date: Sat, 4 Jul 2026 11:06:52 +0800 Subject: [PATCH 24/25] Fix parallel scheduler domain query --- .../execution/detail/parallel_scheduler.hpp | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index 3c083b4e..bfd50423 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -23,12 +23,13 @@ import beman.execution.detail.bulk; import beman.execution.detail.connect; import beman.execution.detail.connect_result_t; import beman.execution.detail.completion_signatures; +import beman.execution.detail.decayed_tuple; import beman.execution.detail.env_of_t; import beman.execution.detail.execution_policy; import beman.execution.detail.forward_like; +import beman.execution.detail.get_completion_domain; import beman.execution.detail.get_completion_scheduler; import beman.execution.detail.get_completion_signatures; -import beman.execution.detail.get_domain; import beman.execution.detail.get_env; import beman.execution.detail.get_forward_progress_guarantee; import beman.execution.detail.meta.combine; @@ -51,12 +52,13 @@ import beman.execution.detail.value_types_of_t; #include #include #include +#include #include #include #include +#include #include #include -#include #include #include #include @@ -124,8 +126,7 @@ struct psched_bulk_sender { ::std::same_as; template struct rcvr_proxy : ::beman::execution::parallel_scheduler_replacement::bulk_item_receiver_proxy { - using receiver_concept = ::beman::execution::receiver_tag; - using result_type = ::beman::execution::detail::meta:: + using result_type = ::beman::execution::detail::meta:: prepend<::std::monostate, ::beman::execution::value_types_of_t>>; rcvr_proxy(Rcvr rcvr, Policy policy, Shape shape, Fn fn) noexcept @@ -193,8 +194,8 @@ struct psched_bulk_sender { template struct state { - using operation_concept = ::beman::execution::operation_state_tag; - using result_type = ::beman::execution::detail::meta:: + using operation_state_concept = ::beman::execution::operation_state_tag; + using result_type = ::beman::execution::detail::meta:: prepend<::std::monostate, ::beman::execution::value_types_of_t>>; struct receiver_ref { @@ -202,7 +203,8 @@ struct psched_bulk_sender { template auto set_value(Args&&... args) noexcept -> void { - proxy->result.emplace(::std::forward(args)...); + using arg_t = ::beman::execution::detail::decayed_tuple; + proxy->result.template emplace(::std::forward(args)...); auto backend = ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend(); const ::std::size_t s = is_parallel_policy ? proxy->shape : 1uz; alignas(void*)::std::byte storage[sizeof(void*) * 4uz]; @@ -228,7 +230,7 @@ struct psched_bulk_sender { state(Child child, Rcvr rcvr, Policy policy, Shape shape, Fn fn) : proxy(::std::move(rcvr), ::std::move(policy), ::std::move(shape), ::std::move(fn)), - sub_state(::beman::execution::connect(::std::move(child), proxy)) {} + sub_state(::beman::execution::connect(::std::move(child), receiver_ref{&proxy})) {} auto start() & noexcept -> void { ::beman::execution::start(sub_state); } @@ -246,16 +248,17 @@ struct psched_bulk_sender { template auto connect(Rcvr rcvr) && noexcept { - return state{std::move(child), std::move(rcvr), std::move(policy), std::move(shape), std::move(fn)}; + return state{ + ::std::move(child), ::std::move(rcvr), ::std::move(policy), ::std::move(shape), ::std::move(fn)}; } auto get_env() const noexcept { return ::beman::execution::get_env(child); } - [[no_unique_address]] std::bool_constant _; - Policy policy; - Shape shape; - Fn fn; - Child child; + [[no_unique_address]] ::std::bool_constant _; + Policy policy; + Shape shape; + Fn fn; + Child child; }; struct parallel_scheduler_domain { @@ -303,8 +306,9 @@ class parallel_scheduler { return ::beman::execution::forward_progress_guarantee::parallel; } - static constexpr auto query(::beman::execution::get_domain_t) noexcept { - return ::beman::execution::detail::parallel_scheduler_domain{}; + static constexpr auto query(::beman::execution::get_completion_domain_t<::beman::execution::set_value_t>) noexcept + -> ::beman::execution::detail::parallel_scheduler_domain { + return {}; } auto schedule() const noexcept -> sender; From 2fd226468fb7bced1ed583733ed0ba6ea6f819bf Mon Sep 17 00:00:00 2001 From: Cra3z <3324654761@qq.com> Date: Sat, 4 Jul 2026 12:11:38 +0800 Subject: [PATCH 25/25] Add definition of `get_parallel_scheduler` --- .../execution/detail/parallel_scheduler.hpp | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/include/beman/execution/detail/parallel_scheduler.hpp b/include/beman/execution/detail/parallel_scheduler.hpp index bfd50423..0e0843a2 100644 --- a/include/beman/execution/detail/parallel_scheduler.hpp +++ b/include/beman/execution/detail/parallel_scheduler.hpp @@ -202,10 +202,13 @@ struct psched_bulk_sender { using receiver_concept = ::beman::execution::receiver_tag; template - auto set_value(Args&&... args) noexcept -> void { + auto set_value(Args&&... args) noexcept -> void try { using arg_t = ::beman::execution::detail::decayed_tuple; proxy->result.template emplace(::std::forward(args)...); auto backend = ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend(); + if (backend == nullptr) [[unlikely]] { + ::std::terminate(); + } const ::std::size_t s = is_parallel_policy ? proxy->shape : 1uz; alignas(void*)::std::byte storage[sizeof(void*) * 4uz]; if constexpr (IsChunked) { @@ -213,11 +216,13 @@ struct psched_bulk_sender { } else { backend->schedule_bulk_unchunked(s, *proxy, storage); } + } catch (...) { + proxy->set_error(::std::current_exception()); } template auto set_error(E e) noexcept -> void { - proxy->set_error(std::move(e)); + proxy->set_error(::std::move(e)); } auto set_stopped() noexcept -> void { proxy->set_stopped(); } @@ -312,7 +317,6 @@ class parallel_scheduler { } auto schedule() const noexcept -> sender; - // TODO(P2079R10): customize bulk_chunked and bulk_unchunked for this scheduler. private: explicit parallel_scheduler(::std::shared_ptr backend) noexcept : backend_(::std::move(backend)) {} @@ -401,8 +405,13 @@ class parallel_scheduler::sender { inline auto parallel_scheduler::schedule() const noexcept -> sender { return sender{this->backend_}; } -// TODO(P2079R10): implement using parallel_scheduler_replacement::query_parallel_scheduler_backend(). -auto get_parallel_scheduler() -> parallel_scheduler; +[[nodiscard]] inline auto get_parallel_scheduler() -> parallel_scheduler { + auto backend = ::beman::execution::parallel_scheduler_replacement::query_parallel_scheduler_backend(); + if (backend == nullptr) [[unlikely]] { + ::std::terminate(); + } + return parallel_scheduler{::std::move(backend)}; +} } // namespace beman::execution