Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to query kubelet's logs and cadvisor's stats through apiserver by passing rawquery #457

Merged
merged 2 commits into from
Jul 21, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ limitations under the License.
package apiserver

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
"net/url"
"path"
"runtime/debug"
"strings"
"time"

"code.google.com/p/go.net/html"
"code.google.com/p/go.net/html/atom"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
Expand Down Expand Up @@ -153,6 +159,8 @@ func New(storage map[string]RESTStorage, prefix string) *APIServer {

s.mux.HandleFunc("/", s.notFound)

s.mux.HandleFunc("/proxy/minion/", s.handleMinionReq)

return s
}

Expand All @@ -171,6 +179,120 @@ func (server *APIServer) handleIndex(w http.ResponseWriter, req *http.Request) {
fmt.Fprint(w, data)
}

func (server *APIServer) handleMinionReq(w http.ResponseWriter, req *http.Request) {
minionPrefix := "/proxy/minion/"
if !strings.HasPrefix(req.URL.Path, minionPrefix) {
server.notFound(w, req)
return
}

path := req.URL.Path[len(minionPrefix):]
rawQuery := req.URL.RawQuery

// Expect path as: ${minion}/${query_to_minion}
// and query_to_minion can be any query that kubelet will accept.
//
// For example:
// To query stats of a minion or a pod or a container,
// path string can be ${minion}/stats/<podid>/<containerName> or
// ${minion}/podInfo?podID=<podid>
//
// To query logs on a minion, path string can be:
// ${minion}/logs/
idx := strings.Index(path, "/")
minionHost := path[:idx]
_, port, _ := net.SplitHostPort(minionHost)
if port == "" {
// Couldn't retrieve port information
// TODO: Retrieve port info from a common object
minionHost += ":10250"
}
minionPath := path[idx:]

minionURL := &url.URL{
Scheme: "http",
Host: minionHost,
}
newReq, err := http.NewRequest("GET", minionPath+"?"+rawQuery, nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, can you construct this URL via a URL object? Sorry to be such a pain about this...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried that, actually that is my initial version I started from at the beginning, but ran into mystery issue with a nil response. I tried it yesterday again, and had the same result. I also dumped the entire Request, before and after proxied, and comparing them with dumps with this version field by field yesterday, cannot figure it out. It looks like a bug in httputil package. Already spent too much time on this, didn't dive deeper.

if err != nil {
glog.Errorf("Failed to create request: %s", err)
}

proxy := httputil.NewSingleHostReverseProxy(minionURL)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this URL and the one you pass to the GET request match?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I thought initially. Unfortunately they don't. From godoc, the new ReverseProxy returned here treats Path provides in URL as base path in target.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

proxy.Transport = &minionTransport{}
proxy.ServeHTTP(w, newReq)
}

type minionTransport struct{}

func (t *minionTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := http.DefaultTransport.RoundTrip(req)

if strings.Contains(resp.Header.Get("Content-Type"), "text/plain") {
// Do nothing, simply pass through
return resp, err
}

resp, err = t.ProcessResponse(req, resp)
return resp, err
}

func (t *minionTransport) ProcessResponse(req *http.Request, resp *http.Response) (*http.Response, error) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// copying the response body did not work
return nil, err
}

bodyNode := &html.Node{
Type: html.ElementNode,
Data: "body",
DataAtom: atom.Body,
}
nodes, err := html.ParseFragment(bytes.NewBuffer(body), bodyNode)
if err != nil {
glog.Errorf("Failed to found <body> node: %v", err)
return resp, err
}

// Define the method to traverse the doc tree and update href node to
// point to correct minion
var updateHRef func(*html.Node)
updateHRef = func(n *html.Node) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a test for this rewriting (maybe I missed it)? I think it deserves a test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test is adding

if n.Type == html.ElementNode && n.Data == "a" {
for i, attr := range n.Attr {
if attr.Key == "href" {
Url := &url.URL{
Path: "/proxy/minion/" + req.URL.Host + req.URL.Path + attr.Val,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is req.URL.Path guaranteed to end with a /?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes.

}
n.Attr[i].Val = Url.String()
break
}
}
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
updateHRef(c)
}
}

newContent := &bytes.Buffer{}
for _, n := range nodes {
updateHRef(n)
err = html.Render(newContent, n)
if err != nil {
glog.Errorf("Failed to render: %v", err)
}
}

resp.Body = ioutil.NopCloser(newContent)
// Update header node with new content-length
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also may need to remove any hash/signature headers-- that's a can of worms. TODO?

// TODO: Remove any hash/signature headers here?
resp.Header.Del("Content-Length")
resp.ContentLength = int64(newContent.Len())

return resp, err
}

// HTTP Handler interface
func (server *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer func() {
Expand Down
50 changes: 50 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http/httptest"
"net/url"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -666,3 +667,52 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}

func TestMinionTransport(t *testing.T) {
content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
transport := &minionTransport{}

// Test /logs/
request := &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:10250",
Path: "/logs/",
},
}
response := &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ := transport.ProcessResponse(request, response)
body, _ := ioutil.ReadAll(updated_resp.Body)
expected := string(`<pre><a href="/proxy/minion/minion1:10250/logs/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:10250/logs/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}

// Test subdir under /logs/
request = &http.Request{
Method: "GET",
URL: &url.URL{
Scheme: "http",
Host: "minion1:8080",
Path: "/whatever/apt/",
},
}
response = &http.Response{
Status: "200 OK",
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(strings.NewReader(content)),
Close: true,
}
updated_resp, _ = transport.ProcessResponse(request, response)
body, _ = ioutil.ReadAll(updated_resp.Body)
expected = string(`<pre><a href="/proxy/minion/minion1:8080/whatever/apt/kubelet.log">kubelet.log</a><a href="/proxy/minion/minion1:8080/whatever/apt/google.log">google.log</a></pre>`)
if !strings.Contains(string(body), expected) {
t.Errorf("Received wrong content: %s", string(body))
}
}
12 changes: 12 additions & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type CadvisorInterface interface {
}

// New creates a new Kubelet.
// TODO: currently it is only called by test code.
// Need cleanup.
func New() *Kubelet {
return &Kubelet{}
}
Expand All @@ -77,6 +79,7 @@ type Kubelet struct {
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker health.HealthChecker
LogServer http.Handler
}

type manifestUpdate struct {
Expand All @@ -94,6 +97,9 @@ const (
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) {
if kl.LogServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.CadvisorClient == nil {
var err error
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
Expand Down Expand Up @@ -812,3 +818,9 @@ func (kl *Kubelet) healthy(container api.Container, dockerContainer *docker.APIC
}
return kl.HealthChecker.HealthCheck(container)
}

// Returns logs of current machine.
func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
// TODO: whitelist logs we are willing to serve
kl.LogServer.ServeHTTP(w, req)
}
3 changes: 3 additions & 0 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type kubeletInterface interface {
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetPodInfo(name string) (api.PodInfo, error)
ServeLogs(w http.ResponseWriter, req *http.Request)
}

func (s *Server) error(w http.ResponseWriter, err error) {
Expand Down Expand Up @@ -131,6 +132,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.Kubelet.ServeLogs(w, req)
default:
s.DelegateHandler.ServeHTTP(w, req)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"net/http/httputil"
"reflect"
"strings"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand All @@ -37,6 +39,7 @@ type fakeKubelet struct {
containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
machineInfoFunc func() (*info.MachineInfo, error)
logFunc func(w http.ResponseWriter, req *http.Request)
}

func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
Expand All @@ -55,6 +58,10 @@ func (fk *fakeKubelet) GetMachineInfo() (*info.MachineInfo, error) {
return fk.machineInfoFunc()
}

func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
fk.logFunc(w, req)
}

type serverTestFramework struct {
updateChan chan manifestUpdate
updateReader *channelReader
Expand Down Expand Up @@ -253,3 +260,31 @@ func TestMachineInfo(t *testing.T) {
t.Errorf("received wrong data: %#v", receivedInfo)
}
}

func TestServeLogs(t *testing.T) {
fw := makeServerTest()

content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)

fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "text/html")
w.Write([]byte(content))
}

resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
if err != nil {
t.Fatalf("Got error GETing: %v", err)
}
defer resp.Body.Close()

body, err := httputil.DumpResponse(resp, true)
if err != nil {
// copying the response body did not work
t.Errorf("Cannot copy resp: %#v", err)
}
result := string(body)
if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
t.Errorf("Received wrong data: %s", result)
}
}
1 change: 1 addition & 0 deletions third_party/deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ DEP_PACKAGES="
gopkg.in/v1/yaml
bitbucket.org/kardianos/osext
code.google.com/p/google-api-go-client/googleapi
code.google.com/p/go.net/html
github.com/coreos/go-log/log
github.com/coreos/go-systemd/journal
code.google.com/p/go.net/websocket
Expand Down
Loading