Skip to content

Commit

Permalink
fix: task lock by unreachable torrent (#431)
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie authored Mar 29, 2024
1 parent bd6d6be commit 56ef250
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 66 deletions.
5 changes: 5 additions & 0 deletions internal/protocol/bt/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Fetcher struct {
meta *fetcher.FetcherMeta

torrentReady atomic.Bool
torrentDrop atomic.Bool
create atomic.Bool
progress fetcher.Progress
}
Expand Down Expand Up @@ -143,6 +144,7 @@ func (f *Fetcher) Pause() (err error) {
}

func (f *Fetcher) Close() (err error) {
f.torrentDrop.Store(false)
f.safeDrop()
return nil
}
Expand All @@ -158,6 +160,9 @@ func (f *Fetcher) safeDrop() {

func (f *Fetcher) Wait() (err error) {
for {
if f.torrentDrop.Load() {
break
}
if f.torrentReady.Load() && len(f.meta.Opts.SelectFiles) > 0 {
done := true
for _, selectIndex := range f.meta.Opts.SelectFiles {
Expand Down
144 changes: 81 additions & 63 deletions pkg/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (d *Downloader) Setup() error {
if len(d.tasks) > 0 {
for _, task := range d.tasks {
func() {
task.lock.Lock()
defer task.lock.Unlock()
task.statusLock.Lock()
defer task.statusLock.Unlock()
if task.Status != base.DownloadStatusRunning {
return
}
Expand Down Expand Up @@ -329,8 +329,7 @@ func (d *Downloader) Pause(id string) (err error) {
if task.Status == base.DownloadStatusPause {
return
}
err = d.doPause(task)
if err != nil {
if err = d.doPause(task); err != nil {
return
}
d.notifyRunning()
Expand Down Expand Up @@ -363,19 +362,7 @@ func (d *Downloader) Continue(id string) (err error) {
}
}()

func() {
task.lock.Lock()
defer task.lock.Unlock()

if task.Status == base.DownloadStatusRunning {
return
}

if err = d.doStart(task); err != nil {
return
}
}()
return
return d.doStart(task)
}

func (d *Downloader) PauseAll() (err error) {
Expand All @@ -388,13 +375,7 @@ func (d *Downloader) PauseAll() (err error) {
}()

for _, task := range d.tasks {
err = func() error {
task.lock.Lock()
defer task.lock.Unlock()

return d.doPause(task)
}()
if err != nil {
if err = d.doPause(task); err != nil {
return
}
}
Expand Down Expand Up @@ -424,13 +405,7 @@ func (d *Downloader) ContinueAll() (err error) {

for _, task := range continuedTasks {
tt := task
err = func() error {
tt.lock.Lock()
defer tt.lock.Unlock()

return d.doStart(tt)
}()
if err != nil {
if err = d.doStart(tt); err != nil {
return
}
}
Expand Down Expand Up @@ -522,17 +497,21 @@ func (d *Downloader) Stats(id string) (sr any, err error) {

func (d *Downloader) doDelete(task *Task, force bool) (err error) {
err = func() error {
if task.fetcher != nil {
if err := task.fetcher.Close(); err != nil {
return err
}
}
d.lock.Lock()
defer d.lock.Unlock()

if err := d.storage.Delete(bucketTask, task.ID); err != nil {
return err
}
if err := d.storage.Delete(bucketSave, task.ID); err != nil {
return err
}

if task.fetcher != nil {
if err := task.fetcher.Close(); err != nil {
return err
}
}
if force && task.Meta.Res != nil {
if task.Meta.Res.Name != "" {
if err := os.RemoveAll(task.Meta.FolderPath()); err != nil {
Expand Down Expand Up @@ -794,31 +773,52 @@ func (d *Downloader) doCreate(fetcher fetcher.Fetcher, opts *base.Options) (task
return
}

d.doStart(task)
err = d.doStart(task)
}()

go d.watch(task)
return
}

func (d *Downloader) statusMut(task *Task, fn func() (bool, error)) (bool, error) {
task.statusLock.Lock()
defer task.statusLock.Unlock()

return fn()
}

func (d *Downloader) doStart(task *Task) (err error) {
if task.Status != base.DownloadStatusRunning && task.Status != base.DownloadStatusDone {
err := d.restoreFetcher(task)
var isCreate bool
isReturn, err := d.statusMut(task, func() (isReturn bool, err error) {
if task.Status == base.DownloadStatusRunning || task.Status == base.DownloadStatusDone {
isReturn = true
return
}

err = d.restoreFetcher(task)
if err != nil {
d.Logger.Error().Stack().Err(err).Msgf("restore fetcher failed, task id: %s", task.ID)
return err
return
}
}
isCreate = task.Status == base.DownloadStatusReady

isCreate := task.Status == base.DownloadStatusReady
d.triggerOnStart(task)
task.updateStatus(base.DownloadStatusRunning)

doStart := func() error {
return
})
if err != nil {
d.Logger.Error().Stack().Err(err).Msgf("start task failed, task id: %s", task.ID)
return
}
if isReturn {
return
}

handler := func() error {
task.lock.Lock()
defer task.lock.Unlock()

d.triggerOnStart(task)
task.updateStatus(base.DownloadStatusRunning)

if task.Meta.Res == nil {
err := task.fetcher.Resolve(task.Meta.Req)
if err != nil {
Expand Down Expand Up @@ -865,8 +865,7 @@ func (d *Downloader) doStart(task *Task) (err error) {
return nil
}
go func() {
err := doStart()
if err != nil {
if err := handler(); err != nil {
d.doOnError(task, err)
}
}()
Expand All @@ -875,27 +874,44 @@ func (d *Downloader) doStart(task *Task) (err error) {
}

func (d *Downloader) doPause(task *Task) (err error) {
err = func() error {
if task.Status != base.DownloadStatusDone {
task.updateStatus(base.DownloadStatusPause)
task.timer.Pause()
if task.fetcher != nil {
if err := task.fetcher.Pause(); err != nil {
return err
}
}
if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil {
return err
}
d.emit(EventKeyPause, task)
isReturn, err := d.statusMut(task, func() (isReturn bool, err error) {
if task.Status == base.DownloadStatusPause || task.Status == base.DownloadStatusDone {
isReturn = true
return
}
return nil
}()

task.updateStatus(base.DownloadStatusPause)
task.timer.Pause()
return
})
if err != nil {
d.Logger.Error().Stack().Err(err).Msgf("pause task failed, task id: %s", task.ID)
return
}
if isReturn {
return
}

handler := func() error {
task.lock.Lock()
defer task.lock.Unlock()

if task.fetcher != nil {
if err := task.fetcher.Pause(); err != nil {
return err
}
}
if err := d.storage.Put(bucketTask, task.ID, task.clone()); err != nil {
return err
}
d.emit(EventKeyPause, task)
return nil
}
go func() {
if err := handler(); err != nil {
d.Logger.Error().Stack().Err(err).Msgf("pause task handle failed, task id: %s", task.ID)
}
}()
return
}

Expand Down Expand Up @@ -923,6 +939,8 @@ func (d *Downloader) buildFetcher(url string) (fetcher.Fetcher, error) {

func initTask(task *Task) {
task.timer = util.NewTimer(task.Progress.Used)

task.statusLock = &sync.Mutex{}
task.lock = &sync.Mutex{}
task.speedArr = make([]int64, 0)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/download/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Task struct {
fetcherBuilder fetcher.FetcherBuilder
fetcher fetcher.Fetcher
timer *util.Timer
statusLock *sync.Mutex
lock *sync.Mutex
speedArr []int64
}
Expand Down
2 changes: 1 addition & 1 deletion ui/flutter/lib/api/model/options.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Options {
Options({
required this.name,
required this.path,
required this.selectFiles,
this.selectFiles = const [],
this.extra,
});

Expand Down
6 changes: 4 additions & 2 deletions ui/flutter/lib/api/model/options.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 56ef250

Please sign in to comment.