Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

/go/libraries/doltcore/sql/dsess: parallelize sql.NewDatabase work #8740

Merged
merged 10 commits into from
Jan 14, 2025
Merged
Next Next commit
/go/libraries/doltcore/sqle/dsess: parallelize new sqle db stuff
  • Loading branch information
coffeegoddd committed Jan 14, 2025
commit d869c70c26af2f1339e5ee2a43f98f1e0e056b03
47 changes: 26 additions & 21 deletions go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/dolthub/go-mysql-server/sql"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
Expand Down Expand Up @@ -398,34 +399,38 @@ func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName stri
}

func (a *AutoIncrementTracker) InitWithRoots(ctx context.Context, roots ...doltdb.Rootish) error {
for _, root := range roots {
r, err := root.ResolveRootValue(ctx)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(128)

err = r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) {
if !schema.HasAutoIncrement(sch) {
return false, nil
for _, root := range roots {
eg.Go(func() error {
if egCtx.Err() != nil {
return egCtx.Err()
}

seq, err := table.GetAutoIncrementValue(ctx)
if err != nil {
return true, err
r, rerr := root.ResolveRootValue(egCtx)
if rerr != nil {
return rerr
}
return r.IterTables(ctx, func(tableName doltdb.TableName, table *doltdb.Table, sch schema.Schema) (bool, error) {
if !schema.HasAutoIncrement(sch) {
return false, nil
}

tableNameStr := tableName.ToLower().Name
if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) {
a.sequences.Store(tableNameStr, seq)
}
seq, iErr := table.GetAutoIncrementValue(egCtx)
if iErr != nil {
return true, iErr
}

return false, nil
})
tableNameStr := tableName.ToLower().Name
if oldValue, loaded := a.sequences.LoadOrStore(tableNameStr, seq); loaded && seq > oldValue.(uint64) {
a.sequences.Store(tableNameStr, seq)
}

if err != nil {
return err
}
return false, nil
})
})
}

return nil
return eg.Wait()
}
88 changes: 64 additions & 24 deletions go/libraries/doltcore/sqle/dsess/globalstate.go
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"

"github.com/dolthub/go-mysql-server/sql"
"golang.org/x/sync/errgroup"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
Expand All @@ -36,39 +37,78 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol
return GlobalStateImpl{}, err
}

eg2, egCtx := errgroup.WithContext(ctx)
eg2.SetLimit(128)

rootRefs := make([]ref.DoltRef, 0, len(branches)+len(remotes))
rootRefs = append(rootRefs, branches...)
rootRefs = append(rootRefs, remotes...)

var roots []doltdb.Rootish
rootChan := make(chan doltdb.Rootish, len(rootRefs))

wg := sync.WaitGroup{}

eg, _ := errgroup.WithContext(egCtx)
eg.Go(func() error {
wg.Wait()
close(rootChan)
return nil
})

for _, b := range rootRefs {
switch b.GetType() {
case ref.BranchRefType:
wsRef, err := ref.WorkingSetRefForHead(b)
if err != nil {
return GlobalStateImpl{}, err
wg.Add(1)

eg2.Go(func() error {
defer wg.Done()

if egCtx.Err() != nil {
return egCtx.Err()
}

ws, err := db.ResolveWorkingSet(ctx, wsRef)
if err == doltdb.ErrWorkingSetNotFound {
// use the branch head if there isn't a working set for it
cm, err := db.ResolveCommitRef(ctx, b)
if err != nil {
return GlobalStateImpl{}, err
switch b.GetType() {
case ref.BranchRefType:
wsRef, rerr := ref.WorkingSetRefForHead(b)
if rerr != nil {
return rerr
}
roots = append(roots, cm)
} else if err != nil {
return GlobalStateImpl{}, err
} else {
roots = append(roots, ws)
}
case ref.RemoteRefType:
cm, err := db.ResolveCommitRef(ctx, b)
if err != nil {
return GlobalStateImpl{}, err

ws, rerr := db.ResolveWorkingSet(egCtx, wsRef)
if rerr == doltdb.ErrWorkingSetNotFound {
// use the branch head if there isn't a working set for it
cm, rerr := db.ResolveCommitRef(egCtx, b)
if rerr != nil {
return rerr
}
rootChan <- cm
} else if err != nil {
return rerr
} else {
rootChan <- ws
}
case ref.RemoteRefType:
cm, rerr := db.ResolveCommitRef(egCtx, b)
if rerr != nil {
return rerr
}
rootChan <- cm
}
roots = append(roots, cm)
}
return nil
})
}

err = eg2.Wait()
if err != nil {
return GlobalStateImpl{}, err
}

err = eg.Wait()
if err != nil {
return GlobalStateImpl{}, err
}

var roots []doltdb.Rootish
for root := range rootChan {
roots = append(roots, root)
}

tracker, err := NewAutoIncrementTracker(ctx, dbName, roots...)
Expand Down
Loading