Skip to content

Commit

Permalink
offline message gateway (ortuman#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman authored Apr 8, 2019
1 parent f96db58 commit bbfcf5a
Showing 13 changed files with 247 additions and 12 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.5.0] - 2019-04-08
### Added
- Offline message gateway support.

## [0.4.11] - 2019-04-07
### Fixed
- Fixed pgsql private storage.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -32,6 +32,23 @@ jackal supports the following features:
- Database connectivity for storing offline messages and user settings ([BadgerDB](https://github.com/dgraph-io/badger), MySQL 5.7+, MariaDB 10.2+, PostgreSQL 9.5+)
- Cross-platform (OS X, Linux)

## Push notifications

Support for [XEP-0357: Push Notifications](https://xmpp.org/extensions/xep-0357.html) is not yet available in `jackal`.

However there's a chance to forward offline messages to some external service by configuring offline module as follows:

```yaml
mod_offline:
queue_size: 2500
gateway:
type: http
auth: a-secret-token-here
pass: http://127.0.0.1:6666
```
Each time a message is sent to an offline user a `POST` http request to the `pass` URL is made, using the specified `Authorization` header and including the message stanza into the request body.

## Installing

### Getting Started
3 changes: 3 additions & 0 deletions dockerfiles/jackal.yml
Original file line number Diff line number Diff line change
@@ -35,6 +35,9 @@ modules:

mod_offline:
queue_size: 2500
# gateway:
# type: http
# pass: http://127.0.0.1:6666

mod_registration:
allow_registration: yes
3 changes: 3 additions & 0 deletions example.jackal.yml
Original file line number Diff line number Diff line change
@@ -55,6 +55,9 @@ modules:

mod_offline:
queue_size: 2500
# gateway:
# type: http
# pass: http://127.0.0.1:6666

mod_registration:
allow_registration: yes
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -33,7 +33,9 @@ require (
github.com/pkg/errors v0.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v0.9.2 // indirect
github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sony/gobreaker v0.0.0-20180905101324-b2a34562d02c
github.com/stretchr/testify v1.2.2
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 // indirect
golang.org/x/crypto v0.0.0-20181106171534-e4dc69e5b2fd
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -82,8 +82,11 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE=
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rubyist/circuitbreaker v2.2.1+incompatible/go.mod h1:Ycs3JgJADPuzJDwffe12k6BZT8hxVi6lFK+gWYJLN4A=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sony/gobreaker v0.0.0-20180905101324-b2a34562d02c h1:7EMc5KMRVlkzEyK5n4YqdPEsmO+6AlAGCJiqnqW6n2Y=
github.com/sony/gobreaker v0.0.0-20180905101324-b2a34562d02c/go.mod h1:XvpJiTD8NibaH7z0NzyfhR1+NQDtR9F/x92xheTwC9k=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 h1:G3dpKMzFDjgEh2q1Z7zUUtKa8ViPtH+ocF0bE0g00O8=
40 changes: 40 additions & 0 deletions module/offline/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package offline

import "fmt"

const (
httpGatewayType = "http"
)

// Config represents Offline Storage module configuration.
type Config struct {
QueueSize int
Gateway gateway
}

type configProxy struct {
QueueSize int `yaml:"queue_size"`
Gateway *struct {
Type string `yaml:"type"`
Auth string `yaml:"auth"`
Pass string `yaml:"pass"`
} `yaml:"gateway"`
}

// UnmarshalYAML satisfies Unmarshaler interface.
func (cfg *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
p := configProxy{}
if err := unmarshal(&p); err != nil {
return err
}
cfg.QueueSize = p.QueueSize
if p.Gateway != nil {
switch p.Gateway.Type {
case httpGatewayType:
cfg.Gateway = newHTTPGateway(p.Gateway.Pass, p.Gateway.Auth)
default:
return fmt.Errorf("unrecognized offline gateway type: %s", p.Gateway.Type)
}
}
return nil
}
40 changes: 40 additions & 0 deletions module/offline/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package offline

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
)

func TestOfflineConfig(t *testing.T) {
badCfg := `enabled [roster]`
cfg := &Config{}
err := yaml.Unmarshal([]byte(badCfg), &cfg)
require.NotNil(t, err)

goodCfg := `queue_size: 100`
cfg = &Config{}
err = yaml.Unmarshal([]byte(goodCfg), &cfg)
require.Nil(t, err)

wrongGatewayTypeCfg := `
queue_size: 100
gateway:
type: foo
url: http://127.0.0.1:6666
`
cfg = &Config{}
err = yaml.Unmarshal([]byte(wrongGatewayTypeCfg), &cfg)
require.NotNil(t, err)

goodGatewayTypeCfg := `
queue_size: 100
gateway:
type: http
url: http://127.0.0.1:6666
`
cfg = &Config{}
err = yaml.Unmarshal([]byte(goodGatewayTypeCfg), &cfg)
require.NotNil(t, err)
}
62 changes: 62 additions & 0 deletions module/offline/gateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package offline

import (
"bytes"
"fmt"
"net/http"

"github.com/ortuman/jackal/xmpp"
"github.com/sony/gobreaker"
)

type gateway interface {
Route(msg *xmpp.Message) error
}

type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}

type httpGateway struct {
url string
authToken string
reqBuf *bytes.Buffer
cb *gobreaker.CircuitBreaker
client httpClient
}

func newHTTPGateway(url string, authToken string) gateway {
return &httpGateway{
url: url,
authToken: authToken,
reqBuf: bytes.NewBuffer(nil),
cb: gobreaker.NewCircuitBreaker(gobreaker.Settings{}),
client: &http.Client{},
}
}

func (g *httpGateway) Route(msg *xmpp.Message) error {
msg.ToXML(g.reqBuf, true)
defer g.reqBuf.Reset()

req, err := http.NewRequest(http.MethodPost, g.url, g.reqBuf)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/xml")
req.Header.Set("Authorization", g.authToken)

_, err = g.cb.Execute(func() (i interface{}, e error) {
resp, err := g.client.Do(req)
if err != nil {
return nil, err
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("response status code: %d", resp.StatusCode)
}
return nil, nil
})
return err
}
61 changes: 61 additions & 0 deletions module/offline/gateway_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package offline

import (
"io/ioutil"
"net/http"
"testing"

"github.com/google/uuid"
"github.com/ortuman/jackal/xmpp"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

type fakeReadCloser struct{}

func (rc *fakeReadCloser) Read(p []byte) (n int, err error) { return 0, nil }
func (rc *fakeReadCloser) Close() error { return nil }

type fakeHTTPClient struct {
do func(req *http.Request) (*http.Response, error)
}

func (c *fakeHTTPClient) Do(req *http.Request) (*http.Response, error) {
return c.do(req)
}

func TestHttpGateway_Route(t *testing.T) {
g := newHTTPGateway("http://127.0.0.1:6666", "a-secret-key").(*httpGateway)
fakeClient := &fakeHTTPClient{}
g.client = fakeClient

msg := xmpp.NewMessageType(uuid.New().String(), xmpp.ChatType)
body := xmpp.NewElementName("body")
body.SetText("This is an offline message!")
msg.AppendElement(body)

var reqBody string
fakeClient.do = func(req *http.Request) (response *http.Response, e error) {
require.Equal(t, http.MethodPost, req.Method)
require.Equal(t, "a-secret-key", req.Header.Get("Authorization"))
require.Equal(t, "application/xml", req.Header.Get("Content-Type"))

b, _ := ioutil.ReadAll(req.Body)
reqBody = string(b)
return &http.Response{StatusCode: http.StatusOK, Body: &fakeReadCloser{}}, nil
}

err := g.Route(msg)
require.Nil(t, err)
require.Equal(t, msg.String(), reqBody)

fakeClient.do = func(req *http.Request) (response *http.Response, e error) {
return &http.Response{StatusCode: http.StatusInternalServerError, Body: &fakeReadCloser{}}, nil
}
require.NotNil(t, g.Route(msg))

fakeClient.do = func(req *http.Request) (response *http.Response, e error) {
return nil, errors.New("foo error")
}
require.NotNil(t, g.Route(msg))
}
19 changes: 10 additions & 9 deletions module/offline/offline.go
Original file line number Diff line number Diff line change
@@ -20,11 +20,6 @@ const offlineNamespace = "msgoffline"

const offlineDeliveredCtxKey = "offline:delivered"

// Config represents Offline Storage module configuration.
type Config struct {
QueueSize int `yaml:"queue_size"`
}

// Offline represents an offline server stream module.
type Offline struct {
cfg *Config
@@ -101,6 +96,12 @@ func (o *Offline) archiveMessage(message *xmpp.Message) {
return
}
log.Infof("archived offline message... id: %s", message.ID())

if o.cfg.Gateway != nil {
if err := o.cfg.Gateway.Route(message); err != nil {
log.Errorf("bad offline gateway: %v", err)
}
}
}

func (o *Offline) deliverOfflineMessages(stm stream.C2S) {
@@ -109,17 +110,17 @@ func (o *Offline) deliverOfflineMessages(stm stream.C2S) {
}
// deliver offline messages
userJID := stm.JID()
msgs, err := storage.FetchOfflineMessages(userJID.Node())
messages, err := storage.FetchOfflineMessages(userJID.Node())
if err != nil {
log.Error(err)
return
}
if len(msgs) == 0 {
if len(messages) == 0 {
return
}
log.Infof("delivering offline msgs: %s... count: %d", userJID, len(msgs))
log.Infof("delivering offline messages: %s... count: %d", userJID, len(messages))

for _, m := range msgs {
for _, m := range messages {
o.router.Route(m)
}
if err := storage.DeleteOfflineMessages(userJID.Node()); err != nil {
3 changes: 1 addition & 2 deletions module/roster/roster.go
Original file line number Diff line number Diff line change
@@ -10,9 +10,8 @@ import (
"strconv"
"sync"

"github.com/ortuman/jackal/model"

"github.com/ortuman/jackal/log"
"github.com/ortuman/jackal/model"
"github.com/ortuman/jackal/model/rostermodel"
"github.com/ortuman/jackal/router"
"github.com/ortuman/jackal/storage"
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ import (
)

// ApplicationVersion represents application version.
var ApplicationVersion = NewVersion(0, 4, 11)
var ApplicationVersion = NewVersion(0, 5, 0)

// SemanticVersion represents version information with Semantic Versioning specifications.
type SemanticVersion struct {

0 comments on commit bbfcf5a

Please sign in to comment.