[kjobctl] Fix output Slurm variables (kubernetes-sigs#3188)
* Fix total resources calculation

* Fix calculations when ntasks greater than 1

* Fix naming of replicated containers when ntasks greater than 1

* Move objects creation to the end
IrvingMg authored Oct 8, 2024
1 parent 04bfc06 commit f6f80a5
Showing 3 changed files with 122 additions and 75 deletions.
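Taken together, the first two fixes replace the old per-task multiplications with running totals (totalCpus, totalGpus, totalMem) that are accumulated once per container, including the containers added by --ntasks replication. Below is a minimal, self-contained sketch of that accounting, using the numbers from the updated create_test.go case further down (two template containers, --ntasks=3, --cpus-per-task=2); the sketch is illustrative, not code from the repository:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Mirrors the updated test case: a job template with two containers
	// (c1, c2), --cpus-per-task=2 and --ntasks=3.
	cpusPerTask := resource.MustParse("2")
	templateContainers := 2
	nTasks := 3

	// First pass: every container already in the template requests cpusPerTask.
	var totalCpus resource.Quantity
	for i := 0; i < templateContainers; i++ {
		totalCpus.Add(cpusPerTask)
	}

	// Replication pass: ntasks-1 extra copies of the first container,
	// each adding cpusPerTask to the running total.
	for i := 1; i < nTasks; i++ {
		totalCpus.Add(cpusPerTask)
	}

	fmt.Println(totalCpus.String()) // 8 -> SLURM_CPUS_ON_NODE=8
}

Accumulating on resource.Quantity rather than multiplying an int also keeps units and suffixes intact for the generated SLURM_* variables.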
174 changes: 105 additions & 69 deletions cmd/experimental/kjobctl/pkg/builder/slurm_builder.go
@@ -112,7 +112,7 @@ type slurmBuilder struct {
     cpusOnNode      *resource.Quantity
     cpusPerGpu      *resource.Quantity
     totalMemPerNode *resource.Quantity
-    totalGpus       int32
+    totalGpus       *resource.Quantity
 }
 
 var _ builder = (*slurmBuilder)(nil)
@@ -244,45 +244,14 @@ func (b *slurmBuilder) build(ctx context.Context) (runtime.Object, []runtime.Obj
         job.Labels[kueue.WorkloadPriorityClassLabel] = b.priority
     }
 
-    envEntrypointScript, err := b.buildInitEntrypointScript()
-    if err != nil {
-        return nil, nil, err
-    }
-
-    entrypointScript, err := b.buildEntrypointScript()
-    if err != nil {
-        return nil, nil, err
-    }
-
-    configMap := &corev1.ConfigMap{
-        TypeMeta: metav1.TypeMeta{Kind: "ConfigMap", APIVersion: "v1"},
-        ObjectMeta: objectMeta,
-        Data: map[string]string{
-            slurmInitEntrypointFilename: envEntrypointScript,
-            slurmEntrypointFilename: entrypointScript,
-            slurmScriptFilename: b.scriptContent,
-        },
-    }
-
-    service := &corev1.Service{
-        TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "v1"},
-        ObjectMeta: objectMeta,
-        Spec: corev1.ServiceSpec{
-            ClusterIP: "None",
-            Selector: map[string]string{
-                "job-name": b.objectName,
-            },
-        },
-    }
-
     b.buildPodSpecVolumesAndEnv(&job.Spec.Template.Spec)
     job.Spec.Template.Spec.Volumes = append(job.Spec.Template.Spec.Volumes,
         corev1.Volume{
             Name: "slurm-scripts",
             VolumeSource: corev1.VolumeSource{
                 ConfigMap: &corev1.ConfigMapVolumeSource{
                     LocalObjectReference: corev1.LocalObjectReference{
-                        Name: configMap.Name,
+                        Name: b.objectName,
                     },
                     Items: []corev1.KeyToPath{
                         {
@@ -326,14 +295,28 @@ func (b *slurmBuilder) build(ctx context.Context) (runtime.Object, []runtime.Obj
         },
     })
 
-    gpusPerTask, err := resource.ParseQuantity("0")
-    if err != nil {
-        return nil, nil, errors.New("error initializing gpus counter")
-    }
+    var totalGPUsPerTask resource.Quantity
     for _, number := range b.gpusPerTask {
-        gpusPerTask.Add(*number)
+        totalGPUsPerTask.Add(*number)
     }
 
+    var memPerCPU, memPerGPU, memPerContainer resource.Quantity
+    if b.memPerCPU != nil && b.cpusPerTask != nil {
+        memPerCPU = *b.memPerCPU
+        memPerCPU.Mul(b.cpusPerTask.Value())
+    }
+
+    if b.memPerGPU != nil && b.gpusPerTask != nil {
+        memPerGPU = *b.memPerGPU
+        memPerGPU.Mul(totalGPUsPerTask.Value())
+    }
+
+    if b.memPerNode != nil {
+        mem := b.memPerNode.MilliValue() / int64(len(job.Spec.Template.Spec.Containers))
+        memPerContainer = *resource.NewMilliQuantity(mem, b.memPerNode.Format)
+    }
+
+    var totalCpus, totalGpus, totalMem resource.Quantity
     for i := range job.Spec.Template.Spec.Containers {
         container := &job.Spec.Template.Spec.Containers[i]
 
@@ -348,40 +331,40 @@ func (b *slurmBuilder) build(ctx context.Context) (runtime.Object, []runtime.Obj
 
         if b.cpusPerTask != nil {
             requests[corev1.ResourceCPU] = *b.cpusPerTask
+            totalCpus.Add(*b.cpusPerTask)
         }
 
         if b.gpusPerTask != nil {
             for name, number := range b.gpusPerTask {
                 requests[corev1.ResourceName(name)] = *number
             }
-        }
-
-        limits := corev1.ResourceList{}
-        if b.memPerNode != nil {
-            memPerContainer := b.memPerNode.MilliValue() / int64(len(job.Spec.Template.Spec.Containers))
-            limits[corev1.ResourceMemory] = *resource.NewMilliQuantity(memPerContainer, b.memPerNode.Format)
+            totalGpus.Add(totalGPUsPerTask)
         }
 
         if b.memPerTask != nil {
             requests[corev1.ResourceMemory] = *b.memPerTask
+            totalMem.Add(*b.memPerTask)
         }
 
-        if b.memPerCPU != nil && b.cpusPerTask != nil {
-            memPerCPU := *b.memPerCPU
-            memPerCPU.Mul(b.cpusPerTask.Value())
+        if !memPerCPU.IsZero() {
             requests[corev1.ResourceMemory] = memPerCPU
+            totalMem.Add(memPerCPU)
         }
 
-        if b.memPerGPU != nil && b.gpusPerTask != nil {
-            memPerGpu := *b.memPerGPU
-            memPerGpu.Mul(gpusPerTask.Value())
-            requests[corev1.ResourceMemory] = memPerGpu
+        if !memPerGPU.IsZero() {
+            requests[corev1.ResourceMemory] = memPerGPU
+            totalMem.Add(memPerGPU)
         }
 
         if len(requests) > 0 {
             container.Resources.Requests = requests
         }
 
+        limits := corev1.ResourceList{}
+        if !memPerContainer.IsZero() {
+            limits[corev1.ResourceMemory] = memPerContainer
+        }
+
         if len(limits) > 0 {
             container.Resources.Limits = limits
         }
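The per-container memory limit is now computed once, before the container loop: the --mem budget's milli-value divided by the container count, wrapped back into a Quantity. A small sketch of that arithmetic with assumed inputs (an 8Gi node budget split across 4 containers):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Hypothetical inputs: --mem=8Gi on a pod that ends up with 4 containers.
	memPerNode := resource.MustParse("8Gi")
	containers := int64(4)

	// Same arithmetic as the diff: divide the node budget evenly, in
	// milli-units so small quantities do not truncate to zero.
	mem := memPerNode.MilliValue() / containers
	memPerContainer := resource.NewMilliQuantity(mem, memPerNode.Format)

	fmt.Println(memPerContainer.String()) // 2Gi
}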
@@ -405,13 +388,31 @@ func (b *slurmBuilder) build(ctx context.Context) (runtime.Object, []runtime.Obj
         job.Spec.Parallelism = b.nodes
 
     if nTasks > 1 {
+        replicatedContainer := job.Spec.Template.Spec.Containers[0]
         for i := 1; i < int(nTasks); i++ {
-            job.Spec.Template.Spec.Containers = append(job.Spec.Template.Spec.Containers, job.Spec.Template.Spec.Containers[0])
+            job.Spec.Template.Spec.Containers = append(job.Spec.Template.Spec.Containers, replicatedContainer)
+
+            if b.cpusPerTask != nil {
+                totalCpus.Add(*b.cpusPerTask)
+            }
+
+            if !memPerCPU.IsZero() {
+                totalMem.Add(memPerCPU)
+            }
+
+            if !memPerGPU.IsZero() {
+                totalMem.Add(memPerGPU)
+            }
         }
 
-        for i := range nTasks {
+        for i, j := 0, 0; i <= int(nTasks); i++ {
+            if job.Spec.Template.Spec.Containers[i].Name != replicatedContainer.Name {
+                continue
+            }
+
             job.Spec.Template.Spec.Containers[i].Name =
-                fmt.Sprintf("%s-%d", job.Spec.Template.Spec.Containers[i].Name, i)
+                fmt.Sprintf("%s-%d", job.Spec.Template.Spec.Containers[i].Name, j)
+            j++
         }
     }
 
@@ -426,24 +427,59 @@ func (b *slurmBuilder) build(ctx context.Context) (runtime.Object, []runtime.Obj
         job.Spec.Parallelism = b.nodes
     }
 
-    if b.cpusPerTask != nil {
-        b.cpusOnNode = ptr.To(b.cpusPerTask.DeepCopy())
-        b.cpusOnNode.Mul(int64(len(job.Spec.Template.Spec.Containers)))
+    if !totalCpus.IsZero() {
+        b.cpusOnNode = &totalCpus
     }
 
-    if b.memPerCPU != nil {
-        b.totalMemPerNode = ptr.To(b.memPerCPU.DeepCopy())
-        b.totalMemPerNode.Mul(int64(len(job.Spec.Template.Spec.Containers)))
+    if !totalGpus.IsZero() {
+        b.totalGpus = &totalGpus
     }
 
+    if b.memPerNode != nil {
+        b.totalMemPerNode = b.memPerNode
+    } else if !totalMem.IsZero() {
+        b.totalMemPerNode = &totalMem
+    }
+
+    if b.cpusOnNode != nil && b.totalGpus != nil && !totalGpus.IsZero() {
+        cpusPerGpu := totalCpus.MilliValue() / totalGpus.MilliValue()
+        b.cpusPerGpu = resource.NewQuantity(cpusPerGpu, b.cpusOnNode.Format)
+    }
+
+    envEntrypointScript, err := b.buildInitEntrypointScript()
+    if err != nil {
+        return nil, nil, err
+    }
+
+    entrypointScript, err := b.buildEntrypointScript()
+    if err != nil {
+        return nil, nil, err
+    }
+
-    totalGpus := gpusPerTask
-    totalTasks := int64(len(job.Spec.Template.Spec.Containers))
-    totalGpus.Mul(totalTasks)
-    b.totalGpus = int32(totalGpus.Value())
+    configMap := &corev1.ConfigMap{
+        TypeMeta: metav1.TypeMeta{
+            Kind:       "ConfigMap",
+            APIVersion: "v1",
+        },
+        ObjectMeta: b.buildObjectMeta(template.Template.ObjectMeta),
+        Data: map[string]string{
+            slurmInitEntrypointFilename: envEntrypointScript,
+            slurmEntrypointFilename:     entrypointScript,
+            slurmScriptFilename:         b.scriptContent,
+        },
+    }
+    configMap.ObjectMeta.GenerateName = ""
+    configMap.ObjectMeta.Name = b.objectName
+
-    if b.cpusOnNode != nil && b.totalGpus > 0 {
-        cpusPerGpu := b.cpusOnNode.Value() / int64(b.totalGpus)
-        b.cpusPerGpu = resource.NewQuantity(cpusPerGpu, resource.DecimalSI)
+    service := &corev1.Service{
+        TypeMeta:   metav1.TypeMeta{Kind: "Service", APIVersion: "v1"},
+        ObjectMeta: objectMeta,
+        Spec: corev1.ServiceSpec{
+            ClusterIP: "None",
+            Selector: map[string]string{
+                "job-name": b.objectName,
+            },
+        },
     }
 
     return job, []runtime.Object{configMap, service}, nil
@@ -496,7 +532,7 @@ type slurmInitEntrypointScript struct {
     SlurmMemPerCPU     string
     SlurmMemPerGPU     string
     SlurmMemPerNode    string
-    SlurmGPUs          int32
+    SlurmGPUs          string
     SlurmNTasks        int32
     SlurmNTasksPerNode int32
     SlurmNProcs        int32
@@ -571,7 +607,7 @@ func (b *slurmBuilder) buildInitEntrypointScript() (string, error) {
     SlurmMemPerCPU:     getValueOrEmpty(b.memPerCPU),
     SlurmMemPerGPU:     getValueOrEmpty(b.memPerGPU),
     SlurmMemPerNode:    getValueOrEmpty(b.totalMemPerNode),
-    SlurmGPUs:          b.totalGpus,
+    SlurmGPUs:          getValueOrEmpty(b.totalGpus),
     SlurmNTasks:        nTasks,
     SlurmNTasksPerNode: nTasks,
     SlurmNProcs:        nTasks,
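SLURM_CPUS_PER_GPU is now derived from the accumulated totals as an integer ratio of milli-values, rather than from the old int32 counter. A sketch with assumed totals (8 CPUs, 2 GPUs); note that the division truncates, so 3 CPUs against 2 GPUs would yield 1:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Assumed totals accumulated across all containers: 8 CPUs, 2 GPUs.
	totalCpus := resource.MustParse("8")
	totalGpus := resource.MustParse("2")

	// Same expression as the new code: an integer ratio of milli-values.
	cpusPerGpu := totalCpus.MilliValue() / totalGpus.MilliValue()

	fmt.Println(resource.NewQuantity(cpusPerGpu, totalCpus.Format).String()) // 4
}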
2 changes: 1 addition & 1 deletion cmd/experimental/kjobctl/pkg/builder/slurm_builder_test.go
@@ -332,7 +332,7 @@ SLURM_CPUS_PER_GPU=
 SLURM_MEM_PER_CPU=
 SLURM_MEM_PER_GPU=
 SLURM_MEM_PER_NODE=
-SLURM_GPUS=0
+SLURM_GPUS=
 SLURM_NTASKS=1
 SLURM_NTASKS_PER_NODE=1
 SLURM_NPROCS=1
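Because totalGpus is now a *resource.Quantity, SLURM_GPUS goes through getValueOrEmpty like the other quantity-backed variables, so an unset value renders as an empty string instead of a misleading 0. The repository's actual helper is not shown in this diff; the version below is a plausible shape for it, assumed here for illustration only:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// getValueOrEmpty is a guessed shape for the helper the diff calls;
// the real implementation in kjobctl may differ.
func getValueOrEmpty(q *resource.Quantity) string {
	if q == nil {
		return ""
	}
	return q.String()
}

func main() {
	var unset *resource.Quantity
	set := resource.MustParse("2")

	fmt.Printf("SLURM_GPUS=%s\n", getValueOrEmpty(unset)) // SLURM_GPUS=
	fmt.Printf("SLURM_GPUS=%s\n", getValueOrEmpty(&set))  // SLURM_GPUS=2
}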
21 changes: 16 additions & 5 deletions cmd/experimental/kjobctl/pkg/cmd/create/create_test.go
@@ -828,7 +828,7 @@ SLURM_CPUS_PER_GPU=
 SLURM_MEM_PER_CPU=
 SLURM_MEM_PER_GPU=
 SLURM_MEM_PER_NODE=
-SLURM_GPUS=0
+SLURM_GPUS=
 SLURM_NTASKS=1
 SLURM_NTASKS_PER_NODE=1
 SLURM_NPROCS=1
@@ -943,12 +943,14 @@ error_path=$(unmask_filename "$SBATCH_ERROR")
                 "--job-name", "job-name",
                 "--partition", "lq1",
                 "--chdir", "/mydir",
+                "--cpus-per-task", "2",
                 tc.tempFile,
             }
         },
         kjobctlObjs: []runtime.Object{
             wrappers.MakeJobTemplate("slurm-job-template", metav1.NamespaceDefault).
                 WithContainer(*wrappers.MakeContainer("c1", "bash:4.4").Obj()).
+                WithContainer(*wrappers.MakeContainer("c2", "bash:4.4").Obj()).
                 Obj(),
             wrappers.MakeApplicationProfile("profile", metav1.NamespaceDefault).
                 WithSupportedMode(*wrappers.MakeSupportedMode(v1alpha1.SlurmMode, "slurm-job-template").Obj()).
@@ -988,16 +990,25 @@ error_path=$(unmask_filename "$SBATCH_ERROR")
                 Command("bash", "/slurm/scripts/entrypoint.sh").
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-scripts", MountPath: "/slurm/scripts"}).
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-env", MountPath: "/slurm/env"}).
+                WithRequest(corev1.ResourceCPU, resource.MustParse("2")).
                 Obj()).
+            WithContainer(*wrappers.MakeContainer("c2", "bash:4.4").
+                Command("bash", "/slurm/scripts/entrypoint.sh").
+                WithVolumeMount(corev1.VolumeMount{Name: "slurm-scripts", MountPath: "/slurm/scripts"}).
+                WithVolumeMount(corev1.VolumeMount{Name: "slurm-env", MountPath: "/slurm/env"}).
+                WithRequest(corev1.ResourceCPU, resource.MustParse("2")).
+                Obj()).
             WithContainer(*wrappers.MakeContainer("c1-1", "bash:4.4").
                 Command("bash", "/slurm/scripts/entrypoint.sh").
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-scripts", MountPath: "/slurm/scripts"}).
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-env", MountPath: "/slurm/env"}).
+                WithRequest(corev1.ResourceCPU, resource.MustParse("2")).
                 Obj()).
             WithContainer(*wrappers.MakeContainer("c1-2", "bash:4.4").
                 Command("bash", "/slurm/scripts/entrypoint.sh").
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-scripts", MountPath: "/slurm/scripts"}).
                 WithVolumeMount(corev1.VolumeMount{Name: "slurm-env", MountPath: "/slurm/env"}).
+                WithRequest(corev1.ResourceCPU, resource.MustParse("2")).
                 Obj()).
             WithVolume(corev1.Volume{
                 Name: "slurm-scripts",
@@ -1087,14 +1098,14 @@ SLURM_ARRAY_TASK_COUNT=26
 SLURM_ARRAY_TASK_MAX=25
 SLURM_ARRAY_TASK_MIN=0
 SLURM_TASKS_PER_NODE=3
-SLURM_CPUS_PER_TASK=
-SLURM_CPUS_ON_NODE=
-SLURM_JOB_CPUS_PER_NODE=
+SLURM_CPUS_PER_TASK=2
+SLURM_CPUS_ON_NODE=8
+SLURM_JOB_CPUS_PER_NODE=8
 SLURM_CPUS_PER_GPU=
 SLURM_MEM_PER_CPU=
 SLURM_MEM_PER_GPU=
 SLURM_MEM_PER_NODE=
-SLURM_GPUS=0
+SLURM_GPUS=
 SLURM_NTASKS=3
 SLURM_NTASKS_PER_NODE=3
 SLURM_NPROCS=3
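The renaming pass explains the expected container names above: it walks the pod's containers, skips any container that is not a copy of the replicated one, and numbers the copies with a separate counter, which is why c2 keeps its name while the copies become c1-0, c1-1, c1-2. A standalone sketch of that loop on plain strings, mirroring the test's layout:

package main

import "fmt"

func main() {
	// The updated test's pod ends up with the template's two containers
	// plus ntasks-1 = 2 copies of the first one.
	containers := []string{"c1", "c2", "c1", "c1"}
	replicated := "c1"
	nTasks := 3

	// Same shape as the new loop: i walks the containers, j numbers only
	// the copies of the replicated container.
	for i, j := 0, 0; i <= nTasks; i++ {
		if containers[i] != replicated {
			continue
		}
		containers[i] = fmt.Sprintf("%s-%d", containers[i], j)
		j++
	}

	fmt.Println(containers) // [c1-0 c2 c1-1 c1-2]
}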
