From c0201395cd2960629c29201a19ad710354d6e7b4 Mon Sep 17 00:00:00 2001 From: Patryk Kwiatek Date: Wed, 13 Dec 2023 16:52:53 +0100 Subject: [PATCH] Add tests for tailable cursors (#3833) --- integration/cursors/getmore_test.go | 51 +---- integration/cursors/helpers.go | 75 +++++++ integration/cursors/tailable_test.go | 309 ++++++++++++++++++++++++++- 3 files changed, 385 insertions(+), 50 deletions(-) create mode 100644 integration/cursors/helpers.go diff --git a/integration/cursors/getmore_test.go b/integration/cursors/getmore_test.go index b5f9f0a48760..67ee6b9f8b16 100644 --- a/integration/cursors/getmore_test.go +++ b/integration/cursors/getmore_test.go @@ -335,22 +335,7 @@ func TestCursorsGetMoreCommand(t *testing.T) { err := collection.Database().RunCommand(ctx, command).Decode(&res) require.NoError(t, err) - doc := integration.ConvertDocument(t, res) - - v, _ := doc.Get("cursor") - require.NotNil(t, v) - - cursor, ok := v.(*types.Document) - require.True(t, ok) - - cursorID, _ := cursor.Get("id") - assert.NotNil(t, cursorID) - - v, _ = cursor.Get("firstBatch") - require.NotNil(t, v) - - firstBatch, ok := v.(*types.Array) - require.True(t, ok) + firstBatch, cursorID := getFirstBatch(t, res) require.Equal(t, len(tc.firstBatch), firstBatch.Len(), "expected: %v, got: %v", tc.firstBatch, firstBatch) for i, elem := range tc.firstBatch { @@ -382,22 +367,7 @@ func TestCursorsGetMoreCommand(t *testing.T) { integration.AssertEqualAltCommandError(t, *tc.err, tc.altMessage, err) // upon error response contains firstBatch field. - doc = integration.ConvertDocument(t, res) - - v, _ = doc.Get("cursor") - require.NotNil(t, v) - - cursor, ok = v.(*types.Document) - require.True(t, ok) - - cursorID, _ = cursor.Get("id") - assert.NotNil(t, cursorID) - - v, _ = cursor.Get("firstBatch") - require.NotNil(t, v) - - firstBatch, ok = v.(*types.Array) - require.True(t, ok) + firstBatch, _ = getFirstBatch(t, res) require.Equal(t, len(tc.firstBatch), firstBatch.Len(), "expected: %v, got: %v", tc.firstBatch, firstBatch) for i, elem := range tc.firstBatch { @@ -409,22 +379,7 @@ func TestCursorsGetMoreCommand(t *testing.T) { require.NoError(t, err) - doc = integration.ConvertDocument(t, res) - - v, _ = doc.Get("cursor") - require.NotNil(t, v) - - cursor, ok = v.(*types.Document) - require.True(t, ok) - - cursorID, _ = cursor.Get("id") - assert.NotNil(t, cursorID) - - v, _ = cursor.Get("nextBatch") - require.NotNil(t, v) - - nextBatch, ok := v.(*types.Array) - require.True(t, ok) + nextBatch, _ := getNextBatch(t, res) require.Equal(t, len(tc.nextBatch), nextBatch.Len(), "expected: %v, got: %v", tc.nextBatch, nextBatch) for i, elem := range tc.nextBatch { diff --git a/integration/cursors/helpers.go b/integration/cursors/helpers.go new file mode 100644 index 000000000000..8c2bb2c8e93f --- /dev/null +++ b/integration/cursors/helpers.go @@ -0,0 +1,75 @@ +// Copyright 2021 FerretDB Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cursors + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/bson" + + "github.com/FerretDB/FerretDB/integration" + "github.com/FerretDB/FerretDB/internal/types" + "github.com/FerretDB/FerretDB/internal/util/testutil/testtb" +) + +// getFirstBatch takes the response from the query that generates the cursors, +// validates if it contains cursor.firstBatch, and cursor ID, and returns those. +func getFirstBatch(t testtb.TB, res bson.D) (*types.Array, any) { + t.Helper() + + doc := integration.ConvertDocument(t, res) + + v, _ := doc.Get("cursor") + require.NotNil(t, v) + + cursor, ok := v.(*types.Document) + require.True(t, ok) + + cursorID, _ := cursor.Get("id") + assert.NotNil(t, cursorID) + + v, _ = cursor.Get("firstBatch") + require.NotNil(t, v) + + firstBatch, ok := v.(*types.Array) + require.True(t, ok) + + return firstBatch, cursorID +} + +// getNextBatch takes the response from the getMore query, +// validates if it contains cursor.nextBatch, and cursor ID, and returns those. +func getNextBatch(t testtb.TB, res bson.D) (*types.Array, any) { + t.Helper() + + doc := integration.ConvertDocument(t, res) + + v, _ := doc.Get("cursor") + require.NotNil(t, v) + + cursor, ok := v.(*types.Document) + require.True(t, ok) + + cursorID, _ := cursor.Get("id") + assert.NotNil(t, cursorID) + + v, _ = cursor.Get("nextBatch") + require.NotNil(t, v) + + firstBatch, ok := v.(*types.Array) + require.True(t, ok) + + return firstBatch, cursorID +} diff --git a/integration/cursors/tailable_test.go b/integration/cursors/tailable_test.go index 9a4318f63bcd..3117db790772 100644 --- a/integration/cursors/tailable_test.go +++ b/integration/cursors/tailable_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -25,9 +26,13 @@ import ( "github.com/FerretDB/FerretDB/integration" "github.com/FerretDB/FerretDB/integration/setup" "github.com/FerretDB/FerretDB/integration/shareddata" + "github.com/FerretDB/FerretDB/internal/types" + "github.com/FerretDB/FerretDB/internal/util/must" + "github.com/FerretDB/FerretDB/internal/util/testutil/testfail" + "github.com/FerretDB/FerretDB/internal/util/testutil/testtb" ) -func TestCursorsTailable(t *testing.T) { +func TestCursorsTailableErrors(t *testing.T) { t.Parallel() t.Run("NonCapped", func(t *testing.T) { @@ -41,11 +46,311 @@ func TestCursorsTailable(t *testing.T) { Code: 2, Name: "BadValue", Message: "error processing query: " + - "ns=TestCursorsTailable-NonCapped.TestCursorsTailable-NonCappedTree: $and\nSort: {}\nProj: {}\n " + + "ns=TestCursorsTailableErrors-NonCapped.TestCursorsTailableErrors-NonCappedTree: $and\nSort: {}\nProj: {}\n " + "tailable cursor requested on non capped collection", } integration.AssertEqualAltCommandError(t, expected, "tailable cursor requested on non capped collection", err) assert.Nil(t, cursor) } }) + + t.Run("GetMoreDifferentCollection", func(tt *testing.T) { + tt.Parallel() + + var t testtb.TB = tt + + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + + s := setup.SetupWithOpts(t, nil) + + db, ctx := s.Collection.Database(), s.Ctx + + opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000) + err := db.CreateCollection(s.Ctx, t.Name(), opts) + require.NoError(t, err) + + collection := db.Collection(t.Name()) + + bsonArr, arr := integration.GenerateDocuments(0, 3) + + _, err = collection.InsertMany(ctx, bsonArr) + require.NoError(tt, err) + + var cursorID any + + findCmd := bson.D{ + {"find", collection.Name()}, + {"batchSize", 1}, + {"tailable", true}, + } + + var res bson.D + err = collection.Database().RunCommand(ctx, findCmd).Decode(&res) + require.NoError(t, err) + + var firstBatch *types.Array + firstBatch, cursorID = getFirstBatch(t, res) + + expectedFirstBatch := integration.ConvertDocuments(t, arr[:1]) + require.Equal(t, len(expectedFirstBatch), firstBatch.Len()) + require.Equal(t, expectedFirstBatch[0], must.NotFail(firstBatch.Get(0))) + + getMoreCmd := bson.D{ + {"getMore", cursorID}, + {"collection", "different-collection"}, + {"batchSize", 1}, + } + + err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res) + + expected := mongo.CommandError{ + Code: 13, + Name: "Unauthorized", + Message: "Requested getMore on namespace 'TestCursorsTailableErrors-GetMoreDifferentCollection.different-collection', " + + "but cursor belongs to a different namespace " + + "TestCursorsTailableErrors-GetMoreDifferentCollection.TestCursorsTailableErrors/GetMoreDifferentCollection", + } + integration.AssertEqualCommandError(t, expected, err) + + // Check if cursor is not closed after the error + err = collection.Database().RunCommand(ctx, bson.D{ + {"getMore", cursorID}, + {"collection", collection.Name()}, + {"batchSize", 1}, + }).Decode(&res) + + require.NoError(t, err) + + nextBatch, nextID := getNextBatch(t, res) + require.Equal(t, cursorID, nextID) + + doc, _ := nextBatch.Get(0) + require.NotNil(t, doc) + }) +} + +func TestCursorsTailable(t *testing.T) { + t.Parallel() + + s := setup.SetupWithOpts(t, nil) + + db, ctx := s.Collection.Database(), s.Ctx + + opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000) + err := db.CreateCollection(s.Ctx, t.Name(), opts) + require.NoError(t, err) + + collection := db.Collection(t.Name()) + + bsonArr, arr := integration.GenerateDocuments(0, 3) + + _, err = collection.InsertMany(ctx, bsonArr) + require.NoError(t, err) + + var cursorID any + + t.Run("FirstBatch", func(tt *testing.T) { + var t testtb.TB = tt + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + + cmd := bson.D{ + {"find", collection.Name()}, + {"batchSize", 1}, + {"tailable", true}, + } + + var res bson.D + err = collection.Database().RunCommand(ctx, cmd).Decode(&res) + require.NoError(t, err) + + var firstBatch *types.Array + firstBatch, cursorID = getFirstBatch(t, res) + + expectedFirstBatch := integration.ConvertDocuments(t, arr[:1]) + require.Equal(t, len(expectedFirstBatch), firstBatch.Len()) + require.Equal(t, expectedFirstBatch[0], must.NotFail(firstBatch.Get(0))) + }) + + getMoreCmd := bson.D{ + {"getMore", cursorID}, + {"collection", collection.Name()}, + {"batchSize", 1}, + } + + t.Run("GetMore", func(tt *testing.T) { + var t testtb.TB = tt + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + for i := 0; i < 2; i++ { + var res bson.D + err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res) + require.NoError(t, err) + + nextBatch, nextID := getNextBatch(t, res) + expectedNextBatch := integration.ConvertDocuments(t, arr[i+1:i+2]) + + assert.Equal(t, cursorID, nextID) + + require.Equal(t, len(expectedNextBatch), nextBatch.Len()) + require.Equal(t, expectedNextBatch[0], must.NotFail(nextBatch.Get(0))) + } + }) + + t.Run("GetMoreEmpty", func(tt *testing.T) { + var t testtb.TB = tt + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + var res bson.D + err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res) + require.NoError(t, err) + + nextBatch, nextID := getNextBatch(t, res) + require.Equal(t, 0, nextBatch.Len()) + assert.Equal(t, cursorID, nextID) + }) + + t.Run("GetMoreNewDoc", func(tt *testing.T) { + var t testtb.TB = tt + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + newDoc := bson.D{{"_id", "new"}} + _, err = collection.InsertOne(ctx, newDoc) + require.NoError(t, err) + + var res bson.D + err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res) + require.NoError(t, err) + + nextBatch, nextID := getNextBatch(t, res) + + assert.Equal(t, cursorID, nextID) + + require.Equal(t, 1, nextBatch.Len()) + require.Equal(t, integration.ConvertDocument(tt, newDoc), must.NotFail(nextBatch.Get(0))) + }) + + t.Run("GetMoreEmptyAfterInsertion", func(tt *testing.T) { + var t testtb.TB = tt + if !setup.IsMongoDB(tt) { + t = testfail.Expected(tt, "https://github.com/FerretDB/FerretDB/issues/2283") + } + var res bson.D + err = collection.Database().RunCommand(ctx, getMoreCmd).Decode(&res) + require.NoError(t, err) + + nextBatch, nextID := getNextBatch(t, res) + require.Equal(t, 0, nextBatch.Len()) + assert.Equal(t, cursorID, nextID) + }) +} + +func TestCursorsTailableTwoCursorsSameCollection(t *testing.T) { + t.Parallel() + + var tt testtb.TB = t + if !setup.IsMongoDB(tt) { + tt = testfail.Expected(t, "https://github.com/FerretDB/FerretDB/issues/2283") + } + + s := setup.SetupWithOpts(tt, nil) + + db, ctx := s.Collection.Database(), s.Ctx + + opts := options.CreateCollection().SetCapped(true).SetSizeInBytes(10000) + err := db.CreateCollection(s.Ctx, tt.Name(), opts) + require.NoError(tt, err) + + collection := db.Collection(tt.Name()) + + bsonArr, arr := integration.GenerateDocuments(0, 50) + + _, err = collection.InsertMany(ctx, bsonArr) + require.NoError(tt, err) + + var cursorID1, cursorID2 any + + cmd := bson.D{ + {"find", collection.Name()}, + {"batchSize", 1}, + {"tailable", true}, + } + + var res bson.D + + err = collection.Database().RunCommand(ctx, cmd).Decode(&res) + require.NoError(tt, err) + + var firstBatch1 *types.Array + firstBatch1, cursorID1 = getFirstBatch(tt, res) + + err = collection.Database().RunCommand(ctx, cmd).Decode(&res) + require.NoError(tt, err) + + var firstBatch2 *types.Array + firstBatch2, cursorID2 = getFirstBatch(tt, res) + + expectedFirstBatch := integration.ConvertDocuments(tt, arr[:1]) + + require.Equal(tt, len(expectedFirstBatch), firstBatch1.Len()) + require.Equal(tt, expectedFirstBatch[0], must.NotFail(firstBatch1.Get(0))) + + require.Equal(tt, len(expectedFirstBatch), firstBatch2.Len()) + require.Equal(tt, expectedFirstBatch[0], must.NotFail(firstBatch2.Get(0))) + + getMoreCmd1 := bson.D{ + {"getMore", cursorID1}, + {"collection", collection.Name()}, + {"batchSize", 1}, + } + + getMoreCmd2 := bson.D{ + {"getMore", cursorID2}, + {"collection", collection.Name()}, + {"batchSize", 1}, + } + + for i := 0; i < 49; i++ { + err = collection.Database().RunCommand(ctx, getMoreCmd1).Decode(&res) + require.NoError(tt, err) + + nextBatch1, nextID1 := getNextBatch(tt, res) + + err = collection.Database().RunCommand(ctx, getMoreCmd2).Decode(&res) + require.NoError(tt, err) + + nextBatch2, nextID2 := getNextBatch(tt, res) + + expectedNextBatch := integration.ConvertDocuments(tt, arr[i+1:i+2]) + + assert.Equal(tt, cursorID1, nextID1) + require.Equal(tt, len(expectedNextBatch), nextBatch1.Len()) + require.Equal(tt, expectedNextBatch[0], must.NotFail(nextBatch1.Get(0))) + + assert.Equal(tt, cursorID2, nextID2) + require.Equal(tt, len(expectedNextBatch), nextBatch2.Len()) + require.Equal(tt, expectedNextBatch[0], must.NotFail(nextBatch2.Get(0))) + } + + err = collection.Database().RunCommand(ctx, getMoreCmd1).Decode(&res) + require.NoError(tt, err) + + nextBatch1, nextID1 := getNextBatch(tt, res) + + err = collection.Database().RunCommand(ctx, getMoreCmd2).Decode(&res) + require.NoError(tt, err) + + nextBatch2, nextID2 := getNextBatch(tt, res) + + require.Equal(tt, 0, nextBatch1.Len()) + assert.Equal(tt, cursorID1, nextID1) + + require.Equal(tt, 0, nextBatch2.Len()) + assert.Equal(tt, cursorID2, nextID2) }