From 13a4744824194a4f77eb59a463589640ba83d683 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 5 May 2026 13:02:44 +0200 Subject: [PATCH 01/14] DPL Analysis: introduce InputInfo for AnalysisTask * InputInfo contains the matchers associated with particular arguments of a particular process function and is formed when the DataProcessorSpec is created * Vector of InputInfos is used in processing to extract tables from InputRecord --- .../Core/include/Framework/AnalysisManagers.h | 1 + .../Core/include/Framework/AnalysisTask.h | 163 +++++++++--------- 2 files changed, 86 insertions(+), 78 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 1873f33937742..37d98aa8075e1 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -261,6 +261,7 @@ bool prepareOutput(ProcessingContext&, T&) template bool prepareOutput(ProcessingContext& context, T& produces) { + /// FIXME: use matcher instead of an OutputRef produces.resetCursor(std::move(context.outputs().make(soa::tableRef2OutputRef()))); return true; } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index fbd523c7b0c37..ae428553e4b0a 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -68,6 +68,15 @@ concept is_enumeration = is_enumeration_v>; template concept is_table_iterator_or_enumeration = soa::is_table_or_iterator || is_enumeration; +/// Structure to contain mapping between matchers and process functions. 
+/// Process function is identified by hash, each matcher has associated +/// argument position for that process function; single argument can have +/// many matchers associated due to complicated joins +struct InputInfo { + uint32_t hash; + std::vector> matchers; +}; + // Helper struct which builds a DataProcessorSpec from // the contents of an AnalysisTask... namespace @@ -93,11 +102,20 @@ struct AnalysisDataProcessorBuilder { } template - static void addOriginalRef(const char* name, bool value, std::vector& inputs) + static void addOriginalRef(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) { auto spec = soa::tableRef2InputSpec(); spec.metadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}}); DataSpecUtils::updateInputList(inputs, std::move(spec)); + auto matcher = DataSpecUtils::asConcreteDataMatcher(spec); + auto locate = std::ranges::find_if(iInfos, [&hash](auto const& info) { return info.hash == hash; }); + if (locate == iInfos.end()) { + iInfos.emplace_back(hash, std::vector{std::pair{ai, matcher}}); + } else { + if (std::ranges::none_of(locate->matchers, [&ai, &matcher](auto const& match) { return (match.first == ai) && (match.second == matcher); })) { + locate->matchers.emplace_back(std::pair{ai, matcher}); + } + } } /// helpers to append expression information for a single argument @@ -122,43 +140,43 @@ struct AnalysisDataProcessorBuilder { /// helpers to append InputSpec for a single argument template - static void addInput(const char* name, bool value, std::vector& inputs) + static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) { - [&name, &value, &inputs] refs, size_t... Is>(std::index_sequence) mutable { - (addOriginalRef(name, value, inputs), ...); + [&name, &value, &inputs, &iInfos, &ai, &hash] refs, size_t... 
Is>(std::index_sequence) mutable { + (addOriginalRef(name, value, inputs, iInfos, ai, hash), ...); }.template operator()::originals>(std::make_index_sequence::originals.size()>()); } template - static void addInput(const char* name, bool value, std::vector& inputs) + static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) { - addInput::parent_t>(name, value, inputs); + addInput::parent_t>(name, value, inputs, iInfos, ai, hash); } /// helper to append the inputs and expression information for normalized arguments template - static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector& inputs, std::vector& eInfos) + static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) { int ai = -1; - ([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable { + ([&ai, &hash, &eInfos, &name, &value, &inputs, &iInfos]() mutable { ++ai; using T = std::decay_t; addExpression(ai, hash, eInfos); - addInput(name, value, inputs); + addInput(name, value, inputs, iInfos, ai, hash); }(), ...); } /// helper to parse the process arguments template - inline static bool requestInputsFromArgs(T&, std::string const&, std::vector&, std::vector&) + inline static bool requestInputsFromArgs(T&, std::string const&, std::vector&, std::vector&, std::vector&) { return false; } template - inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector& inputs, std::vector& eis) + inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector& inputs, std::vector& eis, std::vector& iifs) { - AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis); + AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis, iifs); return true; } template @@ -174,7 +192,7 @@ struct 
AnalysisDataProcessorBuilder { } /// 1. enumeration (must be the only argument) template - static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&) //, Cache&, Cache&) + static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&, std::vector&) { std::vector inputMetadata; // FIXME: for the moment we do not support begin, end and step. @@ -183,20 +201,20 @@ struct AnalysisDataProcessorBuilder { /// 2. 1st argument is an iterator template - static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos) //, Cache& bk, Cache& bku) + static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) requires(std::is_lvalue_reference_v && (std::is_lvalue_reference_v && ...)) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - addInputsAndExpressions::parent_t, Args...>(hash, name, value, inputs, eInfos); + addInputsAndExpressions::parent_t, Args...>(hash, name, value, inputs, eInfos, iInfos); } /// 3. generic case template - static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos) //, Cache&, Cache&) + static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) requires(std::is_lvalue_reference_v && ...) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - addInputsAndExpressions(hash, name, value, inputs, eInfos); + addInputsAndExpressions(hash, name, value, inputs, eInfos, iInfos); } /// 1. 
enumeration (no grouping) @@ -226,28 +244,32 @@ struct AnalysisDataProcessorBuilder { return table; } - template - static auto extractFromRecord(InputRecord& record) + template + static auto extractTablesFromRecord(InputRecord& record, R matchers) { - return T { [&record] refs, size_t... Is>(std::index_sequence) { return std::vector{extractTableFromRecord(record)...}; }.template operator()(std::make_index_sequence()) }; + std::vector> tables; + std::ranges::transform(matchers, std::back_inserter(tables), [&record](auto const& m) { + return record.get(m.second)->asArrowTable(); + }); + return tables; } - template - static auto extractFromRecord(InputRecord& record) + template + static auto extractFromRecord(InputRecord& record, R matchers) { - return typename T::parent_t { [&record] refs, size_t... Is>(std::index_sequence) { return std::vector{extractTableFromRecord(record)...}; }.template operator()(std::make_index_sequence()) }; + return T{extractTablesFromRecord(record, matchers)}; } - template - static auto extractFilteredFromRecord(InputRecord& record, ExpressionInfo& info) + template + static auto extractFromRecord(InputRecord& record, R matchers) { - std::shared_ptr table = nullptr; - auto joiner = [&record] refs, size_t... 
Is>(std::index_sequence) { return std::vector{extractTableFromRecord(record)...}; }; - if constexpr (soa::is_iterator) { - table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()(std::make_index_sequence()), std::span{T::parent_t::originalLabels}); - } else { - table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()(std::make_index_sequence()), std::span{T::originalLabels}); - } + return typename T::parent_t{extractTablesFromRecord(record, matchers)}; + } + + template + static auto extractFilteredFromRecord(InputRecord& record, R matchers, ExpressionInfo& info) + { + std::shared_ptr table = soa::ArrowHelpers::joinTables(extractTablesFromRecord(record, matchers)); expressions::updateFilterInfo(info, table); if constexpr (!o2::soa::is_smallgroups>) { if (info.selection == nullptr) { @@ -262,46 +284,37 @@ struct AnalysisDataProcessorBuilder { } template - static auto extract(InputRecord&, std::vector&, size_t) + static auto extract(InputRecord&, std::vector, std::vector&, size_t) { return T{}; } - template - static auto extract(InputRecord& record, std::vector& infos, size_t phash) + template + static auto extract(InputRecord& record, std::vector iInfos, std::vector& infos, size_t phash) { - if constexpr (std::same_as) { - return extractFilteredFromRecord(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); })); + auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers | std::views::filter([](auto const& pair) { return pair.first == AI; }); + if constexpr (soa::is_filtered) { + return extractFilteredFromRecord(record, matchers, *std::ranges::find_if(infos, [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); })); } else { - return extractFromRecord(record); - } - } - - template - static auto extract(InputRecord& record, std::vector& infos, size_t phash) - 
{ - if constexpr (soa::is_filtered_table) { - return extractFilteredFromRecord(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); })); - } else { - return extractFromRecord(record); + return extractFromRecord(record, matchers); } } template - static auto bindGroupingTable(InputRecord& record, void (C::*)(Grouping, Args...), std::vector& infos) + static auto bindGroupingTable(InputRecord& record, std::vector iInfos, void (C::*)(Grouping, Args...), std::vector& infos) requires(!std::same_as) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - return extract, 0>(record, infos, hash); + return extract, 0>(record, iInfos, infos, hash); } template - static auto bindAssociatedTables(InputRecord& record, void (C::*)(Grouping, Args...), std::vector& infos) + static auto bindAssociatedTables(InputRecord& record, std::vector iInfos, void (C::*)(Grouping, Args...), std::vector& infos) requires(!std::same_as && sizeof...(Args) > 0) { constexpr auto p = pack{}; constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - return std::make_tuple(extract, has_type_at_v(p) + 1>(record, infos, hash)...); + return std::make_tuple(extract, has_type_at_v(p) + 1>(record, iInfos, infos, hash)...); } template @@ -311,10 +324,10 @@ struct AnalysisDataProcessorBuilder { } template - static void invokeProcess(Task& task, InputRecord& inputs, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices) + static void invokeProcess(Task& task, InputRecord& inputs, std::vector iInfos, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices) { using G = std::decay_t; - auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos); + auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, iInfos, processingFunction, 
infos); constexpr const int numElements = nested_brace_constructible_size>() / 10; @@ -339,15 +352,11 @@ struct AnalysisDataProcessorBuilder { std::invoke(processingFunction, task, *element); } } else { - static_assert(soa::is_table || is_enumeration, - "Single argument of process() should be a table-like or an iterator"); std::invoke(processingFunction, task, groupingTable); } } else { // multiple arguments to process - static_assert(((soa::is_iterator> == false) && ...), - "Associated arguments of process() should not be iterators"); - auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos); + auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, iInfos, processingFunction, infos); // pre-bind self indices std::apply( [&task](auto&... t) mutable { @@ -386,7 +395,7 @@ struct AnalysisDataProcessorBuilder { }, task); overwriteInternalIndices(associatedTables, associatedTables); - if constexpr (soa::is_iterator>) { + if constexpr (soa::is_iterator) { auto slicer = GroupSlicer(groupingTable, associatedTables, slices); for (auto& slice : slicer) { auto associatedSlices = slice.associatedTables(); @@ -404,7 +413,9 @@ struct AnalysisDataProcessorBuilder { }, task); - invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices); + [](Task& task, void (Task::*processingFunction)(Grouping, Associated...), Grouping g, std::tuple...>& at) { + std::invoke(processingFunction, task, g, std::get>(at)...); + }(task, processingFunction, slice.groupingElement(), associatedSlices); } } else { // bind partitions and grouping table @@ -414,16 +425,12 @@ struct AnalysisDataProcessorBuilder { }, task); - invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables); + [](Task& task, void (Task::*processingFunction)(Grouping, Associated...), Grouping g, std::tuple...>& at) { + std::invoke(processingFunction, task, g, std::get>(at)...); + }(task, 
processingFunction, groupingTable, associatedTables); } } } - - template - static void invokeProcessWithArgs(C& task, T processingFunction, G g, std::tuple& at) - { - std::invoke(processingFunction, task, g, std::get(at)...); - } }; } // namespace @@ -501,7 +508,6 @@ auto getTaskNameSetProcesses(std::string& outputName, A... args) outputName = type_to_task_name(type_name_str); return task; } - } // namespace /// Adaptor to make an AlgorithmSpec from a o2::framework::Task @@ -526,6 +532,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) std::vector inputs; std::vector options; std::vector expressionInfos; + std::vector inputInfos; constexpr const int numElements = nested_brace_constructible_size>() / 10; @@ -536,12 +543,12 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) /// parse process functions defined by corresponding configurables if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos); + AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, inputInfos); } homogeneous_apply_refs_sized( - [name = name_str, &expressionInfos, &inputs](auto& x) mutable { + [name = name_str, &expressionInfos, &inputs, &inputInfos](auto& x) mutable { // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators - return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos); + return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos, inputInfos); }, *task.get()); @@ -564,7 +571,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end()); homogeneous_apply_refs_sized([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos](InitContext& ic) mutable { + auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos](InitContext& ic) mutable { Cache bindingsKeys; Cache bindingsKeysUnsorted; // add preslice declarations to slicing cache definition @@ -613,7 +620,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) ic.services().get().setCaches(std::move(bindingsKeys)); ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); - return [task, expressionInfos](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos](ProcessingContext& pc) mutable { // load the ccdb object from their cache homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); // reset partitions once per dataframe @@ -621,7 +628,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // reset selections for the next dataframe std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; }); // reset pre-slice for the next dataframe - auto slices = pc.services().get(); + auto& slices = pc.services().get(); homogeneous_apply_refs_sized([&slices](auto& element) { return analysis_task_parsers::updateSliceInfo(element, slices); }, @@ -636,14 +643,14 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) } // execute process() if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), inputInfos, &T::process, expressionInfos, slices); } // execute optional process() homogeneous_apply_refs_sized( - [&pc, &expressionInfos, &task, &slices](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos](auto& x) { if constexpr (is_process_configurable) { if (x.value == true) { - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), x.process, expressionInfos, slices); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), inputInfos, x.process, expressionInfos, slices); return true; } return false; From dfd8af1eafeafa4408e2b88e407d7124ba5d1dd0 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 6 May 2026 09:42:23 +0200 Subject: [PATCH 02/14] add fixmes --- Framework/Core/include/Framework/ASoA.h | 2 +- .../Core/include/Framework/AnalysisHelpers.h | 31 +++++++++++++++++-- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index fc17fa139875c..bb3597b51fb97 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1521,7 +1521,7 @@ consteval static bool relatedBySortedIndex() namespace o2::framework { - +/// FIXME: has to track origin to handle the correct arguments struct PreslicePolicyBase { const std::string binding; Entry bindingKey; diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index cfd2f357ba06f..f76377b07ea55 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -573,6 +573,9 @@ struct OutputForTable { } }; +/// In a multi-origin case the origin is provided by the type +/// 
FIXME: in a rewritten origin case, we need to modify the output designation + /// This helper class allows you to declare things which will be created by a /// given analysis task. Notice how the actual cursor is implemented by the /// means of the WritingCursor helper class, from which produces actually @@ -598,6 +601,9 @@ struct ProducesGroup { template concept is_produces_group = std::derived_from; +/// In a multi-origin case the origin is provided by the type +/// FIXME: In a rewritten origin case, we need to modify the output designation + /// Helper template for table transformations template struct TableTransform { @@ -648,6 +654,10 @@ constexpr auto transformBase() return TableTransform>::ref>{}; } + +/// In a multi-origin case the origin is provided by the type +/// FIXME: In a rewritten origin case the output designation needs to be changed (through base class) +/// The extraction of the elements needs to be changed in AnalysisManagers using the origin information from the base class template struct Spawns : decltype(transformBase()) { using spawnable_t = T; @@ -692,11 +702,14 @@ concept is_spawns = requires(T t) { requires std::same_as>; }; +/// In a multi-origin case the origin is provided by the type +/// FIXME: In a rewritten origin case the output designation needs to be changed (through base class) +/// The extraction of the elements needs to be changed in AnalysisManagers using the origin information from the base class + /// This helper struct allows you to declare extended tables with dynamically-supplied /// expressions to be created by the task /// The actual expressions have to be set in init() for the configurable expression /// columns, used to define the table - template struct Defines : decltype(transformBase()) { static constexpr bool delayed = DELAYED; @@ -761,7 +774,6 @@ struct Sparse { }; /// This helper struct allows you to declare index tables to be created in a task - template constexpr auto transformBase() { @@ -769,6 +781,10 @@ 
constexpr auto transformBase() return TableTransform{}; } +/// In a multi-origin case the origin is provided by the type +/// FIXME: In a rewritten origin case the output designation needs to be changed (through base class) +/// The extraction of the elements needs to be changed in AnalysisManagers using the origin information from the base class + template struct Builds : decltype(transformBase()) { using buildable_t = T; @@ -818,6 +834,10 @@ concept is_builds = requires(T t) { requires std::same_as>; }; + +/// a task with rewritten origin, if running together with a task with the default, will +/// have a different name and thus its output would be routed separately + /// This helper class allows you to declare things which will be created by a /// given analysis task. Currently wrapped objects are limited to be TNamed /// descendants. Objects will be written to a ROOT file at the end of the @@ -958,6 +978,13 @@ auto getTableFromFilter(soa::is_not_filtered_table auto const& table, soa::Selec void initializePartitionCaches(std::set const& hashes, std::shared_ptr const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter); +/// Partition ties directly to the argument type +/// in a case with several origins in subscriptions it will get the correct input, as the type contains the origin +/// in a case with rewritten origin the type stays the same, so the association stays correct +/// FIXME: currently partition has to rerun the selection each time the invokeProcess is called +/// the real reason is to provide grouped parts for the process functions that request it +/// better solution would be to "slice" the selection, as is already done in GroupSlicer +/// for the same purpose, instead of reapplying the filtering template struct Partition { using content_t = T; From faf2a4a93b25deecd8ae424c6fb5990f67950c09 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 8 May 2026 09:15:08 +0200 Subject: [PATCH 03/14] use runtime 
matchers for spawns/builds --- .../Core/include/Framework/AnalysisHelpers.h | 4 ++++ .../Core/include/Framework/AnalysisManagers.h | 21 ++++++++++++++----- .../Core/include/Framework/AnalysisTask.h | 13 ++---------- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index f76377b07ea55..4ebfadce94957 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -609,6 +609,10 @@ template struct TableTransform { using metadata = M; constexpr static auto sources = M::template generateSources>(); + std::vector requiredInputs = [](std::index_sequence){ + return std::vector{soa::tableRef2InputSpec()...}; + }(std::make_index_sequence()); + OutputSpec outputSpec = soa::tableRef2OutputSpec(); template static auto base_spec() diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 37d98aa8075e1..c5dbdbb64d21d 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -41,6 +41,17 @@ static inline auto extractOriginals(ProcessingContext& pc) return {pc.inputs().get(o2::aod::matcher())->asArrowTable()...}; }(std::make_index_sequence()); } + +template +static auto extractTablesFromRecord(InputRecord& record, R matchers) +{ + std::vector> tables; + std::ranges::transform(matchers, std::back_inserter(tables), [&record](auto const& m) { + return record.get(m)->asArrowTable(); + }); + return tables; +} + } // namespace namespace analysis_task_parsers @@ -222,7 +233,7 @@ template requires(is_spawns || is_builds || is_defines) bool appendOutput(std::vector& outputs, T& entity, uint32_t) { - outputs.emplace_back(entity.spec()); + outputs.emplace_back(entity.outputSpec); return true; } @@ -277,7 +288,7 @@ template bool prepareOutput(ProcessingContext& context, T& spawns) { 
using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals>()>(context), std::span{metadata::base_table_t::originalLabels}); + auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{}); } @@ -296,7 +307,7 @@ template bool prepareOutput(ProcessingContext& context, T& builds) { using metadata = o2::aod::MetadataTrait>::metadata; - return builds.build(extractOriginals>()>(context)); + return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) )); } template @@ -304,7 +315,7 @@ bool prepareOutput(ProcessingContext& context, T& defines) requires(T::delayed == false) { using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals>()>(context), std::span{metadata::base_table_t::originalLabels}); + auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{}); } @@ -336,7 +347,7 @@ bool prepareDelayedOutput(ProcessingContext& context, T& defines) defines.recompile(); } using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals(context), std::span{metadata::base_table_t::originalLabels}); + auto originalTable = soa::ArrowHelpers::joinTables( 
framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable(); } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index ae428553e4b0a..841bf29cad970 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -234,16 +234,6 @@ struct AnalysisDataProcessorBuilder { { } - template - static auto extractTableFromRecord(InputRecord& record) - { - auto table = record.get(o2::aod::matcher())->asArrowTable(); - if (table->num_rows() == 0) { - table = makeEmptyTable(); - } - return table; - } - template static auto extractTablesFromRecord(InputRecord& record, R matchers) { @@ -292,7 +282,8 @@ struct AnalysisDataProcessorBuilder { template static auto extract(InputRecord& record, std::vector iInfos, std::vector& infos, size_t phash) { - auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers | std::views::filter([](auto const& pair) { return pair.first == AI; }); + auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers + | std::views::filter([](auto const& pair) { return pair.first == AI; }); if constexpr (soa::is_filtered) { return extractFilteredFromRecord(record, matchers, *std::ranges::find_if(infos, [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); })); } else { From d8d1cfad70231afa3d5092570a60ddf306ed1382 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 8 May 2026 13:34:49 +0200 Subject: [PATCH 04/14] add runtime output matcher to produces --- Framework/Core/include/Framework/AnalysisHelpers.h | 1 + Framework/Core/include/Framework/AnalysisManagers.h | 9 ++++----- 2 files changed, 5 insertions(+), 5 
deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 4ebfadce94957..c8657f201d761 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -483,6 +483,7 @@ struct WritingCursor { public: using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }()); using cursor_t = decltype(std::declval().cursor()); + OutputSpec outputSpec{soa::tableRef2OutputSpec()}; template void operator()(Ts&&... args) diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index c5dbdbb64d21d..1bcee70bf12b4 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -200,9 +200,9 @@ constexpr bool appendOutput(std::vector&, T&, uint32_t) } template -constexpr bool appendOutput(std::vector& outputs, T&, uint32_t) +constexpr bool appendOutput(std::vector& outputs, T& produces, uint32_t) { - outputs.emplace_back(soa::tableRef2OutputSpec()); + outputs.emplace_back(produces.outputSpec); return true; } @@ -272,8 +272,8 @@ bool prepareOutput(ProcessingContext&, T&) template bool prepareOutput(ProcessingContext& context, T& produces) { - /// FIXME: use matcher instead of an OutputRef - produces.resetCursor(std::move(context.outputs().make(soa::tableRef2OutputRef()))); + auto matcher = DataSpecUtils::asConcreteDataMatcher(produces.outputSpec); + produces.resetCursor(std::move(context.outputs().make(Output{matcher.origin, matcher.description, matcher.subSpec}))); return true; } @@ -306,7 +306,6 @@ bool prepareOutput(ProcessingContext& context, T& spawns) template bool prepareOutput(ProcessingContext& context, T& builds) { - using metadata = o2::aod::MetadataTrait>::metadata; return 
builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) )); } From f59c54f717c46126bbe01f8845e66b67c4244310 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 8 May 2026 15:06:25 +0200 Subject: [PATCH 05/14] add output matchers to tableTransform; add fixmes --- .../Core/include/Framework/AnalysisHelpers.h | 28 ------------------- .../Core/include/Framework/AnalysisManagers.h | 18 ++++++------ .../Core/include/Framework/AnalysisTask.h | 6 ++++ 3 files changed, 16 insertions(+), 36 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index c8657f201d761..177df31a5675e 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -614,34 +614,6 @@ struct TableTransform { return std::vector{soa::tableRef2InputSpec()...}; }(std::make_index_sequence()); OutputSpec outputSpec = soa::tableRef2OutputSpec(); - - template - static auto base_spec() - { - return soa::tableRef2InputSpec(); - } - - static auto base_specs() - { - return [](std::index_sequence) { - return std::array{base_spec()...}; - }(std::make_index_sequence{}); - } - - static constexpr auto spec() - { - return soa::tableRef2OutputSpec(); - } - - static constexpr auto output() - { - return soa::tableRef2Output(); - } - - static constexpr auto ref() - { - return soa::tableRef2OutputRef(); - } }; /// This helper struct allows you to declare extended tables which should be diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 1bcee70bf12b4..df45b692d26b9 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -159,13 +159,12 @@ const char* controlOption() } template -concept with_base_table = requires { 
T::base_specs(); }; +concept with_required_inputs = requires(T t) { t.requiredInputs.size(); }; -template -bool requestInputs(std::vector& inputs, T const& /*entity*/) +template +bool requestInputs(std::vector& inputs, T const& entity) { - auto base_specs = T::base_specs(); - for (auto base_spec : base_specs) { + for (auto base_spec : entity.requiredInputs) { base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption()}, VariantType::Bool, true, {"\"\""}}); DataSpecUtils::updateInputList(inputs, std::forward(base_spec)); } @@ -388,21 +387,24 @@ bool finalizeOutput(ProcessingContext& context, T& producesGroup) template bool finalizeOutput(ProcessingContext& context, T& spawns) { - context.outputs().adopt(spawns.output(), spawns.asArrowTable()); + auto matcher = DataSpecUtils::asConcreteDataMatcher(spawns.outputSpec); + context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, spawns.asArrowTable()); return true; } template bool finalizeOutput(ProcessingContext& context, T& builds) { - context.outputs().adopt(builds.output(), builds.asArrowTable()); + auto matcher = DataSpecUtils::asConcreteDataMatcher(builds.outputSpec); + context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, builds.asArrowTable()); return true; } template bool finalizeOutput(ProcessingContext& context, T& defines) { - context.outputs().adopt(defines.output(), defines.asArrowTable()); + auto matcher = DataSpecUtils::asConcreteDataMatcher(defines.outputSpec); + context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, defines.asArrowTable()); return true; } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 841bf29cad970..4116a2d2528a8 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -562,6 +562,12 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, 
Args&&... args) requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end()); homogeneous_apply_refs_sized([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get()); + /// FIXME: In order to replace origins consistently, there are following things that need to be touched + /// 1. inputs and outputs, including their metadata + /// 2. inputInfos, that contain matchers for extracting arguments of process functions + /// 3. bindingKeys/bindingKeysUnsorted, that contain matchers to extract tables used to calculate slicing (created in init) + /// 4. Produces/Spawns/Defines/Builds contain matchers for required inputs and created outputs that need to be modified + auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos](InitContext& ic) mutable { Cache bindingsKeys; Cache bindingsKeysUnsorted; From b201a6d23713723b0dde5dd142018acb47aad2f1 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 11 May 2026 12:48:36 +0200 Subject: [PATCH 06/14] streamline option building (based on #15320) --- Framework/Core/src/DeviceSpecHelpers.cxx | 104 +++++++++++++---------- 1 file changed, 61 insertions(+), 43 deletions(-) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 011b3aa12162f..d3c3ace1477a6 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1596,53 +1596,71 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, } }; - for (const auto varit : varmap) { + for (const auto& varit : varmap) { // find the option belonging to key, add if the option has been parsed // and is not defaulted const auto* description = odesc.find_nothrow(varit.first, false); - if (description && varmap.count(varit.first)) { - // check the semantics of the value - auto semantic = description->semantic(); - const char* optarg = ""; - if (semantic) { - // the 
value semantics allows different properties like - // multitoken, zero_token and composing - // currently only the simple case is supported - assert(semantic->min_tokens() <= 1); - // assert(semantic->max_tokens() && semantic->min_tokens()); - if (semantic->min_tokens() > 0) { - std::string stringRep; - if (auto v = boost::any_cast(&varit.second.value())) { - stringRep = *v; - } else if (auto v = boost::any_cast(&varit.second.value())) { - std::stringstream tmp; - tmp << *v; - stringRep = fmt::format("{}", tmp.str()); - } - if (varit.first == "channel-config") { - // FIXME: the parameter to channel-config can be a list of configurations separated - // by semicolon. The individual configurations will be separated and added individually. - // The device arguments can then contaoin multiple channel-config entries, but only - // one for the last configuration is added to control.options - processRawChannelConfig(stringRep); - optarg = tmpArgs.back().c_str(); - } else { - std::string key(fmt::format("--{}", varit.first)); - if (stringRep.length() == 0) { - // in order to identify options without parameter we add a string - // with one blank for the 'blank' parameter, it is filtered out - // further down and a zero-length string is added to argument list - stringRep = " "; - } - updateDeviceArguments(key, stringRep); - optarg = uniqueDeviceArgs[key].c_str(); - } - } else if (semantic->min_tokens() == 0 && varit.second.as()) { - updateDeviceArguments(fmt::format("--{}", varit.first), ""); + if (description == nullptr || varmap.count(varit.first) == 0) { + continue; + } + + if (varit.second.defaulted()) { + continue; + } + + // check the semantics of the value + auto semantic = description->semantic(); + const char* optarg = ""; + if (!semantic) { + control.options.insert(std::make_pair(varit.first, optarg)); + continue; + } + + if (semantic->min_tokens() == 0 && varit.second.as()) { + updateDeviceArguments(fmt::format("--{}", varit.first), ""); + 
control.options.insert(std::make_pair(varit.first, optarg)); + continue; + } + + // the value semantics allows different properties like + // multitoken, zero_token and composing + // currently only the simple case is supported + assert(semantic->min_tokens() <= 1); + // assert(semantic->max_tokens() && semantic->min_tokens()); + if (semantic->min_tokens() == 0) { + control.options.insert(std::make_pair(varit.first, optarg)); + continue; + } + + if (semantic->min_tokens() > 0) { + std::string stringRep; + if (auto v = boost::any_cast(&varit.second.value())) { + stringRep = *v; + } else if (auto v = boost::any_cast(&varit.second.value())) { + std::stringstream tmp; + tmp << *v; + stringRep = fmt::format("{}", tmp.str()); + } + if (varit.first == "channel-config") { + // FIXME: the parameter to channel-config can be a list of configurations separated + // by semicolon. The individual configurations will be separated and added individually. + // The device arguments can then contain multiple channel-config entries, but only + // one for the last configuration is added to control.options + processRawChannelConfig(stringRep); + optarg = tmpArgs.back().c_str(); + } else { + std::string key(fmt::format("--{}", varit.first)); + if (stringRep.length() == 0) { + // in order to identify options without parameter we add a string + // with one blank for the 'blank' parameter, it is filtered out + // further down and a zero-length string is added to argument list + stringRep = " "; } + updateDeviceArguments(key, stringRep); + optarg = uniqueDeviceArgs[key].c_str(); } - control.options.insert(std::make_pair(varit.first, optarg)); } + control.options.insert(std::make_pair(varit.first, optarg)); } }; @@ -1653,11 +1671,11 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, // Add the channel configuration for (auto& channel : spec.outputChannels) { - tmpArgs.emplace_back(std::string("--channel-config")); + tmpArgs.emplace_back("--channel-config"); 
tmpArgs.emplace_back(outputChannel2String(channel)); } for (auto& channel : spec.inputChannels) { - tmpArgs.emplace_back(std::string("--channel-config")); + tmpArgs.emplace_back("--channel-config"); tmpArgs.emplace_back(inputChannel2String(channel)); } From 07bef59c8f3c8902ac03d671071adc4c297e98ed Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 11 May 2026 16:15:41 +0200 Subject: [PATCH 07/14] fixup! streamline option building (based on #15320) --- Framework/Core/src/DeviceSpecHelpers.cxx | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index d3c3ace1477a6..38e6b8016df56 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1604,10 +1604,6 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, continue; } - if (varit.second.defaulted()) { - continue; - } - // check the semantics of the value auto semantic = description->semantic(); const char* optarg = ""; From e8854d7991bd56219e58d9d1c7ceb9356b62473f Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Mon, 11 May 2026 16:58:05 +0200 Subject: [PATCH 08/14] add aod-origin-replace option --- Framework/Core/include/Framework/AnalysisHelpers.h | 1 + Framework/Core/include/Framework/AnalysisTask.h | 12 ++++++++++++ Framework/Core/src/AnalysisHelpers.cxx | 5 +++++ Framework/Core/src/WorkflowCustomizationHelpers.cxx | 2 ++ 4 files changed, 20 insertions(+) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 177df31a5675e..a69eebcef8286 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -69,6 +69,7 @@ struct IndexBuilder { namespace o2::framework { +void wrongOriginReplacement(std::string_view replacement); std::shared_ptr makeEmptyTableImpl(const char* name, std::shared_ptr& schema); template diff --git 
a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 4116a2d2528a8..565017bed346a 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -525,6 +525,18 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) std::vector expressionInfos; std::vector inputInfos; + std::string replacementOrigin; + header::DataOrigin newOrigin{"AOD"}; + if (ctx.options().hasOption("aod-origin-replace")) { + replacementOrigin = ctx.options().get("aod-origin-replace"); + if (replacementOrigin.size() > 4UL) { + wrongOriginReplacement(replacementOrigin); + } + } + if (!replacementOrigin.empty()) { + newOrigin.runtimeInit(replacementOrigin.c_str(), std::min(replacementOrigin.size(), 4UL)); + } + constexpr const int numElements = nested_brace_constructible_size>() / 10; /// make sure options and configurables are set before expression infos are created diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index 149664c42caba..1a550ab24db24 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -79,6 +79,11 @@ std::shared_ptr IndexBuilder::materialize(std::vector makeEmptyTableImpl(const char* name, std::shared_ptr& schema) { schema = schema->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{name}})); diff --git a/Framework/Core/src/WorkflowCustomizationHelpers.cxx b/Framework/Core/src/WorkflowCustomizationHelpers.cxx index 2154d0fe26f8d..67101749d94df 100644 --- a/Framework/Core/src/WorkflowCustomizationHelpers.cxx +++ b/Framework/Core/src/WorkflowCustomizationHelpers.cxx @@ -67,6 +67,8 @@ std::vector WorkflowCustomizationHelpers::requiredWorkflowOptio {"aod-writer-resmode", VariantType::String, "RECREATE", {"Creation mode of the result files: NEW, CREATE, RECREATE, UPDATE"}}, {"aod-writer-ntfmerge", VariantType::Int, -1, {"Number 
of time frames to merge into one file"}}, {"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}}, + // options to manipulate origins + {"aod-origin-replace", VariantType::String, "", {"Replace AOD origin with the string provided"}}, {"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}}, {"fairmq-recv-buffer-size", VariantType::Int, 4, {"recvBufferSize option for FairMQ channels"}}, From 61fbfdecba40d79d26b18c3a395cd90f5fd8868b Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Tue, 12 May 2026 14:26:11 +0200 Subject: [PATCH 09/14] complete baseline origin replacement --- .../Core/include/Framework/AnalysisHelpers.h | 44 +++++--- .../Core/include/Framework/AnalysisManagers.h | 24 ++++ .../Core/include/Framework/AnalysisTask.h | 105 +++++++++++------- .../Core/include/Framework/GroupSlicer.h | 15 ++- .../Core/include/Framework/InputRecord.h | 2 +- Framework/Core/src/AnalysisHelpers.cxx | 15 +++ 6 files changed, 141 insertions(+), 64 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index a69eebcef8286..67ac0f109aa7f 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -173,6 +173,9 @@ struct Builder { std::shared_ptr materialize(ProcessingContext& pc); }; + +ConfigParamSpec replaceOrigin(ConfigParamSpec& source, std::string const& originStr); +ConcreteDataMatcher replaceOrigin(ConcreteDataMatcher& matcher, const header::DataOrigin& newOrigin); } // namespace o2::framework namespace o2::soa @@ -274,19 +277,23 @@ consteval IndexKind getIndexKind() } template -inline constexpr auto getIndexMapping() +inline constexpr auto getIndexMapping(header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { std::vector idx; using indices = T::index_pack_t; using Key = T::Key; - [&idx](std::index_sequence) mutable { + 
[&idx, &newOrigin](std::index_sequence) mutable { constexpr auto refs = T::generateSources(); - ([&idx]() mutable { + ([&idx, &newOrigin]() mutable { constexpr auto pos = o2::aod::MetadataTrait>::metadata::template getIndexPosToKey(); + auto matcher = o2::aod::matcher(); + if ((ref.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { + matcher = replaceOrigin(matcher, newOrigin); + } if constexpr (pos == -1) { - idx.emplace_back(o2::aod::label(), o2::aod::matcher(), C::columnLabel(), IndexKind::IdxSelf, pos); + idx.emplace_back(o2::aod::label(), matcher, C::columnLabel(), IndexKind::IdxSelf, pos); } else { - idx.emplace_back(o2::aod::label(), o2::aod::matcher(), C::columnLabel(), getIndexKind(), pos); + idx.emplace_back(o2::aod::label(), matcher, C::columnLabel(), getIndexKind(), pos); } }.template operator()>(), ...); @@ -381,16 +388,16 @@ constexpr auto getExpressionMetadata() -> std::vector -constexpr auto getIndexMetadata() -> std::vector +constexpr auto getIndexMetadata(header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) -> std::vector { - auto map = getIndexMapping(); + auto map = getIndexMapping(newOrigin); return {framework::ConfigParamSpec{"index-records", framework::VariantType::String, framework::serializeIndexRecords(map), {"\"\""}}, {framework::ConfigParamSpec{"index-exclusive", framework::VariantType::Bool, T::exclusive, {"\"\""}}}}; } template requires(!soa::with_index_pack) -constexpr auto getIndexMetadata() -> std::vector +constexpr auto getIndexMetadata(header::DataOrigin) -> std::vector { return {}; } @@ -398,7 +405,7 @@ constexpr auto getIndexMetadata() -> std::vector } // namespace template -constexpr auto tableRef2InputSpec() +constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { std::vector metadata; std::vector sources; @@ -407,24 +414,29 @@ constexpr auto tableRef2InputSpec() } else if constexpr (soa::with_sources_generator>::metadata>) { sources = getInputMetadata>::metadata, 
o2::aod::Hash>(); } + if ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { + std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as()](framework::ConfigParamSpec& source){ + return replaceOrigin(source, originStr); + }); + } metadata.insert(metadata.end(), sources.begin(), sources.end()); auto ccdbURLs = getCCDBMetadata>::metadata>(); metadata.insert(metadata.end(), ccdbURLs.begin(), ccdbURLs.end()); auto expressions = getExpressionMetadata>::metadata>(); metadata.insert(metadata.end(), expressions.begin(), expressions.end()); - auto indices = getIndexMetadata>::metadata>(); + auto indices = getIndexMetadata>::metadata>(newOrigin); metadata.insert(metadata.end(), indices.begin(), indices.end()); if constexpr (!soa::with_ccdb_urls>::metadata>) { metadata.emplace_back(framework::ConfigParamSpec{"schema", framework::VariantType::String, framework::serializeSchema(o2::aod::MetadataTrait>::metadata::getSchema()), {"\"\""}}); } return framework::InputSpec{ - o2::aod::label(), - o2::aod::origin(), - o2::aod::description(o2::aod::signature()), - R.version, - framework::Lifetime::Timeframe, - metadata}; + o2::aod::label(), + ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? 
newOrigin : o2::aod::origin(), + o2::aod::description(o2::aod::signature()), + R.version, + framework::Lifetime::Timeframe, + metadata}; } template diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index df45b692d26b9..24f4fccaf0e0b 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -577,6 +577,30 @@ static void setGroupedCombination(C& comb, TG& grouping, std::tuple& asso } /// Preslice handling +template + requires(!is_preslice && !is_preslice_group) +bool replaceOrigin(T&, header::DataOrigin const&) +{ + return false; +} + +template +bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin) +{ + if ((T::target_t::originals[0].origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { + preslice.bindingKey.matcher = framework::replaceOrigin(preslice.bindingKey.matcher, newOrigin); + return true; + } + return false; +} + +template +bool replaceOrigin(T& presliceGroup, header::DataOrigin const& newOrigin) +{ + homogeneous_apply_refs([&newOrigin](auto& preslice){ return replaceOrigin(preslice, newOrigin); }, presliceGroup); + return true; +} + template requires(!is_preslice && !is_preslice_group) bool registerCache(T&, Cache&, Cache&) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index 565017bed346a..d00b65be22d27 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -102,12 +102,12 @@ struct AnalysisDataProcessorBuilder { } template - static void addOriginalRef(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) + static void addOriginalRef(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { - auto spec = soa::tableRef2InputSpec(); 
+ auto spec = soa::tableRef2InputSpec(newOrigin); spec.metadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}}); - DataSpecUtils::updateInputList(inputs, std::move(spec)); auto matcher = DataSpecUtils::asConcreteDataMatcher(spec); + DataSpecUtils::updateInputList(inputs, std::move(spec)); auto locate = std::ranges::find_if(iInfos, [&hash](auto const& info) { return info.hash == hash; }); if (locate == iInfos.end()) { iInfos.emplace_back(hash, std::vector{std::pair{ai, matcher}}); @@ -140,43 +140,43 @@ struct AnalysisDataProcessorBuilder { /// helpers to append InputSpec for a single argument template - static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) + static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"}) { - [&name, &value, &inputs, &iInfos, &ai, &hash] refs, size_t... Is>(std::index_sequence) mutable { - (addOriginalRef(name, value, inputs, iInfos, ai, hash), ...); + [&name, &value, &inputs, &iInfos, &ai, &hash, newOrigin = std::move(newOrigin)] refs, size_t... 
Is>(std::index_sequence) mutable { + (addOriginalRef(name, value, inputs, iInfos, ai, hash, newOrigin), ...); }.template operator()::originals>(std::make_index_sequence::originals.size()>()); } - template - static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash) - { - addInput::parent_t>(name, value, inputs, iInfos, ai, hash); - } + // template + // static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"}) + // { + // addInput::parent_t>(name, value, inputs, iInfos, ai, hash, newOrigin); + // } /// helper to append the inputs and expression information for normalized arguments template - static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) + static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"}) { int ai = -1; - ([&ai, &hash, &eInfos, &name, &value, &inputs, &iInfos]() mutable { + ([&ai, &hash, &eInfos, &name, &value, &inputs, &iInfos, newOrigin]() mutable { ++ai; using T = std::decay_t; addExpression(ai, hash, eInfos); - addInput(name, value, inputs, iInfos, ai, hash); + addInput(name, value, inputs, iInfos, ai, hash, std::move(newOrigin)); }(), ...); } /// helper to parse the process arguments template - inline static bool requestInputsFromArgs(T&, std::string const&, std::vector&, std::vector&, std::vector&) + inline static bool requestInputsFromArgs(T&, std::string const&, std::vector&, std::vector&, std::vector&, header::DataOrigin) { return false; } template - inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector& inputs, std::vector& eis, std::vector& iifs) + inline static bool requestInputsFromArgs(T& pc, std::string 
const& name, std::vector& inputs, std::vector& eis, std::vector& iifs, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { - AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis, iifs); + AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis, iifs, newOrigin); return true; } template @@ -192,7 +192,7 @@ struct AnalysisDataProcessorBuilder { } /// 1. enumeration (must be the only argument) template - static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&, std::vector&) + static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector& inputs, std::vector&, std::vector&, header::DataOrigin) { std::vector inputMetadata; // FIXME: for the moment we do not support begin, end and step. @@ -201,20 +201,20 @@ struct AnalysisDataProcessorBuilder { /// 2. 1st argument is an iterator template - static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) + static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) requires(std::is_lvalue_reference_v && (std::is_lvalue_reference_v && ...)) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - addInputsAndExpressions::parent_t, Args...>(hash, name, value, inputs, eInfos, iInfos); + addInputsAndExpressions::parent_t, Args...>(hash, name, value, inputs, eInfos, iInfos, std::move(newOrigin)); } /// 3. 
generic case template - static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos) + static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) requires(std::is_lvalue_reference_v && ...) { constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId(); - addInputsAndExpressions(hash, name, value, inputs, eInfos, iInfos); + addInputsAndExpressions(hash, name, value, inputs, eInfos, iInfos, std::move(newOrigin)); } /// 1. enumeration (no grouping) @@ -315,7 +315,7 @@ struct AnalysisDataProcessorBuilder { } template - static void invokeProcess(Task& task, InputRecord& inputs, std::vector iInfos, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices) + static void invokeProcess(Task& task, InputRecord& inputs, std::vector iInfos, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, std::string const& newOriginStr) { using G = std::decay_t; auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, iInfos, processingFunction, infos); @@ -387,7 +387,7 @@ struct AnalysisDataProcessorBuilder { task); overwriteInternalIndices(associatedTables, associatedTables); if constexpr (soa::is_iterator) { - auto slicer = GroupSlicer(groupingTable, associatedTables, slices); + auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOriginStr); for (auto& slice : slicer) { auto associatedSlices = slice.associatedTables(); overwriteInternalIndices(associatedSlices, associatedTables); @@ -525,16 +525,16 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) std::vector expressionInfos; std::vector inputInfos; - std::string replacementOrigin; + std::string newOriginStr; header::DataOrigin newOrigin{"AOD"}; if (ctx.options().hasOption("aod-origin-replace")) { - replacementOrigin = ctx.options().get("aod-origin-replace"); - if (replacementOrigin.size() > 4UL) { - wrongOriginReplacement(replacementOrigin); + newOriginStr = ctx.options().get("aod-origin-replace"); + if (newOriginStr.size() > 4UL) { + wrongOriginReplacement(newOriginStr); } } - if (!replacementOrigin.empty()) { - newOrigin.runtimeInit(replacementOrigin.c_str(), std::min(replacementOrigin.size(), 4UL)); + if (!newOriginStr.empty()) { + newOrigin.runtimeInit(newOriginStr.c_str(), std::min(newOriginStr.size(), 4UL)); } constexpr const int numElements = nested_brace_constructible_size>() / 10; @@ -546,12 +546,12 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) /// parse process functions defined by corresponding configurables if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, inputInfos); + AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, inputInfos, newOrigin); } homogeneous_apply_refs_sized( - [name = name_str, &expressionInfos, &inputs, &inputInfos](auto& x) mutable { + [name = name_str, &expressionInfos, &inputs, &inputInfos, &newOrigin](auto& x) mutable { // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators - return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos, inputInfos); + return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos, inputInfos, newOrigin); }, *task.get()); @@ -574,13 +574,18 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end()); homogeneous_apply_refs_sized([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get()); - /// FIXME: In order to replace origins consistently, there are following things that need to be touched - /// 1. inputs and outputs, including their metadata - /// 2. inputInfos, that contain matchers for extracting arguments of process functions - /// 3. bindingKeys/bindingKeysUnsorted, that contain matchers to extract tables used to calculate slicing (created in init) - /// 4. Produces/Spawns/Defines/Builds contain matchers for required inputs and created outputs that need to be modified + // replace origins in Preslice declarations + homogeneous_apply_refs_sized([&newOrigin](auto& element){ return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos](InitContext& ic) mutable { + /// FIXME: In order to replace origins consistently, there are following things that need to be touched + /// 1. inputs and outputs, including their metadata - done + /// 2. inputInfos, that contain matchers for extracting arguments of process functions - done in the 1st step + /// 3. bindingKeys/bindingKeysUnsorted, that contain matchers to extract tables used to calculate slicing - preslices are updated, bks are updated + /// 4. Produces/Spawns/Defines/Builds contain matchers for required inputs and created outputs that need to be modified - same + /// + /// 3a. 
GroupSlicer has to use runtime list of extractions + + auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { Cache bindingsKeys; Cache bindingsKeysUnsorted; // add preslice declarations to slicing cache definition @@ -626,10 +631,24 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) }, *task.get()); + /// replace origin in slicing caches + std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry){ + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry){ + if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { + entry.matcher = replaceOrigin(entry.matcher, newOrigin); + } + return entry; + }); + ic.services().get().setCaches(std::move(bindingsKeys)); ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); - return [task, expressionInfos, inputInfos](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos, newOriginStr](ProcessingContext& pc) mutable { // load the ccdb object from their cache homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); // reset partitions once per dataframe @@ -652,14 +671,14 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) } // execute process() if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), inputInfos, &T::process, expressionInfos, slices); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), inputInfos, &T::process, expressionInfos, slices, newOriginStr); } // execute optional process() homogeneous_apply_refs_sized( - [&pc, &expressionInfos, &task, &slices, &inputInfos](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOriginStr](auto& x) { if constexpr (is_process_configurable) { if (x.value == true) { - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), inputInfos, x.process, expressionInfos, slices); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), inputInfos, x.process, expressionInfos, slices, newOriginStr); return true; } return false; diff --git a/Framework/Core/include/Framework/GroupSlicer.h b/Framework/Core/include/Framework/GroupSlicer.h index 596e68d8cdd4c..f2b72b0c83bf9 100644 --- a/Framework/Core/include/Framework/GroupSlicer.h +++ b/Framework/Core/include/Framework/GroupSlicer.h @@ -14,6 +14,7 @@ #include "Framework/Pack.h" #include "Framework/ASoA.h" +#include "Framework/AnalysisHelpers.h" #include #include @@ -25,9 +26,9 @@ namespace o2::framework template struct GroupSlicer { using grouping_t = std::decay_t; - GroupSlicer(G& gt, std::tuple& at, ArrowTableSlicingCache& slices) + GroupSlicer(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, std::string const& newOriginStr = "AOD") : max{gt.size()}, - mBegin{GroupSlicerIterator(gt, at, slices)} + mBegin{GroupSlicerIterator(gt, at, slices, newOriginStr)} { } @@ -55,7 +56,11 @@ struct GroupSlicer { { constexpr auto index = framework::has_type_at_v>(associated_pack_t{}); auto binding = o2::soa::getLabelFromTypeForKey>(mIndexColumnName); - auto bk = Entry(binding, o2::soa::getMatcherFromTypeForKey>(mIndexColumnName), mIndexColumnName); + auto matcher = 
o2::soa::getMatcherFromTypeForKey>(mIndexColumnName); + if ((matcher.origin == header::DataOrigin{"AOD"}) && (replacementOrigin != header::DataOrigin{"AOD"})) { + matcher = framework::replaceOrigin(matcher, replacementOrigin); + } + auto bk = Entry(binding, matcher, mIndexColumnName); if constexpr (!o2::soa::is_smallgroups>) { if (table.size() == 0) { return; @@ -82,7 +87,7 @@ struct GroupSlicer { starts[index] = selections[index]->begin(); } - GroupSlicerIterator(G& gt, std::tuple& at, ArrowTableSlicingCache& slices) + GroupSlicerIterator(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, std::string const& newOriginStr = "AOD") : mIndexColumnName{std::string("fIndex") + o2::framework::cutString(o2::soa::getLabelFromType())}, mGt{&gt}, mAt{&at}, @@ -90,6 +95,7 @@ struct GroupSlicer { position{0}, mSlices{&slices} { + replacementOrigin.runtimeInit(newOriginStr.c_str(), newOriginStr.size()); if constexpr (soa::is_filtered_table>) { groupSelection = mGt->getSelectedRows(); } @@ -271,6 +277,7 @@ struct GroupSlicer { std::array sliceInfos; std::array sliceInfosUnsorted; ArrowTableSlicingCache* mSlices; + header::DataOrigin replacementOrigin; }; GroupSlicerIterator& begin() diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index d2e152c1bcacc..d0a757ce2790b 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -529,7 +529,7 @@ class InputRecord auto pos = getPos(matcher); if (pos < 0) { auto msg = describeAvailableInputs(); - throw runtime_error_f("InputRecord::get: no input with binding %s found. %s", DataSpecUtils::describe(matcher).c_str(), msg.c_str()); + throw runtime_error_f("InputRecord::get: no input %s found. 
%s", DataSpecUtils::describe(matcher).c_str(), msg.c_str()); } return getByPos(pos, part); } diff --git a/Framework/Core/src/AnalysisHelpers.cxx b/Framework/Core/src/AnalysisHelpers.cxx index 1a550ab24db24..5e46ed86860e8 100644 --- a/Framework/Core/src/AnalysisHelpers.cxx +++ b/Framework/Core/src/AnalysisHelpers.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "Framework/AnalysisHelpers.h" +#include <regex> #include "Framework/ExpressionHelpers.h" #include "ExpressionJSONHelpers.h" #include "IndexJSONHelpers.h" @@ -84,6 +85,20 @@ void wrongOriginReplacement(std::string_view replacement) throw framework::runtime_error_f("Provided origin replacement string is longer than 4 symbols: %s", replacement.data()); } +ConfigParamSpec replaceOrigin(ConfigParamSpec& source, std::string const& originStr) +{ + if (!source.name.starts_with("input:")) { + return source; + } + source.defaultValue = std::regex_replace(source.defaultValue.get(), std::regex{"/AOD/"}, "/" + originStr + "/"); + return source; +} + +ConcreteDataMatcher replaceOrigin(ConcreteDataMatcher& matcher, header::DataOrigin const& newOrigin) +{ + return ConcreteDataMatcher{newOrigin, matcher.description, matcher.subSpec}; +} + std::shared_ptr makeEmptyTableImpl(const char* name, std::shared_ptr& schema) { schema = schema->WithMetadata(std::make_shared(std::vector{std::string{"label"}}, std::vector{std::string{name}})); From 90722dba582b9c52f65938863f174e5c2059e5a3 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Wed, 13 May 2026 13:19:54 +0200 Subject: [PATCH 10/14] track the former aod inputs/outputs with metadata --- .../src/AODJAlienReaderHelpers.cxx | 5 ++-- .../AnalysisSupport/src/DataInputDirector.cxx | 4 +-- .../AnalysisSupport/src/DataInputDirector.h | 2 +- .../Framework/AnalysisDataModelHelpers.h | 2 +- .../Core/include/Framework/AnalysisHelpers.h | 28 ++++++++++++++---- 
.../Core/include/Framework/AnalysisManagers.h 
| 29 +++++++++++++++---- .../Core/include/Framework/AnalysisTask.h | 9 ++++-- .../Core/src/AnalysisDataModelHelpers.cxx | 9 ++---- Framework/Core/src/AnalysisSupportHelpers.cxx | 6 +++- Framework/Core/src/ArrowSupport.cxx | 6 +++- Framework/Core/src/WorkflowHelpers.cxx | 12 ++++++-- 11 files changed, 82 insertions(+), 30 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index 8fde9f52a0e09..d7613d3266ecb 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -240,8 +240,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // create header auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); + bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p){ return p.name.starts_with("aod-origin-replaced"); }); - if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) { + if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) { if (first) { // check if there is a next file to read fcnt += device.maxInputTimeslices; @@ -255,7 +256,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const } // get first folder of next file ntf = 0; - if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed)) { + if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) { LOGP(fatal, "Can not retrieve tree for table {}: fileCounter {}, timeFrame {}", concrete.origin.as(), fcnt, ntf); throw std::runtime_error("Processing is stopped!"); } diff --git a/Framework/AnalysisSupport/src/DataInputDirector.cxx b/Framework/AnalysisSupport/src/DataInputDirector.cxx index 46674f19400a6..1c3a3f5a65f2e 
100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.cxx +++ b/Framework/AnalysisSupport/src/DataInputDirector.cxx @@ -900,7 +900,7 @@ uint64_t DataInputDirector::getTimeFrameNumber(header::DataHeader dh, int counte return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin); } -bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed) +bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed, bool wasAOD) { std::string treename; @@ -913,7 +913,7 @@ bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, // . filename from defaultDataInputDescriptor // . treename from DataHeader didesc = mdefaultDataInputDescriptor; - treename = aod::datamodel::getTreeName(dh); + treename = aod::datamodel::getTreeName(dh, wasAOD); } std::string origin = dh.dataOrigin.as(); diff --git a/Framework/AnalysisSupport/src/DataInputDirector.h b/Framework/AnalysisSupport/src/DataInputDirector.h index 18ab5c0c1382e..b48976d5e51dd 100644 --- a/Framework/AnalysisSupport/src/DataInputDirector.h +++ b/Framework/AnalysisSupport/src/DataInputDirector.h @@ -160,7 +160,7 @@ class DataInputDirector int getNumberInputDescriptors() { return mdataInputDescriptors.size(); } void createDefaultDataInputDescriptor(); - bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed); + bool readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed, bool wasAOD); uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF); arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF); int getTimeFramesInFile(header::DataHeader dh, int counter); diff --git 
a/Framework/Core/include/Framework/AnalysisDataModelHelpers.h b/Framework/Core/include/Framework/AnalysisDataModelHelpers.h index dc7e722e0fd91..b88356aa1c9e4 100644 --- a/Framework/Core/include/Framework/AnalysisDataModelHelpers.h +++ b/Framework/Core/include/Framework/AnalysisDataModelHelpers.h @@ -16,6 +16,6 @@ namespace o2::aod::datamodel { -std::string getTreeName(header::DataHeader dh); +std::string getTreeName(header::DataHeader dh, bool wasAOD); } // namespace o2::aod::datamodel #endif // O2_FRAMEWORK_ANALYSISDATAMODELHELPERS_H_ diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 67ac0f109aa7f..d971347df6031 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -418,6 +418,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as()](framework::ConfigParamSpec& source){ return replaceOrigin(source, originStr); }); + metadata.push_back(framework::ConfigParamSpec{"aod-origin-replaced", framework::VariantType::Bool, true, {"\"\""}}); } metadata.insert(metadata.end(), sources.begin(), sources.end()); auto ccdbURLs = getCCDBMetadata>::metadata>(); @@ -440,7 +441,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri } template -constexpr auto tableRef2OutputSpec() +constexpr auto tableRef2OutputSpec(header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { std::vector metadata; using md = typename o2::aod::MetadataTrait>::metadata; @@ -453,7 +454,7 @@ constexpr auto tableRef2OutputSpec() } return framework::OutputSpec{ framework::OutputLabel{o2::aod::label()}, - o2::aod::origin(), + ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? 
newOrigin : o2::aod::origin(), o2::aod::description(o2::aod::signature()), R.version, framework::Lifetime::Timeframe, @@ -497,6 +498,10 @@ struct WritingCursor { using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }()); using cursor_t = decltype(std::declval().cursor()); OutputSpec outputSpec{soa::tableRef2OutputSpec()}; + OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin) + { + outputSpec = soa::tableRef2OutputSpec(newOrigin); + } template void operator()(Ts&&... args) @@ -623,10 +628,21 @@ template struct TableTransform { using metadata = M; constexpr static auto sources = M::template generateSources>(); - std::vector requiredInputs = [](std::index_sequence){ - return std::vector{soa::tableRef2InputSpec()...}; - }(std::make_index_sequence()); - OutputSpec outputSpec = soa::tableRef2OutputSpec(); + + OutputSpec outputSpec = updateOutputSpec(); + static OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) + { + return soa::tableRef2OutputSpec(newOrigin); + } + + std::vector requiredInputs = getRequiredInputs(); + static std::vector getRequiredInputs(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) + { + return [&newOrigin](std::index_sequence){ + return std::vector{soa::tableRef2InputSpec(newOrigin)...}; + }(std::make_index_sequence()); + } + }; /// This helper struct allows you to declare extended tables which should be diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 24f4fccaf0e0b..06b68bbc5e1a1 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -134,8 +134,19 @@ bool appendCondition(std::vector& inputs, C& conditionGroup) } /// Table auto-creation handling + +template +concept with_required_inputs = requires(T t) { t.getRequiredInputs(); }; + +template + 
requires(!with_required_inputs) +bool requestInputs(std::vector&, T&, header::DataOrigin) +{ + return false; +} + template -bool requestInputs(std::vector&, T const&) +bool updateOutputSpec(T&, header::DataOrigin) { return false; } @@ -158,12 +169,10 @@ const char* controlOption() return "control:define"; } -template -concept with_required_inputs = requires(T t) { t.requiredInputs.size(); }; - template -bool requestInputs(std::vector& inputs, T const& entity) +bool requestInputs(std::vector& inputs, T& entity, header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) { + entity.requiredInputs = entity.getRequiredInputs(newOrigin); for (auto base_spec : entity.requiredInputs) { base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption()}, VariantType::Bool, true, {"\"\""}}); DataSpecUtils::updateInputList(inputs, std::forward(base_spec)); @@ -171,6 +180,16 @@ bool requestInputs(std::vector& inputs, T const& entity) return true; } +template +concept with_updateable_output = requires(T t) { t.updateOutputSpec(); }; + +template +bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +{ + entity.outputSpec = entity.updateOutputSpec(newOrigin); + return true; +} + template bool newDataframeCondition(InputRecord&, C&) { diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index d00b65be22d27..ea67cee3860c3 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -557,8 +557,8 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) // request base tables for spawnable extended tables and indices to be built // this checks for duplications - homogeneous_apply_refs_sized([&inputs](auto& element) { - return analysis_task_parsers::requestInputs(inputs, element); + homogeneous_apply_refs_sized([&inputs, &newOrigin](auto& element) { + return analysis_task_parsers::requestInputs(inputs, element, newOrigin); }, *task.get()); @@ -567,8 +567,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) LOG(warn) << "Task " << name_str << " has no inputs"; } + // update OutputSpecs in output declarations + homogeneous_apply_refs_sized([&newOrigin](auto& element){ return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get()); + + // append outputs homogeneous_apply_refs_sized([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get()); + // request services auto requiredServices = CommonServices::defaultServices(); auto arrowServices = CommonServices::arrowServices(); requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end()); diff --git a/Framework/Core/src/AnalysisDataModelHelpers.cxx b/Framework/Core/src/AnalysisDataModelHelpers.cxx index b7b459c89d847..308bba1ca1507 100644 --- a/Framework/Core/src/AnalysisDataModelHelpers.cxx +++ b/Framework/Core/src/AnalysisDataModelHelpers.cxx @@ -12,8 +12,6 @@ #include "Framework/AnalysisDataModelHelpers.h" #include "Framework/AnalysisDataModel.h" #include "Framework/AnalysisSupportHelpers.h" -#include "Framework/StringHelpers.h" -#include "Framework/Logger.h" std::string str_tolower(std::string s) { @@ -25,7 +23,7 @@ std::string str_tolower(std::string s) namespace o2::aod::datamodel { -std::string getTreeName(header::DataHeader dh) +std::string getTreeName(header::DataHeader dh, bool wasAOD) { auto description = std::string(dh.dataDescription.str); auto iver = (float)dh.subSpecification; @@ -38,11 +36,8 @@ std::string 
getTreeName(header::DataHeader dh) } // add prefix according to origin - for (auto possibleOrigin : framework::AODOrigins) { - if (dh.dataOrigin == possibleOrigin) { + if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin){ return o == origin; })) { treeName = "O2" + treeName; - break; - } } // exceptions from this diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index 35228bba531b0..d72bebdb3f63a 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -174,7 +174,11 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs // FIXME: until we have a single list of pairs additionalInputs | - views::partial_match_filter(AODOrigins) | + std::ranges::views::filter([](InputSpec const& input){ + return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){ + return p.name.starts_with("aod-origin-replaced"); + }); + }) | std::ranges::views::filter([](InputSpec const& input) { return std::ranges::none_of(input.metadata, [](ConfigParamSpec const& p) { return (p.name.compare("projectors") == 0) || (p.name.compare("index-records") == 0); }); }) | diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index eecff4ce87c74..307e80b2695fc 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -711,7 +711,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // update currently requested AODs for (auto& d : workflow) { d.inputs | - views::partial_match_filter(AODOrigins) | + std::ranges::views::filter([](InputSpec const& input){ + return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){ + return 
p.name.starts_with("aod-origin-replaced"); + }); + }) | sinks::update_input_list{dec.requestedAODs}; } diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 9b80ef14d7621..188b6653c6a43 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -282,8 +282,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext bool hasProjectors = false; bool hasIndexRecords = false; bool hasCCDBURLs = false; + bool wasAOD = false; // all three options are exclusive for (auto const& p : input.metadata) { + if (p.name.starts_with("aod-origin-replaced")) { + wasAOD = true; + } if (p.name.compare("projectors") == 0) { hasProjectors = true; break; @@ -342,7 +346,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext DataSpecUtils::updateInputList(dec.requestedIDXs, InputSpec{input}); } else if (hasCCDBURLs) { DataSpecUtils::updateInputList(dec.requestedTIMs, InputSpec{input}); - } else if (DataSpecUtils::partialMatch(input, AODOrigins)) { + } else if (DataSpecUtils::partialMatch(input, AODOrigins) || wasAOD) { DataSpecUtils::updateInputList(dec.requestedAODs, InputSpec{input}); } } @@ -353,8 +357,12 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext bool hasProjectors = false; bool hasIndexRecords = false; bool hasCCDBURLs = false; + bool wasAOD = false; // all three options are exclusive for (auto const& p : output.metadata) { + if (p.name.starts_with("aod-origin-replaced")) { + wasAOD = true; + } if (p.name.compare("projectors") == 0) { hasProjectors = true; break; @@ -374,7 +382,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext dec.providedTIMs.emplace_back(output); } else if (hasIndexRecords) { dec.providedIDXs.emplace_back(output); - } else if (DataSpecUtils::partialMatch(output, AODOrigins)) { + } else if (DataSpecUtils::partialMatch(output, AODOrigins) || wasAOD) { 
dec.providedAODs.emplace_back(output); } else if (DataSpecUtils::partialMatch(output, header::DataOrigin{"ATSK"})) { dec.providedOutputObjHist.emplace_back(output); From a3d90d625189f599157f8652afed03aed6423448 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 15 May 2026 11:17:54 +0200 Subject: [PATCH 11/14] make grouping aware of the origin change; fix exception in treeName generator; restrict Preslice to tables --- Framework/Core/include/Framework/ASoA.h | 31 ++++++++++++++----- .../Core/include/Framework/AnalysisHelpers.h | 16 +++------- .../Core/include/Framework/AnalysisManagers.h | 10 ++++-- .../Core/include/Framework/AnalysisTask.h | 13 ++++---- .../Framework/ArrowTableSlicingCache.h | 9 +++++- .../Core/include/Framework/GroupSlicer.h | 10 +++--- .../Core/src/AnalysisDataModelHelpers.cxx | 2 +- Framework/Core/src/ArrowSupport.cxx | 3 +- Framework/Core/src/ArrowTableSlicingCache.cxx | 14 +++++---- 9 files changed, 67 insertions(+), 41 deletions(-) diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index bb3597b51fb97..d991d013de201 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1547,7 +1547,7 @@ struct PreslicePolicyGeneral : public PreslicePolicyBase { template concept is_preslice_policy = std::derived_from; -template +template struct PresliceBase : public Policy { constexpr static bool optional = OPT; using target_t = T; @@ -1580,13 +1580,13 @@ struct PresliceBase : public Policy { } }; -template +template using PresliceUnsorted = PresliceBase; -template +template using PresliceUnsortedOptional = PresliceBase; -template +template using Preslice = PresliceBase; -template +template using PresliceOptional = PresliceBase; template @@ -1744,7 +1744,12 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto 
localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey(node.name), node.name}); + auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { + m.origin = o; + } + return m; + }(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); auto [offset, count] = localCache.getSliceFor(value); auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast(offset), count)}, static_cast(offset)); if (t.tableSize() != 0) { @@ -1756,7 +1761,12 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const& template auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto localCache = cache.ptr->getCacheFor({"", o2::soa::getMatcherFromTypeForKey(node.name), node.name}); + auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { + m.origin = o; + } + return m; + }(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); auto [offset, count] = localCache.getSliceFor(value); auto slice = table->asArrowTable()->Slice(static_cast(offset), count); return prepareFilteredSlice(table, slice, offset); @@ -1765,7 +1775,12 @@ auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode template auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto localCache = cache.ptr->getCacheUnsortedFor({"", o2::soa::getMatcherFromTypeForKey(node.name), node.name}); + auto localCache = cache.ptr->getCacheUnsortedFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { + m.origin 
= o; + } + return m; + }(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); if constexpr (soa::is_filtered_table) { auto t = typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value)); if (t.tableSize() != 0) { diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index d971347df6031..342651cdbb29b 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -452,6 +452,9 @@ constexpr auto tableRef2OutputSpec(header::DataOrigin newOrigin = header::DataOr } else if constexpr (soa::with_index_pack) { metadata.emplace_back("index-records", framework::VariantType::Bool, true, framework::ConfigParamSpec::HelpString{"\"\""}); } + if ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { + metadata.push_back(framework::ConfigParamSpec{"aod-origin-replaced", framework::VariantType::Bool, true, {"\"\""}}); + } return framework::OutputSpec{ framework::OutputLabel{o2::aod::label()}, ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? 
newOrigin : o2::aod::origin(), @@ -461,15 +464,6 @@ constexpr auto tableRef2OutputSpec(header::DataOrigin newOrigin = header::DataOr metadata}; } -template -constexpr auto tableRef2Output() -{ - return framework::Output{ - o2::aod::origin(), - o2::aod::description(o2::aod::signature()), - R.version}; -} - template constexpr auto tableRef2OutputRef() { @@ -498,9 +492,9 @@ struct WritingCursor { using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }()); using cursor_t = decltype(std::declval().cursor()); OutputSpec outputSpec{soa::tableRef2OutputSpec()}; - OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin) + OutputSpec updateOutputSpec(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) { - outputSpec = soa::tableRef2OutputSpec(newOrigin); + return soa::tableRef2OutputSpec(newOrigin); } template diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 06b68bbc5e1a1..68c2a7c20eb1e 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -190,6 +190,12 @@ bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrig return true; } +template +bool updateOutputSpec(T& producesGroup, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) +{ + homogeneous_apply_refs([&newOrigin](auto& produces){ return updateOutputSpec(produces, newOrigin); }, producesGroup); +} + template bool newDataframeCondition(InputRecord&, C&) { @@ -604,9 +610,9 @@ bool replaceOrigin(T&, header::DataOrigin const&) } template -bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin) +bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) { - if ((T::target_t::originals[0].origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { + if 
((T::target_t::binding_origin == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { preslice.bindingKey.matcher = framework::replaceOrigin(preslice.bindingKey.matcher, newOrigin); return true; } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index ea67cee3860c3..bbb02817887c4 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -315,7 +315,7 @@ struct AnalysisDataProcessorBuilder { } template - static void invokeProcess(Task& task, InputRecord& inputs, std::vector iInfos, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, std::string const& newOriginStr) + static void invokeProcess(Task& task, InputRecord& inputs, std::vector iInfos, void (Task::*processingFunction)(Grouping, Associated...), std::vector& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { using G = std::decay_t; auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, iInfos, processingFunction, infos); @@ -387,7 +387,7 @@ struct AnalysisDataProcessorBuilder { task); overwriteInternalIndices(associatedTables, associatedTables); if constexpr (soa::is_iterator) { - auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOriginStr); + auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOrigin); for (auto& slice : slicer) { auto associatedSlices = slice.associatedTables(); overwriteInternalIndices(associatedSlices, associatedTables); @@ -652,8 +652,9 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) ic.services().get().setCaches(std::move(bindingsKeys)); ic.services().get().setCachesUnsorted(std::move(bindingsKeysUnsorted)); + ic.services().get().setOrigin(newOrigin); - return [task, expressionInfos, inputInfos, newOriginStr](ProcessingContext& pc) mutable { + return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable { // load the ccdb object from their cache homogeneous_apply_refs_sized([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get()); // reset partitions once per dataframe @@ -676,14 +677,14 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) } // execute process() if constexpr (requires { &T::process; }) { - AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), inputInfos, &T::process, expressionInfos, slices, newOriginStr); + AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), inputInfos, &T::process, expressionInfos, slices, newOrigin); } // execute optional process() homogeneous_apply_refs_sized( - [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOriginStr](auto& x) { + [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) { if constexpr (is_process_configurable) { if (x.value == true) { - AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), inputInfos, x.process, expressionInfos, slices, newOriginStr); + AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), inputInfos, x.process, expressionInfos, slices, newOrigin); return true; } return false; diff --git a/Framework/Core/include/Framework/ArrowTableSlicingCache.h b/Framework/Core/include/Framework/ArrowTableSlicingCache.h index 073eadc22d72c..b7cd1df2a74c6 100644 --- a/Framework/Core/include/Framework/ArrowTableSlicingCache.h +++ b/Framework/Core/include/Framework/ArrowTableSlicingCache.h @@ -64,9 +64,14 @@ struct ArrowTableSlicingCacheDef { constexpr static ServiceKind 
service_kind = ServiceKind::Global; Cache bindingsKeys; Cache bindingsKeysUnsorted; + header::DataOrigin newOrigin = header::DataOrigin{"AOD"}; void setCaches(Cache&& bsks); void setCachesUnsorted(Cache&& bsks); + void setOrigin(header::DataOrigin newOrigin_ = header::DataOrigin{"AOD"}) + { + newOrigin = newOrigin_; + } }; struct ArrowTableSlicingCache { @@ -80,7 +85,9 @@ struct ArrowTableSlicingCache { std::vector> valuesUnsorted; std::vector groups; - ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted = {}); + header::DataOrigin newOrigin = header::DataOrigin{"AOD"}; + + ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted = {}, header::DataOrigin newOrigin_ = header::DataOrigin{"AOD"}); // set caching information externally void setCaches(Cache&& bsks, Cache&& bsksUnsorted = {}); diff --git a/Framework/Core/include/Framework/GroupSlicer.h b/Framework/Core/include/Framework/GroupSlicer.h index f2b72b0c83bf9..25cb882359b06 100644 --- a/Framework/Core/include/Framework/GroupSlicer.h +++ b/Framework/Core/include/Framework/GroupSlicer.h @@ -26,9 +26,9 @@ namespace o2::framework template struct GroupSlicer { using grouping_t = std::decay_t; - GroupSlicer(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, std::string const& newOriginStr = "AOD") + GroupSlicer(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) : max{gt.size()}, - mBegin{GroupSlicerIterator(gt, at, slices, newOriginStr)} + mBegin{GroupSlicerIterator(gt, at, slices, newOrigin)} { } @@ -87,15 +87,15 @@ struct GroupSlicer { starts[index] = selections[index]->begin(); } - GroupSlicerIterator(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, std::string const& newOriginStr = "AOD") + GroupSlicerIterator(G& gt, std::tuple& at, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) : mIndexColumnName{std::string("fIndex") + o2::framework::cutString(o2::soa::getLabelFromType())}, mGt{&gt}, 
mAt{&at}, mGroupingElement{gt.begin()}, position{0}, - mSlices{&slices} + mSlices{&slices}, + replacementOrigin{newOrigin} { - replacementOrigin.runtimeInit(newOriginStr.c_str(), newOriginStr.size()); if constexpr (soa::is_filtered_table>) { groupSelection = mGt->getSelectedRows(); } diff --git a/Framework/Core/src/AnalysisDataModelHelpers.cxx b/Framework/Core/src/AnalysisDataModelHelpers.cxx index 308bba1ca1507..e3e9929e8f473 100644 --- a/Framework/Core/src/AnalysisDataModelHelpers.cxx +++ b/Framework/Core/src/AnalysisDataModelHelpers.cxx @@ -42,7 +42,7 @@ std::string getTreeName(header::DataHeader dh, bool wasAOD) // exceptions from this auto origin = std::string(dh.dataOrigin.str); - if (origin == "AOD" && description == "MCCOLLISLABEL") { + if ((origin == "AOD" || wasAOD) && description == "MCCOLLISLABEL") { treeName = "O2mccollisionlabel"; } diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 307e80b2695fc..8232dc43222c4 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -769,7 +769,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowTableSlicingCacheSpec() .uniqueId = CommonServices::simpleServiceId(), .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId(), new ArrowTableSlicingCache(Cache{services.get().bindingsKeys}, - Cache{services.get().bindingsKeysUnsorted}), + Cache{services.get().bindingsKeysUnsorted}, + services.get().newOrigin), ServiceKind::Stream, typeid(ArrowTableSlicingCache).name()}; }, .configure = CommonServices::noConfiguration(), .preProcessing = [](ProcessingContext& pc, void* service_ptr) { diff --git a/Framework/Core/src/ArrowTableSlicingCache.cxx b/Framework/Core/src/ArrowTableSlicingCache.cxx index 5162c698a1d66..a3cb755f158ef 100644 --- a/Framework/Core/src/ArrowTableSlicingCache.cxx +++ b/Framework/Core/src/ArrowTableSlicingCache.cxx @@ -11,6 +11,7 @@ #include 
"Framework/ArrowTableSlicingCache.h" #include "Framework/RuntimeError.h" +#include "Framework/DataSpecUtils.h" #include #include @@ -78,9 +79,10 @@ void ArrowTableSlicingCacheDef::setCachesUnsorted(Cache&& bsks) bindingsKeysUnsorted = bsks; } -ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted) +ArrowTableSlicingCache::ArrowTableSlicingCache(Cache&& bsks, Cache&& bsksUnsorted, header::DataOrigin newOrigin_) : bindingsKeys{bsks}, - bindingsKeysUnsorted{bsksUnsorted} + bindingsKeysUnsorted{bsksUnsorted}, + newOrigin{newOrigin_} { offsets.resize(bindingsKeys.size()); sizes.resize(bindingsKeys.size()); @@ -112,7 +114,7 @@ arrow::Status ArrowTableSlicingCache::updateCacheEntry(int pos, std::shared_ptr< } auto& [b, m, k, e] = bindingsKeys[pos]; if (!e) { - throw runtime_error_f("Disabled cache %s/%s update requested", b.c_str(), k.c_str()); + throw runtime_error_f("Disabled cache (%s) %s/%s update requested", DataSpecUtils::describe(m).c_str(), b.c_str(), k.c_str()); } validateOrder(bindingsKeys[pos], table); @@ -205,7 +207,7 @@ std::pair ArrowTableSlicingCache::getCachePos(const Entry& bindingKey if (pos != -1) { return {pos, false}; } - throw runtime_error_f("%s/%s not found neither in sorted or unsorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); + throw runtime_error_f("(%s) %s/%s not found neither in sorted or unsorted cache", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), bindingKey.key.c_str()); } int ArrowTableSlicingCache::getCachePosSortedFor(Entry const& bindingKey) const @@ -242,10 +244,10 @@ SliceInfoUnsortedPtr ArrowTableSlicingCache::getCacheUnsortedFor(const Entry& bi { auto [p, s] = getCachePos(bindingKey); if (s) { - throw runtime_error_f("%s/%s is found in sorted cache", bindingKey.binding.c_str(), bindingKey.key.c_str()); + throw runtime_error_f("(%s) %s/%s is found in sorted cache", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), 
bindingKey.key.c_str()); } if (!bindingsKeysUnsorted[p].enabled) { - throw runtime_error_f("Disabled unsorted cache %s/%s is requested", bindingKey.binding.c_str(), bindingKey.key.c_str()); + throw runtime_error_f("Disabled unsorted cache (%s) %s/%s is requested", DataSpecUtils::describe(bindingKey.matcher).c_str(), bindingKey.binding.c_str(), bindingKey.key.c_str()); } return getCacheUnsortedForPos(p); From b173df65c7b77848decf788e32ceb7a688c5aad2 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 15 May 2026 11:19:46 +0200 Subject: [PATCH 12/14] remove comments --- Framework/Core/include/Framework/AnalysisTask.h | 8 -------- 1 file changed, 8 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index bbb02817887c4..d26fda13157cc 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -582,14 +582,6 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) // replace origins in Preslice declarations homogeneous_apply_refs_sized([&newOrigin](auto& element){ return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); - /// FIXME: In order to replace origins consistently, there are following things that need to be touched - /// 1. inputs and outputs, including their metadata - done - /// 2. inputInfos, that contain matchers for extracting arguments of process functions - done in the 1st step - /// 3. bindingKeys/bindingKeysUnsorted, that contain matchers to extract tables used to calculate slicing - preslices are update, bks are updated - /// 4. Produces/Spawns/Defines/Builds contain matchers for required inputs and created outputs that need to be modified - same - /// - /// 3a. 
GroupSlicer has to use runtime list of extractions - auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { Cache bindingsKeys; Cache bindingsKeysUnsorted; From af903ee79405dbd64b87684c405b7b4aab9b02f1 Mon Sep 17 00:00:00 2001 From: Anton Alkin Date: Fri, 15 May 2026 11:21:18 +0200 Subject: [PATCH 13/14] fixup! remove comments --- Framework/Core/include/Framework/AnalysisTask.h | 6 ------ 1 file changed, 6 deletions(-) diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index d26fda13157cc..bdff4a3c0cc25 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -147,12 +147,6 @@ struct AnalysisDataProcessorBuilder { }.template operator()::originals>(std::make_index_sequence::originals.size()>()); } - // template - // static void addInput(const char* name, bool value, std::vector& inputs, std::vector& iInfos, int ai, uint32_t hash, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"}) - // { - // addInput::parent_t>(name, value, inputs, iInfos, ai, hash, newOrigin); - // } - /// helper to append the inputs and expression information for normalized arguments template static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector& inputs, std::vector& eInfos, std::vector& iInfos, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"}) From ba73ee1afa89fbd1eb91ce241b78807622369c96 Mon Sep 17 00:00:00 2001 From: ALICE Action Bot Date: Fri, 15 May 2026 09:31:53 +0000 Subject: [PATCH 14/14] Please consider the following formatting changes --- .../src/AODJAlienReaderHelpers.cxx | 2 +- Framework/Core/include/Framework/ASoA.h | 15 +++++++----- .../Core/include/Framework/AnalysisHelpers.h | 23 ++++++++----------- .../Core/include/Framework/AnalysisManagers.h | 12 +++++----- .../Core/include/Framework/AnalysisTask.h | 11 ++++----- 
.../Core/src/AnalysisDataModelHelpers.cxx | 4 ++-- Framework/Core/src/AnalysisSupportHelpers.cxx | 4 ++-- Framework/Core/src/ArrowSupport.cxx | 4 ++-- 8 files changed, 37 insertions(+), 38 deletions(-) diff --git a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx index d7613d3266ecb..7f7f4e048b440 100644 --- a/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx +++ b/Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx @@ -240,7 +240,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const // create header auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher); auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec); - bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p){ return p.name.starts_with("aod-origin-replaced"); }); + bool wasAOD = std::ranges::any_of(route.matcher.metadata, [](ConfigParamSpec const& p) { return p.name.starts_with("aod-origin-replaced"); }); if (!didir->readTree(outputs, dh, fcnt, ntf, totalSizeCompressed, totalSizeUncompressed, wasAOD)) { if (first) { diff --git a/Framework/Core/include/Framework/ASoA.h b/Framework/Core/include/Framework/ASoA.h index d991d013de201..180945ad44061 100644 --- a/Framework/Core/include/Framework/ASoA.h +++ b/Framework/Core/include/Framework/ASoA.h @@ -1744,12 +1744,13 @@ auto doFilteredSliceBy(T const* table, o2::framework::PresliceBase auto doSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) { if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { m.origin = o; } return m; - 
}(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); + }(o2::soa::getMatcherFromTypeForKey(node.name)), + node.name}); auto [offset, count] = localCache.getSliceFor(value); auto t = typename T::self_t({table->asArrowTable()->Slice(static_cast(offset), count)}, static_cast(offset)); if (t.tableSize() != 0) { @@ -1761,12 +1762,13 @@ auto doSliceByCached(T const* table, framework::expressions::BindingNode const& template auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + auto localCache = cache.ptr->getCacheFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) { if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { m.origin = o; } return m; - }(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); + }(o2::soa::getMatcherFromTypeForKey(node.name)), + node.name}); auto [offset, count] = localCache.getSliceFor(value); auto slice = table->asArrowTable()->Slice(static_cast(offset), count); return prepareFilteredSlice(table, slice, offset); @@ -1775,12 +1777,13 @@ auto doFilteredSliceByCached(T const* table, framework::expressions::BindingNode template auto doSliceByCachedUnsorted(T const* table, framework::expressions::BindingNode const& node, int value, o2::framework::SliceCache& cache) { - auto localCache = cache.ptr->getCacheUnsortedFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m){ + auto localCache = cache.ptr->getCacheUnsortedFor({"", [&o = cache.ptr->newOrigin](framework::ConcreteDataMatcher&& m) { if ((m.origin == header::DataOrigin{"AOD"}) && (o != header::DataOrigin{"AOD"})) { m.origin = o; } return m; - }(o2::soa::getMatcherFromTypeForKey(node.name)), node.name}); + }(o2::soa::getMatcherFromTypeForKey(node.name)), + node.name}); if constexpr (soa::is_filtered_table) { auto t = 
typename T::self_t({table->asArrowTable()}, localCache.getSliceFor(value)); if (t.tableSize() != 0) { diff --git a/Framework/Core/include/Framework/AnalysisHelpers.h b/Framework/Core/include/Framework/AnalysisHelpers.h index 342651cdbb29b..6066da7957b71 100644 --- a/Framework/Core/include/Framework/AnalysisHelpers.h +++ b/Framework/Core/include/Framework/AnalysisHelpers.h @@ -415,7 +415,7 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri sources = getInputMetadata>::metadata, o2::aod::Hash>(); } if ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) { - std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as()](framework::ConfigParamSpec& source){ + std::ranges::transform(sources, sources.begin(), [originStr = newOrigin.as()](framework::ConfigParamSpec& source) { return replaceOrigin(source, originStr); }); metadata.push_back(framework::ConfigParamSpec{"aod-origin-replaced", framework::VariantType::Bool, true, {"\"\""}}); @@ -432,12 +432,12 @@ constexpr auto tableRef2InputSpec(header::DataOrigin newOrigin = header::DataOri } return framework::InputSpec{ - o2::aod::label(), - ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? newOrigin : o2::aod::origin(), - o2::aod::description(o2::aod::signature()), - R.version, - framework::Lifetime::Timeframe, - metadata}; + o2::aod::label(), + ((R.origin_hash == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) ? 
newOrigin : o2::aod::origin(), + o2::aod::description(o2::aod::signature()), + R.version, + framework::Lifetime::Timeframe, + metadata}; } template @@ -632,11 +632,10 @@ struct TableTransform { std::vector requiredInputs = getRequiredInputs(); static std::vector getRequiredInputs(header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"}) { - return [&newOrigin](std::index_sequence){ - return std::vector{soa::tableRef2InputSpec(newOrigin)...}; - }(std::make_index_sequence()); + return [&newOrigin](std::index_sequence) { + return std::vector{soa::tableRef2InputSpec(newOrigin)...}; + }(std::make_index_sequence()); } - }; /// This helper struct allows you to declare extended tables which should be @@ -654,7 +653,6 @@ constexpr auto transformBase() return TableTransform>::ref>{}; } - /// In a multi-origin case the origin is provided by the type /// FIXME: In a rewritten origin case the output designation needs to be changed (through base class) /// The extraction of the elements needs to be changed in AnalysisManagers using the origin information from the base class @@ -834,7 +832,6 @@ concept is_builds = requires(T t) { requires std::same_as>; }; - /// a task with rewritten origin, if running together with a task with the default, will /// have a different name and thus its output would be routed separately diff --git a/Framework/Core/include/Framework/AnalysisManagers.h b/Framework/Core/include/Framework/AnalysisManagers.h index 68c2a7c20eb1e..e896624d8b7a7 100644 --- a/Framework/Core/include/Framework/AnalysisManagers.h +++ b/Framework/Core/include/Framework/AnalysisManagers.h @@ -193,7 +193,7 @@ bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrig template bool updateOutputSpec(T& producesGroup, header::DataOrigin newOrigin = header::DataOrigin{"AOD"}) { - homogeneous_apply_refs([&newOrigin](auto& produces){ return updateOutputSpec(produces, newOrigin); }, producesGroup); + homogeneous_apply_refs([&newOrigin](auto& produces) { 
return updateOutputSpec(produces, newOrigin); }, producesGroup); } template @@ -312,7 +312,7 @@ template bool prepareOutput(ProcessingContext& context, T& spawns) { using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); + auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); }))); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{}); } @@ -330,7 +330,7 @@ bool prepareOutput(ProcessingContext& context, T& spawns) template bool prepareOutput(ProcessingContext& context, T& builds) { - return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) )); + return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); }))); } template @@ -338,7 +338,7 @@ bool prepareOutput(ProcessingContext& context, T& defines) requires(T::delayed == false) { using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); + auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return 
DataSpecUtils::asConcreteDataMatcher(input); }))); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{}); } @@ -370,7 +370,7 @@ bool prepareDelayedOutput(ProcessingContext& context, T& defines) defines.recompile(); } using metadata = o2::aod::MetadataTrait>::metadata; - auto originalTable = soa::ArrowHelpers::joinTables( framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input){ return DataSpecUtils::asConcreteDataMatcher(input); }) ) ); + auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); }))); if (originalTable->num_rows() == 0) { originalTable = makeEmptyTable(); } @@ -622,7 +622,7 @@ bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin = header::Da template bool replaceOrigin(T& presliceGroup, header::DataOrigin const& newOrigin) { - homogeneous_apply_refs([&newOrigin](auto& preslice){ return replaceOrigin(preslice, newOrigin); }, presliceGroup); + homogeneous_apply_refs([&newOrigin](auto& preslice) { return replaceOrigin(preslice, newOrigin); }, presliceGroup); return true; } diff --git a/Framework/Core/include/Framework/AnalysisTask.h b/Framework/Core/include/Framework/AnalysisTask.h index bdff4a3c0cc25..25f6187e9426e 100644 --- a/Framework/Core/include/Framework/AnalysisTask.h +++ b/Framework/Core/include/Framework/AnalysisTask.h @@ -276,8 +276,7 @@ struct AnalysisDataProcessorBuilder { template static auto extract(InputRecord& record, std::vector iInfos, std::vector& infos, size_t phash) { - auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers - | std::views::filter([](auto const& pair) { return pair.first == AI; }); + auto matchers = std::ranges::find_if(iInfos, 
[&phash](auto const& info) { return info.hash == phash; })->matchers | std::views::filter([](auto const& pair) { return pair.first == AI; }); if constexpr (soa::is_filtered) { return extractFilteredFromRecord(record, matchers, *std::ranges::find_if(infos, [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); })); } else { @@ -562,7 +561,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) } // update OutputSpecs in output declarations - homogeneous_apply_refs_sized([&newOrigin](auto& element){ return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get()); + homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get()); // append outputs homogeneous_apply_refs_sized([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get()); @@ -574,7 +573,7 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... args) homogeneous_apply_refs_sized([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get()); // replace origins in Preslice declarations - homogeneous_apply_refs_sized([&newOrigin](auto& element){ return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); + homogeneous_apply_refs_sized([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get()); auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable { Cache bindingsKeys; @@ -623,13 +622,13 @@ DataProcessorSpec adaptAnalysisTask(ConfigContext const& ctx, Args&&... 
args) *task.get()); /// replace origin in slicing caches - std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry){ + std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) { if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { entry.matcher = replaceOrigin(entry.matcher, newOrigin); } return entry; }); - std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry){ + std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) { if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) { entry.matcher = replaceOrigin(entry.matcher, newOrigin); } diff --git a/Framework/Core/src/AnalysisDataModelHelpers.cxx b/Framework/Core/src/AnalysisDataModelHelpers.cxx index e3e9929e8f473..b683ce026dba4 100644 --- a/Framework/Core/src/AnalysisDataModelHelpers.cxx +++ b/Framework/Core/src/AnalysisDataModelHelpers.cxx @@ -36,8 +36,8 @@ std::string getTreeName(header::DataHeader dh, bool wasAOD) } // add prefix according to origin - if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin){ return o == origin; })) { - treeName = "O2" + treeName; + if (wasAOD || std::ranges::any_of(framework::AODOrigins, [&o = dh.dataOrigin](header::DataOrigin const& origin) { return o == origin; })) { + treeName = "O2" + treeName; } // exceptions from this diff --git a/Framework/Core/src/AnalysisSupportHelpers.cxx b/Framework/Core/src/AnalysisSupportHelpers.cxx index d72bebdb3f63a..c99110353de21 100644 --- a/Framework/Core/src/AnalysisSupportHelpers.cxx +++ b/Framework/Core/src/AnalysisSupportHelpers.cxx @@ -174,8 +174,8 @@ void AnalysisSupportHelpers::addMissingOutputsToBuilder(std::vector c additionalInputs | sinks::update_input_list{publisher.inputs}; // update publisher inputs // FIXME: until we have a single 
list of pairs additionalInputs | - std::ranges::views::filter([](InputSpec const& input){ - return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){ + std::ranges::views::filter([](InputSpec const& input) { + return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p) { return p.name.starts_with("aod-origin-replaced"); }); }) | diff --git a/Framework/Core/src/ArrowSupport.cxx b/Framework/Core/src/ArrowSupport.cxx index 8232dc43222c4..0a5b168f893b6 100644 --- a/Framework/Core/src/ArrowSupport.cxx +++ b/Framework/Core/src/ArrowSupport.cxx @@ -711,8 +711,8 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec() // update currently requested AODs for (auto& d : workflow) { d.inputs | - std::ranges::views::filter([](InputSpec const& input){ - return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p){ + std::ranges::views::filter([](InputSpec const& input) { + return DataSpecUtils::partialMatch(input, AODOrigins) || std::ranges::any_of(input.metadata, [](ConfigParamSpec const& p) { return p.name.starts_with("aod-origin-replaced"); }); }) |