Skip to content

Commit

Permalink
global config refactoring (ortuman#178)
Browse files Browse the repository at this point in the history
Signed-off-by: ortuman <ortuman@gmail.com>
  • Loading branch information
ortuman authored Oct 11, 2021
1 parent 890bb5e commit ea4dae7
Show file tree
Hide file tree
Showing 115 changed files with 559 additions and 459 deletions.
163 changes: 40 additions & 123 deletions pkg/cmd/jackal/app.go → cmd/jackal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,14 @@ import (
"syscall"
"time"

"google.golang.org/grpc/keepalive"

"google.golang.org/grpc"

etcdv3 "github.com/coreos/etcd/clientv3"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
adminserver "github.com/ortuman/jackal/pkg/admin/server"
"github.com/ortuman/jackal/pkg/auth/pepper"
"github.com/ortuman/jackal/pkg/c2s"
clusterconnmanager "github.com/ortuman/jackal/pkg/cluster/connmanager"
"github.com/ortuman/jackal/pkg/cluster/etcd"
"github.com/ortuman/jackal/pkg/cluster/kv"
etcdkv "github.com/ortuman/jackal/pkg/cluster/kv/etcd"
"github.com/ortuman/jackal/pkg/cluster/locker"
etcdlocker "github.com/ortuman/jackal/pkg/cluster/locker/etcd"
"github.com/ortuman/jackal/pkg/cluster/memberlist"
clusterrouter "github.com/ortuman/jackal/pkg/cluster/router"
clusterserver "github.com/ortuman/jackal/pkg/cluster/server"
Expand All @@ -50,15 +44,12 @@ import (
"github.com/ortuman/jackal/pkg/log"
"github.com/ortuman/jackal/pkg/log/zap"
"github.com/ortuman/jackal/pkg/module"
"github.com/ortuman/jackal/pkg/repository"
measuredrepository "github.com/ortuman/jackal/pkg/repository/measured"
pgsqlrepository "github.com/ortuman/jackal/pkg/repository/pgsql"
"github.com/ortuman/jackal/pkg/router"
"github.com/ortuman/jackal/pkg/s2s"
"github.com/ortuman/jackal/pkg/shaper"
"github.com/ortuman/jackal/pkg/storage"
"github.com/ortuman/jackal/pkg/storage/repository"
"github.com/ortuman/jackal/pkg/util/crashreporter"
"github.com/ortuman/jackal/pkg/util/stringmatcher"
tlsutil "github.com/ortuman/jackal/pkg/util/tls"
"github.com/ortuman/jackal/pkg/version"
)

Expand Down Expand Up @@ -108,9 +99,8 @@ type serverApp struct {
peppers *pepper.Keys
hk *hook.Hooks

etcdCli *etcdv3.Client
locker locker.Locker
kv kv.KV
locker locker.Locker
kv kv.KV

rep repository.Repository
memberList *memberlist.MemberList
Expand Down Expand Up @@ -202,7 +192,7 @@ func run(output io.Writer, args []string) error {
)

// init pepper keys
peppers, err := pepper.NewKeys(cfg.Peppers.Keys, cfg.Peppers.UseID)
peppers, err := pepper.NewKeys(cfg.Peppers)
if err != nil {
return err
}
Expand All @@ -212,11 +202,8 @@ func run(output io.Writer, args []string) error {
a.hk = hook.NewHooks()

// init etcd
if err := a.initEtcd(cfg.Cluster.Etcd); err != nil {
return err
}
a.initLocker()
a.initKVStore()
a.initLocker(cfg.Cluster.Etcd)
a.initKVStore(cfg.Cluster.Etcd)

// init cluster connection manager
a.initClusterConnManager()
Expand Down Expand Up @@ -245,14 +232,13 @@ func run(output io.Writer, args []string) error {
a.registerStartStopper(newHTTPServer(cfg.HTTPPort))

// init admin server
if !cfg.Admin.Disabled {
a.initAdminServer(cfg.Admin.BindAddr, cfg.Admin.Port)
}
a.initAdminServer(cfg.Admin)

// init cluster server
a.initClusterServer(cfg.Cluster.BindAddr, cfg.Cluster.Port)
a.initClusterServer(cfg.Cluster.Server)

// init memberlist
a.initMemberList(cfg.Cluster.Port)
a.initMemberList(cfg.Cluster.Server.Port)

// init C2S/S2S listeners
if err := a.initListeners(cfg.Listeners); err != nil {
Expand All @@ -262,53 +248,20 @@ func run(output io.Writer, args []string) error {
if err := a.bootstrap(); err != nil {
return err
}
// ...wait for stop signal to shutdown
// ...wait for stop signal to shut down
sig := a.waitForStopSignal()
log.Infof("Received %s signal... shutting down...", sig.String())

return a.shutdown()
}

func (a *serverApp) initEtcd(cfg etcdConfig) error {
const (
dialKeepAliveTime = 30 * time.Second
dialKeepAliveTimeout = 10 * time.Second
dialTimeout = 20 * time.Second

keepAlive = time.Second * 10
timeout = time.Minute * 20
)
dialOptions := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: keepAlive,
Timeout: timeout,
PermitWithoutStream: true,
}),
grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
}
cli, err := etcdv3.New(etcdv3.Config{
Endpoints: cfg.Endpoints,
DialTimeout: dialTimeout,
DialKeepAliveTime: dialKeepAliveTime,
DialKeepAliveTimeout: dialKeepAliveTimeout,
DialOptions: dialOptions,
})
if err != nil {
return err
}
a.etcdCli = cli
return nil
}

func (a *serverApp) initLocker() {
a.locker = etcdlocker.New(a.etcdCli)
func (a *serverApp) initLocker(cfg etcd.Config) {
a.locker = etcd.NewLocker(cfg)
a.registerStartStopper(a.locker)
}

func (a *serverApp) initKVStore() {
etcdKV := etcdkv.New(a.etcdCli)
func (a *serverApp) initKVStore(cfg etcd.Config) {
etcdKV := etcd.NewKV(cfg)
a.kv = kv.NewMeasured(etcdKV)
a.registerStartStopper(a.kv)
}
Expand All @@ -318,71 +271,33 @@ func (a *serverApp) initClusterConnManager() {
a.registerStartStopper(a.clusterConnMng)
}

func (a *serverApp) initRepository(sCfg storageConfig) error {
cfg := sCfg.PgSQL
opts := pgsqlrepository.Config{
MaxIdleConns: cfg.MaxIdleConns,
MaxOpenConns: cfg.MaxOpenConns,
ConnMaxIdleTime: cfg.ConnMaxIdleTime,
ConnMaxLifetime: cfg.ConnMaxLifetime,
func (a *serverApp) initRepository(cfg storage.Config) error {
rep, err := storage.New(cfg)
if err != nil {
return err
}
pgRep := pgsqlrepository.New(
cfg.Host,
cfg.User,
cfg.Password,
cfg.Database,
cfg.SSLMode,
opts,
)
a.rep = measuredrepository.New(pgRep)
a.rep = rep
a.registerStartStopper(a.rep)
return nil
}

func (a *serverApp) initHosts(configs []hostConfig) error {
const defaultDomain = "localhost"
h := host.New()
if len(configs) == 0 {
cer, err := tlsutil.LoadCertificate("", "", defaultDomain)
if err != nil {
return err
}
h.RegisterDefaultHost(defaultDomain, cer)
a.hosts = h
return nil
}
for i, config := range configs {
cer, err := tlsutil.LoadCertificate(config.TLS.PrivateKeyFile, config.TLS.CertFile, config.Domain)
if err != nil {
return err
}
if i == 0 {
h.RegisterDefaultHost(config.Domain, cer)
} else {
h.RegisterHost(config.Domain, cer)
}
func (a *serverApp) initHosts(configs []host.Config) error {
h, err := host.NewHost(configs)
if err != nil {
return err
}
a.hosts = h
return nil
}

func (a *serverApp) initShapers(cfgs []shaperConfig) error {
func (a *serverApp) initShapers(configs []shaper.Config) error {
a.shapers = make(shaper.Shapers, 0)
for _, cfg := range cfgs {
var sm stringmatcher.Matcher
switch {
case len(cfg.Matching.JID.In) > 0:
sm = stringmatcher.NewStringMatcher(cfg.Matching.JID.In)
case len(cfg.Matching.JID.RegEx) > 0:
var err error
sm, err = stringmatcher.NewRegExMatcher(cfg.Matching.JID.RegEx)
if err != nil {
return err
}
default:
sm = stringmatcher.Any
for _, cfg := range configs {
shp, err := shaper.New(cfg)
if err != nil {
return err
}
a.shapers = append(a.shapers, shaper.New(cfg.MaxSessions, cfg.Rate.Limit, cfg.Rate.Burst, sm))
a.shapers = append(a.shapers, shp)

log.Infow(fmt.Sprintf("Registered '%s' shaper configuration", cfg.Name),
"name", cfg.Name,
Expand All @@ -403,7 +318,7 @@ func (a *serverApp) initListeners(configs []listenerConfig) error {
for _, cfg := range configs {
lnFn, ok := lnFns[cfg.Type]
if !ok {
return fmt.Errorf("main: unrecognized listener: %s", cfg.Type)
return fmt.Errorf("main: unrecognized listener type: %s", cfg.Type)
}
ln := lnFn(a, cfg)
a.registerStartStopper(ln)
Expand Down Expand Up @@ -472,18 +387,21 @@ func (a *serverApp) initModules(cfg modulesConfig) error {
return nil
}

func (a *serverApp) initAdminServer(bindAddr string, port int) {
adminSrv := adminserver.New(bindAddr, port, a.rep, a.peppers, a.hk)
func (a *serverApp) initAdminServer(cfg adminserver.Config) {
adminSrv := adminserver.New(cfg, a.rep, a.peppers, a.hk)
a.registerStartStopper(adminSrv)
}

func (a *serverApp) initClusterServer(bindAddr string, port int) {
clusterSrv := clusterserver.New(bindAddr, port, a.localRouter, a.comps)
func (a *serverApp) initClusterServer(cfg clusterserver.Config) {
clusterSrv := clusterserver.New(cfg, a.localRouter, a.comps)
a.registerStartStopper(clusterSrv)
return
}

func (a *serverApp) registerStartStopper(ss startStopper) {
if ss == nil {
return
}
a.starters = append(a.starters, ss)
a.stoppers = append([]stopper{ss}, a.stoppers...)
}
Expand Down Expand Up @@ -526,7 +444,6 @@ func (a *serverApp) shutdown() error {
return
}
}
_ = a.etcdCli.Close()
log.Close()
errCh <- nil
}()
Expand Down
Loading

0 comments on commit ea4dae7

Please sign in to comment.