Skip to content

Commit

Permalink
Clean up how client is passed to Kubelet in preparation for reading pods
Browse files Browse the repository at this point in the history
Also fixes how Kubelet server looks up pods by name when there are multiple
sources.
  • Loading branch information
smarterclayton committed Jan 7, 2015
1 parent 880ecef commit ba53d72
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 100 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Session.vim

# Go test binaries
*.test
/hack/.test-cmd-auth

# Mercurial files
**/.hg
Expand Down
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ func startComponents(manifestURL string) (apiServerURL string) {
minionController.Run(10 * time.Second)

// Kubelet (localhost)
standalone.SimpleRunKubelet(etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
standalone.SimpleRunKubelet(etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)
standalone.SimpleRunKubelet(cl, etcdClient, &fakeDocker2, machineList[1], testRootDir2, "", "127.0.0.1", 10251)

return apiServer.URL
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,13 @@ func main() {
glog.Info(err)
}

client, err := standalone.GetAPIServerClient(*authPath, apiServerList)
if err != nil && len(apiServerList) > 0 {
glog.Warningf("No API client: %v", err)
}

kcfg := standalone.KubeletConfig{
Address: address,
AuthPath: *authPath,
ApiServerList: apiServerList,
AllowPrivileged: *allowPrivileged,
HostnameOverride: *hostnameOverride,
RootDirectory: *rootDirectory,
Expand All @@ -125,6 +128,7 @@ func main() {
EnableServer: *enableServer,
EnableDebuggingHandlers: *enableDebuggingHandlers,
DockerClient: util.ConnectToDockerOrDie(*dockerEndpoint),
KubeClient: client,
EtcdClient: kubelet.EtcdClientOrDie(etcdServerList, *etcdConfigFile),
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr string
standalone.RunControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory)

dockerClient := util.ConnectToDockerOrDie(*dockerEndpoint)
standalone.SimpleRunKubelet(etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
standalone.SimpleRunKubelet(cl, etcdClient, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250)
}

func newApiClient(addr string, port int) *client.Client {
Expand Down
2 changes: 2 additions & 0 deletions hack/local-up-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ sudo "${GO_OUT}/kubelet" \
--etcd_servers="http://127.0.0.1:4001" \
--hostname_override="127.0.0.1" \
--address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" >"${KUBELET_LOG}" 2>&1 &
KUBELET_PID=$!

Expand Down
2 changes: 2 additions & 0 deletions hack/test-cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ kube::log::status "Starting kubelet"
--etcd_servers="http://${ETCD_HOST}:${ETCD_PORT}" \
--hostname_override="127.0.0.1" \
--address="127.0.0.1" \
--api_servers="${API_HOST}:${API_PORT}" \
--auth_path="${KUBE_ROOT}/hack/.test-cmd-auth" \
--port="$KUBELET_PORT" 1>&2 &
KUBELET_PID=$!

Expand Down
14 changes: 14 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,8 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {

// GetKubeletContainerLogs returns logs from the container
// The second parameter of GetPodInfo and FindPodContainer methods represents pod UUID, which is allowed to be blank
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error {
_, err := kl.GetPodInfo(podFullName, "")
if err == dockertools.ErrNoContainersInPod {
Expand All @@ -1153,6 +1155,18 @@ func (kl *Kubelet) GetBoundPods() ([]api.BoundPod, error) {
return kl.pods, nil
}

// GetPodFullName provides the first pod that matches namespace and name, or false
// if no such pod can be found.
func (kl *Kubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
for i := range kl.pods {
pod := &kl.pods[i]
if pod.Namespace == namespace && pod.Name == name {
return pod, true
}
}
return nil, false
}

// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) {
var manifest api.PodSpec
Expand Down
75 changes: 30 additions & 45 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
Expand Down Expand Up @@ -66,6 +65,7 @@ type HostInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetBoundPods() ([]api.BoundPod, error)
GetPodByName(namespace, name string) (*api.BoundPod, bool)
GetPodInfo(name, uuid string) (api.PodInfo, error)
RunInContainer(name, uuid, container string, cmd []string) ([]byte, error)
GetKubeletContainerLogs(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
Expand Down Expand Up @@ -146,13 +146,11 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
follow, _ := strconv.ParseBool(uriValues.Get("follow"))
tail := uriValues.Get("tail")

podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}

fw := FlushWriter{writer: w}
if flusher, ok := fw.writer.(http.Flusher); ok {
Expand All @@ -162,7 +160,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
}
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
err = s.host.GetKubeletContainerLogs(podFullName, containerName, tail, follow, &fw, &fw)
err = s.host.GetKubeletContainerLogs(GetPodFullName(pod), containerName, tail, follow, &fw, &fw)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -217,19 +215,12 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request, version
http.Error(w, "Missing 'podNamespace=' query entry.", http.StatusBadRequest)
return
}
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
info, err := s.host.GetPodInfo(podFullName, podUUID)
if err == dockertools.ErrNoContainersInPod {
http.Error(w, "api.BoundPod does not exist", http.StatusNotFound)
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
info, err := s.host.GetPodInfo(GetPodFullName(pod), podUUID)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -293,15 +284,13 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) {
http.Error(w, "Unexpected path for command running", http.StatusBadRequest)
return
}
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: podID,
Namespace: podNamespace,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
pod, ok := s.host.GetPodByName(podNamespace, podID)
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
command := strings.Split(u.Query().Get("cmd"), " ")
data, err := s.host.RunInContainer(podFullName, uuid, container, command)
data, err := s.host.RunInContainer(GetPodFullName(pod), uuid, container, command)
if err != nil {
s.error(w, err)
return
Expand Down Expand Up @@ -344,24 +333,20 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// TODO(monnand) Implement this
errors.New("pod level status currently unimplemented")
case 3:
// Backward compatibility without uuid information
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: components[1],
Namespace: api.NamespaceDefault,
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query)
// Backward compatibility without uuid information, does not support namespace
pod, ok := s.host.GetPodByName(api.NamespaceDefault, components[1])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), "", components[2], &query)
case 5:
podFullName := GetPodFullName(&api.BoundPod{
ObjectMeta: api.ObjectMeta{
Name: components[2],
Namespace: components[1],
Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"},
},
})
stats, err = s.host.GetContainerInfo(podFullName, components[3], components[4], &query)
pod, ok := s.host.GetPodByName(components[1], components[2])
if !ok {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
stats, err = s.host.GetContainerInfo(GetPodFullName(pod), components[3], components[4], &query)
default:
http.Error(w, "unknown resource.", http.StatusNotFound)
return
Expand Down
19 changes: 18 additions & 1 deletion pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
)

type fakeKubelet struct {
podByNameFunc func(namespace, name string) (*api.BoundPod, bool)
infoFunc func(name string) (api.PodInfo, error)
containerInfoFunc func(podFullName, uid, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
Expand All @@ -43,6 +44,10 @@ type fakeKubelet struct {
containerLogsFunc func(podFullName, containerName, tail string, follow bool, stdout, stderr io.Writer) error
}

func (fk *fakeKubelet) GetPodByName(namespace, name string) (*api.BoundPod, bool) {
return fk.podByNameFunc(namespace, name)
}

func (fk *fakeKubelet) GetPodInfo(name, uuid string) (api.PodInfo, error) {
return fk.infoFunc(name)
}
Expand Down Expand Up @@ -88,7 +93,19 @@ func newServerTest() *serverTestFramework {
updateChan: make(chan interface{}),
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
fw.fakeKubelet = &fakeKubelet{
podByNameFunc: func(namespace, name string) (*api.BoundPod, bool) {
return &api.BoundPod{
ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Name: name,
Annotations: map[string]string{
ConfigSourceAnnotationKey: "etcd",
},
},
}, true
},
}
server := NewServer(fw.fakeKubelet, true)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
Expand Down
53 changes: 8 additions & 45 deletions pkg/kubelet/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package kubelet

import (
"fmt"
"net/http"
"os"
"path"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
Expand Down Expand Up @@ -97,47 +95,12 @@ func SetupLogging() {
record.StartLogging(glog.Infof)
}

// TODO: move this into pkg/client
func getApiserverClient(authPath string, apiServerList util.StringList) (*client.Client, error) {
authInfo, err := clientauth.LoadFromFile(authPath)
if err != nil {
return nil, err
}
clientConfig, err := authInfo.MergeWithConfig(client.Config{})
if err != nil {
return nil, err
}
if len(apiServerList) < 1 {
return nil, fmt.Errorf("no apiservers specified.")
}
// TODO: adapt Kube client to support LB over several servers
if len(apiServerList) > 1 {
glog.Infof("Mulitple api servers specified. Picking first one")
}
clientConfig.Host = apiServerList[0]
if c, err := client.New(&clientConfig); err != nil {
return nil, err
} else {
return c, nil
}
}

func SetupEventSending(authPath string, apiServerList util.StringList) {
// Make an API client if possible.
if len(apiServerList) < 1 {
glog.Info("No api servers specified.")
} else {
if apiClient, err := getApiserverClient(authPath, apiServerList); err != nil {
glog.Errorf("Unable to make apiserver client: %v", err)
} else {
// Send events to APIserver if there is a client.
hostname := util.GetHostname("")
glog.Infof("Sending events to APIserver.")
record.StartRecording(apiClient.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
}
}
func SetupEventSending(client *client.Client) {
glog.Infof("Sending events to api server.")
hostname := util.GetHostname("")
record.StartRecording(client.Events(""),
api.EventSource{
Component: "kubelet",
Host: hostname,
})
}
Loading

0 comments on commit ba53d72

Please sign in to comment.