Skip to content

Commit

Permalink
Wrap media uploads in TimeoutReader
Browse files Browse the repository at this point in the history
  • Loading branch information
prasmussen committed Feb 20, 2016
1 parent 308c7dc commit a9e9da7
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 10 deletions.
14 changes: 10 additions & 4 deletions drive/sync_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,9 +305,12 @@ func (self *Drive) uploadMissingFile(parentId string, lf *LocalFile, args Upload
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))

// Wrap file in progress reader
srcReader := getProgressReader(srcFile, args.Progress, lf.info.Size())
progressReader := getProgressReader(srcFile, args.Progress, lf.info.Size())

_, err = self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum").Media(srcReader, chunkSize).Do()
// Wrap reader in timeout reader
reader, ctx := getTimeoutReaderContext(progressReader)

_, err = self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
if isBackendError(err) && try < MaxBackendErrorRetries {
exponentialBackoffSleep(try)
Expand Down Expand Up @@ -341,9 +344,12 @@ func (self *Drive) updateChangedFile(cf *changedFile, args UploadSyncArgs, try i
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))

// Wrap file in progress reader
srcReader := getProgressReader(srcFile, args.Progress, cf.local.info.Size())
progressReader := getProgressReader(srcFile, args.Progress, cf.local.info.Size())

// Wrap reader in timeout reader
reader, ctx := getTimeoutReaderContext(progressReader)

_, err = self.service.Files.Update(cf.remote.file.Id, dstFile).Media(srcReader, chunkSize).Do()
_, err = self.service.Files.Update(cf.remote.file.Id, dstFile).Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
if isBackendError(err) && try < MaxBackendErrorRetries {
exponentialBackoffSleep(try)
Expand Down
5 changes: 5 additions & 0 deletions drive/timeout_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
const MaxIdleTimeout = time.Second * 120
const TimeoutTimerInterval = time.Second * 10

func getTimeoutReaderContext(r io.Reader) (io.Reader, context.Context) {
ctx, cancel := context.WithCancel(context.TODO())
return getTimeoutReader(r, cancel), ctx
}

func getTimeoutReader(r io.Reader, cancel context.CancelFunc) io.Reader {
return &TimeoutReader{
reader: r,
Expand Down
7 changes: 5 additions & 2 deletions drive/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ func (self *Drive) Update(args UpdateArgs) error {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))

// Wrap file in progress reader
srcReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())

// Wrap reader in timeout reader
reader, ctx := getTimeoutReaderContext(progressReader)

fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
started := time.Now()

f, err := self.service.Files.Update(args.Id, dstFile).Fields("id", "name", "size").Media(srcReader, chunkSize).Do()
f, err := self.service.Files.Update(args.Id, dstFile).Fields("id", "name", "size").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return fmt.Errorf("Failed to upload file: %s", err)
}
Expand Down
14 changes: 10 additions & 4 deletions drive/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,15 @@ func (self *Drive) uploadFile(args UploadArgs) (*drive.File, int64, error) {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))

// Wrap file in progress reader
srcReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())
progressReader := getProgressReader(srcFile, args.Progress, srcFileInfo.Size())

// Wrap reader in timeout reader
reader, ctx := getTimeoutReaderContext(progressReader)

fmt.Fprintf(args.Out, "Uploading %s\n", args.Path)
started := time.Now()

f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Media(srcReader, chunkSize).Do()
f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "md5Checksum", "webContentLink").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return nil, 0, fmt.Errorf("Failed to upload file: %s", err)
}
Expand Down Expand Up @@ -205,12 +208,15 @@ func (self *Drive) UploadStream(args UploadStreamArgs) error {
chunkSize := googleapi.ChunkSize(int(args.ChunkSize))

// Wrap file in progress reader
srcReader := getProgressReader(args.In, args.Progress, 0)
progressReader := getProgressReader(args.In, args.Progress, 0)

// Wrap reader in timeout reader
reader, ctx := getTimeoutReaderContext(progressReader)

fmt.Fprintf(args.Out, "Uploading %s\n", dstFile.Name)
started := time.Now()

f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "webContentLink").Media(srcReader, chunkSize).Do()
f, err := self.service.Files.Create(dstFile).Fields("id", "name", "size", "webContentLink").Context(ctx).Media(reader, chunkSize).Do()
if err != nil {
return fmt.Errorf("Failed to upload file: %s", err)
}
Expand Down

0 comments on commit a9e9da7

Please sign in to comment.