Skip to content

Commit

Permalink
Fix and improve the fp locker.
Browse files Browse the repository at this point in the history
Benchmark:
$ go test -bench 'Fingerprint' -test.run 'Fingerprint' -test.cpu=1,2,4

OLD
BenchmarkFingerprintLockerParallel        500000              3618 ns/op
BenchmarkFingerprintLockerParallel-2      100000             12257 ns/op
BenchmarkFingerprintLockerParallel-4      500000             10164 ns/op
BenchmarkFingerprintLockerSerial        10000000               283 ns/op
BenchmarkFingerprintLockerSerial-2      10000000               284 ns/op
BenchmarkFingerprintLockerSerial-4      10000000               288 ns/op

NEW
BenchmarkFingerprintLockerParallel       1000000              1018 ns/op
BenchmarkFingerprintLockerParallel-2     1000000              1164 ns/op
BenchmarkFingerprintLockerParallel-4     2000000               910 ns/op
BenchmarkFingerprintLockerSerial        50000000                56.0 ns/op
BenchmarkFingerprintLockerSerial-2      50000000                47.9 ns/op
BenchmarkFingerprintLockerSerial-4      50000000                54.5 ns/op

Change-Id: I3c65a43822840e7e64c3c3cfe759e1de51272581
  • Loading branch information
juliusv authored and Bjoern Rabenstein committed Nov 25, 2014
1 parent 7ad55ef commit 7f5d3c2
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 104 deletions.
2 changes: 1 addition & 1 deletion retrieval/targetpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (p *TargetPool) Run() {
}
}

func (p TargetPool) Stop() {
func (p *TargetPool) Stop() {
stopped := make(chan bool)
p.done <- stopped
<-stopped
Expand Down
2 changes: 1 addition & 1 deletion rules/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func getTestVectorFromTestMatrix(matrix ast.Matrix) ast.Vector {
return vector
}

func storeMatrix(storage storage_ng.Storage, matrix ast.Matrix) {
func storeMatrix(storage local.Storage, matrix ast.Matrix) {
pendingSamples := clientmodel.Samples{}
for _, sampleSet := range matrix {
for _, sample := range sampleSet.Values {
Expand Down
4 changes: 2 additions & 2 deletions rules/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func vectorComparisonString(expected []string, actual []string) string {
separator)
}

func newTestStorage(t testing.TB) (storage storage_ng.Storage, closer test.Closer) {
storage, closer = storage_ng.NewTestStorage(t)
func newTestStorage(t testing.TB) (storage local.Storage, closer test.Closer) {
storage, closer = local.NewTestStorage(t)
storeMatrix(storage, testMatrix)
return storage, closer
}
Expand Down
93 changes: 19 additions & 74 deletions storage/local/locker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,93 +6,38 @@ import (
clientmodel "github.com/prometheus/client_golang/model"
)

// fingerprintLock allows locking exactly one fingerprint. When refCount is 0
// after the mutex is unlocked, the fingerprintLock is discarded from the
// fingerprintLocker.
type fingerprintLock struct {
sync.Mutex
refCount int
}

// fingerprintLocker allows locking individual fingerprints in such a manner
// that the lock only exists and uses memory while it is being held (or waiting
// to be acquired) by at least one party.
//
// TODO: This could be implemented as just a fixed number n of locks, assigned
// based on the fingerprint % n. There can be collisons, but they would
// statistically rarely matter (if n is much larger than the number of
// goroutines requiring locks concurrently). Only problem is locking of two
// different fingerprints by the same goroutine.
// fingerprintLocker allows locking individual fingerprints. To limit the number
// of mutexes needed for that, only a fixed number of mutexes are
// allocated. Fingerprints to be locked are assigned to those pre-allocated
// mutexes by their value. (Note that fingerprints are calculated by a hash
// function, so that an approximately equal distribution over the mutexes is
// expected, even without additional hashing of the fingerprint value.)
// Collisions are not detected. If two fingerprints get assigned to the same
// mutex, only one of them can be locked at the same time. As long as the number
// of pre-allocated mutexes is much larger than the number of goroutines
// requiring a fingerprint lock concurrently, the loss in efficiency is
// small. However, a goroutine must never lock more than one fingerprint at the
// same time. (In that case a collision would try to acquire the same mutex
// twice).
type fingerprintLocker struct {
mtx sync.Mutex
fpLocks map[clientmodel.Fingerprint]*fingerprintLock
fpLockPool []*fingerprintLock
fpMtxs []sync.Mutex
numFpMtxs uint
}

// newFingerprintLocker returns a new fingerprintLocker ready for use.
func newFingerprintLocker(preallocatedMutexes int) *fingerprintLocker {
lockPool := make([]*fingerprintLock, preallocatedMutexes)
for i := range lockPool {
lockPool[i] = &fingerprintLock{}
}
return &fingerprintLocker{
fpLocks: map[clientmodel.Fingerprint]*fingerprintLock{},
fpLockPool: lockPool,
make([]sync.Mutex, preallocatedMutexes),
uint(preallocatedMutexes),
}
}

// getLock either returns an existing fingerprintLock from a pool, or allocates
// a new one if the pool is depleted.
func (l *fingerprintLocker) getLock() *fingerprintLock {
return l.fpLockPool[0]
if len(l.fpLockPool) == 0 {
return &fingerprintLock{}
}

lock := l.fpLockPool[len(l.fpLockPool)-1]
l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)-1]
return lock
}

// putLock either stores a fingerprintLock back in the pool, or throws it away
// if the pool is full.
func (l *fingerprintLocker) putLock(fpl *fingerprintLock) {
if len(l.fpLockPool) == cap(l.fpLockPool) {
return
}

l.fpLockPool = l.fpLockPool[:len(l.fpLockPool)+1]
l.fpLockPool[len(l.fpLockPool)-1] = fpl
}

// Lock locks the given fingerprint.
func (l *fingerprintLocker) Lock(fp clientmodel.Fingerprint) {
l.mtx.Lock()

fpLock, ok := l.fpLocks[fp]
if ok {
fpLock.refCount++
} else {
fpLock = l.getLock()
l.fpLocks[fp] = fpLock
}

l.mtx.Unlock()
fpLock.Lock()
l.fpMtxs[uint(fp)%l.numFpMtxs].Lock()
}

// Unlock unlocks the given fingerprint.
func (l *fingerprintLocker) Unlock(fp clientmodel.Fingerprint) {
l.mtx.Lock()
defer l.mtx.Unlock()

fpLock := l.fpLocks[fp]
fpLock.Unlock()

if fpLock.refCount == 0 {
delete(l.fpLocks, fp)
l.putLock(fpLock)
} else {
fpLock.refCount--
}
l.fpMtxs[uint(fp)%l.numFpMtxs].Unlock()
}
32 changes: 16 additions & 16 deletions storage/local/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
// - Make sure the length of the seriesMap doesn't change during the runtime.
// - Lock the fingerprints while persisting unpersisted head chunks.
// - Write to temporary file and only rename after successfully finishing.
func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) error {
func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries *seriesMap) error {
f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
return err
Expand Down Expand Up @@ -431,66 +431,66 @@ func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) er
// open (non-full) head chunks. Only call this method during start-up while
// nothing else is running in storage land. This method is utterly
// goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
f, err := os.Open(p.headsPath())
if os.IsNotExist(err) {
return newSeriesMap(), nil
}
if err != nil {
return seriesMap{}, err
return nil, err
}
defer f.Close()
r := bufio.NewReaderSize(f, fileBufSize)

buf := make([]byte, len(headsMagicString))
if _, err := io.ReadFull(r, buf); err != nil {
return seriesMap{}, err
return nil, err
}
magic := string(buf)
if magic != headsMagicString {
return seriesMap{}, fmt.Errorf(
return nil, fmt.Errorf(
"unexpected magic string, want %q, got %q",
headsMagicString, magic,
)
}
if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
return seriesMap{}, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
}
numSeries, err := binary.ReadVarint(r)
if err != nil {
return seriesMap{}, err
return nil, err
}
fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries)

for ; numSeries > 0; numSeries-- {
seriesFlags, err := r.ReadByte()
if err != nil {
return seriesMap{}, err
return nil, err
}
headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
fp, err := codable.DecodeUint64(r)
if err != nil {
return seriesMap{}, err
return nil, err
}
var metric codable.Metric
if err := metric.UnmarshalFromReader(r); err != nil {
return seriesMap{}, err
return nil, err
}
numChunkDescs, err := binary.ReadVarint(r)
if err != nil {
return seriesMap{}, err
return nil, err
}
chunkDescs := make(chunkDescs, numChunkDescs)

for i := int64(0); i < numChunkDescs; i++ {
if headChunkPersisted || i < numChunkDescs-1 {
firstTime, err := binary.ReadVarint(r)
if err != nil {
return seriesMap{}, err
return nil, err
}
lastTime, err := binary.ReadVarint(r)
if err != nil {
return seriesMap{}, err
return nil, err
}
chunkDescs[i] = &chunkDesc{
firstTimeField: clientmodel.Timestamp(firstTime),
Expand All @@ -500,11 +500,11 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
// Non-persisted head chunk.
chunkType, err := r.ReadByte()
if err != nil {
return seriesMap{}, err
return nil, err
}
chunk := chunkForType(chunkType)
if err := chunk.unmarshal(r); err != nil {
return seriesMap{}, err
return nil, err
}
chunkDescs[i] = &chunkDesc{
chunk: chunk,
Expand All @@ -520,7 +520,7 @@ func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
headChunkPersisted: headChunkPersisted,
}
}
return seriesMap{m: fingerprintToSeries}, nil
return &seriesMap{m: fingerprintToSeries}, nil
}

// dropChunks deletes all chunks from a series whose last sample time is before
Expand Down
16 changes: 8 additions & 8 deletions storage/local/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ type seriesMap struct {

// newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap
// based on a prefilled map, use an explicit initializer.
func newSeriesMap() seriesMap {
return seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
func newSeriesMap() *seriesMap {
return &seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
}

// length returns the number of mappings in the seriesMap.
func (sm seriesMap) length() int {
func (sm *seriesMap) length() int {
sm.mtx.RLock()
defer sm.mtx.RUnlock()

Expand All @@ -52,7 +52,7 @@ func (sm seriesMap) length() int {

// get returns a memorySeries for a fingerprint. Return values have the same
// semantics as the native Go map.
func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
func (sm *seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
sm.mtx.RLock()
defer sm.mtx.RUnlock()

Expand All @@ -61,15 +61,15 @@ func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
}

// put adds a mapping to the seriesMap.
func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
func (sm *seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
sm.mtx.Lock()
defer sm.mtx.Unlock()

sm.m[fp] = s
}

// del removes a mapping from the series Map.
func (sm seriesMap) del(fp clientmodel.Fingerprint) {
func (sm *seriesMap) del(fp clientmodel.Fingerprint) {
sm.mtx.Lock()
defer sm.mtx.Unlock()

Expand All @@ -83,7 +83,7 @@ func (sm seriesMap) del(fp clientmodel.Fingerprint) {
// for iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
func (sm seriesMap) iter() <-chan fingerprintSeriesPair {
func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
ch := make(chan fingerprintSeriesPair)
go func() {
sm.mtx.RLock()
Expand All @@ -105,7 +105,7 @@ func (sm seriesMap) iter() <-chan fingerprintSeriesPair {
// for iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
func (sm seriesMap) fpIter() <-chan clientmodel.Fingerprint {
func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint {
ch := make(chan clientmodel.Fingerprint)
go func() {
sm.mtx.RLock()
Expand Down
2 changes: 1 addition & 1 deletion storage/local/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type memorySeriesStorage struct {
persistDone chan bool
stopServing chan chan<- bool

fingerprintToSeries seriesMap
fingerprintToSeries *seriesMap

memoryEvictionInterval time.Duration
memoryRetentionPeriod time.Duration
Expand Down
2 changes: 1 addition & 1 deletion templates/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestTemplateExpansion(t *testing.T) {

time := clientmodel.Timestamp(0)

storage, closer := storage_ng.NewTestStorage(t)
storage, closer := local.NewTestStorage(t)
defer closer.Close()
storage.AppendSamples(clientmodel.Samples{
{
Expand Down

0 comments on commit 7f5d3c2

Please sign in to comment.