Skip to content

Commit

Permalink
Added cluster mode support
Browse files Browse the repository at this point in the history
ortuman authored Jan 2, 2019
1 parent bfd54d7 commit ec1eeda
Showing 103 changed files with 3,534 additions and 940 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.4.0] - 2019-01-01
### Added
- Cluster mode support.

## [0.3.6] - 2018-12-15
### Fixed
- Fixed bug in roster item deletion.
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
install:
@export GO111MODULE=on && go install github.com/ortuman/jackal

install-tools:
@export GO111MODULE=on && go get -u \
golang.org/x/lint/golint

test:
@echo "Running tests..."
@go test -race $$(go list ./...)

coverage:
@go test -race -coverprofile=coverage.txt -covermode=atomic $$(go list ./...)

vet:
@echo "Looking for buggy code..."
@go vet $$(go list ./...)

lint: install-tools
@golint $$(go list ./...)

clean:
@go clean
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -80,6 +80,22 @@ mysql -h localhost -D jackal -u jackal -p < mysql.sql

Your database is now ready to connect with jackal.

### Cluster configuration

The purpose of clustering is to be able to use several servers for fault-tolerance and scalability.

To run `jackal` in clustering mode make sure to add a `cluster` section configuration in each of the cluster nodes.

Here is an example of how this section should look like:
```yaml
cluster:
name: node1
port: 5010
hosts: [node2:5010, node3:5010]
```
Do not forget to include all cluster nodes, excluding the local one, in the `hosts` array field. Otherwise the expected behavior will be undefined.

## Run jackal in Docker

Set up `jackal` in the cloud in under 5 minutes with zero knowledge of Golang or Linux shell using our [jackal Docker image](https://hub.docker.com/r/ortuman/jackal/).
109 changes: 67 additions & 42 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ import (
"time"

"github.com/ortuman/jackal/c2s"
"github.com/ortuman/jackal/cluster"
"github.com/ortuman/jackal/component"
"github.com/ortuman/jackal/log"
"github.com/ortuman/jackal/module"
@@ -54,36 +55,13 @@ Common Options:
-v, --version Show version
`

var initLogger = func(config *loggerConfig, output io.Writer) (log.Logger, error) {
var logFiles []io.WriteCloser
if len(config.LogPath) > 0 {
// create logFile intermediate directories.
if err := os.MkdirAll(filepath.Dir(config.LogPath), os.ModePerm); err != nil {
return nil, err
}
f, err := os.OpenFile(config.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return nil, err
}
logFiles = append(logFiles, f)
}
logger, err := log.New(config.Level, output, logFiles...)
if err != nil {
return nil, err
}
return logger, nil
}

var initStorage = func(config *storage.Config) (storage.Storage, error) {
return storage.New(config)
}

// Application encapsulates a jackal server application.
type Application struct {
output io.Writer
args []string
logger log.Logger
storage storage.Storage
cluster *cluster.Cluster
router *router.Router
mods *module.Modules
comps *component.Components
@@ -150,40 +128,52 @@ func (a *Application) Run() error {
}

// initialize logger
a.logger, err = initLogger(&cfg.Logger, a.output)
err = a.initLogger(&cfg.Logger, a.output)
if err != nil {
return err
}
log.Set(a.logger)

// show jackal's fancy logo
a.printLogo()

// initialize storage
a.storage, err = initStorage(&cfg.Storage)
err = a.initStorage(&cfg.Storage)
if err != nil {
return err
}
storage.Set(a.storage)

a.printLogo()

// initialize router
a.router, err = router.New(&cfg.Router)
if err != nil {
return err
}

// initialize cluster
if cfg.Cluster != nil && storage.IsClusterCompatible() {
a.cluster, err = cluster.New(cfg.Cluster, a.router.ClusterDelegate())
if err != nil {
return err
}
if a.cluster != nil {
a.router.SetCluster(a.cluster)
if err := a.cluster.Join(); err != nil {
log.Warnf("%v", err)
}
}
} else {
log.Warnf("cluster mode disabled: storage type '%s' is not compatible", cfg.Storage.Type)
}

// initialize modules & components...
a.mods = module.New(&cfg.Modules, a.router)
a.comps = component.New(&cfg.Components, a.mods.DiscoInfo)

// start serving s2s...
a.s2s = s2s.New(cfg.S2S, a.mods, a.router)
if a.s2s.Enabled() {
a.router.SetS2SOutProvider(a.s2s)
if a.s2s != nil {
a.router.SetOutS2SProvider(a.s2s)
a.s2s.Start()
} else {
log.Infof("s2s disabled")
}

// start serving c2s...
a.c2s, err = c2s.New(cfg.C2S, a.mods, a.comps, a.router)
if err != nil {
@@ -197,9 +187,11 @@ func (a *Application) Run() error {
return err
}
}
a.waitForStopSignal()

// shutdown gracefully
// ...wait for stop signal to shutdown
sig := a.waitForStopSignal()
log.Infof("received %s signal... shutting down...", sig.String())

if err := a.gracefullyShutdown(); err != nil {
return err
}
@@ -230,6 +222,38 @@ func (a *Application) createPIDFile(pidFile string) error {
return nil
}

func (a *Application) initLogger(config *loggerConfig, output io.Writer) error {
var logFiles []io.WriteCloser
if len(config.LogPath) > 0 {
// create logFile intermediate directories.
if err := os.MkdirAll(filepath.Dir(config.LogPath), os.ModePerm); err != nil {
return err
}
f, err := os.OpenFile(config.LogPath, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666)
if err != nil {
return err
}
logFiles = append(logFiles, f)
}
l, err := log.New(config.Level, output, logFiles...)
if err != nil {
return err
}
a.logger = l
log.Set(a.logger)
return nil
}

func (a *Application) initStorage(config *storage.Config) error {
s, err := storage.New(config)
if err != nil {
return err
}
a.storage = s
storage.Set(a.storage)
return nil
}

func (a *Application) printLogo() {
for i := range logoStr {
log.Infof("%s", logoStr[i])
@@ -249,14 +273,12 @@ func (a *Application) initDebugServer(port int) error {
return nil
}

func (a *Application) waitForStopSignal() {
func (a *Application) waitForStopSignal() os.Signal {
signal.Notify(a.waitStopCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-a.waitStopCh
return <-a.waitStopCh
}

func (a *Application) gracefullyShutdown() error {
log.Infof("received stop signal... shutting down...")

// wait until application has been shut down
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(a.shutDownWaitSecs))
defer cancel()
@@ -276,9 +298,12 @@ func (a *Application) shutdown(ctx context.Context) <-chan bool {
a.debugSrv.Shutdown(ctx)
}
a.c2s.Shutdown(ctx)
if a.s2s.Enabled() {
if a.s2s != nil {
a.s2s.Shutdown(ctx)
}
if a.cluster != nil {
a.cluster.Shutdown()
}
a.comps.Shutdown(ctx)
a.mods.Shutdown(ctx)

3 changes: 3 additions & 0 deletions app/config.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,8 @@ import (
"bytes"
"io/ioutil"

"github.com/ortuman/jackal/cluster"

"github.com/ortuman/jackal/c2s"
"github.com/ortuman/jackal/component"
"github.com/ortuman/jackal/module"
@@ -34,6 +36,7 @@ type Config struct {
Debug debugConfig `yaml:"debug"`
Logger loggerConfig `yaml:"logger"`
Storage storage.Config `yaml:"storage"`
Cluster *cluster.Config `yaml:"cluster"`
Router router.Config `yaml:"router"`
Modules module.Config `yaml:"modules"`
Components component.Config `yaml:"components"`
6 changes: 3 additions & 3 deletions auth/digest_md5_test.go
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ func TestDigesMD5Authentication(t *testing.T) {
auth.SetAttribute("mechanism", "DIGEST-MD5")
authr.ProcessElement(auth)

challenge := testStm.FetchElement()
challenge := testStm.ReceiveElement()
require.Equal(t, challenge.Name(), "challenge")
clParams := helper.clientParamsFromChallenge(challenge.Text())
clientResp := authr.computeResponse(clParams, user, true)
@@ -143,15 +143,15 @@ func TestDigesMD5Authentication(t *testing.T) {
// successful authentication...
require.Nil(t, helper.sendClientParamsResponse(clParams))

challenge = testStm.FetchElement()
challenge = testStm.ReceiveElement()

serverResp := authr.computeResponse(clParams, user, false)
require.Equal(t, base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("rspauth=%s", serverResp))), challenge.Text())

response.SetText("")
authr.ProcessElement(response)

success := testStm.FetchElement()
success := testStm.ReceiveElement()
require.Equal(t, "success", success.Name())

// successfully authenticated
6 changes: 3 additions & 3 deletions auth/scram_test.go
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ type fakeTransport struct {
func (ft *fakeTransport) Read(p []byte) (n int, err error) { return 0, nil }
func (ft *fakeTransport) Write(p []byte) (n int, err error) { return 0, nil }
func (ft *fakeTransport) Close() error { return nil }
func (ft *fakeTransport) Type() transport.TransportType { return transport.Socket }
func (ft *fakeTransport) Type() transport.Type { return transport.Socket }
func (ft *fakeTransport) WriteString(s string) (n int, err error) { return 0, nil }
func (ft *fakeTransport) StartTLS(*tls.Config, bool) { return }
func (ft *fakeTransport) EnableCompression(compress.Level) { return }
@@ -269,7 +269,7 @@ func processScramTestCase(t *testing.T, tc *scramAuthTestCase) error {
if err != nil {
return err
}
challenge := testStm.FetchElement()
challenge := testStm.ReceiveElement()
require.NotNil(t, challenge)
require.Equal(t, "challenge", challenge.Name())

@@ -301,7 +301,7 @@ func processScramTestCase(t *testing.T, tc *scramAuthTestCase) error {
return err
}

success := testStm.FetchElement()
success := testStm.ReceiveElement()
require.Equal(t, "success", success.Name())

vb64, err := base64.StdEncoding.DecodeString(success.Text())
15 changes: 12 additions & 3 deletions c2s/c2s.go
Original file line number Diff line number Diff line change
@@ -32,10 +32,19 @@ const (
blockedErrorNamespace = "urn:xmpp:blocking:errors"
)

type c2sServer interface {
start()
shutdown(ctx context.Context) error
}

var createC2SServer = func(config *Config, mods *module.Modules, comps *component.Components, router *router.Router) c2sServer {
return &server{cfg: config, mods: mods, comps: comps, router: router}
}

// C2S represents a client-to-server connection manager.
type C2S struct {
mu sync.RWMutex
servers map[string]*server
servers map[string]c2sServer
started uint32
}

@@ -44,9 +53,9 @@ func New(configs []Config, mods *module.Modules, comps *component.Components, ro
if len(configs) == 0 {
return nil, errors.New("at least one c2s configuration is required")
}
c := &C2S{servers: make(map[string]*server)}
c := &C2S{servers: make(map[string]c2sServer)}
for _, config := range configs {
srv := &server{cfg: &config, mods: mods, comps: comps, router: router}
srv := createC2SServer(&config, mods, comps, router)
c.servers[config.ID] = srv
}
return c, nil
Loading
Oops, something went wrong.

0 comments on commit ec1eeda

Please sign in to comment.