From 69a8df5cdb4270cc888b2db844b4c67887077306 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 9 Jun 2026 09:25:45 +0200 Subject: [PATCH 01/10] DPL Analysis: use shm metadata in CCDB tables instead of binary view --- .../CCDBSupport/src/AnalysisCCDBHelpers.cxx | 67 +++++++++++--- Framework/Core/include/Framework/ASoA.h | 91 ++++++++++++++++++- .../include/Framework/AnalysisDataModel.h | 5 + .../Core/include/Framework/AnalysisTask.h | 51 ++++++++++- Framework/Core/include/Framework/ArrowTypes.h | 5 + .../Core/include/Framework/DataAllocator.h | 10 +- .../include/Framework/FairMQDeviceProxy.h | 11 +++ Framework/Core/src/FairMQDeviceProxy.cxx | 21 +++++ 8 files changed, 242 insertions(+), 19 deletions(-) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 21fdae4a57760..b8932d7dbc991 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -11,6 +11,7 @@ #include "AnalysisCCDBHelpers.h" #include "CCDBFetcherHelper.h" +#include "Framework/ArrowTypes.h" #include "Framework/DataProcessingStats.h" #include "Framework/DeviceSpec.h" #include "Framework/TimingInfo.h" @@ -22,6 +23,7 @@ #include "Framework/DanglingEdgesContext.h" #include "Framework/ConfigContext.h" #include "Framework/ConfigParamsHelper.h" +#include #include #include #include @@ -109,20 +111,45 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) auto it = ccdbUrls.find(m.name); fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString()); auto columnName = m.name.substr(strlen("ccdb:")); +#if (FAIRMQ_VERSION_DEC >= 111000) + fields.emplace_back(std::make_shared(columnName, soa::asArrowDataType(), false, fieldMetadata)); +#else fields.emplace_back(std::make_shared(columnName, arrow::binary_view(), false, fieldMetadata)); +#endif } schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } + std::vector>> allbuilders; + allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }()); + auto* pool = arrow::default_memory_pool(); + + int idx = 0; + int sidx = 0; + for (auto const& schema : schemas) { + for (auto const& _ : schema->fields()) { +#if (FAIRMQ_VERSION_DEC >= 111000) + auto value_builder = std::make_shared(); + allbuilders[idx] = std::make_pair(sidx, std::make_shared(pool, std::move(value_builder), 3)); +#else + allbuilders[idx] = std::make_pair(sidx, std::make_shared()); +#endif + ++idx; + } + ++sidx; + } + std::shared_ptr helper = std::make_shared(); CCDBFetcherHelper::initialiseHelper(*helper, options); std::unordered_map bindings; fillValidRoutes(*helper, spec.outputs, bindings); - return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { + return adaptStateless([schemas, bindings, helper, allbuilders](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) { O2_SIGNPOST_ID_GENERATE(sid, ccdb); O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice); - for (auto& schema : schemas) { + std::ranges::for_each(allbuilders, [](auto& builder) { builder.second->Reset(); }); + for (auto i = 0U; i < schemas.size(); ++i) { + auto& schema = schemas[i]; std::vector ops; auto inputBinding = *schema->metadata()->Get("sourceTable"); auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher")); @@ -134,6 +161,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) auto table = inputs.get(inputMatcher)->asArrowTable(); // FIXME: make the fTimestamp column configurable. auto timestampColumn = table->GetColumnByName("fTimestamp"); + auto reserveSize = timestampColumn->length(); O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB", "There are %zu bindings available", bindings.size()); for (auto& binding : bindings) { @@ -143,9 +171,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) } int outputRouteIndex = bindings.at(outRouteDesc); auto& spec = helper->routes[outputRouteIndex].matcher; - std::vector> builders; - for (auto const& _ : schema->fields()) { - builders.emplace_back(std::make_shared()); + auto builders = allbuilders | std::views::filter([&i](auto const& builder) { return builder.first == i; }); + unsigned int numBuilders = std::ranges::count_if(allbuilders, [&i](auto const& builder) { return builder.first == i; }); + arrow::Status status; + std::ranges::for_each(builders, [&status, &reserveSize](auto& builder) { + if (reserveSize > builder.second->capacity()) { + status &= builder.second->Reserve(reserveSize - builder.second->capacity()); + } + }); + if (!status.ok()) { + throw framework::runtime_error_f("Failed to reserve arrays: ", status.ToString().c_str()); } for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) { @@ -171,15 +206,25 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) O2_SIGNPOST_START(ccdb, sid, "handlingResponses", "Got %zu responses from server.", responses.size()); - if (builders.size() != responses.size()) { - LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size()); + if (numBuilders != responses.size()) { + LOGP(fatal, "Not enough responses (expected {}, found {})", numBuilders, responses.size()); } arrow::Status result; - for (size_t bi = 0; bi < responses.size(); bi++) { - auto& builder = builders[bi]; + + int bi = 0; + for (auto& builder : builders) { auto& response = responses[bi]; +#if (FAIRMQ_VERSION_DEC >= 111000) + result &= builder.second->Append(); + auto* value_builder = dynamic_cast(builder.second->value_builder()); + result &= value_builder->Append(response.id.handle); + result &= value_builder->Append(response.id.segment); + result &= value_builder->Append(response.size); +#else char const* address = reinterpret_cast(response.id.value); result &= builder->Append(std::string_view(address, response.size)); +#endif + ++bi; } if (!result.ok()) { LOGP(fatal, "Error adding results from CCDB"); @@ -188,9 +233,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) } } arrow::ArrayVector arrays; - for (auto& builder : builders) { - arrays.push_back(*builder->Finish()); - } + std::ranges::for_each(builders, [&arrays](auto& builder) { arrays.push_back(*builder.second->Finish()); }); auto outTable = arrow::Table::Make(schema, arrays); auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable); diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index 6ff77f8facaec..e81cae3ee5fa9 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -24,6 +24,7 @@ #include "Framework/ArrowTableSlicingCache.h" // IWYU pragma: export #include "Framework/SliceCache.h" // IWYU pragma: export #include "Framework/VariantHelpers.h" // IWYU pragma: export +#include #include #include // IWYU pragma: export #include // IWYU pragma: export @@ -36,9 +37,16 @@ #include #include // IWYU pragma: export +namespace fair::mq::shmem { +struct MetaHeader; +} + namespace o2::framework { using ListVector = std::vector>; +#if (FAIRMQ_VERSION_DEC >= 111000) +using PointerReconstructor = std::function; +#endif std::string cutString(std::string&& str); std::string strToUpper(std::string&& str); @@ -1081,6 +1089,9 @@ concept can_bind = requires(T&& t) { template concept has_index = (is_indexing_column || ...); +template +concept needs_ptr_rec = C::needs_ptr_rec; + template struct TableIterator : IP, C... { public: @@ -1248,6 +1259,21 @@ struct TableIterator : IP, C... { { doSetCurrentInternal(internal_index_columns_t{}, table); } +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + [&pointerReconstructor, this](framework::pack) { + ([&pointerReconstructor, this]() { + if constexpr (needs_ptr_rec) { + if (pointerReconstructor) { + CC::ptrRec = &pointerReconstructor; + } + } + }.template operator()(), + ...); + }(all_columns{}); + } +#endif private: /// Helper to move at the end of columns which actually have an iterator. @@ -2285,7 +2311,12 @@ class Table { return self_t{mTable->Slice(0, 0), 0}; } - +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + mBegin.setPointerReconstructor(pointerReconstructor); + } +#endif private: template arrow::ChunkedArray* lookupColumn() @@ -2464,6 +2495,56 @@ consteval static std::string_view namespace_prefix() }; \ [[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() } +#if (FAIRMQ_VERSION_DEC >= 111000) +#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ + struct _Name_ : o2::soa::Column { \ + static constexpr const char* mLabel = _Label_; \ + static constexpr const char* query = _CCDBQuery_; \ + static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \ + static constexpr bool needs_ptr_rec = true; \ + std::function const* ptrRec = nullptr; \ + using base = o2::soa::Column; \ + using type = int64_t[3]; \ + using column_t = _Name_; \ + _Name_(arrow::ChunkedArray const* column) \ + : o2::soa::Column(o2::soa::ColumnIterator(column)) \ + { \ + } \ + \ + _Name_() = default; \ + _Name_(_Name_ const& other) = default; \ + _Name_& operator=(_Name_ const& other) = default; \ + \ + decltype(auto) _Getter_() const \ + { \ + auto& [handle, segment, size] = *mColumnIterator; \ + auto span = std::span{(*ptrRec)(fair::mq::shmem::MetaHeader{ \ + static_cast(size), \ + 0, handle, 0, 0, \ + static_cast(segment), true}), static_cast(size)}; \ + if constexpr (std::same_as<_ConcreteType_, std::span>) { \ + return span; \ + } else { \ + static std::byte* payload = nullptr; \ + static _ConcreteType_* deserialised = nullptr; \ + static TClass* c = TClass::GetClass(#_ConcreteType_); \ + if (payload != (std::byte*)span.data()) { \ + payload = (std::byte*)span.data(); \ + delete deserialised; \ + TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \ + deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \ + } \ + return *deserialised; \ + } \ + } \ + \ + decltype(auto) \ + get() const \ + { \ + return _Getter_(); \ + } \ + }; +#else #define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \ struct _Name_ : o2::soa::Column, _Name_> { \ static constexpr const char* mLabel = _Label_; \ @@ -2506,6 +2587,7 @@ consteval static std::string_view namespace_prefix() return _Getter_(); \ } \ }; +#endif #define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \ DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) @@ -3849,7 +3931,12 @@ class FilteredBase : public T { return mCached; } - +#if (FAIRMQ_VERSION_DEC >= 111000) + void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor) + { + mFilteredBegin.setPointerReconstructor(pointerReconstructor); + } +#endif private: void resetRanges() { diff --git a/Framework/Core/include/Framework/AnalysisDataModel.h b/Framework/Core/include/Framework/AnalysisDataModel.h index c8dd33fba62ee..2d264b648ed76 100644 --- a/Framework/Core/include/Framework/AnalysisDataModel.h +++ b/Framework/Core/include/Framework/AnalysisDataModel.h @@ -26,6 +26,11 @@ #include "SimulationDataFormat/MCGenProperties.h" #include "Framework/PID.h" +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif + namespace o2 { namespace aod diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 3170236e18f09..0888d3481f70b 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -26,6 +26,8 @@ #include "Framework/TypeIdHelpers.h" #include "Framework/ArrowTableSlicingCache.h" #include "Framework/AnalysisDataModel.h" +#include "Framework/DanglingEdgesContext.h" +#include #include #include @@ -302,11 +304,19 @@ struct AnalysisDataProcessorBuilder { } template +#if (FAIRMQ_VERSION_DEC >= 111000) + static void invokeProcess(Task& task, InputRecord& inputs, R matchers, PointerReconstructor const& pointerReconstructor, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +#else static void invokeProcess(Task& task, InputRecord& inputs, R matchers, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +#endif { using G = std::decay_t; auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, matchers, processingFunction, infos); - +#if (FAIRMQ_VERSION_DEC >= 111000) + if constexpr (!is_enumeration) { + groupingTable.setPointerReconstructor(pointerReconstructor); + } +#endif constexpr const int numElements = nested_brace_constructible_size>() / 10; // set filtered tables for partitions with grouping @@ -347,6 +357,12 @@ struct AnalysisDataProcessorBuilder { ...); }, associatedTables); +#if (FAIRMQ_VERSION_DEC >= 111000) + std::apply([&pointerReconstructor](auto&... table) { + (table.setPointerReconstructor(pointerReconstructor), ...); + }, + associatedTables); +#endif auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable { x.bindExternalIndices(&groupingTable, &std::get>(associatedTables)...); @@ -377,6 +393,12 @@ struct AnalysisDataProcessorBuilder { auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOrigin); for (auto& slice : slicer) { auto associatedSlices = slice.associatedTables(); +#if (FAIRMQ_VERSION_DEC >= 111000) + std::apply([&pointerReconstructor](auto&... table) { + (table.setPointerReconstructor(pointerReconstructor), ...); + }, + associatedSlices); +#endif overwriteInternalIndices(associatedSlices, associatedTables); std::apply( [&binder](auto&... x) mutable { @@ -643,8 +665,19 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) ic.services().get().setCaches(std::move(bindingsKeys)); ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); ic.services().get().setOrigin(newOrigin); - - return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable { +#if (FAIRMQ_VERSION_DEC >= 111000) + PointerReconstructor pointerReconstructor(nullptr); + bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); + + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { + if (hasCCDBTables && (!pointerReconstructor)) { + auto& proxy = pc.services().get(); + auto& spec = pc.services().get().requestedTIMs.front(); + pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); + } +#else + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { +#endif // load the ccdb object from their cache homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); // reset partitions once per dataframe @@ -669,16 +702,28 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) if constexpr (requires { &T::process; }) { auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; +#if (FAIRMQ_VERSION_DEC >= 111000) + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); +#else AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); +#endif } // execute optional process() homogeneous_apply_refs_sized( +#if (FAIRMQ_VERSION_DEC >= 111000) + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { +#else [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { +#endif if constexpr (is_process_configurable) { if (x.value == true) { auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; +#if (FAIRMQ_VERSION_DEC >= 111000) + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); +#else AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); +#endif return true; } return false; diff --git a/Framework/Core/include/Framework/ArrowTypes.h b/Framework/Core/include/Framework/ArrowTypes.h index 2673472a81152..ef4c2c6323d8e 100644 --- a/Framework/Core/include/Framework/ArrowTypes.h +++ b/Framework/Core/include/Framework/ArrowTypes.h @@ -93,6 +93,11 @@ struct arrow_array_for { using type = arrow::FixedSizeListArray; using value_type = int8_t; }; +template +struct arrow_array_for { + using type = arrow::FixedSizeListArray; + using value_type = int64_t; +}; #define ARROW_VECTOR_FOR(_type_) \ template <> \ diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index ed9a31ca2857c..079565f7aba8d 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -42,6 +42,7 @@ // Do not change this for a full inclusion of fair::mq::Device. #include +#include namespace arrow { @@ -496,6 +497,8 @@ class DataAllocator struct CacheId { int64_t value; + int64_t handle; + int64_t segment; }; enum struct CacheStrategy : int { @@ -507,7 +510,7 @@ class DataAllocator CacheId adoptContainer(const Output& /*spec*/, ContainerT& /*container*/, CacheStrategy /* cache = false */, o2::header::SerializationMethod /* method = header::gSerializationMethodNone*/) { static_assert(always_static_assert_v, "Container cannot be moved. Please make sure it is backed by a o2::pmr::FairMQMemoryResource"); - return {0}; + return {0, 0, 0}; } /// Adopt a PMR container. Notice that the container must be moveable and @@ -595,12 +598,15 @@ DataAllocator::CacheId DataAllocator::adoptContainer(const Output& spec, Contain payloadMessage->GetSize() // ); - CacheId cacheId{0}; // + CacheId cacheId{0, 0, 0}; // if (cache == CacheStrategy::Always) { // The message will be shallow cloned in the cache. Since the // clone is indistinguishable from the original, we can keep sending // the original. cacheId.value = context.addToCache(payloadMessage); + auto meta = dynamic_cast(payloadMessage.get())->GetMeta(); + cacheId.handle = meta.fHandle; + cacheId.segment = meta.fSegmentId; } context.add(std::move(headerMessage), std::move(payloadMessage), routeIndex); diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index dbdade465f09c..1a73a90947bb3 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -20,6 +20,10 @@ #include "Framework/InputRoute.h" #include "Framework/ForwardRoute.h" #include +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif #include namespace o2::header @@ -29,6 +33,9 @@ struct DataHeader; namespace o2::framework { +#if (FAIRMQ_VERSION_DEC >= 111000) +using PointerReconstructor = std::function; +#endif /// Helper class to hide fair::mq::Device headers in the DataAllocator header. /// This is done because fair::mq::Device brings in a bunch of boost.mpl / /// boost.fusion stuff, slowing down compilation times enourmously. @@ -58,8 +65,12 @@ class FairMQDeviceProxy [[nodiscard]] ChannelIndex getForwardChannelIndexByName(std::string const& channelName) const; /// Retrieve the channel index from a given OutputSpec and the associated timeslice [[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const; + /// Retrieve the pointer-reconstruction function for the shm manager for a given input spec + [[nodiscard]] PointerReconstructor getShmPointerReconstructor(InputSpec const& spec, size_t timeslice); +#if (FAIRMQ_VERSION_DEC >= 111000) /// Retrieve the channel index from a given OutputSpec and the associated timeslice void getMatchingForwardChannelIndexes(std::vector& result, header::DataHeader const& header, size_t timeslice) const; +#endif /// ChannelIndex from a RouteIndex [[nodiscard]] ChannelIndex getOutputChannelIndex(RouteIndex routeIndex) const; [[nodiscard]] ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const; diff --git a/Framework/Core/src/FairMQDeviceProxy.cxx b/Framework/Core/src/FairMQDeviceProxy.cxx index e121084b866a2..317d8fba24504 100644 --- a/Framework/Core/src/FairMQDeviceProxy.cxx +++ b/Framework/Core/src/FairMQDeviceProxy.cxx @@ -364,4 +364,25 @@ void FairMQDeviceProxy::bind(std::vector const& outputs, std::vecto } mStateChangeCallback = newStatePending; } + +#if (FAIRMQ_VERSION_DEC >= 111000) +PointerReconstructor FairMQDeviceProxy::getShmPointerReconstructor(InputSpec const& spec, size_t timeslice) +{ + assert(mInputRoutes.size() == mInputs.size()); + ChannelIndex c{-1}; + for (size_t ri = 0; ri < mInputs.size(); ++ri) { + auto& route = mInputs[ri]; + + LOG(debug) << "matching: " << DataSpecUtils::describe(spec) << " to route " << DataSpecUtils::describe(route.matcher); + if ((spec == route.matcher) && (timeslice == route.timeslice)) { + c = mInputRoutes[ri].channel; + break; + } + } + if (c.value != ChannelIndex::INVALID) { + return {[transport = getInputChannel(c)->Transport()](fair::mq::shmem::MetaHeader&& meta){ return reinterpret_cast(fair::mq::shmem::GetDataAddressFromHandle(*transport, meta)); }}; + } + return {}; +} +#endif } // namespace o2::framework From 633e6a473ac95981196df67453b02603f7d4762c Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 11:13:25 +0200 Subject: [PATCH 02/10] fixup! DPL Analysis: use shm metadata in CCDB tables instead of binary view --- Framework/Core/include/Framework/DataAllocator.h | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index 079565f7aba8d..b7169a5818680 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -42,7 +42,10 @@ // Do not change this for a full inclusion of fair::mq::Device. #include -#include +#include +#if (FAIRMQ_VERSION_DEC >= 111000) +#include +#endif namespace arrow { @@ -604,9 +607,11 @@ DataAllocator::CacheId DataAllocator::adoptContainer(const Output& spec, Contain // clone is indistinguishable from the original, we can keep sending // the original. cacheId.value = context.addToCache(payloadMessage); +#if (FAIRMQ_VERSION_DEC >= 111000) auto meta = dynamic_cast(payloadMessage.get())->GetMeta(); cacheId.handle = meta.fHandle; cacheId.segment = meta.fSegmentId; +#endif } context.add(std::move(headerMessage), std::move(payloadMessage), routeIndex); From 34e0edc16e954eb31b2792f4a1fe8304188c1576 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 12 Jun 2026 09:20:08 +0000 Subject: [PATCH 03/10] Please consider the following formatting changes --- Framework/Core/include/Framework/ASoA.h | 12 +++++++----- Framework/Core/include/Framework/AnalysisTask.h | 3 ++- Framework/Core/src/FairMQDeviceProxy.cxx | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index e81cae3ee5fa9..269cfe51a4750 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -37,7 +37,8 @@ #include #include // IWYU pragma: export -namespace fair::mq::shmem { +namespace fair::mq::shmem +{ struct MetaHeader; } @@ -2518,10 +2519,11 @@ consteval static std::string_view namespace_prefix() decltype(auto) _Getter_() const \ { \ auto& [handle, segment, size] = *mColumnIterator; \ - auto span = std::span{(*ptrRec)(fair::mq::shmem::MetaHeader{ \ - static_cast(size), \ - 0, handle, 0, 0, \ - static_cast(segment), true}), static_cast(size)}; \ + auto span = std::span{(*ptrRec)(fair::mq::shmem::MetaHeader{ \ + static_cast(size), \ + 0, handle, 0, 0, \ + static_cast(segment), true}), \ + static_cast(size)}; \ if constexpr (std::same_as<_ConcreteType_, std::span>) { \ return span; \ } else { \ diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 0888d3481f70b..92f7079fc81fa 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -736,7 +736,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // finalize outputs homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); }; - }}; + } + }; return { name, diff --git a/Framework/Core/src/FairMQDeviceProxy.cxx b/Framework/Core/src/FairMQDeviceProxy.cxx index 317d8fba24504..b1cd9c9352829 100644 --- a/Framework/Core/src/FairMQDeviceProxy.cxx +++ b/Framework/Core/src/FairMQDeviceProxy.cxx @@ -380,7 +380,7 @@ PointerReconstructor FairMQDeviceProxy::getShmPointerReconstructor(InputSpec con } } if (c.value != ChannelIndex::INVALID) { - return {[transport = getInputChannel(c)->Transport()](fair::mq::shmem::MetaHeader&& meta){ return reinterpret_cast(fair::mq::shmem::GetDataAddressFromHandle(*transport, meta)); }}; + return {[transport = getInputChannel(c)->Transport()](fair::mq::shmem::MetaHeader&& meta) { return reinterpret_cast(fair::mq::shmem::GetDataAddressFromHandle(*transport, meta)); }}; } return {}; } From 04c5c9b6775ae7c069d2aaca518c55e7b1e06102 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 12 Jun 2026 09:21:56 +0000 Subject: [PATCH 04/10] Please consider the following formatting changes --- .../Core/include/Framework/AnalysisTask.h | 223 +++++++++--------- 1 file changed, 112 insertions(+), 111 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 92f7079fc81fa..ab675d8f128c7 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -602,141 +602,142 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // replace origins in Preslice declarations homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { - Cache bindingsKeys; - Cache bindingsKeysUnsorted; - // add preslice declarations to slicing cache definition - homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); - - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); - homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); - - auto& callbacks = ic.services().get(); - auto eoscb = [task](EndOfStreamContext& eosContext) { - homogeneous_apply_refs_sized([&eosContext](auto& element) { + auto algo = AlgorithmSpec::InitCallback{ + [task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { + Cache bindingsKeys; + Cache bindingsKeysUnsorted; + // add preslice declarations to slicing cache definition + homogeneous_apply_refs_sized([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get()); + + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get()); + homogeneous_apply_refs_sized([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get()); + + auto& callbacks = ic.services().get(); + auto eoscb = [task](EndOfStreamContext& eosContext) { + homogeneous_apply_refs_sized([&eosContext](auto& element) { analysis_task_parsers::postRunService(eosContext, element); analysis_task_parsers::postRunOutput(eosContext, element); return true; }, - *task.get()); - eosContext.services().get().readyToQuit(QuitRequest::Me); - }; + *task.get()); + eosContext.services().get().readyToQuit(QuitRequest::Me); + }; - callbacks.set(eoscb); + callbacks.set(eoscb); - /// call the task's init() function first as it may manipulate the task's elements - if constexpr (requires { task->init(ic); }) { - task->init(ic); - } - - /// update configurables in filters and partitions - homogeneous_apply_refs_sized( - [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, - *task.get()); - /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos - homogeneous_apply_refs_sized([&expressionInfos](auto& element) { - return analysis_task_parsers::createExpressionTrees(expressionInfos, element); - }, - *task.get()); + /// call the task's init() function first as it may manipulate the task's elements + if constexpr (requires { task->init(ic); }) { + task->init(ic); + } - /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values - if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); - } - homogeneous_apply_refs_sized( - [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { - return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); + /// update configurables in filters and partitions + homogeneous_apply_refs_sized( + [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); }, + *task.get()); + /// create expression trees for filters gandiva trees matched to schemas and store the pointers into expressionInfos + homogeneous_apply_refs_sized([&expressionInfos](auto& element) { + return analysis_task_parsers::createExpressionTrees(expressionInfos, element); }, - *task.get()); + *task.get()); - /// replace origin in slicing caches - std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); - } - return entry; - }); - std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { - if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { - entry.matcher = replaceOrigin(entry.matcher, newOrigin); + /// parse process functions to enable requested grouping caches - note that at this state process configurables have their final values + if constexpr (requires { &T::process; }) { + AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted); } - return entry; - }); + homogeneous_apply_refs_sized( + [&bindingsKeys, &bindingsKeysUnsorted](auto& x) { + return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted); + }, + *task.get()); - ic.services().get().setCaches(std::move(bindingsKeys)); - ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); - ic.services().get().setOrigin(newOrigin); + /// replace origin in slicing caches + std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + + ic.services().get().setCaches(std::move(bindingsKeys)); + ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); + ic.services().get().setOrigin(newOrigin); #if (FAIRMQ_VERSION_DEC >= 111000) - PointerReconstructor pointerReconstructor(nullptr); - bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); - - return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { - if (hasCCDBTables && (!pointerReconstructor)) { - auto& proxy = pc.services().get(); - auto& spec = pc.services().get().requestedTIMs.front(); - pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); - } + PointerReconstructor pointerReconstructor(nullptr); + bool hasCCDBTables = !ic.services().get().requestedTIMs.empty(); + + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables, pointerReconstructor](ProcessingContext& pc) mutable { + if (hasCCDBTables && (!pointerReconstructor)) { + auto& proxy = pc.services().get(); + auto& spec = pc.services().get().requestedTIMs.front(); + pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); + } #else - return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { #endif - // load the ccdb object from their cache - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); - // reset partitions once per dataframe - homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); - // reset selections for the next dataframe - std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); - // reset pre-slice for the next dataframe - auto& slices = pc.services().get(); - homogeneous_apply_refs_sized([&slices](auto& element) { - return analysis_task_parsers::updateSliceInfo(element, slices); - }, - *(task.get())); - // initialize local caches - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); - // prepare outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); - // execute run() - if constexpr (requires { task->run(pc); }) { - task->run(pc); - } - // execute process() - if constexpr (requires { &T::process; }) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; + // load the ccdb object from their cache + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); + // reset partitions once per dataframe + homogeneous_apply_refs_sized([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get()); + // reset selections for the next dataframe + std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); + // reset pre-slice for the next dataframe + auto& slices = pc.services().get(); + homogeneous_apply_refs_sized([&slices](auto& element) { + return analysis_task_parsers::updateSliceInfo(element, slices); + }, + *(task.get())); + // initialize local caches + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get())); + // prepare outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get()); + // execute run() + if constexpr (requires { task->run(pc); }) { + task->run(pc); + } + // execute process() + if constexpr (requires { &T::process; }) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; #if (FAIRMQ_VERSION_DEC >= 111000) - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, pointerReconstructor, &T::process, expressionInfos, slices, newOrigin); #else - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin); #endif - } - // execute optional process() - homogeneous_apply_refs_sized( + } + // execute optional process() + homogeneous_apply_refs_sized( #if (FAIRMQ_VERSION_DEC >= 111000) - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin, &pointerReconstructor](auto& x) { #else - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { #endif - if constexpr (is_process_configurable) { - if (x.value == true) { - auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); - auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; + if constexpr (is_process_configurable) { + if (x.value == true) { + auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId(); }); + auto matchers = loc == inputInfos.end() ? std::vector>{} : loc->matchers; #if (FAIRMQ_VERSION_DEC >= 111000) - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, pointerReconstructor, x.process, expressionInfos, slices, newOrigin); #else - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin); #endif - return true; + return true; + } + return false; } return false; - } - return false; - }, - *task.get()); - // prepare delayed outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); - // finalize outputs - homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); - }; - } + }, + *task.get()); + // prepare delayed outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get()); + // finalize outputs + homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get()); + }; + } }; return { From fa9f40cf436df7921f62ec7eec4a2ca31380f684 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 12 Jun 2026 09:28:48 +0000 Subject: [PATCH 05/10] Please consider the following formatting changes --- Framework/Core/include/Framework/AnalysisTask.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index ab675d8f128c7..8e07eed8a5f7b 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -602,7 +602,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // replace origins in Preslice declarations homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{ + auto algo = AlgorithmSpec::InitCallback + { [task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { Cache bindingsKeys; Cache bindingsKeysUnsorted; From 1ee9800942b4fc19da9895bc8b4cfe90bc81a2c0 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 11:39:06 +0200 Subject: [PATCH 06/10] Apply suggestions from code review Co-authored-by: Anton Alkin --- Framework/Core/include/Framework/FairMQDeviceProxy.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index 1a73a90947bb3..0aa2d6dfc9e75 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -65,12 +65,12 @@ class FairMQDeviceProxy [[nodiscard]] ChannelIndex getForwardChannelIndexByName(std::string const& channelName) const; /// Retrieve the channel index from a given OutputSpec and the associated timeslice [[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const; + #if (FAIRMQ_VERSION_DEC >= 111000) /// Retrieve the pointer-reconstruction function for the shm manager for a given input spec [[nodiscard]] PointerReconstructor getShmPointerReconstructor(InputSpec const& spec, size_t timeslice); -#if (FAIRMQ_VERSION_DEC >= 111000) +#endif /// Retrieve the channel index from a given OutputSpec and the associated timeslice void getMatchingForwardChannelIndexes(std::vector& result, header::DataHeader const& header, size_t timeslice) const; -#endif /// ChannelIndex from a RouteIndex [[nodiscard]] ChannelIndex getOutputChannelIndex(RouteIndex routeIndex) const; [[nodiscard]] ChannelIndex getInputChannelIndex(RouteIndex routeIndex) const; From 0d496ab0e5bbd457b28cb70306d85d9856061284 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 11:39:35 +0200 Subject: [PATCH 07/10] Apply suggestions from code review Co-authored-by: Anton Alkin --- Framework/Core/include/Framework/FairMQDeviceProxy.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/FairMQDeviceProxy.h b/Framework/Core/include/Framework/FairMQDeviceProxy.h index 0aa2d6dfc9e75..8ab93d83dc1c2 100644 --- a/Framework/Core/include/Framework/FairMQDeviceProxy.h +++ b/Framework/Core/include/Framework/FairMQDeviceProxy.h @@ -65,7 +65,7 @@ class FairMQDeviceProxy [[nodiscard]] ChannelIndex getForwardChannelIndexByName(std::string const& channelName) const; /// Retrieve the channel index from a given OutputSpec and the associated timeslice [[nodiscard]] ChannelIndex getOutputChannelIndex(OutputSpec const& spec, size_t timeslice) const; - #if (FAIRMQ_VERSION_DEC >= 111000) +#if (FAIRMQ_VERSION_DEC >= 111000) /// Retrieve the pointer-reconstruction function for the shm manager for a given input spec [[nodiscard]] PointerReconstructor getShmPointerReconstructor(InputSpec const& spec, size_t timeslice); #endif From 5137ed8a897a3beba428d3879dd1a0251bab79a7 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 11:46:30 +0200 Subject: [PATCH 08/10] Apply suggestion from @aalkin --- Framework/Core/include/Framework/AnalysisTask.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 8e07eed8a5f7b..8b7daa0976423 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -678,7 +678,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) pointerReconstructor = proxy.getShmPointerReconstructor(spec, 0); } #else - return [task, expressionInfos, inputInfos, newOrigin, hasCCDBTables](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable { #endif // load the ccdb object from their cache homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); From 41cdc74301153876f2032311a60cd6ed91bea27b Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 11:53:26 +0200 Subject: [PATCH 09/10] Apply suggestion from @aalkin --- Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index b8932d7dbc991..1471164f71f03 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -222,7 +222,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) result &= value_builder->Append(response.size); #else char const* address = reinterpret_cast(response.id.value); - result &= builder->Append(std::string_view(address, response.size)); + result &= builder.second->Append(std::string_view(address, response.size)); #endif ++bi; } From 2f8a3b6f39db399647cf1eebbd3075e8256e6130 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 12 Jun 2026 12:06:32 +0200 Subject: [PATCH 10/10] Apply suggestion from @aalkin --- Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx index 1471164f71f03..6c23ef7e25966 100644 --- a/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx @@ -120,7 +120,11 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/) schemas.emplace_back(std::make_shared(fields, schemaMetadata)); } +#if (FAIRMQ_VERSION_DEC >= 111000) std::vector>> allbuilders; +#else + std::vector>> allbuilders; +#endif allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }()); auto* pool = arrow::default_memory_pool();