Skip to content

Commit

Permalink
Integrate message handler methods into S2S (ortuman#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman authored Apr 1, 2021
1 parent faa9604 commit c23251e
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 6 deletions.
2 changes: 1 addition & 1 deletion cmd/jackal/s2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package main
import "github.com/ortuman/jackal/s2s"

func initS2S(a *serverApp, cfg s2sOutConfig) {
a.s2sOutProvider = s2s.NewOutProvider(a.hosts, a.kv, a.shapers, a.sonar, s2s.Options{
a.s2sOutProvider = s2s.NewOutProvider(a.hosts, a.mods, a.kv, a.shapers, a.sonar, s2s.Options{
DialTimeout: cfg.DialTimeout,
DialbackSecret: cfg.DialbackSecret,
ConnectTimeout: cfg.ConnectTimeout,
Expand Down
8 changes: 6 additions & 2 deletions s2s/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,13 @@ func (s *inS2S) processIQ(ctx context.Context, iq *stravaganza.IQ) error {
}

func (s *inS2S) processMessage(ctx context.Context, message *stravaganza.Message) error {
// preprocess message stanza
msg, err := s.mods.PreProcessMessage(ctx, message)
if err != nil {
return err
}
// post message received event
err := s.postStreamEvent(ctx, event.S2SInStreamMessageReceived, &event.S2SStreamEventInfo{
err = s.postStreamEvent(ctx, event.S2SInStreamMessageReceived, &event.S2SStreamEventInfo{
ID: s.ID().String(),
Sender: s.sender,
Target: s.target,
Expand All @@ -392,7 +397,6 @@ func (s *inS2S) processMessage(ctx context.Context, message *stravaganza.Message
if err != nil {
return err
}
msg := message

sndMessage:
err = s.router.Route(ctx, msg)
Expand Down
3 changes: 3 additions & 0 deletions s2s/in_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,9 @@ func TestInS2S_HandleSessionElement(t *testing.T) {

// modules mock
modsMock.IsModuleIQFunc = func(iq *stravaganza.IQ) bool { return false }
modsMock.PreProcessMessageFunc = func(ctx context.Context, msg *stravaganza.Message) (*stravaganza.Message, error) {
return msg, nil
}

// session mock
outBuf := bytes.NewBuffer(nil)
Expand Down
3 changes: 3 additions & 0 deletions s2s/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ type modules interface {
IsModuleIQ(iq *stravaganza.IQ) bool
ProcessIQ(ctx context.Context, iq *stravaganza.IQ) error

PreProcessMessage(ctx context.Context, msg *stravaganza.Message) (*stravaganza.Message, error)
PreRouteMessage(ctx context.Context, msg *stravaganza.Message) (*stravaganza.Message, error)

IsEnabled(modName string) bool
}

Expand Down
19 changes: 17 additions & 2 deletions s2s/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type outS2S struct {
dbParams DialbackParams
dialer dialer
hosts *host.Hosts
mods modules
tlsCfg *tls.Config
onClose func(s *outS2S)
dbResCh chan stream.DialbackResult
Expand All @@ -114,6 +115,7 @@ func newOutS2S(
target string,
tlsCfg *tls.Config,
hosts *host.Hosts,
mods modules,
opts Options,
kv kv.KV,
shapers shaper.Shapers,
Expand All @@ -125,6 +127,7 @@ func newOutS2S(
sender: sender,
target: target,
hosts: hosts,
mods: mods,
tlsCfg: tlsCfg,
opts: opts,
onClose: onClose,
Expand All @@ -142,6 +145,7 @@ func newDialbackS2S(
target string,
tlsCfg *tls.Config,
hosts *host.Hosts,
mods modules,
opts Options,
dbParams DialbackParams,
shapers shaper.Shapers,
Expand All @@ -151,6 +155,7 @@ func newDialbackS2S(
sender: sender,
target: target,
hosts: hosts,
mods: mods,
tlsCfg: tlsCfg,
opts: opts,
dbParams: dbParams,
Expand Down Expand Up @@ -537,7 +542,17 @@ func (s *outS2S) sendOrEnqueueElement(ctx context.Context, elem stravaganza.Elem
}

func (s *outS2S) sendElement(ctx context.Context, elem stravaganza.Element) error {
err := s.session.Send(ctx, elem)
var sndErr error
msg, ok := elem.(*stravaganza.Message)
if ok {
newMsg, err := s.mods.PreRouteMessage(ctx, msg)
if err != nil {
return err
}
sndErr = s.session.Send(ctx, newMsg)
} else {
sndErr = s.session.Send(ctx, elem)
}

switch stanza := elem.(type) {
case stravaganza.Stanza:
Expand All @@ -556,7 +571,7 @@ func (s *outS2S) sendElement(ctx context.Context, elem stravaganza.Element) erro
elem.Name(),
elem.Attribute(stravaganza.Type),
)
return err
return sndErr
}

func (s *outS2S) close(ctx context.Context) error {
Expand Down
7 changes: 7 additions & 0 deletions s2s/out_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sync"
"time"

"github.com/ortuman/jackal/module"

"github.com/jackal-xmpp/sonar"
streamerror "github.com/jackal-xmpp/stravaganza/errors/stream"
"github.com/ortuman/jackal/cluster/kv"
Expand All @@ -33,6 +35,7 @@ import (
// OutProvider is an outgoing S2S stream provider.
type OutProvider struct {
hosts *host.Hosts
mods modules
opts Options
kv kv.KV
shapers shaper.Shapers
Expand All @@ -49,13 +52,15 @@ type OutProvider struct {
// NewOutProvider creates and initializes a new OutProvider instance.
func NewOutProvider(
hosts *host.Hosts,
mods *module.Modules,
kv kv.KV,
shapers shaper.Shapers,
sn *sonar.Sonar,
opts Options,
) *OutProvider {
op := &OutProvider{
hosts: hosts,
mods: mods,
shapers: shapers,
kv: kv,
sn: sn,
Expand Down Expand Up @@ -189,6 +194,7 @@ func (p *OutProvider) newOutS2S(sender, target string) s2sOut {
target,
p.tlsConfig(target),
p.hosts,
p.mods,
p.opts,
p.kv,
p.shapers,
Expand All @@ -203,6 +209,7 @@ func (p *OutProvider) newDialbackS2S(sender, target string, dbParam DialbackPara
target,
p.tlsConfig(target),
p.hosts,
p.mods,
p.opts,
dbParam,
p.shapers,
Expand Down
2 changes: 1 addition & 1 deletion version/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

// Version represents application version.
var Version = NewVersion(0, 52, 0)
var Version = NewVersion(0, 52, 1)

// APIVersion represents admin API version.
var APIVersion = NewVersion(1, 0, 0)
Expand Down

0 comments on commit c23251e

Please sign in to comment.