Skip to content

Commit

Permalink
Refactor kubelet to use http.ServeMux
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr committed Aug 20, 2014
1 parent 6b05d71 commit fd8741e
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 102 deletions.
4 changes: 2 additions & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
myKubelet := kubelet.NewIntegrationTestKubelet(machineList[0], &fakeDocker1)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250)
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), "localhost", 10250)
}, 0)

// Kubelet (machine)
Expand All @@ -129,7 +129,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
otherKubelet := kubelet.NewIntegrationTestKubelet(machineList[1], &fakeDocker2)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), "localhost", 10251)
}, 0)

return apiServer.URL
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func main() {
// start the kubelet server
if *enableServer {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port)
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), *address, *port)
}, 0)
}

Expand Down
234 changes: 139 additions & 95 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,13 @@ import (
type Server struct {
host HostInterface
updates chan<- interface{}
handler http.Handler
mux *http.ServeMux
}

func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) {
// ListenAndServeKubeletServer initializes a server to respond to HTTP network requests on the Kubelet
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, address string, port uint) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{
host: host,
updates: updates,
handler: delegate,
}
handler := NewServer(host, updates)
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
Expand All @@ -71,107 +68,154 @@ type HostInterface interface {
ServeLogs(w http.ResponseWriter, req *http.Request)
}

// NewServer initializes and configures a kubelet.Server object to handle HTTP requests
func NewServer(host HostInterface, updates chan<- interface{}) Server {
server := Server{
host: host,
updates: updates,
mux: http.NewServeMux(),
}
server.InstallDefaultHandlers()
return server
}

// InstallDefaultHandlers registers the set of supported HTTP request patterns with the mux
func (s *Server) InstallDefaultHandlers() {
s.mux.HandleFunc("/healthz", s.handleHealth)
s.mux.HandleFunc("/container", s.handleContainer)
s.mux.HandleFunc("/containers", s.handleContainers)
s.mux.HandleFunc("/podInfo", s.handlePodInfo)
s.mux.HandleFunc("/stats/", s.handleStats)
s.mux.HandleFunc("/logs/", s.handleLogs)
s.mux.HandleFunc("/spec/", s.handleSpec)
}

// error serializes an error object into an HTTP response
func (s *Server) error(w http.ResponseWriter, err error) {
http.Error(w, fmt.Sprintf("Internal Error: %v", err), http.StatusInternalServerError)
}

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer httplog.MakeLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusNotFound,
),
).Log()
// handleHealth handles health checking requests against the Kubelet
func (s *Server) handleHealth(w http.ResponseWriter, req *http.Request) {
}

// handleContainer handles container requests against the Kubelet
func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
// This is to provide backward compatibility. It only supports a single manifest
var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest)
if err != nil {
s.error(w, err)
return
}
//TODO: sha1 of manifest?
pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET}

}

// handleContainers handles containers requests against the Kubelet
func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
if err != nil {
s.error(w, err)
return
}
pods := make([]Pod, len(manifests))
for i := range manifests {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Manifest = manifests[i]
}
s.updates <- PodUpdate{pods, SET}

}

// handlePodInfo handles podInfo requests against the Kubelet
func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) {
u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
s.error(w, err)
return
}
// TODO: use an http.ServeMux instead of a switch.
switch {
case u.Path == "/container" || u.Path == "/containers":
defer req.Body.Close()
data, err := ioutil.ReadAll(req.Body)
if err != nil {
s.error(w, err)
return
}
if u.Path == "/container" {
// This is to provide backward compatibility. It only supports a single manifest
var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest)
if err != nil {
s.error(w, err)
return
}
//TODO: sha1 of manifest?
pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET}
} else if u.Path == "/containers" {
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
if err != nil {
s.error(w, err)
return
}
pods := make([]Pod, len(manifests))
for i := range manifests {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Manifest = manifests[i]
}
s.updates <- PodUpdate{pods, SET}
}
case u.Path == "/podInfo":
podID := u.Query().Get("podID")
if len(podID) == 0 {
w.WriteHeader(http.StatusBadRequest)
http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
return
}
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
info, err := s.host.GetPodInfo(podFullName)
if err == ErrNoContainersInPod {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
if err != nil {
s.error(w, err)
return
}
data, err := json.Marshal(info)
if err != nil {
s.error(w, err)
return
}
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/stats"):
s.serveStats(w, req)
case strings.HasPrefix(u.Path, "/spec"):
info, err := s.host.GetMachineInfo()
if err != nil {
s.error(w, err)
return
}
data, err := json.Marshal(info)
if err != nil {
s.error(w, err)
return
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.host.ServeLogs(w, req)
default:
if s.handler != nil {
s.handler.ServeHTTP(w, req)
}
podID := u.Query().Get("podID")
if len(podID) == 0 {
w.WriteHeader(http.StatusBadRequest)
http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
return
}
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
info, err := s.host.GetPodInfo(podFullName)
if err == ErrNoContainersInPod {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
}
if err != nil {
s.error(w, err)
return
}
data, err := json.Marshal(info)
if err != nil {
s.error(w, err)
return
}
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-type", "application/json")
w.Write(data)
}

// handleStats handles stats requests against the Kubelet
func (s *Server) handleStats(w http.ResponseWriter, req *http.Request) {
s.serveStats(w, req)
}

// handleLogs handles logs requests against the Kubelet
func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) {
s.host.ServeLogs(w, req)
}

// handleSpec handles spec requests against the Kubelet
func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
info, err := s.host.GetMachineInfo()
if err != nil {
s.error(w, err)
return
}
data, err := json.Marshal(info)
if err != nil {
s.error(w, err)
return
}
w.Header().Add("Content-type", "application/json")
w.Write(data)

}

// ServeHTTP responds to HTTP requests on the Kubelet
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer httplog.MakeLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusNotFound,
),
).Log()
s.mux.ServeHTTP(w, req)
}

// serveStats implements stats logic
func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// /stats/<podfullname>/<containerName>
components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/")
Expand Down
6 changes: 2 additions & 4 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,8 @@ func makeServerTest() *serverTestFramework {
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
fw.serverUnderTest = &Server{
host: fw.fakeKubelet,
updates: fw.updateChan,
}
server := NewServer(fw.fakeKubelet, fw.updateChan)
fw.serverUnderTest = &server
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
}
Expand Down

0 comments on commit fd8741e

Please sign in to comment.