diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index b7896994f2..eca033d044 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -73,6 +73,11 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe return nil } +// GetRoundTripper -- returns the network round tripper +func (network *MockNetwork) GetRoundTripper() http.RoundTripper { + return http.DefaultTransport +} + // Ready - always ready func (network *MockNetwork) Ready() chan struct{} { c := make(chan struct{}) @@ -91,9 +96,3 @@ func (network *MockNetwork) ClearHandlers() { // RegisterHTTPHandler - empty implementation func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) { } - -// MakeHTTPRequest - basic client.Do request -func (network *MockNetwork) MakeHTTPRequest(client *http.Client, - request *http.Request) (*http.Response, error) { - return client.Do(request) -} diff --git a/network/dialer.go b/network/dialer.go new file mode 100644 index 0000000000..83a642af6b --- /dev/null +++ b/network/dialer.go @@ -0,0 +1,58 @@ +// Copyright (C) 2019-2020 Algorand, Inc. +// This file is part of go-algorand +// +// go-algorand is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// go-algorand is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with go-algorand. If not, see . + +package network + +import ( + "context" + "net" + "time" +) + +// Dialer establish tcp-level connection with the destination +type Dialer struct { + phonebook *MultiPhonebook + innerDialer net.Dialer +} + +// Dial connects to the address on the named network. +// It waits if needed not to exceed connectionsRateLimitingCount. +func (d *Dialer) Dial(network, address string) (net.Conn, error) { + return d.DialContext(context.Background(), network, address) +} + +// DialContext connects to the address on the named network using the provided context. +// It waits if needed not to exceed connectionsRateLimitingCount. +func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + var waitTime time.Duration + var provisionalTime time.Time + + for { + _, waitTime, provisionalTime = d.phonebook.GetConnectionWaitTime(address) + if waitTime == 0 { + break // break out of the loop and proceed to the connection + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(waitTime): + } + } + conn, err := d.innerDialer.DialContext(ctx, network, address) + d.phonebook.UpdateConnectionTime(address, provisionalTime) + + return conn, err +} diff --git a/network/phonebook.go b/network/phonebook.go index 2f6f823ca0..872ec14805 100644 --- a/network/phonebook.go +++ b/network/phonebook.go @@ -36,10 +36,13 @@ type Phonebook interface { // UpdateRetryAfter updates the retry-after field for the entries matching the given address UpdateRetryAfter(addr string, retryAfter time.Time) - // WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount. - // Then it will register a provisional next connection time. - // Will return true if it waited and false otherwise - WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) + // GetConnectionWaitTime will calculate and return the wait + // time to prevent exceeding connectionsRateLimitingCount. + // The connection should be established when the waitTime is 0. + // It will register a provisional next connection time when the waitTime is 0. + // The provisional time should be updated after the connection with UpdateConnectionTime + GetConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) // UpdateConnectionTime will update the provisional connection time. // Returns true of the addr was in the phonebook @@ -136,16 +139,19 @@ func (e *phonebookEntries) updateRetryAfter(addr string, retryAfter time.Time) { } } -// waitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount. -// Will return true if it waited and false otherwise -func (e *phonebookEntries) waitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) { - waited = false - _, found := e.data[addr] +// getConnectionWaitTime will calculate and return the wait +// time to prevent exceeding connectionsRateLimitingCount. +// The connection should be established when the waitTime is 0. +// It will register a provisional next connection time when the waitTime is 0. +// The provisional time should be updated after the connection with UpdateConnectionTime +func (e *phonebookEntries) getConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) { + _, addrInPhonebook = e.data[addr] curTime := time.Now() - if !found { + if !addrInPhonebook { // The addr is not in this phonebook. // Will find the addr in a different phonebook. - return found, waited, curTime + return addrInPhonebook, 0 /* not unsed */, curTime /* not unsed */ } var timeSince time.Duration @@ -153,29 +159,29 @@ func (e *phonebookEntries) waitForConnectionTime(addr string) (addrInPhonebook, // Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds for numElmtsToRemove < len(e.data[addr].recentConnectionTimes) { timeSince = curTime.Sub((e.data[addr].recentConnectionTimes)[numElmtsToRemove]) - if timeSince > e.connectionsRateLimitingWindow { + if timeSince >= e.connectionsRateLimitingWindow { numElmtsToRemove++ } else { break // break the loop. The rest are earlier than 1 second } } + // Remove the expired elements from e.data[addr].recentConnectionTimes e.popNElements(numElmtsToRemove, addr) // If there are max number of connections within the time window, wait numElts := len(e.data[addr].recentConnectionTimes) if uint(numElts) >= e.connectionsRateLimitingCount { - // Wait until the earliest time expires - time.Sleep(e.connectionsRateLimitingWindow - timeSince) - waited = true - // Remove it from recentConnectionTimes - e.popNElements(1, addr) + return addrInPhonebook, /* true */ + (e.connectionsRateLimitingWindow - timeSince), curTime /* not unsed */ } + // Else, there is space in connectionsRateLimitingCount. The + // connection request of the caller will proceed // Update curTime, since it may have significantly changed if waited - curTime = time.Now() + provisionalTime = time.Now() // Append the provisional time for the next connection request - e.appendTime(addr, curTime) - return found, waited, curTime + e.appendTime(addr, provisionalTime) + return addrInPhonebook /* true */, 0 /* no wait. proceed */, provisionalTime } // UpdateConnectionTime will update the provisional connection time. @@ -252,11 +258,14 @@ func (p *ArrayPhonebook) UpdateRetryAfter(addr string, retryAfter time.Time) { p.Entries.updateRetryAfter(addr, retryAfter) } -// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount. -// Then it will register the next connection time. -// Will return true if it waited and false otherwise -func (p *ArrayPhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) { - return p.Entries.waitForConnectionTime(addr) +// GetConnectionWaitTime will calculate and return the wait +// time to prevent exceeding connectionsRateLimitingCount. +// The connection should be established when the waitTime is 0. +// It will register a provisional next connection time when the waitTime is 0. +// The provisional time should be updated after the connection with UpdateConnectionTime +func (p *ArrayPhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) { + return p.Entries.getConnectionWaitTime(addr) } // UpdateConnectionTime will update the provisional connection time. @@ -299,12 +308,16 @@ func (p *ThreadsafePhonebook) UpdateRetryAfter(addr string, retryAfter time.Time p.entries.updateRetryAfter(addr, retryAfter) } -// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount. -// Will return true if it waited and false otherwise -func (p *ThreadsafePhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) { +// GetConnectionWaitTime will calculate and return the wait +// time to prevent exceeding connectionsRateLimitingCount. +// The connection should be established when the waitTime is 0. +// It will register a provisional next connection time when the waitTime is 0. +// The provisional time should be updated after the connection with UpdateConnectionTime +func (p *ThreadsafePhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) { p.lock.RLock() defer p.lock.RUnlock() - return p.entries.waitForConnectionTime(addr) + return p.entries.getConnectionWaitTime(addr) } // UpdateConnectionTime will update the provisional connection time. @@ -409,22 +422,25 @@ func (mp *MultiPhonebook) UpdateRetryAfter(addr string, retryAfter time.Time) { } } -// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount. -// Will return true if it waited and false otherwise -func (mp *MultiPhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) { +// GetConnectionWaitTime will calculate and return the wait +// time to prevent exceeding connectionsRateLimitingCount. +// The connection should be established when the waitTime is 0. +// It will register a provisional next connection time when the waitTime is 0. +func (mp *MultiPhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) { mp.lock.Lock() defer mp.lock.Unlock() addrInPhonebook = false - waited = false for _, op := range mp.phonebookMap { // The addr will be in one of the phonebooks. // If it is not found in this phonebook, no action will be taken . - if addrInPhonebook, waited, provisionalTime = op.WaitForConnectionTime(addr); addrInPhonebook { + if addrInPhonebook, waitTime, + provisionalTime = op.GetConnectionWaitTime(addr); addrInPhonebook { // If addr is in this phonebook, no need to look for it in other phonebooks - return addrInPhonebook, waited, provisionalTime + return } } - return addrInPhonebook, waited, provisionalTime + return } // UpdateConnectionTime will update the provisional connection time. diff --git a/network/phonebook_test.go b/network/phonebook_test.go index cdf83072db..0d67be0f10 100644 --- a/network/phonebook_test.go +++ b/network/phonebook_test.go @@ -210,15 +210,16 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { addr2 := "addrXYZ" // Address not in. Should return false - addrInPhonebook, _, provisionalTime := entries.waitForConnectionTime(addr1) + addrInPhonebook, _, provisionalTime := entries.getConnectionWaitTime(addr1) require.Equal(t, false, addrInPhonebook) require.Equal(t, false, entries.updateConnectionTime(addr1, provisionalTime)) // Test the addresses are populated in the phonebook and a // time can be added to one of them entries.ReplacePeerList([]string{addr1, addr2}) - _, waited, provisionalTime := entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + addrInPhonebook, waitTime, provisionalTime := entries.getConnectionWaitTime(addr1) + require.Equal(t, true, addrInPhonebook) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) phBookData := entries.data[addr1].recentConnectionTimes require.Equal(t, 1, len(phBookData)) @@ -227,8 +228,8 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { time.Sleep(100 * time.Millisecond) // add another value to addr - _, waited, provisionalTime = entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) phBookData = entries.data[addr1].recentConnectionTimes require.Equal(t, 2, len(phBookData)) @@ -238,8 +239,8 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // the first time should be removed and a new one added // there should not be any wait - _, waited, provisionalTime = entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) phBookData2 := entries.data[addr1].recentConnectionTimes require.Equal(t, 2, len(phBookData2)) @@ -253,16 +254,20 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { // add 3 values to another address. should not wait // value 1 - _, waited, provisionalTime = entries.waitForConnectionTime(addr2) - require.Equal(t, false, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime)) + + // introduce a gap between the two requests so that only the first will be removed later when waited + time.Sleep(100 * time.Millisecond) + // value 2 - _, waited, provisionalTime = entries.waitForConnectionTime(addr2) - require.Equal(t, false, waited) + addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime)) // value 3 - _, waited, provisionalTime = entries.waitForConnectionTime(addr2) - require.Equal(t, false, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime)) phBookData = entries.data[addr2].recentConnectionTimes @@ -270,8 +275,19 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) { require.Equal(t, 3, len(phBookData)) // add another element to trigger wait - _, waited, provisionalTime = entries.waitForConnectionTime(addr2) - require.Equal(t, true, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2) + require.Greater(t, int64(waitTime), int64(0)) + // no element should be removed + phBookData2 = entries.data[addr2].recentConnectionTimes + require.Equal(t, phBookData[0], phBookData2[0]) + require.Equal(t, phBookData[1], phBookData2[1]) + require.Equal(t, phBookData[2], phBookData2[2]) + + time.Sleep(waitTime) + + // The wait should be sufficient + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime)) // only one element should be removed, and one added phBookData2 = entries.data[addr2].recentConnectionTimes @@ -291,25 +307,25 @@ func TestWaitAndAddConnectionTimeShortWindow(t *testing.T) { // add 3 values. should not wait // value 1 - addrInPhonebook, waited, provisionalTime := entries.waitForConnectionTime(addr1) + addrInPhonebook, waitTime, provisionalTime := entries.getConnectionWaitTime(addr1) require.Equal(t, true, addrInPhonebook) - require.Equal(t, false, waited) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) // value 2 - _, waited, provisionalTime = entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) // value 3 - _, waited, provisionalTime = entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) // give enough time to expire all the elements time.Sleep(10 * time.Millisecond) // there should not be any wait - _, waited, provisionalTime = entries.waitForConnectionTime(addr1) - require.Equal(t, false, waited) + _, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1) + require.Equal(t, time.Duration(0), waitTime) require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime)) // only one time should be left (the newly added) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 737490fccd..b068c45f5a 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -174,11 +174,8 @@ type GossipNode interface { // ClearHandlers deregisters all the existing message handlers. ClearHandlers() - // MakeHTTPRequest will make sure connectionsRateLimitingCount - // is not violated, and register the connection time of the - // request, before making the http request to the server. - MakeHTTPRequest(client *http.Client, - request *http.Request) (*http.Response, error) + // GetRoundTripper returns a Transport that would limit the number of outgoing connections. + GetRoundTripper() http.RoundTripper } // IncomingMessage represents a message arriving from some peer in our p2p network @@ -326,6 +323,11 @@ type WebsocketNetwork struct { // lastPeerConnectionsSent is the last time the peer connections were sent ( or attempted to be sent ) to the telemetry server. lastPeerConnectionsSent time.Time + + // transport and dialer are customized to limit the number of + // connection in compliance with connectionsRateLimitingCount. + transport http.Transport + dialer Dialer } type broadcastRequest struct { @@ -505,7 +507,8 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { var addrs []string addrs = wn.phonebook.GetAddresses(1000) for _, addr := range addrs { - outPeers = append(outPeers, &wsPeerCore{net: wn, rootURL: addr}) + peerCore := makePeerCore(wn, addr, wn.GetRoundTripper(), "" /*origin address*/) + outPeers = append(outPeers, &peerCore) } case PeersConnectedIn: wn.peersLock.RLock() @@ -522,6 +525,21 @@ func (wn *WebsocketNetwork) GetPeers(options ...PeerOption) []Peer { func (wn *WebsocketNetwork) setup() { + wn.dialer.phonebook = wn.phonebook + // Parameter values similar to http.DefaultTransport + wn.dialer.innerDialer.Timeout = 30 * time.Second + wn.dialer.innerDialer.Timeout = 30 * time.Second + wn.dialer.innerDialer.KeepAlive = 30 * time.Second + wn.dialer.innerDialer.DualStack = true + + // Parameter values similar to http.DefaultTransport + wn.transport.DialContext = wn.dialer.DialContext + wn.transport.Dial = wn.dialer.Dial + wn.transport.MaxIdleConns = 100 + wn.transport.IdleConnTimeout = 90 * time.Second + wn.transport.TLSHandshakeTimeout = 10 * time.Second + wn.transport.ExpectContinueTimeout = 1 * time.Second + wn.upgrader.ReadBufferSize = 4096 wn.upgrader.WriteBufferSize = 4096 wn.upgrader.EnableCompression = false @@ -778,17 +796,6 @@ func (wn *WebsocketNetwork) checkServerResponseVariables(header http.Header, add return true } -// MakeHTTPRequest will make sure connectionsRateLimitingCount is not -// violated, and register the connection time of the request, before -// making the http request to the server. -func (wn *WebsocketNetwork) MakeHTTPRequest(client *http.Client, - request *http.Request) (resp *http.Response, err error) { - _, _, provisionalTime := wn.phonebook.WaitForConnectionTime(request.Host) - resp, err = client.Do(request) - wn.phonebook.UpdateConnectionTime(request.Host, provisionalTime) - return resp, err -} - // getCommonHeaders retreives the common headers for both incoming and outgoing connections from the provided headers. func getCommonHeaders(headers http.Header) (otherTelemetryGUID, otherInstanceName, otherPublicAddr string) { otherTelemetryGUID = logging.SanitizeTelemetryString(headers.Get(TelemetryIDHeader), 1) @@ -933,11 +940,7 @@ func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *htt } peer := &wsPeer{ - wsPeerCore: wsPeerCore{ - net: wn, - rootURL: trackedRequest.otherPublicAddr, - originAddress: trackedRequest.remoteHost, - }, + wsPeerCore: makePeerCore(wn, trackedRequest.otherPublicAddr, wn.GetRoundTripper(), trackedRequest.remoteHost), conn: conn, outgoing: false, InstanceName: trackedRequest.otherInstanceName, @@ -1544,10 +1547,10 @@ func (wn *WebsocketNetwork) numOutgoingPending() int { return len(wn.tryConnectAddrs) } -var websocketDialer = websocket.Dialer{ - Proxy: http.ProxyFromEnvironment, - HandshakeTimeout: 45 * time.Second, - EnableCompression: false, +// GetRoundTripper returns an http.Transport that limits the number of connection +// to comply with connectionsRateLimitingCount. +func (wn *WebsocketNetwork) GetRoundTripper() http.RoundTripper { + return &wn.transport } // tryConnect opens websocket connection and checks initial connection parameters. @@ -1565,6 +1568,14 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { SetUserAgentHeader(requestHeader) myInstanceName := wn.log.GetInstanceName() requestHeader.Set(InstanceNameHeader, myInstanceName) + var websocketDialer = websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 45 * time.Second, + EnableCompression: false, + NetDialContext: wn.dialer.DialContext, + NetDial: wn.dialer.Dial, + } + conn, response, err := websocketDialer.DialContext(wn.ctx, gossipAddr, requestHeader) if err != nil { if err == websocket.ErrBadHandshake { @@ -1609,7 +1620,12 @@ func (wn *WebsocketNetwork) tryConnect(addr, gossipAddr string) { return } - peer := &wsPeer{wsPeerCore: wsPeerCore{net: wn, rootURL: addr}, conn: conn, outgoing: true, incomingMsgFilter: wn.incomingMsgFilter, createTime: time.Now()} + peer := &wsPeer{ + wsPeerCore: makePeerCore(wn, addr, wn.GetRoundTripper(), "" /* origin */), + conn: conn, + outgoing: true, + incomingMsgFilter: wn.incomingMsgFilter, + createTime: time.Now()} peer.TelemetryGUID, peer.InstanceName, _ = getCommonHeaders(response.Header) peer.init(wn.config, wn.outgoingMessagesBufferSize) wn.addPeer(peer) diff --git a/network/wsNetwork_test.go b/network/wsNetwork_test.go index 5c41c70bc4..a86ebd5f41 100644 --- a/network/wsNetwork_test.go +++ b/network/wsNetwork_test.go @@ -87,10 +87,10 @@ func (e *oneEntryPhonebook) UpdateRetryAfter(addr string, retryAfter time.Time) } } -func (e *oneEntryPhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, - waited bool, provisionalTime time.Time) { +func (e *oneEntryPhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool, + waitTime time.Duration, provisionalTime time.Time) { var t time.Time - return false, false, t + return false, 0, t } func (e *oneEntryPhonebook) UpdateConnectionTime(addr string, t time.Time) bool { diff --git a/network/wsPeer.go b/network/wsPeer.go index 379f02198e..fad5e0df39 100644 --- a/network/wsPeer.go +++ b/network/wsPeer.go @@ -80,7 +80,7 @@ type sendMessage struct { type wsPeerCore struct { net *WebsocketNetwork rootURL string - originAddress string + originAddress string // incoming connection remote host client http.Client } @@ -169,6 +169,16 @@ type UnicastPeer interface { Unicast(ctx context.Context, data []byte, tag protocol.Tag) error } +// Create a wsPeerCore object +func makePeerCore(net *WebsocketNetwork, rootURL string, roundTripper http.RoundTripper, originAddress string) wsPeerCore { + return wsPeerCore{ + net: net, + rootURL: rootURL, + originAddress: originAddress, + client: http.Client{Transport: roundTripper}, + } +} + // GetAddress returns the root url to use to connect to this peer. // TODO: should GetAddress be added to Peer interface? func (wp *wsPeerCore) GetAddress() string { diff --git a/rpcs/httpFetcher.go b/rpcs/httpFetcher.go index 372d948835..f66aa88fae 100644 --- a/rpcs/httpFetcher.go +++ b/rpcs/httpFetcher.go @@ -74,7 +74,7 @@ func (hf *HTTPFetcher) GetBlockBytes(ctx context.Context, r basics.Round) (data } request = request.WithContext(ctx) network.SetUserAgentHeader(request.Header) - response, err := hf.net.MakeHTTPRequest(hf.client, request) + response, err := hf.client.Do(request) if err != nil { hf.log.Debugf("GET %#v : %s", blockURL, err) return nil, err diff --git a/rpcs/httpTxSync.go b/rpcs/httpTxSync.go index 62744e4589..42d1647c70 100644 --- a/rpcs/httpTxSync.go +++ b/rpcs/httpTxSync.go @@ -103,7 +103,8 @@ func (hts *HTTPTxSync) Sync(ctx context.Context, bloom *bloom.Filter) (txgroups hts.rootURL = hpeer.GetAddress() client := hpeer.GetHTTPClient() if client == nil { - client = http.DefaultClient + client = &http.Client{} + client.Transport = hts.peers.GetRoundTripper() } parsedURL, err := network.ParseHostOrURL(hts.rootURL) if err != nil { @@ -123,7 +124,7 @@ func (hts *HTTPTxSync) Sync(ctx context.Context, bloom *bloom.Filter) (txgroups request.Header.Set("Content-Type", requestContentType) network.SetUserAgentHeader(request.Header) request = request.WithContext(ctx) - response, err := hts.peers.MakeHTTPRequest(client, request) + response, err := client.Do(request) if err != nil { hts.log.Warnf("txSync POST %v: %s", syncURL, err) return nil, err