Skip to content

Commit

Permalink
refactor statecouchdb committer
Browse files Browse the repository at this point in the history
FAB-17025 #done

The goal of the CR is to make the committer code more readable.

Change-Id: Idd90da550b6bdcb1bbf2eda28fb3733616df13ba
Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Nov 9, 2019
1 parent a24aaa7 commit 53a1bce
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ func executeBatches(batches []batch) error {
}(b)
}
batchWG.Wait()
if len(errsChan) > 0 {
return <-errsChan

select {
case err := <-errsChan:
return err
default:
return nil
}
return nil
}
289 changes: 160 additions & 129 deletions core/ledger/kvledger/txmgmt/statedb/statecouchdb/commit_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,180 +7,211 @@ package statecouchdb

import (
"fmt"
"math"
"sync"

"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/pkg/errors"
)

// nsCommittersBuilder implements the `batch` interface. Each batch operates on a specific namespace in the updates and
// builds one or more batches of type subNsCommitter.
type nsCommittersBuilder struct {
// updates holds the versioned values to be written for the namespace, keyed by state key
updates map[string]*statedb.VersionedValue
db *couchdb.CouchDatabase
// revisions maps a key to its couchdb revision; the entry is passed along when the couch doc for the key is built
revisions map[string]string
// subNsCommitters is populated by execute()
subNsCommitters []batch
}

// subNsCommitter implements the `batch` interface. Each batch commits the portion of updates within a namespace assigned to it.
type subNsCommitter struct {
// committer pairs a couchdb handle with the documents that are to be written to it.
// batchUpdateMap is filled per state key by buildCommittersForNs and is looked up by the
// document ID of each bulk-update response in commitUpdates.
type committer struct {
db *couchdb.CouchDatabase
batchUpdateMap map[string]*batchableDocument
}

// buildCommitters builds the batches of type subNsCommitter. This function processes different namespaces in parallel.
func (vdb *VersionedDB) buildCommitters(updates *statedb.UpdateBatch) ([]batch, error) {
// buildCommitters builds committers per namespace. Each committer holds its share of the
// given update batch, converted into couchdb documents and kept in memory.
// It returns the committers of all updated namespaces, or one of the errors reported
// while building the committers of a namespace.
func (vdb *VersionedDB) buildCommitters(updates *statedb.UpdateBatch) ([]*committer, error) {
namespaces := updates.GetUpdatedNamespaces()
var nsCommitterBuilder []batch

// for each namespace, we build multiple committers (based on maxBatchSize per namespace)
var wg sync.WaitGroup
// both channels have one buffer slot per namespace; each goroutine below sends exactly
// one value (an error or its committers), so a send never blocks
nsCommittersChan := make(chan []*committer, len(namespaces))
defer close(nsCommittersChan)
errsChan := make(chan error, len(namespaces))
defer close(errsChan)

// for each namespace, we build committers in parallel because the building
// process requires fetching the missing revisions, and we want those fetches
// to run concurrently across namespaces
for _, ns := range namespaces {
nsUpdates := updates.GetUpdates(ns)
db, err := vdb.getNamespaceDBHandle(ns)
if err != nil {
return nil, err
}
nsRevs := vdb.committedDataCache.revs[ns]
if nsRevs == nil {
nsRevs = make(nsRevisions)
wg.Add(1)
go func(ns string) {
defer wg.Done()
committers, err := vdb.buildCommittersForNs(ns, nsUpdates)
if err != nil {
errsChan <- err
return
}
nsCommittersChan <- committers
}(ns)
}
wg.Wait()

// collect all committers. All goroutines have finished at this point, so the
// non-blocking select sees any error that was reported; if there is none, exactly
// len(namespaces) results are waiting in nsCommittersChan
var allCommitters []*committer
select {
case err := <-errsChan:
return nil, err
default:
for i := 0; i < len(namespaces); i++ {
allCommitters = append(allCommitters, <-nsCommittersChan...)
}
// for each namespace, construct one builder with the corresponding couchdb handle and couch revisions
// that are already loaded into cache (during validation phase)
nsCommitterBuilder = append(nsCommitterBuilder, &nsCommittersBuilder{updates: nsUpdates, db: db, revisions: nsRevs})
}
if err := executeBatches(nsCommitterBuilder); err != nil {

return allCommitters, nil
}

// buildCommittersForNs builds the committers for the namespace ns from nsUpdates.
// When the couchdb instance reports a positive max batch update size, the updates are
// spread over ceil(len(nsUpdates)/maxBatchSize) committers holding at most maxBatchSize
// documents each; otherwise a single committer receives every update.
// It returns an error if the db handle, the revisions or a couch document cannot be obtained.
func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*statedb.VersionedValue) ([]*committer, error) {
db, err := vdb.getNamespaceDBHandle(ns)
if err != nil {
return nil, err
}
// accumulate results across namespaces (one or more batches of `subNsCommitter` for a namespace from each builder)
var combinedSubNsCommitters []batch
for _, b := range nsCommitterBuilder {
combinedSubNsCommitters = append(combinedSubNsCommitters, b.(*nsCommittersBuilder).subNsCommitters...)

// for each namespace, build multiple committers based on the maxBatchSize
maxBatchSize := db.CouchInstance.MaxBatchUpdateSize()
numCommitters := 1
if maxBatchSize > 0 {
numCommitters = int(math.Ceil(float64(len(nsUpdates)) / float64(maxBatchSize)))
}
// all committers of a namespace share the same db handle and start with an empty document map
committers := make([]*committer, numCommitters)
for i := 0; i < numCommitters; i++ {
committers[i] = &committer{
db: db,
batchUpdateMap: make(map[string]*batchableDocument),
}
}
return combinedSubNsCommitters, nil
}

// execute implements the function in the `batch` interface. This function builds one or more `subNsCommitter`s that
// cover the updates for a namespace.
func (builder *nsCommittersBuilder) execute() error {
if err := addRevisionsForMissingKeys(builder.revisions, builder.db, builder.updates); err != nil {
return err
// create a couchDoc for each key-value pair of this namespace that is present in the
// update batch. The revision of a key is passed along when its couchDoc is built,
// so the revisions of all keys are fetched first.
revisions, err := vdb.getRevisions(ns, nsUpdates)
if err != nil {
return nil, err
}
maxBatchSize := builder.db.CouchInstance.MaxBatchUpdateSize()
batchUpdateMap := make(map[string]*batchableDocument)
for key, vv := range builder.updates {
couchDoc, err := keyValToCouchDoc(&keyValue{key: key, VersionedValue: vv}, builder.revisions[key])

// i is the index of the committer that is currently being filled
i := 0
for key, vv := range nsUpdates {
couchDoc, err := keyValToCouchDoc(&keyValue{key: key, VersionedValue: vv}, revisions[key])
if err != nil {
return err
return nil, err
}
batchUpdateMap[key] = &batchableDocument{CouchDoc: *couchDoc, Deleted: vv.Value == nil}
if len(batchUpdateMap) == maxBatchSize {
builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap})
batchUpdateMap = make(map[string]*batchableDocument)
// an update with a nil value is recorded as a delete
committers[i].batchUpdateMap[key] = &batchableDocument{CouchDoc: *couchDoc, Deleted: vv.Value == nil}
// move on to the next committer once the current one holds maxBatchSize documents
if maxBatchSize > 0 && len(committers[i].batchUpdateMap) == maxBatchSize {
i++
}
}
if len(batchUpdateMap) > 0 {
builder.subNsCommitters = append(builder.subNsCommitters, &subNsCommitter{builder.db, batchUpdateMap})
}
return nil
return committers, nil
}

// execute implements the function in the `batch` interface. It commits the updates managed by a
// `subNsCommitter` by handing the committer's db and batchUpdateMap to commitUpdates.
func (committer *subNsCommitter) execute() error {
return commitUpdates(committer.db, committer.batchUpdateMap)
}
// getRevisions returns a map from key to couchdb revision for the keys in nsUpdates.
// Revisions found in vdb.committedDataCache.revs[ns] are used as they are; the keys that
// are not found there are looked up in the namespace db via retrieveNsMetadata.
// A key found in neither place stays mapped to the empty string.
func (vdb *VersionedDB) getRevisions(ns string, nsUpdates map[string]*statedb.VersionedValue) (map[string]string, error) {
// for now, getRevisions does not use the cache. In FAB-15616, we will ensure that getRevisions uses
// the cache which would be introduced in FAB-15537
// NOTE(review): "cache" above presumably refers to the FAB-15537 cache and not to
// committedDataCache, which is read below - confirm
revisions := make(map[string]string)
nsRevs := vdb.committedDataCache.revs[ns]

// commitUpdates commits the given updates to couchdb
func commitUpdates(db *couchdb.CouchDatabase, batchUpdateMap map[string]*batchableDocument) error {
// Add the documents to the batch update array
batchUpdateDocs := []*couchdb.CouchDoc{}
for _, updateDocument := range batchUpdateMap {
batchUpdateDocument := updateDocument
batchUpdateDocs = append(batchUpdateDocs, &batchUpdateDocument.CouchDoc)
// the two-value map read stores the zero value in revisions[key] when the key is
// missing from nsRevs; such keys are collected so their revisions can be retrieved below
var missingKeys []string
var ok bool
for key := range nsUpdates {
if revisions[key], ok = nsRevs[key]; !ok {
missingKeys = append(missingKeys, key)
}
}

// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
batchUpdateResp, err := db.BatchUpdateDocuments(batchUpdateDocs)
db, err := vdb.getNamespaceDBHandle(ns)
if err != nil {
return err
return nil, err
}
// IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY
// iterate through the response from CouchDB by document
for _, respDoc := range batchUpdateResp {
// If the document returned an error, retry the individual document
if respDoc.Ok != true {
batchUpdateDocument := batchUpdateMap[respDoc.ID]
var err error
//Remove the "_rev" from the JSON before saving
//this will allow the CouchDB retry logic to retry revisions without encountering
//a mismatch between the "If-Match" and the "_rev" tag in the JSON
if batchUpdateDocument.CouchDoc.JSONValue != nil {
err = removeJSONRevision(&batchUpdateDocument.CouchDoc.JSONValue)
if err != nil {
return err
}
}
// Check to see if the document was added to the batch as a delete type document
if batchUpdateDocument.Deleted {
logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", respDoc.ID)
// If this is a deleted document, then retry the delete
// If the delete fails due to a document not being found (404 error),
// the document has already been deleted and the DeleteDoc will not return an error
err = db.DeleteDoc(respDoc.ID, "")
} else {
logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", respDoc.ID)
// Save the individual document to couchdb
// Note that this will do retries as needed
_, err = db.SaveDoc(respDoc.ID, "", &batchUpdateDocument.CouchDoc)
}

// If the single document update or delete returns an error, then throw the error
if err != nil {
errorString := fmt.Sprintf("error saving document ID: %v. Error: %s, Reason: %s",
respDoc.ID, respDoc.Error, respDoc.Reason)

logger.Errorf(errorString)
return errors.WithMessage(err, errorString)
}
}
// NOTE(review): retrieveNsMetadata is called even when missingKeys is empty - confirm
// that it returns early in that case
logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName)
retrievedMetadata, err := retrieveNsMetadata(db, missingKeys)
if err != nil {
return nil, err
}
// overwrite the placeholder entries with the revisions retrieved from the db
for _, metadata := range retrievedMetadata {
revisions[metadata.ID] = metadata.Rev
}
return nil
}

// nsFlusher implements the `batch` interface; a batch executes the function `couchdb.EnsureFullCommit()` for the given namespace
type nsFlusher struct {
db *couchdb.CouchDatabase
return revisions, nil
}

// ensureFullCommit wraps every given db in an nsFlusher and runs the flushers via executeBatches.
func (vdb *VersionedDB) ensureFullCommit(dbs []*couchdb.CouchDatabase) error {
var flushers []batch
for _, db := range dbs {
flushers = append(flushers, &nsFlusher{db})
// executeCommitter runs commitUpdates of every given committer in its own goroutine,
// waits for all of them to finish, and returns one of the reported errors, or nil
// when no committer reported an error.
func (vdb *VersionedDB) executeCommitter(committers []*committer) error {
// one buffer slot per committer, so a failing goroutine never blocks on its send
errsChan := make(chan error, len(committers))
defer close(errsChan)
var wg sync.WaitGroup
wg.Add(len(committers))

for _, c := range committers {
go func(c *committer) {
defer wg.Done()
if err := c.commitUpdates(); err != nil {
errsChan <- err
}
}(c)
}
return executeBatches(flushers)
}
wg.Wait()

// execute implements the function in the `batch` interface. It fails when EnsureFullCommit
// returns an error or a response that is not Ok.
func (f *nsFlusher) execute() error {
dbResponse, err := f.db.EnsureFullCommit()
if err != nil || dbResponse.Ok != true {
logger.Errorf("Failed to perform full commit")
return errors.New("failed to perform full commit")
// all goroutines are done, so a non-blocking receive is enough to find out
// whether any error was reported
select {
case err := <-errsChan:
return err
default:
return nil
}
return nil
}

// addRevisionsForMissingKeys retrieves, via retrieveNsMetadata, the revisions of the keys in
// nsUpdates that have no entry in revisions and stores them in revisions.
func addRevisionsForMissingKeys(revisions map[string]string, db *couchdb.CouchDatabase, nsUpdates map[string]*statedb.VersionedValue) error {
var missingKeys []string
for key := range nsUpdates {
_, ok := revisions[key]
if !ok {
missingKeys = append(missingKeys, key)
}
// commitUpdates commits the documents held in c.batchUpdateMap to couchdb. The documents
// are first sent as a single bulk update; every document whose bulk response is not Ok is
// then retried individually (as a delete or a save). It returns the first error hit by
// the bulk update or by an individual retry.
func (c *committer) commitUpdates() error {
// collect pointers to the couch docs stored in the map
docs := []*couchdb.CouchDoc{}
for _, update := range c.batchUpdateMap {
docs = append(docs, &update.CouchDoc)
}
logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), db.DBName)
retrievedMetadata, err := retrieveNsMetadata(db, missingKeys)

// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
responses, err := c.db.BatchUpdateDocuments(docs)
if err != nil {
return err
}
for _, metadata := range retrievedMetadata {
revisions[metadata.ID] = metadata.Rev
// IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY
// iterate through the response from CouchDB by document
for _, resp := range responses {
// Documents that succeeded need no further work; only failed ones are retried individually
if resp.Ok == true {
continue
}
// look up the document to retry by the ID reported in the response
doc := c.batchUpdateMap[resp.ID]
var err error
// Remove the "_rev" from the JSON before saving.
// This will allow the CouchDB retry logic to retry revisions without encountering
// a mismatch between the "If-Match" and the "_rev" tag in the JSON
if doc.CouchDoc.JSONValue != nil {
err = removeJSONRevision(&doc.CouchDoc.JSONValue)
if err != nil {
return err
}
}
// Check to see if the document was added to the batch as a delete type document
if doc.Deleted {
logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", resp.ID)
// If this is a deleted document, then retry the delete
// If the delete fails due to a document not being found (404 error),
// the document has already been deleted and the DeleteDoc will not return an error
err = c.db.DeleteDoc(resp.ID, "")
} else {
logger.Warningf("CouchDB batch document update encountered an problem. Retrying update for document ID:%s", resp.ID)
// Save the individual document to couchdb
// Note that this will do retries as needed
_, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc)
}

// If the single document update or delete returns an error, then return the error.
// The message carries the error and reason from the bulk response, while the
// wrapped error is the one returned by the individual retry
if err != nil {
errorString := fmt.Sprintf("error saving document ID: %v. Error: %s, Reason: %s",
resp.ID, resp.Error, resp.Reason)

logger.Errorf(errorString)
return errors.WithMessage(err, errorString)
}
}
return nil
}
Expand Down
Loading

0 comments on commit 53a1bce

Please sign in to comment.