Skip to content

Commit

Permalink
started S2S implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman committed May 29, 2018
1 parent 20daacb commit 0beedee
Show file tree
Hide file tree
Showing 36 changed files with 627 additions and 342 deletions.
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion auth/scram.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"hash"
"strings"

"github.com/ortuman/jackal/server/transport"
"github.com/ortuman/jackal/storage"
"github.com/ortuman/jackal/storage/model"
"github.com/ortuman/jackal/stream"
"github.com/ortuman/jackal/transport"
"github.com/ortuman/jackal/util"
"github.com/ortuman/jackal/xml"
"github.com/pborman/uuid"
Expand Down
4 changes: 2 additions & 2 deletions auth/scram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"strings"
"testing"

"github.com/ortuman/jackal/server/compress"
"github.com/ortuman/jackal/server/transport"
"github.com/ortuman/jackal/storage/model"
"github.com/ortuman/jackal/transport"
"github.com/ortuman/jackal/transport/compress"
"github.com/ortuman/jackal/util"
"github.com/ortuman/jackal/xml"
"github.com/stretchr/testify/require"
Expand Down
55 changes: 26 additions & 29 deletions c2s/c2s.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ import (
"github.com/ortuman/jackal/module/xep0191"
"github.com/ortuman/jackal/module/xep0199"
"github.com/ortuman/jackal/router"
"github.com/ortuman/jackal/server/compress"
"github.com/ortuman/jackal/server/transport"
"github.com/ortuman/jackal/storage"
"github.com/ortuman/jackal/storage/model"
"github.com/ortuman/jackal/stream"
"github.com/ortuman/jackal/transport"
"github.com/ortuman/jackal/transport/compress"
"github.com/ortuman/jackal/xml"
"github.com/pborman/uuid"
)
Expand Down Expand Up @@ -109,8 +109,9 @@ type Stream struct {
authenticators []auth.Authenticator
activeAuth auth.Authenticator
mods modules
actorCh chan func()
doneCh chan<- struct{}

actorCh chan func()
doneCh chan<- struct{}
}

// New returns a new c2s stream instance.
Expand Down Expand Up @@ -210,18 +211,21 @@ func (s *Stream) Presence() *xml.Presence {
}

// SendElement sends the given XML element.
func (s *Stream) SendElement(element xml.XElement) {
func (s *Stream) SendElement(elem xml.XElement) {
s.actorCh <- func() {
s.writeElement(element)
s.writeElement(elem)
}
}

// Disconnect disconnects remote peer by closing
// the underlying TCP socket connection.
func (s *Stream) Disconnect(err error) {
waitCh := make(chan struct{})
s.actorCh <- func() {
s.disconnect(err)
close(waitCh)
}
<-waitCh
}

func (s *Stream) initializeAuthenticators() {
Expand Down Expand Up @@ -405,7 +409,7 @@ func (s *Stream) unauthenticatedFeatures() []xml.XElement {
// allow In-band registration over encrypted Stream only
allowRegistration := s.IsSecured()

if _, ok := s.cfg.Modules.Enabled["registration"]; ok && allowRegistration {
if reg := s.mods.register; reg != nil && allowRegistration {
registerFeature := xml.NewElementNamespace("register", "http://jabber.org/features/iq-register")
features = append(features, registerFeature)
}
Expand All @@ -415,10 +419,10 @@ func (s *Stream) unauthenticatedFeatures() []xml.XElement {
func (s *Stream) authenticatedFeatures() []xml.XElement {
var features []xml.XElement

isSocketTransport := s.tr.Type() == transport.Socket
isSocketTr := s.tr.Type() == transport.Socket

// attach compression feature
compressionAvailable := isSocketTransport && s.cfg.Compression.Level != compress.NoCompression
compressionAvailable := isSocketTr && s.cfg.Compression.Level != compress.NoCompression

if !s.IsCompressed() && compressionAvailable {
compression := xml.NewElementNamespace("compression", "http://jabber.org/features/compress")
Expand Down Expand Up @@ -529,7 +533,6 @@ func (s *Stream) handleSessionStarted(elem xml.XElement) {
if p := s.mods.ping; p != nil {
p.ResetDeadline()
}

stanza, err := s.buildStanza(elem, true)
if err != nil {
s.handleElementError(elem, err)
Expand Down Expand Up @@ -705,7 +708,7 @@ func (s *Stream) bindResource(iq *xml.IQ) {

s.writeElement(result)

if err := router.Instance().AuthenticateStream(s); err != nil {
if err := router.Instance().AuthenticateC2S(s); err != nil {
log.Error(err)
}
}
Expand All @@ -728,7 +731,6 @@ func (s *Stream) startSession(iq *xml.IQ) {
for _, mod := range s.mods.all {
mod.RegisterDisco(s.mods.discoInfo)
}

if p := s.mods.ping; p != nil {
p.StartPinging()
}
Expand All @@ -743,6 +745,10 @@ func (s *Stream) processStanza(stanza xml.Stanza) {
s.writeElement(resp)
return
}
if !router.Instance().IsLocalDomain(toJID.Domain()) {
router.Instance().Route(stanza)
return
}
switch stanza := stanza.(type) {
case *xml.Presence:
s.processPresence(stanza)
Expand All @@ -758,12 +764,8 @@ func (s *Stream) processComponentStanza(stanza xml.Stanza) {

func (s *Stream) processIQ(iq *xml.IQ) {
toJID := iq.ToJID()
if !router.Instance().IsLocalDomain(toJID.Domain()) {
// TODO(ortuman): Implement XMPP federation
return
}
if node := toJID.Node(); len(node) > 0 && router.Instance().IsBlockedJID(s.JID(), node) {
// destination user blocked Stream JID
// destination user blocked stream JID
if iq.IsGet() || iq.IsSet() {
s.writeElement(iq.ServiceUnavailableError())
}
Expand Down Expand Up @@ -793,10 +795,6 @@ func (s *Stream) processIQ(iq *xml.IQ) {

func (s *Stream) processPresence(presence *xml.Presence) {
toJID := presence.ToJID()
if !router.Instance().IsLocalDomain(toJID.Domain()) {
// TODO(ortuman): Implement XMPP federation
return
}
if toJID.IsBare() && (toJID.Node() != s.Username() || toJID.Domain() != s.Domain()) {
if rst := s.mods.roster; rst != nil {
rst.ProcessPresence(presence)
Expand Down Expand Up @@ -831,10 +829,6 @@ func (s *Stream) processPresence(presence *xml.Presence) {

func (s *Stream) processMessage(message *xml.Message) {
toJID := message.ToJID()
if !router.Instance().IsLocalDomain(toJID.Domain()) {
// TODO(ortuman): Implement XMPP federation
return
}

sendMessage:
err := router.Instance().Route(message)
Expand Down Expand Up @@ -931,6 +925,9 @@ func (s *Stream) readElement(elem xml.XElement) {
}

func (s *Stream) disconnect(err error) {
if s.getState() == disconnected {
return
}
switch err {
case nil:
s.disconnectClosingStream(false)
Expand Down Expand Up @@ -1108,9 +1105,6 @@ func (s *Stream) disconnectWithStreamError(err *streamerror.Error) {
}

func (s *Stream) disconnectClosingStream(closeStream bool) {
if err := s.updateLogoutInfo(); err != nil {
log.Error(err)
}
if presence := s.Presence(); presence != nil && presence.IsAvailable() && s.mods.roster != nil {
s.mods.roster.BroadcastPresenceAndWait(xml.NewPresence(s.JID(), s.JID(), xml.UnavailableType))
}
Expand All @@ -1126,7 +1120,10 @@ func (s *Stream) disconnectClosingStream(closeStream bool) {
close(s.doneCh)

// unregister stream
if err := router.Instance().UnregisterStream(s); err != nil {
if err := router.Instance().UnregisterC2S(s); err != nil {
log.Error(err)
}
if err := s.updateLogoutInfo(); err != nil {
log.Error(err)
}
s.setState(disconnected)
Expand Down
Loading

0 comments on commit 0beedee

Please sign in to comment.