Skip to content

Commit

Permalink
c2s: implemented new c2s module.
Browse files Browse the repository at this point in the history
  • Loading branch information
ortuman committed May 24, 2018
1 parent 13d5832 commit 850b86d
Show file tree
Hide file tree
Showing 14 changed files with 1,559 additions and 1,580 deletions.
1,132 changes: 1,132 additions & 0 deletions c2s/c2s.go

Large diffs are not rendered by default.

62 changes: 30 additions & 32 deletions server/c2s_test.go → c2s/c2s_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@
* See the LICENSE file for more information.
*/

package server
package c2s

import (
"io"
"net"
"testing"
"time"

"crypto/tls"

"github.com/ortuman/jackal/module/offline"
"github.com/ortuman/jackal/module/xep0077"
"github.com/ortuman/jackal/module/xep0092"
Expand All @@ -22,6 +20,7 @@ import (
"github.com/ortuman/jackal/server/transport"
"github.com/ortuman/jackal/storage"
"github.com/ortuman/jackal/storage/model"
"github.com/ortuman/jackal/util"
"github.com/ortuman/jackal/xml"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestStream_ConnectTimeout(t *testing.T) {
router.Initialize(&router.Config{Domains: []string{"localhost"}})
defer router.Shutdown()

stm, _ := tUtilStreamInit()
stm, _ := tUtilStreamInit(t)
time.Sleep(time.Second * 2)
require.Equal(t, disconnected, stm.getState())
}
Expand All @@ -139,7 +138,7 @@ func TestStream_Disconnect(t *testing.T) {
router.Initialize(&router.Config{Domains: []string{"localhost"}})
defer router.Shutdown()

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
stm.Disconnect(nil)
require.True(t, conn.waitClose())

Expand All @@ -153,7 +152,7 @@ func TestStream_Features(t *testing.T) {
router.Initialize(&router.Config{Domains: []string{"localhost"}})
defer router.Shutdown()

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)

elem := conn.parseOutboundElement()
Expand All @@ -174,7 +173,7 @@ func TestStream_TLS(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand All @@ -198,7 +197,7 @@ func TestStream_Compression(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand Down Expand Up @@ -229,7 +228,7 @@ func TestStream_StartSession(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand All @@ -254,7 +253,7 @@ func TestStream_SendIQ(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand Down Expand Up @@ -293,7 +292,7 @@ func TestStream_SendPresence(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand Down Expand Up @@ -340,7 +339,7 @@ func TestStream_SendMessage(t *testing.T) {

storage.Instance().InsertOrUpdateUser(&model.User{Username: "user", Password: "pencil"})

stm, conn := tUtilStreamInit()
stm, conn := tUtilStreamInit(t)
tUtilStreamOpen(conn)
_ = conn.parseOutboundElement() // read stream opening...
_ = conn.parseOutboundElement() // read stream features...
Expand Down Expand Up @@ -434,12 +433,18 @@ func tUtilStreamStartSession(conn *fakeSocketConn, t *testing.T) {
time.Sleep(time.Millisecond * 100) // wait until stream internal state changes
}

func tUtilStreamInit() (*c2sStream, *fakeSocketConn) {
func tUtilStreamInit(t *testing.T) (*stream, *fakeSocketConn) {
keyFile := "../testdata/cert/test.server.key"
certFile := "../testdata/cert/test.server.crt"

tlsConfig, err := util.LoadCertificate(keyFile, certFile, "localhost")
require.Nil(t, err)

conn := newFakeSocketConn()
tr := transport.NewSocketTransport(conn, 4096)
stm := newC2SStream("abcd1234", tr, &tls.Config{}, tUtilStreamDefaultConfig())
stm := New("abcd1234", tr, tlsConfig, tUtilStreamDefaultConfig())
router.Instance().RegisterStream(stm)
return stm, conn
return stm.(*stream), conn
}

func tUtilStreamDefaultConfig() *Config {
Expand All @@ -453,24 +458,17 @@ func tUtilStreamDefaultConfig() *Config {
modules["offline"] = struct{}{}

return &Config{
ID: "server-id:1234",
ConnectTimeout: 1,
MaxStanzaSize: 8192,
ResourceConflict: Reject,
Type: C2SServerType,
Transport: TransportConfig{
Type: transport.Socket,
ConnectTimeout: 1,
KeepAlive: 5,
},
TLS: TLSConfig{
PrivKeyFile: "../testdata/cert/test.server.key",
CertFile: "../testdata/cert/test.server.crt",
Compression: CompressConfig{Level: compress.DefaultCompression},
SASL: []string{"plain", "digest_md5", "scram_sha_1", "scram_sha_256"},
Modules: ModulesConfig{
Enabled: modules,
Offline: offline.Config{QueueSize: 10},
Registration: xep0077.Config{AllowRegistration: true, AllowChange: true},
Version: xep0092.Config{ShowOS: true},
Ping: xep0199.Config{SendInterval: 5, Send: true},
},
Compression: CompressConfig{Level: compress.DefaultCompression},
SASL: []string{"plain", "digest_md5", "scram_sha_1", "scram_sha_256"},
Modules: modules,
ModOffline: offline.Config{QueueSize: 10},
ModRegistration: xep0077.Config{AllowRegistration: true, AllowChange: true},
ModVersion: xep0092.Config{ShowOS: true},
ModPing: xep0199.Config{SendInterval: 5, Send: true},
}
}
177 changes: 150 additions & 27 deletions c2s/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,166 @@
package c2s

import (
"crypto/tls"
"fmt"
"strings"

"github.com/ortuman/jackal/auth"
"github.com/ortuman/jackal/module"
"github.com/ortuman/jackal/module/offline"
"github.com/ortuman/jackal/module/roster"
"github.com/ortuman/jackal/module/xep0012"
"github.com/ortuman/jackal/module/xep0030"
"github.com/ortuman/jackal/module/xep0049"
"github.com/ortuman/jackal/module/xep0054"
"github.com/ortuman/jackal/module/xep0077"
"github.com/ortuman/jackal/module/xep0092"
"github.com/ortuman/jackal/module/xep0191"
"github.com/ortuman/jackal/module/xep0199"
"github.com/ortuman/jackal/server/transport"
"github.com/ortuman/jackal/server/compress"
)

type Modules struct {
Roster *roster.Roster
Offline *offline.Offline
LastActivity *xep0012.LastActivity
DiscoInfo *xep0030.DiscoInfo
Private *xep0049.Private
VCard *xep0054.VCard
Register *xep0077.Register
Version *xep0092.Version
BlockingCmd *xep0191.BlockingCommand
Ping *xep0199.Ping
const (
defaultTransportConnectTimeout = 5
defaultTransportMaxStanzaSize = 32768
)

// ResourceConflictPolicy represents a resource conflict policy.
type ResourceConflictPolicy int

const (
// Override represents 'override' resource conflict policy.
Override ResourceConflictPolicy = iota

// Reject represents 'reject' resource conflict policy.
Reject

// Replace represents 'replace' resource conflict policy.
Replace
)

// CompressConfig represents a server stream compression configuration.
type CompressConfig struct {
Level compress.Level
}

type compressionProxyType struct {
Level string `yaml:"level"`
}

IQHandlers []module.IQHandler
// UnmarshalYAML satisfies Unmarshaler interface.
func (c *CompressConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
p := compressionProxyType{}
if err := unmarshal(&p); err != nil {
return err
}
switch p.Level {
case "":
c.Level = compress.NoCompression
case "best":
c.Level = compress.BestCompression
case "speed":
c.Level = compress.SpeedCompression
case "default":
c.Level = compress.DefaultCompression
default:
return fmt.Errorf("c2s.CompressConfig: unrecognized compression level: %s", p.Level)
}
return nil
}

type ModulesConfig struct {
Enabled map[string]struct{}
Roster roster.Config
Offline offline.Config
Registration xep0077.Config
Version xep0092.Config
Ping xep0199.Config
}

type modulesConfigProxy struct {
Enabled []string `yaml:"enabled"`
Roster roster.Config `yaml:"mod_roster"`
Offline offline.Config `yaml:"mod_offline"`
Registration xep0077.Config `yaml:"mod_registration"`
Version xep0092.Config `yaml:"mod_version"`
Ping xep0199.Config `yaml:"mod_ping"`
}

// UnmarshalYAML satisfies Unmarshaler interface.
func (cfg *ModulesConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
p := modulesConfigProxy{}
if err := unmarshal(&p); err != nil {
return err
}
// validate modules
enabled := make(map[string]struct{}, len(p.Enabled))
for _, mod := range p.Enabled {
switch mod {
case "roster", "last_activity", "private", "vcard", "registration", "version", "blocking_command",
"ping", "offline":
break
default:
return fmt.Errorf("c2s.ModulesConfig: unrecognized module: %s", mod)
}
enabled[mod] = struct{}{}
}
cfg.Enabled = enabled
cfg.Roster = p.Roster
cfg.Offline = p.Offline
cfg.Registration = p.Registration
cfg.Version = p.Version
cfg.Ping = p.Ping
return nil
}

type Config struct {
TLSConfig *tls.Config
Transport transport.Transport
ConnectTimeout int
MaxStanzaSize int
Authenticators []auth.Authenticator
Modules Modules
ConnectTimeout int
MaxStanzaSize int
ResourceConflict ResourceConflictPolicy
SASL []string
Compression CompressConfig
Modules ModulesConfig
}

type configProxy struct {
ConnectTimeout int `yaml:"connect_timeout"`
MaxStanzaSize int `yaml:"max_stanza_size"`
ResourceConflict string `yaml:"resource_conflict"`
SASL []string `yaml:"sasl"`
Compression CompressConfig `yaml:"compression"`
Modules ModulesConfig `yaml:"modules"`
}

// UnmarshalYAML satisfies Unmarshaler interface.
func (cfg *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
p := configProxy{}
if err := unmarshal(&p); err != nil {
return err
}
// validate resource conflict policy type
rc := strings.ToLower(p.ResourceConflict)
switch rc {
case "override":
cfg.ResourceConflict = Override
case "reject":
cfg.ResourceConflict = Reject
case "", "replace":
cfg.ResourceConflict = Replace
default:
return fmt.Errorf("c2s.Config: invalid resource_conflict option: %s", rc)
}
// validate SASL mechanisms
for _, sasl := range p.SASL {
switch sasl {
case "plain", "digest_md5", "scram_sha_1", "scram_sha_256":
continue
default:
return fmt.Errorf("c2s.Config: unrecognized SASL mechanism: %s", sasl)
}
}
cfg.ConnectTimeout = p.ConnectTimeout
if cfg.ConnectTimeout == 0 {
cfg.ConnectTimeout = defaultTransportConnectTimeout
}
cfg.MaxStanzaSize = p.MaxStanzaSize
if cfg.MaxStanzaSize == 0 {
cfg.MaxStanzaSize = defaultTransportMaxStanzaSize
}
cfg.SASL = p.SASL
cfg.Compression = p.Compression
cfg.Modules = p.Modules
return nil
}
Loading

0 comments on commit 850b86d

Please sign in to comment.