Skip to content

Commit

Permalink
cloud backends: fix SFTP error message for some write failures
Browse files Browse the repository at this point in the history
Fixes drakkan#119

Signed-off-by: Nicola Murino <nicola.murino@gmail.com>
  • Loading branch information
drakkan committed May 19, 2020
1 parent a08dd85 commit cfa7100
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 9 deletions.
9 changes: 7 additions & 2 deletions sftpd/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func TestReadWriteErrors(t *testing.T) {
assert.NoError(t, err)
transfer = Transfer{
readerAt: nil,
writerAt: w,
writerAt: vfs.NewPipeWriter(w),
start: time.Now(),
bytesSent: 0,
bytesReceived: 0,
Expand All @@ -334,8 +334,13 @@ func TestReadWriteErrors(t *testing.T) {
}
err = r.Close()
assert.NoError(t, err)
errFake := fmt.Errorf("fake upload error")
go func() {
time.Sleep(100 * time.Millisecond)
transfer.writerAt.Done(errFake)
}()
err = transfer.closeIO()
assert.NoError(t, err)
assert.EqualError(t, err, errFake.Error())
_, err = transfer.WriteAt([]byte("test"), 0)
assert.Error(t, err, "writing to closed pipe must fail")

Expand Down
6 changes: 5 additions & 1 deletion sftpd/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/drakkan/sftpgo/dataprovider"
"github.com/drakkan/sftpgo/logger"
"github.com/drakkan/sftpgo/metrics"
"github.com/drakkan/sftpgo/vfs"
)

const (
Expand All @@ -28,7 +29,7 @@ var (
// It implements the io Reader and Writer interface to handle files downloads and uploads
type Transfer struct {
file *os.File
writerAt *pipeat.PipeWriterAt
writerAt *vfs.PipeWriter
readerAt *pipeat.PipeReaderAt
cancelFn func()
path string
Expand Down Expand Up @@ -173,6 +174,9 @@ func (t *Transfer) closeIO() error {
var err error
if t.writerAt != nil {
err = t.writerAt.Close()
if err != nil {
t.transferError = err
}
} else if t.readerAt != nil {
err = t.readerAt.Close()
} else {
Expand Down
6 changes: 4 additions & 2 deletions vfs/gcsfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,12 @@ func (fs GCSFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error
}

// Create creates or opens the named file for writing
func (fs GCSFs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
func (fs GCSFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeWriter(w)
bkt := fs.svc.Bucket(fs.config.Bucket)
obj := bkt.Object(name)
ctx, cancelFn := context.WithCancel(context.Background())
Expand All @@ -185,10 +186,11 @@ func (fs GCSFs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, f
defer objectWriter.Close()
n, err := io.Copy(objectWriter, r)
r.CloseWithError(err) //nolint:errcheck // the returned error is always null
p.Done(GetSFTPError(fs, err))
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, readed bytes: %v, err: %v", name, n, err)
metrics.GCSTransferCompleted(n, 0, err)
}()
return nil, w, cancelFn, nil
return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
Expand Down
2 changes: 1 addition & 1 deletion vfs/osfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (OsFs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error) {
}

// Create creates or opens the named file for writing
func (OsFs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
func (OsFs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
var err error
var f *os.File
if flag == 0 {
Expand Down
6 changes: 4 additions & 2 deletions vfs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,12 @@ func (fs S3Fs) Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error)
}

// Create creates or opens the named file for writing
func (fs S3Fs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error) {
func (fs S3Fs) Create(name string, flag int) (*os.File, *PipeWriter, func(), error) {
r, w, err := pipeat.PipeInDir(fs.localTempDir)
if err != nil {
return nil, nil, nil, err
}
p := NewPipeWriter(w)
ctx, cancelFn := context.WithCancel(context.Background())
uploader := s3manager.NewUploaderWithClient(fs.svc)
go func() {
Expand All @@ -224,11 +225,12 @@ func (fs S3Fs) Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, fu
u.PartSize = fs.config.UploadPartSize
})
r.CloseWithError(err) //nolint:errcheck // the returned error is always null
p.Done(GetSFTPError(fs, err))
fsLog(fs, logger.LevelDebug, "upload completed, path: %#v, response: %v, readed bytes: %v, err: %+v",
name, response, r.GetReadedBytes(), err)
metrics.S3TransferCompleted(r.GetReadedBytes(), 0, err)
}()
return nil, w, cancelFn, nil
return nil, p, cancelFn, nil
}

// Rename renames (moves) source to target.
Expand Down
37 changes: 36 additions & 1 deletion vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Fs interface {
Stat(name string) (os.FileInfo, error)
Lstat(name string) (os.FileInfo, error)
Open(name string) (*os.File, *pipeat.PipeReaderAt, func(), error)
Create(name string, flag int) (*os.File, *pipeat.PipeWriterAt, func(), error)
Create(name string, flag int) (*os.File, *PipeWriter, func(), error)
Rename(source, target string) error
Remove(name string, isDir bool) error
Mkdir(name string) error
Expand All @@ -44,6 +44,41 @@ type Fs interface {
Join(elem ...string) string
}

// PipeWriter defines a wrapper for pipeat.PipeWriterAt.
type PipeWriter struct {
writer *pipeat.PipeWriterAt
err error
done chan bool
}

// NewPipeWriter initializes a new PipeWriter
func NewPipeWriter(w *pipeat.PipeWriterAt) *PipeWriter {
return &PipeWriter{
writer: w,
err: nil,
done: make(chan bool),
}
}

// Close waits for the upload to end, closes the pipeat.PipeWriterAt and returns an error if any.
func (p *PipeWriter) Close() error {
p.writer.Close() //nolint:errcheck // the returned error is always null
<-p.done
return p.err
}

// Done unlocks other goroutines waiting on Close().
// It must be called when the upload ends
func (p *PipeWriter) Done(err error) {
p.err = err
p.done <- true
}

// WriteAt is a wrapper for pipeat WriteAt
func (p *PipeWriter) WriteAt(data []byte, off int64) (int, error) {
return p.writer.WriteAt(data, off)
}

// VirtualFolder defines a mapping between a SFTP/SCP virtual path and a
// filesystem path outside the user home directory.
// The specified paths must be absolute and the virtual path cannot be "/",
Expand Down

0 comments on commit cfa7100

Please sign in to comment.