feat(storage/transfermanager): automatically shard downloads #10379
Conversation
Note that this is missing a few components still:
- checksums
- unit tests for DownloadBuffer
- more integration tests (error testing, transcoding...)

It does work as-is (tests pass).
Good start on this, a few initial comments...
requiredLength := int64(len(p)) + off

// Our buffer isn't big enough, let's grow it.
if int64(cap(db.bytes)) < requiredLength {
This seems like it could potentially grow by a lot of small increments; we should see whether that causes problems in profiling (and if so, maybe grow in bigger increments).
It's completely possible that that happens (it depends on partsize), but I think we also don't want to grow by a lot more than is needed. I was initially thinking to double it as append does, but that could more easily run into problems with large buffers.
To avoid a lot of small increments:
- users could pass in a buffer that's already close to the object size to NewDownloadBuffer, or
- we could schedule the last shard first, so the buffer grows to its full size from the beginning (but that has other potential performance implications), or
- we could give users the ability to configure how much the buffer grows by.
I think even if partsize is large, it might still be possible for many small writes to happen. But yeah, profiling is the way to diagnose whether this is an issue.
I think we can expose the growth rate as an exported field later, so we can merge as-is and add that if need be.
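To make that follow-up concrete, here is a minimal sketch of what an exported growth setting could look like. The names (GrowthFactor, grow) and the growth policy are hypothetical and not part of this PR; only the capacity check mirrors the diff above.

```go
// Hypothetical sketch only: a DownloadBuffer with a configurable growth
// factor. Names are illustrative, not taken from this PR.
type DownloadBuffer struct {
	bytes []byte

	// GrowthFactor controls how aggressively the buffer grows when a write
	// lands past the current capacity. 1 grows exactly to the required
	// length; 2 doubles the capacity, similar to append.
	GrowthFactor float64
}

// grow ensures the buffer capacity is at least requiredLength, over-allocating
// according to GrowthFactor to avoid many small reallocations.
func (db *DownloadBuffer) grow(requiredLength int64) {
	if int64(cap(db.bytes)) >= requiredLength {
		return
	}
	newCap := requiredLength
	if db.GrowthFactor > 1 {
		if scaled := int64(float64(cap(db.bytes)) * db.GrowthFactor); scaled > newCap {
			newCap = scaled
		}
	}
	grown := make([]byte, len(db.bytes), newCap)
	copy(grown, db.bytes)
	db.bytes = grown
}
```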
errs := []error{}
var shardOut *DownloadOutput
for i := 1; i < shards; i++ {
	// Add monitoring here? This could hang if any individual piece does.
By monitoring, do you mean metrics or some kind of goroutine to monitor this?
A goroutine, but it might not be necessary, as everything should cancel when the ctx does (so timeouts would stop this from hanging).
Okay, I think as long as we have a test for a hanging part then we can remove the comment.
I'm not sure there's an easy way to simulate a hanging part; besides, that would just be testing that a read request gets cancelled when its context is, which feels out of scope for the transfer manager.
Other than that, if we add some logic here then we could test it. Some options:
- extend the perOpTimeout and stop listening for pieces when done (start a timer with per_op_timeout * num_shards; see the sketch after this list)
- have a default timer for collecting all pieces (probably not this)
- check for ctx cancellation directly in here and stop when cancelled, but I don't like that because it would miss errors from shards that were just completing...
- a variation of the previous option: instead of stopping immediately on cancellation, start a timer for a couple of seconds once ctx cancellation is received, and only stop listening once that timer is up
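As a rough illustration of the first option, here is a minimal sketch of a collection loop bounded by an overall deadline of perOpTimeout multiplied by the number of shards. The helper name, the error-only results channel, and the error message are assumptions for the sketch, not the PR's actual types.

```go
package transfermanager

import (
	"errors"
	"time"
)

// collectShards is a hypothetical helper sketching the first option above: it
// gathers per-shard results but stops waiting once an overall deadline
// (perOpTimeout * number of shards) has elapsed, so a single hanging piece
// cannot block the loop forever.
func collectShards(results <-chan error, shards int, perOpTimeout time.Duration) []error {
	overall := time.NewTimer(perOpTimeout * time.Duration(shards))
	defer overall.Stop()

	var errs []error
	for i := 1; i < shards; i++ {
		select {
		case err := <-results:
			if err != nil {
				errs = append(errs, err)
			}
		case <-overall.C:
			errs = append(errs, errors.New("transfermanager: timed out waiting for shard results"))
			return errs
		}
	}
	return errs
}
```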
// WithPartSize returns a TransferManagerOption that specifies the size of the
// shards to transfer; that is, if the object is larger than this size, it will
// be uploaded or downloaded in concurrent pieces.
// The default is 32 MiB for downloads.
Is this based on other langs' default?
Yes, this is based on Python's default. We should do some perf testing and adjust accordingly.
I think we should also probably set a minimum for partSize, and a way to turn off sharding (set it to zero, or negative?) But that can come in a separate PR.
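Sketching what that separate PR could look like, assuming a hypothetical config struct: the field names, the disable-on-non-positive behavior, and the 1 MiB floor below are illustrative only, not decided here.

```go
// Hypothetical sketch of part-size validation; none of these names or values
// are final.
const minPartSize = 1 << 20 // illustrative floor of 1 MiB

type transferManagerConfig struct {
	partSize         int64
	shardingDisabled bool
}

// applyPartSize clamps small part sizes and treats a non-positive value as a
// request to turn sharding off entirely.
func applyPartSize(cfg *transferManagerConfig, partSize int64) {
	switch {
	case partSize <= 0:
		cfg.shardingDisabled = true
	case partSize < minPartSize:
		cfg.partSize = minPartSize
	default:
		cfg.partSize = partSize
	}
}
```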
A few more minor comments, but otherwise looks good.
@@ -290,6 +293,7 @@ func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error) {
}

// DownloadRange specifies the object range.
// Transcoded objects do not support ranged reads.
I think this needs a little more description; I would copy the note from the NewRangeReader docs: https://pkg.go.dev/cloud.google.com/go/storage#ObjectHandle.NewRangeReader
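For example, the doc comment could be expanded roughly along these lines, paraphrasing the NewRangeReader semantics rather than quoting them verbatim; the struct fields shown are assumed for illustration.

```go
// DownloadRange specifies the object range to download.
//
// If Length is negative, the object is read until the end. If Offset is
// negative, the object is read starting that many bytes from the end
// (a suffix read).
//
// Transcoded objects do not support ranged reads; see the note on
// ObjectHandle.NewRangeReader in the storage package for details.
type DownloadRange struct {
	Offset int64
	Length int64
}
```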
)

func TestDownloadBuffer(t *testing.T) {
	t.Parallel()
	// Unit test DownloadBuffer

	// Create without an underlying buffer.
Nice test!
This is missing a few components that should be added in follow-ups: