Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 60 additions & 13 deletions Framework/CCDBSupport/src/AnalysisCCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "AnalysisCCDBHelpers.h"
#include "CCDBFetcherHelper.h"
#include "Framework/ArrowTypes.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/DeviceSpec.h"
#include "Framework/TimingInfo.h"
Expand All @@ -22,6 +23,7 @@
#include "Framework/DanglingEdgesContext.h"
#include "Framework/ConfigContext.h"
#include "Framework/ConfigParamsHelper.h"
#include <fairmq/Version.h>
#include <arrow/array/builder_binary.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
Expand Down Expand Up @@ -109,20 +111,49 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
auto it = ccdbUrls.find(m.name);
fieldMetadata->Append("url", it != ccdbUrls.end() ? it->second : m.defaultValue.asString());
auto columnName = m.name.substr(strlen("ccdb:"));
#if (FAIRMQ_VERSION_DEC >= 111000)
fields.emplace_back(std::make_shared<arrow::Field>(columnName, soa::asArrowDataType<int64_t[3]>(), false, fieldMetadata));
#else
fields.emplace_back(std::make_shared<arrow::Field>(columnName, arrow::binary_view(), false, fieldMetadata));
#endif
}
schemas.emplace_back(std::make_shared<arrow::Schema>(fields, schemaMetadata));
}

#if (FAIRMQ_VERSION_DEC >= 111000)
std::vector<std::pair<uint32_t, std::shared_ptr<arrow::FixedSizeListBuilder>>> allbuilders;
Comment thread
aalkin marked this conversation as resolved.
#else
std::vector<std::pair<uint32_t, std::shared_ptr<arrow::BinaryViewBuilder>>> allbuilders;
#endif
allbuilders.resize([&schemas]() { size_t size = 0; for (auto& schema : schemas) { size += schema->num_fields(); }; return size; }());
auto* pool = arrow::default_memory_pool();

int idx = 0;
int sidx = 0;
for (auto const& schema : schemas) {
for (auto const& _ : schema->fields()) {
#if (FAIRMQ_VERSION_DEC >= 111000)
auto value_builder = std::make_shared<arrow::Int64Builder>();
allbuilders[idx] = std::make_pair(sidx, std::make_shared<arrow::FixedSizeListBuilder>(pool, std::move(value_builder), 3));
#else
allbuilders[idx] = std::make_pair(sidx, std::make_shared<arrow::BinaryViewBuilder>());
#endif
++idx;
}
++sidx;
}

std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
CCDBFetcherHelper::initialiseHelper(*helper, options);
std::unordered_map<std::string, int> bindings;
fillValidRoutes(*helper, spec.outputs, bindings);

return adaptStateless([schemas, bindings, helper](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
return adaptStateless([schemas, bindings, helper, allbuilders](InputRecord& inputs, DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
O2_SIGNPOST_ID_GENERATE(sid, ccdb);
O2_SIGNPOST_START(ccdb, sid, "fetchFromAnalysisCCDB", "Fetching CCDB objects for analysis%" PRIu64, (uint64_t)timingInfo.timeslice);
for (auto& schema : schemas) {
std::ranges::for_each(allbuilders, [](auto& builder) { builder.second->Reset(); });
for (auto i = 0U; i < schemas.size(); ++i) {
auto& schema = schemas[i];
std::vector<CCDBFetcherHelper::FetchOp> ops;
auto inputBinding = *schema->metadata()->Get("sourceTable");
auto inputMatcher = DataSpecUtils::fromString(*schema->metadata()->Get("sourceMatcher"));
Expand All @@ -134,6 +165,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
auto table = inputs.get<TableConsumer>(inputMatcher)->asArrowTable();
// FIXME: make the fTimestamp column configurable.
auto timestampColumn = table->GetColumnByName("fTimestamp");
auto reserveSize = timestampColumn->length();
O2_SIGNPOST_EVENT_EMIT_INFO(ccdb, sid, "fetchFromAnalysisCCDB",
"There are %zu bindings available", bindings.size());
for (auto& binding : bindings) {
Expand All @@ -143,9 +175,16 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
}
int outputRouteIndex = bindings.at(outRouteDesc);
auto& spec = helper->routes[outputRouteIndex].matcher;
std::vector<std::shared_ptr<arrow::BinaryViewBuilder>> builders;
for (auto const& _ : schema->fields()) {
builders.emplace_back(std::make_shared<arrow::BinaryViewBuilder>());
auto builders = allbuilders | std::views::filter([&i](auto const& builder) { return builder.first == i; });
unsigned int numBuilders = std::ranges::count_if(allbuilders, [&i](auto const& builder) { return builder.first == i; });
arrow::Status status;
std::ranges::for_each(builders, [&status, &reserveSize](auto& builder) {
if (reserveSize > builder.second->capacity()) {
status &= builder.second->Reserve(reserveSize - builder.second->capacity());
}
});
if (!status.ok()) {
throw framework::runtime_error_f("Failed to reserve arrays: ", status.ToString().c_str());
}

for (auto ci = 0; ci < timestampColumn->num_chunks(); ++ci) {
Expand All @@ -171,15 +210,25 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
O2_SIGNPOST_START(ccdb, sid, "handlingResponses",
"Got %zu responses from server.",
responses.size());
if (builders.size() != responses.size()) {
LOGP(fatal, "Not enough responses (expected {}, found {})", builders.size(), responses.size());
if (numBuilders != responses.size()) {
LOGP(fatal, "Not enough responses (expected {}, found {})", numBuilders, responses.size());
}
arrow::Status result;
for (size_t bi = 0; bi < responses.size(); bi++) {
auto& builder = builders[bi];

int bi = 0;
for (auto& builder : builders) {
auto& response = responses[bi];
#if (FAIRMQ_VERSION_DEC >= 111000)
result &= builder.second->Append();
auto* value_builder = dynamic_cast<arrow::Int64Builder*>(builder.second->value_builder());
result &= value_builder->Append(response.id.handle);
result &= value_builder->Append(response.id.segment);
result &= value_builder->Append(response.size);
#else
char const* address = reinterpret_cast<char const*>(response.id.value);
result &= builder->Append(std::string_view(address, response.size));
result &= builder.second->Append(std::string_view(address, response.size));
#endif
++bi;
}
if (!result.ok()) {
LOGP(fatal, "Error adding results from CCDB");
Expand All @@ -188,9 +237,7 @@ AlgorithmSpec AnalysisCCDBHelpers::fetchFromCCDB(ConfigContext const& /*ctx*/)
}
}
arrow::ArrayVector arrays;
for (auto& builder : builders) {
arrays.push_back(*builder->Finish());
}
std::ranges::for_each(builders, [&arrays](auto& builder) { arrays.push_back(*builder.second->Finish()); });
auto outTable = arrow::Table::Make(schema, arrays);
auto concrete = DataSpecUtils::asConcreteDataMatcher(spec);
allocator.adopt(Output{concrete.origin, concrete.description, concrete.subSpec}, outTable);
Expand Down
93 changes: 91 additions & 2 deletions Framework/Core/include/Framework/ASoA.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "Framework/ArrowTableSlicingCache.h" // IWYU pragma: export
#include "Framework/SliceCache.h" // IWYU pragma: export
#include "Framework/VariantHelpers.h" // IWYU pragma: export
#include <fairmq/Version.h>
#include <arrow/array/array_binary.h>
#include <arrow/table.h> // IWYU pragma: export
#include <arrow/array.h> // IWYU pragma: export
Expand All @@ -36,9 +37,17 @@
#include <cstring>
#include <gsl/span> // IWYU pragma: export

namespace fair::mq::shmem
{
struct MetaHeader;
}

namespace o2::framework
{
using ListVector = std::vector<std::vector<int64_t>>;
#if (FAIRMQ_VERSION_DEC >= 111000)
using PointerReconstructor = std::function<std::byte*(fair::mq::shmem::MetaHeader&&)>;
#endif

std::string cutString(std::string&& str);
std::string strToUpper(std::string&& str);
Expand Down Expand Up @@ -1081,6 +1090,9 @@ concept can_bind = requires(T&& t) {
template <typename... C>
concept has_index = (is_indexing_column<C> || ...);

template <typename C>
concept needs_ptr_rec = C::needs_ptr_rec;

template <typename D, typename O, typename IP, typename... C>
struct TableIterator : IP, C... {
public:
Expand Down Expand Up @@ -1248,6 +1260,21 @@ struct TableIterator : IP, C... {
{
doSetCurrentInternal(internal_index_columns_t{}, table);
}
#if (FAIRMQ_VERSION_DEC >= 111000)
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
{
[&pointerReconstructor, this]<typename... Cs>(framework::pack<Cs...>) {
([&pointerReconstructor, this]<typename CC>() {
if constexpr (needs_ptr_rec<CC>) {
if (pointerReconstructor) {
CC::ptrRec = &pointerReconstructor;
}
}
}.template operator()<Cs>(),
...);
}(all_columns{});
}
#endif

private:
/// Helper to move at the end of columns which actually have an iterator.
Expand Down Expand Up @@ -2285,7 +2312,12 @@ class Table
{
return self_t{mTable->Slice(0, 0), 0};
}

#if (FAIRMQ_VERSION_DEC >= 111000)
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
{
mBegin.setPointerReconstructor(pointerReconstructor);
}
#endif
private:
template <typename T>
arrow::ChunkedArray* lookupColumn()
Expand Down Expand Up @@ -2464,6 +2496,57 @@ consteval static std::string_view namespace_prefix()
}; \
[[maybe_unused]] static constexpr o2::framework::expressions::BindingNode _Getter_ { _Label_, _Name_::hash, o2::framework::expressions::selectArrowType<_Type_>() }

#if (FAIRMQ_VERSION_DEC >= 111000)
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
struct _Name_ : o2::soa::Column<int64_t[3], _Name_> { \
static constexpr const char* mLabel = _Label_; \
static constexpr const char* query = _CCDBQuery_; \
static constexpr const uint32_t hash = crc32(namespace_prefix<_Name_>(), std::string_view{#_Getter_}); \
static constexpr bool needs_ptr_rec = true; \
std::function<std::byte*(fair::mq::shmem::MetaHeader&&)> const* ptrRec = nullptr; \
using base = o2::soa::Column<int64_t[3], _Name_>; \
using type = int64_t[3]; \
using column_t = _Name_; \
_Name_(arrow::ChunkedArray const* column) \
: o2::soa::Column<int64_t[3], _Name_>(o2::soa::ColumnIterator<int64_t[3]>(column)) \
{ \
} \
\
_Name_() = default; \
_Name_(_Name_ const& other) = default; \
_Name_& operator=(_Name_ const& other) = default; \
\
decltype(auto) _Getter_() const \
{ \
auto& [handle, segment, size] = *mColumnIterator; \
auto span = std::span<std::byte>{(*ptrRec)(fair::mq::shmem::MetaHeader{ \
static_cast<size_t>(size), \
0, handle, 0, 0, \
static_cast<uint16_t>(segment), true}), \
static_cast<size_t>(size)}; \
if constexpr (std::same_as<_ConcreteType_, std::span<std::byte>>) { \
return span; \
} else { \
static std::byte* payload = nullptr; \
static _ConcreteType_* deserialised = nullptr; \
static TClass* c = TClass::GetClass(#_ConcreteType_); \
if (payload != (std::byte*)span.data()) { \
payload = (std::byte*)span.data(); \
delete deserialised; \
TBufferFile f(TBufferFile::EMode::kRead, span.size(), (char*)span.data(), kFALSE); \
deserialised = (_ConcreteType_*)soa::extractCCDBPayload((char*)payload, span.size(), c, "ccdb_object"); \
} \
return *deserialised; \
} \
} \
\
decltype(auto) \
get() const \
{ \
return _Getter_(); \
} \
};
#else
#define DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, _Label_, _Getter_, _ConcreteType_, _CCDBQuery_) \
struct _Name_ : o2::soa::Column<std::span<std::byte>, _Name_> { \
static constexpr const char* mLabel = _Label_; \
Expand Down Expand Up @@ -2506,6 +2589,7 @@ consteval static std::string_view namespace_prefix()
return _Getter_(); \
} \
};
#endif

#define DECLARE_SOA_CCDB_COLUMN(_Name_, _Getter_, _ConcreteType_, _CCDBQuery_) \
DECLARE_SOA_CCDB_COLUMN_FULL(_Name_, "f" #_Name_, _Getter_, _ConcreteType_, _CCDBQuery_)
Expand Down Expand Up @@ -3849,7 +3933,12 @@ class FilteredBase : public T
{
return mCached;
}

#if (FAIRMQ_VERSION_DEC >= 111000)
void setPointerReconstructor(framework::PointerReconstructor const& pointerReconstructor)
{
mFilteredBegin.setPointerReconstructor(pointerReconstructor);
}
#endif
private:
void resetRanges()
{
Expand Down
5 changes: 5 additions & 0 deletions Framework/Core/include/Framework/AnalysisDataModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
#include "SimulationDataFormat/MCGenProperties.h"
#include "Framework/PID.h"

#include <fairmq/Version.h>
#if (FAIRMQ_VERSION_DEC >= 111000)
#include <fairmq/shmem/Common.h>
#endif

namespace o2
{
namespace aod
Expand Down
Loading