-
Notifications
You must be signed in to change notification settings - Fork 31
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[patch] refactored SSE for more capabilities of customizing (#42)
[-] improved reattempting of SSE reconnection, in the sample app
- Loading branch information
1 parent
1daf90e
commit 37494fa
Showing
6 changed files
with
270 additions
and
99 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
# Server-Sent Events | ||
|
||
This extension provides support for [Server-Sent](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) Events for any net/http compliant http server. | ||
It provides the following hooks for customizing the workflows: | ||
|
||
1. `OnCreateClient func(ctx context.Context, client *Client, count int)` | ||
2. `OnRemoveClient func(ctx context.Context, clientID string, count int)` | ||
3. `OnSend func(ctx context.Context, client *Client, err error)` | ||
4. `BeforeSend func(ctx context.Context, client *Client)` | ||
|
||
```golang | ||
import ( | ||
"github.com/bnkamalesh/webgo/extensions/sse" | ||
) | ||
func main() { | ||
sseService := sse.New() | ||
// broadcast to all active clients | ||
sseService.Broadcast(Message{ | ||
Data: "Hello world", | ||
Retry: time.MilliSecond, | ||
}) | ||
|
||
// send message to an individual client | ||
clientID := "cli123" | ||
cli := sseService.Client(clientID) | ||
if cli != nil { | ||
cli.Message <- &Message{Data: fmt.Sprintf("Hello %s",clientID), Retry: time.MilliSecond } | ||
} | ||
} | ||
``` | ||
|
||
## Client Manager | ||
|
||
Client manager is an interface which is required for SSE to function, was implemented so it's easier for you to replace it if required. The default implementation is rather simple one, using a mutex. If you have a custom implementation which is faster/better, you can easily swap out the default one. | ||
|
||
```golang | ||
type ClientManager interface { | ||
// New should return a new client, and the total number of active clients after adding this new one | ||
New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int) | ||
// Range should iterate through all the active clients | ||
Range(func(*Client)) | ||
// Remove should remove the active client given a clientID, and close the connection | ||
Remove(clientID string) int | ||
// Active returns the number of active clients | ||
Active() int | ||
// Clients returns a list of all active clients | ||
Clients() []*Client | ||
// Client returns *Client if clientID is active | ||
Client(clientID string) *Client | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package sse | ||
|
||
import ( | ||
"context" | ||
"net/http" | ||
"sync" | ||
) | ||
|
||
type ClientManager interface { | ||
// New should return a new client, and the total number of active clients after adding this new one | ||
New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int) | ||
// Range should iterate through all the active clients | ||
Range(func(*Client)) | ||
// Remove should remove the active client given a clientID, and close the connection | ||
Remove(clientID string) int | ||
// Active returns the number of active clients | ||
Active() int | ||
// Clients returns a list of all active clients | ||
Clients() []*Client | ||
// Client returns *Client if clientID is active | ||
Client(clientID string) *Client | ||
} | ||
|
||
type Client struct { | ||
ID string | ||
Msg chan *Message | ||
ResponseWriter http.ResponseWriter | ||
Ctx context.Context | ||
} | ||
|
||
type Clients struct { | ||
clients map[string]*Client | ||
locker sync.Mutex | ||
MsgBuffer int | ||
} | ||
|
||
func (cs *Clients) New(ctx context.Context, w http.ResponseWriter, clientID string) (*Client, int) { | ||
mchan := make(chan *Message, cs.MsgBuffer) | ||
cli := &Client{ | ||
ID: clientID, | ||
Msg: mchan, | ||
ResponseWriter: w, | ||
Ctx: ctx, | ||
} | ||
|
||
cs.locker.Lock() | ||
cs.clients[clientID] = cli | ||
count := len(cs.clients) | ||
cs.locker.Unlock() | ||
|
||
return cli, count | ||
} | ||
|
||
func (cs *Clients) Range(f func(cli *Client)) { | ||
cs.locker.Lock() | ||
for clientID := range cs.clients { | ||
f(cs.clients[clientID]) | ||
} | ||
cs.locker.Unlock() | ||
} | ||
|
||
func (cs *Clients) Remove(clientID string) int { | ||
cs.locker.Lock() | ||
delete(cs.clients, clientID) | ||
count := len(cs.clients) | ||
cs.locker.Unlock() | ||
return count | ||
} | ||
|
||
func (cs *Clients) Active() int { | ||
cs.locker.Lock() | ||
count := len(cs.clients) | ||
cs.locker.Unlock() | ||
return count | ||
|
||
} | ||
|
||
// MessageChannels returns a slice of message channels of all clients | ||
// which you can then use to send message concurrently | ||
func (cs *Clients) Clients() []*Client { | ||
idx := 0 | ||
cs.locker.Lock() | ||
list := make([]*Client, len(cs.clients)) | ||
for clientID := range cs.clients { | ||
cli := cs.clients[clientID] | ||
list[idx] = cli | ||
idx++ | ||
} | ||
cs.locker.Unlock() | ||
return list | ||
} | ||
|
||
func (cs *Clients) Client(clientID string) *Client { | ||
cs.locker.Lock() | ||
cli := cs.clients[clientID] | ||
cs.locker.Unlock() | ||
|
||
return cli | ||
} | ||
|
||
func NewClientManager() ClientManager { | ||
return &Clients{ | ||
clients: make(map[string]*Client), | ||
locker: sync.Mutex{}, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
package sse | ||
|
||
import ( | ||
"bytes" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
) | ||
|
||
// Message represents a valid SSE message | ||
// ref: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events | ||
type Message struct { | ||
// Event is a string identifying the type of event described. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the website source code should use addEventListener() to listen for named events. The onmessage handler is called if no event name is specified for a message. | ||
Event string | ||
|
||
// Data field for the message. When the EventSource receives multiple consecutive lines that begin with data:, it concatenates them, inserting a newline character between each one. Trailing newlines are removed. | ||
Data string | ||
|
||
// ID to set the EventSource object's last event ID value. | ||
ID string | ||
|
||
// Retry is the reconnection time. If the connection to the server is lost, the browser will wait for the specified time before attempting to reconnect. This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored. | ||
Retry time.Duration | ||
} | ||
|
||
func (m *Message) Bytes() []byte { | ||
// The event stream is a simple stream of text data which must be encoded using UTF-8. | ||
// Messages in the event stream are separated by a pair of newline characters. | ||
// A colon as the first character of a line is in essence a comment, and is ignored. | ||
|
||
buff := bytes.NewBufferString("") | ||
if m.Event != "" { | ||
buff.WriteString("event:" + m.Event + "\n") | ||
} | ||
if m.ID != "" { | ||
buff.WriteString("id:" + m.ID + "\n") | ||
} | ||
if m.Data != "" { | ||
buff.WriteString("data:" + m.Data + "\n") | ||
} | ||
if m.Retry != 0 { | ||
buff.WriteString("retry:" + strconv.Itoa(int(m.Retry.Milliseconds())) + "\n") | ||
} | ||
buff.WriteString("\n") | ||
return buff.Bytes() | ||
} | ||
|
||
func DefaultUnsupportedMessageHandler(w http.ResponseWriter, r *http.Request) error { | ||
w.WriteHeader(http.StatusNotImplemented) | ||
_, err := w.Write([]byte("Streaming not supported")) | ||
return err | ||
} |
Oops, something went wrong.