diff --git a/Framework/Core/include/Framework/DataProcessingHelpers.h b/Framework/Core/include/Framework/DataProcessingHelpers.h index f414e3aa4ae00..e19474447ed12 100644 --- a/Framework/Core/include/Framework/DataProcessingHelpers.h +++ b/Framework/Core/include/Framework/DataProcessingHelpers.h @@ -54,7 +54,7 @@ struct DataProcessingHelpers { /// starts the EoS timers and returns the new TransitionHandlingState in case as new state is requested static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies); /// Helper to route messages for forwarding - static std::vector routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector>& currentSetOfInputs, + static std::vector routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector>& currentSetOfInputs, bool copy, bool consume); /// Helper to route messages for forwarding static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span& currentSetOfInputs, std::vector& forwardedParts, diff --git a/Framework/Core/include/Framework/DataRelayer.h b/Framework/Core/include/Framework/DataRelayer.h index b56a2cb59ff10..f2799dc1ac62c 100644 --- a/Framework/Core/include/Framework/DataRelayer.h +++ b/Framework/Core/include/Framework/DataRelayer.h @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -113,7 +114,7 @@ class DataRelayer ActivityStats processDanglingInputs(std::vector const&, ServiceRegistryRef context, bool createNew); - using OnDropCallback = std::function>&, TimesliceIndex::OldestOutputInfo info)>; + using OnDropCallback = std::function>&, TimesliceIndex::OldestOutputInfo info)>; // Callback for when some messages are about to be owned by the the DataRelayer using OnInsertionCallback = std::function&)>; @@ -153,11 +154,14 @@ class DataRelayer /// @returns the actions ready to be performed. void getReadyToProcess(std::vector& completed); - /// Returns an input registry associated to the given timeslice and gives - /// ownership to the caller. This is because once the inputs are out of the - /// DataRelayer they need to be deleted once the processing is concluded. - std::vector> consumeAllInputsForTimeslice(TimesliceSlot id); + /// Returns spans into the relayer's internal storage for the given timeslice. + /// The slot is marked invalid so new data can be relayed to it immediately, + /// but the underlying message vectors are NOT freed until releaseSlot() is + /// called. The caller must call releaseSlot() once it is done with the spans. + std::vector> consumeAllInputsForTimeslice(TimesliceSlot id); std::vector> consumeExistingInputsForTimeslice(TimesliceSlot id); + /// Free the storage for a slot previously handed out by consumeAllInputsForTimeslice(). + void releaseSlot(TimesliceSlot slot); /// Returns how many timeslices we can handle in parallel [[nodiscard]] size_t getParallelTimeslices() const; @@ -204,6 +208,11 @@ class DataRelayer /// N is the maximum number of inflight timeslices, while /// M is the number of inputs which are requested. std::vector> mCache; + /// Holding area for message vectors moved out of mCache by + /// consumeAllInputsForTimeslice(). Same NxM layout as mCache. + /// Spans returned to callers point here and remain valid until releaseSlot(). + /// relay() never touches mConsumedCache, so no locking is needed in releaseSlot(). + std::vector> mConsumedCache; /// This is the index which maps a given timestamp to the associated /// cacheline. diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 4121d333f6b56..96b47caaf12d5 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -587,7 +587,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void { // the inputs which are shared between this device and others // to the next one in the daisy chain. // FIXME: do it in a smarter way than O(N^2) -static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector>& currentSetOfInputs, +static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { auto& proxy = registry.get(); @@ -619,7 +619,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done"); }; -static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector>& currentSetOfInputs, +static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector>& currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) { auto& proxy = registry.get(); @@ -629,8 +629,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl // Always copy them, because we do not want to actually send them. // We merely need the side effect of the consume, if applicable. for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { - auto span = std::span(currentSetOfInputs[ii]); - DataProcessingHelpers::cleanForwardedMessages(span, consume); + DataProcessingHelpers::cleanForwardedMessages(currentSetOfInputs[ii], consume); } O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done"); @@ -1278,7 +1277,7 @@ void DataProcessingDevice::Run() // - we can trigger further events from the queue // - we can guarantee this is the last thing we do in the loop ( // assuming no one else is adding to the queue before this point). - auto onDrop = [®istry = mServiceRegistry, lid](TimesliceSlot slot, std::vector>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { + auto onDrop = [®istry = mServiceRegistry, lid](TimesliceSlot slot, std::vector>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index); ServiceRegistryRef ref{registry}; ref.get(); @@ -1985,7 +1984,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& nPayloadsPerHeader = 1; ii += (nMessages / 2) - 1; } - auto onDrop = [ref](TimesliceSlot slot, std::vector>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { + auto onDrop = [ref](TimesliceSlot slot, std::vector>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) { O2_SIGNPOST_ID_GENERATE(cid, async_queue); O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu", slot.index, oldestOutputInfo.timeslice.value); @@ -2163,15 +2162,20 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // want to support multithreaded dispatching of operations, I can simply // move these to some thread local store and the rest of the lambdas // should work just fine. - std::vector> currentSetOfInputs; + std::vector> currentSetOfInputs; + std::vector> ownedInputs; // - auto getInputSpan = [ref, ¤tSetOfInputs](TimesliceSlot slot, bool consume = true) { + auto getInputSpan = [ref, ¤tSetOfInputs, &ownedInputs](TimesliceSlot slot, bool consume = true) { auto& relayer = ref.get(); if (consume) { currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot); } else { - currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot); + ownedInputs = relayer.consumeExistingInputsForTimeslice(slot); + currentSetOfInputs.resize(ownedInputs.size()); + for (size_t i = 0; i < ownedInputs.size(); ++i) { + currentSetOfInputs[i] = std::span(ownedInputs[i]); + } } // Convert raw message indices directly to a DataRef in O(1). // Used both by the sequential PartIterator and as the fallback for positional access. @@ -2245,7 +2249,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v // to avoid double counting them. // This was actually the easiest solution we could find for // O2-646. - auto cleanTimers = [¤tSetOfInputs](TimesliceSlot slot, InputRecord& record) { + auto cleanTimers = [¤tSetOfInputs, &ownedInputs](TimesliceSlot slot, InputRecord& record) { assert(record.size() == currentSetOfInputs.size()); for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) { // assuming that for timer inputs we do have exactly one PartRef object @@ -2258,8 +2262,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (input.header == nullptr) { continue; } - // This will hopefully delete the message. - currentSetOfInputs[ii].clear(); + // For the consume=false (Process) path, ownedInputs holds the actual + // message vectors and the span points into them. + ownedInputs[ii].clear(); + currentSetOfInputs[ii] = {}; } }; @@ -2412,9 +2418,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (spec.forwards.empty() == false) { auto& timesliceIndex = ref.get(); forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false); + ref.get().releaseSlot(action.slot); O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false); continue; } + ref.get().releaseSlot(action.slot); } // If there is no optional inputs we canForwardEarly // the messages to that parallel processing can happen. @@ -2567,6 +2575,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v if (action.op == CompletionPolicy::CompletionOp::Process) { cleanTimers(action.slot, record); } + if (shouldConsume) { + ref.get().releaseSlot(action.slot); + } O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str()); } O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions"); diff --git a/Framework/Core/src/DataProcessingHelpers.cxx b/Framework/Core/src/DataProcessingHelpers.cxx index b8399a4c591e7..a29e7b345c94c 100644 --- a/Framework/Core/src/DataProcessingHelpers.cxx +++ b/Framework/Core/src/DataProcessingHelpers.cxx @@ -393,15 +393,14 @@ void DataProcessingHelpers::cleanForwardedMessages(std::span>& currentSetOfInputs, + std::vector>& currentSetOfInputs, const bool copyByDefault, bool consume) -> std::vector { // we collect all messages per forward in a map and send them together std::vector forwardedParts(proxy.getNumForwardChannels()); for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) { - auto span = std::span(currentSetOfInputs[ii]); - routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume); + routeForwardedMessages(proxy, currentSetOfInputs[ii], forwardedParts, copyByDefault, consume); } return forwardedParts; }; diff --git a/Framework/Core/src/DataRelayer.cxx b/Framework/Core/src/DataRelayer.cxx index 7adf5b5c97fbb..73059729fe9e3 100644 --- a/Framework/Core/src/DataRelayer.cxx +++ b/Framework/Core/src/DataRelayer.cxx @@ -412,22 +412,20 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop) ref = mContext](TimesliceSlot slot) { if (onDrop) { auto oldestPossibleTimeslice = index.getOldestPossibleOutput(); - // State of the computation - std::vector> dropped(numInputTypes); + // Build spans over the cache entries (no copy or move of message ownership). + // The spans are valid for the duration of the onDrop call; entries are + // cleared below after the callback returns. + std::vector> droppedSpans(numInputTypes); for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) { auto cacheId = slot.index * numInputTypes + ai; cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; - // TODO: in the original implementation of the cache, there have been only two messages per entry, - // check if the 2 above corresponds to the number of messages. - if (!cache[cacheId].empty()) { - dropped[ai] = std::move(cache[cacheId]); - } + droppedSpans[ai] = cache[cacheId]; } - bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.empty(); }); + bool anyDropped = std::any_of(droppedSpans.begin(), droppedSpans.end(), [](auto& s) { return !s.empty(); }); if (anyDropped) { O2_SIGNPOST_ID_GENERATE(aid, data_relayer); O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value); - onDrop(slot, dropped, oldestPossibleTimeslice); + onDrop(slot, droppedSpans, oldestPossibleTimeslice); } } assert(cache.empty() == false); @@ -886,58 +884,33 @@ void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStat } } -std::vector> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot) +std::vector> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot) { std::scoped_lock lock(mMutex); - const auto numInputTypes = mDistinctRoutesIndex.size(); - // State of the computation - std::vector> messages(numInputTypes); - auto& cache = mCache; - auto& index = mTimesliceIndex; - - // Nothing to see here, this is just to make the outer loop more understandable. - auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) { - return; - }; - - // We move ownership so that the cache can be reused once the computation is - // finished. We mark the given cache slot invalid, so that it can be reused - // This means we can still handle old messages if there is still space in the - // cache where to put them. - auto moveHeaderPayloadToOutput = [&messages, - &cachedStateMetrics = mCachedStateMetrics, - &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) { - auto cacheId = s.index * numInputTypes + arg; - cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; - // TODO: in the original implementation of the cache, there have been only two messages per entry, - // check if the 2 above corresponds to the number of messages. - if (!cache[cacheId].empty()) { - messages[arg] = std::move(cache[cacheId]); - } - index.markAsInvalid(s); - }; - - // An invalid set of arguments is a set of arguments associated to an invalid - // timeslice, so I can simply do that. I keep the assertion there because in principle - // we should have dispatched the timeslice already! - // FIXME: what happens when we have enough timeslices to hit the invalid one? - auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) { - for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) { - assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; })); - cache[ai].clear(); - } - index.markAsInvalid(s); - }; - - // Outer loop here. - jumpToCacheEntryAssociatedWith(slot); - for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) { - moveHeaderPayloadToOutput(slot, ai); + // Move message vectors from mCache into mConsumedCache (same NxM layout) so that: + // - mCache entries are immediately empty and the slot can be reused by relay() + // - the messages remain alive in mConsumedCache until releaseSlot() is called + // relay() never touches mConsumedCache, so the returned spans are safe to use + // concurrently with new relay() calls into this (now-invalid) slot. + std::vector> spans(numInputTypes); + for (size_t ai = 0; ai < numInputTypes; ++ai) { + auto cacheId = slot.index * numInputTypes + ai; + mCachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING; + mConsumedCache[cacheId] = std::move(mCache[cacheId]); + spans[ai] = mConsumedCache[cacheId]; } - invalidateCacheFor(slot); + mTimesliceIndex.markAsInvalid(slot); + return spans; +} - return messages; +void DataRelayer::releaseSlot(TimesliceSlot slot) +{ + // No lock needed: relay() only touches mCache, never mConsumedCache. + const auto numInputTypes = mDistinctRoutesIndex.size(); + for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) { + mConsumedCache[ai].clear(); + } } std::vector> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot) @@ -991,6 +964,9 @@ void DataRelayer::clear() for (auto& cache : mCache) { cache.clear(); } + for (auto& consumed : mConsumedCache) { + consumed.clear(); + } for (size_t s = 0; s < mTimesliceIndex.size(); ++s) { mTimesliceIndex.markAsInvalid(TimesliceSlot{s}); } @@ -1024,6 +1000,7 @@ void DataRelayer::publishMetrics() // maybe misleading to have the allocation in a function primarily for // metrics publishing, do better in setPipelineLength? mCache.resize(numInputTypes * mTimesliceIndex.size()); + mConsumedCache.resize(mCache.size()); auto& states = mContext.get(); mCachedStateMetrics.resize(mCache.size()); diff --git a/Framework/Core/test/benchmark_DataRelayer.cxx b/Framework/Core/test/benchmark_DataRelayer.cxx index e7df8fbb2fe9b..7ec722692afb9 100644 --- a/Framework/Core/test/benchmark_DataRelayer.cxx +++ b/Framework/Core/test/benchmark_DataRelayer.cxx @@ -140,7 +140,11 @@ static void BM_RelaySingleSlot(benchmark::State& state) auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); assert((result.at(0) | count_parts{}) == 1); - inflightMessages = std::move(result[0]); + inflightMessages.clear(); + for (auto& msg : result[0]) { + inflightMessages.emplace_back(std::move(msg)); + } + relayer.releaseSlot(ready[0].slot); } } @@ -196,7 +200,11 @@ static void BM_RelayMultipleSlots(benchmark::State& state) auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); assert(result.size() == 1); assert((result.at(0) | count_parts{}) == 1); - inflightMessages = std::move(result[0]); + inflightMessages.clear(); + for (auto& msg : result[0]) { + inflightMessages.emplace_back(std::move(msg)); + } + relayer.releaseSlot(ready[0].slot); } } @@ -271,9 +279,16 @@ static void BM_RelayMultipleRoutes(benchmark::State& state) assert(result.size() == 2); assert((result.at(0) | count_parts{}) == 1); assert((result.at(1) | count_parts{}) == 1); - inflightMessages = std::move(result[0]); - inflightMessages.emplace_back(std::move(result[1][0])); - inflightMessages.emplace_back(std::move(result[1][1])); + state.PauseTiming(); + inflightMessages.clear(); + for (auto& msg : result[0]) { + inflightMessages.emplace_back(std::move(msg)); + } + for (auto& msg : result[1]) { + inflightMessages.emplace_back(std::move(msg)); + } + relayer.releaseSlot(ready[0].slot); + state.ResumeTiming(); } } @@ -333,7 +348,14 @@ static void BM_RelaySplitParts(benchmark::State& state) relayer.getReadyToProcess(ready); assert(ready.size() == 1); assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); - inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0]); + auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); + state.PauseTiming(); + inflightMessages.clear(); + for (auto& msg : result[0]) { + inflightMessages.emplace_back(std::move(msg)); + } + relayer.releaseSlot(ready[0].slot); + state.ResumeTiming(); } } @@ -387,7 +409,14 @@ static void BM_RelayMultiplePayloads(benchmark::State& state) relayer.getReadyToProcess(ready); assert(ready.size() == 1); assert(ready[0].op == CompletionPolicy::CompletionOp::Consume); - inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0]); + auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot); + state.PauseTiming(); + inflightMessages.clear(); + for (auto& msg : result[0]) { + inflightMessages.emplace_back(std::move(msg)); + } + relayer.releaseSlot(ready[0].slot); + state.ResumeTiming(); } } diff --git a/Framework/Core/test/test_ForwardInputs.cxx b/Framework/Core/test/test_ForwardInputs.cxx index 0263158ee0f9b..d0ca4f35d022e 100644 --- a/Framework/Core/test/test_ForwardInputs.cxx +++ b/Framework/Core/test/test_ForwardInputs.cxx @@ -27,6 +27,18 @@ O2_DECLARE_DYNAMIC_LOG(forwarding); using namespace o2::framework; +// Build a vector of spans over an existing vector-of-vectors for tests that +// construct currentSetOfInputs locally (rather than via consumeAllInputsForTimeslice). +static std::vector> asSpans(std::vector>& vecs) +{ + std::vector> spans; + spans.reserve(vecs.size()); + for (auto& v : vecs) { + spans.emplace_back(v); + } + return spans; +} + TEST_CASE("ForwardInputsEmpty") { o2::header::DataHeader dh; @@ -45,7 +57,8 @@ TEST_CASE("ForwardInputsEmpty") std::vector> currentSetOfInputs; - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.empty()); } @@ -96,7 +109,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRoute") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 2); // Two messages for that route } @@ -148,7 +162,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, true); REQUIRE(result.size() == 1); REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed. } @@ -204,7 +219,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen. // Correct behavior below: @@ -263,7 +279,8 @@ TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong // FIXME: actually correct behavior below @@ -329,7 +346,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutes") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // Two messages per route REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters @@ -393,7 +411,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward REQUIRE(result[1].Size() == 2); // @@ -473,7 +492,8 @@ TEST_CASE("ForwardInputsMultiMessageMultipleRoutes") currentSetOfInputs.emplace_back(std::move(messageSet2)); REQUIRE(currentSetOfInputs.size() == 2); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 2); // REQUIRE(result[1].Size() == 2); // @@ -537,7 +557,8 @@ TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes REQUIRE(result[0].Size() == 0); // Two messages per route REQUIRE(result[1].Size() == 2); // Two messages per route @@ -621,7 +642,8 @@ TEST_CASE("ForwardInputsSplitPayload") REQUIRE((messageSet | count_parts{}) == 2); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 2); // Two routes CHECK(result[0].Size() == 2); // No messages on this route CHECK(result[1].Size() == 3); @@ -742,7 +764,8 @@ TEST_CASE("ForwardInputEOSSingleRoute") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded } @@ -788,7 +811,8 @@ TEST_CASE("ForwardInputOldestPossibleSingleRoute") REQUIRE((messageSet | count_parts{}) == 1); currentSetOfInputs.emplace_back(std::move(messageSet)); - auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume); + auto spans = asSpans(currentSetOfInputs); + auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, spans, copyByDefault, consume); REQUIRE(result.size() == 1); // One route REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded }