Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement metadata storage #2656

Merged
merged 41 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP
  • Loading branch information
Dmitry committed May 17, 2023
commit 0c8d1f8d6cc0945efc94f1b1417e9813f1653e4e
15 changes: 12 additions & 3 deletions internal/backends/sqlite/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type NewBackendParams struct {

// NewBackend creates a new SQLite backend.
func NewBackend(params *NewBackendParams) (backends.Backend, error) {
// TODO: we should close it when backend is closed.
pool := newConnPool()

storage, err := newMetadataStorage(params.Dir, pool)
Expand All @@ -53,14 +52,24 @@ func NewBackend(params *NewBackendParams) (backends.Backend, error) {

// Database implements backends.Backend interface.
func (b *backend) Database(ctx context.Context, params *backends.DatabaseParams) backends.Database {
return newDatabase(b)
return newDatabase(params.Name, b)
}

// ListDatabases implements backends.Backend interface.
//
//nolint:lll // for readability
func (b *backend) ListDatabases(ctx context.Context, params *backends.ListDatabasesParams) (*backends.ListDatabasesResult, error) {
panic("not implemented") // TODO: Implement
list, err := b.metadataStorage.ListDatabases()
if err != nil {
return nil, err
}

var result backends.ListDatabasesResult
for _, db := range list {
result.Databases = append(result.Databases, backends.DatabaseInfo{Name: db})
}

return &result, nil
}

// DropDatabase implements backends.Backend interface.
Expand Down
22 changes: 19 additions & 3 deletions internal/backends/sqlite/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (

// database implements backends.Database interface.
type database struct {
name string

b *backend
}

// newDatabase creates a new Database.
func newDatabase(b *backend) backends.Database {
func newDatabase(name string, b *backend) backends.Database {
return backends.DatabaseContract(&database{
b: b,
b: b,
name: name,
})
}

Expand All @@ -41,7 +44,20 @@ func (db *database) Collection(params *backends.CollectionParams) backends.Colle
//
//nolint:lll // for readability
func (db *database) ListCollections(ctx context.Context, params *backends.ListCollectionsParams) (*backends.ListCollectionsResult, error) {
panic("not implemented") // TODO: Implement
list, err := db.b.metadataStorage.ListCollections(ctx, db.name)
if err != nil {
return nil, err
}

var result backends.ListCollectionsResult

for _, name := range list {
result.Collections = append(result.Collections, backends.CollectionInfo{
Name: name,
})
}

return &result, nil
}

// CreateCollection implements backends.Database interface.
Expand Down
2 changes: 1 addition & 1 deletion internal/backends/sqlite/metadata_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *metadataStorage) load(ctx context.Context, dbName string) (*dbInfo, err
// TODO: check error
defer result.Close()

var metadata = dbInfo{
metadata := dbInfo{
collections: map[string]string{},
}

Expand Down
21 changes: 16 additions & 5 deletions internal/backends/sqlite/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,29 @@ import (
"database/sql"
"errors"
"sync"

"github.com/FerretDB/FerretDB/internal/util/resource"
)

// newConnPool creates a new connection pool.
func newConnPool() *connPool {
return &connPool{
mx: sync.Mutex{},
dbs: map[string]*sql.DB{},
pool := &connPool{
mx: sync.Mutex{},
dbs: map[string]*sql.DB{},
token: resource.NewToken(),
}

resource.Track(pool, pool.token)

return pool
}

// connPool is a pool of database connections.
type connPool struct {
dbs map[string]*sql.DB
type connPool struct { //nolint:vet // for readability
mx sync.Mutex
dbs map[string]*sql.DB

token *resource.Token
}

// DB returns a database connection for the given name.
Expand Down Expand Up @@ -66,5 +75,7 @@ func (c *connPool) Close() error {
}
}

resource.Untrack(c, c.token)

return errs
}
38 changes: 37 additions & 1 deletion internal/handlers/sqlite/cmd_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,47 @@ package sqlite

import (
"context"
"fmt"
"time"

"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/handlers/commonerrors"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
)

// CmdQuery implements HandlerInterface.
func (h *Handler) CmdQuery(ctx context.Context, query *wire.OpQuery) (*wire.OpReply, error) {
return nil, notImplemented(query.Query.Command())
if query.FullCollectionName == "admin.$cmd" {
switch cmd := query.Query.Command(); cmd {
case "ismaster", "isMaster": // both are valid
reply := &wire.OpReply{
NumberReturned: 1,
Documents: []*types.Document{must.NotFail(types.NewDocument(
"ismaster", true, // only lowercase
// topologyVersion
"maxBsonObjectSize", int32(types.MaxDocumentLen),
"maxMessageSizeBytes", int32(wire.MaxMsgLen),
"maxWriteBatchSize", int32(100000),
"localTime", time.Now(),
// logicalSessionTimeoutMinutes
"connectionId", int32(42),
"minWireVersion", common.MinWireVersion,
"maxWireVersion", common.MaxWireVersion,
"readOnly", false,
"ok", float64(1),
))},
}
return reply, nil

default:
msg := fmt.Sprintf("CmdQuery: unhandled command %q", cmd)
return nil, commonerrors.NewCommandErrorMsg(commonerrors.ErrNotImplemented, msg)
}
}

msg := fmt.Sprintf("CmdQuery: unhandled collection %q", query.FullCollectionName)

return nil, commonerrors.NewCommandErrorMsg(commonerrors.ErrNotImplemented, msg)
}
23 changes: 22 additions & 1 deletion internal/handlers/sqlite/msg_ismaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,33 @@ package sqlite

import (
"context"
"time"

"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
)

// MsgIsMaster implements HandlerInterface.
func (h *Handler) MsgIsMaster(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
return nil, notImplemented(must.NotFail(msg.Document()).Command())
var reply wire.OpMsg
must.NoError(reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{must.NotFail(types.NewDocument(
"ismaster", true, // only lowercase
// topologyVersion
"maxBsonObjectSize", int32(types.MaxDocumentLen),
"maxMessageSizeBytes", int32(wire.MaxMsgLen),
"maxWriteBatchSize", int32(100000),
"localTime", time.Now(),
// logicalSessionTimeoutMinutes
// connectionId
"minWireVersion", common.MinWireVersion,
"maxWireVersion", common.MaxWireVersion,
"readOnly", false,
"ok", float64(1),
))},
}))

return &reply, nil
}
41 changes: 40 additions & 1 deletion internal/handlers/sqlite/msg_listcollections.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,50 @@ package sqlite
import (
"context"

"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/handlers/common"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
)

// MsgListCollections implements HandlerInterface.
func (h *Handler) MsgListCollections(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
return nil, notImplemented(must.NotFail(msg.Document()).Command())
document, err := msg.Document()
if err != nil {
return nil, lazyerrors.Error(err)
}

db, err := common.GetRequiredParam[string](document, "$db")
if err != nil {
return nil, err
}

result, err := h.b.Database(ctx, &backends.DatabaseParams{Name: db}).ListCollections(ctx, nil)

collections := types.MakeArray(len(result.Collections))
for _, col := range result.Collections {
d := must.NotFail(types.NewDocument(
"name", col.Name,
"type", "collection",
))

collections.Append(d)
}

var reply wire.OpMsg

must.NoError(reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{must.NotFail(types.NewDocument(
"cursor", must.NotFail(types.NewDocument(
"id", int64(0),
"ns", db+".$cmd.listCollections",
"firstBatch", collections,
)),
"ok", float64(1),
))},
}))

return &reply, nil
}
31 changes: 30 additions & 1 deletion internal/handlers/sqlite/msg_listdatabases.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,40 @@ package sqlite
import (
"context"

"github.com/FerretDB/FerretDB/internal/backends"
"github.com/FerretDB/FerretDB/internal/types"
"github.com/FerretDB/FerretDB/internal/util/lazyerrors"
"github.com/FerretDB/FerretDB/internal/util/must"
"github.com/FerretDB/FerretDB/internal/wire"
)

// MsgListDatabases implements HandlerInterface.
func (h *Handler) MsgListDatabases(ctx context.Context, msg *wire.OpMsg) (*wire.OpMsg, error) {
return nil, notImplemented(must.NotFail(msg.Document()).Command())
response, err := h.b.ListDatabases(ctx, &backends.ListDatabasesParams{})
if err != nil {
return nil, lazyerrors.Error(err)
}

databases := types.MakeArray(len(response.Databases))

for _, db := range response.Databases {
databases.Append(types.NewDocument(
"name", db.Name,
"sizeOnDisk", int64(0),
"empty", false,
))
}

var reply wire.OpMsg

must.NoError(reply.SetSections(wire.OpMsgSection{
Documents: []*types.Document{must.NotFail(types.NewDocument(
"databases", databases,
"totalSize", int64(0),
"totalSizeMb", int64(0),
"ok", float64(1),
))},
}))

return &reply, nil
}