Skip to content

Commit

Permalink
[-] minor changes to the sample SSE implementation (refactored to use…
Browse files Browse the repository at this point in the history
… serviceworker)

[-] updated SSE README for clarity
  • Loading branch information
bnkamalesh committed Jun 14, 2022
1 parent 627108a commit deab16e
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 56 deletions.
9 changes: 7 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,12 @@ func setup() (*webgo.Router, *sse.SSE) {

sseService := sse.New()
sseService.OnRemoveClient = func(ctx context.Context, clientID string, count int) {
log.Println(fmt.Sprintf("Client %q removed, active client(s): %d", clientID, count))
log.Printf("\nClient %q removed, active client(s): %d\n", clientID, count)
}
sseService.OnCreateClient = func(ctx context.Context, client *sse.Client, count int) {
log.Println(fmt.Sprintf("Client %q added, active client(s): %d", client.ID, count))
log.Printf("\nClient %q added, active client(s): %d\n", client.ID, count)
}

routes := getRoutes(sseService)
routes = append(routes, routeGroup.Routes()...)

Expand All @@ -147,6 +148,10 @@ func setup() (*webgo.Router, *sse.SSE) {

func main() {
router, sseService := setup()
clients := []*sse.Client{}
sseService.OnCreateClient = func(ctx context.Context, client *sse.Client, count int) {
clients = append(clients, client)
}
// broadcast server time to all SSE listeners
go func() {
retry := time.Millisecond * 500
Expand Down
81 changes: 29 additions & 52 deletions cmd/static/js/main.js
Original file line number Diff line number Diff line change
@@ -1,51 +1,4 @@
const webgo = async () => {
const sse = (url, config = {}) => {
const {
onMessage,
onError,
initialBackoff = 10, // milliseconds
maxBackoff = 15 * 1000, // 15 seconds
backoffStep = 50, // milliseconds
} = config;

let backoff = initialBackoff,
sseRetryTimeout = null;

const start = () => {
const source = new EventSource(url);
const configState = { initialBackoff, maxBackoff, backoffStep, backoff };

source.onopen = () => {
clearTimeout(sseRetryTimeout);
// reset backoff to initial, so further failures will again start with initial backoff
// instead of previous duration
backoff = initialBackoff;
configState.backoff = backoff
};

source.onmessage = (event, configState) => {
onMessage && onMessage(event, configState);
};

source.onerror = (err) => {
source.close();
clearTimeout(sseRetryTimeout);
// reattempt connecting with *linear* backoff
sseRetryTimeout = window.setTimeout(() => {
start(url, onMessage);
if (backoff < maxBackoff) {
backoff += backoffStep;
if (backoff > maxBackoff) {
backoff = maxBackoff;
}
}
}, backoff);
onError && onError(err, configState);
};
};
return start;
};

const clientID = Math.random()
.toString(36)
.replace(/[^a-z]+/g, "")
Expand All @@ -62,9 +15,14 @@ const webgo = async () => {
return boff;
};

sse(`/sse/${clientID}`, {
onMessage: (event) => {
const parts = event.data?.split("(");
const config = {
url: `/sse/${clientID}`,
onMessage: (data) => {
const parts = data?.split?.("(");
if (!parts || !parts.length) {
return;
}

const date = new Date(parts[0]);
const activeClients = parts[1].replace(")", "");
sseDOM.innerText = date.toLocaleString();
Expand All @@ -81,7 +39,7 @@ const webgo = async () => {
0
)}</strong>`;
backoff -= 1000;
if (backoff < 0) {
if (backoff < 0) {
sseDOM.innerHTML = `SSE failed, attempting reconnect in <strong>0s</strong>`;
window.clearInterval(interval);
}
Expand All @@ -91,6 +49,25 @@ const webgo = async () => {
},
initialBackoff: 1000,
backoffStep: 1000,
})();
};

const sseworker = new Worker("/static/js/sse.js");
sseworker.onerror = (e) => {
sseworker.terminate();
};

sseworker.onmessage = (e) => {
if (e?.data?.error) {
config.onError("SSE failed", e?.data);
} else {
config.onMessage(e?.data);
}
};

sseworker.postMessage({
url: config.url,
initialBackoff: config.initialBackoff,
backoffStep: config.backoffStep,
});
};
webgo();
62 changes: 62 additions & 0 deletions cmd/static/js/sse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
const sse = (url, config = {}) => {
const {
onMessage,
onError,
initialBackoff = 10, // milliseconds
maxBackoff = 15 * 1000, // 15 seconds
backoffStep = 50, // milliseconds
} = config;

let backoff = initialBackoff,
sseRetryTimeout = null;

const start = () => {
const source = new EventSource(url);
const configState = { initialBackoff, maxBackoff, backoffStep, backoff };

source.onopen = () => {
clearTimeout(sseRetryTimeout);
// reset backoff to initial, so further failures will again start with initial backoff
// instead of previous duration
backoff = initialBackoff;
configState.backoff = backoff;
};

source.onmessage = (event) => {
onMessage && onMessage(event, configState);
};

source.onerror = (err) => {
source.close();
if (!backoffStep) {
onError && onError(err, configState);
return;
}

clearTimeout(sseRetryTimeout);
// reattempt connecting with *linear* backoff
sseRetryTimeout = self.setTimeout(() => {
start(url, onMessage);
if (backoff < maxBackoff) {
backoff += backoffStep;
if (backoff > maxBackoff) {
backoff = maxBackoff;
}
}
}, backoff);
onError && onError(err, configState);
};
};
return start;
};

onmessage = (e) => {
sse(e?.data?.url, {
onMessage: (event) => {
postMessage(event?.data);
},
onError: (err, attrs) => {
postMessage({ error: "SSE failed", ...attrs });
},
})();
};
5 changes: 4 additions & 1 deletion extensions/sse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func main() {
Retry: time.MilliSecond,
})

// You can replace the ClientManager with your custom implementation, and override the default one
// sseService.Clients = <your custom client manager>

// send message to an individual client
clientID := "cli123"
cli := sseService.Client(clientID)
Expand All @@ -31,7 +34,7 @@ func main() {

## Client Manager

Client manager is an interface which is required for SSE to function, was implemented so it's easier for you to replace it if required. The default implementation is rather simple one, using a mutex. If you have a custom implementation which is faster/better, you can easily swap out the default one.
Client manager is an interface which is required for SSE to function, since this is an interface it's easier for you to replace if required. The default implementation is a simple one using mutex. If you have a custom implementation which is faster/better, you can easily swap out the default one.

```golang
type ClientManager interface {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/bnkamalesh/webgo/v6

go 1.17
go 1.18

0 comments on commit deab16e

Please sign in to comment.