Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

command/cp: add cross-region transfer support #194

Merged
merged 28 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c9ddb46
command/{app, cp}, storage/{s3, s3_test}: added cross-region transfer…
fbarotov Jul 16, 2020
4244867
command/{}, storage/{}, e2e/cp_test: batch run for cross-region transfer
fbarotov Jul 17, 2020
3891977
Merge branch 'master' into transfer
fbarotov Jul 17, 2020
7d53d22
merge and few fixes
fbarotov Jul 22, 2020
ed32853
Merge branch 'master' of https://github.com/peak/s5cmd into transfer
fbarotov Jul 24, 2020
b50bed9
command/, storage/: updates based on pr review
fbarotov Jul 24, 2020
7d1d18f
storage/{s3, s3_test}: updates based on pr review
fbarotov Jul 25, 2020
4ad98e0
command/, readme: updates based on pr review
fbarotov Jul 28, 2020
540b11c
Merge branch 'master' of https://github.com/peak/s5cmd into transfer
fbarotov Jul 28, 2020
4ab4450
command/{app, cp, run}: updates based on pr review
fbarotov Jul 28, 2020
ab3dfb4
command/: updates based on pr review
fbarotov Jul 29, 2020
cb13391
storage/{s3, s3_test}: updates based on pr review
fbarotov Jul 29, 2020
377ed32
command/{app, cp, options}, storage/: updates based on pr review
fbarotov Jul 29, 2020
1b19b3a
storage/s3_test: update based on feedback: more? testcase
fbarotov Aug 4, 2020
053de4f
storage/s3_test: fixed an error from the last commit
fbarotov Aug 4, 2020
4a7730a
storage/ : updates based on pr review
fbarotov Aug 8, 2020
c77c2ce
command/: `infer` source and destination bucket regions
fbarotov Aug 10, 2020
cf9d41e
command/{cp, run}: removed unnecessary flags
fbarotov Aug 10, 2020
2611610
command/ : updates based on pr review
fbarotov Aug 11, 2020
2692fe7
changelog, command/ : some roll-backs
fbarotov Aug 11, 2020
f957e57
merged with remote master
fbarotov Aug 18, 2020
cd0f119
merge with upstream master
fbarotov Sep 30, 2020
4e54be4
storage/storage: removed region from options
fbarotov Oct 1, 2020
080ce0e
storage/s3: updates based on pr review
fbarotov Oct 2, 2020
868569c
storage/s3: updates based on pr review
fbarotov Oct 5, 2020
56a0963
storage/s3: updates based on pr review
fbarotov Oct 17, 2020
339ad77
storage/s3: updates based on pr review
fbarotov Oct 20, 2020
cc3115d
changelog: updates based on pr review
fbarotov Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
command/, storage/: updates based on pr review
  • Loading branch information
fbarotov committed Jul 24, 2020
commit b50bed95516ba5faf3bb16076e05ae86a1f0892d
23 changes: 0 additions & 23 deletions command/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/peak/s5cmd/log"
"github.com/peak/s5cmd/parallel"
"github.com/peak/s5cmd/storage"
)

const (
Expand All @@ -19,10 +18,6 @@ const (
appName = "s5cmd"
)

// AppStorageOptions will be overridden by inner command flags
// such as if provided `cp -region` will override s5cmd -region flag value.
var AppStorageOptions storage.StorageOptions

var app = &cli.App{
Name: appName,
Usage: "Blazing fast S3 and local filesystem execution tool",
Expand Down Expand Up @@ -59,19 +54,9 @@ var app = &cli.App{
Name: "install-completion",
Usage: "install completion for your shell",
},
&cli.StringFlag{
Name: "source-region",
Usage: "connect to a specific region of the remote object storage service",
},
&cli.StringFlag{
Name: "region",
Usage: "(global) region of the destination bucket for cp/mv operations; default is source-region",
},
},
Before: func(c *cli.Context) error {
noVerifySSL := c.Bool("no-verify-ssl")
retryCount := c.Int("retry-count")
endpointURL := c.String("endpoint-url")
workerCount := c.Int("numworkers")
printJSON := c.Bool("json")
logLevel := c.String("log")
Expand All @@ -83,14 +68,6 @@ var app = &cli.App{
return fmt.Errorf("retry count cannot be a negative value")
}

AppStorageOptions = storage.StorageOptions{
MaxRetries: retryCount,
Endpoint: endpointURL,
NoVerifySSL: noVerifySSL,
SourceRegion: c.String("source-region"),
DestinationRegion: c.String("region"),
}

return nil
},
Action: func(c *cli.Context) error {
Expand Down
6 changes: 3 additions & 3 deletions command/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ var catCommand = &cli.Command{
return err
}

return Cat(c.Context, src)
return Cat(c.Context, src, storage.NewS3Options(c, true))
},
}

// Cat prints content of given source to standard output.
func Cat(ctx context.Context, src *url.URL) error {
client, err := storage.NewClient(src, AppStorageOptions)
func Cat(ctx context.Context, src *url.URL, s3Opts storage.S3Options) error {
client, err := storage.NewClient(src, s3Opts)
if err != nil {
return err
}
Expand Down
67 changes: 26 additions & 41 deletions command/cp.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,9 @@ var copyCommand = &cli.Command{
Flags: copyCommandFlags,
CustomHelpTemplate: copyHelpTemplate,
Before: func(c *cli.Context) error {
return validate(c, AppStorageOptions)
return validate(c, storage.NewS3Options(c, true))
},
Action: func(c *cli.Context) error {

// get application level values for the flags if not provided
srcRegion := c.String("source-region")
if srcRegion == "" {
srcRegion = AppStorageOptions.SourceRegion
}
dstRegion := c.String("region")
if dstRegion == "" {
dstRegion = AppStorageOptions.DestinationRegion
}

return Copy{
src: c.Args().Get(0),
dst: c.Args().Get(1),
Expand All @@ -179,15 +168,8 @@ var copyCommand = &cli.Command{
encryptionKeyID: c.String("sse-kms-key-id"),
acl: c.String("acl"),

StorageOptions: storage.StorageOptions{
Concurrency: c.Int("concurrency"),
PartSize: c.Int64("part-size") * megabytes,
SourceRegion: srcRegion,
DestinationRegion: dstRegion,
MaxRetries: AppStorageOptions.MaxRetries,
NoVerifySSL: AppStorageOptions.NoVerifySSL,
Endpoint: AppStorageOptions.Endpoint,
},
srcS3opts: storage.NewS3Options(c, true),
dstS3opts: storage.NewS3Options(c, false),
}.Run(c.Context)
},
}
Expand All @@ -213,7 +195,10 @@ type Copy struct {
acl string

// s3 options
storage.StorageOptions
concurrency int
partSize int64
srcS3opts storage.S3Options
dstS3opts storage.S3Options
}

const fdlimitWarning = `
Expand All @@ -234,7 +219,7 @@ func (c Copy) Run(ctx context.Context) error {
return err
}

client, err := storage.NewClient(srcurl, c.StorageOptions)
client, err := storage.NewClient(srcurl, c.srcS3opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -336,7 +321,7 @@ func (c Copy) prepareDownloadTask(
isBatch bool,
) func() error {
return func() error {
dsturl, err := prepareLocalDestination(ctx, srcurl, dsturl, c.flatten, isBatch, c.StorageOptions)
dsturl, err := prepareLocalDestination(ctx, srcurl, dsturl, c.flatten, isBatch, c.dstS3opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -377,11 +362,11 @@ func (c Copy) prepareUploadTask(

// doDownload is used to fetch a remote object and save as a local object.
func (c Copy) doDownload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) error {
srcClient, err := storage.NewClient(srcurl, c.StorageOptions)
srcClient, err := storage.NewClient(srcurl, c.srcS3opts)
if err != nil {
return err
}
dstClient, err := storage.NewClient(dsturl, c.StorageOptions)
dstClient, err := storage.NewClient(dsturl, c.dstS3opts)
if err != nil {
return err
}
Expand All @@ -402,7 +387,7 @@ func (c Copy) doDownload(ctx context.Context, srcurl *url.URL, dsturl *url.URL)
}
defer f.Close()

size, err := srcClient.Get(ctx, srcurl, f, c.Concurrency, c.PartSize)
size, err := srcClient.Get(ctx, srcurl, f, c.concurrency, c.partSize)
if err != nil {
_ = dstClient.Delete(ctx, dsturl)
return err
Expand Down Expand Up @@ -442,7 +427,7 @@ func (c Copy) doUpload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) er
return err
}

dstClient, err := storage.NewClient(dsturl, c.StorageOptions)
dstClient, err := storage.NewClient(dsturl, c.dstS3opts)
if err != nil {
return err
}
Expand All @@ -454,12 +439,12 @@ func (c Copy) doUpload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) er
SetSSEKeyID(c.encryptionKeyID).
SetACL(c.acl)

err = dstClient.Put(ctx, f, dsturl, metadata, c.Concurrency, c.PartSize)
err = dstClient.Put(ctx, f, dsturl, metadata, c.concurrency, c.partSize)
if err != nil {
return err
}

srcClient, err := storage.NewClient(srcurl, c.StorageOptions)
srcClient, err := storage.NewClient(srcurl, c.srcS3opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -490,7 +475,7 @@ func (c Copy) doUpload(ctx context.Context, srcurl *url.URL, dsturl *url.URL) er
}

func (c Copy) doCopy(ctx context.Context, srcurl *url.URL, dsturl *url.URL) error {
srcClient, err := storage.NewClient(srcurl, c.StorageOptions)
srcClient, err := storage.NewClient(srcurl, c.srcS3opts)
sonmezonur marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand Down Expand Up @@ -546,12 +531,12 @@ func (c Copy) shouldOverride(ctx context.Context, srcurl *url.URL, dsturl *url.U
return nil
}

srcObj, err := getObject(ctx, srcurl, c.StorageOptions)
srcObj, err := getObject(ctx, srcurl, c.srcS3opts)
if err != nil {
return err
}

dstObj, err := getObject(ctx, dsturl, c.StorageOptions)
dstObj, err := getObject(ctx, dsturl, c.dstS3opts)
if err != nil {
return err
}
Expand Down Expand Up @@ -614,7 +599,7 @@ func prepareLocalDestination(
dsturl *url.URL,
flatten bool,
isBatch bool,
storageOpts storage.StorageOptions,
s3opts storage.S3Options,
) (*url.URL, error) {
objname := srcurl.Base()
if isBatch && !flatten {
Expand All @@ -627,7 +612,7 @@ func prepareLocalDestination(
}
}

client, err := storage.NewClient(dsturl, storageOpts)
client, err := storage.NewClient(dsturl, s3opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -662,8 +647,8 @@ func prepareLocalDestination(

// getObject checks if the object from given url exists. If no object is
// found, error and returning object would be nil.
func getObject(ctx context.Context, url *url.URL, storageOpts storage.StorageOptions) (*storage.Object, error) {
client, err := storage.NewClient(url, storageOpts)
func getObject(ctx context.Context, url *url.URL, s3opts storage.S3Options) (*storage.Object, error) {
client, err := storage.NewClient(url, s3opts)
if err != nil {
return nil, err
}
Expand All @@ -676,7 +661,7 @@ func getObject(ctx context.Context, url *url.URL, storageOpts storage.StorageOpt
return obj, err
}

func validate(c *cli.Context, storageOpts storage.StorageOptions) error {
func validate(c *cli.Context, srcS3opts storage.S3Options) error {
if c.Args().Len() != 2 {
return fmt.Errorf("expected source and destination arguments")
}
Expand Down Expand Up @@ -715,7 +700,7 @@ func validate(c *cli.Context, storageOpts storage.StorageOptions) error {
case srcurl.Type == dsturl.Type:
return validateCopy(srcurl, dsturl)
case dsturl.IsRemote():
return validateUpload(ctx, srcurl, dsturl, storageOpts)
return validateUpload(ctx, srcurl, dsturl, srcS3opts)
default:
return nil
}
Expand All @@ -730,8 +715,8 @@ func validateCopy(srcurl, dsturl *url.URL) error {
return fmt.Errorf("local->local copy operations are not permitted")
}

func validateUpload(ctx context.Context, srcurl, dsturl *url.URL, storageOpts storage.StorageOptions) error {
srcclient, err := storage.NewClient(srcurl, storageOpts)
func validateUpload(ctx context.Context, srcurl, dsturl *url.URL, srcS3opts storage.S3Options) error {
srcclient, err := storage.NewClient(srcurl, srcS3opts)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion command/du.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var sizeCommand = &cli.Command{
c.Args().First(),
groupByClass,
humanize,
storage.NewS3Options(c, true),
)
},
}
Expand All @@ -73,13 +74,14 @@ func Size(
src string,
groupByClass bool,
humanize bool,
s3opts storage.S3Options,
) error {
srcurl, err := url.New(src)
if err != nil {
return err
}

client, err := storage.NewClient(srcurl, AppStorageOptions)
client, err := storage.NewClient(srcurl, s3opts)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions command/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var listCommand = &cli.Command{
},
Action: func(c *cli.Context) error {
if !c.Args().Present() {
return ListBuckets(c.Context)
return ListBuckets(c.Context, storage.NewS3Options(c, true))
}

showEtag := c.Bool("etag")
Expand All @@ -80,15 +80,16 @@ var listCommand = &cli.Command{
showEtag,
humanize,
showStorageClass,
storage.NewS3Options(c, true),
)
},
}

// ListBuckets prints all buckets.
func ListBuckets(ctx context.Context) error {
func ListBuckets(ctx context.Context, s3opts storage.S3Options) error {
// set as remote storage
url := &url.URL{Type: 0}
client, err := storage.NewClient(url, AppStorageOptions)
client, err := storage.NewClient(url, s3opts)
if err != nil {
return err
}
Expand All @@ -113,13 +114,14 @@ func List(
showEtag bool,
humanize bool,
showStorageClass bool,
s3opts storage.S3Options,
) error {
srcurl, err := url.New(src)
if err != nil {
return err
}

client, err := storage.NewClient(srcurl, AppStorageOptions)
client, err := storage.NewClient(srcurl, s3opts)
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion command/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var makeBucketCommand = &cli.Command{
c.Context,
c.Command.Name,
c.Args().First(),
storage.NewS3Options(c, true),
)
},
}
Expand All @@ -60,13 +61,14 @@ func MakeBucket(
ctx context.Context,
op string,
src string,
s3opts storage.S3Options,
) error {
bucket, err := url.New(src)
if err != nil {
return err
}

client, err := storage.NewClient(bucket, AppStorageOptions)
client, err := storage.NewClient(bucket, s3opts)
if err != nil {
return err
}
Expand Down
Loading