diff --git a/control-operator/pkg/client/client.go b/control-operator/pkg/client/client.go new file mode 100644 index 00000000..059a37fd --- /dev/null +++ b/control-operator/pkg/client/client.go @@ -0,0 +1,133 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2026 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package client + +import ( + "context" + "fmt" + + "github.com/AliceO2Group/Control/operator/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + crClient "sigs.k8s.io/controller-runtime/pkg/client" +) + +type Client struct { + client crClient.WithWatch + namespace string +} + +func New(kubeconfigPath, namespace string) (*Client, error) { + config, err := buildConfig(kubeconfigPath) + if err != nil { + return nil, fmt.Errorf("building kubeconfig: %w", err) + } + return NewFromConfig(config, namespace) +} + +func NewFromConfig(config *rest.Config, namespace string) (*Client, error) { + scheme := runtime.NewScheme() + if err := v1alpha1.AddToScheme(scheme); err != nil { + return nil, fmt.Errorf("registering v1alpha1 scheme: %w", err) + } + + c, err := crClient.NewWithWatch(config, crClient.Options{Scheme: scheme}) + if err != nil { + return nil, fmt.Errorf("creating kubernetes client: %w", err) + } + + return &Client{client: c, namespace: namespace}, nil +} + +func buildConfig(kubeconfigPath string) (*rest.Config, error) { + if kubeconfigPath != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) + } + if config, err := rest.InClusterConfig(); err == nil { + return config, nil + } + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil).ClientConfig() +} + +func (c *Client) CreateTask(ctx context.Context, task *v1alpha1.Task) error { + task.Namespace = c.namespace + return c.client.Create(ctx, task) +} + +func (c *Client) GetTask(ctx context.Context, name string) (*v1alpha1.Task, error) { + task := &v1alpha1.Task{} + err := c.client.Get(ctx, types.NamespacedName{Name: name, Namespace: c.namespace}, task) + return task, err +} + +func (c *Client) UpdateTask(ctx context.Context, task *v1alpha1.Task) error { + task.Namespace = c.namespace + return c.client.Update(ctx, task) +} + +func (c *Client) DeleteTask(ctx context.Context, name string) error { + return c.client.Delete(ctx, &v1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: c.namespace}, + }) +} + +// WatchTasks returns a watcher for all Task resources in the namespace. +// Each event on ResultChan() carries a *v1alpha1.Task as event.Object. +func (c *Client) WatchTasks(ctx context.Context) (watch.Interface, error) { + return c.client.Watch(ctx, &v1alpha1.TaskList{}, crClient.InNamespace(c.namespace)) +} + +func (c *Client) CreateEnvironment(ctx context.Context, env *v1alpha1.Environment) error { + env.Namespace = c.namespace + return c.client.Create(ctx, env) +} + +func (c *Client) GetEnvironment(ctx context.Context, name string) (*v1alpha1.Environment, error) { + env := &v1alpha1.Environment{} + err := c.client.Get(ctx, types.NamespacedName{Name: name, Namespace: c.namespace}, env) + return env, err +} + +func (c *Client) UpdateEnvironment(ctx context.Context, env *v1alpha1.Environment) error { + env.Namespace = c.namespace + return c.client.Update(ctx, env) +} + +func (c *Client) DeleteEnvironment(ctx context.Context, name string) error { + return c.client.Delete(ctx, &v1alpha1.Environment{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: c.namespace}, + }) +} + +// WatchEnvironments returns a watcher for all Environment resources in the namespace. +// Each event on ResultChan() carries a *v1alpha1.Environment as event.Object. +func (c *Client) WatchEnvironments(ctx context.Context) (watch.Interface, error) { + return c.client.Watch(ctx, &v1alpha1.EnvironmentList{}, crClient.InNamespace(c.namespace)) +} diff --git a/control-operator/pkg/client/client_test.go b/control-operator/pkg/client/client_test.go new file mode 100644 index 00000000..e63255b9 --- /dev/null +++ b/control-operator/pkg/client/client_test.go @@ -0,0 +1,188 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2026 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package client_test + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8swatch "k8s.io/apimachinery/pkg/watch" + + aliecsv1alpha1 "github.com/AliceO2Group/Control/operator/api/v1alpha1" + "github.com/AliceO2Group/Control/operator/pkg/client" +) + +var _ = Describe("Client", func() { + var ( + ctx context.Context + c *client.Client + ) + + BeforeEach(func() { + ctx = context.Background() + var err error + c, err = client.NewFromConfig(cfg, "default") + Expect(err).NotTo(HaveOccurred()) + }) + + Describe("Task Create, Read, Update, Delete", func() { + var task *aliecsv1alpha1.Task + + BeforeEach(func() { + task = &aliecsv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{Name: "test-task"}, + Spec: aliecsv1alpha1.TaskSpec{State: "standby", Pod: v1.PodSpec{Containers: []v1.Container{}}}, + } + Expect(c.CreateTask(ctx, task)).To(Succeed()) + }) + + AfterEach(func() { + _ = c.DeleteTask(ctx, task.Name) + }) + + It("gets a created task", func() { + got, err := c.GetTask(ctx, "test-task") + Expect(err).NotTo(HaveOccurred()) + Expect(got.Name).To(Equal("test-task")) + Expect(got.Spec.State).To(Equal("standby")) + }) + + It("updates a task", func() { + got, err := c.GetTask(ctx, "test-task") + Expect(err).NotTo(HaveOccurred()) + + got.Spec.State = "running" + Expect(c.UpdateTask(ctx, got)).To(Succeed()) + + updated, err := c.GetTask(ctx, "test-task") + Expect(err).NotTo(HaveOccurred()) + Expect(updated.Spec.State).To(Equal("running")) + }) + + It("deletes a task", func() { + Expect(c.DeleteTask(ctx, "test-task")).To(Succeed()) + + _, err := c.GetTask(ctx, "test-task") + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("Task Watch", func() { + It("receives events for task changes", func() { + watcher, err := c.WatchTasks(ctx) + Expect(err).NotTo(HaveOccurred()) + defer watcher.Stop() + + task := &aliecsv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{Name: "watch-task"}, + Spec: aliecsv1alpha1.TaskSpec{State: "standby", Pod: v1.PodSpec{Containers: []v1.Container{}}}, + } + Expect(c.CreateTask(ctx, task)).To(Succeed()) + defer c.DeleteTask(ctx, task.Name) + + Eventually(watcher.ResultChan()).Should(Receive(Satisfy(func(e k8swatch.Event) bool { + t, ok := e.Object.(*aliecsv1alpha1.Task) + return ok && t.Name == "watch-task" && e.Type == k8swatch.Added + }))) + }) + }) + + Describe("Environment Create, Read, Update, Delete", func() { + var env *aliecsv1alpha1.Environment + + BeforeEach(func() { + env = &aliecsv1alpha1.Environment{ + ObjectMeta: metav1.ObjectMeta{Name: "test-env"}, + Spec: aliecsv1alpha1.EnvironmentSpec{ + State: "standby", + Tasks: map[string][]aliecsv1alpha1.TaskDefinition{}, + }, + TaskTemplates: aliecsv1alpha1.TemplateSpecification{ + Tasks: map[string][]aliecsv1alpha1.TaskReference{}, + }, + } + Expect(c.CreateEnvironment(ctx, env)).To(Succeed()) + }) + + AfterEach(func() { + _ = c.DeleteEnvironment(ctx, env.Name) + }) + + It("gets a created environment", func() { + got, err := c.GetEnvironment(ctx, "test-env") + Expect(err).NotTo(HaveOccurred()) + Expect(got.Name).To(Equal("test-env")) + Expect(got.Spec.State).To(Equal("standby")) + }) + + It("updates an environment", func() { + got, err := c.GetEnvironment(ctx, "test-env") + Expect(err).NotTo(HaveOccurred()) + + got.Spec.State = "running" + Expect(c.UpdateEnvironment(ctx, got)).To(Succeed()) + + updated, err := c.GetEnvironment(ctx, "test-env") + Expect(err).NotTo(HaveOccurred()) + Expect(updated.Spec.State).To(Equal("running")) + }) + + It("deletes an environment", func() { + Expect(c.DeleteEnvironment(ctx, "test-env")).To(Succeed()) + + _, err := c.GetEnvironment(ctx, "test-env") + Expect(err).To(HaveOccurred()) + }) + }) + + Describe("Environment Watch", func() { + It("receives events for environment changes", func() { + watcher, err := c.WatchEnvironments(ctx) + Expect(err).NotTo(HaveOccurred()) + defer watcher.Stop() + + env := &aliecsv1alpha1.Environment{ + ObjectMeta: metav1.ObjectMeta{Name: "watch-env"}, + Spec: aliecsv1alpha1.EnvironmentSpec{ + State: "standby", + Tasks: map[string][]aliecsv1alpha1.TaskDefinition{}, + }, + TaskTemplates: aliecsv1alpha1.TemplateSpecification{ + Tasks: map[string][]aliecsv1alpha1.TaskReference{}, + }, + } + Expect(c.CreateEnvironment(ctx, env)).To(Succeed()) + defer c.DeleteEnvironment(ctx, env.Name) + + Eventually(watcher.ResultChan()).Should(Receive(Satisfy(func(e k8swatch.Event) bool { + ev, ok := e.Object.(*aliecsv1alpha1.Environment) + return ok && ev.Name == "watch-env" && e.Type == k8swatch.Added + }))) + }) + }) +}) diff --git a/control-operator/pkg/client/suite_test.go b/control-operator/pkg/client/suite_test.go new file mode 100644 index 00000000..cd571f8b --- /dev/null +++ b/control-operator/pkg/client/suite_test.go @@ -0,0 +1,68 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2026 CERN and copyright holders of ALICE O². + * Author: Michal Tichak + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +package client_test + +import ( + "path/filepath" + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +var ( + cfg *rest.Config + testEnv *envtest.Environment +) + +func TestClient(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Client Suite") +} + +var _ = BeforeSuite(func() { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + By("bootstrapping test environment") + testEnv = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")}, + ErrorIfCRDPathMissing: true, + } + + var err error + cfg, err = testEnv.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(cfg).NotTo(BeNil()) +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + Expect(testEnv.Stop()).To(Succeed()) +})