Skip to content

Commit

Permalink
refactor: host heartbeat (#1417)
Browse files Browse the repository at this point in the history
* refactor host heartbeat
  • Loading branch information
710leo authored Mar 11, 2023
1 parent 4f6a0bf commit 462e9dd
Show file tree
Hide file tree
Showing 29 changed files with 454 additions and 460 deletions.
15 changes: 10 additions & 5 deletions alert/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}
ctx := ctx.NewContext(context.Background(), db)

redis, err := storage.NewRedis(config.Redis)
if err != nil {
return nil, err
}

syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()

targetCache := memsto.NewTargetCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)

Expand All @@ -56,7 +61,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {

externalProcessors := process.NewExternalProcessors()

Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients)
Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, false)

r := httpx.GinEngine(config.Global.RunMode, config.HTTP)
rt := router.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
Expand All @@ -71,7 +76,7 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
}

func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, alertStats *astats.Stats, externalProcessors *process.ExternalProcessorsType, targetCache *memsto.TargetCacheType, busiGroupCache *memsto.BusiGroupCacheType,
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap) {
alertMuteCache *memsto.AlertMuteCacheType, alertRuleCache *memsto.AlertRuleCacheType, ctx *ctx.Context, promClients *prom.PromClientMap, isCenter bool) {
userCache := memsto.NewUserCache(ctx, syncStats)
userGroupCache := memsto.NewUserGroupCache(ctx, syncStats)
alertSubscribeCache := memsto.NewAlertSubscribeCache(ctx, syncStats)
Expand All @@ -81,12 +86,12 @@ func Start(alertc aconf.Alert, pushgwc pconf.Pushgw, syncStats *memsto.Stats, al

go models.InitNotifyConfig(ctx, alertc.Alerting.TemplatesDir)

naming := naming.NewNaming(ctx, alertc.Heartbeat)
naming := naming.NewNaming(ctx, alertc.Heartbeat, isCenter)

writers := writer.NewWriters(pushgwc)
record.NewScheduler(alertc, recordingRuleCache, promClients, writers, alertStats)

eval.NewScheduler(alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats)
eval.NewScheduler(isCenter, alertc, externalProcessors, alertRuleCache, targetCache, busiGroupCache, alertMuteCache, promClients, naming, ctx, alertStats)

dp := dispatch.NewDispatch(alertRuleCache, userCache, userGroupCache, alertSubscribeCache, targetCache, webhookCache, notifyScript, alertc.Alerting, alertc.Ibex, ctx)
consumer := dispatch.NewConsumer(alertc.Alerting, ctx, dp)
Expand Down
12 changes: 9 additions & 3 deletions alert/eval/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

type Scheduler struct {
isCenter bool
// key: hash
alertRules map[string]*AlertRuleWorker

Expand All @@ -35,10 +36,11 @@ type Scheduler struct {
stats *astats.Stats
}

func NewScheduler(aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
func NewScheduler(isCenter bool, aconf aconf.Alert, externalProcessors *process.ExternalProcessorsType, arc *memsto.AlertRuleCacheType, targetCache *memsto.TargetCacheType,
busiGroupCache *memsto.BusiGroupCacheType, alertMuteCache *memsto.AlertMuteCacheType, promClients *prom.PromClientMap, naming *naming.Naming,
ctx *ctx.Context, stats *astats.Stats) *Scheduler {
scheduler := &Scheduler{
isCenter: isCenter,
aconf: aconf,
alertRules: make(map[string]*AlertRuleWorker),
// recordRules: make(map[string]RuleContext),
Expand Down Expand Up @@ -96,8 +98,12 @@ func (s *Scheduler) syncAlertRules() {
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsHostRule() && s.naming.IamLeader() {
// all host rule will be processed by leader
} else if rule.IsHostRule() && s.isCenter {
// all host rule will be processed by center instance

if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue
}
processor := process.NewProcessor(rule, 0, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.promClients, s.ctx, s.stats)
alertRule := NewAlertRuleWorker(rule, 0, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
Expand Down
31 changes: 19 additions & 12 deletions alert/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,36 +188,43 @@ func (arw *AlertRuleWorker) GetHostAnomalyPoint(ruleConfig string) []common.Anom

query := models.GetHostsQuery(rule.Queries)
switch trigger.Type {
case "target_miss", "offset":
t := now - int64(trigger.Duration)
if trigger.Type == "offset" {
t = int64(trigger.Duration)
}

hosts, err := models.TargetGetsByFilter(arw.ctx, query, trigger.Type, t, 0, 0)
case "target_miss":
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}
t := now - int64(trigger.Duration)
hosts := arw.processor.TargetCache.GetMissHost(targets, t)

for _, host := range hosts {
m := make(map[string]string)
m["ident"] = host.Ident
m["ident"] = host
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity))
}
case "pct_target_miss":
AllCount, err := models.TargetCountByFilter(arw.ctx, query, "", 0)
case "offset":
t := int64(trigger.Duration)
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}
missCount, err := models.TargetCountByFilter(arw.ctx, query, trigger.Type, now-int64(trigger.Duration))
hosts := arw.processor.TargetCache.GetOffsetHost(targets, t)
for _, host := range hosts {
m := make(map[string]string)
m["ident"] = host
lst = append(lst, common.NewAnomalyPoint(trigger.Type, m, now, float64(t), trigger.Severity))
}
case "pct_target_miss":
targets, err := models.TargetGetsByFilter(arw.ctx, query, 0, 0)
if err != nil {
logger.Errorf("rule_eval:%s query:%v, error:%v", arw.Key(), query, err)
continue
}

pct := float64(missCount) / float64(AllCount) * 100
t := now - int64(trigger.Duration)
hosts := arw.processor.TargetCache.GetMissHost(targets, t)
pct := float64(len(hosts)) / float64(len(targets)) * 100
if pct >= float64(trigger.Percent) {
lst = append(lst, common.NewAnomalyPoint(trigger.Type, nil, now, pct, trigger.Severity))
}
Expand Down
1 change: 1 addition & 0 deletions alert/naming/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type DatasourceHashRingType struct {
}

// for alert_rule sharding
var HostDatasource int64 = 100000
var DatasourceHashRing = DatasourceHashRingType{Rings: make(map[int64]*consistent.Consistent)}

func NewConsistentHashRing(replicas int32, nodes []string) *consistent.Consistent {
Expand Down
51 changes: 37 additions & 14 deletions alert/naming/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@ import (
)

type Naming struct {
ctx *ctx.Context
Heartbeat aconf.HeartbeatConfig
ctx *ctx.Context
heartbeatConfig aconf.HeartbeatConfig
isCenter bool
}

func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig) *Naming {
func NewNaming(ctx *ctx.Context, heartbeat aconf.HeartbeatConfig, isCenter bool) *Naming {
naming := &Naming{
ctx: ctx,
Heartbeat: heartbeat,
ctx: ctx,
heartbeatConfig: heartbeat,
isCenter: isCenter,
}
naming.Heartbeats()
return naming
Expand Down Expand Up @@ -58,7 +60,7 @@ func (n *Naming) DeleteInactiveInstances() {
}

func (n *Naming) loopHeartbeat() {
interval := time.Duration(n.Heartbeat.Interval) * time.Millisecond
interval := time.Duration(n.heartbeatConfig.Interval) * time.Millisecond
for {
time.Sleep(interval)
if err := n.heartbeat(); err != nil {
Expand All @@ -72,19 +74,19 @@ func (n *Naming) heartbeat() error {
var err error

// 在页面上维护实例和集群的对应关系
datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.Heartbeat.ClusterName)
datasourceIds, err = models.GetDatasourceIdsByClusterName(n.ctx, n.heartbeatConfig.ClusterName)
if err != nil {
return err
}

if len(datasourceIds) == 0 {
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, 0)
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, 0)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", "", err)
}
} else {
for i := 0; i < len(datasourceIds); i++ {
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.Heartbeat.Endpoint, n.Heartbeat.ClusterName, datasourceIds[i])
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, datasourceIds[i])
if err != nil {
logger.Warningf("heartbeat with cluster %d err:%v", datasourceIds[i], err)
}
Expand All @@ -110,6 +112,32 @@ func (n *Naming) heartbeat() error {
localss[datasourceIds[i]] = newss
}

if n.isCenter {
// 如果是中心节点,还需要处理 host 类型的告警规则,host 类型告警规则,和数据源无关,想复用下数据源的 hash ring,想用一个虚假的数据源 id 来处理
// if is center node, we need to handle host type alerting rules, host type alerting rules are not related to datasource, we want to reuse the hash ring of datasource, we want to use a fake datasource id to handle it
err := models.AlertingEngineHeartbeatWithCluster(n.ctx, n.heartbeatConfig.Endpoint, n.heartbeatConfig.ClusterName, HostDatasource)
if err != nil {
logger.Warningf("heartbeat with cluster %s err:%v", "", err)
}

servers, err := n.ActiveServers(HostDatasource)
if err != nil {
logger.Warningf("hearbeat %d get active server err:%v", HostDatasource, err)
return nil
}

sort.Strings(servers)
newss := strings.Join(servers, " ")

oldss, exists := localss[HostDatasource]
if exists && oldss == newss {
return nil
}

RebuildConsistentHashRing(HostDatasource, servers)
localss[HostDatasource] = newss
}

return nil
}

Expand All @@ -121,8 +149,3 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
// 30秒内有心跳,就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
}

func (n *Naming) AllActiveServers() ([]string, error) {
// 30秒内有心跳,就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "clock > ?", time.Now().Unix()-30)
}
24 changes: 0 additions & 24 deletions alert/naming/leader.go

This file was deleted.

8 changes: 4 additions & 4 deletions alert/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Processor struct {
groupName string

atertRuleCache *memsto.AlertRuleCacheType
targetCache *memsto.TargetCacheType
TargetCache *memsto.TargetCacheType
busiGroupCache *memsto.BusiGroupCacheType
alertMuteCache *memsto.AlertMuteCacheType

Expand Down Expand Up @@ -90,7 +90,7 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me
quit: make(chan struct{}),
rule: rule,

targetCache: targetCache,
TargetCache: targetCache,
busiGroupCache: busiGroupCache,
alertMuteCache: alertMuteCache,
atertRuleCache: atertRuleCache,
Expand Down Expand Up @@ -126,7 +126,7 @@ func (arw *Processor) Handle(anomalyPoints []common.AnomalyPoint, from string, i
// 如果 event 被 mute 了,本质也是 fire 的状态,这里无论如何都添加到 alertingKeys 中,防止 fire 的事件自动恢复了
hash := event.Hash
alertingKeys[hash] = struct{}{}
if mute.IsMuted(cachedRule, event, arw.targetCache, arw.alertMuteCache) {
if mute.IsMuted(cachedRule, event, arw.TargetCache, arw.alertMuteCache) {
logger.Debugf("rule_eval:%s event:%v is muted", arw.Key(), event)
continue
}
Expand Down Expand Up @@ -392,7 +392,7 @@ func (arw *Processor) fillTags(anomalyPoint common.AnomalyPoint) {
func (arw *Processor) mayHandleIdent() {
// handle ident
if ident, has := arw.tagsMap["ident"]; has {
if target, exists := arw.targetCache.Get(ident); exists {
if target, exists := arw.TargetCache.Get(ident); exists {
arw.target = target.Ident
arw.targetNote = target.Note
}
Expand Down
14 changes: 8 additions & 6 deletions center/center.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ccfos/nightingale/v6/alert/astats"
"github.com/ccfos/nightingale/v6/alert/process"
"github.com/ccfos/nightingale/v6/center/cconf"
"github.com/ccfos/nightingale/v6/center/idents"
"github.com/ccfos/nightingale/v6/center/sso"
"github.com/ccfos/nightingale/v6/conf"
"github.com/ccfos/nightingale/v6/memsto"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/ccfos/nightingale/v6/pkg/i18nx"
"github.com/ccfos/nightingale/v6/pkg/logx"
"github.com/ccfos/nightingale/v6/prom"
"github.com/ccfos/nightingale/v6/pushgw/idents"
"github.com/ccfos/nightingale/v6/pushgw/writer"
"github.com/ccfos/nightingale/v6/storage"

Expand Down Expand Up @@ -54,27 +54,29 @@ func Initialize(configDir string, cryptoKey string) (func(), error) {
return nil, err
}

idents := idents.New(db, redis)

syncStats := memsto.NewSyncStats()
alertStats := astats.NewSyncStats()
idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)
// idents := idents.New(db, config.Pushgw.DatasourceId, config.Pushgw.MaxOffset)
sso := sso.Init(config.Center, ctx)

busiGroupCache := memsto.NewBusiGroupCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats)
targetCache := memsto.NewTargetCache(ctx, syncStats, redis)
dsCache := memsto.NewDatasourceCache(ctx, syncStats)
alertMuteCache := memsto.NewAlertMuteCache(ctx, syncStats)
alertRuleCache := memsto.NewAlertRuleCache(ctx, syncStats)

promClients := prom.NewPromClient(ctx, config.Alert.Heartbeat)

externalProcessors := process.NewExternalProcessors()
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients)
alert.Start(config.Alert, config.Pushgw, syncStats, alertStats, externalProcessors, targetCache, busiGroupCache, alertMuteCache, alertRuleCache, ctx, promClients, true)

writers := writer.NewWriters(config.Pushgw)

alertrtRouter := alertrt.New(config.HTTP, config.Alert, alertMuteCache, targetCache, busiGroupCache, alertStats, ctx, externalProcessors)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, idents, writers, ctx)
centerRouter := centerrt.New(config.HTTP, config.Center, cconf.Operations, dsCache, promClients, redis, sso, ctx, idents)
pushgwRouter := pushgwrt.New(config.HTTP, config.Pushgw, targetCache, busiGroupCache, writers, ctx)

r := httpx.GinEngine(config.Global.RunMode, config.HTTP)

Expand Down
Loading

0 comments on commit 462e9dd

Please sign in to comment.