Skip to content

Commit

Permalink
Retry creating unique index (#613)
Browse files Browse the repository at this point in the history
Introduce a retry mechanism for creating unique indexes.
First, create unique index concurrently. Then wait until Postgres is
done with the index creation, periodically checking the view
`pg_stat_progress_create_index`. Once it is done, look up `pg_index` to see
whether the index is valid. If it is, we are good to go. If not,
drop the index and try again.

Since we are running select queries on `pg_stat_progress_create_index`
and `pg_index` and we are expecting a real output to see the progress,
this approach doesn't work with the fake DB. I have added hardcoded
responses for the fake db scenario, so that we can safely process
migrations to update virtual schema as well.
Not sure if this is a good solution or not. Open for discussions.

Tested manually with high load, while 10m rows being inserted. But I
wonder if there's a way to add a test for the "high load" scenario.
  • Loading branch information
agedemenli authored Jan 24, 2025
1 parent c09619a commit 070e388
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 91 deletions.
32 changes: 9 additions & 23 deletions pkg/migrations/duplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const (
dataTypeMismatchErrorCode pq.ErrorCode = "42804"
undefinedFunctionErrorCode pq.ErrorCode = "42883"

cCreateUniqueIndexSQL = `CREATE UNIQUE INDEX CONCURRENTLY %s ON %s (%s)`
cSetDefaultSQL = `ALTER TABLE %s ALTER COLUMN %s SET DEFAULT %s`
cAlterTableAddCheckConstraintSQL = `ALTER TABLE %s ADD CONSTRAINT %s %s NOT VALID`
cAlterTableAddForeignKeySQL = `ALTER TABLE %s ADD CONSTRAINT %s FOREIGN KEY (%s) REFERENCES %s (%s) ON DELETE %s`
Expand Down Expand Up @@ -134,12 +133,16 @@ func (d *Duplicator) Duplicate(ctx context.Context) error {
}
}

// Generate SQL to duplicate any unique constraints on the columns
// The constraint is duplicated by adding a unique index on the column concurrently.
// Create indexes for unique constraints on the columns concurrently.
// The index is converted into a unique constraint on migration completion.
for _, sql := range d.stmtBuilder.duplicateUniqueConstraints(d.withoutConstraint, colNames...) {
if _, err := d.conn.ExecContext(ctx, sql); err != nil {
return err
for _, uc := range d.stmtBuilder.table.UniqueConstraints {
if slices.Contains(d.withoutConstraint, uc.Name) {
continue
}
if duplicatedMember, constraintColumns := d.stmtBuilder.allConstraintColumns(uc.Columns, colNames...); duplicatedMember {
if err := createUniqueIndexConcurrently(ctx, d.conn, "", DuplicationName(uc.Name), d.stmtBuilder.table.Name, constraintColumns); err != nil {
return err
}
}
}

Expand Down Expand Up @@ -180,23 +183,6 @@ func (d *duplicatorStmtBuilder) duplicateCheckConstraints(withoutConstraint []st
return stmts
}

// duplicateUniqueConstraints generates CREATE UNIQUE INDEX CONCURRENTLY
// statements for every unique constraint on the table that involves at least
// one of the duplicated columns. Constraints named in withoutConstraint are
// skipped.
func (d *duplicatorStmtBuilder) duplicateUniqueConstraints(withoutConstraint []string, colNames ...string) []string {
	stmts := make([]string, 0, len(d.table.UniqueConstraints))
	for _, uc := range d.table.UniqueConstraints {
		if slices.Contains(withoutConstraint, uc.Name) {
			continue
		}
		duplicatedMember, constraintColumns := d.allConstraintColumns(uc.Columns, colNames...)
		if !duplicatedMember {
			continue
		}
		stmts = append(stmts, fmt.Sprintf(cCreateUniqueIndexSQL,
			pq.QuoteIdentifier(DuplicationName(uc.Name)),
			pq.QuoteIdentifier(d.table.Name),
			strings.Join(quoteColumnNames(constraintColumns), ", "),
		))
	}
	return stmts
}

func (d *duplicatorStmtBuilder) duplicateForeignKeyConstraints(withoutConstraint []string, colNames ...string) []string {
stmts := make([]string, 0, len(d.table.ForeignKeys))
for _, fk := range d.table.ForeignKeys {
Expand Down
83 changes: 42 additions & 41 deletions pkg/migrations/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,47 +102,6 @@ func TestDuplicateStmtBuilderCheckConstraints(t *testing.T) {
}
}

// TestDuplicateStmtBuilderUniqueConstraints verifies the SQL generated when
// duplicating unique constraints for various combinations of duplicated columns.
func TestDuplicateStmtBuilderUniqueConstraints(t *testing.T) {
	d := &duplicatorStmtBuilder{table}
	testCases := map[string]struct {
		columns       []string
		expectedStmts []string
	}{
		"single column duplicated": {
			columns:       []string{"city"},
			expectedStmts: []string{},
		},
		"single-column constraint with single column duplicated": {
			columns:       []string{"email"},
			expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
		},
		"single-column constraint with multiple column duplicated": {
			columns:       []string{"email", "description"},
			expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_email" ON "test_table" ("_pgroll_new_email")`},
		},
		"multi-column constraint with single column duplicated": {
			columns:       []string{"name"},
			expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
		},
		"multi-column constraint with multiple unrelated column duplicated": {
			columns:       []string{"name", "description"},
			expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "nick")`},
		},
		"multi-column constraint with multiple columns": {
			columns:       []string{"name", "nick"},
			expectedStmts: []string{`CREATE UNIQUE INDEX CONCURRENTLY "_pgroll_dup_unique_name_nick" ON "test_table" ("_pgroll_new_name", "_pgroll_new_nick")`},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			stmts := d.duplicateUniqueConstraints(nil, tc.columns...)
			assert.Equal(t, len(tc.expectedStmts), len(stmts))
			for _, stmt := range stmts {
				assert.True(t, slices.Contains(tc.expectedStmts, stmt))
			}
		})
	}
}

func TestDuplicateStmtBuilderForeignKeyConstraints(t *testing.T) {
d := &duplicatorStmtBuilder{table}
for name, testCases := range map[string]struct {
Expand Down Expand Up @@ -233,4 +192,46 @@ func TestDuplicateStmtBuilderIndexes(t *testing.T) {
}
}

// TestCreateIndexConcurrentlySqlGeneration checks the SQL produced by
// getCreateUniqueIndexConcurrentlySQL with and without a schema qualifier,
// for single- and multi-column indexes.
func TestCreateIndexConcurrentlySqlGeneration(t *testing.T) {
	testCases := map[string]struct {
		indexName    string
		schemaName   string
		tableName    string
		columns      []string
		expectedStmt string
	}{
		"single column with schemaname": {
			indexName:    "idx_email",
			schemaName:   "test_sch",
			tableName:    "test_table",
			columns:      []string{"email"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_sch"."test_table" ("email")`,
		},
		"single column with no schema name": {
			indexName:    "idx_email",
			tableName:    "test_table",
			columns:      []string{"email"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_email" ON "test_table" ("email")`,
		},
		"multi-column with no schema name": {
			indexName:    "idx_name_city",
			tableName:    "test_table",
			columns:      []string{"name", "city"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_table" ("name", "city")`,
		},
		"multi-column with schema name": {
			indexName:    "idx_name_city",
			schemaName:   "test_sch",
			tableName:    "test_table",
			columns:      []string{"id", "name", "city"},
			expectedStmt: `CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "idx_name_city" ON "test_sch"."test_table" ("id", "name", "city")`,
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			got := getCreateUniqueIndexConcurrentlySQL(tc.indexName, tc.schemaName, tc.tableName, tc.columns)
			assert.Equal(t, tc.expectedStmt, got)
		})
	}
}

// ptr returns a pointer to a copy of x.
func ptr[T any](x T) *T {
	v := x
	return &v
}
124 changes: 124 additions & 0 deletions pkg/migrations/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// SPDX-License-Identifier: Apache-2.0

package migrations

import (
"context"
"fmt"
"strings"
"time"

"github.com/lib/pq"
"github.com/xataio/pgroll/pkg/db"
)

// createUniqueIndexConcurrently creates a unique index on the given columns
// using CREATE UNIQUE INDEX CONCURRENTLY, retrying up to five times when the
// build leaves the index invalid (which can happen under concurrent writes).
//
// Per attempt:
//  1. Issue the CREATE UNIQUE INDEX CONCURRENTLY statement.
//  2. Poll pg_stat_progress_create_index until the build is no longer
//     reported as in progress.
//  3. Check pg_index: a valid index means success; an invalid index will
//     remain invalid forever, so drop it and retry.
func createUniqueIndexConcurrently(ctx context.Context, conn db.DB, schemaName string, indexName string, tableName string, columnNames []string) error {
	quotedQualifiedIndexName := pq.QuoteIdentifier(indexName)
	if schemaName != "" {
		quotedQualifiedIndexName = fmt.Sprintf("%s.%s", pq.QuoteIdentifier(schemaName), pq.QuoteIdentifier(indexName))
	}

	const maxRetries = 5
	for retryCount := 0; retryCount < maxRetries; retryCount++ {
		// Add a unique index to the new column.
		// Indexes are created in the same schema with the table automatically,
		// so the statement uses the bare index name, not the qualified one.
		createIndexSQL := getCreateUniqueIndexConcurrentlySQL(indexName, schemaName, tableName, columnNames)
		if _, err := conn.ExecContext(ctx, createIndexSQL); err != nil {
			return fmt.Errorf("failed to add unique index %q: %w", indexName, err)
		}

		// Make sure Postgres is done creating the index.
		if err := waitForIndexBuild(ctx, conn, quotedQualifiedIndexName); err != nil {
			return err
		}

		// Check pg_index to see if the index is valid. Success if it is.
		isValid, err := isIndexValid(ctx, conn, quotedQualifiedIndexName)
		if err != nil {
			return err
		}
		if isValid {
			return nil
		}

		// If not valid, since Postgres has already given up validating the index,
		// it will remain invalid forever. Drop it and try again.
		if _, err := conn.ExecContext(ctx, fmt.Sprintf("DROP INDEX IF EXISTS %s", quotedQualifiedIndexName)); err != nil {
			return fmt.Errorf("failed to drop index: %w", err)
		}
	}

	// ran out of retries, return an error
	return fmt.Errorf("failed to create unique index %q", indexName)
}

// waitForIndexBuild polls pg_stat_progress_create_index every 500ms until the
// index build is no longer in progress, or the context is cancelled.
//
// The ticker is created here (not in the caller's retry loop) so each attempt
// stops its own ticker promptly instead of deferring all of them to function
// return, and the wait honours ctx cancellation instead of blocking on the
// ticker channel indefinitely.
func waitForIndexBuild(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) error {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		inProgress, err := isIndexInProgress(ctx, conn, quotedQualifiedIndexName)
		if err != nil {
			return err
		}
		if !inProgress {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// getCreateUniqueIndexConcurrentlySQL builds the SQL statement that creates a
// unique index concurrently over the given columns. When schemaName is
// non-empty the table reference is schema-qualified; all identifiers are
// quoted.
func getCreateUniqueIndexConcurrentlySQL(indexName string, schemaName string, tableName string, columnNames []string) string {
	tableRef := pq.QuoteIdentifier(tableName)
	if schemaName != "" {
		tableRef = pq.QuoteIdentifier(schemaName) + "." + tableRef
	}

	columnList := strings.Join(quoteColumnNames(columnNames), ", ")

	return fmt.Sprintf(
		"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(indexName),
		tableRef,
		columnList,
	)
}

// isIndexInProgress reports whether Postgres is still building the named
// index, by checking for a matching row in pg_stat_progress_create_index.
// quotedQualifiedIndexName must already be quoted (and schema-qualified if
// needed) so it can be cast to regclass.
func isIndexInProgress(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
	rows, err := conn.QueryContext(ctx, `SELECT EXISTS(
			SELECT * FROM pg_catalog.pg_stat_progress_create_index
			WHERE index_relid = $1::regclass
			)`, quotedQualifiedIndexName)
	if err != nil {
		return false, fmt.Errorf("getting index in progress with name %q: %w", quotedQualifiedIndexName, err)
	}
	if rows == nil {
		// If rows == nil while err == nil, we have queried a `FakeDB`, which
		// returns no result set. In that case, we can safely return false so
		// callers don't poll forever.
		return false, nil
	}
	var isInProgress bool
	if err := db.ScanFirstValue(rows, &isInProgress); err != nil {
		return false, fmt.Errorf("scanning index in progress with name %q: %w", quotedQualifiedIndexName, err)
	}

	return isInProgress, nil
}

// isIndexValid reports whether the named index is marked valid in pg_index
// (indisvalid). A concurrent index build that fails leaves the index with
// indisvalid = false. quotedQualifiedIndexName must already be quoted (and
// schema-qualified if needed) so it can be cast to regclass.
func isIndexValid(ctx context.Context, conn db.DB, quotedQualifiedIndexName string) (bool, error) {
	rows, err := conn.QueryContext(ctx, `SELECT indisvalid
		FROM pg_catalog.pg_index
		WHERE indexrelid = $1::regclass`,
		quotedQualifiedIndexName)
	if err != nil {
		return false, fmt.Errorf("getting index with name %q: %w", quotedQualifiedIndexName, err)
	}
	if rows == nil {
		// If rows == nil while err == nil, we have queried a fake db, which
		// returns no result set. In that case, we can safely return true so
		// virtual-schema migrations proceed without retrying.
		return true, nil
	}
	var isValid bool
	if err := db.ScanFirstValue(rows, &isValid); err != nil {
		return false, fmt.Errorf("scanning index with name %q: %w", quotedQualifiedIndexName, err)
	}

	return isValid, nil
}
16 changes: 5 additions & 11 deletions pkg/migrations/op_create_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func (o *OpCreateConstraint) Start(ctx context.Context, conn db.DB, latestSchema

switch o.Type {
case OpCreateConstraintTypeUnique:
return table, o.addUniqueIndex(ctx, conn)
temporaryColumnNames := make([]string, len(o.Columns))
for i, col := range o.Columns {
temporaryColumnNames[i] = TemporaryName(col)
}
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, o.Table, temporaryColumnNames)
case OpCreateConstraintTypeCheck:
return table, o.addCheckConstraint(ctx, conn)
case OpCreateConstraintTypeForeignKey:
Expand Down Expand Up @@ -232,16 +236,6 @@ func (o *OpCreateConstraint) Validate(ctx context.Context, s *schema.Schema) err
return nil
}

// addUniqueIndex creates a unique index concurrently over the temporary
// duplicates of the constrained columns.
func (o *OpCreateConstraint) addUniqueIndex(ctx context.Context, conn db.DB) error {
	stmt := fmt.Sprintf("CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(o.Name),
		pq.QuoteIdentifier(o.Table),
		strings.Join(quotedTemporaryNames(o.Columns), ", "),
	)

	_, err := conn.ExecContext(ctx, stmt)
	return err
}

func (o *OpCreateConstraint) addCheckConstraint(ctx context.Context, conn db.DB) error {
_, err := conn.ExecContext(ctx, fmt.Sprintf("ALTER TABLE %s ADD CONSTRAINT %s CHECK (%s) NOT VALID",
pq.QuoteIdentifier(o.Table),
Expand Down
17 changes: 1 addition & 16 deletions pkg/migrations/op_set_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ func (o *OpSetUnique) Start(ctx context.Context, conn db.DB, latestSchema string
table := s.GetTable(o.Table)
column := table.GetColumn(o.Column)

// Add a unique index to the new column
if err := addUniqueIndex(ctx, conn, table.Name, column.Name, o.Name); err != nil {
return nil, fmt.Errorf("failed to add unique index: %w", err)
}

return table, nil
return table, createUniqueIndexConcurrently(ctx, conn, s.Name, o.Name, table.Name, []string{column.Name})
}

func (o *OpSetUnique) Complete(ctx context.Context, conn db.DB, tr SQLTransformer, s *schema.Schema) error {
Expand Down Expand Up @@ -73,13 +68,3 @@ func (o *OpSetUnique) Validate(ctx context.Context, s *schema.Schema) error {

return nil
}

// addUniqueIndex creates a unique index concurrently on a single column of
// the given table.
func addUniqueIndex(ctx context.Context, conn db.DB, table, column, name string) error {
	stmt := fmt.Sprintf(
		"CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS %s ON %s (%s)",
		pq.QuoteIdentifier(name),
		pq.QuoteIdentifier(table),
		pq.QuoteIdentifier(column),
	)

	_, err := conn.ExecContext(ctx, stmt)
	return err
}

0 comments on commit 070e388

Please sign in to comment.