Skip to content

Commit

Permalink
Merge pull request nats-io#615 from nats-io/varz_func
Browse files Browse the repository at this point in the history
[ADDED] Monitor endpoints as server methods (Varz(), etc..)
  • Loading branch information
kozlovic authored Mar 16, 2018
2 parents 4aa04a6 + 4e9d785 commit 8d539a4
Show file tree
Hide file tree
Showing 3 changed files with 772 additions and 983 deletions.
239 changes: 185 additions & 54 deletions server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ type Connz struct {
Conns []ConnInfo `json:"connections"`
}

// ConnzOptions are the options passed to Connz()
type ConnzOptions struct {
// Sort indicates how the results will be sorted. Check SortOpt for possible values.
// Only the sort by connection ID (ByCid) is ascending, all others are descending.
Sort SortOpt `json:"sort"`

// Username indicates if user names should be included in the results.
Username bool `json:"auth"`

// Subscriptions indicates if subscriptions should be included in the results.
Subscriptions bool `json:"subscriptions"`

// Offset is used for pagination. Connz() only returns connections starting at this
// offset from the global results.
Offset int `json:"offset"`

// Limit is the maximum number of connections that should be returned by Connz().
Limit int `json:"limit"`
}

// ConnInfo has detailed information on a per connection basis.
type ConnInfo struct {
Cid uint64 `json:"cid"`
Expand Down Expand Up @@ -64,36 +84,46 @@ const DefaultConnListSize = 1024

const defaultStackBufSize = 10000

// HandleConnz process HTTP requests for connection information.
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
sortOpt := SortOpt(r.URL.Query().Get("sort"))

// If no sort option given or sort is by uptime, then sort by cid
if sortOpt == "" || sortOpt == byUptime {
sortOpt = byCid
} else if !sortOpt.IsValid() {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Invalid sorting option: %s", sortOpt)))
return
// Connz returns a Connz struct containing inormation about connections.
func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
var (
sortOpt = ByCid
auth bool
subs bool
offset int
limit = DefaultConnListSize
)

if opts != nil {
// If no sort option given or sort is by uptime, then sort by cid
if opts.Sort == "" || opts.Sort == ByUptime {
sortOpt = ByCid
} else {
sortOpt = opts.Sort
if !sortOpt.IsValid() {
return nil, fmt.Errorf("Invalid sorting option: %s", sortOpt)
}
}
auth = opts.Username
subs = opts.Subscriptions
offset = opts.Offset
if offset < 0 {
offset = 0
}
limit = opts.Limit
if limit <= 0 {
limit = DefaultConnListSize
}
}

c := &Connz{}
c.Now = time.Now()

auth, _ := strconv.Atoi(r.URL.Query().Get("auth"))
subs, _ := strconv.Atoi(r.URL.Query().Get("subs"))
c.Offset, _ = strconv.Atoi(r.URL.Query().Get("offset"))
if c.Offset < 0 {
c.Offset = 0
}
c.Limit, _ = strconv.Atoi(r.URL.Query().Get("limit"))
if c.Limit <= 0 {
c.Limit = DefaultConnListSize
c := &Connz{
Offset: offset,
Limit: limit,
Now: time.Now(),
}

// Walk the list
s.mu.Lock()
s.httpReqStats[ConnzPath]++
tlsRequired := s.info.TLSRequired

// copy the server id for monitoring
Expand All @@ -109,23 +139,23 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
for _, client := range s.clients {
client.mu.Lock()
switch sortOpt {
case byCid:
case ByCid:
pairs[i] = Pair{Key: client, Val: int64(client.cid)}
case bySubs:
case BySubs:
pairs[i] = Pair{Key: client, Val: int64(len(client.subs))}
case byPending:
case ByPending:
pairs[i] = Pair{Key: client, Val: int64(client.bw.Buffered())}
case byOutMsgs:
case ByOutMsgs:
pairs[i] = Pair{Key: client, Val: client.outMsgs}
case byInMsgs:
case ByInMsgs:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inMsgs)}
case byOutBytes:
case ByOutBytes:
pairs[i] = Pair{Key: client, Val: client.outBytes}
case byInBytes:
case ByInBytes:
pairs[i] = Pair{Key: client, Val: atomic.LoadInt64(&client.inBytes)}
case byLast:
case ByLast:
pairs[i] = Pair{Key: client, Val: client.last.UnixNano()}
case byIdle:
case ByIdle:
pairs[i] = Pair{Key: client, Val: c.Now.Sub(client.last).Nanoseconds()}
}
client.mu.Unlock()
Expand All @@ -134,7 +164,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock()

if totalClients > 0 {
if sortOpt == byCid {
if sortOpt == ByCid {
// Return in ascending order
sort.Sort(pairs)
} else {
Expand Down Expand Up @@ -193,21 +223,21 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
// still look sorted even if the value has changed since sort occurred.
sortValue := pair.Val
switch sortOpt {
case bySubs:
case BySubs:
ci.NumSubs = uint32(sortValue)
case byPending:
case ByPending:
ci.Pending = int(sortValue)
case byOutMsgs:
case ByOutMsgs:
ci.OutMsgs = sortValue
case byInMsgs:
case ByInMsgs:
ci.InMsgs = sortValue
case byOutBytes:
case ByOutBytes:
ci.OutBytes = sortValue
case byInBytes:
case ByInBytes:
ci.InBytes = sortValue
case byLast:
case ByLast:
ci.LastActivity = time.Unix(0, sortValue)
case byIdle:
case ByIdle:
ci.Idle = myUptime(time.Duration(sortValue))
}

Expand All @@ -229,7 +259,7 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
}

// Fill in subscription data if requested.
if subs == 1 {
if subs {
sublist := make([]*subscription, 0, len(client.subs))
for _, sub := range client.subs {
sublist = append(sublist, sub)
Expand All @@ -238,14 +268,68 @@ func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
}

// Fill in user if auth requested.
if auth == 1 {
if auth {
ci.AuthorizedUser = client.opts.Username
}

client.mu.Unlock()
i++
}
return c, nil
}

func decodeInt(w http.ResponseWriter, r *http.Request, param string) (int, error) {
str := r.URL.Query().Get(param)
if str == "" {
return 0, nil
}
val, err := strconv.Atoi(str)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("Error decoding %s: %v", param, err)))
return 0, err
}
return val, nil
}

// HandleConnz process HTTP requests for connection information.
func (s *Server) HandleConnz(w http.ResponseWriter, r *http.Request) {
sortOpt := SortOpt(r.URL.Query().Get("sort"))
auth, err := decodeInt(w, r, "auth")
if err != nil {
return
}
subs, err := decodeInt(w, r, "subs")
if err != nil {
return
}
offset, err := decodeInt(w, r, "offset")
if err != nil {
return
}
limit, err := decodeInt(w, r, "limit")
if err != nil {
return
}

connzOpts := &ConnzOptions{
Sort: sortOpt,
Username: auth == 1,
Subscriptions: subs == 1,
Offset: offset,
Limit: limit,
}

s.mu.Lock()
s.httpReqStats[ConnzPath]++
s.mu.Unlock()

c, err := s.Connz(connzOpts)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error()))
return
}
b, err := json.MarshalIndent(c, "", " ")
if err != nil {
s.Errorf("Error marshaling response to /connz request: %v", err)
Expand All @@ -268,6 +352,10 @@ type Subsz struct {
*SublistStats
}

// SubszOptions are the options passed to Subsz.
// As of now, there are no options defined.
type SubszOptions struct{}

// Routez represents detailed information on current client connections.
type Routez struct {
ID string `json:"server_id"`
Expand All @@ -276,6 +364,12 @@ type Routez struct {
Routes []*RouteInfo `json:"routes"`
}

// RoutezOptions are options passed to Routez
type RoutezOptions struct {
// Subscriptions indicates that Routez will return a route's subscriptions
Subscriptions bool `json:"subscriptions"`
}

// RouteInfo has detailed information on a per connection basis.
type RouteInfo struct {
Rid uint64 `json:"rid"`
Expand All @@ -293,17 +387,15 @@ type RouteInfo struct {
Subs []string `json:"subscriptions_list,omitempty"`
}

// HandleRoutez process HTTP requests for route information.
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
// Routez returns a Routez struct containing inormation about routes.
func (s *Server) Routez(routezOpts *RoutezOptions) (*Routez, error) {
rs := &Routez{Routes: []*RouteInfo{}}
rs.Now = time.Now()

subs, _ := strconv.Atoi(r.URL.Query().Get("subs"))
subs := routezOpts != nil && routezOpts.Subscriptions

// Walk the list
s.mu.Lock()

s.httpReqStats[RoutezPath]++
rs.NumRoutes = len(s.routes)

// copy the server id for monitoring
Expand All @@ -323,7 +415,7 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
NumSubs: uint32(len(r.subs)),
}

if subs == 1 {
if subs {
sublist := make([]*subscription, 0, len(r.subs))
for _, sub := range r.subs {
sublist = append(sublist, sub)
Expand All @@ -340,7 +432,26 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
rs.Routes = append(rs.Routes, ri)
}
s.mu.Unlock()
return rs, nil
}

// HandleRoutez process HTTP requests for route information.
func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
subs, err := decodeInt(w, r, "subs")
if err != nil {
return
}
var opts *RoutezOptions
if subs == 1 {
opts = &RoutezOptions{Subscriptions: true}
}

s.mu.Lock()
s.httpReqStats[RoutezPath]++
s.mu.Unlock()

// As of now, no error is ever returned.
rs, _ := s.Routez(opts)
b, err := json.MarshalIndent(rs, "", " ")
if err != nil {
s.Errorf("Error marshaling response to /routez request: %v", err)
Expand All @@ -350,13 +461,19 @@ func (s *Server) HandleRoutez(w http.ResponseWriter, r *http.Request) {
ResponseHandler(w, r, b)
}

// Subsz returns a Subsz struct containing subjects statistics
func (s *Server) Subsz(subszOpts *SubszOptions) (*Subsz, error) {
return &Subsz{s.sl.Stats()}, nil
}

// HandleSubsz processes HTTP requests for subjects stats.
func (s *Server) HandleSubsz(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.httpReqStats[SubszPath]++
s.mu.Unlock()

st := &Subsz{s.sl.Stats()}
// As of now, no error is ever returned
st, _ := s.Subsz(nil)
b, err := json.MarshalIndent(st, "", " ")
if err != nil {
s.Errorf("Error marshaling response to /subscriptionsz request: %v", err)
Expand Down Expand Up @@ -412,6 +529,10 @@ type Varz struct {
ConfigLoadTime time.Time `json:"config_load_time"`
}

// VarzOptions are the options passed to Varz().
// Currently, there are no options defined.
type VarzOptions struct{}

func myUptime(d time.Duration) string {
// Just use total seconds for uptime, and display days / years
tsecs := d / time.Second
Expand Down Expand Up @@ -466,8 +587,8 @@ func (s *Server) HandleRoot(w http.ResponseWriter, r *http.Request) {
</html>`)
}

// HandleVarz will process HTTP requests for server information.
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
// Varz returns a Varz struct containing the server information.
func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
// Snapshot server options.
opts := s.getOpts()

Expand All @@ -490,7 +611,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
v.SlowConsumers = atomic.LoadInt64(&s.slowConsumers)
v.Subscriptions = s.sl.Count()
v.ConfigLoadTime = s.configTime
s.httpReqStats[VarzPath]++
// Need a copy here since s.httpReqStas can change while doing
// the marshaling down below.
v.HTTPReqStats = make(map[string]uint64, len(s.httpReqStats))
Expand All @@ -499,6 +619,17 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
}
s.mu.Unlock()

return v, nil
}

// HandleVarz will process HTTP requests for server information.
func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.httpReqStats[VarzPath]++
s.mu.Unlock()

// As of now, no error is ever returned
v, _ := s.Varz(nil)
b, err := json.MarshalIndent(v, "", " ")
if err != nil {
s.Errorf("Error marshaling response to /varz request: %v", err)
Expand Down
Loading

0 comments on commit 8d539a4

Please sign in to comment.