Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Framework/Core/include/Framework/CommonLabels.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ namespace o2::framework
// When present on a DataProcessor, no DomainInfoHeader messages will be sent downstream.
const extern DataProcessorLabel suppressDomainInfoLabel;

// Label to allow multiple DataProcessorSpecs with the same name in the topology.
// When present, duplicate specs with matching inputs and outputs will be deduplicated
// with a warning instead of causing a fatal error.
const extern DataProcessorLabel allowDuplicatesLabel;

} // namespace o2::framework

#endif // O2_FRAMEWORK_COMMONLABELS_H
1 change: 1 addition & 0 deletions Framework/Core/src/CommonLabels.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ namespace o2::framework
{

const DataProcessorLabel suppressDomainInfoLabel = {"suppress-domain-info"};
const DataProcessorLabel allowDuplicatesLabel = {"allow-duplicates"};

} // namespace o2::framework
37 changes: 32 additions & 5 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
#include "WorkflowHelpers.h"
#include "Framework/AnalysisSupportHelpers.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/CommonLabels.h"
#include "Framework/ConfigParamSpec.h"
#include "Framework/ConfigParamsHelper.h"
#include "Framework/CommonDataProcessors.h"
#include "Framework/ConfigContext.h"
#include "Framework/DataProcessorSpecHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/DataSpecViews.h"
Expand All @@ -32,6 +34,7 @@
#include "Headers/DataHeader.h"
#include <algorithm>
#include <list>
#include <map>
#include <set>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -959,7 +962,7 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl
if (workflow.empty()) {
return WorkflowParsingState::Empty;
}
std::set<std::string> validNames;
std::map<std::string, size_t> validNames;
// std::vector<OutputSpec> availableOutputs;
// std::vector<InputSpec> requiredInputs;

Expand All @@ -971,17 +974,25 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl

std::ostringstream ss;

for (auto& spec : workflow) {
for (size_t si = 0; si < workflow.size(); ++si) {
auto& spec = workflow[si];
if (spec.name.empty()) {
throw std::runtime_error("Invalid DataProcessorSpec name");
}
if (strpbrk(spec.name.data(), ",;:\"'$") != nullptr) {
throw std::runtime_error("Cannot use any of ,;:\"'$ as DataProcessor name");
}
if (validNames.find(spec.name) != validNames.end()) {
throw std::runtime_error("Name " + spec.name + " is used twice.");
auto it = validNames.find(spec.name);
if (it != validNames.end()) {
auto& firstSpec = workflow[it->second];
if (!DataProcessorSpecHelpers::hasLabel(firstSpec, allowDuplicatesLabel.value.c_str()) ||
!DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) {
throw std::runtime_error("Name " + spec.name + " is used twice.");
}
LOG(warning) << "Duplicate DataProcessorSpec " << spec.name << " found with allow-duplicates label. Will be deduplicated.";
continue;
}
validNames.insert(spec.name);
validNames.emplace(spec.name, si);
for (auto& option : spec.options) {
if (option.defaultValue.type() != VariantType::Empty &&
option.type != option.defaultValue.type()) {
Expand All @@ -1005,6 +1016,22 @@ WorkflowParsingState WorkflowHelpers::verifyWorkflow(const o2::framework::Workfl
return WorkflowParsingState::Valid;
}

void WorkflowHelpers::removeDuplicates(WorkflowSpec& workflow)
{
std::set<std::string> seen;
auto it = std::remove_if(workflow.begin(), workflow.end(), [&seen](DataProcessorSpec const& spec) {
if (seen.find(spec.name) == seen.end()) {
seen.insert(spec.name);
return false;
}
if (!DataProcessorSpecHelpers::hasLabel(spec, allowDuplicatesLabel.value.c_str())) {
return false;
}
return true;
});
workflow.erase(it, workflow.end());
}

using UnifiedDataSpecType = std::variant<InputSpec, OutputSpec>;
struct DataMatcherId {
size_t workflowId;
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/src/WorkflowHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ struct WorkflowHelpers {
// it contains no empty labels.
[[nodiscard]] static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow);

// Remove duplicate DataProcessorSpecs that have the "allow-duplicates" label.
// Duplicate specs must have the same inputs and outputs, otherwise an exception is thrown.
static void removeDuplicates(WorkflowSpec& workflow);

// Depending on the workflow and the dangling inputs inside it, inject "fake"
// devices to mark the fact we might need some extra action to make sure
// dangling inputs are satisfied.
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
/// extract and apply process switches
/// prune device inputs
auto altered_workflow = workflow;
WorkflowHelpers::removeDuplicates(altered_workflow);

auto confNameFromParam = [](std::string const& paramName) {
std::regex name_regex(R"(^control:([\w-]+)\/(\w+))");
Expand Down
25 changes: 25 additions & 0 deletions Framework/Core/test/test_WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// or submit itself to any jurisdiction.
#include "Mocking.h"
#include "test_HelperMacros.h"
#include "Framework/CommonLabels.h"
#include "Framework/ConfigContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataSpecUtils.h"
Expand Down Expand Up @@ -60,6 +61,30 @@ TEST_CASE("TestVerifyWorkflow")
checkOk(WorkflowSpec{{"A", {InputSpec{"x", "TST", "A"}}}});
// Check for duplicate DataProcessorSpecs names
checkNotOk(WorkflowSpec{{"A"}, {"A"}});
// Duplicates with allow-duplicates label should not throw
checkOk(WorkflowSpec{
{.name = "A", .labels = {allowDuplicatesLabel}},
{.name = "A", .labels = {allowDuplicatesLabel}},
});
// Duplicates without the label should still throw
checkNotOk(WorkflowSpec{
{.name = "A"},
{.name = "A", .labels = {allowDuplicatesLabel}},
});
}

TEST_CASE("TestRemoveDuplicates")
{
// removeDuplicates should keep only the first spec with a given name
WorkflowSpec workflow{
{.name = "A", .labels = {allowDuplicatesLabel}},
{.name = "B"},
{.name = "A", .labels = {allowDuplicatesLabel}},
};
WorkflowHelpers::removeDuplicates(workflow);
REQUIRE(workflow.size() == 2);
REQUIRE(workflow[0].name == "A");
REQUIRE(workflow[1].name == "B");
}

TEST_CASE("TestWorkflowHelpers")
Expand Down