Reorganise aggregation by deciding how initial document is fetched #3102

Closed
wants to merge 15 commits into from

Conversation

chilagrow
Member

@chilagrow chilagrow commented Jul 24, 2023

Description

Closes #2423.

This PR demonstrates the intended design for how the aggregation pipeline handles special stages such as $collStats.

$collStats is a special case in aggregation: the stage itself decides whether to query the DB, and when it does, it uses a query specific to its needs.

This PR intends to remove the special handling for $collStats by letting each stage decide how its initial document iterator is fetched. This would also be useful for stages that haven't been added yet, such as $documents, which does not query the DB at all.

Do not merge: this PR only demonstrates the design. Some tests are failing, and that's okay.

Readiness checklist

  • I added/updated unit tests (and they pass).
  • I added/updated integration/compatibility tests (and they pass).
  • I added/updated comments and checked rendering.
  • I made spot refactorings.
  • I updated user documentation.
  • I ran task all, and it passed.
  • I ensured that PR title is good enough for the changelog.
  • (for maintainers only) I set Reviewers (@FerretDB/core), Labels, Project and project's Sprint fields.
  • I marked all done items in this checklist.

@chilagrow chilagrow added the code/chore (Code maintenance improvements) and do not merge (PRs that should not be merged) labels Jul 24, 2023
@chilagrow chilagrow self-assigned this Jul 24, 2023
@codecov

codecov bot commented Jul 24, 2023

Codecov Report

Merging #3102 (3f38c30) into main (a71936d) will decrease coverage by 49.29%.
The diff coverage is 0.00%.


@@             Coverage Diff             @@
##             main    #3102       +/-   ##
===========================================
- Coverage   75.26%   25.98%   -49.29%     
===========================================
  Files         400      400               
  Lines       22386    22436       +50     
===========================================
- Hits        16849     5829    -11020     
- Misses       4563    16050    +11487     
+ Partials      974      557      -417     
Files Changed Coverage Δ
.../handlers/common/aggregations/stages/add_fields.go 0.00% <0.00%> (-75.00%) ⬇️
...l/handlers/common/aggregations/stages/collstats.go 0.00% <0.00%> (-85.25%) ⬇️
...ernal/handlers/common/aggregations/stages/count.go 0.00% <0.00%> (-100.00%) ⬇️
...l/handlers/common/aggregations/stages/documents.go 0.00% <0.00%> (ø)
...ernal/handlers/common/aggregations/stages/group.go 0.00% <0.00%> (-82.30%) ⬇️
...ernal/handlers/common/aggregations/stages/limit.go 0.00% <0.00%> (-78.58%) ⬇️
...ernal/handlers/common/aggregations/stages/match.go 0.00% <0.00%> (-100.00%) ⬇️
...nal/handlers/common/aggregations/stages/project.go 0.00% <0.00%> (-100.00%) ⬇️
...nternal/handlers/common/aggregations/stages/set.go 0.00% <0.00%> (-75.00%) ⬇️
...ternal/handlers/common/aggregations/stages/skip.go 0.00% <0.00%> (-78.58%) ⬇️
... and 6 more

... and 209 files with indirect coverage changes

Flag Coverage Δ
hana ?
integration 5.24% <0.00%> (-66.46%) ⬇️
mongodb 5.24% <0.00%> (-0.02%) ⬇️
pg ∅ <ø> (∅)
shard-1 5.24% <0.00%> (-51.50%) ⬇️
shard-2 ∅ <ø> (∅)
shard-3 ∅ <ø> (∅)
sqlite ∅ <ø> (∅)
unit 24.42% <0.00%> (-0.09%) ⬇️


@chilagrow chilagrow changed the title from "Reorganise aggregation" to "Reorganise aggregation by each stage deciding how initial document is fetched" Jul 24, 2023
@chilagrow chilagrow marked this pull request as ready for review July 24, 2023 10:45
@chilagrow chilagrow requested a review from a team as a code owner July 24, 2023 10:45
@chilagrow chilagrow requested review from AlekSi, rumyantseva, a team, quasilyte and noisersup July 24, 2023 10:45
// The returned document iterator may be originated from querying database or
// from in-memory value iterator.
// This allows first stage of the pipeline to decide how to create initial document iterator.
FetchDocuments(ctx context.Context, closer *iterator.MultiCloser) (types.DocumentsIterator, error)
Contributor

If this function is only used for the first stage, do we need it in the stage interface at all? If something doesn't describe a typical stage, it shouldn't be a part of it.

Maybe it's part of another interface that is called something like FirstStage, or not an interface at all...

Contributor

@rumyantseva rumyantseva Jul 24, 2023

Or maybe it's part of a "pipeline" interface. E.g. we first "feed" the pipeline with a document iterator and then apply stages to it. How these documents are fetched from the DB is then the pipeline's responsibility/implementation.

Contributor

After some thinking... Actually, the current approach looks good to me too. I was confused by the phrase "it is only used if this is the first stage of the pipeline", but actually any stage could be the first stage, so it's correct that this method exists for each stage.

On the other hand, the implementation of this method is exactly the same in most of the cases, so that's another sign to me that maybe this function shouldn't be here...

Member

This function seems reasonable if some stages need to fetch documents differently when they come first, although I can't name any such stage.

Member Author

@chilagrow chilagrow Jul 25, 2023

Valid point about the implementation of FetchDocuments being mostly the same. Among upcoming stages, the ones that need a different implementation are $documents, $listLocalSessions and $listSessions, plus the existing $collStats. It would be nice to have a base implementation and override it for those special cases.

It cannot be part of the pipeline because that would expose stage-specific details to the pipeline. Let me try a few other things to see whether it's possible to make it not an interface. And yes, thanks, we should rename this to something better 🤗
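
One way to get a base implementation with overrides in Go is struct embedding. The following is a minimal sketch, not the PR's code: `Document`, `Iterator`, `baseStage`, `matchStage` and `documentsStage` are hypothetical stand-ins, and the real `FetchDocuments` also takes a context and a closer.

```go
package main

import "fmt"

// Document and Iterator are hypothetical, simplified stand-ins for
// FerretDB's document and iterator types.
type Document map[string]any

type Iterator []Document

// fetcher is the part of the stage interface discussed in this thread.
type fetcher interface {
	FetchDocuments() Iterator
}

// baseStage provides the default behaviour: fetch every document
// of the collection.
type baseStage struct {
	collection []Document
}

func (b baseStage) FetchDocuments() Iterator { return Iterator(b.collection) }

// matchStage embeds baseStage and inherits the default FetchDocuments.
type matchStage struct {
	baseStage
}

// documentsStage overrides FetchDocuments to return in-memory documents
// without touching the collection.
type documentsStage struct {
	baseStage
	literal []Document
}

func (d documentsStage) FetchDocuments() Iterator { return Iterator(d.literal) }

func main() {
	coll := []Document{{"_id": 1}, {"_id": 2}}
	stages := []fetcher{
		matchStage{baseStage: baseStage{collection: coll}},
		documentsStage{baseStage: baseStage{collection: coll}, literal: []Document{{"x": 1}}},
	}

	for _, s := range stages {
		fmt.Printf("%T fetched %d document(s)\n", s, len(s.FetchDocuments()))
	}
}
```

Only the special stages need their own method; every other stage picks up the default through the embedded struct.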

Contributor

@chilagrow yeah, I agree with your point. My thought was to somehow "identify" the type of pipeline based on the first stage and then fetch documents based on it, but it doesn't seem better (because then we need to come up with a "pipeline" type which will depend on the first stage, so the implementation doesn't look clear and simple).

So, it looks to me that the current function "How documents are fetched if this stage is the first in the pipeline" is a better solution.

Member Author

I tried your suggestion in another PR #3110
That approach exposes $collStats-related logic at the aggregation package level, but it removes the first-stage logic from the stages. Let me know what you think 🤗

// Process applies an aggregate stage on documents from iterator.
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error)
}

// AggregateQuery is a common interface for fetching from database.
type AggregateQuery interface {
Contributor

So, this interface will grow when we have listSessions and other stages that need different types of documents. I think it's fine. This way, we have a unified interface to work with different backends.

The approach looks similar to the Backend, Collection, and other interfaces that we have in the new handler.
Maybe we can use similar naming too? Then this could be just Aggregate/Aggregation. But I think the naming is not so critical.

}

// CollStats describes collection statistics retrieved from the database.
type CollStats struct {
Contributor

I think that if we want naming similar to the handler's, this should be called CollStatsResult.

rumyantseva
rumyantseva previously approved these changes Jul 25, 2023
Contributor

@rumyantseva rumyantseva left a comment

The approach looks good to me, and I don't see a better interface/approach/implementation, so I'm ready to approve.

Maybe the only question is how similar to internal/backends this new interface should be. If we want more similarity, maybe some naming/structuring needs to be changed. At some point, it looks reasonable to make them similar, because the idea is quite the same - we have a common implementation of aggregation pipelines and all the backends should implement a particular interface to support it.

noisersup
noisersup previously approved these changes Jul 25, 2023
Member

@noisersup noisersup left a comment

This approach makes sense to me!

Comment on lines 27 to 32
// FirstStage fetches document iterator, it is only used if this is
// the first stage of the pipeline.
// The returned document iterator may be originated from querying database or
// from in-memory value iterator.
// This allows first stage of the pipeline to decide how to create initial document iterator.
FirstStage(ctx context.Context, closer *iterator.MultiCloser) (types.DocumentsIterator, error)
Member

I don't think we need that method at all. First of all, it looks suspiciously like a constructor; it is almost as if the io.Reader interface had a NewReader method. Second, the implication is that Stage has access to the data source, but then it is not clear why Process can't use it.

One option could be to fit everything into the existing Stage interface with only the Process method. $collStats implementation, for example, could ignore incoming iter and use the data source instead. Then the code that arranges the pipeline (a sequence of stages in this case) could check that the order is correct.

Another option is to say that pipelines are not just a sequence of stages but a combination of zero or one Producer stage and zero or more Processor stages. That way, $collStats would implement only the Producer interface, $count would implement only the Processor interface, and $facet would implement both. Then the compiler itself could check the order.
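
The second option can be sketched roughly as follows. This is an illustration under simplifying assumptions rather than the design that was implemented: `Document`, `Pipeline`, `literalDocs`, `limit` and `count` are hypothetical names, contexts and closers are omitted, and falling back to a default source when no producer is set is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Document is a simplified, hypothetical stand-in for FerretDB's document type.
type Document map[string]any

// Producer creates the initial documents of a pipeline.
type Producer interface {
	Produce() ([]Document, error)
}

// Processor transforms the documents handed over by the previous step.
type Processor interface {
	Process(in []Document) ([]Document, error)
}

// Pipeline holds at most one Producer followed by any number of Processors.
// Its shape makes "producer after processor" impossible to express.
type Pipeline struct {
	Producer   Producer // nil means "use the fallback source" (an assumption)
	Processors []Processor
}

// Run produces the initial documents and pipes them through every Processor.
func (p Pipeline) Run(fallback Producer) ([]Document, error) {
	src := p.Producer
	if src == nil {
		src = fallback
	}
	if src == nil {
		return nil, errors.New("pipeline has no document source")
	}

	docs, err := src.Produce()
	if err != nil {
		return nil, err
	}

	for _, proc := range p.Processors {
		if docs, err = proc.Process(docs); err != nil {
			return nil, err
		}
	}

	return docs, nil
}

// literalDocs is a Producer that returns fixed in-memory documents.
type literalDocs []Document

func (l literalDocs) Produce() ([]Document, error) { return l, nil }

// limit is a Processor that keeps at most n documents.
type limit int

func (n limit) Process(in []Document) ([]Document, error) {
	if len(in) > int(n) {
		in = in[:n]
	}
	return in, nil
}

// count is a Processor that replaces its input with a single count document.
type count struct{}

func (count) Process(in []Document) ([]Document, error) {
	return []Document{{"count": len(in)}}, nil
}

func main() {
	collection := literalDocs{{"_id": 1}, {"_id": 2}, {"_id": 3}}
	p := Pipeline{Processors: []Processor{limit(2), count{}}}

	docs, err := p.Run(collection)
	fmt.Println(docs, err)
}
```

Because a value can only be placed in the `Producer` field if it implements `Produce`, assigning a pure processor such as `count{}` there fails at compile time, which is the ordering check the comment above refers to.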

Member Author

Both are definitely better options. The first is simpler; the second seems more efficient 👍

Member Author

I think using ProducerStage and ProcessorStage works quite well, thanks for the suggestion. Updated implementation a bit 🤗

// Process applies an aggregate stage on documents from iterator.
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error)
}

// Aggregation is a common interface for fetching from database.
Member

Independently of everything else, that's a bad name given this aggregation does not aggregate anything – it returns original data to be processed and aggregated, and that's pretty much the opposite.

@chilagrow chilagrow requested a review from AlekSi July 28, 2023 08:28
@chilagrow chilagrow changed the title from "Reorganise aggregation by each stage deciding how initial document is fetched" to "Reorganise aggregation by deciding how initial document is fetched" Jul 28, 2023

// ProducerStage is a common interface aggregation stages produce documentiterator.
type ProducerStage interface {
// Produce applies an aggregate stage on documents from iterator.
Member

That's not correct

Member Author

Indeed thanks, it should return a document iterator.

// ProducerStageDataSource is a common interface for fetching from database.
type ProducerStageDataSource interface {
// CollStats fetches collection statistics from the database.
CollStats(ctx context.Context, closer *iterator.MultiCloser) (*CollStatsResult, error)
Member

That should be a ProducerStage that returns an iterator with one document

Member Author

@chilagrow chilagrow Jul 28, 2023

So $collStats is a ProducerStage that wants to fetch data from the DB, and this interface allows access to the DB. Maybe I misunderstood and ProducerStage should be implemented in the pg package? 🤔

Member

@AlekSi AlekSi Jul 28, 2023

You are mixing up stages with what this code previously called Aggregation and now calls a data source.

Data source allows one to run queries.

ProducerStage produces documents. It is not important for the interface how those documents are produced – are they fetched from the database or just made up.

There are two implementations of ProducerStage. Both would use a data source. The first would return all documents from the collection. The second would produce collection stats. (In the future, the $match stage could implement both ProcessorStage and ProducerStage for pushdown.)
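
The separation described here can be sketched like this. It is an assumption-laden illustration: `DataSource` is reduced to two hypothetical methods (`Query`, `CollStats`) with simplified signatures, `memorySource` is an in-memory stand-in for a real backend, and plain slices replace document iterators.

```go
package main

import "fmt"

// Document is a simplified, hypothetical stand-in for FerretDB's document type.
type Document map[string]any

// DataSource lets callers run queries. Both methods and their signatures
// are assumptions made for this sketch.
type DataSource interface {
	Query() ([]Document, error)
	CollStats() (Document, error)
}

// ProducerStage produces documents; the interface does not say how.
type ProducerStage interface {
	Produce() ([]Document, error)
}

// allDocuments is a ProducerStage that returns every document of the collection.
type allDocuments struct{ ds DataSource }

func (s allDocuments) Produce() ([]Document, error) { return s.ds.Query() }

// collStats is a ProducerStage that returns exactly one statistics document.
type collStats struct{ ds DataSource }

func (s collStats) Produce() ([]Document, error) {
	stats, err := s.ds.CollStats()
	if err != nil {
		return nil, err
	}
	return []Document{stats}, nil
}

// memorySource is an in-memory DataSource used only for this example.
type memorySource struct {
	name string
	docs []Document
}

func (m memorySource) Query() ([]Document, error) { return m.docs, nil }

func (m memorySource) CollStats() (Document, error) {
	return Document{"ns": m.name, "count": len(m.docs)}, nil
}

func main() {
	ds := memorySource{name: "test.values", docs: []Document{{"_id": 1}, {"_id": 2}}}

	for _, stage := range []ProducerStage{allDocuments{ds: ds}, collStats{ds: ds}} {
		docs, err := stage.Produce()
		fmt.Printf("%T produced %d document(s), err=%v\n", stage, len(docs), err)
	}
}
```

The data source stays a private field of each stage, so code that only arranges the pipeline sees `ProducerStage` and nothing backend-specific.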

Member Author

So hopefully I get this now. We want to keep ProducerStageDataSource (or DataSource) local to its ProducerStage; there's no need to expose it here.

And I'm missing ProducerStage that returns all documents 😅

Member Author

There is quite a bit more cleanup I can do, but overall I like it. 🤗

@AlekSi AlekSi removed the request for review from quasilyte July 28, 2023 09:26
//
// Processing consists of modification of the input document, so it contains all the necessary fields
// Producing consists of modification of the input document, so it contains all the necessary fields
Member

That's not correct

Member Author

Thanks, updated 🙏

@@ -29,12 +31,29 @@ import (
"github.com/FerretDB/FerretDB/internal/util/must"
)

// CollStatsDataSource fetches collection statistics from the database.
type CollStatsDataSource interface {
Member

Let's call it DataSource. It has only CollStats for now; it will get Query or something similar soon and will be replaced with a backend interface later.

Member Author

Sounds good, updated this name.

rumyantseva
rumyantseva previously approved these changes Jul 31, 2023
Contributor

@rumyantseva rumyantseva left a comment

I like this new approach with two interfaces! We no longer duplicate any code, and the idea's easy to understand. I have some minor questions about the usage (I asked one question), but it's not blocking us, we can clarify it later.

// Process applies an aggregate stage on documents from iterator.
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error)
}

// ProducerStage is a common interface for aggregation stages that produce document iterator.
type ProducerStage interface {
Contributor

I like this new naming for the interfaces!

iter, err = processStagesStats(ctx, closer, &stagesStatsParams{
dbPool, db, collection, statistics, collStatsDocuments,
})
// handle case where there are multiple producer stages
Contributor

Is having multiple producer stages a possible situation?

Member

No

Member Author

For example, having two consecutive $collStats stages should return an error.

Member

That's correct, but that's not what this code does, is it?

I think we should return an error and exit early, way up where we iterate over aggregationStages

@@ -163,10 +162,27 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
}

aggregationStages := must.NotFail(iterator.ConsumeValues(pipeline.Iterator()))
stagesDocuments := make([]aggregations.Stage, 0, len(aggregationStages))
collStatsDocuments := make([]aggregations.Stage, 0, len(aggregationStages))
producerStages := make([]aggregations.ProducerStage, 0, len(aggregationStages))
Member

var producerStage aggregations.ProducerStage

As there could be only zero (nil) or one (non-nil value)

@@ -176,28 +192,27 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs
)
}

var s aggregations.Stage
if _, ok := stages.ProducerStages[d.Command()]; ok {
var s aggregations.ProducerStage
Member

if producerStage != nil {
  return nil, errors.New("oops, already have one")
}
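
Taken together, the two suggestions above (a single `producerStage` variable and an early error on the second producer) might look like the sketch below. It works on stage names only; `producerStages`, `splitStages` and `errMultipleProducers` are hypothetical names for this illustration, and any check on the producer's position in the pipeline is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// producerStages lists stage names that produce documents. Only $collStats
// is included, mirroring the single producer discussed in this PR.
var producerStages = map[string]bool{"$collStats": true}

// errMultipleProducers is returned when a pipeline names a second producer.
var errMultipleProducers = errors.New("pipeline has more than one producer stage")

// splitStages separates the (optional) producer stage from processor stages
// and fails as soon as a second producer stage is seen.
func splitStages(names []string) (producer string, processors []string, err error) {
	for _, name := range names {
		if !producerStages[name] {
			processors = append(processors, name)
			continue
		}

		// There can be only zero ("") or one producer stage.
		if producer != "" {
			return "", nil, errMultipleProducers
		}

		producer = name
	}

	return producer, processors, nil
}

func main() {
	fmt.Println(splitStages([]string{"$collStats", "$count"}))
	fmt.Println(splitStages([]string{"$collStats", "$collStats"}))
}
```

Failing inside the loop over stage names means no query is issued for a pipeline that is already known to be invalid.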

@chilagrow chilagrow marked this pull request as draft July 31, 2023 10:06
@chilagrow
Member Author

Pausing work on this PR for now

Member

@noisersup noisersup left a comment

Leaving a comment as this PR is on hold

@mergify
Contributor

mergify bot commented Oct 17, 2023

@chilagrow this pull request has merge conflicts.

@mergify mergify bot added the conflict (PRs that have merge conflicts) label Oct 17, 2023
@chilagrow
Member Author

Closing for now

@chilagrow chilagrow closed this Oct 30, 2023
@mergify mergify bot removed the conflict (PRs that have merge conflicts) label Oct 30, 2023
Labels
code/chore (Code maintenance improvements)
do not merge (PRs that should not be merged)
Projects
Archived in project
Development

Successfully merging this pull request may close these issues.

Move $collStats logic to its stage implementation
4 participants