Skip to content

Commit

Permalink
refactorized mock storage.
Browse files Browse the repository at this point in the history
ortuman committed Apr 30, 2018
1 parent 4012201 commit 72c226b
Showing 5 changed files with 107 additions and 74 deletions.
8 changes: 4 additions & 4 deletions module/roster.go
Original file line number Diff line number Diff line change
@@ -636,10 +636,10 @@ func (r *ModRoster) pushItem(ri *model.RosterItem, to *xml.JID) error {
}

func (r *ModRoster) routePresencesFrom(from *xml.JID, to *xml.JID, presenceType string) {
fromStreams := c2s.Instance().StreamsMatchingJID(from.ToBareJID())
for _, fromStream := range fromStreams {
p := xml.NewPresence(fromStream.JID(), to.ToBareJID(), presenceType)
if presence := fromStream.Presence(); presence != nil && presenceType == xml.AvailableType {
stms := c2s.Instance().StreamsMatchingJID(from.ToBareJID())
for _, stm := range stms {
p := xml.NewPresence(stm.JID(), to.ToBareJID(), presenceType)
if presence := stm.Presence(); presence != nil && presenceType == xml.AvailableType {
p.AppendElements(presence.Elements().All())
}
r.routePresence(p, to)
47 changes: 36 additions & 11 deletions module/xep_0191.go
Original file line number Diff line number Diff line change
@@ -98,21 +98,27 @@ func (x *XEPBlockingCommand) sendBlockList(iq *xml.IQ) {
}

func (x *XEPBlockingCommand) block(iq *xml.IQ, block xml.XElement) {
ris, _, err := storage.Instance().FetchRosterItems(x.stm.Username())
if err != nil {
log.Error(err)
x.stm.SendElement(iq.InternalServerError())
return
}
items := block.Elements().Children("item")
if len(items) == 0 {
x.stm.SendElement(iq.BadRequestError())
return
}
jids, err := x.extractItemJIDs(items)
jds, err := x.extractItemJIDs(items)
if err != nil {
log.Error(err)
x.stm.SendElement(iq.JidMalformedError())
return
}
var bl []model.BlockListItem
for _, j := range jids {
for _, j := range jds {
if x.insertInMemBlockListJID(j) {
x.sendAvailablePresence(j)
x.broadcastPresenceMatching(j, ris, xml.UnavailableType)
bl = append(bl, model.BlockListItem{Username: x.stm.Username(), JID: j.String()})
}
}
@@ -128,26 +134,35 @@ func (x *XEPBlockingCommand) block(iq *xml.IQ, block xml.XElement) {
}

func (x *XEPBlockingCommand) unblock(iq *xml.IQ, unblock xml.XElement) {
ris, _, err := storage.Instance().FetchRosterItems(x.stm.Username())
if err != nil {
log.Error(err)
x.stm.SendElement(iq.InternalServerError())
return
}
items := unblock.Elements().Children("item")
if len(items) == 0 {
if err := storage.Instance().DeleteBlockList(x.stm.Username()); err != nil {
log.Error(err)
x.stm.SendElement(iq.InternalServerError())
return
}
for _, j := range x.inMemBl {
x.broadcastPresenceMatching(j, ris, xml.AvailableType)
}
x.inMemBl = nil

} else {
jids, err := x.extractItemJIDs(items)
jds, err := x.extractItemJIDs(items)
if err != nil {
log.Error(err)
x.stm.SendElement(iq.JidMalformedError())
return
}
var bl []model.BlockListItem
for _, j := range jids {
for _, j := range jds {
if x.deleteInMemBlockListJID(j) {
x.sendUnavailablePresence(j)
x.broadcastPresenceMatching(j, ris, xml.AvailableType)
bl = append(bl, model.BlockListItem{Username: x.stm.Username(), JID: j.String()})
}
}
@@ -211,11 +226,21 @@ func (x *XEPBlockingCommand) deleteInMemBlockListJID(jid *xml.JID) bool {
return false
}

func (x *XEPBlockingCommand) sendUnavailablePresence(jid *xml.JID) {

func (x *XEPBlockingCommand) broadcastPresenceMatching(jid *xml.JID, ris []model.RosterItem, presenceType string) {
stms := c2s.Instance().StreamsMatchingJID(jid)
for _, stm := range stms {
if !x.isSubscribedFrom(jid, ris) {
continue
}
p := xml.NewPresence(stm.JID(), x.stm.JID().ToBareJID(), presenceType)
if presence := stm.Presence(); presence != nil && presenceType == xml.AvailableType {
p.AppendElements(presence.Elements().All())
}
}
}

func (x *XEPBlockingCommand) sendAvailablePresence(jid *xml.JID) {
func (x *XEPBlockingCommand) isSubscribedFrom(jid *xml.JID, ris []model.RosterItem) bool {
return false
}

func (x *XEPBlockingCommand) extractItemJIDs(items []xml.XElement) ([]*xml.JID, error) {
@@ -233,10 +258,10 @@ func (x *XEPBlockingCommand) extractItemJIDs(items []xml.XElement) ([]*xml.JID,
func (x *XEPBlockingCommand) jidMatchesBlockedJID(jid, blockedJID *xml.JID) bool {
if blockedJID.IsFullWithUser() {
return jid.Matches(blockedJID, xml.JIDMatchesNode|xml.JIDMatchesDomain|xml.JIDMatchesResource)
} else if blockedJID.IsFullWithServer() {
return jid.Matches(blockedJID, xml.JIDMatchesDomain|xml.JIDMatchesResource)
} else if blockedJID.IsBare() {
return jid.Matches(blockedJID, xml.JIDMatchesNode|xml.JIDMatchesDomain)
} else if blockedJID.IsServer() && blockedJID.IsFull() {
return jid.Matches(blockedJID, xml.JIDMatchesDomain|xml.JIDMatchesResource)
}
return jid.Matches(blockedJID, xml.JIDMatchesDomain)
}
65 changes: 6 additions & 59 deletions server/c2s_stream.go
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ import (
"bytes"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
@@ -70,12 +69,6 @@ const (
offlineOnce = "offlineOnce"
)

var (
errNotExistingAccount = errors.New("account does not exist")
errResourceNotFound = errors.New("resource not found")
errNotAuthenticated = errors.New("user not authenticated")
)

type c2sStream struct {
cfg *config.Server
tr transport.Transport
@@ -704,7 +697,7 @@ func (s *c2sStream) processIQ(iq *xml.IQ) {

toJid := iq.ToJID()
if toJid.IsFullWithUser() {
if err := s.sendElement(iq, toJid); err == errResourceNotFound {
if err := c2s.Instance().Route(iq); err == c2s.ErrResourceNotFound {
resp := xml.NewElementFromElement(iq)
resp.SetFrom(toJid.String())
resp.SetTo(s.JID().String())
@@ -740,7 +733,7 @@ func (s *c2sStream) processPresence(presence *xml.Presence) {
return
}
if toJid.IsFullWithUser() {
s.sendElement(presence, toJid)
c2s.Instance().Route(presence)
return
}
// set context presence
@@ -771,22 +764,22 @@ func (s *c2sStream) processMessage(message *xml.Message) {
toJid := message.ToJID()

sendMessage:
err := s.sendElement(message, toJid)
err := c2s.Instance().Route(message)
switch err {
case nil:
break
case errNotAuthenticated:
case c2s.ErrNotAuthenticated:
if s.offline != nil {
if (message.IsChat() || message.IsGroupChat()) && message.IsMessageWithBody() {
return
}
s.offline.ArchiveMessage(message)
}
case errResourceNotFound:
case c2s.ErrResourceNotFound:
// treat the stanza as if it were addressed to <node@domain>
toJid = toJid.ToBareJID()
goto sendMessage
case errNotExistingAccount:
case c2s.ErrNotExistingAccount:
response := xml.NewElementFromElement(message)
response.SetFrom(toJid.String())
response.SetTo(s.JID().String())
@@ -1106,49 +1099,3 @@ func (s *c2sStream) userResourceStream(resource string) c2s.Stream {
}
return nil
}

func (s *c2sStream) sendElement(element xml.XElement, to *xml.JID) error {
recipients := c2s.Instance().StreamsMatchingJID(to.ToBareJID())
if len(recipients) == 0 {
exists, err := storage.Instance().UserExists(to.Node())
if err != nil {
return err
}
if exists {
return errNotAuthenticated
}
return errNotExistingAccount
}
if to.IsFullWithUser() {
for _, strm := range recipients {
if strm.Resource() == to.Resource() {
strm.SendElement(element)
return nil
}
}
return errResourceNotFound
}
switch element.(type) {
case *xml.Message:
// send to highest priority stream
stm := recipients[0]
var highestPriority int8
if p := stm.Presence(); p != nil {
highestPriority = p.Priority()
}
for i := 1; i < len(recipients); i++ {
rcp := recipients[i]
if p := rcp.Presence(); p != nil && p.Priority() > highestPriority {
stm = rcp
}
}
stm.SendElement(element)

default:
// broadcast to all streams
for _, strm := range recipients {
strm.SendElement(element)
}
}
return nil
}
55 changes: 55 additions & 0 deletions stream/c2s/c2s.go
Original file line number Diff line number Diff line change
@@ -6,16 +6,24 @@
package c2s

import (
"errors"
"fmt"
"sync"
"sync/atomic"

"github.com/ortuman/jackal/config"
"github.com/ortuman/jackal/log"
"github.com/ortuman/jackal/storage"
"github.com/ortuman/jackal/stream"
"github.com/ortuman/jackal/xml"
)

var (
ErrNotExistingAccount = errors.New("c2s: account does not exist")
ErrResourceNotFound = errors.New("c2s: resource not found")
ErrNotAuthenticated = errors.New("c2s: user not authenticated")
)

// Stream represents a client-to-server XMPP stream.
type Stream interface {
ID() string
@@ -168,6 +176,53 @@ func (m *Manager) AuthenticateStream(strm Stream) error {
return nil
}

func (m *Manager) Route(elem xml.RoutableElement) error {
to := elem.ToJID()
rcps := m.StreamsMatchingJID(to.ToBareJID())
if len(rcps) == 0 {
exists, err := storage.Instance().UserExists(to.Node())
if err != nil {
return err
}
if exists {
return ErrNotAuthenticated
}
return ErrNotExistingAccount
}
if to.IsFullWithUser() {
for _, stm := range rcps {
if stm.Resource() == to.Resource() {
stm.SendElement(elem)
return nil
}
}
return ErrResourceNotFound
}
switch elem.(type) {
case *xml.Message:
// send to highest priority stream
stm := rcps[0]
var highestPriority int8
if p := stm.Presence(); p != nil {
highestPriority = p.Priority()
}
for i := 1; i < len(rcps); i++ {
rcp := rcps[i]
if p := rcp.Presence(); p != nil && p.Priority() > highestPriority {
stm = rcp
}
}
stm.SendElement(elem)

default:
// broadcast to all streams
for _, stm := range rcps {
stm.SendElement(elem)
}
}
return nil
}

func (m *Manager) StreamsMatchingJID(jid *xml.JID) []Stream {
if !m.IsLocalDomain(jid.Domain()) {
return nil
6 changes: 6 additions & 0 deletions xml/xml.go
Original file line number Diff line number Diff line change
@@ -41,3 +41,9 @@ type XElement interface {
ToXML(w io.Writer, includeClosing bool)
ToGob(enc *gob.Encoder)
}

type RoutableElement interface {
XElement
FromJID() *JID
ToJID() *JID
}

0 comments on commit 72c226b

Please sign in to comment.