Skip to content

Commit

Permalink
Merge "[FAB-15389] Fix private data dissemination"
Browse files Browse the repository at this point in the history
  • Loading branch information
denyeart authored and Gerrit Code Review committed Nov 9, 2019
2 parents 2448b5e + b97ca73 commit 3867a47
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 46 deletions.
86 changes: 70 additions & 16 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
return nil, errors.WithStack(err)
}

logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
if err != nil {
return nil, errors.WithStack(err)
Expand Down Expand Up @@ -215,20 +216,37 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
return nil, err
}

m := pvtDataMsg.GetPrivateData().Payload

eligiblePeers := d.eligiblePeersOfChannel(routingFilter)
identitySets := d.identitiesOfEligiblePeers(eligiblePeers, colAP)

// Select one representative from each org
peerEndpoints := map[string]string{}
for _, peer := range eligiblePeers {
epToAdd := peer.Endpoint
if epToAdd == "" {
epToAdd = peer.InternalEndpoint
}
peerEndpoints[string(peer.PKIid)] = epToAdd
}

maximumPeerCount := colAP.MaximumPeerCount()
requiredPeerCount := colAP.RequiredPeerCount()

remainingPeers := []api.PeerIdentityInfo{}
selectedPeerEndpoints := []string{}

rand.Seed(time.Now().Unix())
// Select one representative from each org
if maximumPeerCount > 0 {
for _, selectionPeers := range identitySets {
required := 1
if requiredPeerCount == 0 {
required = 0
}
peer2SendPerOrg := selectionPeers[rand.Intn(len(selectionPeers))]
selectedPeerIndex := rand.Intn(len(selectionPeers))
peer2SendPerOrg := selectionPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2SendPerOrg.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
Expand All @@ -246,34 +264,70 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
},
})

// Add unselected peers to remainingPeers
for i, peer := range selectionPeers {
if i != selectedPeerIndex {
remainingPeers = append(remainingPeers, peer)
}
}

if requiredPeerCount > 0 {
requiredPeerCount--
}

maximumPeerCount--
if maximumPeerCount == 0 {
logger.Debug("MaximumPeerCount satisfied")
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}
}
}

// criteria to select remaining peers to satisfy colAP.MaximumPeerCount()
// collection policy parameters
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: maximumPeerCount,
MinAck: requiredPeerCount,
IsEligible: func(member discovery.NetworkMember) bool {
return routingFilter(member)
},
// criteria to select remaining peers to satisfy colAP.MaximumPeerCount() if there are still
// unselected peers remaining for dissemination
numPeersToSelect := maximumPeerCount
if len(remainingPeers) < maximumPeerCount {
numPeersToSelect = len(remainingPeers)
}
if numPeersToSelect > 0 {
logger.Debugf("MaximumPeerCount not satisfied, selecting %d more peer(s) for dissemination", numPeersToSelect)
}
for maximumPeerCount > 0 && len(remainingPeers) > 0 {
required := 1
if requiredPeerCount == 0 {
required = 0
}
selectedPeerIndex := rand.Intn(len(remainingPeers))
peer2Send := remainingPeers[selectedPeerIndex]
selectedPeerEndpoints = append(selectedPeerEndpoints, peerEndpoints[string(peer2Send.PKIId)])
sc := gossipgossip.SendCriteria{
Timeout: d.pushAckTimeout,
Channel: gossipCommon.ChannelID(d.chainID),
MaxPeers: 1,
MinAck: required,
IsEligible: func(member discovery.NetworkMember) bool {
return bytes.Equal(member.PKIid, peer2Send.PKIId)
},
}
disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: &protoext.SignedGossipMessage{
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
},
})
if requiredPeerCount > 0 {
requiredPeerCount--
}

disseminationPlan = append(disseminationPlan, &dissemination{
criteria: sc,
msg: pvtDataMsg,
})
maximumPeerCount--

// remove the selected peer from remaining peers
remainingPeers = append(remainingPeers[:selectedPeerIndex], remainingPeers[selectedPeerIndex+1:]...)
}

logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpoints)
return disseminationPlan, nil
}

Expand Down
2 changes: 1 addition & 1 deletion gossip/privdata/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestDistributor(t *testing.T) {
},
}, 0)
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed disseminating 4 out of 4 private dissemination plans")
assert.Contains(t, err.Error(), "Failed disseminating 2 out of 2 private dissemination plans")

assert.Equal(t,
[]string{"channel", channelID},
Expand Down
105 changes: 76 additions & 29 deletions integration/pvtdata/pvtdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,7 @@ var _ bool = Describe("PrivateData", func() {
)

BeforeEach(func() {
testDir, network, process, orderer, allPeers = initThreeOrgsSetup()
helper = &testHelper{
networkHelper: &networkHelper{
Network: network,
orderer: orderer,
peers: allPeers,
testDir: testDir,
channelID: "testchannel",
},
}

testDir, network = initThreeOrgsSetup()
legacyChaincode = nwo.Chaincode{
Name: "marblesp",
Version: "1.0",
Expand All @@ -101,12 +91,61 @@ var _ bool = Describe("PrivateData", func() {
}
})

JustBeforeEach(func() {
process, orderer, allPeers = startNetwork(network)
helper = &testHelper{
networkHelper: &networkHelper{
Network: network,
orderer: orderer,
peers: allPeers,
testDir: testDir,
channelID: "testchannel",
},
}
})

AfterEach(func() {
testCleanup(testDir, network, process)
})

Describe("Reconciliation", func() {
BeforeEach(func() {
Describe("Dissemination", func() {
When("pulling is disabled by setting the pull retry threshold to 0", func() {
BeforeEach(func() {
// set pull retry threshold to 0
peers := []*nwo.Peer{
network.Peer("org1", "peer0"),
network.Peer("org2", "peer0"),
network.Peer("org3", "peer0"),
}
for _, p := range peers {
core := network.ReadPeerConfig(p)
core.Peer.Gossip.PvtData.PullRetryThreshold = 0
network.WritePeerConfig(p, core)
}
})

JustBeforeEach(func() {
By("deploying legacy chaincode and adding marble1")
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
}
helper.deployChaincode(testChaincode)
helper.addMarble(testChaincode.Name,
`{"name":"marble1", "color":"blue", "size":35, "owner":"tom", "price":99}`,
network.Peer("org1", "peer0"),
)
})

It("disseminates private data per collections_config1", func() {
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1")
})
})

})

Describe("Reconciliation and pulling", func() {
JustBeforeEach(func() {
By("deploying legacy chaincode and adding marble1")
testChaincode = chaincode{
Chaincode: legacyChaincode,
Expand All @@ -125,7 +164,7 @@ var _ bool = Describe("PrivateData", func() {
})

When("org3 is added to collectionMarbles via chaincode upgrade with collections_config2", func() {
BeforeEach(func() {
JustBeforeEach(func() {
// collections_config2.json defines the access as follows:
// 1. collectionMarbles - Org1, Org2 and Org3 have access to this collection
// 2. collectionMarblePrivateDetails - Org2 and Org3 have access to this collection
Expand All @@ -151,22 +190,22 @@ var _ bool = Describe("PrivateData", func() {
var (
newPeer *nwo.Peer
)
BeforeEach(func() {
JustBeforeEach(func() {
newPeer = network.Peer("org1", "peer1")
helper.addPeer(newPeer)
allPeers = append(allPeers, newPeer)
helper.installChaincode(testChaincode, newPeer)
network.VerifyMembership(allPeers, "testchannel", "marblesp")
})

It("causes the new peer to receive the existing private data only for collectionMarbles", func() {
It("causes the new peer to pull the existing private data only for collectionMarbles", func() {
helper.assertPvtdataPresencePerCollectionConfig1(testChaincode.Name, "marble1", newPeer)
})
})
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -176,7 +215,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode is migrated from legacy to new lifecycle with same collection config", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand Down Expand Up @@ -228,7 +267,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -238,7 +277,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand Down Expand Up @@ -270,7 +309,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -280,7 +319,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand All @@ -292,7 +331,7 @@ var _ bool = Describe("PrivateData", func() {
})

Describe("Collection Config Updates", func() {
BeforeEach(func() {
JustBeforeEach(func() {
By("deploying legacy chaincode")
testChaincode = chaincode{
Chaincode: legacyChaincode,
Expand All @@ -302,7 +341,7 @@ var _ bool = Describe("PrivateData", func() {
})

When("migrating a chaincode from legacy lifecycle to new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
nwo.EnableCapabilities(network, "testchannel", "Application", "V2_0", orderer, allPeers...)
newLifecycleChaincode.CollectionsConfig = collectionConfig("short_btl_config.json")
newLifecycleChaincode.PackageID = "test-package-id"
Expand Down Expand Up @@ -433,7 +472,7 @@ var _ bool = Describe("PrivateData", func() {
}

Context("chaincode in legacy lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: legacyChaincode,
isLegacy: true,
Expand All @@ -449,7 +488,7 @@ var _ bool = Describe("PrivateData", func() {
})

Context("chaincode in new lifecycle", func() {
BeforeEach(func() {
JustBeforeEach(func() {
testChaincode = chaincode{
Chaincode: newLifecycleChaincode,
isLegacy: false,
Expand All @@ -468,7 +507,7 @@ var _ bool = Describe("PrivateData", func() {

})

func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
func initThreeOrgsSetup() (string, *nwo.Network) {
var err error
testDir, err := ioutil.TempDir("", "e2e-pvtdata")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -485,8 +524,12 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []

n := nwo.New(networkConfig, testDir, client, StartPort(), components)
n.GenerateConfigTree()
n.Bootstrap()

return testDir, n
}

func startNetwork(n *nwo.Network) (ifrit.Process, *nwo.Orderer, []*nwo.Peer) {
n.Bootstrap()
networkRunner := n.NetworkGroupRunner()
process := ifrit.Invoke(networkRunner)
Eventually(process.Ready(), n.EventuallyTimeout).Should(BeClosed())
Expand All @@ -504,7 +547,7 @@ func initThreeOrgsSetup() (string, *nwo.Network, ifrit.Process, *nwo.Orderer, []
By("verifying membership")
n.VerifyMembership(expectedPeers, "testchannel")

return testDir, n, process, orderer, expectedPeers
return process, orderer, expectedPeers
}

func testCleanup(testDir string, network *nwo.Network, process ifrit.Process) {
Expand Down Expand Up @@ -734,6 +777,10 @@ func (th *testHelper) assertPvtdataPresencePerCollectionConfig2(chaincodeName, m
}
}

func (th *testHelper) assertPvtdataPresencePerCollectionConfig5(chaincodeName, marbleName string, peers ...*nwo.Peer) {
th.assertPvtdataPresencePerCollectionConfig1(chaincodeName, marbleName, peers...)
}

// assertPresentInCollectionM asserts that the private data for given marble is present in collection
// 'readMarble' at the given peers
func (th *testHelper) assertPresentInCollectionM(chaincodeName, marbleName string, peerList ...*nwo.Peer) {
Expand Down
Loading

0 comments on commit 3867a47

Please sign in to comment.