
feat(storage/transfermanager): automatically shard downloads #10379

Merged
9 commits merged into googleapis:main on Jun 25, 2024

Conversation

@BrennaEpp (Contributor) commented Jun 13, 2024

This is still missing a few components, which should be added in follow-ups:

  • checksums
  • unit tests for DownloadBuffer
  • more integration tests (error testing, transcoding, ...)

It does work as-is (tests pass).
@product-auto-label bot added the api: storage (Issues related to the Cloud Storage API) label Jun 13, 2024
@tritone (Contributor) left a comment:

Good start on this, a few initial comments...

requiredLength := int64(len(p)) + off

// Our buffer isn't big enough, let's grow it.
if int64(cap(db.bytes)) < requiredLength {

tritone (Contributor):

This seems like it could potentially have to grow by a lot of small increments; we should see if that causes problems in profiling (and if so, maybe grow by bigger increments).

BrennaEpp (Author):

It's entirely possible that happens (it depends on the part size), but I don't think we want to grow by much more than is needed either. I was initially thinking of doubling it, as append does, but that could more easily run into problems with large buffers.

To avoid many small increments, users could pass a buffer that's already close to the object size to NewDownloadBuffer, or we could schedule the last shard first, so that the buffer is grown to its full size from the start (though that has other potential performance implications). Another option is to let users configure how much the buffer grows by.

tritone (Contributor):

I think even if partsize is large, it still might be possible for many small writes to happen. But yeah, profiling is the way to diagnose whether this is an issue.

I think we can add the growth rate as an exported field later, so we can merge as-is and add it if need be.
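
For reference, here is a minimal sketch of what an exported growth factor on the buffer's WriteAt could look like. The package name, the GrowthFactor field, and the struct layout are assumptions for illustration, not code from this PR.

```go
package sketch

import "sync"

// DownloadBuffer sketch for illustration only; GrowthFactor and this layout
// are assumptions, not the PR's implementation.
type DownloadBuffer struct {
	mu           sync.Mutex
	bytes        []byte
	GrowthFactor float64 // e.g. 1.5; values <= 1 grow only as much as needed
}

// WriteAt copies p into the buffer at offset off, growing the backing slice
// first if it is too small.
func (db *DownloadBuffer) WriteAt(p []byte, off int64) (int, error) {
	db.mu.Lock()
	defer db.mu.Unlock()

	requiredLength := int64(len(p)) + off

	// Our buffer isn't big enough, let's grow it. Over-allocate by
	// GrowthFactor so repeated small writes don't each force a copy.
	if int64(cap(db.bytes)) < requiredLength {
		newCap := requiredLength
		if db.GrowthFactor > 1 {
			if grown := int64(float64(cap(db.bytes)) * db.GrowthFactor); grown > newCap {
				newCap = grown
			}
		}
		replacement := make([]byte, len(db.bytes), newCap)
		copy(replacement, db.bytes)
		db.bytes = replacement
	}

	// Extend the length so the slice covers the written range.
	if int64(len(db.bytes)) < requiredLength {
		db.bytes = db.bytes[:requiredLength]
	}

	copy(db.bytes[off:], p)
	return len(p), nil
}
```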

errs := []error{}
var shardOut *DownloadOutput
for i := 1; i < shards; i++ {
// Add monitoring here? This could hang if any individual piece does.

tritone (Contributor):

By monitoring, do you mean metrics or some kind of goroutine to monitor?

BrennaEpp (Author):

A goroutine, but it might not be necessary, as everything should cancel when the ctx does (so timeouts would stop this from hanging).

tritone (Contributor):

Okay, I think as long as we have a test for a hanging part then we can remove the comment.

BrennaEpp (Author):

I'm not sure there is an easy way to simulate a hanging part; besides, that would just be testing that a read request gets cancelled when its context is, which feels out of scope for TM.

Other than that, if we add some logic here then we could test it. Some options:

  1. extend the perOpTimeout and stop listening for pieces when done (start a timer with per_op_timeout * num_shards)
  2. have a default timer for collecting all pieces (probably not this)
  3. check for ctx cancel directly in here and stop when cancelled, but I don't like that because then it would miss receiving any other errors from shards that were just completing...
  4. instead of the immediate cancellation in 3, start a timer for a couple of seconds once ctx cancel is received, and only stop listening once that timer is up (see the sketch below)
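
A minimal sketch of option 4, under the assumption that shard results arrive on a channel; the function name, the stub DownloadOutput type, and the two-second grace period are made up for illustration and are not code from this PR:

```go
package sketch

import (
	"context"
	"time"
)

// DownloadOutput stands in for transfermanager's DownloadOutput type.
type DownloadOutput struct {
	Err error
}

// collectShards keeps collecting shard results after ctx is cancelled, but
// only for a short grace period, so errors from shards that were just
// completing are still reported. Purely illustrative of option 4 above.
func collectShards(ctx context.Context, results <-chan *DownloadOutput, shards int) []*DownloadOutput {
	collected := make([]*DownloadOutput, 0, shards)
	done := ctx.Done()
	var grace <-chan time.Time // nil (blocks forever) until ctx is cancelled

	for len(collected) < shards {
		select {
		case out := <-results:
			collected = append(collected, out)
		case <-done:
			// Start the drain timer, then stop selecting on ctx so this
			// case doesn't fire again on every loop iteration.
			grace = time.After(2 * time.Second)
			done = nil
		case <-grace:
			return collected // give up on any shards still outstanding
		}
	}
	return collected
}
```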

// WithPartSize returns a TransferManagerOption that specifies the size of the
// shards to transfer; that is, if the object is larger than this size, it will
// be uploaded or downloaded in concurrent pieces.
// The default is 32 MiB for downloads.

tritone (Contributor):

Is this based on other langs' default?

BrennaEpp (Author):

Yes, this is based on Python's default. We should do some perf testing and adjust it accordingly.

tritone (Contributor):

I think we should also probably set a minimum for partSize, and add a way to turn off sharding (set it to zero, or negative?). But that can come in a separate PR.
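
A rough sketch of how a minimum and an "off" value could be normalized when the part size option is applied; normalizePartSize, the constants, and the 1 MiB floor are assumptions, not part of this PR:

```go
package sketch

const (
	defaultPartSize = 32 * 1024 * 1024 // 32 MiB, the documented download default
	minPartSize     = 1 * 1024 * 1024  // hypothetical 1 MiB floor, not from this PR
)

// normalizePartSize maps a requested part size to an effective size plus a
// flag for whether sharding should happen at all. Illustrative only.
func normalizePartSize(requested int64) (partSize int64, shardingEnabled bool) {
	switch {
	case requested == 0:
		return defaultPartSize, true // unset: fall back to the default
	case requested < 0:
		return 0, false // negative: treat as "do not shard"
	case requested < minPartSize:
		return minPartSize, true // clamp small values up to the minimum
	default:
		return requested, true
	}
}
```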

@BrennaEpp marked this pull request as ready for review June 19, 2024 08:21
@BrennaEpp requested review from a team as code owners June 19, 2024 08:21

@tritone (Contributor) left a comment:

A few more minor comments, but otherwise this looks good.

@@ -290,6 +293,7 @@ func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error) {
}

// DownloadRange specifies the object range.
// Transcoded objects do not support ranged reads.

tritone (Contributor):

I think this needs a little more description, I would copy the note from the NewRangeReader docs https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader
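
For example, the expanded doc comment could read something like the sketch below; the wording is paraphrased from the NewRangeReader behavior (a negative length reads to the end) plus the transcoding note above, and the field names are assumed rather than copied from the PR:

```go
// DownloadRange specifies the object range to download. Treat this as a
// sketch, not the package's exact definition.
//
// A negative Length means the object is read until the end, mirroring
// ObjectHandle.NewRangeReader.
//
// Transcoded (e.g. gzip-encoded with decompressive transcoding) objects do
// not support ranged reads; the full object content is returned instead.
type DownloadRange struct {
	Offset, Length int64
}
```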

)

func TestDownloadBuffer(t *testing.T) {
	t.Parallel()
	// Unit test DownloadBuffer

	// Create without an underlying buffer.

tritone (Contributor):

Nice test!


@BrennaEpp requested a review from tritone June 24, 2024 21:54
@tritone added the automerge (Merge the pull request once unit tests and other checks pass) label Jun 25, 2024
@BrennaEpp added the kokoro:force-run (Add this label to force Kokoro to re-run the tests) label Jun 25, 2024
@kokoro-team removed the kokoro:force-run (Add this label to force Kokoro to re-run the tests) label Jun 25, 2024
@gcf-merge-on-green bot merged commit 05816f9 into googleapis:main Jun 25, 2024
8 checks passed
@gcf-merge-on-green bot removed the automerge (Merge the pull request once unit tests and other checks pass) label Jun 25, 2024