Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement metadata storage #2656

Merged
merged 41 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,13 @@ issues:
path: cmd/envtool
text: pgdb

# only `pg` handler can import `sjson` package, no other handler can do that
# only `pg` handler and `sqlite` backend can import `sjson` package, no other handlers or backends can do that
- linters: [depguard]
path: internal/handlers/pg
text: sjson
- linters: [depguard]
path: internal/backends/sqlite
test: sjson

# only `tigris` handler can import `tigrisdb` package, no other handler can do that
- linters: [depguard]
Expand Down
7 changes: 5 additions & 2 deletions internal/backends/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ func CollectionContract(c Collection) Collection {

// InsertParams represents the parameters of Collection.Insert method.
type InsertParams struct {
Docs types.DocumentsIterator
Docs *types.Array
Ordered bool
}

// InsertResult represents the results of Collection.Insert method.
type InsertResult struct{}
type InsertResult struct {
Errors []error
InsertedCount int64
}

// Insert inserts documents into the collection.
//
Expand Down
42 changes: 35 additions & 7 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package sqlite

import (
"context"
"os"
"path/filepath"

_ "modernc.org/sqlite"

Expand All @@ -24,7 +26,9 @@ import (

// backend implements backends.Backend interface.
type backend struct {
dir string
dir string
pool *connPool
metadataStorage *metadataStorage
}

// NewBackendParams represents the parameters of NewBackend function.
Expand All @@ -33,15 +37,24 @@ type NewBackendParams struct {
}

// NewBackend creates a new SQLite backend.
func NewBackend(params *NewBackendParams) backends.Backend {
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
pool := newConnPool(params.Dir)

storage, err := newMetadataStorage(params.Dir, pool)
if err != nil {
return nil, err
}

return backends.BackendContract(&backend{
dir: params.Dir,
})
dir: params.Dir,
pool: pool,
metadataStorage: storage,
}), nil
}

// Close implements backends.Backend interface.
func (b *backend) Close() {
panic("not implemented") // TODO: Implement
b.pool.Close()
}

// Database implements backends.Backend interface.
Expand All @@ -53,12 +66,27 @@ func (b *backend) Database(name string) backends.Database {
//
//nolint:lll // for readability
func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDatabasesParams) (*backends.ListDatabasesResult, error) {
panic("not implemented") // TODO: Implement
list, err := b.metadataStorage.listDatabases()
if err != nil {
return nil, err
}

var result backends.ListDatabasesResult
for _, db := range list {
result.Databases = append(result.Databases, backends.DatabaseInfo{Name: db})
}

return &result, nil
}

// DropDatabase implements backends.Backend interface.
func (b *backend) DropDatabase(ctx context.Context, params *backends.DropDatabaseParams) error {
panic("not implemented") // TODO: Implement
err := os.Remove(filepath.Join(b.dir, params.Name+dbExtension))
if err != nil && !os.IsNotExist(err) {
return err
}

return nil
}

// check interfaces
Expand Down
77 changes: 76 additions & 1 deletion internal/backends/sqlite/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@ package sqlite

import (
"context"
"errors"
"fmt"

"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/handlers/sjson"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/iterator"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

// collection implements backends.Collection interface.
Expand All @@ -36,7 +42,76 @@ func newCollection(db *database, name string) backends.Collection {

// Insert implements backends.Collection interface.
func (c *collection) Insert(ctx context.Context, params *backends.InsertParams) (*backends.InsertResult, error) {
panic("not implemented") // TODO: Implement
err := c.db.CreateCollection(ctx, &backends.CreateCollectionParams{Name: c.name})
if err != nil {
return nil, err
}

conn, err := c.db.b.pool.DB(c.db.name)
if err != nil {
return nil, err
}

table, err := c.db.b.metadataStorage.tableName(ctx, c.db.name, c.name)
if err != nil {
return nil, err
}

tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
// TODO: check error
defer tx.Rollback()

var inserted int64

iter := params.Docs.Iterator()
defer iter.Close()

for {
var val any

_, val, err = iter.Next()
if errors.Is(err, iterator.ErrIteratorDone) {
break
}

if err != nil {
return nil, err
}

doc, ok := val.(*types.Document)
if !ok {
return nil, lazyerrors.Errorf("expected document, got %T", val)
}

query := fmt.Sprintf(`INSERT INTO %s (sjson) VALUES (?)`, table)

var bytes []byte

bytes, err = sjson.Marshal(doc)
if err != nil {
return nil, err
}

_, err = tx.ExecContext(ctx, query, bytes)
if err != nil {
return nil, err
}

inserted++
}

err = tx.Commit()
if err != nil {
return nil, err
}

return &backends.InsertResult{
InsertedCount: inserted,
Errors: []error{},
}, nil
}

// check interfaces
Expand Down
100 changes: 97 additions & 3 deletions internal/backends/sqlite/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package sqlite

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
)

// database implements backends.Database interface.
Expand Down Expand Up @@ -48,17 +52,107 @@ func (db *database) Collection(name string) backends.Collection {
//
//nolint:lll // for readability
func (db *database) ListCollections(ctx context.Context, params *backends.ListCollectionsParams) (*backends.ListCollectionsResult, error) {
panic("not implemented") // TODO: Implement
var result backends.ListCollectionsResult

exists, err := db.b.metadataStorage.dbExists(db.name)
if err != nil {
return nil, lazyerrors.Error(err)
}

if !exists {
return &result, nil
}

list, err := db.b.metadataStorage.listCollections(ctx, db.name)
if err != nil {
return nil, err
}

for _, name := range list {
result.Collections = append(result.Collections, backends.CollectionInfo{
Name: name,
})
}

return &result, nil
}

// CreateCollection implements backends.Database interface.
func (db *database) CreateCollection(ctx context.Context, params *backends.CreateCollectionParams) error {
panic("not implemented") // TODO: Implement
exists, err := db.b.metadataStorage.dbExists(db.name)
if err != nil {
return lazyerrors.Error(err)
}

if !exists {
if err = db.create(ctx); err != nil {
return lazyerrors.Error(err)
}
}

tableName, err := db.b.metadataStorage.createCollection(ctx, db.name, params.Name)
if err != nil {
return err
}

conn, err := db.b.pool.DB(db.name)
if err != nil {
return err
}

query := fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (sjson string)", tableName)

_, err = conn.ExecContext(ctx, query)
if err != nil {
return err
}

return nil
}

// DropCollection implements backends.Database interface.
func (db *database) DropCollection(ctx context.Context, params *backends.DropCollectionParams) error {
panic("not implemented") // TODO: Implement
table, err := db.b.metadataStorage.tableName(ctx, db.name, params.Name)
if err != nil {
return err
}

err = db.b.metadataStorage.removeCollection(ctx, db.name, params.Name)
if err != nil {
return err
}

conn, err := db.b.pool.DB(db.name)
if err != nil {
return err
}

query := fmt.Sprintf("DROP TABLE %s", table)

_, err = conn.ExecContext(ctx, query)
if err != nil {
return err
}

return nil
}

func (db *database) create(ctx context.Context) error {
f, err := os.Create(filepath.Join(db.b.dir, db.name+dbExtension))
if err != nil {
return err
}

if err = f.Close(); err != nil {
return err
}

err = db.b.metadataStorage.createDatabase(ctx, db.name)
if err != nil {
return err
}

return nil
}

// check interfaces
Expand Down
Loading