Skip to content

Commit

Permalink
refactor: 利用context做取消
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Mar 11, 2021
1 parent 79a93d5 commit eee6112
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 51 deletions.
100 changes: 50 additions & 50 deletions download/http/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package http

import (
"bytes"
"context"
"fmt"
"github.com/monkeyWie/gopeed-core/download/base"
"github.com/monkeyWie/gopeed-core/download/http/model"
"golang.org/x/sync/errgroup"
"io"
"io/ioutil"
"mime"
Expand Down Expand Up @@ -39,6 +41,8 @@ type Fetcher struct {
clients []*http.Response
chunks []*model.Chunk

ctx context.Context
cancel context.CancelFunc
pauseCh chan interface{}
}

Expand All @@ -58,7 +62,7 @@ func FetcherBuilder() ([]string, func() base.Fetcher) {
}

func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
httpReq, err := buildRequest(req)
httpReq, err := buildRequest(nil, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -176,7 +180,7 @@ func (f *Fetcher) Pause() (err error) {
return
}
f.status = base.DownloadStatusPause
f.stop()
f.cancel()
<-f.pauseCh
return
}
Expand Down Expand Up @@ -205,30 +209,19 @@ func (f *Fetcher) filename() string {
}

func (f *Fetcher) fetch() {
errCh := make(chan error, f.opts.Connections)

f.ctx, f.cancel = context.WithCancel(context.Background())
eg, _ := errgroup.WithContext(f.ctx)
for i := 0; i < f.opts.Connections; i++ {
go func(i int) {
errCh <- f.fetchChunk(i, f.filename(), f.chunks[i])
}(i)
i := i
eg.Go(func() error {
return f.fetchChunk(i)
})
}

go func() {
var err error
stopFlag := false
for i := 0; i < f.opts.Connections; i++ {
fetchErr := <-errCh
if fetchErr != nil && !stopFlag {
// 有一个连接失败就立即终止下载
err = fetchErr
stopFlag = true
f.stop()
}
}

err := eg.Wait()
// 下载停止,关闭文件句柄
f.Ctl.Close(f.filename())

if f.status == base.DownloadStatusPause {
f.pauseCh <- nil
} else {
Expand All @@ -242,18 +235,11 @@ func (f *Fetcher) fetch() {
}()
}

func (f *Fetcher) stop() {
if len(f.clients) > 0 {
for _, client := range f.clients {
if client != nil {
client.Body.Close()
}
}
}
}
func (f *Fetcher) fetchChunk(index int) (err error) {
filename := f.filename()
chunk := f.chunks[index]

func (f *Fetcher) fetchChunk(index int, name string, chunk *model.Chunk) (err error) {
httpReq, err := buildRequest(f.res.Req)
httpReq, err := buildRequest(f.ctx, f.res.Req)
if err != nil {
return err
}
Expand Down Expand Up @@ -302,7 +288,7 @@ func (f *Fetcher) fetchChunk(index int, name string, chunk *model.Chunk) (err er
for {
n, err := resp.Body.Read(buf)
if n > 0 {
_, err := f.Ctl.Write(name, chunk.Begin+chunk.Downloaded, buf[:n])
_, err := f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
if err != nil {
return false, err
}
Expand Down Expand Up @@ -346,30 +332,44 @@ func buildClient() *http.Client {
}
}

func buildRequest(req *base.Request) (*http.Request, error) {
func buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
url, err := url.Parse(req.URL)
if err != nil {
return nil, err
}
httpReq := &http.Request{
URL: url,
Header: map[string][]string{},
return
}

if req.Extra != nil {
if extra, ok := req.Extra.(model.Extra); ok {
if extra.Method != "" {
httpReq.Method = extra.Method
} else {
httpReq.Method = http.MethodGet
}
if len(extra.Header) > 0 {
for k, v := range extra.Header {
httpReq.Header[k] = []string{v}
}
var (
method string
body io.Reader
)
headers := make(map[string][]string)
if req.Extra == nil {
method = http.MethodGet
} else {
extra := req.Extra.(model.Extra)
if extra.Method != "" {
method = extra.Method
} else {
method = http.MethodGet
}
if len(extra.Header) > 0 {
for k, v := range extra.Header {
headers[k] = []string{v}
}
httpReq.Body = ioutil.NopCloser(bytes.NewBufferString(extra.Body))
}
if extra.Body != "" {
body = ioutil.NopCloser(bytes.NewBufferString(extra.Body))
}
}

if ctx != nil {
httpReq, err = http.NewRequestWithContext(ctx, method, url.String(), body)
} else {
httpReq, err = http.NewRequest(method, url.String(), body)
}
if err != nil {
return
}
httpReq.Header = headers
return httpReq, nil
}
3 changes: 2 additions & 1 deletion download/http/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,10 +264,11 @@ func downloadContinue(listener net.Listener, connections int, t *testing.T) {
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 500)
time.Sleep(time.Millisecond * 200)
if err := fetcher.Pause(); err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 200)
if err := fetcher.Continue(); err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ require (
github.com/marksamman/bencode v0.0.0-20150821143521-dc84f26e086e
github.com/sirupsen/logrus v1.4.2
golang.org/x/net v0.0.0-20200707034311-ab3426394381
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down

0 comments on commit eee6112

Please sign in to comment.