diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go
index 0ed77a9ef79..ee2e527599c 100644
--- a/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go
+++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go
@@ -42,6 +42,8 @@ type StateDBConfig struct {
 	// It is internally computed by the ledger component,
 	// so it is not in ledger.StateDBConfig and not exposed to other components.
 	LevelDBPath string
+	// Size of the stateDB cache, in megabytes.
+	CacheSizeMBs int
 }
 
 // CommonStorageDBProvider implements interface DBProvider
@@ -60,8 +62,13 @@ func NewCommonStorageDBProvider(
 ) (DBProvider, error) {
 	var vdbProvider statedb.VersionedDBProvider
 	var err error
+	// TODO: the system namespaces needed for the cache will be passed from kvLedger using
+	// the DeployedChaincodeProvider in FAB-13314
+	sysNamespaces := []string{"lscc", "_lifecycle"}
+	cache := statedb.NewCache(stateDBConf.CacheSizeMBs, sysNamespaces)
+
 	if stateDBConf != nil && stateDBConf.StateDatabase == couchDB {
-		if vdbProvider, err = statecouchdb.NewVersionedDBProvider(stateDBConf.CouchDB, metricsProvider); err != nil {
+		if vdbProvider, err = statecouchdb.NewVersionedDBProvider(stateDBConf.CouchDB, metricsProvider, cache); err != nil {
 			return nil, err
 		}
 	} else {
diff --git a/core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go b/core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go
index c6f27bfc40a..f6f5434e069 100644
--- a/core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go
+++ b/core/ledger/kvledger/txmgmt/privacyenabledstate/test_exports.go
@@ -60,6 +60,7 @@ func (env *LevelDBCommonStorageTestEnv) Init(t testing.TB) {
 		&StateDBConfig{
 			&ledger.StateDBConfig{},
 			dbPath,
+			0,
 		},
 	)
 	assert.NoError(t, err)
diff --git a/core/ledger/kvledger/txmgmt/statedb/cache.go b/core/ledger/kvledger/txmgmt/statedb/cache.go
index fe9ec7adc74..bfe6c4330ea 100644
--- a/core/ledger/kvledger/txmgmt/statedb/cache.go
+++ b/core/ledger/kvledger/txmgmt/statedb/cache.go
@@ -8,30 +8,158 @@ package statedb
 
 import (
 	"github.com/VictoriaMetrics/fastcache"
+	"github.com/golang/protobuf/proto"
+)
+
+var (
+	keySep = []byte{0x00}
 )
 
 // Cache holds both the system and user cache
 type Cache struct {
-	sysCache *fastcache.Cache
-	usrCache *fastcache.Cache
+	sysCache      *fastcache.Cache
+	usrCache      *fastcache.Cache
+	sysNamespaces []string
 }
 
-// New creates a Cache. The cache consists of both system state cache (for lscc, _lifecycle)
+// NewCache creates a Cache. The cache consists of both system state cache (for lscc, _lifecycle)
 // and user state cache (for all user deployed chaincodes). The size of the
 // system state cache is 64 MB, by default. The size of the user state cache, in terms of MB, is
-// specified via usrCacheSize parameter. Note that the fastcache allocates memory
-// only in the multiples of 32 MB (due to 512 buckets & an equal number of 64 KB chunks per bucket).
-// If the usrCacheSize is not a multiple of 32 MB, the fastcache would round the size
+// specified via the usrCacheSizeMBs parameter. Note that the maximum memory consumption of fastcache
+// would be in the multiples of 32 MB (due to 512 buckets & an equal number of 64 KB chunks per bucket).
+// If the usrCacheSizeMBs is not a multiple of 32 MB, the fastcache would round the size
// to the next multiple of 32 MB.
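+//
+// A minimal usage sketch (the 48 MB figure below is illustrative, not a default):
+//
+//	cache := NewCache(48, []string{"lscc", "_lifecycle"})
+//
+// would reserve 48 MB for the user cache, which fastcache may round up to 64 MB.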
-func New(usrCacheSize int) *Cache { +func NewCache(usrCacheSizeMBs int, sysNamespaces []string) *Cache { cache := &Cache{} // By default, 64 MB is allocated for the system cache cache.sysCache = fastcache.New(64 * 1024 * 1024) + cache.sysNamespaces = sysNamespaces // User passed size is used to allocate memory for the user cache - if usrCacheSize <= 0 { + if usrCacheSizeMBs <= 0 { return cache } - cache.usrCache = fastcache.New(usrCacheSize * 1024 * 1024) + cache.usrCache = fastcache.New(usrCacheSizeMBs * 1024 * 1024) return cache } + +// Enabled returns true if the cache is enabled for a given namespace. +// Namespace can be of two types: system namespace (such as lscc) and user +// namespace (all user's chaincode states). +func (c *Cache) Enabled(namespace string) bool { + for _, ns := range c.sysNamespaces { + if namespace == ns { + return true + } + } + return c.usrCache != nil +} + +// GetState returns the value for a given namespace and key from +// a cache associated with the chainID. +func (c *Cache) GetState(chainID, namespace, key string) (*CacheValue, error) { + cache := c.getCache(namespace) + if cache == nil { + return nil, nil + } + + cacheKey := constructCacheKey(chainID, namespace, key) + valBytes := cache.Get(nil, cacheKey) + if valBytes == nil { + return nil, nil + } + + cacheValue := &CacheValue{} + if err := proto.Unmarshal(valBytes, cacheValue); err != nil { + return nil, err + } + return cacheValue, nil +} + +// PutState stores a given value in a cache associated with the chainID. +func (c *Cache) PutState(chainID, namespace, key string, cacheValue *CacheValue) error { + cache := c.getCache(namespace) + if cache == nil { + return nil + } + + cacheKey := constructCacheKey(chainID, namespace, key) + valBytes, err := proto.Marshal(cacheValue) + if err != nil { + return err + } + cache.Set(cacheKey, valBytes) + return nil +} + +// CacheUpdates is a map from a namespace to a set of cache KV updates +type CacheUpdates map[string]CacheKVs + +// CacheKVs is a map from a key to a cache value +type CacheKVs map[string]*CacheValue + +// Add adds the given cacheKVs to the CacheUpdates +func (u CacheUpdates) Add(namespace string, ckvs CacheKVs) { + nsu, ok := u[namespace] + if !ok { + nsu = CacheKVs{} + u[namespace] = nsu + } + + for k, v := range ckvs { + nsu[k] = v + } +} + +// UpdateStates updates only the existing entries in the cache associated with +// the chainID. +func (c *Cache) UpdateStates(chainID string, updates CacheUpdates) error { + for ns, kvs := range updates { + cache := c.getCache(ns) + if cache == nil { + continue + } + + for key, newVal := range kvs { + cacheKey := constructCacheKey(chainID, ns, key) + if newVal == nil { + cache.Del(cacheKey) + continue + } + if oldVal := cache.Get(nil, cacheKey); oldVal != nil { + newValBytes, err := proto.Marshal(newVal) + if err != nil { + return err + } + cache.Set(cacheKey, newValBytes) + } + } + } + return nil +} + +// Reset removes all the items from the cache. +func (c *Cache) Reset() { + c.sysCache.Reset() + if c.usrCache != nil { + c.usrCache.Reset() + } +} + +func (c *Cache) getCache(namespace string) *fastcache.Cache { + for _, ns := range c.sysNamespaces { + if namespace == ns { + return c.sysCache + } + } + return c.usrCache +} + +func constructCacheKey(chainID, namespace, key string) []byte { + var cacheKey []byte + cacheKey = append(cacheKey, []byte(chainID)...) + cacheKey = append(cacheKey, keySep...) + cacheKey = append(cacheKey, []byte(namespace)...) + cacheKey = append(cacheKey, keySep...) 
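+	// the resulting layout is <chainID> 0x00 <namespace> 0x00 <key>; for example,
+	// assuming chain "ch1", namespace "ns1", and key "k1", the key returned
+	// below is "ch1\x00ns1\x00k1"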
+ return append(cacheKey, []byte(key)...) +} diff --git a/core/ledger/kvledger/txmgmt/statedb/cache_test.go b/core/ledger/kvledger/txmgmt/statedb/cache_test.go index 8522df66aee..3c84a3298b1 100644 --- a/core/ledger/kvledger/txmgmt/statedb/cache_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/cache_test.go @@ -10,21 +10,213 @@ import ( "testing" "github.com/VictoriaMetrics/fastcache" - "github.com/stretchr/testify/assert" + "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/require" ) +var sysNamespaces = []string{"lscc", "_lifecycle"} + func TestNewCache(t *testing.T) { - cache := New(10) + cache := NewCache(32, sysNamespaces) expectedCache := &Cache{ - sysCache: fastcache.New(64 * 1024 * 1024), - usrCache: fastcache.New(10 * 1024 * 1024), + sysCache: fastcache.New(64 * 1024 * 1024), + usrCache: fastcache.New(32 * 1024 * 1024), + sysNamespaces: sysNamespaces, } - assert.Equal(t, expectedCache, cache) + require.Equal(t, expectedCache, cache) + require.True(t, cache.Enabled("lscc")) + require.True(t, cache.Enabled("_lifecycle")) + require.True(t, cache.Enabled("xyz")) - cache = New(0) + cache = NewCache(0, sysNamespaces) expectedCache = &Cache{ - sysCache: fastcache.New(64 * 1024 * 1024), - usrCache: nil, + sysCache: fastcache.New(64 * 1024 * 1024), + usrCache: nil, + sysNamespaces: sysNamespaces, } - assert.Equal(t, expectedCache, cache) + require.Equal(t, expectedCache, cache) + require.True(t, cache.Enabled("lscc")) + require.True(t, cache.Enabled("_lifecycle")) + require.False(t, cache.Enabled("xyz")) +} + +func TestGetPutState(t *testing.T) { + cache := NewCache(32, sysNamespaces) + + // test GetState + v, err := cache.GetState("ch1", "ns1", "k1") + require.NoError(t, err) + require.Nil(t, v) + + // test PutState + expectedValue1 := &CacheValue{Value: []byte("value1")} + require.NoError(t, cache.PutState("ch1", "ns1", "k1", expectedValue1)) + + v, err = cache.GetState("ch1", "ns1", "k1") + require.NoError(t, err) + require.True(t, proto.Equal(expectedValue1, v)) +} + +func TestUpdateStates(t *testing.T) { + cache := NewCache(32, sysNamespaces) + + // create states for three namespaces (ns1, ns2, ns3) + // each with two keys (k1, k2) + expectedValue1 := &CacheValue{Value: []byte("value1")} + require.NoError(t, cache.PutState("ch1", "ns1", "k1", expectedValue1)) + expectedValue2 := &CacheValue{Value: []byte("value2")} + require.NoError(t, cache.PutState("ch1", "ns1", "k2", expectedValue2)) + expectedValue3 := &CacheValue{Value: []byte("value3")} + require.NoError(t, cache.PutState("ch1", "ns2", "k1", expectedValue3)) + expectedValue4 := &CacheValue{Value: []byte("value4")} + require.NoError(t, cache.PutState("ch1", "ns2", "k2", expectedValue4)) + expectedValue5 := &CacheValue{Value: []byte("value5")} + require.NoError(t, cache.PutState("ch1", "ns3", "k1", expectedValue5)) + expectedValue6 := &CacheValue{Value: []byte("value6")} + require.NoError(t, cache.PutState("ch1", "ns3", "k2", expectedValue6)) + + v1, err := cache.GetState("ch1", "ns1", "k1") + require.NoError(t, err) + require.True(t, proto.Equal(expectedValue1, v1)) + v2, err := cache.GetState("ch1", "ns1", "k2") + require.NoError(t, err) + require.True(t, proto.Equal(expectedValue2, v2)) + v3, err := cache.GetState("ch1", "ns2", "k1") + require.NoError(t, err) + require.True(t, proto.Equal(expectedValue3, v3)) + v4, err := cache.GetState("ch1", "ns2", "k2") + require.NoError(t, err) + require.True(t, proto.Equal(expectedValue4, v4)) + v5, err := cache.GetState("ch1", "ns3", "k1") + require.NoError(t, err) + 
require.True(t, proto.Equal(expectedValue5, v5))
+	v6, err := cache.GetState("ch1", "ns3", "k2")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue6, v6))
+
+	// delete (ns2, k1), (ns3, k1), and (ns3, k2) while updating others.
+	// A nil value represents a delete operation. A new entry (ns3, k3)
+	// is also passed but would not get added to the cache, as the entry
+	// does not already exist in the cache.
+	expectedValue7 := &CacheValue{Value: []byte("value7")}
+	expectedValue8 := &CacheValue{Value: []byte("value8")}
+	expectedValue9 := &CacheValue{Value: []byte("value9")}
+	expectedValue10 := &CacheValue{Value: []byte("value10")}
+	updates := CacheUpdates{
+		"ns1": CacheKVs{
+			"k1": expectedValue7,
+			"k2": expectedValue8,
+		},
+		"ns2": CacheKVs{
+			"k1": nil,
+			"k2": expectedValue9,
+		},
+		"ns3": CacheKVs{
+			"k1": nil,
+			"k2": nil,
+			"k3": expectedValue10,
+		},
+	}
+
+	require.NoError(t, cache.UpdateStates("ch1", updates))
+
+	v7, err := cache.GetState("ch1", "ns1", "k1")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue7, v7))
+
+	v8, err := cache.GetState("ch1", "ns1", "k2")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue8, v8))
+
+	v9, err := cache.GetState("ch1", "ns2", "k2")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue9, v9))
+
+	v, err := cache.GetState("ch1", "ns2", "k1")
+	require.NoError(t, err)
+	require.Nil(t, v)
+	v, err = cache.GetState("ch1", "ns3", "k1")
+	require.NoError(t, err)
+	require.Nil(t, v)
+	v, err = cache.GetState("ch1", "ns3", "k2")
+	require.NoError(t, err)
+	require.Nil(t, v)
+	v, err = cache.GetState("ch1", "ns3", "k3")
+	require.NoError(t, err)
+	require.Nil(t, v)
+}
+
+func TestCacheReset(t *testing.T) {
+	cache := NewCache(32, sysNamespaces)
+
+	// create states for three namespaces (ns1, ns2, lscc),
+	// each with a single key (k1)
+	expectedValue1 := &CacheValue{Value: []byte("value1")}
+	require.NoError(t, cache.PutState("ch1", "ns1", "k1", expectedValue1))
+
+	expectedValue2 := &CacheValue{Value: []byte("value2")}
+	require.NoError(t, cache.PutState("ch1", "ns2", "k1", expectedValue2))
+
+	expectedValue3 := &CacheValue{Value: []byte("value3")}
+	require.NoError(t, cache.PutState("ch1", "lscc", "k1", expectedValue3))
+
+	v1, err := cache.GetState("ch1", "ns1", "k1")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue1, v1))
+
+	v2, err := cache.GetState("ch1", "ns2", "k1")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue2, v2))
+
+	v3, err := cache.GetState("ch1", "lscc", "k1")
+	require.NoError(t, err)
+	require.True(t, proto.Equal(expectedValue3, v3))
+
+	cache.Reset()
+
+	v, err := cache.GetState("ch1", "ns1", "k1")
+	require.NoError(t, err)
+	require.Nil(t, v)
+
+	v, err = cache.GetState("ch1", "ns2", "k1")
+	require.NoError(t, err)
+	require.Nil(t, v)
+
+	v, err = cache.GetState("ch1", "lscc", "k1")
+	require.NoError(t, err)
+	require.Nil(t, v)
+}
+
+func TestCacheUpdates(t *testing.T) {
+	u := make(CacheUpdates)
+	u.Add("ns1", CacheKVs{
+		"k1": &CacheValue{Value: []byte("v1")},
+		"k2": &CacheValue{Value: []byte("v2")},
+	})
+
+	u.Add("ns1", CacheKVs{
+		"k3": &CacheValue{Value: []byte("v1")},
+		"k4": &CacheValue{Value: []byte("v2")},
+	})
+
+	u.Add("ns2", CacheKVs{
+		"k1": &CacheValue{Value: []byte("v1")},
+		"k2": &CacheValue{Value: []byte("v2")},
+	})
+
+	expectedCacheUpdates := CacheUpdates{
+		"ns1": CacheKVs{
+			"k1": &CacheValue{Value: []byte("v1")},
+			"k2": &CacheValue{Value: []byte("v2")},
+			"k3": &CacheValue{Value:
[]byte("v1")}, + "k4": &CacheValue{Value: []byte("v2")}, + }, + + "ns2": CacheKVs{ + "k1": &CacheValue{Value: []byte("v1")}, + "k2": &CacheValue{Value: []byte("v2")}, + }, + } + + require.Equal(t, expectedCacheUpdates, u) } diff --git a/core/ledger/kvledger/txmgmt/statedb/cache_value.pb.go b/core/ledger/kvledger/txmgmt/statedb/cache_value.pb.go new file mode 100644 index 00000000000..dfb45f20a34 --- /dev/null +++ b/core/ledger/kvledger/txmgmt/statedb/cache_value.pb.go @@ -0,0 +1,107 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: cache_value.proto + +package statedb + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type CacheValue struct { + VersionBytes []byte `protobuf:"bytes,1,opt,name=version_bytes,json=versionBytes,proto3" json:"version_bytes,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Metadata []byte `protobuf:"bytes,3,opt,name=metadata,proto3" json:"metadata,omitempty"` + AdditionalInfo []byte `protobuf:"bytes,4,opt,name=additional_info,json=additionalInfo,proto3" json:"additional_info,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *CacheValue) Reset() { *m = CacheValue{} } +func (m *CacheValue) String() string { return proto.CompactTextString(m) } +func (*CacheValue) ProtoMessage() {} +func (*CacheValue) Descriptor() ([]byte, []int) { + return fileDescriptor_c9816941fba5d88a, []int{0} +} + +func (m *CacheValue) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_CacheValue.Unmarshal(m, b) +} +func (m *CacheValue) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_CacheValue.Marshal(b, m, deterministic) +} +func (m *CacheValue) XXX_Merge(src proto.Message) { + xxx_messageInfo_CacheValue.Merge(m, src) +} +func (m *CacheValue) XXX_Size() int { + return xxx_messageInfo_CacheValue.Size(m) +} +func (m *CacheValue) XXX_DiscardUnknown() { + xxx_messageInfo_CacheValue.DiscardUnknown(m) +} + +var xxx_messageInfo_CacheValue proto.InternalMessageInfo + +func (m *CacheValue) GetVersionBytes() []byte { + if m != nil { + return m.VersionBytes + } + return nil +} + +func (m *CacheValue) GetValue() []byte { + if m != nil { + return m.Value + } + return nil +} + +func (m *CacheValue) GetMetadata() []byte { + if m != nil { + return m.Metadata + } + return nil +} + +func (m *CacheValue) GetAdditionalInfo() []byte { + if m != nil { + return m.AdditionalInfo + } + return nil +} + +func init() { + proto.RegisterType((*CacheValue)(nil), "statedb.CacheValue") +} + +func init() { proto.RegisterFile("cache_value.proto", fileDescriptor_c9816941fba5d88a) } + +var fileDescriptor_c9816941fba5d88a = []byte{ + // 208 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x44, 0x8f, 0x3d, 0x4e, 0xc4, 0x30, + 0x10, 0x85, 0x15, 0xfe, 0x65, 0x2d, 0x20, 0x2c, 0x0a, 0x8b, 0x0a, 0x41, 0x01, 0xd5, 0xba, 0xe0, + 0x04, 0xec, 0x56, 0xb4, 0x14, 0x14, 0x34, 0xd1, 0xd8, 0x9e, 0x24, 0x16, 0xb1, 
0x27, 0x72, 0x26, + 0x11, 0xb9, 0x03, 0x87, 0x46, 0x31, 0x16, 0xdb, 0xcd, 0xfb, 0xbe, 0xd1, 0x93, 0x9e, 0xb8, 0xb1, + 0x60, 0x3b, 0xac, 0x67, 0xe8, 0x27, 0xdc, 0x0e, 0x89, 0x98, 0xe4, 0xf9, 0xc8, 0xc0, 0xe8, 0xcc, + 0xc3, 0x4f, 0x25, 0xc4, 0x7e, 0xd5, 0x1f, 0xab, 0x95, 0x8f, 0xe2, 0x72, 0xc6, 0x34, 0x7a, 0x8a, + 0xb5, 0x59, 0x18, 0x47, 0x55, 0xdd, 0x57, 0xcf, 0x9b, 0xf7, 0x4d, 0x81, 0xbb, 0x95, 0xc9, 0x5b, + 0x71, 0x9a, 0xbb, 0xd4, 0x51, 0x96, 0x7f, 0x41, 0xde, 0x89, 0x8b, 0x80, 0x0c, 0x0e, 0x18, 0xd4, + 0x71, 0x16, 0xff, 0x59, 0x3e, 0x89, 0x6b, 0x70, 0xce, 0xb3, 0xa7, 0x08, 0x7d, 0xed, 0x63, 0x43, + 0xea, 0x24, 0xbf, 0x5c, 0x1d, 0xf0, 0x5b, 0x6c, 0x68, 0xb7, 0xff, 0x7c, 0x6d, 0x3d, 0x77, 0x93, + 0xd9, 0x5a, 0x0a, 0xba, 0x5b, 0x06, 0x4c, 0x3d, 0xba, 0x16, 0x93, 0x6e, 0xc0, 0x24, 0x6f, 0xb5, + 0xa5, 0x84, 0xba, 0xa0, 0xaf, 0xb9, 0x1c, 0xfc, 0x1d, 0xda, 0xc0, 0xba, 0x6c, 0x32, 0x67, 0x79, + 0xe3, 0xcb, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x9d, 0x49, 0x6c, 0x17, 0xf8, 0x00, 0x00, 0x00, +} diff --git a/core/ledger/kvledger/txmgmt/statedb/cache_value.proto b/core/ledger/kvledger/txmgmt/statedb/cache_value.proto new file mode 100644 index 00000000000..49721ce8ace --- /dev/null +++ b/core/ledger/kvledger/txmgmt/statedb/cache_value.proto @@ -0,0 +1,18 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +syntax = "proto3"; + +option go_package = "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"; + +package statedb; + +message CacheValue { + bytes version_bytes = 1; + bytes value = 2; + bytes metadata = 3; + bytes additional_info = 4; +} diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/batch_util.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/batch_util.go index 7403b205aa7..3f1cbbd6d2b 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/batch_util.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/batch_util.go @@ -34,7 +34,6 @@ func executeBatches(batches []batch) error { defer batchWG.Done() if err := b.execute(); err != nil { errsChan <- err - return } }(b) } diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go index f6ed359b6c9..1cb570be283 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go @@ -18,6 +18,40 @@ import ( type committer struct { db *couchdb.CouchDatabase batchUpdateMap map[string]*batchableDocument + namespace string + cacheKVs statedb.CacheKVs + cacheEnabled bool +} + +func (c *committer) addToCacheUpdate(kv *keyValue) { + if !c.cacheEnabled { + return + } + + if kv.Value == nil { + // nil value denotes a delete operation + c.cacheKVs[kv.key] = nil + return + } + + c.cacheKVs[kv.key] = &statedb.CacheValue{ + VersionBytes: kv.Version.ToBytes(), + Value: kv.Value, + Metadata: kv.Metadata, + AdditionalInfo: []byte(kv.revision), + } +} + +func (c *committer) updateRevisionInCacheUpdate(key, rev string) { + if !c.cacheEnabled { + return + } + cv := c.cacheKVs[key] + if cv == nil { + // nil value denotes a delete + return + } + cv.AdditionalInfo = []byte(rev) } // buildCommitters builds committers per namespace. 
Each committer transforms the @@ -69,7 +103,6 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st if err != nil { return nil, err } - // for each namespace, build mutiple committers based on the maxBatchSize maxBatchSize := db.CouchInstance.MaxBatchUpdateSize() numCommitters := 1 @@ -77,10 +110,16 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st numCommitters = int(math.Ceil(float64(len(nsUpdates)) / float64(maxBatchSize))) } committers := make([]*committer, numCommitters) + + cacheEnabled := vdb.cache.Enabled(ns) + for i := 0; i < numCommitters; i++ { committers[i] = &committer{ db: db, batchUpdateMap: make(map[string]*batchableDocument), + namespace: ns, + cacheKVs: make(statedb.CacheKVs), + cacheEnabled: cacheEnabled, } } @@ -93,11 +132,13 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st i := 0 for key, vv := range nsUpdates { - couchDoc, err := keyValToCouchDoc(&keyValue{key: key, VersionedValue: vv}, revisions[key]) + kv := &keyValue{key: key, revision: revisions[key], VersionedValue: vv} + couchDoc, err := keyValToCouchDoc(kv) if err != nil { return nil, err } committers[i].batchUpdateMap[key] = &batchableDocument{CouchDoc: *couchDoc, Deleted: vv.Value == nil} + committers[i].addToCacheUpdate(kv) if maxBatchSize > 0 && len(committers[i].batchUpdateMap) == maxBatchSize { i++ } @@ -105,37 +146,6 @@ func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*st return committers, nil } -func (vdb *VersionedDB) getRevisions(ns string, nsUpdates map[string]*statedb.VersionedValue) (map[string]string, error) { - // for now, getRevisions does not use cache. In FAB-15616, we will ensure that the getRevisions uses - // the cache which would be introduced in FAB-15537 - revisions := make(map[string]string) - nsRevs := vdb.committedDataCache.revs[ns] - - var missingKeys []string - var ok bool - for key := range nsUpdates { - if revisions[key], ok = nsRevs[key]; !ok { - missingKeys = append(missingKeys, key) - } - } - - db, err := vdb.getNamespaceDBHandle(ns) - if err != nil { - return nil, err - } - - logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName) - retrievedMetadata, err := retrieveNsMetadata(db, missingKeys) - if err != nil { - return nil, err - } - for _, metadata := range retrievedMetadata { - revisions[metadata.ID] = metadata.Rev - } - - return revisions, nil -} - func (vdb *VersionedDB) executeCommitter(committers []*committer) error { errsChan := make(chan error, len(committers)) defer close(errsChan) @@ -172,14 +182,17 @@ func (c *committer) commitUpdates() error { if err != nil { return err } + // IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY // iterate through the response from CouchDB by document for _, resp := range responses { // If the document returned an error, retry the individual document if resp.Ok == true { + c.updateRevisionInCacheUpdate(resp.ID, resp.Rev) continue } doc := c.batchUpdateMap[resp.ID] + var err error //Remove the "_rev" from the JSON before saving //this will allow the CouchDB retry logic to retry revisions without encountering @@ -201,7 +214,9 @@ func (c *committer) commitUpdates() error { logger.Warningf("CouchDB batch document update encountered an problem. 
Retrying update for document ID:%s", resp.ID) // Save the individual document to couchdb // Note that this will do retries as needed - _, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc) + var revision string + revision, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc) + c.updateRevisionInCacheUpdate(resp.ID, revision) } // If the single document update or delete returns an error, then throw the error @@ -216,6 +231,79 @@ func (c *committer) commitUpdates() error { return nil } +func (vdb *VersionedDB) getRevisions(ns string, nsUpdates map[string]*statedb.VersionedValue) (map[string]string, error) { + revisions := make(map[string]string) + nsRevs := vdb.committedDataCache.revs[ns] + + var missingKeys []string + var ok bool + for key := range nsUpdates { + if revisions[key], ok = nsRevs[key]; !ok { + missingKeys = append(missingKeys, key) + } + } + + if len(missingKeys) == 0 { + // all revisions were present in the committedDataCache + return revisions, nil + } + + missingKeys, err := vdb.addMissingRevisionsFromCache(ns, missingKeys, revisions) + if err != nil { + return nil, err + } + + if len(missingKeys) == 0 { + // remaining revisions were present in the state cache + return revisions, nil + } + + // don't update the cache for missing entries as + // revisions are going to get changed after the commit + if err := vdb.addMissingRevisionsFromDB(ns, missingKeys, revisions); err != nil { + return nil, err + } + return revisions, nil +} + +func (vdb *VersionedDB) addMissingRevisionsFromCache(ns string, keys []string, revs map[string]string) ([]string, error) { + if !vdb.cache.Enabled(ns) { + return keys, nil + } + + var missingKeys []string + for _, k := range keys { + cv, err := vdb.cache.GetState(vdb.chainName, ns, k) + if err != nil { + return nil, err + } + if cv == nil { + missingKeys = append(missingKeys, k) + continue + } + revs[k] = string(cv.AdditionalInfo) + } + return missingKeys, nil +} + +func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []string, revisions map[string]string) error { + db, err := vdb.getNamespaceDBHandle(ns) + if err != nil { + return err + } + + logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName) + retrievedMetadata, err := retrieveNsMetadata(db, missingKeys) + if err != nil { + return err + } + for _, metadata := range retrievedMetadata { + revisions[metadata.ID] = metadata.Rev + } + + return nil +} + //batchableDocument defines a document for a batch type batchableDocument struct { CouchDoc couchdb.CouchDoc diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go index cd6a5eaf46f..b26c6160689 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/couchdoc_conv.go @@ -26,7 +26,8 @@ const ( ) type keyValue struct { - key string + key string + revision string *statedb.VersionedValue } @@ -86,6 +87,11 @@ func couchDocToKeyValue(doc *couchdb.CouchDoc) (*keyValue, error) { if err != nil { return nil, err } + var revision string + if jsonResult[revField] != nil { + revision = jsonResult[revField].(string) + } + // remove the _id, _rev and version fields delete(jsonResult, idField) delete(jsonResult, revField) @@ -105,14 +111,16 @@ func couchDocToKeyValue(doc *couchdb.CouchDoc) (*keyValue, error) { return nil, err } } - return &keyValue{key, &statedb.VersionedValue{ - Value: returnValue, - Metadata: 
returnMetadata, - Version: returnVersion}, + return &keyValue{ + key, revision, + &statedb.VersionedValue{ + Value: returnValue, + Metadata: returnMetadata, + Version: returnVersion}, }, nil } -func keyValToCouchDoc(kv *keyValue, revision string) (*couchdb.CouchDoc, error) { +func keyValToCouchDoc(kv *keyValue) (*couchdb.CouchDoc, error) { type kvType int32 const ( kvTypeDelete = iota @@ -148,8 +156,8 @@ func keyValToCouchDoc(kv *keyValue, revision string) (*couchdb.CouchDoc, error) // add the (version + metadata), id, revision, and delete marker (if needed) jsonMap[versionField] = verAndMetadata jsonMap[idField] = key - if revision != "" { - jsonMap[revField] = revision + if kv.revision != "" { + jsonMap[revField] = kv.revision } if kvtype == kvTypeDelete { jsonMap[deletedField] = true diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go index 434f42e1151..3f56181c815 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go @@ -42,10 +42,11 @@ type VersionedDBProvider struct { mux sync.Mutex openCounts uint64 redoLoggerProvider *redoLoggerProvider + cache *statedb.Cache } // NewVersionedDBProvider instantiates VersionedDBProvider -func NewVersionedDBProvider(config *couchdb.Config, metricsProvider metrics.Provider) (*VersionedDBProvider, error) { +func NewVersionedDBProvider(config *couchdb.Config, metricsProvider metrics.Provider, cache *statedb.Cache) (*VersionedDBProvider, error) { logger.Debugf("constructing CouchDB VersionedDBProvider") couchInstance, err := couchdb.CreateCouchInstance(config, metricsProvider) if err != nil { @@ -64,6 +65,7 @@ func NewVersionedDBProvider(config *couchdb.Config, metricsProvider metrics.Prov mux: sync.Mutex{}, openCounts: 0, redoLoggerProvider: p, + cache: cache, }, nil } @@ -140,6 +142,7 @@ func (provider *VersionedDBProvider) GetDBHandle(dbName string) (statedb.Version provider.couchInstance, provider.redoLoggerProvider.newRedoLogger(dbName), dbName, + provider.cache, ) if err != nil { return nil, err @@ -170,10 +173,11 @@ type VersionedDB struct { verCacheLock sync.RWMutex mux sync.RWMutex redoLogger *redoLogger + cache *statedb.Cache } // newVersionedDB constructs an instance of VersionedDB -func newVersionedDB(couchInstance *couchdb.CouchInstance, redoLogger *redoLogger, dbName string) (*VersionedDB, error) { +func newVersionedDB(couchInstance *couchdb.CouchInstance, redoLogger *redoLogger, dbName string, cache *statedb.Cache) (*VersionedDB, error) { // CreateCouchDatabase creates a CouchDB database object, as well as the underlying database if it does not exist chainName := dbName dbName = couchdb.ConstructMetadataDBName(dbName) @@ -190,6 +194,7 @@ func newVersionedDB(couchInstance *couchdb.CouchInstance, redoLogger *redoLogger namespaceDBs: namespaceDBMap, committedDataCache: newVersionCache(), redoLogger: redoLogger, + cache: cache, } logger.Debugf("chain [%s]: checking for redolog record", chainName) redologRecord, err := redoLogger.load() @@ -305,7 +310,7 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro // GetVersion implements method in VersionedDB interface func (vdb *VersionedDB) GetVersion(namespace string, key string) (*version.Height, error) { - returnVersion, keyFound := vdb.GetCachedVersion(namespace, key) + version, keyFound := vdb.GetCachedVersion(namespace, key) if !keyFound { // This if block get executed only 
during simulation because during commit
 		// we always call `LoadCommittedVersions` before calling `GetVersion`
@@ -313,9 +318,9 @@ func (vdb *VersionedDB) GetVersion(namespace string, key string) (*version.Heigh
 		if err != nil || vv == nil {
 			return nil, err
 		}
-		returnVersion = vv.Version
+		version = vv.Version
 	}
-	return returnVersion, nil
+	return version, nil
 }
 
 // GetCachedVersion returns version from cache. `LoadCommittedVersions` function populates the cache
@@ -344,6 +349,43 @@ func (vdb *VersionedDB) BytesKeySupported() bool {
 func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
 	logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
 
+	// (1) read the KV from the cache if available
+	cacheEnabled := vdb.cache.Enabled(namespace)
+	if cacheEnabled {
+		cv, err := vdb.cache.GetState(vdb.chainName, namespace, key)
+		if err != nil {
+			return nil, err
+		}
+		if cv != nil {
+			vv, err := constructVersionedValue(cv)
+			if err != nil {
+				return nil, err
+			}
+			return vv, nil
+		}
+	}
+
+	// (2) read from the database if cache miss occurs
+	kv, err := vdb.readFromDB(namespace, key)
+	if err != nil {
+		return nil, err
+	}
+	if kv == nil {
+		return nil, nil
+	}
+
+	// (3) if the value is not nil, store in the cache
+	if cacheEnabled {
+		cacheValue := constructCacheValue(kv.VersionedValue, kv.revision)
+		if err := vdb.cache.PutState(vdb.chainName, namespace, key, cacheValue); err != nil {
+			return nil, err
+		}
+	}
+
+	return kv.VersionedValue, nil
+}
+
+func (vdb *VersionedDB) readFromDB(namespace, key string) (*keyValue, error) {
 	db, err := vdb.getNamespaceDBHandle(namespace)
 	if err != nil {
 		return nil, err
@@ -359,8 +401,7 @@ func (vdb *VersionedDB) GetState(namespace string, key string) (*statedb.Version
 	if err != nil {
 		return nil, err
 	}
-
-	return kv.VersionedValue, nil
+	return kv, nil
 }
 
 // GetStateMultipleKeys implements method in VersionedDB interface
@@ -581,7 +622,6 @@ func (vdb *VersionedDB) ApplyUpdates(updates *statedb.UpdateBatch, height *versi
 
 func (vdb *VersionedDB) applyUpdates(updates *statedb.UpdateBatch, height *version.Height) error {
 	// TODO a note about https://jira.hyperledger.org/browse/FAB-8622
-	// the function `Apply update can be split into three functions. Each carrying out one of the following three stages`.
 	// The write lock is needed only for the stage 2.
 
 	// stage 1 - buildCommitters builds committers per namespace (per DB). Each committer transforms the
@@ -596,15 +636,57 @@ func (vdb *VersionedDB) applyUpdates(updates *statedb.UpdateBatch, height *versi
 		return err
 	}
 
-	// Stgae 3 - PostUpdateProcessing - flush and record savepoint.
+	// Stage 3 - postCommitProcessing - flush and record savepoint.
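+	// (postCommitProcessing below refreshes the cache in the background while
+	// the savepoint is recorded; if the refresh fails, the whole cache is
+	// reset so that it never serves stale state)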
namespaces := updates.GetUpdatedNamespaces() + if err := vdb.postCommitProcessing(committers, namespaces, height); err != nil { + return err + } + + return nil +} + +func (vdb *VersionedDB) postCommitProcessing(committers []*committer, namespaces []string, height *version.Height) error { + var wg sync.WaitGroup + + wg.Add(1) + errChan := make(chan error, 1) + defer close(errChan) + go func() { + defer wg.Done() + + cacheUpdates := make(statedb.CacheUpdates) + for _, c := range committers { + if !c.cacheEnabled { + continue + } + cacheUpdates.Add(c.namespace, c.cacheKVs) + } + + if len(cacheUpdates) == 0 { + return + } + + // update the cache + if err := vdb.cache.UpdateStates(vdb.chainName, cacheUpdates); err != nil { + vdb.cache.Reset() + errChan <- err + } + + }() + // Record a savepoint at a given height - if err = vdb.ensureFullCommitAndRecordSavepoint(height, namespaces); err != nil { + if err := vdb.ensureFullCommitAndRecordSavepoint(height, namespaces); err != nil { logger.Errorf("Error during recordSavepoint: %s", err.Error()) return err } - return nil + wg.Wait() + select { + case err := <-errChan: + return err + default: + return nil + } } // ClearCachedVersions clears committedVersions and revisionNumbers @@ -639,6 +721,7 @@ func (vdb *VersionedDB) ensureFullCommitAndRecordSavepoint(height *version.Heigh defer close(errsChan) var commitWg sync.WaitGroup commitWg.Add(len(namespaces)) + for _, ns := range namespaces { go func(ns string) { defer commitWg.Done() @@ -844,3 +927,25 @@ func (scanner *queryScanner) GetBookmarkAndClose() string { scanner.Close() return retval } + +func constructCacheValue(v *statedb.VersionedValue, rev string) *statedb.CacheValue { + return &statedb.CacheValue{ + VersionBytes: v.Version.ToBytes(), + Value: v.Value, + Metadata: v.Metadata, + AdditionalInfo: []byte(rev), + } +} + +func constructVersionedValue(cv *statedb.CacheValue) (*statedb.VersionedValue, error) { + height, _, err := version.NewHeightFromBytes(cv.VersionBytes) + if err != nil { + return nil, err + } + + return &statedb.VersionedValue{ + Value: cv.Value, + Version: height, + Metadata: cv.Metadata, + }, nil +} diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go index 66d69379e33..cdb0f73d20e 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test.go @@ -60,6 +60,285 @@ func TestBasicRW(t *testing.T) { } +// TestGetStateFromCache checks cache hits, cache misses, and cache +// updates during GetState call. +func TestGetStateFromCache(t *testing.T) { + cache := statedb.NewCache(32, []string{"lscc"}) + + env := newTestVDBEnvWithCache(t, cache) + defer env.Cleanup() + chainID := "testgetstatefromcache" + db, err := env.DBProvider.GetDBHandle(chainID) + require.NoError(t, err) + + // scenario 1: get state would receives a + // cache miss as the given key does not exist. + // As the key does not exist in the + // db also, get state call would not update + // the cache. + vv, err := db.GetState("ns", "key1") + require.NoError(t, err) + require.Nil(t, vv) + testDoesNotExistInCache(t, cache, chainID, "ns", "key1") + + // scenario 2: get state would receive a cache hit. 
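+	// (a hit is served straight from the in-memory cache, without a CouchDB read)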
+ // directly store an entry in the cache + cacheValue := &statedb.CacheValue{ + Value: []byte("value1"), + Metadata: []byte("meta1"), + VersionBytes: version.NewHeight(1, 1).ToBytes(), + AdditionalInfo: []byte("rev1"), + } + require.NoError(t, cache.PutState(chainID, "ns", "key1", cacheValue)) + + vv, err = db.GetState("ns", "key1") + expectedVV, err := constructVersionedValue(cacheValue) + require.NoError(t, err) + require.Equal(t, expectedVV, vv) + + // scenario 3: get state would receives a + // cache miss as the given key does not present. + // The value associated with the key would be + // fetched from the database and the cache would + // be updated accordingly. + + // store an entry in the db + batch := statedb.NewUpdateBatch() + vv2 := &statedb.VersionedValue{Value: []byte("value2"), Metadata: []byte("meta2"), Version: version.NewHeight(1, 2)} + batch.PutValAndMetadata("lscc", "key1", vv2.Value, vv2.Metadata, vv2.Version) + savePoint := version.NewHeight(1, 2) + db.ApplyUpdates(batch, savePoint) + // Note that the ApplyUpdates() updates only the existing entry in the cache. Currently, the + // cache has only ns, key1 but we are storing lscc, key1. Hence, no changes would happen in the cache. + testDoesNotExistInCache(t, cache, chainID, "lscc", "key1") + + // calling GetState() would update the cache + vv, err = db.GetState("lscc", "key1") + require.NoError(t, err) + require.Equal(t, vv2, vv) + + // cache should have been updated with lscc, key1 + nsdb, err := db.(*VersionedDB).getNamespaceDBHandle("lscc") + require.NoError(t, err) + testExistInCache(t, nsdb, cache, chainID, "lscc", "key1", vv2) +} + +// TestGetVersionFromCache checks cache hits, cache misses, and +// updates during GetVersion call. +func TestGetVersionFromCache(t *testing.T) { + cache := statedb.NewCache(32, []string{"lscc"}) + + env := newTestVDBEnvWithCache(t, cache) + defer env.Cleanup() + chainID := "testgetstatefromcache" + db, err := env.DBProvider.GetDBHandle(chainID) + require.NoError(t, err) + + // scenario 1: get version would receives a + // cache miss as the given key does not exist. + // As the key does not exist in the + // db also, get version call would not update + // the cache. + ver, err := db.GetVersion("ns", "key1") + require.Nil(t, err) + require.Nil(t, ver) + testDoesNotExistInCache(t, cache, chainID, "ns", "key1") + + // scenario 2: get version would receive a cache hit. + // directly store an entry in the cache + cacheValue := &statedb.CacheValue{ + Value: []byte("value1"), + Metadata: []byte("meta1"), + VersionBytes: version.NewHeight(1, 1).ToBytes(), + AdditionalInfo: []byte("rev1"), + } + require.NoError(t, cache.PutState(chainID, "ns", "key1", cacheValue)) + + ver, err = db.GetVersion("ns", "key1") + expectedVer, _, err := version.NewHeightFromBytes(cacheValue.VersionBytes) + require.NoError(t, err) + require.Equal(t, expectedVer, ver) + + // scenario 3: get version would receives a + // cache miss as the given key does not present. + // The value associated with the key would be + // fetched from the database and the cache would + // be updated accordingly. + + // store an entry in the db + batch := statedb.NewUpdateBatch() + vv2 := &statedb.VersionedValue{Value: []byte("value2"), Metadata: []byte("meta2"), Version: version.NewHeight(1, 2)} + batch.PutValAndMetadata("lscc", "key1", vv2.Value, vv2.Metadata, vv2.Version) + savePoint := version.NewHeight(1, 2) + db.ApplyUpdates(batch, savePoint) + // Note that the ApplyUpdates() updates only the existing entry in the cache. 
Currently, the + // cache has only ns, key1 but we are storing lscc, key1. Hence, no changes would happen in the cache. + testDoesNotExistInCache(t, cache, chainID, "lscc", "key1") + + // calling GetVersion() would update the cache + ver, err = db.GetVersion("lscc", "key1") + require.NoError(t, err) + require.Equal(t, vv2.Version, ver) + + // cache should have been updated with lscc, key1 + nsdb, err := db.(*VersionedDB).getNamespaceDBHandle("lscc") + require.NoError(t, err) + testExistInCache(t, nsdb, cache, chainID, "lscc", "key1", vv2) +} + +// TestGetMultipleStatesFromCache checks cache hits, cache misses, +// and updates during GetStateMultipleKeys call. +func TestGetMultipleStatesFromCache(t *testing.T) { + cache := statedb.NewCache(32, []string{"lscc"}) + + env := newTestVDBEnvWithCache(t, cache) + defer env.Cleanup() + chainID := "testgetmultiplestatesfromcache" + db, err := env.DBProvider.GetDBHandle(chainID) + require.NoError(t, err) + + // scenario: given 5 keys, get multiple states find + // 2 keys in the cache. The remaining 2 keys would be fetched + // from the database and the cache would be updated. The last + // key is not present in the db and hence it won't be sent to + // the cache. + + // key1 and key2 exist only in the cache + cacheValue1 := &statedb.CacheValue{ + Value: []byte("value1"), + Metadata: []byte("meta1"), + VersionBytes: version.NewHeight(1, 1).ToBytes(), + AdditionalInfo: []byte("rev1"), + } + require.NoError(t, cache.PutState(chainID, "ns", "key1", cacheValue1)) + cacheValue2 := &statedb.CacheValue{ + Value: []byte("value2"), + Metadata: []byte("meta2"), + VersionBytes: version.NewHeight(1, 1).ToBytes(), + AdditionalInfo: []byte("rev2"), + } + require.NoError(t, cache.PutState(chainID, "ns", "key2", cacheValue2)) + + // key3 and key4 exist only in the db + batch := statedb.NewUpdateBatch() + vv3 := &statedb.VersionedValue{Value: []byte("value3"), Metadata: []byte("meta3"), Version: version.NewHeight(1, 1)} + batch.PutValAndMetadata("ns", "key3", vv3.Value, vv3.Metadata, vv3.Version) + vv4 := &statedb.VersionedValue{Value: []byte("value4"), Metadata: []byte("meta4"), Version: version.NewHeight(1, 1)} + batch.PutValAndMetadata("ns", "key4", vv4.Value, vv4.Metadata, vv4.Version) + savePoint := version.NewHeight(1, 2) + db.ApplyUpdates(batch, savePoint) + + testDoesNotExistInCache(t, cache, chainID, "ns", "key3") + testDoesNotExistInCache(t, cache, chainID, "ns", "key4") + + // key5 does not exist at all while key3 and key4 does not exist in the cache + vvalues, err := db.GetStateMultipleKeys("ns", []string{"key1", "key2", "key3", "key4", "key5"}) + require.Nil(t, err) + vv1, err := constructVersionedValue(cacheValue1) + require.NoError(t, err) + vv2, err := constructVersionedValue(cacheValue2) + require.NoError(t, err) + require.Equal(t, []*statedb.VersionedValue{vv1, vv2, vv3, vv4, nil}, vvalues) + + // cache should have been updated with key3 and key4 + nsdb, err := db.(*VersionedDB).getNamespaceDBHandle("ns") + require.NoError(t, err) + testExistInCache(t, nsdb, cache, chainID, "ns", "key3", vv3) + testExistInCache(t, nsdb, cache, chainID, "ns", "key4", vv4) +} + +// TestCacheUpdatesAfterCommit checks whether the cache is updated +// after a commit of a update batch. 
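+// The CouchDB revision stored in CacheValue.AdditionalInfo should change as
+// well, which the test asserts using the revisions recorded before the second commit.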
+func TestCacheUpdatesAfterCommit(t *testing.T) { + cache := statedb.NewCache(32, []string{"lscc"}) + + env := newTestVDBEnvWithCache(t, cache) + defer env.Cleanup() + chainID := "testcacheupdatesaftercommit" + db, err := env.DBProvider.GetDBHandle(chainID) + require.NoError(t, err) + + // scenario: cache has 4 keys while the commit operation + // updates 2 of those keys, delete the remaining 2 keys, and + // adds a new key. At the end of the commit operation, only + // those 2 keys should be present with the recent value + // in the cache and the new key should not be present in the cache. + + // store 4 keys in the db + batch := statedb.NewUpdateBatch() + vv1 := &statedb.VersionedValue{Value: []byte("value1"), Metadata: []byte("meta1"), Version: version.NewHeight(1, 2)} + vv2 := &statedb.VersionedValue{Value: []byte("value2"), Metadata: []byte("meta2"), Version: version.NewHeight(1, 2)} + vv3 := &statedb.VersionedValue{Value: []byte("value3"), Metadata: []byte("meta3"), Version: version.NewHeight(1, 2)} + vv4 := &statedb.VersionedValue{Value: []byte("value4"), Metadata: []byte("meta4"), Version: version.NewHeight(1, 2)} + + batch.PutValAndMetadata("ns1", "key1", vv1.Value, vv1.Metadata, vv1.Version) + batch.PutValAndMetadata("ns1", "key2", vv2.Value, vv2.Metadata, vv2.Version) + batch.PutValAndMetadata("ns2", "key1", vv3.Value, vv3.Metadata, vv3.Version) + batch.PutValAndMetadata("ns2", "key2", vv4.Value, vv4.Metadata, vv4.Version) + savePoint := version.NewHeight(1, 5) + db.ApplyUpdates(batch, savePoint) + + // key1, key2 in ns1 and ns2 would not be in cache + testDoesNotExistInCache(t, cache, chainID, "ns1", "key1") + testDoesNotExistInCache(t, cache, chainID, "ns1", "key2") + testDoesNotExistInCache(t, cache, chainID, "ns2", "key1") + testDoesNotExistInCache(t, cache, chainID, "ns2", "key2") + + // add key1 and key2 from ns1 to the cache + _, err = db.GetState("ns1", "key1") + require.NoError(t, err) + _, err = db.GetState("ns1", "key2") + require.NoError(t, err) + // add key1 and key2 from ns2 to the cache + _, err = db.GetState("ns2", "key1") + require.NoError(t, err) + _, err = db.GetState("ns2", "key2") + require.NoError(t, err) + + v, err := cache.GetState(chainID, "ns1", "key1") + require.NoError(t, err) + ns1key1rev := string(v.AdditionalInfo) + + v, err = cache.GetState(chainID, "ns1", "key2") + require.NoError(t, err) + ns1key2rev := string(v.AdditionalInfo) + + // update key1 and key2 in ns1. delete key1 and key2 in ns2. add a new key3 in ns2. 
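+	// (the deletes below reach the cache as nil values, which UpdateStates
+	// translates into evictions rather than stored entries)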
+	batch = statedb.NewUpdateBatch()
+	vv1Update := &statedb.VersionedValue{Value: []byte("new-value1"), Metadata: []byte("meta1"), Version: version.NewHeight(2, 2)}
+	vv2Update := &statedb.VersionedValue{Value: []byte("new-value2"), Metadata: []byte("meta2"), Version: version.NewHeight(2, 2)}
+	vv3Update := &statedb.VersionedValue{Version: version.NewHeight(2, 4)}
+	vv4Update := &statedb.VersionedValue{Version: version.NewHeight(2, 5)}
+	vv5 := &statedb.VersionedValue{Value: []byte("value5"), Metadata: []byte("meta5"), Version: version.NewHeight(1, 2)}
+
+	batch.PutValAndMetadata("ns1", "key1", vv1Update.Value, vv1Update.Metadata, vv1Update.Version)
+	batch.PutValAndMetadata("ns1", "key2", vv2Update.Value, vv2Update.Metadata, vv2Update.Version)
+	batch.Delete("ns2", "key1", vv3Update.Version)
+	batch.Delete("ns2", "key2", vv4Update.Version)
+	batch.PutValAndMetadata("ns2", "key3", vv5.Value, vv5.Metadata, vv5.Version)
+	savePoint = version.NewHeight(2, 5)
+	db.ApplyUpdates(batch, savePoint)
+
+	// the cache should now hold only the updated key1 and key2 in ns1
+	cacheValue, err := cache.GetState(chainID, "ns1", "key1")
+	require.NoError(t, err)
+	vv, err := constructVersionedValue(cacheValue)
+	require.NoError(t, err)
+	require.Equal(t, vv1Update, vv)
+	require.NotEqual(t, ns1key1rev, string(cacheValue.AdditionalInfo))
+
+	cacheValue, err = cache.GetState(chainID, "ns1", "key2")
+	require.NoError(t, err)
+	vv, err = constructVersionedValue(cacheValue)
+	require.NoError(t, err)
+	require.Equal(t, vv2Update, vv)
+	require.NotEqual(t, ns1key2rev, string(cacheValue.AdditionalInfo))
+
+	testDoesNotExistInCache(t, cache, chainID, "ns2", "key1")
+	testDoesNotExistInCache(t, cache, chainID, "ns2", "key2")
+	testDoesNotExistInCache(t, cache, chainID, "ns2", "key3")
+}
+
 func TestMultiDBBasicRW(t *testing.T) {
 	env := NewTestVDBEnv(t)
 	defer env.Cleanup()
@@ -807,7 +1086,7 @@ func testFormatCheck(t *testing.T, dataFormat string, dataExists bool, expectedE
 		RequestTimeout: 35 * time.Second,
 		RedoLogPath:    redoPath,
 	}
-	dbProvider, err := NewVersionedDBProvider(config, &disabled.Provider{})
+	dbProvider, err := NewVersionedDBProvider(config, &disabled.Provider{}, &statedb.Cache{})
 	require.NoError(t, err)
 
 	// create preconditions for test
@@ -827,7 +1106,7 @@ func testFormatCheck(t *testing.T, dataFormat string, dataExists bool, expectedE
 	defer cleanupDB(t, dbProvider.couchInstance)
 
 	// close and reopen with preconditions set and check the expected behavior
-	dbProvider, err = NewVersionedDBProvider(config, &disabled.Provider{})
+	dbProvider, err = NewVersionedDBProvider(config, &disabled.Provider{}, &statedb.Cache{})
 	if expectedErr != nil {
 		require.Equal(t, expectedErr, err)
 		return
@@ -842,3 +1121,20 @@ func testFormatCheck(t *testing.T, dataFormat string, dataExists bool, expectedE
 	require.NoError(t, err)
 	require.Equal(t, expectedFormat, format)
 }
+
+func testDoesNotExistInCache(t *testing.T, cache *statedb.Cache, chainID, ns, key string) {
+	cacheValue, err := cache.GetState(chainID, ns, key)
+	require.NoError(t, err)
+	require.Nil(t, cacheValue)
+}
+
+func testExistInCache(t *testing.T, db *couchdb.CouchDatabase, cache *statedb.Cache, chainID, ns, key string, expectedVV *statedb.VersionedValue) {
+	cacheValue, err := cache.GetState(chainID, ns, key)
+	require.NoError(t, err)
+	vv, err := constructVersionedValue(cacheValue)
+	require.NoError(t, err)
+	require.Equal(t, expectedVV, vv)
+	metadata, err := retrieveNsMetadata(db, []string{key})
+	require.NoError(t, err)
+	require.Equal(t, metadata[0].Rev,
string(cacheValue.AdditionalInfo)) +} diff --git a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test_export.go b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test_export.go index ea391bbbaf6..0a102e10990 100644 --- a/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test_export.go +++ b/core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb_test_export.go @@ -29,6 +29,10 @@ type TestVDBEnv struct { // NewTestVDBEnv instantiates and new couch db backed TestVDB func NewTestVDBEnv(t testing.TB) *TestVDBEnv { + return newTestVDBEnvWithCache(t, &statedb.Cache{}) +} + +func newTestVDBEnvWithCache(t testing.TB, cache *statedb.Cache) *TestVDBEnv { t.Logf("Creating new TestVDBEnv") redoPath, err := ioutil.TempDir("", "cvdbenv") if err != nil { @@ -45,7 +49,7 @@ func NewTestVDBEnv(t testing.TB) *TestVDBEnv { RequestTimeout: 35 * time.Second, RedoLogPath: redoPath, } - dbProvider, err := NewVersionedDBProvider(config, &disabled.Provider{}) + dbProvider, err := NewVersionedDBProvider(config, &disabled.Provider{}, cache) if err != nil { t.Fatalf("Error creating CouchDB Provider: %s", err) } @@ -60,7 +64,7 @@ func NewTestVDBEnv(t testing.TB) *TestVDBEnv { func (env *TestVDBEnv) CloseAndReopen() { env.DBProvider.Close() - dbProvider, _ := NewVersionedDBProvider(env.config, &disabled.Provider{}) + dbProvider, _ := NewVersionedDBProvider(env.config, &disabled.Provider{}, &statedb.Cache{}) env.DBProvider = dbProvider }