Skip to content

Commit

Permalink
[patch] added hooks for create and remove clients from SSE active cli…
Browse files Browse the repository at this point in the history
…ents list
  • Loading branch information
bnkamalesh authored Mar 14, 2022
1 parent d0f695c commit 1daf90e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 11 deletions.
6 changes: 6 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ func setup() (*webgo.Router, *sse.SSE) {
routeGroup.Use(routegroupMiddleware)

sseService := sse.New()
sseService.OnRemoveClient = func(clientID string, count int) {
log.Println(fmt.Sprintf("Client %q removed, active client(s): %d", clientID, count))
}
sseService.OnCreateClient = func(clientID string, count int) {
log.Println(fmt.Sprintf("Client %q added, active client(s): %d", clientID, count))
}
routes := getRoutes(sseService)
routes = append(routes, routeGroup.Routes()...)

Expand Down
47 changes: 36 additions & 11 deletions extensions/sse/sse.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Package sse implements Server Sent Events(SSE)
// Package sse implements Server-Sent Events(SSE)
// This extension is compliant with any net/http implementation, and is not limited to WebGo.
package sse

import (
Expand All @@ -18,6 +19,12 @@ type SSE struct {
// UnsupportedMessage is used to send the error response to client if the
// server doesn't support SSE
UnsupportedMessage func(http.ResponseWriter, *http.Request) error

// OnCreateClient is a hook, for when a client is added to the active clients
OnCreateClient func(clientID string, count int)

// OnRemoveClient is a hook, for when a client is removed from the active clients
OnRemoveClient func(clientID string, count int)
}

// Handler returns an error rather than being directly used as an http.HandlerFunc,
Expand Down Expand Up @@ -63,6 +70,12 @@ func (sse *SSE) Handler(w http.ResponseWriter, r *http.Request) error {
}
}

// HandlerFunc is a convenience function which can be directly used with net/http implementations.
// Important: You cannot handle any error returned by the Handler
func (sse *SSE) HandlerFunc(w http.ResponseWriter, r *http.Request) {
_ = sse.Handler(w, r)
}

// ClientMessageChan returns a message channel to stream data to a client
// The boolean value is `true` if the client didn't exist before
func (sse *SSE) ClientMessageChan(clientID string) (chan *Message, bool) {
Expand All @@ -71,16 +84,22 @@ func (sse *SSE) ClientMessageChan(clientID string) (chan *Message, bool) {
msg = make(chan *Message)
sse.Clients.Store(clientID, msg)
count := sse.clientsCount.Load().(int)
sse.clientsCount.Store(count + 1)
count++
sse.clientsCount.Store(count)
sse.OnCreateClient(clientID, count)
}

return msg.(chan *Message), !ok
}

// RemoveClientMessageChan removes the channel from clients map given a clientID
func (sse *SSE) RemoveClientMessageChan(clientID string) {
sse.Clients.Delete(clientID)
count := sse.clientsCount.Load().(int)
sse.clientsCount.Store(count - 1)
count--
sse.clientsCount.Store(count)

sse.OnRemoveClient(clientID, count)
}

// Broadcast sends the message to all active clients
Expand Down Expand Up @@ -134,19 +153,25 @@ func (m *Message) Bytes() []byte {
return buff.Bytes()
}

func DefaultUnsupportedMessageHandler(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNotImplemented)
_, err := w.Write([]byte("Streaming not supported"))
return err
}

func DefaultHook(clientID string, count int) {}

func New() *SSE {
clientsCount := atomic.Value{}
clientsCount.Store(int(0))

s := &SSE{
Clients: sync.Map{},
clientsCount: clientsCount,
ClientIDHeader: "sse-clientid",
UnsupportedMessage: func(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNotImplemented)
_, err := w.Write([]byte("Streaming not supported"))
return err
},
Clients: sync.Map{},
clientsCount: clientsCount,
ClientIDHeader: "sse-clientid",
UnsupportedMessage: DefaultUnsupportedMessageHandler,
OnRemoveClient: DefaultHook,
OnCreateClient: DefaultHook,
}

return s
Expand Down

0 comments on commit 1daf90e

Please sign in to comment.