Skip to content

Commit

Permalink
Move logger to own package; clean up apiserver to use serve mux.
Browse files Browse the repository at this point in the history
  • Loading branch information
lavalamp committed Jul 16, 2014
1 parent c7d31fa commit 90afdb0
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 44 deletions.
95 changes: 55 additions & 40 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"runtime/debug"
"strings"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand Down Expand Up @@ -83,25 +85,37 @@ func MakeAsync(fn WorkFunc) <-chan interface{} {
//
// TODO: consider migrating this to go-restful which is a more full-featured version of the same thing.
type APIServer struct {
prefix string
storage map[string]RESTStorage
ops *Operations
logserver http.Handler
prefix string
storage map[string]RESTStorage
ops *Operations
mux *http.ServeMux
}

// New creates a new APIServer object.
// 'storage' contains a map of handlers.
// 'prefix' is the hosting path prefix.
func New(storage map[string]RESTStorage, prefix string) *APIServer {
return &APIServer{
storage: storage,
prefix: prefix,
ops: NewOperations(),
logserver: http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))),
s := &APIServer{
storage: storage,
prefix: strings.TrimRight(prefix, "/"),
ops: NewOperations(),
mux: http.NewServeMux(),
}

s.mux.Handle("/logs/", http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))))
s.mux.HandleFunc(s.prefix+"/", s.ServeREST)
healthz.InstallHandler(s.mux)
s.mux.HandleFunc("/index.html", s.handleIndex)

// Handle both operations and operations/* with the same handler
opPrefix := path.Join(s.prefix, "operations")
s.mux.HandleFunc(opPrefix, s.handleOperationRequest)
s.mux.HandleFunc(opPrefix+"/", s.handleOperationRequest)

return s
}

func (server *APIServer) handleIndex(w http.ResponseWriter) {
func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
// TODO: serve this out of a file?
data := "<html><body>Welcome to Kubernetes</body></html>"
Expand All @@ -117,47 +131,37 @@ func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
glog.Infof("APIServer panic'd on %v %v: %#v\n%s\n", req.Method, req.RequestURI, x, debug.Stack())
}
}()
defer MakeLogged(req, &w).StacktraceWhen(
StatusIsNot(
defer httplog.MakeLogged(req, &w).StacktraceWhen(
httplog.StatusIsNot(
http.StatusOK,
http.StatusAccepted,
http.StatusConflict,
),
).Log()
url, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
server.error(err, w)
return
}
if url.Path == "/index.html" || url.Path == "/" || url.Path == "" {
server.handleIndex(w)
return
}
if strings.HasPrefix(url.Path, "/logs/") {
server.logserver.ServeHTTP(w, req)
return
}
if !strings.HasPrefix(url.Path, server.prefix) {

// Dispatch via our mux.
server.mux.ServeHTTP(w, req)
}

// ServeREST handles requests to all our RESTStorage objects.
func (server *APIServer) ServeREST(w http.ResponseWriter, req *http.Request) {
if !strings.HasPrefix(req.URL.Path, server.prefix) {
server.notFound(req, w)
return
}
requestParts := strings.Split(url.Path[len(server.prefix):], "/")[1:]
requestParts := strings.Split(req.URL.Path[len(server.prefix):], "/")[1:]
if len(requestParts) < 1 {
server.notFound(req, w)
return
}
if requestParts[0] == "operations" {
server.handleOperationRequest(requestParts[1:], w, req)
return
}
storage := server.storage[requestParts[0]]
if storage == nil {
LogOf(w).Addf("'%v' has no storage object", requestParts[0])
httplog.LogOf(w).Addf("'%v' has no storage object", requestParts[0])
server.notFound(req, w)
return
}

server.handleREST(requestParts, url, req, w, storage)
server.handleREST(requestParts, req, w, storage)
}

func (server *APIServer) notFound(req *http.Request, w http.ResponseWriter) {
Expand Down Expand Up @@ -197,7 +201,7 @@ func (server *APIServer) finishReq(out <-chan interface{}, sync bool, timeout ti
status := http.StatusOK
switch stat := obj.(type) {
case api.Status:
LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
httplog.LogOf(w).Addf("programmer error: use *api.Status as a result, not api.Status.")
if stat.Code != 0 {
status = stat.Code
}
Expand Down Expand Up @@ -236,14 +240,14 @@ func parseTimeout(str string) time.Duration {
// sync=[false|true] Synchronous request (only applies to create, update, delete operations)
// timeout=<duration> Timeout for synchronous requests, only applies if sync=true
// labels=<label-selector> Used for filtering list operations
func (server *APIServer) handleREST(parts []string, requestURL *url.URL, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := requestURL.Query().Get("sync") == "true"
timeout := parseTimeout(requestURL.Query().Get("timeout"))
func (server *APIServer) handleREST(parts []string, req *http.Request, w http.ResponseWriter, storage RESTStorage) {
sync := req.URL.Query().Get("sync") == "true"
timeout := parseTimeout(req.URL.Query().Get("timeout"))
switch req.Method {
case "GET":
switch len(parts) {
case 1:
selector, err := labels.ParseSelector(requestURL.Query().Get("labels"))
selector, err := labels.ParseSelector(req.URL.Query().Get("labels"))
if err != nil {
server.error(err, w)
return
Expand Down Expand Up @@ -325,7 +329,18 @@ func (server *APIServer) handleREST(parts []string, requestURL *url.URL, req *ht
}
}

func (server *APIServer) handleOperationRequest(parts []string, w http.ResponseWriter, req *http.Request) {
func (server *APIServer) handleOperationRequest(w http.ResponseWriter, req *http.Request) {
opPrefix := path.Join(server.prefix, "operations")
if !strings.HasPrefix(req.URL.Path, opPrefix) {
server.notFound(req, w)
return
}
trimmed := strings.TrimLeft(req.URL.Path[len(opPrefix):], "/")
parts := strings.Split(trimmed, "/")
if len(parts) > 1 {
server.notFound(req, w)
return
}
if req.Method != "GET" {
server.notFound(req, w)
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,3 +409,37 @@ func TestSyncCreateTimeout(t *testing.T) {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, 202, response)
}
}

func TestOpGet(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, "/prefix/version")
server := httptest.NewServer(handler)
client := http.Client{}

simple := Simple{Name: "foo"}
data, _ := api.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
expectNoError(t, err)
response, err := client.Do(request)
expectNoError(t, err)
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}

var itemOut api.Status
body, err := extractBody(response, &itemOut)
expectNoError(t, err)
if itemOut.Status != api.StatusWorking || itemOut.Details == "" {
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
}

req2, err := http.NewRequest("GET", server.URL+"/prefix/version/operations/"+itemOut.Details, nil)
expectNoError(t, err)
_, err = client.Do(req2)
expectNoError(t, err)
if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}
}
4 changes: 4 additions & 0 deletions pkg/healthz/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ func handleHealthz(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}

func InstallHandler(mux *http.ServeMux) {
mux.HandleFunc("/healthz", handleHealthz)
}
19 changes: 19 additions & 0 deletions pkg/httplog/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package httplog contains a helper object and functions to maintain a log
// along with an http response.
package httplog
20 changes: 18 additions & 2 deletions pkg/apiserver/logger.go → pkg/httplog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package apiserver
package httplog

import (
"fmt"
Expand All @@ -25,6 +25,18 @@ import (
"github.com/golang/glog"
)

// Handler wraps all HTTP calls to delegate with nice logging.
// delegate may use LogOf(w).Addf(...) to write additional info to
// the per-request log message.
//
// Intended to wrap calls to your ServeMux.
func Handler(delegate http.Handler, pred StacktracePred) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer MakeLogged(req, &w).StacktraceWhen(pred).Log()
delegate.ServeHTTP(w, req)
})
}

// StacktracePred returns true if a stacktrace should be logged for this status
type StacktracePred func(httpStatus int) (logStacktrace bool)

Expand All @@ -44,7 +56,7 @@ type respLogger struct {

// DefaultStacktracePred is the default implementation of StacktracePred.
func DefaultStacktracePred(status int) bool {
return status != http.StatusOK && status != http.StatusAccepted
return status < http.StatusOK || status >= http.StatusBadRequest
}

// MakeLogged turns a normal response writer into a logged response writer.
Expand All @@ -60,6 +72,10 @@ func DefaultStacktracePred(status int) bool {
//
// Use LogOf(w).Addf(...) to log something along with the response result.
func MakeLogged(req *http.Request, w *http.ResponseWriter) *respLogger {
if _, ok := (*w).(*respLogger); ok {
// Don't double-wrap!
panic("multiple MakeLogged calls!")
}
rl := &respLogger{
startTime: time.Now(),
req: req,
Expand Down
5 changes: 3 additions & 2 deletions pkg/kubelet/kubelet_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/google/cadvisor/info"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"gopkg.in/v1/yaml"
)

Expand All @@ -53,13 +53,14 @@ func (s *KubeletServer) error(w http.ResponseWriter, err error) {
}

func (s *KubeletServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer apiserver.MakeLogged(req, &w).Log()
defer httplog.MakeLogged(req, &w).Log()

u, err := url.ParseRequestURI(req.RequestURI)
if err != nil {
s.error(w, err)
return
}
// TODO: use an http.ServeMux instead of a switch.
switch {
case u.Path == "/container" || u.Path == "/containers":
defer req.Body.Close()
Expand Down

0 comments on commit 90afdb0

Please sign in to comment.