Skip to content

Commit

Permalink
[patch] refactored SSE for more capabilities of customizing (#42)
Browse files Browse the repository at this point in the history
[-] improved reattempting of SSE reconnection, in the sample app
  • Loading branch information
bnkamalesh authored Mar 19, 2022
1 parent 1daf90e commit 37494fa
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 99 deletions.
7 changes: 4 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -129,11 +130,11 @@ func setup() (*webgo.Router, *sse.SSE) {
routeGroup.Use(routegroupMiddleware)

sseService := sse.New()
sseService.OnRemoveClient = func(clientID string, count int) {
sseService.OnRemoveClient = func(ctx context.Context, clientID string, count int) {
log.Println(fmt.Sprintf("Client %q removed, active client(s): %d", clientID, count))
}
sseService.OnCreateClient = func(clientID string, count int) {
log.Println(fmt.Sprintf("Client %q added, active client(s): %d", clientID, count))
sseService.OnCreateClient = func(ctx context.Context, client *sse.Client, count int) {
log.Println(fmt.Sprintf("Client %q added, active client(s): %d", client.ID, count))
}
routes := getRoutes(sseService)
routes = append(routes, routeGroup.Routes()...)
Expand Down
1 change: 1 addition & 0 deletions cmd/static/js/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const webgo = async () => {
const configState = { initialBackoff, maxBackoff, backoffStep, backoff };

source.onopen = () => {
clearTimeout(sseRetryTimeout);
// reset backoff to initial, so further failures will again start with initial backoff
// instead of previous duration
backoff = initialBackoff;
Expand Down
51 changes: 51 additions & 0 deletions extensions/sse/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Server-Sent Events

This extension provides support for [Server-Sent](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) Events for any net/http compliant http server.
It provides the following hooks for customizing the workflows:

1. `OnCreateClient func(ctx context.Context, client *Client, count int)`
2. `OnRemoveClient func(ctx context.Context, clientID string, count int)`
3. `OnSend func(ctx context.Context, client *Client, err error)`
4. `BeforeSend func(ctx context.Context, client *Client)`

```golang
import (
"github.com/bnkamalesh/webgo/extensions/sse"
)
func main() {
sseService := sse.New()
// broadcast to all active clients
sseService.Broadcast(Message{
Data: "Hello world",
Retry: time.MilliSecond,
})

// send message to an individual client
clientID := "cli123"
cli := sseService.Client(clientID)
if cli != nil {
cli.Message <- &Message{Data: fmt.Sprintf("Hello %s",clientID), Retry: time.MilliSecond }
}
}
```

## Client Manager

Client manager is an interface which is required for SSE to function, was implemented so it's easier for you to replace it if required. The default implementation is rather simple one, using a mutex. If you have a custom implementation which is faster/better, you can easily swap out the default one.

```golang
type ClientManager interface {
// New should return a new client, and the total number of active clients after adding this new one
New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int)
// Range should iterate through all the active clients
Range(func(*Client))
// Remove should remove the active client given a clientID, and close the connection
Remove(clientID string) int
// Active returns the number of active clients
Active() int
// Clients returns a list of all active clients
Clients() []*Client
// Client returns *Client if clientID is active
Client(clientID string) *Client
}
```
106 changes: 106 additions & 0 deletions extensions/sse/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package sse

import (
"context"
"net/http"
"sync"
)

type ClientManager interface {
// New should return a new client, and the total number of active clients after adding this new one
New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int)
// Range should iterate through all the active clients
Range(func(*Client))
// Remove should remove the active client given a clientID, and close the connection
Remove(clientID string) int
// Active returns the number of active clients
Active() int
// Clients returns a list of all active clients
Clients() []*Client
// Client returns *Client if clientID is active
Client(clientID string) *Client
}

type Client struct {
ID string
Msg chan *Message
ResponseWriter http.ResponseWriter
Ctx context.Context
}

type Clients struct {
clients map[string]*Client
locker sync.Mutex
MsgBuffer int
}

func (cs *Clients) New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int) {
mchan := make(chan *Message, cs.MsgBuffer)
cli := &Client{
ID: clientID,
Msg: mchan,
ResponseWriter: w,
Ctx: ctx,
}

cs.locker.Lock()
cs.clients[clientID] = cli
count := len(cs.clients)
cs.locker.Unlock()

return cli, count
}

func (cs *Clients) Range(f func(cli *Client)) {
cs.locker.Lock()
for clientID := range cs.clients {
f(cs.clients[clientID])
}
cs.locker.Unlock()
}

func (cs *Clients) Remove(clientID string) int {
cs.locker.Lock()
delete(cs.clients, clientID)
count := len(cs.clients)
cs.locker.Unlock()
return count
}

func (cs *Clients) Active() int {
cs.locker.Lock()
count := len(cs.clients)
cs.locker.Unlock()
return count

}

// MessageChannels returns a slice of message channels of all clients
// which you can then use to send message concurrently
func (cs *Clients) Clients() []*Client {
idx := 0
cs.locker.Lock()
list := make([]*Client, len(cs.clients))
for clientID := range cs.clients {
cli := cs.clients[clientID]
list[idx] = cli
idx++
}
cs.locker.Unlock()
return list
}

func (cs *Clients) Client(clientID string) *Client {
cs.locker.Lock()
cli := cs.clients[clientID]
cs.locker.Unlock()

return cli
}

func NewClientManager() ClientManager {
return &Clients{
clients: make(map[string]*Client),
locker: sync.Mutex{},
}
}
52 changes: 52 additions & 0 deletions extensions/sse/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sse

import (
"bytes"
"net/http"
"strconv"
"time"
)

// Message represents a valid SSE message
// ref: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
type Message struct {
// Event is a string identifying the type of event described. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. The onmessage handler is called if no event name is specified for a message.
Event string

// Data field for the message. When the EventSource receives multiple consecutive lines that begin with data:, it concatenates them, inserting a newline character between each one. Trailing newlines are removed.
Data string

// ID to set the EventSource object's last event ID value.
ID string

// Retry is the reconnection time. If the connection to the server is lost, the browser will wait for the specified time before attempting to reconnect. This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored.
Retry time.Duration
}

func (m *Message) Bytes() []byte {
// The event stream is a simple stream of text data which must be encoded using UTF-8.
// Messages in the event stream are separated by a pair of newline characters.
// A colon as the first character of a line is in essence a comment, and is ignored.

buff := bytes.NewBufferString("")
if m.Event != "" {
buff.WriteString("event:" + m.Event + "\n")
}
if m.ID != "" {
buff.WriteString("id:" + m.ID + "\n")
}
if m.Data != "" {
buff.WriteString("data:" + m.Data + "\n")
}
if m.Retry != 0 {
buff.WriteString("retry:" + strconv.Itoa(int(m.Retry.Milliseconds())) + "\n")
}
buff.WriteString("\n")
return buff.Bytes()
}

func DefaultUnsupportedMessageHandler(w http.ResponseWriter, r *http.Request) error {
w.WriteHeader(http.StatusNotImplemented)
_, err := w.Write([]byte("Streaming not supported"))
return err
}
Loading

0 comments on commit 37494fa

Please sign in to comment.