Skip to content

Commit

Permalink
Merge pull request kubernetes#457 from dchen1107/minionlogs
Browse files Browse the repository at this point in the history
Add support to query kubelet's logs and cadvisor's stats through apiserver by passing rawquery
  • Loading branch information
lavalamp committed Jul 21, 2014
2 parents 5f13c4b + dc921b8 commit f672edd
Show file tree
Hide file tree
Showing 91 changed files with 30,435 additions and 0 deletions.
122 changes: 122 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ limitations under the License.
package apiserver

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"path"
"runtime/debug"
"strings"
"time"

"code.google.com/p/go.net/html"
"code.google.com/p/go.net/html/atom"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
Expand Down Expand Up @@ -153,6 +159,8 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer {

s.mux.HandleFunc("/", s.notFound)

s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq)

return s
}

Expand All @@ -171,6 +179,120 @@ func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, data)
}

func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) {
minionPrefix := "/proxy/minion/"
if !strings.HasPrefix(req.URL.Path, minionPrefix) {
server.notFound(w, req)
return
}

path := req.URL.Path[len(minionPrefix):]
rawQuery := req.URL.RawQuery

// Expect path as: ${minion}/${query_to_minion}
// and query_to_minion can be any query that kubelet will accept.
//
// For example:
// To query stats of a minion or a pod or a container,
// path string can be ${minion}/stats/<podid>/<containerName> or
// ${minion}/podInfo?podID=<podid>
//
// To query logs on a minion, path string can be:
// ${minion}/logs/
idx := strings.Index(path, "/")
minionHost := path[:idx]
_, port, _ := net.SplitHostPort(minionHost)
if port == "" {
// Couldn't retrieve port information
// TODO: Retrieve port info from a common object
minionHost += ":10250"
}
minionPath := path[idx:]

minionURL := &url.URL{
Scheme: "http",
Host: minionHost,
}
newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil)
if err != nil {
glog.Errorf("Failed to create request: %s", err)
}

proxy := httputil.NewSingleHostReverseProxy(minionURL)
proxy.Transport = &minionTransport{}
proxy.ServeHTTP(w, newReq)
}

type minionTransport struct{}

func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := http.DefaultTransport.RoundTrip(req)

if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
// Do nothing, simply pass through
return resp, err
}

resp, err = t.ProcessResponse(req, resp)
return resp, err
}

func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// copying the response body did not work
return nil, err
}

bodyNode := &html.Node{
Type: html.ElementNode,
Data: "body",
DataAtom: atom.Body,
}
nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode)
if err != nil {
glog.Errorf("Failed to found <body> node: %v", err)
return resp, err
}

// Define the method to traverse the doc tree and update href node to
// point to correct minion
var updateHRef func(*html.Node)
updateHRef = func(n *html.Node) {
if n.Type == html.ElementNode && n.Data == "a" {
for i, attr := range n.Attr {
if attr.Key == "href" {
Url := &url.URL{
Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val,
}
n.Attr[i].Val = Url.String()
break
}
}
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
updateHRef(c)
}
}

newContent := &bytes.Buffer{}
for _, n := range nodes {
updateHRef(n)
err = html.Render(newContent, n)
if err != nil {
glog.Errorf("Failed to render: %v", err)
}
}

resp.Body = ioutil.NopCloser(newContent)
// Update header node with new content-length
// TODO: Remove any hash/signature headers here?
resp.Header.Del("Content-Length")
resp.ContentLength = int64(newContent.Len())

return resp, err
}

// HTTP Handler interface
func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer func() {
Expand Down
50 changes: 50 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -666,3 +667,52 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}

func TestMinionTransport(t *testing.T) {
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
transport := &minionTransport{}

// Test /logs/
request := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:10250",
Path: "/logs/",
},
}
response := &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ := transport.ProcessResponse(request, response)
body, _ := ioutil.ReadAll(updated_resp.Body)
expected := string(`<pre><a href="/proxy/minion/minion1:10250/logs/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:10250/logs/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}

// Test subdir under /logs/
request = &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:8080",
Path: "/whatever/apt/",
},
}
response = &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ = transport.ProcessResponse(request, response)
body, _ = ioutil.ReadAll(updated_resp.Body)
expected = string(`<pre><a href="/proxy/minion/minion1:8080/whatever/apt/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:8080/whatever/apt/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}
}
12 changes: 12 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type CadvisorInterface interface {
}

// New creates a new Kubelet.
// TODO: currently it is only called by test code.
// Need cleanup.
func New() *Kubelet {
return &Kubelet{}
}
Expand All @@ -77,6 +79,7 @@ type Kubelet struct {
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker health.HealthChecker
LogServer http.Handler
}

type manifestUpdate struct {
Expand All @@ -94,6 +97,9 @@ const (
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) {
if kl.LogServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.CadvisorClient == nil {
var err error
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
Expand Down Expand Up @@ -814,3 +820,9 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
}
return kl.HealthChecker.HealthCheck(container)
}

// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.LogServer.ServeHTTP(w, req)
}
3 changes: 3 additions & 0 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type kubeletInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetPodInfo(name string) (api.PodInfo, error)
ServeLogs(w http.ResponseWriter, req *http.Request)
}

func (s *Server) error(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -131,6 +132,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.Kubelet.ServeLogs(w, req)
default:
s.DelegateHandler.ServeHTTP(w, req)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/http/httputil"
"reflect"
"strings"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand All @@ -37,6 +39,7 @@ type fakeKubelet struct {
containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
machineInfoFunc func() (*info.MachineInfo, error)
logFunc func(w http.ResponseWriter, req *http.Request)
}

func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
Expand All @@ -55,6 +58,10 @@ func (fk *fakeKubelet) GetMachineInfo() (*info.MachineInfo, error) {
return fk.machineInfoFunc()
}

func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
fk.logFunc(w, req)
}

type serverTestFramework struct {
updateChan chan manifestUpdate
updateReader *channelReader
Expand Down Expand Up @@ -253,3 +260,31 @@ func TestMachineInfo(t *testing.T) {
t.Errorf("received wrong data: %#v", receivedInfo)
}
}

func TestServeLogs(t *testing.T) {
fw := makeServerTest()

content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)

fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "text/html")
w.Write([]byte(content))
}

resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()

body, err := httputil.DumpResponse(resp, true)
if err != nil {
// copying the response body did not work
t.Errorf("Cannot copy resp: %#v", err)
}
result := string(body)
if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
t.Errorf("Received wrong data: %s", result)
}
}
1 change: 1 addition & 0 deletions third_party/deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DEP_PACKAGES="
gopkg.in/v1/yaml
bitbucket.org/kardianos/osext
code.google.com/p/google-api-go-client/googleapi
code.google.com/p/go.net/html
github.com/coreos/go-log/log
github.com/coreos/go-systemd/journal
code.google.com/p/go.net/websocket
Expand Down
Loading

0 comments on commit f672edd

Please sign in to comment.