From ce24a750cb92c26db666c309e1e735b0ed26bd4b Mon Sep 17 00:00:00 2001 From: Jason Parraga Date: Thu, 16 Jan 2025 01:16:11 -0800 Subject: [PATCH 1/5] Remove unused ingress/service code (#4142) Signed-off-by: Jason Parraga Co-authored-by: Chris Martin --- .../executor/util/ingress_service_config.go | 86 ---- internal/executor/util/ingress_util.go | 117 ----- internal/executor/util/ingress_util_test.go | 464 ------------------ internal/executor/util/kubernetes_object.go | 135 ----- .../executor/util/kubernetes_objects_test.go | 341 ------------- 5 files changed, 1143 deletions(-) delete mode 100644 internal/executor/util/ingress_service_config.go delete mode 100644 internal/executor/util/ingress_util.go delete mode 100644 internal/executor/util/ingress_util_test.go diff --git a/internal/executor/util/ingress_service_config.go b/internal/executor/util/ingress_service_config.go deleted file mode 100644 index 2f72f7147d6..00000000000 --- a/internal/executor/util/ingress_service_config.go +++ /dev/null @@ -1,86 +0,0 @@ -package util - -import ( - "golang.org/x/exp/maps" - "golang.org/x/exp/slices" - - "github.com/armadaproject/armada/pkg/api" -) - -type IngressServiceType int - -const ( - Ingress IngressServiceType = iota - NodePort - Headless -) - -func (st IngressServiceType) String() string { - return []string{"Ingress", "NodePort", "Headless"}[st] -} - -type IngressServiceConfig struct { - Type IngressServiceType - Ports []uint32 - Annotations map[string]string - TlsEnabled bool - CertName string - UseClusterIp bool -} - -func deepCopy(config *IngressServiceConfig) *IngressServiceConfig { - return &IngressServiceConfig{ - Type: config.Type, - Ports: slices.Clone(config.Ports), - Annotations: maps.Clone(config.Annotations), - TlsEnabled: config.TlsEnabled, - CertName: config.CertName, - UseClusterIp: config.UseClusterIp, - } -} - -func CombineIngressService(ingresses []*api.IngressConfig, services []*api.ServiceConfig) []*IngressServiceConfig { - result := []*IngressServiceConfig{} - - for _, ing := range ingresses { - result = append( - result, - &IngressServiceConfig{ - Type: Ingress, - Ports: slices.Clone(ing.Ports), - Annotations: maps.Clone(ing.Annotations), - TlsEnabled: ing.TlsEnabled, - CertName: ing.CertName, - UseClusterIp: ing.UseClusterIP, - }, - ) - } - - for _, svc := range services { - svcType := NodePort - useClusterIP := true - if svc.Type == api.ServiceType_Headless { - svcType = Headless - useClusterIP = false - } - result = append( - result, - &IngressServiceConfig{ - Type: svcType, - Ports: slices.Clone(svc.Ports), - UseClusterIp: useClusterIP, - }, - ) - } - - return result -} - -func useClusterIP(configs []*IngressServiceConfig) bool { - for _, config := range configs { - if config.UseClusterIp { - return true - } - } - return false -} diff --git a/internal/executor/util/ingress_util.go b/internal/executor/util/ingress_util.go deleted file mode 100644 index aef695d96e6..00000000000 --- a/internal/executor/util/ingress_util.go +++ /dev/null @@ -1,117 +0,0 @@ -package util - -import ( - "fmt" - "strings" - - v1 "k8s.io/api/core/v1" - networking "k8s.io/api/networking/v1" - - "github.com/armadaproject/armada/internal/common/util" - "github.com/armadaproject/armada/internal/executor/configuration" - "github.com/armadaproject/armada/pkg/api" -) - -func GenerateIngresses(job *api.Job, pod *v1.Pod, ingressConfig *configuration.IngressConfiguration) ([]*v1.Service, []*networking.Ingress) { - services := []*v1.Service{} - ingresses := []*networking.Ingress{} - ingressToGen := CombineIngressService(job.Ingress, job.Services) - groupedIngressConfigs := groupIngressConfig(ingressToGen) - for svcType, configs := range groupedIngressConfigs { - if len(GetServicePorts(configs, &pod.Spec)) > 0 { - service := CreateService(job, pod, GetServicePorts(configs, &pod.Spec), svcType, useClusterIP(configs)) - services = append(services, service) - - if svcType == Ingress { - for index, config := range configs { - if len(GetServicePorts([]*IngressServiceConfig{config}, &pod.Spec)) <= 0 { - continue - } - // TODO: This results in an invalid name (one starting with "-") if pod.Name is the empty string; - // we should return an error if that's the case. - ingressName := fmt.Sprintf("%s-%s-%d", pod.Name, strings.ToLower(svcType.String()), index) - ingress := CreateIngress(ingressName, job, pod, service, ingressConfig, config) - ingresses = append(ingresses, ingress) - } - } - } - } - - return services, ingresses -} - -func groupIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig { - result := gatherIngressConfig(configs) - - for ingressType, grp := range result { - result[ingressType] = mergeOnAnnotations(grp) - } - - return result -} - -// gatherIngressConfig takes a list of ingress configs and groups them by IngressServiceType -func gatherIngressConfig(configs []*IngressServiceConfig) map[IngressServiceType][]*IngressServiceConfig { - result := make(map[IngressServiceType][]*IngressServiceConfig, 10) - - for _, config := range configs { - result[config.Type] = append(result[config.Type], deepCopy(config)) - } - - return result -} - -func mergeOnAnnotations(configs []*IngressServiceConfig) []*IngressServiceConfig { - result := make([]*IngressServiceConfig, 0, len(configs)) - - for _, config := range configs { - matchFound := false - - for _, existingConfig := range result { - if util.Equal(config.Annotations, existingConfig.Annotations) { - existingConfig.Ports = append(existingConfig.Ports, config.Ports...) - matchFound = true - } - } - if !matchFound { - result = append(result, deepCopy(config)) - } - } - - return result -} - -func GetServicePorts(svcConfigs []*IngressServiceConfig, podSpec *v1.PodSpec) []v1.ServicePort { - var servicePorts []v1.ServicePort - - for _, container := range podSpec.Containers { - ports := container.Ports - for _, svcConfig := range svcConfigs { - for _, port := range ports { - // Don't expose host via service, this will already be handled by kubernetes - if port.HostPort > 0 { - continue - } - if contains(svcConfig, uint32(port.ContainerPort)) { - servicePort := v1.ServicePort{ - Name: fmt.Sprintf("%s-%d", container.Name, port.ContainerPort), - Port: port.ContainerPort, - Protocol: port.Protocol, - } - servicePorts = append(servicePorts, servicePort) - } - } - } - } - - return servicePorts -} - -func contains(portConfig *IngressServiceConfig, port uint32) bool { - for _, p := range portConfig.Ports { - if p == port { - return true - } - } - return false -} diff --git a/internal/executor/util/ingress_util_test.go b/internal/executor/util/ingress_util_test.go deleted file mode 100644 index 3b3ee63bd33..00000000000 --- a/internal/executor/util/ingress_util_test.go +++ /dev/null @@ -1,464 +0,0 @@ -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - - "github.com/armadaproject/armada/pkg/api" -) - -func TestDeepCopy(t *testing.T) { - input := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "a": "value", - "b": "value2", - }, - } - result := deepCopy(input) - assert.Equal(t, input, result) - - result.Annotations["c"] = "value3" - assert.NotEqual(t, input, result) - - result = deepCopy(input) - result.Ports = append(result.Ports, 4) - assert.NotEqual(t, input, result) -} - -func TestGetServicePorts(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGetServicePorts_MultipleContainer(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - }, - }, - { - Name: "b", - Ports: []v1.ContainerPort{ - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "b-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGetServicePorts_MultipleIngressConfigs(t *testing.T) { - config1 := &IngressServiceConfig{ - Ports: []uint32{1}, - } - config2 := &IngressServiceConfig{ - Ports: []uint32{2}, - } - config3 := &IngressServiceConfig{ - Ports: []uint32{3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-1", - Protocol: v1.ProtocolTCP, - Port: 1, - }, - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - servicePorts := GetServicePorts([]*IngressServiceConfig{config1, config2, config3}, podSpec) - assert.Equal(t, servicePorts, expected) -} - -func TestGetServicePorts_HostPortSkipped(t *testing.T) { - config := &IngressServiceConfig{ - Ports: []uint32{1, 2, 3}, - } - podSpec := &v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "a", - Ports: []v1.ContainerPort{ - { - ContainerPort: 1, - HostPort: 100, - Protocol: v1.ProtocolTCP, - }, - { - ContainerPort: 2, - Protocol: v1.ProtocolUDP, - }, - }, - }, - }, - } - expected := []v1.ServicePort{ - { - Name: "a-2", - Protocol: v1.ProtocolUDP, - Port: 2, - }, - } - - assert.Equal(t, GetServicePorts([]*IngressServiceConfig{config}, podSpec), expected) -} - -func TestGroupIngressConfig_IngressTypeNodePort_AlwaysGrouped(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - NodePort: { - { - Type: NodePort, - Ports: []uint32{1, 2, 3}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{1, 2}, - } - input2 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{3}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) - - // Non ingress type will never have annotations anymore - assert.Equal(t, groupIngressConfig([]*IngressServiceConfig{input1, input2}), expected) -} - -func TestGroupIngressConfig_IngressType_NoAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_IngressType_SameAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "test": "value", - }, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value", - }, - } - assert.Equal(t, groupIngressConfig([]*IngressServiceConfig{input1, input2}), expected) -} - -func TestGroupIngressConfig_IngressType_DifferentAnnotations(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - }, - { - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_MixedIngressType(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - }, - { - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - }, - }, - NodePort: { - { - Type: NodePort, - Ports: []uint32{4, 5}, - }, - }, - } - input1 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{1, 2}, - Annotations: map[string]string{ - "test": "value", - }, - } - input2 := &IngressServiceConfig{ - Type: Ingress, - Ports: []uint32{3}, - Annotations: map[string]string{ - "test": "value2", - }, - } - input3 := &IngressServiceConfig{ - Type: NodePort, - Ports: []uint32{4, 5}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input1, input2, input3}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGroupIngressConfig_IngressType_Headless(t *testing.T) { - expected := map[IngressServiceType][]*IngressServiceConfig{ - Headless: { - { - Type: Headless, - Ports: []uint32{1}, - }, - }, - } - input := &IngressServiceConfig{ - Type: Headless, - Ports: []uint32{1}, - } - groupedConfig := groupIngressConfig([]*IngressServiceConfig{input}) - assert.Equal(t, groupedConfig, expected) -} - -func TestGatherIngressConfigs(t *testing.T) { - inputConfigs := []*IngressServiceConfig{ - { - Type: Ingress, - Ports: []uint32{1}, - }, - { - Type: Ingress, - Ports: []uint32{2}, - }, - { - Type: Headless, - Ports: []uint32{1}, - }, - { - Type: NodePort, - Ports: []uint32{1}, - }, - { - Type: Headless, - Ports: []uint32{2}, - }, - } - - expected := map[IngressServiceType][]*IngressServiceConfig{ - Ingress: { - { - Type: Ingress, - Ports: []uint32{1}, - }, - { - Type: Ingress, - Ports: []uint32{2}, - }, - }, - NodePort: { - { - Type: NodePort, - Ports: []uint32{1}, - }, - }, - Headless: { - { - Type: Headless, - Ports: []uint32{1}, - }, - { - Type: Headless, - Ports: []uint32{2}, - }, - }, - } - - assert.Equal(t, gatherIngressConfig(inputConfigs), expected) -} - -func TestCombineIngressService(t *testing.T) { - ingress := []*api.IngressConfig{ - { - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "Hello": "World", - }, - TlsEnabled: true, - UseClusterIP: false, - }, - } - - services := []*api.ServiceConfig{ - { - Type: api.ServiceType_Headless, - Ports: []uint32{4}, - }, - { - Type: api.ServiceType_NodePort, - Ports: []uint32{5}, - }, - } - - expected := []*IngressServiceConfig{ - { - Type: Ingress, - Ports: []uint32{1, 2, 3}, - Annotations: map[string]string{ - "Hello": "World", - }, - TlsEnabled: true, - UseClusterIp: false, - }, - { - Type: Headless, - Ports: []uint32{4}, - UseClusterIp: false, - }, - { - Type: NodePort, - Ports: []uint32{5}, - UseClusterIp: true, - }, - } - - assert.Equal(t, expected, CombineIngressService(ingress, services)) -} diff --git a/internal/executor/util/kubernetes_object.go b/internal/executor/util/kubernetes_object.go index 1b66502e1ab..ccfcf7de929 100644 --- a/internal/executor/util/kubernetes_object.go +++ b/internal/executor/util/kubernetes_object.go @@ -3,7 +3,6 @@ package util import ( "fmt" "strconv" - "strings" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" @@ -18,140 +17,6 @@ import ( "github.com/armadaproject/armada/pkg/executorapi" ) -func CreateService( - job *api.Job, - pod *v1.Pod, - ports []v1.ServicePort, - ingSvcType IngressServiceType, - useClusterIP bool, -) *v1.Service { - serviceType := v1.ServiceTypeClusterIP - if ingSvcType == NodePort { - serviceType = v1.ServiceTypeNodePort - } - - clusterIP := "" - if !useClusterIP { - clusterIP = "None" - } - - serviceSpec := v1.ServiceSpec{ - Type: serviceType, - Selector: map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }, - Ports: ports, - ClusterIP: clusterIP, - } - labels := util.MergeMaps(job.Labels, map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }) - annotation := util.MergeMaps(job.Annotations, map[string]string{ - domain.JobSetId: job.JobSetId, - domain.Owner: job.Owner, - }) - service := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", pod.Name, strings.ToLower(ingSvcType.String())), - Labels: labels, - Annotations: annotation, - Namespace: job.Namespace, - }, - Spec: serviceSpec, - } - return service -} - -func CreateIngress( - name string, - job *api.Job, - pod *v1.Pod, - service *v1.Service, - executorIngressConfig *configuration.IngressConfiguration, - jobConfig *IngressServiceConfig, -) *networking.Ingress { - labels := util.MergeMaps(job.Labels, map[string]string{ - domain.JobId: pod.Labels[domain.JobId], - domain.Queue: pod.Labels[domain.Queue], - domain.PodNumber: pod.Labels[domain.PodNumber], - }) - annotations := util.MergeMaps(job.Annotations, executorIngressConfig.Annotations) - annotations = util.MergeMaps(annotations, jobConfig.Annotations) - annotations = util.MergeMaps(annotations, map[string]string{ - domain.JobSetId: job.JobSetId, - domain.Owner: job.Owner, - }) - - rules := make([]networking.IngressRule, 0, len(service.Spec.Ports)) - tlsHosts := make([]string, 0, len(service.Spec.Ports)) - - // Rest of the hosts are generated off port information - for _, servicePort := range service.Spec.Ports { - if !contains(jobConfig, uint32(servicePort.Port)) { - continue - } - host := fmt.Sprintf("%s-%s.%s.%s", servicePort.Name, pod.Name, pod.Namespace, executorIngressConfig.HostnameSuffix) - tlsHosts = append(tlsHosts, host) - - // Workaround to get constant's address - pathType := networking.PathTypePrefix - path := networking.IngressRule{ - Host: host, - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: service.Name, - Port: networking.ServiceBackendPort{ - Number: servicePort.Port, - }, - }, - }, - }, - }, - }, - }, - } - rules = append(rules, path) - } - - tls := make([]networking.IngressTLS, 0, 1) - - if jobConfig.TlsEnabled { - certName := jobConfig.CertName - if certName == "" { - certName = fmt.Sprintf("%s-%s", job.Namespace, executorIngressConfig.CertNameSuffix) - } - - tls = append(tls, networking.IngressTLS{ - Hosts: tlsHosts, - SecretName: certName, - }) - } - - ingress := &networking.Ingress{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: labels, - Annotations: annotations, - Namespace: job.Namespace, - }, - Spec: networking.IngressSpec{ - Rules: rules, - TLS: tls, - }, - } - return ingress -} - func CreateOwnerReference(pod *v1.Pod) metav1.OwnerReference { return metav1.OwnerReference{ APIVersion: "v1", diff --git a/internal/executor/util/kubernetes_objects_test.go b/internal/executor/util/kubernetes_objects_test.go index 9ee9bca2f66..639aa843aab 100644 --- a/internal/executor/util/kubernetes_objects_test.go +++ b/internal/executor/util/kubernetes_objects_test.go @@ -9,7 +9,6 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" - networking "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/armadaproject/armada/internal/common" @@ -140,346 +139,6 @@ func makePodSpec() *v1.PodSpec { return &spec } -func makeTestJob() *api.Job { - return &api.Job{ - Id: "Id", - JobSetId: "JobSetId", - Queue: "QueueTest", - Owner: "UserTest", - Namespace: "testNamespace", - PodSpecs: []*v1.PodSpec{makePodSpec()}, - } -} - -func makeTestService() *v1.Service { - return &v1.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "testService"}, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Name: "testPort", - Port: 8080, - }, - }, - }, - } -} - -func TestCreateIngress_Basic(t *testing.T) { - // Boilerplate, should be the same in TlsEnabled - job := makeTestJob() - service := makeTestService() - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: "testNamespace"}} - ingressConfig := &configuration.IngressConfiguration{ - HostnameSuffix: "testSuffix", - } - - // TLS disabled jobconfig - jobConfig := &IngressServiceConfig{ - Ports: []uint32{8080}, - } - - result := CreateIngress("testIngress", job, pod, service, ingressConfig, jobConfig) - - pathType := networking.PathTypePrefix - expectedIngressSpec := networking.IngressSpec{ - TLS: []networking.IngressTLS{}, - Rules: []networking.IngressRule{ - { - Host: "testPort-testPod.testNamespace.testSuffix", - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: "testService", - Port: networking.ServiceBackendPort{ - Number: 8080, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - assert.Equal(t, result.Spec, expectedIngressSpec) -} - -func TestCreateIngress_TLS(t *testing.T) { - // Boilerplate setup - job := makeTestJob() - service := makeTestService() - pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: "testNamespace"}} - ingressConfig := &configuration.IngressConfiguration{ - HostnameSuffix: "testSuffix", - CertNameSuffix: "ingress-tls-certificate", - } - - // TLS enabled in this test - jobConfig := &IngressServiceConfig{ - TlsEnabled: true, - Ports: []uint32{8080}, - } - - result := CreateIngress("testIngress", job, pod, service, ingressConfig, jobConfig) - - pathType := networking.PathTypePrefix - expectedIngressSpec := networking.IngressSpec{ - TLS: []networking.IngressTLS{ - { - Hosts: []string{ - "testPort-testPod.testNamespace.testSuffix", - }, - SecretName: "testNamespace-ingress-tls-certificate", - }, - }, - Rules: []networking.IngressRule{ - { - Host: "testPort-testPod.testNamespace.testSuffix", - IngressRuleValue: networking.IngressRuleValue{ - HTTP: &networking.HTTPIngressRuleValue{ - Paths: []networking.HTTPIngressPath{ - { - Path: "/", - PathType: &pathType, - Backend: networking.IngressBackend{ - Service: &networking.IngressServiceBackend{ - Name: "testService", - Port: networking.ServiceBackendPort{ - Number: 8080, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - assert.Equal(t, result.Spec, expectedIngressSpec) -} - -func TestCreateService_Ingress_Headless(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - ingressType := Ingress - createdService := CreateService(job, pod, ports, ingressType, false) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-ingress", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - ClusterIP: "None", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_Ingress_ClusterIP(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - ingressType := Ingress - createdService := CreateService(job, pod, ports, ingressType, true) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-ingress", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_NodePort(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - NodePort: 456, - }, - } - ingressType := NodePort - createdService := CreateService(job, pod, ports, ingressType, true) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-nodeport", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - NodePort: 456, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "NodePort", - }, - } - assert.Equal(t, createdService, expected) -} - -func TestCreateService_Headless(t *testing.T) { - job := makeTestJob() - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - }, - } - ports := []v1.ServicePort{ - { - Port: 123, - }, - } - ingressType := Headless - createdService := CreateService(job, pod, ports, ingressType, false) - - expected := &v1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testPod-headless", - Namespace: "testNamespace", - Labels: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Annotations: map[string]string{ - "armada_jobset_id": "JobSetId", - "armada_owner": "UserTest", - }, - }, - Spec: v1.ServiceSpec{ - Ports: []v1.ServicePort{ - { - Port: 123, - }, - }, - Selector: map[string]string{ - "armada_job_id": "test_id", - "armada_pod_number": "0", - "armada_queue_id": "test_queue_id", - }, - Type: "ClusterIP", - ClusterIP: "None", - }, - } - assert.Equal(t, createdService, expected) -} - func TestCreatePodFromExecutorApiJob(t *testing.T) { runId := uuid.NewString() jobId := util.NewULID() From f4871c014a960a3ff18dfad0e44a5d8f46ee8d3e Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 16 Jan 2025 11:22:50 +0000 Subject: [PATCH 2/5] Upgrade operator and client python dependencies (#4146) --- .github/workflows/airflow-operator-release-to-pypi.yml | 4 ++-- .github/workflows/airflow-operator.yml | 9 +++++---- .github/workflows/python-client-release-to-pypi.yml | 4 ++-- .github/workflows/python-client.yml | 7 +++++-- build/airflow-operator/Dockerfile | 2 +- build/python-client/Dockerfile | 3 +-- client/python/pyproject.toml | 8 ++++---- third_party/airflow/pyproject.toml | 6 +++--- 8 files changed, 23 insertions(+), 20 deletions(-) diff --git a/.github/workflows/airflow-operator-release-to-pypi.yml b/.github/workflows/airflow-operator-release-to-pypi.yml index 5a0e27210d3..4b1f7a8d17f 100644 --- a/.github/workflows/airflow-operator-release-to-pypi.yml +++ b/.github/workflows/airflow-operator-release-to-pypi.yml @@ -20,8 +20,8 @@ jobs: - run: go run github.com/magefile/mage@v1.14.0 -v airflowOperator - uses: ./.github/workflows/python-tests with: - python-version: '3.8' - tox-env: 'py38' + python-version: '3.10' + tox-env: 'py310' path: third_party/airflow github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/airflow-operator.yml b/.github/workflows/airflow-operator.yml index 50747f98dc7..03e717b029e 100644 --- a/.github/workflows/airflow-operator.yml +++ b/.github/workflows/airflow-operator.yml @@ -40,13 +40,14 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.8', '3.9', '3.10' ] + python: [ '3.10', '3.11', '3.12' ] include: - - tox-env: 'py38' - - tox-env: 'py39' - python: '3.9' - tox-env: 'py310' python: '3.10' + - tox-env: 'py311' + python: '3.11' + - tox-env: 'py312' + python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/.github/workflows/python-client-release-to-pypi.yml b/.github/workflows/python-client-release-to-pypi.yml index 33866ffd5c9..7c4877b2f6c 100644 --- a/.github/workflows/python-client-release-to-pypi.yml +++ b/.github/workflows/python-client-release-to-pypi.yml @@ -19,8 +19,8 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: ./.github/workflows/python-tests with: - python-version: '3.8' - tox-env: 'py38' + python-version: '3.9' + tox-env: 'py39' path: 'client/python' github-token: ${{secrets.GITHUB_TOKEN}} - name: Publish package to PyPI diff --git a/.github/workflows/python-client.yml b/.github/workflows/python-client.yml index 8541394a811..a9f5cfd9155 100644 --- a/.github/workflows/python-client.yml +++ b/.github/workflows/python-client.yml @@ -34,13 +34,16 @@ jobs: runs-on: ubuntu-22.04 strategy: matrix: - python: [ '3.8', '3.9', '3.10' ] + python: [ '3.9', '3.10', '3.11', '3.12' ] include: - - tox-env: 'py38' - tox-env: 'py39' python: '3.9' - tox-env: 'py310' python: '3.10' + - tox-env: 'py311' + python: '3.11' + - tox-env: 'py312' + python: '3.12' steps: - uses: actions/checkout@v4 - name: Setup Go diff --git a/build/airflow-operator/Dockerfile b/build/airflow-operator/Dockerfile index 87a2e81a5cb..4fbe0f49bab 100644 --- a/build/airflow-operator/Dockerfile +++ b/build/airflow-operator/Dockerfile @@ -1,5 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.10.14-bookworm +ARG BASE_IMAGE=python:3.10-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} RUN mkdir /proto diff --git a/build/python-client/Dockerfile b/build/python-client/Dockerfile index 10aa957944b..3122bc8c982 100644 --- a/build/python-client/Dockerfile +++ b/build/python-client/Dockerfile @@ -1,6 +1,5 @@ ARG PLATFORM=x86_64 -ARG BASE_IMAGE=python:3.8.18-bookworm - +ARG BASE_IMAGE=python:3.9-bookworm FROM --platform=$PLATFORM ${BASE_IMAGE} RUN mkdir /proto diff --git a/client/python/pyproject.toml b/client/python/pyproject.toml index c9f06f55ba7..2d3b0783062 100644 --- a/client/python/pyproject.toml +++ b/client/python/pyproject.toml @@ -1,18 +1,18 @@ [project] name = "armada_client" -version = "0.4.8" +version = "0.4.10" description = "Armada gRPC API python client" readme = "README.md" -requires-python = ">=3.7" +requires-python = ">=3.9" dependencies = ["grpcio==1.66.1", "grpcio-tools==1.66.1", "mypy-protobuf>=3.2.0", "protobuf>=5.26.1,<6.0dev" ] license = { text = "Apache Software License" } authors = [{ name = "G-Research Open Source Software", email = "armada@armadaproject.io" }] [project.optional-dependencies] -format = ["black==23.7.0", "flake8==7.0.0", "pylint==2.17.5"] +format = ["black>=23.7.0", "flake8>=7.0.0", "pylint>=2.17.5"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] -test = ["pytest==7.3.1", "coverage>=6.5.0", "pytest-asyncio==0.21.1"] +test = ["pytest==7.3.1", "coverage==6.5.0", "pytest-asyncio==0.21.1"] [build-system] requires = ["setuptools"] diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index fd162626326..4982987b4c4 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -17,15 +17,15 @@ dependencies=[ 'kubernetes_asyncio>=24.2.3', 'opentelemetry-exporter-otlp>=1.28.1' # We want to force dependency upgrade for transitive Airflow dependency ] -requires-python=">=3.8" +requires-python=">=3.10" classifiers=[ 'Programming Language :: Python :: 3', 'Operating System :: OS Independent', ] [project.optional-dependencies] -format = ["black>=24.0.0", "flake8==7.0.0", "pylint==2.17.5"] -test = ["pytest==7.3.1", "coverage==7.3.2", "pytest-asyncio==0.21.1", +format = ["black>=24.0.0", "flake8>=7.0.0", "pylint>=2.17.5"] +test = ["pytest==7.3.1", "coverage==6.5.0", "pytest-asyncio==0.21.1", "pytest-mock>=3.14.0"] # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] From f146e3b0f39707eea9f3888c05d427bfc4f4bdfb Mon Sep 17 00:00:00 2001 From: Martynas Asipauskas Date: Thu, 16 Jan 2025 12:38:19 +0000 Subject: [PATCH 3/5] New Airflow Operator release 1.0.13 (#4148) --- third_party/airflow/pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 4982987b4c4..cc1f14c4a63 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "armada_airflow" -version = "1.0.12" +version = "1.0.13" description = "Armada Airflow Operator" readme='README.md' authors = [{name = "Armada-GROSS", email = "armada@armadaproject.io"}] @@ -41,7 +41,7 @@ include = ["armada_airflow*"] [tool.black] line-length = 88 -target-version = ['py38', 'py39', 'py310'] +target-version = ['py310', 'py311', 'py312'] include = ''' /( armada From 8db0d5e3cb96d760d6b7b981bbccb3f79b99535b Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 17 Jan 2025 10:22:57 +0000 Subject: [PATCH 4/5] Improve handling when node is running jobs from multiple pools (#4131) * Improve handling when node is running jobs from multiple pools The scenario where a job is running jobs from multiple pools should only happen when a node has its pool changed - The jobs already running will have a pool that doesn't match the nodes current pool Currently what happens is that the scheduler sees the node as empty and double schedules the node This PR changes that, so we set any resource used by jobs in other pools as unallocatable, to prevent this double scheduling - This is a slightly more generic approach than the current approach where we just mark jobs from Away pools as unallocatable I had to change when we calculate jobsByPool to calculate for running jobs of all pools Signed-off-by: JamesMurkin * Update naming and comments Signed-off-by: JamesMurkin * Rename pool Signed-off-by: JamesMurkin * Fix typo Signed-off-by: JamesMurkin * Improve comment * Gofumpt Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- .../scheduler/scheduling/scheduling_algo.go | 94 ++++++++++--------- 1 file changed, 49 insertions(+), 45 deletions(-) diff --git a/internal/scheduler/scheduling/scheduling_algo.go b/internal/scheduler/scheduling/scheduling_algo.go index af40dc61855..e56e01f3d90 100644 --- a/internal/scheduler/scheduling/scheduling_algo.go +++ b/internal/scheduler/scheduling/scheduling_algo.go @@ -177,7 +177,7 @@ type FairSchedulingAlgoContext struct { Txn *jobdb.Txn } -func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, pool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) { +func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Context, txn *jobdb.Txn, currentPool configuration.PoolConfig) (*FairSchedulingAlgoContext, error) { executors, err := l.executorRepository.GetExecutors(ctx) if err != nil { return nil, err @@ -194,12 +194,12 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con awayAllocationPools := []string{} for _, otherPool := range l.schedulingConfig.Pools { - if slices.Contains(otherPool.AwayPools, pool.Name) { + if slices.Contains(otherPool.AwayPools, currentPool.Name) { awayAllocationPools = append(awayAllocationPools, otherPool.Name) } } - allPools := []string{pool.Name} - allPools = append(allPools, pool.AwayPools...) + allPools := []string{currentPool.Name} + allPools = append(allPools, currentPool.AwayPools...) allPools = append(allPools, awayAllocationPools...) jobSchedulingInfo, err := calculateJobSchedulingInfo(ctx, @@ -208,7 +208,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con func(_ *schedulerobjects.Executor) bool { return true }), queueByName, txn.GetAll(), - pool.Name, + currentPool.Name, awayAllocationPools, allPools) if err != nil { @@ -238,35 +238,41 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con ctx.Error(errMes) }) - homeJobs := jobSchedulingInfo.jobsByPool[pool.Name] - awayJobs := []*jobdb.Job{} + currentPoolJobs := jobSchedulingInfo.jobsByPool[currentPool.Name] + otherPoolsJobs := []*jobdb.Job{} - for _, otherPool := range l.schedulingConfig.Pools { - if pool.Name == otherPool.Name { + for _, pool := range l.schedulingConfig.Pools { + if currentPool.Name == pool.Name { continue } - if slices.Contains(otherPool.AwayPools, pool.Name) { - homeJobs = append(homeJobs, jobSchedulingInfo.jobsByPool[otherPool.Name]...) + if slices.Contains(pool.AwayPools, currentPool.Name) { + // Jobs from away pools need to be considered in the current scheduling round, so should be added here + // This is so the jobs are available for eviction, if a home job needs to take their place + currentPoolJobs = append(currentPoolJobs, jobSchedulingInfo.jobsByPool[pool.Name]...) + } else { + // Jobs not used by the current pool belong to other pools we aren't currently considering + // Add them here, so their resource can made unallocatable in the nodeDb, preventing us scheduling over them + // The cases this is needed (a node has jobs from multiple pools is) + // - The pool of the node was changed, but still has jobs running from the pool it was previously in + // - A node running home jobs and cross-pool away jobs. In this case when scheduling the cross-pool away jobs + // we need to not schedule over resource used by the home jobs + otherPoolsJobs = append(otherPoolsJobs, jobSchedulingInfo.jobsByPool[pool.Name]...) } } - for _, awayPool := range pool.AwayPools { - awayJobs = append(awayJobs, jobSchedulingInfo.jobsByPool[awayPool]...) - } - - nodePools := append(pool.AwayPools, pool.Name) + nodePools := append(currentPool.AwayPools, currentPool.Name) - nodeDb, err := l.constructNodeDb(homeJobs, awayJobs, + nodeDb, err := l.constructNodeDb(currentPoolJobs, otherPoolsJobs, armadaslices.Filter(nodes, func(node *internaltypes.Node) bool { return slices.Contains(nodePools, node.GetPool()) })) if err != nil { return nil, err } totalResources := nodeDb.TotalKubernetesResources() - totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(pool.Name)) + totalResources = totalResources.Add(l.floatingResourceTypes.GetTotalAvailableForPool(currentPool.Name)) schedulingContext, err := l.constructSchedulingContext( - pool.Name, + currentPool.Name, totalResources, jobSchedulingInfo.demandByQueueAndPriorityClass, jobSchedulingInfo.allocatedByQueueAndPriorityClass, @@ -278,7 +284,7 @@ func (l *FairSchedulingAlgo) newFairSchedulingAlgoContext(ctx *armadacontext.Con return &FairSchedulingAlgoContext{ queues: queueByName, - pool: pool.Name, + pool: currentPool.Name, nodeDb: nodeDb, schedulingContext: schedulingContext, nodeIdByJobId: jobSchedulingInfo.nodeIdByJobId, @@ -331,17 +337,6 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m pools = []string{pool} } - matches := false - for _, pool := range pools { - if slices.Contains(allPools, pool) { - matches = true - break - } - } - if !matches { - continue - } - if slices.Contains(pools, currentPool) { queueResources, ok := demandByQueueAndPriorityClass[job.Queue()] if !ok { @@ -369,6 +364,21 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m } pool := job.LatestRun().Pool() + if _, present := jobsByPool[pool]; !present { + jobsByPool[pool] = []*jobdb.Job{} + } + jobsByPool[pool] = append(jobsByPool[pool], job) + + matches := false + for _, pool := range pools { + if slices.Contains(allPools, pool) { + matches = true + break + } + } + if !matches { + continue + } if _, isActive := activeExecutorsSet[executorId]; isActive { if pool == currentPool { @@ -387,10 +397,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m awayAllocation[job.PriorityClassName()] = awayAllocation[job.PriorityClassName()].Add(job.AllResourceRequirements()) } } - if _, present := jobsByPool[pool]; !present { - jobsByPool[pool] = []*jobdb.Job{} - } - jobsByPool[pool] = append(jobsByPool[pool], job) + jobsByExecutorId[executorId] = append(jobsByExecutorId[executorId], job) nodeIdByJobId[job.Id()] = nodeId gangInfo, err := schedulercontext.GangInfoFromLegacySchedulerJob(job) @@ -420,7 +427,7 @@ func calculateJobSchedulingInfo(ctx *armadacontext.Context, activeExecutorsSet m }, nil } -func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { +func (l *FairSchedulingAlgo) constructNodeDb(currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) (*nodedb.NodeDb, error) { nodeDb, err := nodedb.NewNodeDb( l.schedulingConfig.PriorityClasses, l.schedulingConfig.IndexedResources, @@ -432,7 +439,7 @@ func (l *FairSchedulingAlgo) constructNodeDb(homeJobs []*jobdb.Job, awayJobs []* if err != nil { return nil, err } - if err := l.populateNodeDb(nodeDb, homeJobs, awayJobs, nodes); err != nil { + if err := l.populateNodeDb(nodeDb, currentPoolJobs, otherPoolsJobs, nodes); err != nil { return nil, err } @@ -590,7 +597,7 @@ func (l *FairSchedulingAlgo) SchedulePool( } // populateNodeDb adds all the nodes and jobs associated with a particular pool to the nodeDb. -func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*jobdb.Job, awayJobs []*jobdb.Job, nodes []*internaltypes.Node) error { +func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, currentPoolJobs []*jobdb.Job, otherPoolsJobs []*jobdb.Job, nodes []*internaltypes.Node) error { txn := nodeDb.Txn(true) defer txn.Abort() nodesById := armadaslices.GroupByFuncUnique( @@ -598,7 +605,7 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*j func(node *internaltypes.Node) string { return node.GetId() }, ) jobsByNodeId := make(map[string][]*jobdb.Job, len(nodes)) - for _, job := range homeJobs { + for _, job := range currentPoolJobs { if job.InTerminalState() || !job.HasRuns() { continue } @@ -612,20 +619,17 @@ func (l *FairSchedulingAlgo) populateNodeDb(nodeDb *nodedb.NodeDb, homeJobs []*j } jobsByNodeId[nodeId] = append(jobsByNodeId[nodeId], job) } - for _, job := range awayJobs { + for _, job := range otherPoolsJobs { if job.InTerminalState() || !job.HasRuns() { continue } nodeId := job.LatestRun().NodeId() node, ok := nodesById[nodeId] if !ok { - logrus.Errorf( - "job %s assigned to node %s on executor %s, but no such node found", - job.Id(), nodeId, job.LatestRun().Executor(), - ) + // Job is allocated to a node which isn't part of this pool, ignore it continue } - + // Mark resource used by jobs of other pools as unallocatable so we don't double schedule this resource markResourceUnallocatable(node.AllocatableByPriority, job.KubernetesResourceRequirements()) } From 2ca67c9b8db3b564caf6e476d6138c13d7ad57cf Mon Sep 17 00:00:00 2001 From: Maurice Yap Date: Fri, 17 Jan 2025 16:51:28 +0000 Subject: [PATCH 5/5] Add description and alerts for job commands on Lookout UI (#4152) Add optional descriptions and alerts for commands for a job displayed on the Lookout UI. These can be specified in Markdown. --- config/lookoutv2/config.yaml | 4 ++ internal/lookout/ui/package.json | 2 + .../sidebar/SidebarTabJobCommands.tsx | 53 +++++++++++++------ .../src/services/lookoutV2/useGetUiConfig.ts | 25 +++++++-- internal/lookout/ui/src/utils.tsx | 28 ++++++++-- internal/lookout/ui/yarn.lock | 14 ++++- internal/lookoutv2/configuration/types.go | 24 ++++++++- 7 files changed, 124 insertions(+), 26 deletions(-) diff --git a/config/lookoutv2/config.yaml b/config/lookoutv2/config.yaml index f423cc5eeed..3a35d84592d 100644 --- a/config/lookoutv2/config.yaml +++ b/config/lookoutv2/config.yaml @@ -32,3 +32,7 @@ uiConfig: template: "kubectl --context {{ runs[runs.length - 1].cluster }} -n {{ namespace }} logs armada-{{ jobId }}-0" - name: Exec template: "kubectl --context {{ runs[runs.length - 1].cluster }} -n {{ namespace }} exec -it armada-{{ jobId }}-0 /bin/sh" + descriptionMd: Execute a command on the job's container. + alertMessageMd: | + This will only work if the container is still running. + alertLevel: info diff --git a/internal/lookout/ui/package.json b/internal/lookout/ui/package.json index 052baa34c44..a0cd1b4347b 100644 --- a/internal/lookout/ui/package.json +++ b/internal/lookout/ui/package.json @@ -38,6 +38,8 @@ "date-fns-tz": "^1.3.7", "js-yaml": "^4.0.0", "lodash": "^4.17.21", + "markdown-to-jsx": "^7.7.3", + "mui-markdown": "^1.2.5", "notistack": "^3.0.1", "oidc-client-ts": "^2.3.0", "prism-react-renderer": "^2.4.1", diff --git a/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx index 938175564e2..8409370aee2 100644 --- a/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx +++ b/internal/lookout/ui/src/components/lookoutV2/sidebar/SidebarTabJobCommands.tsx @@ -1,6 +1,7 @@ import { OpenInNew } from "@mui/icons-material" -import { Link, Stack } from "@mui/material" +import { Alert, AlertColor, Link, Stack } from "@mui/material" import { template, templateSettings } from "lodash" +import { MuiMarkdown } from "mui-markdown" import { Fragment } from "react/jsx-runtime" import validator from "validator" @@ -11,6 +12,8 @@ import { SPACING } from "../../../styling/spacing" import { CommandSpec } from "../../../utils" import { CodeBlock } from "../../CodeBlock" +const KNOWN_ALERT_COLORS: AlertColor[] = ["success", "info", "warning", "error"] + export interface SidebarTabJobCommandsProps { job: Job commandSpecs: CommandSpec[] @@ -35,27 +38,43 @@ export const SidebarTabJobCommands = ({ job, commandSpecs }: SidebarTabJobComman return ( <> {commandSpecs.map((commandSpec) => { - const { name } = commandSpec + const { name, descriptionMd, alertLevel, alertMessageMd } = commandSpec const commandText = getCommandText(job, commandSpec) + + const alertSeverity: AlertColor = + alertLevel && (KNOWN_ALERT_COLORS as string[]).includes(alertLevel) ? (alertLevel as AlertColor) : "info" + return ( {name} - {validator.isURL(commandText) ? ( - - -
{commandText}
- -
- - ) : ( - + {descriptionMd && ( +
+ {descriptionMd} +
+ )} + {alertMessageMd && ( + + {alertMessageMd} + )} +
+ {validator.isURL(commandText) ? ( + + +
{commandText}
+ +
+ + ) : ( + + )} +
) })} diff --git a/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts index 683c41b3062..4010d3159a1 100644 --- a/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts +++ b/internal/lookout/ui/src/services/lookoutV2/useGetUiConfig.ts @@ -55,10 +55,27 @@ export const useGetUiConfig = (enabled = true) => { } if (json.CommandSpecs) { - config.commandSpecs = json.CommandSpecs.map(({ Name, Template }: { Name: string; Template: string }) => ({ - name: Name, - template: Template, - })) + config.commandSpecs = json.CommandSpecs.map( + ({ + Name, + Template, + DescriptionMd, + AlertMessageMd, + AlertLevel, + }: { + Name: string + Template: string + DescriptionMd: string + AlertMessageMd: string + AlertLevel: string + }) => ({ + name: Name, + template: Template, + descriptionMd: DescriptionMd, + alertMessageMd: AlertMessageMd, + alertLevel: AlertLevel, + }), + ) } if (json.Backend) config.backend = json.Backend diff --git a/internal/lookout/ui/src/utils.tsx b/internal/lookout/ui/src/utils.tsx index 3c4caf410d7..a458e946f89 100644 --- a/internal/lookout/ui/src/utils.tsx +++ b/internal/lookout/ui/src/utils.tsx @@ -9,9 +9,13 @@ export interface OidcConfig { clientId: string scope: string } + export interface CommandSpec { name: string template: string + descriptionMd?: string + alertMessageMd?: string + alertLevel?: string } export interface UIConfig { @@ -75,9 +79,27 @@ export async function getUIConfig(): Promise { scope: json.Oidc.Scope, } if (json.CommandSpecs) { - config.commandSpecs = json.CommandSpecs.map((c: { Name: string; Template: string }) => { - return { name: c.Name, template: c.Template } - }) + config.commandSpecs = json.CommandSpecs.map( + ({ + Name, + Template, + DescriptionMd, + AlertMessageMd, + AlertLevel, + }: { + Name: string + Template: string + DescriptionMd: string + AlertMessageMd: string + AlertLevel: string + }) => ({ + name: Name, + template: Template, + descriptionMd: DescriptionMd, + alertMessageMd: AlertMessageMd, + alertLevel: AlertLevel, + }), + ) } } if (json.Backend) config.backend = json.Backend diff --git a/internal/lookout/ui/yarn.lock b/internal/lookout/ui/yarn.lock index 462a894822e..7a1cf05c44a 100644 --- a/internal/lookout/ui/yarn.lock +++ b/internal/lookout/ui/yarn.lock @@ -3261,6 +3261,11 @@ magic-string@^0.30.17: dependencies: "@jridgewell/sourcemap-codec" "^1.5.0" +markdown-to-jsx@^7.7.3: + version "7.7.3" + resolved "https://registry.yarnpkg.com/markdown-to-jsx/-/markdown-to-jsx-7.7.3.tgz#c75927252592696e9e8b2a9557628749d8ab023e" + integrity sha512-o35IhJDFP6Fv60zPy+hbvZSQMmgvSGdK5j8NRZ7FeZMY+Bgqw+dSg7SC1ZEzC26++CiOUCqkbq96/c3j/FfTEQ== + math-intrinsics@^1.1.0: version "1.1.0" resolved "https://registry.yarnpkg.com/math-intrinsics/-/math-intrinsics-1.1.0.tgz#a0dd74be81e2aa5c2f27e65ce283605ee4e2b7f9" @@ -3325,6 +3330,13 @@ ms@^2.1.1, ms@^2.1.3: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +mui-markdown@^1.2.5: + version "1.2.5" + resolved "https://registry.yarnpkg.com/mui-markdown/-/mui-markdown-1.2.5.tgz#48e8a800c6707f84b77f56f3e553eb00754f15ff" + integrity sha512-zgLSXxYgHmUkUZ6mp2aM8C1vcoAsCyQLyvvaiSf8AAutNnAXhA8tlBiiGg8hOvX77VQs1A1dssbOyT/W2ytonA== + optionalDependencies: + prism-react-renderer "^2.0.3" + nanoid@^3.3.7: version "3.3.8" resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.3.8.tgz#b1be3030bee36aaff18bacb375e5cce521684baf" @@ -3579,7 +3591,7 @@ pretty-format@^29.0.0, pretty-format@^29.7.0: ansi-styles "^5.0.0" react-is "^18.0.0" -prism-react-renderer@^2.4.1: +prism-react-renderer@^2.0.3, prism-react-renderer@^2.4.1: version "2.4.1" resolved "https://registry.yarnpkg.com/prism-react-renderer/-/prism-react-renderer-2.4.1.tgz#ac63b7f78e56c8f2b5e76e823a976d5ede77e35f" integrity sha512-ey8Ls/+Di31eqzUxC46h8MksNuGx/n0AAC8uKpwFau4RPDYLuE3EXTp8N8G2vX2N7UC/+IXeNUnlWBGGcAG+Ig== diff --git a/internal/lookoutv2/configuration/types.go b/internal/lookoutv2/configuration/types.go index fa4d399f166..eb3ebfb77d1 100644 --- a/internal/lookoutv2/configuration/types.go +++ b/internal/lookoutv2/configuration/types.go @@ -36,9 +36,31 @@ type PrunerConfig struct { Postgres configuration.PostgresConfig } +// Alert level enum values correspond to the severity levels of the MUI Alert +// component: https://mui.com/material-ui/react-alert/#severity +type AlertLevel string + +const ( + AlertLevelSuccess AlertLevel = "success" + AlertLevelInfo AlertLevel = "info" + AlertLevelWarning AlertLevel = "warning" + AlertLevelError AlertLevel = "error" +) + +// CommandSpec details a command to be displayed on a job's "Commands" sidebar +// tab in the Lookout UI type CommandSpec struct { - Name string + // Name is the title of the command + Name string + // Tempate is the template string for the command Template string + // DescriptionMd is an optional description for the command in Markdown + DescriptionMd string + // AlertMessageMd is an optional message for the command, to be displayed as + // an alert, written in Markdown + AlertMessageMd string + // AlertLevel is the severity level of the alert + AlertLevel AlertLevel } type UIConfig struct {