diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index dfeda9034f2e7..79317933298b1 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -272,6 +272,11 @@ "Comment": "0.1.3-8-g6633656", "Rev": "6633656539c1639d9d78127b7d47c622b5d7b6dc" }, + { + "ImportPath": "github.com/influxdb/influxdb/client", + "Comment": "v0.8.8", + "Rev": "afde71eb1740fd763ab9450e1f700ba0e53c36d0" + }, { "ImportPath": "github.com/kr/pty", "Comment": "release.r56-25-g05017fc", diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md new file mode 100644 index 0000000000000..2d849dfb7e088 --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/README.md @@ -0,0 +1,2 @@ +influxdb-go +=========== diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/examples/example.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/examples/example.go new file mode 100644 index 0000000000000..6cc866e88c0c8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/examples/example.go @@ -0,0 +1,200 @@ +package examples + +import ( + "fmt" + + "github.com/influxdb/influxdb/client" +) + +func main() { + TestClient() +} + +func TestClient() { + internalTest(true) +} + +func TestClientWithoutCompression() { + internalTest(false) +} + +func internalTest(compression bool) { + c, err := client.NewClient(&client.ClientConfig{}) + if err != nil { + panic(err) + } + + admins, err := c.GetClusterAdminList() + if err != nil { + panic(err) + } + + if len(admins) == 1 { + if err := c.CreateClusterAdmin("admin", "password"); err != nil { + panic(err) + } + } + + admins, err = c.GetClusterAdminList() + if err != nil { + panic(err) + } + + if len(admins) != 2 { + panic("more than two admins returned") + } + + dbs, err := c.GetDatabaseList() + if err != nil { + panic(err) + } + + if len(dbs) == 0 { + if err := c.CreateDatabase("foobar"); err != nil { + 
panic(err) + } + } + + dbs, err = c.GetDatabaseList() + if err != nil { + panic(err) + } + + if len(dbs) != 1 && dbs[0]["foobar"] == nil { + panic("List of databases don't match") + } + + users, err := c.GetDatabaseUserList("foobar") + if err != nil { + panic(err) + } + + if len(users) == 0 { + if err := c.CreateDatabaseUser("foobar", "dbuser", "pass"); err != nil { + panic(err) + } + + if err := c.AlterDatabasePrivilege("foobar", "dbuser", true); err != nil { + panic(err) + } + } + + users, err = c.GetDatabaseUserList("foobar") + if err != nil { + panic(err) + } + + if len(users) != 1 { + panic("more than one user returned") + } + + c, err = client.NewClient(&client.ClientConfig{ + Username: "dbuser", + Password: "pass", + Database: "foobar", + }) + + if !compression { + c.DisableCompression() + } + + if err != nil { + panic(err) + } + + name := "ts9" + if !compression { + name = "ts9_uncompressed" + } + + series := &client.Series{ + Name: name, + Columns: []string{"value"}, + Points: [][]interface{}{ + {1.0}, + }, + } + if err := c.WriteSeries([]*client.Series{series}); err != nil { + panic(err) + } + + result, err := c.Query("select * from " + name) + if err != nil { + panic(err) + } + + if len(result) != 1 { + panic(fmt.Errorf("expected one time series returned: %d", len(result))) + } + + if len(result[0].Points) != 1 { + panic(fmt.Errorf("Expected one point: %d", len(result[0].Points))) + } + + if result[0].Points[0][2].(float64) != 1 { + panic("Value not equal to 1") + } + + c, err = client.NewClient(&client.ClientConfig{ + Username: "root", + Password: "root", + }) + + if err != nil { + panic(err) + } + + spaces, err := c.GetShardSpaces() + if err != nil || len(spaces) == 0 { + panic(fmt.Errorf("Got empty spaces back: %s", err)) + } + if spaces[0].Name != "default" { + panic("Space name isn't default") + } + space := &client.ShardSpace{Name: "foo", Regex: "/^paul_is_rad/"} + err = c.CreateShardSpace("foobar", space) + if err != nil { + panic(err) + } + 
spaces, _ = c.GetShardSpaces() + if spaces[1].Name != "foo" { + panic("Space name isn't foo") + } + shards, err := c.GetShards() + if err != nil { + panic(fmt.Errorf("Couldn't get shards back: %s", err)) + } + + c, err = client.NewClient(&client.ClientConfig{ + Username: "root", + Password: "root", + Database: "", + }) + series = &client.Series{ + Name: "paul_is_rad", + Columns: []string{"value"}, + Points: [][]interface{}{ + {1.0}, + }, + } + if err := c.WriteSeries([]*client.Series{series}); err != nil { + panic(err) + } + + spaces, _ = c.GetShardSpaces() + count := 0 + for _, s := range shards.All { + if s.SpaceName == "foo" { + count++ + } + } + + if err := c.DropShardSpace("foobar", "foo"); err != nil { + panic(fmt.Errorf("Error: %s", err)) + } + + spaces, err = c.GetShardSpaces() + if err != nil || len(spaces) != 1 || spaces[0].Name != "default" { + panic(fmt.Errorf("Error: %s, %d, %s", err, len(spaces), spaces[0].Name)) + } +} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go new file mode 100644 index 0000000000000..22a50e5bcd20a --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/influxdb.go @@ -0,0 +1,610 @@ +package client + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/url" + "strings" +) + +const ( + UDPMaxMessageSize = 2048 +) + +type Client struct { + host string + username string + password string + database string + httpClient *http.Client + udpConn *net.UDPConn + schema string + compression bool +} + +type ClientConfig struct { + Host string + Username string + Password string + Database string + HttpClient *http.Client + IsSecure bool + IsUDP bool +} + +var defaults *ClientConfig + +func init() { + defaults = &ClientConfig{ + Host: "localhost:8086", + Username: "root", + Password: "root", + Database: "", + HttpClient: http.DefaultClient, + } +} + +func 
getDefault(value, defaultValue string) string { + if value == "" { + return defaultValue + } + return value +} + +// New is a shorthand for NewClient. +func New(config *ClientConfig) (*Client, error) { + return NewClient(config) +} + +// NewClient builds a Client from config. Empty Host, Username, Password and +// Database fields fall back to the package defaults, and a nil HttpClient +// falls back to the default HTTP client. A nil config is treated as an empty +// one and the caller's config is never modified. When config.IsUDP is set a +// UDP connection to the host is dialed too; resolve and dial errors are +// returned to the caller. +func NewClient(config *ClientConfig) (*Client, error) { + if config == nil { + config = &ClientConfig{} + } + host := getDefault(config.Host, defaults.Host) + username := getDefault(config.Username, defaults.Username) + password := getDefault(config.Password, defaults.Password) + database := getDefault(config.Database, defaults.Database) + // Pick the HTTP client locally so the caller's struct is left untouched. + httpClient := config.HttpClient + if httpClient == nil { + httpClient = defaults.HttpClient + } + var udpConn *net.UDPConn + if config.IsUDP { + serverAddr, err := net.ResolveUDPAddr("udp", host) + if err != nil { + return nil, err + } + udpConn, err = net.DialUDP("udp", nil, serverAddr) + if err != nil { + return nil, err + } + } + + schema := "http" + if config.IsSecure { + schema = "https" + } + return &Client{host, username, password, database, httpClient, udpConn, schema, false}, nil +} + +// DisableCompression turns off gzip compression of written series and makes +// queries ask the server for an uncompressed response. Clients returned by +// NewClient already start with compression off. +func (self *Client) DisableCompression() { + self.compression = false +} + +// getUrl returns the URL for path, authenticated with the client's own +// username and password. +func (self *Client) getUrl(path string) string { + return self.getUrlWithUserAndPass(path, self.username, self.password) +} + +// getUrlWithUserAndPass returns the URL for path with the given credentials +// in the u and p query parameters. Both values are query-escaped so that +// characters such as '&', '#', '+' or '%' cannot corrupt the query string. +func (self *Client) getUrlWithUserAndPass(path, username, password string) string { + return fmt.Sprintf("%s://%s%s?u=%s&p=%s", self.schema, self.host, path, url.QueryEscape(username), url.QueryEscape(password)) +} + +// responseToError converts the outcome of an HTTP call into an error. It +// returns err unchanged when the request itself failed, nil for a 2xx status, +// and otherwise an error carrying the status code and the response body. The +// body of a non-2xx response is always read and closed here; the body of a +// 2xx response is closed only when closeResponse is true, otherwise it is +// left open for the caller. +func responseToError(response *http.Response, err error, closeResponse bool) error { + if err != nil { + return err + } + if response.StatusCode >= 200 && response.StatusCode < 300 { + if closeResponse { + response.Body.Close() + } + return nil + } + // Error path: the body is closed exactly once, after it has been read. + defer response.Body.Close() + body, err := ioutil.ReadAll(response.Body) + if err != nil { + return err + } + return fmt.Errorf("Server returned (%d): %s", response.StatusCode, string(body)) +} + +// CreateDatabase posts the database name as JSON to /db. +func (self *Client) CreateDatabase(name string) error { + url := self.getUrl("/db") + payload := map[string]string{"name": name} + data, err := json.Marshal(payload) + if err != nil {
+ return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// del issues a DELETE request to url without a request body. +func (self *Client) del(url string) (*http.Response, error) { + return self.delWithBody(url, nil) +} + +// delWithBody issues a DELETE request to url with the given body and returns +// the raw response. The response body is not closed here. +func (self *Client) delWithBody(url string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequest("DELETE", url, body) + if err != nil { + return nil, err + } + return self.httpClient.Do(req) +} + +// DeleteDatabase sends a DELETE for /db/<name>. A non-2xx response is +// returned as an error. +func (self *Client) DeleteDatabase(name string) error { + url := self.getUrl("/db/" + name) + resp, err := self.del(url) + return responseToError(resp, err, true) +} + +// get fetches url and returns the complete response body. A transport +// failure or a non-2xx status is returned as an error. +func (self *Client) get(url string) ([]byte, error) { + resp, err := self.httpClient.Get(url) + err = responseToError(resp, err, false) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + return body, err +} + +// getWithVersion behaves like get and additionally returns the second +// whitespace-separated field of the X-Influxdb-Version response header. The +// version is empty unless that header has more than two fields. +func (self *Client) getWithVersion(url string) ([]byte, string, error) { + resp, err := self.httpClient.Get(url) + err = responseToError(resp, err, false) + if err != nil { + return nil, "", err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + version := resp.Header.Get("X-Influxdb-Version") + fields := strings.Fields(version) + if len(fields) > 2 { + return body, fields[1], err + } + return body, "", err +} + +// listSomething fetches url and decodes the JSON response into a list of +// generic objects. +func (self *Client) listSomething(url string) ([]map[string]interface{}, error) { + body, err := self.get(url) + if err != nil { + return nil, err + } + somethings := []map[string]interface{}{} + err = json.Unmarshal(body, &somethings) + if err != nil { + return nil, err + } + return somethings, nil +} + +// GetDatabaseList returns the entries served at /db as generic JSON objects. +func (self *Client) GetDatabaseList() ([]map[string]interface{}, error) { + url := self.getUrl("/db") + return self.listSomething(url) +} + +// CreateClusterAdmin posts the admin's name and password as JSON to +// /cluster_admins. +func (self *Client) CreateClusterAdmin(name, password string) error { + url := self.getUrl("/cluster_admins") + payload := map[string]string{"name": name, "password": password} + data, err :=
json.Marshal(payload) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// UpdateClusterAdmin posts a new password as JSON to /cluster_admins/<name>. +func (self *Client) UpdateClusterAdmin(name, password string) error { + url := self.getUrl("/cluster_admins/" + name) + payload := map[string]string{"password": password} + data, err := json.Marshal(payload) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// DeleteClusterAdmin sends a DELETE for /cluster_admins/<name>. A non-2xx +// response is returned as an error. +func (self *Client) DeleteClusterAdmin(name string) error { + url := self.getUrl("/cluster_admins/" + name) + resp, err := self.del(url) + return responseToError(resp, err, true) +} + +// GetClusterAdminList returns the entries served at /cluster_admins as +// generic JSON objects. +func (self *Client) GetClusterAdminList() ([]map[string]interface{}, error) { + url := self.getUrl("/cluster_admins") + return self.listSomething(url) +} + +// Servers returns the entries served at /cluster/servers as generic JSON +// objects. +func (self *Client) Servers() ([]map[string]interface{}, error) { + url := self.getUrl("/cluster/servers") + return self.listSomething(url) +} + +// RemoveServer sends a DELETE for /cluster/servers/<id>. A non-2xx response +// is returned as an error. +func (self *Client) RemoveServer(id int) error { + resp, err := self.del(self.getUrl(fmt.Sprintf("/cluster/servers/%d", id))) + return responseToError(resp, err, true) +} + +// Creates a new database user for the given database. permissions can +// be omitted in which case the user will be able to read and write to +// all time series. If provided, there should be two strings, the +// first for read and the second for write. The strings are regexes +// that are used to match the time series name to determine whether +// the user has the ability to read/write to the given time series.
+// +// client.CreateDatabaseUser("db", "user", "pass") +// // the following user cannot read from any series and can write +// // to the limited time series only +// client.CreateDatabaseUser("db", "limited", "pass", "^$", "limited") +// +// Any number of permissions other than zero or two is rejected with an error +// before a request is made. +func (self *Client) CreateDatabaseUser(database, name, password string, permissions ...string) error { + readMatcher, writeMatcher := ".*", ".*" + switch len(permissions) { + case 0: + // keep the match-everything defaults + case 2: + readMatcher, writeMatcher = permissions[0], permissions[1] + default: + return fmt.Errorf("expected either no permissions or exactly two (read and write matchers), got %d", len(permissions)) + } + + url := self.getUrl("/db/" + database + "/users") + payload := map[string]string{"name": name, "password": password, "readFrom": readMatcher, "writeTo": writeMatcher} + data, err := json.Marshal(payload) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// ChangeClusterAdminPassword posts the new password as JSON to +// /cluster_admins/<name>. +func (self *Client) ChangeClusterAdminPassword(name, newPassword string) error { + url := self.getUrl("/cluster_admins/" + name) + payload := map[string]interface{}{"password": newPassword} + data, err := json.Marshal(payload) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// ChangeDatabaseUser changes the user's password and admin flag and, when +// exactly two newPermissions (read matcher, write matcher) are given, the +// permissions as well. Any other non-zero number of permissions is rejected +// with an error before a request is made. +func (self *Client) ChangeDatabaseUser(database, name, newPassword string, isAdmin bool, newPermissions ...string) error { + switch len(newPermissions) { + case 0, 2: + default: + return fmt.Errorf("expected either no permissions or exactly two (read and write matchers), got %d", len(newPermissions)) + } + + url := self.getUrl("/db/" + database + "/users/" + name) + payload := map[string]interface{}{"password": newPassword, "admin": isAdmin} + if len(newPermissions) == 2 { + payload["readFrom"] = newPermissions[0] + payload["writeTo"] = newPermissions[1] + } + data, err := json.Marshal(payload) + if err != nil { + return err + } + resp,
err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// updateDatabaseUserCommon posts a partial update for a database user to +// /db/<database>/users/<name>. Only the fields that are supplied are sent: +// password and isAdmin when non-nil, and the read/write matchers when exactly +// two permissions are given. +// +// See Client.CreateDatabaseUser for more info on the permissions +// argument +func (self *Client) updateDatabaseUserCommon(database, name string, password *string, isAdmin *bool, permissions ...string) error { + url := self.getUrl("/db/" + database + "/users/" + name) + payload := map[string]interface{}{} + if password != nil { + payload["password"] = *password + } + if isAdmin != nil { + payload["admin"] = *isAdmin + } + switch len(permissions) { + case 0: + case 2: + payload["readFrom"] = permissions[0] + payload["writeTo"] = permissions[1] + default: + // any other number of permissions is silently ignored + } + + data, err := json.Marshal(payload) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// UpdateDatabaseUser changes only the password of a database user. +func (self *Client) UpdateDatabaseUser(database, name, password string) error { + return self.updateDatabaseUserCommon(database, name, &password, nil) +} + +// UpdateDatabaseUserPermissions changes only the read and write matchers of +// a database user. +func (self *Client) UpdateDatabaseUserPermissions(database, name, readPermission, writePermissions string) error { + return self.updateDatabaseUserCommon(database, name, nil, nil, readPermission, writePermissions) +} + +// DeleteDatabaseUser sends a DELETE for /db/<database>/users/<name>. A +// non-2xx response is returned as an error. +func (self *Client) DeleteDatabaseUser(database, name string) error { + url := self.getUrl("/db/" + database + "/users/" + name) + resp, err := self.del(url) + return responseToError(resp, err, true) +} + +// GetDatabaseUserList returns the entries served at /db/<database>/users as +// generic JSON objects. +func (self *Client) GetDatabaseUserList(database string) ([]map[string]interface{}, error) { + url := self.getUrl("/db/" + database + "/users") + return self.listSomething(url) +} + +// AlterDatabasePrivilege changes the admin flag of a database user and, when +// two permissions are given, the read and write matchers as well. +func (self *Client) AlterDatabasePrivilege(database, name string, isAdmin bool, permissions ...string) error { + return self.updateDatabaseUserCommon(database, name, nil, &isAdmin, permissions...)
+} + +// TimePrecision names the time unit passed to the server in the +// time_precision query parameter. +type TimePrecision string + +const ( + Second TimePrecision = "s" + Millisecond TimePrecision = "ms" + Microsecond TimePrecision = "u" +) + +// WriteSeries posts series over HTTP to the client's database without a +// time_precision option. +func (self *Client) WriteSeries(series []*Series) error { + return self.writeSeriesCommon(series, nil) +} + +// WriteSeriesOverUDP sends series as a single JSON message over the UDP +// connection opened by NewClient. It fails when UDP was not enabled or when +// the encoded payload reaches UDPMaxMessageSize bytes. +func (self *Client) WriteSeriesOverUDP(series []*Series) error { + if self.udpConn == nil { + return fmt.Errorf("UDP isn't enabled. Make sure to set config.IsUDP to true") + } + + data, err := json.Marshal(series) + if err != nil { + return err + } + // because the maximum size of a message over UDP is 2048 bytes + // https://github.com/influxdb/influxdb/blob/master/src/api/udp/api.go#L65 + if len(data) >= UDPMaxMessageSize { + // The error is only returned; reporting it is left to the caller. + return fmt.Errorf("data size over limit %v limit is %v", len(data), UDPMaxMessageSize) + } + _, err = self.udpConn.Write(data) + return err +} + +// WriteSeriesWithTimePrecision posts series over HTTP and tells the server +// the time precision to use via the time_precision query parameter. +func (self *Client) WriteSeriesWithTimePrecision(series []*Series, timePrecision TimePrecision) error { + return self.writeSeriesCommon(series, map[string]string{"time_precision": string(timePrecision)}) +} + +// writeSeriesCommon JSON-encodes series and posts them to +// /db/<database>/series. Each entry of options is appended to the URL as a +// query-escaped parameter. When compression is enabled the payload is +// gzipped and sent with a gzip Content-Encoding header. A non-2xx response +// is returned as an error. +func (self *Client) writeSeriesCommon(series []*Series, options map[string]string) error { + data, err := json.Marshal(series) + if err != nil { + return err + } + // Named target rather than url so the net/url package stays reachable. + target := self.getUrl("/db/" + self.database + "/series") + for name, value := range options { + target += "&" + url.QueryEscape(name) + "=" + url.QueryEscape(value) + } + var b *bytes.Buffer + if self.compression { + b = bytes.NewBuffer(nil) + w := gzip.NewWriter(b) + if _, err := w.Write(data); err != nil { + return err + } + // Close flushes pending data and writes the gzip footer; a failure + // here would leave a truncated payload, so it must not be ignored. + if err := w.Close(); err != nil { + return err + } + } else { + b = bytes.NewBuffer(data) + } + req, err := http.NewRequest("POST", target, b) + if err != nil { + return err + } + if self.compression { + req.Header.Set("Content-Encoding", "gzip") + } + resp, err := self.httpClient.Do(req) + return responseToError(resp, err, true) +} + +// Query runs query against the client's database and returns the decoded +// series. An optional first precision value selects the time precision. +func (self *Client) Query(query string, precision ...TimePrecision) ([]*Series, error) { + return self.queryCommon(query, false,
precision...) +} + +// QueryWithNumbers behaves like Query but decodes JSON numbers with the +// decoder's UseNumber option instead of as float64. +func (self *Client) QueryWithNumbers(query string, precision ...TimePrecision) ([]*Series, error) { + return self.queryCommon(query, true, precision...) +} + +// queryCommon sends query as the q parameter of a GET to +// /db/<database>/series and decodes the JSON response into series. Only the +// first precision value, if any, is used. When compression is disabled the +// request asks for an identity (uncompressed) response. +func (self *Client) queryCommon(query string, useNumber bool, precision ...TimePrecision) ([]*Series, error) { + // The query is escaped before the local url variable shadows net/url. + escapedQuery := url.QueryEscape(query) + url := self.getUrl("/db/" + self.database + "/series") + if len(precision) > 0 { + url += "&time_precision=" + string(precision[0]) + } + url += "&q=" + escapedQuery + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + if !self.compression { + req.Header.Set("Accept-Encoding", "identity") + } + resp, err := self.httpClient.Do(req) + err = responseToError(resp, err, false) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + series := []*Series{} + decoder := json.NewDecoder(resp.Body) + if useNumber { + decoder.UseNumber() + } + err = decoder.Decode(&series) + if err != nil { + return nil, err + } + return series, nil +} + +// Ping issues a GET to /ping; a non-2xx response is returned as an error. +func (self *Client) Ping() error { + url := self.getUrl("/ping") + resp, err := self.httpClient.Get(url) + return responseToError(resp, err, true) +} + +// AuthenticateDatabaseUser issues a GET to /db/<database>/authenticate with +// the given credentials; a non-2xx response is returned as an error. +func (self *Client) AuthenticateDatabaseUser(database, username, password string) error { + url := self.getUrlWithUserAndPass(fmt.Sprintf("/db/%s/authenticate", database), username, password) + resp, err := self.httpClient.Get(url) + return responseToError(resp, err, true) +} + +// AuthenticateClusterAdmin issues a GET to /cluster_admins/authenticate with +// the given credentials; a non-2xx response is returned as an error. +func (self *Client) AuthenticateClusterAdmin(username, password string) error { + url := self.getUrlWithUserAndPass("/cluster_admins/authenticate", username, password) + resp, err := self.httpClient.Get(url) + return responseToError(resp, err, true) +} + +type LongTermShortTermShards struct { + // Long term shards, (doesn't get populated for version >= 0.8.0) + LongTerm []*Shard `json:"longTerm"` + // Short term shards, (doesn't get populated for version >= 0.8.0) + ShortTerm []*Shard `json:"shortTerm"` + // All shards in the system (Long +
Short term shards for version < 0.8.0) + All []*Shard `json:"-"` +} + +type Shard struct { + Id uint32 `json:"id"` + EndTime int64 `json:"endTime"` + StartTime int64 `json:"startTime"` + ServerIds []uint32 `json:"serverIds"` + SpaceName string `json:"spaceName"` + Database string `json:"database"` +} + +type ShardSpaceCollection struct { + ShardSpaces []ShardSpace +} + +// GetShards fetches /cluster/shards and decodes the response according to +// the server version reported alongside it. +func (self *Client) GetShards() (*LongTermShortTermShards, error) { + url := self.getUrlWithUserAndPass("/cluster/shards", self.username, self.password) + body, version, err := self.getWithVersion(url) + if err != nil { + return nil, err + } + return parseShards(body, version) +} + +// isOrNewerThan reports whether version (e.g. "v0.8.8") is the same +// major.minor release as reference or a newer one. The special version +// "vdev" is always considered newer. Components are compared numerically, so +// "v0.10" is newer than "v0.8", and an older major is never outweighed by a +// larger minor. A version or reference that cannot be parsed yields false +// instead of panicking. +func isOrNewerThan(version, reference string) bool { + if version == "vdev" { + return true + } + major, minor, ok := parseMajorMinor(version) + if !ok { + return false + } + refMajor, refMinor, ok := parseMajorMinor(reference) + if !ok { + return false + } + if major != refMajor { + return major > refMajor + } + return minor >= refMinor +} + +// parseMajorMinor extracts the numeric major and minor components from a +// version string such as "v0.8.8" or "0.8". ok is false when the string does +// not start with two dot-separated integers. +func parseMajorMinor(version string) (major, minor int, ok bool) { + n, err := fmt.Sscanf(strings.TrimPrefix(version, "v"), "%d.%d", &major, &minor) + return major, minor, err == nil && n == 2 +} + +// parseShards decodes the body of a /cluster/shards response. Servers that +// report version 0.8 or newer return a flat list of shards; for an empty or +// older version the body is decoded as separate long-term and short-term +// lists, which are then concatenated into All. +func parseShards(body []byte, version string) (*LongTermShortTermShards, error) { + // an empty version means the server did not report one: use the old format + if version != "" && isOrNewerThan(version, "v0.8") { + return parseNewShards(body) + } + shards := &LongTermShortTermShards{} + err := json.Unmarshal(body, &shards) + if err != nil { + return nil, err + } + + shards.All = make([]*Shard, len(shards.LongTerm)+len(shards.ShortTerm)) + copy(shards.All, shards.LongTerm) + copy(shards.All[len(shards.LongTerm):], shards.ShortTerm) + return shards, nil +} + +// parseNewShards decodes a flat JSON list of shards into the All field. +func parseNewShards(body []byte) (*LongTermShortTermShards, error) { + shards := []*Shard{} + err := json.Unmarshal(body, &shards) + if err != nil { + return nil, err + } + + return &LongTermShortTermShards{All: shards}, nil +} + +// Added to InfluxDB in 0.8.0 +func (self *Client) GetShardSpaces() ([]*ShardSpace, error) { + url := self.getUrlWithUserAndPass("/cluster/shard_spaces", self.username, self.password) + body,
err := self.get(url) + if err != nil { + return nil, err + } + spaces := []*ShardSpace{} + err = json.Unmarshal(body, &spaces) + if err != nil { + return nil, err + } + + return spaces, nil +} + +// DropShardSpace sends a DELETE for /cluster/shard_spaces/<database>/<name>. +// The response body is closed and a non-2xx response is returned as an error. +// +// Added to InfluxDB in 0.8.0 +func (self *Client) DropShardSpace(database, name string) error { + url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name), self.username, self.password) + resp, err := self.del(url) + return responseToError(resp, err, true) +} + +// CreateShardSpace posts space as JSON to /cluster/shard_spaces/<database>. +// +// Added to InfluxDB in 0.8.0 +func (self *Client) CreateShardSpace(database string, space *ShardSpace) error { + url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s", database)) + data, err := json.Marshal(space) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} + +// DropShard sends a DELETE for /cluster/shards/<id> with the server ids as a +// JSON body. The response body is closed and a non-2xx response is returned +// as an error. +func (self *Client) DropShard(id uint32, serverIds []uint32) error { + url := self.getUrlWithUserAndPass(fmt.Sprintf("/cluster/shards/%d", id), self.username, self.password) + ids := map[string][]uint32{"serverIds": serverIds} + body, err := json.Marshal(ids) + if err != nil { + return err + } + resp, err := self.delWithBody(url, bytes.NewBuffer(body)) + return responseToError(resp, err, true) +} + +// UpdateShardSpace posts space as JSON to +// /cluster/shard_spaces/<database>/<name>. +// +// Added to InfluxDB in 0.8.2 +func (self *Client) UpdateShardSpace(database, name string, space *ShardSpace) error { + url := self.getUrl(fmt.Sprintf("/cluster/shard_spaces/%s/%s", database, name)) + data, err := json.Marshal(space) + if err != nil { + return err + } + resp, err := self.httpClient.Post(url, "application/json", bytes.NewBuffer(data)) + return responseToError(resp, err, true) +} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go new file mode 100644 index 0000000000000..f18b8bbb59ea1 --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/series.go @@ -0,0 +1,19 @@ +package client + +type Series struct { + Name string `json:"name"` +
Columns []string `json:"columns"` + Points [][]interface{} `json:"points"` +} + +func (self *Series) GetName() string { + return self.Name +} + +func (self *Series) GetColumns() []string { + return self.Columns +} + +func (self *Series) GetPoints() [][]interface{} { + return self.Points +} diff --git a/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go new file mode 100644 index 0000000000000..87dea1173bc0f --- /dev/null +++ b/Godeps/_workspace/src/github.com/influxdb/influxdb/client/shard_space.go @@ -0,0 +1,15 @@ +package client + +type ShardSpace struct { + // required, must be unique within the database + Name string `json:"name"` + // required, a database has many shard spaces and a shard space belongs to a database + Database string `json:"database"` + // this is optional, if they don't set it, we'll set to /.*/ + Regex string `json:"regex"` + // this is optional, if they don't set it, it will default to the storage.dir in the config + RetentionPolicy string `json:"retentionPolicy"` + ShardDuration string `json:"shardDuration"` + ReplicationFactor uint32 `json:"replicationFactor"` + Split uint32 `json:"split"` +} diff --git a/cluster/addons/cluster-monitoring/influxdb-service.yaml b/cluster/addons/cluster-monitoring/influxdb-service.yaml index c2b9d4c87f423..cc62a91d3ee6f 100644 --- a/cluster/addons/cluster-monitoring/influxdb-service.yaml +++ b/cluster/addons/cluster-monitoring/influxdb-service.yaml @@ -5,6 +5,5 @@ port: 80 containerPort: 8086 labels: name: influxdb - kubernetes.io/cluster-service: "true" selector: name: influxGrafana diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index bc5bcde55317d..46b4413c67d4a 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -47,7 +47,7 @@ ENABLE_DOCKER_REGISTRY_CACHE=true ENABLE_NODE_MONITORING="${KUBE_ENABLE_NODE_MONITORING:-true}" # Optional: When set to true, heapster will 
be setup as part of the cluster bring up. -ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-false}" +ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-true}" # Optional: Enable node logging. ENABLE_NODE_LOGGING="${KUBE_ENABLE_NODE_LOGGING:-true}" diff --git a/hack/e2e-suite/monitoring.sh b/hack/e2e-suite/monitoring.sh deleted file mode 100755 index 1b1c3cc3ee361..0000000000000 --- a/hack/e2e-suite/monitoring.sh +++ /dev/null @@ -1,134 +0,0 @@ -#!/bin/bash - -# Copyright 2014 Google Inc. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Assumes a running Kubernetes test cluster; verifies that the monitoring setup -# works. Assumes that we're being called by hack/e2e-test.sh (we use some env -# vars it sets up). - -set -o errexit -set -o nounset -set -o pipefail - -KUBE_ROOT=$(dirname "${BASH_SOURCE}")/../.. 
- -: ${KUBE_VERSION_ROOT:=${KUBE_ROOT}} -: ${KUBECTL:="${KUBE_VERSION_ROOT}/cluster/kubectl.sh"} -: ${KUBE_CONFIG_FILE:="config-test.sh"} - -export KUBECTL KUBE_CONFIG_FILE - -source "${KUBE_ROOT}/cluster/kube-env.sh" -source "${KUBE_VERSION_ROOT}/cluster/${KUBERNETES_PROVIDER}/util.sh" - -prepare-e2e - -MONITORING="${KUBE_ROOT}/cluster/addons/cluster-monitoring" -KUBECTL="${KUBE_ROOT}/cluster/kubectl.sh" -BIGRAND=$(printf "%x\n" $(( $RANDOM << 16 | $RANDOM ))) # random 2^32 in hex -MONITORING_FIREWALL_RULE="monitoring-test-${BIGRAND}" - -function setup { - # This only has work to do on gce and gke - if [[ "${KUBERNETES_PROVIDER}" == "gce" ]] || [[ "${KUBERNETES_PROVIDER}" == "gke" ]]; then - detect-project - if ! "${GCLOUD}" compute firewall-rules create "${MONITORING_FIREWALL_RULE}" \ - --project "${PROJECT}" \ - --network "${NETWORK}" \ - --quiet \ - --allow tcp:80 tcp:8083 tcp:8086 tcp:9200; then - echo "Failed to set up firewall for monitoring" && false - fi - fi - - "${KUBECTL}" create -f "${MONITORING}/" -} - -function cleanup { - "${KUBECTL}" stop rc monitoring-influx-grafana-controller &> /dev/null || true - "${KUBECTL}" stop rc monitoring-heapster-controller &> /dev/null || true - "${KUBECTL}" delete -f "${MONITORING}/" &> /dev/null || true - - # This only has work to do on gce and gke - if [[ "${KUBERNETES_PROVIDER}" == "gce" ]] || [[ "${KUBERNETES_PROVIDER}" == "gke" ]]; then - detect-project - if "${GCLOUD}" compute firewall-rules describe "${MONITORING_FIREWALL_RULE}" &> /dev/null; then - "${GCLOUD}" compute firewall-rules delete \ - --project "${PROJECT}" \ - --quiet \ - "${MONITORING_FIREWALL_RULE}" || true - fi - fi -} - -function influx-data-exists { - local max_retries=10 - local retry_delay=30 #seconds - local influx_ip=$("${KUBECTL}" get pods -l name=influxGrafana -o template -t {{range.items}}{{.currentState.hostIP}}:{{end}} | sed s/://g) - local influx_url="http://$influx_ip:8086/db/k8s/series?u=root&p=root" - local ok="false" - for i in `seq 
1 10`; do - if curl --retry $max_retries --retry-delay $retry_delay -G $influx_url --data-urlencode "q=select * from stats limit 1" \ - && curl --retry $max_retries --retry-delay $retry_delay -G $influx_url --data-urlencode "q=select * from machine limit 1"; then - echo "retrieved data from InfluxDB." - ok="true" - break - fi - sleep 5 - done - if [[ "${ok}" != "true" ]]; then - echo "failed to retrieve stats from InfluxDB. monitoring test failed" - exit 1 - fi -} - -function wait-for-pods { - local running=false - for i in `seq 1 20`; do - sleep 20 - if "${KUBECTL}" get pods -l name=influxGrafana -o template -t {{range.items}}{{.currentState.status}}:{{end}} | grep Running &> /dev/null \ - && "${KUBECTL}" get pods -l name=heapster -o template -t {{range.items}}{{.currentState.status}}:{{end}} | grep Running &> /dev/null; then - running=true - break - fi - done - if [ running == false ]; then - echo "giving up waiting on monitoring pods to be active. monitoring test failed" - exit 1 - fi -} - -trap cleanup EXIT - -# Remove any pre-existing monitoring services. -cleanup - -# Start monitoring pods and services. -setup - -# Wait for a maximum of 5 minutes for the influx grafana pod to be running. -echo "waiting for monitoring pods to be running" -wait-for-pods - -# Wait for some time to let heapster push some stats to InfluxDB. -echo "monitoring pods are running. waiting for stats to be pushed to InfluxDB" -sleep 60 - -# Check if stats data exists in InfluxDB -echo "checking if stats exist in InfluxDB" -influx-data-exists - -echo "monitoring setup works" -exit 0 diff --git a/test/e2e/monitoring.go b/test/e2e/monitoring.go new file mode 100644 index 0000000000000..8a6507a27ccea --- /dev/null +++ b/test/e2e/monitoring.go @@ -0,0 +1,236 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + "net/http" + "net/url" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + influxdb "github.com/influxdb/influxdb/client" + + . "github.com/onsi/ginkgo" +) + +var _ = Describe("Monitoring", func() { + var c *client.Client + + BeforeEach(func() { + var err error + c, err = loadClient() + expectNoError(err) + }) + + It("verify monitoring pods and all cluster nodes are available on influxdb using heapster.", func() { + if testContext.provider != "gce" { + By(fmt.Sprintf("Skipping Monitoring test, which is only supported for provider gce (not %s)", + testContext.provider)) + return + } + testMonitoringUsingHeapsterInfluxdb(c) + }) +}) + +const ( + influxdbService = "monitoring-influxdb" + influxdbDatabaseName = "k8s" + influxdbUser = "root" + influxdbPW = "root" + podlistQuery = "select distinct(pod) from stats" + nodelistQuery = "select distinct(hostname) from machine" + sleepBetweenAttempts = 5 * time.Second + testTimeout = 5 * time.Minute +) + +var ( + expectedRcs = map[string]bool{ + "monitoring-heapster-controller": false, + "monitoring-influx-grafana-controller": false, + } + expectedServices = map[string]bool{ + influxdbService: false, + "monitoring-heapster": false, + "monitoring-grafana": false, + } +) + +func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) { + rcList, err := c.ReplicationControllers(api.NamespaceDefault).List(labels.Everything()) + if err != nil { + 
return nil, err + } + expectedPods := []string{} + for _, rc := range rcList.Items { + if _, ok := expectedRcs[rc.Name]; ok { + if rc.Status.Replicas != 1 { + return nil, fmt.Errorf("expected to find only one replica for rc %q, found %d", rc.Name, rc.Status.Replicas) + } + expectedRcs[rc.Name] = true + podList, err := c.Pods(api.NamespaceDefault).List(labels.Set(rc.Spec.Selector).AsSelector()) + if err != nil { + return nil, err + } + for _, pod := range podList.Items { + expectedPods = append(expectedPods, pod.Name) + } + } + } + for rc, found := range expectedRcs { + if !found { + return nil, fmt.Errorf("Replication Controller %q not found.", rc) + } + } + return expectedPods, nil +} + +func expectedServicesExist(c *client.Client) error { + serviceList, err := c.Services(api.NamespaceDefault).List(labels.Everything()) + if err != nil { + return err + } + for _, service := range serviceList.Items { + if _, ok := expectedServices[service.Name]; ok { + expectedServices[service.Name] = true + } + } + for service, found := range expectedServices { + if !found { + return fmt.Errorf("Service %q not found", service) + } + } + return nil +} + +func getAllNodesInCluster(c *client.Client) ([]string, error) { + nodeList, err := c.Nodes().List() + if err != nil { + return nil, err + } + result := []string{} + for _, node := range nodeList.Items { + result = append(result, node.Name) + } + return result, nil +} + +func getInfluxdbData(c *influxdb.Client, query string) (map[string]bool, error) { + series, err := c.Query(query, influxdb.Second) + if err != nil { + return nil, err + } + if len(series) != 1 { + Failf("expected only one series from Influxdb for query %q. Got %+v", query, series) + } + if len(series[0].GetColumns()) != 2 { + Failf("Expected two columns for query %q. 
Found %v", query, series[0].GetColumns()) + } + result := map[string]bool{} + for _, point := range series[0].GetPoints() { + if len(point) != 2 { + Failf("Expected only two entries in a point for query %q. Got %v", query, point) + } + name, ok := point[1].(string) + if !ok { + Failf("expected %v to be a string, but it is %T", point[1], point[1]) + } + result[name] = false + } + return result, nil +} + +func expectedItemsExist(expectedItems []string, actualItems map[string]bool) bool { + if len(actualItems) < len(expectedItems) { + return false + } + for _, item := range expectedItems { + if _, found := actualItems[item]; !found { + return false + } + } + return true +} + +func validatePodsAndNodes(influxdbClient *influxdb.Client, expectedPods, expectedNodes []string) bool { + pods, err := getInfluxdbData(influxdbClient, podlistQuery) + if err != nil { + // We don't fail the test here because the influxdb service might still not be running. + Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err) + return false + } + nodes, err := getInfluxdbData(influxdbClient, nodelistQuery) + if err != nil { + Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err) + return false + } + if !expectedItemsExist(expectedPods, pods) { + Logf("failed to find all expected Pods.\nExpected: %v\nActual: %v", expectedPods, pods) + return false + } + if !expectedItemsExist(expectedNodes, nodes) { + Logf("failed to find all expected Nodes.\nExpected: %v\nActual: %v", expectedNodes, nodes) + return false + } + return true +} + +func getMasterHost() string { + masterUrl, err := url.Parse(testContext.host) + expectNoError(err) + return masterUrl.Host +} + +func testMonitoringUsingHeapsterInfluxdb(c *client.Client) { + // Check if heapster pods and services are up. 
+ expectedPods, err := verifyExpectedRcsExistAndGetExpectedPods(c) + expectNoError(err) + expectNoError(expectedServicesExist(c)) + // TODO: Wait for all pods and services to be running. + kubeMasterHttpClient, ok := c.Client.(*http.Client) + if !ok { + Failf("failed to get master http client") + } + proxyUrl := fmt.Sprintf("%s/api/v1beta1/proxy/services/%s/", getMasterHost(), influxdbService) + config := &influxdb.ClientConfig{ + Host: proxyUrl, + // TODO(vishh): Infer username and pw from the Pod spec. + Username: influxdbUser, + Password: influxdbPW, + Database: influxdbDatabaseName, + HttpClient: kubeMasterHttpClient, + IsSecure: true, + } + influxdbClient, err := influxdb.NewClient(config) + expectNoError(err, "failed to create influxdb client") + + expectedNodes, err := getAllNodesInCluster(c) + expectNoError(err) + startTime := time.Now() + for { + if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) { + return + } + if time.Since(startTime) >= testTimeout { + break + } + time.Sleep(sleepBetweenAttempts) + } + Failf("monitoring using heapster and influxdb test failed") +}