Skip to content

Commit

Permalink
Tunnel outgoing connection via a rate limiting dialer (algorand#780)
Browse files Browse the repository at this point in the history
* changes.

* adding dialer.

* DRAFT: using channel to offload the mutex.

* Taking care of the lock triggering deadlock detection.

* cleaning unnecessary changes.

* cleaning unnecessary changes.

* minor fixes

* GetNetTransport modifying returning copy of the http.Transport

* workaround to avoid the race detection trigger.

* Testing a different approach to ovrride the Dial/DialContext by embedding the default transport into another object instead of changing the default transport.

* Adding RateLimitedTransport to wrap around the http.Transport

* fixing lint

* Separating Dialer from Transport, initializing the Dialer and Transport params (timeout, etc)

* changes.

* Integrating changes from Tsachi + cleanups.

* fixing build failure.

* fixing build failure.

* Addressing Pavel's comments.

* changes.

* adding dialer.

* DRAFT: using channel to offload the mutex.

* Taking care of the lock triggering deadlock detection.

* cleaning unnecessary changes.

* cleaning unnecessary changes.

* minor fixes

* GetNetTransport modifying returning copy of the http.Transport

* workaround to avoid the race detection trigger.

* Testing a different approach to ovrride the Dial/DialContext by embedding the default transport into another object instead of changing the default transport.

* Adding RateLimitedTransport to wrap around the http.Transport

* fixing lint

* Separating Dialer from Transport, initializing the Dialer and Transport params (timeout, etc)

* changes.

* Integrating changes from Tsachi + cleanups.

* fixing build failure.

* fixing build failure.

* Addressing Pavel's comments.

* changes.

* adding dialer.

* DRAFT: using channel to offload the mutex.

* Taking care of the lock triggering deadlock detection.

* cleaning unnecessary changes.

* cleaning unnecessary changes.

* minor fixes

* GetNetTransport modifying returning copy of the http.Transport

* workaround to avoid the race detection trigger.

* Testing a different approach to ovrride the Dial/DialContext by embedding the default transport into another object instead of changing the default transport.

* Adding RateLimitedTransport to wrap around the http.Transport

* fixing lint

* Separating Dialer from Transport, initializing the Dialer and Transport params (timeout, etc)

* changes.

* Integrating changes from Tsachi + cleanups.

* fixing build failure.

* fixing build failure.

* Allow asset creation transactions to be created while catching up. (algorand#790)

* Addressing Pavel's comments.

* changes.

* adding dialer.

* DRAFT: using channel to offload the mutex.

* Taking care of the lock triggering deadlock detection.

* cleaning unnecessary changes.

* cleaning unnecessary changes.

* minor fixes

* GetNetTransport modifying returning copy of the http.Transport

* workaround to avoid the race detection trigger.

* Testing a different approach to ovrride the Dial/DialContext by embedding the default transport into another object instead of changing the default transport.

* Adding RateLimitedTransport to wrap around the http.Transport

* fixing lint

* Separating Dialer from Transport, initializing the Dialer and Transport params (timeout, etc)

* changes.

* Integrating changes from Tsachi + cleanups.

* fixing build failure.

* fixing build failure.

* Addressing Pavel's comments.

* rebasing master

Co-authored-by: Tsachi Herman <tsachi.herman@algorand.com>
Co-authored-by: Will Winder <wwinder.unh@gmail.com>
  • Loading branch information
3 people authored Feb 7, 2020
1 parent eb090a0 commit e523a9a
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 99 deletions.
11 changes: 5 additions & 6 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (network *MockNetwork) GetPeers(options ...network.PeerOption) []network.Pe
return nil
}

// GetRoundTripper -- returns the network round tripper
func (network *MockNetwork) GetRoundTripper() http.RoundTripper {
return http.DefaultTransport
}

// Ready - always ready
func (network *MockNetwork) Ready() chan struct{} {
c := make(chan struct{})
Expand All @@ -91,9 +96,3 @@ func (network *MockNetwork) ClearHandlers() {
// RegisterHTTPHandler - empty implementation
func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}

// MakeHTTPRequest - basic client.Do request
func (network *MockNetwork) MakeHTTPRequest(client *http.Client,
request *http.Request) (*http.Response, error) {
return client.Do(request)
}
58 changes: 58 additions & 0 deletions network/dialer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (C) 2019-2020 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"context"
"net"
"time"
)

// Dialer establish tcp-level connection with the destination
type Dialer struct {
phonebook *MultiPhonebook
innerDialer net.Dialer
}

// Dial connects to the address on the named network.
// It waits if needed not to exceed connectionsRateLimitingCount.
func (d *Dialer) Dial(network, address string) (net.Conn, error) {
return d.DialContext(context.Background(), network, address)
}

// DialContext connects to the address on the named network using the provided context.
// It waits if needed not to exceed connectionsRateLimitingCount.
func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) {
var waitTime time.Duration
var provisionalTime time.Time

for {
_, waitTime, provisionalTime = d.phonebook.GetConnectionWaitTime(address)
if waitTime == 0 {
break // break out of the loop and proceed to the connection
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(waitTime):
}
}
conn, err := d.innerDialer.DialContext(ctx, network, address)
d.phonebook.UpdateConnectionTime(address, provisionalTime)

return conn, err
}
88 changes: 52 additions & 36 deletions network/phonebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ type Phonebook interface {
// UpdateRetryAfter updates the retry-after field for the entries matching the given address
UpdateRetryAfter(addr string, retryAfter time.Time)

// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount.
// Then it will register a provisional next connection time.
// Will return true if it waited and false otherwise
WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time)
// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
GetConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time)

// UpdateConnectionTime will update the provisional connection time.
// Returns true of the addr was in the phonebook
Expand Down Expand Up @@ -136,46 +139,49 @@ func (e *phonebookEntries) updateRetryAfter(addr string, retryAfter time.Time) {
}
}

// waitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount.
// Will return true if it waited and false otherwise
func (e *phonebookEntries) waitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) {
waited = false
_, found := e.data[addr]
// getConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (e *phonebookEntries) getConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time) {
_, addrInPhonebook = e.data[addr]
curTime := time.Now()
if !found {
if !addrInPhonebook {
// The addr is not in this phonebook.
// Will find the addr in a different phonebook.
return found, waited, curTime
return addrInPhonebook, 0 /* not unsed */, curTime /* not unsed */
}

var timeSince time.Duration
var numElmtsToRemove int
// Remove from recentConnectionTimes the times later than ConnectionsRateLimitingWindowSeconds
for numElmtsToRemove < len(e.data[addr].recentConnectionTimes) {
timeSince = curTime.Sub((e.data[addr].recentConnectionTimes)[numElmtsToRemove])
if timeSince > e.connectionsRateLimitingWindow {
if timeSince >= e.connectionsRateLimitingWindow {
numElmtsToRemove++
} else {
break // break the loop. The rest are earlier than 1 second
}
}
// Remove the expired elements from e.data[addr].recentConnectionTimes
e.popNElements(numElmtsToRemove, addr)

// If there are max number of connections within the time window, wait
numElts := len(e.data[addr].recentConnectionTimes)
if uint(numElts) >= e.connectionsRateLimitingCount {
// Wait until the earliest time expires
time.Sleep(e.connectionsRateLimitingWindow - timeSince)
waited = true
// Remove it from recentConnectionTimes
e.popNElements(1, addr)
return addrInPhonebook, /* true */
(e.connectionsRateLimitingWindow - timeSince), curTime /* not unsed */
}

// Else, there is space in connectionsRateLimitingCount. The
// connection request of the caller will proceed
// Update curTime, since it may have significantly changed if waited
curTime = time.Now()
provisionalTime = time.Now()
// Append the provisional time for the next connection request
e.appendTime(addr, curTime)
return found, waited, curTime
e.appendTime(addr, provisionalTime)
return addrInPhonebook /* true */, 0 /* no wait. proceed */, provisionalTime
}

// UpdateConnectionTime will update the provisional connection time.
Expand Down Expand Up @@ -252,11 +258,14 @@ func (p *ArrayPhonebook) UpdateRetryAfter(addr string, retryAfter time.Time) {
p.Entries.updateRetryAfter(addr, retryAfter)
}

// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount.
// Then it will register the next connection time.
// Will return true if it waited and false otherwise
func (p *ArrayPhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) {
return p.Entries.waitForConnectionTime(addr)
// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (p *ArrayPhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time) {
return p.Entries.getConnectionWaitTime(addr)
}

// UpdateConnectionTime will update the provisional connection time.
Expand Down Expand Up @@ -299,12 +308,16 @@ func (p *ThreadsafePhonebook) UpdateRetryAfter(addr string, retryAfter time.Time
p.entries.updateRetryAfter(addr, retryAfter)
}

// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount.
// Will return true if it waited and false otherwise
func (p *ThreadsafePhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) {
// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
// The provisional time should be updated after the connection with UpdateConnectionTime
func (p *ThreadsafePhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time) {
p.lock.RLock()
defer p.lock.RUnlock()
return p.entries.waitForConnectionTime(addr)
return p.entries.getConnectionWaitTime(addr)
}

// UpdateConnectionTime will update the provisional connection time.
Expand Down Expand Up @@ -409,22 +422,25 @@ func (mp *MultiPhonebook) UpdateRetryAfter(addr string, retryAfter time.Time) {
}
}

// WaitForConnectionTime will wait to prevent exceeding connectionsRateLimitingCount.
// Will return true if it waited and false otherwise
func (mp *MultiPhonebook) WaitForConnectionTime(addr string) (addrInPhonebook, waited bool, provisionalTime time.Time) {
// GetConnectionWaitTime will calculate and return the wait
// time to prevent exceeding connectionsRateLimitingCount.
// The connection should be established when the waitTime is 0.
// It will register a provisional next connection time when the waitTime is 0.
func (mp *MultiPhonebook) GetConnectionWaitTime(addr string) (addrInPhonebook bool,
waitTime time.Duration, provisionalTime time.Time) {
mp.lock.Lock()
defer mp.lock.Unlock()
addrInPhonebook = false
waited = false
for _, op := range mp.phonebookMap {
// The addr will be in one of the phonebooks.
// If it is not found in this phonebook, no action will be taken .
if addrInPhonebook, waited, provisionalTime = op.WaitForConnectionTime(addr); addrInPhonebook {
if addrInPhonebook, waitTime,
provisionalTime = op.GetConnectionWaitTime(addr); addrInPhonebook {
// If addr is in this phonebook, no need to look for it in other phonebooks
return addrInPhonebook, waited, provisionalTime
return
}
}
return addrInPhonebook, waited, provisionalTime
return
}

// UpdateConnectionTime will update the provisional connection time.
Expand Down
62 changes: 39 additions & 23 deletions network/phonebook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,15 +210,16 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {
addr2 := "addrXYZ"

// Address not in. Should return false
addrInPhonebook, _, provisionalTime := entries.waitForConnectionTime(addr1)
addrInPhonebook, _, provisionalTime := entries.getConnectionWaitTime(addr1)
require.Equal(t, false, addrInPhonebook)
require.Equal(t, false, entries.updateConnectionTime(addr1, provisionalTime))

// Test the addresses are populated in the phonebook and a
// time can be added to one of them
entries.ReplacePeerList([]string{addr1, addr2})
_, waited, provisionalTime := entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
addrInPhonebook, waitTime, provisionalTime := entries.getConnectionWaitTime(addr1)
require.Equal(t, true, addrInPhonebook)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))
phBookData := entries.data[addr1].recentConnectionTimes
require.Equal(t, 1, len(phBookData))
Expand All @@ -227,8 +228,8 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// add another value to addr
_, waited, provisionalTime = entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))
phBookData = entries.data[addr1].recentConnectionTimes
require.Equal(t, 2, len(phBookData))
Expand All @@ -238,8 +239,8 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {

// the first time should be removed and a new one added
// there should not be any wait
_, waited, provisionalTime = entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))
phBookData2 := entries.data[addr1].recentConnectionTimes
require.Equal(t, 2, len(phBookData2))
Expand All @@ -253,25 +254,40 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {

// add 3 values to another address. should not wait
// value 1
_, waited, provisionalTime = entries.waitForConnectionTime(addr2)
require.Equal(t, false, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime))

// introduce a gap between the two requests so that only the first will be removed later when waited
time.Sleep(100 * time.Millisecond)

// value 2
_, waited, provisionalTime = entries.waitForConnectionTime(addr2)
require.Equal(t, false, waited)
addrInPhonebook, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime))
// value 3
_, waited, provisionalTime = entries.waitForConnectionTime(addr2)
require.Equal(t, false, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime))

phBookData = entries.data[addr2].recentConnectionTimes
// all three times should be queued
require.Equal(t, 3, len(phBookData))

// add another element to trigger wait
_, waited, provisionalTime = entries.waitForConnectionTime(addr2)
require.Equal(t, true, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2)
require.Greater(t, int64(waitTime), int64(0))
// no element should be removed
phBookData2 = entries.data[addr2].recentConnectionTimes
require.Equal(t, phBookData[0], phBookData2[0])
require.Equal(t, phBookData[1], phBookData2[1])
require.Equal(t, phBookData[2], phBookData2[2])

time.Sleep(waitTime)

// The wait should be sufficient
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr2)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr2, provisionalTime))
// only one element should be removed, and one added
phBookData2 = entries.data[addr2].recentConnectionTimes
Expand All @@ -291,25 +307,25 @@ func TestWaitAndAddConnectionTimeShortWindow(t *testing.T) {

// add 3 values. should not wait
// value 1
addrInPhonebook, waited, provisionalTime := entries.waitForConnectionTime(addr1)
addrInPhonebook, waitTime, provisionalTime := entries.getConnectionWaitTime(addr1)
require.Equal(t, true, addrInPhonebook)
require.Equal(t, false, waited)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))
// value 2
_, waited, provisionalTime = entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))
// value 3
_, waited, provisionalTime = entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))

// give enough time to expire all the elements
time.Sleep(10 * time.Millisecond)

// there should not be any wait
_, waited, provisionalTime = entries.waitForConnectionTime(addr1)
require.Equal(t, false, waited)
_, waitTime, provisionalTime = entries.getConnectionWaitTime(addr1)
require.Equal(t, time.Duration(0), waitTime)
require.Equal(t, true, entries.updateConnectionTime(addr1, provisionalTime))

// only one time should be left (the newly added)
Expand Down
Loading

0 comments on commit e523a9a

Please sign in to comment.