Skip to content

Commit

Permalink
added session unit.
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman committed Jun 5, 2018
1 parent be7498e commit e954e84
Showing 9 changed files with 389 additions and 15 deletions.
11 changes: 4 additions & 7 deletions c2s/c2s.go
Original file line number Diff line number Diff line change
@@ -354,7 +354,6 @@ func (s *Stream) handleConnecting(elem xml.XElement) {
s.connectTm.Stop()
s.connectTm = nil
}

// validate stream element
if err := s.validateStreamElement(elem); err != nil {
s.disconnectWithStreamError(err)
@@ -364,7 +363,7 @@ func (s *Stream) handleConnecting(elem xml.XElement) {
s.ctx.SetString(elem.To(), domainCtxKey)

// open stream
s.opentream()
s.openStream()

features := xml.NewElementName("stream:features")
features.SetAttribute("xmlns:stream", streamNamespace)
@@ -554,7 +553,7 @@ func (s *Stream) proceedStartTLS() {

s.writeElement(xml.NewElementNamespace("proceed", tlsNamespace))

// don't do anything in case no TLS configurations has been provided (useful for testing purposes).
// don't do anything in case no TLS configuration has been provided (useful for testing purposes).
if tlsCfg := s.tlsCfg; tlsCfg != nil {
s.tr.StartTLS(tlsCfg, false)
}
@@ -671,7 +670,6 @@ func (s *Stream) bindResource(iq *xml.IQ) {
stm = s
}
}

if stm != nil {
switch s.cfg.ResourceConflict {
case Override:
@@ -780,7 +778,6 @@ func (s *Stream) processIQ(iq *xml.IQ) {
}
return
}

for _, handler := range s.mods.iqHandlers {
if !handler.MatchesIQ(iq) {
continue
@@ -943,7 +940,7 @@ func (s *Stream) disconnect(err error) {
}
}

func (s *Stream) opentream() {
func (s *Stream) openStream() {
var ops *xml.Element
var includeClosing bool

@@ -1100,7 +1097,7 @@ func (s *Stream) isComponentDomain(domain string) bool {

func (s *Stream) disconnectWithStreamError(err *streamerror.Error) {
if s.getState() == connecting {
s.opentream()
s.openStream()
}
s.writeElement(err.Element())
s.disconnectClosingStream(true)
3 changes: 3 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
@@ -48,6 +48,9 @@ var (
// ErrSystemShutdown represents 'system-shutdown' stream error.
ErrSystemShutdown = newStreamError("system-shutdown")

// ErrUndefinedCondition represents 'system-shutdown' stream error.
ErrUndefinedCondition = newStreamError("undefined-condition")

// ErrInternalServerError represents 'internal-server-error' stream error.
ErrInternalServerError = newStreamError("internal-server-error")
)
14 changes: 14 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -12,11 +12,14 @@ import (
"path/filepath"
"strconv"

"time"

"github.com/ortuman/jackal/log"
"github.com/ortuman/jackal/router"
"github.com/ortuman/jackal/s2s"
"github.com/ortuman/jackal/server"
"github.com/ortuman/jackal/storage"
"github.com/ortuman/jackal/stream"
"github.com/ortuman/jackal/version"
)

@@ -88,6 +91,8 @@ func main() {

storage.Initialize(&cfg.Storage)

testS2S()

// create PID file
if err := createPIDFile(cfg.PIDFile); err != nil {
log.Warnf("%v", err)
@@ -102,6 +107,15 @@ func main() {
server.Initialize(cfg.Servers, cfg.Debug.Port)
}

func testS2S() {
s, err := s2s.Dial("weuste.club", stream.S2SDialerOptions{KeepAlive: time.Second * time.Duration(2)})
if err != nil {
log.Error(err)
return
}
s.StartSession()
}

func createPIDFile(pidFile string) error {
if len(pidFile) == 0 {
return nil
88 changes: 81 additions & 7 deletions s2s/out.go
Original file line number Diff line number Diff line change
@@ -10,17 +10,21 @@ import (

"bytes"

"fmt"

"github.com/ortuman/jackal/errors"
"github.com/ortuman/jackal/log"
"github.com/ortuman/jackal/router"
"github.com/ortuman/jackal/transport"
"github.com/ortuman/jackal/xml"
)

const streamMailboxSize = 64

const (
connecting uint32 = iota
idle uint32 = iota
connecting
connected
securing
disconnected
)

@@ -37,7 +41,7 @@ func NewOut(domain string, tr transport.Transport) *Out {
domain: domain,
tr: tr,
parser: xml.NewParser(tr, 32768),
state: connecting,
state: idle,
actorCh: make(chan func(), streamMailboxSize),
}
go s.actorLoop()
@@ -46,14 +50,23 @@ func NewOut(domain string, tr transport.Transport) *Out {
return s
}

func (s *Out) ID() string {
func (s *Out) Domain() string {
return s.domain
}

func (s *Out) SendElement(elem xml.XElement) {
s.actorCh <- func() {
s.writeElement(elem)
}
}

func (s *Out) Disconnect(err error) {
waitCh := make(chan struct{})
s.actorCh <- func() {
s.disconnect(err)
close(waitCh)
}
<-waitCh
}

func (s *Out) StartSession() {
@@ -72,8 +85,7 @@ func (s *Out) startSession() {
ops.SetAttribute("xmlns:stream", streamNamespace)
buf.WriteString(`<?xml version="1.0"?>`)

ops.SetAttribute("from", router.Instance().DefaultLocalDomain())
ops.SetAttribute("to", s.ID())
ops.SetAttribute("to", s.domain)
ops.SetAttribute("version", "1.0")
ops.ToXML(buf, includeClosing)

@@ -93,6 +105,56 @@ func (s *Out) actorLoop() {
}
}

func (s *Out) handleElement(elem xml.XElement) {
switch s.getState() {
case connecting:
s.handleConnecting(elem)
case connected:
s.handleConnected(elem)
}
}

func (s *Out) handleConnecting(elem xml.XElement) {
switch elem.Name() {
case "stream:stream":
if len(elem.Namespace()) > 0 && elem.Namespace() != jabberServerNamespace {
s.disconnectWithStreamError(streamerror.ErrInvalidNamespace)
return
}
if elem.Attributes().Get("version") != "1.0" {
s.disconnectWithStreamError(streamerror.ErrUnsupportedVersion)
return
}
}
s.setState(connected)
}

func (s *Out) handleConnected(elem xml.XElement) {
fmt.Println(elem)
}

func (s *Out) disconnect(err error) {
if s.getState() == disconnected {
return
}
switch err {
case nil:
s.disconnectClosingStream(false)
default:
if strmErr, ok := err.(*streamerror.Error); ok {
s.disconnectWithStreamError(strmErr)
} else {
log.Error(err)
s.disconnectClosingStream(false)
}
}
}

func (s *Out) writeElement(element xml.XElement) {
log.Debugf("SEND: %v", element)
s.tr.WriteElement(element, true)
}

func (s *Out) readElement(elem xml.XElement) {
if elem != nil {
log.Debugf("RECV: %v", elem)
@@ -103,7 +165,19 @@ func (s *Out) readElement(elem xml.XElement) {
}
}

func (s *Out) handleElement(elem xml.XElement) {
func (s *Out) disconnectWithStreamError(err *streamerror.Error) {
s.writeElement(err.Element())
s.disconnectClosingStream(true)
}

func (s *Out) disconnectClosingStream(closeStream bool) {
if closeStream {
s.tr.WriteString("</stream:stream>")
}
// TODO(ortuman): unregister from router manager

s.setState(disconnected)
s.tr.Close()
}

func (s *Out) doRead() {
1 change: 1 addition & 0 deletions s2s/out_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package s2s
Loading

0 comments on commit e954e84

Please sign in to comment.