From eee6112be5e1041f073a73370eedb40bf10c1fb2 Mon Sep 17 00:00:00 2001 From: liwei Date: Thu, 11 Mar 2021 17:21:56 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E5=88=A9=E7=94=A8context=E5=81=9A?= =?UTF-8?q?=E5=8F=96=E6=B6=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- download/http/fetcher.go | 100 +++++++++++++++++----------------- download/http/fetcher_test.go | 3 +- go.mod | 1 + go.sum | 3 + 4 files changed, 56 insertions(+), 51 deletions(-) diff --git a/download/http/fetcher.go b/download/http/fetcher.go index b53ec8e84..3a6b47c9d 100644 --- a/download/http/fetcher.go +++ b/download/http/fetcher.go @@ -2,9 +2,11 @@ package http import ( "bytes" + "context" "fmt" "github.com/monkeyWie/gopeed-core/download/base" "github.com/monkeyWie/gopeed-core/download/http/model" + "golang.org/x/sync/errgroup" "io" "io/ioutil" "mime" @@ -39,6 +41,8 @@ type Fetcher struct { clients []*http.Response chunks []*model.Chunk + ctx context.Context + cancel context.CancelFunc pauseCh chan interface{} } @@ -58,7 +62,7 @@ func FetcherBuilder() ([]string, func() base.Fetcher) { } func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) { - httpReq, err := buildRequest(req) + httpReq, err := buildRequest(nil, req) if err != nil { return nil, err } @@ -176,7 +180,7 @@ func (f *Fetcher) Pause() (err error) { return } f.status = base.DownloadStatusPause - f.stop() + f.cancel() <-f.pauseCh return } @@ -205,30 +209,19 @@ func (f *Fetcher) filename() string { } func (f *Fetcher) fetch() { - errCh := make(chan error, f.opts.Connections) - + f.ctx, f.cancel = context.WithCancel(context.Background()) + eg, _ := errgroup.WithContext(f.ctx) for i := 0; i < f.opts.Connections; i++ { - go func(i int) { - errCh <- f.fetchChunk(i, f.filename(), f.chunks[i]) - }(i) + i := i + eg.Go(func() error { + return f.fetchChunk(i) + }) } go func() { - var err error - stopFlag := false - for i := 0; i < f.opts.Connections; i++ { - fetchErr := <-errCh - if fetchErr != nil && !stopFlag { - // 有一个连接失败就立即终止下载 - err = fetchErr - stopFlag = true - f.stop() - } - } - + err := eg.Wait() // 下载停止,关闭文件句柄 f.Ctl.Close(f.filename()) - if f.status == base.DownloadStatusPause { f.pauseCh <- nil } else { @@ -242,18 +235,11 @@ func (f *Fetcher) fetch() { }() } -func (f *Fetcher) stop() { - if len(f.clients) > 0 { - for _, client := range f.clients { - if client != nil { - client.Body.Close() - } - } - } -} +func (f *Fetcher) fetchChunk(index int) (err error) { + filename := f.filename() + chunk := f.chunks[index] -func (f *Fetcher) fetchChunk(index int, name string, chunk *model.Chunk) (err error) { - httpReq, err := buildRequest(f.res.Req) + httpReq, err := buildRequest(f.ctx, f.res.Req) if err != nil { return err } @@ -302,7 +288,7 @@ func (f *Fetcher) fetchChunk(index int, name string, chunk *model.Chunk) (err er for { n, err := resp.Body.Read(buf) if n > 0 { - _, err := f.Ctl.Write(name, chunk.Begin+chunk.Downloaded, buf[:n]) + _, err := f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n]) if err != nil { return false, err } @@ -346,30 +332,44 @@ func buildClient() *http.Client { } } -func buildRequest(req *base.Request) (*http.Request, error) { +func buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) { url, err := url.Parse(req.URL) if err != nil { - return nil, err - } - httpReq := &http.Request{ - URL: url, - Header: map[string][]string{}, + return } - if req.Extra != nil { - if extra, ok := req.Extra.(model.Extra); ok { - if extra.Method != "" { - httpReq.Method = extra.Method - } else { - httpReq.Method = http.MethodGet - } - if len(extra.Header) > 0 { - for k, v := range extra.Header { - httpReq.Header[k] = []string{v} - } + var ( + method string + body io.Reader + ) + headers := make(map[string][]string) + if req.Extra == nil { + method = http.MethodGet + } else { + extra := req.Extra.(model.Extra) + if extra.Method != "" { + method = extra.Method + } else { + method = http.MethodGet + } + if len(extra.Header) > 0 { + for k, v := range extra.Header { + headers[k] = []string{v} } - httpReq.Body = ioutil.NopCloser(bytes.NewBufferString(extra.Body)) } + if extra.Body != "" { + body = ioutil.NopCloser(bytes.NewBufferString(extra.Body)) + } + } + + if ctx != nil { + httpReq, err = http.NewRequestWithContext(ctx, method, url.String(), body) + } else { + httpReq, err = http.NewRequest(method, url.String(), body) + } + if err != nil { + return } + httpReq.Header = headers return httpReq, nil } diff --git a/download/http/fetcher_test.go b/download/http/fetcher_test.go index a04c814ee..86b528c22 100644 --- a/download/http/fetcher_test.go +++ b/download/http/fetcher_test.go @@ -264,10 +264,11 @@ func downloadContinue(listener net.Listener, connections int, t *testing.T) { if err != nil { t.Fatal(err) } - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 200) if err := fetcher.Pause(); err != nil { t.Fatal(err) } + time.Sleep(time.Millisecond * 200) if err := fetcher.Continue(); err != nil { t.Fatal(err) } diff --git a/go.mod b/go.mod index 0b517927e..9f2474fba 100644 --- a/go.mod +++ b/go.mod @@ -9,4 +9,5 @@ require ( github.com/marksamman/bencode v0.0.0-20150821143521-dc84f26e086e github.com/sirupsen/logrus v1.4.2 golang.org/x/net v0.0.0-20200707034311-ab3426394381 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/go.sum b/go.sum index 5dac46a68..0ee9d75ae 100644 --- a/go.sum +++ b/go.sum @@ -44,10 +44,13 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200707034311-ab3426394381 h1:VXak5I6aEWmAXeQjA+QSZzlgNrpq9mjcfDemuexIKsU= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=