Skip to content

Commit

Permalink
Support capped collection cleanup (#3831)
Browse files Browse the repository at this point in the history
Closes #3459.

Co-authored-by: Alexey Palazhchenko <alexey.palazhchenko@gmail.com>
  • Loading branch information
Elena Grahovac and AlekSi authored Dec 14, 2023
1 parent bff4306 commit b2b0aaf
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 38 deletions.
14 changes: 10 additions & 4 deletions cmd/ferretdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ var cli struct {

DisablePushdown bool `default:"false" help:"Experimental: disable pushdown."`
EnableOplog bool `default:"false" help:"Experimental: enable capped collections, tailable cursors and OpLog." hidden:""`
EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication." hidden:""`
CappedCleanup struct {
Interval time.Duration `default:"1m" help:"Experimental: capped collections cleanup interval." hidden:""`
Percentage uint8 `default:"10" help:"Experimental: percentage of documents to cleanup." hidden:""`
} `embed:"" prefix:"capped-cleanup-"`
EnableNewAuth bool `default:"false" help:"Experimental: enable new authentication." hidden:""`

//nolint:lll // for readability
Telemetry struct {
Expand Down Expand Up @@ -397,9 +401,11 @@ func run() {
MySQLURL: mySQLFlags.MySQLURL,

TestOpts: registry.TestOpts{
DisablePushdown: cli.Test.DisablePushdown,
EnableOplog: cli.Test.EnableOplog,
EnableNewAuth: cli.Test.EnableNewAuth,
DisablePushdown: cli.Test.DisablePushdown,
EnableOplog: cli.Test.EnableOplog,
CappedCleanupInterval: cli.Test.CappedCleanup.Interval,
CappedCleanupPercentage: cli.Test.CappedCleanup.Percentage,
EnableNewAuth: cli.Test.EnableNewAuth,
},
})
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions ferretdb/ferretdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func New(config *Config) (*FerretDB, error) {
PostgreSQLURL: config.PostgreSQLURL,

SQLiteURL: config.SQLiteURL,

TestOpts: registry.TestOpts{
CappedCleanupPercentage: 10, // handler expects it to be a non-zero value
},
})
if err != nil {
return nil, fmt.Errorf("failed to construct handler: %s", err)
Expand Down
82 changes: 82 additions & 0 deletions integration/commands_administration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,88 @@ func TestCommandsAdministrationCompactForce(t *testing.T) {
}
}

func TestCommandsAdministrationCompactCapped(t *testing.T) {
t.Parallel()

ctx, coll := setup.Setup(t)

for name, tc := range map[string]struct { //nolint:vet // for readability
force bool
maxDocuments int64
sizeInBytes int64
insertDocuments int32
expectedDocuments int64 // insertDocuments - insertDocuments*0.2 (cleanup 20%) + 1 (extra insert after compact)

skipForMongoDB string // optional, skip test for MongoDB backend with a specific reason
}{
"OverflowDocs": {
force: true,
maxDocuments: 10,
sizeInBytes: 100000,
insertDocuments: 12, // overflows capped collection max documents
expectedDocuments: 11,
},
"OverflowSize": {
force: true,
maxDocuments: 1000,
sizeInBytes: 256,
insertDocuments: 20, // overflows capped collection size
expectedDocuments: 17,
},
"ForceFalse": {
force: false,
maxDocuments: 10,
sizeInBytes: 100000,
insertDocuments: 12, // overflows capped collection max documents
expectedDocuments: 11,
skipForMongoDB: "Only {force:true} can be run on active replica set primary",
},
} {
name, tc := name, tc
t.Run(name, func(t *testing.T) {
if tc.skipForMongoDB != "" {
setup.SkipForMongoDB(t, tc.skipForMongoDB)
}

t.Parallel()

collName := testutil.CollectionName(t) + name

opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(tc.sizeInBytes).SetMaxDocuments(tc.maxDocuments)
err := coll.Database().CreateCollection(ctx, collName, opts)
require.NoError(t, err)

collection := coll.Database().Collection(collName)

arr, _ := GenerateDocuments(0, tc.insertDocuments)
_, err = collection.InsertMany(ctx, arr)
require.NoError(t, err)

count, err := collection.CountDocuments(ctx, bson.D{})
require.NoError(t, err)
require.InDelta(t, int64(tc.insertDocuments), count, 2)

var res bson.D
err = collection.Database().RunCommand(ctx,
bson.D{{"compact", collection.Name()}, {"force", tc.force}},
).Decode(&res)
require.NoError(t, err)

doc := ConvertDocument(t, res)
assert.Equal(t, float64(1), must.NotFail(doc.Get("ok")))
assert.NotNil(t, must.NotFail(doc.Get("bytesFreed")))

// some documents should be removed from capped collection after the insertion
_, err = collection.InsertOne(ctx, bson.D{{"foo", "bar"}})
require.NoError(t, err)

count, err = collection.CountDocuments(ctx, bson.D{})
require.NoError(t, err)
require.InDelta(t, tc.expectedDocuments, count, 1)
})
}
}

func TestCommandsAdministrationCompactErrors(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 5 additions & 3 deletions integration/setup/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ func setupListener(tb testtb.TB, ctx context.Context, logger *zap.Logger) string
HANAURL: *hanaURLF,

TestOpts: registry.TestOpts{
DisablePushdown: *disablePushdownF,
EnableOplog: true,
EnableNewAuth: false,
DisablePushdown: *disablePushdownF,
EnableOplog: true,
CappedCleanupPercentage: 20,
CappedCleanupInterval: 0,
EnableNewAuth: false,
},
}
h, closeBackend, err := registry.NewHandler(handler, handlerOpts)
Expand Down
Loading

0 comments on commit b2b0aaf

Please sign in to comment.