Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: allow multi-role phonebook entries #6131

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func TestLocal_ValidateP2PHybridConfig(t *testing.T) {
NetAddress: test.netAddress,
}
err := c.ValidateP2PHybridConfig()
require.Equal(t, test.err, err != nil, "test=%d", i)
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, test.err, err != nil)
})
}
}
Expand Down
30 changes: 21 additions & 9 deletions network/p2p/peerstore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type addressData struct {
mu *deadlock.RWMutex

// role is the role that this address serves.
role phonebook.PhoneBookEntryRoles
role phonebook.Roles

// persistent is set true for peers whose record should not be removed for the peer list
persistent bool
Expand Down Expand Up @@ -97,7 +97,7 @@ func MakePhonebook(connectionsRateLimitingCount uint,
}

// GetAddresses returns up to N addresses, but may return fewer
func (ps *PeerStore) GetAddresses(n int, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo {
func (ps *PeerStore) GetAddresses(n int, role phonebook.Roles) []*peer.AddrInfo {
return shuffleSelect(ps.filterRetryTime(time.Now(), role), n)
}

Expand Down Expand Up @@ -205,19 +205,29 @@ func (ps *PeerStore) UpdateConnectionTime(addrOrPeerID string, provisionalTime t
}

// ReplacePeerList replaces the peer list for the given networkName and role.
func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) {
func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName string, role phonebook.Roles) {
// prepare a map of items we'd like to remove.
removeItems := make(map[peer.ID]bool, 0)
peerIDs := ps.Peers()
for _, pid := range peerIDs {
data, _ := ps.Get(pid, addressDataKey)
if data != nil {
ad := data.(addressData)
updated := false
ad.mu.RLock()
if ad.networkNames[networkName] && ad.role == role && !ad.persistent {
removeItems[pid] = true
if ad.networkNames[networkName] && !ad.persistent {
if ad.role.Is(role) {
removeItems[pid] = true
} else if ad.role.Has(role) {
ad.role.Remove(role)
updated = true
}
}
ad.mu.RUnlock()

if updated {
_ = ps.Put(pid, addressDataKey, ad)
}
}

}
Expand All @@ -229,7 +239,9 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName
ad := data.(addressData)
ad.mu.Lock()
ad.networkNames[networkName] = true
ad.role.Assign(role)
ad.mu.Unlock()
_ = ps.Put(info.ID, addressDataKey, ad)

// do not remove this entry
delete(removeItems, info.ID)
Expand All @@ -249,7 +261,7 @@ func (ps *PeerStore) ReplacePeerList(addressesThey []*peer.AddrInfo, networkName

// AddPersistentPeers stores addresses of peers which are persistent.
// i.e. they won't be replaced by ReplacePeerList calls
func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.PhoneBookEntryRoles) {
func (ps *PeerStore) AddPersistentPeers(addrInfo []*peer.AddrInfo, networkName string, role phonebook.Roles) {
for _, info := range addrInfo {
data, _ := ps.Get(info.ID, addressDataKey)
if data != nil {
Expand All @@ -273,7 +285,7 @@ func (ps *PeerStore) Length() int {
}

// makePhonebookEntryData creates a new address entry for provided network name and role.
func makePhonebookEntryData(networkName string, role phonebook.PhoneBookEntryRoles, persistent bool) addressData {
func makePhonebookEntryData(networkName string, role phonebook.Roles, persistent bool) addressData {
pbData := addressData{
networkNames: make(map[string]bool),
mu: &deadlock.RWMutex{},
Expand Down Expand Up @@ -320,13 +332,13 @@ func (ps *PeerStore) popNElements(n int, peerID peer.ID) {
_ = ps.Put(peerID, addressDataKey, ad)
}

func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.PhoneBookEntryRoles) []*peer.AddrInfo {
func (ps *PeerStore) filterRetryTime(t time.Time, role phonebook.Roles) []*peer.AddrInfo {
o := make([]*peer.AddrInfo, 0, len(ps.Peers()))
for _, peerID := range ps.Peers() {
data, _ := ps.Get(peerID, addressDataKey)
if data != nil {
ad := data.(addressData)
if t.After(ad.retryAfter) && role == ad.role {
if t.After(ad.retryAfter) && ad.role.Has(role) {
mas := ps.Addrs(peerID)
info := peer.AddrInfo{ID: peerID, Addrs: mas}
o = append(o, &info)
Expand Down
169 changes: 142 additions & 27 deletions network/p2p/peerstore/peerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,6 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"
)

// PhoneBookEntryRelayRole used for all the relays that are provided either via the algobootstrap SRV record
// or via a configuration file.
const PhoneBookEntryRelayRole = 1

// PhoneBookEntryArchiverRole used for all the archivers that are provided via the archive SRV record.
const PhoneBookEntryArchiverRole = 2

func TestPeerstore(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()
Expand Down Expand Up @@ -90,7 +83,7 @@ func TestPeerstore(t *testing.T) {
}

func testPhonebookAll(t *testing.T, set []*peer.AddrInfo, ph *PeerStore) {
actual := ph.GetAddresses(len(set), PhoneBookEntryRelayRole)
actual := ph.GetAddresses(len(set), phonebook.PhoneBookEntryRelayRole)
for _, info := range actual {
ok := false
for _, known := range set {
Expand Down Expand Up @@ -125,7 +118,7 @@ func testPhonebookUniform(t *testing.T, set []*peer.AddrInfo, ph *PeerStore, get
counts[set[i].ID.String()] = 0
}
for i := 0; i < uniformityTestLength; i++ {
actual := ph.GetAddresses(getsize, PhoneBookEntryRelayRole)
actual := ph.GetAddresses(getsize, phonebook.PhoneBookEntryRelayRole)
for _, info := range actual {
if _, ok := counts[info.ID.String()]; ok {
counts[info.ID.String()]++
Expand Down Expand Up @@ -161,7 +154,7 @@ func TestArrayPhonebookAll(t *testing.T) {
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false)
entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false)
info, _ := peerInfoFromDomainPort(addr)
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
Expand All @@ -183,7 +176,7 @@ func TestArrayPhonebookUniform1(t *testing.T) {
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false)
entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false)
info, _ := peerInfoFromDomainPort(addr)
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
Expand All @@ -205,7 +198,7 @@ func TestArrayPhonebookUniform3(t *testing.T) {
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
for _, addr := range set {
entry := makePhonebookEntryData("", PhoneBookEntryRelayRole, false)
entry := makePhonebookEntryData("", phonebook.PhoneBookEntryRelayRole, false)
info, _ := peerInfoFromDomainPort(addr)
ph.AddAddrs(info.ID, info.Addrs, libp2p.AddressTTL)
ph.Put(info.ID, addressDataKey, entry)
Expand Down Expand Up @@ -234,8 +227,8 @@ func TestMultiPhonebook(t *testing.T) {

ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole)

testPhonebookAll(t, infoSet, ph)
testPhonebookUniform(t, infoSet, ph, 1)
Expand Down Expand Up @@ -268,13 +261,13 @@ func TestMultiPhonebookPersistentPeers(t *testing.T) {
}
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
ph.AddPersistentPeers(persistentPeers, "pha", PhoneBookEntryRelayRole)
ph.AddPersistentPeers(persistentPeers, "phb", PhoneBookEntryRelayRole)
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
ph.AddPersistentPeers(persistentPeers, "pha", phonebook.PhoneBookEntryRelayRole)
ph.AddPersistentPeers(persistentPeers, "phb", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole)

testPhonebookAll(t, append(infoSet, info), ph)
allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), PhoneBookEntryRelayRole)
allAddresses := ph.GetAddresses(len(set)+len(persistentPeers), phonebook.PhoneBookEntryRelayRole)
for _, pp := range persistentPeers {
found := false
for _, addr := range allAddresses {
Expand Down Expand Up @@ -308,8 +301,8 @@ func TestMultiPhonebookDuplicateFiltering(t *testing.T) {
}
ph, err := MakePhonebook(1, 1*time.Millisecond)
require.NoError(t, err)
ph.ReplacePeerList(pha, "pha", PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", PhoneBookEntryRelayRole)
ph.ReplacePeerList(pha, "pha", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(phb, "phb", phonebook.PhoneBookEntryRelayRole)

testPhonebookAll(t, infoSet, ph)
testPhonebookUniform(t, infoSet, ph, 1)
Expand Down Expand Up @@ -338,7 +331,7 @@ func TestWaitAndAddConnectionTimeLongtWindow(t *testing.T) {

// Test the addresses are populated in the phonebook and a
// time can be added to one of them
entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", PhoneBookEntryRelayRole)
entries.ReplacePeerList([]*peer.AddrInfo{info1, info2}, "default", phonebook.PhoneBookEntryRelayRole)
addrInPhonebook, waitTime, provisionalTime := entries.GetConnectionWaitTime(string(info1.ID))
require.Equal(t, true, addrInPhonebook)
require.Equal(t, time.Duration(0), waitTime)
Expand Down Expand Up @@ -469,20 +462,20 @@ func TestPhonebookRoles(t *testing.T) {

ph, err := MakePhonebook(1, 1)
require.NoError(t, err)
ph.ReplacePeerList(infoRelaySet, "default", PhoneBookEntryRelayRole)
ph.ReplacePeerList(infoArchiverSet, "default", PhoneBookEntryArchiverRole)
ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, len(relaysSet)+len(archiverSet), len(ph.Peers()))
require.Equal(t, len(relaysSet)+len(archiverSet), ph.Length())

for _, role := range []phonebook.PhoneBookEntryRoles{PhoneBookEntryRelayRole, PhoneBookEntryArchiverRole} {
for _, role := range []phonebook.Roles{phonebook.PhoneBookEntryRelayRole, phonebook.PhoneBookEntryArchivalRole} {
for k := 0; k < 100; k++ {
for l := 0; l < 3; l++ {
entries := ph.GetAddresses(l, role)
if role == PhoneBookEntryRelayRole {
if role.Has(phonebook.PhoneBookEntryRelayRole) {
for _, entry := range entries {
require.Contains(t, string(entry.ID), "relay")
}
} else if role == PhoneBookEntryArchiverRole {
} else if role.Has(phonebook.PhoneBookEntryArchivalRole) {
for _, entry := range entries {
require.Contains(t, string(entry.ID), "archiver")
}
Expand All @@ -491,3 +484,125 @@ func TestPhonebookRoles(t *testing.T) {
}
}
}

// TestPhonebookRolesMulti makes sure the same host might have multiple roles
func TestPhonebookRolesMulti(t *testing.T) {
partitiontest.PartitionTest(t)

relaysSet := []string{"relay1:4040", "relay2:4041"}
archiverSet := []string{"relay1:4040", "archiver1:1111"}
const numUnique = 3

infoRelaySet := make([]*peer.AddrInfo, 0)
for _, addr := range relaysSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoRelaySet = append(infoRelaySet, info)
}

infoArchiverSet := make([]*peer.AddrInfo, 0)
for _, addr := range archiverSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoArchiverSet = append(infoArchiverSet, info)
}

ph, err := MakePhonebook(1, 1)
require.NoError(t, err)
ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole)
ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, numUnique, len(ph.Peers()))
require.Equal(t, numUnique, ph.Length())

const maxPeers = 5
entries := ph.GetAddresses(maxPeers, phonebook.PhoneBookEntryRelayRole)
require.Equal(t, len(relaysSet), len(entries))
entries = ph.GetAddresses(maxPeers, phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, len(archiverSet), len(entries))
}

func TestReplacePeerList(t *testing.T) {
partitiontest.PartitionTest(t)
t.Parallel()

relaysSet := []string{"a:1", "b:2"}
archiverSet := []string{"c:3"}
comboSet := []string{"b:2", "c:3"} // b is in both sets

infoRelaySet := make([]*peer.AddrInfo, 0)
for _, addr := range relaysSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoRelaySet = append(infoRelaySet, info)
}

infoArchiverSet := make([]*peer.AddrInfo, 0)
for _, addr := range archiverSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoArchiverSet = append(infoArchiverSet, info)
}

infoComboArchiverSet := make([]*peer.AddrInfo, 0)
for _, addr := range comboSet {
info, err := peerInfoFromDomainPort(addr)
require.NoError(t, err)
infoComboArchiverSet = append(infoComboArchiverSet, info)
}

ph, err := MakePhonebook(1, 1)
require.NoError(t, err)

ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole)
res := ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole)
require.Equal(t, 2, len(res))
for _, info := range infoRelaySet {
require.Contains(t, res, info)
}

ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole)
res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, 1, len(res))
for _, info := range infoArchiverSet {
require.Contains(t, res, info)
}

// make b archival in addition to relay
ph.ReplacePeerList(infoComboArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole)
res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole)
require.Equal(t, 2, len(res))
for _, info := range infoRelaySet {
require.Contains(t, res, info)
}
res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, 2, len(res))
for _, info := range infoComboArchiverSet {
require.Contains(t, res, info)
}

// update relays
ph.ReplacePeerList(infoRelaySet, "default", phonebook.PhoneBookEntryRelayRole)
res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole)
require.Equal(t, 2, len(res))
for _, info := range infoRelaySet {
require.Contains(t, res, info)
}
res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, 2, len(res))
for _, info := range infoComboArchiverSet {
require.Contains(t, res, info)
}

// exclude b from archival
ph.ReplacePeerList(infoArchiverSet, "default", phonebook.PhoneBookEntryArchivalRole)
res = ph.GetAddresses(4, phonebook.PhoneBookEntryRelayRole)
require.Equal(t, 2, len(res))
for _, info := range infoRelaySet {
require.Contains(t, res, info)
}
res = ph.GetAddresses(4, phonebook.PhoneBookEntryArchivalRole)
require.Equal(t, 1, len(res))
for _, info := range infoArchiverSet {
require.Contains(t, res, info)
}
}
Loading
Loading