Skip to content

Commit

Permalink
Merge pull request prometheus#1151 from prometheus/fix-sd-source-hand…
Browse files Browse the repository at this point in the history
…ling

Fix SD mechanism source prefix handling.
  • Loading branch information
juliusv committed Oct 9, 2015
2 parents 0088aa4 + d88aea7 commit 288964e
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 63 deletions.
13 changes: 6 additions & 7 deletions retrieval/discovery/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type ConsulDiscovery struct {
// consulService contains data belonging to the same service.
type consulService struct {
name string
tgroup *config.TargetGroup
tgroup config.TargetGroup
lastIndex uint64
removed bool
running bool
Expand Down Expand Up @@ -143,7 +143,7 @@ func (cd *ConsulDiscovery) Sources() []string {
}

// Run implements the TargetProvider interface.
func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (cd *ConsulDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer cd.stop()

Expand All @@ -159,7 +159,7 @@ func (cd *ConsulDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct
close(srv.done)

// Send clearing update.
ch <- &config.TargetGroup{Source: srv.name}
ch <- config.TargetGroup{Source: srv.name}
break
}
// Launch watcher for the service.
Expand Down Expand Up @@ -219,9 +219,8 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch
srv, ok := cd.services[name]
if !ok {
srv = &consulService{
name: name,
tgroup: &config.TargetGroup{},
done: make(chan struct{}),
name: name,
done: make(chan struct{}),
}
srv.tgroup.Source = name
cd.services[name] = srv
Expand All @@ -246,7 +245,7 @@ func (cd *ConsulDiscovery) watchServices(update chan<- *consulService, done <-ch

// watchService retrieves updates about srv from Consul's service endpoint.
// On a potential update the resulting target group is sent to ch.
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- *config.TargetGroup) {
func (cd *ConsulDiscovery) watchService(srv *consulService, ch chan<- config.TargetGroup) {
catalog := cd.client.Catalog()
for {
nodes, meta, err := catalog.Service(srv.name, "", &consul.QueryOptions{
Expand Down
8 changes: 4 additions & 4 deletions retrieval/discovery/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func NewDNSDiscovery(conf *config.DNSSDConfig) *DNSDiscovery {
}

// Run implements the TargetProvider interface.
func (dd *DNSDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (dd *DNSDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)

ticker := time.NewTicker(dd.interval)
Expand Down Expand Up @@ -119,7 +119,7 @@ func (dd *DNSDiscovery) Sources() []string {
return srcs
}

func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) {
func (dd *DNSDiscovery) refreshAll(ch chan<- config.TargetGroup) {
var wg sync.WaitGroup
wg.Add(len(dd.names))
for _, name := range dd.names {
Expand All @@ -133,15 +133,15 @@ func (dd *DNSDiscovery) refreshAll(ch chan<- *config.TargetGroup) {
wg.Wait()
}

func (dd *DNSDiscovery) refresh(name string, ch chan<- *config.TargetGroup) error {
func (dd *DNSDiscovery) refresh(name string, ch chan<- config.TargetGroup) error {
response, err := lookupAll(name, dd.qtype)
dnsSDLookupsCount.Inc()
if err != nil {
dnsSDLookupFailuresCount.Inc()
return err
}

tg := &config.TargetGroup{}
var tg config.TargetGroup
for _, record := range response.Answer {
target := model.LabelValue("")
switch addr := record.(type) {
Expand Down
6 changes: 3 additions & 3 deletions retrieval/discovery/ec2.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func NewEC2Discovery(conf *config.EC2SDConfig) *EC2Discovery {
}

// Run implements the TargetProvider interface.
func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (ed *EC2Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)

ticker := time.NewTicker(ed.interval)
Expand All @@ -73,7 +73,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil {
log.Error(err)
} else {
ch <- tg
ch <- *tg
}

for {
Expand All @@ -83,7 +83,7 @@ func (ed *EC2Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{})
if err != nil {
log.Error(err)
} else {
ch <- tg
ch <- *tg
}
case <-done:
return
Expand Down
8 changes: 4 additions & 4 deletions retrieval/discovery/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (fd *FileDiscovery) watchFiles() {
}

// Run implements the TargetProvider interface.
func (fd *FileDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (fd *FileDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)
defer fd.stop()

Expand Down Expand Up @@ -188,7 +188,7 @@ func (fd *FileDiscovery) stop() {

// refresh reads all files matching the discovery's patterns and sends the respective
// updated target groups through the channel.
func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
func (fd *FileDiscovery) refresh(ch chan<- config.TargetGroup) {
ref := map[string]int{}
for _, p := range fd.listFiles() {
tgroups, err := readFile(p)
Expand All @@ -199,7 +199,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
continue
}
for _, tg := range tgroups {
ch <- tg
ch <- *tg
}
ref[p] = len(tgroups)
}
Expand All @@ -208,7 +208,7 @@ func (fd *FileDiscovery) refresh(ch chan<- *config.TargetGroup) {
m, ok := ref[f]
if !ok || n > m {
for i := m; i < n; i++ {
ch <- &config.TargetGroup{Source: fileSource(f, i)}
ch <- config.TargetGroup{Source: fileSource(f, i)}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion retrieval/discovery/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func testFileSD(t *testing.T, ext string) {

var (
fsd = NewFileDiscovery(&conf)
ch = make(chan *config.TargetGroup)
ch = make(chan config.TargetGroup)
done = make(chan struct{})
)
go fsd.Run(ch, done)
Expand Down
36 changes: 25 additions & 11 deletions retrieval/discovery/kubernetes/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,25 +173,35 @@ func (kd *Discovery) Sources() []string {
}

// Run implements the TargetProvider interface.
func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (kd *Discovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)

select {
case ch <- kd.updateMastersTargetGroup():
case <-done:
return
if tg := kd.updateMastersTargetGroup(); tg != nil {
select {
case ch <- *tg:
case <-done:
return
}
}

select {
case ch <- kd.updateNodesTargetGroup():
case <-done:
return
if tg := kd.updateNodesTargetGroup(); tg != nil {
select {
case ch <- *tg:
case <-done:
return
}
}

for _, ns := range kd.services {
for _, service := range ns {
tg := kd.addService(service)

if tg == nil {
continue
}

select {
case ch <- kd.addService(service):
case ch <- *tg:
case <-done:
return
}
Expand Down Expand Up @@ -223,8 +233,12 @@ func (kd *Discovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
}
}

if tg == nil {
continue
}

select {
case ch <- tg:
case ch <- *tg:
case <-done:
return
}
Expand Down
8 changes: 4 additions & 4 deletions retrieval/discovery/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (md *MarathonDiscovery) Sources() []string {
}

// Run implements the TargetProvider interface.
func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (md *MarathonDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)

for {
Expand All @@ -69,23 +69,23 @@ func (md *MarathonDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan stru
}
}

func (md *MarathonDiscovery) updateServices(ch chan<- *config.TargetGroup) error {
func (md *MarathonDiscovery) updateServices(ch chan<- config.TargetGroup) error {
targetMap, err := md.fetchTargetGroups()
if err != nil {
return err
}

// Update services which are still present
for _, tg := range targetMap {
ch <- tg
ch <- *tg
}

// Remove services which did disappear
for source := range md.lastRefresh {
_, ok := targetMap[source]
if !ok {
log.Debugf("Removing group for %s", source)
ch <- &config.TargetGroup{Source: source}
ch <- config.TargetGroup{Source: source}
}
}

Expand Down
4 changes: 2 additions & 2 deletions retrieval/discovery/marathon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (

var marathonValidLabel = map[string]string{"prometheus": "yes"}

func newTestDiscovery(client marathon.AppListClient) (chan *config.TargetGroup, *MarathonDiscovery) {
ch := make(chan *config.TargetGroup)
func newTestDiscovery(client marathon.AppListClient) (chan config.TargetGroup, *MarathonDiscovery) {
ch := make(chan config.TargetGroup)
md := NewMarathonDiscovery(&config.MarathonSDConfig{
Servers: []string{"http://localhost:8080"},
})
Expand Down
8 changes: 4 additions & 4 deletions retrieval/discovery/serverset.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type ServersetDiscovery struct {
conn *zk.Conn
mu sync.RWMutex
sources map[string]*config.TargetGroup
sdUpdates *chan<- *config.TargetGroup
sdUpdates *chan<- config.TargetGroup
updates chan zookeeperTreeCacheEvent
treeCaches []*zookeeperTreeCache
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (sd *ServersetDiscovery) processUpdates() {
}
sd.mu.Unlock()
if sd.sdUpdates != nil {
*sd.sdUpdates <- tg
*sd.sdUpdates <- *tg
}
}

Expand All @@ -134,11 +134,11 @@ func (sd *ServersetDiscovery) processUpdates() {
}

// Run implements the TargetProvider interface.
func (sd *ServersetDiscovery) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (sd *ServersetDiscovery) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
// Send on everything we have seen so far.
sd.mu.Lock()
for _, targetGroup := range sd.sources {
ch <- targetGroup
ch <- *targetGroup
}
// Tell processUpdates to send future updates.
sd.sdUpdates = &ch
Expand Down
4 changes: 2 additions & 2 deletions retrieval/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ type fakeTargetProvider struct {
update chan *config.TargetGroup
}

func (tp *fakeTargetProvider) Run(ch chan<- *config.TargetGroup, done <-chan struct{}) {
func (tp *fakeTargetProvider) Run(ch chan<- config.TargetGroup, done <-chan struct{}) {
defer close(ch)
for {
select {
case tg := <-tp.update:
ch <- tg
ch <- *tg
case <-done:
return
}
Expand Down
Loading

0 comments on commit 288964e

Please sign in to comment.