From 9861d72afd0f56824ac57e4b1b261e5a50af678c Mon Sep 17 00:00:00 2001 From: adetunjii Date: Mon, 1 May 2023 20:57:44 +0100 Subject: [PATCH 1/7] verify message checksum --- internal/wire/msg_body.go | 59 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/internal/wire/msg_body.go b/internal/wire/msg_body.go index 5f64aa9bffde..4c19a9a4715b 100644 --- a/internal/wire/msg_body.go +++ b/internal/wire/msg_body.go @@ -17,8 +17,10 @@ package wire import ( "bufio" "encoding" + "encoding/binary" "errors" "fmt" + "hash/crc32" "io" "github.com/FerretDB/FerretDB/internal/util/lazyerrors" @@ -34,6 +36,9 @@ type MsgBody interface { msgbody() // seal for go-sumtype } +// crc32c checksum byte size +const kCrc32Size = 4 + //go-sumtype:decl MsgBody // ErrZeroRead is returned when zero bytes was read from connection, @@ -44,6 +49,10 @@ var ErrZeroRead = errors.New("zero bytes read") // // Error is (possibly wrapped) ErrZeroRead if zero bytes was read. func ReadMessage(r *bufio.Reader) (*MsgHeader, MsgBody, error) { + if err := verifyChecksum(r); err != nil { + return nil, nil, lazyerrors.Error(err) + } + var header MsgHeader if err := header.readFrom(r); err != nil { return nil, nil, lazyerrors.Error(err) @@ -123,3 +132,53 @@ func WriteMessage(w *bufio.Writer, header *MsgHeader, msg MsgBody) error { return nil } + +// verifyChecksum verifies the checksum of the message it is attached +func verifyChecksum(r *bufio.Reader) error { + // n = MsgHeaderLen + flagbits length + n := MsgHeaderLen + 4 + msgHeader, err := r.Peek(n) + if err != nil { + if err == io.EOF { + return ErrZeroRead + } + + return lazyerrors.Error(err) + } + + msgLen := int(binary.LittleEndian.Uint32(msgHeader[0:4])) + + if msgLen < MsgHeaderLen || msgLen > MaxMsgLen { + return lazyerrors.Errorf("invalid message length %d", msgLen) + } + + b, err := r.Peek(msgLen) + if err != nil { + return lazyerrors.Error(err) + } + + flagbits := 
OpMsgFlags(binary.LittleEndian.Uint32(msgHeader[MsgHeaderLen:n])) + if flagbits.FlagSet(OpMsgChecksumPresent) { + // remove checksum from the message + actualMsg, checksum := detachChecksum(b) + + if checksum != calculateChecksum(actualMsg) { + return lazyerrors.New("OP_MSG checksum does not match contents") + } + } + + return nil +} + +func detachChecksum(data []byte) ([]byte, uint32) { + msgLen := len(data) + msg := data[:msgLen-kCrc32Size] + checksum := binary.LittleEndian.Uint32(data[msgLen-kCrc32Size:]) + + return msg, checksum +} + +func calculateChecksum(msg []byte) uint32 { + table := crc32.MakeTable(crc32.Castagnoli) + return crc32.Checksum(msg, table) +} From 9b978c3f544bf62bf3f0fc9f6d2309192ed17cec Mon Sep 17 00:00:00 2001 From: adetunjii Date: Sat, 6 May 2023 09:00:44 +0100 Subject: [PATCH 2/7] fix: removed todo tag for closed issue --- integration/update_field_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration/update_field_test.go b/integration/update_field_test.go index e37efa67fc91..6a5a8ec916f9 100644 --- a/integration/update_field_test.go +++ b/integration/update_field_test.go @@ -39,7 +39,6 @@ func TestUpdateFieldSet(t *testing.T) { alt string }{ "ArrayNil": { - // TODO remove https://github.com/FerretDB/FerretDB/issues/1662 id: "string", update: bson.D{{"$set", bson.D{{"v", bson.A{nil}}}}}, expected: bson.D{{"_id", "string"}, {"v", bson.A{nil}}}, @@ -50,7 +49,6 @@ func TestUpdateFieldSet(t *testing.T) { }, }, "SetSameValueInt": { - // TODO remove https://github.com/FerretDB/FerretDB/issues/1662 id: "int32", update: bson.D{{"$set", bson.D{{"v", int32(42)}}}}, expected: bson.D{{"_id", "int32"}, {"v", int32(42)}}, From 6cc869272631ea8962ed4e044de517fe6f592162 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Sat, 6 May 2023 09:46:38 +0100 Subject: [PATCH 3/7] fix: removed todo tags for closed issues --- internal/wire/msg_body.go | 55 --------------------------------------- 1 file changed, 55 deletions(-) diff --git 
a/internal/wire/msg_body.go b/internal/wire/msg_body.go index 4c19a9a4715b..575137ebc0cc 100644 --- a/internal/wire/msg_body.go +++ b/internal/wire/msg_body.go @@ -17,10 +17,8 @@ package wire import ( "bufio" "encoding" - "encoding/binary" "errors" "fmt" - "hash/crc32" "io" "github.com/FerretDB/FerretDB/internal/util/lazyerrors" @@ -49,9 +47,6 @@ var ErrZeroRead = errors.New("zero bytes read") // // Error is (possibly wrapped) ErrZeroRead if zero bytes was read. func ReadMessage(r *bufio.Reader) (*MsgHeader, MsgBody, error) { - if err := verifyChecksum(r); err != nil { - return nil, nil, lazyerrors.Error(err) - } var header MsgHeader if err := header.readFrom(r); err != nil { @@ -132,53 +127,3 @@ func WriteMessage(w *bufio.Writer, header *MsgHeader, msg MsgBody) error { return nil } - -// verifyChecksum verifies the checksum of the message it is attached -func verifyChecksum(r *bufio.Reader) error { - // n = MsgHeaderLen + flagbits length - n := MsgHeaderLen + 4 - msgHeader, err := r.Peek(n) - if err != nil { - if err == io.EOF { - return ErrZeroRead - } - - return lazyerrors.Error(err) - } - - msgLen := int(binary.LittleEndian.Uint32(msgHeader[0:4])) - - if msgLen < MsgHeaderLen || msgLen > MaxMsgLen { - return lazyerrors.Errorf("invalid message length %d", msgLen) - } - - b, err := r.Peek(msgLen) - if err != nil { - return lazyerrors.Error(err) - } - - flagbits := OpMsgFlags(binary.LittleEndian.Uint32(msgHeader[MsgHeaderLen:n])) - if flagbits.FlagSet(OpMsgChecksumPresent) { - // remove checksum from the message - actualMsg, checksum := detachChecksum(b) - - if checksum != calculateChecksum(actualMsg) { - return lazyerrors.New("OP_MSG checksum does not match contents") - } - } - - return nil -} - -func detachChecksum(data []byte) ([]byte, uint32) { - msgLen := len(data) - msg := data[:msgLen-kCrc32Size] - checksum := binary.LittleEndian.Uint32(data[msgLen-kCrc32Size:]) - - return msg, checksum -} - -func calculateChecksum(msg []byte) uint32 { - table := 
crc32.MakeTable(crc32.Castagnoli) - return crc32.Checksum(msg, table) -} From 898adb96169b68e7eebad35e1f54b75edcfd7858 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Sat, 6 May 2023 09:48:40 +0100 Subject: [PATCH 4/7] fix: removed todo tags for closed issues --- internal/wire/msg_body.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/wire/msg_body.go b/internal/wire/msg_body.go index 575137ebc0cc..a6cd948fd737 100644 --- a/internal/wire/msg_body.go +++ b/internal/wire/msg_body.go @@ -34,9 +34,6 @@ type MsgBody interface { msgbody() // seal for go-sumtype } -// crc32c checksum byte size -const kCrc32Size = 4 - //go-sumtype:decl MsgBody // ErrZeroRead is returned when zero bytes was read from connection, From e93b955afcea7287139e4ddbc945d9024bde689e Mon Sep 17 00:00:00 2001 From: adetunjii Date: Mon, 8 May 2023 07:58:07 +0100 Subject: [PATCH 5/7] fix: linting error --- internal/wire/msg_body.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/wire/msg_body.go b/internal/wire/msg_body.go index a6cd948fd737..5f64aa9bffde 100644 --- a/internal/wire/msg_body.go +++ b/internal/wire/msg_body.go @@ -44,7 +44,6 @@ var ErrZeroRead = errors.New("zero bytes read") // // Error is (possibly wrapped) ErrZeroRead if zero bytes was read. 
func ReadMessage(r *bufio.Reader) (*MsgHeader, MsgBody, error) { - var header MsgHeader if err := header.readFrom(r); err != nil { return nil, nil, lazyerrors.Error(err) From 17e185592ca96bf515fe75720925a7c5e0af2057 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Tue, 12 Sep 2023 10:05:44 +0100 Subject: [PATCH 6/7] fix: ConsumeValues in aggregation stage --- .../common/aggregations/stages/group.go | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/internal/handlers/common/aggregations/stages/group.go b/internal/handlers/common/aggregations/stages/group.go index 03decbbad05d..82785c0f1490 100644 --- a/internal/handlers/common/aggregations/stages/group.go +++ b/internal/handlers/common/aggregations/stages/group.go @@ -118,13 +118,8 @@ func newGroup(stage *types.Document) (aggregations.Stage, error) { // Process implements Stage interface. func (g *group) Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error) { //nolint:lll // for readability - // TODO https://github.com/FerretDB/FerretDB/issues/2863 - docs, err := iterator.ConsumeValues(iterator.Interface[struct{}, *types.Document](iter)) - if err != nil { - return nil, lazyerrors.Error(err) - } - groupedDocuments, err := g.groupDocuments(docs) + groupedDocuments, err := g.groupDocuments(iter) if err != nil { return nil, err } @@ -232,10 +227,20 @@ func validateGroupKey(groupKey any) error { // groupDocuments groups documents into groups using group key. If group key contains expressions // or operators, they are evaluated before using it as the group key of documents. 
-func (g *group) groupDocuments(in []*types.Document) ([]groupedDocuments, error) { +func (g *group) groupDocuments(iter types.DocumentsIterator) ([]groupedDocuments, error) { var m groupMap - for _, doc := range in { + for { + + _, doc, err := iter.Next() + if errors.Is(err, iterator.ErrIteratorDone) { + break + } + + if err != nil { + return nil, lazyerrors.Error(err) + } + switch groupKey := g.groupExpression.(type) { case *types.Document: val, err := evaluateDocument(groupKey, doc, false) From 506f2783ccbb3724871c2cdda62b98b5e86de0b4 Mon Sep 17 00:00:00 2001 From: adetunjii Date: Tue, 12 Sep 2023 10:29:42 +0100 Subject: [PATCH 7/7] fix: ConsumeValues in aggregation stage --- internal/handlers/common/aggregations/stages/group.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/handlers/common/aggregations/stages/group.go b/internal/handlers/common/aggregations/stages/group.go index 82785c0f1490..2a35356623ba 100644 --- a/internal/handlers/common/aggregations/stages/group.go +++ b/internal/handlers/common/aggregations/stages/group.go @@ -118,7 +118,6 @@ func newGroup(stage *types.Document) (aggregations.Stage, error) { // Process implements Stage interface. func (g *group) Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error) { //nolint:lll // for readability - groupedDocuments, err := g.groupDocuments(iter) if err != nil { return nil, err @@ -231,7 +230,6 @@ func (g *group) groupDocuments(iter types.DocumentsIterator) ([]groupedDocuments var m groupMap for { - _, doc, err := iter.Next() if errors.Is(err, iterator.ErrIteratorDone) { break