Switch Logging To Zerolog #4136

Merged Jan 20, 2025 (68 commits)
Changes from 1 commit
Commits
746a7bb
update pulsar mock
d80tb7 Jan 2, 2025
29cc41b
Merge branch 'master' of github.com:armadaproject/armada into f/chris…
d80tb7 Jan 3, 2025
64355fe
go mod tidy
d80tb7 Jan 3, 2025
2bc7d8d
import order
d80tb7 Jan 3, 2025
0fc3e7f
wip
d80tb7 Jan 3, 2025
93ac227
wip
d80tb7 Jan 3, 2025
7999ecc
wip
d80tb7 Jan 3, 2025
3b7dca8
Merge branch 'master' of github.com:armadaproject/armada into f/chris…
d80tb7 Jan 3, 2025
f98b23c
supress logging
d80tb7 Jan 3, 2025
60c04fb
lint
d80tb7 Jan 3, 2025
4be8a0a
lint
d80tb7 Jan 4, 2025
32c79c8
lint
d80tb7 Jan 4, 2025
e1ced01
wip
d80tb7 Jan 4, 2025
8c184b0
wip
d80tb7 Jan 4, 2025
da74217
wip
d80tb7 Jan 4, 2025
e249f2d
wip
d80tb7 Jan 4, 2025
d490b0b
wip
d80tb7 Jan 4, 2025
1ccd7b1
wip
d80tb7 Jan 4, 2025
4e83b61
wip
d80tb7 Jan 4, 2025
94de1dd
remove logrus
d80tb7 Jan 4, 2025
4e5450d
remove logrus
d80tb7 Jan 4, 2025
59d3e9b
fix test
d80tb7 Jan 4, 2025
b660df0
fix armadactl
d80tb7 Jan 4, 2025
42baeae
clean up interfaces
d80tb7 Jan 4, 2025
b882e6a
clean up interfaces
d80tb7 Jan 4, 2025
67d3db0
Merge branch 'master' of github.com:armadaproject/armada into f/chris…
d80tb7 Jan 5, 2025
b708709
wip
d80tb7 Jan 5, 2025
4c918e0
merged
d80tb7 Jan 5, 2025
05f1750
added back json logging
d80tb7 Jan 5, 2025
edd9734
lint
d80tb7 Jan 5, 2025
3d45d6c
fix test
d80tb7 Jan 5, 2025
dd94cb0
add back log tests
d80tb7 Jan 5, 2025
94ad206
wip
d80tb7 Jan 6, 2025
9f10d8e
wip
d80tb7 Jan 6, 2025
1fc7ead
wip
d80tb7 Jan 6, 2025
04ebd1a
lint
d80tb7 Jan 6, 2025
951ab70
fix skip frames
d80tb7 Jan 7, 2025
10f4813
fixed milli time
d80tb7 Jan 7, 2025
faf182e
formatting
d80tb7 Jan 7, 2025
ba90268
formatting
d80tb7 Jan 8, 2025
498789b
more tests
d80tb7 Jan 8, 2025
f0ab4e7
lint
d80tb7 Jan 8, 2025
ed5fca5
merged master
d80tb7 Jan 8, 2025
7901e4b
colorful
d80tb7 Jan 8, 2025
a834009
fixes
d80tb7 Jan 8, 2025
e8cb59a
fixes
d80tb7 Jan 12, 2025
6e950b9
fixes
d80tb7 Jan 12, 2025
cb8891f
fixes
d80tb7 Jan 12, 2025
9800c79
wip
d80tb7 Jan 12, 2025
0fe5017
lint
d80tb7 Jan 12, 2025
34b97dc
lint
d80tb7 Jan 12, 2025
7273f2e
wip
d80tb7 Jan 12, 2025
fc8b26e
wip
d80tb7 Jan 12, 2025
314e5a5
update docker
d80tb7 Jan 13, 2025
d89d12f
wip
d80tb7 Jan 13, 2025
bda5f33
add prometheus hook
d80tb7 Jan 14, 2025
5833afb
add prometheus hook
d80tb7 Jan 14, 2025
69328c5
add prometheus hook
d80tb7 Jan 14, 2025
3c45419
go mod tidy
d80tb7 Jan 14, 2025
f85c39e
unit test for prometheus
d80tb7 Jan 14, 2025
44ad8a0
Merge branch 'master' of github.com:armadaproject/armada into f/chris…
d80tb7 Jan 14, 2025
1d6f9f5
go mod tidy
d80tb7 Jan 14, 2025
3b985c7
fix
d80tb7 Jan 14, 2025
fd4b712
fix
d80tb7 Jan 14, 2025
ab21792
fix
d80tb7 Jan 14, 2025
c7833da
Merge branch 'master' of github.com:armadaproject/armada into f/chris…
d80tb7 Jan 16, 2025
715c832
merge master
d80tb7 Jan 18, 2025
d2f4beb
Merge branch 'master' into f/chrisma/zerlog-minimal
robertdavidsmith Jan 20, 2025
wip
d80tb7 committed Jan 4, 2025
commit da74217b987efc8e8aaf44dc1ceda68afa29defb
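
This is one of the PR's "wip" commits, and its diffs repeat the two call-site patterns of the logrus-to-zerolog migration: `logging.NewLogger()` becomes `logging.StdLogger()`, and the package-level `logging.WithStacktrace(ctx, err)` becomes the context-scoped `ctx.Logger().WithStacktrace(err)`. Those helpers live in Armada's internal `logging` package; the stand-alone sketch below shows only the plain zerolog mechanics that a `StdLogger()`-style constructor presumably wraps (the `newStdLogger` name and configuration are illustrative assumptions, not code from this PR).

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

// newStdLogger is a hypothetical stand-in for logging.StdLogger():
// a process-wide zerolog logger writing timestamped output to stdout.
func newStdLogger() zerolog.Logger {
	return zerolog.New(os.Stdout).With().Timestamp().Logger()
}

func main() {
	logger := newStdLogger()
	// Leveled, printf-style calls comparable to the logrus-era Warnf/Infof.
	logger.Warn().Msgf("cluster is not healthy; will not request more jobs: %s", "etcd degraded")
}
```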
2 changes: 1 addition & 1 deletion internal/common/etcdhealth/etcdhealth_test.go
@@ -26,7 +26,7 @@ func TestEtcdReplicaHealthMonitor(t *testing.T) {
ctx, cancel := armadacontext.WithCancel(armadacontext.Background())
defer cancel()
g, ctx := armadacontext.ErrGroup(ctx)
g.Go(func() error { return hm.Run(ctx, logging.NewLogger()) })
g.Go(func() error { return hm.Run(ctx, logging.StdLogger()) })

// Should still be unavailable due to missing metrics.
hm.BlockUntilNextMetricsCollection(ctx)
2 changes: 1 addition & 1 deletion internal/common/ingest/controlplaneevents/utils.go
@@ -7,7 +7,7 @@ import (
commonmetrics "github.com/armadaproject/armada/internal/common/ingest/metrics"
"github.com/armadaproject/armada/internal/common/ingest/utils"
log "github.com/armadaproject/armada/internal/common/logging"

"github.com/armadaproject/armada/pkg/controlplaneevents"
)

func MessageUnmarshaller(msg pulsar.ConsumerMessage, metrics *commonmetrics.Metrics) *utils.EventsWithIds[*controlplaneevents.Event] {
3 changes: 1 addition & 2 deletions internal/eventingester/store/eventstore.go
@@ -1,6 +1,7 @@
package store

import (
"github.com/armadaproject/armada/internal/eventingester/model"
"regexp"
"time"

@@ -10,9 +11,7 @@ import (
"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/ingest"
log "github.com/armadaproject/armada/internal/common/logging"
"github
"github.com/armadaproject/armada/internal/eventingester/configuration"
log "github.com/armadaproject/armada/internal/common/logging"
)

const (
2 changes: 1 addition & 1 deletion internal/executor/service/cluster_allocation.go
@@ -48,7 +48,7 @@ func (allocationService *ClusterAllocationService) AllocateSpareClusterCapacity(
// If a health monitor is provided, avoid leasing jobs when the cluster is unhealthy.
if allocationService.clusterHealthMonitor != nil {
if ok, reason, err := allocationService.clusterHealthMonitor.IsHealthy(); err != nil {
logging.WithStacktrace(logging.NewLogger(), err).Error("failed to check cluster health")
logging.WithStacktrace(err).Error("failed to check cluster health")
return
} else if !ok {
log.Warnf("cluster is not healthy; will not request more jobs: %s", reason)
2 changes: 1 addition & 1 deletion internal/lookoutv2/application.go
@@ -47,7 +47,7 @@ func Serve(configuration configuration.LookoutV2Config) error {
// create new service API
api := operations.NewLookoutAPI(swaggerSpec)

logger := logging.NewLogger()
logger := logging.StdLogger()

api.Logger = logger.Debugf

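Here Lookout hands the generated go-swagger API a printf-style callback (`api.Logger = logger.Debugf`), which works because Armada's wrapper still exposes a logrus-like `Debugf`. With bare zerolog you would build that adapter yourself — a minimal sketch, assuming only that the target field has go-swagger's usual `func(string, ...interface{})` shape:

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

// debugf adapts a zerolog.Logger to the func(string, ...interface{})
// callback signature that go-swagger generated APIs accept as a logger.
func debugf(l zerolog.Logger) func(string, ...interface{}) {
	return func(format string, args ...interface{}) {
		l.Debug().Msgf(format, args...)
	}
}

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
	apiLogger := debugf(logger) // e.g. api.Logger = debugf(logger)
	apiLogger("serving lookout API on %s", ":10000")
}
```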
3 changes: 1 addition & 2 deletions internal/scheduler/api.go
@@ -16,7 +16,6 @@ import (
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/auth"
"github.com/armadaproject/armada/internal/common/compress"
"github.com/armadaproject/armada/internal/common/logging"
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/common/pulsarutils"
@@ -371,7 +370,7 @@ func (srv *ExecutorApi) executorFromLeaseRequest(ctx *armadacontext.Context, req
now := srv.clock.Now().UTC()
for _, nodeInfo := range req.Nodes {
if node, err := executorapi.NewNodeFromNodeInfo(nodeInfo, req.ExecutorId, srv.allowedPriorities, now); err != nil {
logging.WithStacktrace(ctx, err).Warnf(
ctx.Logger().WithStacktrace(err).Warnf(
"skipping node %s from executor %s", nodeInfo.GetName(), req.GetExecutorId(),
)
} else {
5 changes: 2 additions & 3 deletions internal/scheduler/metrics.go
@@ -10,7 +10,6 @@ import (
"k8s.io/utils/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/logging"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
commonmetrics "github.com/armadaproject/armada/internal/common/metrics"
"github.com/armadaproject/armada/internal/common/resource"
@@ -103,8 +102,8 @@ func (c *MetricsCollector) Run(ctx *armadacontext.Context) error {
case <-ticker.C():
err := c.refresh(ctx)
if err != nil {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Warnf("error refreshing metrics state")
}
}
17 changes: 10 additions & 7 deletions internal/scheduler/scheduler.go
@@ -13,7 +13,6 @@ import (
"k8s.io/utils/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/logging"
protoutil "github.com/armadaproject/armada/internal/common/proto"
"github.com/armadaproject/armada/internal/scheduler/database"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
@@ -153,7 +152,7 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {
syncContext, cancel := armadacontext.WithTimeout(ctx, 5*time.Minute)
err := s.ensureDbUpToDate(syncContext, 1*time.Second)
if err != nil {
logging.WithStacktrace(ctx, err).Error("could not become leader")
ctx.Logger().WithStacktrace(err).Error("could not become leader")
leaderToken = leader.InvalidLeaderToken()
} else {
fullUpdate = true
@@ -175,7 +174,7 @@ func (s *Scheduler) Run(ctx *armadacontext.Context) error {

result, err := s.cycle(ctx, fullUpdate, leaderToken, shouldSchedule)
if err != nil {
logging.WithStacktrace(ctx, err).Error("scheduling cycle failure")
ctx.Logger().WithStacktrace(err).Error("scheduling cycle failure")
leaderToken = leader.InvalidLeaderToken()
}

@@ -981,7 +980,7 @@ func (s *Scheduler) initialise(ctx *armadacontext.Context) error {
return nil
default:
if _, _, err := s.syncState(ctx, true); err != nil {
logging.WithStacktrace(ctx, err).Error("failed to initialise; trying again in 1 second")
ctx.Logger().
WithStacktrace(err).
Error("failed to initialise; trying again in 1 second")
time.Sleep(1 * time.Second)
} else {
ctx.Info("initialisation succeeded")
@@ -1008,7 +1009,7 @@ func (s *Scheduler) ensureDbUpToDate(ctx *armadacontext.Context, pollInterval ti
default:
numSent, err = s.publisher.PublishMarkers(ctx, groupId)
if err != nil {
logging.WithStacktrace(ctx, err).Error("Error sending marker messages to pulsar")
ctx.Logger().
WithStacktrace(err).
Error("Error sending marker messages to pulsar")
s.clock.Sleep(pollInterval)
} else {
messagesSent = true
@@ -1024,8 +1027,8 @@ func (s *Scheduler) ensureDbUpToDate(ctx *armadacontext.Context, pollInterval ti
default:
numReceived, err := s.jobRepository.CountReceivedPartitions(ctx, groupId)
if err != nil {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Error("Error querying the database or marker messages")
}
if numSent == numReceived {
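The recurring `ctx.Logger().WithStacktrace(err)` chain in this file attaches the error and its stack trace to the entry before logging at Error or Warn level. In plain zerolog the equivalent is `.Stack().Err(err)`, which only emits a stack once `zerolog.ErrorStackMarshaler` is set — a sketch under the assumption that Armada's wrapper does something similar via `github.com/pkg/errors`:

```go
package main

import (
	"os"

	pkgerr "github.com/pkg/errors"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/pkgerrors"
)

func main() {
	// Teach zerolog to extract stack traces from github.com/pkg/errors values.
	zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// pkg/errors records a stack trace at creation time.
	err := pkgerr.New("could not become leader")

	// Rough equivalent of ctx.Logger().WithStacktrace(err).Error(...):
	// the JSON entry carries the message, the error, and its stack.
	logger.Error().Stack().Err(err).Msg("scheduling cycle failure")
}
```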
5 changes: 2 additions & 3 deletions internal/scheduler/schedulerapp.go
@@ -25,7 +25,6 @@ import (
dbcommon "github.com/armadaproject/armada/internal/common/database"
grpcCommon "github.com/armadaproject/armada/internal/common/grpc"
"github.com/armadaproject/armada/internal/common/health"
"github.com/armadaproject/armada/internal/common/logging"
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/profiling"
"github.com/armadaproject/armada/internal/common/pulsarutils"
@@ -118,8 +117,8 @@ func Run(config schedulerconfig.Configuration) error {
defer func() {
err := conn.Close()
if err != nil {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Warnf("Armada api client didn't close down cleanly")
}
}()
20 changes: 10 additions & 10 deletions internal/scheduler/scheduling/preempting_queue_scheduler.go
@@ -99,7 +99,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
scheduledJobsById := make(map[string]*schedulercontext.JobSchedulingContext)

// Evict preemptible jobs.
ctx.WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs")
ctx.Logger().WithField("stage", "scheduling-algo").Infof("Evicting preemptible jobs")
evictorResult, inMemoryJobRepo, err := sch.evict(
armadacontext.WithLogField(ctx, "stage", "evict for resource balancing"),
NewNodeEvictor(
@@ -144,14 +144,14 @@
if err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Info("Finished evicting preemptible jobs")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Finished evicting preemptible jobs")
for _, jctx := range evictorResult.EvictedJctxsByJobId {
preemptedJobsById[jctx.JobId] = jctx
}
maps.Copy(sch.nodeIdByJobId, evictorResult.NodeIdByJobId)

// Re-schedule evicted jobs/schedule new jobs.
ctx.WithField("stage", "scheduling-algo").Info("Performing initial scheduling of jobs onto nodes")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Performing initial scheduling of jobs onto nodes")
schedulerResult, err := sch.schedule(
armadacontext.WithLogField(ctx, "stage", "re-schedule after balancing eviction"),
inMemoryJobRepo,
@@ -162,7 +162,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
if err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Info("Finished initial scheduling of jobs onto nodes")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Finished initial scheduling of jobs onto nodes")
for _, jctx := range schedulerResult.ScheduledJobs {
if _, ok := preemptedJobsById[jctx.JobId]; ok {
delete(preemptedJobsById, jctx.JobId)
@@ -173,7 +173,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
maps.Copy(sch.nodeIdByJobId, schedulerResult.NodeIdByJobId)

// Evict jobs on oversubscribed nodes.
ctx.WithField("stage", "scheduling-algo").Info("Evicting jobs from oversubscribed nodes")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Evicting jobs from oversubscribed nodes")
reevictResult, inMemoryJobRepo, err := sch.evict(
armadacontext.WithLogField(ctx, "stage", "evict oversubscribed"),
NewOversubscribedEvictor(
@@ -185,7 +185,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
if err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Info("Finished evicting jobs from oversubscribed nodes")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Finished evicting jobs from oversubscribed nodes")
scheduledAndEvictedJobsById := armadamaps.FilterKeys(
scheduledJobsById,
func(jobId string) bool {
@@ -205,7 +205,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
// Re-schedule evicted jobs/schedule new jobs.
// Only necessary if a non-zero number of jobs were evicted.
if len(reevictResult.EvictedJctxsByJobId) > 0 {
ctx.WithField("stage", "scheduling-algo").Info("Performing second scheduling ")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Performing second scheduling ")
rescheduleSchedulerResult, rescheduleErr := sch.schedule(
armadacontext.WithLogField(ctx, "stage", "schedule after oversubscribed eviction"),
inMemoryJobRepo,
@@ -217,7 +217,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
if rescheduleErr != nil {
return nil, rescheduleErr
}
ctx.WithField("stage", "scheduling-algo").Info("Finished second scheduling pass")
ctx.Logger().WithField("stage", "scheduling-algo").Info("Finished second scheduling pass")
for _, jctx := range rescheduleSchedulerResult.ScheduledJobs {
if _, ok := preemptedJobsById[jctx.JobId]; ok {
delete(preemptedJobsById, jctx.JobId)
@@ -231,14 +231,14 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche

preemptedJobs := maps.Values(preemptedJobsById)
scheduledJobs := maps.Values(scheduledJobsById)
ctx.WithField("stage", "scheduling-algo").Infof("Unbinding %d preempted and %d evicted jobs", len(preemptedJobs), len(maps.Values(scheduledAndEvictedJobsById)))
ctx.Logger().WithField("stage", "scheduling-algo").Infof("Unbinding %d preempted and %d evicted jobs", len(preemptedJobs), len(maps.Values(scheduledAndEvictedJobsById)))
if err := sch.unbindJobs(append(
slices.Clone(preemptedJobs),
maps.Values(scheduledAndEvictedJobsById)...),
); err != nil {
return nil, err
}
ctx.WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")
ctx.Logger().WithField("stage", "scheduling-algo").Infof("Finished unbinding preempted and evicted jobs")

PopulatePreemptionDescriptions(preemptedJobs, scheduledJobs)
schedulercontext.PrintJobSummary(ctx, "Preempting running jobs;", preemptedJobs)
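Every log line in `Schedule` is tagged with a `stage` field, either per call (`ctx.Logger().WithField("stage", "scheduling-algo")`) or for a whole sub-phase (`armadacontext.WithLogField(ctx, "stage", ...)`). In zerolog terms both come down to deriving a child logger that carries an extra key/value pair — a minimal sketch using field values taken from the diff above:

```go
package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	base := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Child logger with a constant "stage" field, analogous to
	// ctx.Logger().WithField("stage", "scheduling-algo").
	algoLog := base.With().Str("stage", "scheduling-algo").Logger()
	algoLog.Info().Msg("Evicting preemptible jobs")

	// A narrower stage for one sub-phase, analogous to
	// armadacontext.WithLogField(ctx, "stage", "evict oversubscribed").
	evictLog := base.With().Str("stage", "evict oversubscribed").Logger()
	evictLog.Info().Msg("Finished evicting jobs from oversubscribed nodes")
}
```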
13 changes: 6 additions & 7 deletions internal/scheduler/submitcheck.go
@@ -11,7 +11,6 @@ import (
"k8s.io/utils/clock"

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/logging"
armadaslices "github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/internal/scheduler/configuration"
"github.com/armadaproject/armada/internal/scheduler/database"
@@ -90,16 +89,16 @@ func (srv *SubmitChecker) Run(ctx *armadacontext.Context) error {
func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
queues, err := srv.queueCache.GetAll(ctx)
if err != nil {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Error("Error fetching queues")
return
}

executors, err := srv.executorRepository.GetExecutors(ctx)
if err != nil {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Error("Error fetching executors")
return
}
@@ -140,8 +139,8 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
nodeDb: nodeDb,
}
} else {
logging.
WithStacktrace(ctx, err).
ctx.Logger().
WithStacktrace(err).
Warnf("Error constructing nodedb for executor: %s", ex.Id)
}
}
2 changes: 1 addition & 1 deletion internal/scheduleringester/ingester.go
@@ -15,7 +15,7 @@ import (
controlplaneevents_ingest_utils "github.com/armadaproject/armada/internal/common/ingest/controlplaneevents"
"github.com/armadaproject/armada/internal/common/ingest/jobsetevents"
"github.com/armadaproject/armada/internal/common/ingest/metrics"
log "github.com/armadaproject/armada/internal/common/loggin
log "github.com/armadaproject/armada/internal/common/logging"
"github.com/armadaproject/armada/internal/common/profiling"
"github.com/armadaproject/armada/pkg/armadaevents"
"github.com/armadaproject/armada/pkg/controlplaneevents"