diff --git a/Framework/Core/include/Framework/CommonLabels.h b/Framework/Core/include/Framework/CommonLabels.h index 8be41a33af41d..17b1c85bf6fe4 100644 --- a/Framework/Core/include/Framework/CommonLabels.h +++ b/Framework/Core/include/Framework/CommonLabels.h @@ -21,6 +21,11 @@ namespace o2::framework // When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream. const extern DataProcessorLabel suppressDomainInfoLabel; +// Label to allow multiple DataProcessorSpecs with the same name in the topology. +// When present, duplicate specs with matching inputs and outputs will be deduplicated +// with a warning instead of causing a fatal error. +const extern DataProcessorLabel allowDuplicatesLabel; + } // namespace o2::framework #endif // O2_FRAMEWORK_COMMONLABELS_H diff --git a/Framework/Core/src/CommonLabels.cxx b/Framework/Core/src/CommonLabels.cxx index f728e194f611b..d660bdd84c6db 100644 --- a/Framework/Core/src/CommonLabels.cxx +++ b/Framework/Core/src/CommonLabels.cxx @@ -15,5 +15,6 @@ namespace o2::framework { const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"}; +const DataProcessorLabel allowDuplicatesLabel = {"allow-duplicates"}; } // namespace o2::framework diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 188b6653c6a43..6d3630d6c0aa8 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -11,10 +11,12 @@ #include "WorkflowHelpers.h" #include "Framework/AnalysisSupportHelpers.h" #include "Framework/AlgorithmSpec.h" +#include "Framework/CommonLabels.h" #include "Framework/ConfigParamSpec.h" #include "Framework/ConfigParamsHelper.h" #include "Framework/CommonDataProcessors.h" #include "Framework/ConfigContext.h" +#include "Framework/DataProcessorSpecHelpers.h" #include "Framework/DeviceSpec.h" #include "Framework/DataSpecUtils.h" #include "Framework/DataSpecViews.h" @@ -32,6 +34,7 @@ #include "Headers/DataHeader.h" #include #include +#include #include #include #include @@ -959,7 +962,7 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl if (workflow.empty()) { return WorkflowParsingState::Empty; } - std::set validNames; + std::map validNames; // std::vector availableOutputs; // std::vector requiredInputs; @@ -971,17 +974,25 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl std::ostringstream ss; - for (auto& spec : workflow) { + for (size_t si = 0; si < workflow.size(); ++si) { + auto& spec = workflow[si]; if (spec.name.empty()) { throw std::runtime_error("Invalid DataProcessorSpec name"); } if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) { throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name"); } - if (validNames.find(spec.name) != validNames.end()) { - throw std::runtime_error("Name " + spec.name + " is used twice."); + auto it = validNames.find(spec.name); + if (it != validNames.end()) { + auto& firstSpec = workflow[it->second]; + if (!DataProcessorSpecHelpers::hasLabel(firstSpec, allowDuplicatesLabel.value.c_str()) || + !DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) { + throw std::runtime_error("Name " + spec.name + " is used twice."); + } + LOG(warning) << "Duplicate DataProcessorSpec " << spec.name << " found with allow-duplicates label. Will be deduplicated."; + continue; } - validNames.insert(spec.name); + validNames.emplace(spec.name, si); for (auto& option : spec.options) { if (option.defaultValue.type() != VariantType::Empty && option.type != option.defaultValue.type()) { @@ -1005,6 +1016,22 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl return WorkflowParsingState::Valid; } +void WorkflowHelpers::removeDuplicates(WorkflowSpec& workflow) +{ + std::set seen; + auto it = std::remove_if(workflow.begin(), workflow.end(), [&seen](DataProcessorSpec const& spec) { + if (seen.find(spec.name) == seen.end()) { + seen.insert(spec.name); + return false; + } + if (!DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) { + return false; + } + return true; + }); + workflow.erase(it, workflow.end()); +} + using UnifiedDataSpecType = std::variant; struct DataMatcherId { size_t workflowId; diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index 5c0aa363c6d67..c7a990cd2730c 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -175,6 +175,10 @@ struct WorkflowHelpers { // it contains no empty labels. [[nodiscard]] static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow); + // Remove duplicate DataProcessorSpecs that have the "allow-duplicates" label. + // Duplicate specs must have the same inputs and outputs, otherwise an exception is thrown. + static void removeDuplicates(WorkflowSpec& workflow); + // Depending on the workflow and the dangling inputs inside it, inject "fake" // devices to mark the fact we might need some extra action to make sure // dangling inputs are satisfied. diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index c58f8e7287b3b..17be198db8a5e 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1663,6 +1663,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, /// extract and apply process switches /// prune device inputs auto altered_workflow = workflow; + WorkflowHelpers::removeDuplicates(altered_workflow); auto confNameFromParam = [](std::string const& paramName) { std::regex name_regex(R"(^control:([\w-]+)\/(\w+))"); diff --git a/Framework/Core/test/test_WorkflowHelpers.cxx b/Framework/Core/test/test_WorkflowHelpers.cxx index d43e74558c0bb..9e219fc0dfb83 100644 --- a/Framework/Core/test/test_WorkflowHelpers.cxx +++ b/Framework/Core/test/test_WorkflowHelpers.cxx @@ -10,6 +10,7 @@ // or submit itself to any jurisdiction. #include "Mocking.h" #include "test_HelperMacros.h" +#include "Framework/CommonLabels.h" #include "Framework/ConfigContext.h" #include "Framework/WorkflowSpec.h" #include "Framework/DataSpecUtils.h" @@ -60,6 +61,30 @@ TEST_CASE("TestVerifyWorkflow") checkOk(WorkflowSpec{{"A", {InputSpec{"x", "TST", "A"}}}}); // Check for duplicate DataProcessorSpecs names checkNotOk(WorkflowSpec{{"A"}, {"A"}}); + // Duplicates with allow-duplicates label should not throw + checkOk(WorkflowSpec{ + {.name = "A", .labels = {allowDuplicatesLabel}}, + {.name = "A", .labels = {allowDuplicatesLabel}}, + }); + // Duplicates without the label should still throw + checkNotOk(WorkflowSpec{ + {.name = "A"}, + {.name = "A", .labels = {allowDuplicatesLabel}}, + }); +} + +TEST_CASE("TestRemoveDuplicates") +{ + // removeDuplicates should keep only the first spec with a given name + WorkflowSpec workflow{ + {.name = "A", .labels = {allowDuplicatesLabel}}, + {.name = "B"}, + {.name = "A", .labels = {allowDuplicatesLabel}}, + }; + WorkflowHelpers::removeDuplicates(workflow); + REQUIRE(workflow.size() == 2); + REQUIRE(workflow[0].name == "A"); + REQUIRE(workflow[1].name == "B"); } TEST_CASE("TestWorkflowHelpers")