Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 112 additions & 111 deletions Framework/Core/include/Framework/AnalysisTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -602,141 +602,142 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args)
// replace origins in Preslice declarations
homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get());

auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
Cache bindingsKeys;
Cache bindingsKeysUnsorted;
// add preslice declarations to slicing cache definition
homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());

homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());

auto& callbacks = ic.services().get<CallbackService>();
auto eoscb = [task](EndOfStreamContext& eosContext) {
homogeneous_apply_refs_sized<numElements>([&eosContext](auto& element) {
auto algo = AlgorithmSpec::InitCallback{
[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
Cache bindingsKeys;
Cache bindingsKeysUnsorted;
// add preslice declarations to slicing cache definition
homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());

homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());

auto& callbacks = ic.services().get<CallbackService>();
auto eoscb = [task](EndOfStreamContext& eosContext) {
homogeneous_apply_refs_sized<numElements>([&eosContext](auto& element) {
analysis_task_parsers::postRunService(eosContext, element);
analysis_task_parsers::postRunOutput(eosContext, element);
return true; },
*task.get());
eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};
*task.get());
eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
};

callbacks.set<CallbackService::Id::EndOfStream>(eoscb);
callbacks.set<CallbackService::Id::EndOfStream>(eoscb);

/// call the task's init() function first as it may manipulate the task's elements
if constexpr (requires { task->init(ic); }) {
task->init(ic);
}

/// update configurables in filters and partitions
homogeneous_apply_refs_sized<numElements>(
[&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
*task.get());
/// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos
homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto& element) {
return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
},
*task.get());
/// call the task's init() function first as it may manipulate the task's elements
if constexpr (requires { task->init(ic); }) {
task->init(ic);
}

/// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values
if constexpr (requires { &T::process; }) {
AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted);
}
homogeneous_apply_refs_sized<numElements>(
[&bindingsKeys, &bindingsKeysUnsorted](auto& x) {
return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted);
/// update configurables in filters and partitions
homogeneous_apply_refs_sized<numElements>(
[&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
*task.get());
/// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos
homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto& element) {
return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
},
*task.get());
*task.get());

/// replace origin in slicing caches
std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
}
return entry;
});
std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
/// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values
if constexpr (requires { &T::process; }) {
AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted);
}
return entry;
});
homogeneous_apply_refs_sized<numElements>(
[&bindingsKeys, &bindingsKeysUnsorted](auto& x) {
return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted);
},
*task.get());

ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
ic.services().get<ArrowTableSlicingCacheDef>().setOrigin(newOrigin);
/// replace origin in slicing caches
std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
}
return entry;
});
std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) {
if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
entry.matcher = replaceOrigin(entry.matcher, newOrigin);
}
return entry;
});

ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
ic.services().get<ArrowTableSlicingCacheDef>().setOrigin(newOrigin);
#if (FAIRMQ_VERSION_DEC >= 111000)
PointerReconstructor pointerReconstructor(nullptr);
bool hasCCDBTables = !ic.services().get<DanglingEdgesContext>().requestedTIMs.empty();

return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable {
if (hasCCDBTables && (!pointerReconstructor)) {
auto& proxy = pc.services().get<FairMQDeviceProxy>();
auto& spec = pc.services().get<DanglingEdgesContext>().requestedTIMs.front();
pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0);
}
PointerReconstructor pointerReconstructor(nullptr);
bool hasCCDBTables = !ic.services().get<DanglingEdgesContext>().requestedTIMs.empty();

return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable {
if (hasCCDBTables && (!pointerReconstructor)) {
auto& proxy = pc.services().get<FairMQDeviceProxy>();
auto& spec = pc.services().get<DanglingEdgesContext>().requestedTIMs.front();
pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0);
}
#else
return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable {
return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable {
#endif
// load the ccdb object from their cache
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
// reset partitions once per dataframe
homogeneous_apply_refs_sized<numElements>([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
// reset selections for the next dataframe
std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
// reset pre-slice for the next dataframe
auto& slices = pc.services().get<ArrowTableSlicingCache>();
homogeneous_apply_refs_sized<numElements>([&slices](auto& element) {
return analysis_task_parsers::updateSliceInfo(element, slices);
},
*(task.get()));
// initialize local caches
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
// prepare outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
// execute run()
if constexpr (requires { task->run(pc); }) {
task->run(pc);
}
// execute process()
if constexpr (requires { &T::process; }) {
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
// load the ccdb object from their cache
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
// reset partitions once per dataframe
homogeneous_apply_refs_sized<numElements>([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
// reset selections for the next dataframe
std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
// reset pre-slice for the next dataframe
auto& slices = pc.services().get<ArrowTableSlicingCache>();
homogeneous_apply_refs_sized<numElements>([&slices](auto& element) {
return analysis_task_parsers::updateSliceInfo(element, slices);
},
*(task.get()));
// initialize local caches
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
// prepare outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
// execute run()
if constexpr (requires { task->run(pc); }) {
task->run(pc);
}
// execute process()
if constexpr (requires { &T::process; }) {
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
#if (FAIRMQ_VERSION_DEC >= 111000)
AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin);
AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin);
#else
AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin);
AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin);
#endif
}
// execute optional process()
homogeneous_apply_refs_sized<numElements>(
}
// execute optional process()
homogeneous_apply_refs_sized<numElements>(
#if (FAIRMQ_VERSION_DEC >= 111000)
[&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) {
[&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) {
#else
[&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) {
[&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) {
#endif
if constexpr (is_process_configurable<decltype(x)>) {
if (x.value == true) {
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(x.process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
if constexpr (is_process_configurable<decltype(x)>) {
if (x.value == true) {
auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(x.process)>(); });
auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
#if (FAIRMQ_VERSION_DEC >= 111000)
AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin);
AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin);
#else
AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin);
AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin);
#endif
return true;
return true;
}
return false;
}
return false;
}
return false;
},
*task.get());
// prepare delayed outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
// finalize outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
};
}
},
*task.get());
// prepare delayed outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
// finalize outputs
homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
};
}
};

return {
Expand Down
Loading