Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,45 @@ type ClientDriverExtensionHasher interface {
}
```

#### Detect transfer errors and interruptions

FTP has no length header for `STOR` uploads, so the server can't always tell
whether an upload that "ended" did so because the client sent every byte or
because it gave up. To get notified when the server *does* detect an
interruption (ABOR, dropped TCP connection, I/O error), implement the
`FileTransferError` interface on the value you return from your `afero.Fs`
`OpenFile`/`Create` (or from `ClientDriverExtentionFileTransfer.GetHandle`):

```go
type monitoredFile struct {
afero.File
transferErr error
}

// TransferError is called before Close whenever the transfer did not
// complete normally (ABOR, broken connection, copy error, ...).
func (f *monitoredFile) TransferError(err error) {
f.transferErr = err
}

func (f *monitoredFile) Close() error {
err := f.File.Close()
if f.transferErr != nil {
// upload was interrupted - run your "discard partial file" logic
} else {
// upload completed - run your "publish file" logic
}
return err
}
```

`TransferError` is always invoked before `Close`. Both methods are called
on the same `FileTransfer` instance, so a simple flag is enough to
distinguish the two cases. A client that closes the data connection
mid-upload without sending `ABOR` is indistinguishable from a clean
completion - this is a limitation of the FTP protocol itself, not of this
library.

## History of the project

I wanted to make a system which would accept files through FTP and redirect them to something else. Go seemed like the obvious choice and it seemed there was a lot of libraries available but it turns out none of them were in a useable state.
Expand Down
38 changes: 37 additions & 1 deletion driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,43 @@ type FileTransfer interface {
io.Closer
}

// FileTransferError is a FileTransfer extension used to notify errors.
// FileTransferError is a FileTransfer extension used to notify the driver
// that a transfer did not complete normally.
//
// TransferError is called before the file's Close method whenever the data
// transfer is interrupted by:
// - an ABOR command from the client,
// - the client dropping the data connection (TCP RST or otherwise),
// - an I/O error while copying bytes between the data connection and the file.
//
// It is NOT called when the transfer ends cleanly (the data connection is
// closed gracefully by the client after sending all of its bytes). In that
// case only Close is invoked.
//
// A common pattern is to set a flag in TransferError and inspect it in Close
// to distinguish a completed upload from an interrupted one:
//
// type monitoredFile struct {
// afero.File
// transferErr error
// }
//
// func (f *monitoredFile) TransferError(err error) { f.transferErr = err }
//
// func (f *monitoredFile) Close() error {
// err := f.File.Close()
// if f.transferErr != nil {
// // upload was interrupted: f.transferErr describes why
// } else {
// // upload completed (as far as the server can tell)
// }
// return err
// }
//
// Keep in mind that FTP provides no length header for STOR transfers, so a
// client that closes the data connection mid-upload without sending ABOR is
// indistinguishable from a normal completion. Implementing this interface
// catches every case the server can detect.
type FileTransferError interface {
TransferError(err error)
}
Expand Down
47 changes: 43 additions & 4 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,34 @@ type TestServerDriver struct {
Debug bool // To display connection logs information
TLS bool
CloseOnConnect bool // disconnect the client as soon as it connects
// transferFiles tracks the *testFile wrappers handed back from OpenFile,
// keyed by the path the FTP layer requested. Tests use it to inspect
// errTransfer after an interrupted transfer.
transferFiles sync.Map
}

// getTransferFile returns the *testFile previously registered under path, if any.
func (driver *TestServerDriver) getTransferFile(path string) *testFile {
v, ok := driver.transferFiles.Load(path)
if !ok {
return nil
}

file, _ := v.(*testFile)

return file
}

// TestClientDriver defines a minimal serverftp client driver
type TestClientDriver struct {
afero.Fs
server *TestServerDriver
}

type testFile struct {
afero.File
errTransfer error
errTransferMu sync.Mutex
errTransfer error
}

var (
Expand Down Expand Up @@ -214,13 +232,26 @@ func (f *testFile) Readdir(count int) ([]os.FileInfo, error) {

// TransferError implements the FileTransferError interface
func (f *testFile) TransferError(err error) {
f.errTransferMu.Lock()
defer f.errTransferMu.Unlock()
f.errTransfer = err
}

// getErrTransfer returns the error recorded by TransferError, if any. It is
// safe to call concurrently with the transfer goroutine (e.g. while polling
// for a connection-drop interruption).
func (f *testFile) getErrTransfer() error {
f.errTransferMu.Lock()
defer f.errTransferMu.Unlock()

return f.errTransfer
}

// NewTestClientDriver creates a client driver
func NewTestClientDriver(server *TestServerDriver) *TestClientDriver {
return &TestClientDriver{
Fs: server.fs,
Fs: server.fs,
server: server,
}
}

Expand Down Expand Up @@ -417,7 +448,11 @@ func (driver *TestClientDriver) OpenFile(path string, flag int, perm os.FileMode
file, err := driver.Fs.OpenFile(path, flag, perm)

if err == nil {
file = &testFile{File: file}
wrapped := &testFile{File: file}
if driver.server != nil {
driver.server.transferFiles.Store(path, wrapped)
}
file = wrapped
}

return file, err
Expand All @@ -431,7 +466,11 @@ func (driver *TestClientDriver) Open(name string) (afero.File, error) {
file, err := driver.Fs.Open(name)

if err == nil {
file = &testFile{File: file}
wrapped := &testFile{File: file}
if driver.server != nil {
driver.server.transferFiles.Store(name, wrapped)
}
file = wrapped
}

return file, err
Expand Down
162 changes: 162 additions & 0 deletions transfer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,168 @@ func TestFailingFileTransfer(t *testing.T) {
})
}

// TestFileTransferErrorNotification covers the contract documented on the
// FileTransferError interface: TransferError is called before Close when the
// transfer is interrupted, and is NOT called for a clean completion. This is
// the recommended way for drivers to detect upload interruptions (issue #635).
func TestFileTransferErrorNotification(t *testing.T) {
t.Parallel()

t.Run("clean upload does not trigger TransferError", testTransferErrorCleanUpload)
t.Run("write error triggers TransferError before Close", testTransferErrorOnWriteFailure)
t.Run("ABOR mid-upload triggers TransferError", testTransferErrorOnABOR)
t.Run("dropped connection mid-upload triggers TransferError", testTransferErrorOnDroppedConn)
}

// dialTransferErrorServer spins up a test server with a TestServerDriver that
// tracks the files it hands out, so tests can inspect errTransfer afterwards.
func dialTransferErrorServer(t *testing.T) (*TestServerDriver, *goftp.Client) {
t.Helper()

driver := &TestServerDriver{Debug: false}
server := NewTestServerWithTestDriver(t, driver)
client, err := goftp.DialConfig(goftp.Config{
User: authUser,
Password: authPass,
}, server.Addr())
require.NoError(t, err)

return driver, client
}

func testTransferErrorCleanUpload(t *testing.T) {
t.Parallel()

driver, client := dialTransferErrorServer(t)

defer func() { require.NoError(t, client.Close()) }()

file := createTemporaryFile(t, 1*1024)
require.NoError(t, client.Store("ok.bin", file))

tf := driver.getTransferFile("/ok.bin")
require.NotNil(t, tf, "test driver should have tracked the upload file")
require.NoError(t, tf.getErrTransfer(), "TransferError must not fire on a clean upload")
}

func testTransferErrorOnWriteFailure(t *testing.T) {
t.Parallel()

driver, client := dialTransferErrorServer(t)

defer func() { require.NoError(t, client.Close()) }()

file := createTemporaryFile(t, 1*1024)
require.Error(t, client.Store("fail-to-write.bin", file))

tf := driver.getTransferFile("/fail-to-write.bin")
require.NotNil(t, tf, "test driver should have tracked the upload file")
require.Error(t, tf.getErrTransfer(), "TransferError must fire when the file write fails")
}

func testTransferErrorOnABOR(t *testing.T) {
t.Parallel()

driver, client := dialTransferErrorServer(t)

defer func() { require.NoError(t, client.Close()) }()

raw, err := client.OpenRawConn()
require.NoError(t, err)

defer func() { require.NoError(t, raw.Close()) }()

// "delay-io" makes each write sleep 500ms, giving us room to ABOR
// while the transfer is in flight.
fileName := "/delay-io-abort.bin"

dcGetter, err := raw.PrepareDataConn()
require.NoError(t, err)

require.NoError(t, raw.SendCommandNoWaitResponse("STOR %s", fileName))

returnCode, _, err := raw.ReadResponse()
require.NoError(t, err)
require.Equal(t, StatusFileStatusOK, returnCode)

dataConn, err := dcGetter()
require.NoError(t, err)

// Push a chunk so the server's io.Copy enters the slow Write path.
_, err = dataConn.Write(bytes.Repeat([]byte("x"), 1024))
require.NoError(t, err)

returnCode, _, err = raw.SendCommand("%s", getABORCmd())
require.NoError(t, err)
require.Equal(t, StatusTransferAborted, returnCode)

returnCode, _, err = raw.ReadResponse()
require.NoError(t, err)
require.Equal(t, StatusClosingDataConn, returnCode)

require.NoError(t, dataConn.Close())

// NOOP is serialized behind the transfer goroutine (transferWg.Wait),
// so a successful NOOP response guarantees the STOR goroutine has
// finished and TransferError/Close have already run.
returnCode, _, err = raw.SendCommand("NOOP")
require.NoError(t, err)
require.Equal(t, StatusOK, returnCode)

tf := driver.getTransferFile(fileName)
require.NotNil(t, tf, "test driver should have tracked the upload file")
require.Error(t, tf.getErrTransfer(), "TransferError must fire on an ABOR-interrupted upload")
}

// testTransferErrorOnDroppedConn reproduces the scenario from issue #635: a
// client (e.g. FileZilla) is in the middle of an upload and the whole
// connection goes away - the process is killed, the network drops, etc. The
// server detects the broken data connection and must report it through
// TransferError so the driver can discard the partial file instead of treating
// it as a finished upload.
func testTransferErrorOnDroppedConn(t *testing.T) {
t.Parallel()

driver, client := dialTransferErrorServer(t)

raw, err := client.OpenRawConn()
require.NoError(t, err)

// "delay-io" makes each write sleep 500ms, so the server is still
// inside io.Copy when we tear the connection down.
fileName := "/delay-io-dropped.bin"

dcGetter, err := raw.PrepareDataConn()
require.NoError(t, err)

require.NoError(t, raw.SendCommandNoWaitResponse("STOR %s", fileName))

returnCode, _, err := raw.ReadResponse()
require.NoError(t, err)
require.Equal(t, StatusFileStatusOK, returnCode)

dataConn, err := dcGetter()
require.NoError(t, err)

_, err = dataConn.Write(bytes.Repeat([]byte("x"), 1024))
require.NoError(t, err)

// Tear everything down without an ABOR or a clean end-of-transfer:
// this mimics a client that simply vanishes mid-upload.
require.NoError(t, dataConn.Close())
require.NoError(t, raw.Close())
require.NoError(t, client.Close())

// The transfer runs in its own goroutine; once the server notices the
// broken connection it calls TransferError before closing the file.
tf := driver.getTransferFile(fileName)
require.NotNil(t, tf, "test driver should have tracked the upload file")
require.Eventually(t, func() bool {
return tf.getErrTransfer() != nil
}, 5*time.Second, 10*time.Millisecond,
"TransferError must fire when the connection drops mid-upload")
}

func TestAPPEExistingFile(t *testing.T) {
driver := &TestServerDriver{
Debug: false,
Expand Down
Loading