[executor] Allow retrying failed pods that haven't had any containers start #4147

Merged: 10 commits, Jan 20, 2025
Split event reporter and state reporter
Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
JamesMurkin committed Jan 15, 2025
commit 9e9a8f6112c88287aa842b3af51f60b1d9461f6a
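This commit narrows JobEventReporter to batching and sending events, and moves pod-state tracking into a new JobStateReporter. Below, the executor wiring drops the cluster context, job-run state store and pod issue handler from the JobEventReporter constructor, which also removes its error return. A minimal sketch of the resulting call shape, assuming only the signature visible in this diff (buildEventReporter is a hypothetical helper, not part of the change):

```go
package wiring

import (
	"k8s.io/utils/clock"

	"github.com/armadaproject/armada/internal/executor/reporter"
)

// buildEventReporter is a hypothetical helper showing only the constructor
// shape after this commit; eventSender comes from the executor API setup.
func buildEventReporter(eventSender reporter.EventSender) (*reporter.JobEventReporter, chan bool) {
	// The reporter no longer needs the cluster context, job-run state store or
	// pod issue handler, and construction can no longer fail, so the error
	// return is gone.
	return reporter.NewJobEventReporter(eventSender, clock.RealClock{}, 200)
}
```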
27 changes: 14 additions & 13 deletions internal/executor/application.go
@@ -203,17 +203,7 @@ func setupExecutorApiComponents(
os.Exit(-1)
}

eventReporter, stopReporter, err := reporter.NewJobEventReporter(
clusterContext,
jobRunState,
eventSender,
clock.RealClock{},
200,
nil)
if err != nil {
log.Errorf("Failed to create job event reporter: %s", err)
os.Exit(-1)
}
eventReporter, stopReporter := reporter.NewJobEventReporter(eventSender, clock.RealClock{}, 200)

submitter := job.NewSubmitter(
clusterContext,
@@ -242,7 +232,7 @@ func setupExecutorApiComponents(
submitter,
clusterHealthMonitor,
)
podIssueService, err := service.NewIssueHandler(
podIssueService, err := service.NewPodIssuerHandler(
jobRunState,
clusterContext,
eventReporter,
@@ -256,12 +246,23 @@ func setupExecutorApiComponents(
os.Exit(-1)
}

jobStateReporter, err := service.NewJobStateReporter(
clusterContext,
jobRunState,
eventReporter,
podIssueService,
)
if err != nil {
log.Errorf("Failed to create job state reporter: %s", err)
os.Exit(-1)
}

taskManager.Register(podIssueService.HandlePodIssues, config.Task.PodIssueHandlingInterval, "pod_issue_handling")
taskManager.Register(preemptRunProcessor.Run, config.Task.StateProcessorInterval, "preempt_runs")
taskManager.Register(removeRunProcessor.Run, config.Task.StateProcessorInterval, "remove_runs")
taskManager.Register(jobRequester.RequestJobsRuns, config.Task.JobLeaseRenewalInterval, "request_runs")
taskManager.Register(clusterAllocationService.AllocateSpareClusterCapacity, config.Task.AllocateSpareClusterCapacityInterval, "submit_runs")
taskManager.Register(eventReporter.ReportMissingJobEvents, config.Task.MissingJobEventReconciliationInterval, "event_reconciliation")
taskManager.Register(jobStateReporter.ReportMissingJobEvents, config.Task.MissingJobEventReconciliationInterval, "event_reconciliation")
_, err = pod_metrics.ExposeClusterContextMetrics(clusterContext, clusterUtilisationService, podUtilisationService, nodeInfoService)
if err != nil {
log.Errorf("Failed to setup cluster context metrics: %s", err)
224 changes: 6 additions & 218 deletions internal/executor/reporter/job_event_reporter.go
@@ -4,21 +4,17 @@ import (
"sync"
"time"

"github.com/armadaproject/armada/internal/executor/service"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"

clusterContext "github.com/armadaproject/armada/internal/executor/context"
domain2 "github.com/armadaproject/armada/internal/executor/domain"
"github.com/armadaproject/armada/internal/executor/job"
"github.com/armadaproject/armada/internal/executor/util"
)

type EventReporter interface {
Report(events []EventMessage) error
QueueEvent(event EventMessage, callback func(error))
HasPendingEvents(pod *v1.Pod) bool
}

type queuedEvent struct {
@@ -32,139 +28,34 @@ type JobEventReporter struct {
eventQueued map[string]uint8
eventQueuedMutex sync.Mutex

jobRunStateStore *job.JobRunStateStore
clusterContext clusterContext.ClusterContext
clock clock.WithTicker
maxBatchSize int
podIssueHandler *service.IssueHandler
clock clock.WithTicker
maxBatchSize int
}

func NewJobEventReporter(
clusterContext clusterContext.ClusterContext,
jobRunState *job.JobRunStateStore,
eventSender EventSender,
clock clock.WithTicker,
maxBatchSize int,
podIssueHandler *service.IssueHandler,
) (*JobEventReporter, chan bool, error) {
) (*JobEventReporter, chan bool) {
stop := make(chan bool)
reporter := &JobEventReporter{
eventSender: eventSender,
clusterContext: clusterContext,
jobRunStateStore: jobRunState,
eventBuffer: make(chan *queuedEvent, 1000000),
eventQueued: map[string]uint8{},
eventQueuedMutex: sync.Mutex{},
clock: clock,
maxBatchSize: maxBatchSize,
podIssueHandler: podIssueHandler,
}

_, err := clusterContext.AddPodEventHandler(reporter.podEventHandler())
if err != nil {
return nil, nil, err
}

go reporter.processEventQueue(stop)

return reporter, stop, nil
}

func (eventReporter *JobEventReporter) podEventHandler() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod, ok := obj.(*v1.Pod)
if !ok {
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", obj)
return
}
go eventReporter.reportCurrentStatus(pod)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*v1.Pod)
if !ok {
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", oldObj)
return
}
newPod, ok := newObj.(*v1.Pod)
if !ok {
log.Errorf("Failed to process pod event due to it being an unexpected type. Failed to process %+v", newObj)
return
}
go eventReporter.reportStatusUpdate(oldPod, newPod)
},
}
return reporter, stop
}

func (eventReporter *JobEventReporter) Report(events []EventMessage) error {
return eventReporter.eventSender.SendEvents(events)
}

func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.Pod) {
// Don't report status if the pod phase didn't change
if old.Status.Phase == new.Status.Phase {
return
}
// Don't report status change for pods Armada is deleting
// This prevents reporting JobFailed when we delete a pod - for example due to cancellation
if util.IsMarkedForDeletion(new) {
log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
return
}
eventReporter.reportCurrentStatus(new)
}

func (eventReporter *JobEventReporter) reportCurrentStatus(pod *v1.Pod) {
if !util.IsManagedPod(pod) {
return
}
if util.HasCurrentStateBeenReported(pod) {
return
}

event, err := CreateEventForCurrentState(pod, eventReporter.clusterContext.GetClusterId())
if pod.Status.Phase == v1.PodFailed {
hasIssue := eventReporter.podIssueHandler.HasIssue(util.ExtractJobRunId(pod))
if hasIssue {
// Pod already being handled by issue handler
return
}
issueAdded, err := eventReporter.podIssueHandler.DetectAndRegisterFailedPodIssue(pod)
if issueAdded {
// Pod already being handled by issue handler
return
}
if err != nil {
log.Errorf("Failed detecting issue on failed pod %s(%s) - %v", pod.Name, util.ExtractJobRunId(pod), err)
// Don't return here, as it is very important we don't block reporting a terminal event (failed)
}
}

if err != nil {
log.Errorf("Failed to report event: %v", err)
return
}

eventReporter.QueueEvent(EventMessage{Event: event, JobRunId: util.ExtractJobRunId(pod)}, func(err error) {
if err != nil {
log.Errorf("Failed to report event: %s", err)
return
}

if util.IsReportingPhaseRequired(pod.Status.Phase) {
err = eventReporter.addAnnotationToMarkStateReported(pod)
if err != nil {
log.Errorf("Failed to add state annotation %s to pod %s: %v", string(pod.Status.Phase), pod.Name, err)
return
}
}
})

if pod.Status.Phase == v1.PodRunning && requiresIngressToBeReported(pod) {
eventReporter.attemptToReportIngressInfoEvent(pod)
}
}

func (eventReporter *JobEventReporter) QueueEvent(event EventMessage, callback func(error)) {
eventReporter.eventQueuedMutex.Lock()
defer eventReporter.eventQueuedMutex.Unlock()
@@ -236,112 +127,9 @@ func queuedEventsToEventMessages(queuedEvents []*queuedEvent) []EventMessage {
return result
}

func (eventReporter *JobEventReporter) addAnnotationToMarkStateReported(pod *v1.Pod) error {
annotations := make(map[string]string)
annotationName := string(pod.Status.Phase)
annotations[annotationName] = time.Now().String()

return eventReporter.clusterContext.AddAnnotation(pod, annotations)
}

func (eventReporter *JobEventReporter) addAnnotationToMarkIngressReported(pod *v1.Pod) error {
annotations := make(map[string]string)
annotationName := domain2.IngressReported
annotations[annotationName] = time.Now().String()

return eventReporter.clusterContext.AddAnnotation(pod, annotations)
}

func (eventReporter *JobEventReporter) ReportMissingJobEvents() {
allBatchPods, err := eventReporter.clusterContext.GetActiveBatchPods()
if err != nil {
log.Errorf("Failed to reconcile missing job events: %v", err)
return
}
podsWithCurrentPhaseNotReported := filterPodsWithCurrentStateNotReported(allBatchPods)

for _, pod := range podsWithCurrentPhaseNotReported {
if util.IsReportingPhaseRequired(pod.Status.Phase) && !eventReporter.hasPendingEvents(pod) {
eventReporter.reportCurrentStatus(pod)
}
}

podWithIngressNotReported := util.FilterPods(allBatchPods, func(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodRunning &&
requiresIngressToBeReported(pod) &&
util.HasPodBeenInStateForLongerThanGivenDuration(pod, 15*time.Second)
})

for _, pod := range podWithIngressNotReported {
if !eventReporter.hasPendingEvents(pod) {
eventReporter.attemptToReportIngressInfoEvent(pod)
}
}
}

func (eventReporter *JobEventReporter) attemptToReportIngressInfoEvent(pod *v1.Pod) {
expectedNumberOfServices := util.GetExpectedNumberOfAssociatedServices(pod)
expectedNumberOfIngresses := util.GetExpectedNumberOfAssociatedIngresses(pod)
associatedServices, err := eventReporter.clusterContext.GetServices(pod)
if err != nil {
log.Errorf("Failed to report event JobIngressInfoEvent for pod %s: %v", pod.Name, err)
return
}
associatedIngresses, err := eventReporter.clusterContext.GetIngresses(pod)
if err != nil {
log.Errorf("Failed to report event JobIngressInfoEvent for pod %s: %v", pod.Name, err)
return
}
if len(associatedServices) != expectedNumberOfServices || len(associatedIngresses) != expectedNumberOfIngresses {
log.Warnf("Not reporting JobIngressInfoEvent for pod %s because not all expected associated services "+
"(current %d, expected %d) or ingresses (current %d, expected %d) exist yet",
pod.Name, len(associatedServices), expectedNumberOfServices, len(associatedIngresses), expectedNumberOfIngresses)
// Don't report ingress info until all expected ingresses exist
return
}

ingressInfoEvent, err := CreateJobIngressInfoEvent(pod, eventReporter.clusterContext.GetClusterId(), associatedServices, associatedIngresses)
if err != nil {
log.Errorf("Failed to report event JobIngressInfoEvent for pod %s: %v", pod.Name, err)
return
}
eventReporter.QueueEvent(EventMessage{Event: ingressInfoEvent, JobRunId: util.ExtractJobRunId(pod)}, func(err error) {
if err != nil {
log.Errorf("Failed to report event JobIngressInfoEvent for pod %s: %v", pod.Name, err)
return
}

err = eventReporter.addAnnotationToMarkIngressReported(pod)
if err != nil {
log.Errorf("Failed to add ingress reported annotation %s to pod %s: %v", string(pod.Status.Phase), pod.Name, err)
return
}
})
}

func requiresIngressToBeReported(pod *v1.Pod) bool {
if !util.HasIngress(pod) {
return false
}
if _, exists := pod.Annotations[domain2.IngressReported]; exists {
return false
}
return true
}

func (eventReporter *JobEventReporter) hasPendingEvents(pod *v1.Pod) bool {
func (eventReporter *JobEventReporter) HasPendingEvents(pod *v1.Pod) bool {
eventReporter.eventQueuedMutex.Lock()
defer eventReporter.eventQueuedMutex.Unlock()
id := util.ExtractJobRunId(pod)
return eventReporter.eventQueued[id] > 0
}

func filterPodsWithCurrentStateNotReported(pods []*v1.Pod) []*v1.Pod {
podsWithMissingEvent := make([]*v1.Pod, 0)
for _, pod := range pods {
if !util.HasCurrentStateBeenReported(pod) && util.HasPodBeenInStateForLongerThanGivenDuration(pod, 30*time.Second) {
podsWithMissingEvent = append(podsWithMissingEvent, pod)
}
}
return podsWithMissingEvent
}
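
With HasPendingEvents now part of the exported EventReporter interface above, consumers such as the new JobStateReporter depend on the reporter only through that interface, which also makes them straightforward to test. A hypothetical stub, assuming only the interface and EventMessage type shown in this file (stubEventReporter itself is not part of the change):

```go
package reporter_test

import (
	v1 "k8s.io/api/core/v1"

	"github.com/armadaproject/armada/internal/executor/reporter"
)

// stubEventReporter is a hypothetical test double for the EventReporter
// interface; it records queued events and acknowledges them immediately.
type stubEventReporter struct {
	queued []reporter.EventMessage
}

// Compile-time check that the stub satisfies the interface from this file.
var _ reporter.EventReporter = &stubEventReporter{}

func (s *stubEventReporter) Report(events []reporter.EventMessage) error {
	return nil
}

func (s *stubEventReporter) QueueEvent(event reporter.EventMessage, callback func(error)) {
	s.queued = append(s.queued, event)
	callback(nil)
}

func (s *stubEventReporter) HasPendingEvents(pod *v1.Pod) bool {
	return len(s.queued) > 0
}
```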