Skip to content

Commit

Permalink
Use transactions in more pgdb functions (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlekSi authored Sep 22, 2022
1 parent a8ba10a commit 3025fa2
Show file tree
Hide file tree
Showing 9 changed files with 348 additions and 555 deletions.
15 changes: 10 additions & 5 deletions cmd/envtool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"strings"
"time"

"github.com/jackc/pgx/v4"
"github.com/tigrisdata/tigris-client-go/config"
"github.com/tigrisdata/tigris-client-go/driver"
"go.uber.org/zap"
Expand Down Expand Up @@ -216,13 +217,17 @@ func setupPostgres(ctx context.Context, logger *zap.SugaredLogger) error {

logger.Info("Creating databases...")

for _, db := range []string{"admin", "test"} {
if err = pgdb.CreateDatabase(ctx, pgPool, db); err != nil {
return err
err = pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
for _, db := range []string{"admin", "test"} {
if err = pgdb.CreateDatabaseIfNotExists(ctx, tx, db); err != nil {
return err
}
}
}

return nil
return nil
})

return err
}

// setupTigris configures Tigris.
Expand Down
6 changes: 5 additions & 1 deletion internal/handlers/pg/msg_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ func (h *Handler) MsgUpdate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg,
return nil, err
}

created, err := pgdb.CreateCollectionIfNotExist(ctx, h.pgPool, sp.DB, sp.Collection)
var created bool
err = h.pgPool.InTransaction(ctx, func(tx pgx.Tx) error {
created, err = pgdb.CreateCollectionIfNotExist(ctx, tx, sp.DB, sp.Collection)
return err
})
if err != nil {
if errors.Is(pgdb.ErrInvalidTableName, err) ||
errors.Is(pgdb.ErrInvalidDatabaseName, err) {
Expand Down
46 changes: 20 additions & 26 deletions internal/handlers/pg/pgdb/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,8 @@ import (
"github.com/FerretDB/FerretDB/internal/util/must"
)

var (
// Regex validateCollectionNameRe validates collection names.
validateCollectionNameRe = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]{0,119}$")

// Regex validateDatabaseNameRe validates database names.
validateDatabaseNameRe = regexp.MustCompile("^[a-z_][a-z0-9_]{0,62}$")
)
// validateCollectionNameRe validates collection names.
var validateCollectionNameRe = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]{0,119}$")

// Collections returns a sorted list of FerretDB collection names.
//
Expand Down Expand Up @@ -84,7 +79,7 @@ func CollectionExists(ctx context.Context, querier pgxtype.Querier, db, collecti
return slices.Contains(collections, collection), nil
}

// CreateCollection creates a new FerretDB collection in existing schema.
// CreateCollection creates a new FerretDB collection in existing database.
//
// It returns a possibly wrapped error:
// - ErrInvalidTableName - if a FerretDB collection name doesn't conform to restrictions.
Expand Down Expand Up @@ -162,32 +157,31 @@ func CreateCollection(ctx context.Context, querier pgxtype.Querier, db, collecti

// CreateCollectionIfNotExist ensures that given FerretDB database / PostgreSQL schema
// and FerretDB collection / PostgreSQL table exist.
// If needed, it creates both schema and table.
// If needed, it creates both database and collection.
//
// True is returned if table was created.
func CreateCollectionIfNotExist(ctx context.Context, querier pgxtype.Querier, db, collection string) (bool, error) {
exists, err := CollectionExists(ctx, querier, db, collection)
// True is returned if collection was created.
func CreateCollectionIfNotExist(ctx context.Context, tx pgx.Tx, db, collection string) (bool, error) {
exists, err := CollectionExists(ctx, tx, db, collection)
if err != nil {
return false, lazyerrors.Error(err)
}

if exists {
return false, nil
}

// Table (or even schema) does not exist. Try to create it,
// Collection (or even database) does not exist. Try to create them,
// but keep in mind that it can be created in concurrent connection.

if err := CreateDatabase(ctx, querier, db); err != nil && !errors.Is(err, ErrAlreadyExist) {
if err = CreateDatabaseIfNotExists(ctx, tx, db); err != nil {
return false, lazyerrors.Error(err)
}

// TODO use a transaction instead of pgPool: https://github.com/FerretDB/FerretDB/issues/866
if err := CreateCollection(ctx, querier, db, collection); err != nil {
if errors.Is(err, ErrAlreadyExist) {
return false, nil
}
err = CreateCollection(ctx, tx, db, collection)
if errors.Is(err, ErrAlreadyExist) {
return false, nil
}

if err != nil {
return false, lazyerrors.Error(err)
}

Expand All @@ -196,10 +190,10 @@ func CreateCollectionIfNotExist(ctx context.Context, querier pgxtype.Querier, db

// DropCollection drops FerretDB collection.
//
// It returns (possibly wrapped) ErrTableNotExist if schema or table does not exist.
// It returns (possibly wrapped) ErrTableNotExist if database or collection does not exist.
// Please use errors.Is to check the error.
func DropCollection(ctx context.Context, querier pgxtype.Querier, schema, collection string) error {
schemaExists, err := schemaExists(ctx, querier, schema)
func DropCollection(ctx context.Context, querier pgxtype.Querier, db, collection string) error {
schemaExists, err := schemaExists(ctx, querier, db)
if err != nil {
return lazyerrors.Error(err)
}
Expand All @@ -209,15 +203,15 @@ func DropCollection(ctx context.Context, querier pgxtype.Querier, schema, collec
}

table := formatCollectionName(collection)
tables, err := tables(ctx, querier, schema)
tables, err := tables(ctx, querier, db)
if err != nil {
return lazyerrors.Error(err)
}
if !slices.Contains(tables, table) {
return ErrTableNotExist
}

err = removeTableFromSettings(ctx, querier, schema, collection)
err = removeTableFromSettings(ctx, querier, db, collection)
if err != nil && !errors.Is(err, ErrTableNotExist) {
return lazyerrors.Error(err)
}
Expand All @@ -226,7 +220,7 @@ func DropCollection(ctx context.Context, querier pgxtype.Querier, schema, collec
}

// TODO https://github.com/FerretDB/FerretDB/issues/811
sql := `DROP TABLE IF EXISTS ` + pgx.Identifier{schema, table}.Sanitize() + ` CASCADE`
sql := `DROP TABLE IF EXISTS ` + pgx.Identifier{db, table}.Sanitize() + ` CASCADE`
_, err = querier.Exec(ctx, sql)
if err != nil {
return lazyerrors.Error(err)
Expand Down
49 changes: 6 additions & 43 deletions internal/handlers/pg/pgdb/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ package pgdb
import (
"context"
"errors"
"regexp"
"strings"

"github.com/jackc/pgconn"
"github.com/jackc/pgerrcode"
"github.com/jackc/pgtype/pgxtype"
"github.com/jackc/pgx/v4"

"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

// Databases returns a sorted list of FerretDB database names / PostgreSQL schema names.
// validateDatabaseNameRe validates FerretDB database / PostgreSQL schema names.
var validateDatabaseNameRe = regexp.MustCompile("^[a-z_][a-z0-9_]{0,62}$")

// Databases returns a sorted list of FerretDB database / PostgreSQL schema.
func Databases(ctx context.Context, tx pgx.Tx) ([]string, error) {
sql := "SELECT schema_name FROM information_schema.schemata ORDER BY schema_name"
rows, err := tx.Query(ctx, sql)
Expand Down Expand Up @@ -56,48 +59,8 @@ func Databases(ctx context.Context, tx pgx.Tx) ([]string, error) {
return res, nil
}

// CreateDatabase creates a new FerretDB database (PostgreSQL schema).
//
// It returns (possibly wrapped):
//
// - ErrAlreadyExist if schema already exist.
// - ErrInvalidDatabaseName if db name doesn't comply with the rules.
//
// Use errors.Is to check the error.
func CreateDatabase(ctx context.Context, querier pgxtype.Querier, db string) error {
if !validateDatabaseNameRe.MatchString(db) ||
strings.HasPrefix(db, reservedPrefix) {
return ErrInvalidDatabaseName
}

_, err := querier.Exec(ctx, `CREATE SCHEMA `+pgx.Identifier{db}.Sanitize())
if err == nil {
err = createSettingsTable(ctx, querier, db)
}

if err == nil {
return nil
}

var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) {
return lazyerrors.Error(err)
}

switch pgErr.Code {
case pgerrcode.DuplicateSchema:
return ErrAlreadyExist
case pgerrcode.UniqueViolation, pgerrcode.DuplicateObject:
// https://www.postgresql.org/message-id/CA+TgmoZAdYVtwBfp1FL2sMZbiHCWT4UPrzRLNnX1Nb30Ku3-gg@mail.gmail.com
// The same thing for schemas. Reproducible by integration tests.
return ErrAlreadyExist
default:
return lazyerrors.Error(err)
}
}

// CreateDatabaseIfNotExists creates a new FerretDB database (PostgreSQL schema).
// If the schema already exists, no error is returned.
// If the schema already exists, no error is returned, and transaction is not aborted.
func CreateDatabaseIfNotExists(ctx context.Context, tx pgx.Tx, db string) error {
if !validateDatabaseNameRe.MatchString(db) ||
strings.HasPrefix(db, reservedPrefix) {
Expand Down
10 changes: 5 additions & 5 deletions internal/handlers/pg/pgdb/pgdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ import "fmt"
// for example, errors.Is(err, ErrSchemaNotExist).
var (
// ErrTableNotExist indicates that there is no such table.
ErrTableNotExist = fmt.Errorf("table does not exist")
ErrTableNotExist = fmt.Errorf("collection/table does not exist")

// ErrSchemaNotExist indicates that there is no such schema.
ErrSchemaNotExist = fmt.Errorf("schema does not exist")
ErrSchemaNotExist = fmt.Errorf("database/schema does not exist")

// ErrAlreadyExist indicates that a schema or table already exists.
ErrAlreadyExist = fmt.Errorf("schema or table already exist")
ErrAlreadyExist = fmt.Errorf("database/schema or collection/table already exist")

// ErrInvalidTableName indicates that a schema or table didn't passed name checks.
ErrInvalidTableName = fmt.Errorf("invalid table name")
ErrInvalidTableName = fmt.Errorf("invalid collection/table name")

// ErrInvalidDatabaseName indicates that a database name didn't passed checks.
ErrInvalidDatabaseName = fmt.Errorf("invalid database name")
ErrInvalidDatabaseName = fmt.Errorf("invalid database/schema name")
)
Loading

1 comment on commit 3025fa2

@vercel
Copy link

@vercel vercel bot commented on 3025fa2 Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

ferret-db – ./

ferret-db-ferretdb.vercel.app
ferret-db.vercel.app
ferret-db-git-main-ferretdb.vercel.app

Please sign in to comment.