Skip to content

Commit

Permalink
Make curation semaphore behavior idiomatic.
Browse files Browse the repository at this point in the history
Idiomatic semaphore usage in Go, unless it is wrapping a concrete type,
should use anonymous empty structs (``struct{}``).  This has several
features that are worthwhile:

  1. It conveys that the object in the channel is likely used for
     resource limiting / semaphore use.  This is by idiom.

  2. Due to magic under the hood, empty structs have a width of zero,
     meaning they consume little space.  It is presumed that slices,
     channels, and other values of them can be represented specially
     with alternative optimizations.  Dmitry Vyukov has done
     investigations into improvements that can be made to the channel
     design and Go and concludes that there are already nice short
     circuiting behaviors at work with this type.

This is the first change of several that apply this type of change to
suitable places.

In this one change, we fix a bug in the previous revision, whereby a
semaphore can be acquired for curation and never released back for
subsequent work: http://goo.gl/70Y2qK.  Compare that versus the
compaction definition above.

On top of that, the use of the semaphore in the mode better supports
system shutdown idioms through the closing of channels.

Change-Id: Idb4fca310f26b73c9ec690bbdd4136180d14c32d
  • Loading branch information
matttproud committed Apr 14, 2014
1 parent e9eda76 commit 1d01435
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type prometheus struct {
compactionTimer *time.Ticker
deletionTimer *time.Ticker

curationSema chan bool
curationSema chan struct{}
stopBackgroundOperations chan bool

unwrittenSamples chan *extraction.Result
Expand All @@ -101,17 +101,22 @@ func (p *prometheus) interruptHandler() {

func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {
select {
case p.curationSema <- true:
case s, ok := <-p.curationSema:
if !ok {
glog.Warning("Prometheus is shutting down; no more curation runs are allowed.")
return nil
}

defer func() {
p.curationSema <- s
}()

default:
glog.Warningf("Deferred compaction for %s and %s due to existing operation.", olderThan, groupSize)

return nil
}

defer func() {
<-p.curationSema
}()

processor := metric.NewCompactionProcessor(&metric.CompactionProcessorOptions{
MaximumMutationPoolBatch: groupSize * 3,
MinimumGroupSize: groupSize,
Expand All @@ -130,7 +135,16 @@ func (p *prometheus) compact(olderThan time.Duration, groupSize int) error {

func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
select {
case p.curationSema <- true:
case s, ok := <-p.curationSema:
if !ok {
glog.Warning("Prometheus is shutting down; no more curation runs are allowed.")
return nil
}

defer func() {
p.curationSema <- s
}()

default:
glog.Warningf("Deferred deletion for %s due to existing operation.", olderThan)

Expand All @@ -153,10 +167,7 @@ func (p *prometheus) delete(olderThan time.Duration, batchSize int) error {
}

func (p *prometheus) close() {
select {
case p.curationSema <- true:
default:
}
close(p.curationSema)

if p.compactionTimer != nil {
p.compactionTimer.Stop()
Expand Down Expand Up @@ -291,7 +302,7 @@ func main() {
deletionTimer: deletionTimer,

curationState: prometheusStatus,
curationSema: make(chan bool, 1),
curationSema: make(chan struct{}, 1),

unwrittenSamples: unwrittenSamples,

Expand All @@ -305,6 +316,8 @@ func main() {
}
defer prometheus.close()

prometheus.curationSema <- struct{}{}

storageStarted := make(chan bool)
go ts.Serve(storageStarted)
<-storageStarted
Expand Down

0 comments on commit 1d01435

Please sign in to comment.