Skip to content

Commit

Permalink
Add support to query kubelet's logs and cadvisor's stats through
Browse files Browse the repository at this point in the history
apiserver by passing rawquery.

minor changes

Fixed a minor rebase issues.

Using ioutil.ReadAll instead of httputil.DumpResponse
  • Loading branch information
dchen1107 committed Jul 21, 2014
1 parent 7910ae8 commit 064237a
Show file tree
Hide file tree
Showing 91 changed files with 30,435 additions and 0 deletions.
122 changes: 122 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ limitations under the License.
package apiserver

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"path"
"runtime/debug"
"strings"
"time"

"code.google.com/p/go.net/html"
"code.google.com/p/go.net/html/atom"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
Expand Down Expand Up @@ -153,6 +159,8 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer {

s.mux.HandleFunc("/", s.notFound)

s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq)

return s
}

Expand All @@ -171,6 +179,120 @@ func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, data)
}

func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) {
minionPrefix := "/proxy/minion/"
if !strings.HasPrefix(req.URL.Path, minionPrefix) {
server.notFound(w, req)
return
}

path := req.URL.Path[len(minionPrefix):]
rawQuery := req.URL.RawQuery

// Expect path as: ${minion}/${query_to_minion}
// and query_to_minion can be any query that kubelet will accept.
//
// For example:
// To query stats of a minion or a pod or a container,
// path string can be ${minion}/stats/<podid>/<containerName> or
// ${minion}/podInfo?podID=<podid>
//
// To query logs on a minion, path string can be:
// ${minion}/logs/
idx := strings.Index(path, "/")
minionHost := path[:idx]
_, port, _ := net.SplitHostPort(minionHost)
if port == "" {
// No port information
// TODO: Retrieve port info from a common object
minionHost += ":10250"
}
minionPath := path[idx:]

minionURL := &url.URL{
Scheme: "http",
Host: minionHost,
}
newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil)
if err != nil {
glog.Errorf("Failed to create request: %s", err)
}

proxy := httputil.NewSingleHostReverseProxy(minionURL)
proxy.Transport = &minionTransport{}
proxy.ServeHTTP(w, newReq)
}

type minionTransport struct{}

func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := http.DefaultTransport.RoundTrip(req)

if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
// Do nothing, simply pass through
return resp, err
}

resp, err = t.ProcessResponse(req, resp)
return resp, err
}

func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// copying the response body did not work
return nil, err
}

bodyNode := &html.Node{
Type: html.ElementNode,
Data: "body",
DataAtom: atom.Body,
}
nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode)
if err != nil {
glog.Errorf("Failed to found <body> node: %v", err)
return resp, err
}

// Define the method to traverse the doc tree and update href node to
// point to correct minion
var updateHRef func(*html.Node)
updateHRef = func(n *html.Node) {
if n.Type == html.ElementNode && n.Data == "a" {
for i, attr := range n.Attr {
if attr.Key == "href" {
Url := &url.URL{
Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val,
}
n.Attr[i].Val = Url.String()
break
}
}
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
updateHRef(c)
}
}

newContent := &bytes.Buffer{}
for _, n := range nodes {
updateHRef(n)
err = html.Render(newContent, n)
if err != nil {
glog.Errorf("Failed to render: %v", err)
}
}

resp.Body = ioutil.NopCloser(newContent)
// Update header node with new content-length
// TODO: Remove any hash/signature headers here?
resp.Header.Del("Content-Length")
resp.ContentLength = int64(newContent.Len())

return resp, err
}

// HTTP Handler interface
func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer func() {
Expand Down
50 changes: 50 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -666,3 +667,52 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}

func TestMinionTransport(t *testing.T) {
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
transport := &minionTransport{}

// Test /logs/
request := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:10250",
Path: "/logs/",
},
}
response := &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ := transport.ProcessResponse(request, response)
body, _ := ioutil.ReadAll(updated_resp.Body)
expected := string(`<pre><a href="/proxy/minion/minion1:10250/logs/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:10250/logs/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}

// Test subdir under /logs/
request = &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:8080",
Path: "/whatever/apt/",
},
}
response = &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ = transport.ProcessResponse(request, response)
body, _ = ioutil.ReadAll(updated_resp.Body)
expected = string(`<pre><a href="/proxy/minion/minion1:8080/whatever/apt/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:8080/whatever/apt/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}
}
12 changes: 12 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type CadvisorInterface interface {
}

// New creates a new Kubelet.
// TODO: currently it is only called by test code.
// Need cleanup.
func New() *Kubelet {
return &Kubelet{}
}
Expand All @@ -77,6 +79,7 @@ type Kubelet struct {
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker health.HealthChecker
LogServer http.Handler
}

type manifestUpdate struct {
Expand All @@ -94,6 +97,9 @@ const (
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) {
if kl.LogServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.CadvisorClient == nil {
var err error
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
Expand Down Expand Up @@ -812,3 +818,9 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
}
return kl.HealthChecker.HealthCheck(container)
}

// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.LogServer.ServeHTTP(w, req)
}
3 changes: 3 additions & 0 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type kubeletInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetPodInfo(name string) (api.PodInfo, error)
ServeLogs(w http.ResponseWriter, req *http.Request)
}

func (s *Server) error(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -131,6 +132,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.Kubelet.ServeLogs(w, req)
default:
s.DelegateHandler.ServeHTTP(w, req)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/http/httputil"
"reflect"
"strings"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand All @@ -37,6 +39,7 @@ type fakeKubelet struct {
containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
machineInfoFunc func() (*info.MachineInfo, error)
logFunc func(w http.ResponseWriter, req *http.Request)
}

func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
Expand All @@ -55,6 +58,10 @@ func (fk *fakeKubelet) GetMachineInfo() (*info.MachineInfo, error) {
return fk.machineInfoFunc()
}

func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
fk.logFunc(w, req)
}

type serverTestFramework struct {
updateChan chan manifestUpdate
updateReader *channelReader
Expand Down Expand Up @@ -253,3 +260,31 @@ func TestMachineInfo(t *testing.T) {
t.Errorf("received wrong data: %#v", receivedInfo)
}
}

func TestServeLogs(t *testing.T) {
fw := makeServerTest()

content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)

fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "text/html")
w.Write([]byte(content))
}

resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()

body, err := httputil.DumpResponse(resp, true)
if err != nil {
// copying the response body did not work
t.Errorf("Cannot copy resp: %#v", err)
}
result := string(body)
if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
t.Errorf("Received wrong data: %s", result)
}
}
1 change: 1 addition & 0 deletions third_party/deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DEP_PACKAGES="
gopkg.in/v1/yaml
bitbucket.org/kardianos/osext
code.google.com/p/google-api-go-client/googleapi
code.google.com/p/go.net/html
github.com/coreos/go-log/log
github.com/coreos/go-systemd/journal
code.google.com/p/go.net/websocket
Expand Down
Loading

0 comments on commit 064237a

Please sign in to comment.