Skip to content

Commit

Permalink
Refactor kubelet, standalone k8s and integration test to all use the …
Browse files Browse the repository at this point in the history
…same code.
  • Loading branch information
brendandburns committed Dec 1, 2014
1 parent ff1e9f4 commit d47b510
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 238 deletions.
24 changes: 4 additions & 20 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"reflect"
"runtime"
"strconv"
Expand All @@ -40,12 +39,11 @@ import (
minionControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/service"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"
Expand All @@ -56,6 +54,7 @@ import (
)

const testRootDir = "/tmp/kubelet"
const testRootDir2 = "/tmp/kubelet2"

var (
fakeDocker1, fakeDocker2 dockertools.FakeDockerClient
Expand Down Expand Up @@ -187,26 +186,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
minionController.Run(10 * time.Second)

// Kubelet (localhost)
os.MkdirAll(testRootDir, 0750)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], testRootDir, &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), net.ParseIP("127.0.0.1"), 10250, true)
}, 0)

standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, cfg2.Channel("etcd"))
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], testRootDir, &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), net.ParseIP("127.0.0.1"), 10251, true)
}, 0)
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)

return apiServer.URL
}
Expand Down
237 changes: 37 additions & 200 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,15 @@ import (
"flag"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"path"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisor "github.com/google/cadvisor/client"
)

const defaultRootDir = "/var/lib/kubelet"
Expand Down Expand Up @@ -82,52 +69,15 @@ func init() {
flag.Var(&apiServerList, "api_servers", "List of Kubernetes API servers to publish events to. (ip:port), comma separated.")
}

func getDockerEndpoint() string {
var endpoint string
if len(*dockerEndpoint) > 0 {
endpoint = *dockerEndpoint
} else if len(os.Getenv("DOCKER_HOST")) > 0 {
endpoint = os.Getenv("DOCKER_HOST")
} else {
endpoint = "unix:///var/run/docker.sock"
}
glog.Infof("Connecting to docker on %s", endpoint)

return endpoint
}

func getHostname() string {
hostname := []byte(*hostnameOverride)
if string(hostname) == "" {
// Note: We use exec here instead of os.Hostname() because we
// want the FQDN, and this is the easiest way to get it.
fqdn, err := exec.Command("hostname", "-f").Output()
if err != nil {
glog.Fatalf("Couldn't determine hostname: %v", err)
func setupRunOnce() {
if *runonce {
if len(etcdServerList) > 0 {
glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive")
}
if *enableServer {
glog.Infof("--runonce is set, disabling server")
*enableServer = false
}
hostname = fqdn
}
return strings.TrimSpace(string(hostname))
}

func getApiserverClient() (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(*authPath)
if err != nil {
return nil, err
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
if c, err := client.New(&clientConfig); err != nil {
return nil, err
} else {
return c, nil
}
}

Expand All @@ -139,147 +89,34 @@ func main() {

verflag.PrintAndExitIfRequested()

if *runonce {
exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive"
if len(etcdServerList) > 0 {
glog.Fatalf(exclusiveFlag, "--etcd_servers")
}
if *enableServer {
glog.Infof("--runonce is set, disabling server")
*enableServer = false
}
}

etcd.SetLogger(util.NewLogger("etcd "))

// Make an API client if possible.
if len(apiServerList) < 1 {
glog.Info("No api servers specified.")
} else {
if apiClient, err := getApiserverClient(); err != nil {
glog.Errorf("Unable to make apiserver client: %v", err)
} else {
// Send events to APIserver if there is a client.
glog.Infof("Sending events to APIserver.")
record.StartRecording(apiClient.Events(""), "kubelet")
}
}

// Log the events locally too.
record.StartLogging(glog.Infof)

capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: *allowPrivileged,
})

dockerClient, err := docker.NewClient(getDockerEndpoint())
if err != nil {
glog.Fatal("Couldn't connect to docker.")
}

hostname := getHostname()

if *rootDirectory == "" {
glog.Fatal("Invalid root directory path.")
}
*rootDirectory = path.Clean(*rootDirectory)
if err := os.MkdirAll(*rootDirectory, 0750); err != nil {
glog.Fatalf("Error creating root directory: %v", err)
}

// source of all configuration
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)

// define file config source
if *config != "" {
kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file"))
}

// define url config source
if *manifestURL != "" {
kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http"))
}

// define etcd config source and initialize etcd client
var etcdClient *etcd.Client
if len(etcdServerList) > 0 {
etcdClient = etcd.NewClient(etcdServerList)
} else if *etcdConfigFile != "" {
var err error
etcdClient, err = etcd.NewClientFromFile(*etcdConfigFile)
if err != nil {
glog.Fatalf("Error with etcd config file: %v", err)
}
}

if etcdClient != nil {
glog.Infof("Watching for etcd configs at %v", etcdClient.GetCluster())
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), etcdClient, cfg.Channel("etcd"))
}

// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations

k := kubelet.NewMainKubelet(
getHostname(),
dockerClient,
etcdClient,
*rootDirectory,
*networkContainerImage,
*syncFrequency,
float32(*registryPullQPS),
*registryBurst,
*minimumGCAge,
*maxContainerCount)

k.BirthCry()

go func() {
util.Forever(func() {
err := k.GarbageCollectContainers()
if err != nil {
glog.Errorf("Garbage collect failed: %v", err)
}
}, time.Minute*1)
}()

go func() {
defer util.HandleCrash()
// TODO: Monitor this connection, reconnect if needed?
glog.V(1).Infof("Trying to create cadvisor client.")
cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:4194")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
return
}
glog.V(1).Infof("Successfully created cadvisor client.")
k.SetCadvisorClient(cadvisorClient)
}()

// TODO: These should probably become more plugin-ish: register a factory func
// in each checker's init(), iterate those here.
health.AddHealthChecker(health.NewExecHealthChecker(k))
health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{}))
health.AddHealthChecker(&health.TCPHealthChecker{})

// process pods and exit.
if *runonce {
if _, err := k.RunOnce(cfg.Updates()); err != nil {
glog.Fatalf("--runonce failed: %v", err)
}
return
}

// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)

// start the kubelet server
if *enableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), net.IP(address), *port, *enableDebuggingHandlers)
}, 0)
}

setupRunOnce()

kcfg := standalone.KubeletConfig{
Address: address,
AuthPath: *authPath,
ApiServerList: apiServerList,
AllowPrivileged: *allowPrivileged,
HostnameOverride: *hostnameOverride,
RootDirectory: *rootDirectory,
ConfigFile: *config,
ManifestURL: *manifestURL,
FileCheckFrequency: *fileCheckFrequency,
HttpCheckFrequency: *httpCheckFrequency,
NetworkContainerImage: *networkContainerImage,
SyncFrequency: *syncFrequency,
RegistryPullQPS: *registryPullQPS,
RegistryBurst: *registryBurst,
MinimumGCAge: *minimumGCAge,
MaxContainerCount: *maxContainerCount,
Runonce: *runonce,
Port: *port,
EnableServer: *enableServer,
EnableDebuggingHandlers: *enableDebuggingHandlers,
DockerClient: kubelet.ConnectToDockerOrDie(*dockerEndpoint),
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
}

standalone.RunKubelet(&kcfg)
// runs forever
select {}
}
5 changes: 4 additions & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/standalone"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand All @@ -50,7 +51,9 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunApiServer(cl, etcdClient, addr, port)
standalone.RunScheduler(cl)
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)
standalone.RunKubelet(etcdClient, machineList[0], *dockerEndpoint)

dockerClient := kubelet.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
}

func newApiClient(addr string, port int) *client.Client {
Expand Down
Loading

0 comments on commit d47b510

Please sign in to comment.