Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/DataProcessingHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ struct DataProcessingHelpers {
/// starts the EoS timers and returns the new TransitionHandlingState in case a new state is requested
static TransitionHandlingState updateStateTransition(ServiceRegistryRef const& ref, ProcessingPolicies const& policies);
/// Helper to route messages for forwarding
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
static std::vector<fair::mq::Parts> routeForwardedMessageSet(FairMQDeviceProxy& proxy, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
bool copy, bool consume);
/// Helper to route messages for forwarding
static void routeForwardedMessages(FairMQDeviceProxy& proxy, std::span<fair::mq::MessagePtr>& currentSetOfInputs, std::vector<fair::mq::Parts>& forwardedParts,
Expand Down
19 changes: 14 additions & 5 deletions Framework/Core/include/Framework/DataRelayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include <cstddef>
#include <mutex>
#include <span>
#include <vector>
#include <functional>

Expand Down Expand Up @@ -113,7 +114,7 @@ class DataRelayer
ActivityStats processDanglingInputs(std::vector<ExpirationHandler> const&,
ServiceRegistryRef context, bool createNew);

using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::vector<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;
using OnDropCallback = std::function<void(TimesliceSlot, std::vector<std::span<fair::mq::MessagePtr>>&, TimesliceIndex::OldestOutputInfo info)>;

// Callback for when some messages are about to be owned by the DataRelayer
using OnInsertionCallback = std::function<void(ServiceRegistryRef&, std::span<fair::mq::MessagePtr>&)>;
Expand Down Expand Up @@ -153,11 +154,14 @@ class DataRelayer
/// @returns the actions ready to be performed.
void getReadyToProcess(std::vector<RecordAction>& completed);

/// Returns an input registry associated to the given timeslice and gives
/// ownership to the caller. This is because once the inputs are out of the
/// DataRelayer they need to be deleted once the processing is concluded.
std::vector<std::vector<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
/// Returns spans into the relayer's internal storage for the given timeslice.
/// The slot is marked invalid so new data can be relayed to it immediately,
/// but the underlying message vectors are NOT freed until releaseSlot() is
/// called. The caller must call releaseSlot() once it is done with the spans.
std::vector<std::span<fair::mq::MessagePtr>> consumeAllInputsForTimeslice(TimesliceSlot id);
std::vector<std::vector<fair::mq::MessagePtr>> consumeExistingInputsForTimeslice(TimesliceSlot id);
/// Free the storage for a slot previously handed out by consumeAllInputsForTimeslice().
void releaseSlot(TimesliceSlot slot);

/// Returns how many timeslices we can handle in parallel
[[nodiscard]] size_t getParallelTimeslices() const;
Expand Down Expand Up @@ -204,6 +208,11 @@ class DataRelayer
/// N is the maximum number of inflight timeslices, while
/// M is the number of inputs which are requested.
std::vector<std::vector<fair::mq::MessagePtr>> mCache;
/// Holding area for message vectors moved out of mCache by
/// consumeAllInputsForTimeslice(). Same NxM layout as mCache.
/// Spans returned to callers point here and remain valid until releaseSlot().
/// relay() never touches mConsumedCache, so no locking is needed in releaseSlot().
std::vector<std::vector<fair::mq::MessagePtr>> mConsumedCache;

/// This is the index which maps a given timestamp to the associated
/// cacheline.
Expand Down
35 changes: 23 additions & 12 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ auto decongestionCallbackLate = [](AsyncTask& task, size_t aid) -> void {
// the inputs which are shared between this device and others
// to the next one in the daisy chain.
// FIXME: do it in a smarter way than O(N^2)
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

Expand Down Expand Up @@ -619,7 +619,7 @@ static auto forwardInputs = [](ServiceRegistryRef registry, TimesliceSlot slot,
O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Forwarding done");
};

static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume = true) {
auto& proxy = registry.get<FairMQDeviceProxy>();

Expand All @@ -629,8 +629,7 @@ static auto cleanEarlyForward = [](ServiceRegistryRef registry, TimesliceSlot sl
// Always copy them, because we do not want to actually send them.
// We merely need the side effect of the consume, if applicable.
for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
DataProcessingHelpers::cleanForwardedMessages(span, consume);
DataProcessingHelpers::cleanForwardedMessages(currentSetOfInputs[ii], consume);
}

O2_SIGNPOST_END(forwarding, sid, "forwardInputs", "Cleaning done");
Expand Down Expand Up @@ -1278,7 +1277,7 @@ void DataProcessingDevice::Run()
// - we can trigger further events from the queue
// - we can guarantee this is the last thing we do in the loop (
// assuming no one else is adding to the queue before this point).
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
auto onDrop = [&registry = mServiceRegistry, lid](TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
O2_SIGNPOST_START(device, lid, "run_loop", "Dropping message from slot %" PRIu64 ". Forwarding as needed.", (uint64_t)slot.index);
ServiceRegistryRef ref{registry};
ref.get<AsyncQueue>();
Expand Down Expand Up @@ -1985,7 +1984,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
nPayloadsPerHeader = 1;
ii += (nMessages / 2) - 1;
}
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::vector<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
auto onDrop = [ref](TimesliceSlot slot, std::vector<std::span<fair::mq::MessagePtr>>& dropped, TimesliceIndex::OldestOutputInfo oldestOutputInfo) {
O2_SIGNPOST_ID_GENERATE(cid, async_queue);
O2_SIGNPOST_EVENT_EMIT(async_queue, cid, "onDrop", "Dropping message from slot %zu. Forwarding as needed. Timeslice %zu",
slot.index, oldestOutputInfo.timeslice.value);
Expand Down Expand Up @@ -2163,15 +2162,20 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// want to support multithreaded dispatching of operations, I can simply
// move these to some thread local store and the rest of the lambdas
// should work just fine.
std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
std::vector<std::span<fair::mq::MessagePtr>> currentSetOfInputs;
std::vector<std::vector<fair::mq::MessagePtr>> ownedInputs;

//
auto getInputSpan = [ref, &currentSetOfInputs](TimesliceSlot slot, bool consume = true) {
auto getInputSpan = [ref, &currentSetOfInputs, &ownedInputs](TimesliceSlot slot, bool consume = true) {
auto& relayer = ref.get<DataRelayer>();
if (consume) {
currentSetOfInputs = relayer.consumeAllInputsForTimeslice(slot);
} else {
currentSetOfInputs = relayer.consumeExistingInputsForTimeslice(slot);
ownedInputs = relayer.consumeExistingInputsForTimeslice(slot);
currentSetOfInputs.resize(ownedInputs.size());
for (size_t i = 0; i < ownedInputs.size(); ++i) {
currentSetOfInputs[i] = std::span(ownedInputs[i]);
}
}
// Convert raw message indices directly to a DataRef in O(1).
// Used both by the sequential PartIterator and as the fallback for positional access.
Expand Down Expand Up @@ -2245,7 +2249,7 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
// to avoid double counting them.
// This was actually the easiest solution we could find for
// O2-646.
auto cleanTimers = [&currentSetOfInputs](TimesliceSlot slot, InputRecord& record) {
auto cleanTimers = [&currentSetOfInputs, &ownedInputs](TimesliceSlot slot, InputRecord& record) {
assert(record.size() == currentSetOfInputs.size());
for (size_t ii = 0, ie = record.size(); ii < ie; ++ii) {
// assuming that for timer inputs we do have exactly one PartRef object
Expand All @@ -2258,8 +2262,10 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
if (input.header == nullptr) {
continue;
}
// This will hopefully delete the message.
currentSetOfInputs[ii].clear();
// For the consume=false (Process) path, ownedInputs holds the actual
// message vectors and the span points into them.
ownedInputs[ii].clear();
currentSetOfInputs[ii] = {};
}
};

Expand Down Expand Up @@ -2412,9 +2418,11 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
if (spec.forwards.empty() == false) {
auto& timesliceIndex = ref.get<TimesliceIndex>();
forwardInputs(ref, action.slot, currentSetOfInputs, timesliceIndex.getOldestPossibleOutput(), false);
ref.get<DataRelayer>().releaseSlot(action.slot);
O2_SIGNPOST_END(device, aid, "device", "Forwarding inputs consume: %d.", false);
continue;
}
ref.get<DataRelayer>().releaseSlot(action.slot);
}
// If there are no optional inputs we canForwardEarly
// the messages so that parallel processing can happen.
Expand Down Expand Up @@ -2567,6 +2575,9 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
if (action.op == CompletionPolicy::CompletionOp::Process) {
cleanTimers(action.slot, record);
}
if (shouldConsume) {
ref.get<DataRelayer>().releaseSlot(action.slot);
}
O2_SIGNPOST_END(device, aid, "device", "Done processing action on slot %lu for action %{public}s", action.slot.index, fmt::format("{}", action.op).c_str());
}
O2_SIGNPOST_END(device, sid, "device", "Start processing ready actions");
Expand Down
5 changes: 2 additions & 3 deletions Framework/Core/src/DataProcessingHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,14 @@ void DataProcessingHelpers::cleanForwardedMessages(std::span<fair::mq::MessagePt
}

auto DataProcessingHelpers::routeForwardedMessageSet(FairMQDeviceProxy& proxy,
std::vector<std::vector<fair::mq::MessagePtr>>& currentSetOfInputs,
std::vector<std::span<fair::mq::MessagePtr>>& currentSetOfInputs,
const bool copyByDefault, bool consume) -> std::vector<fair::mq::Parts>
{
// we collect all messages per forward in a map and send them together
std::vector<fair::mq::Parts> forwardedParts(proxy.getNumForwardChannels());

for (size_t ii = 0, ie = currentSetOfInputs.size(); ii < ie; ++ii) {
auto span = std::span<fair::mq::MessagePtr>(currentSetOfInputs[ii]);
routeForwardedMessages(proxy, span, forwardedParts, copyByDefault, consume);
routeForwardedMessages(proxy, currentSetOfInputs[ii], forwardedParts, copyByDefault, consume);
}
return forwardedParts;
};
Expand Down
89 changes: 33 additions & 56 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -412,22 +412,20 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)
ref = mContext](TimesliceSlot slot) {
if (onDrop) {
auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
// State of the computation
std::vector<std::vector<fair::mq::MessagePtr>> dropped(numInputTypes);
// Build spans over the cache entries (no copy or move of message ownership).
// The spans are valid for the duration of the onDrop call; entries are
// cleared below after the callback returns.
std::vector<std::span<fair::mq::MessagePtr>> droppedSpans(numInputTypes);
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
auto cacheId = slot.index * numInputTypes + ai;
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
// TODO: in the original implementation of the cache, there have been only two messages per entry,
// check if the 2 above corresponds to the number of messages.
if (!cache[cacheId].empty()) {
dropped[ai] = std::move(cache[cacheId]);
}
droppedSpans[ai] = cache[cacheId];
}
bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.empty(); });
bool anyDropped = std::any_of(droppedSpans.begin(), droppedSpans.end(), [](auto& s) { return !s.empty(); });
if (anyDropped) {
O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
onDrop(slot, dropped, oldestPossibleTimeslice);
onDrop(slot, droppedSpans, oldestPossibleTimeslice);
}
}
assert(cache.empty() == false);
Expand Down Expand Up @@ -886,58 +884,33 @@ void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStat
}
}

std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
std::vector<std::span<fair::mq::MessagePtr>> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
{
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

const auto numInputTypes = mDistinctRoutesIndex.size();
// State of the computation
std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
auto& cache = mCache;
auto& index = mTimesliceIndex;

// Nothing to see here, this is just to make the outer loop more understandable.
auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
return;
};

// We move ownership so that the cache can be reused once the computation is
// finished. We mark the given cache slot invalid, so that it can be reused
// This means we can still handle old messages if there is still space in the
// cache where to put them.
auto moveHeaderPayloadToOutput = [&messages,
&cachedStateMetrics = mCachedStateMetrics,
&cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
auto cacheId = s.index * numInputTypes + arg;
cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
// TODO: in the original implementation of the cache, there have been only two messages per entry,
// check if the 2 above corresponds to the number of messages.
if (!cache[cacheId].empty()) {
messages[arg] = std::move(cache[cacheId]);
}
index.markAsInvalid(s);
};

// An invalid set of arguments is a set of arguments associated to an invalid
// timeslice, so I can simply do that. I keep the assertion there because in principle
// we should have dispatched the timeslice already!
// FIXME: what happens when we have enough timeslices to hit the invalid one?
auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
cache[ai].clear();
}
index.markAsInvalid(s);
};

// Outer loop here.
jumpToCacheEntryAssociatedWith(slot);
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
moveHeaderPayloadToOutput(slot, ai);
// Move message vectors from mCache into mConsumedCache (same NxM layout) so that:
// - mCache entries are immediately empty and the slot can be reused by relay()
// - the messages remain alive in mConsumedCache until releaseSlot() is called
// relay() never touches mConsumedCache, so the returned spans are safe to use
// concurrently with new relay() calls into this (now-invalid) slot.
std::vector<std::span<fair::mq::MessagePtr>> spans(numInputTypes);
for (size_t ai = 0; ai < numInputTypes; ++ai) {
auto cacheId = slot.index * numInputTypes + ai;
mCachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
mConsumedCache[cacheId] = std::move(mCache[cacheId]);
spans[ai] = mConsumedCache[cacheId];
}
invalidateCacheFor(slot);
mTimesliceIndex.markAsInvalid(slot);
return spans;
}

return messages;
/// Drop the messages parked in mConsumedCache for the given slot.
///
/// Clears every per-input entry of @a slot in mConsumedCache, i.e. the
/// contiguous range [slot.index * M, slot.index * M + M) where M is the
/// number of distinct input routes.
void DataRelayer::releaseSlot(TimesliceSlot slot)
{
  // Intentionally lock-free: relay() only works on mCache and never on
  // mConsumedCache.
  const auto routesPerSlot = mDistinctRoutesIndex.size();
  const size_t firstEntry = slot.index * routesPerSlot;
  for (size_t offset = 0; offset < routesPerSlot; ++offset) {
    mConsumedCache[firstEntry + offset].clear();
  }
}

std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
Expand Down Expand Up @@ -991,6 +964,9 @@ void DataRelayer::clear()
for (auto& cache : mCache) {
cache.clear();
}
for (auto& consumed : mConsumedCache) {
consumed.clear();
}
for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
}
Expand Down Expand Up @@ -1024,6 +1000,7 @@ void DataRelayer::publishMetrics()
// maybe misleading to have the allocation in a function primarily for
// metrics publishing, do better in setPipelineLength?
mCache.resize(numInputTypes * mTimesliceIndex.size());
mConsumedCache.resize(mCache.size());
auto& states = mContext.get<DataProcessingStates>();

mCachedStateMetrics.resize(mCache.size());
Expand Down
Loading