Skip to content

Commit

Permalink
feat: peer wire protocol实现
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Feb 28, 2020
1 parent 7bb3e38 commit ee4ff7d
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 165 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
- [ ] BitTorrent下载实现
- [x] .torrent文件解析
- [x] tracker协议实现
- [ ] peer wire protocol协议实现
- [x] peer wire protocol协议实现
- [ ] DHT协议实现
- [ ] 磁力链接支持
- [ ] uTP协议实现
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ require (
github.com/RoaringBitmap/roaring v0.4.21
github.com/cenkalti/mse v0.0.0-20140930130441-6ef65f170972
github.com/marksamman/bencode v0.0.0-20150821143521-dc84f26e086e
github.com/sirupsen/logrus v1.4.2
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/cenkalti/mse v0.0.0-20140930130441-6ef65f170972 h1:FvAasVrPm/VUO8M0od
github.com/cenkalti/mse v0.0.0-20140930130441-6ef65f170972/go.mod h1:wGf99xKTJ8f/a8cC1inMqFz9cGEzISLLid5F6i5fI/g=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4=
github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE=
github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 h1:gclg6gY70GLy3PbkQ1AERPfmLMMagS60DKF78eWwLn8=
Expand All @@ -14,6 +16,8 @@ github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99 h1:twflg0XRTjwKp
github.com/gopherjs/gopherjs v0.0.0-20190910122728-9d188e94fb99/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/marksamman/bencode v0.0.0-20150821143521-dc84f26e086e h1:KMs6SK8iDSR1+ZzOK10L5wGPpWDByyvOe5nrqk51g2U=
github.com/marksamman/bencode v0.0.0-20150821143521-dc84f26e086e/go.mod h1:+AHfJo5+69p+fjvMJTmYajNP9rFBHQcaTDFmuXRRATI=
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae h1:VeRdUYdCw49yizlSbMEn2SZ+gT+3IUKx8BqxyQdz+BY=
Expand All @@ -22,13 +26,19 @@ github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU=
github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/willf/bitset v1.1.10 h1:NotGKqX0KwQ72NUzqrjZq5ipPNDQex9lo3WpaS8L2sc=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
26 changes: 0 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package main

import (
"fmt"
"github.com/monkeyWie/gopeed/protocol/bt/metainfo"
"github.com/monkeyWie/gopeed/protocol/bt/peer"
"github.com/monkeyWie/gopeed/protocol/bt/torrent"
"github.com/monkeyWie/gopeed/protocol/bt/tracker"
"github.com/monkeyWie/gopeed/protocol/http"
)

Expand All @@ -24,26 +20,4 @@ func main() {
}
got, _ := http.Resolve(request)
fmt.Println(got)
getUsablePeer()
// webview.Open("Minimal webview example", "https://www.baidu.com", 800, 600, true)
}

func buildTorrent() *torrent.Torrent {
metaInfo, err := metainfo.ParseFromFile("../testdata/Game.of.Thrones.S08E05.720p.WEB.H264-MEMENTO.torrent")
if err != nil {
panic(err)
}
metaInfo.Announce = "udp://tracker.opentrackr.org:1337/announce"
metaInfo.AnnounceList = [][]string{}
return torrent.NewTorrent(peer.GenPeerID(), metaInfo)
}

// 获取一个能连接上的peer
func getUsablePeer() {
torrent := buildTorrent()
tracker := &tracker.Tracker{
PeerID: torrent.PeerID,
MetaInfo: torrent.MetaInfo,
}
tracker.Tracker()
}
86 changes: 48 additions & 38 deletions protocol/bt/torrent/peer_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/monkeyWie/gopeed/protocol/bt/metainfo"
"github.com/monkeyWie/gopeed/protocol/bt/peer"
"github.com/monkeyWie/gopeed/protocol/bt/peer/message"
log "github.com/sirupsen/logrus"
"io"
"math"
"net"
Expand All @@ -20,7 +21,9 @@ import (
)

// 现在主流客户端的block大小都是16KB
const BlockSize = 2 << 13
const blockSize = 2 << 13

var errPieceCheckFailed = errors.New("piece check failed")

type peerConn struct {
torrent *Torrent
Expand All @@ -35,18 +38,19 @@ type peerConn struct {
// peer is interested in this client
peerInterested bool

bitfield *message.Bitfield
readyComplete bool
downloadComplete bool
bitfield *message.Bitfield
readyEnd bool
downloadEnd bool

downloadedCh chan error
disconnectCh chan error
// block个数
blockCount int
// block下载队列,官方推荐为5
blockQueue chan interface{}
blockQueueCh chan interface{}
}

func NewPeerConn(torrent *Torrent, peer *peer.Peer) *peerConn {
func newPeerConn(torrent *Torrent, peer *peer.Peer) *peerConn {
return &peerConn{
torrent: torrent,
peer: peer,
Expand Down Expand Up @@ -116,10 +120,10 @@ func (pc *peerConn) ready() error {
if _, err := pc.handshake(); err != nil {
return fmt.Errorf("handshake error %w", err)
}
pc.readyComplete = false
pc.downloadComplete = false
pc.readyEnd = false
pc.downloadEnd = false
readyCh := make(chan bool)
defer close(readyCh)
pc.disconnectCh = make(chan error)
go func() {
scanner := bufio.NewScanner(pc.conn)
scanner.Split(message.SplitMessage)
Expand Down Expand Up @@ -156,13 +160,12 @@ func (pc *peerConn) ready() error {
}
}
// 还未下载完成时连接断开
if pc.downloadedCh != nil && !pc.downloadComplete {
err := scanner.Err()
if err == nil {
err = io.EOF
}
pc.downloadedCh <- err
err := scanner.Err()
if err == nil {
err = io.EOF
}
pc.disconnectCh <- err
close(pc.disconnectCh)
}()
err := func() error {
select {
Expand All @@ -177,24 +180,22 @@ func (pc *peerConn) ready() error {
return errors.New("ready time out")
}
}()
pc.readyComplete = true
if err != nil {
pc.conn.Close()
return err
}
pc.blockQueue = make(chan interface{}, 5)
pc.blockQueueCh = make(chan interface{}, 5)
return nil
}

// 下载指定piece
func (pc *peerConn) downloadPiece(index int) error {
pieceLength := pc.torrent.MetaInfo.GetPieceLength(index)
pc.blockCount = int(math.Ceil(float64(pieceLength) / BlockSize))
pc.blockCount = int(math.Ceil(float64(pieceLength) / blockSize))
pc.downloadedCh = make(chan error)
defer close(pc.downloadedCh)
// 按块下载分片
for i := 0; i < pc.blockCount; i++ {
offset := BlockSize * i
offset := blockSize * i
// 如果已经下载过就跳过
if pc.torrent.pieces.isBlockDownloaded(index, offset) {
continue
Expand All @@ -205,36 +206,43 @@ func (pc *peerConn) downloadPiece(index int) error {
// 最后一个block大小需要计算出来
blockLength = uint32(pieceLength - offset)
} else {
blockLength = BlockSize
blockLength = blockSize
}
// block下载排队,如果连接出现问题直接返回异常
select {
case pc.blockQueue <- nil:
case pc.blockQueueCh <- nil:
break
case err := <-pc.downloadedCh:
case err := <-pc.disconnectCh:
return err
}
// 发起request,对方会响应piece
_, err := pc.conn.Write(message.NewRequest(uint32(index), uint32(offset), blockLength).Encode())
if err != nil {
fmt.Println(err)
return err
}
}
return <-pc.downloadedCh
select {
case err := <-pc.downloadedCh:
return err
case err := <-pc.disconnectCh:
return err
}
}

func (pc *peerConn) handleUnchoke(readyCh chan<- bool) {
pc.peerChoking = false
// 已经处理过Unchoke信号
if pc.readyComplete {
if pc.readyEnd {
return
}
pc.readyEnd = true
// 如果客户端对peer感兴趣并且peer没有choked客户端,就可以开始下载了
if pc.amInterested {
readyCh <- true
} else {
readyCh <- false
}
close(readyCh)
}

func (pc *peerConn) handleBitfield(buf []byte) {
Expand Down Expand Up @@ -319,9 +327,10 @@ func (pc *peerConn) handlePiece(buf []byte) {
}()
}
pc.torrent.pieces.setBlockDownloaded(int(piece.Index), int(piece.Begin))
fmt.Printf("piece %d downloaded block %d,count %d\n", piece.Index, piece.Begin, pc.torrent.pieces.blockSize(int(piece.Index)))
// 出队
<-pc.blockQueue
<-pc.blockQueueCh

log.Debugf("piece:%d block:%d already:%d", piece.Index, piece.Begin, pc.torrent.pieces.blockSize(int(piece.Index)))

// piece全部下载完
if pc.torrent.pieces.blockSize(int(piece.Index)) == pc.blockCount {
Expand Down Expand Up @@ -356,22 +365,23 @@ func (pc *peerConn) handlePiece(buf []byte) {
break
}
}
// 标记下载完成
pc.downloadEnd = true
// 断开连接 TODO 用连接池进行复用
pc.conn.Close()

// 校验piece SHA-1 hash
downHash := [20]byte{}
copy(downHash[:], sha1.Sum(nil))
if downHash == pc.torrent.MetaInfo.Info.Pieces[piece.Index] {
fmt.Printf("piece %d 校验通过\n", piece.Index)
log.Infof("piece %d 校验通过", piece.Index)
// piece下载完成
pc.downloadedCh <- nil
} else {
fmt.Printf("piece %d 校验失败\n", piece.Index)
// 清空该piece上所有的block下载记录
pc.torrent.pieces.clearBlocks(int(piece.Index))
log.Infof("piece %d 校验失败", piece.Index)
pc.downloadedCh <- errPieceCheckFailed
}
// 标记下载完成
pc.downloadComplete = true
// 断开连接 TODO 用连接池进行复用
pc.conn.Close()
// piece下载完成
pc.downloadedCh <- nil
close(pc.downloadedCh)
}
}

Expand Down
6 changes: 1 addition & 5 deletions protocol/bt/torrent/peer_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ func getUsablePeerMore() chan *peerConn {
PeerID: torrent.PeerID,
MetaInfo: torrent.MetaInfo,
}
peers, err := tracker.Tracker()
if err != nil {
panic(err)
}
peers := <-tracker.Tracker()
ch := make(chan *peerConn)
for i := range peers {
go peerTest(torrent, &peers[i], ch)
Expand Down Expand Up @@ -162,5 +159,4 @@ func TestSome2(t *testing.T) {
}(i)
}
wg.Wait()

}
36 changes: 33 additions & 3 deletions protocol/bt/torrent/peer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package torrent

import (
"github.com/monkeyWie/gopeed/protocol/bt/peer"
"github.com/monkeyWie/gopeed/protocol/bt/tracker"
"sync"
"time"
)

type peerPool struct {
torrent *Torrent

states map[peer.Peer]*peerState
lock *sync.Mutex
}
Expand All @@ -17,11 +21,34 @@ type peerState struct {
errors int
}

func newPeerPool() *peerPool {
func newPeerPool(torrent *Torrent) *peerPool {
return &peerPool{
lock: &sync.Mutex{},
states: map[peer.Peer]*peerState{},
torrent: torrent,
lock: &sync.Mutex{},
states: map[peer.Peer]*peerState{},
}
}

func (pp *peerPool) fetch() {
tracker := &tracker.Tracker{
PeerID: pp.torrent.PeerID,
MetaInfo: pp.torrent.MetaInfo,
}

go func() {
for {
// 当peer数量少于200个时重新发起一次tracker
if len(pp.states) < 200 {
go func() {
for peers := range tracker.Tracker() {
pp.put(peers)
}
}()
}
// 每5分钟检测一次
time.Sleep(time.Minute * 5)
}
}()
}

func (pp *peerPool) put(peers []peer.Peer) {
Expand Down Expand Up @@ -50,16 +77,19 @@ func (pp *peerPool) get() *peer.Peer {
return nil
}

// 将peer重新放回池中,等待使用
func (pp *peerPool) release(peer *peer.Peer) {
pp.lock.Lock()
defer pp.lock.Unlock()
pp.states[*peer].using = false
pp.states[*peer].errors = 0
}

// 标记peer为不可用,超过3次则剔除该peer
func (pp *peerPool) unavailable(peer *peer.Peer) {
pp.lock.Lock()
defer pp.lock.Unlock()
pp.states[*peer].using = false
pp.states[*peer].errors++
if pp.states[*peer].errors > 3 {
delete(pp.states, *peer)
Expand Down
Loading

0 comments on commit ee4ff7d

Please sign in to comment.