Skip to content

Commit

Permalink
Merge pull request kubernetes#22 from feiskyer/fix-hyperclient
Browse files Browse the repository at this point in the history
Fix hyperclient
  • Loading branch information
feiskyer committed Oct 25, 2015
2 parents fa86ae3 + faae738 commit 8b81dd4
Showing 1 changed file with 59 additions and 61 deletions.
120 changes: 59 additions & 61 deletions pkg/kubelet/hyper/hyperclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,14 @@ func parseImageName(image string) (string, string) {
return repoToPull, tag
}

func (cli *HyperClient) clientRequest(method, path string, in io.Reader, headers map[string][]string) (io.ReadCloser, string, int, error) {
func (cli *HyperClient) clientRequest(method, path string, in io.Reader, headers map[string][]string) (io.ReadCloser, string, int, *net.Conn, *httputil.ClientConn, error) {
expectedPayload := (method == "POST" || method == "PUT")
if expectedPayload && in == nil {
in = bytes.NewReader([]byte{})
}
req, err := http.NewRequest(method, path, in)
if err != nil {
return nil, "", -1, err
return nil, "", -1, nil, nil, err
}
req.Header.Set("User-Agent", "kubelet")
req.URL.Host = cli.addr
Expand All @@ -196,44 +196,39 @@ func (cli *HyperClient) clientRequest(method, path string, in io.Reader, headers
var dial net.Conn
dial, err = net.DialTimeout(HYPER_PROTO, HYPER_ADDR, 32*time.Second)
if err != nil {
return nil, "", -1, err
return nil, "", -1, nil, nil, err
}
defer dial.Close()

clientconn := httputil.NewClientConn(dial, nil)
defer clientconn.Close()

resp, err := clientconn.Do(req)
statusCode := -1
if resp != nil {
statusCode = resp.StatusCode
}
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
return nil, "", statusCode, ErrConnectionRefused
return nil, "", statusCode, &dial, clientconn, ErrConnectionRefused
}

return nil, "", statusCode, fmt.Errorf("An error occurred trying to connect: %v", err)
return nil, "", statusCode, &dial, clientconn, fmt.Errorf("An error occurred trying to connect: %v", err)
}

if statusCode < 200 || statusCode >= 400 {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, "", statusCode, err
return nil, "", statusCode, &dial, clientconn, err
}
if len(body) == 0 {
return nil, "", statusCode, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(statusCode), req.URL)
}
if len(bytes.TrimSpace(body)) > 150 {
return nil, "", statusCode, fmt.Errorf("Error from daemon's response")
return nil, "", statusCode, nil, nil, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(statusCode), req.URL)
}
return nil, "", statusCode, fmt.Errorf("%s", bytes.TrimSpace(body))

return nil, "", statusCode, &dial, clientconn, fmt.Errorf("%s", bytes.TrimSpace(body))
}

return resp.Body, resp.Header.Get("Content-Type"), statusCode, nil
return resp.Body, resp.Header.Get("Content-Type"), statusCode, &dial, clientconn, nil
}

func (cli *HyperClient) call(method, path string, data interface{}, headers map[string][]string) (io.ReadCloser, int, error) {
func (cli *HyperClient) call(method, path string, data interface{}, headers map[string][]string) ([]byte, int, error) {
params, err := cli.encodeData(data)
if err != nil {
return nil, -1, err
Expand All @@ -246,63 +241,67 @@ func (cli *HyperClient) call(method, path string, data interface{}, headers map[
headers["Content-Type"] = []string{"application/json"}
}

body, _, statusCode, err := cli.clientRequest(method, path, params, headers)
return body, statusCode, err
}
body, _, statusCode, dial, clientconn, err := cli.clientRequest(method, path, params, headers)
if dial != nil {
defer (*dial).Close()
}
if clientconn != nil {
defer clientconn.Close()
}
if err != nil {
return nil, statusCode, err
}

func (cli *HyperClient) stream(method, path string, in io.Reader, out io.Writer, headers map[string][]string) error {
return cli.streamHelper(method, path, true, in, out, nil, headers)
}
if body == nil {
return nil, statusCode, err
}

defer body.Close()

func (cli *HyperClient) streamHelper(method, path string, setRawTerminal bool, in io.Reader, stdout, stderr io.Writer, headers map[string][]string) error {
body, contentType, _, err := cli.clientRequest(method, path, in, headers)
result, err := ioutil.ReadAll(body)
if err != nil {
return err
return nil, -1, err
}
return cli.streamBody(body, contentType, setRawTerminal, stdout, stderr)

return result, statusCode, nil
}

func MatchesContentType(contentType, expectedType string) bool {
mimetype, _, err := mime.ParseMediaType(contentType)
func (cli *HyperClient) stream(method, path string, in io.Reader, out io.Writer, headers map[string][]string) error {
body, contentType, _, dial, clientconn, err := cli.clientRequest(method, path, in, headers)
if dial != nil {
defer (*dial).Close()
}
if clientconn != nil {
defer clientconn.Close()
}
if err != nil {
glog.V(4).Infof("Error parsing media type: %s error: %v", contentType, err)
return err
}
return err == nil && mimetype == expectedType
}

func (cli *HyperClient) streamBody(body io.ReadCloser, contentType string, setRawTerminal bool, stdout, stderr io.Writer) error {
defer body.Close()

if MatchesContentType(contentType, "application/json") {
buf := new(bytes.Buffer)
buf.ReadFrom(body)
if stdout != nil {
stdout.Write(buf.Bytes())
if out != nil {
out.Write(buf.Bytes())
}
return nil
}
return nil

}

func readBody(stream io.ReadCloser, statusCode int, err error) ([]byte, int, error) {
if stream != nil {
defer stream.Close()
}
if err != nil {
return nil, statusCode, err
}
if stream == nil {
return nil, statusCode, err
}
body, err := ioutil.ReadAll(stream)
func MatchesContentType(contentType, expectedType string) bool {
mimetype, _, err := mime.ParseMediaType(contentType)
if err != nil {
return nil, -1, err
glog.V(4).Infof("Error parsing media type: %s error: %v", contentType, err)
}
return body, statusCode, nil
return err == nil && mimetype == expectedType
}

func (client *HyperClient) Version() (string, error) {
body, _, err := readBody(client.call("GET", "/version", nil, nil))
body, _, err := client.call("GET", "/version", nil, nil)
if err != nil {
return "", err
}
Expand All @@ -324,7 +323,7 @@ func (client *HyperClient) Version() (string, error) {
func (client *HyperClient) ListPods() ([]HyperPod, error) {
v := url.Values{}
v.Set(KEY_ITEM, TYPE_POD)
body, _, err := readBody(client.call("GET", "/list?"+v.Encode(), nil, nil))
body, _, err := client.call("GET", "/list?"+v.Encode(), nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -346,7 +345,7 @@ func (client *HyperClient) ListPods() ([]HyperPod, error) {

values := url.Values{}
values.Set(KEY_POD_NAME, hyperPod.PodID)
body, _, err = readBody(client.call("GET", "/pod/info?"+values.Encode(), nil, nil))
body, _, err = client.call("GET", "/pod/info?"+values.Encode(), nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -365,7 +364,7 @@ func (client *HyperClient) ListPods() ([]HyperPod, error) {
func (client *HyperClient) ListContainers() ([]HyperContainer, error) {
v := url.Values{}
v.Set(KEY_ITEM, TYPE_CONTAINER)
body, _, err := readBody(client.call("GET", "/list?"+v.Encode(), nil, nil))
body, _, err := client.call("GET", "/list?"+v.Encode(), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -395,7 +394,7 @@ func (client *HyperClient) ListContainers() ([]HyperContainer, error) {
}

func (client *HyperClient) Info() (map[string]interface{}, error) {
body, _, err := readBody(client.call("GET", "/info", nil, nil))
body, _, err := client.call("GET", "/info", nil, nil)
if err != nil {
return nil, err
}
Expand All @@ -412,7 +411,7 @@ func (client *HyperClient) Info() (map[string]interface{}, error) {
func (client *HyperClient) ListImages() ([]HyperImage, error) {
v := url.Values{}
v.Set("all", "no")
body, _, err := readBody(client.call("GET", "/images/get?"+v.Encode(), nil, nil))
body, _, err := client.call("GET", "/images/get?"+v.Encode(), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -457,7 +456,7 @@ func (client *HyperClient) ListImages() ([]HyperImage, error) {
func (client *HyperClient) RemoveImage(imageID string) error {
v := url.Values{}
v.Set(KEY_IMAGEID, imageID)
_, _, err := readBody(client.call("POST", "/images/remove?"+v.Encode(), nil, nil))
_, _, err := client.call("POST", "/images/remove?"+v.Encode(), nil, nil)
if err != nil {
return err
}
Expand All @@ -468,7 +467,7 @@ func (client *HyperClient) RemoveImage(imageID string) error {
func (client *HyperClient) RemovePod(podID string) error {
v := url.Values{}
v.Set(KEY_POD_ID, podID)
_, _, err := readBody(client.call("POST", "/pod/remove?"+v.Encode(), nil, nil))
_, _, err := client.call("POST", "/pod/remove?"+v.Encode(), nil, nil)
if err != nil {
return err
}
Expand All @@ -479,7 +478,7 @@ func (client *HyperClient) RemovePod(podID string) error {
func (client *HyperClient) StartPod(podID string) error {
v := url.Values{}
v.Set(KEY_POD_ID, podID)
_, _, err := readBody(client.call("POST", "/pod/start?"+v.Encode(), nil, nil))
_, _, err := client.call("POST", "/pod/start?"+v.Encode(), nil, nil)
if err != nil {
return err
}
Expand All @@ -491,7 +490,7 @@ func (client *HyperClient) StopPod(podID string) error {
v := url.Values{}
v.Set(KEY_POD_ID, podID)
v.Set("stopVM", "yes")
_, _, err := readBody(client.call("POST", "/pod/stop?"+v.Encode(), nil, nil))
_, _, err := client.call("POST", "/pod/stop?"+v.Encode(), nil, nil)
if err != nil {
return err
}
Expand All @@ -508,7 +507,6 @@ func (client *HyperClient) PullImage(image string, credential string) error {
headers["X-Registry-Auth"] = []string{credential}
}

//_, _, err := readBody(client.call("POST", "/image/create?"+v.Encode(), nil, headers))
err := client.stream("POST", "/image/create?"+v.Encode(), nil, nil, headers)
if err != nil {
return err
Expand All @@ -521,7 +519,7 @@ func (client *HyperClient) CreatePod(podArgs string) (map[string]interface{}, er
glog.V(5).Infof("Hyper: starting to create pod %s", podArgs)
v := url.Values{}
v.Set(KEY_POD_ARGS, podArgs)
body, _, err := readBody(client.call("POST", "/pod/create?"+v.Encode(), nil, nil))
body, _, err := client.call("POST", "/pod/create?"+v.Encode(), nil, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -641,7 +639,7 @@ func (client *HyperClient) IsImagePresent(repo, tag string) (bool, error) {
func (client *HyperClient) ListServices(podId string) ([]HyperService, error) {
v := url.Values{}
v.Set("podId", podId)
body, _, err := readBody(client.call("GET", "/service/list?"+v.Encode(), nil, nil))
body, _, err := client.call("GET", "/service/list?"+v.Encode(), nil, nil)
if err != nil {
if strings.Contains(err.Error(), "doesn't have services discovery") {
return nil, nil
Expand All @@ -668,7 +666,7 @@ func (client *HyperClient) UpdateServices(podId string, services []HyperService)
return err
}
v.Set("services", string(serviceData))
_, _, err = readBody(client.call("POST", "/service/update?"+v.Encode(), nil, nil))
_, _, err = client.call("POST", "/service/update?"+v.Encode(), nil, nil)
if err != nil {
return err
}
Expand Down

0 comments on commit 8b81dd4

Please sign in to comment.