From 04c5c9b6775ae7c069d2aaca518c55e7b1e06102 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 12 Jun 2026 09:21:56 +0000 Subject: [PATCH] Please consider the following formatting changes --- .../Core/include/Framework/AnalysisTask.h | 223 +++++++++--------- 1 file changed, 112 insertions(+), 111 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 92f7079fc81fa..ab675d8f128c7 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -602,141 +602,142 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // replace origins in Preslice declarations homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { - Cache bindingsKeys; - Cache bindingsKeysUnsorted; - // add preslice declarations to slicing cache definition - homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); - - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); - - auto& callbacks = ic.services().get(); - auto eoscb = [task](EndOfStreamContext& eosContext) { - homogeneous_apply_refs_sized([&eosContext](auto& element) { + auto algo = AlgorithmSpec::InitCallback{ + [task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { + Cache bindingsKeys; + Cache bindingsKeysUnsorted; + // add preslice declarations to slicing cache definition + homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); + + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); + + auto& callbacks = ic.services().get(); + auto eoscb = [task](EndOfStreamContext& eosContext) { + homogeneous_apply_refs_sized([&eosContext](auto& element) { analysis_task_parsers::postRunService(eosContext, element); analysis_task_parsers::postRunOutput(eosContext, element); return true; }, - *task.get()); - eosContext.services().get().readyToQuit(QuitRequest::Me); - }; + *task.get()); + eosContext.services().get().readyToQuit(QuitRequest::Me); + }; - callbacks.set(eoscb); + callbacks.set(eoscb); - /// call the task's init() function first as it may manipulate the task's elements - if constexpr (requires { task->init(ic); }) { - task->init(ic); - } - - /// update configurables in filters and partitions - homogeneous_apply_refs_sized( - [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, - *task.get()); - /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos - homogeneous_apply_refs_sized([&expressionInfos](auto& element) { - return analysis_task_parsers::createExpressionTrees(expressionInfos, element); - }, - *task.get()); + /// call the task's init() function first as it may manipulate the task's elements + if constexpr (requires { task->init(ic); }) { + task->init(ic); + } - /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values - if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); - } - homogeneous_apply_refs_sized( - [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { - return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); + /// update configurables in filters and partitions + homogeneous_apply_refs_sized( + [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, + *task.get()); + /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos + homogeneous_apply_refs_sized([&expressionInfos](auto& element) { + return analysis_task_parsers::createExpressionTrees(expressionInfos, element); }, - *task.get()); + *task.get()); - /// replace origin in slicing caches - std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); - } - return entry; - }); - std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); + /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values + if constexpr (requires { &T::process; }) { + AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); } - return entry; - }); + homogeneous_apply_refs_sized( + [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { + return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); + }, + *task.get()); - ic.services().get().setCaches(std::move(bindingsKeys)); - ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); - ic.services().get().setOrigin(newOrigin); + /// replace origin in slicing caches + std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + + ic.services().get().setCaches(std::move(bindingsKeys)); + ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); + ic.services().get().setOrigin(newOrigin); #if (FAIRMQ_VERSION_DEC >= 111000) - PointerReconstructor pointerReconstructor(nullptr); - bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); - - return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { - if (hasCCDBTables && (!pointerReconstructor)) { - auto& proxy = pc.services().get(); - auto& spec = pc.services().get().requestedTIMs.front(); - pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); - } + PointerReconstructor pointerReconstructor(nullptr); + bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); + + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { + if (hasCCDBTables && (!pointerReconstructor)) { + auto& proxy = pc.services().get(); + auto& spec = pc.services().get().requestedTIMs.front(); + pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); + } #else - return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { #endif - // load the ccdb object from their cache - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); - // reset partitions once per dataframe - homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); - // reset selections for the next dataframe - std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); - // reset pre-slice for the next dataframe - auto& slices = pc.services().get(); - homogeneous_apply_refs_sized([&slices](auto& element) { - return analysis_task_parsers::updateSliceInfo(element, slices); - }, - *(task.get())); - // initialize local caches - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); - // prepare outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); - // execute run() - if constexpr (requires { task->run(pc); }) { - task->run(pc); - } - // execute process() - if constexpr (requires { &T::process; }) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; + // load the ccdb object from their cache + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); + // reset partitions once per dataframe + homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); + // reset selections for the next dataframe + std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); + // reset pre-slice for the next dataframe + auto& slices = pc.services().get(); + homogeneous_apply_refs_sized([&slices](auto& element) { + return analysis_task_parsers::updateSliceInfo(element, slices); + }, + *(task.get())); + // initialize local caches + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); + // prepare outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); + // execute run() + if constexpr (requires { task->run(pc); }) { + task->run(pc); + } + // execute process() + if constexpr (requires { &T::process; }) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; #if (FAIRMQ_VERSION_DEC >= 111000) - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); #else - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); #endif - } - // execute optional process() - homogeneous_apply_refs_sized( + } + // execute optional process() + homogeneous_apply_refs_sized( #if (FAIRMQ_VERSION_DEC >= 111000) - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { #else - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { #endif - if constexpr (is_process_configurable) { - if (x.value == true) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; + if constexpr (is_process_configurable) { + if (x.value == true) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; #if (FAIRMQ_VERSION_DEC >= 111000) - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); #else - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); #endif - return true; + return true; + } + return false; } return false; - } - return false; - }, - *task.get()); - // prepare delayed outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); - // finalize outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); - }; - } + }, + *task.get()); + // prepare delayed outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); + // finalize outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); + }; + } }; return {