Skip to content

Commit

Permalink
Use logrus for structured logging (kubeflow#416)
Browse files Browse the repository at this point in the history
* Move from glog to sirupsen/logrus for logging
* Add a new flag json_log_format
* Refactor all log statements to use logrus
* Use Info level for log level V(1) and below
* Use Debug level for log level V(2) and above
* Tested locally

Addresses kubeflow#24

Sample logs
```
{"filename":"app/server.go:54","level":"info","msg":"EnvKubeflowNamespace not set, use default namespace","time":"2018-02-27T18:25:18-08:00"}
{"filename":"app/server.go:59","level":"info","msg":"[Version: 0.3.0+git Git SHA: Not provided. Go Version: go1.9.3 Go OS/Arch: darwin/amd64]","time":"2018-02-27T18:25:18-08:00"}
{"filename":"app/server.go:145","level":"info","msg":"No controller_config_file provided; using empty config.","time":"2018-02-27T18:25:18-08:00"}
{"filename":"controller/controller.go:110","level":"info","msg":"Setting up event handlers","time":"2018-02-27T18:25:18-08:00"}
```
  • Loading branch information
Ankush Agarwal authored and Jiayu Liu committed Mar 7, 2018
1 parent 22ea9bd commit 45c8e90
Show file tree
Hide file tree
Showing 55 changed files with 4,273 additions and 59 deletions.
2 changes: 2 additions & 0 deletions cmd/tf-operator/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ServerOption struct {
ControllerConfigFile string
PrintVersion bool
GCInterval time.Duration
JsonLogFormat bool
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -40,4 +41,5 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit")
fs.DurationVar(&s.GCInterval, "gc-interval", 10*time.Minute, "GC interval")
fs.StringVar(&s.ControllerConfigFile, "controller-config-file", "", "Path to file containing the controller config.")
fs.BoolVar(&s.JsonLogFormat, "json-log-format", true, "Set true to use json style log format. Set false to use plaintext style log format")
}
29 changes: 15 additions & 14 deletions cmd/tf-operator/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"

"github.com/ghodss/yaml"
"github.com/golang/glog"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -49,19 +49,20 @@ var (
)

func Run(opt *options.ServerOption) error {

// Check if the -version flag was passed and, if so, print the version and exit.
if opt.PrintVersion {
version.PrintVersionAndExit()
}

namespace := os.Getenv(util.EnvKubeflowNamespace)
if len(namespace) == 0 {
glog.Infof("EnvKubeflowNamespace not set, use default namespace")
log.Infof("EnvKubeflowNamespace not set, use default namespace")
namespace = metav1.NamespaceDefault
}

// To help debugging, immediately log version
glog.Infof("%+v", version.Info())

// Check if the -version flag was passed and, if so, print the version and exit.
if opt.PrintVersion {
version.PrintVersionAndExit()
}
log.Infof("%+v", version.Info())

config, err := k8sutil.GetClusterConfig()
if err != nil {
Expand Down Expand Up @@ -119,7 +120,7 @@ func Run(opt *options.ServerOption) error {
Callbacks: election.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leader election lost")
log.Fatalf("leader election lost")
},
},
})
Expand All @@ -130,19 +131,19 @@ func Run(opt *options.ServerOption) error {
func readControllerConfig(controllerConfigFile string) *v1alpha1.ControllerConfig {
controllerConfig := &v1alpha1.ControllerConfig{}
if controllerConfigFile != "" {
glog.Infof("Loading controller config from %v.", controllerConfigFile)
log.Infof("Loading controller config from %v.", controllerConfigFile)
data, err := ioutil.ReadFile(controllerConfigFile)
if err != nil {
glog.Fatalf("Could not read file: %v. Error: %v", controllerConfigFile, err)
log.Fatalf("Could not read file: %v. Error: %v", controllerConfigFile, err)
return controllerConfig
}
err = yaml.Unmarshal(data, controllerConfig)
if err != nil {
glog.Fatalf("Could not parse controller config; Error: %v\n", err)
log.Fatalf("Could not parse controller config; Error: %v\n", err)
}
glog.Infof("ControllerConfig: %v", util.Pformat(controllerConfig))
log.Infof("ControllerConfig: %v", util.Pformat(controllerConfig))
} else {
glog.Info("No controller_config_file provided; using empty config.")
log.Info("No controller_config_file provided; using empty config.")
}
return controllerConfig
}
Expand Down
21 changes: 16 additions & 5 deletions cmd/tf-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,22 +16,33 @@ package main

import (
"flag"
"fmt"
"os"

"github.com/kubeflow/tf-operator/cmd/tf-operator/app"
"github.com/kubeflow/tf-operator/cmd/tf-operator/app/options"
"github.com/onrik/logrus/filename"
log "github.com/sirupsen/logrus"
)

func init() {
// Add filename as one of the fields of the structured log message
filenameHook := filename.NewHook()
filenameHook.Field = "filename"
log.AddHook(filenameHook)
}

func main() {
s := options.NewServerOption()
s.AddFlags(flag.CommandLine)

flag.Parse()

if s.JsonLogFormat {
// Output logs in a json format so that it can be parsed by services like Stackdriver
log.SetFormatter(&log.JSONFormatter{})
}

if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
log.Fatalf("%v\n", err)
}

}
12 changes: 9 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ import:
version: kubernetes-1.8.5
- package: github.com/hashicorp/golang-lru
version: a0d98a5f288019575c6d1f4bb1573fef2d1fcdc4

- package: github.com/onrik/logrus/filename
version: 6a64e23a4923a8d0d4db2806dcf3e55af1e48f61
- package: github.com/sirupsen/logrus
version: v1.0.4
4 changes: 2 additions & 2 deletions pkg/client/clientset/versioned/clientset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package versioned

import (
glog "github.com/golang/glog"
kubeflowv1alpha1 "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/typed/kubeflow/v1alpha1"
log "github.com/sirupsen/logrus"
discovery "k8s.io/client-go/discovery"
rest "k8s.io/client-go/rest"
flowcontrol "k8s.io/client-go/util/flowcontrol"
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewForConfig(c *rest.Config) (*Clientset, error) {

cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)
if err != nil {
glog.Errorf("failed to create the DiscoveryClient: %v", err)
log.Errorf("failed to create the DiscoveryClient: %v", err)
return nil, err
}
return &cs, nil
Expand Down
24 changes: 12 additions & 12 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"fmt"
"time"

"github.com/golang/glog"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -90,9 +90,9 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
tfJobInformer := tfJobInformerFactory.Kubeflow().V1alpha1().TFJobs()

kubeflowscheme.AddToScheme(scheme.Scheme)
glog.V(4).Info("Creating event broadcaster")
log.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartLogging(log.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})

Expand All @@ -107,14 +107,14 @@ func New(kubeClient kubernetes.Interface, APIExtclient apiextensionsclient.Inter
config: config,
}

glog.Info("Setting up event handlers")
log.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
tfJobInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *tfv1alpha1.TFJob:
glog.V(4).Infof("filter tfjob name: %v", t.Name)
log.Debugf("filter tfjob name: %v", t.Name)
return true
default:
return false
Expand Down Expand Up @@ -145,23 +145,23 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer c.WorkQueue.ShutDown()

// Start the informer factories to begin populating the informer caches
glog.Info("Starting TFJob controller")
log.Info("Starting TFJob controller")

// Wait for the caches to be synced before starting workers
glog.Info("Waiting for informer caches to sync")
log.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.TFJobSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

glog.Infof("Starting %v workers", threadiness)
log.Infof("Starting %v workers", threadiness)
// Launch workers to process TFJob resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

glog.Info("Started workers")
log.Info("Started workers")
<-stopCh
glog.Info("Shutting down workers")
log.Info("Shutting down workers")

return nil
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func (c *Controller) processNextWorkItem() bool {
func (c *Controller) syncTFJob(key string) (bool, error) {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
log.Debugf("Finished syncing job %q (%v)", key, time.Since(startTime))
}()

ns, name, err := cache.SplitMetaNamespaceKey(key)
Expand All @@ -220,7 +220,7 @@ func (c *Controller) syncTFJob(key string) (bool, error) {

if err != nil {
if apierrors.IsNotFound(err) {
glog.V(4).Infof("Job has been deleted: %v", key)
log.Debugf("Job has been deleted: %v", key)
return true, nil
}
return false, err
Expand Down
14 changes: 7 additions & 7 deletions pkg/trainer/replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"fmt"
"strings"

log "github.com/golang/glog"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s *TFReplicaSet) Delete() error {
LabelSelector: selector,
}

log.V(1).Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
log.Infof("Deleting Jobs namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.BatchV1().Jobs(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
Expand All @@ -259,7 +259,7 @@ func (s *TFReplicaSet) Delete() error {
}

// We need to delete the completed pods.
log.V(1).Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
log.Infof("Deleting Pods namespace=%v selector=%v", s.Job.job.ObjectMeta.Namespace, selector)
err = s.ClientSet.CoreV1().Pods(s.Job.job.ObjectMeta.Namespace).DeleteCollection(&meta_v1.DeleteOptions{}, options)

if err != nil {
Expand All @@ -270,7 +270,7 @@ func (s *TFReplicaSet) Delete() error {
// Services doesn't support DeleteCollection so we delete them individually.
// TODO(jlewi): We should check if this has changed with K8s 1.8 or other releases.
for index := int32(0); index < *s.Spec.Replicas; index++ {
log.V(1).Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName((index)))
log.Infof("Deleting Service %v:%v", s.Job.job.ObjectMeta.Namespace, s.jobName((index)))
err = s.ClientSet.CoreV1().Services(s.Job.job.ObjectMeta.Namespace).Delete(s.jobName(index), &meta_v1.DeleteOptions{})

if err != nil {
Expand All @@ -280,15 +280,15 @@ func (s *TFReplicaSet) Delete() error {
}

// If the ConfigMap for the default parameter server exists, we delete it
log.V(1).Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
log.Infof("Get ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
_, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
if err != nil {
if !k8sutil.IsKubernetesResourceNotFoundError(err) {
log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err)
failures = true
}
} else {
log.V(1).Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
log.Infof("Delete ConfigMaps %v:%v", s.Job.job.ObjectMeta.Namespace, s.defaultPSConfigMapName())
err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{})
if err != nil {
log.Errorf("There was a problem deleting the ConfigMaps; %v", err)
Expand All @@ -304,7 +304,7 @@ func (s *TFReplicaSet) Delete() error {

// replicaStatusFromPodList returns a status from a list of pods for a job.
func replicaStatusFromPodList(l v1.PodList, name tfv1alpha1.ContainerName) tfv1alpha1.ReplicaState {
log.V(1).Infof("Get replicaStatus from PodList: %v", util.Pformat(l))
log.Infof("Get replicaStatus from PodList: %v", util.Pformat(l))
var latest *v1.Pod
for _, i := range l.Items {
if latest == nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/trainer/training.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ package trainer
import (
"fmt"

"reflect"

log "github.com/golang/glog"
"github.com/kubeflow/tf-operator/pkg/util"
log "github.com/sirupsen/logrus"
"reflect"

"strings"

Expand Down Expand Up @@ -367,7 +366,7 @@ func (j *TrainingJob) Reconcile(config *tfv1alpha1.ControllerConfig) error {
j.status.Phase = tfv1alpha1.TFJobPhaseDone
j.status.State = tfv1alpha1.StateSucceeded
} else {
log.V(1).Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status))
log.Infof("Job %v status=%v", j.job.ObjectMeta.Name, util.Pformat(j.status))
}
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
package k8sutil

import (
"net"
"os"

log "github.com/golang/glog"
tfv1alpha1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha1"
log "github.com/sirupsen/logrus"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -28,6 +25,8 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // for gcp auth
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"net"
"os"
)

const RecommendedConfigPathEnvVar = "KUBECONFIG"
Expand Down
Loading

0 comments on commit 45c8e90

Please sign in to comment.