diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 21fdae4a57760..6c23ef7e25966 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -11,6 +11,7 @@ #include "AnalysisCCDBHelpers.h" #include "CCDBFetcherHelper.h" +#include "Framework/ArrowTypes.h" #include "Framework/DataProcessingStats.h" #include "Framework/DeviceSpec.h" #include "Framework/TimingInfo.h" @@ -22,6 +23,7 @@ #include "Framework/DanglingEdgesContext.h" #include "Framework/ConfigContext.h" #include "Framework/ConfigParamsHelper.h" +#include #include #include #include @@ -109,20 +111,49 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) auto it = ccdbUrls.find(m.name); fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString()); auto columnName = m.name.substr(strlen("ccdb:")); +#if (FAIRMQ_VERSION_DEC >= 111000) + fields.emplace_back(std::make_shared(columnName, soa::asArrowDataType(), false, fieldMetadata)); +#else fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, fieldMetadata)); +#endif } schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } +#if (FAIRMQ_VERSION_DEC >= 111000) + std::vector>> allbuilders; +#else + std::vector>> allbuilders; +#endif + allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }()); + auto* pool = arrow::default_memory_pool(); + + int idx = 0; + int sidx = 0; + for (auto const& schema : schemas) { + for (auto const& _ : schema->fields()) { +#if (FAIRMQ_VERSION_DEC >= 111000) + auto value_builder = std::make_shared(); + allbuilders[idx] = std::make_pair(sidx, std::make_shared(pool, std::move(value_builder), 3)); +#else + allbuilders[idx] = std::make_pair(sidx, std::make_shared()); +#endif + ++idx; + } + ++sidx; + } + std::shared_ptr helper = std::make_shared(); CCDBFetcherHelper::initialiseHelper(*helper, options); std::unordered_map bindings; fillValidRoutes(*helper, spec.outputs, bindings); - return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { + return adaptStateless([schemas, bindings, helper, allbuilders](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { O2_SIGNPOST_ID_GENERATE(sid, ccdb); O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); - for (auto& schema : schemas) { + std::ranges::for_each(allbuilders, [](auto& builder) { builder.second->Reset(); }); + for (auto i = 0U; i < schemas.size(); ++i) { + auto& schema = schemas[i]; std::vector ops; auto inputBinding = *schema->metadata()->Get("sourceTable"); auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher")); @@ -134,6 +165,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) auto table = inputs.get(inputMatcher)->asArrowTable(); // FIXME: make the fTimestamp column configurable. auto timestampColumn = table->GetColumnByName("fTimestamp"); + auto reserveSize = timestampColumn->length(); O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", "There are %zu bindings available", bindings.size()); for (auto& binding : bindings) { @@ -143,9 +175,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) } int outputRouteIndex = bindings.at(outRouteDesc); auto& spec = helper->routes[outputRouteIndex].matcher; - std::vector> builders; - for (auto const& _ : schema->fields()) { - builders.emplace_back(std::make_shared()); + auto builders = allbuilders | std::views::filter([&i](auto const& builder) { return builder.first == i; }); + unsigned int numBuilders = std::ranges::count_if(allbuilders, [&i](auto const& builder) { return builder.first == i; }); + arrow::Status status; + std::ranges::for_each(builders, [&status, &reserveSize](auto& builder) { + if (reserveSize > builder.second->capacity()) { + status &= builder.second->Reserve(reserveSize - builder.second->capacity()); + } + }); + if (!status.ok()) { + throw framework::runtime_error_f("Failed to reserve arrays: ", status.ToString().c_str()); } for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) { @@ -171,15 +210,25 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) O2_SIGNPOST_START(ccdb, sid, "handlingResponses", "Got %zu responses from server.", responses.size()); - if (builders.size() != responses.size()) { - LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size()); + if (numBuilders != responses.size()) { + LOGP(fatal, "Not enough responses (expected {}, found {})", numBuilders, responses.size()); } arrow::Status result; - for (size_t bi = 0; bi < responses.size(); bi++) { - auto& builder = builders[bi]; + + int bi = 0; + for (auto& builder : builders) { auto& response = responses[bi]; +#if (FAIRMQ_VERSION_DEC >= 111000) + result &= builder.second->Append(); + auto* value_builder = dynamic_cast(builder.second->value_builder()); + result &= value_builder->Append(response.id.handle); + result &= value_builder->Append(response.id.segment); + result &= value_builder->Append(response.size); +#else char const* address = reinterpret_cast(response.id.value); - result &= builder->Append(std::string_view(address, response.size)); + result &= builder.second->Append(std::string_view(address, response.size)); +#endif + ++bi; } if (!result.ok()) { LOGP(fatal, "Error adding results from CCDB"); @@ -188,9 +237,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) } } arrow::ArrayVector arrays; - for (auto& builder : builders) { - arrays.push_back(*builder->Finish()); - } + std::ranges::for_each(builders, [&arrays](auto& builder) { arrays.push_back(*builder.second->Finish()); }); auto outTable = arrow::Table::Make(schema, arrays); auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 6ff77f8facaec..269cfe51a4750 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -24,6 +24,7 @@ #include "Framework/ArrowTableSlicingCache.h" // IWYU pragma: export #include "Framework/SliceCache.h" // IWYU pragma: export #include "Framework/VariantHelpers.h" // IWYU pragma: export +#include #include #include // IWYU pragma: export #include // IWYU pragma: export @@ -36,9 +37,17 @@ #include #include // IWYU pragma: export +namespace fair::mq::shmem +{ +struct MetaHeader; +} + namespace o2::framework { using ListVector = std::vector>; +#if (FAIRMQ_VERSION_DEC >= 111000) +using PointerReconstructor = std::function; +#endif std::string cutString(std::string&& str); std::string strToUpper(std::string&& str); @@ -1081,6 +1090,9 @@ concept can_bind = requires(T&& t) { template concept has_index = (is_indexing_column || ...); +template +concept needs_ptr_rec = C::needs_ptr_rec; + template struct TableIterator : IP, C... { public: @@ -1248,6 +1260,21 @@ struct TableIterator : IP, C... { { doSetCurrentInternal(internal_index_columns_t{}, table); } +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + [&pointerReconstructor, this](framework::pack) { + ([&pointerReconstructor, this]() { + if constexpr (needs_ptr_rec) { + if (pointerReconstructor) { + CC::ptrRec = &pointerReconstructor; + } + } + }.template operator()(), + ...); + }(all_columns{}); + } +#endif private: /// Helper to move at the end of columns which actually have an iterator. @@ -2285,7 +2312,12 @@ class Table { return self_t{mTable->Slice(0, 0), 0}; } - +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + mBegin.setPointerReconstructor(pointerReconstructor); + } +#endif private: template arrow::ChunkedArray* lookupColumn() @@ -2464,6 +2496,57 @@ consteval static std::string_view namespace_prefix() }; \ [[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() } +#if (FAIRMQ_VERSION_DEC >= 111000) +#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + struct _Name_ : o2::soa::Column { \ + static constexpr const char* mLabel = _Label_; \ + static constexpr const char* query = _CCDBQuery_; \ + static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \ + static constexpr bool needs_ptr_rec = true; \ + std::function const* ptrRec = nullptr; \ + using base = o2::soa::Column; \ + using type = int64_t[3]; \ + using column_t = _Name_; \ + _Name_(arrow::ChunkedArray const* column) \ + : o2::soa::Column(o2::soa::ColumnIterator(column)) \ + { \ + } \ + \ + _Name_() = default; \ + _Name_(_Name_ const& other) = default; \ + _Name_& operator=(_Name_ const& other) = default; \ + \ + decltype(auto) _Getter_() const \ + { \ + auto& [handle, segment, size] = *mColumnIterator; \ + auto span = std::span{(*ptrRec)(fair::mq::shmem::MetaHeader{ \ + static_cast(size), \ + 0, handle, 0, 0, \ + static_cast(segment), true}), \ + static_cast(size)}; \ + if constexpr (std::same_as<_ConcreteType_, std::span>) { \ + return span; \ + } else { \ + static std::byte* payload = nullptr; \ + static _ConcreteType_* deserialised = nullptr; \ + static TClass* c = TClass::GetClass(#_ConcreteType_); \ + if (payload != (std::byte*)span.data()) { \ + payload = (std::byte*)span.data(); \ + delete deserialised; \ + TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \ + deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \ + } \ + return *deserialised; \ + } \ + } \ + \ + decltype(auto) \ + get() const \ + { \ + return _Getter_(); \ + } \ + }; +#else #define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ struct _Name_ : o2::soa::Column, _Name_> { \ static constexpr const char* mLabel = _Label_; \ @@ -2506,6 +2589,7 @@ consteval static std::string_view namespace_prefix() return _Getter_(); \ } \ }; +#endif #define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \ DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) @@ -3849,7 +3933,12 @@ class FilteredBase : public T { return mCached; } - +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + mFilteredBegin.setPointerReconstructor(pointerReconstructor); + } +#endif private: void resetRanges() { diff --git a/Framework/Core/include/Framework/AnalysisDataModel.h b/Framework/Core/include/Framework/AnalysisDataModel.h index c8dd33fba62ee..2d264b648ed76 100644 --- a/Framework/Core/include/Framework/AnalysisDataModel.h +++ b/Framework/Core/include/Framework/AnalysisDataModel.h @@ -26,6 +26,11 @@ #include "SimulationDataFormat/MCGenProperties.h" #include "Framework/PID.h" +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif + namespace o2 { namespace aod diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 3170236e18f09..8b7daa0976423 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -26,6 +26,8 @@ #include "Framework/TypeIdHelpers.h" #include "Framework/ArrowTableSlicingCache.h" #include "Framework/AnalysisDataModel.h" +#include "Framework/DanglingEdgesContext.h" +#include #include #include @@ -302,11 +304,19 @@ struct AnalysisDataProcessorBuilder { } template +#if (FAIRMQ_VERSION_DEC >= 111000) + static void invokeProcess(Task& task, InputRecord& inputs, R matchers, PointerReconstructor const& pointerReconstructor, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +#else static void invokeProcess(Task& task, InputRecord& inputs, R matchers, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +#endif { using G = std::decay_t; auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, matchers, processingFunction, infos); - +#if (FAIRMQ_VERSION_DEC >= 111000) + if constexpr (!is_enumeration) { + groupingTable.setPointerReconstructor(pointerReconstructor); + } +#endif constexpr const int numElements = nested_brace_constructible_size>() / 10; // set filtered tables for partitions with grouping @@ -347,6 +357,12 @@ struct AnalysisDataProcessorBuilder { ...); }, associatedTables); +#if (FAIRMQ_VERSION_DEC >= 111000) + std::apply([&pointerReconstructor](auto&... table) { + (table.setPointerReconstructor(pointerReconstructor), ...); + }, + associatedTables); +#endif auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable { x.bindExternalIndices(&groupingTable, &std::get>(associatedTables)...); @@ -377,6 +393,12 @@ struct AnalysisDataProcessorBuilder { auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOrigin); for (auto& slice : slicer) { auto associatedSlices = slice.associatedTables(); +#if (FAIRMQ_VERSION_DEC >= 111000) + std::apply([&pointerReconstructor](auto&... table) { + (table.setPointerReconstructor(pointerReconstructor), ...); + }, + associatedSlices); +#endif overwriteInternalIndices(associatedSlices, associatedTables); std::apply( [&binder](auto&... x) mutable { @@ -580,118 +602,144 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // replace origins in Preslice declarations homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { - Cache bindingsKeys; - Cache bindingsKeysUnsorted; - // add preslice declarations to slicing cache definition - homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); - - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); - - auto& callbacks = ic.services().get(); - auto eoscb = [task](EndOfStreamContext& eosContext) { - homogeneous_apply_refs_sized([&eosContext](auto& element) { + auto algo = AlgorithmSpec::InitCallback + { + [task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { + Cache bindingsKeys; + Cache bindingsKeysUnsorted; + // add preslice declarations to slicing cache definition + homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); + + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); + + auto& callbacks = ic.services().get(); + auto eoscb = [task](EndOfStreamContext& eosContext) { + homogeneous_apply_refs_sized([&eosContext](auto& element) { analysis_task_parsers::postRunService(eosContext, element); analysis_task_parsers::postRunOutput(eosContext, element); return true; }, - *task.get()); - eosContext.services().get().readyToQuit(QuitRequest::Me); - }; - - callbacks.set(eoscb); - - /// call the task's init() function first as it may manipulate the task's elements - if constexpr (requires { task->init(ic); }) { - task->init(ic); - } - - /// update configurables in filters and partitions - homogeneous_apply_refs_sized( - [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, - *task.get()); - /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos - homogeneous_apply_refs_sized([&expressionInfos](auto& element) { - return analysis_task_parsers::createExpressionTrees(expressionInfos, element); - }, - *task.get()); + *task.get()); + eosContext.services().get().readyToQuit(QuitRequest::Me); + }; - /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values - if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); - } - homogeneous_apply_refs_sized( - [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { - return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); - }, - *task.get()); + callbacks.set(eoscb); - /// replace origin in slicing caches - std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); + /// call the task's init() function first as it may manipulate the task's elements + if constexpr (requires { task->init(ic); }) { + task->init(ic); } - return entry; - }); - std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); - } - return entry; - }); - ic.services().get().setCaches(std::move(bindingsKeys)); - ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); - ic.services().get().setOrigin(newOrigin); - - return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable { - // load the ccdb object from their cache - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); - // reset partitions once per dataframe - homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); - // reset selections for the next dataframe - std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); - // reset pre-slice for the next dataframe - auto& slices = pc.services().get(); - homogeneous_apply_refs_sized([&slices](auto& element) { - return analysis_task_parsers::updateSliceInfo(element, slices); + /// update configurables in filters and partitions + homogeneous_apply_refs_sized( + [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, + *task.get()); + /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos + homogeneous_apply_refs_sized([&expressionInfos](auto& element) { + return analysis_task_parsers::createExpressionTrees(expressionInfos, element); }, - *(task.get())); - // initialize local caches - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); - // prepare outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); - // execute run() - if constexpr (requires { task->run(pc); }) { - task->run(pc); - } - // execute process() + *task.get()); + + /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values if constexpr (requires { &T::process; }) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); } - // execute optional process() homogeneous_apply_refs_sized( - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { - if constexpr (is_process_configurable) { - if (x.value == true) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); - return true; - } - return false; - } - return false; + [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { + return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); - // prepare delayed outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); - // finalize outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); - }; - }}; + + /// replace origin in slicing caches + std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + + ic.services().get().setCaches(std::move(bindingsKeys)); + ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); + ic.services().get().setOrigin(newOrigin); +#if (FAIRMQ_VERSION_DEC >= 111000) + PointerReconstructor pointerReconstructor(nullptr); + bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); + + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { + if (hasCCDBTables && (!pointerReconstructor)) { + auto& proxy = pc.services().get(); + auto& spec = pc.services().get().requestedTIMs.front(); + pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); + } +#else + return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable { +#endif + // load the ccdb object from their cache + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); + // reset partitions once per dataframe + homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); + // reset selections for the next dataframe + std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); + // reset pre-slice for the next dataframe + auto& slices = pc.services().get(); + homogeneous_apply_refs_sized([&slices](auto& element) { + return analysis_task_parsers::updateSliceInfo(element, slices); + }, + *(task.get())); + // initialize local caches + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); + // prepare outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); + // execute run() + if constexpr (requires { task->run(pc); }) { + task->run(pc); + } + // execute process() + if constexpr (requires { &T::process; }) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; +#if (FAIRMQ_VERSION_DEC >= 111000) + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); +#else + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); +#endif + } + // execute optional process() + homogeneous_apply_refs_sized( +#if (FAIRMQ_VERSION_DEC >= 111000) + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { +#else + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { +#endif + if constexpr (is_process_configurable) { + if (x.value == true) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; +#if (FAIRMQ_VERSION_DEC >= 111000) + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); +#else + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); +#endif + return true; + } + return false; + } + return false; + }, + *task.get()); + // prepare delayed outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); + // finalize outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); + }; + } + }; return { name, diff --git a/Framework/Core/include/Framework/ArrowTypes.h b/Framework/Core/include/Framework/ArrowTypes.h index 2673472a81152..ef4c2c6323d8e 100644 --- a/Framework/Core/include/Framework/ArrowTypes.h +++ b/Framework/Core/include/Framework/ArrowTypes.h @@ -93,6 +93,11 @@ struct arrow_array_for { using type = arrow::FixedSizeListArray; using value_type = int8_t; }; +template +struct arrow_array_for { + using type = arrow::FixedSizeListArray; + using value_type = int64_t; +}; #define ARROW_VECTOR_FOR(_type_) \ template <> \ diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index ed9a31ca2857c..b7169a5818680 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -42,6 +42,10 @@ // Do not change this for a full inclusion of fair::mq::Device. #include +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif namespace arrow { @@ -496,6 +500,8 @@ class DataAllocator struct CacheId { int64_t value; + int64_t handle; + int64_t segment; }; enum struct CacheStrategy : int { @@ -507,7 +513,7 @@ class DataAllocator CacheId adoptContainer(const Output& /*spec*/, ContainerT& /*container*/, CacheStrategy /* cache = false */, o2::header::SerializationMethod /* method = header::gSerializationMethodNone*/) { static_assert(always_static_assert_v, "Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource"); - return {0}; + return {0, 0, 0}; } /// Adopt a PMR container. Notice that the container must be moveable and @@ -595,12 +601,17 @@ DataAllocator::CacheId DataAllocator::adoptContainer(const Output& spec, Contain payloadMessage->GetSize() // ); - CacheId cacheId{0}; // + CacheId cacheId{0, 0, 0}; // if (cache == CacheStrategy::Always) { // The message will be shallow cloned in the cache. Since the // clone is indistinguishable from the original, we can keep sending // the original. cacheId.value = context.addToCache(payloadMessage); +#if (FAIRMQ_VERSION_DEC >= 111000) + auto meta = dynamic_cast(payloadMessage.get())->GetMeta(); + cacheId.handle = meta.fHandle; + cacheId.segment = meta.fSegmentId; +#endif } context.add(std::move(headerMessage), std::move(payloadMessage), routeIndex); diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index dbdade465f09c..8ab93d83dc1c2 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -20,6 +20,10 @@ #include "Framework/InputRoute.h" #include "Framework/ForwardRoute.h" #include +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif #include namespace o2::header @@ -29,6 +33,9 @@ struct DataHeader; namespace o2::framework { +#if (FAIRMQ_VERSION_DEC >= 111000) +using PointerReconstructor = std::function; +#endif /// Helper class to hide fair::mq::Device headers in the DataAllocator header. /// This is done because fair::mq::Device brings in a bunch of boost.mpl / /// boost.fusion stuff, slowing down compilation times enourmously. @@ -58,6 +65,10 @@ class FairMQDeviceProxy [[nodiscard]] ChannelIndex getForwardChannelIndexByName(std::string const& channelName) const; /// Retrieve the channel index from a given OutputSpec and the associated timeslice [[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const; +#if (FAIRMQ_VERSION_DEC >= 111000) + /// Retrieve the pointer-reconstruction function for the shm manager for a given input spec + [[nodiscard]] PointerReconstructor getShmPointerReconstructor(InputSpec const& spec, size_t timeslice); +#endif /// Retrieve the channel index from a given OutputSpec and the associated timeslice void getMatchingForwardChannelIndexes(std::vector& result, header::DataHeader const& header, size_t timeslice) const; /// ChannelIndex from a RouteIndex diff --git a/Framework/Core/src/FairMQDeviceProxy.cxx b/Framework/Core/src/FairMQDeviceProxy.cxx index e121084b866a2..b1cd9c9352829 100644 --- a/Framework/Core/src/FairMQDeviceProxy.cxx +++ b/Framework/Core/src/FairMQDeviceProxy.cxx @@ -364,4 +364,25 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto } mStateChangeCallback = newStatePending; } + +#if (FAIRMQ_VERSION_DEC >= 111000) +PointerReconstructor FairMQDeviceProxy::getShmPointerReconstructor(InputSpec const& spec, size_t timeslice) +{ + assert(mInputRoutes.size() == mInputs.size()); + ChannelIndex c{-1}; + for (size_t ri = 0; ri < mInputs.size(); ++ri) { + auto& route = mInputs[ri]; + + LOG(debug) << "matching: " << DataSpecUtils::describe(spec) << " to route " << DataSpecUtils::describe(route.matcher); + if ((spec == route.matcher) && (timeslice == route.timeslice)) { + c = mInputRoutes[ri].channel; + break; + } + } + if (c.value != ChannelIndex::INVALID) { + return {[transport = getInputChannel(c)->Transport()](fair::mq::shmem::MetaHeader&& meta) { return reinterpret_cast(fair::mq::shmem::GetDataAddressFromHandle(*transport, meta)); }}; + } + return {}; +} +#endif } // namespace o2::framework