-
Notifications
You must be signed in to change notification settings - Fork 416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reorganise aggregation by deciding how initial document is fetched #3102
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## main #3102 +/- ##
===========================================
- Coverage 75.26% 25.98% -49.29%
===========================================
Files 400 400
Lines 22386 22436 +50
===========================================
- Hits 16849 5829 -11020
- Misses 4563 16050 +11487
+ Partials 974 557 -417
... and 209 files with indirect coverage changes
Flags with carried forward coverage won't be shown. Click here to find out more. |
// The returned document iterator may be originated from querying database or | ||
// from in-memory value iterator. | ||
// This allows first stage of the pipeline to decide how to create initial document iterator. | ||
FetchDocuments(ctx context.Context, closer *iterator.MultiCloser) (types.DocumentsIterator, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this function is only used for the first stage, do we need it in the stage interface at all? If something doesn't describe a typical stage, it shouldn't be a part of it.
Maybe it's part of another interface that is called something like FirstStage
, or not an interface at all...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or maybe it's part of a "pipeline" interface. E.g. we "feed" the pipeline with some documents' iterator first, and then apply stages on it. And then it's pipeline responsibility/implementation how to fetch these documents from the DB.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some thinking... Actually, the current approach looks good to me too. I was confused by the phrase "it is only used if this is the first stage of the pipeline", but actually any stage could be the first stage, so it's correct that this method exists for each stage.
On the other hand, the implementation of this method is exactly the same in most of the cases, so that's another sign to me that maybe this function shouldn't be here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function seems reasonable if the way of fetching documents is different at the first stage for some stages, although I cannot define any of them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Valid point about the implementation of this FetchDocuments
being mostly the same. For upcoming pipelines, the ones that have different implementations are $documents
, $listLocalSessions
, $listSessions
and existing $collStats
. Would be nice to have base implementation and override it for those special cases.
It cannot be part of pipeline because it would expose specific details about each stage to the pipeline. Let me try a few other things if it's possible to make it not an interface. Yes thanks we should rename this to something better 🤗
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chilagrow yeah, I agree with your point. My thought was to somehow "identify" the type of pipeline based on the first stage and then fetch documents based on it, but it doesn't seem better (because then we need to come up with a "pipeline" type which will depend on the first stage, so the implementation doesn't look clear and simple).
So, it looks to me that the current function "How documents are fetched if this stage is the first in the pipeline" is a better solution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried your suggestion in another PR #3110
This approach exposes logic related to $collStats
to aggregation package level. But it removes first stage logic from the stages. Let me know what you think 🤗
// Process applies an aggregate stage on documents from iterator. | ||
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error) | ||
} | ||
|
||
// AggregateQuery is a common interface for fetching from database. | ||
type AggregateQuery interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this interface will grow when we have listSessions
and other stages that need different types of documents. I think it's fine. This way, we have a unified interface to work with different backends.
The approach looks similar to the Backend
, Collection
, and other interfaces that we have in the new handler.
Maybe we can have a similar naming too? And then this could be just Aggregate
/Aggregation
? But I think the naming it's not so critical.
} | ||
|
||
// CollStats describes collection statistics retrieved from the database. | ||
type CollStats struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, if we want a similar naming to how it's called in handler, this should be called CollStatsResult
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The approach looks good to me, and I don't see a better interface/approach/implementation, so I'm ready to approve.
Maybe the only question is how similar to internal/backends
this new interface should be. If we want more similarity, maybe some naming/structuring needs to be changed. At some point, it looks reasonable to make them similar, because the idea is quite the same - we have a common implementation of aggregation pipelines and all the backends should implement a particular interface to support it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach makes sense to me!
// FirstStage fetches document iterator, it is only used if this is | ||
// the first stage of the pipeline. | ||
// The returned document iterator may be originated from querying database or | ||
// from in-memory value iterator. | ||
// This allows first stage of the pipeline to decide how to create initial document iterator. | ||
FirstStage(ctx context.Context, closer *iterator.MultiCloser) (types.DocumentsIterator, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need that method at all. First of all, it looks suspiciously like a constructor; it is almost as if the io.Reader
interface had NewReader
method. Second, the implication there is that Stage
has access to the data source, but then it is not clear why Process
can't use it.
One option could be to fit everything into the existing Stage
interface with only the Process
method. $collStats
implementation, for example, could ignore incoming iter
and use the data source instead. Then the code that arranges the pipeline (a sequence of stages in this case) could check that the order is correct.
Another option is to say that pipelines are not just a sequence of stages but a combination of zero or one Producer stage and zero or more Processor stages. That way, $collStats
would implement only the Producer
interface, $count
would implement only the Processor interface, and $facet
would implement both. Then the compiler itself could check the order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both are definitely better options. First option being simpler. The second option seems efficient 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think using ProducerStage
and ProcessorStage
works quite well, thanks for the suggestion. Updated implementation a bit 🤗
// Process applies an aggregate stage on documents from iterator. | ||
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error) | ||
} | ||
|
||
// Aggregation is a common interface for fetching from database. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Independently of everything else, that's a bad name given this aggregation does not aggregate anything – it returns original data to be processed and aggregated, and that's pretty much the opposite.
|
||
// ProducerStage is a common interface aggregation stages produce documentiterator. | ||
type ProducerStage interface { | ||
// Produce applies an aggregate stage on documents from iterator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed thanks, it should return a document iterator.
// ProducerStageDataSource is a common interface for fetching from database. | ||
type ProducerStageDataSource interface { | ||
// CollStats fetches collection statistics from the database. | ||
CollStats(ctx context.Context, closer *iterator.MultiCloser) (*CollStatsResult, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That should be ProducerStage
that returns an iterator with one document
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So $collStats
is a ProducerStage
and wants to fetch data from DB and this interface allows access to DB. Maybe I misunderstood and ProducerStage
should be implemented on pg
package? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are mixing stages and what this code called Aggregation before and calls data source now.
Data source allows one to run queries.
ProducerStage produces documents. It is not important for the interface how those documents are produced – are they fetched from the database or just made up.
There are two implementations of the ProducerStage. Both would use a data source. The first one would return all documents from the collection. The second one would produce collection stats. (in the future, $match
stage could implement both ProcessorStage
and ProducerStage
for pushdown).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So hopefully I get this. We want to keep ProducerStageDataSource
or DataSource
to its ProducerStage
. No need to expose it here.
And I'm missing ProducerStage
that returns all documents 😅
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is quite a bit more cleanup I can do, but overall I like it. 🤗
// | ||
// Processing consists of modification of the input document, so it contains all the necessary fields | ||
// Producing consists of modification of the input document, so it contains all the necessary fields |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, updated 🙏
@@ -29,12 +31,29 @@ import ( | |||
"github.com/FerretDB/FerretDB/internal/util/must" | |||
) | |||
|
|||
// CollStatsDataSource fetches collection statistics from the database. | |||
type CollStatsDataSource interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's call it DataSource
. It has only CollStats
for now, it will have Query
or something soon and will be replaced with a backend interface later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, updated this name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this new approach with two interfaces! We no longer duplicate any code, and the idea's easy to understand. I have some minor questions about the usage (I asked one question), but it's not blocking us, we can clarify it later.
// Process applies an aggregate stage on documents from iterator. | ||
Process(ctx context.Context, iter types.DocumentsIterator, closer *iterator.MultiCloser) (types.DocumentsIterator, error) | ||
} | ||
|
||
// ProducerStage is a common interface for aggregation stages that produce document iterator. | ||
type ProducerStage interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this new naming for the interfaces!
iter, err = processStagesStats(ctx, closer, &stagesStatsParams{ | ||
dbPool, db, collection, statistics, collStatsDocuments, | ||
}) | ||
// handle case where there are multiple producer stages |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a possible situation - multiple producer stages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, having two consecutive $collStats
stage should return error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct, but that's not what this code does, isn't it?
I think we should return an error and exit early waaay up when we iterate over aggregationStages
@@ -163,10 +162,27 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs | |||
} | |||
|
|||
aggregationStages := must.NotFail(iterator.ConsumeValues(pipeline.Iterator())) | |||
stagesDocuments := make([]aggregations.Stage, 0, len(aggregationStages)) | |||
collStatsDocuments := make([]aggregations.Stage, 0, len(aggregationStages)) | |||
producerStages := make([]aggregations.ProducerStage, 0, len(aggregationStages)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var producerStage aggregations.ProducerStage
As there could be only zero (nil
) or one (non-nil value)
@@ -176,28 +192,27 @@ func (h *Handler) MsgAggregate(ctx context.Context, msg *wire.OpMsg) (*wire.OpMs | |||
) | |||
} | |||
|
|||
var s aggregations.Stage | |||
if _, ok := stages.ProducerStages[d.Command()]; ok { | |||
var s aggregations.ProducerStage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if producerStage != nil {
return nil, errors.New("oops, already have one")
}
Pausing work on this PR for now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving a comment as this PR is on hold
@chilagrow this pull request has merge conflicts. |
Close for now |
Description
Closes #2423.
This PR demonstrates intended design for aggregation pipeline to handle special stage like
$collStats
.$collStats
created a bit of special case in aggregation. It's because it wants to decide if it wants to query DB or not, and if it queries DB, it uses query specific to its need.In this PR, I'm intending to remove special handling for
$collStats
by allowing stage to decide how it's initial document iterator should be fetched. This would be useful for stages that haven't been added such as$documents
stage which does not query DB at all.Do not merge, as this PR is not intended to be merged. It demonstrates the design and that's the purpose. Some tests failing, that's okay.
Readiness checklist
task all
, and it passed.@FerretDB/core
), Labels, Project and project's Sprint fields.